In [1]:
import dask.dataframe as dd
import pandas as pd

# Reading the file

In [2]:
# Relative path of the CSV file that every reader in this section loads.
file_name = 'data/KernelVersionOutputFiles.csv'

## Dask

In [3]:
%%timeit -r 3 -n 1 -t

# NOTE: dd.read_csv is lazy, so this times the creation of the Dask DataFrame
# rather than a full load of the file (df.shape further down still reports a
# Delayed row count). Names assigned inside a %%timeit cell are not kept in
# the notebook namespace, which is why df is created again in the next cell.
df = dd.read_csv(file_name, sep=',')

13 ms ± 4.33 ms per loop (mean ± std. dev. of 3 runs, 1 loop each)


In [5]:
# Force CompressionTypeExtension to object dtype; the column is empty in the
# rows previewed by head()/tail() below.
# NOTE(review): presumably this avoids a dtype-inference mismatch between
# partitions - confirm.
df = dd.read_csv(file_name, sep=',', dtype={'CompressionTypeExtension': 'object'})

In [6]:
# The row count is returned as a Delayed object (not yet computed); only the
# number of columns is available immediately.
df.shape

(Delayed('int-88cabe91-7321-4144-a8aa-9be05c96d375'), 6)

In [5]:
df.head()

Unnamed: 0,Id,KernelVersionId,FileName,ContentLength,ContentTypeExtension,CompressionTypeExtension
0,1,2,DigitsEmbedding.png,95544,.png,
1,47,59,digit.png,52771,.png,
2,48,59,digit2.png,7393,.png,
3,65,70,digit.png,52771,.png,
4,66,70,digit2.png,7393,.png,


In [6]:
df.columns

Index(['Id', 'KernelVersionId', 'FileName', 'ContentLength',
       'ContentTypeExtension', 'CompressionTypeExtension'],
      dtype='object')

In [7]:
df.tail()

Unnamed: 0,Id,KernelVersionId,FileName,ContentLength,ContentTypeExtension,CompressionTypeExtension
617378,1308429688,63943610,__results___files/__results___22_0.png,7772,.png,
617379,1308429689,63943610,custom.css,0,.css,
617380,1308429690,63943610,maevsepoch.jpg,2571,.jpg,
617381,1308429691,63943610,msevsepoch.jpg,2571,.jpg,
617382,1308541184,63943610,__resultx__.html,352598,.html,


## Modin(ray)

import os

# Select the Ray engine *before* importing modin.pandas so that the setting is
# already in place whenever Modin reads it to initialise its execution engine.
os.environ["MODIN_ENGINE"] = "ray"

import modin.pandas as mpd

%%timeit -r 3 -n 1 -t

# NOTE: everything in this cell is timed, so each run includes stopping and
# starting Ray in addition to the CSV read itself.
import ray

# Make sure no Ray instance from a previous run is still active.
ray.shutdown()

ray.init()

# Read every column except CompressionTypeExtension.
df1 = mpd.read_csv(file_name, sep=',', usecols = ['Id', 'KernelVersionId', 'FileName', 'ContentLength', 'ContentTypeExtension'])

# Stop Ray again once the read has finished.
ray.shutdown()

## Pandas

%%timeit -r 3 -n 1 -t

# Time a plain pandas read of the same columns used for Modin. The dtype entry
# names CompressionTypeExtension, which is not part of usecols.
# df2 assigned here is local to the %%timeit run; the next cell reads it again.
df2 = pd.read_csv(file_name, sep=',', dtype={'CompressionTypeExtension': 'object'}, usecols = ['Id', 'KernelVersionId', 'FileName', 'ContentLength', 'ContentTypeExtension'])

# Load every column except CompressionTypeExtension into a regular pandas
# DataFrame and preview the first rows.
df2 = pd.read_csv(
    file_name,
    sep=',',
    dtype={'CompressionTypeExtension': 'object'},
    usecols=['Id', 'KernelVersionId', 'FileName', 'ContentLength', 'ContentTypeExtension'],
)
df2.head()

### Pandas and Modin (Ray) consume all available resources when reading the CSV file, since it seems to be too large for them. That is also why I have not been able to convert the Dask DataFrame to a pandas DataFrame afterwards and had to change a few lines of code in testutility.

# testutility

In [7]:
%%writefile testutility.py
import logging
import os
import subprocess
import yaml
import pandas as pd
import datetime 
import gc
import re


################
# File Reading #
################

def read_config_file(filepath):
    '''
    Load a YAML configuration file and return its parsed content.

    filepath: path of the YAML file to read.

    Returns the object produced by yaml.safe_load, or None when the
    document is not valid YAML (the parse error is logged).
    Errors raised while opening the file are not caught here.
    '''
    # Read as UTF-8 explicitly instead of relying on the platform's
    # locale-dependent default encoding.
    with open(filepath, 'r', encoding='utf-8') as stream:
        try:
            return yaml.safe_load(stream)
        except yaml.YAMLError as exc:
            logging.error(exc)
            # Make the "could not parse" result explicit for callers.
            return None


def replacer(string, char):
    '''
    Collapse every run of two or more consecutive `char` into a single one.

    string: text to clean up.
    char: the character (or short literal string) whose repetitions are
        collapsed. It is always treated literally, so regex metacharacters
        such as '.' or '+' are safe to pass.

    Returns the cleaned string. An empty `char` leaves `string` unchanged.
    '''
    if not char:
        return string
    # Escape `char` so it is matched literally, and group it so the
    # quantifier applies to the whole of `char`.
    pattern = '(?:' + re.escape(char) + '){2,}'
    # A callable replacement inserts `char` verbatim (no backslash processing).
    return re.sub(pattern, lambda _match: char, string)

def col_header_val(df, table_config):
    '''
    Normalise the column names of `df` and validate them against the
    columns declared in `table_config`.

    Normalisation (written back to `df.columns`): lowercase, replace every
    non-word character with '_', strip leading/trailing '_' and collapse
    runs of '_' into a single one.

    df: DataFrame whose header is checked; its columns are renamed.
    table_config: mapping with a 'columns' entry listing the expected
        names (compared case-insensitively, order ignored).

    Returns 1 when the normalised names match the expected ones, otherwise
    0 after printing and logging the differences.
    '''
    normalised = (
        df.columns.str.lower()
        # Raw strings: '\w' in a plain literal is an invalid escape sequence.
        .str.replace(r'[^\w]', '_', regex=True)
        .str.strip('_')
        .str.replace(r'_{2,}', '_', regex=True)
    )
    df.columns = list(normalised)

    df_columns = sorted(df.columns)
    expected_col = sorted(name.lower() for name in table_config['columns'])

    # Comparing the sorted lists checks both the names and their count.
    if df_columns == expected_col:
        print("column name and column length validation passed")
        return 1

    print("column name and column length validation failed")
    # Sort the differences so the report is stable from run to run.
    mismatched_columns_file = sorted(set(df_columns).difference(expected_col))
    print("Following File columns are not in the YAML file",mismatched_columns_file)
    missing_YAML_file = sorted(set(expected_col).difference(df_columns))
    print("Following YAML columns are not in the file uploaded",missing_YAML_file)
    logging.info(f'df columns: {df_columns}')
    logging.info(f'expected columns: {expected_col}')
    return 0

Overwriting testutility.py


# Writing the YAML file

In [8]:
%%writefile file.yaml
# Description of the inbound file; the notebook builds the path as
# ./data/<file_name>.<file_type>.
file_type: csv
file_name: KernelVersionOutputFiles
# Delimiter passed as `sep` when the file is read.
inbound_delimiter: ","
# Delimiter intended for the exported file.
outbound_delimiter: "|"
# NOTE(review): not referenced by the notebook code shown here - confirm whether it is still needed.
skip_leading_rows: 1
# Per-column dtype overrides passed to the CSV reader.
dtype: {'CompressionTypeExtension': 'object'}
# Expected column names, compared against the normalised (lowercased) file header.
columns: 
    - id
    - kernelversionid
    - filename
    - contentlength
    - contenttypeextension
    - compressiontypeextension

Overwriting file.yaml


In [9]:
# Import the helper module written by the %%writefile cell and parse file.yaml
# into config_data.
import testutility as util
config_data = util.read_config_file("file.yaml")

In [10]:
# Build ./data/<file_name>.<file_type> from the YAML configuration.
file_path = './data/' + '.'.join((config_data['file_name'], config_data['file_type']))

In [11]:
# Open the configured file with the delimiter and dtype overrides taken from
# the YAML configuration.
df = dd.read_csv(
    file_path,
    sep=config_data['inbound_delimiter'],
    dtype=config_data['dtype'],
)

In [14]:
# A second dataset with a different set of columns, used below to show the
# header validation failing.
df_test = dd.read_csv('data/DatasetVotes.csv', sep=',')

In [12]:
df.head()

Unnamed: 0,Id,KernelVersionId,FileName,ContentLength,ContentTypeExtension,CompressionTypeExtension
0,1,2,DigitsEmbedding.png,95544,.png,
1,47,59,digit.png,52771,.png,
2,48,59,digit2.png,7393,.png,
3,65,70,digit.png,52771,.png,
4,66,70,digit2.png,7393,.png,


In [13]:
util.col_header_val(df, config_data)

column name and column length validation passed


1

In [15]:
util.col_header_val(df_test, config_data)

column name and column length validation failed
Following File columns are not in the YAML file ['datasetversionid', 'userid', 'votedate']
Following YAML columns are not in the file uploaded ['compressiontypeextension', 'filename', 'contentlength', 'kernelversionid', 'contenttypeextension']


0

In [16]:
# Gate the rest of the pipeline on the header check (a result of 0 means failure).
if util.col_header_val(df, config_data) != 0:
    print("col validation passed")
    # write the code to perform further action
    # in the pipeline
else:
    print("validation failed")
    # write code to reject the file

column name and column length validation passed
col validation passed


In [None]:
# Export one gzip-compressed file using the outbound delimiter declared in the
# YAML configuration. Without single_file=True dask creates one output file
# per partition instead of a single "file.csv.gz".
df.to_csv(
    "file.csv.gz",
    sep=config_data['outbound_delimiter'],
    index=False,
    compression="gzip",
    single_file=True,
)