# LISUM09, Week 6, File ingestion and schema validation
## Name: Laâroussi Saâdeddine
## Mail : laar.saad.eddine@gmail.com
## Country : Morocco

Tasks for this week's assignment are:

## 1) Choosing data of 2+ GB. 

The data was taken from Kaggle.

The dataset is 'StackSample: 10% of Stack Overflow Q&A'.

Link to data : https://www.kaggle.com/datasets/stackoverflow/stacksample

Size of data as of 06/06/2022 : 3.6 GB (Questions.csv : 1.87 GB, Answers.csv: 1.57 GB, Tags : 0.63 GB)

## 2) Reading data using different methods

In [18]:
import pandas as pd
import timeit

In [19]:
start = timeit.default_timer()

df_pd = pd.read_csv('Dataset/Questions.csv',encoding='ISO-8859-1')

stop = timeit.default_timer()

print('Time: ', stop - start)  

Time:  320.41127972799995


In [20]:
import dask.dataframe as dd

In [21]:
start = timeit.default_timer()

df_dd = dd.read_csv('Dataset/Questions.csv',encoding='ISO-8859-1')

stop = timeit.default_timer()

print('Time: ', stop - start)  

Time:  2.274378377000062


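The Dask call returns much sooner than the pandas one, but the two timings are not directly comparable: dd.read_csv is lazy (it samples the start of the file to infer dtypes and builds a task graph without loading the data), while pd.read_csv loads the whole file into memory.

I tried using Modin, but my PC crashed.

At the time of writing, Ray was not available for Python versions after 3.9.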
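
Since Dask defers the actual read, a fairer comparison would time a call that forces the data to be loaded. The sketch below is a small timing helper built on timeit.default_timer, as used in the cells above. The name `timed` is hypothetical, and the pandas and Dask calls appear only as comments because they depend on the local dataset.

```python
import timeit

def timed(fn, *args, **kwargs):
    """Run fn(*args, **kwargs) and return (result, elapsed_seconds)."""
    start = timeit.default_timer()
    result = fn(*args, **kwargs)
    elapsed = timeit.default_timer() - start
    return result, elapsed

# Stand-in workload so the sketch runs without the dataset.
total, seconds = timed(sum, range(1_000_000))
print('Result:', total)
print('Time: ', seconds)

# With the dataset in place, the same helper could wrap both readers
# (assumed usage, not run here):
#   _, t_pd = timed(pd.read_csv, 'Dataset/Questions.csv', encoding='ISO-8859-1')
#   _, t_dd = timed(lambda: dd.read_csv('Dataset/Questions.csv',
#                                       encoding='ISO-8859-1').compute())
```

Calling `.compute()` makes Dask materialize the full table, so the measured time would include the real read and not only the graph construction.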

## 3) Performing basic validation on columns

In [22]:
df_pd.columns

Index(['Id', 'OwnerUserId', 'CreationDate', 'ClosedDate', 'Score', 'Title',
       'Body'],
      dtype='object')

In [23]:
%%writefile testutility.py
import logging
import os
import subprocess
import yaml
import pandas as pd
import datetime 
import gc
import re

def read_config_file(filepath):
    with open(filepath, 'r') as stream:
        try:
            return yaml.safe_load(stream)
        except yaml.YAMLError as exc:
            logging.error(exc)


def replacer(string, char):
    pattern = char + '{2,}'
    string = re.sub(pattern, char, string) 
    return string

def col_header_val(df,table_config):
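    '''
    Standardize the column names of df (lowercase, non-word characters
    replaced by '_', no leading/trailing or repeated '_') and compare
    them with the columns listed in table_config.
    Returns 1 if they match, 0 otherwise.
    '''
    df.columns = df.columns.str.lower()
    # raw string so that \w is not treated as an invalid escape sequence
    df.columns = df.columns.str.replace(r'[^\w]','_',regex=True)
    df.columns = list(map(lambda x: x.strip('_'), list(df.columns)))
    df.columns = list(map(lambda x: replacer(x,'_'), list(df.columns)))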
    expected_col = list(map(lambda x: x.lower(),  table_config['columns']))
    expected_col.sort()
    df.columns =list(map(lambda x: x.lower(), list(df.columns)))
    df = df.reindex(sorted(df.columns), axis=1)
    if len(df.columns) == len(expected_col) and list(expected_col)  == list(df.columns):
        print("column name and column length validation passed")
        return 1
    else:
        print("column name and column length validation failed")
        mismatched_columns_file = list(set(df.columns).difference(expected_col))
        print("Following File columns are not in the YAML file",mismatched_columns_file)
        missing_YAML_file = list(set(expected_col).difference(df.columns))
        print("Following YAML columns are not in the file uploaded",missing_YAML_file)
        logging.info(f'df columns: {df.columns}')
        logging.info(f'expected columns: {expected_col}')
        return 0


Overwriting testutility.py
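
To make the clean-up steps in col_header_val easier to follow, the sketch below applies the same sequence (lowercase, replace non-word characters, strip and collapse underscores) to single strings using only the standard library. The helper name `normalize_column` is hypothetical and is not part of testutility.py.

```python
import re

def normalize_column(name):
    """Apply the same clean-up steps as col_header_val to one name."""
    name = name.lower()                   # case-insensitive comparison
    name = re.sub(r'[^\w]', '_', name)    # non-word characters -> '_'
    name = name.strip('_')                # drop leading/trailing '_'
    name = re.sub(r'_{2,}', '_', name)    # collapse repeated '_'
    return name

for raw in ['Id', 'OwnerUserId', '_  ID__', 'owner USER__ID']:
    print(repr(raw), '->', repr(normalize_column(raw)))
```

The last two inputs are the renamed columns used later in the notebook: the first normalizes back to a name present in the YAML file, the second does not.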


Columns are renamed in section 5 to test these functions.

## 4) Creating YAML file

In [24]:
%%writefile file.yaml
file_type: csv
dataset_name: Questions
file_name: Questions
table_name: edsurv
inbound_delimiter: ","
outbound_delimiter: "|"
skip_leading_rows: 1
columns: 
    - id
    - owneruserid
    - creationdate
    - closeddate
    - score
    - title
    - body

Overwriting file.yaml


In [25]:
import testutility as util
config_data = util.read_config_file("file.yaml")

In [26]:
config_data

{'file_type': 'csv',
 'dataset_name': 'Questions',
 'file_name': 'Questions',
 'table_name': 'edsurv',
 'inbound_delimiter': ',',
 'outbound_delimiter': '|',
 'skip_leading_rows': 1,
 'columns': ['id',
  'owneruserid',
  'creationdate',
  'closeddate',
  'score',
  'title',
  'body']}
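
Before using the loaded configuration, it may be worth checking that the keys the later cells rely on are present. The sketch below does this on a plain dictionary with the same content as config_data. `REQUIRED_KEYS` and `missing_config_keys` are hypothetical names, and the choice of required keys is an assumption based on what sections 5 and 6 read from the config.

```python
# Keys read by the later cells (assumed list, adjust as needed).
REQUIRED_KEYS = {'file_type', 'file_name', 'inbound_delimiter',
                 'outbound_delimiter', 'columns'}

def missing_config_keys(config, required=REQUIRED_KEYS):
    """Return the required keys absent from config, sorted by name."""
    return sorted(set(required) - set(config))

# Same content as the config_data dictionary shown above.
config_data = {
    'file_type': 'csv',
    'dataset_name': 'Questions',
    'file_name': 'Questions',
    'table_name': 'edsurv',
    'inbound_delimiter': ',',
    'outbound_delimiter': '|',
    'skip_leading_rows': 1,
    'columns': ['id', 'owneruserid', 'creationdate', 'closeddate',
                'score', 'title', 'body'],
}

print(missing_config_keys(config_data))

# A config without the 'columns' entry is reported as incomplete.
incomplete = {k: v for k, v in config_data.items() if k != 'columns'}
print(missing_config_keys(incomplete))
```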

## 5) Validation and ingestion

In [27]:
file_type = config_data['file_type']
source_file = "DataSet/" + config_data['file_name'] + f'.{file_type}'
df = pd.read_csv(source_file,sep=config_data['inbound_delimiter'],encoding='ISO-8859-1')
df.head()


| | Id | OwnerUserId | CreationDate | ClosedDate | Score | Title | Body |
|---|---|---|---|---|---|---|---|
| 0 | 80 | 26.0 | 2008-08-01T13:57:07Z | | 26 | SQLStatement.execute() - multiple queries in o... | `<p>I've written a database generation script i...` |
| 1 | 90 | 58.0 | 2008-08-01T14:41:24Z | 2012-12-26T03:45:49Z | 144 | Good branching and merging tutorials for Torto... | `<p>Are there any really good tutorials explain...` |
| 2 | 120 | 83.0 | 2008-08-01T15:50:08Z | | 21 | ASP.NET Site Maps | `<p>Has anyone got experience creating <strong>...` |
| 3 | 180 | 2089740.0 | 2008-08-01T18:42:19Z | | 53 | Function for creating color wheels | `<p>This is something I've pseudo-solved many t...` |
| 4 | 260 | 91.0 | 2008-08-01T23:22:08Z | | 49 | Adding scripting functionality to .NET applica... | `<p>I have a little game written in C#. It uses...` |


In [28]:
util.col_header_val(df_pd,config_data)

column name and column length validation passed


1

In [29]:
df = df[['Id','OwnerUserId']]

In [30]:
df=df.rename(columns={'Id':'_  ID__','OwnerUserId': 'owner USER__ID'})

In [31]:
util.col_header_val(df,config_data)

column name and column length validation failed
Following File columns are not in the YAML file ['owner_user_id']
Following YAML columns are not in the file uploaded ['closeddate', 'owneruserid', 'score', 'body', 'title', 'creationdate']


0
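
The failed validation reports two set differences: columns found in the file but not in the YAML, and YAML columns missing from the file. Because sets are unordered, the printed order can vary between runs. A minimal sketch of the same comparison with sorted output is shown below; `diff_columns` is a hypothetical helper, and the column lists are copied from the cells above.

```python
def diff_columns(actual, expected):
    """Return (extra, missing): names only in actual, names only in expected."""
    extra = sorted(set(actual) - set(expected))
    missing = sorted(set(expected) - set(actual))
    return extra, missing

# Columns listed in file.yaml.
expected = ['id', 'owneruserid', 'creationdate', 'closeddate',
            'score', 'title', 'body']
# Normalized names of the two renamed columns.
actual = ['id', 'owner_user_id']

extra, missing = diff_columns(actual, expected)
print('Not in the YAML file:', extra)
print('Not in the uploaded file:', missing)
```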

## 6) Writing the file with a pipe (|) delimiter in gz format

In [32]:
filename = "DataSet/" + config_data['file_name']+ " out" + f'.{file_type}.gz'

df_pd.to_csv(filename, sep=config_data['outbound_delimiter'], compression='gzip')
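
To check that a pipe-delimited gzip file can be read back correctly, the round trip can be tried on a tiny sample first. The sketch below uses the standard library (csv and gzip) in place of pandas, with made-up rows and a temporary file. It only illustrates the file format, not the notebook's own writer.

```python
import csv
import gzip
import os
import tempfile

# Made-up sample rows; the last title contains the delimiter on purpose.
rows = [
    ['id', 'score', 'title'],
    [1, 10, 'first title'],
    [2, 20, 'a | b'],
]

path = os.path.join(tempfile.mkdtemp(), 'sample_out.csv.gz')

# Write pipe-delimited text through a gzip stream.
with gzip.open(path, 'wt', newline='', encoding='utf-8') as f:
    csv.writer(f, delimiter='|').writerows(rows)

# Read it back with the same delimiter.
with gzip.open(path, 'rt', newline='', encoding='utf-8') as f:
    read_back = list(csv.reader(f, delimiter='|'))

print(read_back)
```

Fields that contain the delimiter are quoted by the writer, so they survive the round trip. All values come back as strings.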


In [33]:
import os
origin_size = os.path.getsize(source_file)
compressed_size = os.path.getsize(filename)
number_rows = df_pd.shape[0]
number_columns= df_pd.shape[1]

In [34]:
print("Original size: " + f'{origin_size}' + " bytes")
print("Compressed size: " + f'{compressed_size}' + " bytes")
print("Number of rows: " + f'{number_rows}')
print("Number of columns: "+  f'{number_columns}')


Original size: 1923682009 bytes
Compressed size: 616475376 bytes
Number of rows: 1264216
Number of columns: 7
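
The sizes returned by os.path.getsize are in bytes, which are hard to read at this scale. The sketch below formats them with binary units and computes the compression ratio from the two values printed above. `human_size` is a hypothetical helper.

```python
def human_size(num_bytes):
    """Format a byte count using binary units (1 KiB = 1024 bytes)."""
    size = float(num_bytes)
    for unit in ['bytes', 'KiB', 'MiB', 'GiB']:
        # Stop at the first unit that fits, or at the largest one.
        if size < 1024 or unit == 'GiB':
            return f'{size:.2f} {unit}'
        size /= 1024

# Values printed by the previous cell.
origin_size = 1923682009
compressed_size = 616475376

print('Original size:  ', human_size(origin_size))
print('Compressed size:', human_size(compressed_size))
print(f'Compressed/original ratio: {compressed_size / origin_size:.2%}')
```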
