# Homework Starter — Stage 15: Orchestration & System Design
Sparsh Patel 08/28/2025

## 1) Project Task Decomposition
List 4–8 tasks. Add more rows as needed.

In [None]:


import pandas as pd


# Task decomposition for the stock pipeline. Each row describes one stage as
# (task, inputs, outputs, idempotent); every stage after the first reads the
# file written by the stage listed just before it.
_task_fields = ('task', 'inputs', 'outputs', 'idempotent')
_task_rows = (
    ('ingest', '/data/kaggle_api.ext', 'stock_prices_raw.json', True),
    ('clean', 'stock_prices_raw.json', 'stock_prices_clean.json', True),
    ('calculate_metrics', 'stock_prices_clean.json', 'stock_metrics.json', True),
    # The only stage flagged as not idempotent.
    ('simulate_portfolios', 'stock_metrics.json', 'portfolio_results.json', False),
    ('rank_stocks', 'portfolio_results.json', 'top_stocks.json', True),
    ('report', 'top_stocks.json', 'report.txt', True),
)

# Expand the compact rows into the list-of-dicts form the table is built from.
tasks = [dict(zip(_task_fields, row)) for row in _task_rows]

df = pd.DataFrame(tasks)
print(df)

                  task                   inputs                  outputs  \
0               ingest     /data/kaggle_api.ext    stock_prices_raw.json   
1                clean    stock_prices_raw.json  stock_prices_clean.json   
2    calculate_metrics  stock_prices_clean.json       stock_metrics.json   
3  simulate_portfolios       stock_metrics.json   portfolio_results.json   
4          rank_stocks   portfolio_results.json          top_stocks.json   
5               report          top_stocks.json               report.txt   

   idempotent  
0        True  
1        True  
2        True  
3       False  
4        True  
5        True  


## 2) Dependencies (DAG)


In [None]:


import pandas as pd


# Dependency edges of the pipeline as (task, upstream task) pairs. The graph is
# a straight chain: only 'ingest' has no upstream task (None), and every other
# task depends on the one listed just before it.
_dependency_edges = (
    ('ingest', None),
    ('clean', 'ingest'),
    ('calculate_metrics', 'clean'),
    ('simulate_portfolios', 'calculate_metrics'),
    ('rank_stocks', 'simulate_portfolios'),
    ('report', 'rank_stocks'),
)

# One record per task, in execution order.
dependencies = [
    {'task': name, 'depends_on': upstream}
    for name, upstream in _dependency_edges
]

df = pd.DataFrame(dependencies)
print(df)

                  task           depends_on
0               ingest                 None
1                clean               ingest
2    calculate_metrics                clean
3  simulate_portfolios    calculate_metrics
4          rank_stocks  simulate_portfolios
5               report          rank_stocks


## 3) Logging & Checkpoints Plan
Specify what you will log and where you will checkpoint for each task.

In [None]:


import pandas as pd


# Logging plan: one record per pipeline task giving the log level, the log
# file it writes to, and a comma-separated list of the metrics to record.
logging_strategy = [
    {
        'task': 'ingest',
        'log_level': 'INFO',
        'log_file': 'logs/ingest.log',
        'metrics_logged': 'rows_fetched, api_response_time, data_size_mb'
    },
    {
        'task': 'clean',
        'log_level': 'INFO',
        'log_file': 'logs/clean.log',
        'metrics_logged': 'rows_before, rows_after, missing_values_filled'
    },
    {
        'task': 'calculate_metrics',
        'log_level': 'INFO',
        'log_file': 'logs/metrics.log',
        'metrics_logged': 'stocks_processed, calculation_time, correlation_matrix_size'
    },
    {
        # The only task planned at DEBUG level; all others log at INFO.
        'task': 'simulate_portfolios',
        'log_level': 'DEBUG',
        'log_file': 'logs/simulation.log',
        'metrics_logged': 'portfolios_generated, avg_sharpe_ratio, simulation_time'
    },
    {
        'task': 'rank_stocks',
        'log_level': 'INFO',
        'log_file': 'logs/ranking.log',
        'metrics_logged': 'top_portfolios_selected, unique_stocks_found, ranking_time'
    },
    {
        'task': 'report',
        'log_level': 'INFO',
        'log_file': 'logs/report.log',
        'metrics_logged': 'report_size_kb, top_stocks_count, generation_time'
    }
]

print("Logging Strategy:")
print("=" * 50)
df_logging = pd.DataFrame(logging_strategy)
# Render with to_string() rather than print(df): the default DataFrame repr
# clips long 'metrics_logged' values to "..." and wraps the table into several
# chunks, which hides part of the plan. to_string() emits one full line per row.
logging_table = df_logging.to_string()
print(logging_table)

print("\n\nCheckpoint Strategy:")
print("=" * 50)


# Checkpoint plan: one record per pipeline task giving the checkpoint file,
# the recovery action to take on failure, and the validation step to run.
checkpoints = [
    {
        'task': 'ingest',
        'checkpoint_file': 'checkpoints/stock_prices_raw.checkpoint',
        'recovery_action': 'retry_kaggle_api',
        'validation': 'check_data_completeness'
    },
    {
        'task': 'clean',
        'checkpoint_file': 'checkpoints/stock_prices_clean.checkpoint',
        'recovery_action': 'reprocess_from_raw',
        'validation': 'validate_data_quality'
    },
    {
        'task': 'calculate_metrics',
        'checkpoint_file': 'checkpoints/stock_metrics.checkpoint',
        'recovery_action': 'recalculate_from_clean',
        'validation': 'check_metric_ranges'
    },
    {
        'task': 'simulate_portfolios',
        'checkpoint_file': 'checkpoints/portfolio_results.checkpoint',
        'recovery_action': 'rerun_simulation',
        'validation': 'validate_portfolio_constraints'
    },
    {
        'task': 'rank_stocks',
        'checkpoint_file': 'checkpoints/top_stocks.checkpoint',
        'recovery_action': 'rerank_from_portfolios',
        'validation': 'check_top_10_stocks'
    },
    {
        'task': 'report',
        'checkpoint_file': 'checkpoints/report.checkpoint',
        'recovery_action': 'regenerate_report',
        'validation': 'validate_report_format'
    }
]

df_checkpoints = pd.DataFrame(checkpoints)
# Same untruncated, unwrapped rendering as the logging table.
checkpoint_table = df_checkpoints.to_string()
print(checkpoint_table)

Logging Strategy:
                  task log_level             log_file  \
0               ingest      INFO      logs/ingest.log   
1                clean      INFO       logs/clean.log   
2    calculate_metrics      INFO     logs/metrics.log   
3  simulate_portfolios     DEBUG  logs/simulation.log   
4          rank_stocks      INFO     logs/ranking.log   
5               report      INFO      logs/report.log   

                                      metrics_logged  
0      rows_fetched, api_response_time, data_size_mb  
1     rows_before, rows_after, missing_values_filled  
2  stocks_processed, calculation_time, correlatio...  
3  portfolios_generated, avg_sharpe_ratio, simula...  
4  top_portfolios_selected, unique_stocks_found, ...  
5  report_size_kb, top_stocks_count, generation_time  


Checkpoint Strategy:
                  task                            checkpoint_file  \
0               ingest    checkpoints/stock_prices_raw.checkpoint   
1                clean  checkpoints/

## 4) Right-Sizing Automation


In [None]:


import pandas as pd

# Right-sizing plan: one record per pipeline task stating whether it is
# automated now or kept manual, how often it runs, and the rationale.
automation_strategy = [
    {
        'task': 'ingest',
        'automation_status': 'Automate Now',
        'frequency': 'Daily',
        'rationale': 'Market data changes daily, straightforward API call, minimal human judgment needed'
    },
    {
        'task': 'clean',
        'automation_status': 'Automate Now',
        'frequency': 'Daily',
        'rationale': 'Deterministic data cleaning rules, well-defined validation checks, stable process'
    },
    {
        'task': 'calculate_metrics',
        'automation_status': 'Automate Now',
        'frequency': 'Daily',
        'rationale': 'Mathematical calculations (CAGR, Sharpe, volatility) - no human input required'
    },
    {
        'task': 'simulate_portfolios',
        'automation_status': 'Manual for Now',
        'frequency': 'Weekly',
        'rationale': 'Random simulation parameters need validation, market regime changes require human oversight'
    },
    {
        'task': 'rank_stocks',
        'automation_status': 'Manual for Now',
        'frequency': 'Weekly',
        'rationale': 'Stock selection has investment implications, requires human review of top 10 before publishing'
    },
    {
        'task': 'report',
        'automation_status': 'Manual for Now',
        'frequency': 'Weekly',
        'rationale': 'Investment recommendations need compliance review and stakeholder validation'
    }
]

print("Automation Strategy:")
print("=" * 70)
df_automation = pd.DataFrame(automation_strategy)
# Render with to_string() rather than print(df): the default DataFrame repr
# clips every 'rationale' value to "..." and wraps the table, hiding the very
# justification this section is meant to show.
automation_table = df_automation.to_string()
print(automation_table)

print("\n\nAutomation Summary:")
print("=" * 30)


# Derive the denominator from the data so the summary stays correct when
# tasks are added to or removed from automation_strategy.
total_tasks = len(automation_strategy)
automate_now = sum(1 for task in automation_strategy if task['automation_status'] == 'Automate Now')
manual_now = sum(1 for task in automation_strategy if task['automation_status'] == 'Manual for Now')

print(f"Tasks to Automate Now: {automate_now}/{total_tasks}")
print(f"Tasks Staying Manual: {manual_now}/{total_tasks}")

print("\n\nFuture Automation Candidates (6-12 months):")
print("=" * 50)

# For each task that is manual today: the condition that would trigger
# automating it and the prerequisites that must exist first.
future_automation = [
    {
        'task': 'simulate_portfolios',
        'automation_trigger': 'After 3 months of stable performance validation',
        'prerequisites': 'Automated anomaly detection, parameter validation rules'
    },
    {
        'task': 'rank_stocks',
        'automation_trigger': 'After regulatory approval and compliance framework',
        'prerequisites': 'Automated backtesting, performance attribution system'
    },
    {
        'task': 'report',
        'automation_trigger': 'After stakeholder confidence in automated recommendations',
        'prerequisites': 'Automated compliance checks, email distribution system'
    }
]

df_future = pd.DataFrame(future_automation)
# Same untruncated, unwrapped rendering as the strategy table.
future_table = df_future.to_string()
print(future_table)

Automation Strategy:
                  task automation_status frequency  \
0               ingest      Automate Now     Daily   
1                clean      Automate Now     Daily   
2    calculate_metrics      Automate Now     Daily   
3  simulate_portfolios    Manual for Now    Weekly   
4          rank_stocks    Manual for Now    Weekly   
5               report    Manual for Now    Weekly   

                                           rationale  
0  Market data changes daily, straightforward API...  
1  Deterministic data cleaning rules, well-define...  
2  Mathematical calculations (CAGR, Sharpe, volat...  
3  Random simulation parameters need validation, ...  
4  Stock selection has investment implications, r...  
5  Investment recommendations need compliance rev...  


Automation Summary:
Tasks to Automate Now: 3/6
Tasks Staying Manual: 3/6


Future Automation Candidates (6-12 months):
                  task                                 automation_trigger  \
0  simulate_portf