# Data Setup and Processing

This notebook handles:
1. Decompressing GZ files to JSON
2. Converting JSON files to Parquet in chunks

In [None]:
import duckdb
import os
import gzip
import glob
import shutil

# Make sure the output directory of each pipeline stage exists
# (decompressed JSON first, then the Parquet chunks).
for directory in ('/data/json', '/data/parquet'):
    os.makedirs(directory, exist_ok=True)

## 1. Decompress GZ Files

In [None]:
def decompress_gz_files(source_dir='/data/elasticsearch', target_dir='/data/json'):
    """Decompress every ``*.json.gz`` archive in *source_dir* into *target_dir*.

    Each ``name.json.gz`` becomes ``name.json`` in *target_dir*; an existing
    file with that name is overwritten. Progress is reported with ``print``.

    Args:
        source_dir: Directory searched (non-recursively) for ``*.json.gz``.
        target_dir: Directory receiving the decompressed files. It is
            created if it does not exist yet.

    Raises:
        OSError: If an archive cannot be read (including invalid gzip data)
            or an output file cannot be written. No partially written
            ``.json`` file is left behind for the failing archive.
    """
    os.makedirs(target_dir, exist_ok=True)

    # glob.escape keeps wildcard characters in the directory name literal;
    # sorting makes the processing order (and the log output) reproducible.
    pattern = os.path.join(glob.escape(source_dir), '*.json.gz')

    for gz_file in sorted(glob.glob(pattern)):
        base_name = os.path.basename(gz_file)
        json_name = base_name[:-3]  # Remove .gz extension
        json_path = os.path.join(target_dir, json_name)
        # Decompress into a side file first: a crash or a corrupt archive
        # must not leave a truncated "*.json" that a later step would read.
        partial_path = json_path + '.part'

        print(f"🔄 Decompressing {base_name}...")

        try:
            with gzip.open(gz_file, 'rb') as f_in, open(partial_path, 'wb') as f_out:
                shutil.copyfileobj(f_in, f_out)
            os.replace(partial_path, json_path)
        finally:
            # After a successful rename the side file no longer exists, so
            # this only cleans up after a failure.
            if os.path.exists(partial_path):
                os.remove(partial_path)

        print(f"✅ Created {json_name}")

# Run step 1: decompress the *.json.gz archives from /data/elasticsearch
# into /data/json.
decompress_gz_files()

## 2. Convert JSON to Parquet in Chunks

In [None]:
def convert_to_parquet(json_file, num_chunks=10, output_dir='/data/parquet'):
    """Split one newline-delimited JSON file into zstd-compressed Parquet chunks.

    The file is read through DuckDB's ``read_ndjson_objects`` and written as
    ``<output_dir>/<stem>_<k>.parquet`` for ``k = 1..num_chunks``, where
    ``<stem>`` is the input file name without its last extension. Fewer files
    are written when the input has fewer rows than chunks; none for an empty
    input. Progress is reported with ``print``.

    Args:
        json_file: Path of the NDJSON file to convert.
        num_chunks: Maximum number of Parquet files to produce (>= 1).
        output_dir: Directory receiving the Parquet files; created if missing.

    Raises:
        ValueError: If *num_chunks* is smaller than 1.
    """
    if num_chunks < 1:
        raise ValueError(f"num_chunks must be at least 1, got {num_chunks}")

    base_name = os.path.basename(json_file)
    file_name = os.path.splitext(base_name)[0]
    print(f"\n🔄 Processing {base_name}")

    os.makedirs(output_dir, exist_ok=True)

    # The path is spliced into a SQL string literal below, so a single quote
    # in a file name has to be doubled to stay inside the literal.
    source_sql = str(json_file).replace("'", "''")

    # Connect to DuckDB (in-memory database)
    con = duckdb.connect()
    try:
        # Count total rows
        total_rows = con.execute(
            f"SELECT COUNT(*) FROM read_ndjson_objects('{source_sql}');"
        ).fetchone()[0]
        # Ceiling division: with floor division the last `total_rows % num_chunks`
        # rows were never exported, and inputs with fewer rows than chunks got a
        # chunk size of 0 and produced no output at all.
        chunk_size = -(-total_rows // num_chunks)
        print(f"📊 Total rows: {total_rows} | Chunk size: {chunk_size}")

        offset = 0
        for i in range(num_chunks):
            if offset >= total_rows:
                # Everything is exported already; skip another scan of the file.
                print("✅ No more rows to process.")
                break

            print(f"🔄 Processing chunk {i+1}/{num_chunks} (Offset: {offset})...")

            # NOTE(review): LIMIT/OFFSET without ORDER BY assumes DuckDB returns
            # the rows in the same (file) order on every scan — confirm.
            query = f"""
            CREATE OR REPLACE TABLE batch_data AS
            SELECT * FROM read_ndjson_objects('{source_sql}')
            LIMIT {chunk_size} OFFSET {offset};
            """
            con.execute(query)

            count = con.execute("SELECT COUNT(*) FROM batch_data;").fetchone()[0]
            if count == 0:
                # Safeguard in case the file shrank since it was counted.
                print("✅ No more rows to process.")
                break

            # Save chunk as Parquet
            parquet_file = os.path.join(output_dir, f"{file_name}_{i+1}.parquet")
            target_sql = parquet_file.replace("'", "''")
            con.execute(
                f"COPY batch_data TO '{target_sql}' (FORMAT 'parquet', COMPRESSION 'zstd');"
            )
            print(f"✅ Saved: {os.path.basename(parquet_file)} ({count} rows)")

            offset += count
    finally:
        # Release the connection even when a query fails.
        con.close()

    print(f"✅ Completed processing {base_name}")

# Process all JSON files
# Run step 2: every *.json file found directly under /data/json is passed to
# convert_to_parquet, one file at a time. glob.glob gives no ordering
# guarantee, so the files are handled in arbitrary order.
json_files = glob.glob('/data/json/*.json')
for json_file in json_files:
    convert_to_parquet(json_file)