In [92]:
import warnings, json
import pandas as pd
from tabulate import tabulate

warnings.filterwarnings("ignore")

# Load and extract Data

## Data source I

In [93]:
import urllib.request

source_I_url = "https://raw.githubusercontent.com/rahilpacmann/case-data-wrangling-api/main/city.csv"

with urllib.request.urlopen(source_I_url) as response:
    df_1 = pd.read_csv(response)
df_1.head(5)

Unnamed: 0,city_id,city,country
0,1,A Corua (La Corua),Spain
1,2,Abha,Saudi Arabia
2,3,Abu Dhabi,United Arab Emirates
3,4,Acua,Mexico
4,5,Adana,Turkey


## Data source II

In [94]:
source_II_url = "https://raw.githubusercontent.com/rahilpacmann/case-data-wrangling-api/main/country.csv"

with urllib.request.urlopen(source_II_url) as response:
    df_2 = pd.read_csv(response)
df_2.head(5)

Unnamed: 0,country,last_update
0,Afghanistan,2006-02-15 09:44:00
1,Algeria,2006-02-15 09:44:00
2,American Samoa,2006-02-15 09:44:00
3,Angola,2006-02-15 09:44:00
4,Anguilla,2006-02-15 09:44:00


## Data source III

In [95]:
from sqlalchemy import create_engine, Engine
from pandas import DataFrame

def source_postgres_engine(database_name: str):
    # connection to database
    user = "postgres"
    password = "qwerty123"
    host = "localhost"
    port = "5433"

    engine = create_engine(f"postgresql://{user}:{password}@{host}:{port}/{database_name}")
    
    return engine

def get_table_data(table_name: str, engine: Engine) -> DataFrame:
    query = f"SELECT * FROM {table_name}"
    return pd.read_sql(query, engine)

In [96]:
engine = source_postgres_engine("dvdrental")

actor_df = get_table_data("actor", engine)
store_df = get_table_data('store', engine)
address_df = get_table_data('address', engine)
category_df = get_table_data('category', engine)
customer_df = get_table_data('customer', engine)
film_actor_df = get_table_data('film_actor', engine)
film_category_df = get_table_data('film_category', engine)
inventory_df = get_table_data('inventory',engine)
language_df = get_table_data('language',engine)
rental_df = get_table_data('rental',engine)
staff_df = get_table_data('staff',engine)
payment_df = get_table_data('payment',engine)
film_df = get_table_data('film',engine)

In [97]:
data_dict = {
    "actor": actor_df,
    "store": store_df,
    "address": address_df,
    "category": category_df,
    "customer": customer_df,
    "film_actor": film_actor_df,
    "film_category": film_category_df,
    "inventory": inventory_df,
    "language": language_df,
    "rental": rental_df,
    "staff": staff_df,
    "payment": payment_df,
    "film": film_df,
    "city": df_1,
    "country": df_2    
}

# Data Validation

In [98]:
requirement_url = "https://rahilpacmann.github.io/case-data-wrangling-api/requirements_table.json"

with urllib.request.urlopen(requirement_url) as response:
    requirement_json = json.load(response)
print(requirement_json)

{'actor': [{'column_name': 'actor_id', 'data_type': 'int64'}, {'column_name': 'last_update', 'data_type': 'datetime64[ns]'}, {'column_name': 'first_name', 'data_type': 'object'}, {'column_name': 'last_name', 'data_type': 'object'}], 'store': [{'column_name': 'store_id', 'data_type': 'int64'}, {'column_name': 'manager_staff_id', 'data_type': 'int64'}, {'column_name': 'address_id', 'data_type': 'int64'}, {'column_name': 'last_update', 'data_type': 'datetime64[ns]'}], 'address': [{'column_name': 'last_update', 'data_type': 'datetime64[ns]'}, {'column_name': 'city_id', 'data_type': 'int64'}, {'column_name': 'address_id', 'data_type': 'int64'}, {'column_name': 'district', 'data_type': 'object'}, {'column_name': 'phone', 'data_type': 'object'}, {'column_name': 'postal_code', 'data_type': 'object'}, {'column_name': 'address', 'data_type': 'object'}, {'column_name': 'address2', 'data_type': 'object'}], 'category': [{'column_name': 'category_id', 'data_type': 'int64'}, {'column_name': 'last_upd

In [99]:
actual_table_name = list(requirement_json.keys())
print(actual_table_name)

['actor', 'store', 'address', 'category', 'city', 'country', 'customer', 'film_actor', 'film_category', 'inventory', 'language', 'rental', 'staff', 'payment', 'film']


In [100]:
def check_table_requirements(actual_table: dict, requirement_table: dict):
    actual_table_name = list(actual_table.keys())
    requirement_table_name = list(requirement_table.keys())

    table_checking = []
    for table_name in requirement_table_name:
        if table_name in actual_table_name:
            table_checking.append([table_name, "✓"])
        else:
            table_checking.append([table_name, "✗"])
    
    table_headers = ["Table name", "Is exist"]
    table = tabulate(table_checking, headers=table_headers, tablefmt="grid")
    print("=> STEP 1: CHECK TABLE")
    print(table)

check_table_requirements(data_dict, requirement_json)

=> STEP 1: CHECK TABLE
+---------------+------------+
| Table name    | Is exist   |
| actor         | ✓          |
+---------------+------------+
| store         | ✓          |
+---------------+------------+
| address       | ✓          |
+---------------+------------+
| category      | ✓          |
+---------------+------------+
| city          | ✓          |
+---------------+------------+
| country       | ✓          |
+---------------+------------+
| customer      | ✓          |
+---------------+------------+
| film_actor    | ✓          |
+---------------+------------+
| film_category | ✓          |
+---------------+------------+
| inventory     | ✓          |
+---------------+------------+
| language      | ✓          |
+---------------+------------+
| rental        | ✓          |
+---------------+------------+
| staff         | ✓          |
+---------------+------------+
| payment       | ✓          |
+---------------+------------+
| film          | ✓          |
+---------------

In [101]:
def check_data_shape(actual_table: dict):
    table_shape = []

    for table_name in actual_table:
        rows, cols = actual_table[table_name].shape
        table_shape.append([table_name, rows, cols])
    
    table_headers = ["Table name", "Number of rows", "Number of columns"]
    table = tabulate(table_shape, headers=table_headers, tablefmt="grid")
    print("=> STEP 2: CHECK TABLE SHAPE")
    print(table)
    
check_data_shape(data_dict)

=> STEP 2: CHECK TABLE SHAPE
+---------------+------------------+---------------------+
| Table name    |   Number of rows |   Number of columns |
| actor         |              200 |                   4 |
+---------------+------------------+---------------------+
| store         |                2 |                   4 |
+---------------+------------------+---------------------+
| address       |              603 |                   8 |
+---------------+------------------+---------------------+
| category      |               16 |                   3 |
+---------------+------------------+---------------------+
| customer      |              599 |                  10 |
+---------------+------------------+---------------------+
| film_actor    |             5462 |                   3 |
+---------------+------------------+---------------------+
| film_category |             1000 |                   3 |
+---------------+------------------+---------------------+
| inventory     |          

In [102]:
def check_columns(actual_table: dict, requirement_table: dict):
    print("=> STEP 3: CHECK COLUMNS")

    for table_name in requirement_table:
        result = []
        actual_columns = list(actual_table[table_name].columns)
        requirement_columns = []

        for data in requirement_table[table_name]:
            requirement_columns.append(data["column_name"])
        
        for column_name in set(actual_columns + requirement_columns):
            in_actual_table = "✔" if column_name in actual_columns else "✘"
            in_requirement_table = "✔" if column_name in requirement_columns else "✘"
            result.append([column_name, in_actual_table, in_requirement_table])

        if set(actual_columns) == set(requirement_columns):
            pass
        else:
            print(table_name)
            table_headers = ["Column name", "In actual table", "In requirement table"]
            table = tabulate(result, headers=table_headers, tablefmt="grid")
            print(table)
            print("\n")
            
check_columns(data_dict, requirement_json)

=> STEP 3: CHECK COLUMNS
city
+---------------+-------------------+------------------------+
| Column name   | In actual table   | In requirement table   |
| city_id       | ✔                 | ✔                      |
+---------------+-------------------+------------------------+
| country       | ✔                 | ✘                      |
+---------------+-------------------+------------------------+
| last_update   | ✘                 | ✔                      |
+---------------+-------------------+------------------------+
| country_id    | ✘                 | ✔                      |
+---------------+-------------------+------------------------+
| city          | ✔                 | ✔                      |
+---------------+-------------------+------------------------+


country
+---------------+-------------------+------------------------+
| Column name   | In actual table   | In requirement table   |
| country       | ✔                 | ✔                      |
+--------------

In [103]:
def check_data_type(actual_table: dict, requirement_table: dict):
    result = []

    for table_name, df in actual_table.items():
        if table_name in requirement_table:
            for info_table in requirement_table[table_name]:
                column_name = info_table["column_name"] 
                data_type_req = info_table["data_type"]
                if column_name in df.columns:
                    data_type_actual = df[column_name].dtype
                    result_data_type = "✔" if data_type_req == data_type_actual else "✘"
                    result.append([table_name, column_name, data_type_actual, 
                                   data_type_req, result_data_type])
                else:
                    result.append([table_name, column_name, "N/A", 
                                   data_type_req, "✘ (Column not found)"])
    
    print("=> STEP 4: CHECK DATA TYPE")
    table_headers = ["Table name", "Column name", "Actual type", "Requirement type", "Match"]
    
    missmatch_data = [row for row in result if "✘" in row[4]] 
    if missmatch_data:
        print("\nSummary of Mismatches Data Types:")
        table = tabulate(missmatch_data, headers=table_headers, tablefmt="grid")
        print(table)
    else:
        print("All data types match")

check_data_type(data_dict, requirement_json)

=> STEP 4: CHECK DATA TYPE

Summary of Mismatches Data Types:
+--------------+---------------+---------------+--------------------+----------------------+
| Table name   | Column name   | Actual type   | Requirement type   | Match                |
| customer     | create_date   | object        | datetime64[ns]     | ✘                    |
+--------------+---------------+---------------+--------------------+----------------------+
| city         | country_id    | N/A           | int64              | ✘ (Column not found) |
+--------------+---------------+---------------+--------------------+----------------------+
| city         | last_update   | N/A           | datetime64[ns]     | ✘ (Column not found) |
+--------------+---------------+---------------+--------------------+----------------------+
| country      | country_id    | N/A           | int64              | ✘ (Column not found) |
+--------------+---------------+---------------+--------------------+----------------------+
| countr

In [104]:
def check_missing_values(actual_table: dict):
    result = []

    for table_name, df in actual_table.items():
        missing_count = df.isna().sum()
        total_data = df.shape[0]
        
        for column, missing in missing_count.items():
            missing_count_percantage = round((missing / total_data * 100), 2)
            result.append([table_name, column, missing, missing_count_percantage])
    
    table_headers = ["Table name", "Column name", "Missing value count", 
                     "Missing value percentage"]
    missing_value = [row for row in result if row[2] != 0]
    if missing_value:
        table = tabulate(missing_value, headers=table_headers, tablefmt="grid")
        print("=> STEP 5: CHECK MISSING VALUE")
        print(table)
    else:
        print("There's no Missing Values")

check_missing_values(data_dict)

=> STEP 5: CHECK MISSING VALUE
+--------------+---------------+-----------------------+----------------------------+
| Table name   | Column name   |   Missing value count |   Missing value percentage |
| address      | address2      |                     4 |                       0.66 |
+--------------+---------------+-----------------------+----------------------------+
| rental       | return_date   |                   183 |                       1.14 |
+--------------+---------------+-----------------------+----------------------------+
| staff        | picture       |                     1 |                      50    |
+--------------+---------------+-----------------------+----------------------------+
| city         | city          |                    10 |                       1.48 |
+--------------+---------------+-----------------------+----------------------------+
| city         | country       |                     7 |                       1.03 |
+--------------+-------

In [114]:
def check_duplicates_data(actual_table: dict):
    result = []

    for table_name, df in actual_table.items():
        # duplicate_rows = df[df.duplicated(keep=False)]
        duplicate_rows = df.astype(str).duplicated(keep=False).sum()
        result.append([table_name, duplicate_rows])
    
    duplicate_data = [row for row in result if row[1] != 0]
    if duplicate_data:
        table_headers = ["Table name", "Duplicate rows count"]
        duplicates_data = [row for row in result if row[1] != 0]
        table = tabulate(duplicates_data, headers=table_headers, tablefmt="grid")
        print("=> STEP 6: CHECK DUPLICATES DATA")
        print("Duplicate Data Summary:")
        print(table)
    else:   
        print("No Duplicate Data Found")

check_duplicates_data(data_dict)

=> STEP 6: CHECK DUPLICATES DATA
Duplicate Data Summary:
+--------------+------------------------+
| Table name   |   Duplicate rows count |
| city         |                    154 |
+--------------+------------------------+
