
Data Ingestion and Schema Validation steps:


    Data: Airline Delay and Cancellation Data, 2009 to 2018 (7.62 GB). I used the 2009 to 2011 files (2.31 GB).
    Concatenated the yearly CSV files into one file.
    Performed basic validation on the column names, e.g. removing special characters and white space.
    Created a YAML file that lists the column names and defines the separators for reading (inbound) and writing (outbound) the file.
    Validated the number of columns and the column names of the ingested file against the YAML file.
    Wrote the file as a pipe-separated (|) text file in gz format.
    Created a summary of the file: total number of rows, total number of columns, and file size.

Data Source: https://www.kaggle.com/datasets/yuanyuwendymu/airline-delay-and-cancellation-data-2009-2018




In [None]:
# The next steps install the Kaggle CLI, upload the API token, then download and unzip the dataset.
! pip install -q kaggle

In [None]:
# Upload kaggle.json (the Kaggle API token) from the local machine.
from google.colab import files
files.upload()

Saving kaggle.json to kaggle.json


{'kaggle.json': b'{"username":"mohsenbahremani","key":"<redacted>"}'}

In [None]:
! mkdir -p ~/.kaggle

In [None]:
! cp kaggle.json ~/.kaggle/

In [None]:
! chmod 600 ~/.kaggle/kaggle.json
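The three shell cells above (`mkdir`, `cp`, `chmod 600`) can also be done from Python. This is a minimal standard-library sketch; `install_kaggle_token` is a hypothetical helper name, and it assumes the token sits in the working directory as `kaggle.json`, as it does after `files.upload()`.

```python
import os
import shutil
import stat
from pathlib import Path


def install_kaggle_token(token_path="kaggle.json", home=None):
    """Copy a Kaggle API token into <home>/.kaggle/ and restrict its permissions.

    Hypothetical helper mirroring the mkdir / cp / chmod 600 cells.
    """
    home = Path(home) if home is not None else Path.home()
    kaggle_dir = home / ".kaggle"
    # Like `mkdir -p`: no error if the directory already exists.
    kaggle_dir.mkdir(parents=True, exist_ok=True)
    target = kaggle_dir / "kaggle.json"
    shutil.copyfile(token_path, target)
    # Owner read/write only (octal 600), so other users cannot read the key.
    os.chmod(target, stat.S_IRUSR | stat.S_IWUSR)
    return target
```

Because of `exist_ok=True`, re-running the cell after a runtime restart does not fail the way a plain `mkdir` does.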

In [None]:
! kaggle datasets download -d yuanyuwendymu/airline-delay-and-cancellation-data-2009-2018

Downloading airline-delay-and-cancellation-data-2009-2018.zip to /content
100% 1.95G/1.95G [00:14<00:00, 149MB/s]


In [49]:
! unzip airline-delay-and-cancellation-data-2009-2018.zip -d train

Archive:  airline-delay-and-cancellation-data-2009-2018.zip
  inflating: train/2009.csv          
  inflating: train/2010.csv          
  inflating: train/2011.csv          
  inflating: train/2012.csv          
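Since only the 2009 to 2011 files are used, one option is to extract just those years instead of the whole archive. A minimal sketch with `zipfile`; `extract_years` is a hypothetical helper, and it assumes the archive members are named `<year>.csv`, optionally inside a folder.

```python
import re
import zipfile


def extract_years(zip_path, out_dir, years):
    """Extract only the <year>.csv members whose year is in `years`.

    Returns the sorted list of extracted file paths.
    """
    wanted = {str(y) for y in years}
    extracted = []
    with zipfile.ZipFile(zip_path) as zf:
        for name in zf.namelist():
            # Matches "2009.csv" as well as "some/folder/2009.csv".
            match = re.fullmatch(r"(?:.*/)?(\d{4})\.csv", name)
            if match and match.group(1) in wanted:
                # extract() creates out_dir if needed and returns the new path.
                extracted.append(zf.extract(name, path=out_dir))
    return sorted(extracted)


# Example: extract_years("airline-delay-and-cancellation-data-2009-2018.zip",
#                        "train", range(2009, 2012))
```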

In [1]:
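import pandas as pd
from glob import glob

# Read every yearly CSV extracted above and stack them into one DataFrame.
# sorted() keeps the years in order; no recursive search is needed for this flat pattern.
path = '/content/train/*.csv'
airline_df = pd.concat((pd.read_csv(f) for f in sorted(glob(path))), ignore_index=True)

In [2]:
airline_df.to_csv("Airline_Delay.csv", index=False)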
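`pd.concat` holds every year in memory before anything is written. A lower-memory alternative is to stream each file into the output with the `csv` module, writing the header once and rejecting any file whose header differs. This is a sketch under the assumption that all yearly files share the same header row; `concat_csv_files` is a hypothetical helper. The `csv` writer re-applies its own minimal quoting, so field values are preserved but quoting may differ from the source files.

```python
import csv
from glob import glob


def concat_csv_files(pattern, out_path):
    """Stream every CSV matching `pattern` into `out_path`; return data rows written.

    `out_path` must not itself match `pattern`.
    """
    header = None
    rows_written = 0
    with open(out_path, "w", newline="") as out:
        writer = csv.writer(out)
        # sorted() gives a deterministic order (2009, 2010, ...).
        for path in sorted(glob(pattern)):
            with open(path, newline="") as f:
                reader = csv.reader(f)
                file_header = next(reader, None)
                if file_header is None:
                    continue  # skip empty files
                if header is None:
                    header = file_header
                    writer.writerow(header)
                elif file_header != header:
                    raise ValueError(f"{path}: header differs from the first file")
                for row in reader:
                    writer.writerow(row)
                    rows_written += 1
    return rows_written


# Example: concat_csv_files('/content/train/*.csv', 'Airline_Delay.csv')
```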

In [20]:
%%writefile file.yaml
file_type: csv
dataset_name: Airline_Delay
file_name: Airline_Delay
table_name: edsurv
inbound_delimiter: ","
outbound_delimiter: "|"
skip_leading_rows: 1
columns:
    - FL_DATE
    - OP_CARRIER
    - OP_CARRIER_FL_NUM
    - ORIGIN
    - DEST
    - CRS_DEP_TIME
    - DEP_TIME
    - DEP_DELAY
    - TAXI_OUT
    - WHEELS_OFF
    - WHEELS_ON
    - TAXI_IN
    - CRS_ARR_TIME
    - ARR_TIME
    - ARR_DELAY
    - CANCELLED
    - CANCELLATION_CODE
    - DIVERTED
    - CRS_ELAPSED_TIME
    - ACTUAL_ELAPSED_TIME
    - AIR_TIME
    - DISTANCE
    - CARRIER_DELAY
    - WEATHER_DELAY
    - NAS_DELAY
    - SECURITY_DELAY
    - LATE_AIRCRAFT_DELAY
    - Unnamed_27

Overwriting file.yaml


In [21]:
%%writefile testutility.py
import logging
import re

import yaml


################
# File Reading #
################

def read_config_file(filepath):
    '''Load a YAML config file and return it as a dict.'''
    with open(filepath, 'r') as stream:
        try:
            return yaml.safe_load(stream)
        except yaml.YAMLError as exc:
            logging.error(exc)
            # Re-raise so callers do not silently receive None.
            raise


def replacer(string, char):
    '''Collapse runs of two or more `char` (a single character) into one.'''
    pattern = re.escape(char) + '{2,}'
    return re.sub(pattern, char, string)


def col_header_val(df, table_config):
    '''
    Standardize the column names of `df` in place (lowercase, non-word
    characters -> "_", no leading/trailing or repeated "_") and compare
    them with the column list in the YAML config.
    Returns 1 if the names and the column count match, 0 otherwise.
    '''
    df.columns = df.columns.str.lower()
    df.columns = df.columns.str.replace(r'[^\w]', '_', regex=True)
    df.columns = [replacer(col.strip('_'), '_') for col in df.columns]
    expected_col = sorted(col.lower() for col in table_config['columns'])
    file_col = sorted(df.columns)
    # Equal sorted lists imply both the same names and the same count.
    if file_col == expected_col:
        print("column name and column length validation passed")
        return 1
    else:
        print("column name and column length validation failed")
        mismatched_columns_file = sorted(set(file_col).difference(expected_col))
        print("Following file columns are not in the YAML file:", mismatched_columns_file)
        missing_YAML_file = sorted(set(expected_col).difference(file_col))
        print("Following YAML columns are not in the uploaded file:", missing_YAML_file)
        logging.info(f'df columns: {file_col}')
        logging.info(f'expected columns: {expected_col}')
        return 0

Overwriting testutility.py
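To see what the header standardization in `col_header_val` does to a single name, here is the same sequence of steps (lowercase, non-word characters to `_`, strip, collapse repeats) as a standalone function. `normalize_column_name` is a hypothetical name used only for illustration. pandas labels a column with an empty header `Unnamed: <position>`, which is presumably why the YAML lists `Unnamed_27` as the 28th column (likely a trailing comma in the source CSVs).

```python
import re


def normalize_column_name(name):
    """Standardize one header the same way col_header_val does."""
    name = name.lower()
    name = re.sub(r"[^\w]", "_", name)  # any non-word character -> "_"
    name = name.strip("_")              # drop leading/trailing "_"
    return re.sub(r"_{2,}", "_", name)  # collapse runs of "_" into one


print(normalize_column_name("Unnamed: 27"))        # unnamed_27
print(normalize_column_name(" Arr Delay (min) "))  # arr_delay_min
```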


In [22]:
# Read config file
import testutility as util
config_data = util.read_config_file("file.yaml")

In [6]:
config_data['inbound_delimiter']

','

In [23]:
# Inspect the parsed config
config_data

{'columns': ['FL_DATE',
  'OP_CARRIER',
  'OP_CARRIER_FL_NUM',
  'ORIGIN',
  'DEST',
  'CRS_DEP_TIME',
  'DEP_TIME',
  'DEP_DELAY',
  'TAXI_OUT',
  'WHEELS_OFF',
  'WHEELS_ON',
  'TAXI_IN',
  'CRS_ARR_TIME',
  'ARR_TIME',
  'ARR_DELAY',
  'CANCELLED',
  'CANCELLATION_CODE',
  'DIVERTED',
  'CRS_ELAPSED_TIME',
  'ACTUAL_ELAPSED_TIME',
  'AIR_TIME',
  'DISTANCE',
  'CARRIER_DELAY',
  'WEATHER_DELAY',
  'NAS_DELAY',
  'SECURITY_DELAY',
  'LATE_AIRCRAFT_DELAY',
  'Unnamed_27'],
 'dataset_name': 'Airline_Delay',
 'file_name': 'Airline_Delay',
 'file_type': 'csv',
 'inbound_delimiter': ',',
 'outbound_delimiter': '|',
 'skip_leading_rows': 1,
 'table_name': 'edsurv'}
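Before the config drives any reads or writes, it can be checked for obvious mistakes. A minimal sketch: `check_config` and `REQUIRED_KEYS` are hypothetical, and the required-key set is an assumption based on the keys defined in `file.yaml`. For the `config_data` shown above it should return an empty list.

```python
from collections import Counter

# Assumed set of keys the pipeline relies on (taken from file.yaml).
REQUIRED_KEYS = {"file_type", "file_name", "inbound_delimiter",
                 "outbound_delimiter", "skip_leading_rows", "columns"}


def check_config(config):
    """Return a list of problems found in a parsed config dict (empty if none)."""
    problems = []
    missing = REQUIRED_KEYS - set(config)
    if missing:
        problems.append(f"missing keys: {sorted(missing)}")
    for key in ("inbound_delimiter", "outbound_delimiter"):
        value = config.get(key)
        # to_csv and the csv module expect a one-character delimiter.
        if value is not None and not (isinstance(value, str) and len(value) == 1):
            problems.append(f"{key} must be a single character, got {value!r}")
    # col_header_val compares lowercased names, so check duplicates case-insensitively.
    counts = Counter(str(c).lower() for c in config.get("columns") or [])
    duplicates = sorted(c for c, n in counts.items() if n > 1)
    if duplicates:
        problems.append(f"duplicate columns (case-insensitive): {duplicates}")
    return problems


# Example: check_config(config_data)
```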

In [None]:
# With this code (plain pandas), the dataset could not be opened in this session.
# Read the file using the settings from the config file.
file_type = config_data['file_type']
source_file = "./" + config_data['file_name'] + f'.{file_type}'
# Pass the separator by keyword; recent pandas versions do not accept it positionally.
df = pd.read_csv(source_file, sep=config_data['inbound_delimiter'])

In [None]:
# With this code (plain pandas), the dataset could not be opened in this session either.
import os

df = pd.read_csv('/content/Airline_Delay.csv')
print(f'Number of Rows is: {len(df)}, Number of Columns is: {len(df.columns)}')
print(f"The file size is: {os.path.getsize('/content/Airline_Delay.csv')/10**9:.2f} GB")
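If the whole file does not fit in memory, the rows/columns/size summary can still be computed by streaming the file once. A standard-library sketch; `summarize_csv` is a hypothetical helper, and it assumes the first line is the header (`skip_leading_rows: 1` in the YAML). Using `csv.reader` instead of counting raw lines keeps quoted fields that contain newlines from being miscounted.

```python
import csv
import os


def summarize_csv(path, delimiter=","):
    """Count data rows and header columns of a delimited file without loading it."""
    with open(path, newline="") as f:
        reader = csv.reader(f, delimiter=delimiter)
        header = next(reader, [])
        n_rows = sum(1 for _ in reader)  # one pass, one record at a time
    return {
        "rows": n_rows,
        "columns": len(header),
        "size_gb": os.path.getsize(path) / 10**9,
    }


# Example:
# s = summarize_csv('/content/Airline_Delay.csv', config_data['inbound_delimiter'])
# print(f"Rows: {s['rows']}, Columns: {s['columns']}, Size: {s['size_gb']:.2f} GB")
```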

In [1]:
%%time
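import dask.dataframe as dd
import os

# Dask reads the CSV lazily in partitions. It infers dtypes from a sample at the start
# of the file, so CANCELLATION_CODE (mostly empty there) is declared as object up front.
df = dd.read_csv('/content/Airline_Delay.csv', dtype={'CANCELLATION_CODE': 'object'})
# len(df) triggers a full pass over the file to count the rows.
print(f'Number of Rows is: {len(df)}, Number of Columns is: {len(df.columns)}')
print(f"The file size is: {os.path.getsize('/content/Airline_Delay.csv')/10**9:.2f} GB")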

Number of Rows is: 18946105, Number of Columns is: 28
The file size is: 2.31 GB
CPU times: user 1min 17s, sys: 3.58 s, total: 1min 21s
Wall time: 1min 11s


In [17]:
# Materialize the Dask DataFrame as an in-memory pandas DataFrame.
df = df.compute()

In [24]:
util.col_header_val(df, config_data)

column name and column length validation passed


1

In [25]:
if util.col_header_val(df, config_data) == 0:
    print("validation failed")
    # write code to reject the file
else:
    print("col validation passed")
    # write the code to perform further actions
    # in the pipeline



column name and column length validation passed
col validation passed
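The "reject the file" placeholder above could be filled by moving the source file into an accepted or rejected folder according to the validation result. A minimal sketch; `route_file` and the `accepted` / `rejected` directory names are hypothetical choices, not part of the original pipeline.

```python
import shutil
from pathlib import Path


def route_file(path, passed, accepted_dir="accepted", rejected_dir="rejected"):
    """Move `path` into accepted_dir or rejected_dir depending on validation."""
    target_dir = Path(accepted_dir if passed else rejected_dir)
    target_dir.mkdir(parents=True, exist_ok=True)
    # shutil.move returns the destination path.
    return Path(shutil.move(str(path), str(target_dir / Path(path).name)))


# Example:
# passed = util.col_header_val(df, config_data) == 1
# route_file('/content/Airline_Delay.csv', passed)
```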


In [31]:
import csv

# Write the DataFrame as a gzip-compressed, pipe-separated (|) text file.
# The argument is `lineterminator`; pandas 1.5 renamed it from `line_terminator`.
df.to_csv('Airline.gz',
          sep='|',
          header=True,
          index=False,
          quoting=csv.QUOTE_ALL,
          compression='gzip',
          quotechar='"',
          doublequote=True,
          lineterminator='\n')
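A quick round-trip check confirms that the written file has the expected header and row count and that every record splits into the right number of fields (quoted fields that contain `|` stay intact). A standard-library sketch; `verify_pipe_gz` is a hypothetical helper. After `col_header_val` has run, `df.columns` holds the standardized names, so they can serve as the expected header.

```python
import csv
import gzip


def verify_pipe_gz(path, expected_columns, expected_rows):
    """Re-read a gzip-compressed, pipe-separated file and check its shape.

    Returns a list of problems; an empty list means the check passed.
    """
    problems = []
    with gzip.open(path, "rt", newline="") as f:
        reader = csv.reader(f, delimiter="|", quotechar='"')
        header = next(reader, [])
        n_rows = 0
        for record_no, row in enumerate(reader, start=1):
            n_rows += 1
            if len(row) != len(header):
                problems.append(f"record {record_no}: {len(row)} fields, expected {len(header)}")
    if header != list(expected_columns):
        problems.append("header does not match the expected columns")
    if n_rows != expected_rows:
        problems.append(f"{n_rows} data rows, expected {expected_rows}")
    return problems


# Example: verify_pipe_gz('Airline.gz', list(df.columns), len(df))
```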
