## Task: File Ingestion and Schema validation
Take any csv/text file of 2+ GB of your choice. --- (You can do this assignment on Google colab)

Read the file ( Present approach of reading the file )

Try different methods of file reading eg: Dask, Modin, Ray, pandas and present your findings in term of computational efficiency

Perform basic validation on data columns : eg: remove special character , white spaces from the col name

As you already know the schema hence create a YAML file and write the column name in YAML file. --define separator of
read and write file, column name in YAML

Validate number of columns and column name of ingested file with YAML.

Write the file in pipe separated text file (|) in gz format.

Create a summary of the file:

Total number of rows,

total number of columns

file size

In [1]:
import warnings
warnings.filterwarnings("ignore")

# Computational Efficiency

In [2]:
import os
import time

In [3]:
os.path.getsize('charges.csv')

1968829442

# Read Data File with Dask

In [4]:
from dask import dataframe as dd
start = time.time()
dask_df = dd.read_csv('charges.csv')
end = time.time()
print("Read csv with dask: ",(end-start),"sec")

Read csv with dask:  0.027932405471801758 sec


# Read Data File with pandas

In [5]:
import pandas as pd

In [6]:
Mylist=[]
chunksize=1000
for chunk in pd.read_csv('charges.csv', chunksize=chunksize):
    Mylist.append(chunk)

In [8]:
start = time.time()
df=pd.concat(Mylist,axis=0)
end = time.time()
print("Read csv with pandas: ",(end-start),"sec")

Read csv with pandas:  14.75093698501587 sec


# Read Data File with Modin and Ray

In [9]:
import modin.pandas as pdm

In [10]:
import ray
ray.shutdown()
ray.init()

2023-03-28 22:32:16,017	INFO worker.py:1553 -- Started a local Ray instance.


0,1
Python version:,3.10.9
Ray version:,2.3.1


In [11]:
runtime_env={'env_vars': {'__MODIN_AUTOIMPORT_PANDAS__': '1'}}

In [12]:
start = time.time()
dfm = pdm.read_csv('charges.csv')
end = time.time()
print("Read csv with dask: ",(end-start),"sec")

Read csv with dask:  27.43939709663391 sec


# Dataset Information

In [13]:
df=dd.read_csv('charges.csv')

In [14]:
df.info()

<class 'dask.dataframe.core.DataFrame'>
Columns: 24 entries, BusinessYear to RowNumber
dtypes: object(10), float64(9), int64(5)

In [16]:
#No. of Rows
len(df.index)

12694445

In [17]:
#No, of Columns
len(df.columns)

24

In [18]:
# remove special character
df.columns=df.columns.str.replace('[#,@,&]','')

In [19]:
#To remove white space from columns
df.columns = df.columns.str.replace(' ', '')

In [20]:
data=df.columns
data

Index(['BusinessYear', 'StateCode', 'IssuerId', 'SourceName', 'VersionNum',
       'ImportDate', 'IssuerId2', 'FederalTIN', 'RateEffectiveDate',
       'RateExpirationDate', 'PlanId', 'RatingAreaId', 'Tobacco', 'Age',
       'IndividualRate', 'IndividualTobaccoRate', 'Couple',
       'PrimarySubscriberAndOneDependent', 'PrimarySubscriberAndTwoDependents',
       'PrimarySubscriberAndThreeOrMoreDependents', 'CoupleAndOneDependent',
       'CoupleAndTwoDependents', 'CoupleAndThreeOrMoreDependents',
       'RowNumber'],
      dtype='object')

# YAML File

In [21]:
%%writefile utility.py
import logging
import os
import subprocess
import yaml
import pandas as pd
import datetime 
import gc
import re
def read_config_file(filepath):
    with open(filepath, 'r') as stream:
        try:
            return yaml.load(stream, Loader=yaml.Loader)
        except yaml.YAMLError as exc:
            logging.error(exc)

def col_header_val(df,table_config):
    df.columns = df.columns.str.lower()
    df.columns = df.columns.str.replace('[^\w]','_',regex=True)
    df.columns = list(map(lambda x: x.strip('_'), list(df.columns)))
    df.columns = list(map(lambda x: replacer(x,'_'), list(df.columns)))
    expected_col = list(map(lambda x: x.lower(),  table_config['columns']))
    expected_col.sort()
    df.columns =list(map(lambda x: x.lower(), list(df.columns)))
    df = df.reindex(sorted(df.columns), axis=1)
    if len(df.columns) == len(expected_col) and list(expected_col)  == list(df.columns):
        print("column name and column length validation passed")
        return 1
    else:
        print("column name and column length validation failed")
        mismatched_columns_file = list(set(df.columns).difference(expected_col))
        print("Following File columns are not in the YAML file",mismatched_columns_file)
        missing_YAML_file = list(set(expected_col).difference(df.columns))
        print("Following YAML columns are not in the file uploaded",missing_YAML_file)
        logging.info(f'df columns: {df.columns}')
        logging.info(f'expected columns: {expected_col}')
        return 0

Overwriting utility.py


# Columns' Name in YAML File 

In [28]:
%%writefile fine.yaml
file_type: csv
dataset_name: file
file_name: charges
table_name: fine2
inbound_delimiter: ","
outbound_delimiter: "|"
skip_leading_rows: 1
columns: 
    - BusinessYear
      - StateCode
      - IssuerId
      - SourceName
      - VersionNum
      - ImportDate
      - IssuerId2
      - FederalTIN
      - RateEffectiveDate
      - RateExpirationDate
      - PlanId
      - RatingAreaId
      - Tobacco
      - Age
      - IndividualRate
      - IndividualTobaccoRate
      - Couple
      - PrimarySubscriberAndOneDependent
      - PrimarySubscriberAndTwoDependents
      - PrimarySubscriberAndThreeOrMoreDependents
      - CoupleAndOneDependent
      - CoupleAndTwoDependents
      - CoupleAndThreeOrMoreDependents
      - RowNumber

Overwriting fine.yaml


In [29]:
# Reading config file
import utility as util
config_data = util.read_config_file('fine.yaml') 

In [30]:
config_data

{'file_type': 'csv',
 'dataset_name': 'file',
 'file_name': 'charges',
 'table_name': 'fine2',
 'inbound_delimiter': ',',
 'outbound_delimiter': '|',
 'skip_leading_rows': 1,
 'columns': ['BusinessYear - StateCode - IssuerId - SourceName - VersionNum - ImportDate - IssuerId2 - FederalTIN - RateEffectiveDate - RateExpirationDate - PlanId - RatingAreaId - Tobacco - Age - IndividualRate - IndividualTobaccoRate - Couple - PrimarySubscriberAndOneDependent - PrimarySubscriberAndTwoDependents - PrimarySubscriberAndThreeOrMoreDependents - CoupleAndOneDependent - CoupleAndTwoDependents - CoupleAndThreeOrMoreDependents - RowNumber']}

# Validate number of columns and columns' name of ingested file with YAML



In [31]:
from dask import dataframe as dd
df_sample = dd.read_csv("charges.csv",delimiter=',')
df_sample

Unnamed: 0_level_0,BusinessYear,StateCode,IssuerId,SourceName,VersionNum,ImportDate,IssuerId2,FederalTIN,RateEffectiveDate,RateExpirationDate,PlanId,RatingAreaId,Tobacco,Age,IndividualRate,IndividualTobaccoRate,Couple,PrimarySubscriberAndOneDependent,PrimarySubscriberAndTwoDependents,PrimarySubscriberAndThreeOrMoreDependents,CoupleAndOneDependent,CoupleAndTwoDependents,CoupleAndThreeOrMoreDependents,RowNumber
npartitions=30,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1,Unnamed: 16_level_1,Unnamed: 17_level_1,Unnamed: 18_level_1,Unnamed: 19_level_1,Unnamed: 20_level_1,Unnamed: 21_level_1,Unnamed: 22_level_1,Unnamed: 23_level_1,Unnamed: 24_level_1
,int64,object,int64,object,int64,object,int64,object,object,object,object,object,object,object,float64,float64,float64,float64,float64,float64,float64,float64,float64,int64
,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...


In [32]:
#Reading the file using config file
file_type = config_data['file_type']
source_file = config_data['file_name'] + f'.{file_type}'

In [33]:
import pandas as pd
df = pd.read_csv(source_file,config_data['inbound_delimiter'])
df.head()

Unnamed: 0,BusinessYear,StateCode,IssuerId,SourceName,VersionNum,ImportDate,IssuerId2,FederalTIN,RateEffectiveDate,RateExpirationDate,...,IndividualRate,IndividualTobaccoRate,Couple,PrimarySubscriberAndOneDependent,PrimarySubscriberAndTwoDependents,PrimarySubscriberAndThreeOrMoreDependents,CoupleAndOneDependent,CoupleAndTwoDependents,CoupleAndThreeOrMoreDependents,RowNumber
0,2014,AK,21989,HIOS,6,2014-03-19 07:06:49,21989,93-0438772,2014-01-01,2014-12-31,...,29.0,,,,,,,,,14
1,2014,AK,21989,HIOS,6,2014-03-19 07:06:49,21989,93-0438772,2014-01-01,2014-12-31,...,36.95,,73.9,107.61,107.61,107.61,144.56,144.56,144.56,14
2,2014,AK,21989,HIOS,6,2014-03-19 07:06:49,21989,93-0438772,2014-01-01,2014-12-31,...,36.95,,73.9,107.61,107.61,107.61,144.56,144.56,144.56,15
3,2014,AK,21989,HIOS,6,2014-03-19 07:06:49,21989,93-0438772,2014-01-01,2014-12-31,...,32.0,,,,,,,,,15
4,2014,AK,21989,HIOS,6,2014-03-19 07:06:49,21989,93-0438772,2014-01-01,2014-12-31,...,32.0,,,,,,,,,16


In [34]:
df.columns = df.columns.str.replace(' ', '')

In [35]:
print("columns of files are:" ,df.columns)
print("columns of YAML are:" ,config_data['columns'])

columns of files are: Index(['BusinessYear', 'StateCode', 'IssuerId', 'SourceName', 'VersionNum',
       'ImportDate', 'IssuerId2', 'FederalTIN', 'RateEffectiveDate',
       'RateExpirationDate', 'PlanId', 'RatingAreaId', 'Tobacco', 'Age',
       'IndividualRate', 'IndividualTobaccoRate', 'Couple',
       'PrimarySubscriberAndOneDependent', 'PrimarySubscriberAndTwoDependents',
       'PrimarySubscriberAndThreeOrMoreDependents', 'CoupleAndOneDependent',
       'CoupleAndTwoDependents', 'CoupleAndThreeOrMoreDependents',
       'RowNumber'],
      dtype='object')
columns of YAML are: ['BusinessYear - StateCode - IssuerId - SourceName - VersionNum - ImportDate - IssuerId2 - FederalTIN - RateEffectiveDate - RateExpirationDate - PlanId - RatingAreaId - Tobacco - Age - IndividualRate - IndividualTobaccoRate - Couple - PrimarySubscriberAndOneDependent - PrimarySubscriberAndTwoDependents - PrimarySubscriberAndThreeOrMoreDependents - CoupleAndOneDependent - CoupleAndTwoDependents - CoupleAndThre

# File in pipe seperated text file (|) in gz format

In [38]:
import datetime
import csv
import gzip

from dask import dataframe as dd
df = dd.read_csv('charges.csv',delimiter=',')

# Write csv in gz format in pipe separated text file (|)
df.to_csv('charges.csv.gz',
          sep='|',
          header=True,
          index=False,
          quoting=csv.QUOTE_ALL,
          compression='gzip',
          quotechar='"',
          doublequote=True,
          line_terminator='\n')

['C:\\Users\\H149675\\charges.csv.gz\\00.part',
 'C:\\Users\\H149675\\charges.csv.gz\\01.part',
 'C:\\Users\\H149675\\charges.csv.gz\\02.part',
 'C:\\Users\\H149675\\charges.csv.gz\\03.part',
 'C:\\Users\\H149675\\charges.csv.gz\\04.part',
 'C:\\Users\\H149675\\charges.csv.gz\\05.part',
 'C:\\Users\\H149675\\charges.csv.gz\\06.part',
 'C:\\Users\\H149675\\charges.csv.gz\\07.part',
 'C:\\Users\\H149675\\charges.csv.gz\\08.part',
 'C:\\Users\\H149675\\charges.csv.gz\\09.part',
 'C:\\Users\\H149675\\charges.csv.gz\\10.part',
 'C:\\Users\\H149675\\charges.csv.gz\\11.part',
 'C:\\Users\\H149675\\charges.csv.gz\\12.part',
 'C:\\Users\\H149675\\charges.csv.gz\\13.part',
 'C:\\Users\\H149675\\charges.csv.gz\\14.part',
 'C:\\Users\\H149675\\charges.csv.gz\\15.part',
 'C:\\Users\\H149675\\charges.csv.gz\\16.part',
 'C:\\Users\\H149675\\charges.csv.gz\\17.part',
 'C:\\Users\\H149675\\charges.csv.gz\\18.part',
 'C:\\Users\\H149675\\charges.csv.gz\\19.part',
 'C:\\Users\\H149675\\charges.csv.gz\\20

# Summary of the file

In [39]:
#number of files in gz format folder
import os
entries = os.listdir('charges.csv.gz/')
for entry in entries:
    print(entry)

00.part
01.part
02.part
03.part
04.part
05.part
06.part
07.part
08.part
09.part
10.part
11.part
12.part
13.part
14.part
15.part
16.part
17.part
18.part
19.part
20.part
21.part
22.part
23.part
24.part
25.part
26.part
27.part
28.part
29.part


In [40]:
#size of the gz format folder
os.path.getsize('charges.csv.gz')

12288