# Week #3 - Live Class
Data Pipeline Course - Sekolah Engineer - Pacmann Academy 



## Review

`Data Ingestion` emphasizes pulling data from various sources (`Extract`) and directing it to targets (`Load`).
- `Data Extraction` involves retrieving data from various sources
- `Data Loading` involves transferring this data into target storage systems

Considerations when carrying out ingestion:
- What’s the format? (Unstructured, Semi-structured, Structured)
- What’s the frequency? (Batch, Micro-Batch, Streaming)
- Is the data ingestion process synchronous or asynchronous?
- What is the expected data volume?
- What measures are in place to ensure data reliability during ingestion?


- Full Extraction: Extracting all the data from the source system
- Full Load: Loading all the data into the destination system
- Incremental Extraction: Extracting only the new data or data that has changed since the last extraction
- Incremental Load: Loading only new or updated data at regular intervals, rather than moving all data at once
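A minimal in-memory sketch of the four strategies, assuming hypothetical `source` rows that carry a `created_at` change timestamp and using the latest loaded `created_at` as the watermark. The helper names are illustrative only and not part of the project; the pipeline in this notebook takes its watermark from the ETL log instead.

```python
from datetime import date

# Hypothetical source rows; "created_at" plays the role of the change timestamp.
source = [
    {"id": 1, "name": "a", "created_at": date(2004, 1, 1)},
    {"id": 2, "name": "b", "created_at": date(2004, 1, 2)},
    {"id": 3, "name": "c", "created_at": date(2004, 1, 3)},
]

def full_extract(rows):
    """Full extraction: return every row, regardless of previous runs."""
    return list(rows)

def incremental_extract(rows, watermark):
    """Incremental extraction: only rows newer than the watermark."""
    return [r for r in rows if r["created_at"] > watermark]

def full_load(target, rows):
    """Full load: replace the target contents entirely."""
    target.clear()
    target.update({r["id"]: r for r in rows})

def incremental_load(target, rows):
    """Incremental load: insert new keys and overwrite existing ones (upsert)."""
    for r in rows:
        target[r["id"]] = r

target = {}
full_load(target, full_extract(source))
watermark = max(r["created_at"] for r in target.values())

# A new row arrives in the source after the initial load.
source.append({"id": 4, "name": "d", "created_at": date(2004, 1, 4)})
new_rows = incremental_extract(source, watermark)
incremental_load(target, new_rows)

print([r["id"] for r in new_rows])  # [4]
print(sorted(target))               # [1, 2, 3, 4]
```

Only the new row travels on the second run, while the target still ends up with the complete data set.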

<img src='pict/live_w3_01.png' width="800"> <br>

## Study Case: Dell DVD Store

### Case Description

`Problem`

The Dell DVD Store is facing challenges with its current data processing. The store needs to handle data from multiple sources: spreadsheets, databases, and APIs. Each source plays a different role:
- Database: stores data from the current system.
- API: provides historical data from the old system.
- Spreadsheet: contains the team's analysis of order status based on current product stock.

<img src='pict/live_w3_03.png' width="800"> <br>

`Solution`

To address these challenges, we propose building a comprehensive data pipeline for the Dell DVD Store with the following steps:
- Data Extraction:
    - Sources: Extract data from spreadsheets, databases, and APIs.
    - Techniques: Use both full and incremental extraction methods to retrieve data efficiently.
- Data Load:
    - Staging: Load raw data into a staging database (PostgreSQL) without transformation.
    - Final Load: Transfer clean and transformed data to the final destination.
    - Failure Handling: Store the data from failed loads in MinIO object storage for reprocessing.
- Data Transformation:
    - Cleaning: Handle missing values, incorrect data formats, and other data quality issues.
    - Transforming: Add derived fields and calculated metrics as needed.

`Tools and Technologies`:
- Python: to build the data pipeline.
- PostgreSQL: for log, staging, and final data storage.
- MinIO: to store data that failed to load.
- Docker: to run MinIO.

### 1. Preparation


Dataset
1. Restore the Dell DVD Store database [Link](https://drive.google.com/drive/folders/1ED0sg2AZNH_Kl5Pb1cBUufnPCphpM21R)
2. Duplicate the spreadsheet [Link](https://drive.google.com/drive/folders/1ED0sg2AZNH_Kl5Pb1cBUufnPCphpM21R)
3. Check the data API [Link](https://api-history-order.vercel.app/api/dummydata?start_date=2004-01-01&end_date=2004-01-01)

For the project
1. Save your Google Service Account credentials.
2. Prepare your MinIO instance (access key, secret key, bucket name: "error-dellstore").
3. Create your databases (log and staging).
4. Create your `.env` file:

    ```
    DB_HOST="localhost"
    DB_USER="YOUR POSTGRES USER"
    DB_PASS="YOUR POSTGRES PASS"

    DB_NAME_SOURCE="dellstore"
    DB_NAME_STG="staging"
    DB_SHCHEMA_STG="staging"
    DB_NAME_log="etl_log"

    CRED_PATH='YOUR_PATH/creds/data-pipeline-427506-50d868a444ee.json'
    MODEL_PATH='YOUR_PATH/models/'
    KEY_SPREADSHEET="YOUR SPREADSHEET KEY"

    ACCESS_KEY_MINIO = 'YOUR MINIO ACCESS KEY'
    SECRET_KEY_MINIO = 'YOUR MINIO SECRET KEY'

    ```

In [1]:
from oauth2client.service_account import ServiceAccountCredentials
import gspread
import pandas as pd
from dotenv import load_dotenv
import os
from sqlalchemy import create_engine
import sqlalchemy
import requests
from pangres import upsert
from datetime import datetime, timedelta

Load your `.env` file:

In [2]:
load_dotenv(".env")

DB_HOST = os.getenv("DB_HOST")
DB_USER = os.getenv("DB_USER")
DB_PASS = os.getenv("DB_PASS")

DB_NAME_SOURCE = os.getenv("DB_NAME_SOURCE")
DB_NAME_STG = os.getenv("DB_NAME_STG")
DB_SHCHEMA_STG = os.getenv("DB_SHCHEMA_STG")
DB_NAME_log = os.getenv("DB_NAME_log")


CRED_PATH = os.getenv("CRED_PATH")
KEY_SPREADSHEET = os.getenv("KEY_SPREADSHEET")
MODEL_PATH = os.getenv("MODEL_PATH")

ACCESS_KEY_MINIO = os.getenv("ACCESS_KEY_MINIO")
SECRET_KEY_MINIO = os.getenv("SECRET_KEY_MINIO")
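`os.getenv` silently returns `None` for a missing key, which later surfaces as a confusing connection or authentication error. A minimal fail-fast sketch, assuming the variable names from the `.env` file above; `missing_env_vars` is a hypothetical helper, not part of the original project.

```python
import os

# Variable names taken from the .env file used in this notebook.
REQUIRED_VARS = [
    "DB_HOST", "DB_USER", "DB_PASS",
    "DB_NAME_SOURCE", "DB_NAME_STG", "DB_SHCHEMA_STG", "DB_NAME_log",
    "CRED_PATH", "MODEL_PATH", "KEY_SPREADSHEET",
    "ACCESS_KEY_MINIO", "SECRET_KEY_MINIO",
]

def missing_env_vars(names, environ=os.environ):
    """Return the names that are unset or empty (hypothetical helper)."""
    return [name for name in names if not environ.get(name)]

# Demonstration with a fake environment instead of the real one.
fake_env = {"DB_HOST": "localhost", "DB_USER": "postgres", "DB_PASS": ""}
print(missing_env_vars(["DB_HOST", "DB_USER", "DB_PASS", "DB_NAME_SOURCE"], fake_env))
# ['DB_PASS', 'DB_NAME_SOURCE']
```

In the notebook you could call `missing_env_vars(REQUIRED_VARS)` right after `load_dotenv(".env")` and raise an error if the returned list is not empty.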

To manage SQL queries efficiently in external `.sql` files, create a function that reads a file and returns its content.<br>
Each `.sql` file is named after its table (for example `customers.sql`) and contains the SQL query for that table.

In [3]:
def read_sql(table_name):
    # open <MODEL_PATH><table_name>.sql (MODEL_PATH must end with '/')
    with open(f"{MODEL_PATH}{table_name}.sql", 'r') as file:
        content = file.read()
    
    #return query text
    return content
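The `.sql` files use named placeholders such as `:etl_date`, which `sqlalchemy.text` binds from a dictionary. The sketch below mimics that flow with the standard-library `sqlite3` module (which also accepts `:name` placeholders) so it runs without PostgreSQL. The temporary `models/` folder, the table contents, and the helper name `read_sql_file` are illustrative assumptions.

```python
import sqlite3
import tempfile
from pathlib import Path

def read_sql_file(model_path: str, table_name: str) -> str:
    """Read <model_path>/<table_name>.sql and return the query text (hypothetical helper)."""
    return (Path(model_path) / f"{table_name}.sql").read_text()

with tempfile.TemporaryDirectory() as model_path:
    # Hypothetical model file, shaped like the customers query in this notebook.
    (Path(model_path) / "customers.sql").write_text(
        "SELECT * FROM customers WHERE created_at > :etl_date"
    )

    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE customers (customerid INTEGER, created_at TEXT)")
    conn.executemany(
        "INSERT INTO customers VALUES (?, ?)",
        [(11, "2002-03-01"), (12, "2003-10-01"), (13, "2001-11-01")],
    )

    # Bind the named placeholder from a dict, as params={"etl_date": ...} does with sqlalchemy.text.
    query = read_sql_file(model_path, "customers")
    rows = conn.execute(query, {"etl_date": "2002-01-01"}).fetchall()
    conn.close()

print(sorted(rows))  # [(11, '2002-03-01'), (12, '2003-10-01')]
```

Joining paths with `pathlib` also removes the dependency on the trailing slash in `MODEL_PATH`.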

### 2. Log Function

A log is a record of events that occur during the execution of a data pipeline. It captures essential information about each process and its status, making it easier to monitor, debug, and audit pipeline operations.

The etl_log function is designed to log ETL (Extract, Transform, Load) operations into a PostgreSQL database.

Your Log Message: <br>
<code>
log_msg = { <br>
            "step": "staging | warehouse", <br>
            "process": "extraction | transformation | load", <br>
            "status": "success | failed", <br>
            "source": "spreadsheet | database | api", <br>
            "table_name": "worksheet_name | table_name", <br>
            "etl_date": "Current timestamp", <br>
            "error_msg": "Error message when an error occurs" <br>
        }
</code>
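Every function in this notebook builds this dictionary by hand. A small factory can keep the keys and allowed values consistent; `build_log_msg` is a hypothetical helper sketched here, not part of the original project.

```python
from datetime import datetime
from typing import Optional

VALID_STEPS = {"staging", "warehouse"}
VALID_PROCESSES = {"extraction", "transformation", "load"}
VALID_STATUSES = {"success", "failed"}

def build_log_msg(step: str, process: str, status: str, source: str,
                  table_name: str, error_msg: Optional[str] = None) -> dict:
    """Build a log message with the same keys as the format above."""
    if step not in VALID_STEPS:
        raise ValueError(f"unknown step: {step}")
    if process not in VALID_PROCESSES:
        raise ValueError(f"unknown process: {process}")
    if status not in VALID_STATUSES:
        raise ValueError(f"unknown status: {status}")

    log_msg = {
        "step": step,
        "process": process,
        "status": status,
        "source": source,
        "table_name": table_name,
        "etl_date": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),  # current timestamp
    }
    # Only failed runs carry an error message.
    if error_msg is not None:
        log_msg["error_msg"] = error_msg
    return log_msg

msg = build_log_msg("staging", "extraction", "failed", "api",
                    "customer_orders_history", error_msg="timeout")
print(sorted(msg))
# ['error_msg', 'etl_date', 'process', 'source', 'status', 'step', 'table_name']
```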

In [4]:
def etl_log(log_msg: dict):
    try:
        # create connection to database
        conn = create_engine(f"postgresql://{DB_USER}:{DB_PASS}@{DB_HOST}/{DB_NAME_log}")
        
        # convert dictionary to dataframe
        df_log = pd.DataFrame([log_msg])

        # append the log row to the etl_log table
        df_log.to_sql(name = "etl_log",  # Your log table
                        con = conn,
                        if_exists = "append",
                        index = False)
    except Exception as e:
        print("Can't save your log message. Cause: ", str(e))

The read_etl_log function reads log information from the log table.

In [5]:
def read_etl_log(filter_params: dict):
    """
    Read the etl_log table and return the latest etl_date (in a column named `max`)
    for a specific step, table name, status, and process.
    """
    try:
        # create connection to database
        conn = create_engine(f"postgresql://{DB_USER}:{DB_PASS}@{DB_HOST}/{DB_NAME_log}")
        
        # To help with the incremental process, get the etl_date from the relevant process.
        # log.sql should use named placeholders matching the keys of filter_params:
        """
        SELECT MAX(etl_date)
        FROM etl_log
        WHERE 
            step = :step_name and
            table_name ilike :table_name and
            status = :status and
            process = :process
        """
        query = sqlalchemy.text(read_sql("log"))

        # Execute the query with pd.read_sql, binding the filter values by name
        df = pd.read_sql(sql=query, con=conn, params=filter_params)

        #return extracted data
        return df
    except Exception as e:
        print("Can't execute your query. Cause: ", str(e))
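To see what `read_etl_log` returns, the sketch below reproduces the log table and the `MAX(etl_date)` query with an in-memory SQLite database. The column layout follows the log message format; the sample rows and the SQLite backend are assumptions made so the example runs standalone (SQLite's `LIKE` is case-insensitive for ASCII, similar to PostgreSQL's `ILIKE`). Because `etl_date` is stored as `YYYY-MM-DD HH:MM:SS` text, `MAX` follows chronological order.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE etl_log (
        step TEXT, process TEXT, status TEXT, source TEXT,
        table_name TEXT, etl_date TEXT, error_msg TEXT
    )
""")

# Hypothetical history: two successful loads and one failed load of "customers".
conn.executemany(
    "INSERT INTO etl_log VALUES (?, ?, ?, ?, ?, ?, ?)",
    [
        ("staging", "load", "success", "database", "customers", "2024-07-01 08:00:00", None),
        ("staging", "load", "failed", "database", "customers", "2024-07-02 08:00:00", "timeout"),
        ("staging", "load", "success", "database", "customers", "2024-07-03 08:00:00", None),
    ],
)

# Same filter as the log query, with named placeholders.
query = """
    SELECT MAX(etl_date)
    FROM etl_log
    WHERE step = :step_name
      AND table_name LIKE :table_name
      AND status = :status
      AND process = :process
"""
params = {"step_name": "staging", "table_name": "customers",
          "status": "success", "process": "load"}

last_success = conn.execute(query, params).fetchone()[0]
print(last_success)  # 2024-07-03 08:00:00

# A table that was never loaded has no matching rows, so MAX returns NULL (None).
params["table_name"] = "orders"
never_loaded = conn.execute(query, params).fetchone()[0]
print(never_loaded)  # None
conn.close()
```

The failed run on 2024-07-02 is ignored, so a failed load never advances the watermark. In PostgreSQL the unaliased `MAX(etl_date)` column is named `max`, which is why `extract_database` reads `etl_date['max'][0]`.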

### 3. Data Extraction

#### 1. Extract Data From Database

Extract Data From PostgreSQL

- Full Extraction: Initial Load
- Incremental Extraction: Get new and updated data

Function Steps:
1. Establish Database Connection: Connect to the PostgreSQL source database (`DB_NAME_SOURCE`, i.e. dellstore).
2. Read Log Data: Read the etl_log table to find the timestamp (etl_date) of the last successful staging load for this table.
3. Initial Load or Incremental Extraction:
    - If no previous load has been recorded (etl_date is empty), set etl_date to '1111-01-01' to trigger the initial (full) load.
    - Otherwise, retrieve only the data added since the last successful load (etl_date).
4. Query Execution: Read the table's SQL query, which selects all columns from table_name where created_at is greater than etl_date.
5. Data Extraction: Execute the query with pd.read_sql to fetch the data into a pandas DataFrame (df).
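Steps 2 to 4 boil down to choosing a lower bound for `created_at`. A minimal sketch of that decision, independent of PostgreSQL; `resolve_etl_date` and `select_new_rows` are hypothetical helpers, and the string comparison stands in for the typed comparison PostgreSQL performs.

```python
from typing import Optional

INITIAL_LOAD_DATE = "1111-01-01"  # sentinel the notebook uses for the first run

def resolve_etl_date(last_success: Optional[str]) -> str:
    """Return the sentinel on the first run, otherwise the last successful load time."""
    if last_success is None:
        return INITIAL_LOAD_DATE
    return last_success

def select_new_rows(rows, etl_date: str):
    """Mimic `WHERE created_at > :etl_date` on ISO-formatted strings."""
    return [r for r in rows if r["created_at"] > etl_date]

rows = [
    {"customerid": 11, "created_at": "2002-03-01"},
    {"customerid": 12, "created_at": "2003-10-01"},
]

# First run: no successful load logged yet, so everything is extracted.
print(len(select_new_rows(rows, resolve_etl_date(None))))  # 2

# Later run: only rows created after the last successful load are extracted.
print(len(select_new_rows(rows, resolve_etl_date("2003-01-01 00:00:00"))))  # 1
```

Because the sentinel date is earlier than any real `created_at`, the same query serves both the initial full load and later incremental runs.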

In [6]:
def extract_database(table_name: str): 
    
    try:
        # create connection to database
        conn = create_engine(f"postgresql://{DB_USER}:{DB_PASS}@{DB_HOST}/{DB_NAME_SOURCE}")

        # Get date from previous process
        filter_log = {"step_name": "staging",
                    "table_name": table_name,
                    "status": "success",
                    "process": "load"}
        etl_date = read_etl_log(filter_log)


        # If no previous extraction has been recorded (etl_date is empty), set etl_date to '1111-01-01' indicating the initial load.
        # Otherwise, retrieve data added since the last successful extraction (etl_date).
        last_etl_date = etl_date['max'][0]
        if pd.isna(last_etl_date):
            etl_date = '1111-01-01'
        else:
            etl_date = last_etl_date

        # Constructs a SQL query to select all columns from the specified table_name where created_at is greater than etl_date.
        """
        SELECT * 
        FROM customers 
        WHERE created_at > :etl_date
        """
        query = sqlalchemy.text(read_sql(table_name))

        # Execute the query with pd.read_sql
        df = pd.read_sql(sql=query, con=conn, params=({"etl_date":etl_date},))
        log_msg = {
                "step" : "staging",
                "process":"extraction",
                "status": "success",
                "source": "database",
                "table_name": table_name,
                "etl_date": datetime.now().strftime("%Y-%m-%d %H:%M:%S")  # Current timestamp
            }
        return df
    except Exception as e:
        log_msg = {
            "step" : "staging",
            "process":"extraction",
            "status": "failed",
            "source": "database",
            "table_name": table_name,
            "etl_date": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),  # Current timestamp
            "error_msg": str(e)
        }
    finally:
        etl_log(log_msg)

#### 2. Extract Data From API

Extract data from the API (context: old data from the previous system).

- Backfilling: Specify a date range in the date parameters to retrieve historical data.

Function Steps: 
1. Establish the API endpoint: [Link](https://api-history-order.vercel.app)
2. Set the API parameters:
    - start_date
    - end_date
3. Data Extraction: Call the API endpoint and load the JSON response into a pandas DataFrame (df).
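`requests.get(link_api, params=...)` encodes the parameter dictionary into the query string. The standard-library sketch below builds the URL such a request should hit, which matches the "Check the data API" link in the Preparation section; it is handy for checking a request in the browser. No request is actually sent.

```python
from urllib.parse import urlencode, urlparse, parse_qs

link_api = "https://api-history-order.vercel.app/api/dummydata"
list_parameter = {"start_date": "2004-01-01", "end_date": "2004-01-01"}

# Build the URL for a GET request with these query parameters.
url = f"{link_api}?{urlencode(list_parameter)}"
print(url)
# https://api-history-order.vercel.app/api/dummydata?start_date=2004-01-01&end_date=2004-01-01

# Parsing the query string back recovers the same parameters (as lists of values).
print(parse_qs(urlparse(url).query))
```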

In [7]:
def extract_api(link_api:str, list_parameter:dict, data_name):
    try:
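        # Establish connection to API; the 30-second timeout is an arbitrary choice
        resp = requests.get(link_api, params=list_parameter, timeout=30)

        # Raise an HTTPError (a RequestException) for 4xx/5xx responses so they are logged as failures
        resp.raise_for_status()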

        # Parse the response JSON
        raw_response = resp.json()

        # Convert the JSON data to a pandas DataFrame
        df_api = pd.DataFrame(raw_response)

        # create success log message
        log_msg = {
                "step" : "staging",
                "process":"extraction",
                "status": "success",
                "source": "api",
                "table_name": data_name,
                "etl_date": datetime.now().strftime("%Y-%m-%d %H:%M:%S")  # Current timestamp
            }
        return df_api

    except requests.exceptions.RequestException as e:
        print(f"An error occurred while making the API request: {e}")

        # create fail log message
        log_msg = {
                "step" : "staging",
                "process":"extraction",
                "status": "failed",
                "source": "api",
                "table_name": data_name,
                "etl_date": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),  # Current timestamp
                "error_msg": str(e)
            }
        return pd.DataFrame()
    

    except ValueError as e:
        print(f"An error occurred while parsing the response JSON: {e}")

        # create fail log message
        log_msg = {
                "step" : "staging",
                "process":"extraction",
                "status": "failed",
                "source": "api",
                "table_name": data_name,
                "etl_date": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),  # Current timestamp
                "error_msg": str(e)
            }
        return pd.DataFrame()
    
    finally:
        etl_log(log_msg)

#### 3. Extract Data From Spreadsheet

Steps:
1. Define the credentials needed to access the spreadsheet.
2. Open file by key
3. Retrieve data from a specific sheet.
4. Perform Data Extraction 

The auth_gspread function defines the credentials needed to access the spreadsheet.

In [8]:
def auth_gspread():
    scope = ['https://spreadsheets.google.com/feeds',
             'https://www.googleapis.com/auth/drive']

    #Define your credentials
    credentials = ServiceAccountCredentials.from_json_keyfile_name(CRED_PATH, scope) # Your json file here

    gc = gspread.authorize(credentials)

    return gc

The key_file is the unique identifier of the Google Sheets file (the ID in the spreadsheet URL).
The init_key_file function opens the spreadsheet using this key.

In [9]:
def init_key_file(key_file:str):
    #define credentials to open the file
    gc = auth_gspread()
    
    #open spreadsheet file by key
    sheet_result = gc.open_by_key(key_file)
    
    return sheet_result

The extract_sheet function retrieves the data from a specific worksheet and uses its first row as the column names.

In [10]:
def extract_sheet(key_file:str, worksheet_name: str) -> pd.DataFrame:
    # init sheet
    sheet_result = init_key_file(key_file)
    
    worksheet_result = sheet_result.worksheet(worksheet_name)
    
    df_result = pd.DataFrame(worksheet_result.get_all_values())
    
    # set first rows as columns
    df_result.columns = df_result.iloc[0]
    
    # get all the rest of the values
    df_result = df_result[1:].copy()
    
    return df_result
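`get_all_values()` returns the sheet as a list of rows, where each row is a list of strings and the header is the first row. The sketch below applies the same header-promotion step to plain Python lists, using values taken from the spreadsheet preview in this notebook.

```python
# Shape of worksheet.get_all_values(): a list of rows, every cell a string.
values = [
    ["orderid", "sum_stock", "status"],
    ["6114", "0", "fulfilled"],
    ["11233", "100", "backordered"],
]

# Promote the first row to column names and keep the rest as records.
header, *data_rows = values
records = [dict(zip(header, row)) for row in data_rows]
print(records[1])  # {'orderid': '11233', 'sum_stock': '100', 'status': 'backordered'}

# Values stay strings, so cast numeric columns explicitly if the target needs it.
for record in records:
    record["sum_stock"] = int(record["sum_stock"])
print(sum(r["sum_stock"] for r in records))  # 100
```

In pandas, `pd.DataFrame(values[1:], columns=values[0])` is an equivalent one-liner that starts the index at 0. `df_result[1:]` keeps the original positional index instead, which is why the spreadsheet preview starts at 1.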

The extract_spreadsheet function performs the extraction and logs the result:

In [11]:
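def extract_spreadsheet(worksheet_name: str, key_file: str):
    # Default to an empty DataFrame so the final return works even if extraction fails
    df_data = pd.DataFrame()

    try: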
        # extract data
        df_data = extract_sheet(worksheet_name = worksheet_name,
                                    key_file = key_file)
        
        # success log message
        log_msg = {
                "step" : "staging",
                "process":"extraction",
                "status": "success",
                "source": "spreadsheet",
                "table_name": worksheet_name,
                "etl_date": datetime.now().strftime("%Y-%m-%d %H:%M:%S")  # Current timestamp
            }
    except Exception as e:
        # fail log message
        log_msg = {
                "step" : "staging",
                "process":"extraction",
                "status": "failed",
                "source": "spreadsheet",
                "table_name": worksheet_name,
                "etl_date": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),  # Current timestamp
                "error_msg": str(e)
            }
    finally:
        # save the log message to the etl_log table
        etl_log(log_msg)
        
    return df_data


### 4. Data Load

#### 1. Handle Failure Data

We handle data that fails to load by storing it in object storage (MinIO), so it can be inspected and reprocessed later.

In [12]:
# The minio library is used to interact with a MinIO server.
from minio import Minio

# BytesIO provides a way to work with binary data in memory as if it were a file.
from io import BytesIO

The handle_error function dumps the failed data to MinIO as a CSV file:

In [13]:
def handle_error(data, bucket_name:str, table_name:str):

    current_date = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    # Initialize MinIO client
    client = Minio('localhost:9000',
                access_key=ACCESS_KEY_MINIO,
                secret_key=SECRET_KEY_MINIO,
                secure=False)

    # Make a bucket if it doesn't exist
    if not client.bucket_exists(bucket_name):
        client.make_bucket(bucket_name)

    # Convert DataFrame to CSV and then to bytes
    csv_bytes = data.to_csv().encode('utf-8')
    csv_buffer = BytesIO(csv_bytes)

    # Upload the CSV file to the bucket
    client.put_object(
        bucket_name=bucket_name,
        object_name=f"{table_name}_{current_date}.csv", #name the fail source name and current etl date
        data=csv_buffer,
        length=len(csv_bytes),
        content_type='text/csv'
    )

    # List objects in the bucket
    objects = client.list_objects(bucket_name, recursive=True)
    for obj in objects:
        print(obj.object_name)
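To check what `handle_error` would upload without a running MinIO server, the sketch below performs only the serialization step with the standard library. It writes rows to CSV in memory, wraps the bytes in `BytesIO`, and computes the `length` that `put_object` needs. The rows, the table name, and the helper `build_error_object` are hypothetical. The `%Y%m%d_%H%M%S` timestamp is an assumption chosen to keep object names free of spaces and colons; the notebook itself uses `%Y-%m-%d %H:%M:%S`.

```python
import csv
import io
from datetime import datetime

def build_error_object(rows, fieldnames, table_name, now=None):
    """Return (object_name, buffer, length) for a failed batch (hypothetical helper)."""
    now = now or datetime.now()
    object_name = f"{table_name}_{now.strftime('%Y%m%d_%H%M%S')}.csv"

    # Write the CSV text in memory, then encode it once so the byte length is exact.
    text_buffer = io.StringIO()
    writer = csv.DictWriter(text_buffer, fieldnames=fieldnames)
    writer.writeheader()
    writer.writerows(rows)
    csv_bytes = text_buffer.getvalue().encode("utf-8")

    return object_name, io.BytesIO(csv_bytes), len(csv_bytes)

name, buffer, length = build_error_object(
    [{"orderid": 6114, "status": "fulfilled"}],
    fieldnames=["orderid", "status"],
    table_name="order_status_analytic",
    now=datetime(2024, 7, 1, 8, 30, 0),
)
print(name)    # order_status_analytic_20240701_083000.csv
print(length)  # byte count to pass as put_object(length=...)
```

Computing `length` from the encoded bytes, not from the string, matters as soon as the data contains non-ASCII characters.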

#### 2. Load Data to Staging Area
- Load raw data into the staging area.
- Strategy: upsert with the pangres library, using each table's primary key.
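Upsert means "insert rows whose key is new, update rows whose key already exists", which makes re-running a load safe. On PostgreSQL, pangres relies on `INSERT ... ON CONFLICT ... DO UPDATE`. The sketch below shows the same semantics with SQLite (version 3.24 or later supports this clause), using a composite key like the one used for `orderlines`. The table layout and values are hypothetical.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# Composite primary key, mirroring idx_name=["orderid", "orderlineid"].
conn.execute("""
    CREATE TABLE orderlines (
        orderid INTEGER,
        orderlineid INTEGER,
        quantity INTEGER,
        PRIMARY KEY (orderid, orderlineid)
    )
""")

upsert_sql = """
    INSERT INTO orderlines (orderid, orderlineid, quantity)
    VALUES (:orderid, :orderlineid, :quantity)
    ON CONFLICT (orderid, orderlineid) DO UPDATE SET quantity = excluded.quantity
"""

# Initial load.
conn.executemany(upsert_sql, [
    {"orderid": 1, "orderlineid": 1, "quantity": 2},
    {"orderid": 1, "orderlineid": 2, "quantity": 1},
])
# Re-run with one changed row and one new row: no duplicate-key error.
conn.executemany(upsert_sql, [
    {"orderid": 1, "orderlineid": 2, "quantity": 5},
    {"orderid": 2, "orderlineid": 1, "quantity": 3},
])

rows = conn.execute("SELECT * FROM orderlines ORDER BY orderid, orderlineid").fetchall()
print(rows)  # [(1, 1, 2), (1, 2, 5), (2, 1, 3)]
conn.close()
```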

In [14]:
def load_staging(data, schema: str, table_name: str, idx_name, source: str):
    # idx_name: primary-key column name, or a list of column names for a composite key
    try:
        # create connection to database
        conn = create_engine(f"postgresql://{DB_USER}:{DB_PASS}@{DB_HOST}/{DB_NAME_STG}")
        
        # set data index or primary key
        data = data.set_index(idx_name)
        
        # Do upsert (Update for existing data and Insert for new data)
        upsert(con = conn,
                df = data,
                table_name = table_name,
                schema = schema,
                if_row_exists = "update")
        
        #create success log message
        log_msg = {
                "step" : "staging",
                "process":"load",
                "status": "success",
                "source": source,
                "table_name": table_name,
                "etl_date": datetime.now().strftime("%Y-%m-%d %H:%M:%S")  # Current timestamp
            }
        # nothing is returned; the outcome is recorded in etl_log
    except Exception as e:

        #create fail log message
        log_msg = {
            "step" : "staging",
            "process":"load",
            "status": "failed",
            "source": source,
            "table_name": table_name,
            "etl_date": datetime.now().strftime("%Y-%m-%d %H:%M:%S") , # Current timestamp
            "error_msg": str(e)
        }

        # Handling error: save data to Object Storage
        try:
            handle_error(data = data, bucket_name='error-dellstore', table_name= table_name)
        except Exception as e:
            print(e)
    finally:
        etl_log(log_msg)

    

### 5. Data Pipeline


<img src='pict/live_w3_04.png' width="800"> <br>

#### 1. Ingestion Dell DVD Store Database

Schema Source: <br>
<img src='pict/live_w3_05.png' width="500"> <br>

Extract and Load Data Customer

In [15]:
#Extract
df_customer = extract_database(table_name = "customers")

In [16]:
df_customer.head()

| | customerid | firstname | lastname | address1 | address2 | city | state | zip | country | region | ... | phone | creditcardtype | creditcard | creditcardexpiration | username | password | age | income | gender | created_at |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 11 | Becky | Cochran | 193 Hailey Views\nMichaelside, AS 48241 | | East Charleneshire | Pennsylvania | 53868 | US | 1 | ... | 2415449050 | 5 | 6630987872369588 | 2010/03 | beckycochran123 | password | 58 | 60000 | M | 2002-03-01 |
| 1 | 12 | Raymond | Yang | 683 Albert Ports\nLake Waltershire, CO 77913 | | Laneberg | Pennsylvania | 18452 | US | 1 | ... | 1896033667 | 2 | 3715867913328111 | 2011/10 | raymondyang123 | password | 27 | 20000 | F | 2003-10-01 |
| 2 | 13 | Melanie | Wade | 514 Tonya Heights Suite 730\nSouth Davidfurt, ... | | Port Jessica | Delaware | 53356 | US | 1 | ... | 3029418206 | 5 | 3617457962129265 | 2009/11 | melaniewade123 | password | 43 | 100000 | M | 2001-11-01 |
| 3 | 14 | Heather | Cruz | 75935 Flynn Island Suite 933\nSouth Alexis, MD... | | Maryton | North Dakota | 44395 | US | 1 | ... | 3748672054 | 4 | 3344003576319665 | 2011/07 | heathercruz123 | password | 85 | 80000 | M | 2003-07-01 |
| 4 | 15 | Heather | Burgess | 6732 Brandi Trafficway Suite 104\nNorth Jonath... | | South Ryan | New Hampshire | 37471 | US | 1 | ... | 3354132892 | 4 | 8717996907886119 | 2008/05 | heatherburgess123 | password | 66 | 100000 | M | 2000-05-01 |


In [17]:
# Load (iloc[:, :-1] drops the last column, created_at)
load_staging(data = df_customer.iloc[:, :-1], schema="staging",
             table_name="customers", idx_name="customerid",
             source="database")

Extract and Load Data Categories

In [18]:
#Extract
df_categories = extract_database(table_name = "categories")
df_categories.head()

# Load
load_staging(data = df_categories.iloc[:, :-1], schema="staging",
             table_name="categories", idx_name="category",
             source="database")

Extract and Load Data Products

In [19]:
#Extract
df_products = extract_database(table_name = "products")
df_products.head()

# Load
load_staging(data = df_products.iloc[:, :-1], schema="staging",
             table_name="products", idx_name="prod_id",
             source="database")

Extract and Load Data Inventory

In [20]:
#Extract
df_inventory = extract_database(table_name = "inventory")
df_inventory.head()

# Load
load_staging(data = df_inventory.iloc[:, :-1], schema="staging",
             table_name="inventory", idx_name="prod_id",
             source="database")

Extract and Load Data Orders

In [21]:
#Extract
df_orders = extract_database(table_name = "orders")
df_orders.head()

# Load
load_staging(data = df_orders.iloc[:, :-1], schema="staging",
             table_name="orders", idx_name="orderid",
             source="database")

Extract and Load Data Orderlines

In [22]:
#Extract
df_orderlines = extract_database(table_name = "orderlines")
df_orderlines.head()

# Load
load_staging(data = df_orderlines.iloc[:, :-1], schema="staging",
             table_name="orderlines", idx_name=["orderid","orderlineid"],
             source="database")

Extract and Load Data Customer History

In [23]:
#Extract
df_cust_hist = extract_database(table_name = "cust_hist")
df_cust_hist.head()

# Load
load_staging(data = df_cust_hist.iloc[:, :-1], schema="staging",
             table_name="cust_hist", idx_name=['customerid','orderid','prod_id'],
             source="database")

#### 2. Ingestion Data History API

The Data API contains data for the period between '2004-01-01' and '2004-02-29'. 

The API cannot send large amounts of data in a single request, so we iterate through the data one day at a time, from '2004-01-01' to '2004-02-29'.

Create a function that generates a list of dates:

In [24]:
# Function to generate a list of date strings
def date_range(start_date, end_date):
    delta = end_date - start_date
    return [(start_date + timedelta(days=i)).strftime("%Y-%m-%d") for i in range(delta.days + 1)]

Create the date range between '2004-01-01' and '2004-02-29':

In [25]:
# Define the date range
start_date = datetime.strptime("2004-01-01", "%Y-%m-%d")
end_date = datetime.strptime("2004-02-29", "%Y-%m-%d")

# Generate list of dates
dates = date_range(start_date, end_date)
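2004 is a leap year, so the range includes 29 February and the loop below makes one API request per day. A quick standalone check of the same `date_range` logic, re-declared here so the snippet runs on its own:

```python
from datetime import datetime, timedelta

def date_range(start_date, end_date):
    """Inclusive list of YYYY-MM-DD strings from start_date to end_date."""
    delta = end_date - start_date
    return [(start_date + timedelta(days=i)).strftime("%Y-%m-%d")
            for i in range(delta.days + 1)]

dates = date_range(datetime(2004, 1, 1), datetime(2004, 2, 29))
print(len(dates))           # 60 (31 days in January + 29 in February)
print(dates[0], dates[-1])  # 2004-01-01 2004-02-29
```

The `+ 1` in `range(delta.days + 1)` is what makes the end date inclusive.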

In [26]:
link_api = "https://api-history-order.vercel.app/api/dummydata"

# Iterate over each day and extract data
for date in dates:
    list_parameter = {
        "start_date": date,
        "end_date": date,
    }

    # Extract Data
    df_backfilling = extract_api(link_api, list_parameter, "customer_orders_history")
    
    #Load Data
    if(not df_backfilling.empty):
        load_staging(data = df_backfilling, schema="staging",
                table_name="customer_orders_history", idx_name=['customer_id','order_id','orderline_id'],
                source="api")


#### 3. Ingestion Data Spreadsheet

In [27]:
#Extract
df_analytic = extract_spreadsheet(worksheet_name = 'dellstore_analytic',
                                    key_file = KEY_SPREADSHEET)

In [28]:
df_analytic.head()

| | orderid | sum_stock | status |
|---|---|---|---|
| 1 | 6114 | 0 | fulfilled |
| 2 | 11233 | 100 | backordered |
| 3 | 4790 | 0 | fulfilled |
| 4 | 273 | 0 | fulfilled |
| 5 | 11719 | 0 | fulfilled |


In [30]:
#Load
load_staging(data = df_analytic, schema="staging",
                table_name="order_status_analytic", idx_name="orderid",
                source="spreadsheet")

Git repository: [data_pipeline_dellstore](https://github.com/Kurikulum-Sekolah-Pacmann/data_pipeline_dellstore.git)