# File Ingestion and Schema Validation

In [None]:
import os
import time

In [2]:
%%writefile testutility.py
import logging
import os
import subprocess
import yaml
import pandas as pd
import datetime 
import gc
import re


################
# File Reading #
################

def read_config_file(filepath):
    """Load a YAML configuration file and return its parsed content.

    Parameters
    ----------
    filepath : str or path-like
        Location of the YAML file to read.

    Returns
    -------
    object or None
        Whatever ``yaml.safe_load`` produces for the document (a dict for a
        mapping document). ``None`` is returned when the document cannot be
        parsed; the parse error is logged rather than raised.

    Raises
    ------
    OSError
        If the file cannot be opened (not caught here).
    """
    # Open explicitly as UTF-8 so the result does not depend on the
    # platform's locale encoding.
    with open(filepath, 'r', encoding='utf-8') as stream:
        try:
            return yaml.safe_load(stream)
        except yaml.YAMLError as exc:
            # Best-effort contract kept from the original: log and hand back
            # None instead of propagating the parse error.
            logging.error("Could not parse YAML config %r: %s", filepath, exc)
            return None


def replacer(string, char):
    """Collapse every run of two or more consecutive ``char`` into one.

    Parameters
    ----------
    string : str
        Text to clean.
    char : str
        The literal character (or literal substring) whose repeats are
        collapsed. It is treated as plain text, never as a regex.

    Returns
    -------
    str
        ``string`` with each repeated run of ``char`` replaced by a single
        ``char``. Returned unchanged when ``char`` is empty.
    """
    if not char:
        # Nothing to collapse; an empty pattern would otherwise be an error.
        return string
    # Escape so regex metacharacters ('.', '*', '\\', ...) match literally, and
    # group so the quantifier applies to the whole of ``char``.
    pattern = '(?:' + re.escape(char) + '){2,}'
    # A callable replacement inserts ``char`` verbatim (no backslash processing).
    return re.sub(pattern, lambda _match: char, string)

def col_header_val(df, table_config):
    '''
    Standardise the column names of ``df`` and validate them against the
    column list in ``table_config``.

    Standardisation (applied in place to ``df.columns``): lower-case, replace
    every non-word character with ``_``, strip leading/trailing ``_`` and
    collapse repeated ``_`` into one.

    Parameters
    ----------
    df : pandas.DataFrame
        Frame whose header is checked. Its column labels are overwritten with
        the standardised names; column order and data are left untouched.
    table_config : dict
        Parsed configuration; ``table_config['columns']`` lists the expected
        column names (compared case-insensitively).

    Returns
    -------
    int
        1 when the standardised names match the expected names exactly
        (ignoring order), otherwise 0. On failure the differences are printed
        and both lists are logged at INFO level.
    '''
    # Raw-string patterns: '\w' inside a plain literal is an invalid escape.
    df.columns = (
        df.columns.str.lower()
        .str.replace(r'[^\w]', '_', regex=True)
        .str.strip('_')
        .str.replace(r'_{2,}', '_', regex=True)
    )

    # Compare sorted label lists only. Re-indexing the frame to sort it would
    # copy all the data and raise if two labels collide after standardisation.
    actual_col = sorted(df.columns)
    expected_col = sorted(name.lower() for name in table_config['columns'])

    if actual_col == expected_col:
        print("column name and column length validation passed")
        return 1

    print("column name and column length validation failed")
    # Sorted so the report is deterministic from run to run.
    mismatched_columns_file = sorted(set(actual_col).difference(expected_col))
    print("Following File columns are not in the YAML file", mismatched_columns_file)
    missing_YAML_file = sorted(set(expected_col).difference(actual_col))
    print("Following YAML columns are not in the file uploaded", missing_YAML_file)
    logging.info(f'df columns: {actual_col}')
    logging.info(f'expected columns: {expected_col}')
    return 0

## File Reading

### Pandas

In [3]:
import pandas as pd
# perf_counter() is a monotonic, high-resolution clock meant for measuring
# elapsed time; time.time() can jump if the system clock is adjusted.
start = time.perf_counter()
df = pd.read_csv('sales_data.csv')
end = time.perf_counter()
print("Time it took to read the file using panads: ",(end-start),"sec")

  has_raised = await self.run_ast_nodes(code_ast.body, cell_name,


Time it took to read the file using panads:  13.105557918548584 sec


### Dask

In [4]:
from dask import dataframe as dd
# NOTE: dd.read_csv is lazy. It samples the file to infer the schema and builds
# a task graph; the data itself is only read when a computation is triggered
# (e.g. .compute()). The elapsed time below is therefore not a like-for-like
# comparison with the eager pandas read.
start = time.perf_counter()
dask_df = dd.read_csv('sales_data.csv')
end = time.perf_counter()
print("Time it took to read the file using Dask: ",(end-start),"sec")

Time it took to read the file using Dask:  0.015810012817382812 sec


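**Dask appears far more efficient at reading large datasets, but note that `dd.read_csv` is lazy: it only samples the file to infer the schema and defers loading the data until a computation is triggered, so the two timings are not directly comparable.**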

## Column Validation

In [5]:
# Display the column labels pandas parsed from the CSV header.
df.columns

Index(['product_id', 'store_id', 'date', 'sales', 'revenue', 'stock', 'price',
       'promo_type_1', 'promo_bin_1', 'promo_type_2', 'promo_bin_2',
       'promo_discount_2', 'promo_discount_type_2'],
      dtype='object')

In [6]:
# Strip '#', ',', '@' and '&' from the column labels. regex=True is explicit
# because pandas >= 2.0 treats the pattern as a literal string by default,
# which would silently turn this line into a no-op.
df.columns = df.columns.str.replace(r'[#,@&]', '', regex=True)
# Remove spaces (plain literal replacement).
df.columns = df.columns.str.replace(' ', '', regex=False)
df.columns

Index(['product_id', 'store_id', 'date', 'sales', 'revenue', 'stock', 'price',
       'promo_type_1', 'promo_bin_1', 'promo_type_2', 'promo_bin_2',
       'promo_discount_2', 'promo_discount_type_2'],
      dtype='object')

## YAML file Creation

In [7]:
%%writefile file.yaml
# Extension of the inbound file; joined with file_name to build the path that is read.
file_type: csv
# Base name of the inbound file, without the extension.
file_name: sales_data
# Field separator used by the inbound file.
inbound_delimiter: ","
# Expected column names. Validation compares these (lower-cased, order-insensitive)
# with the standardised header of the file.
columns: 
    - product_id
    - store_id
    - date
    - sales
    - revenue
    - stock
    - price
    - promo_type_1
    - promo_bin_1
    - promo_type_2
    - promo_bin_2
    - promo_discount_2
    - promo_discount_type_2

Overwriting file.yaml


In [8]:
import testutility as util
config_data = util.read_config_file("file.yaml")
# read_config_file logs YAML parse errors and returns None instead of raising,
# so stop here rather than failing later with an unrelated TypeError.
if config_data is None:
    raise ValueError("file.yaml is empty or could not be parsed; check the logged YAML error")
config_data

{'file_type': 'csv',
 'file_name': 'sales_data',
 'inbound_delimiter': ',',
 'columns': ['product_id',
  'store_id',
  'date',
  'sales',
  'revenue',
  'stock',
  'price',
  'promo_type_1',
  'promo_bin_1',
  'promo_type_2',
  'promo_bin_2',
  'promo_discount_2',
  'promo_discount_type_2']}

## Reading the file dynamically

In [9]:
# Read the file using the settings from the config file.
file_type = config_data['file_type']
source_file = f"./{config_data['file_name']}.{file_type}"
# Honour the delimiter declared in the config; fall back to pandas' default
# (',') when the key is absent.
df = pd.read_csv(source_file, sep=config_data.get('inbound_delimiter', ','))
df.head()

  has_raised = await self.run_ast_nodes(code_ast.body, cell_name,


Unnamed: 0,product_id,store_id,date,sales,revenue,stock,price,promo_type_1,promo_bin_1,promo_type_2,promo_bin_2,promo_discount_2,promo_discount_type_2
0,P0001,S0002,2017-01-02,0.0,0.0,8.0,6.25,PR14,,PR03,,,
1,P0001,S0012,2017-01-02,1.0,5.3,0.0,6.25,PR14,,PR03,,,
2,P0001,S0013,2017-01-02,2.0,10.59,0.0,6.25,PR14,,PR03,,,
3,P0001,S0023,2017-01-02,0.0,0.0,6.0,6.25,PR14,,PR03,,,
4,P0001,S0025,2017-01-02,0.0,0.0,1.0,6.25,PR14,,PR03,,,


## Dynamically Validating

In [10]:
# Standardises df's column labels in place and compares them, ignoring order,
# with the YAML column list; returns 1 on a match and 0 otherwise.
util.col_header_val(df ,config_data)

column name and column length validation passed


1

In [11]:
# Show both column lists. df.columns holds the standardised labels here
# because col_header_val assigned them on the frame it was given.
print("columns of files are:" ,df.columns)
print("columns of YAML are:" ,config_data['columns'])

columns of files are: Index(['product_id', 'store_id', 'date', 'sales', 'revenue', 'stock', 'price',
       'promo_type_1', 'promo_bin_1', 'promo_type_2', 'promo_bin_2',
       'promo_discount_2', 'promo_discount_type_2'],
      dtype='object')
columns of YAML are: ['product_id', 'store_id', 'date', 'sales', 'revenue', 'stock', 'price', 'promo_type_1', 'promo_bin_1', 'promo_type_2', 'promo_bin_2', 'promo_discount_2', 'promo_discount_type_2']


**Since the data file and the YAML config list the same columns, the validation passed.**

## Writing the file as a pipe-separated (|) text file in gzip format

In [12]:
import csv
import gzip

# Write the frame as a gzip-compressed, pipe-separated (|) text file.
# Every field is quoted (QUOTE_ALL), so a '|' inside a value stays unambiguous.
# `lineterminator` is the current keyword name; the older `line_terminator`
# spelling was removed in pandas 2.0 (this spelling needs pandas >= 1.5).
df.to_csv('sales_data.csv.gz',
          sep='|',
          header=True,
          index=False,
          quoting=csv.QUOTE_ALL,
          compression='gzip',
          lineterminator='\n')


## File Summary

In [13]:
# Total number of rows of the file:
print("The total number of rows: ", df.shape[0])
# Total number of columns of the file:
print("The number of columns: ", df.shape[1])
df.info()
# Report the size of the file that was actually loaded (path built from the
# YAML config) instead of repeating a hard-coded file name.
print("File Size is :", os.path.getsize(source_file), "bytes")

The total number of rows:  19454838
The number of columns:  13
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 19454838 entries, 0 to 19454837
Data columns (total 13 columns):
 #   Column                 Dtype  
---  ------                 -----  
 0   product_id             object 
 1   store_id               object 
 2   date                   object 
 3   sales                  float64
 4   revenue                float64
 5   stock                  float64
 6   price                  float64
 7   promo_type_1           object 
 8   promo_bin_1            object 
 9   promo_type_2           object 
 10  promo_bin_2            object 
 11  promo_discount_2       float64
 12  promo_discount_type_2  object 
dtypes: float64(5), object(8)
memory usage: 1.9+ GB
File Size is : 1064515532 bytes
