In [2]:
import numpy as np
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
import matplotlib as plt
import gc
import psutil
import os

Import the Parquet files, convert them to pandas DataFrames, and merge them into a single file for easier exploratory analysis and cleaning.

In [3]:
def get_memory_usage():
    """Return the resident set size (RSS) of the current process in gibibytes.

    Queries the running Python process through psutil and divides the byte
    count by 1024**3.
    """
    bytes_per_gib = 1024 ** 3
    rss_bytes = psutil.Process().memory_info().rss
    return rss_bytes / bytes_per_gib

print(f"Initial memory usage: {get_memory_usage():.2f} GB")

# Input files: one per month of 2023, with a zero-padded month number
# (./raw_data/yellow_tripdata_2023-01.parquet ... -12.parquet).
file_start = "./raw_data/yellow_tripdata_2023-"
file_paths = []
for m in range(1, 13):
    month = f"{m:02d}"
    path = file_start + month + ".parquet"
    file_paths.append(path)

# Rows per chunk; lower this to reduce peak memory while merging.
chunk_size = 500_000

# Destination of the merged dataset.
output_file = "./raw_data/y_2023.parquet"

# Remove any output left over from a previous run before writing a fresh one.
if os.path.exists(output_file):
    os.remove(output_file)

# Merge all monthly files into one output file, one bounded chunk at a time.
# The schema of the very first chunk becomes the output schema; every later
# chunk is conformed to it before being appended.
writer = None
schema = None
schema_fields = None

try:
    for file_path in file_paths:
        parquet_file = pq.ParquetFile(file_path)
        total_rows = parquet_file.metadata.num_rows

        start_row = 0
        # iter_batches streams the file across *all* of its row groups and yields
        # at most `chunk_size` rows per batch, so only one chunk is materialized
        # at a time. (Calling read_row_group(0) inside this loop re-read the whole
        # first row group on every iteration and never read later row groups.)
        for batch in parquet_file.iter_batches(
            batch_size=chunk_size, use_threads=True, use_pandas_metadata=True
        ):
            # Batches may be shorter than chunk_size (e.g. at row-group
            # boundaries), so track positions from the actual batch length.
            end_row = start_row + batch.num_rows
            df = batch.to_pandas()

            # Fix the output schema from the first chunk seen.
            if schema is None:
                schema = pa.Table.from_pandas(df).schema
                schema_fields = {field.name: field for field in schema}

            # from_pandas(..., schema=schema) ignores columns that are not in the
            # schema; report them once per file instead of losing them silently.
            if start_row == 0:
                dropped = [col for col in df.columns if col not in schema_fields]
                if dropped:
                    print(f"Warning: columns not in the output schema will be dropped from {file_path}: {dropped}")

            # Add any schema columns this chunk lacks, filled with nulls.
            for field_name in schema_fields:
                if field_name not in df.columns:
                    df[field_name] = None

            # Build the Arrow table using the output schema's columns and types.
            table = pa.Table.from_pandas(df, schema=schema)

            # Open the writer lazily, once the schema is known.
            if writer is None:
                writer = pq.ParquetWriter(output_file, schema)
            writer.write_table(table)

            # Drop the pandas copy before reading the next batch.
            del df

            print(f"Processed rows {start_row} to {end_row} from {file_path}. Current memory usage: {get_memory_usage():.2f} GB")
            start_row = end_row

        # Every row of the source file must have been written exactly once.
        assert start_row == total_rows, (
            f"{file_path}: wrote {start_row} rows but file metadata reports {total_rows}"
        )
finally:
    # Always close the writer so the Parquet footer is written, even if a chunk fails.
    if writer is not None:
        writer.close()

print(f"Merged Parquet file written to {output_file}. Current memory usage: {get_memory_usage():.2f} GB")

Initial memory usage: 0.17 GB
Processed rows 0 to 500000 from ./raw_data/yellow_tripdata_2023-01.parquet. Current memory usage: 1.23 GB
Processed rows 500000 to 1000000 from ./raw_data/yellow_tripdata_2023-01.parquet. Current memory usage: 1.70 GB
Processed rows 1000000 to 1500000 from ./raw_data/yellow_tripdata_2023-01.parquet. Current memory usage: 1.97 GB
Processed rows 1500000 to 2000000 from ./raw_data/yellow_tripdata_2023-01.parquet. Current memory usage: 1.67 GB
Processed rows 2000000 to 2500000 from ./raw_data/yellow_tripdata_2023-01.parquet. Current memory usage: 1.53 GB
Processed rows 2500000 to 3000000 from ./raw_data/yellow_tripdata_2023-01.parquet. Current memory usage: 1.56 GB
Processed rows 3000000 to 3066766 from ./raw_data/yellow_tripdata_2023-01.parquet. Current memory usage: 1.88 GB
Processed rows 0 to 500000 from ./raw_data/yellow_tripdata_2023-02.parquet. Current memory usage: 1.63 GB
Processed rows 500000 to 1000000 from ./raw_data/yellow_tripdata_2023-02.parquet.