In [4]:
%%writefile testutility.py
import logging
import os
import subprocess
import yaml
import pandas as pd
import datetime 
import gc
import re

def read_config_file(filepath):
    with open(filepath, 'r') as stream:
        try:
            return yaml.safe_load(stream)
        except yaml.YAMLError as exc:
            logging.error(exc)

def replacer(string, char):
    pattern = char + '{2,}'
    string = re.sub(pattern, char, string) 
    return string

def col_header_val(df,table_config):
    '''
    replace whitespaces in the column
    and standardized column names
    '''
    df.columns = df.columns.str.lower()
    df.columns = df.columns.str.replace('[^\w]','_',regex=True)
    df.columns = list(map(lambda x: x.strip('_'), list(df.columns)))
    df.columns = list(map(lambda x: replacer(x,'_'), list(df.columns)))
    expected_col = list(map(lambda x: x.lower(),  table_config['columns']))
    expected_col.sort()
    df.columns =list(map(lambda x: x.lower(), list(df.columns)))
    if len(df.columns) == len(expected_col) and list(expected_col)  == list(df.columns):
        print("column name and column length validation passed")
        return 1
    else:
        print("column name and column length validation failed")
        mismatched_columns_file = list(set(df.columns).difference(expected_col))
        print("Following File columns are not in the YAML file",mismatched_columns_file)
        missing_YAML_file = list(set(expected_col).difference(df.columns))
        print("Following YAML columns are not in the file uploaded",missing_YAML_file)
        logging.info(f'df columns: {df.columns}')
        logging.info(f'expected columns: {expected_col}')
        return 0

Overwriting testutility.py


In [5]:
%%writefile file.yaml
file_type: csv
dataset_name: testfile
file_name: 2019-Oct
table_name: edsurv
inbound_delimiter: ","
outbound_delimiter: "|"
skip_leading_rows: 1
columns: 
    - event_type
    - brand
    - category_code
    - category_id
    - event_time
    - product_id
    - row_id

Overwriting file.yaml


In [14]:
import testutility as util
import vaex
config_data = util.read_config_file("file.yaml")
config_data

{'file_type': 'csv',
 'dataset_name': 'testfile',
 'file_name': '2019-Oct',
 'table_name': 'edsurv',
 'inbound_delimiter': ',',
 'outbound_delimiter': '|',
 'skip_leading_rows': 1,
 'columns': ['event_type',
  'brand',
  'category_code',
  'category_id',
  'event_time',
  'product_id',
  'row_id']}

In [3]:
import dask.dataframe as dd
# read the file using config file
file_type = config_data['file_type']
source_file = "./" + config_data['file_name'] + f'.{file_type}'
#print("",source_file)
df = dd.read_csv(source_file)
print(df.head())

                event_time event_type  product_id          category_id  \
0  2019-10-01 00:00:00 UTC       view    44600062  2103807459595387724   
1  2019-10-01 00:00:00 UTC       view     3900821  2053013552326770905   
2  2019-10-01 00:00:01 UTC       view    17200506  2053013559792632471   
3  2019-10-01 00:00:01 UTC       view     1307067  2053013558920217191   
4  2019-10-01 00:00:04 UTC       view     1004237  2053013555631882655   

                         category_code     brand    price    user_id  \
0                                  NaN  shiseido    35.79  541312140   
1  appliances.environment.water_heater      aqua    33.20  554748717   
2           furniture.living_room.sofa       NaN   543.10  519107250   
3                   computers.notebook    lenovo   251.74  550050854   
4               electronics.smartphone     apple  1081.98  535871217   

                           user_session  
0  72d76fde-8bb3-4e00-8c23-a032dfed738c  
1  9333dfbd-b87a-4708-9857-6336556b0fc

In [5]:
df.info(memory_usage=True)

<class 'dask.dataframe.core.DataFrame'>
Columns: 9 entries, event_time to user_session
dtypes: object(5), float64(1), int64(3)
memory usage: 2.8 GB


In [15]:
#validate the header of the file
util.col_header_val(df,config_data)


column name and column length validation failed
Following File columns are not in the YAML file ['user_id', 'user_session', 'price']
Following YAML columns are not in the file uploaded ['row_id']


0

In [6]:
import dask.dataframe as dd
# saving file in pipe seperated text file
file_type = config_data['file_type']
output_path = "./" + config_data['file_name'] + '_new' + f'.{file_type}'
seperator = config_data['outbound_delimiter']

df.to_csv(output_path, sep=seperator, single_file=True, index=False)  

['/content/2019-Oct_new.csv']

In [7]:
data = dd.read_csv('2019-Oct_new.csv', sep=seperator)

In [31]:
print('File Summary:\n ', data.info(memory_usage=True) , '\nNumber of rows: ' , len(data.index))

<class 'dask.dataframe.core.DataFrame'>
Columns: 9 entries, event_time to user_session
dtypes: object(5), float64(1), int64(3)
memory usage: 2.8 GB
File Summary:
  None 
Number of rows:  42448764
