In [6]:
pip install dask[dataframe] --upgrade

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [7]:
# pandas must be bound to `pd`: later cells call pd.read_csv(...).
import pandas as pd
import dask.dataframe as dd

In [15]:
%%time

data = pd.read_csv("Book2.csv")

print("Train size:", data.shape)

Train size: (1048575, 9)
CPU times: user 765 ms, sys: 84.4 ms, total: 849 ms
Wall time: 855 ms


In [16]:
# First five rows of the pandas-loaded frame.
data.head(n=5)

Unnamed: 0,row_id,timestamp,user_id,content_id,content_type_id,task_container_id,user_answer,answered_correctly,prior_question_elapsed_time
0,0,0.0,115,5692,0,1,3,1,
1,1,56943.0,115,5716,0,2,2,1,37000.0
2,2,118363.0,115,128,0,0,0,1,55000.0
3,3,131167.0,115,7860,0,3,0,1,19000.0
4,4,137965.0,115,7922,0,4,1,1,11000.0


Dask: load the same CSV with dask.dataframe for comparison

In [22]:
%%time

dtypes = {
    "row_id": "int64",
    "timestamp": "int64",
    "user_id": "int32",
    "content_id": "int16",
    "content_type_id": "boolean",
    "task_container_id": "int16",
    "user_answer": "int8",
    "answered_correctly": "int8",
    "prior_question_elapsed_time": "float32"
}
data1 = dd.read_csv("Book2.csv", sample=100000000, dtype =dtypes).compute()

print("Train size:", data1.shape)

Train size: (1048575, 9)
CPU times: user 3.85 s, sys: 239 ms, total: 4.09 s
Wall time: 4.96 s


In [23]:
# First five rows of the dask-loaded (computed) frame.
data1.head(n=5)

Unnamed: 0,row_id,timestamp,user_id,content_id,content_type_id,task_container_id,user_answer,answered_correctly,prior_question_elapsed_time
0,0,0,115,5692,False,1,3,1,
1,1,56943,115,5716,False,2,2,1,37000.0
2,2,118363,115,128,False,0,0,1,55000.0
3,3,131167,115,7860,False,3,0,1,19000.0
4,4,137965,115,7922,False,4,1,1,11000.0


In [26]:
%%writefile testutility.py
import logging
import os
import subprocess
import yaml
import pandas as pd
import datetime 
import gc
import re


################
# File Reading #
################

def read_config_file(filepath):
    """Load a YAML configuration file with ``yaml.safe_load``.

    Parameters
    ----------
    filepath : str or path-like
        Path of the YAML file to read.

    Returns
    -------
    The parsed document (a dict for a mapping-style config such as
    ``file.yaml``; ``None`` if the file is empty).

    Raises
    ------
    yaml.YAMLError
        If the file is not valid YAML. The error is logged and then re-raised,
        so callers fail here rather than later on a ``None`` config.
    OSError
        If the file cannot be opened (e.g. it does not exist).
    """
    with open(filepath, 'r', encoding='utf-8') as stream:
        try:
            return yaml.safe_load(stream)
        except yaml.YAMLError as exc:
            logging.error("Could not parse YAML config %s: %s", filepath, exc)
            raise


def replacer(string, char):
    """Collapse runs of two or more consecutive ``char`` into a single ``char``.

    ``char`` is matched literally (it is regex-escaped), so metacharacters such
    as ``.``, ``|`` or ``*`` work. A multi-character ``char`` collapses repeated
    occurrences of the whole substring.

    Parameters
    ----------
    string : str
        Text to clean.
    char : str
        Character (or substring) whose repeated runs are collapsed.

    Returns
    -------
    str
        ``string`` with every run of ``char`` reduced to one occurrence.
        If ``char`` is empty, ``string`` is returned unchanged.
    """
    if not char:
        return string
    pattern = '(?:' + re.escape(char) + '){2,}'
    # A function replacement inserts ``char`` verbatim, so a backslash in
    # ``char`` is not interpreted as an escape or group reference.
    return re.sub(pattern, lambda _match: char, string)

def col_header_val(df,table_config):
    '''
    Normalise the column names of ``df`` and validate them against the config.

    Each column name is lower-cased, every non-word character is replaced by
    ``_``, leading/trailing ``_`` are stripped and runs of ``_`` are collapsed
    (via ``replacer``). NOTE: the normalised names are assigned back to
    ``df.columns``, so the caller's DataFrame is renamed in place.

    The check is order-insensitive: the sorted normalised names must equal the
    sorted, lower-cased entries of ``table_config['columns']``.

    Parameters
    ----------
    df : pandas.DataFrame
        Frame whose header is validated (its columns are renamed in place).
    table_config : dict
        Parsed config; must contain a ``'columns'`` list.

    Returns
    -------
    int
        1 if the column names (and therefore count) match, 0 otherwise; on
        failure the mismatching names are printed and logged.
    '''
    def _normalise(name):
        # str() guards against non-string labels such as integer column names.
        name = re.sub(r'[^\w]', '_', str(name).lower())
        return replacer(name.strip('_'), '_')

    df.columns = [_normalise(col) for col in df.columns]
    file_cols = sorted(df.columns)
    expected_col = sorted(str(col).lower() for col in table_config['columns'])

    # Compare sorted name lists directly rather than via df.reindex(...):
    # reindexing copies the whole frame and raises on duplicate column labels.
    # List equality also implies equal length.
    if file_cols == expected_col:
        print("column name and column length validation passed")
        return 1

    print("column name and column length validation failed")
    mismatched_columns_file = sorted(set(file_cols).difference(expected_col))
    print("Following File columns are not in the YAML file",mismatched_columns_file)
    missing_YAML_file = sorted(set(expected_col).difference(file_cols))
    print("Following YAML columns are not in the file uploaded",missing_YAML_file)
    logging.info(f'df columns: {file_cols}')
    logging.info(f'expected columns: {expected_col}')
    return 0

Writing testutility.py


In [38]:
%%writefile file.yaml
# Ingestion config, loaded in the notebook with testutility.read_config_file.
# The source file path is built as "./" + file_name + "." + file_type.
file_type: csv
dataset_name: testfile
file_name: Book2
table_name: testtable
# Passed to pandas.read_csv as the field separator of the source file.
inbound_delimiter: ","
# NOTE(review): outbound_delimiter, skip_leading_rows, dataset_name and
# table_name are not read by any notebook cell shown here -- presumably used by
# a later pipeline stage; confirm.
outbound_delimiter: "|"
skip_leading_rows: 1
# Expected column names. col_header_val lower-cases these and compares them,
# order-insensitively, with the normalised header of the source file.
columns: 
    - row_id
    - timestamp
    - user_id 
    - content_id 
    - content_type_id 
    - task_container_id 
    - user_answer 
    - answered_correctly 
    - prior_question_elapsed_time 
     


Overwriting file.yaml


In [39]:
# Read the config file with the helper module written above.
# The module is reloaded so that re-running the %%writefile cell takes effect
# without a kernel restart (a plain import returns the cached module).
import importlib

import testutility as util

util = importlib.reload(util)
config_data = util.read_config_file("file.yaml")
assert config_data is not None, "file.yaml is empty or could not be parsed"

In [40]:
# Field separator configured for the inbound file.
config_data["inbound_delimiter"]

','

In [41]:
# Inspect the parsed configuration.
config_data

{'columns': ['row_id',
  'timestamp',
  'user_id',
  'content_id',
  'content_type_id',
  'task_container_id',
  'user_answer',
  'answered_correctly',
  'prior_question_elapsed_time'],
 'dataset_name': 'testfile',
 'file_name': 'Book2',
 'file_type': 'csv',
 'inbound_delimiter': ',',
 'outbound_delimiter': '|',
 'skip_leading_rows': 1,
 'table_name': 'testtable'}

In [42]:
# Read the source file using the settings from the config file.
file_type = config_data['file_type']
source_file = f"./{config_data['file_name']}.{file_type}"
# Pass the delimiter by keyword: a positional `sep` is deprecated in pandas 1.x
# (the FutureWarning below) and rejected by pandas 2.x.
df = pd.read_csv(source_file, sep=config_data['inbound_delimiter'])
df.head()

  exec(code_obj, self.user_global_ns, self.user_ns)


Unnamed: 0,row_id,timestamp,user_id,content_id,content_type_id,task_container_id,user_answer,answered_correctly,prior_question_elapsed_time
0,0,0.0,115,5692,0,1,3,1,
1,1,56943.0,115,5716,0,2,2,1,37000.0
2,2,118363.0,115,128,0,0,0,1,55000.0
3,3,131167.0,115,7860,0,3,0,1,19000.0
4,4,137965.0,115,7922,0,4,1,1,11000.0


In [43]:
# Normalise df's column names in place and check them against the config
# (returns 1 on pass, 0 on fail).
util.col_header_val(df, table_config=config_data)

column name and column length validation passed


1

In [44]:
# Side-by-side view of the (normalised) file header and the configured columns.
print("columns of files are:", df.columns)
print("columns of YAML are:", config_data['columns'])


columns of files are: Index(['row_id', 'timestamp', 'user_id', 'content_id', 'content_type_id',
       'task_container_id', 'user_answer', 'answered_correctly',
       'prior_question_elapsed_time'],
      dtype='object')
columns of YAML are: ['row_id', 'timestamp', 'user_id', 'content_id', 'content_type_id', 'task_container_id', 'user_answer', 'answered_correctly', 'prior_question_elapsed_time']


In [45]:
# Gate the rest of the pipeline on the header check (1 = pass, 0 = fail).
header_status = util.col_header_val(df, config_data)
if header_status != 0:
    print("col validation passed")
    # TODO: perform the next steps of the pipeline here.
else:
    print("validation failed")
    # TODO: reject the file here.

column name and column length validation passed
col validation passed
