# Week #4 - Data Transformation
Data Pipeline Course - Sekolah Engineer - Pacmann Academy 



## Description 
Data transformation involves modifying or converting data from one format, structure, or representation to another, typically to make it more suitable for downstream processing, analysis, or storage.

## Case Description
<img src='pict/transformation1.png' width="800"> <br>

In the Data Extraction and Load module, we extracted the source data and loaded it into the staging area.

In the Data Transformation module, we will focus on the following tasks:
1. Transform the data from the staging area to align with the given Target Data Model schema.
2. Validate the transformed data. If the data does not meet the validation criteria, it will not be loaded into the database.
3. Load the validated and transformed data into the target system.


## Target Data Schema

<img src='pict/transformation2.png' width="800"> <br>


Create your database with this DDL:
``` sql
-- Create the category table
CREATE TABLE category (
    category_id SERIAL PRIMARY KEY,
    category_nk int unique,
    name VARCHAR(255) NOT NULL,
    description TEXT,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);


-- Create the customer table
CREATE TABLE customer (
    customer_id SERIAL PRIMARY KEY,
    customer_nk int unique, 
    first_name VARCHAR(255) NOT NULL,
    last_name VARCHAR(255) NOT NULL,
    email VARCHAR(255) NOT NULL,
    phone VARCHAR(100),
    address TEXT,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

-- Create the order table
CREATE TABLE orders (
    order_id SERIAL PRIMARY KEY,
    order_nk varchar(255) unique,
    customer_id INT REFERENCES customer(customer_id),
    order_date DATE NOT NULL,
    status varchar NOT NULL,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

CREATE TABLE order_detail (
    order_detail_id SERIAL PRIMARY KEY,
    order_id int REFERENCES orders(order_id),
    product_id varchar(255) NOT NULL,
    price NUMERIC(10, 2) NOT NULL,
    quantity INT NOT NULL,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    UNIQUE(order_id, product_id, quantity)
);
```

## Source to Target Mapping

After conducting Data Profiling on the source, here is the source-to-target mapping information that we will use for the transformation process.

### Table Category
source: category table

| Source Field | Target Field  | Transformation Rule           |
|--------------|---------------|--------------------------------|
| -            | category_id   | Auto Generated                 |
| category_id  | category_nk   | Direct mapping, unique         |
| name         | name          | Direct mapping, unique         |
| description  | description   | Direct mapping                 |


### Table Customer
source:  customer table

| Source Field | Target Field | Transformation Rule     |
|--------------|--------------|-------------------------|
| -            | customer_id  | Auto Generated          |
| customer_id  | customer_nk  | Direct Mapping, unique  |
| first_name   | first_name   | Direct Mapping          |
| last_name    | last_name    | Direct Mapping          |
| email        | email        | Direct Mapping, unique          |
| phone        | phone        | Direct Mapping          |
| address      | address      | Direct Mapping          |



### Table Orders
source: orders table

| Source Field | Target Field | Transformation Rule                                   |
|--------------|--------------|-------------------------------------------------------|
| -            | order_id     | Auto Generated                                        |
| order_id     | order_nk     | Direct Mapping                                        |
| customer_id  | customer_id  | Lookup: match the source customer_id to customer_nk in the target customer table and take its customer_id |
| order_date   | order_date   | Direct Mapping                                        |
| status       | status       | Direct Mapping                                        |

Deduplicate the data so that there is exactly one row per order_id.
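The source orders table repeats the order header (order_id, customer_id, order_date, status) once per product line. Keeping one row per order_id is therefore what satisfies the unique `order_nk` column in the target. The sketch below uses hypothetical sample rows, and `staging_orders` and `header_cols` are illustrative names.

```python
import pandas as pd

# Hypothetical staging rows: order A1 appears once per product line.
staging_orders = pd.DataFrame({
    "order_id": ["A1", "A1", "B2"],
    "customer_id": [10, 10, 11],
    "order_date": ["2022-01-30", "2022-01-30", "2021-01-03"],
    "status": ["Success", "Success", "Success"],
    "product_id": ["P1", "P2", "P1"],
})

header_cols = ["order_id", "customer_id", "order_date", "status"]

# Keep exactly one header row per order_id, then rename to the target natural key.
orders = (
    staging_orders[header_cols]
    .drop_duplicates(subset="order_id", keep="first")
    .rename(columns={"order_id": "order_nk"})
    .reset_index(drop=True)
)
```

If two rows for the same order_id disagree on other columns, `keep="first"` silently picks one. A profiling check on those columns is worth running before relying on this.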

### Table Order Detail
source: orders table

| Source Field | Target Field     | Transformation Rule                                                |
|--------------|------------------|--------------------------------------------------------------------|
| -            | order_detail_id  | Auto Generated                                                     |
| order_id     | order_id         | Lookup: match the source order_id to order_nk in the target orders table and take its order_id |
| product_id   | product_id       | Direct Mapping                                                     |
| price        | price            | Direct Mapping                                                     |
| quantity     | quantity         | Direct Mapping                                                     |

Deduplicate the data so that there is exactly one row per combination of order_id, product_id, and quantity.
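The composite-key rule matches the `UNIQUE(order_id, product_id, quantity)` constraint in the DDL. Here is a short sketch on hypothetical sample lines (`staging_lines` is an illustrative name):

```python
import pandas as pd

# Hypothetical staging order lines; the first two rows are exact repeats.
staging_lines = pd.DataFrame({
    "order_id": ["A1", "A1", "A1", "B2"],
    "product_id": ["P1", "P1", "P2", "P1"],
    "price": [999.0, 999.0, 299.0, 999.0],
    "quantity": [2, 2, 1, 5],
})

key = ["order_id", "product_id", "quantity"]

# Drop rows that repeat the composite key backing the UNIQUE constraint.
order_detail = staging_lines.drop_duplicates(subset=key).reset_index(drop=True)

# Sanity check: no key combination should remain duplicated.
assert not order_detail.duplicated(subset=key).any()
```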

## Data Validation

Here are some validation rules that can be applied to the previously mentioned tables to ensure data integrity and quality:

### Validation Rule
1. Customer Table Validation:
    - Check that first_name and last_name are not null or empty.
    - Validate that email is well-formed and uses one of the allowed domains: yahoo.com, hotmail.com, or gmail.com.

2. Order Detail Table Validation:
    - Validate price to ensure it is a positive number.
    - Validate quantity to ensure it is a positive integer.
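The customer rules can also be expressed as vectorized pandas operations instead of row-by-row `apply`. This is a sketch under the assumption that the columns are named as in the target schema. `EMAIL_PATTERN` and `customer_is_valid` are hypothetical names.

```python
import pandas as pd

# Allowed domains from the customer validation rule.
EMAIL_PATTERN = r"[\w.-]+@(?:yahoo\.com|hotmail\.com|gmail\.com)"

def non_empty(col: pd.Series) -> pd.Series:
    # Missing values become "", so None/NaN and whitespace-only strings both fail.
    return col.fillna("").astype(str).str.strip().ne("")

def customer_is_valid(df: pd.DataFrame) -> pd.Series:
    # str.fullmatch anchors the pattern at both ends of the string.
    email_ok = df["email"].fillna("").astype(str).str.fullmatch(EMAIL_PATTERN)
    return non_empty(df["first_name"]) & non_empty(df["last_name"]) & email_ok

sample = pd.DataFrame({
    "first_name": ["Jackie", " ", None, "Ryan"],
    "last_name": ["Butler", "Brown", "Allen", "Brown"],
    "email": ["jackie740@hotmail.com", "ryan611@gmail.com", "x@yahoo.com", "ryan@outlook.com"],
})
mask = customer_is_valid(sample)
valid, invalid = sample[mask], sample[~mask]
```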

```python
from dotenv import load_dotenv
import os
import pandas as pd
from sqlalchemy import create_engine
import csv
from datetime import datetime
import re

# The MinIO library is used to interact with a MinIO server.
from minio import Minio

# BytesIO provides a way to work with binary data in memory as if it were a file.
from io import BytesIO

from src.log.log import log_to_csv
```

## Extract From Staging Area

```python
def extract_staging(table_name: str):
    try:
        # Create a connection to the staging database
        conn = create_engine("postgresql://postgres:aku@localhost/staging")

        log = pd.read_csv("log.csv")

        # Find previous successful extractions of this table from staging
        condition = (
            (log['step'] == 'extraction') &
            (log['status'] == 'success') &
            (log['source'] == 'staging') &
            (log['table_name'] == table_name)
        )

        # Apply the filter
        etl_date = log[condition]['etl_date']

        # If no previous extraction has been recorded (etl_date is empty), set etl_date to '1111-01-01', indicating the initial load.
        # Otherwise, retrieve only the data added since the last successful extraction (etl_date).
        if etl_date.empty:
            etl_date = '1111-01-01'
        else:
            etl_date = max(etl_date)

        # Select all columns from table_name where created_at is later than etl_date
        query = f"SELECT * FROM {table_name} WHERE created_at > %s::timestamp"

        # Execute the query with pd.read_sql, passing etl_date as a bound parameter
        df = pd.read_sql(sql=query, con=conn, params=(etl_date,))
        log_msg = {
                "step" : "extraction",
                "status": "success",
                "source": "staging",
                "table_name": table_name,
                "etl_date": datetime.now().strftime("%Y-%m-%d %H:%M:%S")  # Current timestamp
            }
        return df
    except Exception as e:
        log_msg = {
            "step" : "extraction",
            "status": "failed",
            "source": "staging",
            "table_name": table_name,
            "etl_date": datetime.now().strftime("%Y-%m-%d %H:%M:%S")  # Current timestamp
        }
        print(e)
    finally:
        log_to_csv(log_msg, 'log.csv')
```
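The incremental logic in `extract_staging` uses a log-based watermark: the latest successful extraction time becomes the lower bound for `created_at`. The sketch below isolates that step. It uses an in-memory log so it runs without `log.csv` or a database, and it assumes the log holds the same fields as the `log_msg` dictionaries above. `last_success_date` and `sample_log` are hypothetical names.

```python
import pandas as pd

# Same fallback value extract_staging uses for the very first run.
INITIAL_LOAD_DATE = "1111-01-01"

def last_success_date(log: pd.DataFrame, table_name: str) -> str:
    # Same filter as extract_staging: successful extractions from staging for this table.
    mask = (
        (log["step"] == "extraction")
        & (log["status"] == "success")
        & (log["source"] == "staging")
        & (log["table_name"] == table_name)
    )
    dates = log.loc[mask, "etl_date"]
    # No matching row means nothing was extracted yet, so load everything.
    # "%Y-%m-%d %H:%M:%S" strings sort chronologically, so max() is the latest run.
    return INITIAL_LOAD_DATE if dates.empty else dates.max()

# Hypothetical log: failed runs and other steps must not move the watermark.
sample_log = pd.DataFrame([
    {"step": "extraction", "status": "success", "source": "staging", "table_name": "category", "etl_date": "2024-07-01 10:00:00"},
    {"step": "extraction", "status": "success", "source": "staging", "table_name": "category", "etl_date": "2024-07-02 09:00:00"},
    {"step": "extraction", "status": "failed", "source": "staging", "table_name": "category", "etl_date": "2024-07-03 08:00:00"},
    {"step": "transformation", "status": "success", "source": "staging", "table_name": "category", "etl_date": "2024-07-04 08:00:00"},
])

watermark = last_success_date(sample_log, "category")
```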

    


## Data Transformation 

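Each target table has its own mapping and deduplication rules, so each one gets a dedicated transformation function. The tables with validation rules (customer and order_detail) also get a dedicated validation function.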

### Category Table

```python
def transform_category(data: pd.DataFrame):

    try:
        # rename column category_id to category_nk
        data = data.rename(columns={'category_id':'category_nk'})

        # deduplication based on category_nk and category name
        data = data.drop_duplicates(subset='category_nk')
        data = data.drop_duplicates(subset='name')
        
        log_msg = {
                "step" : "transformation",
                "status": "success",
                "source": "staging",
                "table_name": "category",
                "etl_date": datetime.now().strftime("%Y-%m-%d %H:%M:%S")  # Current timestamp
                }
        
        return data
    except Exception as e:
        log_msg = {
            "step" : "transformation",
            "status": "failed",
            "source": "staging",
            "table_name": "category",
            "etl_date": datetime.now().strftime("%Y-%m-%d %H:%M:%S")  # Current timestamp
            }
        print(e)
    finally:
        log_to_csv(log_msg, 'log.csv')
```
        

### Customer Table

```python
def transform_customer(data: pd.DataFrame):
    try:
        # rename column customer_id to customer_nk
        data = data.rename(columns={'customer_id':'customer_nk'})

        # deduplication based on customer_nk and email
        data = data.drop_duplicates(subset='customer_nk')
        data = data.drop_duplicates(subset='email')
                
        log_msg = {
                "step" : "transformation",
                "status": "success",
                "source": "staging",
                "table_name": "customer",
                "etl_date": datetime.now().strftime("%Y-%m-%d %H:%M:%S")  # Current timestamp
                }
        
        return data
    except Exception as e:
        log_msg = {
            "step" : "transformation",
            "status": "failed",
            "source": "staging",
            "table_name": "customer",
            "etl_date": datetime.now().strftime("%Y-%m-%d %H:%M:%S")  # Current timestamp
            }
        print(e)
    finally:
        log_to_csv(log_msg, 'log.csv')
```


### Table Orders

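The orders table references the customer table through customer_id, which is a surrogate key generated in the target database. The transformation therefore looks up each order's customer_id in the target customer table by matching the source customer_id against customer_nk. For this reason, customers must be loaded before orders.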
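A plain left merge hides orders whose customer is not yet in the target. It also turns `customer_id` into a float column, which is why the orders output later in this notebook shows values like `2691.0`. The following sketch uses hypothetical sample data to show one way to surface both issues. It uses pandas' `indicator=True` option to flag unmatched rows and the nullable `Int64` dtype to keep integer keys.

```python
import pandas as pd

# Hypothetical staging orders after the rename: customer_nk is the source customer_id.
orders = pd.DataFrame({"order_nk": ["A1", "B2", "C3"], "customer_nk": [1, 2, 99]})
# Hypothetical target customer table: customer_id is the surrogate (SERIAL) key.
target_customer = pd.DataFrame({"customer_id": [501, 502], "customer_nk": [1, 2]})

merged = orders.merge(
    target_customer[["customer_id", "customer_nk"]],
    on="customer_nk",
    how="left",
    indicator=True,  # adds a "_merge" column with "both" or "left_only"
)

# Orders whose customer is missing from the target get no surrogate key.
unmatched = merged.loc[merged["_merge"] == "left_only", "order_nk"].tolist()

# A left join with misses yields float64 (501.0, NaN); Int64 keeps whole numbers and uses <NA>.
merged["customer_id"] = merged["customer_id"].astype("Int64")
merged = merged.drop(columns="_merge")
```

What to do with `unmatched` (reject, retry later, or load with a NULL customer_id) is a design decision the mapping above does not specify.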

```python
def extract_target(table_name: str):
    """
    Extract all rows of a table from the target database.
    """
    conn = create_engine("postgresql://postgres:aku@localhost/wh_w4")

    # Select all columns from the specified table (no incremental filter here)
    query = f"SELECT * FROM {table_name}"

    # Execute the query with pd.read_sql
    df = pd.read_sql(sql=query, con=conn)
    
    return df
```

```python
def transform_orders(data: pd.DataFrame):
    try:
        # rename column order_id and customer_id
        rename_column = {
            'order_id':'order_nk',
            'customer_id':'customer_nk'
        }
        # select column
        selected_column = ['order_nk', 'customer_id', 'order_date', 'status']

        # extract customer data from the target database
        data_cust = extract_target(table_name='customer')
        
        data = data.rename(columns=rename_column)

        # deduplication based on ['order_nk','customer_nk','order_date','status'] (profiling result)
        data = data.drop_duplicates(subset=['order_nk','customer_nk','order_date','status'])

        # get customer_id from the customer table in the target database
        merged_data = data.merge(data_cust[['customer_id', 'customer_nk']], left_on='customer_nk', right_on='customer_nk', how='left')

        log_msg = {
                "step" : "transformation",
                "status": "success",
                "source": "staging",
                "table_name": "orders",
                "etl_date": datetime.now().strftime("%Y-%m-%d %H:%M:%S")  # Current timestamp
                }
        
        return merged_data[selected_column]
    except Exception as e:
        log_msg = {
            "step" : "transformation",
            "status": "failed",
            "source": "staging",
            "table_name": "orders",
            "etl_date": datetime.now().strftime("%Y-%m-%d %H:%M:%S")  # Current timestamp
            }
        print(e)
    finally:
        log_to_csv(log_msg, 'log.csv')
```


### Table Order Detail

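The order_detail table references the orders table through order_id, which is a surrogate key generated in the target database. The transformation therefore looks up each line's order_id in the target orders table by matching the source order_id against order_nk. For this reason, orders must be loaded before order_detail.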
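When a lookup table is expected to have one row per key, pandas can enforce that during the merge. With `validate="many_to_one"`, the merge raises `pandas.errors.MergeError` if `order_nk` is duplicated in the lookup side, instead of silently multiplying detail lines. This is a sketch with hypothetical data, and `attach_order_id` is an illustrative name.

```python
import pandas as pd

# Hypothetical staging lines (order_id already renamed to order_nk).
lines = pd.DataFrame({"order_nk": ["A1", "A1", "B2"], "product_id": ["P1", "P2", "P1"]})
# Hypothetical target orders table: order_id is the surrogate (SERIAL) key.
target_orders = pd.DataFrame({"order_id": [871, 872], "order_nk": ["A1", "B2"]})

def attach_order_id(lines: pd.DataFrame, target_orders: pd.DataFrame) -> pd.DataFrame:
    # many_to_one: many detail lines per order, but order_nk must be unique in target_orders.
    return lines.merge(
        target_orders[["order_id", "order_nk"]],
        on="order_nk",
        how="left",
        validate="many_to_one",
    )

detail = attach_order_id(lines, target_orders)
```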

```python
def transform_order_detail(data: pd.DataFrame):
    try:
        # rename column order_id to order_nk
        rename_column = {
            'order_id':'order_nk',
        }

        # select column
        selected_column = ['order_id', 'product_id', 'price', 'quantity']

        # extract the orders table from the target database
        data_orders = extract_target(table_name='orders')
        
        data = data.rename(columns=rename_column)

        # deduplication based on ['order_nk','product_id','quantity']
        data = data.drop_duplicates(subset=['order_nk','product_id','quantity'])

        # get order_id from the orders table in the target database
        merged_data = data.merge(data_orders[['order_id', 'order_nk']], left_on='order_nk', right_on='order_nk', how='left')

        log_msg = {
                "step" : "transformation",
                "status": "success",
                "source": "staging",
                "table_name": "order_detail",
                "etl_date": datetime.now().strftime("%Y-%m-%d %H:%M:%S")  # Current timestamp
                }
        
        return merged_data[selected_column]
    except Exception as e:
        log_msg = {
            "step" : "transformation",
            "status": "failed",
            "source": "staging",
            "table_name": "order_detail",
            "etl_date": datetime.now().strftime("%Y-%m-%d %H:%M:%S")  # Current timestamp
            }
        print(e)
    finally:
        log_to_csv(log_msg, 'log.csv')
```


## Data Validation

### Customer Validation

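```python
# Validation: value must be a non-empty string (None, NaN, and whitespace-only values fail)
def validate_not_empty(value):
    return isinstance(value, str) and bool(value.strip())

# Validation: email must use one of the allowed domains
EMAIL_REGEX = re.compile(r"[\w\.-]+@(yahoo\.com|hotmail\.com|gmail\.com)")

def validate_email_format(email):
    # fullmatch anchors the pattern at both ends; non-string values (e.g. None) fail
    return isinstance(email, str) and bool(EMAIL_REGEX.fullmatch(email))
```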

```python
def valdiation_customer(data: pd.DataFrame, table_name: str):
    try:

        # Create a report DataFrame with one boolean column per rule
        report_data = {
            'first_name_valid': data['first_name'].apply(validate_not_empty),
            'last_name_valid': data['last_name'].apply(validate_not_empty),
            'email_format_valid': data['email'].apply(validate_email_format),
        }

        report_df = pd.DataFrame(report_data)

        # summarize: a row is valid only if it passes every rule
        report_df['all_valid'] = report_df.all(axis=1)

        # Keep the valid rows (all_valid == True)
        valid_customers_df = data[report_df['all_valid']]

        # Keep the invalid rows (all_valid == False)
        invalid_customers_df = data[~report_df['all_valid']]
        
        # create success log message
        log_msg = {
                "step" : "validation",
                "status": "success",
                "source": 'staging',
                "table_name": table_name,
                "etl_date": datetime.now().strftime("%Y-%m-%d %H:%M:%S")  # Current timestamp
            }
        return valid_customers_df, invalid_customers_df
    except Exception as e:
        # create fail log message
        log_msg = {
            "step" : "validation",
            "status": "failed",
            "source": "staging",
            "table_name": table_name,
            "etl_date": datetime.now().strftime("%Y-%m-%d %H:%M:%S")  # Current timestamp
        }
        print(e)
    finally:
        log_to_csv(log_msg, 'log.csv')
```



### Order Detail Validation

```python
# Validation: value must be present (not NaN/None) and greater than zero
def validate_positive_number(value):
    return pd.notna(value) and value > 0
```

```python
def valdiation_order_detail(data: pd.DataFrame, table_name: str):
    try:
        # Create a report DataFrame with one boolean column per rule
        report_data = {
            'price_valid': data['price'].apply(validate_positive_number),
            'quantity_valid': data['quantity'].apply(validate_positive_number)
        }

        report_df = pd.DataFrame(report_data)
        
        # summarize: a row is valid only if it passes every rule
        report_df['all_valid'] = report_df.all(axis=1)

        # Keep the valid rows (all_valid == True)
        valid_order_details_df = data[report_df['all_valid']]

        # Keep the invalid rows (all_valid == False)
        invalid_order_details_df = data[~report_df['all_valid']]
        
        # create success log message
        log_msg = {
                "step" : "validation",
                "status": "success",
                "source": 'staging',
                "table_name": table_name,
                "etl_date": datetime.now().strftime("%Y-%m-%d %H:%M:%S")  # Current timestamp
            }
        return valid_order_details_df, invalid_order_details_df
    except Exception as e:
        # create fail log message
        log_msg = {
            "step" : "validation",
            "status": "failed",
            "source": "staging",
            "table_name": table_name,
            "etl_date": datetime.now().strftime("%Y-%m-%d %H:%M:%S")  # Current timestamp
        }
        print(e)
    finally:
        log_to_csv(log_msg, 'log.csv')
```
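The validation rule asks for quantity to be a positive *integer*, while `validate_positive_number` only checks `> 0`. The following vectorized sketch adds the integer check. It assumes the price and quantity column names from the target schema, and `order_detail_mask` is a hypothetical name.

```python
import pandas as pd

def order_detail_mask(df: pd.DataFrame) -> pd.Series:
    # Coerce to numbers; non-numeric values become NaN and fail every comparison below.
    price = pd.to_numeric(df["price"], errors="coerce")
    qty = pd.to_numeric(df["quantity"], errors="coerce")
    price_ok = price > 0
    # Positive integer: greater than zero and no fractional part.
    qty_ok = (qty > 0) & (qty % 1 == 0)
    return price_ok & qty_ok

# Hypothetical rows: only the first one satisfies both rules.
sample = pd.DataFrame({
    "price": [1599.0, 0, 999.0, None, 299.0],
    "quantity": [7, 3, 2.5, 1, -1],
})
mask = order_detail_mask(sample)
valid, invalid = sample[mask], sample[~mask]
```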


## Load Data

```python
from pangres import upsert
from src.load.load_error import handle_error

def load_target(data: pd.DataFrame, schema: str, table_name: str, idx_name: str, source):
    try:
        # create connection to the target database
        conn = create_engine("postgresql://postgres:aku@localhost/wh_w4")
        
        # set the DataFrame index to the key used for upsert (must be unique in the target table)
        data = data.set_index(idx_name)
        
        # Do upsert (update existing rows, insert new rows)
        upsert(con = conn,
                df = data,
                table_name = table_name,
                schema = schema,
                if_row_exists = "update")
        
        # create success log message
        log_msg = {
                "step" : "load target",
                "status": "success",
                "source": source,
                "table_name": table_name,
                "etl_date": datetime.now().strftime("%Y-%m-%d %H:%M:%S")  # Current timestamp
            }
        return data
    except Exception as e:

        # create fail log message
        log_msg = {
            "step" : "load target",
            "status": "failed",
            "source": source,
            "table_name": table_name,
            "etl_date": datetime.now().strftime("%Y-%m-%d %H:%M:%S")  # Current timestamp
        }
        print(e)
        # Error handling: save the failed data to Object Storage
        try:
            handle_error(data = data, bucket_name='error', table_name= table_name)
        except Exception as e:
            print(e)
    finally:
        log_to_csv(log_msg, 'log.csv')
```
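On PostgreSQL, pangres' `upsert(..., if_row_exists="update")` works roughly like an `INSERT ... ON CONFLICT (index columns) DO UPDATE`. That is why each DataFrame index used here (`category_nk`, `customer_nk`, `order_nk`, and `(order_id, product_id, quantity)`) is backed by a unique constraint in the DDL. The sketch below illustrates those semantics with the standard-library `sqlite3` module, not with pangres itself. It needs SQLite 3.24 or later for `ON CONFLICT ... DO UPDATE`, and the table is a simplified, hypothetical stand-in.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# Simplified stand-in for the target category table.
conn.execute(
    "CREATE TABLE category ("
    " category_id INTEGER PRIMARY KEY,"  # surrogate key assigned by the database
    " category_nk INTEGER UNIQUE,"       # natural key used as the conflict target
    " name TEXT NOT NULL)"
)

def upsert_category(conn, rows):
    # Existing category_nk -> update name in place; new category_nk -> insert a row.
    conn.executemany(
        "INSERT INTO category (category_nk, name) VALUES (?, ?) "
        "ON CONFLICT(category_nk) DO UPDATE SET name = excluded.name",
        rows,
    )
    conn.commit()

upsert_category(conn, [(1, "Electronics"), (5, "Home&Kitchen")])
id_before = conn.execute("SELECT category_id FROM category WHERE category_nk = 5").fetchone()[0]

# Second run: nk 5 is updated, nk 9 is inserted.
upsert_category(conn, [(5, "Home and Kitchen"), (9, "Health&PersonalCare")])
id_after = conn.execute("SELECT category_id FROM category WHERE category_nk = 5").fetchone()[0]

rows = conn.execute("SELECT category_nk, name FROM category ORDER BY category_nk").fetchall()
```

Because the update happens in place, the surrogate key of an existing row stays the same across runs. Rows that other tables already reference keep their foreign keys valid.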


### Pipeline Staging to Target

```python
# Extract, transform, and load category
df_category = extract_staging(table_name='category')
category_transform = transform_category(data=df_category)
load_target(data=category_transform,
            schema="public",
            table_name='category',
            idx_name='category_nk',
            source='staging')
```

| category_nk | name | description | created_at |
|---|---|---|---|
| 1 | Computers&Accessories | Computers&Accessories is Skill final here skin... | 2024-07-02 15:06:39.594703 |
| 2 | Electronics | Electronics is Letter offer probably state org... | 2024-07-02 15:06:39.594703 |
| 3 | MusicalInstruments | MusicalInstruments is Above without but federa... | 2024-07-02 15:06:39.594703 |
| 4 | OfficeProducts | OfficeProducts is Letter participant lot indic... | 2024-07-02 15:06:39.594703 |
| 6 | HomeImprovement | HomeImprovement is Meeting senior student win ... | 2024-07-02 15:06:39.594703 |
| 7 | Toys&Games | Toys&Games is Local summer prevent authority h... | 2024-07-02 15:06:39.594703 |
| 8 | Car&Motorbike | Car&Motorbike is Big people role me play onto. | 2024-07-02 15:06:39.594703 |
| 9 | Health&PersonalCare | Health&PersonalCare is Stand response prove co... | 2024-07-02 15:06:39.594703 |
| 5 | Home and Kitchen | Home&Kitchen is Service discussion again sea a... | 2024-07-02 15:06:39.594703 |


```python
# Extract, transform, validate, and load customer
df_customer = extract_staging(table_name='customer')
customer_transform = transform_customer(data=df_customer)
valid_customer, invalid_customer = valdiation_customer(data = customer_transform,
                                     table_name='customer')
load_target(data=valid_customer,
            schema="public",
            table_name='customer',
            idx_name='customer_nk',
            source='staging')
```

| customer_nk | first_name | last_name | email | phone | address | created_at |
|---|---|---|---|---|---|---|
| 1 | Jackie | Butler | jackie740@hotmail.com | 639-601-6489 | 0682 Davis Mount\nNorth Ryan, DE 34214 | 2024-07-02 15:50:53.961925 |
| 2 | Ryan | Brown | ryan611@gmail.com | 7246609373 | 087 Michael Mountain\nPort Dominiquechester, V... | 2024-07-02 15:50:53.961925 |
| 3 | Virginia | Allen | virginia858@yahoo.com | +1-938-242-0900 | 845 Amanda Turnpike\nChadbury, AS 71148 | 2024-07-02 15:50:53.961925 |
| 4 | Patty | Allen | patty464@hotmail.com | 431.665.1039x74107 | 48782 Lisa Centers Suite 303\nEast Marieton, V... | 2024-07-02 15:50:53.961925 |
| 5 | Bryan | Gonzalez | bryan273@yahoo.com | 268.200.7349x794 | 5896 Caitlin Radial Suite 467\nPort Maryfurt, ... | 2024-07-02 15:50:53.961925 |
| ... | ... | ... | ... | ... | ... | ... |
| 978 | Reginald | Becker | reginald548@hotmail.com | 595-251-4621x510 | 589 Monica Landing Apt. 451\nLake James, NH 61198 | 2024-07-02 15:50:53.961925 |
| 979 | Phyllis | Lewis | phyllis553@yahoo.com | 001-294-785-8996x82361 | 794 Wallace Circle\nHernandeztown, WV 14386 | 2024-07-02 15:50:53.961925 |
| 980 | Christopher | Smith | christopher077@hotmail.com | 001-282-853-7711x0234 | 33341 Chen Gateway\nHaileyborough, AK 69666 | 2024-07-02 15:50:53.961925 |
| 981 | Kenneth | Berry | kenneth878@yahoo.com | (430)513-6409x6624 | 9067 Gray Hills Apt. 024\nKevinview, NY 11486 | 2024-07-02 15:50:53.961925 |


```python
# Extract, transform, and load orders
df_orders = extract_staging(table_name='orders')
orders_transform = transform_orders(data=df_orders)
load_target(data=orders_transform,
            schema="public",
            table_name='orders',
            idx_name='order_nk',
            source='staging')
```

| order_nk | customer_id | order_date | status |
|---|---|---|---|
| IINI91PP812 | 2691.0 | 2022-01-30 | Success |
| ONNA03MN757 | 2167.0 | 2021-01-03 | Success |
| NPCC44AC852 | 2834.0 | 2022-09-08 | Success |
| MMIM69AM147 | 2957.0 | 2021-06-26 | Success |
| MCIA31MO690 | 2331.0 | 2021-07-09 | Success |
| ... | ... | ... | ... |
| COOC58NA784 | 2146.0 | 2022-09-27 | Success |
| NMNA61OM567 | 1996.0 | 2021-02-06 | Success |
| AANA44AN436 | 2633.0 | 2021-04-24 | Success |
| IAAC58MO380 | 2524.0 | 2022-10-10 | Success |


```python
# Transform, validate, and load order_detail (source rows come from the staging orders table)
order_detail_transform = transform_order_detail(data=df_orders)
valid_order_detail, invalid_order_detail = valdiation_order_detail(data = order_detail_transform,
                                     table_name='order_detail')
load_target(data=valid_order_detail,
            schema="public",
            table_name='order_detail',
            idx_name=['order_id', 'product_id', 'quantity'],
            source='staging')
```

| order_id | product_id | quantity | price |
|---|---|---|---|
| 871 | B08ZN4B121 | 7 | 1599.0 |
| 871 | B0B94JPY2N | 13 | 999.0 |
| 871 | B07MP21WJD | 9 | 299.0 |
| 871 | B08G43CCLC | 9 | 999.0 |
| 872 | B0B217Z5VK | 5 | 3999.0 |
| ... | ... | ... | ... |
| 1738 | B07NKNBTT3 | 1 | 1230.0 |
| 1739 | B0083T231O | 15 | 1499.0 |
| 1739 | B07VZYMQNZ | 4 | 1440.0 |
| 1739 | B09PTT8DZF | 10 | 670.0 |


Git repository: https://github.com/Kurikulum-Sekolah-Pacmann/ingestion_data_pipeline.git