## File Ingestion and Schema validation

#### The dataset
Survival Analysis Synthetic Data (6GB)

Link: https://www.kaggle.com/datasets/louise2001/survival-analysis-synthetic-data?resource=download

In [19]:
import os
import time
from dask import dataframe
import pandas as pd
import logging
import yaml
import re
import utility as util

In [2]:
# File size
os.path.getsize('C:/Users/assoma/Desktop/survival_data.csv')

6525517581

In [4]:
# Reading with dask
start = time.time()
dask_df = dataframe.read_csv('C:/Users/assoma/Desktop/survival_data.csv')
end = time.time()
print("Time to read the csv file with dask: ",(end-start),"sec")

Time to read the csv file with dask:  0.032982587814331055 sec


In [None]:
# Reading with modin and ray
import modin.pandas as mpd
import ray
ray.shutdown()
ray.init()
start = time.time()
modin_df = mpd.read_csv('C:/Users/assoma/Desktop/survival_data.csv')
end = time.time()
print("Time to read the csv file with modin and ray: ",(end-start),"sec")

In [4]:
# Reading with pandas
start = time.time()
pandas_df = pd.read_csv('C:/Users/assoma/Desktop/survival_data.csv')
end = time.time()
print("Time to read the csv file with pandas: ",(end-start),"sec")

Desk was the fastest

In [5]:
# Check the data set
dask_df.head()

Unnamed: 0.1,Unnamed: 0,age_start_observed,age_end,is_truncated,is_censored,is_dead,date_start_observed,date_end_observed
0,15113102,0.0,9.097335,False,True,False,1908-11-17,1917-12-22
1,41505894,0.0,64.486689,False,True,False,1828-09-13,1893-03-10
2,24774171,0.0,33.071552,False,True,False,1911-02-07,1944-03-04
3,97834936,34.834566,68.778258,True,True,False,1820-01-01,1853-12-10
4,45793809,0.0,95.948358,False,False,True,1870-05-29,1966-05-11


In [6]:
dask_df.info

<bound method DataFrame.info of Dask DataFrame Structure:
                Unnamed: 0 age_start_observed  age_end is_truncated is_censored is_dead date_start_observed date_end_observed
npartitions=102                                                                                                              
                     int64            float64  float64         bool        bool    bool              object            object
                       ...                ...      ...          ...         ...     ...                 ...               ...
...                    ...                ...      ...          ...         ...     ...                 ...               ...
                       ...                ...      ...          ...         ...     ...                 ...               ...
                       ...                ...      ...          ...         ...     ...                 ...               ...
Dask Name: read-csv, 102 tasks>

In [7]:
# No. of rows
len(dask_df.index)

88809774

In [8]:
# No. of columns
len(dask_df.columns)

8

In [10]:
# Rename the first column
dask_df.columns=dask_df.rename(columns = {'Unnamed: 0':'id'})

In [11]:
# Remove special character from columns
dask_df.columns=dask_df.columns.str.replace('[_]',' ')

  dask_df.columns=dask_df.columns.str.replace('[_]',' ')


In [12]:
# Capitlize the first letter of each column
dask_df.columns = dask_df.columns.str.title()

In [13]:
# Remove white space from columns
dask_df.columns = dask_df.columns.str.replace(' ', '')

In [14]:
dask_df.head()

Unnamed: 0,Id,AgeStartObserved,AgeEnd,IsTruncated,IsCensored,IsDead,DateStartObserved,DateEndObserved
0,15113102,0.0,9.097335,False,True,False,1908-11-17,1917-12-22
1,41505894,0.0,64.486689,False,True,False,1828-09-13,1893-03-10
2,24774171,0.0,33.071552,False,True,False,1911-02-07,1944-03-04
3,97834936,34.834566,68.778258,True,True,False,1820-01-01,1853-12-10
4,45793809,0.0,95.948358,False,False,True,1870-05-29,1966-05-11


In [15]:
dask_df.columns

Index(['Id', 'AgeStartObserved', 'AgeEnd', 'IsTruncated', 'IsCensored',
       'IsDead', 'DateStartObserved', 'DateEndObserved'],
      dtype='object')

## Validation

In [16]:
%%writefile utility.py


################
# File Reading #
################

def read_config_file(filepath):
    with open(filepath, 'r') as stream:
        try:
            return yaml.safe_load(stream)
        except yaml.YAMLError as exc:
            logging.error(exc)


def replacer(string, char):
    pattern = char + '{2,}'
    string = re.sub(pattern, char, string) 
    return string

def col_header_val(df,table_config):
    '''
    replace whitespaces in the column
    and standardized column names
    '''
    df.columns = df.columns.str.lower()
    df.columns = df.columns.str.replace('[^\w]','_',regex=True)
    df.columns = list(map(lambda x: x.strip('_'), list(df.columns)))
    df.columns = list(map(lambda x: replacer(x,'_'), list(df.columns)))
    expected_col = list(map(lambda x: x.lower(),  table_config['columns']))
    expected_col.sort()
    df.columns =list(map(lambda x: x.lower(), list(df.columns)))
    df = df.reindex(sorted(df.columns), axis=1)
    if len(df.columns) == len(expected_col) and list(expected_col)  == list(df.columns):
        print("column name and column length validation passed")
        return 1
    else:
        print("column name and column length validation failed")
        mismatched_columns_file = list(set(df.columns).difference(expected_col))
        print("Following File columns are not in the YAML file",mismatched_columns_file)
        missing_YAML_file = list(set(expected_col).difference(df.columns))
        print("Following YAML columns are not in the file uploaded",missing_YAML_file)
        logging.info(f'df columns: {df.columns}')
        logging.info(f'expected columns: {expected_col}')
        return 0

Overwriting utility.py


Write YML file

In [17]:
%%writefile file.yaml
file_type: csv
dataset_name: file
file_name: survival_data
table_name: edsurv
inbound_delimiter: ","
outbound_delimiter: "|"
skip_leading_rows: 1
columns: 
    - Id
    - AgeStartObserved
    - AgeEnd
    - IsTruncated
    - IsCensored
    - IsDead
    - DateStartObserved
    - DateEndObserved

Overwriting file.yaml


In [None]:
# Read config file
config_data = util.read_config_file("file.yaml")

In [None]:
config_data['inbound_delimiter']

In [None]:
#inspecting data of config file
config_data

In [None]:
# Normal reading process of the file
df = dataframe.read_csv('C:/Users/assoma/Desktop/survival_data.csv')
df.head()

In [None]:
# read the file using config file
file_type = config_data['file_type']
source_file = "./" + config_data['file_name'] + f'.{file_type}'
#print("",source_file)
df = pd.read_csv(source_file,config_data['inbound_delimiter'])
df.head()

In [None]:
#validate the header of the file
util.col_header_val(df,config_data)

In [None]:
print("columns of files are:" ,df.columns)
print("columns of YAML are:" ,config_data['columns'])

In [None]:
if util.col_header_val(df,config_data)==0:
    print("validation failed")
    # write code to reject the file
else:
    print("col validation passed")
    # write the code to perform further action
    # in the pipleine

In [25]:
# Write csv in gz format in pipe separated text file (|)
dask_df.to_csv('survival_data.csv.gz',
          sep='|',
          header=True,
          index=False,
          compression='gzip',
          quotechar='"',
          doublequote=True,
          line_terminator='\n')

['c:/Users/assoma/Desktop/survival_data.csv.gz\\000.part',
 'c:/Users/assoma/Desktop/survival_data.csv.gz\\001.part',
 'c:/Users/assoma/Desktop/survival_data.csv.gz\\002.part',
 'c:/Users/assoma/Desktop/survival_data.csv.gz\\003.part',
 'c:/Users/assoma/Desktop/survival_data.csv.gz\\004.part',
 'c:/Users/assoma/Desktop/survival_data.csv.gz\\005.part',
 'c:/Users/assoma/Desktop/survival_data.csv.gz\\006.part',
 'c:/Users/assoma/Desktop/survival_data.csv.gz\\007.part',
 'c:/Users/assoma/Desktop/survival_data.csv.gz\\008.part',
 'c:/Users/assoma/Desktop/survival_data.csv.gz\\009.part',
 'c:/Users/assoma/Desktop/survival_data.csv.gz\\010.part',
 'c:/Users/assoma/Desktop/survival_data.csv.gz\\011.part',
 'c:/Users/assoma/Desktop/survival_data.csv.gz\\012.part',
 'c:/Users/assoma/Desktop/survival_data.csv.gz\\013.part',
 'c:/Users/assoma/Desktop/survival_data.csv.gz\\014.part',
 'c:/Users/assoma/Desktop/survival_data.csv.gz\\015.part',
 'c:/Users/assoma/Desktop/survival_data.csv.gz\\016.part