In [1]:
import gc
import os
import psutil
import pyarrow as pa
import pandas as pd

In [2]:
# Lift pandas' column-display limit (None = no limit) so wide frames are
# shown in full rather than elided with "...".
pd.options.display.max_columns = None

In [3]:
# Function to get memory usage
def get_memory_usage():
    process = psutil.Process()
    mem_info = process.memory_info()
    return mem_info.rss / (1024 * 1024)  # Convert to megabytes

In [4]:
# Root directory holding the raw CSV and the Parquet outputs.
# Set the CAPSTONE_DATA_DIR environment variable to point the notebook at a
# different location (e.g. on another machine); when it is unset, the
# original Windows path below is used unchanged.
data_dir = os.environ.get(
    'CAPSTONE_DATA_DIR',
    os.path.join('C:\\', 'Users', 'KonuTech', 'zoomcamp-capstone-01', 'data'),
)
data_dir

'C:\\Users\\KonuTech\\zoomcamp-capstone-01\\data'

In [5]:
# Staging directory for the per-chunk Parquet partition files.
parquet_dir = os.path.join(data_dir, "parquet_partitions")
parquet_dir

'C:\\Users\\KonuTech\\zoomcamp-capstone-01\\data\\parquet_partitions'

In [6]:
# Delete the combined output of a previous run (if present) so it is rebuilt
# from scratch further down.
files_to_remove = ['test_data.parquet']
candidate_paths = [os.path.join(data_dir, name) for name in files_to_remove]
for candidate_path in candidate_paths:
    if not os.path.exists(candidate_path):
        continue  # nothing left over under this name
    os.remove(candidate_path)

In [7]:
# Clear partition files left over from a previous run so stale chunks are
# never mixed into this run's output. Only regular files are deleted:
# os.remove() cannot delete a directory, so a stray subdirectory would
# otherwise raise and abort the cleanup partway through.
if os.path.isdir(parquet_dir):  # isdir() implies the path exists
    # Collect the regular files directly inside the directory
    file_list = [
        filename
        for filename in os.listdir(parquet_dir)
        if os.path.isfile(os.path.join(parquet_dir, filename))
    ]

    if file_list:
        print("List of files in the directory:")
        for filename in file_list:
            print(filename)

        # Delete every listed file
        for filename in file_list:
            file_path = os.path.join(parquet_dir, filename)
            os.remove(file_path)
            print(f"Deleted: {filename}")
    else:
        print("The directory is empty, no files to delete.")
else:
    print(f"The directory '{parquet_dir}' does not exist.")

The directory is empty, no files to delete.


In [8]:
# Step 1: name of the source CSV (inside data_dir); it is streamed in
# chunks further down rather than loaded all at once.
csv_file = "test_data.csv"

In [9]:
# Step 2: paths of the Parquet partitions, appended in chunk order as they
# are written.
parquet_file_paths = list()

In [10]:
# Chunking parameters and counters used by the conversion loop below.
chunk_size = 100000  # rows per chunk; lower it to reduce peak memory
i = 0  # index of the next chunk, used in the partition file name
cumulative_rows = 0  # running total of rows written so far

# Create the partition directory. exist_ok=True makes this a no-op when the
# directory is already there and avoids the separate exists-then-create check.
os.makedirs(parquet_dir, exist_ok=True)

In [11]:
# Create a TextFileReader that yields DataFrames of `chunk_size` rows each.
# Passing chunksize alone already makes read_csv return an iterable reader,
# so iterator=True is unnecessary. The reader is single-use: once the loop
# below has consumed it, this cell must be re-run to read the CSV again.
csv_iterator = pd.read_csv(os.path.join(data_dir, csv_file), chunksize=chunk_size)

# (Re)initialize the list of Parquet partition paths so re-running this cell
# starts from an empty list.
parquet_file_paths = []

In [12]:
# Stream the CSV chunk by chunk, writing each chunk to its own Parquet
# partition so the whole file is never held in memory at once.
chunks_this_run = 0
for chunk in csv_iterator:
    # Sample memory once the chunk is in memory (the for statement has
    # already read it) but before it is written to Parquet.
    before_memory = get_memory_usage()

    # Count and print the number of rows in the chunk
    num_rows = len(chunk)
    cumulative_rows += num_rows
    print(f"Processing chunk {i}, rows: {num_rows}, cumulative rows: {cumulative_rows}")

    # Save the chunk as its own Parquet partition (the chunk's index is dropped)
    parquet_partition_file = os.path.join(parquet_dir, f'chunk_{i}.parquet')
    chunk.to_parquet(parquet_partition_file, index=False)
    parquet_file_paths.append(parquet_partition_file)

    # Sample memory again after the partition has been written
    after_memory = get_memory_usage()
    print(f"Memory usage before chunk: {before_memory:.2f} MB")
    print(f"Memory usage after chunk: {after_memory:.2f} MB")

    i += 1  # next chunk number (continues across runs unless reset above)
    chunks_this_run += 1

# csv_iterator can be consumed only once: if this cell is re-run without
# re-creating it, the loop body never executes. Report that instead of
# finishing silently.
if chunks_this_run == 0:
    print("No chunks were read: csv_iterator is exhausted or the CSV is empty. "
          "Re-run the cells that reset the counters and create csv_iterator.")

Processing chunk 0, rows: 100000, cumulative rows: 100000
Memory usage before chunk: 253.40 MB
Memory usage after chunk: 296.80 MB
Processing chunk 1, rows: 100000, cumulative rows: 200000
Memory usage before chunk: 298.02 MB
Memory usage after chunk: 317.38 MB
Processing chunk 2, rows: 100000, cumulative rows: 300000
Memory usage before chunk: 316.78 MB
Memory usage after chunk: 333.97 MB
Processing chunk 3, rows: 100000, cumulative rows: 400000
Memory usage before chunk: 332.84 MB
Memory usage after chunk: 335.82 MB
Processing chunk 4, rows: 100000, cumulative rows: 500000
Memory usage before chunk: 334.64 MB
Memory usage after chunk: 352.76 MB
Processing chunk 5, rows: 100000, cumulative rows: 600000
Memory usage before chunk: 352.62 MB
Memory usage after chunk: 367.92 MB
Processing chunk 6, rows: 100000, cumulative rows: 700000
Memory usage before chunk: 365.99 MB
Memory usage after chunk: 352.37 MB
Processing chunk 7, rows: 100000, cumulative rows: 800000
Memory usage before chunk

In [13]:
# Concatenate the Parquet partitions (in the order they were written) into a
# single DataFrame.
# The partitions are fed to concat through a generator rather than a list
# bound to a notebook variable. Row-wise concatenation builds new arrays, so
# such a list would keep a second full copy of the data alive, and a later
# gc.collect() could not reclaim it.
if not parquet_file_paths:
    raise ValueError(
        "parquet_file_paths is empty: no partitions were written, so there is "
        "nothing to concatenate. Re-run the chunking cells first."
    )
df = pd.concat(
    (pd.read_parquet(partition) for partition in parquet_file_paths),
    ignore_index=True,
)

In [14]:
# Trigger garbage collection to clear unreferenced objects.
# Only unreachable objects are reclaimed; anything still bound to a notebook
# variable stays in memory. gc.collect() returns an int, which is displayed
# as this cell's output.
gc.collect()

18

In [15]:
# Summarize the combined frame: row count, index, and the dtype of every
# column (verbose=True forces the full per-column listing for this wide frame).
# NOTE(review): the captured output shows no non-null counts for this large
# frame; if they are needed, pandas' show_counts option can request them
# (confirm it is supported by the installed pandas version).
df.info(verbose=True)

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 11363762 entries, 0 to 11363761
Data columns (total 190 columns):
 #    Column       Dtype  
---   ------       -----  
 0    customer_ID  object 
 1    S_2          object 
 2    P_2          float64
 3    D_39         float64
 4    B_1          float64
 5    B_2          float64
 6    R_1          float64
 7    S_3          float64
 8    D_41         float64
 9    B_3          float64
 10   D_42         float64
 11   D_43         float64
 12   D_44         float64
 13   B_4          float64
 14   D_45         float64
 15   B_5          float64
 16   R_2          float64
 17   D_46         float64
 18   D_47         float64
 19   D_48         float64
 20   D_49         float64
 21   B_6          float64
 22   B_7          float64
 23   B_8          float64
 24   D_50         float64
 25   D_51         float64
 26   B_9          float64
 27   R_3          float64
 28   D_52         float64
 29   P_3          float64
 30   B_10         f

In [18]:
# Step 4: file name (inside data_dir) for the single combined Parquet file
# written in the next cell.
combined_parquet_file = "test_data.parquet"

In [19]:
# Write the combined frame to one Parquet file, using pandas' default
# index handling.
combined_parquet_path = os.path.join(data_dir, combined_parquet_file)
df.to_parquet(combined_parquet_path)

In [20]:
# Step 5: Remove the individual Parquet partitions now that the combined
# file has been written. A partition that is already gone (e.g. when this
# cell is re-run) is skipped instead of aborting with FileNotFoundError;
# any other OS error still propagates.
for partition_file in parquet_file_paths:
    try:
        os.remove(partition_file)
    except FileNotFoundError:
        pass  # already deleted; nothing to do