## File Ingestion

In [1]:
# Importing required libraries

import os
import time

import pandas as pd
from dask import dataframe as dd

import warnings
warnings.filterwarnings('ignore')

In [2]:
# Getting the size of the file
os.path.getsize(r'C:\\Users\\Mama\\Desktop\\ztm_project\\match_data.csv')

3456745016

### Reading the data with Dask

In [3]:
start = time.time()
dask_df = dd.read_csv(r'C:\Users\Mama\Desktop\ztm_project\match_data.csv')
end = time.time()
print("Read csv with dask: ",(end-start),"sec")

Read csv with dask:  0.31255340576171875 sec


In [4]:
# Viewing the dask dataframe
dask_df

Unnamed: 0_level_0,Unnamed: 0,gameCreation,gameDuration,gameId,gameMode,gameType,gameVersion,mapId,participantIdentities,participants,platformId,queueId,seasonId,status.message,status.status_code
npartitions=54,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1
,int64,float64,float64,float64,object,object,object,float64,object,object,object,float64,float64,float64,float64
,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...


### Reading the data with Pandas

In [5]:
start = time.time()
pandas_df = pd.read_csv(r'C:\Users\Mama\Desktop\ztm_project\match_data.csv', engine='python')
end = time.time()
print("Read csv with pandas: ",(end-start),"sec")

Read csv with pandas:  1029.716032743454 sec


**Here,it is evident Dask is better than Pandas with the least reading time of 0.31 sec. So we would be using dask to our data.**

In [6]:
# Reading the data again and specify a delimiter
dask_df = dd.read_csv(r'C:\Users\Mama\Desktop\ztm_project\match_data.csv', delimiter='|')

In [7]:
# Viewing information about the dask dataframe
dask_df.info()

<class 'dask.dataframe.core.DataFrame'>
Columns: 1 entries, ,gameCreation,gameDuration,gameId,gameMode,gameType,gameVersion,mapId,participantIdentities,participants,platformId,queueId,seasonId,status.message,status.status_code to ,gameCreation,gameDuration,gameId,gameMode,gameType,gameVersion,mapId,participantIdentities,participants,platformId,queueId,seasonId,status.message,status.status_code
dtypes: object(1)

In [8]:
# Checking the length of the dataframe
len(dask_df.index)

108829

In [9]:
# Checking the number of Columns
len(dask_df.columns)

1

In [10]:
# Removing special character
dask_df.columns = dask_df.columns.str.replace('[#,@,&,:]','', regex=True)

In [11]:
#Removing white space from columns
dask_df.columns = dask_df.columns.str.replace(' ', '')

In [12]:
# Removing colons from columns
dask_df.columns = dask_df.columns.str.replace(':', '')

In [13]:
# Remove periods from columns
dask_df.columns = dask_df.columns.str.replace('.', '_')

In [14]:
# Viewing the modified columns
dask_df.columns

Index(['gameCreationgameDurationgameIdgameModegameTypegameVersionmapIdparticipantIdentitiesparticipantsplatformIdqueueIdseasonIdstatus_messagestatus_status_code'], dtype='object')

### Schema Validation

**Here, I am going to create a utility.py file, that contains instructions for schema validation.**

In [15]:
%%writefile utility_f.py

import os
import subprocess
import pandas as pd
import datetime 
import gc
import re
import logging
logger = logging.getLogger(__name__)




def col_header_val(df,table_config):
    df.columns = df.columns.str.lower()
    df.columns = df.columns.str.replace('[#,@,&,:]','', regex=True)
    df.columns = df.columns.str.replace(' ', '')
    df.columns = df.columns.str.replace(':', '')
    df.columns = df.columns.str.replace('.', '_')
    expected_col = list(map(lambda x: x.lower(), table_config['columns']))
    expected_col.sort()
    df.columns = list(map(lambda x: x.lower(), list(df.columns)))
    df = df.reindex(sorted(df.columns), axis=1)
    if len(df.columns) == len(expected_col) and list(expected_col)  == list(df.columns):
        print("column name and column length validation passed")
        return 1
    else:
        print("column name and column length validation failed")
        mismatched_columns_file = list(set(df.columns).difference(expected_col))
        print("Following File columns are not in the YAML file",mismatched_columns_file)
        missing_YAML_file = list(set(expected_col).difference(df.columns))
        print("Following YAML columns are not in the file uploaded",missing_YAML_file)
        logger.info(f'df columns: {df.columns}')
        logger.info(f'expected columns: {expected_col}')
        return 0

Overwriting utility_f.py


In [16]:
# Creating a dictionary of data for my yaml file

data = {
    'file_type': 'csv', 
    'dataset_name': 'match_data', 
    'file_name': 'match_data', 
    'table_name': 'tehetab',
    'delimiter': "|", 
    'skip_leading_rows': 1, 
    'columns':['Unnamed0', 'gameCreation', 'gameDuration', 'gameId', 'gameMode', 'gameType', 'gameVersion', 'mapId', 'participantIdentities', 'participants', 'platformId', 'queueId', 'seasonId', 'status_message', 'status_status_code']
}
   

In [17]:
# Viewing the dictionary
data

{'file_type': 'csv',
 'dataset_name': 'match_data',
 'file_name': 'match_data',
 'table_name': 'tehetab',
 'delimiter': '|',
 'skip_leading_rows': 1,
 'columns': ['Unnamed0',
  'gameCreation',
  'gameDuration',
  'gameId',
  'gameMode',
  'gameType',
  'gameVersion',
  'mapId',
  'participantIdentities',
  'participants',
  'platformId',
  'queueId',
  'seasonId',
  'status_message',
  'status_status_code']}

In [18]:
# Writing our yaml file

from  ruamel.yaml import YAML

yaml = YAML()
yaml.preserve_quotes = True
with open(r'C:/Users/Mama/Desktop/ztm_project/store_file.yaml', 'w') as file:
    config_data = yaml.dump(data, file)

In [19]:
# Reading the yaml file
import sys
from  ruamel.yaml import YAML
yaml = YAML()

with open(r'C:/Users/Mama/Desktop/ztm_project/store_file.yaml') as file:
    config_file = yaml.load(file)
    
    print(config_file)
    yaml.dump(config_file, sys.stdout)

ordereddict([('file_type', 'csv'), ('dataset_name', 'match_data'), ('file_name', 'match_data'), ('table_name', 'tehetab'), ('delimiter', '|'), ('skip_leading_rows', 1), ('columns', ['Unnamed0', 'gameCreation', 'gameDuration', 'gameId', 'gameMode', 'gameType', 'gameVersion', 'mapId', 'participantIdentities', 'participants', 'platformId', 'queueId', 'seasonId', 'status_message', 'status_status_code'])])
file_type: csv
dataset_name: match_data
file_name: match_data
table_name: tehetab
delimiter: '|'
skip_leading_rows: 1
columns:
- Unnamed0
- gameCreation
- gameDuration
- gameId
- gameMode
- gameType
- gameVersion
- mapId
- participantIdentities
- participants
- platformId
- queueId
- seasonId
- status_message
- status_status_code


In [20]:
# Viewing the columns of the yaml file
config_file['columns']

['Unnamed0', 'gameCreation', 'gameDuration', 'gameId', 'gameMode', 'gameType', 'gameVersion', 'mapId', 'participantIdentities', 'participants', 'platformId', 'queueId', 'seasonId', 'status_message', 'status_status_code']

In [21]:
# Creating our source_file using config file
file_type = config_file['file_type']
source_file = "C:/Users/Mama/Desktop/ztm_project/" + config_file['file_name'] + f'.{file_type}'

In [22]:
source_file

'C:/Users/Mama/Desktop/ztm_project/match_data.csv'

In [23]:
# Reading the source_file with pandas. Due to low memory issues, I will just read a 1000 rows
import pandas as pd
import csv

df = pd.read_csv(source_file,  nrows=1000)
df

Unnamed: 0.1,Unnamed: 0,gameCreation,gameDuration,gameId,gameMode,gameType,gameVersion,mapId,participantIdentities,participants,platformId,queueId,seasonId,status.message,status.status_code
0,0,1.585155e+12,1323.0,4.247263e+09,CLASSIC,MATCHED_GAME,10.6.314.4405,11.0,"[{'participantId': 1, 'player': {'platformId':...","[{'participantId': 1, 'teamId': 100, 'champion...",KR,420.0,13.0,,
1,1,1.585152e+12,1317.0,4.247156e+09,CLASSIC,MATCHED_GAME,10.6.314.4405,11.0,"[{'participantId': 1, 'player': {'platformId':...","[{'participantId': 1, 'teamId': 100, 'champion...",KR,420.0,13.0,,
2,2,1.585059e+12,932.0,4.243963e+09,CLASSIC,MATCHED_GAME,10.6.313.8894,11.0,"[{'participantId': 1, 'player': {'platformId':...","[{'participantId': 1, 'teamId': 100, 'champion...",KR,420.0,13.0,,
3,3,1.584978e+12,2098.0,4.241678e+09,CLASSIC,MATCHED_GAME,10.6.313.8894,11.0,"[{'participantId': 1, 'player': {'platformId':...","[{'participantId': 1, 'teamId': 100, 'champion...",KR,420.0,13.0,,
4,4,1.584973e+12,2344.0,4.241539e+09,CLASSIC,MATCHED_GAME,10.6.313.8894,11.0,"[{'participantId': 1, 'player': {'platformId':...","[{'participantId': 1, 'teamId': 100, 'champion...",KR,420.0,13.0,,
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
995,995,1.581362e+12,930.0,4.144496e+09,CLASSIC,MATCHED_GAME,10.3.307.1028,11.0,"[{'participantId': 1, 'player': {'platformId':...","[{'participantId': 1, 'teamId': 100, 'champion...",KR,420.0,13.0,,
996,996,1.581358e+12,982.0,4.144510e+09,CLASSIC,MATCHED_GAME,10.3.307.1028,11.0,"[{'participantId': 1, 'player': {'platformId':...","[{'participantId': 1, 'teamId': 100, 'champion...",KR,420.0,13.0,,
997,997,1.581346e+12,1500.0,4.144149e+09,CLASSIC,MATCHED_GAME,10.3.307.1028,11.0,"[{'participantId': 1, 'player': {'platformId':...","[{'participantId': 1, 'teamId': 100, 'champion...",KR,420.0,13.0,,
998,998,1.581342e+12,1609.0,4.144094e+09,CLASSIC,MATCHED_GAME,10.3.307.1028,11.0,"[{'participantId': 1, 'player': {'platformId':...","[{'participantId': 1, 'teamId': 100, 'champion...",KR,420.0,13.0,,


In [24]:
# Validating the header of the file
import utility_f as util
util.col_header_val(df,config_file)

column name and column length validation passed


1

In [25]:
print("columns of files are:" ,df.columns)
print("columns of YAML are:" ,config_file['columns'])

columns of files are: Index(['unnamed0', 'gamecreation', 'gameduration', 'gameid', 'gamemode',
       'gametype', 'gameversion', 'mapid', 'participantidentities',
       'participants', 'platformid', 'queueid', 'seasonid', 'status_message',
       'status_status_code'],
      dtype='object')
columns of YAML are: ['Unnamed0', 'gameCreation', 'gameDuration', 'gameId', 'gameMode', 'gameType', 'gameVersion', 'mapId', 'participantIdentities', 'participants', 'platformId', 'queueId', 'seasonId', 'status_message', 'status_status_code']


In [26]:
if util.col_header_val(df,config_file)==0:
    print("validation failed")
else:
    print("column validation passed")

column name and column length validation passed
column validation passed


In [None]:
import datetime
import csv
import gzip

from dask import dataframe as dd
df = dd.read_csv('match_data.csv', delimiter='|')

# Writing csv in gz format in pipe separated text file (|)
df.to_csv('match_data.csv.gz',
          sep='|',
          header=True,
          index=False,
          quoting=csv.QUOTE_ALL,
          compression='gzip',
          quotechar='"',
          doublequote=True,
          line_terminator='\n')

In [28]:
#number of files in gz format folder
import os
entries = os.listdir('match_data.csv.gz/')
for entry in entries:
    print(entry)

00.part
01.part
02.part
03.part
04.part
05.part
06.part
07.part
08.part
09.part
10.part
11.part
12.part
13.part
14.part
15.part
16.part
17.part
18.part
19.part
20.part
21.part
22.part
23.part
24.part
25.part
26.part
27.part
28.part
29.part
30.part
31.part
32.part
33.part
34.part
35.part
36.part
37.part
38.part
39.part
40.part
41.part
42.part
43.part
44.part
45.part
46.part
47.part
48.part
49.part
50.part
51.part
52.part
53.part


In [29]:
#size of the gz format folder
os.path.getsize('match_data.csv.gz')

24576