# Week #7 - Live Class
Data Pipeline Course - Sekolah Engineer - Pacmann Academy 



## Objective

1. Create a data pipeline that integrates Bluebikes data using PySpark.

## Case Description

1. `Problem`

A company named Bluebikes wants to analyze data on subscribers who have used its services. Trip transaction data is still stored in CSV files, one per year, while object data such as stations and bikes is stored in a database.

- **CSV Files**: Trip data for each year (2019 and 2020).
- **Bluebikes database**: Object data for stations and bikes from Bluebikes.

2. `Solution`

To address these issues, an ETL (Extract, Transform, Load) pipeline will be developed. This pipeline will extract data from different sources, apply necessary transformations to clean and standardize the data, and then load it into a unified data warehouse. The pipeline will consist of two layers: Staging and Warehouse, and will include a logging system.

![Pipeline Diagram](https://sekolahdata-assets.s3.ap-southeast-1.amazonaws.com/notebook-images/mde-data-ingestion-spark/bluebikes_pipeline.drawio.png)

## Preparation

`Docker Compose` and `repository`:
<br> You can find the Docker Compose configuration and the repository at the following link:
- [Pipeline Bluebikes Repository](https://github.com/Kurikulum-Sekolah-Pacmann/pipeline-bluebikes.git)

`Source Dataset` (CSV):
The source dataset for trip data is available as CSV files:
- **Bluebikes Trip Data CSV**: [Link to Dataset](https://www.kaggle.com/datasets/jackdaoud/bluebikes-in-boston)
<br>Copy the files into the directory `./script/data`.

`Tools and Technologies`:
- Python and PySpark: For building the data pipeline
- PostgreSQL: For log, staging, and final data storage
- Docker: For running MinIO


## Task

1. `Profiling` <br>
Profiling involves analyzing and understanding the structure, content, and quality of the data from the multiple sources within Bluebikes.

2. `Building Data Pipeline EL Source to Staging` <br>
This step focuses on extracting data from the various Bluebikes source systems. The extracted data is then loaded into a staging area.

3. `Building Data Pipeline ETL Staging to Warehouse` <br>
In this phase, data from the staging area is transformed and loaded into the data warehouse.

### Profiling

In this task, you will conduct profiling of each table or file provided in the dataset. The profiling process involves the following steps:

1. **Check Number of Columns and Column Names**  
   Verify the number of columns and their names in each table or file to ensure they match the expected schema.

2. **Check Number of Rows**  
   Count the total number of rows to understand the size of the dataset and identify any discrepancies.

3. **Check Data Types**  
   Examine the data types of each column to confirm they align with the expected types and identify any inconsistencies.

4. **Check Percentage of Missing Values**  
   Calculate the percentage of missing values in each column to assess data completeness and quality.

5. **Check Percentage of Valid Date Formats**  
   Determine the percentage of valid date formats in date columns to ensure data consistency and validity.

The first step in the profiling process is to `extract the data` from the source systems, that is, from the database tables and the CSV files.

`Implementing Modular Code in ETL PySpark Process`

When implementing modular code in your ETL PySpark process, it is essential to pass the Spark session as a parameter to your functions. This practice ensures that you remain within the same Spark session while executing related functions, maintaining consistency across your ETL operations.

For example, if your main file is `pipeline_staging.py` and you import the `extract_database()` function from `src.staging.extract`, you should pass the Spark session when calling it, as in `extract_database(spark, table_name)`, especially if the function performs Spark operations. This keeps the function **within the Spark session** created in your main file, so all operations run in the same session context.
![Spark Session Diagram](https://sekolahdata-assets.s3.ap-southeast-1.amazonaws.com/notebook-images/mde-data-ingestion-spark/sparksession.png)
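
The pattern can be sketched as follows. This is a minimal illustration rather than the course repository's code: `build_session` and `extract_tables` are hypothetical names, and `extract_fn` stands in for any function, such as `extract_database`, that takes the session as its first argument.

```python
from typing import Any, Callable, Dict, Iterable


def build_session(app_name: str):
    """Create (or reuse) one Spark session in the main file."""
    # Imported lazily so the rest of this sketch has no hard Spark dependency.
    from pyspark.sql import SparkSession

    return SparkSession.builder.appName(app_name).getOrCreate()


def extract_tables(
    spark: Any,
    table_names: Iterable[str],
    extract_fn: Callable[[Any, str], Any],
) -> Dict[str, Any]:
    """Call extract_fn(spark, name) for every table, always passing the same session."""
    return {name: extract_fn(spark, name) for name in table_names}
```

In `pipeline_staging.py` this would be used roughly as `spark = build_session("Pipeline Staging")` followed by `extract_tables(spark, ["user_type", "station", "bike"], extract_database)`.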


#### Load Your .env file

Create `helper.py` in `src/utils` to build your database connections.

Example of a `.env` file:

```
DB_HOST_SOURCE="localhost"
DB_USER_SOURCE="postgres"
DB_PASS_SOURCE="aku"
DB_PORT_SOURCE="5432"

DB_HOST_TARGET="localhost"
DB_USER_TARGET="postgres"
DB_PASS_TARGET="aku"
DB_PORT_TARGET="5432"


DB_NAME_BLUEBIKES="bluebikes"
DB_NAME_STG="staging"
DB_NAME_LOG="etl_log"
DB_NAME_WH="warehouse"

```


In [1]:
from dotenv import load_dotenv
import os

load_dotenv(".env")

DB_HOST_SOURCE = os.getenv("DB_HOST_SOURCE")
DB_USER_SOURCE = os.getenv("DB_USER_SOURCE")
DB_PASS_SOURCE = os.getenv("DB_PASS_SOURCE")
DB_PORT_SOURCE = os.getenv("DB_PORT_SOURCE")

DB_HOST_TARGET = os.getenv("DB_HOST_TARGET")
DB_USER_TARGET = os.getenv("DB_USER_TARGET")
DB_PASS_TARGET = os.getenv("DB_PASS_TARGET")
DB_PORT_TARGET = os.getenv("DB_PORT_TARGET")

DB_NAME_BLUEBIKES = os.getenv("DB_NAME_BLUEBIKES")
DB_NAME_STG = os.getenv("DB_NAME_STG")
DB_NAME_LOG = os.getenv("DB_NAME_LOG")
DB_NAME_WH = os.getenv("DB_NAME_WH")

# Create URL link for each database connection

def bluebikes_engine():
    DB_URL = f"jdbc:postgresql://{DB_HOST_SOURCE}:{DB_PORT_SOURCE}/{DB_NAME_BLUEBIKES}"
    return DB_URL, DB_USER_SOURCE, DB_PASS_SOURCE

def stg_engine():
    DB_URL = f"jdbc:postgresql://{DB_HOST_TARGET}:{DB_PORT_TARGET}/{DB_NAME_STG}"
    return DB_URL, DB_USER_TARGET, DB_PASS_TARGET

def log_engine():
    DB_URL = f"jdbc:postgresql://{DB_HOST_TARGET}:{DB_PORT_TARGET}/{DB_NAME_LOG}"
    return DB_URL, DB_USER_TARGET, DB_PASS_TARGET

def wh_engine():
    DB_URL = f"jdbc:postgresql://{DB_HOST_TARGET}:{DB_PORT_TARGET}/{DB_NAME_WH}"
    return DB_URL, DB_USER_TARGET, DB_PASS_TARGET
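
Note that `os.getenv` returns `None` when a variable is missing, so a typo in `.env` would silently produce a URL such as `jdbc:postgresql://None:None/None`. A minimal sketch of a fail-fast variant is shown below; `require_env` and `jdbc_url` are hypothetical helper names, not part of the course repository.

```python
import os


def require_env(name: str) -> str:
    """Return the value of an environment variable or raise a clear error."""
    value = os.getenv(name)
    if not value:
        raise RuntimeError(f"Missing required environment variable: {name}")
    return value


def jdbc_url(host: str, port: str, database: str) -> str:
    """Build a PostgreSQL JDBC URL in the same shape used by the helper functions."""
    return f"jdbc:postgresql://{host}:{port}/{database}"
```

With these helpers, `stg_engine()` could build its URL as `jdbc_url(require_env("DB_HOST_TARGET"), require_env("DB_PORT_TARGET"), require_env("DB_NAME_STG"))`.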
    


#### Extract Data From Database

Create a file named `extract.py` in the `src/profiling/` directory to store the functions `extract_database` and `extract_csv`. 

Here’s how you can define these functions:

1. **`extract_database`**: This function will extract data from a database using a Spark session.
2. **`extract_csv`**: This function will extract data from a CSV file using a Spark session.

In [2]:
from src.utils.helper import bluebikes_engine
from pyspark.sql import SparkSession

def extract_database(spark: SparkSession, table_name):
    # get config
    DB_URL, DB_USER, DB_PASS = bluebikes_engine()

    # set config
    connection_properties = {
        "user": DB_USER,
        "password": DB_PASS,
        "driver": "org.postgresql.Driver" # set driver postgres
    }

    # read data
    df = spark \
              .read \
              .jdbc(url = DB_URL,
                    table = table_name,
                    properties = connection_properties)
    
    return df

#### Extract Data From CSV File

In [3]:
PATH = "data/"
from pyspark.sql import SparkSession

def extract_csv(spark: SparkSession, file_name):

    # read data
    df = spark.read.csv(PATH + file_name, header=True)

    return df

#### Profiling Data From Database

In [4]:
import pyspark
from pyspark.sql import SparkSession

In [5]:
# create spark session
spark = SparkSession \
    .builder \
    .appName("Profiling Data") \
    .getOrCreate()

Extract data from the tables bike, station, and user_type

In [6]:
from src.profiling.extract import extract_database, extract_csv

df_user_type = extract_database(spark, 'user_type')
df_station = extract_database(spark, 'station')
df_bike = extract_database(spark, 'bike')


In [7]:
df_user_type.show()

+------------+--------------+
|user_type_id|user_type_name|
+------------+--------------+
|           0|    Subscriber|
|           1|      Customer|
+------------+--------------+



In [8]:
df_station.show(5)

+----------+--------------------+------------------+------------------+
|station_id|        station_name|          latitude|         longitude|
+----------+--------------------+------------------+------------------+
|         1|18 Dorrance Wareh...|         42.387151|        -71.075978|
|         3|Colleges of the F...| 42.34011512249237|-71.10061883926393|
|         4|Tremont St at E B...|         42.345392|        -71.069616|
|         5|Northeastern Univ...|         42.341814|        -71.090179|
|         6|Cambridge St at J...|42.361211653079856|-71.06530619789737|
+----------+--------------------+------------------+------------------+
only showing top 5 rows



In [9]:
df_bike.show(5)

+-------+--------+------+-------------+
|bike_id|    type| model|purchase_date|
+-------+--------+------+-------------+
|      1|    road|R88W4N|   2020-12-02|
|      4|    road|G79YM9|   2022-10-08|
|      7|foldable|IOEZRL|   2024-07-07|
|      8|foldable|ADQ2DA|   2021-03-26|
|      9|foldable|8EEQ8Z|   2023-03-04|
+-------+--------+------+-------------+
only showing top 5 rows



Extract data from the CSV files bluebikes_tripdata_2019.csv and bluebikes_tripdata_2020.csv

In [10]:
df_trip_2020 = extract_csv(spark, 'bluebikes_tripdata_2020.csv')
df_trip_2019 = extract_csv(spark, 'bluebikes_tripdata_2019.csv')

In [11]:
df_trip_2020.show(5)
df_trip_2019.show(5)

+------------+--------------------+--------------------+----------------+--------------------+----------------------+-----------------------+--------------+--------------------+--------------------+---------------------+------+----------+-----------+----+-----+----------+------+
|tripduration|           starttime|            stoptime|start station id|  start station name|start station latitude|start station longitude|end station id|    end station name|end station latitude|end station longitude|bikeid|  usertype|postal code|year|month|birth year|gender|
+------------+--------------------+--------------------+----------------+--------------------+----------------------+-----------------------+--------------+--------------------+--------------------+---------------------+------+----------+-----------+----+-----+----------+------+
|        1793|2020-11-01 00:00:...|2020-11-01 00:30:...|             186|Congress St at No...|               42.3481|              -71.03764|           186|Cong

##### 1. Check the column names, column count, and row count

In [12]:
# show column count and column name
print("--- Column Count and Column Name ---")
print(f"Column in data user_type: {df_user_type.columns}, Count: {len(df_user_type.columns)}")
print(f"Column in data station: {df_station.columns}, Count: {len(df_station.columns)}")
print(f"Column in data bike: {df_bike.columns}, Count: {len(df_bike.columns)}")
print(f"Column in data trip_2020: {df_trip_2020.columns},  Count: {len(df_trip_2020.columns)}")
print(f"Column in data trip_2019: {df_trip_2019.columns}, Count: {len(df_trip_2019.columns)}")

# show data size
print("--- Data Size ---")
print(f"Data size user_type: {df_user_type.count()}")
print(f"Data size station: {df_station.count()}")
print(f"Data size bike: {df_bike.count()}")
print(f"Data size trip_2020: {df_trip_2020.count()}")
print(f"Data size trip_2019: {df_trip_2019.count()}")


--- Column Count and Column Name ---
Column in data user_type: ['user_type_id', 'user_type_name'], Count: 2
Column in data station: ['station_id', 'station_name', 'latitude', 'longitude'], Count: 4
Column in data bike: ['bike_id', 'type', 'model', 'purchase_date'], Count: 4
Column in data trip_2020: ['tripduration', 'starttime', 'stoptime', 'start station id', 'start station name', 'start station latitude', 'start station longitude', 'end station id', 'end station name', 'end station latitude', 'end station longitude', 'bikeid', 'usertype', 'postal code', 'year', 'month', 'birth year', 'gender'],  Count: 18
Column in data trip_2019: ['tripduration', 'starttime', 'stoptime', 'start station id', 'start station name', 'start station latitude', 'start station longitude', 'end station id', 'end station name', 'end station latitude', 'end station longitude', 'bikeid', 'usertype', 'birth year', 'gender', 'year', 'month'], Count: 17
--- Data Size ---
Data size user_type: 2
Data size station: 3

- We observed that the data for trips in 2020 and 2019 have different column counts. The 2020 data includes a `postal code` column, which is absent in the 2019 data.
- The user data available for trips is limited to 'usertype', 'birth year', and 'gender'. The database does not contain additional user information.
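
The column difference can also be found programmatically instead of by reading the printed lists. The sketch below uses a hypothetical helper, `missing_columns`; the 2019 list is derived from the 2020 list only to keep the example short, and column order is ignored.

```python
from typing import List


def missing_columns(reference: List[str], other: List[str]) -> List[str]:
    """Return the columns present in `reference` but absent from `other`, in order."""
    present = set(other)
    return [c for c in reference if c not in present]


cols_2020 = [
    "tripduration", "starttime", "stoptime", "start station id",
    "start station name", "start station latitude", "start station longitude",
    "end station id", "end station name", "end station latitude",
    "end station longitude", "bikeid", "usertype", "postal code",
    "year", "month", "birth year", "gender",
]
cols_2019 = [c for c in cols_2020 if c != "postal code"]

print(missing_columns(cols_2020, cols_2019))  # ['postal code']
print(missing_columns(cols_2019, cols_2020))  # []
```

In the notebook, the same check would be `missing_columns(df_trip_2020.columns, df_trip_2019.columns)`.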

Next Steps: **Determine Data Types for Each Column**

##### 2. Check the data types

In [13]:
# get data type of each column
print("--- Data Type For Each Column---")
print(f"Data type for each column in user_type: {df_user_type.dtypes}")
print(f"Data type for each column in station: {df_station.dtypes}")
print(f"Data type for each column in bike: {df_bike.dtypes}")
print(f"Data type for each column in trip_2020: {df_trip_2020.dtypes}")
print(f"Data type for each column in trip_2019: {df_trip_2019.dtypes}")


--- Data Type For Each Column---
Data type for each column in user_type: [('user_type_id', 'bigint'), ('user_type_name', 'string')]
Data type for each column in station: [('station_id', 'bigint'), ('station_name', 'string'), ('latitude', 'double'), ('longitude', 'double')]
Data type for each column in bike: [('bike_id', 'bigint'), ('type', 'string'), ('model', 'string'), ('purchase_date', 'date')]
Data type for each column in trip_2020: [('tripduration', 'string'), ('starttime', 'string'), ('stoptime', 'string'), ('start station id', 'string'), ('start station name', 'string'), ('start station latitude', 'string'), ('start station longitude', 'string'), ('end station id', 'string'), ('end station name', 'string'), ('end station latitude', 'string'), ('end station longitude', 'string'), ('bikeid', 'string'), ('usertype', 'string'), ('postal code', 'string'), ('year', 'string'), ('month', 'string'), ('birth year', 'string'), ('gender', 'string')]
Data type for each column in trip_2019: [

- We observed that the data extracted from the CSV files is all of type string, even though some columns should be numeric based on the data snippet.
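
The CSV columns are strings because the files are read without a schema; `spark.read.csv` also accepts `inferSchema=True`, at the cost of an extra pass over the data. Since staging keeps the raw data, casting more naturally belongs to the later transformation step. As a sketch of that cast, the hypothetical helper `build_cast_exprs` below builds expressions for `DataFrame.selectExpr`; the target types in `TRIP_TYPES` are assumptions based on the data preview, not a confirmed schema.

```python
from typing import Dict, List


def build_cast_exprs(columns: List[str], type_map: Dict[str, str]) -> List[str]:
    """Build Spark SQL expressions that cast selected columns and pass the rest through."""
    exprs = []
    for name in columns:
        quoted = f"`{name}`"  # backticks allow column names that contain spaces
        if name in type_map:
            exprs.append(f"CAST({quoted} AS {type_map[name]}) AS {quoted}")
        else:
            exprs.append(quoted)
    return exprs


# Assumed target types, chosen from the data preview; adjust to the real schema.
TRIP_TYPES = {"tripduration": "int", "start station id": "int", "starttime": "timestamp"}

exprs = build_cast_exprs(
    ["tripduration", "starttime", "start station id", "usertype"], TRIP_TYPES
)
print(exprs[0])  # CAST(`tripduration` AS int) AS `tripduration`
```

On a real DataFrame this would be applied as `df_trip_2020.selectExpr(*build_cast_exprs(df_trip_2020.columns, TRIP_TYPES))`.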

Next: `Check Percentage of Missing Values for Each Column`

##### 3. Check Percentage of Missing Values for each column

In [14]:
# Check Percentage of Missing Values for each column with pyspark
# output column_a : 100.0, column_b : 0.0, column_c : 0.0
from pyspark.sql.functions import col, count, when

def missing_value(df):
    total_count = df.count()

    # Calculate the percentage of missing values for each column.
    # when() returns a non-null value only where the cell is null, and count()
    # counts non-null values, so the result is the number of nulls per column.
    missing = df.select([
        (count(when(col(c).isNull(), c)) / total_count * 100).alias(c) 
        for c in df.columns
    ]).collect()[0].asDict()

    return missing

In [15]:
print("--- Missing Value ---")
print("Data user_type")
print(missing_value(df_user_type))
print("Data station")
print(missing_value(df_station))
print("Data bike")
print(missing_value(df_bike))
print("Data trip_2020")
print(missing_value(df_trip_2020))
print("Data trip_2019")
print(missing_value(df_trip_2019))

--- Missing Value ---
Data user_type
{'user_type_id': 0.0, 'user_type_name': 0.0}
Data station
{'station_id': 0.0, 'station_name': 0.0, 'latitude': 0.0, 'longitude': 0.0}
Data bike
{'bike_id': 0.0, 'type': 0.0, 'model': 0.0, 'purchase_date': 0.0}
Data trip_2020
{'tripduration': 0.0, 'starttime': 0.0, 'stoptime': 0.0, 'start station id': 0.0, 'start station name': 0.0, 'start station latitude': 0.0, 'start station longitude': 0.0, 'end station id': 0.0, 'end station name': 0.0, 'end station latitude': 0.0, 'end station longitude': 0.0, 'bikeid': 0.0, 'usertype': 0.0, 'postal code': 27.873170868330526, 'year': 0.0, 'month': 0.0, 'birth year': 79.1954371360867, 'gender': 79.1954371360867}
Data trip_2019
{'tripduration': 0.0, 'starttime': 0.0, 'stoptime': 0.0, 'start station id': 0.0, 'start station name': 0.0, 'start station latitude': 0.0, 'start station longitude': 0.0, 'end station id': 0.0, 'end station name': 0.0, 'end station latitude': 0.0, 'end station longitude': 0.0, 'bikeid': 0

- Missing values were identified in the following columns:
  - **Trip Data 2020**: `postal code`, `birth year`, and `gender`
- For the user information, only the `usertype` column is needed. Therefore, the other user-related columns (`birth year` and `gender`) will be removed.

Next: `Check Percentage of Valid Date Format`
- Valid date format: table bike, column `purchase_date`
- Valid datetime format: trip data 2019 and 2020, columns `starttime` and `stoptime`

##### 4. Check Percentage of Valid Date 

In [16]:
# Check percentage of valid date format
# Table bike: column purchase_date
# Check percentage of valid datetime format
# Trip data 2020 & 2019: columns starttime, stoptime
from pyspark.sql.functions import to_date, to_timestamp

def valid_date_format(df, column_name):
    total_count = df.count()

    # Calculate the percentage of valid date format for each column
    # if data can be converted to date then it is valid
    valid_date = df.select([
        (count(when(col(column_name).isNotNull() & to_date(col(column_name), 'yyyy-MM-dd').isNotNull(), column_name)) / total_count  * 100).alias(column_name) 
    ]).collect()[0].asDict()

    return valid_date

def valid_datetime_format(df, column_name):
    total_count = df.count()

    # Calculate the percentage of valid datetime format for each column
    # if data can be converted to datetime then it is valid
    valid_datetime = df.select([
        (count(when(col(column_name).isNotNull() & to_timestamp(col(column_name), 'yyyy-MM-dd HH:mm:ss.SSSS').isNotNull(), column_name)) / total_count  * 100).alias(column_name) 
    ]).collect()[0].asDict()

    return valid_datetime

In [17]:
print("--- Valid Date Format ---")
print("Data bike")
print(valid_date_format(df_bike, "purchase_date"))
print("--- Valid Datetime Format ---")
print("Data trip_2019")
print(valid_datetime_format(df_trip_2019, "starttime"))
print(valid_datetime_format(df_trip_2019, "stoptime"))
print("Data trip_2020")
print(valid_datetime_format(df_trip_2020, "starttime"))
print(valid_datetime_format(df_trip_2020, "stoptime"))

--- Valid Date Format ---
Data bike
{'purchase_date': 100.0}
--- Valid Datetime Format ---
Data trip_2019
{'starttime': 100.0}
{'stoptime': 100.0}
Data trip_2020
{'starttime': 100.0}
{'stoptime': 100.0}


All date and datetime formats in the datasets are valid.

In [18]:
# Stop Profiling Session
spark.stop()

### Building Data Pipeline EL Source to Staging

Solution: 
1. Pattern: EL
    - `Data Extraction` involves retrieving data from various sources
    - `Data Loading` involves transferring this raw data into staging systems
2. Data Extraction:
    - `Sources`: Extract data from CSV files and the Bluebikes database.
    - `Techniques`: Use full extraction for each source, because neither source has a change marker (e.g., no `created_at` or `updated_at` columns) that would allow incremental extraction.
3. Data Load:
    - `Staging`: Load raw data into a staging database (PostgreSQL) without transformation.
    - `Techniques`: Overwrite the data in staging; data will be overwritten every time the pipeline is run.
    - `Logging`: Every process is recorded in the process log.
4. Data Staging Schema:

<img src='https://sekolahdata-assets.s3.ap-southeast-1.amazonaws.com/notebook-images/mde-data-ingestion-spark/w7_staging_-_public.png' width="800"> <br>

In [19]:
import pyspark
from pyspark.sql import SparkSession

# create spark session
spark = SparkSession \
    .builder \
    .appName("Pipeline Staging") \
    .getOrCreate()

#### Log System

Each process in the ETL pipeline will generate log information. The log message will follow this format:

``` python
log_msg = {
    "step": "staging | warehouse",
    "process": "extraction | transformation | load",
    "status": "success | failed",
    "source": "db_bluebikes | csv | staging",
    "table_name": table_name,
    "etl_date": current_timestamp,  # e.g. datetime.now()
}
```
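
To keep log records consistent across modules, the format above can be wrapped in a small builder. The following is a sketch under stated assumptions: `build_log_row` is a hypothetical helper, the allowed values are taken from the format above, and the optional `error_msg` field mirrors the extra column written by the extract functions on failure.

```python
from datetime import datetime
from typing import Optional

ALLOWED = {
    "step": {"staging", "warehouse"},
    "process": {"extraction", "transformation", "load"},
    "status": {"success", "failed"},
}


def build_log_row(step: str, process: str, status: str, source: str,
                  table_name: str, error_msg: Optional[str] = None) -> dict:
    """Return one log record as a dict, rejecting values outside the documented format."""
    for field, value in (("step", step), ("process", process), ("status", status)):
        if value not in ALLOWED[field]:
            raise ValueError(f"Invalid {field}: {value!r}")
    row = {
        "step": step,
        "process": process,
        "status": status,
        "source": source,
        "table_name": table_name,
        "etl_date": datetime.now(),
    }
    if error_msg is not None:
        row["error_msg"] = error_msg  # only present for failed runs
    return row
```

A record built this way can be turned into a one-row DataFrame with `spark.createDataFrame([Row(**row)])`, using `Row` from `pyspark.sql`, and then passed to the log loader.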

In `src/utils/helper.py`, create a function that loads the log message into the log database.

In [20]:
from src.utils.helper import stg_engine, log_engine, wh_engine
from pyspark.sql import SparkSession


def load_log(spark: SparkSession, log_msg):
    DB_URL, DB_USER, DB_PASS = log_engine()
    table_name = "etl_log"

    # set config
    connection_properties = {
        "user": DB_USER,
        "password": DB_PASS,
        "driver": "org.postgresql.Driver" # set driver postgres
    }

    log_msg.write.jdbc(url = DB_URL,
                  table = table_name,
                  mode = "append",
                  properties = connection_properties)

In [21]:
# example of log message
from datetime import datetime
from src.utils.helper import load_log

current_timestamp = datetime.now()

log_msg = spark.sparkContext\
            .parallelize([("staging", "extraction", "success", "db_bluebikes", "user_type", current_timestamp)])\
            .toDF(['step', 'process', 'status', 'source', 'table_name', 'etl_date'])

In [22]:
# load log
load_log(spark, log_msg)

#### Extract Data From Database bluebikes

**Steps:**

1. **Create the `extract_database` Module:**
   - Develop a module named `extract_database` within the `src/staging/extract` folder. This module will include functions to connect to the Bluebikes database and extract data from specific tables.

2. **Get Database Connection URL:**
   - Retrieve the connection URL for the Bluebikes database. This URL is necessary for connecting to the database and performing data extraction. It usually includes the hostname, port, database name, and authentication details.

3. **Extract Data from Specific Table:**
   - Implement a function within the `extract_database` module to extract data from the specific table(s) in the Bluebikes database.

4. **Save Log Info to Database Log:**
   - After extracting data, record the log information in a logging database. 

In [23]:
# Extract function with logging
from src.utils.helper import load_log, bluebikes_engine
from datetime import datetime
from pyspark.sql import SparkSession

def extract_database(spark: SparkSession, table_name):
    # get config
    DB_URL, DB_USER, DB_PASS = bluebikes_engine()

    # set config
    connection_properties = {
        "user": DB_USER,
        "password": DB_PASS,
        "driver": "org.postgresql.Driver" # set driver postgres
    }

    current_timestamp = datetime.now()
    
    try:
        # read data
        df = spark \
                .read \
                .jdbc(url = DB_URL,
                        table = table_name,
                        properties = connection_properties)
    
        # log message
        log_msg = spark.sparkContext\
            .parallelize([("staging", "extraction", "success", "db_bluebikes", table_name, current_timestamp)])\
            .toDF(['step', 'process', 'status', 'source', 'table_name', 'etl_date'])
        
        return df
    except Exception as e:
        print(e)

        # log message
        log_msg = spark.sparkContext\
            .parallelize([("staging", "extraction", "failed", "db_bluebikes", table_name, current_timestamp, str(e))])\
            .toDF(['step', 'process', 'status', 'source', 'table_name', 'etl_date', 'error_msg'])
    finally:
        # load log
        load_log(spark, log_msg)

In [24]:
from src.staging.extract.extract_database import extract_database

# Extract data from database
df_user_type = extract_database(spark, 'user_type')
df_station = extract_database(spark, 'station')
df_bike = extract_database(spark, 'bike')


In [25]:
df_user_type.show()

+------------+--------------+
|user_type_id|user_type_name|
+------------+--------------+
|           0|    Subscriber|
|           1|      Customer|
+------------+--------------+



In [26]:
df_station.show(5)

+----------+--------------------+------------------+------------------+
|station_id|        station_name|          latitude|         longitude|
+----------+--------------------+------------------+------------------+
|         1|18 Dorrance Wareh...|         42.387151|        -71.075978|
|         3|Colleges of the F...| 42.34011512249237|-71.10061883926393|
|         4|Tremont St at E B...|         42.345392|        -71.069616|
|         5|Northeastern Univ...|         42.341814|        -71.090179|
|         6|Cambridge St at J...|42.361211653079856|-71.06530619789737|
+----------+--------------------+------------------+------------------+
only showing top 5 rows



In [27]:
df_bike.show(5)

+-------+--------+------+-------------+
|bike_id|    type| model|purchase_date|
+-------+--------+------+-------------+
|      1|    road|R88W4N|   2020-12-02|
|      4|    road|G79YM9|   2022-10-08|
|      7|foldable|IOEZRL|   2024-07-07|
|      8|foldable|ADQ2DA|   2021-03-26|
|      9|foldable|8EEQ8Z|   2023-03-04|
+-------+--------+------+-------------+
only showing top 5 rows



#### Extract Data From CSV File

**Steps:**

1. **Create the `extract_csv` Module:**
   - Develop a module named `extract_csv` within the `src/staging/extract` folder. This module will include functions to read data from CSV files.

2. **Extract Data from Specific File:**
   - Implement a function within the `extract_csv` module to read data from the specified CSV file.

3. **Save Log Info to Database Log:**
   - After extracting data, record the log information in a logging database. 

In [28]:
from pyspark.sql import SparkSession
from src.utils.helper import load_log
from datetime import datetime

PATH = "data/"

def extract_csv(spark: SparkSession, file_name):

    current_timestamp = datetime.now()

    try:

        df = spark.read.csv(PATH + file_name, header=True)

        # log message
        log_msg = spark.sparkContext\
            .parallelize([("staging", "extraction", "success", "csv", file_name, current_timestamp)])\
            .toDF(['step', 'process', 'status', 'source', 'table_name', 'etl_date'])
        
        return df
    except Exception as e:
        print(e)

        # log message
        log_msg = spark.sparkContext\
            .parallelize([("staging", "extraction", "failed", "csv", file_name, current_timestamp, str(e))])\
            .toDF(['step', 'process', 'status', 'source', 'table_name', 'etl_date', 'error_msg'])
    finally:
        # load log
        load_log(spark, log_msg)

In [29]:
from src.staging.extract.extract_csv import extract_csv

# Extract data from csv
df_trip_2020 = extract_csv(spark, 'bluebikes_tripdata_2020.csv')
df_trip_2019 = extract_csv(spark, 'bluebikes_tripdata_2019.csv')

In [30]:
df_trip_2019.show(5)

+------------+--------------------+--------------------+----------------+--------------------+----------------------+-----------------------+--------------+--------------------+--------------------+---------------------+------+----------+----------+------+----+-----+
|tripduration|           starttime|            stoptime|start station id|  start station name|start station latitude|start station longitude|end station id|    end station name|end station latitude|end station longitude|bikeid|  usertype|birth year|gender|year|month|
+------------+--------------------+--------------------+----------------+--------------------+----------------------+-----------------------+--------------+--------------------+--------------------+---------------------+------+----------+----------+------+----+-----+
|         790|2019-12-01 00:01:...|2019-12-01 00:14:...|             370|Dartmouth St at N...|     42.35096144421219|     -71.07782810926437|            33|      Kenmore Square|           42.34870

In [31]:
df_trip_2020.show(5)

+------------+--------------------+--------------------+----------------+--------------------+----------------------+-----------------------+--------------+--------------------+--------------------+---------------------+------+----------+-----------+----+-----+----------+------+
|tripduration|           starttime|            stoptime|start station id|  start station name|start station latitude|start station longitude|end station id|    end station name|end station latitude|end station longitude|bikeid|  usertype|postal code|year|month|birth year|gender|
+------------+--------------------+--------------------+----------------+--------------------+----------------------+-----------------------+--------------+--------------------+--------------------+---------------------+------+----------+-----------+----+-----+----------+------+
|        1793|2020-11-01 00:00:...|2020-11-01 00:30:...|             186|Congress St at No...|               42.3481|              -71.03764|           186|Cong

#### Load Data to Staging Area

**Steps:**

1. **Create the `load_staging` Module:**
   - Develop a module named `load_staging` within the `src/staging/load` folder. This module will include functions to load data into the staging area.

2. **Load Data to Specific Table:**
   - Implement a function within the `load_staging` module to load data into the specified table in the staging area. Use `overwrite` mode so that existing data is replaced on every run.

3. **Save Log Info to Database Log:**
   - After loading data, record the log information in a logging database.

In [32]:
from src.utils.helper import load_log, stg_engine  
from datetime import datetime
from pyspark.sql import SparkSession

def load_staging(spark: SparkSession, df, table_name, source_name):
    current_timestamp = datetime.now()
    DB_URL, DB_USER, DB_PASS = stg_engine()
    properties = {
    "user": DB_USER,
    "password": DB_PASS
    }
    try:
        df.write.jdbc(url = DB_URL,
                    table = table_name,
                    mode = "overwrite",
                    properties = properties)
        
        #log message
        log_msg = spark.sparkContext\
            .parallelize([("staging", "load", "success", source_name, table_name, current_timestamp)])\
            .toDF(['step', 'process', 'status', 'source', 'table_name', 'etl_date'])
        
    except Exception as e:
        print(e)
        
        # log message (record the failure and the error text)
        log_msg = spark.sparkContext\
            .parallelize([("staging", "load", "failed", source_name, table_name, current_timestamp, str(e))])\
            .toDF(['step', 'process', 'status', 'source', 'table_name', 'etl_date', 'error_msg'])
        
    finally:
        load_log(spark, log_msg)

In [None]:
# Load data to staging
from src.staging.load.load_staging import load_staging
from pyspark.sql.functions import current_timestamp

# add column created_at 
df_user_type = df_user_type.withColumn("created_at", current_timestamp())
df_station = df_station.withColumn("created_at", current_timestamp())
df_bike = df_bike.withColumn("created_at", current_timestamp())
df_trip_2019 = df_trip_2019.withColumn("created_at", current_timestamp())
df_trip_2020 = df_trip_2020.withColumn("created_at", current_timestamp())


load_staging(spark, df_user_type, "user_type", "db_bluebikes")
load_staging(spark, df_station, "station", "db_bluebikes")
load_staging(spark, df_bike, "bike", "db_bluebikes")
load_staging(spark, df_trip_2019, "trip_data_2019", "csv")
load_staging(spark, df_trip_2020, "trip_data_2020", "csv")

In [34]:
# Stop Staging Session
spark.stop()

`Create Views for Combined and Filtered Data`
<br>To simplify the transformation process and meet the business requirements, we create a view that combines the trip data from both yearly tables and applies the filters below at the view level, so the later transformation steps work on a single, already filtered dataset:
- usertype = "Subscriber"
- tripduration > 500 seconds

``` sql
CREATE VIEW combined_trip_data AS
SELECT 
    tripduration,
    starttime,
    stoptime,
    "start station id",
    "start station name",
    "start station latitude",
    "start station longitude",
    "end station id",
    "end station name",
    "end station latitude",
    "end station longitude",
    bikeid,
    usertype,
    "birth year",
    gender,
    NULL AS "postal code",  -- Adding NULL for missing column
    "year",
    "month",
    created_at
FROM trip_data_2019
WHERE usertype = 'Subscriber' AND tripduration::int > 500

UNION

SELECT 
    tripduration,
    starttime,
    stoptime,
    "start station id",
    "start station name",
    "start station latitude",
    "start station longitude",
    "end station id",
    "end station name",
    "end station latitude",
    "end station longitude",
    bikeid,
    usertype,
    "birth year",
    gender,
    "postal code",  -- Including the actual column
    "year",
    "month",
    created_at
FROM trip_data_2020
WHERE usertype = 'Subscriber' AND tripduration::int > 500;
```
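
The padding and filtering logic of the view can be tried out without the staging database. The following is a minimal sketch using Python's built-in `sqlite3` module with two tiny, hypothetical tables (only a few of the columns, made-up rows). SQLite has no `::int` cast, so `CAST(... AS INTEGER)` stands in for it here.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    -- hypothetical, trimmed-down versions of the staging tables
    CREATE TABLE trip_data_2019 (tripduration TEXT, bikeid TEXT, usertype TEXT);
    CREATE TABLE trip_data_2020 (tripduration TEXT, bikeid TEXT, usertype TEXT, "postal code" TEXT);

    INSERT INTO trip_data_2019 VALUES ('371', '3689', 'Subscriber'),
                                      ('1200', '4142', 'Subscriber');
    INSERT INTO trip_data_2020 VALUES ('900', '5321', 'Customer', '02139'),
                                      ('650', '2983', 'Subscriber', '02118');

    CREATE VIEW combined_trip_data AS
    SELECT tripduration, bikeid, usertype, NULL AS "postal code"
    FROM trip_data_2019
    WHERE usertype = 'Subscriber' AND CAST(tripduration AS INTEGER) > 500
    UNION
    SELECT tripduration, bikeid, usertype, "postal code"
    FROM trip_data_2020
    WHERE usertype = 'Subscriber' AND CAST(tripduration AS INTEGER) > 500;
""")

# Only subscriber trips longer than 500 seconds survive; 2019 rows get NULL postal code
rows = conn.execute("SELECT * FROM combined_trip_data ORDER BY bikeid").fetchall()
for row in rows:
    print(row)
conn.close()
```

Note that `UNION` also removes exact duplicate rows across the two tables; `UNION ALL` would keep them.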

### Building Data Pipeline ETL Staging to Warehouse

Solution: 
1. Pattern: ETL
    - `Data Extraction` retrieves data from the staging area.
    - `Data Transformation` reshapes the data to fit the format and structure of the data warehouse.
    - `Data Loading` transfers the transformed data into the data warehouse.
2. Data Extraction:
    - Sources: Extract data from the staging area.
    - Techniques: Full ingestion is used to extract the complete dataset from the staging area.
3. Data Load:
    - Data Warehouse: Load clean, transformed, and valid data into the final destination.
    - Techniques: Use the `TRUNCATE TABLE` statement to clear existing data, then write the new data in `append` mode.

4. Data Transformation:
    - Transformation: Adjust data to fit the desired format or structure of the data warehouse.
    - Techniques: Joining, filtering, aggregation, deduplication, conversion, structuring, etc.


##### Target Schema


In the data warehouse, the target schema is designed using a `dimensional model`.

Dimension Tables:
- dim_date
- dim_time
- dim_bike
- dim_station
- dim_user_type

Fact Tables:
- fact_trip_data
- fact_bike_usage


<img src= 'https://sekolahdata-assets.s3.ap-southeast-1.amazonaws.com/notebook-images/mde-data-ingestion-spark/w7_warehouse_-_public.png' width="800"> <br>


##### Source to Target Mapping


Source to Target Mapping Documentation: [Link](https://github.com/Kurikulum-Sekolah-Pacmann/pipeline-clinic/blob/main/target_mapping_warehouse.md)

##### Validation Rule

Validation Rule:
- Only trip data with user type "Subscriber" and a trip duration of more than 500 seconds may enter the warehouse.
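
In this pipeline the rule is enforced by the `combined_trip_data` view, but it can also be expressed as a small predicate, which is handy for spot checks. The sketch below is plain Python over dictionaries; the function name `is_valid_trip` and the sample rows are hypothetical.

```python
def is_valid_trip(row: dict) -> bool:
    """Return True when a staging trip row passes the warehouse validation rule."""
    try:
        duration = int(row["tripduration"])   # staging stores the value as text
    except (KeyError, TypeError, ValueError):
        return False                          # missing or non-numeric duration
    return row.get("usertype") == "Subscriber" and duration > 500

sample_rows = [
    {"usertype": "Subscriber", "tripduration": "1200"},
    {"usertype": "Subscriber", "tripduration": "500"},   # not strictly greater than 500
    {"usertype": "Customer", "tripduration": "900"},
    {"usertype": "Subscriber", "tripduration": None},
]
valid_rows = [r for r in sample_rows if is_valid_trip(r)]
print(valid_rows)
```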

In [35]:
import pyspark
from pyspark.sql import SparkSession

# create spark session
spark = SparkSession \
    .builder \
    .appName("Pipeline Warehouse") \
    .getOrCreate()


##### Extract Data From Staging Area

**Steps:**

1. **Create the `extract_staging` Module:**
   - Develop a module named `extract_staging` within the `src/warehouse/extract` folder. This module will include functions to connect to the staging database and extract data from specific tables.

2. **Get Database Connection URL:**
   - Retrieve the connection URL for the staging database. This URL is required to connect to the staging database and extract data.

3. **Extract Data from Specific Table:**
   - Implement a function within the `extract_staging` module to extract data from the specific table(s) in the staging database.

4. **Save Log Info to Database Log:**
   - After extracting data, record the log information in a logging database.
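
Every step in this pipeline follows the same pattern: run the work, then record a success or failure row with the columns `step`, `process`, `status`, `source`, `table_name`, `etl_date` and, on failure, `error_msg`. A minimal sketch of that pattern without Spark is shown below; `run_with_log` and the two stand-in extract functions are hypothetical names, and a real implementation would write the record to the log database instead of returning it.

```python
from datetime import datetime

def run_with_log(step, process, source, table_name, func):
    """Run func() and return (result, log_record) in the shape used by the log table."""
    log = {"step": step, "process": process, "status": "success",
           "source": source, "table_name": table_name,
           "etl_date": datetime.now(), "error_msg": None}
    result = None
    try:
        result = func()
    except Exception as e:
        # keep the pipeline alive, but record what went wrong
        log["status"] = "failed"
        log["error_msg"] = str(e)
    return result, log

# hypothetical extract functions standing in for a real JDBC read
def good_extract():
    return [("Subscriber",), ("Customer",)]

def bad_extract():
    raise ConnectionError("staging database unreachable")

data, ok_log = run_with_log("warehouse", "extraction", "staging", "user_type", good_extract)
_, failed_log = run_with_log("warehouse", "extraction", "staging", "user_type", bad_extract)
print(ok_log["status"], failed_log["status"], failed_log["error_msg"])
```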

In [36]:
# Function Extract with log
from src.utils.helper import load_log, stg_engine
from datetime import datetime
from pyspark.sql import SparkSession

def extract_staging(spark: SparkSession, table_name):
    # get config
    DB_URL, DB_USER, DB_PASS = stg_engine()

    # set config
    connection_properties = {
        "user": DB_USER,
        "password": DB_PASS,
        "driver": "org.postgresql.Driver" # set driver postgres
    }

    current_timestamp = datetime.now()
    
    try:
        # read data
        df = spark \
                .read \
                .jdbc(url = DB_URL,
                        table = table_name,
                        properties = connection_properties)
    
        # log message
        log_msg = spark.sparkContext\
            .parallelize([("warehouse", "extraction", "success", "staging", table_name, current_timestamp)])\
            .toDF(['step', 'process', 'status', 'source', 'table_name', 'etl_date'])
        
        return df
    except Exception as e:
        print(e)

        # log message
        log_msg = spark.sparkContext\
            .parallelize([("warehouse", "extraction", "failed", "staging", table_name, current_timestamp, str(e))])\
            .toDF(['step', 'process', 'status', 'source', 'table_name', 'etl_date', 'error_msg'])
    finally:
        # load log
        load_log(spark, log_msg)

In [37]:
# Extract data from staging
from src.warehouse.extract.extract_staging import extract_staging

# Extract data for dimension tables
df_user_type = extract_staging(spark, 'user_type')
df_station = extract_staging(spark, 'station')
df_bike = extract_staging(spark, 'bike')


##### Load Data to Warehouse

**Steps:**

1. **Create the `load_warehouse` Module:**
   - Develop a module named `load_warehouse` within the `src/warehouse/load` folder. This module will include functions for connecting to the data warehouse, truncating tables, and loading data.

2. **Get Database Connection URL:**
   - Retrieve the connection URL for the data warehouse. 

3. **Truncate Target Table:**
   - Implement functionality within the `load_warehouse` module to truncate the target table in the data warehouse. Use the `TRUNCATE TABLE` SQL statement to clear existing data from the table.

4. **Load Data to Data Warehouse:**
   - Write the transformed data to the target table in the data warehouse using PySpark's JDBC writer in `append` mode.

5. **Save Log Info to Database Log:**
   - After loading the data, record the log information in a logging database.
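
The reason for truncating before appending is that a rerun of the pipeline should not duplicate rows. The sketch below illustrates the idea with Python's built-in `sqlite3` module rather than PostgreSQL and Spark. SQLite has no `TRUNCATE`, so `DELETE` without a `WHERE` clause stands in for it, and the helper name `truncate_and_append` is hypothetical.

```python
import sqlite3

def truncate_and_append(conn, table_name, rows):
    """Clear the target table, then insert the new rows, in one transaction."""
    with conn:  # commits on success, rolls back on error
        # SQLite has no TRUNCATE; DELETE without WHERE plays that role here.
        # table_name is trusted in this sketch; never build SQL from user input.
        conn.execute(f"DELETE FROM {table_name}")
        conn.executemany(f"INSERT INTO {table_name} VALUES (?, ?)", rows)

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE dim_user_type (user_type_nk INTEGER, user_type_name TEXT)")

rows = [(0, "Subscriber"), (1, "Customer")]
truncate_and_append(conn, "dim_user_type", rows)
truncate_and_append(conn, "dim_user_type", rows)   # rerun of the same pipeline

row_count = conn.execute("SELECT COUNT(*) FROM dim_user_type").fetchone()[0]
print(row_count)
```

In PostgreSQL, `TRUNCATE ... RESTART IDENTITY CASCADE` additionally resets owned sequences and truncates any table that references the target through a foreign key, so truncating a dimension table this way can also empty the fact tables that point to it.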

In [38]:
from src.utils.helper import load_log, wh_engine, wh_engine_sqlalchemy
from datetime import datetime
from pyspark.sql import SparkSession
from sqlalchemy import create_engine, text



# before load to warehouse, truncate the table with sqlalchemy
def load_warehouse(spark: SparkSession, df, table_name, source_name):
    current_timestamp = datetime.now()
    DB_URL, DB_USER, DB_PASS = wh_engine()
    properties = {
    "user": DB_USER,
    "password": DB_PASS
    }
    try:
        # truncate table with sqlalchemy
        conn = wh_engine_sqlalchemy()

        with conn.connect() as connection:
            # Execute the TRUNCATE TABLE command; the with-block closes the connection
            connection.execute(text(f"TRUNCATE TABLE {table_name} RESTART IDENTITY CASCADE"))
            connection.commit()
        conn.dispose()
    except Exception as e:
        log_msg = spark.sparkContext\
            .parallelize([("warehouse", "load", "failed", source_name, table_name, current_timestamp, str(e))])\
            .toDF(['step', 'process', 'status', 'source', 'table_name', 'etl_date', 'error_msg'])
        load_log(spark, log_msg)
        # stop here: appending to a table that was not truncated would duplicate rows
        return
    
    try:
        # load data
        df.write.jdbc(url = DB_URL,
                    table = table_name,
                    mode = "append",
                    properties = properties)
        
        #log message
        log_msg = spark.sparkContext\
            .parallelize([("warehouse", "load", "success", source_name, table_name, current_timestamp)])\
            .toDF(['step', 'process', 'status', 'source', 'table_name', 'etl_date'])
        
    except Exception as e:
        # print(e)
        
        # log message
        log_msg = spark.sparkContext\
            .parallelize([("warehouse", "load", "failed", source_name, table_name, current_timestamp, str(e))])\
            .toDF(['step', 'process', 'status', 'source', 'table_name', 'etl_date', 'error_msg'])
        
    finally:
        load_log(spark, log_msg)

In [39]:
from src.warehouse.load.load_warehouse import load_warehouse

##### Transform Data

**Steps:**

1. **Create the `table_name.py` Module:**
   - Develop one module per target table (for example `user_type.py`) within the `src/warehouse/transformation` folder. Each module includes the function that handles the transformation for that table in the data warehouse.

2. **Develop Transformation Modules:**
   - Create transformation functions for the tables that require data transformation. Implement these functions to perform necessary operations such as joining, filtering, aggregating, or converting data to fit the warehouse schema.

3. **Perform Data Transformation:**
   - Use the transformation functions to process data from specific tables. Apply the transformations to ensure that the data adheres to the desired format and structure of the data warehouse.

4. **Save Log Info to Database Log:**
   - After performing data transformations, record the log information in a logging database.

Transform Data user_type

Source to Target Mapping

| Source Table: `user_type`      | Target Table: `dim_user_type`   | Description                    |
|--------------------------------|---------------------------------|--------------------------------|
| `user_type_id` (int8)          | `user_type_nk` (int4)           | Rename                         |
| -                              | `user_type_id` (UUID)           | Default Value                     |
| `user_type_name` (text)        | `user_type_name` (varchar)      | Direct Mapping                 |
| -                              | `created_at` (timestamp)        | Default Value       |


In [40]:
from pyspark.sql import SparkSession
from src.utils.helper import load_log
from datetime import datetime

def transform_user_type(spark, df):

    current_timestamp = datetime.now()
    try:
        # rename column user_type_id to user_type_nk
        df = df.withColumnRenamed("user_type_id", "user_type_nk")

        # drop column created_at
        df = df.drop("created_at")

        #log message
        log_msg = spark.sparkContext\
            .parallelize([("warehouse", "transform", "success", "staging", "user_type", current_timestamp)])\
            .toDF(['step', 'process', 'status', 'source', 'table_name', 'etl_date'])
        
        return df
    except Exception as e:
        print(e)

        # log message
        log_msg = spark.sparkContext\
            .parallelize([("warehouse", "transform", "failed", "staging", "user_type", current_timestamp, str(e))])\
            .toDF(['step', 'process', 'status', 'source', 'table_name', 'etl_date', 'error_msg'])

    finally:
        # load log
        load_log(spark, log_msg)

In [41]:
from src.warehouse.transformation.user_type import transform_user_type

user_type_transformed = transform_user_type(spark, df_user_type)

In [42]:
user_type_transformed.show()

+------------+--------------+
|user_type_nk|user_type_name|
+------------+--------------+
|           0|    Subscriber|
|           1|      Customer|
+------------+--------------+



In [43]:
# Load data to warehouse
load_warehouse(spark, user_type_transformed, "dim_user_type", 'staging')

Transform Data Bike

Source to Target Mapping

| Source Table: `bike`          | Target Table: `dim_bike`       | Description                    |
|-------------------------------|--------------------------------|--------------------------------|
| `bike_id` (int8)              | `bike_nk` (int4)               | Rename                         |
| -                             | `bike_id` (UUID)               | Default Value                  |
| `type` (text)                 | `type` (varchar)               | Direct Mapping                 |
| `model` (text)                | `model` (varchar)              | Direct Mapping                 |
| `purchase_date` (date)        | `purchase_date` (date)         | Direct Mapping                 |
| -                             | `created_at` (timestamp)       | Default Value                  |


In [44]:
from pyspark.sql import SparkSession
from src.utils.helper import load_log
from datetime import datetime

def transform_bike(spark, df):
    current_timestamp = datetime.now()
    try:
        # rename column bike_id to bike_nk
        df = df.withColumnRenamed("bike_id", "bike_nk")

        # drop column created_at
        df = df.drop("created_at")

        #log message
        log_msg = spark.sparkContext\
            .parallelize([("warehouse", "transform", "success", "staging", "bike", current_timestamp)])\
            .toDF(['step', 'process', 'status', 'source', 'table_name', 'etl_date'])
        
        return df
    except Exception as e:
        print(e)

        # log message
        log_msg = spark.sparkContext\
            .parallelize([("warehouse", "transform", "failed", "staging", "bike", current_timestamp, str(e))])\
            .toDF(['step', 'process', 'status', 'source', 'table_name', 'etl_date', 'error_msg'])
    finally:
        # load log
        load_log(spark, log_msg)

In [45]:
# Transform data bike
from src.warehouse.transformation.bike import transform_bike

bike_transformed = transform_bike(spark, df_bike)

In [46]:
bike_transformed.show(5)

+-------+--------+------+-------------+
|bike_nk|    type| model|purchase_date|
+-------+--------+------+-------------+
|      1|    road|R88W4N|   2020-12-02|
|      4|    road|G79YM9|   2022-10-08|
|      7|foldable|IOEZRL|   2024-07-07|
|      8|foldable|ADQ2DA|   2021-03-26|
|      9|foldable|8EEQ8Z|   2023-03-04|
+-------+--------+------+-------------+
only showing top 5 rows



In [47]:
# Load data to warehouse
load_warehouse(spark, bike_transformed, "dim_bike", 'staging')

Transform Data Station

Source to Target Mapping

| Source Table: `station`   | Target Table: `dim_station`   | Description                              |
|---------------------------|-------------------------------|------------------------------------------|
| `station_id` (int8)       | `station_nk` (int4)           | Rename from `station_id` to `station_nk` |
| -                         | `station_id` (UUID)           | Default Value                            |
| `station_name` (text)     | `station_name` (varchar)      | Direct Mapping                           |
| `latitude` (float8)       | `latitude` (numeric)          | Direct Mapping                           |
| `longitude` (float8)      | `longitude` (numeric)         | Direct Mapping                           |
| -                         | `created_at` (timestamp)      | Default Value                            |


In [48]:
from pyspark.sql import SparkSession
from src.utils.helper import load_log
from datetime import datetime

def transform_station(spark, df):
    current_timestamp = datetime.now()

    try:
        # rename column station_id to station_nk
        df = df.withColumnRenamed("station_id", "station_nk")

        # drop column created_at
        df = df.drop("created_at")
      
        #log message
        log_msg = spark.sparkContext\
            .parallelize([("warehouse", "transform", "success", "staging", "station", current_timestamp)])\
            .toDF(['step', 'process', 'status', 'source', 'table_name', 'etl_date'])
        
        return df
    except Exception as e:
        print(e)

        # log message
        log_msg = spark.sparkContext\
            .parallelize([("warehouse", "transform", "failed", "staging", "station", current_timestamp, str(e))])\
            .toDF(['step', 'process', 'status', 'source', 'table_name', 'etl_date', 'error_msg'])
    finally:
        # load log
        load_log(spark, log_msg)

In [49]:
from src.warehouse.transformation.station import transform_station

station_transformed = transform_station(spark, df_station)

In [50]:
station_transformed.show(5)

+----------+--------------------+------------------+------------------+
|station_nk|        station_name|          latitude|         longitude|
+----------+--------------------+------------------+------------------+
|         1|18 Dorrance Wareh...|         42.387151|        -71.075978|
|         3|Colleges of the F...| 42.34011512249237|-71.10061883926393|
|         4|Tremont St at E B...|         42.345392|        -71.069616|
|         5|Northeastern Univ...|         42.341814|        -71.090179|
|         6|Cambridge St at J...|42.361211653079856|-71.06530619789737|
+----------+--------------------+------------------+------------------+
only showing top 5 rows



In [51]:
# Load data to warehouse
load_warehouse(spark, station_transformed, "dim_station", 'staging')

Extract Data for Fact Table

Extract data from view : `combined_trip_data`


In [52]:
# Extract data for Fact Table
df_trip = extract_staging(spark, 'combined_trip_data')

Transform data fact_trip_data

Source to Target Mapping

| Source Tables: `trip_data_2019`, `trip_data_2020` | Target Table: `fact_trip_data` | **Description**                              |
|--------------------------------------------------|-------------------------------|-------------------------------                 |
| `tripduration` (text)                             | `trip_duration` (int4)        | Data Type Conversion                          |
| `starttime` (text)                                | `start_date` (int4)           | Convert Date Part Lookup to `dim_date`        |
| `starttime` (text)                                | `start_time` (int4)           | Convert Time Part Lookup to `dim_time`        |
| `stoptime` (text)                                 | `stop_date` (int4)            | Convert Date Part Lookup to `dim_date`        |
| `stoptime` (text)                                 | `stop_time` (int4)            | Convert Time Part Lookup to `dim_time`        |
| `"start station id"` (text)                       | `start_station_id` (uuid)     | Lookup to `dim_station`, Data Type Conversion |
| `"end station id"` (text)                         | `end_station_id` (uuid)       | Lookup to `dim_station`, Data Type Conversion |
| `"start station name"` (text)                     | -                             | Not Mapped |
| `"start station latitude"` (text)                 | -                             | Not Mapped |
| `"start station longitude"` (text)                | -                             | Not Mapped |
| `"end station name"` (text)                       | -                             | Not Mapped |
| `"end station latitude"` (text)                   | -                             | Not Mapped |
| `"end station longitude"` (text)                  | -                             | Not Mapped |
| `bikeid` (text)                                   | `bike_id` (uuid)              | Lookup to `dim_bike`, Data Type Conversion    |
| `usertype` (text)                                 | `user_type_id` (uuid)         | Lookup to `dim_user_type`, Data Type Conversion |
| `year` (text)                                     | `year` (varchar)              | Direct Mapping                                |
| `month` (text)                                    | `month` (varchar)             | Direct Mapping                                |
| `postal code` (text)                              | -                             | Not Mapped                                    |
| `birth year` (text)                               | -                             | Not Mapped                                    |
| `gender` (text)                                   | -                             | Not Mapped                                    |
| `created_at` (timestamp)                          | -                             | Default Value                                 |
| -                                                 | `trip_id` (uuid)              | Generated UUID                                |
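
The "Convert Date Part" and "Convert Time Part" rows above mean that each timestamp is split into a date string and a time string before the lookups. A minimal sketch in plain Python follows; the input format and the function name `split_timestamp` are assumptions for illustration, and the seconds are zeroed on the assumption that `dim_time` holds one row per minute.

```python
from datetime import datetime

def split_timestamp(value: str, fmt: str = "%Y-%m-%d %H:%M:%S"):
    """Split a timestamp string into a date key and a minute-grain time key."""
    ts = datetime.strptime(value, fmt)
    date_key = ts.strftime("%Y-%m-%d")   # compared with dim_date.date_actual
    time_key = ts.strftime("%H:%M:00")   # seconds zeroed, compared with dim_time.time_actual
    return date_key, time_key

print(split_timestamp("2019-12-01 08:15:42"))
# → ('2019-12-01', '08:15:00')
```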


`Create Module to Extract Data from Dimension Tables`

Use the `extract_warehouse.py` functions to extract data from dimension tables. Ensure that the data extracted is accurate and complete, as it will be used to establish foreign key relationships with fact tables.
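
The lookup itself replaces each natural key coming from staging with the surrogate key stored in the dimension table. The sketch below shows the idea with plain Python dictionaries instead of Spark joins; the sample keys and the function name `resolve_station_keys` are hypothetical. As with a left join, a key that is missing from the dimension yields `None` rather than dropping the trip.

```python
# hypothetical rows extracted from dim_station: (station_id, station_nk)
dim_station = [("a1", 1), ("b2", 3), ("c3", 4)]
trips = [
    {"trip_duration": 1200, "start_station_nk": "3", "end_station_nk": "4"},
    {"trip_duration": 650, "start_station_nk": "1", "end_station_nk": "99"},  # 99 not in dim
]

# natural key -> surrogate key
station_lookup = {nk: sk for sk, nk in dim_station}

def resolve_station_keys(trip: dict) -> dict:
    """Replace station natural keys with surrogate keys; unmatched keys become None."""
    return {
        "trip_duration": trip["trip_duration"],
        # staging keeps the natural key as text, so convert before looking it up
        "start_station_id": station_lookup.get(int(trip["start_station_nk"])),
        "end_station_id": station_lookup.get(int(trip["end_station_nk"])),
    }

fact_rows = [resolve_station_keys(t) for t in trips]
print(fact_rows)
```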

In [53]:
# Function Extract from warehouse
from src.utils.helper import wh_engine
from pyspark.sql import SparkSession

def extract_warehouse(spark: SparkSession, table_name, columns=None):
    # get config
    DB_URL, DB_USER, DB_PASS = wh_engine()

    # set config
    connection_properties = {
        "user": DB_USER,
        "password": DB_PASS,
        "driver": "org.postgresql.Driver" # set driver postgres
    }
    
    try:
        # read data
        df = spark \
                .read \
                .jdbc(url = DB_URL,
                        table = table_name,
                        properties = connection_properties)

        # keep only the requested columns (callers pass a list of column names)
        if columns:
            df = df.select(*columns)
        return df
    except Exception as e:
        print(e)

In [54]:
from pyspark.sql import SparkSession
from src.utils.helper import load_log
from datetime import datetime
from pyspark.sql.functions import date_format, col
from src.warehouse.extract.extract_warehouse import extract_warehouse

def transform_fact_trip(spark: SparkSession, df):
    current_timestamp = datetime.now()

    try:
        # Extract data from dimension tables
        df_user_type = extract_warehouse(spark, "dim_user_type", ["user_type_id", "user_type_name"])
        df_station = extract_warehouse(spark, "dim_station", ["station_id", "station_nk"])
        df_bike = extract_warehouse(spark, "dim_bike", ["bike_id", "bike_nk"])
        df_date = extract_warehouse(spark, "dim_date", ["date_id", "date_actual"])
        df_time = extract_warehouse(spark, "dim_time", ["time_id", "time_actual"])

        # convert df_time column time_actual to HH:mm:ss
        df_time = df_time.withColumn("time_actual", date_format(col("time_actual"),  "HH:mm:ss"))

        #convert tripduration to int 
        df = df.withColumn("tripduration", df["tripduration"].cast("int"))

        # rename column
        df = df.withColumnRenamed("tripduration", "trip_duration") \
                .withColumnRenamed("start station id", "start_station_nk") \
                .withColumnRenamed("end station id", "end_station_nk") \
                .withColumnRenamed("bikeid", "bike_nk") \
                .withColumnRenamed("usertype", "user_type_name")
    
        
        # extract date and time from starttime and stoptime
        df = df.withColumn("start_date_temp", date_format(col("starttime"),  "yyyy-MM-dd")) \
                .withColumn("start_time_temp", date_format(col("starttime"),  "HH:mm:00")) \
                .withColumn("stop_date_temp", date_format(col("stoptime"),  "yyyy-MM-dd")) \
                .withColumn("stop_time_temp", date_format(col("stoptime"),  "HH:mm:00"))
        
        # get bike_id from dim_bike
        df = df.join(df_bike, df.bike_nk == df_bike.bike_nk, "left") \
                .drop(df_bike.bike_nk)
        
        # get user_type_id from dim_user_type
        df = df.join(df_user_type, df.user_type_name == df_user_type.user_type_name, "left") \
                .drop(df_user_type.user_type_name)
        
        # get start_station_id from dim_station
        df = df.join(df_station, df.start_station_nk == df_station.station_nk, "left") \
                .drop(df_station.station_nk)
        # rename column station_id to start_station_id
        df = df.withColumnRenamed("station_id", "start_station_id")

        # get end_station_id from dim_station
        df = df.join(df_station, df.end_station_nk == df_station.station_nk, "left") \
                .drop(df_station.station_nk)
        # rename column station_id to end_station_id
        df = df.withColumnRenamed("station_id", "end_station_id")

        # get date_id from dim_date
        df = df.join(df_date, df.start_date_temp == df_date.date_actual, "left") \
                .drop(df_date.date_actual)
        # rename column date_id to start_date
        df = df.withColumnRenamed("date_id", "start_date")

        # get date_id from dim_date
        df = df.join(df_date, df.stop_date_temp == df_date.date_actual, "left") \
                .drop(df_date.date_actual)
        # rename column date_id to stop_date
        df = df.withColumnRenamed("date_id", "stop_date")

        # get time_id from dim_time
        df = df.join(df_time, df.start_time_temp == df_time.time_actual, "left") \
                .drop(df_time.time_actual)
        # rename column time_id to start_time
        df = df.withColumnRenamed("time_id", "start_time")

        # get time_id from dim_time
        df = df.join(df_time, df.stop_time_temp == df_time.time_actual, "left") \
                .drop(df_time.time_actual)
        # rename column time_id to stop_time
        df = df.withColumnRenamed("time_id", "stop_time")

        
        # drop unnecessary column created_at, postal code, birth year, start station name, end station name, 
        # start station latitude, start station longitude, end station latitude, end station longitude,
        # bike_nk, user_type_name, start_station_nk, end_station_nk
        if "postal code" in df.columns:
            df = df.drop("postal code")

        df = df.drop("created_at", "birth year", "start station name", "end station name", "gender",
                    "start station latitude", "start station longitude", "end station latitude", "end station longitude",
                    "bike_nk", "user_type_name", "start_station_nk", "end_station_nk", "starttime","stoptime", "station_nk",
                    "start_date_temp","stop_date_temp", "start_time_temp","stop_time_temp", "date_actual", "time_actual")
        # log message
        log_msg = spark.sparkContext\
            .parallelize([("warehouse", "transform", "success", "staging", "trip_data", current_timestamp)])\
            .toDF(['step', 'process', 'status', 'source', 'table_name', 'etl_date'])
        
        return df
        
    except Exception as e:
        print(e)

        # log message
        log_msg = spark.sparkContext\
            .parallelize([("warehouse", "transform", "failed", "staging", "trip_data", current_timestamp, str(e))])\
            .toDF(['step', 'process', 'status', 'source', 'table_name', 'etl_date', 'error_msg'])
    finally:
        # load log
        load_log(spark, log_msg)

In [55]:
from src.warehouse.transformation.fact_trip import transform_fact_trip

In [56]:
trip_transformed = transform_fact_trip(spark, df_trip)

In [57]:
# show column name
trip_transformed.dtypes

[('trip_duration', 'int'),
 ('year', 'string'),
 ('month', 'string'),
 ('bike_id', 'bigint'),
 ('user_type_id', 'bigint'),
 ('start_station_id', 'bigint'),
 ('end_station_id', 'bigint'),
 ('start_date', 'int'),
 ('stop_date', 'int'),
 ('start_time', 'int'),
 ('stop_time', 'int')]

In [58]:
# Load data to warehouse
load_warehouse(spark, trip_transformed, "fact_trip_data", 'staging')

Transform data fact_bike_usage

`Create fact_bike_usage from Aggregated fact_trip_data`

**Steps:**
1. **Extract Data from `fact_trip_data`:**
   - Use the `extract_warehouse` module to retrieve data from the `fact_trip_data` table in the data warehouse. This data is the basis for the aggregation.

2. **Group by `bike_id`:**
   - Apply a `GROUP BY` operation on the `bike_id` column to aggregate the data based on each bike.

3. **Count `trip_id`:**
   - For each `bike_id`, count the number of trips (`trip_id`) to determine the total number of trips made by each bike.

4. **Sum `trip_duration`:**
   - Calculate the total trip duration by summing the `trip_duration` for each `bike_id`. This will provide the total time spent on trips for each bike.
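
The steps above amount to a group-by with one count and one sum per bike. A minimal sketch in plain Python is shown below, using made-up rows and hypothetical key values; the output column names `trip_count` and `total_duration` follow the ones used in this notebook.

```python
from collections import defaultdict

# hypothetical fact_trip_data rows: (trip_id, bike_id, trip_duration)
fact_trip_data = [
    ("t1", "bike-a", 600),
    ("t2", "bike-a", 900),
    ("t3", "bike-b", 1500),
]

# accumulate one counter and one running total per bike_id
usage = defaultdict(lambda: {"trip_count": 0, "total_duration": 0})
for trip_id, bike_id, trip_duration in fact_trip_data:
    usage[bike_id]["trip_count"] += 1
    usage[bike_id]["total_duration"] += trip_duration

fact_bike_usage = [{"bike_id": bike_id, **totals} for bike_id, totals in sorted(usage.items())]
print(fact_bike_usage)
```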

In [59]:
from pyspark.sql import SparkSession
from src.utils.helper import load_log
from datetime import datetime
from pyspark.sql.functions import date_format, col
from src.warehouse.extract.extract_warehouse import extract_warehouse

def transform_fact_bike_usage(spark: SparkSession):
        current_timestamp = datetime.now()

        try:
                # Extract data from fact_trip_data
                columns = ['trip_duration', 'trip_id', 'bike_id']

                df_trip = extract_warehouse(spark, "fact_trip_data", columns)

                # group by bike_id and count trip_id, sum trip_duration
                df_bike_usage = df_trip.groupBy("bike_id") \
                                .agg({"trip_duration": "sum", "trip_id": "count"}) \
                                .withColumnRenamed("sum(trip_duration)", "total_duration") \
                                .withColumnRenamed("count(trip_id)", "trip_count")
                
                #log message
                log_msg = spark.sparkContext\
                .parallelize([("warehouse", "transform", "success", "staging", "fact_bike_usage", current_timestamp)])\
                .toDF(['step', 'process', 'status', 'source', 'table_name', 'etl_date'])

                return df_bike_usage
        except Exception as e:
                #log message
                print(e)
                log_msg = spark.sparkContext\
                .parallelize([("warehouse", "transform", "failed", "staging", "fact_bike_usage", current_timestamp, str(e))])\
                .toDF(['step', 'process', 'status', 'source', 'table_name', 'etl_date', 'error_msg'])
        finally:
                # load log
                load_log(spark, log_msg)

In [60]:
from src.warehouse.transformation.fact_bike_usage import transform_fact_bike_usage
fact_bike_usage = transform_fact_bike_usage(spark)

In [61]:
fact_bike_usage.columns

['bike_id', 'trip_count', 'total_duration']

In [62]:
# Load data to warehouse
load_warehouse(spark, fact_bike_usage, "fact_bike_usage", 'staging')