# Week 6: File ingestion and schema validation

Steps of the task:

* Take any CSV/text file of 2+ GB of your choice.

* Read the file (present the approach used to read it).

* Try different methods of file reading, e.g. Dask, Modin, Ray and pandas, and present your findings in terms of computational efficiency.

* Perform basic validation on the data columns, e.g. remove special characters and white spaces from the column names.

* Since the schema is already known, create a YAML file and write the column names in it. Define the separators of the read and write files and the column names in the YAML file.

* Validate the number of columns and the column names of the ingested file against the YAML file.

* Write the file as a pipe-separated (|) text file in gz format.

* Create a summary of the file: total number of rows, total number of columns, file size.

# 1. READ THE FILE 

Because of a poor internet connection, it was difficult for me to download a file larger than two gigabytes, so I generated one with these characteristics using R. The final file has eleven columns and 42 million rows and weighs about 2.8 GB (2,803,003,497 bytes).
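The R script that generated the file is not shown. For readers who want a file with the same eleven columns from Python, here is a hypothetical sketch: the category lists come from the values visible in the `head()` output, while the numeric ranges and the helper names `make_synthetic` and `write_in_chunks` are assumptions. Rows are appended in chunks so the full file never has to fit in memory.

```python
import numpy as np
import pandas as pd

# Category values seen in head(); treating them as the complete lists is an assumption.
CITIES = ["BARINAS", "BARQUISIMETO", "CARACAS", "CUMANA", "MARACAIBO",
          "MERIDA", "SAN CRISTOBAL", "TRUJILLO", "TUCUPITA", "VALENCIA"]
GENDERS = ["FEMALE", "MALE"]
MARITAL = ["SEPARATED", "MARIED", "ENGAGED", "WIDOWED", "DIVORCED"]


def make_synthetic(n_rows: int, seed: int = 0) -> pd.DataFrame:
    """Build a DataFrame with the same 11 columns as DATA.csv (hypothetical ranges)."""
    rng = np.random.default_rng(seed)
    return pd.DataFrame({
        "ID": np.arange(1, n_rows + 1),
        "AGE": rng.integers(18, 66, n_rows),
        "CITY": rng.choice(CITIES, n_rows),
        "GENDER": rng.choice(GENDERS, n_rows),
        "MARITAL STATUS": rng.choice(MARITAL, n_rows),
        "SALARY": rng.integers(1000, 20001, n_rows),
        "OTHER INCOMES": rng.integers(0, 2001, n_rows),
        "RENT": rng.integers(100, 1001, n_rows),
        "FOOD": rng.integers(100, 1001, n_rows),
        "EDUCATION": rng.integers(100, 1001, n_rows),
        "DEFAULT": rng.integers(0, 2, n_rows),
    })


def write_in_chunks(path: str, total_rows: int, chunk_rows: int = 1_000_000) -> None:
    """Append chunks to a CSV so the whole dataset is never held in memory."""
    written = 0
    while written < total_rows:
        n = min(chunk_rows, total_rows - written)
        chunk = make_synthetic(n, seed=written)
        chunk["ID"] += written  # keep IDs continuous across chunks
        chunk.to_csv(path, mode="w" if written == 0 else "a",
                     header=written == 0, index=False)
        written += n
```

For example, `write_in_chunks("DATA.csv", 42_000_000)` would write a file with the same shape as the one used here; its size will depend on the value ranges chosen.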

In [2]:
# IMPORT LIBRARY
import pandas as pd

In [3]:
%%time
#READ THE FILE IN THE USUAL WAY
data = pd.read_csv('DATA.csv')

Wall time: 1min 28s


In [4]:
#DIMENSIONS (ROWS, COLUMNS)
data.shape

(42000000, 11)

In [5]:
#HEAD
data.head()

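|   | ID | AGE | CITY | GENDER | MARITAL STATUS | SALARY | OTHER INCOMES | RENT | FOOD | EDUCATION | DEFAULT |
|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 1 | 23 | TRUJILLO | FEMALE | SEPARATED | 12119 | 250 | 274 | 117 | 190 | 1 |
| 1 | 2 | 45 | VALENCIA | FEMALE | MARIED | 1610 | 239 | 641 | 584 | 403 | 0 |
| 2 | 3 | 19 | MARACAIBO | FEMALE | ENGAGED | 3246 | 364 | 412 | 556 | 686 | 1 |
| 3 | 4 | 20 | TRUJILLO | FEMALE | WIDOWED | 14962 | 1957 | 787 | 458 | 724 | 0 |
| 4 | 5 | 44 | MARACAIBO | MALE | DIVORCED | 16032 | 546 | 799 | 647 | 636 | 0 |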
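Besides the plain `pd.read_csv` call, pandas can cut memory use by declaring column types up front and by loading only the columns that are needed. A minimal sketch, assuming the column names above; the dtype choices in `DTYPES` and the helper names are assumptions, not measured settings.

```python
import pandas as pd

# Hypothetical dtype map: repeated text values as 'category' and small
# integers downcast. Adjust these choices to the real value ranges.
DTYPES = {
    "CITY": "category",
    "GENDER": "category",
    "MARITAL STATUS": "category",
    "AGE": "int16",
    "DEFAULT": "int8",
}


def memory_mb(df: pd.DataFrame) -> float:
    """Deep memory usage of a DataFrame in MB (1 MB = 1,000,000 bytes)."""
    return df.memory_usage(deep=True).sum() / 1_000_000


def read_typed(path: str, usecols=None) -> pd.DataFrame:
    """Read the CSV with explicit dtypes, optionally keeping only some columns."""
    wanted = DTYPES if usecols is None else {
        col: dtype for col, dtype in DTYPES.items() if col in usecols
    }
    return pd.read_csv(path, dtype=wanted, usecols=usecols)
```

Comparing `memory_mb(data)` with `memory_mb(read_typed('DATA.csv'))` shows how much the dtype hints save on this file; `usecols=['CITY', 'RENT']` would be enough for the group-by used later.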


# READ THE FILE WITH DASK

In [6]:
import dask.dataframe as dd

In [8]:
%%time
#READ THE FILE WITH DASK
df = dd.read_csv('DATA.csv')

Wall time: 112 ms


In [9]:
#SHAPE
df.shape, df

((Delayed('int-5e664c04-2bc9-4dc7-b534-1cfc75c51119'), 11),
 Dask DataFrame Structure:
                    ID    AGE    CITY  GENDER MARITAL STATUS SALARY OTHER INCOMES   RENT   FOOD EDUCATION DEFAULT
 npartitions=44                                                                                                  
                 int64  int64  object  object         object  int64         int64  int64  int64     int64   int64
                   ...    ...     ...     ...            ...    ...           ...    ...    ...       ...     ...
 ...               ...    ...     ...     ...            ...    ...           ...    ...    ...       ...     ...
                   ...    ...     ...     ...            ...    ...           ...    ...    ...       ...     ...
                   ...    ...     ...     ...            ...    ...           ...    ...    ...       ...     ...
 Dask Name: read-csv, 44 tasks)
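The `npartitions=44` above follows from how Dask splits a CSV into byte ranges. Assuming Dask's default block size of 64 MB (64e6 bytes; Dask may pick a smaller value on machines with little memory per core, and `blocksize=` in `dd.read_csv` overrides it), the 2,803,003,497-byte file gives ceil(2803003497 / 64e6) = 44 blocks. The helper name is hypothetical:

```python
import math


def expected_partitions(file_size_bytes: int, blocksize: int = 64_000_000) -> int:
    """Number of byte-range blocks when a file is split into fixed-size pieces."""
    return math.ceil(file_size_bytes / blocksize)


print(expected_partitions(2_803_003_497))  # 44
```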

In [10]:
#HEAD
df.head()

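|   | ID | AGE | CITY | GENDER | MARITAL STATUS | SALARY | OTHER INCOMES | RENT | FOOD | EDUCATION | DEFAULT |
|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 1 | 23 | TRUJILLO | FEMALE | SEPARATED | 12119 | 250 | 274 | 117 | 190 | 1 |
| 1 | 2 | 45 | VALENCIA | FEMALE | MARIED | 1610 | 239 | 641 | 584 | 403 | 0 |
| 2 | 3 | 19 | MARACAIBO | FEMALE | ENGAGED | 3246 | 364 | 412 | 556 | 686 | 1 |
| 3 | 4 | 20 | TRUJILLO | FEMALE | WIDOWED | 14962 | 1957 | 787 | 458 | 724 | 0 |
| 4 | 5 | 44 | MARACAIBO | MALE | DIVORCED | 16032 | 546 | 799 | 647 | 636 | 0 |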


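Dask shows by far the lowest reading time, but this comparison needs care: `dd.read_csv` is lazy. It only reads a sample from the start of the file to infer the column types and builds a task graph with 44 partitions, so the 112 ms does not include reading the data. The actual read happens when a result is requested with `.compute()`, which is why the Dask group-by below takes much longer than the pandas one, whose timing excludes reading because `data` is already in memory.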

# Simple calculations by grouping

In [11]:
%%time
#USUAL WAY
data.groupby('CITY').RENT.mean()

Wall time: 7.31 s


CITY
BARINAS          549.850257
BARQUISIMETO     549.857732
CARACAS          550.155036
CUMANA           549.703094
MARACAIBO        550.009639
MERIDA           550.111520
SAN CRISTOBAL    550.006832
TRUJILLO         549.969235
TUCUPITA         550.052079
VALENCIA         549.812087
Name: RENT, dtype: float64

In [12]:
%%time
#USING DASK
df.groupby('CITY').RENT.mean().compute()

Wall time: 1min 10s


CITY
BARINAS          549.850257
BARQUISIMETO     549.857732
CARACAS          550.155036
CUMANA           549.703094
MARACAIBO        550.009639
MERIDA           550.111520
SAN CRISTOBAL    550.006832
TRUJILLO         549.969235
TUCUPITA         550.052079
VALENCIA         549.812087
Name: RENT, dtype: float64
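Conceptually, Dask computes this grouped mean partition by partition and then combines the partial results. The same idea can be sketched with plain pandas and `chunksize`: keep a running sum and count per city and divide only at the end, since averaging per-chunk means would be wrong when chunks hold different numbers of rows per city. This illustrates the technique; it is not Dask's internal code, and the function name is hypothetical.

```python
import pandas as pd


def grouped_mean_chunked(path: str, key: str, value: str,
                         chunksize: int = 1_000_000) -> pd.Series:
    """Mean of `value` per `key`, reading the CSV chunk by chunk."""
    sums = None
    counts = None
    for chunk in pd.read_csv(path, usecols=[key, value], chunksize=chunksize):
        g = chunk.groupby(key)[value]
        s, c = g.sum(), g.count()
        # add() with fill_value=0 aligns groups that appear in only some chunks
        sums = s if sums is None else sums.add(s, fill_value=0)
        counts = c if counts is None else counts.add(c, fill_value=0)
    return (sums / counts).rename(value)
```

On this file, `grouped_mean_chunked('DATA.csv', 'CITY', 'RENT')` should match the two results above while holding at most one chunk in memory.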

# MODIN EXAMPLES

In [1]:
#IMPORT LIBRARIES
import os
os.environ["MODIN_ENGINE"] = "dask"  # Modin will use Dask

#START A LOCAL DASK CLUSTER FIRST SO THAT MODIN CAN REUSE IT
from distributed import Client

client = Client()

import modin.pandas as pd

In [2]:
%%time
df = pd.read_csv("DATA.csv")

Wall time: 2min 14s


In [3]:
#HEAD
df.head()

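|   | ID | AGE | CITY | GENDER | MARITAL STATUS | SALARY | OTHER INCOMES | RENT | FOOD | EDUCATION | DEFAULT |
|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 1 | 23 | TRUJILLO | FEMALE | SEPARATED | 12119 | 250 | 274 | 117 | 190 | 1 |
| 1 | 2 | 45 | VALENCIA | FEMALE | MARIED | 1610 | 239 | 641 | 584 | 403 | 0 |
| 2 | 3 | 19 | MARACAIBO | FEMALE | ENGAGED | 3246 | 364 | 412 | 556 | 686 | 1 |
| 3 | 4 | 20 | TRUJILLO | FEMALE | WIDOWED | 14962 | 1957 | 787 | 458 | 724 | 0 |
| 4 | 5 | 44 | MARACAIBO | MALE | DIVORCED | 16032 | 546 | 799 | 647 | 636 | 0 |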


In [4]:
df.columns

Index(['ID', 'AGE', 'CITY', 'GENDER', 'MARITAL STATUS', 'SALARY',
       'OTHER INCOMES', 'RENT', 'FOOD', 'EDUCATION', 'DEFAULT'],
      dtype='object')

In [5]:
%%time
df.groupby('CITY').RENT.mean()

Wall time: 1min 29s


CITY
BARINAS          549.850257
BARQUISIMETO     549.857732
CARACAS          550.155036
CUMANA           549.703094
MARACAIBO        550.009639
MERIDA           550.111520
SAN CRISTOBAL    550.006832
TRUJILLO         549.969235
TUCUPITA         550.052079
VALENCIA         549.812087
Name: RENT, dtype: float64

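Comparing reading times alone is misleading because Dask defers the read until `.compute()`. A fairer measure is the end-to-end time to read the file and compute the group-by: about 1 min 10 s for Dask (its `.compute()` includes the read), about 1 min 35 s for pandas (1 min 28 s + 7.31 s) and about 3 min 43 s for Modin on Dask (2 min 14 s + 1 min 29 s). On this machine and workload, Dask is therefore the best option.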
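To make such comparisons repeatable, each library can be timed on the same end-to-end task, forcing the final result so lazy libraries are not timed on graph construction alone. A minimal sketch; the `benchmark` helper and the commented usage are hypothetical.

```python
import time
from typing import Callable, Dict


def benchmark(tasks: Dict[str, Callable[[], object]], repeat: int = 1) -> Dict[str, float]:
    """Best wall-clock time in seconds of each task over `repeat` runs.

    Each task should read the file AND force the final result.
    """
    results = {}
    for name, task in tasks.items():
        best = float("inf")
        for _ in range(repeat):
            start = time.perf_counter()
            task()
            best = min(best, time.perf_counter() - start)
        results[name] = best
    return results


# Hypothetical usage with the libraries imported earlier in the notebook:
# benchmark({
#     "pandas": lambda: pd.read_csv("DATA.csv").groupby("CITY").RENT.mean(),
#     "dask": lambda: dd.read_csv("DATA.csv").groupby("CITY").RENT.mean().compute(),
# })
```

If all libraries run in one session, Modin needs its own alias (for example `import modin.pandas as mpd`, a hypothetical name) so it does not shadow pandas.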

# 2. BASIC VALIDATION

In [6]:
# SEE COLUMNS NAMES
df.columns

Index(['ID', 'AGE', 'CITY', 'GENDER', 'MARITAL STATUS', 'SALARY',
       'OTHER INCOMES', 'RENT', 'FOOD', 'EDUCATION', 'DEFAULT'],
      dtype='object')

In [7]:
# REPLACE WHITE SPACES AND SPECIAL CHARACTERS WITH "_"
df.columns = df.columns.str.replace(r'[^\w]', '_', regex=True)
df.columns

Index(['ID', 'AGE', 'CITY', 'GENDER', 'MARITAL_STATUS', 'SALARY',
       'OTHER_INCOMES', 'RENT', 'FOOD', 'EDUCATION', 'DEFAULT'],
      dtype='object')
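The one-line regex works for these names, but a reusable helper can also strip surrounding spaces, collapse repeated underscores and catch names that collide after cleaning. Note that the cleaned names above apply only to the Modin frame; the YAML file and the later Dask read keep the original names with spaces, so the same function would have to be applied to both sides before comparing them. A minimal sketch; `clean_column_names` is a hypothetical name.

```python
import re
from typing import Iterable, List


def clean_column_names(columns: Iterable[str]) -> List[str]:
    """Strip spaces, replace special characters with '_' and collapse repeats."""
    cleaned = []
    for name in columns:
        name = name.strip()
        name = re.sub(r"\W", "_", name)   # any non-word character (Unicode-aware) -> '_'
        name = re.sub(r"_+", "_", name)   # 'OTHER__INCOMES' -> 'OTHER_INCOMES'
        cleaned.append(name.strip("_"))   # drop leading/trailing '_'
    if len(set(cleaned)) != len(cleaned):
        raise ValueError(f"column names collide after cleaning: {cleaned}")
    return cleaned
```

Usage: `df.columns = clean_column_names(df.columns)`.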

# 3. YAML FILE CREATION

In [8]:
%%writefile read.py
import logging
import yaml

#CREATE A SUPPORT FILE

################
# File Reading #
################

def read_yaml_file(filepath):
    with open(filepath, 'r') as stream:
        try:
            return yaml.safe_load(stream)
        except yaml.YAMLError as exc:
            logging.error(exc)

Overwriting read.py


In [9]:
%%writefile yaml_file.yaml
#WRITE YAML FILE
file_type: csv
file_name: DATA
inbound_delimiter: ","
n_row: 42000000
n_col: 11
columns: 
    - ID
    - AGE
    - CITY
    - GENDER
    - MARITAL STATUS
    - SALARY
    - OTHER INCOMES
    - RENT
    - FOOD
    - EDUCATION
    - DEFAULT

Overwriting yaml_file.yaml


In [10]:
#READ SUPPORT FILE AND YAML 
import read as support
yaml_data = support.read_yaml_file("yaml_file.yaml")

In [11]:
#YAML FILE
yaml_data

{'file_type': 'csv',
 'file_name': 'DATA',
 'inbound_delimiter': ',',
 'n_row': 42000000,
 'n_col': 11,
 'columns': ['ID',
  'AGE',
  'CITY',
  'GENDER',
  'MARITAL STATUS',
  'SALARY',
  'OTHER INCOMES',
  'RENT',
  'FOOD',
  'EDUCATION',
  'DEFAULT']}

In [12]:
#SPECIFIC INFO
yaml_data['columns']

['ID',
 'AGE',
 'CITY',
 'GENDER',
 'MARITAL STATUS',
 'SALARY',
 'OTHER INCOMES',
 'RENT',
 'FOOD',
 'EDUCATION',
 'DEFAULT']

# READ FILE WITH YAML

In [13]:
# BUILD THE SOURCE PATH FROM THE YAML SETTINGS
file_type = yaml_data['file_type']
source_file = f"./{yaml_data['file_name']}.{file_type}"
print(source_file)

./DATA.csv
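The task also asks for the separator of the written file to be defined in the YAML, but `yaml_file.yaml` only defines `inbound_delimiter`. One option is to add keys such as `outbound_delimiter: "|"` and `outbound_compression: gzip` (hypothetical key names) and derive all read and write arguments from the loaded dictionary in one place. A minimal sketch; `io_options` is a hypothetical helper.

```python
from typing import Any, Dict, Tuple


def io_options(cfg: Dict[str, Any]) -> Tuple[str, Dict[str, Any], Dict[str, Any]]:
    """Return (source path, read kwargs, write kwargs) from the loaded YAML dict.

    'outbound_delimiter' and 'outbound_compression' are hypothetical keys; when
    absent, the defaults follow the task: '|' and gzip.
    """
    source = f"./{cfg['file_name']}.{cfg['file_type']}"
    read_kwargs = {"sep": cfg.get("inbound_delimiter", ",")}
    write_kwargs = {
        "sep": cfg.get("outbound_delimiter", "|"),
        "compression": cfg.get("outbound_compression", "gzip"),
        "index": False,
    }
    return source, read_kwargs, write_kwargs
```

For example, `source_file, read_kw, write_kw = io_options(yaml_data)` followed by `dd.read_csv(source_file, **read_kw)`.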


In [15]:
import dask.dataframe as dd

In [16]:
%%time
#READ FILE WITH DASK
df = dd.read_csv(source_file,sep=yaml_data['inbound_delimiter'])

Wall time: 19 ms


In [17]:
df.head()

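|   | ID | AGE | CITY | GENDER | MARITAL STATUS | SALARY | OTHER INCOMES | RENT | FOOD | EDUCATION | DEFAULT |
|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 1 | 23 | TRUJILLO | FEMALE | SEPARATED | 12119 | 250 | 274 | 117 | 190 | 1 |
| 1 | 2 | 45 | VALENCIA | FEMALE | MARIED | 1610 | 239 | 641 | 584 | 403 | 0 |
| 2 | 3 | 19 | MARACAIBO | FEMALE | ENGAGED | 3246 | 364 | 412 | 556 | 686 | 1 |
| 3 | 4 | 20 | TRUJILLO | FEMALE | WIDOWED | 14962 | 1957 | 787 | 458 | 724 | 0 |
| 4 | 5 | 44 | MARACAIBO | MALE | DIVORCED | 16032 | 546 | 799 | 647 | 636 | 0 |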


In [18]:
#CALCULATE NUMBER OF ROWS AND COLUMNS
a = df.shape
a[0].compute(),a[1]

(42000000, 11)
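`a[0].compute()` reads the whole file through Dask every time it is called, and it is called again in the validation and summary cells. A cheaper cross-check that needs only the standard library and constant memory is to count newline bytes in fixed-size binary blocks. A minimal sketch, assuming no quoted field contains an embedded line break; `count_data_rows` is a hypothetical name.

```python
def count_data_rows(path: str, has_header: bool = True,
                    block_size: int = 1 << 20) -> int:
    """Count data rows by counting newline bytes in fixed-size binary blocks.

    Works for both LF and CRLF line endings; assumes no embedded line breaks.
    """
    lines = 0
    last_byte = b""
    with open(path, "rb") as fh:
        while True:
            block = fh.read(block_size)
            if not block:
                break
            lines += block.count(b"\n")
            last_byte = block[-1:]
    if last_byte and last_byte != b"\n":
        lines += 1  # the last line has no trailing newline
    return lines - 1 if has_header and lines else lines
```

`count_data_rows('DATA.csv')` can then be compared with `yaml_data['n_row']` without going through Dask.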

# 4. VALIDATE NUMBER OF ROWS, COLUMNS AND COLUMN NAMES WITH YAML FILE

In [19]:
#EXPECTED ROWS AND COLUMNS
yaml_data['n_row'],yaml_data['n_col']

(42000000, 11)

In [20]:
#VALIDATE NUMBER OF ROWS
if a[0].compute() == yaml_data['n_row']:
    print("Equal number of rows!")
else:
    print("Wrong number of rows")
    # write the code to perform further action
    # in the pipeline

Equal number of rows!


In [21]:
#VALIDATE NUMBER OF COLUMNS
if len(df.columns) == yaml_data['n_col']:
    print("Equal number of columns!")
else:
    print("Wrong number of columns")
    # write the code to perform further action
    # in the pipeline

Equal number of columns!


In [22]:
#VALIDATE COLUMN NAMES (SAME NAMES IN THE SAME ORDER)
if list(df.columns) == yaml_data['columns']:
    print("Names validated!")
else:
    print("Mismatch in columns names")
    # write the code to perform further action
    # in the pipeline

Names validated!
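The three checks above can be folded into one function that reports every problem at once, including which columns are missing or unexpected, so the pipeline can stop with a clear message instead of a bare print. A minimal sketch; `schema_errors` is a hypothetical name.

```python
from typing import Any, Dict, List, Sequence


def schema_errors(columns: Sequence[str], n_rows: int, cfg: Dict[str, Any]) -> List[str]:
    """Compare observed column names and row count with the YAML config.

    Returns a list of human-readable problems; an empty list means the file passed.
    """
    errors = []
    expected = list(cfg["columns"])
    observed = list(columns)
    if len(observed) != cfg["n_col"]:
        errors.append(f"expected {cfg['n_col']} columns, found {len(observed)}")
    missing = [c for c in expected if c not in observed]
    extra = [c for c in observed if c not in expected]
    if missing:
        errors.append(f"missing columns: {missing}")
    if extra:
        errors.append(f"unexpected columns: {extra}")
    if not missing and not extra and observed != expected:
        errors.append("same columns but in a different order")
    if n_rows != cfg["n_row"]:
        errors.append(f"expected {cfg['n_row']} rows, found {n_rows}")
    return errors
```

For example, `problems = schema_errors(df.columns, a[0].compute(), yaml_data)` followed by `if problems: raise ValueError("; ".join(problems))`.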


# 5. WRITE FILE IN GZ FORMAT

In [23]:
#WRITE FILE (PARQUET FORMAT WITH GZIP COMPRESSION)
df.to_parquet('df1.gzip', compression='gzip')



In [24]:
#READ THE COMPRESSED FILE
df1 = dd.read_parquet('df1.gzip')

In [25]:
df1.head()

| index | ID | AGE | CITY | GENDER | MARITAL STATUS | SALARY | OTHER INCOMES | RENT | FOOD | EDUCATION | DEFAULT |
|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 1 | 23 | TRUJILLO | FEMALE | SEPARATED | 12119 | 250 | 274 | 117 | 190 | 1 |
| 1 | 2 | 45 | VALENCIA | FEMALE | MARIED | 1610 | 239 | 641 | 584 | 403 | 0 |
| 2 | 3 | 19 | MARACAIBO | FEMALE | ENGAGED | 3246 | 364 | 412 | 556 | 686 | 1 |
| 3 | 4 | 20 | TRUJILLO | FEMALE | WIDOWED | 14962 | 1957 | 787 | 458 | 724 | 0 |
| 4 | 5 | 44 | MARACAIBO | MALE | DIVORCED | 16032 | 546 | 799 | 647 | 636 | 0 |
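The task asks for a pipe-separated text file compressed with gzip, while the cells above write and read back Parquet with gzip compression. A minimal pandas sketch of the text version, streaming the CSV in chunks so the full file is never in memory at once; the function name and output file name are hypothetical. Dask's own `to_csv` also accepts `sep` and `compression` arguments if the Dask frame is preferred.

```python
import gzip
import pandas as pd


def csv_to_pipe_gz(src: str, dst: str, in_sep: str = ",", out_sep: str = "|",
                   chunksize: int = 1_000_000) -> int:
    """Convert a delimited text file to a gzip-compressed, pipe-separated file.

    Returns the number of data rows written.
    """
    rows = 0
    # newline="" lets the CSV writer inside pandas control the line endings
    with gzip.open(dst, "wt", encoding="utf-8", newline="") as out:
        for i, chunk in enumerate(pd.read_csv(src, sep=in_sep, chunksize=chunksize)):
            chunk.to_csv(out, sep=out_sep, index=False, header=(i == 0))
            rows += len(chunk)
    return rows
```

For example, `csv_to_pipe_gz("DATA.csv", "DATA.txt.gz")`; the result can be read back with `pd.read_csv("DATA.txt.gz", sep="|")`.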


# 6. SUMMARY OF THE FILE

In [26]:
#SUMMARY 
#N ROWS
a[0].compute()

42000000

In [27]:
#N COL
a[1]

11

In [28]:
#TOTAL SIZE
#NORMAL DATA
from pathlib import Path
Path('DATA.csv').stat().st_size # SIZE IN BYTES (1,000,000 BYTES = 1 MB)

2803003497

In [29]:
import os

#FUNCTION THAT CALCULATES THE TOTAL SIZE IN BYTES OF ALL FILES IN A FOLDER
def get_size(start_path = '.'):
    total_size = 0
    for dirpath, dirnames, filenames in os.walk(start_path):
        for f in filenames:
            fp = os.path.join(dirpath, f)
            # skip if it is symbolic link
            if not os.path.islink(fp):
                total_size += os.path.getsize(fp)

    return total_size

In [30]:
print(get_size('df1.gzip'), 'bytes')

527055579 bytes


According to Python, the original CSV file is 2.8 GB (2,803,003,497 bytes), while the compressed Parquet output is 527 MB (527,055,579 bytes), roughly 5.3 times smaller.
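The three figures the task asks for can be gathered into one record. A small sketch that generalizes `get_size` to accept either a file or a folder and formats sizes in decimal units, so 2,803,003,497 bytes prints as `2.80 GB` (the same file is about 2.61 GiB in binary units). The helper names are hypothetical.

```python
import os
from typing import Dict, Union


def human_size(n_bytes: int) -> str:
    """Format a byte count with decimal units (1 GB = 10**9 bytes)."""
    for unit, factor in (("GB", 10**9), ("MB", 10**6), ("KB", 10**3)):
        if n_bytes >= factor:
            return f"{n_bytes / factor:.2f} {unit}"
    return f"{n_bytes} B"


def path_size(path: str) -> int:
    """Size in bytes of a file, or of all non-symlink files under a folder."""
    if os.path.isfile(path):
        return os.path.getsize(path)
    total = 0
    for dirpath, _, filenames in os.walk(path):
        for name in filenames:
            fp = os.path.join(dirpath, name)
            if not os.path.islink(fp):
                total += os.path.getsize(fp)
    return total


def summary(n_rows: int, n_cols: int, path: str) -> Dict[str, Union[int, str]]:
    """Rows, columns and size (raw bytes plus a readable string) of a dataset."""
    size = path_size(path)
    return {"rows": n_rows, "columns": n_cols,
            "size_bytes": size, "size": human_size(size)}
```

For example, `summary(a[0].compute(), a[1], 'DATA.csv')` and `summary(a[0].compute(), a[1], 'df1.gzip')`.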