In [15]:
import gzip
import logging
import os
import threading
import time
from contextlib import contextmanager
from io import StringIO
from itertools import islice
from multiprocessing import Pool, cpu_count
from typing import Iterator, List, Optional, Tuple

import polars as pl
import psutil
import pyarrow as pa
import pyarrow.parquet as pq

# Attach a stderr handler at INFO level to the root logger so this module's
# progress messages are visible in the notebook.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(name=__name__)

In [16]:
# Paths are relative to the notebook's working directory.
input_file, output_dir = (
    "../data/corpus/msmarco-docs.json.gz",  # gzip-compressed JSON-lines source
    "../data/corpus/parquet_chunks/",       # destination for per-chunk Parquet files
)
os.makedirs(name=output_dir, exist_ok=True)  # no error if it already exists

In [17]:
@contextmanager
def monitor_memory(interval: float = 5):
    """Periodically log system RAM and swap usage while the ``with`` body runs.

    A daemon thread logs one line immediately on entry and then one line every
    ``interval`` seconds until the context exits.

    Args:
        interval: Seconds between log lines.

    Yields:
        None.
    """
    stop_event = threading.Event()

    def _monitor() -> None:
        while not stop_event.is_set():
            mem = psutil.virtual_memory()
            swap = psutil.swap_memory()
            logger.info(
                f"RAM: {mem.used/1e9:.1f}/{mem.total/1e9:.1f}GB "
                f"Swap: {swap.used/1e9:.1f}/{swap.total/1e9:.1f}GB"
            )
            # Event.wait returns as soon as the event is set (unlike
            # time.sleep), so the thread exits promptly on context exit
            # instead of outliving the join timeout and logging afterwards.
            stop_event.wait(interval)

    thread = threading.Thread(target=_monitor, daemon=True)
    # Started outside `try`: if start() fails there is no thread to join.
    thread.start()
    try:
        yield
    finally:
        stop_event.set()
        thread.join(timeout=1)

In [18]:
def estimate_optimal_chunk_size(
    sample_size: int = 1000,
    target_chunk_memory: float = 0.2,  # fraction of total RAM (not GB)
    input_file: Optional[str] = None,
    min_chunk_size: int = 1000,
    max_chunk_size: int = 30000,
) -> int:
    """Estimate how many lines to put in each chunk from a sample of the file.

    The first ``sample_size`` lines of the gzip file are measured in raw bytes
    to get an average line size. The chunk size is chosen so that one chunk's
    raw text occupies roughly ``target_chunk_memory`` of total system RAM.

    Args:
        sample_size: Number of leading lines to sample.
        target_chunk_memory: Fraction (0-1) of total RAM one chunk's raw text
            may occupy. This budgets raw JSON bytes only; parsed data in a
            worker is larger, and every worker holds its own chunk.
        input_file: Path to a gzip-compressed JSON-lines file. If falsy, no
            sampling is done and the fallback is returned.
        min_chunk_size: Lower clamp for the result.
        max_chunk_size: Upper clamp for the result.

    Returns:
        Lines per chunk, clamped to ``[min_chunk_size, max_chunk_size]``.
        Returns 10000 when ``input_file`` is not given or the sample is empty.
    """
    if not input_file:
        return 10000  # Default fallback

    target_bytes = psutil.virtual_memory().total * target_chunk_memory

    # Binary mode: byte lengths are exact and no (locale-dependent) decoding
    # happens. islice tolerates files shorter than sample_size, where a bare
    # next() would raise StopIteration.
    with gzip.open(input_file, "rb") as f:
        sample = list(islice(f, sample_size))

    if not sample:
        return 10000  # nothing to estimate from (empty file or sample_size 0)

    avg_row_size = sum(map(len, sample)) / len(sample)
    chunk_size = int(target_bytes / avg_row_size)

    return max(min_chunk_size, min(chunk_size, max_chunk_size))

In [19]:
def read_compressed_json(
    file_path: str,
    chunk_size: int
) -> Iterator[List[str]]:
    """Lazily read a gzip-compressed JSON-lines file in fixed-size batches.

    Args:
        file_path: Path to the gzip file; its text is decoded as UTF-8.
        chunk_size: Maximum number of lines per batch; must be >= 1.

    Yields:
        Lists of up to ``chunk_size`` whitespace-stripped, non-blank lines, in
        file order. Only the final batch may be shorter.

    Raises:
        ValueError: On first iteration, if ``chunk_size`` < 1.
    """
    if chunk_size < 1:
        raise ValueError(f"chunk_size must be >= 1, got {chunk_size}")

    batch: List[str] = []
    # Explicit UTF-8: text mode would otherwise decode with the locale encoding.
    with gzip.open(file_path, "rt", encoding="utf-8") as f:
        for line in f:
            record = line.strip()
            if not record:
                continue  # blank lines are not NDJSON records
            batch.append(record)
            if len(batch) >= chunk_size:
                yield batch
                batch = []
    if batch:
        yield batch

def process_chunk(chunk_data: Tuple[List[str], int, str]) -> None:
    """Parse one batch of JSON lines and write it to its own Parquet file.

    Output goes to ``chunk_{idx:05d}.parquet`` in the given directory. The file
    is first written under a ``.tmp`` name and then renamed into place. An
    interrupted write therefore never leaves a truncated ``*.parquet`` file
    that a later ``*.parquet`` glob would pick up.

    Args:
        chunk_data: Tuple containing (data_batch, chunk_index, output_directory).
            Packed as a single argument so it can be mapped over with a Pool.

    Raises:
        Any parse or write error, re-raised after it is logged.
    """
    batch, chunk_idx, output_dir = chunk_data
    chunk_file = os.path.join(output_dir, f"chunk_{chunk_idx:05d}.parquet")
    tmp_file = chunk_file + ".tmp"  # suffix keeps it out of *.parquet globs
    try:
        # Polars parses the NDJSON text; to_arrow() hands the columns to
        # pyarrow for writing.
        df = pl.read_ndjson(StringIO("\n".join(batch)))
        table = df.to_arrow()

        pq.write_table(
            table,
            tmp_file,
            compression='snappy',
            row_group_size=30000
        )
        # os.replace is atomic when source and destination share a filesystem.
        os.replace(tmp_file, chunk_file)
        logger.info(f"Saved chunk {chunk_idx} to {chunk_file}")

    except Exception as e:
        logger.error(f"Error processing chunk {chunk_idx}: {str(e)}")
        try:
            os.remove(tmp_file)  # drop any partially written temp file
        except OSError:
            pass  # best effort; the original error is what gets re-raised
        raise

In [20]:
def convert_json_to_parquet(
    input_file: str,
    output_dir: str,
    chunk_size: Optional[int] = None,
    n_workers: int = max(1, cpu_count() - 1)
) -> None:
    """Convert a gzip JSON-lines file into numbered Parquet chunk files in parallel.

    The parent process reads batches with ``read_compressed_json``. Each batch
    is written by ``process_chunk`` in a ``multiprocessing.Pool`` worker.
    System memory is logged for the duration of the run.

    Args:
        input_file: gzip-compressed JSON-lines file to convert.
        output_dir: Directory for ``chunk_XXXXX.parquet`` files; created if
            missing.
        chunk_size: Lines per chunk. If None, uses
            ``estimate_optimal_chunk_size(input_file=input_file)``, computed at
            call time for the file actually being converted.
        n_workers: Number of worker processes (CPU count minus one, at least 1).

    Raises:
        Any error from reading, parsing or writing, re-raised after logging.
    """
    # Resolved here rather than as a default expression. A default would be
    # evaluated once when the cell runs, from whatever global `input_file`
    # existed then, and would read that file as a side effect of `def`.
    if chunk_size is None:
        chunk_size = estimate_optimal_chunk_size(input_file=input_file)

    os.makedirs(output_dir, exist_ok=True)
    existing = [name for name in os.listdir(output_dir) if name.endswith(".parquet")]
    if existing:
        # Same-index chunks are overwritten, but extra files from a larger
        # earlier run would survive and be picked up by a glob-based merge.
        logger.warning(
            f"{output_dir} already contains {len(existing)} .parquet files; "
            "files not overwritten by this run will remain"
        )

    logger.info(f"Starting conversion with chunk_size={chunk_size}, workers={n_workers}")

    with monitor_memory():
        try:
            chunks = read_compressed_json(input_file, chunk_size)
            tasks = ((chunk, idx, output_dir) for idx, chunk in enumerate(chunks))
            with Pool(n_workers) as pool:
                # Results arrive in completion order; only the count is used
                # here, for progress logging.
                for n_done, _ in enumerate(
                    pool.imap_unordered(process_chunk, tasks), start=1
                ):
                    if n_done % 10 == 0:
                        logger.info(f"Processed {n_done} chunks")

        except Exception as e:
            logger.error(f"Conversion failed: {str(e)}")
            raise
        else:
            # In `else` (not `finally`) so a failed run is not reported as
            # completed.
            logger.info("Conversion completed")

In [21]:
convert_json_to_parquet(input_file=input_file, output_dir=output_dir)  # writes chunk_XXXXX.parquet files


INFO:__main__:Starting conversion with chunk_size=30000, workers=7
INFO:__main__:RAM: 1.8/7.2GB Swap: 1.0/36.5GB
INFO:__main__:RAM: 3.6/7.2GB Swap: 1.0/36.5GB
INFO:__main__:Saved chunk 0 to ../data/corpus/parquet_chunks/chunk_00000.parquet
INFO:__main__:Saved chunk 1 to ../data/corpus/parquet_chunks/chunk_00001.parquet
INFO:__main__:RAM: 4.4/7.2GB Swap: 1.0/36.5GB
INFO:__main__:Saved chunk 2 to ../data/corpus/parquet_chunks/chunk_00002.parquet
INFO:__main__:Saved chunk 3 to ../data/corpus/parquet_chunks/chunk_00003.parquet
INFO:__main__:RAM: 4.4/7.2GB Swap: 1.0/36.5GB
INFO:__main__:Saved chunk 4 to ../data/corpus/parquet_chunks/chunk_00004.parquet
INFO:__main__:RAM: 5.1/7.2GB Swap: 1.3/36.5GB
INFO:__main__:Saved chunk 5 to ../data/corpus/parquet_chunks/chunk_00005.parquet
INFO:__main__:Saved chunk 6 to ../data/corpus/parquet_chunks/chunk_00006.parquet
INFO:__main__:RAM: 6.1/7.2GB Swap: 1.5/36.5GB
INFO:__main__:Saved chunk 7 to ../data/corpus/parquet_chunks/chunk_00007.parquet
INFO:__ma

In [22]:
import pyarrow.parquet as pq
import pyarrow as pa
from tqdm import tqdm
import glob

def merge_parquet_files_streaming(output_dir: str, final_path: str) -> None:
    """Merge Parquet chunks into a single file, one chunk at a time.

    Only one chunk table is held in memory at once. Inputs are processed in
    sorted filename order, so zero-padded ``chunk_XXXXX.parquet`` names keep
    their original row order.

    Args:
        output_dir: Directory whose ``*.parquet`` files are merged.
        final_path: Destination file, overwritten if it exists. It is excluded
            from the inputs if it lives inside ``output_dir``.

    Raises:
        pyarrow errors if a chunk cannot be read, or if its schema cannot be
        cast to the first chunk's schema. The writer is closed either way.
    """
    final_abs = os.path.abspath(final_path)
    # glob returns filesystem order (arbitrary); sort to preserve chunk order.
    chunk_files = sorted(
        path
        for path in glob.glob(os.path.join(output_dir, "*.parquet"))
        if os.path.abspath(path) != final_abs
    )
    logger.info(f"Found {len(chunk_files)} chunks to merge")
    if not chunk_files:
        logger.warning(f"No Parquet chunks found in {output_dir}; nothing written")
        return

    writer = None
    try:
        with tqdm(total=len(chunk_files), desc="Merging chunks") as pbar:
            for file in chunk_files:
                table = pq.read_table(file)  # Read as Arrow Table
                if writer is None:
                    writer = pq.ParquetWriter(final_path, table.schema, compression="snappy")
                elif not table.schema.equals(writer.schema, check_metadata=False):
                    # Chunks written independently may infer different types
                    # (e.g. an all-null column); align to the first chunk's
                    # schema. This raises if the types are incompatible.
                    table = table.cast(writer.schema)
                writer.write_table(table)  # Append to the output file
                pbar.update(1)
    finally:
        if writer is not None:
            writer.close()  # Finalize the Parquet file (also on error)

    logger.info(f"Merged all chunks into: {final_path}")


In [23]:
# NOTE(review): this rebinds `output_dir` (earlier the parquet_chunks path);
# the cleaned chunks appear to come from a step not shown here — confirm.
output_dir = "../data/corpus/cleaned_chunks/"
merge_parquet_files_streaming(
    output_dir=output_dir,
    final_path="../data/corpus/cleaned_msmarco-docs.parquet",
)

INFO:__main__:Found 108 chunks to merge


Merging chunks: 100%|██████████| 108/108 [02:40<00:00,  1.49s/it]
INFO:__main__:Merged all chunks into: ../data/corpus/cleaned_msmarco-docs.parquet
