In [1]:
import duckdb
import pandas as pd
import glob
import os

# Connect to DuckDB
conn = duckdb.connect()

print("DuckDB connection established")

DuckDB connection established


In [2]:
# Define parquet file glob patterns to merge
parquet_globs = [
    "/home/lewis/.cache/huggingface/hub/datasets--Trelis--log-programs-20250805/snapshots/d34fb29527e27a4ab818366d0617f9c545abd643/data/*.parquet",
    "/home/lewis/.cache/huggingface/hub/datasets--julien31--soar_arc_train_5M/snapshots/cca6061b73c3e80bde9ad3278ad3cfc959bb59de/*.parquet",
    "/home/lewis/code/trelis-arc/hodel_programs.parquet",
    "/home/lewis/code/trelis-arc/soar_log_programs_v2.parquet"
]

# Collect all parquet files from the glob patterns
print("Collecting parquet files from glob patterns...")
all_parquet_files = []

for i, pattern in enumerate(parquet_globs):
    files = glob.glob(pattern)
    if files:
        all_parquet_files.extend(files)
        print(f"Pattern {i+1}: {len(files)} files found")
        print(f"  {pattern}")
    else:
        print(f"Pattern {i+1}: No files found")
        print(f"  {pattern}")

print(f"\nTotal parquet files found: {len(all_parquet_files)}")

# Show some sample files
if all_parquet_files:
    print("\nSample files:")
    for f in all_parquet_files[:5]:
        print(f"  {f}")
    if len(all_parquet_files) > 5:
        print(f"  ... and {len(all_parquet_files) - 5} more")

Collecting parquet files from glob patterns...
Pattern 1: 11 files found
  /home/lewis/.cache/huggingface/hub/datasets--Trelis--log-programs-20250805/snapshots/d34fb29527e27a4ab818366d0617f9c545abd643/data/*.parquet
Pattern 2: 5 files found
  /home/lewis/.cache/huggingface/hub/datasets--julien31--soar_arc_train_5M/snapshots/cca6061b73c3e80bde9ad3278ad3cfc959bb59de/*.parquet
Pattern 3: 1 files found
  /home/lewis/code/trelis-arc/hodel_programs.parquet
Pattern 4: 1 files found
  /home/lewis/code/trelis-arc/soar_log_programs_v2.parquet

Total parquet files found: 18

Sample files:
  /home/lewis/.cache/huggingface/hub/datasets--Trelis--log-programs-20250805/snapshots/d34fb29527e27a4ab818366d0617f9c545abd643/data/train-00010-of-00011.parquet
  /home/lewis/.cache/huggingface/hub/datasets--Trelis--log-programs-20250805/snapshots/d34fb29527e27a4ab818366d0617f9c545abd643/data/train-00001-of-00011.parquet
  /home/lewis/.cache/huggingface/hub/datasets--Trelis--log-programs-20250805/snapshots/d34f

In [3]:
# Ready for file-by-file processing
print(f"Ready to process {len(all_parquet_files)} parquet files")
print("File-by-file processing approach will handle memory efficiently!")

Ready to process 18 parquet files
File-by-file processing approach will handle memory efficiently!


In [4]:
# Check schema by sampling a few files
print("Checking schemas by sampling files...")

# Sample up to 5 files to check schema consistency
sample_size = min(5, len(all_parquet_files))
sample_files = all_parquet_files[:sample_size]

schemas = []
for i, sample_file in enumerate(sample_files):
    try:
        print(f"\nChecking schema for file {i+1}:")
        print(f"  {sample_file}")
        
        # Create a temporary view for this single file
        conn.execute(f"DROP VIEW IF EXISTS temp_sample")
        conn.execute(f"CREATE VIEW temp_sample AS SELECT * FROM read_parquet('{sample_file}')")
        
        # Get column info
        schema_info = conn.execute("DESCRIBE temp_sample").fetchall()
        column_names = [col[0] for col in schema_info]
        schemas.append((sample_file, column_names))
        
        print(f"  Columns: {column_names}")
            
        # Get row count for this sample file
        count = conn.execute("SELECT COUNT(*) FROM temp_sample").fetchone()[0]
        print(f"  Row count: {count:,}")
        
        # Clean up
        conn.execute("DROP VIEW temp_sample")
        
    except Exception as e:
        print(f"Error with file {i+1}: {e}")

print(f"\nSchemas collected from {len(schemas)} sample files")

# Check if all sampled files have the same schema
if len(schemas) > 1:
    first_schema = schemas[0][1]
    schema_consistent = all(schema[1] == first_schema for schema in schemas)
    print(f"Schema consistency across samples: {'✅ Consistent' if schema_consistent else '❌ Inconsistent'}")
    
    if not schema_consistent:
        print("Schema differences found:")
        for i, (file_path, columns) in enumerate(schemas):
            print(f"  File {i+1}: {columns}")
else:
    print("Only one sample file checked - cannot verify consistency")

Checking schemas by sampling files...

Checking schema for file 1:
  /home/lewis/.cache/huggingface/hub/datasets--Trelis--log-programs-20250805/snapshots/d34fb29527e27a4ab818366d0617f9c545abd643/data/train-00010-of-00011.parquet
  Columns: ['task_id', 'reasoning', 'code', 'correct_train_input', 'correct_test_input', 'predicted_train_output', 'predicted_test_output', 'train_input', 'test_input', 'model', 'generation']
  Row count: 30,165

Checking schema for file 2:
  /home/lewis/.cache/huggingface/hub/datasets--Trelis--log-programs-20250805/snapshots/d34fb29527e27a4ab818366d0617f9c545abd643/data/train-00001-of-00011.parquet
  Columns: ['task_id', 'reasoning', 'code', 'correct_train_input', 'correct_test_input', 'predicted_train_output', 'predicted_test_output', 'train_input', 'test_input', 'model', 'generation']
  Row count: 30,166

Checking schema for file 3:
  /home/lewis/.cache/huggingface/hub/datasets--Trelis--log-programs-20250805/snapshots/d34fb29527e27a4ab818366d0617f9c545abd643

In [5]:
# Check if files have the minimum required columns for merging
print("\nChecking for minimum required columns...")

# Basic required columns for any merge (can be customized)
basic_required = ['code', 'task_id']

if schemas:
    # Use the first schema as reference
    available_columns = [col.lower() for col in schemas[0][1]]
    has_basic_required = all(req_col in available_columns for req_col in basic_required)
    
    print(f"Available columns: {schemas[0][1]}")
    print(f"Has basic required columns {basic_required}: {'✅ Yes' if has_basic_required else '❌ No'}")
    
    if has_basic_required:
        print(f"✅ Files are compatible for merging")
        compatible_files = all_parquet_files
        print(f"Files to process: {len(compatible_files)}")
        
        # Show sample data
        print(f"\nSample data from first file:")
        try:
            conn.execute(f"DROP VIEW IF EXISTS temp_sample")
            conn.execute(f"CREATE VIEW temp_sample AS SELECT * FROM read_parquet('{all_parquet_files[0]}')")
            sample = conn.execute("SELECT code, task_id FROM temp_sample LIMIT 3").fetchall()
            for row in sample:
                print(f"  code: {row[0][:50]}{'...' if len(str(row[0])) > 50 else ''}, task_id: {row[1]}")
            conn.execute("DROP VIEW temp_sample")
        except Exception as e:
            print(f"Error sampling: {e}")
    else:
        print(f"❌ Files missing required columns. Available: {available_columns}")
        compatible_files = []
else:
    print("❌ No schema information available")
    compatible_files = []


Checking for minimum required columns...
Available columns: ['task_id', 'reasoning', 'code', 'correct_train_input', 'correct_test_input', 'predicted_train_output', 'predicted_test_output', 'train_input', 'test_input', 'model', 'generation']
Has basic required columns ['code', 'task_id']: ✅ Yes
✅ Files are compatible for merging
Files to process: 18

Sample data from first file:
  code: def transform(grid):
    transformed = []
    for ..., task_id: 6f473927
  code: def transform(input_grid):
    
    
    
    tran..., task_id: 6f473927
  code: def transform(grid):
    transformed = []
    for ..., task_id: 6f473927


In [None]:
# Partitioned merge: process 1/10th of each file at a time
output_path = "/home/lewis/code/trelis-arc/king_programs.parquet"
NUM_PARTITIONS = 10  # Process files in 10 partitions

# Define the columns we want to select from each file - ALL MUST BE PRESENT
required_columns = ['task_id', 'code', 'predicted_train_output', 'predicted_test_output', 
                   'correct_train_input', 'correct_test_input', 'model']

if compatible_files:
    print(f"Partitioned merge of {len(compatible_files)} parquet files...")
    print(f"Number of partitions: {NUM_PARTITIONS}")
    print(f"Required columns: {required_columns}")
    print("Note: No deduplication will be performed - raw merge only")
    
    # Check that files have ALL required columns using the first file as reference
    if schemas:
        available_columns = [col for col in schemas[0][1]]
        
        print(f"\nAvailable columns in files:")
        print(f"  {available_columns}")
        
        # Check which required columns are missing
        missing = [col for col in required_columns if col not in available_columns]
        if missing:
            print(f"  ❌ MISSING REQUIRED COLUMNS: {missing}")
            print(f"All files must have ALL required columns: {required_columns}")
            print("Please fix the source data or adjust the required columns list.")
        else:
            print(f"  ✅ All required columns present")
            
            print(f"\nProceeding with partitioned processing...")
            
            select_clause = ", ".join(required_columns)
            
            # First, get row counts for all files to calculate partition sizes
            print("\nAnalyzing file sizes for partitioning...")
            file_info = []
            total_files_with_data = 0
            
            for i, file_path in enumerate(compatible_files):
                try:
                    conn.execute(f"DROP VIEW IF EXISTS temp_count")
                    conn.execute(f"CREATE VIEW temp_count AS SELECT {select_clause} FROM read_parquet('{file_path}')")
                    
                    file_count = conn.execute("SELECT COUNT(*) FROM temp_count").fetchone()[0]
                    conn.execute("DROP VIEW temp_count")
                    
                    if file_count > 0:
                        file_info.append((file_path, file_count))
                        total_files_with_data += 1
                        print(f"  File {i+1}: {os.path.basename(file_path)} - {file_count:,} rows")
                    else:
                        print(f"  File {i+1}: {os.path.basename(file_path)} - EMPTY (skipping)")
                        
                except Exception as e:
                    print(f"  File {i+1}: {os.path.basename(file_path)} - ERROR: {e}")
            
            if not file_info:
                print("❌ No files with data found!")
            else:
                total_records = sum(count for _, count in file_info)
                print(f"\nFiles with data: {len(file_info)}")
                print(f"Total records to process: {total_records:,}")
                
                # Process each partition
                partition_outputs = []
                
                for partition_num in range(NUM_PARTITIONS):
                    print(f"\n=== Processing Partition {partition_num + 1}/{NUM_PARTITIONS} ===")
                    
                    partition_file = f"/tmp/king_programs_partition_{partition_num}.parquet"
                    partition_outputs.append(partition_file)
                    
                    # Create views for this partition from each file
                    partition_views = []
                    partition_records = 0
                    
                    for file_idx, (file_path, file_count) in enumerate(file_info):
                        # Calculate partition boundaries for this file
                        partition_size = file_count // NUM_PARTITIONS
                        start_offset = partition_num * partition_size
                        
                        # Handle the last partition (include remainder)
                        if partition_num == NUM_PARTITIONS - 1:
                            limit = file_count - start_offset  # Take everything remaining
                        else:
                            limit = partition_size
                        
                        if limit <= 0:
                            continue
                            
                        view_name = f"partition_{partition_num}_file_{file_idx}"
                        partition_views.append(view_name)
                        
                        print(f"  File {file_idx + 1}: rows {start_offset:,} to {start_offset + limit:,}")
                        
                        try:
                            conn.execute(f"DROP VIEW IF EXISTS {view_name}")
                            conn.execute(f"""
                                CREATE VIEW {view_name} AS 
                                SELECT {select_clause} 
                                FROM read_parquet('{file_path}')
                                LIMIT {limit} OFFSET {start_offset}
                            """)
                            
                            # Count actual records in this partition
                            actual_count = conn.execute(f"SELECT COUNT(*) FROM {view_name}").fetchone()[0]
                            partition_records += actual_count
                            
                        except Exception as e:
                            print(f"    Error creating view for {os.path.basename(file_path)}: {e}")
                            partition_views.remove(view_name)
                    
                    if partition_views:
                        print(f"  Partition {partition_num + 1} total records: {partition_records:,}")
                        
                        # Create UNION of all partition views
                        union_query = " UNION ALL ".join([f"SELECT * FROM {view}" for view in partition_views])
                        
                        print(f"  Writing partition {partition_num + 1} to disk...")
                        conn.execute(f"COPY ({union_query}) TO '{partition_file}' (FORMAT PARQUET)")
                        
                        # Clean up partition views
                        for view in partition_views:
                            conn.execute(f"DROP VIEW IF EXISTS {view}")
                        
                        print(f"  ✅ Partition {partition_num + 1} completed")
                    else:
                        print(f"  ⚠️ Partition {partition_num + 1} has no data")
                
                # Now combine all partitions into final output
                print(f"\n=== Combining {NUM_PARTITIONS} partitions into final output ===")
                
                existing_partitions = [p for p in partition_outputs if os.path.exists(p)]
                
                if existing_partitions:
                    print(f"Found {len(existing_partitions)} partitions to combine")
                    
                    # Create final union query
                    partition_reads = [f"SELECT * FROM read_parquet('{p}')" for p in existing_partitions]
                    final_union = " UNION ALL ".join(partition_reads)
                    
                    print(f"Writing final merged file to {output_path}...")
                    conn.execute(f"COPY ({final_union}) TO '{output_path}' (FORMAT PARQUET)")
                    
                    # Clean up partition files
                    print("Cleaning up partition files...")
                    for partition_file in existing_partitions:
                        try:
                            os.remove(partition_file)
                        except Exception as e:
                            print(f"  Warning: Could not remove {partition_file}: {e}")
                    
                    print("✅ Partitioned merge completed successfully!")
                    print(f"Output written to: {output_path}")
                    
                    # Verify the output file
                    if os.path.exists(output_path):
                        file_size = os.path.getsize(output_path) / (1024 * 1024)  # Size in MB
                        print(f"Output file size: {file_size:.1f} MB")
                        
                        # Quick verification - check final count
                        final_count = conn.execute(f"SELECT COUNT(*) FROM read_parquet('{output_path}')").fetchone()[0]
                        print(f"Final record count: {final_count:,}")
                        print(f"Expected record count: {total_records:,}")
                        
                        if final_count == total_records:
                            print("✅ Record count matches - merge successful!")
                        else:
                            print(f"⚠️ Record count mismatch - some data may be missing")
                    
                    # Show final schema
                    print("\nFinal merged dataset schema:")
                    final_schema = conn.execute(f"DESCRIBE (SELECT * FROM read_parquet('{output_path}') LIMIT 1)").fetchall()
                    for col_name, col_type, null, key, default, extra in final_schema:
                        print(f"  {col_name}: {col_type}")
                else:
                    print("❌ No partitions were created successfully!")
    else:
        print("❌ No schema information available to validate columns")
        
else:
    print("❌ No compatible files found for merging!")
    print("Please check the schema compatibility issues above.")

Ultra memory-efficient batch merge of 18 parquet files...
Batch size: 100,000 rows
Required columns: ['task_id', 'code', 'predicted_train_output', 'predicted_test_output', 'correct_train_input', 'correct_test_input', 'model']
Note: No deduplication will be performed - raw merge only

Available columns in files:
  ['task_id', 'reasoning', 'code', 'correct_train_input', 'correct_test_input', 'predicted_train_output', 'predicted_test_output', 'train_input', 'test_input', 'model', 'generation']
  ✅ All required columns present

Proceeding with ultra memory-efficient batch processing...

Processing file 1/18
  File: train-00010-of-00011.parquet
  Total records in file: 30,165
  Processing in 1 batches of 100,000 rows
    Batch 1/1: rows 0 to 30,165
    Creating initial parquet file...
    Batch processed: 30,165 rows
  File completed: 30,165 rows processed
  Cumulative records: 30,165

Processing file 2/18
  File: train-00001-of-00011.parquet
  Total records in file: 30,166
  Processing in 