In [None]:
import os
import dask.dataframe as dd
from dask.distributed import Client
import concurrent.futures
import psutil
import pandas as pd

def log_memory_usage():
    """Print the resident set size (RSS) of the current process in megabytes."""
    rss_bytes = psutil.Process(os.getpid()).memory_info().rss
    rss_mb = rss_bytes / (1024 * 1024)
    print(f"Memory usage: {rss_mb:.2f} MB")

def read_parquet_file(filepath):
    """Open the Parquet data at ``filepath`` as a Dask DataFrame using pyarrow."""
    parquet_frame = dd.read_parquet(filepath, engine='pyarrow')
    return parquet_frame

def read_and_merge_parquet_files(directory, n_workers=12):
    """Read every ``.parquet`` file in ``directory`` and merge them row-wise.

    Memory usage of the current process is printed at the start, after the
    concatenation graph is built, and after the result has been computed.

    Args:
        directory: Folder to scan (non-recursively) for files whose names end
            in ``.parquet``.
        n_workers: Number of Dask workers to start; also used as the size of
            the thread pool that opens the files. Defaults to 12.

    Returns:
        The computed concatenation (``axis=0``) of all files, taken in sorted
        path order, or ``None`` when the directory contains no Parquet files.

    Raises:
        OSError: If ``directory`` cannot be listed.
    """
    log_memory_usage()

    # os.listdir gives no ordering guarantee; sort so the row order of the
    # merged result is reproducible from run to run.
    files = sorted(
        os.path.join(directory, name)
        for name in os.listdir(directory)
        if name.endswith('.parquet')
    )
    if not files:
        # Nothing to concatenate: report it to the caller instead of letting
        # dd.concat fail on an empty list (and before starting any workers).
        return None

    client = Client(n_workers=n_workers, threads_per_worker=1)
    try:
        with concurrent.futures.ThreadPoolExecutor(max_workers=n_workers) as executor:
            # executor.map returns results in input order, whereas
            # as_completed would shuffle the frames by completion time.
            dfs = list(executor.map(read_parquet_file, files))

        # No persist() here: the result is computed immediately below, so
        # persisting first would only hold an extra copy on the workers.
        result_df = dd.concat(dfs, axis=0)

        log_memory_usage()

        result_df = result_df.compute()
    finally:
        # Always shut the client down, even if reading or computing fails.
        client.close()

    log_memory_usage()

    return result_df

def main(directory='/mnt/f/parquet/ZZ/FWU1/HHE.D'):
    """Merge the Parquet files in ``directory`` and print a summary of the result.

    Args:
        directory: Folder passed to ``read_and_merge_parquet_files``. Defaults
            to the path this script was originally written for.
    """
    merged_dataframe = read_and_merge_parquet_files(directory)

    if merged_dataframe is None:
        print("Failed to merge DataFrames.")
        return

    print("Merged DataFrame shape:", merged_dataframe.shape)
    print("\nDataFrame Information:")
    # info() writes its report to stdout itself and returns None, so wrapping
    # it in print() would append a stray "None" line.
    merged_dataframe.info()
    print(merged_dataframe.head())
    print(merged_dataframe.tail())

    print("\nData Statistics:")
    print(merged_dataframe.describe())

# Run the merge-and-report workflow only when this file is executed directly,
# not when it is imported as a module.
if __name__ == "__main__":
    main()


Total number of files found: 65

Found 65 files in /mnt/f/parquet/ZZ/FWU1/HHE.D
Batch 1 processed.


: 