Task: File Ingestion and Schema validation
Take any csv/text file of 2+ GB of your choice. --- (You can do this assignment on Google colab)

Read the file ( Present approach of reading the file )

Try different methods of file reading eg: Dask, Modin, Ray, pandas and present your findings in term of computational efficiency

Perform basic validation on data columns : eg: remove special character , white spaces from the col name

As you already know the schema hence create a YAML file and write the column name in YAML file. --define separator of
read and write file, column name in YAML

Validate number of columns and column name of ingested file with YAML.

Write the file in pipe separated text file (|) in gz format.

Create a summary of the file:

Total number of rows,

total number of columns

file size

In [1]:
import time
import pandas as pd

In [2]:
from dask import dataframe as dd
start=time.time()
dask_dd=dd.read_csv('data.csv')
end=time.time()
print("Read csv with dask: ",(end-start),"sec")

Read csv with dask:  0.0033309459686279297 sec


In [3]:
start=time.time()
pdd=pd.read_csv('data.csv')
end=time.time()
print("Read csv with pandas: ",(end-start),"sec")

Read csv with pandas:  15.176892042160034 sec


In [4]:
%%writefile utilityfile.py

import logging
import os
import yaml
import subprocess
import pandas as pd
import datetime
import gc
import re


#reading the file

def read_file(filepath):
    with open(filepath,'r') as stream:
        try:
            return yaml.safe_load(stream)
        except yaml.YAMLError as exc:
            logging.error(exc)
            
# replacing the character

def replace(string, char):
    pattern=char + '{2,}'
    string= re.sub(pattern,char,string)
    return string


# standardizes columns and validates dataframe YAML against validation YAML

def col_header_valid(df, table_config):
    df.columns=df.columns.str.lower()
    df.columns = df.columns.str.replace('[^\w]', '_', regex=True)
    df.columns= list(map(lambda x: x.strip('_'), list(df.columns)))
    df.columns= list(map(lambda x: replace(x, '_'), list(df.columns)))
    expected_col= list(map(lambda x:x.lower(),table_config['columns']))
    expected_col.sort()
    df.columns = list(map(lambda x: x.lower(), list(df.columns)))
    
    if len(expected_col)==len(df.columns) and list(expected_col)==list(df.columns):
        print("Column name and column length validation passed.")
        return 1
    else:
        print("Column name and column length validation failed.")
        mismatched_columns_file=list(set(df.columns).difference(expected_col))
        print("Following file columns are not in the YAML file",mismatched_columns_file)
        missing_YAML_file = list(set(expected_col).difference(df.columns))
        print("Following YAML columns are not in the file uploaded",missing_YAML_file)
        logging.info(f'df columns: {df.columns}')
        logging.info(f'expected columns: {expected_col}')
        return 0

Writing utilityfile.py


In [5]:
%%writefile validation_yaml.csv

file_type: csv
dataset_name: validation
file_name: validation_yaml
table_name: example
inbound_delimiter: ","
outbound_delimiter: "|"
skip_leading_rows: 1
columns: 
    - date
    - game_size
    - match_id
    - match_mode
    - party_size
    - player_assists
    - player_dbno
    - player_dist_ride
    - player_dist_walk
    - player_dmg
    - player_kills
    - player_name
    - player_survive_time
    - team_id
    - team_placement

Overwriting validation_yaml.csv


In [6]:
%%writefile pubgdata.yaml

file_type: yaml
dataset_name: pubg_agg_match
file_name: pubg_agg_match_stats_0
table_name: ast
inbound_delimiter: ','
outbound_delimiter: '|'
skip_leading_rows: 1
columns: 
    - date
    - game_size
    - match_id
    - match_mode
    - party_size
    - player_assists
    - player_dbno
    - player_dist_ride
    - player_dist_walk
    - player_dmg
    - player_kills
    - player_name
    - player_survive_time
    - team_id
    - team_placement

Overwriting pubgdata.yaml


In [7]:
import utilityfile as util

config_data=util.read_file("validation_yaml.csv")

In [8]:
config_data['file_type']

'csv'

In [9]:
config_data

{'file_type': 'csv',
 'dataset_name': 'validation',
 'file_name': 'validation_yaml',
 'table_name': 'example',
 'inbound_delimiter': ',',
 'outbound_delimiter': '|',
 'skip_leading_rows': 1,
 'columns': ['date',
  'game_size',
  'match_id',
  'match_mode',
  'party_size',
  'player_assists',
  'player_dbno',
  'player_dist_ride',
  'player_dist_walk',
  'player_dmg',
  'player_kills',
  'player_name',
  'player_survive_time',
  'team_id',
  'team_placement']}

In [10]:
# validate the header of the file

util.col_header_valid(dask_dd,config_data)

Column name and column length validation passed.


1

In [11]:
invalid_data= pd.read_csv("test_data.csv")
invalid_data

Unnamed: 0,city,age,Country
0,Delhi,34,India
1,Lima,30,Peru
2,Istanbul,16,Turkey
3,Riyadh,33,Saudi Arabia


In [12]:
util.col_header_valid(invalid_data,config_data)

Column name and column length validation failed.
Following file columns are not in the YAML file ['country', 'city', 'age']
Following YAML columns are not in the file uploaded ['player_dbno', 'party_size', 'match_id', 'player_name', 'player_dist_walk', 'player_dmg', 'team_placement', 'team_id', 'game_size', 'date', 'player_survive_time', 'match_mode', 'player_dist_ride', 'player_assists', 'player_kills']


0

In [18]:
import pandas as pd

# Assuming dask_dd is your DataFrame
columns_df = pd.DataFrame([dask_dd.columns])

# Save the columns as a single-row CSV with pipe delimiter and optional compression
columns_df.to_csv('output.txt', sep='|', index=False, header=False)

# Optional: If you want to compress the file
columns_df.to_csv('output.txt.gz', sep='|', index=False, header=False, compression='gzip')


In [21]:
import os
# get the file size
file_size = os.path.getsize('output.txt.gz')

# get the total number of rows and columns
num_rows = len(dask_dd)
num_cols = len(dask_dd.columns)

# print the summary
print(f"Total number of rows: {num_rows}")
print(f"Total number of columns: {num_cols}")
print(f"File size: {file_size} bytes")

Total number of rows: 13849287
Total number of columns: 15
File size: 140 bytes
