## Notes and Tips

- **Rate Limiting**: The batch fetch functions include built-in rate limiting (a one-second sleep every 20 requests) to avoid API throttling
- **Error Handling**: Failed requests are returned in a failure list (index, symphony ID, status code) so they can be retried separately
- **Data Structure**:
  - `SYMPHONIES.csv` contains basic symphony metadata
  - `OOS.csv` contains detailed symphony information from API
  - `BACKTEST.csv` contains full backtest statistics
- **File Organization**: Data is organized by date in `bin/SYMPHONIES-YYYY-MM-DD/` directories

In [None]:
# Test fetching a single symphony
test_symphony_id = "2gX2Ch9EsG5MmJ9yjAvN"  # OG Boring TQQQ Trendz
print(f"Testing fetch for symphony: {test_symphony_id}")

# fetch_symphony returns a (success flag, HTTP status code, payload) triple.
ok, status_code, data = fetch_symphony(test_symphony_id)
if ok:
    print(f"Success! Symphony name: {data.get('symphony_name', 'Unknown')}")
    print(f"Data keys: {list(data.keys())[:5]}...")  # Show first 5 keys
else:
    print(f"Failed with status code: {status_code}")

## Testing Individual Functions

Test the utility functions with a small sample:

In [None]:
# Step 2: Load existing data (if available)
try:
    # Only the two reads can raise FileNotFoundError, so only they are guarded.
    df_symphonies = pd.read_csv(get_csv_name('SYMPHONIES'))
    df_oos = pd.read_csv(get_csv_name('OOS'))
except FileNotFoundError as e:
    print(f"Data files not found: {e}")
    print("Run the main workflow first to generate data.")
else:
    print(f"Loaded symphonies: {len(df_symphonies)} rows")
    print(f"Loaded OOS data: {len(df_oos)} rows")

    # Display sample data. A real newline escape is used here: the previous
    # doubled backslash printed the two characters "\" and "n" verbatim.
    print("\nSample symphony data:")
    print(df_symphonies[['symphony_sid', 'title', 'name']].head(3))

In [None]:
# Step 1: Resolve the date window and locate today's data directory
import os

start_date, end_date = get_start_end_date()
print(f"Date range: {start_date} to {end_date}")

# Report whether the per-date output directory is already on disk
data_dir = f"bin/SYMPHONIES-{end_date}"
if not os.path.exists(data_dir):
    print(f"Data directory will be created: {data_dir}")
else:
    print(f"Data directory exists: {data_dir}")

# Example Usage of Extracted Utility Functions
This notebook shows how the Jupyter notebooks can be updated to use the modular functions.

## Imports

In [None]:
# Updated imports for composer_db.ipynb
from composer_api import fetch_symphony, fetch_backtest_raw
from data_processing import (
    get_symphonies, symphonies_to_df, response_to_dataframe,
    get_backtest_and_symphony_name, convert_sid_dict_to_df, merge_dicts
)
from file_utils import get_csv_name, write_json, read_json, get_start_end_date
from quant_analysis import calculate_quantstats_metrics, calculate_oos_stats

import pandas as pd
import time
from os import listdir
from os.path import isfile, join

## Utility Functions

In [None]:
# Processing Discord exports (replaces cell 3 in composer_db.ipynb)
def process_discord_exports(dirpath):
    """Extract symphonies from every regular file directly inside ``dirpath``.

    Args:
        dirpath: Directory holding the Discord export files. A leading ``~``
            is expanded to the current user's home directory.

    Returns:
        dict: The results of ``get_symphonies`` for each file, merged with
        ``dict.update``. Files are visited in sorted path order, so when two
        files produce the same key the one that sorts later wins.

    Raises:
        FileNotFoundError: If ``dirpath`` does not exist (from ``listdir``).
    """
    from os.path import expanduser

    # listdir() treats "~" as a literal directory name, so expand it first.
    dirpath = expanduser(dirpath)

    # listdir() returns entries in arbitrary order; sorting makes the merge
    # below deterministic across runs and platforms.
    jsonfiles = sorted(
        join(dirpath, f) for f in listdir(dirpath) if isfile(join(dirpath, f))
    )

    symphonies_dict = {}
    for target_file in jsonfiles:
        print(f"Processing file: {target_file}")
        symphonies_dict.update(get_symphonies(target_file))

    return symphonies_dict

In [None]:
# Batch fetch symphonies (replaces cell 6 in composer_db.ipynb)
def batch_fetch_symphonies(symphony_sid_list):
    """Download each symphony in ``symphony_sid_list``, pausing periodically.

    Returns a ``(response_list, failure_list)`` pair: the payloads of the
    successful calls, and ``(index, sid, status_code)`` tuples for the calls
    that did not succeed.
    """
    response_list, failure_list = [], []

    for idx, sid in enumerate(symphony_sid_list):
        # Throttle: one-second pause before request 0, 20, 40, ...
        if not idx % 20:
            print(f'Sleeping at index {idx} with failure {len(failure_list)}')
            time.sleep(1)

        succeeded, status_code, payload = fetch_symphony(sid)
        if not succeeded:
            failure_list.append((idx, sid, status_code))
            continue
        response_list.append(payload)

    return response_list, failure_list

In [None]:
# Batch fetch backtests (replaces cell 14 in composer_db.ipynb)
def batch_fetch_backtests(df, start_date, end_date):
    """Fetch and persist the backtest for every row of ``df``, with throttling.

    Args:
        df: DataFrame with a ``symphony_sid`` column; one request per row.
        start_date: Passed through to ``fetch_backtest_raw``.
        end_date: Passed through to ``fetch_backtest_raw`` and used to build
            the output path ``bin/BT-{end_date}/{sid}.json``.

    Returns:
        tuple: ``(rlist, flist)`` where ``rlist`` holds the payloads of the
        successful requests and ``flist`` holds ``(index_label, sid,
        status_code)`` tuples for the unsuccessful ones.
    """
    rlist, flist = [], []

    # Throttle on the row's position rather than its index label: labels may
    # be strings or non-sequential, which would break or skew ``% 20``.
    for position, (idx, row) in enumerate(df.iterrows()):
        if position % 20 == 0:
            print(f'Sleeping at index {position} with failure {len(flist)}')
            time.sleep(1)

        sid = row['symphony_sid']
        filename = f"bin/BT-{end_date}/{sid}.json"
        ok_status_code, status_code, jsond = fetch_backtest_raw(sid, start_date, end_date)
        # The payload is written whether or not the request succeeded, so a
        # file exists for every sid that was attempted.
        write_json(jsond, filename)

        if ok_status_code:
            rlist.append(jsond)
        else:
            flist.append((idx, sid, status_code))

    return rlist, flist

In [None]:
# Process backtest results (replaces cell 16 in composer_db.ipynb)
def process_backtest_results(jsonfiles):
    """Parse saved backtest JSON files into per-symphony lookup dicts.

    Args:
        jsonfiles: Iterable of paths to backtest JSON files.

    Returns:
        tuple: ``(dict_allocation, dict_return, dict_stats, dict_name)``, each
        keyed by the identifier returned by ``get_backtest_and_symphony_name``.
        Files that fail to load or parse are reported and skipped.
    """
    dict_allocation, dict_return, dict_stats, dict_name = {}, {}, {}, {}

    for index, jsonfile in enumerate(jsonfiles):
        if index % 20 == 0:
            print(f'Index: {index}')
        try:
            jsond = read_json(jsonfile)
            df_allocations, df_return, stats, symphony_name, sid = (
                get_backtest_and_symphony_name(jsond)
            )
        except Exception as exc:
            # Best-effort: one bad file must not abort the batch. Catching
            # Exception (not a bare except) lets Ctrl-C and SystemExit through.
            print(f'Fail jsonfile {jsonfile}: {exc!r}')
            continue

        dict_name[sid] = symphony_name
        dict_stats[sid] = stats
        dict_return[sid] = df_return
        dict_allocation[sid] = df_allocations

    return dict_allocation, dict_return, dict_stats, dict_name

## Main Workflow

In [None]:
# Complete data collection workflow
def main_data_collection_workflow():
    """Run the complete data collection pipeline end to end.

    Steps: read the Discord exports, save the symphony table, fetch symphony
    details and backtests from the API, compute quantstats and out-of-sample
    statistics, and write the resulting CSV files.

    Returns:
        tuple: ``(df_backtest_stats, df_oos_stats)``.
    """
    from os.path import expanduser

    start_date, end_date = get_start_end_date()

    # Process Discord exports. The path is expanded here because the
    # directory listing does not resolve "~" on its own.
    dirpath = expanduser('~/source/discord/archive/20241229')
    symphonies_dict = process_discord_exports(dirpath)

    # Convert to DataFrame and save
    df = symphonies_to_df(symphonies_dict)
    df.to_csv(get_csv_name('SYMPHONIES'))

    # Fetch symphony data
    symphony_sid_list = df['symphony_sid'].values
    response_list, failure_list = batch_fetch_symphonies(symphony_sid_list)
    print(f'Symphony fetch failures: {len(failure_list)}')

    # Convert responses to DataFrame and save
    df_response = response_to_dataframe(response_list)
    df_response.to_csv(get_csv_name('OOS'))

    # Fetch backtest data; payloads are written to disk by the batch helper,
    # so only the failure count is needed here.
    _, flist = batch_fetch_backtests(df, start_date, end_date)
    print(f'Backtest fetch failures: {len(flist)}')

    # Process backtest results
    jsonfiles = [f"bin/BT-{end_date}/{sid}.json" for sid in df['symphony_sid'].values]
    dict_allocation, dict_return, dict_stats, dict_name = process_backtest_results(jsonfiles)

    # Calculate quantstats metrics
    dict_quant_stats = calculate_quantstats_metrics(dict_return)

    # Calculate OOS stats
    sid_to_oos = dict(df_response[['symphony_sid', 'backtest_start_date']].values)
    dict_quant_oos_stats = calculate_oos_stats(dict_return, sid_to_oos)

    # Save final results
    dict_backtest_stats = merge_dicts(dict_stats, dict_quant_stats)
    df_backtest_stats = convert_sid_dict_to_df(dict_name, dict_backtest_stats)
    df_backtest_stats.to_csv(get_csv_name('BACKTEST'))

    # Written under its own name: reusing 'OOS' here overwrote the API detail
    # table saved above during the same run.
    # NOTE(review): assumes get_csv_name accepts arbitrary base names - confirm.
    df_oos_stats = convert_sid_dict_to_df(dict_name, dict_quant_oos_stats)
    df_oos_stats.to_csv(get_csv_name('OOS_STATS'))

    return df_backtest_stats, df_oos_stats

## Example Usage

In [None]:
# Execute the whole pipeline and keep both result frames
df_backtest_stats, df_oos_stats = main_data_collection_workflow()

# Report the dimensions of each result frame, one per line
print(
    f"Backtest stats shape: {df_backtest_stats.shape}",
    f"OOS stats shape: {df_oos_stats.shape}",
    sep="\n",
)

# Preview the top of the backtest table (last expression, so the cell shows it)
df_backtest_stats.head()

## Alternative: Step-by-Step Execution

For testing and debugging, you can run each step individually: