# Notebook to be converted into tasks for the doit task runner


1. Map over your “scrape-slices” → invoke Lambda or Batch.
2. Wait (Check S3) → ensure all raw shards land.
3. EMR Step → run your Spark/Dask ETL.
4. JDBC Load → write processed tables into Aurora.
5. Visualize via Dask Cloud Resources
6. Push to Flask/frontend resource
7. Notify via SNS or email.

## config

In [1]:
#!/usr/bin/env python3
import sys
from pathlib import Path

# Add the 'src' directory to sys.path
sys.path.append(str(Path.cwd() / "src"))

import os
import boto3
import botocore
from settings import config

# Shared S3 resource handle; later cells use it to list and delete bucket objects.
s3_resource = boto3.resource('s3')

# Project settings, resolved through the local `settings.config` helper.
REGION       = config("REGION")
BUCKET       = config("BUCKET")
LOCAL_MANUAL_DATA_DIR    = config("LOCAL_MANUAL_DATA_DIR")


In [38]:
BUCKET

'finalproject-macs30123-baileymeche'

In [2]:

def ensure_s3_bucket(bucket_name: str, region: str = REGION):
    """Create the S3 bucket if it doesn't exist, otherwise do nothing.

    Args:
        bucket_name: Name of the bucket to look up and, if missing, create.
        region: Region used for the S3 client and, outside ``us-east-1``,
            for the new bucket's ``LocationConstraint``. Defaults to the
            module-level ``REGION``.

    Raises:
        botocore.exceptions.ClientError: If ``head_bucket`` fails for any
            reason other than the bucket being absent, or if the
            ``create_bucket`` call itself fails.
    """
    s3 = boto3.client("s3", region_name=region)
    try:
        s3.head_bucket(Bucket=bucket_name)
    except botocore.exceptions.ClientError as e:
        # Compare the code as a string: error codes are not guaranteed to be
        # numeric (e.g. "NoSuchBucket"), and int() on such a value would raise
        # ValueError and hide the original ClientError.
        code = str(e.response.get("Error", {}).get("Code", ""))
        if code not in ("404", "NoSuchBucket", "NotFound"):
            raise
    else:
        print(f"[S3] Bucket '{bucket_name}' already exists.")
        return

    print(f"[S3] Creating bucket '{bucket_name}'...")
    kwargs = {}
    # The location constraint is only sent for regions other than us-east-1.
    if region != "us-east-1":
        kwargs["CreateBucketConfiguration"] = {"LocationConstraint": region}
    s3.create_bucket(Bucket=bucket_name, **kwargs)
    print(f"[S3] Bucket '{bucket_name}' created.")

def upload_directory(local_dir: str, bucket: str, prefix: str, region: str):
    """Recursively upload all files under local_dir to s3://bucket/prefix/...

    Uploads are best-effort: a file that fails to upload is reported and
    skipped, and the remaining files are still attempted. The final summary
    line states how many uploads failed, if any.

    Args:
        local_dir: Local directory whose files are uploaded.
        bucket: Destination bucket name.
        prefix: Key prefix; a trailing "/" is appended when it is non-empty
            and does not already end with one.
        region: Region passed to the S3 client.

    Raises:
        RuntimeError: If ``local_dir`` is not an existing directory.
    """
    # Validate the path before creating a client so a bad path fails fast.
    if not os.path.isdir(local_dir):
        raise RuntimeError(f"Local directory '{local_dir}' not found.")

    s3 = boto3.client("s3", region_name=region)

    # Ensure prefix ends with "/" if not empty
    if prefix and not prefix.endswith("/"):
        prefix += "/"

    uploaded = 0
    failed = []
    for root, _, files in os.walk(local_dir):
        for fname in files:
            local_path = os.path.join(root, fname)
            rel_path = os.path.relpath(local_path, local_dir)
            # S3 keys always use "/" regardless of the local path separator.
            s3_key = prefix + rel_path.replace(os.path.sep, "/")
            print(f"[S3] Uploading {local_path} → s3://{bucket}/{s3_key}")
            try:
                s3.upload_file(local_path, bucket, s3_key)
            except Exception as e:
                # Deliberately best-effort: record the failure and continue.
                print(f"❌ Failed to upload {local_path}: {e}")
                failed.append(local_path)
            else:
                uploaded += 1

    # Only claim full success when every upload actually succeeded.
    if failed:
        print(
            f"[S3] Uploaded {uploaded} file(s) from '{local_dir}' under "
            f"'{prefix}'; {len(failed)} failed."
        )
    else:
        print(f"[S3] All files from '{local_dir}' uploaded under '{prefix}'.")

# Create the bucket (if needed) and upload the manually collected data files.
# NOTE(review): S3_PREFIX is not defined in any cell shown in this notebook;
# the recorded output suggests its value was "MANUAL_DATA" — confirm where it
# is set before converting this cell into a doit task.
ensure_s3_bucket(BUCKET)
upload_directory(LOCAL_MANUAL_DATA_DIR, BUCKET, S3_PREFIX, REGION)

[S3] Creating bucket 'finalproject-macs30123-baileymeche'...
[S3] Bucket 'finalproject-macs30123-baileymeche' created.
[S3] Uploading C:/Users/baile/Box Sync/sp25/MACS 30123/final-project-baileymeche/data_manual\data_README.md → s3://finalproject-macs30123-baileymeche/MANUAL_DATA/data_README.md
[S3] Uploading C:/Users/baile/Box Sync/sp25/MACS 30123/final-project-baileymeche/data_manual\treasury_inflation_swaps.csv → s3://finalproject-macs30123-baileymeche/MANUAL_DATA/treasury_inflation_swaps.csv
[S3] All files from 'C:/Users/baile/Box Sync/sp25/MACS 30123/final-project-baileymeche/data_manual' uploaded under 'MANUAL_DATA/'.


# Ingest & Scrape (Serverless + Batch)
- Fan‐out with SQS → Lambda

    - Push one message per “slice” (e.g. Fed curve for Jan – Mar 2025, TIPS for Apr – Jun, swaps for tenor = 2 yr, etc.).

    - A small Lambda (512 MB, 1–2 vCPU) picks up each message, runs your pull_* scraper for that slice, and writes a Parquet shard to s3://…/raw/<module>/<slice>.parquet.

- Edge Case: Longer pulls
    - If you ever hit Lambda's limits (15-minute timeout, 10 GB maximum memory) — e.g. on a multi-year WRDS query — switch that particular task to an AWS Batch array job running the same code in a Docker container. You still drive it via SQS messages, but each slice now runs on a Spot-backed EC2 instance.

In [2]:
# Check bucket contents: collect the key of every object currently in the bucket.
bucket = BUCKET
bucket_resource = s3_resource.Bucket(bucket)
# Last expression of the cell, so the notebook displays the resulting list.
[obj.key for obj in bucket_resource.objects.all()]

['raw/100_Portfolios_10x10_sheet0.parquet',
 'raw/100_Portfolios_10x10_sheet1.parquet',
 'raw/100_Portfolios_10x10_sheet2.parquet',
 'raw/100_Portfolios_10x10_sheet3.parquet',
 'raw/100_Portfolios_10x10_sheet4.parquet',
 'raw/100_Portfolios_10x10_sheet5.parquet',
 'raw/100_Portfolios_10x10_sheet6.parquet',
 'raw/100_Portfolios_10x10_sheet7.parquet',
 'raw/100_Portfolios_10x10_sheet8.parquet',
 'raw/100_Portfolios_10x10_sheet9.parquet',
 'raw/25_Portfolios_5x5_sheet0.parquet',
 'raw/25_Portfolios_5x5_sheet1.parquet',
 'raw/25_Portfolios_5x5_sheet2.parquet',
 'raw/25_Portfolios_5x5_sheet3.parquet',
 'raw/25_Portfolios_5x5_sheet4.parquet',
 'raw/25_Portfolios_5x5_sheet5.parquet',
 'raw/25_Portfolios_5x5_sheet6.parquet',
 'raw/25_Portfolios_5x5_sheet7.parquet',
 'raw/25_Portfolios_5x5_sheet8.parquet',
 'raw/25_Portfolios_5x5_sheet9.parquet',
 'raw/6_Portfolios_2x3_sheet0.parquet',
 'raw/6_Portfolios_2x3_sheet1.parquet',
 'raw/6_Portfolios_2x3_sheet2.parquet',
 'raw/6_Portfolios_2x3_sheet3.

In [None]:
import boto3
import pandas as pd
import time
from io import BytesIO

# Module-level low-level S3 client.
# NOTE: the teardown cells further down rebind the name `s3` to a boto3
# *resource*, so this binding is only valid until those cells run.
s3 = boto3.client('s3')

def s3_parquet_preview(bucket, key, rows=10, s3_client=None):
    '''
    Download a Parquet file from S3 using boto3 and return its first rows.

    The whole object is read into memory before it is parsed, so the cost of
    a preview is the cost of downloading the full file.

    Parameters
    ----------
    bucket : str
        Name of the S3 bucket holding the object.
    key : str
        Object key of the Parquet file.
    rows : int, default 10
        Number of leading rows to return.
    s3_client : optional
        An existing boto3 S3 *client* to reuse. When omitted, a new default
        client is created for the call.

    Returns
    -------
    pandas.DataFrame
        The first ``rows`` rows of the file.
    '''
    # Do not rely on the notebook-global `s3`: the teardown cells rebind that
    # name to a boto3 resource, which has no get_object method.
    client = s3_client if s3_client is not None else boto3.client('s3')
    response = client.get_object(Bucket=bucket, Key=key)
    # Buffer the body so the Parquet reader gets an in-memory, seekable file.
    file_stream = BytesIO(response['Body'].read())
    df = pd.read_parquet(file_stream, engine='fastparquet')  # 'pyarrow' is the alternative engine
    return df.head(rows)


# Time one preview end to end (the call both downloads and parses the file).
t0 = time.time()
df = s3_parquet_preview(bucket=BUCKET,
                        key='raw/fed_yield_curve.parquet')
print(time.time() - t0, 'seconds')

# `df` already holds at most 10 rows (the preview default); head() trims it
# further to pandas' default of 5 for display.
print(df.head())



 lambda_src/serial_scraper.py
 lambda_src/scraper_lambda.py

# RDS MySQL: Cleaned Results 


# ETL & Analysis (EMR + Dask) for Heavy‐Lifting

- EMR PySpark Step
    - Spin up a small EMR cluster (1 master + 3–5 m5.xlarge cores, spot if you like).
    - One Spark job reads raw.* tables via the S3 connector, does your merges + compute_tips_treasury() logic at scale, and writes out s3://…/processed/tips_treasury.parquet (and the equivalent for your 5 modules).

- Dask for custom kernels

    - If you have bespoke Python loops (e.g. bootstraps, simulations), launch a Dask cluster on the same EMR (or on a separate set of spot EC2s) with dask-ec2 or EMR’s Dask support.
    - Use Numba/CuDF if you need GPU acceleration (p3 instances) for your heaviest routines.

## Data directory for functions 
### TIPS Treasury 
* `S3_MANUAL_DATA` / "treasury_inflation_swaps.csv"   -> `import_inflation_swap_data()`
* `S3_DATA` / "fed_yield_curve.parquet"               ->  `import_treasury_yields()`
* `S3_DATA` / "fed_tips_yield_curve.parquet"          -> `import_tips_yields()` 
* `import_inflation_swap_data(), import_treasury_yields(), import_tips_yields()`-> `compute_tips_treasury()` -> [OUTPUT] / "tips_treasury_implied_rf.parquet"
### Equity Spot Futures
```python
targets = [
        PROCESSED_DIR / "all_indices_calendar_spreads.csv",
        PROCESSED_DIR / "INDU_calendar_spread.csv",
        PROCESSED_DIR / "SPX_calendar_spread.csv",
        PROCESSED_DIR / "NDX_calendar_spread.csv",
    ]
```
```scss
futures_data_processing.py
 ├─► `S3_MANUAL_DATA` / "bloomberg_historical_data.parquet"
 ├─► process_index_futures(raw_data, futures_codes) 
 │     ├─► parse_contract_month_year(cs)                [utility]
 │     └─► get_third_friday(year, month)                [utility]
 └─► merge_calendar_spreads(all_futures)    
       ├─► pd.merge(term1, term2) 
       └─► to_csv(… → OUTPUT)                           : targets
```
```python
targets = [
        PROCESSED_DIR / "cleaned_ois_rates.csv"
    ]
```
```scss
OIS_data_processing.py
 ├─► check os.path.exists(INPUT_FILE) : S3_MANUAL_DATA / "bloomberg_historical_data.parquet"
 └─► process_ois_data(filepath=INPUT_FILE)
       ├─► pd.read_parquet(filepath)
       └─► ois_df.to_csv(output_path, index=True)   : targets
```
```python
targets = [
        PROCESSED_DIR / "SPX_Forward_Rates.csv",
        PROCESSED_DIR / "NDX_Forward_Rates.csv",
        PROCESSED_DIR / "INDU_Forward_Rates.csv",
        OUTPUT_DIR / "all_indices_spread_to_2020.png",
        OUTPUT_DIR / "all_indices_spread_to_present.png"
    ]
```
```scss
Spread_calculations.py
 ├─► process_index_forward_rates(index_code)
 │     ├─► pd.read_csv(fut_file)                    PROCESSED_DIR / "<INDEX>_calendar_spread.csv"
 │     ├─► pd.read_csv(ois_file)                    PROCESSED_DIR / "cleaned_ois_rates.csv"
 │     ├─► build_daily_dividends(index_code)
 │     │      └─► pd.read_parquet(input_file)       "bloomberg_historical_data.parquet"
 │     ├─► barndorff_nielsen_filter(…)              [pure transform]
 │     └─► merged_df.to_csv(out_file)               PROCESSED_DIR / "<INDEX>_Forward_Rates.csv"
 └─► plot_all_indices(results)
        ├─► _plot(…, "to_2020")                     : targets
        └─► _plot(…, "to_present")                  : targets
```
### CIP 
```scss
$project-map: (
  files: (
    // Notebooks & scripts
    main_cip_notebook:   "src/main_cip.ipynb",
    cip_analysis_script: "src/cip_analysis.py",
    pull_cip_module:     "src/pull_bloomberg_cip_data.py",
    settings_module:     "src/settings.py",
    dir_funcs_module:    "src/directory_functions.py",
    manual_data_excel:   "data_manual/CIP_2025.xlsx"
  )
);

$module-dependency-map: (
  data_fetch: ( ),
  plots: ( data_fetch ),
  // stats = summary-stats code (compute_cip, compute_cip_statistics, save_cip_statistics_as_html)
  stats: ( data_fetch, plots ),
  notebooks: ( data_fetch, plots, stats )
);

clean_data.py
 ├─► pull_bloomberg_cip_data.load_raw('2025-03-01')   MANUAL_DATA_DIR / "CIP_2025.xlsx"
 ├─► df.to_csv(output_file="tidy_data.csv")           OUTPUT_DIR / "tidy_data.csv"

 module pull_bloomberg_cip_data.py
 ├─► settings.BLOOMBERG (config)        
 
download()
 ├─► requests.get(url)
 ├─► open("./data_manual/CIP_2025.xlsx","wb")   [I/O write: "./data_manual/CIP_2025.xlsx"]
 └─► pd.read_excel(target_file)                 [I/O read: "./data_manual/CIP_2025.xlsx"]

fetch_bloomberg_historical_data(start, end)
 └─► blp.bdh(...) (no file I/O) → DataFrames

plot_cip(end)
 ├─ if BLOOMBERG=False:
 │     ├─► pd.read_excel("./data_manual/CIP_2025.xlsx")  
 │     └─► plt.savefig(f"spread_plot_{yr}.pdf/png")  [I/O write: "spread_plot_{yr}.pdf", "spread_plot_{yr}.png"]
 └─ else:
       └─► fetch_bloomberg_historical_data(...)

load_raw(end, plot)
 ├─ if BLOOMBERG=False:
 │     ├─► pd.read_excel(...) or download()    [I/O read/write: Excel paths or download]
 │     └─► returns df_merged in memory
 └─ else:
       └─► fetch_bloomberg_historical_data(...)

compute_cip(end)
 └─► load_raw(end) → in-memory DataFrame (no file I/O)

load_raw_pieces(end, excel, plot)
 ├─ if BLOOMBERG=False:
 │     └─► pd.read_excel(...)              [I/O read: Excel paths]
 └─ else:
       └─► fetch_bloomberg_historical_data(...)
```
```python
 "targets": [
            str(OUTPUT_DIR / "cip_summary_overall.html"),
            str(OUTPUT_DIR / "cip_correlation_matrix.html"),
            str(OUTPUT_DIR / "cip_annual_statistics.html"),
        ]
```
```scss
cip_analysis.py
 ├─ save_cip_statistics_as_html(stats_dict)
 ├─ compute_cip(end='2020-01-01')
 │    └─ load_raw(end)             # from pull_bloomberg_cip_data
 └─ compute_cip_statistics(cip_data)
 {
  "overall_statistics": DataFrame,
  "correlation_matrix": DataFrame,
  "annual_statistics": DataFrame
}
```


### Treasury Spot 
  

* **Reads**

  * `${DATA_DIR}/treasury_df.csv`
  * `${DATA_DIR}/ois_df.csv`
  * `${DATA_DIR}/last_day_df.csv`
* **Writes**

  * `${OUTPUT_DIR}/arbitrage_spread_2.pdf`
  * `${OUTPUT_DIR}/arbitrage_spread_5.pdf`
  * `${OUTPUT_DIR}/arbitrage_spread_10.pdf`
  * `${OUTPUT_DIR}/arbitrage_spread_20.pdf`
  * `${OUTPUT_DIR}/arbitrage_spread_30.pdf`
  * `${DATA_DIR}/treasury_sf_output.csv`
* **In-memory**

  * `df_long`, intermediate columns, and final `df_out`, `df_wide`

### Market Expectations

```scss
./src/pull_ken_french_data.py  ←──  task_pull_ken_french_data ──> Excel portfolios
       (data fetch & cleaning)

./src/pull_CRSP_index.py      ←──  task_pull_CRSP_index ──>  crsp_value_weighted_index.csv
       (data fetch & cleaning)

Notebooks:
  01_Market_...ipynb      \
  run_regressions.ipynb   ├─> task_convert_notebooks_to_scripts ──> `_*.py`
                          └─> task_run_notebooks  
                                ├─> **execute notebook code** (analysis/regressions)
                                ├─> HTML export
                                └─> copy & clear outputs

reports/project.tex  ←──  task_compile_latex_docs  ──> project.pdf
```


# Cleanup

In [5]:
def cleanup(bucket_name):
    """Delete every object in the named S3 bucket; the bucket itself is kept.

    Args:
        bucket_name: Name of the bucket to empty.
    """
    bucket = s3_resource.Bucket(bucket_name)
    # Collection-level delete batches the removals instead of issuing one
    # request per object, and matches how empty_and_delete_bucket empties
    # a bucket.
    bucket.objects.all().delete()
        
# Destructive: removes every object from the project bucket.
cleanup(BUCKET)

In [9]:


# Clients/resources shared by the teardown helpers defined below.
rds = boto3.client("rds", region_name=REGION)
ec2 = boto3.client("ec2", region_name=REGION)
sm  = boto3.client("secretsmanager", region_name=REGION)
# NOTE: this rebinds `s3`, which an earlier cell created as a boto3 *client*;
# from here on it is a resource (used via s3.Bucket(...)).
s3  = boto3.resource("s3", region_name=REGION)

def delete_db_instance(instance_id):
    """Delete an RDS DB instance and block until the deletion has finished.

    The instance is removed without a final snapshot and its automated
    backups are deleted too. A missing instance is reported and skipped;
    any other client error propagates to the caller.
    """
    print(f"[RDS] Deleting instance '{instance_id}'...")
    delete_args = {
        "DBInstanceIdentifier": instance_id,
        "SkipFinalSnapshot": True,
        "DeleteAutomatedBackups": True,
    }
    try:
        rds.delete_db_instance(**delete_args)
        # Wait until RDS reports the instance as gone.
        rds.get_waiter("db_instance_deleted").wait(DBInstanceIdentifier=instance_id)
    except botocore.exceptions.ClientError as err:
        if err.response["Error"]["Code"] != "DBInstanceNotFound":
            raise
        print(f"[RDS] Instance '{instance_id}' not found; skipping.")
    else:
        print(f"[RDS] Instance '{instance_id}' deleted.")

def delete_db_cluster(cluster_id):
    """Delete an RDS DB cluster (no final snapshot) and wait for completion.

    A cluster that does not exist is reported and skipped; every other
    client error is re-raised.
    """
    print(f"[RDS] Deleting cluster '{cluster_id}'...")
    try:
        rds.delete_db_cluster(DBClusterIdentifier=cluster_id, SkipFinalSnapshot=True)
        deleted_waiter = rds.get_waiter("db_cluster_deleted")
        deleted_waiter.wait(DBClusterIdentifier=cluster_id)
    except botocore.exceptions.ClientError as err:
        error_code = err.response["Error"]["Code"]
        if error_code != "DBClusterNotFoundFault":
            raise
        print(f"[RDS] Cluster '{cluster_id}' not found; skipping.")
        return
    print(f"[RDS] Cluster '{cluster_id}' deleted.")

def delete_subnet_group(group_name):
    """Delete an RDS DB subnet group, tolerating one that is already gone.

    Client errors other than "subnet group not found" are re-raised.
    """
    print(f"[RDS] Deleting subnet group '{group_name}'...")
    try:
        rds.delete_db_subnet_group(DBSubnetGroupName=group_name)
    except botocore.exceptions.ClientError as err:
        missing = err.response["Error"]["Code"] == "DBSubnetGroupNotFoundFault"
        if not missing:
            raise
        print(f"[RDS] Subnet group '{group_name}' not found; skipping.")
    else:
        print(f"[RDS] Subnet group '{group_name}' deleted.")

def delete_secret(secret_name):
    """Delete a Secrets Manager secret, requesting deletion without recovery.

    A "resource not found" error is reported and skipped; any other client
    error propagates.
    """
    print(f"[Secrets] Deleting secret '{secret_name}'...")
    try:
        sm.delete_secret(
            SecretId=secret_name,
            ForceDeleteWithoutRecovery=True,
        )
    except botocore.exceptions.ClientError as err:
        if err.response["Error"]["Code"] != "ResourceNotFoundException":
            raise
        print(f"[Secrets] Secret '{secret_name}' not found; skipping.")
        return
    print(f"[Secrets] Secret '{secret_name}' deleted.")

def delete_security_group(name, vpc_id):
    """Find a security group by name within a VPC and delete it.

    The first matching group has its ingress rules revoked (if it has any)
    before deletion. A missing group is reported and skipped. A client error
    from the delete call is printed rather than raised.
    """
    # Look the group up by name + VPC.
    lookup_filters = [
        {"Name": "group-name", "Values": [name]},
        {"Name": "vpc-id", "Values": [vpc_id]},
    ]
    matches = ec2.describe_security_groups(Filters=lookup_filters)["SecurityGroups"]
    if not matches:
        print(f"[EC2] SG '{name}' not found; skipping.")
        return

    target = matches[0]
    sg_id = target["GroupId"]

    # Revoke whatever ingress rules the group currently carries.
    ingress_rules = target.get("IpPermissions", [])
    if ingress_rules:
        print(f"[EC2] Revoking ingress on {sg_id}...")
        ec2.revoke_security_group_ingress(GroupId=sg_id, IpPermissions=ingress_rules)

    # Delete the group; a failure here is reported, not raised.
    print(f"[EC2] Deleting SG '{sg_id}'...")
    try:
        ec2.delete_security_group(GroupId=sg_id)
    except botocore.exceptions.ClientError as err:
        print(f"[EC2] Could not delete SG '{sg_id}': {err}")
    else:
        print(f"[EC2] SG '{sg_id}' deleted.")

def empty_and_delete_bucket(bucket_name):
    """Empty an S3 bucket (objects, then object versions) and delete it.

    Removing object versions and deleting the bucket are best-effort: a
    failure in either step is printed instead of raised. A failure while
    deleting the current objects is not caught and propagates.

    Args:
        bucket_name: Name of the bucket to empty and remove.
    """
    bucket = s3.Bucket(bucket_name)
    # delete all objects
    print(f"[S3] Emptying bucket '{bucket_name}'...")
    bucket.objects.all().delete()
    # delete all versions (if versioning enabled)
    try:
        bucket.object_versions.delete()
    except Exception as e:
        # Still best-effort, but report the reason instead of hiding it:
        # leftover versions would keep the bucket non-empty for the delete below.
        print(f"[S3] Could not delete object versions in '{bucket_name}': {e}")
    # delete bucket itself
    try:
        print(f"[S3] Deleting bucket '{bucket_name}'...")
        bucket.delete()
        print(f"[S3] Bucket '{bucket_name}' deleted.")
    except botocore.exceptions.ClientError as e:
        print(f"[S3] Could not delete bucket '{bucket_name}': {e}")

# Teardown driver. Steps 1-3 are left disabled: the identifiers they reference
# (INSTANCE_ID, CLUSTER_ID, SUBNET_GROUP, SECRET_NAME, SG_NAME, VPC_ID) are not
# defined in the cells shown here. Only step 4 actually runs.

# 1) Tear down RDS
# delete_db_instance(INSTANCE_ID)
# delete_db_cluster(CLUSTER_ID)
# delete_subnet_group(SUBNET_GROUP)

# # 2) Delete Secrets Manager entry
# delete_secret(SECRET_NAME)

# # 3) Delete Security Group
# delete_security_group(SG_NAME, VPC_ID)

# 4) Delete S3 bucket (destructive: empties the bucket, then removes it)
empty_and_delete_bucket(BUCKET)

print("\n=== TEARDOWN COMPLETE ===")


[S3] Emptying bucket 'finalproject-macs30123-baileymeche'...
[S3] Deleting bucket 'finalproject-macs30123-baileymeche'...
[S3] Bucket 'finalproject-macs30123-baileymeche' deleted.

=== TEARDOWN COMPLETE ===


In [13]:
# teardown.ipynb cell

import boto3
import time
from botocore.exceptions import ClientError

# === Configuration ===
# Hard-coded values for this standalone teardown cell. NOTE: REGION here
# overwrites the config-derived REGION assigned in the first cell.
REGION      = "us-east-1"
LAMBDA_NAME = "ScraperWorker"
BUCKET_NAME = "finalproject-macs30123-baileymeche"
RDS_ID      = "my-scrape-db"

# === Clients ===
# NOTE: `s3` and `rds` rebind names that earlier cells also assign.
lam = boto3.client("lambda", region_name=REGION)
s3  = boto3.resource("s3", region_name=REGION)
rds = boto3.client("rds", region_name=REGION)

# 1️⃣ Delete Lambda function
# Each step reports a ClientError and moves on, so one missing resource does
# not stop the remaining teardown steps.
print(f"→ Deleting Lambda function '{LAMBDA_NAME}'…")
try:
    lam.delete_function(FunctionName=LAMBDA_NAME)
    print("   Lambda deleted.")
except ClientError as e:
    print("   Error deleting Lambda:", e.response["Error"]["Message"])

# 2️⃣ Empty and delete S3 bucket (the delete is unconditional once emptying succeeds)
bucket = s3.Bucket(BUCKET_NAME)
print(f"→ Emptying bucket '{BUCKET_NAME}'…")
# delete all objects (and versions, if versioned)
try:
    # If versioning enabled, you need to delete all versions
    bucket.object_versions.delete()
    print("   All objects deleted.")
    # Now delete bucket itself
    bucket.delete()
    print(f"   Bucket '{BUCKET_NAME}' deleted.")
except ClientError as e:
    print("   Error cleaning/deleting bucket:", e.response["Error"]["Message"])

# 3️⃣ Delete RDS instance
# NOTE(review): only ClientError is handled here; a waiter timeout would
# presumably surface as a different exception type — confirm and handle if needed.
print(f"→ Deleting RDS instance '{RDS_ID}'…")
try:
    rds.delete_db_instance(
        DBInstanceIdentifier=RDS_ID,
        SkipFinalSnapshot=True,   # change to False if you want to keep a final snapshot
        DeleteAutomatedBackups=True
    )
    print("   Delete initiated. Waiting for deletion to complete…")
    # Block until RDS reports the instance as deleted.
    waiter = rds.get_waiter("db_instance_deleted")
    waiter.wait(DBInstanceIdentifier=RDS_ID)
    print("   RDS instance deleted.")
except ClientError as e:
    print("   Error deleting RDS:", e.response["Error"]["Message"])


→ Deleting Lambda function 'ScraperWorker'…
   Error deleting Lambda: Function not found: arn:aws:lambda:us-east-1:325826323478:function:ScraperWorker
→ Emptying bucket 'finalproject-macs30123-baileymeche'…
   Error cleaning/deleting bucket: The specified bucket does not exist
→ Deleting RDS instance 'my-scrape-db'…
   Delete initiated. Waiting for deletion to complete…
   RDS instance deleted.
