# Week 6: File ingestion and schema validation

- Take any CSV or text file of 2+ GB of your choice (you can do this assignment on Google Colab).

- Read the file and present your approach to reading it.

- Try different methods of reading the file, e.g., Dask, Modin, Ray, and pandas, and present your findings in terms of computational efficiency.

- Perform basic validation on the data columns, e.g., remove special characters and white space from the column names.

- Since you already know the schema, create a YAML file containing the column names. Also define the separators of the read and write files in the YAML.

- Validate the number of columns and the column names of the ingested file against the YAML.

- Write the file as a pipe-separated (`|`) text file in gzip (`.gz`) format.

- Create a summary of the file:
  - total number of rows
  - total number of columns
  - file size
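The YAML and validation bullets can be sketched as follows. This is a minimal sketch, assuming a hypothetical schema layout: the `inbound`/`outbound`/`columns` keys and the `validate_columns` helper are illustrative choices, not something the assignment prescribes, and only three columns are listed for brevity. It uses PyYAML's `yaml.safe_load`.

```python
import yaml

# Hypothetical schema layout: key names are illustrative, only 3 columns shown.
# "|" must be quoted, because a bare | starts a YAML block scalar.
SCHEMA_YAML = """
inbound:
  separator: ","
outbound:
  separator: "|"
  compression: gzip
columns:
  - timestep
  - location
  - traffic
"""

def validate_columns(actual_columns, schema):
    """Compare ingested column names with the schema; return a list of problems."""
    expected = schema["columns"]
    actual = list(actual_columns)
    problems = []
    if len(actual) != len(expected):
        problems.append(
            f"column count mismatch: expected {len(expected)}, got {len(actual)}"
        )
    missing = [c for c in expected if c not in actual]
    extra = [c for c in actual if c not in expected]
    if missing:
        problems.append(f"missing columns: {missing}")
    if extra:
        problems.append(f"unexpected columns: {extra}")
    # Same set of names, but not in the order the schema declares.
    if not missing and not extra and actual != expected:
        problems.append("same columns but in a different order")
    return problems

schema = yaml.safe_load(SCHEMA_YAML)
print(validate_columns(["timestep", "location", "traffic"], schema))  # []
print(validate_columns(["timestep", "traffic"], schema))
```

In the notebook, the check would be called as `validate_columns(df.columns.tolist(), schema)` after the column names have been cleaned; an empty list means the file matches the schema.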


In [1]:
```python
!pip install -U "modin[all]"
!pip install ray
```




In [19]:
```python
import pandas as pd
import dask.dataframe as dd
import modin.pandas as mpd
import time
import os
import yaml
```


In [20]:
```python
file_path = '/content/Train_UrbanTraffic_Flow.csv'
df = pd.read_csv(file_path)
df.head()
```

| Unnamed: 0.1 | Unnamed: 0 | timestep | location | traffic | prev_1 | prev_2 | prev_3 | prev_4 | prev_5 | prev_6 | ... | hour_24 | no_roads | featb_1 | featb_2 | featb_3 | featb_4 | featc_1 | featc_2 | featc_3 | featc_4 |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 0 | 0 | 0 | 0.050911 | 0.09248 | 0.096684 | 0.070061 | 0.065857 | 0.078468 | 0.071462 | ... | 0.0 | 3.0 | 1.0 | 0.0 | 0.0 | 0.0 | 1.0 | 0.0 | 0.0 | 0.0 |
| 1 | 1 | 0 | 1 | 0.04624 | 0.097151 | 0.102289 | 0.081738 | 0.0766 | 0.072396 | 0.067725 | ... | 0.0 | 4.0 | 1.0 | 0.0 | 0.0 | 0.0 | 1.0 | 0.0 | 0.0 | 0.0 |
| 2 | 2 | 0 | 2 | 0.050444 | 0.115367 | 0.110696 | 0.105558 | 0.095283 | 0.067725 | 0.080336 | ... | 0.0 | 3.0 | 1.0 | 0.0 | 0.0 | 0.0 | 1.0 | 0.0 | 0.0 | 0.0 |
| 3 | 3 | 0 | 3 | 0.044839 | 0.112097 | 0.102756 | 0.092013 | 0.083606 | 0.062121 | 0.070995 | ... | 0.0 | 3.0 | 1.0 | 0.0 | 0.0 | 0.0 | 1.0 | 0.0 | 0.0 | 0.0 |
| 4 | 4 | 0 | 4 | 0.044839 | 0.127043 | 0.150864 | 0.131714 | 0.106025 | 0.099019 | 0.090612 | ... | 0.0 | 5.0 | 1.0 | 0.0 | 0.0 | 0.0 | 0.0 | 1.0 | 0.0 | 0.0 |


In [21]:
```python
df.shape
```

(45396, 52)
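With 45,396 rows, the file used here is much smaller than the 2+ GB the assignment targets. For a file that does not fit comfortably in memory, one common pandas approach is to stream it in fixed-size chunks via the `chunksize` parameter of `pd.read_csv`. This is a minimal sketch; the helper name `count_rows_in_chunks` and the default chunk size are illustrative assumptions.

```python
import os
import tempfile

import pandas as pd

def count_rows_in_chunks(path, chunk_rows=100_000, sep=","):
    """Stream a CSV in fixed-size chunks so memory stays bounded; return (rows, columns)."""
    total_rows = 0
    columns = None
    # With chunksize set, read_csv returns an iterator of DataFrames.
    with pd.read_csv(path, sep=sep, chunksize=chunk_rows) as reader:
        for chunk in reader:
            total_rows += len(chunk)
            if columns is None:
                columns = list(chunk.columns)
    return total_rows, columns

# Demo on a small temporary CSV.
with tempfile.TemporaryDirectory() as tmp:
    demo_path = os.path.join(tmp, "demo.csv")
    pd.DataFrame({"a": range(250), "b": range(250)}).to_csv(demo_path, index=False)
    rows, cols = count_rows_in_chunks(demo_path, chunk_rows=100)
    print(rows, cols)  # 250 ['a', 'b']
```

Per-chunk work such as cleaning or appending to the gzip output can go inside the loop, so the whole file never has to be held in memory at once.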

## Reading Methods and Computational Efficiency

### Method 1: Pandas

In [22]:
# Measure loading time with Pandas
```python
# Measure loading time with Pandas
start_time_pandas = time.time()
df_pandas = pd.read_csv(file_path)
end_time_pandas = time.time()
print(f"Time taken with Pandas: {end_time_pandas - start_time_pandas} seconds")
```

Time taken with Pandas: 1.0703744888305664 seconds


### Method 2: Dask


In [23]:
# Measure loading time with Dask
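```python
# Measure loading time with Dask
start_time_dask = time.time()
df_dask = dd.read_csv(file_path)  # lazy: samples the file for dtypes and builds a task graph
df_dask = df_dask.compute()  # executes the graph and returns a pandas DataFrame
end_time_dask = time.time()
print(f"Time taken with Dask: {end_time_dask - start_time_dask} seconds")
```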

Time taken with Dask: 0.7394263744354248 seconds


### Method 3: Modin

In [24]:
# Measure loading time with Modin
```python
# Measure loading time with Modin
start_time_modin = time.time()
df_modin = mpd.read_csv(file_path)
end_time_modin = time.time()
print(f"Time taken with Modin: {end_time_modin - start_time_modin} seconds")
```

Time taken with Modin: 0.7587616443634033 seconds
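Each timing above comes from a single run measured with `time.time()`, so it can include one-off costs (such as an execution engine starting up on first use) and run-to-run noise. A fairer comparison repeats each read and reports the minimum and median, using `time.perf_counter()`, which is intended for measuring short intervals. This is a minimal sketch; `time_reader` is a hypothetical helper, and the demo uses the standard-library `csv` module so it runs anywhere.

```python
import csv
import io
import statistics
import time
from typing import Callable

def time_reader(read_fn: Callable[[], object], repeats: int = 3) -> dict:
    """Call read_fn `repeats` times and return min/median wall-clock seconds."""
    timings = []
    for _ in range(repeats):
        start = time.perf_counter()
        read_fn()
        timings.append(time.perf_counter() - start)
    return {
        "min": min(timings),
        "median": statistics.median(timings),
        "runs": repeats,
    }

# Demo: time parsing an in-memory CSV sample with the csv module.
sample = "a,b\n" + "\n".join(f"{i},{i * 2}" for i in range(10_000))
result = time_reader(lambda: list(csv.reader(io.StringIO(sample))), repeats=5)
print(result)
```

In the notebook, the same helper would wrap each reader, e.g. `time_reader(lambda: pd.read_csv(file_path))`, `time_reader(lambda: dd.read_csv(file_path).compute())` and `time_reader(lambda: mpd.read_csv(file_path))`, giving comparable numbers for the findings.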


### Clean the Column Names

In [25]:
```python
# Strip surrounding whitespace, replace each run of non-alphanumeric
# characters with "_", then lower-case. regex=True is required: since
# pandas 2.0, str.replace treats the pattern as a literal string by default.
df.columns = (
    df.columns.str.strip()
    .str.replace('[^a-zA-Z0-9]+', '_', regex=True)
    .str.lower()
)

# Check for missing values (count per column)
missing_values = df.isnull().sum()

# Check for duplicate rows
duplicates = df[df.duplicated()]

# Print the column names
print("Column Names:")
column_names = df.columns.tolist()
print(column_names)

# Write the column names to YAML
yaml_file_path = '/content/column_names.yaml'
with open(yaml_file_path, 'w') as yaml_file:
    yaml.dump(column_names, yaml_file, default_flow_style=False)

# Subset the DataFrame to the first 1000 rows
subset_df = df.iloc[:1000]

# Save the subset as a gzip-compressed, pipe-separated file
output_file_path = '/content/output_file.csv.gz'
subset_df.to_csv(output_file_path, sep='|', compression='gzip', index=False)

# Size of the compressed 1000-row subset (not of the full dataset)
file_size = os.path.getsize(output_file_path)

# Total number of rows and columns of the full DataFrame
num_rows = len(df)
num_cols = len(df.columns)
```

Column Names:
['unnamed_0', 'timestep', 'location', 'traffic', 'prev_1', 'prev_2', 'prev_3', 'prev_4', 'prev_5', 'prev_6', 'prev_7', 'prev_8', 'prev_9', 'prev_10', 'feata_1', 'feata_2', 'feata_3', 'feata_4', 'feata_5', 'hour_1', 'hour_2', 'hour_3', 'hour_4', 'hour_5', 'hour_6', 'hour_7', 'hour_8', 'hour_9', 'hour_10', 'hour_11', 'hour_12', 'hour_13', 'hour_14', 'hour_15', 'hour_16', 'hour_17', 'hour_18', 'hour_19', 'hour_20', 'hour_21', 'hour_22', 'hour_23', 'hour_24', 'no_roads', 'featb_1', 'featb_2', 'featb_3', 'featb_4', 'featc_1', 'featc_2', 'featc_3', 'featc_4']
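The cleaning rule can also be written as a small pure-Python function, which makes it easy to test on awkward names before applying it to a DataFrame. This is a minimal sketch; `clean_column_name` and `clean_column_names` are hypothetical helpers, and trimming leading/trailing underscores plus the collision check are extra choices not made in the notebook cell.

```python
import re

def clean_column_name(name: str) -> str:
    """Trim whitespace, collapse non-alphanumeric runs to '_', trim stray '_', lower-case."""
    cleaned = re.sub(r"[^a-zA-Z0-9]+", "_", name.strip())
    return cleaned.strip("_").lower()

def clean_column_names(names):
    """Clean every name and fail loudly if two names collapse to the same result."""
    cleaned = [clean_column_name(n) for n in names]
    dupes = sorted({c for c in cleaned if cleaned.count(c) > 1})
    if dupes:
        raise ValueError(f"column names collide after cleaning: {dupes}")
    return cleaned

print(clean_column_names(["Unnamed: 0", " Hour 24 ", "prev_1", "no-roads"]))
# ['unnamed_0', 'hour_24', 'prev_1', 'no_roads']
```

Applied to the notebook, this would be `df.columns = clean_column_names(df.columns)`; the collision check matters because names such as `a b` and `a-b` both become `a_b`.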


### Print Summary

In [26]:
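```python
# Print summary
# Note: file_size is the size of the gzip-compressed 1000-row subset,
# while num_rows and num_cols describe the full DataFrame.
print(f"Total number of rows: {num_rows}")
print(f"Total number of columns: {num_cols}")
print(f"File size: {file_size} bytes")
```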

Total number of rows: 45396
Total number of columns: 52
File size: 19104 bytes
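The summary above mixes two sources: the row and column counts come from the full DataFrame, while the file size is that of the 1000-row gzip subset. A summary that describes one written file consistently can read the pipe-separated gzip output back and count it directly. This is a minimal sketch using only the standard library; `summarize_gz_file` is a hypothetical helper, and it assumes the first line of the file is a header.

```python
import csv
import gzip
import os
import tempfile

def summarize_gz_file(path, sep="|"):
    """Return row count (excluding header), column count and on-disk size of a gzip text file."""
    with gzip.open(path, "rt", newline="") as fh:
        reader = csv.reader(fh, delimiter=sep)
        header = next(reader)
        num_rows = sum(1 for _ in reader)
    return {
        "rows": num_rows,
        "columns": len(header),
        "size_bytes": os.path.getsize(path),
    }

# Demo: write a tiny pipe-separated gzip file, then summarize it.
with tempfile.TemporaryDirectory() as tmp:
    demo_path = os.path.join(tmp, "demo.txt.gz")
    with gzip.open(demo_path, "wt", newline="") as fh:
        writer = csv.writer(fh, delimiter="|")
        writer.writerow(["timestep", "location", "traffic"])
        writer.writerows([[0, i, 0.05] for i in range(5)])
    summary = summarize_gz_file(demo_path)
    print(summary["rows"], summary["columns"])  # 5 3
```

In the notebook, `summarize_gz_file(output_file_path)` would report the rows, columns and size of the file that was actually written.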
