<a href="https://colab.research.google.com/github/anshimathur0325/DataIngestion/blob/main/Data_ingestion.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
%%writefile testutility.py
import logging
import os
import subprocess
import yaml
import pandas as pd
import datetime 
import gc
import re

#Read file function

def read_config_file(filepath):
    """Parse the YAML file at ``filepath`` and return its contents.

    The file is opened in text mode and parsed with ``yaml.safe_load``.
    If the parser raises ``yaml.YAMLError`` the error is logged and
    ``None`` is returned; errors from opening the file are not caught.
    """
    parsed = None
    with open(filepath, 'r') as handle:
        try:
            parsed = yaml.safe_load(handle)
        except yaml.YAMLError as err:
            # Parse failures are logged only; the caller receives None.
            logging.error(err)
    return parsed

#returns a string

def replacer(string, char):
    """Collapse every run of two or more consecutive ``char`` into one.

    Args:
        string: text to clean.
        char: the literal text whose repeats are collapsed. It is matched
            literally, so regex metacharacters such as ``.`` or ``+`` are safe.

    Returns:
        The cleaned string. ``string`` is returned unchanged when ``char``
        is empty.
    """
    if not char:
        return string
    # Escape char so it is matched literally, and group it so the {2,}
    # quantifier applies to the whole of a multi-character char.
    pattern = '(?:' + re.escape(char) + '){2,}'
    # A callable replacement keeps char from being parsed for backslash escapes.
    return re.sub(pattern, lambda _match: char, string)

#format column names and remove whitespace (Helps with matching YAML column names)

def col_header_val(df,table_config):
    """Normalise the DataFrame's column names and check them against the config.

    The column labels of ``df`` are rewritten in place, so the caller sees the
    normalised names afterwards: lower-cased, every non-word character replaced
    by ``_``, leading/trailing ``_`` stripped and runs of ``_`` collapsed to one.
    The normalised names are then compared, ignoring order, with the
    lower-cased entries of ``table_config['columns']``.

    Args:
        df: pandas DataFrame whose column labels are strings.
        table_config: mapping with a ``'columns'`` list of expected names.

    Returns:
        1 if the names match exactly, otherwise 0 (after printing and logging
        the differences).
    """
    # Raw-string patterns: '\w' in a plain string literal is an invalid escape.
    df.columns = (
        df.columns.str.lower()
        .str.replace(r'[^\w]', '_', regex=True)
        .str.strip('_')
        .str.replace(r'_{2,}', '_', regex=True)
    )

    # Compare sorted name lists; only the labels are sorted, not the frame.
    expected_col = sorted(name.lower() for name in table_config['columns'])
    actual_col = sorted(df.columns)

    if actual_col == expected_col:
        print("column name and column length validation passed")
        return 1

    print("column name and column length validation failed")
    mismatched_columns_file = list(set(actual_col).difference(expected_col))
    print("Following File columns are not in the YAML file",mismatched_columns_file)
    missing_YAML_file = list(set(expected_col).difference(actual_col))
    print("Following YAML columns are not in the file uploaded",missing_YAML_file)
    logging.info(f'df columns: {actual_col}')
    logging.info(f'expected columns: {expected_col}')
    return 0

Overwriting testutility.py


In [None]:
%%writefile file.yaml
# Ingestion settings; the notebook loads this file with util.read_config_file.
# file_name + "." + file_type form the name of the source file that is read.
file_type: csv
dataset_name: testfile
file_name: toy_dataset
table_name: edsurv
# Field separator handed to pandas when the source file is read.
inbound_delimiter: ","
outbound_delimiter: "|"
skip_leading_rows: 1
# Expected header names; col_header_val lower-cases these before comparing.
columns: 
    - Number
    - City
    - Gender
    - Age
    - Income
    - Illness

Overwriting file.yaml


In [None]:
import testutility as util


In [None]:
# Load the ingestion settings from the YAML file written above.
config_data = util.read_config_file("file.yaml")
# Display one entry to confirm the file was parsed.
config_data['inbound_delimiter']


','

In [None]:
# Display the full parsed configuration.
config_data

{'columns': ['Number', 'City', 'Gender', 'Age', 'Income', 'Illness'],
 'dataset_name': 'testfile',
 'file_name': 'toy_dataset',
 'file_type': 'csv',
 'inbound_delimiter': ',',
 'outbound_delimiter': '|',
 'skip_leading_rows': 1,
 'table_name': 'edsurv'}

In [None]:
# Read the CSV into a pandas DataFrame (path and delimiter are hard-coded
# in this cell) and preview the first rows.
import pandas as pd
df_sample = pd.read_csv("toy_dataset.csv",delimiter=',')
df_sample.head()

Unnamed: 0,Number,City,Gender,Age,Income,Illness
0,1,Dallas,Male,41,40367.0,No
1,2,Dallas,Male,54,45084.0,No
2,3,Dallas,Male,42,52483.0,No
3,4,Dallas,Male,40,40941.0,No
4,5,Dallas,Male,46,50289.0,No


In [None]:
# Read the source file named in the config, using the configured delimiter.
file_type = config_data['file_type']
source_file = "./" + config_data['file_name'] + f'.{file_type}'
# Pass the delimiter by keyword: giving it positionally is deprecated in
# pandas 1.x and rejected by pandas 2.x, where only the path is positional.
df = pd.read_csv(source_file, sep=config_data['inbound_delimiter'])
df.head()

  exec(code_obj, self.user_global_ns, self.user_ns)


Unnamed: 0,Number,City,Gender,Age,Income,Illness
0,1,Dallas,Male,41,40367.0,No
1,2,Dallas,Male,54,45084.0,No
2,3,Dallas,Male,42,52483.0,No
3,4,Dallas,Male,40,40941.0,No
4,5,Dallas,Male,46,50289.0,No


In [None]:
# Check that the file's column names match the 'columns' list from the YAML config.
# Note: col_header_val also rewrites df's column names in place (lower-cased, cleaned).
util.col_header_val(df,config_data)

column name and column length validation passed


1

In [None]:
# Print both sets of names: the DataFrame's columns (already normalised by the
# col_header_val call above) and the names exactly as written in the YAML.
print("columns of files are:" ,df.columns)
print("columns of YAML are:" ,config_data['columns'])

columns of files are: Index(['number', 'city', 'gender', 'age', 'income', 'illness'], dtype='object')
columns of YAML are: ['Number', 'City', 'Gender', 'Age', 'Income', 'Illness']


In [None]:
# Turn the validator's 1/0 return code into a readable message.
if util.col_header_val(df, config_data) != 0:
    print("col validation passed")
else:
    print("validation failed")


column name and column length validation passed
col validation passed


In [None]:
# Convert the source CSV to a delimited text file, taking the file name and
# both delimiters from the YAML config instead of repeating them here.
import csv

csv_path = f"{config_data['file_name']}.{config_data['file_type']}"
# newline='' on both handles leaves line-ending handling to the csv module,
# which is required for quoted fields that contain line breaks.
with open(csv_path, newline='') as fin, open('output.txt', 'w', newline='') as fout:
    reader = csv.DictReader(fin, delimiter=config_data['inbound_delimiter'])
    writer = csv.DictWriter(fout, reader.fieldnames, delimiter=config_data['outbound_delimiter'])
    writer.writeheader()
    writer.writerows(reader)


In [None]:
# Compress the pipe-separated text file into gzip format (output.txt.gz).
import gzip
import shutil

# Stream the bytes from the plain file into the gzip file.
with open('output.txt', 'rb') as plain_file, gzip.open('output.txt.gz', 'wb') as zipped_file:
    shutil.copyfileobj(plain_file, zipped_file)

In [139]:
# Report the DataFrame's row/column counts and the size of output.txt.
# os is imported here because the notebook namespace never imports it
# (only testutility.py does), so a fresh kernel would raise NameError.
import os

print("Amount of Rows:",df.shape[0])
print("Amount of Columns:",df.shape[1])
size = os.path.getsize('output.txt') 
print("Size of file is", size, "bytes")

Amount of Rows: 150000
Amount of Columns: 6
Size of file is 7124464 bytes
