


## Task:

* Take any csv/text file of 2+ GB of your choice. --- (You can do this assignment on Google colab)

* Read the file ( Present approach of reading the file )

* Try different methods of file reading eg: Dask, Modin, Ray, pandas and present your findings in term of computational     efficiency

* Perform basic validation on data columns : eg: remove special character , white spaces from the col name

* As you already know the schema hence create a YAML file and write the column name in YAML file. --define separator of   
  read and write file, column name in YAML

* Validate number of columns and column name of ingested file with YAML.

* Write the file in pipe separated text file (|) in gz format.


Write Utility File

In [None]:
%%writefile testutility.py
import logging
import os
import subprocess
import yaml
import pandas as pd
import datetime 
import gc
import re


################
# File Reading #
################

def read_config_file(filepath):
    with open(filepath, 'r') as stream:
        try:
            return yaml.safe_load(stream)
        except yaml.YAMLError as exc:
            logging.error(exc)


def replacer(string, char):
    pattern = char + '{2,}'
    string = re.sub(pattern, char, string) 
    return string

def col_header_val(df,table_config):
    '''
    replace whitespaces in the column
    and standardized column names
    '''
    df.columns = df.columns.str.lower()
    df.columns = df.columns.str.replace('[^\w]','_',regex=True)
    df.columns = list(map(lambda x: x.strip('_'), list(df.columns)))
    df.columns = list(map(lambda x: replacer(x,'_'), list(df.columns)))
    expected_col = list(map(lambda x: x.lower(),  table_config['columns']))
    expected_col.sort()
    df.columns =list(map(lambda x: x.lower(), list(df.columns)))
    df = df.reindex(sorted(df.columns), axis=1)
    if len(df.columns) == len(expected_col) and list(expected_col)  == list(df.columns):
        print("column name and column length validation passed")
        return 1
    else:
        print("column name and column length validation failed")
        mismatched_columns_file = list(set(df.columns).difference(expected_col))
        print("Following File columns are not in the YAML file",mismatched_columns_file)
        missing_YAML_file = list(set(expected_col).difference(df.columns))
        print("Following YAML columns are not in the file uploaded",missing_YAML_file)
        logging.info(f'df columns: {df.columns}')
        logging.info(f'expected columns: {expected_col}')
        return 0

Write YAML File

In [9]:
%%writefile file.yaml
file_type: txt
dataset_name: testfile
file_name: test_data
table_name: edsurv
inbound_delimiter: ","
outbound_delimiter: "|"
skip_leading_rows: 1
columns: 
    - age
    - sex
    - bmi
    - children
    - smoker
    - region
    - charges

Overwriting file.yaml


In [10]:
# Read config file
import testutility as util
config_data = util.read_config_file("file.yaml")

In [11]:
config_data['inbound_delimiter']

','

In [12]:
#inspecting data of config file
config_data

{'file_type': 'txt',
 'dataset_name': 'testfile',
 'file_name': 'test_data',
 'table_name': 'edsurv',
 'inbound_delimiter': ',',
 'outbound_delimiter': '|',
 'skip_leading_rows': 1,
 'columns': ['age', 'sex', 'bmi', 'children', 'smoker', 'region', 'charges']}

In [13]:
# Normal reading process of the file
import pandas as pd
df_sample = pd.read_csv("test_data.txt",delimiter=',')
df_sample.head()

Unnamed: 0,age,sex,bmi,children,smoker,region,charges
0,19,female,27.9,0,yes,southwest,16884.924
1,18,male,33.77,1,no,southeast,1725.5523
2,28,male,33.0,3,no,southeast,4449.462
3,33,male,22.705,0,no,northwest,21984.47061
4,32,male,28.88,0,no,northwest,3866.8552


In [14]:
# read the file using config file
file_type = config_data['file_type']
source_file = "./" + config_data['file_name'] + f'.{file_type}'
#print("",source_file)
df = pd.read_csv(source_file,config_data['inbound_delimiter'])
df.head()

Unnamed: 0,age,sex,bmi,children,smoker,region,charges
0,19,female,27.9,0,yes,southwest,16884.924
1,18,male,33.77,1,no,southeast,1725.5523
2,28,male,33.0,3,no,southeast,4449.462
3,33,male,22.705,0,no,northwest,21984.47061
4,32,male,28.88,0,no,northwest,3866.8552


In [15]:
#validate the header of the file
util.col_header_val(df,config_data)

column name and column length validation passed


1

In [16]:
print("columns of files are:" ,df.columns)
print("columns of YAML are:" ,config_data['columns'])

columns of files are: Index(['age', 'sex', 'bmi', 'children', 'smoker', 'region', 'charges'], dtype='object')
columns of YAML are: ['age', 'sex', 'bmi', 'children', 'smoker', 'region', 'charges']


In [26]:
if util.col_header_val(df,config_data)==0:
    print("validation failed")
    # write code to reject the file
else:
    print("col validation passed")
    # write the code to perform further action
    # in the pipleine

column name and column length validation passed
col validation passed


In [27]:

#write a pandas dataframe to gzipped CSV file
df.to_csv('test_data.txt.gz', sep ='|')

In [24]:
config_data['outbound_delimiter']

'|'

In [30]:
df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 1338 entries, 0 to 1337
Data columns (total 7 columns):
 #   Column    Non-Null Count  Dtype  
---  ------    --------------  -----  
 0   age       1338 non-null   int64  
 1   sex       1338 non-null   object 
 2   bmi       1338 non-null   float64
 3   children  1338 non-null   int64  
 4   smoker    1338 non-null   object 
 5   region    1338 non-null   object 
 6   charges   1338 non-null   float64
dtypes: float64(2), int64(2), object(3)
memory usage: 73.3+ KB


In [31]:
round(df)

Unnamed: 0,age,sex,bmi,children,smoker,region,charges
0,19,female,28.0,0,yes,southwest,16885.0
1,18,male,34.0,1,no,southeast,1726.0
2,28,male,33.0,3,no,southeast,4449.0
3,33,male,23.0,0,no,northwest,21984.0
4,32,male,29.0,0,no,northwest,3867.0
...,...,...,...,...,...,...,...
1333,50,male,31.0,3,no,northwest,10601.0
1334,18,female,32.0,0,no,northeast,2206.0
1335,18,female,37.0,0,no,southeast,1630.0
1336,21,female,26.0,0,no,southwest,2008.0


In [32]:
df.describe()

Unnamed: 0,age,bmi,children,charges
count,1338.0,1338.0,1338.0,1338.0
mean,39.207025,30.663397,1.094918,13270.422265
std,14.04996,6.098187,1.205493,12110.011237
min,18.0,15.96,0.0,1121.8739
25%,27.0,26.29625,0.0,4740.28715
50%,39.0,30.4,1.0,9382.033
75%,51.0,34.69375,2.0,16639.912515
max,64.0,53.13,5.0,63770.42801
