In [8]:
import time
import os
import numpy as np
import pandas as pd
from dask import dataframe as dd
import datatable as dt
import datetime
import csv
import gzip

**We use the New York City taxi fare prediction dataset from Kaggle, which is about 5.7 GB in size (https://www.kaggle.com/competitions/new-york-city-taxi-fare-prediction/data?select=train.csv).**

### 1. Comparing different packages for reading in the data

#### 1.1 Pandas 

In [12]:
# Time an eager pandas read of the whole CSV (wall-clock seconds).
tic = time.time() #Start time

df = pd.read_csv("train.csv") #Read in the data

toc = time.time() #End time

# ".3f" prints three decimals, like the Dask cell below. The former "3f" only
# set a minimum field width of 3 and kept the default six decimals.
print(f"Time necessary for reading in the data is {toc-tic:.3f} s.")

Time necessary for reading in the data is 104.140306 s.


#### 1.2 Dask

In [11]:
# Time the call to dask's read_csv (wall-clock seconds).
# NOTE(review): dask dataframes are normally evaluated lazily, so this call
# presumably only sets up the read instead of parsing every row (the 2.4 s
# result versus ~104 s for pandas points the same way) -- confirm before
# comparing this number with the eager pandas/datatable timings.
tic = time.time() #Start time

df = dd.read_csv("train.csv") #Read in the data

toc = time.time() #End time

print(f"Time necessary for reading in the data is {toc-tic:.3f} s.")

Time necessary for reading in the data is 2.447 s.


#### 1.3 Datatable

In [4]:
# Time a datatable fread of the whole CSV (wall-clock seconds).
tic = time.time() #Start time

df = dt.fread("train.csv") #Read in the data

toc = time.time() #End time

# ".3f" prints three decimals, like the Dask cell above. The former "3f" only
# set a minimum field width of 3 and kept the default six decimals.
print(f"Time necessary for reading in the data is {toc-tic:.3f} s.")

Time necessary for reading in the data is 7.102851 s.


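**We choose Dask as the fastest.** Note that `dd.read_csv` is lazy: the 2.4 s measured above covers setting up the read, not parsing all rows, so it is not directly comparable with the eager pandas and datatable timings.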

### 2. Creating the util file

In [2]:
%%writefile testutility.py
import logging
import os
import subprocess
import yaml
import pandas as pd
import datetime 
import gc
import re


################
# File Reading #
################

def read_config_file(filepath):
    """Parse the YAML file at ``filepath`` and return its content.

    Parameters
    ----------
    filepath : path of the YAML file; it is opened in text read mode.

    Returns
    -------
    Whatever ``yaml.safe_load`` produces for the file, or ``None`` when the
    content is not valid YAML (the parser error is logged, not raised).
    """
    with open(filepath, 'r') as stream:
        try:
            parsed = yaml.safe_load(stream)
        except yaml.YAMLError as exc:
            # Best effort: report the problem and hand back None.
            logging.error(exc)
            parsed = None
    return parsed


def replacer(string, char):
    """Collapse every run of two or more consecutive ``char`` into one ``char``.

    Parameters
    ----------
    string : text to clean up.
    char : character (or substring) whose repetitions are collapsed. It is
        matched literally, so regex metacharacters such as ``.`` or ``*``
        are safe to pass.

    Returns
    -------
    str
        ``string`` with each run of repeated ``char`` replaced by a single
        ``char``. An empty ``char`` leaves ``string`` unchanged.
    """
    if not char:
        return string
    # re.escape keeps characters like '.' or '*' from acting as regex operators;
    # the non-capturing group makes the quantifier apply to the whole of char.
    pattern = '(?:' + re.escape(char) + '){2,}'
    # A function replacement inserts char verbatim (no backslash templates).
    return re.sub(pattern, lambda _match: char, string)

def col_header_val(df,table_config):
    '''
    Standardize the column names of ``df`` and validate them against the config.

    The column names are normalized and written back to ``df.columns``:
    lower-cased, every non-word character replaced by ``_``, leading and
    trailing underscores stripped, and runs of underscores collapsed to one.
    The normalized names are then compared, ignoring order and case, with the
    names listed under ``table_config['columns']``.

    Parameters
    ----------
    df : DataFrame-like object whose ``columns`` is a string index (supports
        ``.str``) and can be reassigned.
    table_config : mapping with a ``'columns'`` list of expected column names.

    Returns
    -------
    int
        1 when the normalized names match the expected ones, 0 otherwise.
        On a mismatch the differences are printed and both name lists are
        logged at INFO level.
    '''
    # Raw-string patterns: '\w' inside a plain literal is an invalid escape
    # sequence that newer Python versions warn about.
    df.columns = (
        df.columns.str.lower()
        .str.replace(r'[^\w]', '_', regex=True)
        .str.strip('_')
        .str.replace(r'_{2,}', '_', regex=True)
    )
    expected_col = sorted(name.lower() for name in table_config['columns'])
    actual_col = sorted(df.columns)

    # Equal sorted lists imply equal length, so one comparison covers both checks.
    if actual_col == expected_col:
        print("column name and column length validation passed")
        return 1

    print("column name and column length validation failed")
    # Sorted so the report is deterministic from run to run.
    mismatched_columns_file = sorted(set(actual_col).difference(expected_col))
    print("Following File columns are not in the YAML file",mismatched_columns_file)
    missing_YAML_file = sorted(set(expected_col).difference(actual_col))
    print("Following YAML columns are not in the file uploaded",missing_YAML_file)
    logging.info(f'df columns: {df.columns}')
    logging.info(f'expected columns: {expected_col}')
    return 0

Overwriting testutility.py


### 3. Creating the YAML file

In [3]:
%%writefile fare.yaml
# Configuration describing the taxi-fare source file and its expected schema.
# The notebook builds the source path as "<file_name>.<file_type>".
file_type: csv
dataset_name: taxi_fare
file_name: train
table_name: taxi_fare
# Passed as the separator when the source file is read.
inbound_delimiter: ","
# Presumably the delimiter for exported files -- verify against the export code.
outbound_delimiter: "|"
# NOTE(review): not read by the notebook code shown here -- confirm it is used.
skip_leading_rows: 1
# Expected column names; the validator compares them ignoring order and case.
columns: 
    - key
    - pickup_datetime
    - pickup_longitude
    - pickup_latitude
    - dropoff_longitude
    - dropoff_latitude
    - passenger_count
    - fare_amount

Overwriting fare.yaml


### 4. Validation

In [4]:
# Import the helper module written by the %%writefile cell above, then parse
# the YAML configuration into config_data.
import testutility as util
config_data = util.read_config_file("fare.yaml")

In [5]:
# Display the parsed configuration to check that every key was read as expected.
config_data

{'file_type': 'csv',
 'dataset_name': 'taxi_fare',
 'file_name': 'train',
 'table_name': 'taxi_fare',
 'inbound_delimiter': ',',
 'outbound_delimiter': '|',
 'skip_leading_rows': 1,
 'columns': ['key',
  'pickup_datetime',
  'pickup_longitude',
  'pickup_latitude',
  'dropoff_longitude',
  'dropoff_latitude',
  'passenger_count',
  'fare_amount']}

In [6]:
# Read the source file with dask, taking the file name, extension and
# delimiter from config_data instead of hard-coding them.
file_type = config_data['file_type']
source_file = '.'.join((config_data['file_name'], file_type))
df = dd.read_csv(source_file, sep=config_data['inbound_delimiter'])
df.head()

Unnamed: 0,key,fare_amount,pickup_datetime,pickup_longitude,pickup_latitude,dropoff_longitude,dropoff_latitude,passenger_count
0,2009-06-15 17:26:21.0000001,4.5,2009-06-15 17:26:21 UTC,-73.844311,40.721319,-73.84161,40.712278,1
1,2010-01-05 16:52:16.0000002,16.9,2010-01-05 16:52:16 UTC,-74.016048,40.711303,-73.979268,40.782004,1
2,2011-08-18 00:35:00.00000049,5.7,2011-08-18 00:35:00 UTC,-73.982738,40.76127,-73.991242,40.750562,2
3,2012-04-21 04:30:42.0000001,7.7,2012-04-21 04:30:42 UTC,-73.98713,40.733143,-73.991567,40.758092,1
4,2010-03-09 07:51:00.000000135,5.3,2010-03-09 07:51:00 UTC,-73.968095,40.768008,-73.956655,40.783762,1


In [7]:
# Validate the column names of the file against those listed in the YAML config
util.col_header_val(df,config_data)

column name and column length validation passed


1

### 5. Create a gzip file

In [10]:
# Export the dataframe as gzip-compressed, fully quoted, pipe-delimited text.
# dask writes one file per partition, so 'train.csv.gz' ends up as a directory
# holding NN.part files (see the output below), not as a single archive.
df.to_csv('train.csv.gz',
          sep=config_data['outbound_delimiter'],  # '|' from fare.yaml, no longer hard-coded
          header=True,
          index=False,
          quoting=csv.QUOTE_ALL,
          compression='gzip',
          quotechar='"',
          doublequote=True,
          # pandas >= 1.5 spelling; the old 'line_terminator' keyword was
          # deprecated in 1.5 and removed in 2.0.
          lineterminator='\n')

['C:\\Users\\tsnik\\DGInternship\\Nikola\\Week6\\train.csv.gz\\00.part',
 'C:\\Users\\tsnik\\DGInternship\\Nikola\\Week6\\train.csv.gz\\01.part',
 'C:\\Users\\tsnik\\DGInternship\\Nikola\\Week6\\train.csv.gz\\02.part',
 'C:\\Users\\tsnik\\DGInternship\\Nikola\\Week6\\train.csv.gz\\03.part',
 'C:\\Users\\tsnik\\DGInternship\\Nikola\\Week6\\train.csv.gz\\04.part',
 'C:\\Users\\tsnik\\DGInternship\\Nikola\\Week6\\train.csv.gz\\05.part',
 'C:\\Users\\tsnik\\DGInternship\\Nikola\\Week6\\train.csv.gz\\06.part',
 'C:\\Users\\tsnik\\DGInternship\\Nikola\\Week6\\train.csv.gz\\07.part',
 'C:\\Users\\tsnik\\DGInternship\\Nikola\\Week6\\train.csv.gz\\08.part',
 'C:\\Users\\tsnik\\DGInternship\\Nikola\\Week6\\train.csv.gz\\09.part',
 'C:\\Users\\tsnik\\DGInternship\\Nikola\\Week6\\train.csv.gz\\10.part',
 'C:\\Users\\tsnik\\DGInternship\\Nikola\\Week6\\train.csv.gz\\11.part',
 'C:\\Users\\tsnik\\DGInternship\\Nikola\\Week6\\train.csv.gz\\12.part',
 'C:\\Users\\tsnik\\DGInternship\\Nikola\\Week6\\tr

### 6. Create a summary file

In [26]:
def get_dir_size(path='.'):
    """Return the total size of all files below ``path`` in GiB (1024**3 bytes).

    Sub-directories are walked recursively. Bytes are summed over the whole
    tree first and converted once at the end; converting inside the recursion
    would add an already-converted GiB figure to a byte count.

    Parameters
    ----------
    path : directory to measure; defaults to the current directory.

    Returns
    -------
    float
        Combined size of the regular files under ``path``, in GiB.

    Raises
    ------
    OSError
        Propagated from ``os.scandir`` (for example when ``path`` does not
        exist or is not a directory).
    """
    def _size_in_bytes(directory):
        # Sum the raw byte sizes of one directory level plus everything below it.
        total = 0
        with os.scandir(directory) as it:
            for entry in it:
                if entry.is_file():
                    total += entry.stat().st_size
                elif entry.is_dir():
                    total += _size_in_bytes(entry.path)
        return total

    return _size_in_bytes(path) / (1024 ** 3)

In [30]:
# Collect the figures for the summary file. The first element of the shape is
# the object that yields the row count via .compute(); the second is the
# column count.
_rows_pending, no_cols = df.shape
no_rows = _rows_pending.compute()
# Size of the exported 'train.csv.gz' directory as reported by get_dir_size
file_size = get_dir_size("train.csv.gz")

In [39]:
# Write the row count, column count and export size to summary.txt,
# one "label: value" pair per line.
summary_text = f"rows: {no_rows}\ncolumns: {no_cols}\nsize (GB): {file_size}"
with open("summary.txt","w") as summary_file:
    summary_file.write(summary_text)