## Task: File Ingestion and Schema validation

* Take any CSV/text file of 2+ GB of your choice (you can do this assignment on Google Colab).

* Read the file (present your approach to reading it).

* Try different methods of reading the file, e.g. Dask, Modin, Ray and pandas, and present your findings in terms of computational efficiency.

* Perform basic validation on the data columns, e.g. remove special characters and white space from the column names.

* Since the schema is already known, create a YAML file that defines the separators for reading and writing the file and lists the column names.

* Validate the number of columns and the column names of the ingested file against the YAML file.

* Write the file as a pipe-separated (`|`) text file in gzip (`.gz`) format.

* Create a summary of the file:

    * Total number of rows
    * Total number of columns
    * File size
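The last two steps (pipe-separated gzip output and the summary) can be sketched in plain pandas by streaming the CSV in chunks, so the whole file never has to sit in memory at once. This is a minimal sketch under stated assumptions: `csv_to_pipe_gz` is a hypothetical helper name, pandas' `chunksize` reader is used in place of Dask, and the input is assumed to have a header row unless `read_kwargs` say otherwise.

```python
import gzip
import os

import pandas as pd


def csv_to_pipe_gz(src, dst, in_sep=",", out_sep="|", chunksize=1_000_000, **read_kwargs):
    """Stream `src` into a gzip-compressed, `out_sep`-separated `dst`; return a summary.

    Hypothetical helper: extra keyword arguments are forwarded to pd.read_csv.
    """
    n_rows, n_cols = 0, 0
    with gzip.open(dst, "wt", newline="") as out, \
            pd.read_csv(src, sep=in_sep, chunksize=chunksize, **read_kwargs) as reader:
        for i, chunk in enumerate(reader):
            # Write the header once, with the first chunk only
            chunk.to_csv(out, sep=out_sep, index=False, header=(i == 0))
            n_rows += len(chunk)
            n_cols = chunk.shape[1]
    return {
        "total_rows": n_rows,
        "total_columns": n_cols,
        "input_size_bytes": os.path.getsize(src),
        "output_size_bytes": os.path.getsize(dst),
    }
```

For a file without a header row, `header=None` and `names=[...]` can be passed through `**read_kwargs`; the returned dictionary covers the three summary items.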

In [1]:
import os
import time

In [2]:
# Size of the file in bytes
os.path.getsize('custom_1988_2020.csv')

4544707885
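For the summary, a raw byte count is easier to read in binary units. A small stdlib sketch (the helper name `human_size` is hypothetical): 4,544,707,885 bytes is about 4.23 GiB, or 4.54 GB in decimal units.

```python
def human_size(n_bytes: int) -> str:
    """Format a byte count using binary units (1 KiB = 1024 bytes)."""
    units = ["B", "KiB", "MiB", "GiB", "TiB"]
    size = float(n_bytes)
    for unit in units:
        # Stop at the first unit where the value drops below 1024 (or at the largest unit)
        if size < 1024 or unit == units[-1]:
            return f"{size:.2f} {unit}"
        size /= 1024


print(human_size(4544707885))  # 4.23 GiB
```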

### Read in the data with Dask

In [5]:
from dask import dataframe as dd
start = time.time()
dask_df = dd.read_csv('custom_1988_2020.csv')
end = time.time()
print("Read csv with dask: ",(end-start),"sec")

Read csv with dask:  0.03575897216796875 sec
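To compare the libraries on equal footing, each timing should cover a full pass over the data, not just the call that sets up the read. A minimal sketch of a reusable timer (the helper `timed` is hypothetical) using `time.perf_counter`, which is monotonic and better suited to measuring intervals than `time.time`:

```python
import time
from typing import Any, Callable, Tuple


def timed(fn: Callable[[], Any]) -> Tuple[Any, float]:
    """Call fn() once and return (its result, elapsed wall-clock seconds)."""
    start = time.perf_counter()  # monotonic, high-resolution interval timer
    result = fn()
    return result, time.perf_counter() - start


# Toy usage; with Dask the callable must force computation, e.g.
# timed(lambda: len(dd.read_csv('custom_1988_2020.csv')))
total, seconds = timed(lambda: sum(range(1_000_000)))
print(total, f"{seconds:.4f} sec")
```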


### Read in the data with Pandas

In [7]:
import pandas as pd
#start = time.time()
#df = pd.read_csv('custom_1988_2020.csv')
#end = time.time()
#print("Read csv with pandas: ",(end-start),"sec")
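If loading the whole file with plain pandas runs out of memory, passing `chunksize` makes `pd.read_csv` return an iterator of smaller DataFrames, so only one chunk is held at a time. A sketch, assuming a hypothetical helper `count_rows_chunked` and an arbitrary default of one million rows per chunk:

```python
import pandas as pd


def count_rows_chunked(path, chunksize=1_000_000, **read_kwargs):
    """Count data rows by streaming the file in chunks instead of loading it whole."""
    total = 0
    # With chunksize set, read_csv returns a TextFileReader that yields
    # DataFrames of at most `chunksize` rows, one at a time.
    with pd.read_csv(path, chunksize=chunksize, **read_kwargs) as reader:
        for chunk in reader:
            total += len(chunk)
    return total
```

Usage would be `count_rows_chunked('custom_1988_2020.csv')`; the same loop is also where per-chunk cleaning or writing could happen.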

### Read in the data with Modin and Ray

In [19]:
#import modin.pandas as pd
#import ray
#ray.shutdown()
#ray.init()
#start = time.time()
#df = pd.read_csv('custom_1988_2020.csv')
#end = time.time()
#print("Read csv with modin and ray: ",(end-start),"sec")

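### Findings: only Dask could handle the file

pandas and Modin on Ray both raised `MemoryError` on this 4.5 GB file, whereas Dask did not. The 0.036 s Dask timing above is not a like-for-like measurement, however: `dd.read_csv` is lazy, reading only a small sample to infer dtypes and build a task graph, and the full file is parsed only when a result is computed, e.g. by `len(df.index)` below.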

In [17]:
from dask import dataframe as dd
df = dd.read_csv('custom_1988_2020.csv',delimiter=',')

In [18]:
df.info()

<class 'dask.dataframe.core.DataFrame'>
Columns: 8 entries, 198801 to 34353
dtypes: int64(8)

In [20]:
# Number of rows (this forces Dask to read the whole file)
len(df.index)

113607321

In [21]:
# Number of columns
len(df.columns)

8
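A back-of-the-envelope estimate shows why a single in-memory DataFrame is hard to fit: with `int64` columns (8 bytes per value, as `df.info()` reports), 113,607,321 rows by 8 columns need about 7.27 GB for the values alone, before any temporary copies made while parsing. A sketch with a hypothetical helper `in_memory_bytes`:

```python
def in_memory_bytes(n_rows: int, n_cols: int, bytes_per_value: int = 8) -> int:
    """Lower-bound size of a purely numeric table: rows x columns x dtype width.

    Ignores the index and any temporary copies made while parsing.
    """
    return n_rows * n_cols * bytes_per_value


est = in_memory_bytes(113_607_321, 8)  # 8 int64 columns, row count from len(df.index)
print(f"{est:,} bytes = {est / 1024**3:.2f} GiB")  # 7,270,868,544 bytes = 6.77 GiB
```

If the value ranges allowed a narrower type such as `int32` (not checked here), the estimate would halve.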

In [22]:
# Remove special characters (#, @, &) from the column names
df.columns = df.columns.str.replace(r'[#@&]', '', regex=True)

In [23]:
# Remove all white space from the column names
df.columns = df.columns.str.replace(r'\s+', '', regex=True)
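The two cells above only strip `#`, `@`, `&` and white space. A more general sketch (hypothetical helper `clean_column_name`) keeps only ASCII letters, digits and underscores, so any other special character is dropped as well:

```python
import re


def clean_column_name(name: str) -> str:
    """Remove white space, then every character that is not an ASCII letter, digit or '_'."""
    name = re.sub(r"\s+", "", name)
    return re.sub(r"[^0-9A-Za-z_]", "", name)


print(clean_column_name(" Total #Value "))  # TotalValue
```

Applied to a DataFrame this would be `df.columns = [clean_column_name(c) for c in df.columns]`; names that become empty or duplicated after cleaning would still need handling.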

In [24]:
data=df.columns
data

Index(['198801', '1', '103', '100', '000000190', '0', '35843', '34353'], dtype='object')
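The column names above are values from the first record (`198801`, `1`, `103`, ...), which suggests the CSV has no header row and its first data line was consumed as the header. A sketch of reading such a file with explicit names: `header=None` marks the first line as data and `names` supplies the labels. The helper `read_headerless_csv` and the names used below are placeholders, not the dataset's real column names; `dd.read_csv` forwards the same keyword arguments to pandas.

```python
import io

import pandas as pd


def read_headerless_csv(source, names, sep=","):
    """Read a delimited file whose first line is data, labelling columns with `names`."""
    # header=None: do not use the first line as column names
    return pd.read_csv(source, sep=sep, header=None, names=names)


sample = io.StringIO("198801,1,103\n198801,1,104\n")
df_demo = read_headerless_csv(sample, names=["col_a", "col_b", "col_c"])  # placeholder names
print(len(df_demo))  # 2
```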

### Validation

In [25]:
import logging
import os
import subprocess
import yaml
import pandas as pd
import datetime 
import gc
import re

In [26]:
%%writefile utility.py
import logging
import re

import yaml


def read_config_file(filepath):
    """Load a YAML config file; log the error and return None if parsing fails."""
    with open(filepath, 'r') as stream:
        try:
            return yaml.safe_load(stream)
        except yaml.YAMLError as exc:
            logging.error(exc)


def replacer(string, char):
    # Called below but not defined in the original cell; assumed to collapse
    # runs of `char` (e.g. '___') into a single `char`.
    pattern = re.escape(char) + '{2,}'
    return re.sub(pattern, char, string)


def col_header_val(df, table_config):
    """Normalise df's column names in place and check them against the YAML 'columns' list."""
    # Lower-case, turn every non-word character into '_', then trim and collapse '_'
    df.columns = df.columns.str.lower()
    df.columns = df.columns.str.replace(r'[^\w]', '_', regex=True)
    df.columns = [replacer(col.strip('_'), '_') for col in df.columns]
    # Compare sorted lists so that column order does not matter
    expected_col = sorted(col.lower() for col in table_config['columns'])
    actual_col = sorted(df.columns)
    if len(actual_col) == len(expected_col) and actual_col == expected_col:
        print("column name and column length validation passed")
        return 1
    else:
        print("column name and column length validation failed")
        mismatched_columns_file = list(set(actual_col).difference(expected_col))
        print("File columns not in the YAML file:", mismatched_columns_file)
        missing_YAML_file = list(set(expected_col).difference(actual_col))
        print("YAML columns not in the uploaded file:", missing_YAML_file)
        logging.info(f'df columns: {actual_col}')
        logging.info(f'expected columns: {expected_col}')
        return 0

Writing utility.py
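`col_header_val` prints its findings and returns 1 or 0. A variant that returns the differences as data is easier to log or assert on; a minimal sketch (hypothetical helper `compare_columns`) with the same case-insensitive, order-independent comparison. For this notebook the file has 8 columns while `store.yaml` lists 24 names, so the check fails on the count alone.

```python
def compare_columns(actual, expected):
    """Compare column names case-insensitively, ignoring order."""
    actual_l = sorted(c.lower() for c in actual)
    expected_l = sorted(c.lower() for c in expected)
    return {
        # Sorted-list equality also catches duplicates and length mismatches
        "passed": actual_l == expected_l,
        "not_in_yaml": sorted(set(actual_l) - set(expected_l)),
        "missing_from_file": sorted(set(expected_l) - set(actual_l)),
    }


print(compare_columns(["Age", "Tobacco"], ["tobacco", "age"])["passed"])  # True
```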


In [42]:
%%writefile store.yaml
file_type: csv
dataset_name: file
file_name: Rate
table_name: edsurv
inbound_delimiter: ","
outbound_delimiter: "|"
skip_leading_rows: 1
columns:
    - BusinessYear
    - StateCode
    - IssuerId
    - SourceName
    - VersionNum
    - ImportDate
    - IssuerId2
    - FederalTIN
    - RateEffectiveDate
    - RateExpirationDate
    - PlanId
    - RatingAreaId
    - Tobacco
    - Age
    - IndividualRate
    - IndividualTobaccoRate
    - Couple
    - PrimarySubscriberAndOneDependent
    - PrimarySubscriberAndTwoDependents
    - PrimarySubscriberAndThreeOrMoreDependents
    - CoupleAndOneDependent
    - CoupleAndTwoDependents
    - CoupleAndThreeOrMoreDependents
    - RowNumber

Overwriting store.yaml
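It is worth sanity-checking the parsed config before using it. In a YAML block list every `- item` must sit at the same indentation; if later items are indented further than the first, PyYAML reads them as continuation lines of the first item's plain scalar, so `columns` collapses into a one-element list holding a single long string. A minimal sketch of such a check (the helper `check_config` and its set of required keys are assumptions based on this file's fields):

```python
import re

# Assumed minimum set of keys for this notebook's store.yaml
REQUIRED_KEYS = ("inbound_delimiter", "outbound_delimiter", "columns")


def check_config(config):
    """Return a list of problems in a parsed YAML config (empty list means OK)."""
    if not isinstance(config, dict):
        return ["config did not parse to a mapping"]
    problems = [f"missing key: {key}" for key in REQUIRED_KEYS if key not in config]
    cols = config.get("columns")
    if not isinstance(cols, list) or not all(isinstance(c, str) for c in cols):
        problems.append("'columns' must be a list of strings")
    elif any(re.search(r"\s", c) for c in cols):
        # Typical symptom of mis-indented list items folded into one string
        problems.append("'columns' entries contain white space; check list indentation")
    elif len({c.lower() for c in cols}) != len(cols):
        problems.append("'columns' contains duplicate names")
    return problems
```

In the notebook this would be called as `check_config(util.read_config_file('store.yaml'))`; `read_config_file` returns `None` on a parse error, which the first check reports.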


In [45]:
# Reading config file
#import utility as util
#config_data = util.read_config_file('store.yaml')

In [46]:
# Contents of the config file
#config_data

In [47]:
# Read the file with Dask and preview the first rows
from dask import dataframe as dd
df_sample = dd.read_csv('custom_1988_2020.csv',delimiter=',')
df_sample.head()

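|   | 198801 | 1 | 103 | 100 | 000000190 | 0 | 35843 | 34353 |
|---|---|---|---|---|---|---|---|---|
| 0 | 198801 | 1 | 103 | 100 | 120991000 | 0 | 1590 | 4154 |
| 1 | 198801 | 1 | 103 | 100 | 210390900 | 0 | 4500 | 2565 |
| 2 | 198801 | 1 | 103 | 100 | 220890200 | 0 | 3000 | 757 |
| 3 | 198801 | 1 | 103 | 100 | 240220000 | 0 | 26000 | 40668 |
| 4 | 198801 | 1 | 103 | 100 | 250410000 | 0 | 5 | 8070 |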
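The preview's header shows `000000190` for the fifth column, while that column's data rows are plain integers. Parsing it as `int64` drops leading zeros, which matters if the column holds fixed-width codes. A sketch that keeps it as text through `dtype` (column position 4 is read off the preview; treating it as a code column is an assumption, and `dd.read_csv` accepts the same `header` and `dtype` arguments):

```python
import io

import pandas as pd

line = "198801,1,103,100,000000190,0,35843,34353\n"
# header=None: the first line is data; dtype={4: str} keeps column 4 as text
demo = pd.read_csv(io.StringIO(line), header=None, dtype={4: str})
print(demo.iloc[0, 4])  # 000000190
print(demo.iloc[0, 0])  # 198801
```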
