In [None]:
# !pip install pyarrow --quiet

In [1]:
import os
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
import json
import gzip
import concurrent.futures

In [2]:
# Reviews: directory scanned for *.json.gz inputs, and where their Parquet copies go.
DATA_DIR_1 = "amazon_reviews"
OUTPUT_DIR_1 = "amazon_reviews_parquet"

# Metadata: same layout as above, for the product-metadata dumps.
DATA_DIR_2 = "amazon_metadata"
OUTPUT_DIR_2 = "amazon_metadata_parquet"

# Make sure both output directories exist; already-existing ones are left as-is.
os.makedirs(OUTPUT_DIR_1, exist_ok=True)
os.makedirs(OUTPUT_DIR_2, exist_ok=True)

In [5]:
def convert_to_parquet(file_path, output_dir):
    """Convert one gzip-compressed JSON Lines file into a Parquet file.

    Every non-blank line of the archive is parsed as a standalone JSON
    document; the parsed records are loaded into a pandas DataFrame, converted
    to an Arrow table and written to ``output_dir``. The whole file is held in
    memory while it is converted.

    Parameters
    ----------
    file_path : str
        Path to the ``.json.gz`` input file.
    output_dir : str
        Existing directory that receives the Parquet file. The output name is
        the input's base name with a trailing ``.json.gz`` swapped for
        ``.parquet``; if the input does not end in ``.json.gz``, ``.parquet``
        is appended to the full base name instead.

    Raises
    ------
    json.JSONDecodeError
        If a non-blank line is not valid JSON.
    OSError
        If the input cannot be read or the output cannot be written.
    """
    with gzip.open(file_path) as f:
        # Skip blank / whitespace-only lines (e.g. a stray trailing newline)
        # instead of letting json.loads fail the entire file on them.
        data = [json.loads(line) for line in f if line.strip()]
    df = pd.DataFrame(data)

    table = pa.Table.from_pandas(df)

    # Strip only a *trailing* '.json.gz'. str.replace would also rewrite the
    # marker in the middle of a name, and would leave a name without the
    # suffix unchanged, so the Parquet output would carry the input's name.
    suffix = '.json.gz'
    base_name = os.path.basename(file_path)
    if base_name.endswith(suffix):
        base_name = base_name[:-len(suffix)]
    parquet_file = os.path.join(output_dir, base_name + '.parquet')
    pq.write_table(table, parquet_file)

#### Reviews

In [6]:
%%time
with concurrent.futures.ThreadPoolExecutor(max_workers=30) as executor:
    futures = []
    for subdir, dirs, files in os.walk(DATA_DIR_1):
        for file in files:
            if file.endswith('.json.gz'):
                file_path = os.path.join(subdir, file)
                
                futures.append(executor.submit(convert_to_parquet, file_path, OUTPUT_DIR_1))

    for future in concurrent.futures.as_completed(futures):
        try:
            future.result() 
        except Exception as exc:
            print(exc)

CPU times: total: 51min 51s
Wall time: 41min 11s


#### Metadata

In [13]:
%%time
with concurrent.futures.ThreadPoolExecutor(max_workers=30) as executor:
    futures = []
    for subdir, dirs, files in os.walk(DATA_DIR_2):
        for file in files:
            if file.endswith('.json.gz'):
                file_path = os.path.join(subdir, file)
                
                futures.append(executor.submit(convert_to_parquet, file_path, OUTPUT_DIR_2))

    for future in concurrent.futures.as_completed(futures):
        try:
            future.result() 
        except Exception as exc:
            print(exc)

CPU times: total: 1h 41min 24s
Wall time: 1h 35min 30s
