# Merging 110 CSV files into a single dataset for further processing

## Intro

**Case:**

A dataset of about 4.4 million rows was exported from GCP in small portions of about 40,000 rows each, in CSV format: 110 files in total. The files are named as follows:

* data_000000000001.csv
* data_000000000002.csv
* ...
* data_000000000110.csv

Develop a method to load all 110 files in a Jupyter Notebook and merge them into a single dataset for further processing.
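Because the files follow a fixed naming pattern, it can help to confirm that none are missing before merging. A minimal sketch, assuming the names are exactly `data_` followed by a 12-digit zero-padded number and `.csv`, as in the list above; `missing_files` is a hypothetical helper, not part of this notebook.

```python
import os


def missing_files(directory_path, n_files=110):
    """Return the expected names (assumed pattern data_<12 digits>.csv) that are not in directory_path."""
    present = set(os.listdir(directory_path))
    # f"{i:012d}" zero-pads the number to 12 digits, e.g. 1 -> 000000000001.
    expected = [f"data_{i:012d}.csv" for i in range(1, n_files + 1)]
    return [name for name in expected if name not in present]


# In the notebook: missing_files(DIRECTORY_PATH) should return an empty list.
```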

**Plan:**

1. Import the libraries needed to merge CSV files.
2. Time different merging methods. Methods to compare:
    * Pandas;
    * Multiprocessing.
3. Select the fastest and most convenient method.

**Notes:**
1. The 110 CSV test files were generated beforehand; see 'create_csv_files.ipynb'.
2. All methods merge every CSV file in the given directory, so they still work if the number of files is not 110.
3. This work assumes the merge has to be performed in Jupyter. Files in a directory can also be concatenated in other ways, for example with OS shell commands.
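Note 3 mentions concatenating the files outside pandas. The same text-level concatenation can be done in plain Python without parsing any values. A minimal sketch, assuming every file starts with the same single header line, that no field contains an embedded line break, and that the output file is written outside the input directory; `concat_csv_text` is a hypothetical helper name.

```python
import os


def concat_csv_text(directory_path, output_path):
    """Concatenate all CSV files in directory_path into output_path, writing the header only once.

    Assumes each file has the same single header line. Returns the number of data lines written.
    """
    file_list = sorted(
        os.path.join(directory_path, name)
        for name in os.listdir(directory_path)
        if name.endswith('.csv')
    )
    rows = 0
    with open(output_path, 'w', newline='') as out:
        for i, path in enumerate(file_list):
            with open(path, newline='') as f:
                header = f.readline()
                if i == 0:
                    out.write(header)  # keep the header of the first file only
                for line in f:
                    if not line.endswith('\n'):
                        line += '\n'  # guard against a file without a final newline
                    out.write(line)
                    rows += 1
    return rows
```

This produces one merged CSV on disk but no DataFrame, so it fits the case where the merged file itself is the goal.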

## Imports

In [1]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [2]:
import os
import pandas as pd
from multiprocessing import Pool
import time

Check how many CPU cores the machine has.

In [3]:
cores = os.cpu_count()
cores

2

We have 2 cores to work with.

Constants.

In [4]:
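DIRECTORY_PATH = '/content/drive/MyDrive/test_thumos/csv/'  # folder with the input CSV files
OUTPUT_PATH_CSV = '/content/drive/MyDrive/test_thumos/merged_df.csv'  # merged output as CSV
OUTPUT_PATH_PA = '/content/drive/MyDrive/test_thumos/merged_df.parquet'  # merged output as Parquet
NPARTITIONS = cores  # not used by the methods in this notebook
PROCESSES = cores  # number of worker processes for multiprocessing
REPETITIONS = 5  # number of runs averaged by timing()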

## Timing

A helper function to time the proposed methods.

In [5]:
def timing(func, *args, repetitions=REPETITIONS, **kwargs):
    '''
    Time a function: call func(*args, **kwargs) `repetitions` times.
    Return the result of the last call and the average time per call in seconds.
    '''
    total_time = 0
    for _ in range(repetitions):
        start_time = time.time()
        result = func(*args, **kwargs)
        end_time = time.time()
        total_time += end_time - start_time
    average_time = total_time / repetitions
    return result, average_time
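The standard library's `timeit` module offers an alternative: `timeit.repeat` runs a callable several times and returns the time of each run, and the minimum is often reported because it is the run least disturbed by background load. A sketch of how the same measurement could look; `time_min` is a hypothetical helper, and it is not what produced the timings in this notebook.

```python
import timeit


def time_min(func, *args, repetitions=5, **kwargs):
    """Run func(*args, **kwargs) `repetitions` times; return (last result, fastest run in seconds)."""
    result = None

    def call():
        nonlocal result  # keep the value returned by the timed call
        result = func(*args, **kwargs)

    # number=1 makes each entry in the returned list the duration of a single call.
    times = timeit.repeat(call, repeat=repetitions, number=1)
    return result, min(times)
```

Usage would mirror `timing`, e.g. `time_min(multiprocessing_merge, directory_path=DIRECTORY_PATH, processes=PROCESSES)`.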

## Merging methods

### Pandas

Using pandas, the most popular Python library for data handling, with no additional libraries: each file is read and appended to the merged DataFrame in turn.

In [6]:
def pandas_merge(directory_path):
    '''
    Read every CSV file in directory_path and return them merged into one DataFrame.
    '''
    merged_df_pandas = pd.DataFrame()
    # sorted() keeps files (and rows) in name order; os.listdir returns names in arbitrary order.
    file_list = sorted(os.path.join(directory_path, filename) for filename in os.listdir(directory_path) if filename.endswith('.csv'))
    for filename in file_list:
        df = pd.read_csv(filename)
        # Append this file; pd.concat builds a new DataFrame on every iteration.
        merged_df_pandas = pd.concat([merged_df_pandas, df], ignore_index=True)
    return merged_df_pandas
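The loop above calls `pd.concat` once per file, so each iteration copies every row merged so far, while the multiprocessing version below concatenates only once. Part of the measured gap may therefore come from the merging strategy rather than from parallelism alone. A fairer sequential baseline reads all files into a list and concatenates once. A sketch; `pandas_merge_once` is a hypothetical name and was not timed in this notebook.

```python
import os

import pandas as pd


def pandas_merge_once(directory_path):
    """Read every CSV file in directory_path sequentially and merge them with a single pd.concat call."""
    file_list = sorted(
        os.path.join(directory_path, filename)
        for filename in os.listdir(directory_path)
        if filename.endswith('.csv')
    )
    # Collect all DataFrames first; one concat avoids re-copying a growing result.
    data_frames = [pd.read_csv(filename) for filename in file_list]
    return pd.concat(data_frames, ignore_index=True)
```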

### Multiprocessing

Reading the files in parallel with `multiprocessing.Pool`, then concatenating the results in a single step.

In [7]:
def multiprocessing_merge(directory_path, processes):
    '''
    Read the CSV files in directory_path in parallel and return them merged into one DataFrame.
    '''
    file_list = sorted(os.path.join(directory_path, filename) for filename in os.listdir(directory_path) if filename.endswith('.csv'))
    # The with-block shuts the worker processes down once reading is finished.
    with Pool(processes=processes) as pool:
        # pool.map returns the DataFrames in the same order as file_list.
        data_frames = pool.map(pd.read_csv, file_list)
    merged_df_mult = pd.concat(data_frames, ignore_index=True)
    return merged_df_mult
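`pool.map` passes a single argument (the file path) to `pd.read_csv`. If extra `read_csv` options are needed, for example an explicit `dtype`, they can be bound with `functools.partial`. The sketch below uses `multiprocessing.pool.ThreadPool`, which has the same `map` interface as `Pool` but runs threads, so it runs anywhere without process start-up concerns; swapping `ThreadPool` for `Pool` gives the process-based variant used in this notebook. Whether threads are faster or slower than processes for this workload was not measured here. `merge_with_options` is a hypothetical helper.

```python
import os
from functools import partial
from multiprocessing.pool import ThreadPool

import pandas as pd


def merge_with_options(directory_path, workers=2, **read_csv_kwargs):
    """Read all CSV files in directory_path with the given read_csv options and concatenate them."""
    file_list = sorted(
        os.path.join(directory_path, filename)
        for filename in os.listdir(directory_path)
        if filename.endswith('.csv')
    )
    # partial fixes the keyword arguments, so pool.map only has to supply the path.
    reader = partial(pd.read_csv, **read_csv_kwargs)
    with ThreadPool(processes=workers) as pool:
        data_frames = pool.map(reader, file_list)
    return pd.concat(data_frames, ignore_index=True)
```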

## Testing

Count the rows in all CSV files in the directory; this total is used to check each merge.

In [8]:
rows_all = 0

file_list = [os.path.join(DIRECTORY_PATH, filename) for filename in os.listdir(DIRECTORY_PATH) if filename.endswith('.csv')]
for filename in file_list:
    try:
        df = pd.read_csv(filename)
        rows = len(df)
        rows_all += rows
    except Exception as e:
        print(f'File: {filename}, Error: {str(e)}')
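Parsing every file just to count rows reads all the data an extra time. If each file has exactly one header line, no blank lines, and no quoted fields with embedded line breaks (true for the numeric test data here, but an assumption in general), counting lines gives the same total much more cheaply. A sketch with a hypothetical helper `count_data_rows`:

```python
import os


def count_data_rows(directory_path):
    """Count data rows in all CSV files by counting lines and subtracting one header line per file.

    Assumes one header line per file, no blank lines, and no line breaks inside fields.
    """
    total = 0
    for filename in sorted(os.listdir(directory_path)):
        if not filename.endswith('.csv'):
            continue
        # Binary mode avoids decoding the text; only line boundaries matter here.
        with open(os.path.join(directory_path, filename), 'rb') as f:
            total += sum(1 for _ in f) - 1
    return total
```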

In [9]:
print(f'Number of rows in all CSV files: {rows_all}')

Number of rows in all CSV files: 4400000


**Pandas**

In [10]:
merged_df_pandas, pandas_execution_time = timing(pandas_merge, directory_path=DIRECTORY_PATH)

In [11]:
if len(merged_df_pandas) == rows_all:
    print('Merge has been successful.')
else:
    print('Error')

Merge has been successful.


**Multiprocessing**

In [12]:
merged_df_multiprocessing, multiprocessing_execution_time = timing(multiprocessing_merge,
                                                                   directory_path=DIRECTORY_PATH,
                                                                   processes=PROCESSES)

In [13]:
if len(merged_df_multiprocessing) == rows_all:
    print('Merge has been successful.')
else:
    print('Error')

Merge has been successful.


**Results**

In [14]:
print(f'Pandas took {pandas_execution_time:.2f} seconds.')
print(f'Multiprocessing took {multiprocessing_execution_time:.2f} seconds.')

Pandas took 7.61 seconds.
Multiprocessing took 4.27 seconds.

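The two averages give the speedup of the parallel version, and dividing by the number of processes gives the parallel efficiency. A short calculation with the numbers above (2 processes, matching the 2 available cores). Because `pandas_merge` also concatenates inside its loop, this ratio mixes the effect of parallel reading with that of the different merging strategy.

```python
pandas_time = 7.61  # seconds, average over 5 runs (from the output above)
multiprocessing_time = 4.27  # seconds, average over 5 runs
processes = 2

speedup = pandas_time / multiprocessing_time
efficiency = speedup / processes
print(f'Speedup: {speedup:.2f}x, efficiency: {efficiency:.0%}')
# prints: Speedup: 1.78x, efficiency: 89%
```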

## Result

In this experiment the fastest method was multiprocessing combined with pandas: about 4.3 seconds versus 7.6 seconds for plain pandas, on 2 cores.

**The final code would be:**

In [15]:
import os
import pandas as pd
from multiprocessing import Pool

In [16]:
def multiprocessing_merge(directory_path, processes):
    '''
    Read the CSV files in directory_path in parallel and return them merged into one DataFrame.
    '''
    file_list = sorted(os.path.join(directory_path, filename) for filename in os.listdir(directory_path) if filename.endswith('.csv'))
    # The with-block shuts the worker processes down once reading is finished.
    with Pool(processes=processes) as pool:
        # pool.map returns the DataFrames in the same order as file_list.
        data_frames = pool.map(pd.read_csv, file_list)
    merged_df_mult = pd.concat(data_frames, ignore_index=True)
    return merged_df_mult

In [17]:
merged_df = multiprocessing_merge(directory_path=DIRECTORY_PATH, processes=PROCESSES)

In [18]:
display(merged_df.head())
display(merged_df.tail())

|   | Column_1 | Column_2 | Column_3 |
|---|---|---|---|
| 0 | 0.984022 | 0.415212 | 0.838954 |
| 1 | 0.561004 | 0.896267 | 0.9224 |
| 2 | 0.571279 | 0.69283 | 0.548028 |
| 3 | 0.584213 | 0.81662 | 0.693794 |
| 4 | 0.520904 | 0.208829 | 0.983191 |


|   | Column_1 | Column_2 | Column_3 |
|---|---|---|---|
| 4399995 | 0.19707 | 0.987627 | 0.691759 |
| 4399996 | 0.289735 | 0.490333 | 0.311303 |
| 4399997 | 0.037902 | 0.883471 | 0.4178 |
| 4399998 | 0.269393 | 0.913051 | 0.773729 |
| 4399999 | 0.533581 | 0.162291 | 0.872319 |


In [19]:
merged_df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 4400000 entries, 0 to 4399999
Data columns (total 3 columns):
 #   Column    Dtype  
---  ------    -----  
 0   Column_1  float64
 1   Column_2  float64
 2   Column_3  float64
dtypes: float64(3)
memory usage: 100.7 MB
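The reported memory usage follows directly from the shape and dtypes: 4,400,000 rows × 3 `float64` columns × 8 bytes each, with pandas reporting "MB" in units of 1024² bytes. A quick check (the `RangeIndex` stores only its start, stop and step, so it does not show at this precision):

```python
rows = 4_400_000
columns = 3
bytes_per_float64 = 8

total_bytes = rows * columns * bytes_per_float64
print(f'{total_bytes:,} bytes = {total_bytes / 1024**2:.1f} MB')
# prints: 105,600,000 bytes = 100.7 MB
```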


In [20]:
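rows_all = 0

file_list = sorted(os.path.join(DIRECTORY_PATH, filename) for filename in os.listdir(DIRECTORY_PATH) if filename.endswith('.csv'))
for filename in file_list:
    try:
        df = pd.read_csv(filename)
        rows = len(df)
        rows_all += rows
    except Exception as e:
        print(f'File: {filename}, Error: {str(e)}')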

In [21]:
print(f'Number of rows in all CSV files: {rows_all}')

Number of rows in all CSV files: 4400000


In [22]:
if len(merged_df) == rows_all:
    print('Merge has been successful.')
else:
    print('Error')

Merge has been successful.


To save the merged dataset as a CSV file:

In [23]:
merged_df.to_csv(OUTPUT_PATH_CSV, index=False)

In [24]:
if os.path.exists(OUTPUT_PATH_CSV):
    print(f'CSV file saved successfully to: {OUTPUT_PATH_CSV}')
else:
    print('CSV file not saved.')

CSV file saved successfully to: /content/drive/MyDrive/test_thumos/merged_df.csv


If the merged CSV file is too large, the DataFrame can be saved as a Parquet file instead (pandas needs the `pyarrow` or `fastparquet` package for this).

In [25]:
merged_df.to_parquet(OUTPUT_PATH_PA, index=False)


In [26]:
if os.path.exists(OUTPUT_PATH_PA):
    print(f'Parquet file saved successfully to: {OUTPUT_PATH_PA}')
else:
    print('Parquet file not saved.')

Parquet file saved successfully to: /content/drive/MyDrive/test_thumos/merged_df.parquet


Comparison of file sizes:

In [27]:
!du -h {OUTPUT_PATH_CSV} {OUTPUT_PATH_PA}

238M	/content/drive/MyDrive/test_thumos/merged_df.csv
102M	/content/drive/MyDrive/test_thumos/merged_df.parquet
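`du` reports disk usage rounded to human-readable units. The same comparison can be made from Python with `os.path.getsize`, which returns a file's size in bytes and also works where shell commands are unavailable; the values may differ slightly from `du`, which reports allocated disk space. A sketch with hypothetical helpers `size_mb` and `compare_sizes`:

```python
import os


def size_mb(path):
    """Return the size of the file at path in MB (1024**2 bytes)."""
    return os.path.getsize(path) / 1024**2


def compare_sizes(*paths):
    """Print each file's size and return the sizes as a dict {path: size in MB}."""
    sizes = {path: size_mb(path) for path in paths}
    for path, size in sizes.items():
        print(f'{size:8.1f} MB  {path}')
    return sizes


# In the notebook: compare_sizes(OUTPUT_PATH_CSV, OUTPUT_PATH_PA)
```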


The fastest merging method in this experiment was multiprocessing with pandas. To save storage space, store the merged DataFrame as Parquet: here the Parquet file (102 MB) is less than half the size of the CSV file (238 MB).