<h3>Amendments Log</h3>
<table style="width:100%">
  <thead>
    <tr>
      <th style="text-align:left">Version</th>
      <th style="text-align:left">Amended By</th>
      <th style="text-align:left">Date</th>
      <th style="text-align:left">Description</th>
    </tr>
  </thead>
  <tbody>
    <tr>
      <td>1.1</td>
      <td>Gary Manley</td>
      <td>2025-12-02</td>
      <td>Added scheduling logic with Daily/Weekly/Monthly filters.</td>
    </tr>
    <tr>
      <td>1.0</td>
      <td>Gary Manley</td>
      <td>2025-11-30</td>
      <td>Initial Version</td>
    </tr>
  </tbody>
</table>

# Orchestrator

**Objective:** Master controller for the ETL pipeline.

**Function:** 
1. Connects to MotherDuck and queries the `pipeline_control` table.
2. Reads the active steps.
3. **Filters steps by frequency** (Daily, Weekly, Monthly) against today's date.
4. Uses `papermill` to execute the remaining notebooks in sequence.

In [None]:
# 1. SETUP & IMPORTS
import duckdb
import pandas as pd
import papermill as pm
import os
import sys
import time
from datetime import datetime
from dotenv import load_dotenv

# Developer-machine location of the .env file. Forward slashes keep the
# literal usable on both Windows and Linux.
vLocalEnvPath = r"C:/Users/garym/Documents/GitHub/MovieReleases/.env"

if not os.path.exists(vLocalEnvPath):
    # CI/CD mode (GitHub Actions): no local file, so the secrets are expected
    # to be present in the process environment already.
    print("Local .env not found. Assuming CI/CD environment (Secrets already loaded).")
else:
    # Local mode: populate the environment from that specific file.
    load_dotenv(dotenv_path=vLocalEnvPath)
    print(f"Loaded local environment from {vLocalEnvPath}")

# The MotherDuck token is mandatory in either mode; stop early without it.
vMdToken = os.environ.get("MOTHERDUCK_TOKEN")
if not vMdToken:
    raise RuntimeError("MOTHERDUCK_TOKEN missing")

print("--- STARTING PIPELINE AT {} ---".format(datetime.now()))

In [None]:
# 2. PARAMETERS / CONSTANTS
# File name of a notebook -- presumably this orchestrator itself (TODO confirm).
# NOTE(review): not referenced by any other cell visible in this notebook.
cNotebookName = "orchestrate_pipeline.ipynb"

## 3. Fetch & Filter Schedule
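We fetch all active steps, then keep only those due today: `DAILY` steps always run, `WEEKLY` steps run when `schedule_number` equals today's ISO weekday (1 = Monday … 7 = Sunday), and `MONTHLY` steps run when `schedule_number` equals today's day of the month.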

In [None]:
# Start from an empty schedule so the execution cell always finds `dfSchedule`,
# even when the control table holds no active steps (previously that path left
# the name undefined and the next cell failed with a NameError).
dfSchedule = pd.DataFrame()

try:
    print("Connecting to MotherDuck to fetch schedule...")
    vCon = duckdb.connect(f"md:?motherduck_token={vMdToken}")
    
    # Read the control table, including job_frequency and schedule_number.
    # NULL frequencies default to 'DAILY' and NULL schedule numbers to 0.
    try:
        vSql = """
            SELECT 
                step_id, 
                notebook_path, 
                description, 
                COALESCE(job_frequency, 'DAILY') as job_frequency, 
                COALESCE(schedule_number, 0) as schedule_number
            FROM MovieReleases.main.pipeline_control 
            WHERE is_active = TRUE 
            ORDER BY step_id ASC
        """
        dfAllSteps = vCon.sql(vSql).df()
    except Exception as e:
        # Keep the operator hint, but do not disguise a broken control table
        # as "nothing to run": re-raise so the outer handler fails the cell
        # with a clear message instead of a silent empty schedule.
        print("Pipeline Control table not found or schema mismatch. Please run setup SQL.")
        print(f"Error: {e}")
        raise
    finally:
        # Always release the connection, whether or not the query succeeded.
        vCon.close()
    
    if dfAllSteps.empty:
        # Genuinely nothing active: dfSchedule stays empty and the run ends cleanly.
        print("No active steps found. Exiting.")
    else:
        print(f"Found {len(dfAllSteps)} potentially active steps.")
        
        # --- SCHEDULING LOGIC ---
        # NOTE(review): datetime.now() is the local clock of whichever machine
        # runs this notebook -- confirm that matches the intended schedule timezone.
        vToday = datetime.now()
        vIsoWeekDay = vToday.isoweekday() # 1=Mon, 7=Sun
        vDayOfMonth = vToday.day
        
        print(f"Date Check: Weekday={vIsoWeekDay}, DayOfMonth={vDayOfMonth}")
        
        def f_should_run(row):
            """Return True when the control-table row is due to run today.

            Args:
                row: Mapping-like row exposing 'job_frequency' and
                    'schedule_number'.

            Returns:
                True for DAILY rows; for WEEKLY rows when schedule_number
                equals today's ISO weekday (1-7); for MONTHLY rows when
                schedule_number equals today's day of month. False for any
                other frequency value.

            Note:
                A MONTHLY schedule_number larger than the number of days in
                the current month never matches during that month.
            """
            # Normalise case and stray whitespace so e.g. "weekly " still matches.
            vFreq = str(row['job_frequency']).strip().upper()
            vNum = int(row['schedule_number'])
            
            if vFreq == 'DAILY':
                return True
            elif vFreq == 'WEEKLY':
                # Runs only if today matches the schedule number (1-7)
                return vNum == vIsoWeekDay
            elif vFreq == 'MONTHLY':
                # Runs only if today matches the day of month (1-31)
                return vNum == vDayOfMonth
            
            return False

        # Keep only the rows due today; copy so later edits do not touch dfAllSteps.
        dfSchedule = dfAllSteps[dfAllSteps.apply(f_should_run, axis=1)].copy()
        print(f"Steps scheduled for TODAY: {len(dfSchedule)}")

except Exception as e:
    # Chain the original exception so the root cause stays in the traceback.
    raise RuntimeError(f"Failed to fetch pipeline schedule: {e}") from e

## 4. Execute Pipeline Loop

In [None]:
# Run every scheduled notebook in order with papermill, stopping at the first
# failure. Executed copies are written to the "logs" directory.
vHasErrors = False
vFailure = None  # first exception raised by a step, kept for chaining below

if not dfSchedule.empty:
    # Create the log directory once rather than on every iteration.
    vLogDir = "logs"
    os.makedirs(vLogDir, exist_ok=True)

    for vIndex, vRow in dfSchedule.iterrows():
        vStepId = vRow['step_id']
        vNotebook = vRow['notebook_path']
        vDesc = vRow['description']
        
        print(f"\n>>> EXECUTION STEP {vStepId}: {vNotebook}")
        print(f"    Description: {vDesc}")
        
        # Build the output name from the file name only. Prefixing the whole
        # notebook_path would turn "dir/nb.ipynb" into "logs/out_dir/nb.ipynb",
        # a folder that is never created.
        # NOTE(review): notebooks sharing a file name in different folders
        # would write to the same output file -- confirm names are unique.
        vOutputNotebook = os.path.join(vLogDir, f"out_{os.path.basename(vNotebook)}")
        
        try:
            vStart = time.time()
            
            # PAPERMILL: Runs the notebook
            pm.execute_notebook(
                input_path=vNotebook,
                output_path=vOutputNotebook,
                parameters=dict(vResetTable=False),
                kernel_name='python3',
                progress_bar=False, 
                stdout_file=sys.stdout
            )
            
            vEnd = time.time()
            print(f"    [SUCCESS] Step {vStepId} completed in {round(vEnd - vStart, 2)}s")
            
        except Exception as e:
            print(f"    [FAILURE] Step {vStepId} failed: {e}")
            print(f"    Check output notebook: {vOutputNotebook}")
            vHasErrors = True
            vFailure = e
            # Later steps are not attempted once one step has failed.
            break

if vHasErrors:
    # Same message as before, now chained to the step's original exception.
    raise RuntimeError("Pipeline Failed") from vFailure
else:
    print("\n--- PIPELINE SUCCESS ---")