### Writing YAML file

In [None]:
%%writefile file.yaml
file_type: csv
dataset_name: dataset
file_name: dataset
inbound_delimiter: ","
outbound_delimiter: "|"
columns: 
    - event_time
    - event_type
    - product_id
    - category_id
    - category_code
    - brand
    - price
    - user_id
    - user_session

### Preliminary functions

In [None]:
## Writing Preliminary Functions

import logging
import os
import subprocess
import yaml
import pandas as pd
import datetime 
import gc
import re

def read_config_file(filepath):
    """Parse the YAML file at ``filepath`` and return its contents.

    A ``yaml.YAMLError`` raised while parsing is logged with
    ``logging.error`` and ``None`` is returned instead of the parsed data.
    Errors raised while opening the file are not handled here.
    """
    try:
        with open(filepath, 'r') as config_stream:
            parsed = yaml.safe_load(config_stream)
    except yaml.YAMLError as parse_error:
        logging.error(parse_error)
        return None
    return parsed


def replacer(string, char):
    """Collapse every run of two or more consecutive ``char`` into one.

    ``char`` is treated as literal text, so regex metacharacters such as
    ``.`` or ``*`` are matched verbatim instead of being interpreted as a
    pattern.

    Args:
        string: Text to clean up.
        char: Character (or literal substring) whose repeats are collapsed.

    Returns:
        ``string`` with each run of repeated ``char`` replaced by a single
        ``char``. An empty ``char`` leaves ``string`` unchanged.
    """
    if not char:
        # Nothing to collapse; an empty pattern cannot be repeated.
        return string
    pattern = '(?:' + re.escape(char) + '){2,}'
    # A callable replacement keeps any backslash in ``char`` literal.
    return re.sub(pattern, lambda _match: char, string)

def col_header_val(df, table_config, msg=False):
    '''
    Standardize the column names of ``df`` in place and validate them
    against the columns listed in ``table_config``.

    Standardization steps: lower-case the name, replace every non-word
    character with ``_``, strip leading/trailing ``_`` and collapse
    repeated ``_`` into one.

    Args:
        df: DataFrame whose ``columns`` are overwritten with the
            standardized names.
        table_config: Mapping with a ``'columns'`` entry listing the
            expected column names (compared case-insensitively).
        msg: When truthy, also print a confirmation on success.

    Returns:
        1 when the standardized file columns are unique and equal to the
        expected set of names, otherwise 0 (the differences are printed
        and the column lists are logged).
    '''
    standardized = (
        df.columns.str.lower()
        .str.replace(r'\W', '_', regex=True)
        .str.strip('_')
        .str.replace(r'_{2,}', '_', regex=True)
    )
    actual_cols = list(standardized)
    df.columns = actual_cols

    expected_col = {x.lower() for x in table_config['columns']}
    df_col = set(actual_cols)

    # Two raw headers can standardize to the same name (e.g. "a b" and
    # "a_b"); a set comparison alone would hide that, so track duplicates.
    seen = set()
    duplicated = []
    for name in actual_cols:
        if name in seen and name not in duplicated:
            duplicated.append(name)
        seen.add(name)

    if not duplicated and df_col == expected_col:
        if msg:
            print("column name and column length validation passed")
        return 1

    print("column name and column length validation failed")
    mismatched_columns_file = sorted(df_col - expected_col)
    print("Following File columns are not in the YAML file",mismatched_columns_file)
    missing_YAML_file = sorted(expected_col - df_col)
    print("Following YAML columns are not in the file uploaded",missing_YAML_file)
    if duplicated:
        print("Following File columns are duplicated after standardization",duplicated)
    logging.info(f'df columns: {df.columns}')
    logging.info(f'expected columns: {expected_col}')
    return 0

In [None]:
# Reading YAML file
# Loads the settings written to file.yaml by the %%writefile cell;
# read_config_file returns None if the YAML cannot be parsed.

config_data = read_config_file("file.yaml")

In [None]:
# Show the expected column names loaded from the YAML config
config_data['columns']

In [None]:
# Reading the file

import dask.dataframe as dd

# Build the source path from the config: "<file_name>.<file_type>".
filetype = config_data['file_type']
source_file = config_data['file_name'] + f'.{filetype}'
# Parse with the delimiter declared in the config rather than relying on
# read_csv's default, so the YAML setting actually drives the load.
df = dd.read_csv(source_file, sep=config_data['inbound_delimiter'])

In [None]:
# Preview the first 10 rows of the loaded data
df.head(10)

In [None]:
# Validating column headers
# col_header_val rewrites df.columns with the standardized names and
# returns 1 when they match the YAML column list, 0 otherwise.

col_header_val(df,config_data)

In [None]:
# Run the header validation once and branch on its 1/0 result.
headers_ok = col_header_val(df, config_data) != 0
if headers_ok:
    print("Validation passed")
    # Program may proceed
    display(df.head(10))
else:
    for line in (
        "Validation failed",
        "Please check configuration file and enter correct parameters",
        "OR upload the correct dataset which conforms to given parameters",
    ):
        print(line)

### Converting the file to the outbound delimiter and writing it in .gz format

In [None]:
import csv

# Re-write the inbound file using the outbound delimiter from the config.
# The input path reuses source_file (built from the config when the data
# was loaded) instead of a hard-coded name, and both files are opened with
# newline='' as the csv module requires, so line breaks inside quoted
# fields survive the round trip.
with open(source_file, newline='') as fin, \
        open('dataset.txt', 'w', newline='') as fout:
    reader = csv.DictReader(fin, delimiter=config_data['inbound_delimiter'])
    writer = csv.DictWriter(fout, reader.fieldnames, delimiter=config_data['outbound_delimiter'])
    writer.writeheader()
    writer.writerows(reader)

In [None]:
# Read the delimited text file back as raw bytes for compression.
# The context manager closes the handle, which a bare open() leaves open.
with open("dataset.txt","rb") as fp:
    data = fp.read()
bindata = bytearray(data)

In [None]:
import gzip

# Compress the bytes held in bindata into dataset.txt.gz.
with gzip.open("dataset.txt.gz", mode="wb") as gz_file:
    gz_file.write(bindata)