# 4.2 Data Quality Checks

The data quality checks include:

1. The data schema of every dimensional table matches the data model
2. No table is empty after running the ETL data pipeline

In [1]:
import os
import configparser
from pathlib import Path
from pyspark.sql import SparkSession

In [2]:
# Load AWS credentials and bucket locations from the local config file.
config = configparser.ConfigParser()
# ConfigParser.read() silently skips files it cannot open and returns the list
# of files it actually parsed, so fail fast here instead of hitting an opaque
# KeyError on the first section lookup below.
# 'utf-8-sig' tolerates a leading byte-order mark in the file.
if not config.read('capstone.cfg', encoding='utf-8-sig'):
    raise FileNotFoundError("Could not read configuration file 'capstone.cfg'")

# Export the credentials as environment variables
# (presumably so the S3 connector used by Spark can pick them up -- confirm).
os.environ['AWS_ACCESS_KEY_ID'] = config['AWS']['AWS_ACCESS_KEY_ID']
os.environ['AWS_SECRET_ACCESS_KEY'] = config['AWS']['AWS_SECRET_ACCESS_KEY']

# Bucket locations as configured in the [S3] section.
SOURCE_S3_BUCKET = config['S3']['SOURCE_S3_BUCKET']
DEST_S3_BUCKET = config['S3']['DEST_S3_BUCKET']

In [3]:
# Build (or reuse, if one already exists) a Hive-enabled Spark session with the
# hadoop-aws package requested through spark.jars.packages.
spark = (
    SparkSession.builder
    .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0")
    .enableHiveSupport()
    .getOrCreate()
)

### 1. Data schema of every dimensional table matches data model

[Data Dictionary](https://github.com/KCvW/DataEng/blob/main/Capstone/Data%20Dictionary.png)

In [4]:
# Root directory whose immediate sub-directories are read as parquet tables
# by the checks below.
# NOTE(review): pathlib.Path only models filesystem paths, so SOURCE_S3_BUCKET
# is presumably a local or mounted directory rather than an s3:// URI -- confirm.
s3_bucket = Path(SOURCE_S3_BUCKET)

In [5]:
# Print the schema of every table directory under s3_bucket so it can be
# compared with the data dictionary. This is a visual check only: nothing
# here asserts against an expected schema.
for file_dir in s3_bucket.iterdir():
    # Skip plain files; each table is stored as a directory of parquet files.
    if not file_dir.is_dir():
        continue
    df = spark.read.parquet(str(file_dir))
    # Path.name is the last path component, i.e. the table's directory name.
    print("Table: " + file_dir.name)
    # printSchema() writes the schema to stdout and returns None, so its
    # result is deliberately not bound to a variable.
    df.printSchema()

Table: city_code
root
 |-- code: string (nullable = true)
 |-- city: string (nullable = true)

Table: dim_demog_statistics
root
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- median_age: string (nullable = true)
 |-- avg_household_size: string (nullable = true)
 |-- demog_stat_id: long (nullable = true)

Table: fact_immigration
root
 |-- cic_id: double (nullable = true)
 |-- year: double (nullable = true)
 |-- month: double (nullable = true)
 |-- city_code: string (nullable = true)
 |-- arrive_date: date (nullable = true)
 |-- departure_date: date (nullable = true)
 |-- mode: double (nullable = true)
 |-- visa: double (nullable = true)
 |-- immigration_id: long (nullable = true)
 |-- country: string (nullable = true)
 |-- state_code: string (nullable = true)

Table: dim_demog_population
root
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- male_population: string (nullable = true)
 |-- female_population: string (nullable = true)

### 2. No empty table after running ETL data pipeline

In [6]:
# Count the rows of every table directory under s3_bucket and fail the check
# if any table has no records. All tables are inspected before raising so a
# single run reports every empty table by name.
empty_tables = []
for file_dir in s3_bucket.iterdir():
    # Skip plain files; each table is stored as a directory of parquet files.
    if not file_dir.is_dir():
        continue
    table_name = file_dir.name
    df = spark.read.parquet(str(file_dir))
    record_num = df.count()
    if record_num > 0:
        print(f"Table: {table_name} is not empty: total {record_num} records.")
    else:
        empty_tables.append(table_name)

# Raises ValueError naming the offending table(s) when at least one is empty.
if empty_tables:
    raise ValueError(
        "Empty table(s) found after running the ETL pipeline: "
        + ", ".join(sorted(empty_tables))
    )

Table: city_code is not empty: total 659 records.
Table: dim_demog_statistics is not empty: total 596 records.
Table: fact_immigration is not empty: total 3096313 records.
Table: dim_demog_population is not empty: total 2891 records.
Table: country_code is not empty: total 288 records.
Table: state_code is not empty: total 54 records.
Table: dim_immi_airline is not empty: total 3096313 records.
Table: dim_temperature is not empty: total 687004 records.
Table: dim_immi_personal is not empty: total 3096313 records.
