# File Ingestion


In [1]:
%%writefile testutility.py
import logging
import os
import subprocess
import yaml
import pandas as pd
import datetime 
import gc
import re


################
# File Reading #
################

def read_config_file(filepath):
    """Parse the YAML file at ``filepath`` and return its content.

    Parameters
    ----------
    filepath : str or path-like
        Location of the YAML file; opened in text mode for reading.

    Returns
    -------
    object or None
        Whatever ``yaml.safe_load`` produces for the document. If the
        document is not valid YAML the error is logged and ``None`` is
        returned instead of raising.

    Raises
    ------
    OSError
        Propagated from ``open`` when the file cannot be opened
        (e.g. ``FileNotFoundError``).
    """
    with open(filepath, 'r') as stream:
        try:
            parsed = yaml.safe_load(stream)
        except yaml.YAMLError as exc:
            # Parse errors are reported through logging only; the caller gets None.
            logging.error(exc)
            parsed = None
    return parsed


def replacer(string, char):
    """Collapse every run of two or more consecutive ``char`` into a single one.

    ``char`` is treated as literal text, not as a regular expression, so
    values such as ``'.'``, ``'|'`` or ``'\\'`` work as expected. It may also
    be longer than one character, in which case repeated occurrences of the
    whole sequence are collapsed.

    Parameters
    ----------
    string : str
        Text to clean up.
    char : str
        Literal character (or sequence) whose repetitions are collapsed.

    Returns
    -------
    str
        ``string`` with each repeated run of ``char`` replaced by one ``char``.
        An empty ``char`` leaves ``string`` unchanged.
    """
    if not char:
        # Nothing to collapse; also avoids building a pattern that repeats "nothing".
        return string
    # Escape so regex metacharacters are matched literally, and group so the
    # quantifier applies to the whole sequence rather than its last character.
    pattern = '(?:' + re.escape(char) + '){2,}'
    # A callable replacement inserts `char` verbatim (no backslash-escape processing).
    return re.sub(pattern, lambda _match: char, string)

def col_header_val(df,table_config):
    '''
    Normalize the column names of ``df`` and validate them against the config.

    Normalization is written back to ``df.columns`` (the data itself is not
    touched): names are lowercased, every non-word character becomes ``_``,
    leading/trailing ``_`` are stripped and runs of ``_`` are collapsed to one.
    The normalized names are then compared, independent of order, with the
    lowercased entries of ``table_config['columns']``.

    Parameters
    ----------
    df : pandas.DataFrame
        Frame whose header is validated; its ``columns`` are overwritten with
        the normalized names. Column names must be strings.
    table_config : mapping
        Parsed configuration; only the ``'columns'`` entry (an iterable of
        expected column names) is read.

    Returns
    -------
    int
        1 when the names match, 0 otherwise. On a mismatch the differences
        are printed and both name lists are logged at INFO level.
    '''
    # Vectorized string ops; raw strings keep `\w` from being an invalid escape.
    df.columns = (
        df.columns.str.lower()
        .str.replace(r'[^\w]', '_', regex=True)
        .str.strip('_')
        .str.replace(r'_{2,}', '_', regex=True)
    )
    expected_col = sorted(name.lower() for name in table_config['columns'])
    # Sort only the labels: reindexing the frame would copy all the data and
    # raises when two headers normalize to the same name.
    actual_col = df.columns.sort_values()

    if list(actual_col) == expected_col:
        print("column name and column length validation passed")
        return 1

    print("column name and column length validation failed")
    if df.columns.has_duplicates:
        logging.warning('duplicate column names after normalization')
    # Sorted so the report is deterministic from run to run.
    mismatched_columns_file = sorted(set(df.columns).difference(expected_col))
    print("Following File columns are not in the YAML file",mismatched_columns_file)
    missing_YAML_file = sorted(set(expected_col).difference(df.columns))
    print("Following YAML columns are not in the file uploaded",missing_YAML_file)
    logging.info(f'df columns: {actual_col}')
    logging.info(f'expected columns: {expected_col}')
    return 0

Overwriting testutility.py


In [22]:
%%writefile file.yaml
# Extension of the source file; appended to file_name to build the read path.
file_type: csv
dataset_name: month_sale
# Base name (without extension) of the file read through this config.
file_name: test_data
table_name: edsurv
# Field separator passed to pandas.read_csv for the incoming file.
inbound_delimiter: ","
# NOTE(review): presumably the separator for exported files and the number of
# header rows to skip — confirm against the pipeline code.
outbound_delimiter: "|"
skip_leading_rows: 1
# Expected header names; compared case-insensitively and order-independently
# with the columns of the ingested file.
columns:
    - eventtime
    - eventtype
    - productid
    - categoryid


Overwriting file.yaml


In [23]:
# Read the YAML config through the testutility helper. config_data holds the
# parsed mapping, or None if the file was not valid YAML (the helper logs the
# error instead of raising).
import testutility as util
config_data = util.read_config_file("file.yaml")

In [24]:
# Delimiter later handed to pd.read_csv when the source file is read via the config
config_data['inbound_delimiter']

','

In [25]:
# Inspect the full parsed configuration (a plain dict keyed by the YAML keys)
config_data

{'columns': ['eventtime', 'eventtype', 'productid', 'categoryid'],
 'dataset_name': 'month_sale',
 'file_name': 'test_data',
 'file_type': 'csv',
 'inbound_delimiter': ',',
 'outbound_delimiter': '|',
 'skip_leading_rows': 1,
 'table_name': 'edsurv'}

In [26]:
# Baseline: read the full dataset directly, without going through the config file
import pandas as pd
df = pd.read_csv("month_sale.csv", sep=',')
df.head()

Unnamed: 0,eventtime,eventtype,productid,categoryid,categorycode,brand,price,userid,usersession
0,2019-10-01 00:00:00 UTC,view,44600062,2.103807e+18,,shiseido,35.79,541312140.0,72d76fde-8bb3-4e00-8c23-a032dfed738c
1,2019-10-01 00:00:00 UTC,view,3900821,2.053014e+18,appliances.environment.water_heater,aqua,33.2,554748717.0,9333dfbd-b87a-4708-9857-6336556b0fcc
2,2019-10-01 00:00:01 UTC,view,17200506,2.053014e+18,furniture.living_room.sofa,,543.1,519107250.0,566511c2-e2e3-422b-b695-cf8e6e792ca8
3,2019-10-01 00:00:01 UTC,view,1307067,2.053014e+18,computers.notebook,lenovo,251.74,550050854.0,7c90fc70-0e80-4590-96f3-13c02c18c713
4,2019-10-01 00:00:04 UTC,view,1004237,2.053014e+18,electronics.smartphone,apple,1081.98,535871217.0,c6bd7419-2748-4c56-95b4-8cec9ff8b80d


In [27]:
# Read the file described by the config: the path is built from file_name and
# file_type, and the delimiter comes from inbound_delimiter.
file_type = config_data['file_type']
source_file = "./" + config_data['file_name'] + f'.{file_type}'
# The delimiter must be passed by keyword: pandas 2.0+ accepts only the path
# positionally (the positional form was deprecated in pandas 1.3).
df = pd.read_csv(source_file, sep=config_data['inbound_delimiter'])
df.head()

Unnamed: 0,eventtime,eventtype,productid,categoryid
0,2019-10-01 00:00:00 UTC,view,44600062,2.103807e+18
1,2019-10-01 00:00:00 UTC,view,3900821,2.053014e+18
2,2019-10-01 00:00:01 UTC,view,17200506,2.053014e+18
3,2019-10-01 00:00:01 UTC,view,1307067,2.053014e+18
4,2019-10-01 00:00:04 UTC,view,1004237,2.053014e+18


In [28]:

# Validate the file header against the column list in the YAML config.
# col_header_val returns 1 on success and 0 on failure; it also overwrites
# df.columns with the normalized (lowercased, underscore-separated) names.
util.col_header_val(df,config_data)

column name and column length validation passed


1

In [29]:
# Show both sides of the comparison: the file's header and the YAML column list
for label, names in (
    ("columns of files are:", df.columns),
    ("columns of YAML are:", config_data['columns']),
):
    print(label, names)

columns of files are: Index(['eventtime', 'eventtype', 'productid', 'categoryid'], dtype='object')
columns of YAML are: ['eventtime', 'eventtype', 'productid', 'categoryid']


In [30]:
# Gate the rest of the pipeline on the header check (col_header_val returns 0 on failure)
if util.col_header_val(df, config_data) != 0:
    print("col validation passed")
    # write the code to perform further action
    # in the pipeline
else:
    print("validation failed")
    # write code to reject the file

column name and column length validation passed
col validation passed


In [45]:
# Export with the delimiter declared in the config rather than a hard-coded '|',
# so the YAML file stays the single source of truth.
# Note: index=False is not passed, so the row index is written as the first
# (unnamed) column of the export.
df.to_csv('test_data_pipe', sep=config_data['outbound_delimiter'])

In [47]:
# Compress the pipe-delimited export into test_data.gzip.
import gzip
import shutil

# Context managers close both handles (the input file was previously left
# open), and copyfileobj streams in chunks instead of loading the whole file
# into memory and copying it again into a bytearray.
with open("test_data_pipe", "rb") as src, gzip.open("test_data.gzip", "wb") as dst:
    shutil.copyfileobj(src, dst)

In [48]:
# Read the archive back to check the round trip: decompress everything and
# display the raw bytes.
with gzip.open("test_data.gzip", "rb") as f:
    data = f.read()
data

b'|eventtime|eventtype|productid|categoryid\n0|2019-10-01 03:09:05 UTC|view|5487421|2.05301355477624E+018\n1|2019-10-02 03:09:05 UTC|view|988975|2.05301355477624E+018\n2|2019-10-09 04:59:05 UTC|view|9987745|2.05301355477624E+018\n3|2019-10-21 03:09:05 UTC|view|11111222|2.05301355477624E+018\n'

In [49]:
# Summary statistics of the current frame (count/unique/top/freq for non-numeric columns)
df.describe()

Unnamed: 0,eventtime,eventtype,productid,categoryid
count,4,4,4,4.0
unique,4,1,4,1.0
top,2019-10-21 03:09:05 UTC,view,5487421,2.05301355477624e+18
freq,1,4,1,4.0


In [50]:
# (rows, columns) of the current frame
df.shape

(4, 4)

In [31]:
# Re-read the full dataset for display only (the result is not assigned);
# pandas truncates the rendering of large frames.
pd.read_csv("month_sale.csv")

Unnamed: 0,eventtime,eventtype,productid,categoryid,categorycode,brand,price,userid,usersession
0,2019-10-01 00:00:00 UTC,view,44600062,2.103807e+18,,shiseido,35.79,541312140.0,72d76fde-8bb3-4e00-8c23-a032dfed738c
1,2019-10-01 00:00:00 UTC,view,3900821,2.053014e+18,appliances.environment.water_heater,aqua,33.20,554748717.0,9333dfbd-b87a-4708-9857-6336556b0fcc
2,2019-10-01 00:00:01 UTC,view,17200506,2.053014e+18,furniture.living_room.sofa,,543.10,519107250.0,566511c2-e2e3-422b-b695-cf8e6e792ca8
3,2019-10-01 00:00:01 UTC,view,1307067,2.053014e+18,computers.notebook,lenovo,251.74,550050854.0,7c90fc70-0e80-4590-96f3-13c02c18c713
4,2019-10-01 00:00:04 UTC,view,1004237,2.053014e+18,electronics.smartphone,apple,1081.98,535871217.0,c6bd7419-2748-4c56-95b4-8cec9ff8b80d
...,...,...,...,...,...,...,...,...,...
30979,2019-10-01 03:09:05 UTC,view,12300622,2.053014e+18,construction.tools.drill,,90.09,512661902.0,c66cadda-e768-4a98-b97a-42d995cc710a
30980,2019-10-01 03:09:05 UTC,view,28715707,2.053014e+18,apparel.shoes.keds,puma,94.98,555468214.0,8200220c-1260-4c4b-864d-403f340e78c3
30981,2019-10-01 03:09:05 UTC,view,38900046,2.085719e+18,,,51.46,529440580.0,9bd11c77-99e4-4c93-9a12-8acf2d8af680
30982,2019-10-01 03:09:05 UTC,view,4400226,2.053014e+18,appliances.kitchen.coffee_machine,delonghi,175.01,526529128.0,7dc77cdc-7593-4e39-b959-d083bfa4ee56


In [33]:
# First four values of categoryid (note the float64 dtype picked by read_csv)
df['categoryid'].iloc[:4]

0    2.103807e+18
1    2.053014e+18
2    2.053014e+18
3    2.053014e+18
Name: categoryid, dtype: float64

In [35]:
# productid of the row labelled 3 (with the default integer index this is the fourth row)
df['productid'][3]

1307067

In [42]:
### Creating test file for this demo:
# Four sample rows; every value is kept as a string so it is written to the
# CSV exactly as typed.
import pandas as pd

testdata = {
    'eventtime' : [
        '2019-10-01 03:09:05 UTC',
        '2019-10-02 03:09:05 UTC',
        '2019-10-09 04:59:05 UTC',
        '2019-10-21 03:09:05 UTC',
    ],
    'eventtype' : ['view'] * 4,
    'productid' : ['5487421', '988975', '9987745', '11111222'],
    'categoryid': ['2.05301355477624E+018'] * 4,
}
# Column order follows the dict's key order.
df = pd.DataFrame(testdata, columns=list(testdata))
df.to_csv("./test_data.csv", index=False)

In [43]:
# Display the in-memory demo frame that was just written to test_data.csv
df

Unnamed: 0,eventtime,eventtype,productid,categoryid
0,2019-10-01 03:09:05 UTC,view,5487421,2.05301355477624e+18
1,2019-10-02 03:09:05 UTC,view,988975,2.05301355477624e+18
2,2019-10-09 04:59:05 UTC,view,9987745,2.05301355477624e+18
3,2019-10-21 03:09:05 UTC,view,11111222,2.05301355477624e+18


In [44]:
# Display the raw dict the demo frame was built from
testdata

{'categoryid': ['2.05301355477624E+018',
  '2.05301355477624E+018',
  '2.05301355477624E+018',
  '2.05301355477624E+018'],
 'eventtime': ['2019-10-01 03:09:05 UTC',
  '2019-10-02 03:09:05 UTC',
  '2019-10-09 04:59:05 UTC',
  '2019-10-21 03:09:05 UTC'],
 'eventtype': ['view', 'view', 'view', 'view'],
 'productid': ['5487421', '988975', '9987745', '11111222']}