# 02 · Data Conversion: CSV → Parquet → DuckDB

This notebook handles the conversion of raw CSV data to optimized Parquet format and creates a DuckDB database for fast queries.

## Conversion Strategy
1. **Stream Processing**: Read the CSV in chunks to handle large files
2. **Data Sanitization**: Clean and standardize barcode sequences
3. **Partitioning**: Split into multiple Parquet files for parallel processing
4. **Database Views**: Create DuckDB views for efficient querying


In [None]:
# Import required libraries
import os
import sys
from pathlib import Path

import duckdb
import pandas as pd

# Project root. Defaults to the original location; set the DNA_ROOT environment
# variable to run the notebook from a different checkout or machine.
root = Path(os.environ.get("DNA_ROOT", "/home/mch/dna")).expanduser()

# Make the project's helper scripts importable. Guarded so that re-running this
# cell does not append the same entry to sys.path again.
scripts_dir = str(root / "scripts")
if scripts_dir not in sys.path:
    sys.path.append(scripts_dir)

# Paths
DATA_DIR = root / "DNA-Data for Telhai" / "2023-05-11"  # raw input data
ARTIFACTS = root / "artifacts"                          # generated outputs
clusters_csv = DATA_DIR / "clusters.csv"                # source CSV
parquet_dir = ARTIFACTS / "clusters_parquet"            # Parquet part files
db_path = ARTIFACTS / "dna.duckdb"                      # DuckDB database file

print(f"Source CSV: {clusters_csv}")
print(f"Target Parquet: {parquet_dir}")
print(f"Target DB: {db_path}")

In [None]:
# Option 1: Run the conversion script
# Uncomment to run the full conversion (may take a few minutes)
# !python /home/mch/dna/scripts/convert_to_parquet.py 2>&1 | tee /home/mch/dna/logs/conversion.log

In [None]:
# Option 2: Manual conversion with progress tracking
from tqdm.notebook import tqdm

def convert_csv_to_parquet(csv_path, parquet_dir, chunksize=500_000):
    """Convert a CSV file into a directory of Parquet part files.

    The CSV is streamed in chunks of ``chunksize`` rows so the whole file never
    has to be held in memory. If a ``barcode`` column is present it is
    upper-cased and every character other than A, C, G, T is removed. Each
    chunk is written to ``part_<index>.parquet`` (index zero-padded to five
    digits) inside ``parquet_dir``.

    Part files matching ``part_*.parquet`` that were left behind by an earlier
    run and not rewritten by this one are deleted afterwards, so that a
    ``*.parquet`` glob over the directory only sees data from this conversion.

    Args:
        csv_path: Location of the source CSV (``str`` or ``Path``).
        parquet_dir: Output directory (``str`` or ``Path``); created if missing.
        chunksize: Number of CSV rows per chunk, i.e. per Parquet part file.

    Returns:
        int: Number of Parquet part files written.
    """
    parquet_dir = Path(parquet_dir)
    parquet_dir.mkdir(parents=True, exist_ok=True)

    # Count lines (minus the header) to size the progress bar. This is only an
    # estimate: quoted fields containing newlines make it overshoot.
    with open(csv_path, 'r') as f:
        total_rows = max(sum(1 for _ in f) - 1, 0)

    written_names = set()

    # Using the reader as a context manager closes the underlying file handle
    # even if sanitising or writing a chunk raises.
    with pd.read_csv(csv_path, chunksize=chunksize) as chunks, \
            tqdm(total=total_rows, desc="Converting") as pbar:
        for i, chunk in enumerate(chunks):
            # Sanitize barcode column
            if 'barcode' in chunk.columns:
                chunk['barcode'] = (chunk['barcode']
                                    .astype('string')
                                    .str.upper()
                                    .str.replace(r"[^ACGT]", "", regex=True))

            # Write to parquet
            out_path = parquet_dir / f'part_{i:05d}.parquet'
            chunk.to_parquet(out_path, index=False)
            written_names.add(out_path.name)
            pbar.update(len(chunk))

    # Drop parts from a previous (larger) run; otherwise readers that glob the
    # directory would silently mix old rows with the freshly converted ones.
    for stale in parquet_dir.glob('part_*.parquet'):
        if stale.name not in written_names:
            stale.unlink()

    # Counted explicitly rather than derived from the loop variable, which is
    # unbound when the reader yields no chunk.
    return len(written_names)

# Uncomment to run conversion
# num_parts = convert_csv_to_parquet(clusters_csv, parquet_dir)
# print(f"Created {num_parts} parquet files")

In [None]:
# Connect to DuckDB and create/verify views

# Fail early with an actionable message: both conversion options above are
# commented out, so on a fresh checkout there may be nothing to read yet.
if not any(parquet_dir.glob("*.parquet")):
    raise FileNotFoundError(
        f"No Parquet files found in {parquet_dir}; run one of the conversion options first."
    )

# duckdb.connect creates the database file but not its parent directory.
db_path.parent.mkdir(parents=True, exist_ok=True)
con = duckdb.connect(str(db_path))

# Install and load parquet extension
con.execute("INSTALL parquet; LOAD parquet;")

# Create main view.
# A view stores its SQL text, so the glob has to be embedded as a string literal;
# DuckDB rejects prepared-statement parameters (`?`) inside CREATE VIEW.
# Single quotes are doubled to keep the literal well-formed.
parquet_glob = str(parquet_dir / "*.parquet").replace("'", "''")
con.execute(
    f"CREATE OR REPLACE VIEW clusters AS SELECT * FROM read_parquet('{parquet_glob}')"
)

# Verify the view
result = con.sql("SELECT COUNT(*) as total_rows FROM clusters")
print("Database view created successfully!")
result.show()

In [None]:
# Create additional analytical views

# barcode_stats: a single summary row over the `clusters` view.
# Empty strings are treated like NULL throughout: NULLIF keeps '' from being
# counted as one of the unique barcodes, matching how null_barcodes and
# valid_barcodes classify it.
con.execute("""
    CREATE OR REPLACE VIEW barcode_stats AS
    SELECT 
        COUNT(*) as total_rows,
        COUNT(DISTINCT NULLIF(barcode, '')) as unique_barcodes,
        COUNT(CASE WHEN barcode IS NULL OR barcode = '' THEN 1 END) as null_barcodes,
        COUNT(CASE WHEN barcode IS NOT NULL AND barcode != '' THEN 1 END) as valid_barcodes
    FROM clusters
""")

# barcode_lengths: number of rows per barcode length, most frequent first,
# ignoring NULL and empty barcodes.
con.execute("""
    CREATE OR REPLACE VIEW barcode_lengths AS
    SELECT 
        LENGTH(barcode) as barcode_length,
        COUNT(*) as count
    FROM clusters
    WHERE barcode IS NOT NULL AND barcode != ''
    GROUP BY 1
    ORDER BY 2 DESC
""")

print("Analytical views created:")
print("- barcode_stats: Overall statistics")
print("- barcode_lengths: Distribution of barcode lengths")

# Show stats
con.sql("SELECT * FROM barcode_stats").show()

In [None]:
# Close the DuckDB connection opened earlier in this notebook, now that the
# views have been created and checked.
con.close()
print("Database setup complete!")