<h1 style="text-align:center; color:blue">File ingestion and schema validation </h1>

<hr>
<hr>


#### Overwrite Utility File.

In [1]:

%%writefile testutility.py
import logging
import os
import subprocess
import yaml
import pandas as pd
import datetime 
import gc
import re


################
# File Reading #
################

def read_config_file(filepath):
    with open(filepath, 'r') as stream:
        try:
            return yaml.safe_load(stream)
        except yaml.YAMLError as exc:
            logging.error(exc)


def replacer(string, char):
    pattern = char + '{2,}'
    string = re.sub(pattern, char, string) 
    return string

def col_header_val(df,table_config):
    '''
    replace whitespaces in the column
    and standardized column names
    '''
    df.columns = df.columns.str.lower()
    df.columns = df.columns.str.replace('[^\w]','_',regex=True)
    df.columns = list(map(lambda x: x.strip('_'), list(df.columns)))
    df.columns = list(map(lambda x: replacer(x,'_'), list(df.columns)))
    expected_col = list(map(lambda x: x.lower(),  table_config['columns']))
    expected_col.sort()
    df.columns =list(map(lambda x: x.lower(), list(df.columns)))
    df = df.reindex(sorted(df.columns), axis=1)
    if len(df.columns) == len(expected_col) and list(expected_col)  == list(df.columns):
        print("column name and column length validation passed")
        return 1
    else:
        print("column name and column length validation failed")
        mismatched_columns_file = list(set(df.columns).difference(expected_col))
        print("Following File columns are not in the YAML file",mismatched_columns_file)
        missing_YAML_file = list(set(expected_col).difference(df.columns))
        print("Following YAML columns are not in the file uploaded",missing_YAML_file)
        logging.info(f'df columns: {df.columns}')
        logging.info(f'expected columns: {expected_col}')
        return 0


Writing testutility.py


<hr>

#### Trying different methods of file reading

1-Pandas

In [2]:
import pandas as pd
import time
start = time.time()
pd_df = pd.read_csv("survival_data.csv")
end = time.time()
print("Time it took to read the file using panads: ",(end-start),"sec")

  exec(code_obj, self.user_global_ns, self.user_ns)


Time it took to read the file using panads:  14.468481063842773 sec


2-Dask

In [3]:
from dask import dataframe as dd
import time
start = time.time()
dask_df = dd.read_csv("survival_data.csv")
end = time.time()
print("Time it took to read the file using Dask: ",(end-start),"sec")

Time it took to read the file using Dask:  0.06254386901855469 sec


#### Dask was the fastest one by a lot of time

## Remove special character , white spaces.


In [4]:
from dask import dataframe as dd
df = dd.read_csv("survival_data.csv")
df.head()

Unnamed: 0,Client id,age_start_observed,age_end,is_truncated,is_censored,is_dead,date_start_observed,date_end_observed
0,15113102,0.0,9.097335,False,True,False,1908-11-17,1917-12-22
1,41505894,0.0,64.486689,False,True,False,1828-09-13,1893-03-10
2,24774171,0.0,33.071552,False,True,False,1911-02-07,1944-03-04
3,97834936,34.834566,68.778258,True,True,False,1820-01-01,1853-12-10
4,45793809,0.0,95.948358,False,False,True,1870-05-29,1966-05-11


In [5]:
print(df.columns)


Index(['Client id', 'age_start_observed', 'age_end', 'is_truncated',
       'is_censored', 'is_dead', 'date_start_observed', 'date_end_observed'],
      dtype='object')


In [18]:
#df.columns=df.columns.str.replace('[#,@,&,_]','')
#df.columns = df.columns.str.replace(' ', '')
#print(df.columns)

<hr>

## Overwrite YAML File.


In [19]:
%%writefile file.yaml

file_type: csv
dataset_name: Survival Analysis Synthetic Data
file_name: survival_data.csv
table_name: survival_data
inbound_delimiter: ","
outbound_delimiter: "|"
skip_leading_rows: 1
columns: 
    -Client id
    -age_start_observed
    -age_end
    -is_truncated
    -is_censored
    -is_dead
    -date_start_observed
    -date_end_observed

Writing file.yaml


In [20]:
 # Read config file
import testutility as util
config_data = util.read_config_file("file.yaml")

In [21]:
config_data['inbound_delimiter']

','

In [22]:
#inspecting data of config file
config_data

{'file_type': 'csv',
 'dataset_name': 'Survival Analysis Synthetic Data',
 'file_name': 'survival_data.csv',
 'table_name': 'survival_data',
 'inbound_delimiter': ',',
 'outbound_delimiter': '|',
 'skip_leading_rows': 1,
 'columns': '-Client id -age_start_observed -age_end -is_truncated -is_censored -is_dead -date_start_observed -date_end_observed'}

In [23]:
# Normal reading process of the file
import pandas as pd
df_sample = pd.read_csv("survival_data.csv",delimiter=',')
df_sample.head()

  exec(code_obj, self.user_global_ns, self.user_ns)


Unnamed: 0,Client id,age_start_observed,age_end,is_truncated,is_censored,is_dead,date_start_observed,date_end_observed
0,15113102,0.0,9.097335,False,True,False,1908-11-17,1917-12-22
1,41505894,0.0,64.486689,False,True,False,1828-09-13,1893-03-10
2,24774171,0.0,33.071552,False,True,False,1911-02-07,1944-03-04
3,97834936,34.834566,68.778258,True,True,False,1820-01-01,1853-12-10
4,45793809,0.0,95.948358,False,False,True,1870-05-29,1966-05-11


In [24]:
#read the file using config file
file_type = config_data['file_type']
source_file = "./" + config_data['file_name']
#print("",source_file)
df = pd.read_csv(source_file,config_data['inbound_delimiter'])
df.head()

  exec(code_obj, self.user_global_ns, self.user_ns)
  exec(code_obj, self.user_global_ns, self.user_ns)


Unnamed: 0,Client id,age_start_observed,age_end,is_truncated,is_censored,is_dead,date_start_observed,date_end_observed
0,15113102,0.0,9.097335,False,True,False,1908-11-17,1917-12-22
1,41505894,0.0,64.486689,False,True,False,1828-09-13,1893-03-10
2,24774171,0.0,33.071552,False,True,False,1911-02-07,1944-03-04
3,97834936,34.834566,68.778258,True,True,False,1820-01-01,1853-12-10
4,45793809,0.0,95.948358,False,False,True,1870-05-29,1966-05-11


<hr>

#### Validation


In [25]:
util.col_header_val(df ,config_data)

column name and column length validation failed
Following File columns are not in the YAML file ['is_censored', 'date_end_observed', 'date_start_observed', 'is_dead', 'age_end', 'age_start_observed', 'is_truncated', 'client_id']
Following YAML columns are not in the file uploaded ['r', ' ', 'v', 'b', 'g', 'e', 'n', 'a', 's', '-', 'd', 'i', 'l', '_', 'u', 'o', 'c', 't']


0

In [26]:
print("columns of files are:" ,df.columns)
print("columns of YAML are:" ,config_data['columns'])

columns of files are: Index(['client_id', 'age_start_observed', 'age_end', 'is_truncated',
       'is_censored', 'is_dead', 'date_start_observed', 'date_end_observed'],
      dtype='object')
columns of YAML are: -Client id -age_start_observed -age_end -is_truncated -is_censored -is_dead -date_start_observed -date_end_observed


##### Since we have the same columns on both files, the validation passed

<hr>

#### Write the file in pipe separated text file (|) in gz format

In [27]:
import datetime
import csv
import gzip

from dask import dataframe as dd
df = dd.read_csv("survival_data.csv",delimiter=',')
df.to_csv('survival_data.csv.gz',
          sep='|',
          header=True,
          index=False,
          quoting=csv.QUOTE_ALL,
          compression='gzip',
          quotechar='"',
          doublequote=True,
          line_terminator='\n')

['C:/Users/user/Downloads/survival_data.csv/survival_data.csv.gz\\0.part',
 'C:/Users/user/Downloads/survival_data.csv/survival_data.csv.gz\\1.part',
 'C:/Users/user/Downloads/survival_data.csv/survival_data.csv.gz\\2.part',
 'C:/Users/user/Downloads/survival_data.csv/survival_data.csv.gz\\3.part',
 'C:/Users/user/Downloads/survival_data.csv/survival_data.csv.gz\\4.part',
 'C:/Users/user/Downloads/survival_data.csv/survival_data.csv.gz\\5.part',
 'C:/Users/user/Downloads/survival_data.csv/survival_data.csv.gz\\6.part',
 'C:/Users/user/Downloads/survival_data.csv/survival_data.csv.gz\\7.part',
 'C:/Users/user/Downloads/survival_data.csv/survival_data.csv.gz\\8.part']

# Summary

In [28]:
    rows_num=len(df)
    print("Number of rows in dataset: ", rows_num)
    print("----------------------------------------------------------------------------")
    columns_num=len(df.columns)
    print("Number of columns in dataset: ", columns_num)
    print("----------------------------------------------------------------------------")
    elements_num=df.size
    print("Get the number of elements: ", elements_num)
    print("----------------------------------------------------------------------------")
    print("Describtion of the data set: ", df.info())

    
    # The size of the CSV
    import os
    file_size = os.path.getsize("survival_data.csv")
    print("File Size is :", file_size, "bytes")

Number of rows in dataset:  3718224
----------------------------------------------------------------------------
Number of columns in dataset:  8
----------------------------------------------------------------------------
Get the number of elements:  dd.Scalar<size-ag..., dtype=int32>
----------------------------------------------------------------------------
<class 'dask.dataframe.core.DataFrame'>
Columns: 8 entries, Client id to date_end_observed
dtypes: object(2), bool(3), float64(2), int64(1)Describtion of the data set:  None
File Size is : 273223689 bytes


#### Comparing Files 

In [29]:
print("columns of files are:" ,df.columns)
print("columns of YAML are:" ,config_data['columns'])

columns of files are: Index(['Client id', 'age_start_observed', 'age_end', 'is_truncated',
       'is_censored', 'is_dead', 'date_start_observed', 'date_end_observed'],
      dtype='object')
columns of YAML are: -Client id -age_start_observed -age_end -is_truncated -is_censored -is_dead -date_start_observed -date_end_observed


<hr>