## Task: File Ingestion and Schema Validation

 • Take any CSV/text file of 2+ GB of your choice (you can do this assignment on Google Colab).

 • Read the file (present your approach to reading it).

 • Try different methods of file reading, e.g. Dask, Modin, Ray and pandas, and present your findings in terms of computational efficiency.

 • Perform basic validation on the data columns, e.g. remove special characters and white spaces from the column names.

 • Since you already know the schema, create a YAML file and write the column names in it. Define the separators for reading and writing the file, and the column names, in the YAML file.

 • Validate the number of columns and the column names of the ingested file against the YAML file.

 • Write the file as a pipe-separated (|) text file in gz format.

 • Create a summary of the file:

    Total number of rows

    Total number of columns

    File size

In [125]:
import os
import time

In [126]:
# Size of the file in bytes
os.path.getsize('C:\\Users\\Admin\\Desktop\\Data Glacier\\Week6\\carsales.csv')

1901641188
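The value above is a raw byte count. A small helper (hypothetical, not part of the original notebook) makes it readable: 1,901,641,188 bytes is about 1.90 GB in decimal units, or about 1.77 GiB in binary units.

```python
# Hypothetical helper: format a byte count with binary (1024-based) units.
def human_size(num_bytes: int) -> str:
    """Format a byte count using B, KiB, MiB, GiB or TiB."""
    units = ["B", "KiB", "MiB", "GiB", "TiB"]
    size = float(num_bytes)
    for unit in units[:-1]:
        if size < 1024:
            return f"{size:.2f} {unit}"
        size /= 1024
    # Anything of 1024 GiB or more is reported in TiB
    return f"{size:.2f} {units[-1]}"


print(human_size(1901641188))  # 1.77 GiB
```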

## Reading the data with Pandas

In [3]:
import pandas as pd

In [8]:
%%time
df = pd.read_csv('carsales.csv')

CPU times: total: 47.3 s
Wall time: 49 s


In [7]:
start = time.time()
df = pd.read_csv('carsales.csv')
end = time.time()
print("Read csv with pandas: ",(end-start),"sec")

Read csv with pandas:  50.706236124038696 sec
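`time.time()` reads the wall clock, which can jump if the system clock is adjusted; `time.perf_counter()` is the standard-library clock intended for measuring elapsed time. A minimal sketch of a reusable timing wrapper (the name `timed` is hypothetical):

```python
import time
from typing import Any, Callable, Tuple


# Hypothetical helper: run a zero-argument callable and measure how long it takes.
def timed(fn: Callable[[], Any]) -> Tuple[Any, float]:
    """Call fn() and return (result, elapsed seconds) measured with perf_counter."""
    start = time.perf_counter()
    result = fn()
    elapsed = time.perf_counter() - start
    return result, elapsed


if __name__ == "__main__":
    total, secs = timed(lambda: sum(range(1_000_000)))
    print(f"sum computed in {secs:.4f} sec")
```

In this notebook it could wrap a reader call, e.g. `df, secs = timed(lambda: pd.read_csv('carsales.csv'))`, so every library is timed the same way.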


## Reading the data with modin and ray

In [12]:
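import modin.pandas as pd   # note: rebinds `pd` to Modin's pandas-compatible API
import ray
ray.shutdown()   # stop any Ray instance left over from a previous run
ray.init()       # start a local Ray instance for Modin to use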
start = time.time()
df = pd.read_csv('carsales.csv')
end = time.time()
print("Read csv with modin and ray: ",(end-start),"sec")

2023-04-04 15:28:05,145	INFO worker.py:1553 -- Started a local Ray instance.


Read csv with modin and ray:  73.02560472488403 sec


In [14]:
%%time
df = pd.read_csv('carsales.csv')

CPU times: total: 8.47 s
Wall time: 1min 41s


## Reading the data with dask

In [19]:
from dask import dataframe as dd

In [18]:
%%time
dask_df = dd.read_csv('carsales.csv')

CPU times: total: 15.6 ms
Wall time: 87.1 ms


In [21]:
start = time.time()
dask_df = dd.read_csv('carsales.csv')
end = time.time()
print("Read csv with dask: ",(end-start),"sec")

Read csv with dask:  0.020333051681518555 sec


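### Each of the libraries has different strengths, weaknesses, and scaling strategies. In the runs above, Dask's `read_csv` returned in about 0.02 s for the ~1.9 GB CSV file, while pandas took about 50 s and Modin on Ray about 73 s (1 min 41 s in a second run).

### Note, however, that Dask's `read_csv` is lazy: it reads only a small sample of the file to infer the column types and builds a task graph, so the 0.02 s measures setup rather than a full read. The file is actually parsed when a computation such as `len(df.index)` or `.compute()` runs, so only a timing of such a computation is directly comparable with pandas and Modin. As a conclusion, Dask is used for the ingestion below; its practical advantage is that it processes the file in partitions, in parallel, without loading it into memory all at once.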

## Using Dask to read the file and perform ingestion

In [27]:
df = dd.read_csv('carsales.csv', delimiter = ',')
df.head()

Unnamed: 0,brand,name,bodyType,color,fuelType,year,mileage,transmission,power,price,vehicleConfiguration,engineName,engineDisplacement,date,location,link,description,parse_date
0,Fiat,124 Spider,Открытый,Синий,Бензин,,8000.0,Автомат,,1830000,,,,2022-08-20 00:00:00,Владивосток,https://vladivostok.drom.ru/fiat/124_spider/47...,Только сегодня с таможни забрали Как новый! Кр...,2022-08-20 04:00:00
1,BMW,i3,Хэтчбек 5 дв.,Черный,Электро,,12000.0,Автомат,,1830000,,,,2022-08-20 00:00:00,Владивосток,https://vladivostok.drom.ru/bmw/i3/47958301.html,"Электричка +двс V-600cc ,как новая BMW i3 с ге...",2022-08-20 04:00:00
2,Mercedes-Benz,GLE Coupe,Джип 5 дв.,Бордовый,Бензин,2015.0,57000.0,АКПП,367.0,4600000,450 AMG 4MATIC Особая серия,M 276 DE 30 AL,3.0 LTR,2022-08-20 00:00:00,Владивосток,https://vladivostok.drom.ru/mercedes-benz/gle_...,Отличное состояние автомобиля. Комплектация «G...,2022-08-20 04:00:00
3,Mercedes-Benz,G-Class,Джип 5 дв.,Черный,Бензин,2002.0,200000.0,АКПП,296.0,2999999,G 500,M 113 E 50,5.0 LTR,2022-08-20 00:00:00,Владивосток,https://vladivostok.drom.ru/mercedes-benz/g-cl...,Продам правильный гелик. Лучшая машина. Полнос...,2022-08-20 04:00:00
4,Audi,Q7,Джип 5 дв.,Белый,Бензин,,67000.0,Автомат,252.0,3300000,,,,2022-08-20 00:00:00,Владивосток,https://vladivostok.drom.ru/audi/q7/46498184.html,Audi Q7 II S line (252л.с.) 4WD Внедорожник 5 ...,2022-08-20 04:00:00


In [28]:
df.info()

<class 'dask.dataframe.core.DataFrame'>
Columns: 18 entries, brand to parse_date
dtypes: object(14), float64(3), int64(1)

In [29]:
print("Number of rows: ", len(df.index))
print("Number of columns : {}".format(len(df.columns)))

Number of rows:  1513200
Number of columns : 18


In [30]:
# Check the column names for white spaces and special characters
df.columns

Index(['brand', 'name', 'bodyType', 'color', 'fuelType', 'year', 'mileage',
       'transmission', 'power', 'price', 'vehicleConfiguration', 'engineName',
       'engineDisplacement', 'date', 'location', 'link', 'description',
       'parse_date'],
      dtype='object')

The column name `parse_date` contains a special character (the underscore `_`), which we can remove.
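Rather than inspecting the printed index by eye, the check can be automated. A minimal sketch; the helper name is hypothetical, and the rule that a valid column name contains only ASCII letters and digits is an assumption:

```python
import re
from typing import Iterable, List

# Assumption: a "clean" column name contains only ASCII letters and digits.
VALID_NAME = re.compile(r"[A-Za-z0-9]+")


# Hypothetical helper: list the column names that break the rule above.
def find_invalid_columns(columns: Iterable[str]) -> List[str]:
    """Return the names that contain white space or any other non-alphanumeric character."""
    return [name for name in columns if not VALID_NAME.fullmatch(name)]


print(find_invalid_columns(["brand", "bodyType", "parse_date"]))  # ['parse_date']
```

Passing `df.columns` would list every name that still needs cleaning.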

In [79]:
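# Remove special characters (#, @ and _) from the column names.
# regex=True is required: since pandas 2.0, str.replace treats the pattern literally by default.
df.columns = df.columns.str.replace(r'[#@_]', '', regex=True)

# Remove all white space from the column names
df.columns = df.columns.str.replace(r'\s+', '', regex=True)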

In [36]:
df.columns

Index(['brand', 'name', 'bodyType', 'color', 'fuelType', 'year', 'mileage',
       'transmission', 'power', 'price', 'vehicleConfiguration', 'engineName',
       'engineDisplacement', 'date', 'location', 'link', 'description',
       'parsedate'],
      dtype='object')

## Creating the utility file

In [5]:
%%writefile utility.py
import logging
import re

import yaml


################
# File Reading #
################

def read_config_file(filepath):
    '''Load a YAML config file; log the parse error and return None if it fails.'''
    with open(filepath, 'r') as stream:
        try:
            return yaml.safe_load(stream)
        except yaml.YAMLError as exc:
            logging.error(exc)
            return None


def replacer(string, char):
    '''Collapse every run of two or more `char` into a single `char`.'''
    pattern = re.escape(char) + '{2,}'
    return re.sub(pattern, char, string)


def col_header_val(df, table_config):
    '''
    Standardize the column names of df in place (lowercase, non-word
    characters replaced by '_', leading/trailing '_' stripped, repeated
    '_' collapsed) and validate them against the YAML column list,
    ignoring case and order. Returns 1 if validation passes, else 0.
    '''
    df.columns = df.columns.str.lower()
    df.columns = df.columns.str.replace(r'[^\w]', '_', regex=True)
    df.columns = [replacer(col.strip('_'), '_') for col in df.columns]
    expected_col = sorted(col.lower() for col in table_config['columns'])
    # Compare sorted name lists instead of reindexing (and copying) the data
    file_col = sorted(df.columns)
    if len(file_col) == len(expected_col) and file_col == expected_col:
        print("column name and column length validation passed")
        return 1
    else:
        print("column name and column length validation failed")
        mismatched_columns_file = list(set(file_col).difference(expected_col))
        print("Following File columns are not in the YAML file", mismatched_columns_file)
        missing_YAML_file = list(set(expected_col).difference(file_col))
        print("Following YAML columns are not in the file uploaded", missing_YAML_file)
        logging.info(f'df columns: {file_col}')
        logging.info(f'expected columns: {expected_col}')
        return 0

Overwriting utility.py
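The comparison at the heart of `col_header_val` is case- and order-insensitive list logic. Stripped of pandas, it can be sketched with plain lists; the helper name `compare_columns` is hypothetical and the sketch skips the renaming step.

```python
from typing import Iterable, List, Tuple


# Hypothetical helper mirroring the check in col_header_val, without pandas.
def compare_columns(file_cols: Iterable[str], yaml_cols: Iterable[str]) -> Tuple[bool, List[str], List[str]]:
    """Return (passed, extra_in_file, missing_from_file), ignoring case and order."""
    file_sorted = sorted(c.lower() for c in file_cols)
    yaml_sorted = sorted(c.lower() for c in yaml_cols)
    # Equal sorted lists implies equal length and equal names (duplicates included)
    passed = file_sorted == yaml_sorted
    extra = sorted(set(file_sorted) - set(yaml_sorted))
    missing = sorted(set(yaml_sorted) - set(file_sorted))
    return passed, extra, missing


print(compare_columns(["brand", "parse_date"], ["brand", "parsedate"]))
```

Reporting both directions of the difference, as `col_header_val` does, makes it obvious whether the file or the YAML needs fixing.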


In [6]:
%%writefile file.yaml
file_type: csv
dataset_name: testfile
file_name: carsales
table_name: sales
inbound_delimiter: ","
outbound_delimiter: "|"
skip_leading_rows: 1
columns: 
    - brand
    - name
    - bodyType
    - color
    - fuelType
    - year
    - mileage
    - transmission
    - power
    - price
    - vehicleConfiguration
    - engineName
    - engineDisplacement
    - date
    - location
    - link
    - description
    - parsedate

Overwriting file.yaml


In [7]:
# Read config file
import utility as util
config_data = util.read_config_file("file.yaml")
config_data

{'file_type': 'csv',
 'dataset_name': 'testfile',
 'file_name': 'carsales',
 'table_name': 'sales',
 'inbound_delimiter': ',',
 'outbound_delimiter': '|',
 'skip_leading_rows': 1,
 'columns': ['brand',
  'name',
  'bodyType',
  'color',
  'fuelType',
  'year',
  'mileage',
  'transmission',
  'power',
  'price',
  'vehicleConfiguration',
  'engineName',
  'engineDisplacement',
  'date',
  'location',
  'link',
  'description',
  'parsedate']}
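Later cells index into `config_data` directly, so a missing or misspelled key would only surface as a `KeyError` mid-run. A small up-front check can fail fast; the helper is hypothetical, the required keys are an assumption based on the keys this notebook uses, and delimiters are assumed to be single characters, which `to_csv` requires.

```python
from typing import Any, Dict, List

# Assumption: the keys the ingestion steps rely on.
REQUIRED_KEYS = ("file_type", "file_name", "inbound_delimiter", "outbound_delimiter", "columns")


# Hypothetical helper: list problems in a loaded YAML config.
def config_problems(config: Dict[str, Any]) -> List[str]:
    """Return human-readable problems; an empty list means the config looks usable."""
    problems = [f"missing key: {key}" for key in REQUIRED_KEYS if key not in config]
    for key in ("inbound_delimiter", "outbound_delimiter"):
        value = config.get(key)
        if value is not None and (not isinstance(value, str) or len(value) != 1):
            problems.append(f"{key} should be a single character, got {value!r}")
    columns = config.get("columns")
    if columns is not None:
        if not isinstance(columns, list) or not columns:
            problems.append("columns should be a non-empty list")
        elif len({str(c).lower() for c in columns}) != len(columns):
            problems.append("columns contains duplicate names (case-insensitive)")
    return problems
```

Running `assert not config_problems(config_data)` right after loading the YAML would stop the notebook before any 2 GB read starts.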

In [8]:
config_data['file_name']

'carsales'

In [30]:
# Read the file with Dask
from dask import dataframe as dd
df = dd.read_csv('carsales.csv',delimiter=',')
df.head()

Unnamed: 0,brand,name,bodyType,color,fuelType,year,mileage,transmission,power,price,vehicleConfiguration,engineName,engineDisplacement,date,location,link,description,parse_date
0,Fiat,124 Spider,Открытый,Синий,Бензин,,8000.0,Автомат,,1830000,,,,2022-08-20 00:00:00,Владивосток,https://vladivostok.drom.ru/fiat/124_spider/47...,Только сегодня с таможни забрали Как новый! Кр...,2022-08-20 04:00:00
1,BMW,i3,Хэтчбек 5 дв.,Черный,Электро,,12000.0,Автомат,,1830000,,,,2022-08-20 00:00:00,Владивосток,https://vladivostok.drom.ru/bmw/i3/47958301.html,"Электричка +двс V-600cc ,как новая BMW i3 с ге...",2022-08-20 04:00:00
2,Mercedes-Benz,GLE Coupe,Джип 5 дв.,Бордовый,Бензин,2015.0,57000.0,АКПП,367.0,4600000,450 AMG 4MATIC Особая серия,M 276 DE 30 AL,3.0 LTR,2022-08-20 00:00:00,Владивосток,https://vladivostok.drom.ru/mercedes-benz/gle_...,Отличное состояние автомобиля. Комплектация «G...,2022-08-20 04:00:00
3,Mercedes-Benz,G-Class,Джип 5 дв.,Черный,Бензин,2002.0,200000.0,АКПП,296.0,2999999,G 500,M 113 E 50,5.0 LTR,2022-08-20 00:00:00,Владивосток,https://vladivostok.drom.ru/mercedes-benz/g-cl...,Продам правильный гелик. Лучшая машина. Полнос...,2022-08-20 04:00:00
4,Audi,Q7,Джип 5 дв.,Белый,Бензин,,67000.0,Автомат,252.0,3300000,,,,2022-08-20 00:00:00,Владивосток,https://vladivostok.drom.ru/audi/q7/46498184.html,Audi Q7 II S line (252л.с.) 4WD Внедорожник 5 ...,2022-08-20 04:00:00


In [10]:
# Build the path of the source file from the config file
file_type = config_data['file_type']
source_file = os.path.join(r"C:\Users\Admin\Desktop\Data Glacier\Week6",
                           f"{config_data['file_name']}.{file_type}")

In [31]:
import pandas as pd
df = pd.read_csv(source_file, sep=config_data['inbound_delimiter'])
df.head()



Unnamed: 0,brand,name,bodyType,color,fuelType,year,mileage,transmission,power,price,vehicleConfiguration,engineName,engineDisplacement,date,location,link,description,parse_date
0,Fiat,124 Spider,Открытый,Синий,Бензин,,8000.0,Автомат,,1830000,,,,2022-08-20 00:00:00,Владивосток,https://vladivostok.drom.ru/fiat/124_spider/47...,Только сегодня с таможни забрали Как новый! Кр...,2022-08-20 04:00:00
1,BMW,i3,Хэтчбек 5 дв.,Черный,Электро,,12000.0,Автомат,,1830000,,,,2022-08-20 00:00:00,Владивосток,https://vladivostok.drom.ru/bmw/i3/47958301.html,"Электричка +двс V-600cc ,как новая BMW i3 с ге...",2022-08-20 04:00:00
2,Mercedes-Benz,GLE Coupe,Джип 5 дв.,Бордовый,Бензин,2015.0,57000.0,АКПП,367.0,4600000,450 AMG 4MATIC Особая серия,M 276 DE 30 AL,3.0 LTR,2022-08-20 00:00:00,Владивосток,https://vladivostok.drom.ru/mercedes-benz/gle_...,Отличное состояние автомобиля. Комплектация «G...,2022-08-20 04:00:00
3,Mercedes-Benz,G-Class,Джип 5 дв.,Черный,Бензин,2002.0,200000.0,АКПП,296.0,2999999,G 500,M 113 E 50,5.0 LTR,2022-08-20 00:00:00,Владивосток,https://vladivostok.drom.ru/mercedes-benz/g-cl...,Продам правильный гелик. Лучшая машина. Полнос...,2022-08-20 04:00:00
4,Audi,Q7,Джип 5 дв.,Белый,Бензин,,67000.0,Автомат,252.0,3300000,,,,2022-08-20 00:00:00,Владивосток,https://vladivostok.drom.ru/audi/q7/46498184.html,Audi Q7 II S line (252л.с.) 4WD Внедорожник 5 ...,2022-08-20 04:00:00


In [12]:
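# Validate the header of the file against the YAML.
# The YAML lists 'parsedate', so the underscore removal shown earlier is applied first;
# otherwise 'parse_date' would not match.
df.columns = df.columns.str.replace(r'[#@_]', '', regex=True)
util.col_header_val(df, config_data)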

column name and column length validation passed


1

In [13]:
print("columns of files are:" ,df.columns)
print("columns of YAML are:" ,config_data['columns'])

columns of files are: Index(['brand', 'name', 'bodytype', 'color', 'fueltype', 'year', 'mileage',
       'transmission', 'power', 'price', 'vehicleconfiguration', 'enginename',
       'enginedisplacement', 'date', 'location', 'link', 'description',
       'parsedate'],
      dtype='object')
columns of YAML are: ['brand', 'name', 'bodyType', 'color', 'fuelType', 'year', 'mileage', 'transmission', 'power', 'price', 'vehicleConfiguration', 'engineName', 'engineDisplacement', 'date', 'location', 'link', 'description', 'parsedate']


## Writing the file as a pipe-separated (|) text file in gz format

In [14]:
# Write the DataFrame as a pipe-separated (|) text file, gzip-compressed
df.to_csv("car_sales.gz", sep='|', index=False, compression='gzip')
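The output is plain gzip-compressed text, so it can be checked with the standard library alone. A minimal sketch with hypothetical helper names: one writes a small file in a similar format (used here only to exercise the reader), the other decompresses and returns the first few records so the `|` delimiter can be confirmed.

```python
import csv
import gzip
from typing import Iterable, List, Sequence


# Hypothetical helper: write rows as pipe-separated, gzip-compressed text.
def write_pipe_gz(path: str, header: Sequence[str], rows: Iterable[Sequence[object]]) -> None:
    with gzip.open(path, "wt", newline="", encoding="utf-8") as fh:
        writer = csv.writer(fh, delimiter="|")
        writer.writerow(header)
        writer.writerows(rows)


# Hypothetical helper: return the first n records (header included) of a pipe-separated .gz file.
def peek_pipe_gz(path: str, n: int = 5) -> List[List[str]]:
    records = []
    with gzip.open(path, "rt", newline="", encoding="utf-8") as fh:
        for i, record in enumerate(csv.reader(fh, delimiter="|")):
            if i >= n:
                break
            records.append(record)
    return records
```

For example, `peek_pipe_gz("car_sales.gz", 3)` should show the 18 column names followed by two data rows, each split on `|`. If the output was written by Dask as a folder, pass one of its part files instead.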

In [21]:
# Size of the gz output in bytes (os.path.getsize reports a single file's size, not the total of a folder's contents)
import os
os.path.getsize('C:\\Users\\Admin\\Desktop\\Data Glacier\\Week6\\carsales.gz')

457589697

In [29]:
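# Count the entries in the output folder (when written by Dask, the output is a folder with one part file per partition)
entries = os.listdir('C:\\Users\\Admin\\Desktop\\Data Glacier\\Week6\\Carsales.gz')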
len(entries)

38
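The task's last step, a summary of total rows, total columns and file size, is not computed in a single place above. A minimal sketch using only the standard library; the helper names are hypothetical, and it assumes the output is either one gzip file or a folder of gzip part files that each start with their own header line. It decompresses everything in one streaming pass, and `csv.reader` keeps quoted multi-line fields (such as long descriptions) as single records.

```python
import csv
import gzip
import os
from typing import Dict, List


# Hypothetical helper: a single file, or every regular file inside a folder (sorted).
def _part_files(path: str) -> List[str]:
    if os.path.isdir(path):
        return sorted(
            os.path.join(path, name)
            for name in os.listdir(path)
            if os.path.isfile(os.path.join(path, name))
        )
    return [path]


# Hypothetical helper: rows, columns and on-disk size of a pipe-separated gzip output.
def summarize_pipe_gz(path: str, sep: str = "|") -> Dict[str, int]:
    rows = 0
    columns = 0
    size = 0
    for part in _part_files(path):
        size += os.path.getsize(part)
        with gzip.open(part, "rt", newline="", encoding="utf-8") as fh:
            reader = csv.reader(fh, delimiter=sep)
            header = next(reader, None)      # each part is assumed to carry a header
            if header is not None:
                columns = len(header)
            rows += sum(1 for record in reader if record)
    return {"rows": rows, "columns": columns, "size_bytes": size}
```

Called on the output path, the result can be checked against the 1,513,200 rows and 18 columns reported earlier.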