In [0]:
%sql
-- Preview every row and column of the raw source table.
SELECT
  *
FROM
  workspace;

In [0]:
%sql
-- Snapshot the raw `workspace` table into a Delta table named bronze_view.
-- CREATE OR REPLACE keeps this cell re-runnable: a plain CREATE TABLE fails
-- with "table already exists" the second time the notebook is run, whereas
-- every other write in this notebook overwrites its target.
-- NOTE: despite its name, bronze_view is a managed table, not a view.
CREATE OR REPLACE TABLE bronze_view USING DELTA AS
SELECT
  *
FROM
  workspace;

Bronze Layer


In [0]:
%sql
-- Row count of the bronze snapshot.
SELECT
  COUNT(*)
FROM
  bronze_view;

In [0]:
# Load the bronze snapshot and persist an unmodified copy as the
# `bronzetable` Delta table, replacing any previous contents.
df = spark.read.table("bronze_view")

(
    df.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("bronzetable")
)

Silver Layer

In [0]:
from pyspark.sql.functions import col

# Give the columns used downstream explicit types.
df_clean = (
    df
    .withColumn("Quantity", col("Quantity").cast("int"))
    .withColumn("UnitPrice", col("UnitPrice").cast("double"))
    .withColumn("CustomerID", col("CustomerID").cast("string"))
)

In [0]:
# Keep only rows with a positive quantity, a positive unit price and a
# known customer.
has_positive_quantity = col("Quantity") > 0
has_positive_price = col("UnitPrice") > 0
has_customer = col("CustomerID").isNotNull()

df_clean = df_clean.filter(has_positive_quantity & has_positive_price & has_customer)

In [0]:
# Remove rows that are exact duplicates across all columns.
df_clean = df_clean.drop_duplicates()

In [0]:
from pyspark.sql.functions import expr

# Revenue of a line item = quantity sold x unit price.
revenue_expr = expr("Quantity * UnitPrice")
df_clean = df_clean.withColumn("Revenue", revenue_expr)

In [0]:
# Drop the `_rescued_data` column before publishing the silver frame
# (presumably the rescued-data column added during ingestion — confirm).
rescued_column = "_rescued_data"
silver_df = df_clean.drop(rescued_column)

In [0]:
# Persist the cleaned data as the `silvertable` Delta table, replacing any
# previous contents.
(
    silver_df.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("silvertable")
)

Gold Layer

In [0]:
# Gold-layer aggregations start from the persisted silver table.
df = spark.table("silvertable")


In [0]:
# Print the first 20 rows, truncating long cell values (the show() defaults).
df.show(n=20, truncate=True)

In [0]:
from pyspark.sql import functions as F

# Total revenue per country.
# Spark's `sum` is reached through the `F` namespace: importing it by name
# would shadow Python's builtin `sum` for every later cell in the notebook.
gold_df = df.groupBy("Country").agg(F.sum("Revenue").alias("TotalRevenue"))
gold_df.show()

# Publish the aggregate as a Delta table, replacing any previous contents.
gold_df.write.format("delta").mode("overwrite").saveAsTable("goldtable_country_revenue")

In [0]:
from pyspark.sql.functions import countDistinct

# Number of distinct customers in each country.
customers_per_country = countDistinct("CustomerID").alias("Customers_per_Country")
gold1_df = df.groupBy("Country").agg(customers_per_country)
display(gold1_df)

(
    gold1_df.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("goldtable_customers_per_country")
)

In [0]:
# Number of distinct invoices placed by each customer.
orders_per_customer = countDistinct("InvoiceNo").alias("Orders_per_Customer")
gold2_df = df.groupBy("CustomerID").agg(orders_per_customer)
display(gold2_df)

(
    gold2_df.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("goldtable_orders_per_customer")
)


In [0]:
# Number of distinct invoices on which each stock item appears.
orders_per_item = countDistinct("InvoiceNo").alias("Orders_per_Item")
gold3_df = df.groupBy("StockCode").agg(orders_per_item)
display(gold3_df)

(
    gold3_df.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("goldtable_orders_per_item")
)

In [0]:
from pyspark.sql import functions as F

# Per-customer metrics: last purchase timestamp, number of distinct invoices,
# total revenue, and days since the last purchase.
#
# Spark functions are reached through the `F` namespace so that Spark's `max`
# and `sum` do not shadow the Python builtins for the rest of the notebook.
#
# The input is read from the persisted "silvertable" rather than from the
# in-memory `silver_df` variable created in the Silver section, so this cell
# only depends on the table existing, not on earlier cells' Python state.
df = spark.table("silvertable").withColumn(
    "InvoiceDate",
    # Parse InvoiceDate using the pattern below
    # (assumes values are strings such as "12/1/2010 8:26" — TODO confirm).
    F.to_timestamp(F.col("InvoiceDate"), "M/d/yyyy H:mm"),
)

gold4_df = df.groupBy("CustomerID").agg(
    F.max("InvoiceDate").alias("last_purchase"),
    F.countDistinct("InvoiceNo").alias("frequency"),
    F.sum("Revenue").alias("monetary"),
)

# Days between the run date and the customer's last purchase. This is
# relative to current_date(), so the value changes with the day the cell runs.
gold4_df = gold4_df.withColumn(
    "recently_purchased_days",
    F.datediff(F.current_date(), F.col("last_purchase")),
)

display(gold4_df)

# Publish the metrics as a Delta table, replacing any previous contents.
gold4_df.write.format("delta").mode("overwrite").saveAsTable("goldtable_customer_metrics")

In [0]:
%sql
-- List the tables in the current schema to confirm the pipeline outputs exist.
SHOW TABLES;

In [0]:
%sql
-- List the tables in the current schema.
SHOW TABLES;

In [0]:
%pip install databricks-sql-connector

In [0]:
# Restart the Python process so the package installed by the preceding %pip
# cell can be imported.
# NOTE: restarting clears Python state (variables and imports) defined by
# earlier cells; tables already written are unaffected.
dbutils.library.restartPython()

In [0]:
from databricks import sql
import os

# Smoke-test the Databricks SQL connector against a SQL warehouse.
#
# The personal access token is read from the DATABRICKS_TOKEN environment
# variable so that no credential is stored in the notebook source. (A
# Databricks secret scope read via dbutils.secrets.get is an alternative.)
DATABRICKS_TOKEN = os.environ.get("DATABRICKS_TOKEN", "")
if not DATABRICKS_TOKEN:
    # Fail early with a clear message instead of an opaque authentication error.
    raise RuntimeError(
        "DATABRICKS_TOKEN is not set; export a Databricks personal access "
        "token in the environment before running this cell."
    )

# Context managers close the cursor and the connection even if the query
# raises, so no warehouse session is leaked.
with sql.connect(
    server_hostname="dbc-e75fc35a-1074.cloud.databricks.com",
    http_path="/sql/1.0/warehouses/3f95d8e84153f412",
    access_token=DATABRICKS_TOKEN,
) as connection:
    with connection.cursor() as cursor:
        cursor.execute("SELECT * FROM range(10)")
        # Rows are fetched inside the block; `result` stays usable afterwards.
        result = cursor.fetchall()

In [0]:
%sql
-- Final listing of the tables in the current schema.
SHOW TABLES;