In [1]:
import os
import json
import pandas as pd
import pyarrow.parquet as pq
import pyarrow as pa
from tqdm import tqdm
from pathlib import Path

# Directory containing the JSON files
DATA_DIR = "DataUnzip"
OUTPUT_DIR = "parquet_database"

# Progress tracking file
PROGRESS_FILE = "progress_log.json"
ERROR_LOG_FILE = "error_log.json"

# Partition column (e.g., receivedate if available)
PARTITION_COLUMN = "receivedate"
MAX_PARTITIONS = 1024  # Threshold for maximum allowable partitions


def load_progress():
    """Load progress from log file."""
    if os.path.exists(PROGRESS_FILE):
        with open(PROGRESS_FILE, "r") as f:
            return json.load(f)
    return {"processed_files": [], "processed_size": 0}

def save_progress(progress):
    """Save progress to log file."""
    with open(PROGRESS_FILE, "w") as f:
        json.dump(progress, f)

def log_error(file_path, error_message):
    """Log errors to the error log file."""
    error_log = []
    if os.path.exists(ERROR_LOG_FILE):
        with open(ERROR_LOG_FILE, "r") as f:
            error_log = json.load(f)
    error_log.append({"file": file_path, "error": error_message})
    with open(ERROR_LOG_FILE, "w") as f:
        json.dump(error_log, f, indent=4)

def get_total_size_and_files(data_dir):
    """Get total size and list of all JSON files."""
    total_size = 0
    all_files = []
    for root, _, files in os.walk(data_dir):
        for file in files:
            if file.endswith(".json"):
                full_path = os.path.join(root, file)
                total_size += os.path.getsize(full_path)
                all_files.append(full_path)
    return total_size, all_files

def process_file_to_dataframe(file_path):
    """Process a single JSON file into a Pandas DataFrame."""
    with open(file_path, "r") as f:
        data = json.load(f)
        
        # Flatten JSON into a Pandas DataFrame
        if isinstance(data, list):
            df = pd.json_normalize(data)
        elif isinstance(data, dict) and "results" in data:
            df = pd.json_normalize(data["results"])
        else:
            df = pd.json_normalize([data])  # Single record
        
        # Convert receivedate to Year-Month for lower cardinality
        if PARTITION_COLUMN in df.columns:
            df[PARTITION_COLUMN] = pd.to_datetime(
                df[PARTITION_COLUMN], format='%Y%m%d', errors='coerce'
            ).dt.to_period('M').astype(str)  # Convert to YYYY-MM format
        return df

def save_to_parquet(df, output_dir, partition_column=None):
    """Save DataFrame to Parquet with optional partitioning."""
    # Determine the number of unique partitions
    num_partitions = df[partition_column].nunique() if partition_column in df.columns else 0

    # Skip partitioning if too many partitions
    if partition_column and num_partitions > MAX_PARTITIONS:
        print(f"Warning: Skipping partitioning for file due to {num_partitions} unique partitions.")
        table = pa.Table.from_pandas(df)
        pq.write_table(table, os.path.join(output_dir, "non_partitioned.parquet"), existing_data_behavior="overwrite_or_ignore")
    else:
        table = pa.Table.from_pandas(df)
        pq.write_to_dataset(
            table,
            root_path=output_dir,
            partition_cols=[partition_column] if partition_column else None,
            existing_data_behavior="overwrite_or_ignore"
        )

def main():
    os.makedirs(OUTPUT_DIR, exist_ok=True)

    # Load progress
    progress = load_progress()

    # Get total size and list of files
    total_size, all_files = get_total_size_and_files(DATA_DIR)
    processed_size = progress["processed_size"]
    processed_files = set(progress["processed_files"])

    print(f"Total files: {len(all_files)}, Total size: {total_size / (1024 ** 3):.2f} GB")

    # Initialize progress bar
    with tqdm(total=total_size, initial=processed_size, unit="B", unit_scale=True, desc="Processing JSONs") as pbar:
        for file_path in all_files:
            file_name = Path(file_path).name

            if file_name in processed_files:
                continue  # Skip already processed files

            try:
                # Convert JSON to Pandas DataFrame
                df = process_file_to_dataframe(file_path)

                # Save to partitioned Parquet
                save_to_parquet(df, OUTPUT_DIR, partition_column=PARTITION_COLUMN)

                file_size = os.path.getsize(file_path)
                processed_size += file_size
                processed_files.add(file_name)

                # Update progress
                progress["processed_size"] = processed_size
                progress["processed_files"] = list(processed_files)
                save_progress(progress)

                pbar.update(file_size)
            except Exception as e:
                error_message = str(e)
                log_error(file_path, error_message)
                print(f"Error processing {file_path}: {error_message}")

    # Verify completion
    completion_percentage = len(processed_files) / len(all_files) * 100
    print(f"Processing complete! {completion_percentage:.2f}% of files have been converted to Parquet.")
    if len(processed_files) < len(all_files):
        print(f"Missing files: {len(all_files) - len(processed_files)}")

if __name__ == "__main__":
    main()


Total files: 1569, Total size: 480.86 GB


Processing JSONs:   4%|▎         | 18.5G/516G [03:30<1:31:13, 91.0MB/s]

Error processing DataUnzip/226-drug-event-0003-of-0012.json/._drug-event-0003-of-0012.json: 'utf-8' codec can't decode byte 0xb0 in position 37: invalid start byte


Processing JSONs:  30%|██▉       | 153G/516G [32:38<1:17:15, 78.3MB/s] 


Processing complete! 34.54% of files have been converted to Parquet.
Missing files: 1027
