# File ingestion and schema validation


In [3]:
import time

import numpy as np
import pandas as pd
import matplotlib
import matplotlib.pyplot as plt
import dask.dataframe as dd
import multiprocessing as mp
import csv
import yaml
import gzip
import os
from subprocess import check_call
import warnings
warnings.filterwarnings('ignore')

import datatest

## Using Pandas

In [5]:
start = time.time()
df = pd.read_csv('Data/price_paid_records.csv')
end = time.time()
print("Read csv with pandas: ",(end-start),"sec")

Read csv with pandas:  53.97488236427307 sec


## Using Dask

In [6]:
# Dataframes implement the pandas API
from dask import dataframe as dd
start = time.time()
dask_df = dd.read_csv('Data/price_paid_records.csv')
end = time.time()
print("Read csv with dask: ",(end-start),"sec")

Read csv with dask:  0.08531713485717773 sec


Dask is faster than pandas


In [7]:
df.head(5)

Unnamed: 0,Transaction unique identifier,Price,Date of Transfer,Property Type,Old/New,Duration,Town/City,District,County,PPDCategory Type,Record Status - monthly file only
0,{81B82214-7FBC-4129-9F6B-4956B4A663AD},25000,1995-08-18 00:00,T,N,F,OLDHAM,OLDHAM,GREATER MANCHESTER,A,A
1,{8046EC72-1466-42D6-A753-4956BF7CD8A2},42500,1995-08-09 00:00,S,N,F,GRAYS,THURROCK,THURROCK,A,A
2,{278D581A-5BF3-4FCE-AF62-4956D87691E6},45000,1995-06-30 00:00,T,N,F,HIGHBRIDGE,SEDGEMOOR,SOMERSET,A,A
3,{1D861C06-A416-4865-973C-4956DB12CD12},43150,1995-11-24 00:00,T,N,F,BEDFORD,NORTH BEDFORDSHIRE,BEDFORDSHIRE,A,A
4,{DD8645FD-A815-43A6-A7BA-4956E58F1874},18899,1995-06-23 00:00,S,N,F,WAKEFIELD,LEEDS,WEST YORKSHIRE,A,A


In [8]:
df.columns

Index(['Transaction unique identifier', 'Price', 'Date of Transfer',
       'Property Type', 'Old/New', 'Duration', 'Town/City', 'District',
       'County', 'PPDCategory Type', 'Record Status - monthly file only'],
      dtype='object')

In [9]:
df.rename(columns = {'Transaction unique identifier':"Transaction_unique_identifier", 'Date of Transfer':"Date_of_Transfer",
                       'Property Type':"Property_Type", 'PPDCategory Type':"PPDCategory_Type",
                       'Record Status - monthly file only':"Record_Status_monthly"}, inplace = True)

In [10]:
# Remove unwanted column
df.drop('Transaction_unique_identifier', axis = 1, inplace = True)

In [11]:
# remove special character
df.columns=df.columns.str.replace('[#,@,&]','')

In [12]:
df.head()

Unnamed: 0,Price,Date_of_Transfer,Property_Type,Old/New,Duration,Town/City,District,County,PPDCategory_Type,Record_Status_monthly
0,25000,1995-08-18 00:00,T,N,F,OLDHAM,OLDHAM,GREATER MANCHESTER,A,A
1,42500,1995-08-09 00:00,S,N,F,GRAYS,THURROCK,THURROCK,A,A
2,45000,1995-06-30 00:00,T,N,F,HIGHBRIDGE,SEDGEMOOR,SOMERSET,A,A
3,43150,1995-11-24 00:00,T,N,F,BEDFORD,NORTH BEDFORDSHIRE,BEDFORDSHIRE,A,A
4,18899,1995-06-23 00:00,S,N,F,WAKEFIELD,LEEDS,WEST YORKSHIRE,A,A


In [13]:
df.to_csv("Data/cleaned_data.csv",index=False)

#### Data Ingestion

In [14]:
%%writefile testutility.py
import logging
import os
import subprocess
import yaml
import pandas as pd
import datetime 
import gc
import re


################
# File Reading #
################

def read_config_file(filepath):
    with open(filepath, 'r') as stream:
        try:
            return yaml.safe_load(stream)
        except yaml.YAMLError as exc:
            logging.error(exc)


def replacer(string, char):
    pattern = char + '{2,}'
    string = re.sub(pattern, char, string) 
    return string

def col_header_val(df,table_config):
    '''
    replace whitespaces in the column
    and standardized column names
    '''
    df.columns = df.columns.str.lower()
    df.columns = df.columns.str.replace('[^\w]','_',regex=True)
    df.columns = list(map(lambda x: x.strip('_'), list(df.columns)))
    df.columns = list(map(lambda x: replacer(x,'_'), list(df.columns)))
    expected_col = list(map(lambda x: x.lower(),  table_config['columns']))
    expected_col.sort()
    df.columns =list(map(lambda x: x.lower(), list(df.columns)))
    df = df.reindex(sorted(df.columns), axis=1)
    if len(df.columns) == len(expected_col) and list(expected_col)  == list(df.columns):
        print("column name and column length validation passed")
        return 1
    else:
        print("column name and column length validation failed")
        mismatched_columns_file = list(set(df.columns).difference(expected_col))
        print("Following File columns are not in the YAML file",mismatched_columns_file)
        missing_YAML_file = list(set(expected_col).difference(df.columns))
        print("Following YAML columns are not in the file uploaded",missing_YAML_file)
        logging.info(f'df columns: {df.columns}')
        logging.info(f'expected columns: {expected_col}')
        return 0

Overwriting testutility.py


In [15]:
df.columns

Index(['Price', 'Date_of_Transfer', 'Property_Type', 'Old/New', 'Duration',
       'Town/City', 'District', 'County', 'PPDCategory_Type',
       'Record_Status_monthly'],
      dtype='object')

In [16]:
%%writefile file.yaml
file_type: csv
dataset_name: testfile
file_name: cleaned_data
table_name: edsurv
inbound_delimiter: ","
outbound_delimiter: "|"
skip_leading_rows: 1
columns: 
    - Price
    - Date_of_Transfer
    - Property_Type
    - old_new
    - Duration
    - town_city
    - District
    - County
    - PPDCategory_Type
    - record_status_monthly

Overwriting file.yaml


In [17]:
# Read config file
import testutility as util
config_data = util.read_config_file("file.yaml")

In [18]:
config_data['inbound_delimiter']

','

In [19]:
#inspecting data of config file
config_data

{'file_type': 'csv',
 'dataset_name': 'testfile',
 'file_name': 'cleaned_data',
 'table_name': 'edsurv',
 'inbound_delimiter': ',',
 'outbound_delimiter': '|',
 'skip_leading_rows': 1,
 'columns': ['Price',
  'Date_of_Transfer',
  'Property_Type',
  'old_new',
  'Duration',
  'town_city',
  'District',
  'County',
  'PPDCategory_Type',
  'record_status_monthly']}

In [20]:
# read the file using config file
file_type = config_data['file_type']
source_file = "Data/" + config_data['file_name'] + f'.{file_type}'
#print("",source_file)
df = pd.read_csv(source_file,config_data['inbound_delimiter'])
df.head()

Unnamed: 0,Price,Date_of_Transfer,Property_Type,Old/New,Duration,Town/City,District,County,PPDCategory_Type,Record_Status_monthly
0,25000,1995-08-18 00:00,T,N,F,OLDHAM,OLDHAM,GREATER MANCHESTER,A,A
1,42500,1995-08-09 00:00,S,N,F,GRAYS,THURROCK,THURROCK,A,A
2,45000,1995-06-30 00:00,T,N,F,HIGHBRIDGE,SEDGEMOOR,SOMERSET,A,A
3,43150,1995-11-24 00:00,T,N,F,BEDFORD,NORTH BEDFORDSHIRE,BEDFORDSHIRE,A,A
4,18899,1995-06-23 00:00,S,N,F,WAKEFIELD,LEEDS,WEST YORKSHIRE,A,A


In [21]:
#validate the header of the file
util.col_header_val(df,config_data)

column name and column length validation passed


1

In [22]:
print("columns of files are:" ,df.columns)
print("columns of YAML are:" ,config_data['columns'])

columns of files are: Index(['price', 'date_of_transfer', 'property_type', 'old_new', 'duration',
       'town_city', 'district', 'county', 'ppdcategory_type',
       'record_status_monthly'],
      dtype='object')
columns of YAML are: ['Price', 'Date_of_Transfer', 'Property_Type', 'old_new', 'Duration', 'town_city', 'District', 'County', 'PPDCategory_Type', 'record_status_monthly']


In [23]:
if util.col_header_val(df,config_data)==0:
    print("validation failed")
    # write code to reject the file
else:
    print("col validation passed")
    # write the code to perform further action
    # in the pipleine

column name and column length validation passed
col validation passed


In [25]:
df.head()

Unnamed: 0,price,date_of_transfer,property_type,old_new,duration,town_city,district,county,ppdcategory_type,record_status_monthly
0,25000,1995-08-18 00:00,T,N,F,OLDHAM,OLDHAM,GREATER MANCHESTER,A,A
1,42500,1995-08-09 00:00,S,N,F,GRAYS,THURROCK,THURROCK,A,A
2,45000,1995-06-30 00:00,T,N,F,HIGHBRIDGE,SEDGEMOOR,SOMERSET,A,A
3,43150,1995-11-24 00:00,T,N,F,BEDFORD,NORTH BEDFORDSHIRE,BEDFORDSHIRE,A,A
4,18899,1995-06-23 00:00,S,N,F,WAKEFIELD,LEEDS,WEST YORKSHIRE,A,A


In [26]:
df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 22489348 entries, 0 to 22489347
Data columns (total 10 columns):
 #   Column                 Dtype 
---  ------                 ----- 
 0   price                  int64 
 1   date_of_transfer       object
 2   property_type          object
 3   old_new                object
 4   duration               object
 5   town_city              object
 6   district               object
 7   county                 object
 8   ppdcategory_type       object
 9   record_status_monthly  object
dtypes: int64(1), object(9)
memory usage: 1.7+ GB


In [27]:
df.shape

(22489348, 10)

In [28]:
df.columns

Index(['price', 'date_of_transfer', 'property_type', 'old_new', 'duration',
       'town_city', 'district', 'county', 'ppdcategory_type',
       'record_status_monthly'],
      dtype='object')

In [30]:
#Size of the file
os.path.getsize('Data/cleaned_data.csv')

1551090637

In [31]:
import datetime
import csv
import gzip

from dask import dataframe as dd
df = dd.read_csv('Data/cleaned_data.csv')

In [33]:
# Write csv in gz format in pipe separated text file (|)
df.to_csv('compressed.gz',
          sep = '|',
          header = True,
          index = False,
          quoting = csv.QUOTE_ALL,
          compression = 'gzip',
          quotechar = '"',
          doublequote = True,
          line_terminator = '\n')

['C:\\Users\\amr_a\\Data-Glacier2\\Data-Glacier-Internship\\Data-Glacier-Week6\\compressed.gz\\00.part',
 'C:\\Users\\amr_a\\Data-Glacier2\\Data-Glacier-Internship\\Data-Glacier-Week6\\compressed.gz\\01.part',
 'C:\\Users\\amr_a\\Data-Glacier2\\Data-Glacier-Internship\\Data-Glacier-Week6\\compressed.gz\\02.part',
 'C:\\Users\\amr_a\\Data-Glacier2\\Data-Glacier-Internship\\Data-Glacier-Week6\\compressed.gz\\03.part',
 'C:\\Users\\amr_a\\Data-Glacier2\\Data-Glacier-Internship\\Data-Glacier-Week6\\compressed.gz\\04.part',
 'C:\\Users\\amr_a\\Data-Glacier2\\Data-Glacier-Internship\\Data-Glacier-Week6\\compressed.gz\\05.part',
 'C:\\Users\\amr_a\\Data-Glacier2\\Data-Glacier-Internship\\Data-Glacier-Week6\\compressed.gz\\06.part',
 'C:\\Users\\amr_a\\Data-Glacier2\\Data-Glacier-Internship\\Data-Glacier-Week6\\compressed.gz\\07.part',
 'C:\\Users\\amr_a\\Data-Glacier2\\Data-Glacier-Internship\\Data-Glacier-Week6\\compressed.gz\\08.part',
 'C:\\Users\\amr_a\\Data-Glacier2\\Data-Glacier-Interns