# EraEx: Combined CSV Ingestion (Colab)

This notebook reads **both** CSV files and converts them to year-partitioned Parquet files (`data/processed/year=YYYY/`):
1. `dataset.csv` (existing data)
2. `ndjson_converted.csv` (from notebook 01)

In [None]:
%pip install -r requirements.txt

In [None]:
# Mount Google Drive at /content/drive; the project directory used by this
# notebook (PROJECT_DIR) lives under that mount point.
from google.colab import drive
drive.mount('/content/drive')

In [None]:
from pathlib import Path
import polars as pl
from datetime import date

# Project layout on the mounted Drive: raw CSV inputs and Parquet outputs.
PROJECT_DIR = Path('/content/drive/MyDrive/EraEx')
RAW_DIR = PROJECT_DIR.joinpath('data', 'raw')
PROCESSED_DIR = PROJECT_DIR.joinpath('data', 'processed')

# Make sure the output directory exists before anything is written to it.
PROCESSED_DIR.mkdir(parents=True, exist_ok=True)

# Years kept during ingestion: 2012 through 2018 inclusive.
YEAR_RANGE = range(2012, 2019)

# Input files, processed in this order.
CSV_FILES = [RAW_DIR / name for name in ('dataset.csv', 'ndjson_converted.csv')]

# Report which inputs are present (with size in GB) and which are missing.
print('CSV files to process:')
for csv_path in CSV_FILES:
    if not csv_path.exists():
        print(f'  ✗ {csv_path.name} (NOT FOUND)')
        continue
    size_gb = csv_path.stat().st_size / 1e9
    print(f'  ✓ {csv_path.name} ({size_gb:.2f} GB)')

In [None]:
# Canonical column name -> source-column aliases that should be renamed to it.
_COLUMN_ALIASES = {
    'track_id': ('id', 'soundcloud_id'),
    'artist': ('user', 'username'),
    'tags': ('tag_list',),
    'playback_count': ('plays',),
    'permalink_url': ('url',),
    'created_at': ('date',),
}

# Flattened lookup used during ingestion: lower-cased source column name ->
# canonical column name.
COLUMN_MAPPING = {
    alias: canonical
    for canonical, aliases in _COLUMN_ALIASES.items()
    for alias in aliases
}

In [None]:
def ingest_csv(csv_path: Path, batch_counter: int = 0) -> tuple[int, dict[int, int]]:
    """Stream one CSV into year-partitioned Parquet files under PROCESSED_DIR.

    The file is read in batches. For each batch, columns are renamed to their
    canonical names via COLUMN_MAPPING, ``track_id`` is cast to string, a
    ``year`` column is derived from the first four characters of
    ``created_at`` when absent, and an ``ingest_date`` column holding today's
    date is added. Rows whose year falls in YEAR_RANGE are written to
    ``PROCESSED_DIR/year=<year>/batch_<NNNNN>.parquet``; other rows are dropped.

    Args:
        csv_path: CSV file to ingest.
        batch_counter: Last batch number already used; output files continue
            numbering from here so several CSVs can share the same folders.

    Returns:
        ``(batch_counter, stats)`` where ``batch_counter`` is the last batch
        number used and ``stats`` maps each year in YEAR_RANGE to the number of
        rows written for it. ``stats`` is ``{}`` when the file does not exist.
    """
    if not csv_path.exists():
        print(f'Skipping {csv_path.name} (not found)')
        return batch_counter, {}

    print(f'\nProcessing: {csv_path.name}')

    stats = {year: 0 for year in YEAR_RANGE}

    # Progress is reported each time the running total crosses another
    # multiple of this step (an exact-multiple check would almost never fire,
    # because per-batch row counts after year filtering are arbitrary).
    progress_step = 500000
    next_report = progress_step

    reader = pl.read_csv_batched(
        csv_path,
        batch_size=500000,
        ignore_errors=True,
        truncate_ragged_lines=True,
        infer_schema_length=10000
    )

    while True:
        batches = reader.next_batches(1)
        if not batches:
            break

        df = batches[0]

        rename_map = _canonical_renames(df.columns)
        if rename_map:
            df = df.rename(rename_map)

        if 'track_id' in df.columns:
            df = df.with_columns([pl.col('track_id').cast(pl.Utf8)])

        if 'year' not in df.columns and 'created_at' in df.columns:
            # Take the leading "YYYY" of the timestamp text; values that do
            # not parse become null and are dropped by the year filter below.
            df = df.with_columns([
                pl.col('created_at').cast(pl.Utf8).str.slice(0, 4).cast(pl.Int32, strict=False).alias('year')
            ])

        if 'year' not in df.columns:
            # Without a year there is nothing to partition on; every batch of
            # this file has the same columns, so stop instead of failing in
            # the filter below.
            print(f'Skipping {csv_path.name} (no year or created_at column)')
            break

        df = df.with_columns([pl.lit(date.today()).cast(pl.Date).alias('ingest_date')])

        for year in YEAR_RANGE:
            year_df = df.filter(pl.col('year') == year)
            if year_df.height == 0:
                continue

            stats[year] += year_df.height

            year_dir = PROCESSED_DIR / f'year={year}'
            year_dir.mkdir(parents=True, exist_ok=True)

            batch_counter += 1
            out_path = year_dir / f'batch_{batch_counter:05d}.parquet'
            year_df.write_parquet(out_path)

        total_so_far = sum(stats.values())
        if total_so_far >= next_report:
            print(f'  Processed: {total_so_far:,} rows')
            next_report = (total_so_far // progress_step + 1) * progress_step

    return batch_counter, stats


def _canonical_renames(columns):
    """Return {source column: canonical name} for the columns that can be renamed safely."""
    # A canonical name is used at most once: if it is already a column, or an
    # earlier alias has claimed it, later aliases keep their original name so
    # the rename never produces duplicate column names.
    taken = set(columns)
    rename_map = {}
    for old_col in columns:
        new_col = COLUMN_MAPPING.get(old_col.lower().strip())
        if new_col is None or new_col in taken:
            continue
        rename_map[old_col] = new_col
        taken.add(new_col)
    return rename_map

In [None]:
# Ingest every configured CSV in order. The batch counter is threaded through
# the calls so output file numbers keep increasing across input files.
all_stats = {}
batch_counter = 0

for csv_file in CSV_FILES:
    result = ingest_csv(csv_file, batch_counter)
    batch_counter, stats = result
    all_stats[csv_file.name] = stats
    file_total = sum(stats.values())
    print(f"  Total: {file_total:,} rows")

In [None]:
# Final report: count the rows stored in each year=... folder and overall.
banner = '=' * 50
print('\n' + banner)
print('INGEST COMPLETE')
print(banner)

grand_total = 0
for year_dir in sorted(PROCESSED_DIR.glob('year=*')):
    parquet_files = list(year_dir.glob('*.parquet'))

    # Row counts come from a lazy scan of each file rather than loading it.
    total_rows = 0
    for parquet_path in parquet_files:
        total_rows += pl.scan_parquet(parquet_path).select(pl.count()).collect().item()

    grand_total += total_rows
    print(f'{year_dir.name}: {total_rows:,} rows in {len(parquet_files)} files')

print(f'\nGrand Total: {grand_total:,} rows')