# **File Ingestion and schema validation**

#### **Task**
- Take any csv/text file of 2+ GB of your choice. --- (You can do this assignment on Google colab)

- Read the file ( Present approach of reading the file )

- Try different methods of file reading eg: Dask, Modin, Ray, pandas and present your findings in term of computational efficiency

- Perform basic validation on data columns : eg: remove special character , white spaces from the col name

- As you already know the schema hence create a YAML file and write the column name in YAML file. --define separator of read and write file, column name in YAML

- Validate number of columns and column name of ingested file with YAML.

- Write the file in pipe separated text file (|) in gz format.

- Create a summary of the file:

- Total number of rows,

- total number of columns

- file size

In [None]:
%%writefile testutility.py
import logging
import os
import subprocess
import yaml
import pandas as pd
import datetime 
import gc
import re


################
# File Reading #
################

def read_config_file(filepath):
    with open(filepath, 'r') as stream:
        try:
            return yaml.safe_load(stream)
        except yaml.YAMLError as exc:
            logging.error(exc)


def replacer(string, char):
    pattern = char + '{2,}'
    string = re.sub(pattern, char, string) 
    return string

def col_header_val(df,table_config):
    '''
    replace whitespaces in the column
    and standardized column names
    '''
    df.columns = df.columns.str.lower()
    df.columns = df.columns.str.replace('[^\w]','_',regex=True)
    df.columns = list(map(lambda x: x.strip('_'), list(df.columns)))
    df.columns = list(map(lambda x: replacer(x,'_'), list(df.columns)))
    expected_col = list(map(lambda x: x.lower(),  table_config['columns']))
    expected_col.sort()
    df.columns =list(map(lambda x: x.lower(), list(df.columns)))
    df = df.reindex(sorted(df.columns), axis=1)
    if len(df.columns) == len(expected_col) and list(expected_col)  == list(df.columns):
        print("column name and column length validation passed")
        return 1
    else:
        print("column name and column length validation failed")
        mismatched_columns_file = list(set(df.columns).difference(expected_col))
        print("Following File columns are not in the YAML file",mismatched_columns_file)
        missing_YAML_file = list(set(expected_col).difference(df.columns))
        print("Following YAML columns are not in the file uploaded",missing_YAML_file)
        logging.info(f'df columns: {df.columns}')
        logging.info(f'expected columns: {expected_col}')
        return 0

Writing testutility.py


#### **Testing computational efficiency using brute force**

In [None]:
pip install modin[ray]

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting modin[ray]
  Downloading modin-0.12.1-py3-none-any.whl (761 kB)
[K     |████████████████████████████████| 761 kB 26.4 MB/s 
Collecting fsspec
  Downloading fsspec-2022.7.1-py3-none-any.whl (141 kB)
[K     |████████████████████████████████| 141 kB 48.5 MB/s 
Collecting ray[default]>=1.4.0
  Downloading ray-1.13.0-cp37-cp37m-manylinux2014_x86_64.whl (54.5 MB)
[K     |████████████████████████████████| 54.5 MB 1.2 MB/s 
Collecting virtualenv
  Downloading virtualenv-20.16.3-py2.py3-none-any.whl (8.8 MB)
[K     |████████████████████████████████| 8.8 MB 51.5 MB/s 
Collecting grpcio<=1.43.0,>=1.28.1
  Downloading grpcio-1.43.0-cp37-cp37m-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (4.1 MB)
[K     |████████████████████████████████| 4.1 MB 52.8 MB/s 
Collecting gpustat>=1.0.0b1
  Downloading gpustat-1.0.0rc1.tar.gz (89 kB)
[K     |████████████████████████████████| 89 kB 9.1 MB/

In [None]:
import dask.dataframe as dd
import modin.pandas as mpd
import time
import ray
import pandas as pd

In [None]:
import time
import resource 

time_start = time.perf_counter()
df = dd.read_table('/content/drive/MyDrive/data.csv') 
time_elapsed = (time.perf_counter() - time_start)
memMb=resource.getrusage(resource.RUSAGE_SELF).ru_maxrss/1024.0/1024.0
print ("%5.1f secs %5.1f MByte" % (time_elapsed,memMb))

  0.0 secs   1.9 MByte


In [None]:
time_start = time.perf_counter()
df1 = mpd.read_csv('/content/drive/MyDrive/data.csv') 
time_elapsed = (time.perf_counter() - time_start)
memMb=resource.getrusage(resource.RUSAGE_SELF).ru_maxrss/1024.0/1024.0
print ("%5.1f secs %5.1f MByte" % (time_elapsed,memMb))

 21.0 secs   1.9 MByte


Data types of partitions are different! Please refer to the troubleshooting section of the Modin documentation to fix this issue.


In [None]:
time_start = time.perf_counter()
df3 = pd.read_csv('/content/drive/MyDrive/data.csv') 
time_elapsed = (time.perf_counter() - time_start)
memMb=resource.getrusage(resource.RUSAGE_SELF).ru_maxrss/1024.0/1024.0
print ("%5.1f secs %5.1f MByte" % (time_elapsed,memMb))

 18.9 secs   1.6 MByte




In [None]:
df3.head()

Unnamed: 0.1,Unnamed: 0,id,cell_id,source,ancestor_id,pct_rank
0,1,0944b58318b789,fced0b7a,# Spectrogram-based CNN for the Tensorflow Spe...,0212b702,0.0
1,3,59958672e3bf59,9b68df7e,# A simple explanation and implementation of D...,8f0f9cda,0.0
2,4,b22e24942614c9,fa089df9,Name: Aviral Jain | \nRoll No: 18AG3AI08 | \nB...,52b2390a,0.0
3,5,59959edc72f7c1,815f8d1f,"<img src=""https://drive.google.com/uc?export=d...",2ba4272a,0.0
4,7,599548cea78ff7,7b084d0a,# DATA UNDESRTANDING\n\n#### Churn veri seti i...,ce055a99,0.0


#### **Removing white spaces or special characters**

In [None]:
#Replacing white spaces from pandas datafreme 
df3.columns = df3.columns.str.replace(' ','_')
#Replacing special characters in a column with white space
#df['B'].str.replace('[^\w\s]', '')

In [None]:
df3.head()

Unnamed: 0,Unnamed:_0,id,cell_id,source,ancestor_id,pct_rank
0,1,0944b58318b789,fced0b7a,# Spectrogram-based CNN for the Tensorflow Spe...,0212b702,0.0
1,3,59958672e3bf59,9b68df7e,# A simple explanation and implementation of D...,8f0f9cda,0.0
2,4,b22e24942614c9,fa089df9,Name: Aviral Jain | \nRoll No: 18AG3AI08 | \nB...,52b2390a,0.0
3,5,59959edc72f7c1,815f8d1f,"<img src=""https://drive.google.com/uc?export=d...",2ba4272a,0.0
4,7,599548cea78ff7,7b084d0a,# DATA UNDESRTANDING\n\n#### Churn veri seti i...,ce055a99,0.0


### **Writing YAML file**

In [None]:
%%writefile file.yaml
file_type: csv
dataset_name: testfile
file_name: penguins
table_name: edsurv
inbound_delimiter: ","
outbound_delimiter: "|"
skip_leading_rows: 1
columns: 
    - CulmenLength
    - CulmenDepth
    - FlipperLength
    - BodyMass
    - Species

Overwriting file.yaml


In [None]:
# Read config file
import testutility as util
config_data = util.read_config_file("file.yaml")


In [None]:
#inspecting data of config file
config_data

{'file_type': 'csv',
 'dataset_name': 'testfile',
 'file_name': 'penguins',
 'table_name': 'edsurv',
 'inbound_delimiter': ',',
 'outbound_delimiter': '|',
 'skip_leading_rows': 1,
 'columns': ['CulmenLength',
  'CulmenDepth',
  'FlipperLength',
  'BodyMass',
  'Species']}

In [None]:
# Normal reading process of the file
import pandas as pd
df_sample = pd.read_csv("penguins.csv",delimiter=',')
df_sample.head()

Unnamed: 0,CulmenLength,CulmenDepth,FlipperLength,BodyMass,Species
0,39.1,18.7,181.0,3750.0,0
1,39.5,17.4,186.0,3800.0,0
2,40.3,18.0,195.0,3250.0,0
3,,,,,0
4,36.7,19.3,193.0,3450.0,0


In [None]:
# read the file using config file
file_type = config_data['file_type']
source_file = "./" + config_data['file_name'] + f'.{file_type}'
#print("",source_file)
df = pd.read_csv(source_file,config_data['inbound_delimiter'])
df.head()

  exec(code_obj, self.user_global_ns, self.user_ns)


Unnamed: 0,CulmenLength,CulmenDepth,FlipperLength,BodyMass,Species
0,39.1,18.7,181.0,3750.0,0
1,39.5,17.4,186.0,3800.0,0
2,40.3,18.0,195.0,3250.0,0
3,,,,,0
4,36.7,19.3,193.0,3450.0,0


In [None]:
#validate the header of the file
util.col_header_val(df,config_data)

column name and column length validation passed


1

In [None]:
print("columns of files are:" ,df.columns)
print("columns of YAML are:" ,config_data['columns'])

columns of files are: Index(['culmenlength', 'culmendepth', 'flipperlength', 'bodymass', 'species'], dtype='object')
columns of YAML are: ['CulmenLength', 'CulmenDepth', 'FlipperLength', 'BodyMass', 'Species']


In [None]:
if util.col_header_val(df,config_data)==0:
    print("validation failed")
    # write code to reject the file
else:
    print("col validation passed")
    # write the code to perform further action
    # in the pipleine

column name and column length validation passed
col validation passed


In [None]:
# Create csv with gzip compression
df.to_csv('gzfile.csv.gz',
      sep='|',
      header=True,
      index=False,
      compression='gzip')

### **Summary of file**

In [3]:
import os

gz_file = pd.read_csv('gzfile.csv.gz', compression='gzip', sep='|')

data = {
    'Total_number_of_rows' : gz_file.shape[0],
    'Total_number_of_columns' : gz_file.shape[1],
    'file_size' : os.path.getsize("gzfile.csv.gz")}
summarydata = pd.DataFrame(data, columns= ['Total_number_of_rows', 
                                           'Total_number_of_columns',
                                           'file_size'], index=[0])
summarydata

Unnamed: 0,Total_number_of_rows,Total_number_of_columns,file_size
0,344,5,2384
