# **File ingestion and schema validation**

Take any csv/text file of 2+ GB of your choice. --- (You can do this assignment on Google colab)

Read the file ( Present approach of reading the file )

Try different methods of file reading eg: Dask, Modin, Ray, pandas and present your findings in term of computational efficiency

Perform basic validation on data columns : eg: remove special character , white spaces from the col name

As you already know the schema hence create a YAML file and write the column name in YAML file. --define separator of read and write file, column name in YAML

Validate number of columns and column name of ingested file with YAML.

Write the file in pipe separated text file (|) in gz format.

Create a summary of the file:

1. total number of columns

2. file size


### **Dataset Grossary**

The Price Paid Data includes information on all registered property sales in England and Wales that are sold for full market value. Address details have been truncated to the town/city level.

You might also find the HM Land Registry transaction records to be a useful supplement to this dataset: https://www.kaggle.com/datasets/hm-land-registry/uk-housing-prices-paid

The available fields are as follows:

Transaction unique identifier A reference number which is generated automatically recording each published sale. The number is unique and will change each time a sale is recorded.

Price Sale price stated on the transfer deed.

Date of Transfer Date when the sale was completed, as stated on the transfer deed.

Property Type D = Detached, S = Semi-Detached, T = Terraced, F = Flats/Maisonettes, O = Other
Note that:

we only record the above categories to describe property type, we do not separately identify bungalows.
end-of-terrace properties are included in the Terraced category above.
‘Other’ is only valid where the transaction relates to a property type that is not covered by existing values.
Old/New Indicates the age of the property and applies to all price paid transactions, residential and non-residential.
Y = a newly built property, N = an established residential building

Duration Relates to the tenure: F = Freehold, L= Leasehold etc.
Note that HM Land Registry does not record leases of 7 years or less in the Price Paid Dataset.

Town/City

District

County

PPD Category Type Indicates the type of Price Paid transaction.
A = Standard Price Paid entry, includes single residential property sold for full market value.
B = Additional Price Paid entry including transfers under a power of sale/repossessions, buy-to-lets (where they can be identified by a Mortgage) and transfers to non-private individuals. Note that category B does not separately identify the transaction types stated. HM Land Registry has been collecting information on Category A transactions from January 1995. Category B transactions were identified from October 2013.

Record Status - monthly file only Indicates additions, changes and deletions to the records.(see guide below).
A = Addition
C = Change
D = Delete.

Note that where a transaction changes category type due to misallocation (as above) it will be deleted from the original category type and added to the correct category with a new transaction unique identifier.

In [3]:
%%writefile testutility.py
import logging
import os
import subprocess
import yaml
import pandas as pd
import datetime 
import gc
import re
import yaml

#creating a function to read the file
def read_config_file(filepath):
    with open(filepath, 'r') as stream:
        try:
            return yaml.safe_load(stream)
        except yaml.YAMLError as exc:
            logging.error(exc)


def replacer(string, char):
    pattern = char + '{2,}'
    string = re.sub(pattern, char, string) 
    return string

def col_header_val(df,table_config):
    '''
    replace whitespaces in the column
    and standardized column names
    '''
    df.columns = df.columns.str.lower() # changing all columns to lower case
    df.columns = df.columns.str.replace('[^\w]','_',regex=True)
    df.columns = list(map(lambda x: x.strip('_'), list(df.columns)))
    df.columns = list(map(lambda x: replacer(x,'_'), list(df.columns)))
    expected_col = list(map(lambda x: x.lower(),  table_config['columns']))
    df.columns =list(map(lambda x: x.lower(), list(df.columns)))
    if len(df.columns) == len(expected_col) and list(expected_col)  == list(df.columns):
        print("column name and column length validation passed")
        return 1
    else:
        print("column name and column length validation failed")
        mismatched_columns_file = list(set(df.columns).difference(expected_col))
        print("Following File columns are not in the YAML file",mismatched_columns_file)
        missing_YAML_file = list(set(expected_col).difference(df.columns))
        print("Following YAML columns are not in the file uploaded",missing_YAML_file)
        logging.info(f'df columns: {df.columns}')
        logging.info(f'expected columns: {expected_col}')
        return 0

Overwriting testutility.py


In [4]:
#mounting the drive
from google.colab import drive
drive.mount('/content/drive/')

Drive already mounted at /content/drive/; to attempt to forcibly remount, call drive.mount("/content/drive/", force_remount=True).


In [5]:
##writing yaml file
%%writefile file.yaml
file_type: csv
dataset_name: housingPricesPaid 
file_name: price_paid_records
table_name: edsurv
inbound_delimiter: ","
outbound_delimiter: "|"
skip_leading_rows: 1
columns: 
    - Transaction unique identifier
    - Price
    - Date of Transfer
    - Property Type
    - Old/New
    - Duration
    - Town/City
    - District
    - County
    - PPDCategory Type
    - Record Status - monthly file only

Writing file.yaml


In [6]:
# Reading the  config file
import testutility as util
config_data = util.read_config_file("file.yaml")

In [7]:
#confriming the delimiter
config_data['inbound_delimiter']

','

In [8]:
# confirming the columnsconfig
config_data['columns']

['Transaction unique identifier',
 'Price',
 'Date of Transfer',
 'Property Type',
 'Old/New',
 'Duration',
 'Town/City',
 'District',
 'County',
 'PPDCategory Type',
 'Record Status - monthly file only']

In [9]:
#inspecting data of config file
config_data

{'columns': ['Transaction unique identifier',
  'Price',
  'Date of Transfer',
  'Property Type',
  'Old/New',
  'Duration',
  'Town/City',
  'District',
  'County',
  'PPDCategory Type',
  'Record Status - monthly file only'],
 'dataset_name': 'housingPricesPaid',
 'file_name': 'price_paid_records',
 'file_type': 'csv',
 'inbound_delimiter': ',',
 'outbound_delimiter': '|',
 'skip_leading_rows': 1,
 'table_name': 'edsurv'}

## **Reading the Files Using different Methods**

we will try to load and read the file and check the time taken for the work to be done




#### **1. Reading using config file**

In [12]:
#using an absolute path to open the file:
import os
os.chdir(r'/content/drive/MyDrive/Courses Online/Data Glacier/Week 6/')

In [13]:
# importing the operating system to get the file path
print("Confirming my file(online_trans.csv) exists: ", os.listdir())
print("\n")
print("The file directory path is:",os.getcwd()) # getting the exact file locattion directory

Confirming my file(online_trans.csv) exists:  ['Data_ingestion.ipynb', 'testutility.py', 'DSVIICODE', 'test_ingestion.ipynb', '2019-Nov.csv', '2019-Oct.csv', 'file.yaml', 'price_paid_records.csv']


The file directory path is: /content/drive/MyDrive/Courses Online/Data Glacier/Week 6


In [14]:
# read the file using config file
file_type = config_data['file_type']
source_file = "./" + config_data['file_name'] + f'.{file_type}'
print("",source_file)

 ./price_paid_records.csv


In [25]:
#import needed libraries
import time
import pandas as pd

#load and read the dataset
start_time = time.time()
df = pd.read_csv(source_file,config_data['inbound_delimiter'])
end_time = time.time()
print("config estimated loading time: = {}".format(end_time-start_time))
print("\n")
df.head()

  exec(code_obj, self.user_global_ns, self.user_ns)


config estimated loading time: = 138.66593527793884




Unnamed: 0,event_time,event_type,product_id,category_id,category_code,brand,price,user_id,user_session
0,2019-10-01 00:00:00 UTC,view,44600062,2103807459595387724,,shiseido,35.79,541312140,72d76fde-8bb3-4e00-8c23-a032dfed738c
1,2019-10-01 00:00:00 UTC,view,3900821,2053013552326770905,appliances.environment.water_heater,aqua,33.2,554748717,9333dfbd-b87a-4708-9857-6336556b0fcc
2,2019-10-01 00:00:01 UTC,view,17200506,2053013559792632471,furniture.living_room.sofa,,543.1,519107250,566511c2-e2e3-422b-b695-cf8e6e792ca8
3,2019-10-01 00:00:01 UTC,view,1307067,2053013558920217191,computers.notebook,lenovo,251.74,550050854,7c90fc70-0e80-4590-96f3-13c02c18c713
4,2019-10-01 00:00:04 UTC,view,1004237,2053013555631882655,electronics.smartphone,apple,1081.98,535871217,c6bd7419-2748-4c56-95b4-8cec9ff8b80d


### **Validate number of columns and column name of ingested file with YAML**

In [26]:
#validating the header(columns) of the file
util.col_header_val(df,config_data)

column name and column length validation failed
Following File columns are not in the YAML file ['event_time', 'event_type', 'brand', 'category_id', 'user_id', 'category_code', 'user_session', 'product_id']
Following YAML columns are not in the file uploaded ['date of transfer', 'county', 'town/city', 'record status - monthly file only', 'district', 'property type', 'duration', 'transaction unique identifier', 'old/new', 'ppdcategory type']


0

In [27]:
print("columns of files are:" ,df.columns)
print("columns of YAML are:" ,config_data['columns'])

columns of files are: Index(['event_time', 'event_type', 'product_id', 'category_id',
       'category_code', 'brand', 'price', 'user_id', 'user_session'],
      dtype='object')
columns of YAML are: ['Transaction unique identifier', 'Price', 'Date of Transfer', 'Property Type', 'Old/New', 'Duration', 'Town/City', 'District', 'County', 'PPDCategory Type', 'Record Status - monthly file only']


In [28]:
if util.col_header_val(df,config_data)==0:
    print("validation failed")
else:
    print("col validation passed")

column name and column length validation failed
Following File columns are not in the YAML file ['event_time', 'event_type', 'brand', 'category_id', 'user_id', 'category_code', 'user_session', 'product_id']
Following YAML columns are not in the file uploaded ['date of transfer', 'county', 'town/city', 'record status - monthly file only', 'district', 'property type', 'duration', 'transaction unique identifier', 'old/new', 'ppdcategory type']
validation failed


In [None]:
pd.read_csv("./price_paid_records.csv")

## 2. loading and reading the dataset using pandas

In [1]:
#importing libraries
import pandas as pd
import numpy as np
import time 

In [None]:
%%time
#1.loading the dataset
df_pandas = pd.read_csv("./price_paid_records.csv", delimiter=',')

#checking the first 5 rows
df_pandas.head()

Crash using pandas

## 3. loading and reading dataset using Dask

In [16]:
!pip install "dask[dataframe]"

Collecting fsspec>=0.6.0
  Downloading fsspec-2022.3.0-py3-none-any.whl (136 kB)
[K     |████████████████████████████████| 136 kB 5.3 MB/s 
Collecting partd>=0.3.10
  Downloading partd-1.2.0-py3-none-any.whl (19 kB)
Collecting locket
  Downloading locket-0.2.1-py2.py3-none-any.whl (4.1 kB)
Installing collected packages: locket, partd, fsspec
Successfully installed fsspec-2022.3.0 locket-0.2.1 partd-1.2.0


In [18]:
%%time
#install dask dataframe first and then import library
import dask.dataframe as dd
import time

#loading the dataset
df_dask = dd.read_csv("./price_paid_records.csv", delimiter=',')

#getting the first 5 colums
df_dask.head()

CPU times: user 1.88 s, sys: 546 ms, total: 2.42 s
Wall time: 4.37 s


In [19]:
df_dask.head()

Unnamed: 0,event_time,event_type,product_id,category_id,category_code,brand,price,user_id,user_session
0,2019-10-01 00:00:00 UTC,view,44600062,2103807459595387724,,shiseido,35.79,541312140,72d76fde-8bb3-4e00-8c23-a032dfed738c
1,2019-10-01 00:00:00 UTC,view,3900821,2053013552326770905,appliances.environment.water_heater,aqua,33.2,554748717,9333dfbd-b87a-4708-9857-6336556b0fcc
2,2019-10-01 00:00:01 UTC,view,17200506,2053013559792632471,furniture.living_room.sofa,,543.1,519107250,566511c2-e2e3-422b-b695-cf8e6e792ca8
3,2019-10-01 00:00:01 UTC,view,1307067,2053013558920217191,computers.notebook,lenovo,251.74,550050854,7c90fc70-0e80-4590-96f3-13c02c18c713
4,2019-10-01 00:00:04 UTC,view,1004237,2053013555631882655,electronics.smartphone,apple,1081.98,535871217,c6bd7419-2748-4c56-95b4-8cec9ff8b80d


## 4. loading and reading the dataset using Datatable


In [21]:
#istalling and importing required library
#!pip install pip --upgrade
!pip install datatable

Collecting datatable
  Downloading datatable-1.0.0-cp37-cp37m-manylinux_2_12_x86_64.whl (96.9 MB)
[K     |████████████████████████████████| 96.9 MB 110 kB/s 
[?25hInstalling collected packages: datatable
Successfully installed datatable-1.0.0


In [22]:
%%time

#importing the library 
import datatable as dt
import time

#loading using datatable
datatable_df = dt.fread("./price_paid_records.csv")

CPU times: user 41.6 s, sys: 12.6 s, total: 54.2 s
Wall time: 59 s


In [23]:
#getting the first 5 colums
datatable_df.head()

Unnamed: 0_level_0,event_time,event_type,product_id,category_id,category_code,brand,price,user_id,user_session
Unnamed: 0_level_1,▪▪▪▪,▪▪▪▪,▪▪▪▪,▪▪▪▪▪▪▪▪,▪▪▪▪,▪▪▪▪,▪▪▪▪▪▪▪▪,▪▪▪▪,▪▪▪▪
0,2019-10-01 00:00:00 UTC,view,44600062,2103807459595387724,,shiseido,35.79,541312140,72d76fde-8bb3-4e00-8c23-a032dfed738c
1,2019-10-01 00:00:00 UTC,view,3900821,2053013552326770905,appliances.environment.water_heater,aqua,33.2,554748717,9333dfbd-b87a-4708-9857-6336556b0fcc
2,2019-10-01 00:00:01 UTC,view,17200506,2053013559792632471,furniture.living_room.sofa,,543.1,519107250,566511c2-e2e3-422b-b695-cf8e6e792ca8
3,2019-10-01 00:00:01 UTC,view,1307067,2053013558920217191,computers.notebook,lenovo,251.74,550050854,7c90fc70-0e80-4590-96f3-13c02c18c713
4,2019-10-01 00:00:04 UTC,view,1004237,2053013555631882655,electronics.smartphone,apple,1081.98,535871217,c6bd7419-2748-4c56-95b4-8cec9ff8b80d
5,2019-10-01 00:00:05 UTC,view,1480613,2053013561092866779,computers.desktop,pulser,908.62,512742880,0d0d91c2-c9c2-4e81-90a5-86594dec0db9
6,2019-10-01 00:00:08 UTC,view,17300353,2053013553853497655,,creed,380.96,555447699,4fe811e9-91de-46da-90c3-bbd87ed3a65d
7,2019-10-01 00:00:08 UTC,view,31500053,2053013558031024687,,luminarc,41.16,550978835,6280d577-25c8-4147-99a7-abc6048498d6
8,2019-10-01 00:00:10 UTC,view,28719074,2053013565480109009,apparel.shoes.keds,baden,102.71,520571932,ac1cd4e5-a3ce-4224-a2d7-ff660a105880
9,2019-10-01 00:00:11 UTC,view,1004545,2053013555631882655,electronics.smartphone,huawei,566.01,537918940,406c46ed-90a4-4787-a43b-59a410c1a5fb


## 5. loading and reading file using Modin and rays
https://modin.readthedocs.io/en/stable/

In [24]:
!pip install modin[all]

Collecting modin[all]
  Downloading modin-0.12.1-py3-none-any.whl (761 kB)
[?25l[K     |▍                               | 10 kB 5.6 MB/s eta 0:00:01[K     |▉                               | 20 kB 9.3 MB/s eta 0:00:01[K     |█▎                              | 30 kB 10.7 MB/s eta 0:00:01[K     |█▊                              | 40 kB 4.7 MB/s eta 0:00:01[K     |██▏                             | 51 kB 4.1 MB/s eta 0:00:01[K     |██▋                             | 61 kB 4.9 MB/s eta 0:00:01[K     |███                             | 71 kB 5.3 MB/s eta 0:00:01[K     |███▍                            | 81 kB 4.4 MB/s eta 0:00:01[K     |███▉                            | 92 kB 4.9 MB/s eta 0:00:01[K     |████▎                           | 102 kB 5.3 MB/s eta 0:00:01[K     |████▊                           | 112 kB 5.3 MB/s eta 0:00:01[K     |█████▏                          | 122 kB 5.3 MB/s eta 0:00:01[K     |█████▋                          | 133 kB 5.3 MB/s eta 0:00:01[K

In [25]:
#importing the library
import pandas as pd
import modin.pandas as pd
import numpy as np
import ray
ray.shutdown()
ray.init()

2022-04-03 04:31:19,437	INFO services.py:1414 -- View the Ray dashboard at [1m[32mhttp://127.0.0.1:8265[39m[22m


{'address': '172.28.0.2:65072',
 'gcs_address': '172.28.0.2:65072',
 'metrics_export_port': 60472,
 'node_id': '1f20a5c18277fe8364c260c6a1298ac8c44e9b020e87256f7d1eeb2d',
 'node_ip_address': '172.28.0.2',
 'object_store_address': '/tmp/ray/session_2022-04-03_04-31-15_296970_911/sockets/plasma_store',
 'raylet_ip_address': '172.28.0.2',
 'raylet_socket_name': '/tmp/ray/session_2022-04-03_04-31-15_296970_911/sockets/raylet',
 'redis_address': None,
 'session_dir': '/tmp/ray/session_2022-04-03_04-31-15_296970_911',
 'webui_url': '127.0.0.1:8265'}

In [29]:
#importing os
import os
os.environ["MODIN_ENGINE"] = "ray"  # Modin will use Ray

In [None]:
# loading and reading the dataset with modin
import modin.pandas as pd
from distributed import Client
client = Client()

In [None]:
%%time
df1 = pd.read_csv("./price_paid_records.csv")

In [None]:
#getting the first 5 colums.head()
df1.head()

crashing thus i was unable to run the codes above

## File loading conclusion

While loading big files, i came to this conclusion
1. Dask is much faster compared to others; it take about 0.0125 seconds to load the big file.
2. Datatable cames second, as it is slower loading big file compared to dask.it took about 2.6 minutes (about 157 seconds).
3.config file came in 3rd, taking about 2.8 minutes to load (about 169 seconds)
4. Pandas came in last, taking about 2.86 minutes(about 171 seconds).

Therefore, Dask is the best when loading big files.

## **Creating a file in pipe separated text file (|) in gz format**.









In [31]:
import gzip 

while True:
    if util.col_header_val(df_dask,config_data)==0:
        print("---------------------------------------------")
        print("Validation Failed! Please, check file columns!")
    else:
        print("Column Validation Passed")
        input = open('online_trans.csv', 'rb')
        s = input.read()
        input.close()
        
        output = gzip.GzipFile('df_dask.gz','wb')
        output.write(s)
        output.close()
        
      
        print("Your Dataframe has been compressed to a .gzip file in the same folder.")
    break


column name and column length validation failed
Following File columns are not in the YAML file ['brand', 'product_id', 'category_id', 'user_id', 'event_type', 'event_time', 'category_code', 'user_session']
Following YAML columns are not in the file uploaded ['duration', 'old/new', 'county', 'town/city', 'ppdcategory type', 'property type', 'record status - monthly file only', 'date of transfer', 'transaction unique identifier', 'district']
---------------------------------------------
Validation Failed! Please, check file columns!


## **Create a summary of the file:**

1. Total number of rows,

2. total number of columns

2. file size

In [33]:
#checking number of columns
print("The dataset has {} columns" .format(len(df_dask.columns)))

The dataset has 9 columns


In [34]:
import os
#file size
file_size = os.path.getsize("./price_paid_records.csv")

print("The file size is: {}" .format(file_size))

The file size is: 5668612855


Finish