# DATA GLACIER - WEEK 6

**Task:**


*   Take any csv/text file of 2+ GB of your choice.
*   Read the file (present your approach to reading it).
*   Try different file-reading methods, e.g. Dask, Modin, Ray and pandas, and present your findings in terms of computational efficiency.
*   Perform basic validation on the data columns, e.g. remove special characters and white space from the column names.
*   Since you already know the schema, create a YAML file and write the column names in it. Also define the separators for the read and write files in the YAML file.
*   Validate number of columns and column name of ingested file with YAML.
*   Write the file in pipe separated text file (|) in gz format.
*   Create a summary of the file:
      Total number of rows, total number of columns, file size

**Data Ingestion sample code walkthrough**



*   Create a utility file
*   Config file creation
*   Data ingestion pipeline

### Importing libraries

In [1]:
import time
import pandas as pd
import dask.dataframe as dd
import modin.pandas as mpd
import yaml
import os
import ray
ray.init()
from IPython.display import display, HTML
import yaml

2023-10-10 16:53:56,570	INFO worker.py:1642 -- Started a local Ray instance.


### Reading the file

In [2]:
# Path to the source CSV file (absolute path on the author's machine).
file_path = '/Users/handeatasagun/PycharmProjects/z.csv'

In [3]:
# Size of the source file on disk, in bytes
###################
os.path.getsize(file_path)

3396350301

In [4]:
def measure_time(method, label, path=None):
    """Time one call of a file-loading function and print the elapsed time.

    Args:
        method: Callable that takes a file path as its only argument.
        label: Name of the method, used in the printed report.
        path: File to load. When omitted, the notebook-level ``file_path``
            is used, so existing two-argument calls keep working.

    Returns:
        None. The result is reported with ``print`` only.
    """
    if path is None:
        path = file_path
    # perf_counter is monotonic and high resolution; time.time() can jump when
    # the system clock is adjusted, which would distort the measurement.
    start_time = time.perf_counter()
    # Hold a reference to the loaded data so that freeing it is not timed.
    loaded = method(path)
    elapsed_time = time.perf_counter() - start_time
    del loaded
    print(f"Time taken by {label}: {elapsed_time:.2f} seconds")

# Method 1: Pandas
##################
def load_with_pandas(file_path):
    """Read the CSV at ``file_path`` into a pandas DataFrame and return it."""
    frame = pd.read_csv(file_path)
    return frame

# Method 2: Dask
##################
def load_with_dask(file_path):
    """Create a Dask DataFrame for ``file_path`` and run one full computation.

    The computed result is discarded; the Dask DataFrame itself is returned.
    """
    lazy_frame = dd.read_csv(file_path)
    # Force the read so the benchmark covers the actual load, not only setup.
    lazy_frame.compute()
    return lazy_frame

# Method 3: Modin
#################
def load_with_modin(file_path):
    """Read the CSV at ``file_path`` with Modin's ``read_csv`` and return it."""
    modin_frame = mpd.read_csv(file_path)
    return modin_frame

# Run every loader once, in the same order, and report its elapsed time.
for loader, loader_label in (
    (load_with_pandas, "Pandas"),
    (load_with_dask, "Dask"),
    (load_with_modin, "Modin"),
):
    measure_time(loader, loader_label)

Time taken by Pandas: 14.71 seconds
Time taken by Dask: 11.10 seconds
Time taken by Modin: 6.54 seconds


**- Modin was the fastest of the three readers for this dataset in this single run (6.54 s, versus 11.10 s for Dask and 14.71 s for pandas).**

### Basic validation

In [5]:
# Load the source CSV with Modin for the validation steps.
df = mpd.read_csv(file_path)

In [6]:
# Exploratory data analysis
#####################################
def check_df(dataframe, head=5):
    """Display a quick HTML overview of ``dataframe``.

    The sections are shown in this order: column dtypes, the first ``head``
    rows, the shape, the number of nulls per column, and ``describe()``
    statistics using the 25th, 50th and 95th percentiles.
    """
    def _section(title, body):
        # One heading followed by the already-rendered section content.
        display(HTML(f"<h3>{title}</h3>{body}"))

    _section("Types", dataframe.dtypes.to_frame().to_html())
    _section("Head", dataframe.head(head).to_html())
    _section("Shape", dataframe.shape)
    _section("NA", dataframe.isnull().sum().to_frame().to_html())
    percentiles = [0.25, 0.50, 0.95]
    _section("Quantiles", dataframe.describe(percentiles).T.to_html())

    
# Show the overview for the DataFrame loaded with Modin.
check_df(df)

Unnamed: 0,0
t_dat,object
customer_id,object
article_id,int64
price,float64
sales_channel_id,int64


Please refer to https://modin.readthedocs.io/en/stable/supported_apis/defaulting_to_pandas.html for explanation.


Unnamed: 0,t_dat,customer_id,article_id,price,sales_channel_id
0,2018-09-20,000058a12d5b43e67d225668fa1f8d618c13dc232df0cad8ffe7ad4a1091e318,663713001,0.050831,2
1,2018-09-20,000058a12d5b43e67d225668fa1f8d618c13dc232df0cad8ffe7ad4a1091e318,541518023,0.030492,2
2,2018-09-20,00007d2de826758b65a93dd24ce629ed66842531df6699338c5570910a014cc2,505221004,0.015237,2
3,2018-09-20,00007d2de826758b65a93dd24ce629ed66842531df6699338c5570910a014cc2,685687003,0.016932,2
4,2018-09-20,00007d2de826758b65a93dd24ce629ed66842531df6699338c5570910a014cc2,685687004,0.016932,2




Unnamed: 0,0
t_dat,0
customer_id,0
article_id,0
price,0
sales_channel_id,0




Unnamed: 0,count,mean,std,min,25%,50%,95%,max
article_id,31788324.0,696227200.0,133448000.0,108775000.0,632803000.0,714582000.0,870293000.0,956217000.0
price,31788324.0,0.02782927,0.01918113,1.694915e-05,0.01581356,0.02540678,0.05930508,0.5915254
sales_channel_id,31788324.0,1.704028,0.4564786,1.0,1.0,2.0,2.0,2.0


In [7]:
def clean_column_names(df):
    """Normalise the column names of ``df`` in place and return ``df``.

    Each name is stripped of surrounding whitespace, every run of characters
    other than ASCII letters and digits is replaced by a single underscore,
    leading and trailing underscores are removed, and the result is
    lower-cased.

    Args:
        df: DataFrame whose column labels are strings.

    Returns:
        The same DataFrame object, with its ``columns`` replaced.
    """
    # regex=True is required: since pandas 2.0 str.replace treats the pattern
    # as a literal string by default, which would leave the names untouched.
    cleaned = (
        df.columns.str.strip()
        .str.replace(r'[^a-zA-Z0-9]+', '_', regex=True)
        .str.strip('_')
        .str.lower()
    )
    df.columns = cleaned
    return df


# Clean the column names; the function assigns them on df and returns df.
df = clean_column_names(df)

In [8]:
# Inspect the column names after cleaning.
df.columns

Index(['t_dat', 'customer_id', 'article_id', 'price', 'sales_channel_id'], dtype='object')

### Write utility.py

In [9]:
%%writefile utility.py
import logging
import os
import subprocess
import yaml
import pandas as pd
import datetime 
import gc
import re

def read_config_file(filepath):
    """Load a YAML configuration file.

    Args:
        filepath: Path of the YAML file to read.

    Returns:
        The parsed content, or None when the file is not valid YAML. In that
        case the parse error is logged instead of raised.
    """
    # Read as UTF-8 explicitly rather than relying on the platform's default
    # encoding, which differs between operating systems.
    with open(filepath, 'r', encoding='utf-8') as stream:
        try:
            return yaml.safe_load(stream)
        except yaml.YAMLError as exc:
            logging.error(exc)
            return None

def replacer(string, char):
    """Collapse every run of two or more consecutive ``char`` into one.

    ``char`` is matched literally, so regex metacharacters such as '.' or
    '+' can be passed safely.

    Args:
        string: Text to process.
        char: The character (or literal substring) whose repeats are collapsed.

    Returns:
        ``string`` with each repeated run replaced by a single ``char``.
    """
    if not char:
        # Nothing to collapse; avoid building a pattern around an empty string.
        return string
    pattern = '(?:' + re.escape(char) + '){2,}'
    # A callable replacement keeps a backslash in ``char`` from being parsed
    # as an escape sequence by re.sub.
    return re.sub(pattern, lambda _match: char, string)

def col_header_val(df, table_config):
    """Validate the column names of ``df`` against the configured schema.

    The column names of ``df`` are normalised in place first: lower-cased,
    every non-word character replaced by '_', leading and trailing '_'
    removed, and repeated '_' collapsed. The normalised names are then
    compared, ignoring order, with the lower-cased ``table_config['columns']``.

    Args:
        df: DataFrame whose header is validated. Its ``columns`` are replaced
            by the normalised names.
        table_config: Mapping with a ``'columns'`` list of expected names.

    Returns:
        1 when the number and names of the columns match, otherwise 0.
    """
    df.columns = (
        df.columns.str.lower()
        .str.replace(r'[^\w]', '_', regex=True)
        .str.strip('_')
        .str.replace(r'_{2,}', '_', regex=True)
    )
    expected_col = sorted(col.lower() for col in table_config['columns'])
    # Compare sorted labels only. Reindexing the frame would copy all of its
    # data and raises when two columns share a name after normalisation.
    actual_col = sorted(df.columns)
    if actual_col == expected_col:
        print("column name and column length validation passed")
        return 1

    print("column name and column length validation failed")
    mismatched_columns_file = sorted(set(actual_col).difference(expected_col))
    print("Following File columns are not in the YAML file",mismatched_columns_file)
    missing_YAML_file = sorted(set(expected_col).difference(actual_col))
    print("Following YAML columns are not in the file uploaded",missing_YAML_file)
    logging.info(f'df columns: {actual_col}')
    logging.info(f'expected columns: {expected_col}')
    return 0

Overwriting utility.py


### Write YAML file

In [10]:
%%writefile file.yaml
file_type: csv
file_name: z
inbound_delimiter: ","
outbound_delimiter: "|"
columns:
    - t_dat
    - customer_id
    - article_id
    - price
    - sales_channel_id

Overwriting file.yaml


In [11]:
# Load file.yaml through the utility module; the result is None if the YAML is invalid
#################################
import utility as util
config_data = util.read_config_file("file.yaml")

In [12]:
# Display the parsed configuration
#################################
config_data

{'file_type': 'csv',
 'file_name': 'z',
 'inbound_delimiter': ',',
 'outbound_delimiter': '|',
 'columns': ['t_dat',
  'customer_id',
  'article_id',
  'price',
  'sales_channel_id']}

In [13]:
# Read the file with Modin and preview the first rows
# NOTE(review): df is reassigned by the pandas read in the following cell,
# so this load only serves the preview.
#####################################
df = mpd.read_csv(file_path)
df.head()

[2m[36m(raylet)[0m Spilled 2255 MiB, 27 objects, write throughput 1814 MiB/s. Set RAY_verbose_spill_logs=0 to disable this message.


Unnamed: 0,t_dat,customer_id,article_id,price,sales_channel_id
0,2018-09-20,000058a12d5b43e67d225668fa1f8d618c13dc232df0ca...,663713001,0.050831,2
1,2018-09-20,000058a12d5b43e67d225668fa1f8d618c13dc232df0ca...,541518023,0.030492,2
2,2018-09-20,00007d2de826758b65a93dd24ce629ed66842531df6699...,505221004,0.015237,2
3,2018-09-20,00007d2de826758b65a93dd24ce629ed66842531df6699...,685687003,0.016932,2
4,2018-09-20,00007d2de826758b65a93dd24ce629ed66842531df6699...,685687004,0.016932,2


In [14]:
# Build the source path and the delimiter from the config, then load with pandas
#####################################
file_type = config_data['file_type']
file_stem = config_data['file_name']
source_file = f"/Users/handeatasagun/PycharmProjects/{file_stem}.{file_type}"
inbound_sep = config_data['inbound_delimiter']
df = pd.read_csv(source_file, delimiter=inbound_sep)
df.head()

Unnamed: 0,t_dat,customer_id,article_id,price,sales_channel_id
0,2018-09-20,000058a12d5b43e67d225668fa1f8d618c13dc232df0ca...,663713001,0.050831,2
1,2018-09-20,000058a12d5b43e67d225668fa1f8d618c13dc232df0ca...,541518023,0.030492,2
2,2018-09-20,00007d2de826758b65a93dd24ce629ed66842531df6699...,505221004,0.015237,2
3,2018-09-20,00007d2de826758b65a93dd24ce629ed66842531df6699...,685687003,0.016932,2
4,2018-09-20,00007d2de826758b65a93dd24ce629ed66842531df6699...,685687004,0.016932,2


In [15]:
# Validate the header of the file against the YAML config
# (returns 1 on success and 0 on failure; the value is only displayed here, not checked)
#####################################
util.col_header_val(df,config_data)

column name and column length validation passed


1

### Write the file in .gz format

In [16]:
# Write the gzip-compressed text file. The separator is taken from the YAML
# config (outbound_delimiter, "|") instead of being repeated as a literal.
df.to_csv('z.csv.gz', sep=config_data['outbound_delimiter'], compression='gzip', index=False)

### Summary

In [17]:
# Create a summary of the written file
# A separate name is used so the source `file_path` is not overwritten;
# re-running the earlier cells would otherwise read the .gz output instead.
output_path = 'z.csv.gz'

# Read the compressed file back, using the delimiter defined in the YAML config
df = pd.read_csv(output_path, sep=config_data['outbound_delimiter'], compression='gzip')

# Calculate the total number of rows and columns
num_rows, num_columns = df.shape

# Get the size of the compressed file (in bytes)
file_size = os.path.getsize(output_path)

# Print the summary information
print(f"Total Number of Rows: {num_rows}")
print(f"Total Number of Columns: {num_columns}")
print(f"File Size: {file_size}")

Total Number of Rows: 31788324
Total Number of Columns: 5
File Size: 595380774
