4.4 Saving Processed Data to Azure SQL Database

In [0]:
import pandas as pd
import sqlalchemy
from sqlalchemy import create_engine


    4.4.1. Splitting the cleaned & transformed data into fact & dimension tables:
        Fact Table: Order Items with calculated columns.
        Dimension Tables: Customers, Products, Sellers, Date.



In [0]:
# Load every cleaned & merged dataset produced by the earlier steps.
# The file list and the unpacking targets below are kept in the same order.
cleaned_csv_files = [
    'order_items_data_cleaned.csv',
    'orders_customers_data_cleaned.csv',
    'products_data_cleaned.csv',
    'order_payments_data_cleaned.csv',
    'customers_data_cleaned.csv',
    'sellers_data_cleaned.csv',
    'geolocation_data_cleaned.csv',
]

(
    order_items_data_cleaned,
    orders_customers_data_cleaned,
    products_data_cleaned,
    order_payments_data_cleaned,
    customers_data_cleaned,
    sellers_data_cleaned,
    geolocation_data_cleaned,
) = [pd.read_csv(csv_file) for csv_file in cleaned_csv_files]

In [0]:
#1. Fact table: order items with calculated columns (already done): order_items_data_cleaned

# Geolocation lookup shared by the customers and sellers dimensions:
# the zip prefix is cast to str so it has the same dtype as the join keys,
# and only one row per prefix is kept so the left joins below cannot
# multiply customer/seller rows.
geolocation_data_cleaned = (
    geolocation_data_cleaned
    .astype({'geolocation_zip_code_prefix': str})
    .drop_duplicates(subset=['geolocation_zip_code_prefix'])
)


def _attach_geolocation(entity_df, zip_column):
    """Left-join the de-duplicated geolocation lookup onto `entity_df` via `zip_column`."""
    return entity_df.astype({zip_column: str}).merge(
        geolocation_data_cleaned,
        how='left',
        left_on=zip_column,
        right_on='geolocation_zip_code_prefix',
        suffixes=('', '_geo'),
    )


#2. Customers dim table: customers enriched with geographic information
customers_data_cleaned = _attach_geolocation(customers_data_cleaned, 'customer_zip_code_prefix')

#3. Sellers dim table: sellers enriched with geographic information
sellers_data_cleaned = _attach_geolocation(sellers_data_cleaned, 'seller_zip_code_prefix')

#4. Products dim table (already done): products_data_cleaned
#5. Orders customers data

In [0]:
# Adding effective and end dates to track historical changes in the data.
#
# The load timestamp is captured ONCE so every table written by this run shares
# the same effective_date (calling "now" per table yields slightly different
# values). It is taken explicitly in UTC and stored tz-naive, so the value does
# not depend on how a given pandas version interprets pd.to_datetime('now').
load_timestamp = pd.Timestamp.now(tz='UTC').tz_localize(None)

for versioned_table in (
    customers_data_cleaned,
    products_data_cleaned,
    sellers_data_cleaned,
    order_items_data_cleaned,
    # Fixed: this was spelled `order_customers_data_cleaned`, a name that is
    # never defined, so the cell failed with NameError.
    orders_customers_data_cleaned,
):
    versioned_table['effective_date'] = load_timestamp
    # NaT end_date marks the row as the currently valid version.
    versioned_table['end_date'] = pd.NaT

# Creating a date dimension table from the orders and customers data:
# one row per distinct (purchase, estimated delivery) pair, with the columns
# renamed for clarity. Chained instead of rename(inplace=True) so re-running
# the cell always rebuilds date_dim from the source frame.
date_dim = (
    orders_customers_data_cleaned[
        ['order_purchase_timestamp', 'order_estimated_delivery_date']
    ]
    .drop_duplicates()
    .reset_index(drop=True)
    .rename(
        columns={
            'order_purchase_timestamp': 'purchase_date',
            'order_estimated_delivery_date': 'estimated_delivery_date',
        }
    )
)

  customers_data_cleaned['effective_date'] = pd.to_datetime('now')
  products_data_cleaned['effective_date'] = pd.to_datetime('now')
  sellers_data_cleaned['effective_date'] = pd.to_datetime('now')
  order_items_data_cleaned['effective_date'] = pd.to_datetime('now')


In [0]:
#constructing the JDBC URL and connection properties for connecting to the Azure SQL Database.

import os

from msal import ConfidentialClientApplication

# Service-principal credentials. They are read from the environment so real
# values never have to be saved in the notebook; the 'xxxx' placeholders are
# only a fallback and will not authenticate.
tenant_id = os.environ.get('AZURE_TENANT_ID', 'xxxx')  #tenantID
client_id = os.environ.get('AZURE_CLIENT_ID', 'xxxx')  #clientID
client_secret = os.environ.get('AZURE_CLIENT_SECRET', 'xxxx') #secret value generated during App Registration

# Azure SQL Server details
jdbcHostname = "test-server-2.database.windows.net"
jdbcDatabase = "olist_db"
jdbcPort = 1433

# Get the Access Token from Azure AD (client-credentials flow)
authority = f"https://login.microsoftonline.com/{tenant_id}"
app = ConfidentialClientApplication(client_id, authority=authority, client_credential=client_secret)

token_response = app.acquire_token_for_client(scopes=["https://database.windows.net/.default"])
access_token = token_response.get('access_token')

if not access_token:
    # Include the error details from the token response so an authentication
    # failure can be diagnosed instead of surfacing as a bare message.
    raise Exception(
        "Failed to acquire access token. "
        f"{token_response.get('error')}: {token_response.get('error_description')}"
    )

# Construct the JDBC URL
jdbcUrl = f"jdbc:sqlserver://{jdbcHostname}:{jdbcPort};database={jdbcDatabase};encrypt=true;trustServerCertificate=false;hostNameInCertificate=*.database.windows.net;loginTimeout=30;"

# Connection properties for the JDBC driver; the token is passed in place of
# a username/password pair.
connectionProperties = {
    "accessToken": access_token,
    "driver": "com.microsoft.sqlserver.jdbc.SQLServerDriver"
}


In [0]:
# Convert the pandas dimension/fact frames to Spark DataFrames.
# customers_spark_df is reused further down for the SCD Type 1 example.
(
    customers_spark_df,
    products_spark_df,
    sellers_spark_df,
    date_dim_spark_df,
    order_items_spark_df,
) = [
    spark.createDataFrame(pandas_df)
    for pandas_df in (
        customers_data_cleaned,
        products_data_cleaned,
        sellers_data_cleaned,
        date_dim,
        order_items_data_cleaned,
    )
]

# Write each Spark DataFrame to its Azure SQL table; mode="overwrite"
# replaces whatever the target table currently holds.
for spark_df, target_table in (
    (customers_spark_df, "dim_customers"),
    (products_spark_df, "dim_products"),
    (sellers_spark_df, "dim_sellers"),
    (date_dim_spark_df, "dim_date"),
    (order_items_spark_df, "fact_order_items"),
):
    spark_df.write.jdbc(
        url=jdbcUrl,
        table=target_table,
        mode="overwrite",
        properties=connectionProperties,
    )

print("Data successfully written to Azure SQL Database!")


Data successfully written to Azure SQL Database!


In [0]:
#Implementing SCD Type 1

#1. SCD Type 1 - performed on customers_data_cleaned; update customer city and state directly, overwriting previous values
def scd_type_1_update(customers_spark_df, customer_id, updated_data):
    """Apply an SCD Type 1 change to one customer: overwrite values, keep no history.

    Args:
        customers_spark_df: DataFrame that has a 'customer_id' column plus every
            column named in `updated_data`.
        customer_id: value of 'customer_id' identifying the row(s) to change.
        updated_data: mapping of column name -> new value for that customer.

    Returns:
        A new DataFrame in which the matching row(s) carry the new values and
        all other rows keep their existing ones.

    The matching row(s) are displayed before and after the update. `F` and
    `display` are not defined here and must already be available in the
    notebook namespace.
    """
    def show_target_rows(heading, frame):
        # Print the heading, then render only the row(s) of the target customer.
        print(heading)
        display(frame.filter(frame['customer_id'] == customer_id))

    show_target_rows("\nBefore SCD Type 1 Update:", customers_spark_df)

    updated_df = customers_spark_df
    for column_name in updated_data:
        new_value = updated_data[column_name]
        is_target_customer = updated_df['customer_id'] == customer_id
        # Replace the value only on the target customer's row(s).
        updated_df = updated_df.withColumn(
            column_name,
            F.when(is_target_customer, new_value).otherwise(updated_df[column_name]),
        )

    show_target_rows("\nAfter SCD Type 1 Update:", updated_df)

    return updated_df

# Changing one record: overwrite the city/state of a single customer
updated_data_scd1 = {
    'customer_city': 'Curitiba',
    'customer_state': 'PR',
}
customers_spark_df = scd_type_1_update(
    customers_spark_df,
    customer_id='69ad55d4bfffc89d50f84aa4e3648d5e',
    updated_data=updated_data_scd1,
)



Before SCD Type 1 Update:


customer_id,customer_unique_id,customer_zip_code_prefix,customer_city,customer_state,geolocation_zip_code_prefix,geolocation_lat,geolocation_lng,geolocation_city,geolocation_state,effective_date,end_date
69ad55d4bfffc89d50f84aa4e3648d5e,8ea8b926af8086a391b130944d230549,92310,Canoas,RS,92310,-29.915258224808355,-51.18616902904408,Canoas,RS,2024-10-04T17:40:48.273131Z,



After SCD Type 1 Update:


customer_id,customer_unique_id,customer_zip_code_prefix,customer_city,customer_state,geolocation_zip_code_prefix,geolocation_lat,geolocation_lng,geolocation_city,geolocation_state,effective_date,end_date
69ad55d4bfffc89d50f84aa4e3648d5e,8ea8b926af8086a391b130944d230549,92310,Curitiba,PR,92310,-29.915258224808355,-51.18616902904408,Canoas,RS,2024-10-04T17:40:48.273131Z,


In [0]:
# writing the updated Spark DataFrame to Azure SQL Database
customers_spark_df.write.jdbc(
    url=jdbcUrl,
    table="dim_customers2",
    mode="overwrite",
    properties=connectionProperties
)