In [8]:
import os
import sys
import pandas as pd
import numpy as np
import dask.dataframe as dd
from dask.distributed import Client, LocalCluster

def _list_input_csvs(input_folder, output_folder):
    """Return the sorted paths of the CSV files in input_folder.

    When input_folder and output_folder are the same directory, files named
    like this function's own output (output_block_<N>.csv) are skipped so a
    re-run does not merge the previous run's blocks back into the data.
    """
    shared_folder = (
        os.path.normcase(os.path.abspath(input_folder))
        == os.path.normcase(os.path.abspath(output_folder))
    )
    prefix, suffix = 'output_block_', '.csv'
    paths = []
    # sorted(): os.listdir returns names in arbitrary order, which would make
    # the result depend on the file system rather than only on the seed.
    for filename in sorted(os.listdir(input_folder)):
        if not filename.endswith(suffix):
            continue
        stem = filename[:-len(suffix)]
        is_previous_output = stem.startswith(prefix) and stem[len(prefix):].isdigit()
        if shared_folder and is_previous_output:
            continue
        paths.append(os.path.join(input_folder, filename))
    return paths


def _encode_rows(frame):
    """Serialise rows exactly as they are stored on disk: header-less CSV as UTF-8 bytes."""
    return frame.to_csv(index=False, header=False).encode('utf-8')


def _take_rows(part, start, budget, must_progress):
    """Return (row_count, payload) for the longest run of rows from `start` fitting in `budget` bytes.

    If not even one row fits and must_progress is true, a single row is
    returned anyway, because a row cannot be split across files.
    """
    remaining = len(part) - start
    payload = _encode_rows(part.iloc[start:])
    if len(payload) <= budget:
        # Common case: the whole rest of the partition fits in the current block.
        return remaining, payload

    # Bisect for the largest row count that fits; the encoded size grows
    # monotonically with the number of rows. Invariant: `low` rows fit.
    low, high = 0, remaining - 1
    payload = b''
    while low < high:
        mid = (low + high + 1) // 2
        candidate = _encode_rows(part.iloc[start:start + mid])
        if len(candidate) <= budget:
            low, payload = mid, candidate
        else:
            high = mid - 1

    if low == 0 and must_progress:
        return 1, _encode_rows(part.iloc[start:start + 1])
    return low, payload


def _write_blocks(partitions, output_folder, block_size):
    """Stream pandas DataFrames into output_block_<N>.csv files capped at block_size bytes.

    Each file starts with a header line, whose bytes count towards the cap.
    Returns the number of files written.
    """
    block_count = 0
    handle = None
    block_filename = None
    bytes_written = 0
    rows_written = 0
    try:
        for part in partitions:
            start = 0
            while start < len(part):
                if handle is None:
                    block_count += 1
                    block_filename = os.path.join(output_folder, f'output_block_{block_count}.csv')
                    # Binary mode: the bytes counted are exactly the bytes stored.
                    handle = open(block_filename, 'wb')
                    header = part.iloc[:0].to_csv(index=False).encode('utf-8')
                    handle.write(header)
                    bytes_written, rows_written = len(header), 0

                taken, payload = _take_rows(
                    part, start, block_size - bytes_written, must_progress=(rows_written == 0)
                )
                if taken == 0:
                    # Current block is full: close it and retry these rows in a new one.
                    handle.close()
                    handle = None
                    print(f'Wrote {rows_written} rows to {block_filename}')
                    continue

                handle.write(payload)
                bytes_written += len(payload)
                rows_written += taken
                start += taken

        # Close the final, partially filled block.
        if handle is not None:
            handle.close()
            handle = None
            print(f'Wrote {rows_written} rows to {block_filename}')
    finally:
        # Only reached with an open handle when an error interrupted the loop.
        if handle is not None:
            handle.close()
    return block_count


def _iter_shuffled_partitions(ddf, seed):
    """Yield the partitions of a Dask DataFrame as pandas frames, in seeded random order.

    Each partition is computed on its own and its rows are permuted, so only
    one partition is held in client memory at a time.
    """
    rng = np.random.default_rng(seed)
    for index in rng.permutation(ddf.npartitions):
        part = ddf.partitions[int(index)].compute()
        # A fresh integer seed per partition keeps the result reproducible
        # without applying the same permutation to every partition.
        yield part.sample(frac=1, random_state=int(rng.integers(0, 2**32)))


def merge_shuffle_split_csvs(input_folder, output_folder, block_size=(1024*1024*1024*10), seed=42):
    """
    Merge all CSV files in the input folder, shuffle the merged data, and split the result
    into files of at most block_size bytes (output_block_1.csv, output_block_2.csv, ...).

    The data is streamed one Dask partition at a time, so memory use on the client is
    bounded by the partition size rather than by block_size.

    Shuffling permutes the order of the partitions and the rows inside each partition.
    This interleaves the input files but is not a uniform permutation of all rows:
    rows that share a partition stay next to each other in the output.

    Parameters:
    input_folder (str): Path to the folder containing the CSV files to be merged.
    output_folder (str): Path to the folder where the output files will be saved.
        It is created if missing. If it is the same folder as input_folder, existing
        output_block_<N>.csv files are not read as input.
    block_size (int): Maximum size of each output file in bytes (default: 10GB).
        A single row larger than this is still written, alone in its own file.
    seed (int): Random seed for reproducibility (default: 42).

    Raises:
    ValueError: If block_size is not positive or input_folder contains no CSV files.
    FileNotFoundError: If input_folder does not exist.
    """
    # Validate everything cheap before paying for a cluster start-up.
    if block_size <= 0:
        raise ValueError(f'block_size must be a positive number of bytes, got {block_size!r}')
    csv_paths = _list_input_csvs(input_folder, output_folder)
    if not csv_paths:
        raise ValueError(f'No CSV files found in {input_folder!r}')
    os.makedirs(output_folder, exist_ok=True)

    # Context managers shut the client and cluster down even if processing fails,
    # so no worker processes are left behind.
    with LocalCluster(n_workers=4, threads_per_worker=2, memory_limit="8GB") as cluster, \
            Client(cluster):
        # read_csv accepts a list of paths and yields one lazy DataFrame over all files.
        merged_df = dd.read_csv(csv_paths)
        partitions = _iter_shuffled_partitions(merged_df, seed)
        block_count = _write_blocks(partitions, output_folder, block_size)

    print(f'Created {block_count} output files in {output_folder}')




In [None]:

# Build the path with os.path.join rather than a backslash string literal:
# "\C" is an invalid escape sequence (a SyntaxWarning on recent Python
# versions) and a hard-coded backslash separator only works on Windows.
dataset_dir = os.path.join("dataset", "CSECICIDS2018_improved")

# Input and output are the same folder, so the blocks are written next to the source CSVs.
merge_shuffle_split_csvs(dataset_dir, dataset_dir, block_size=1024*1024*1024*10, seed=42)

Perhaps you already have a cluster running?
Hosting the HTTP server on port 53723 instead
