In [None]:
# Created by: Jess Gallo
# Date Created: 09/07/2022
# Last Modified: 09/10/2022
# Description: Data Glacier Week 6 File Investigation and Schema Validation.

# Task

Take any csv/text file of 2+ GB of your choice. --- (You can do this assignment on Google colab)

Read the file ( Present approach of reading the file )

Try different methods of file reading eg: Dask, Modin, Ray, pandas and present your findings in term of computational efficiency

Perform basic validation on data columns : eg: remove special character , white spaces from the col name

As you already know the schema hence create a YAML file and write the column name in YAML file. --define separator of read and write file, column name in YAML

Validate number of columns and column name of ingested file with YAML.

Write the file in pipe separated text file (|) in gz format.

Create a summary of the file:

Total number of rows,

total number of columns

file size


In [1]:
# Libraries
import os
import time
import pandas as pd

In [2]:
# Size of the file
import os
os.path.getsize('C:/Users/Gallo/Downloads/used_cars_data.csv')

9980208148

# Pandas

In [1]:
filename = r'C:/Users/Gallo/Downloads/used_cars_data.csv'

In [6]:
%time p_temp = pd.read_csv(filename, encoding = 'ISO-8859-1')



Wall time: 2min 3s


# Dask

In [3]:
import dask.dataframe as dd

%time d_temp = dd.read_csv(filename, encoding = 'ISO-8859-1')

Wall time: 195 ms


# Modin & Ray

In [None]:
os.environ["MODIN_ENGINE"] = "ray"

import ray
ray.shutdown()
ray.init()

import modin.pandas as pd

%time m_temp = pd.read_csv(filename)

2022-09-18 18:41:16,241	INFO services.py:1338 -- View the Ray dashboard at [1m[32mhttp://127.0.0.1:8265[39m[22m


# Dask has the best computational efficiency

In [4]:
df = d_temp

In [6]:
df.info()

<class 'dask.dataframe.core.DataFrame'>
Columns: 66 entries, vin to year
dtypes: object(39), bool(2), float64(19), int64(6)

In [7]:
len(df.columns)

66

In [8]:
# removes special characters
df.columns = df.columns.str.replace('[#,@,&]','')

  df.columns=df.columns.str.replace('[#,@,&]','')


In [9]:
# removes white spaces from columns
df.columns = df.columns.str.replace(' ', '')

In [10]:
data = df.columns
data

Index(['vin', 'back_legroom', 'bed', 'bed_height', 'bed_length', 'body_type',
       'cabin', 'city', 'city_fuel_economy', 'combine_fuel_economy',
       'daysonmarket', 'dealer_zip', 'description', 'engine_cylinders',
       'engine_displacement', 'engine_type', 'exterior_color', 'fleet',
       'frame_damaged', 'franchise_dealer', 'franchise_make', 'front_legroom',
       'fuel_tank_volume', 'fuel_type', 'has_accidents', 'height',
       'highway_fuel_economy', 'horsepower', 'interior_color', 'isCab',
       'is_certified', 'is_cpo', 'is_new', 'is_oemcpo', 'latitude', 'length',
       'listed_date', 'listing_color', 'listing_id', 'longitude',
       'main_picture_url', 'major_options', 'make_name', 'maximum_seating',
       'mileage', 'model_name', 'owner_count', 'power', 'price', 'salvage',
       'savings_amount', 'seller_rating', 'sp_id', 'sp_name', 'theft_title',
       'torque', 'transmission', 'transmission_display', 'trimId', 'trim_name',
       'vehicle_damage_category', 'whe

# Schema Validation

In [14]:
import logging
import os
import subprocess
import yaml
import pandas as pd
import datetime 
import gc
import re

In [3]:
%%writefile utility.py

def read_config_file(filepath):
    with open(filepath, 'r') as stream:
        try:
            return yaml.load(stream, Loader=yaml.Loader)
        except yaml.YAMLError as exc:
            logging.error(exc)

def col_header_val(df,table_config):
    '''
    replace whitespaces in the column
    and standardized column names
    '''
    df.columns = df.columns.str.lower()
    df.columns = df.columns.str.replace('[^\w]','_', regex=True)
    df.columns = list(map(lambda x: x.strip('_'), list(df.columns)))
    df.columns = list(map(lambda x: x.replace(x,'_'), list(df.columns)))
    expected_col = list(map(lambda x: x.lower(),  table_config['columns']))
    expected_col.sort()
    df.columns =list(map(lambda x: x.lower(), list(df.columns)))
    df = df.reindex(sorted(df.columns), axis=1)
    if len(df.columns) == len(expected_col) and list(expected_col)  == list(df.columns):
        print("column name and column length validation passed")
        return 1
    else:
        print("column name and column length validation failed")
        mismatched_columns_file = list(set(df.columns).difference(expected_col))
        print("Following File columns are not in the YAML file",mismatched_columns_file)
        missing_YAML_file = list(set(expected_col).difference(df.columns))
        print("Following YAML columns are not in the file uploaded",missing_YAML_file)
        logging.info(f'df columns: {df.columns}')
        logging.info(f'expected columns: {expected_col}')
        return 0

Overwriting utility.py


# Write YAML File

In [4]:
%%writefile file.yaml
file_type: csv
dataset_name: testfile
file_name: used_cars_data
table_name: edsurv
inbound_delimiter: ","
outbound_delimiter: "|"
skip_leading_rows: 1
columns: 
    - vin
    - back_legroom
    - bed
    - bed_height
    - bed_length
    - body_type
    - cabin
    - city
    - city_fuel_economy
    - combine_fuel_economy
    - daysonmarket
    - dealer_zip
    - description
    - engine_cylinders
    - engine_displacement
    - engine_type
    - exterior_color
    - fleet
    - frame_damaged
    - franchise_dealer
    - franchise_make
    - front_legroom
    - fuel_tank_volume
    - fuel_type
    - has_accidents
    - height
    - highway_fuel_economy
    - horsepower
    - interior_color
    - isCab
    - is_certified
    - is_cpo
    - is_new
    - is_oemcpo
    - latitude
    - length
    - listed_date
    - listing_color
    - listing_id
    - longitude
    - main_picture_url
    - major_options
    - make_name
    - maximum_seating
    - mileage
    - model_name
    - owner_count
    - power
    - price
    - salvag
    - savings_amount
    - seller_rating
    - sp_id
    - sp_name
    - theft_title
    - torque
    - transmission
    - transmission_display
    - trimId
    - trim_name
    - vehicle_damage_category
    - wheel_system
    - wheel_system_display
    - wheelbase
    - width
    - year

Overwriting file.yaml


In [5]:
# Read config file
import utility as util

with open('file.yaml') as stream:
    config_data = yaml.safe_load(stream)

In [6]:
config_data['inbound_delimiter']

','

In [7]:
# inspecting data of config file
config_data

{'file_type': 'csv',
 'dataset_name': 'testfile',
 'file_name': 'used_cars_data',
 'table_name': 'edsurv',
 'inbound_delimiter': ',',
 'outbound_delimiter': '|',
 'skip_leading_rows': 1,
 'columns': ['vin',
  'back_legroom',
  'bed',
  'bed_height',
  'bed_length',
  'body_type',
  'cabin',
  'city',
  'city_fuel_economy',
  'combine_fuel_economy',
  'daysonmarket',
  'dealer_zip',
  'description',
  'engine_cylinders',
  'engine_displacement',
  'engine_type',
  'exterior_color',
  'fleet',
  'frame_damaged',
  'franchise_dealer',
  'franchise_make',
  'front_legroom',
  'fuel_tank_volume',
  'fuel_type',
  'has_accidents',
  'height',
  'highway_fuel_economy',
  'horsepower',
  'interior_color',
  'isCab',
  'is_certified',
  'is_cpo',
  'is_new',
  'is_oemcpo',
  'latitude',
  'length',
  'listed_date',
  'listing_color',
  'listing_id',
  'longitude',
  'main_picture_url',
  'major_options',
  'make_name',
  'maximum_seating',
  'mileage',
  'model_name',
  'owner_count',
  'power'

In [8]:
# reading process of the file using Dask
from dask import dataframe as dd
df_sample = dd.read_csv(filename)

In [9]:
# reading the file using config file
file_type = config_data['file_type']
source_file = 'C:/Users/Gallo/Downloads/' + config_data['file_name'] + f'.{file_type}'

In [10]:
import pandas as pd
df = pd.read_csv(source_file, config_data['inbound_delimiter'])

  exec(code_obj, self.user_global_ns, self.user_ns)
  df = pd.read_csv(source_file, config_data['inbound_delimiter'])


In [11]:
# validate the header of the file
util.col_header_val(df_sample, config_data)

AttributeError: 'DataFrame' object has no attribute 'reindex'

In [12]:
print("columns of files are:" , df.columns)
print("columns of YAML are:" , config_data['columns'])

columns of files are: Index(['vin', 'back_legroom', 'bed', 'bed_height', 'bed_length', 'body_type',
       'cabin', 'city', 'city_fuel_economy', 'combine_fuel_economy',
       'daysonmarket', 'dealer_zip', 'description', 'engine_cylinders',
       'engine_displacement', 'engine_type', 'exterior_color', 'fleet',
       'frame_damaged', 'franchise_dealer', 'franchise_make', 'front_legroom',
       'fuel_tank_volume', 'fuel_type', 'has_accidents', 'height',
       'highway_fuel_economy', 'horsepower', 'interior_color', 'isCab',
       'is_certified', 'is_cpo', 'is_new', 'is_oemcpo', 'latitude', 'length',
       'listed_date', 'listing_color', 'listing_id', 'longitude',
       'main_picture_url', 'major_options', 'make_name', 'maximum_seating',
       'mileage', 'model_name', 'owner_count', 'power', 'price', 'salvage',
       'savings_amount', 'seller_rating', 'sp_id', 'sp_name', 'theft_title',
       'torque', 'transmission', 'transmission_display', 'trimId', 'trim_name',
       'vehicle_

In [15]:
if util.col_header_val(df, config_data)==0:
    print("validation failed")
    # write code to reject the file
else:
    print("col validation passed")
    # write the code to perform further action
    # in the pipleine

column name and column length validation failed
Following File columns are not in the YAML file ['_']
Following YAML columns are not in the file uploaded ['bed_height', 'iscab', 'maximum_seating', 'has_accidents', 'city', 'owner_count', 'interior_color', 'model_name', 'latitude', 'longitude', 'price', 'salvag', 'front_legroom', 'vin', 'is_certified', 'is_cpo', 'bed', 'is_new', 'is_oemcpo', 'major_options', 'seller_rating', 'sp_name', 'transmission', 'main_picture_url', 'engine_type', 'exterior_color', 'listed_date', 'description', 'power', 'engine_cylinders', 'torque', 'year', 'franchise_dealer', 'wheelbase', 'cabin', 'listing_color', 'length', 'body_type', 'bed_length', 'make_name', 'height', 'franchise_make', 'fleet', 'horsepower', 'city_fuel_economy', 'wheel_system_display', 'frame_damaged', 'highway_fuel_economy', 'vehicle_damage_category', 'transmission_display', 'combine_fuel_economy', 'daysonmarket', 'fuel_type', 'fuel_tank_volume', 'mileage', 'listing_id', 'engine_displacement'

NameError: name 'logging' is not defined

In [17]:
pd.read_csv('C:/Users/Gallo/Downloads/used_cars_data.csv')

  pd.read_csv('C:/Users/Gallo/Downloads/used_cars_data.csv')


Unnamed: 0,vin,back_legroom,bed,bed_height,bed_length,body_type,cabin,city,city_fuel_economy,combine_fuel_economy,...,transmission,transmission_display,trimId,trim_name,vehicle_damage_category,wheel_system,wheel_system_display,wheelbase,width,year
0,ZACNJABB5KPJ92081,35.1 in,,,,SUV / Crossover,,Bayamon,,,...,A,9-Speed Automatic Overdrive,t83804,Latitude FWD,,FWD,Front-Wheel Drive,101.2 in,79.6 in,2019
1,SALCJ2FX1LH858117,38.1 in,,,,SUV / Crossover,,San Juan,,,...,A,9-Speed Automatic Overdrive,t86759,S AWD,,AWD,All-Wheel Drive,107.9 in,85.6 in,2020
2,JF1VA2M67G9829723,35.4 in,,,,Sedan,,Guaynabo,17.0,,...,M,6-Speed Manual,t58994,Base,,AWD,All-Wheel Drive,104.3 in,78.9 in,2016
3,SALRR2RV0L2433391,37.6 in,,,,SUV / Crossover,,San Juan,,,...,A,8-Speed Automatic Overdrive,t86074,V6 HSE AWD,,AWD,All-Wheel Drive,115 in,87.4 in,2020
4,SALCJ2FXXLH862327,38.1 in,,,,SUV / Crossover,,San Juan,,,...,A,9-Speed Automatic Overdrive,t86759,S AWD,,AWD,All-Wheel Drive,107.9 in,85.6 in,2020
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
3000035,2GNAXJEV0J6261526,39.7 in,,,,SUV / Crossover,,Fairfield,26.0,,...,A,Automatic,t72936,1.5T LT FWD,,FWD,Front-Wheel Drive,107.3 in,72.6 in,2018
3000036,1GNERFKW0LJ225508,38.4 in,,,,SUV / Crossover,,Vallejo,18.0,,...,A,Automatic,t85763,LS FWD,,FWD,Front-Wheel Drive,120.9 in,78.6 in,2020
3000037,3FA6P0HD3GR134062,38.3 in,,,,Sedan,,Napa,,,...,A,6-Speed Automatic Overdrive,t57569,SE,,FWD,Front-Wheel Drive,112.2 in,83.5 in,2016
3000038,SAJAJ4BNXHA968809,35 in,,,,Sedan,,Fairfield,30.0,,...,A,Automatic,t65977,20d Premium AWD,,AWD,All-Wheel Drive,111.6 in,81.7 in,2017


In [18]:
df

Unnamed: 0,_,_.1,_.2,_.3,_.4,_.5,_.6,_.7,_.8,_.9,...,_.10,_.11,_.12,_.13,_.14,_.15,_.16,_.17,_.18,_.19
0,ZACNJABB5KPJ92081,35.1 in,,,,SUV / Crossover,,Bayamon,,,...,A,9-Speed Automatic Overdrive,t83804,Latitude FWD,,FWD,Front-Wheel Drive,101.2 in,79.6 in,2019
1,SALCJ2FX1LH858117,38.1 in,,,,SUV / Crossover,,San Juan,,,...,A,9-Speed Automatic Overdrive,t86759,S AWD,,AWD,All-Wheel Drive,107.9 in,85.6 in,2020
2,JF1VA2M67G9829723,35.4 in,,,,Sedan,,Guaynabo,17.0,,...,M,6-Speed Manual,t58994,Base,,AWD,All-Wheel Drive,104.3 in,78.9 in,2016
3,SALRR2RV0L2433391,37.6 in,,,,SUV / Crossover,,San Juan,,,...,A,8-Speed Automatic Overdrive,t86074,V6 HSE AWD,,AWD,All-Wheel Drive,115 in,87.4 in,2020
4,SALCJ2FXXLH862327,38.1 in,,,,SUV / Crossover,,San Juan,,,...,A,9-Speed Automatic Overdrive,t86759,S AWD,,AWD,All-Wheel Drive,107.9 in,85.6 in,2020
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
3000035,2GNAXJEV0J6261526,39.7 in,,,,SUV / Crossover,,Fairfield,26.0,,...,A,Automatic,t72936,1.5T LT FWD,,FWD,Front-Wheel Drive,107.3 in,72.6 in,2018
3000036,1GNERFKW0LJ225508,38.4 in,,,,SUV / Crossover,,Vallejo,18.0,,...,A,Automatic,t85763,LS FWD,,FWD,Front-Wheel Drive,120.9 in,78.6 in,2020
3000037,3FA6P0HD3GR134062,38.3 in,,,,Sedan,,Napa,,,...,A,6-Speed Automatic Overdrive,t57569,SE,,FWD,Front-Wheel Drive,112.2 in,83.5 in,2016
3000038,SAJAJ4BNXHA968809,35 in,,,,Sedan,,Fairfield,30.0,,...,A,Automatic,t65977,20d Premium AWD,,AWD,All-Wheel Drive,111.6 in,81.7 in,2017


# Test File

In [None]:
# Creating test file for this demo:
testdata = {
    'city' : ['Delhi', 'Lima', 'Istanbul','Riyadh'],
    'age' : [34, 30, 16,33],
    'Country' : ['India','Peru','Turkey','Saudi Arabia']
}
import pandas as pd
df = pd.DataFrame(testdata, columns=['city', 'age','Country'])
df.to_csv('used_cars_data.csv',index=False)

In [None]:
df

In [None]:
df