In [1]:
import os
import sys
from pyspark.sql import SparkSession
from delta import *

os.environ['PYSPARK_PYTHON'] = "/usr/bin/python3"
os.environ['PYSPARK_DRIVER_PYTHON'] = "/usr/bin/python3"

builder = SparkSession.builder\
        .appName("Lakehouse")\
        .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
        .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")

spark = configure_spark_with_delta_pip(builder).getOrCreate()


:: loading settings :: url = jar:file:/Users/mars_su/Library/Python/3.9/lib/python/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /Users/mars_su/.ivy2/cache
The jars for the packages stored in: /Users/mars_su/.ivy2/jars
io.delta#delta-core_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-05a6c8e9-8bd3-4b64-9d86-2d24e123f105;1.0
	confs: [default]
	found io.delta#delta-core_2.12;2.1.0 in central
	found io.delta#delta-storage;2.1.0 in central
	found org.antlr#antlr4-runtime;4.8 in local-m2-cache
	found org.codehaus.jackson#jackson-core-asl;1.9.13 in local-m2-cache
:: resolution report :: resolve 346ms :: artifacts dl 18ms
	:: modules in use:
	io.delta#delta-core_2.12;2.1.0 from central in [default]
	io.delta#delta-storage;2.1.0 from central in [default]
	org.antlr#antlr4-runtime;4.8 from local-m2-cache in [default]
	org.codehaus.jackson#jackson-core-asl;1.9.13 from local-m2-cache in [default]
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts   |
	|       conf  

23/12/09 14:46:44 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


### Raw data to Bronze 

In [2]:
from pyspark.sql.types import (
    StructType,
    StructField,
    StringType,
    DoubleType,
    ArrayType
)

rawdata_schema = StructType([
    StructField('partner', StringType(), True),
    StructField('course', StringType(), True),
    StructField('rating', StringType(), True),
    StructField('reviewcount', StringType(), True),
    StructField('level', StringType(), True),
    StructField('certificatetype', StringType(), True),
    StructField('duration', StringType(), True),
    StructField('crediteligibility', StringType(), True),
])


raw_df = spark.read.csv("./rawdata/Coursera.csv", header=True, schema=rawdata_schema)
raw_df.show(2)

+-------+--------------------+------+-----------+---------+--------------------+-------------+-----------------+
|partner|              course|rating|reviewcount|    level|     certificatetype|     duration|crediteligibility|
+-------+--------------------+------+-----------+---------+--------------------+-------------+-----------------+
| Google|Google Cybersecurity|   4.8|      16.4k|Beginner | Professional Cer...| 3 - 6 Months|            FALSE|
| Google|Google Data Analy...|   4.8|     133.4k|Beginner | Professional Cer...| 3 - 6 Months|             TRUE|
+-------+--------------------+------+-----------+---------+--------------------+-------------+-----------------+
only showing top 2 rows



                                                                                

In [3]:
raw_df.write\
    .partitionBy("partner")\
    .format("delta")\
    .mode("append")\
    .save("./bronze/")

                                                                                

#### Delta log for Bronze

In [4]:
from delta.tables import DeltaTable

bronze_delta_table = DeltaTable.forPath(spark, "./bronze/")
operation_df = bronze_delta_table.history()
operation_df.show()

+-------+--------------------+------+--------+---------+--------------------+----+--------+---------+-----------+--------------+-------------+--------------------+------------+--------------------+
|version|           timestamp|userId|userName|operation| operationParameters| job|notebook|clusterId|readVersion|isolationLevel|isBlindAppend|    operationMetrics|userMetadata|          engineInfo|
+-------+--------------------+------+--------+---------+--------------------+----+--------+---------+-----------+--------------+-------------+--------------------+------------+--------------------+
|      0|2023-12-09 14:47:...|  null|    null|    WRITE|{mode -> Append, ...|null|    null|     null|       null|  Serializable|         true|{numFiles -> 180,...|        null|Apache-Spark/3.3....|
+-------+--------------------+------+--------+---------+--------------------+----+--------+---------+-----------+--------------+-------------+--------------------+------------+--------------------+



In [5]:
operation_df.select(
    "version",
    "timestamp",
    "operationParameters.mode",
    "operationParameters.partitionBy",
    "operation",
    "operationMetrics.numFiles",
    "operationMetrics.numFiles"
).show(20)

+-------+--------------------+------+-----------+---------+--------+--------+
|version|           timestamp|  mode|partitionBy|operation|numFiles|numFiles|
+-------+--------------------+------+-----------+---------+--------+--------+
|      0|2023-12-09 14:47:...|Append|["partner"]|    WRITE|     180|     180|
+-------+--------------------+------+-----------+---------+--------+--------+



### Bronze to Silver

In [6]:
bronze_df = spark.read.format("delta").load("./bronze/")
bronze_df.show(2)

+-------+--------------------+------+-----------+---------+--------------------+-------------+-----------------+
|partner|              course|rating|reviewcount|    level|     certificatetype|     duration|crediteligibility|
+-------+--------------------+------+-----------+---------+--------------------+-------------+-----------------+
| Google|Google Cybersecurity|   4.8|      16.4k|Beginner | Professional Cer...| 3 - 6 Months|            FALSE|
| Google|Google Data Analy...|   4.8|     133.4k|Beginner | Professional Cer...| 3 - 6 Months|             TRUE|
+-------+--------------------+------+-----------+---------+--------------------+-------------+-----------------+
only showing top 2 rows



In [7]:
import pyspark.sql.functions as F

def convert_k_count(count):
    if count[-1] == "k":
        return float(count[:-1])*1000
    return count

udf = F.udf(convert_k_count, DoubleType())

bronze_df = bronze_df.filter(F.col("reviewcount").isNotNull())
bronze_df = bronze_df.withColumn("reviewcount", udf(F.col("reviewcount")))
bronze_df = bronze_df.filter(F.col("level").isNotNull())
bronze_df.show(2)

[Stage 14:>                                                         (0 + 1) / 1]

+-------+--------------------+------+-----------+---------+--------------------+-------------+-----------------+
|partner|              course|rating|reviewcount|    level|     certificatetype|     duration|crediteligibility|
+-------+--------------------+------+-----------+---------+--------------------+-------------+-----------------+
| Google|Google Cybersecurity|   4.8|    16400.0|Beginner | Professional Cer...| 3 - 6 Months|            FALSE|
| Google|Google Data Analy...|   4.8|   133400.0|Beginner | Professional Cer...| 3 - 6 Months|             TRUE|
+-------+--------------------+------+-----------+---------+--------------------+-------------+-----------------+
only showing top 2 rows



                                                                                

In [8]:
bronze_df.write\
    .partitionBy("partner", "level")\
    .format("delta")\
    .mode("append")\
    .save("./silver/")

[Stage 17:>                                                         (0 + 8) / 8]

23/12/09 14:49:24 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers


                                                                                

#### Delta log for Silver

In [9]:
from delta.tables import DeltaTable

silver_delta_table = DeltaTable.forPath(spark, "./silver/")
silver_operation_df = silver_delta_table.history()
silver_operation_df.show()

+-------+--------------------+------+--------+---------+--------------------+----+--------+---------+-----------+--------------+-------------+--------------------+------------+--------------------+
|version|           timestamp|userId|userName|operation| operationParameters| job|notebook|clusterId|readVersion|isolationLevel|isBlindAppend|    operationMetrics|userMetadata|          engineInfo|
+-------+--------------------+------+--------+---------+--------------------+----+--------+---------+-----------+--------------+-------------+--------------------+------------+--------------------+
|      0|2023-12-09 14:49:...|  null|    null|    WRITE|{mode -> Append, ...|null|    null|     null|       null|  Serializable|         true|{numFiles -> 283,...|        null|Apache-Spark/3.3....|
+-------+--------------------+------+--------+---------+--------------------+----+--------+---------+-----------+--------------+-------------+--------------------+------------+--------------------+



In [10]:
silver_operation_df.select(
    "version",
    "timestamp",
    "operationParameters.mode",
    "operationParameters.partitionBy",
    "operation",
    "operationMetrics.numFiles",
    "operationMetrics.numFiles"
).show(20)

+-------+--------------------+------+-------------------+---------+--------+--------+
|version|           timestamp|  mode|        partitionBy|operation|numFiles|numFiles|
+-------+--------------------+------+-------------------+---------+--------+--------+
|      0|2023-12-09 14:49:...|Append|["partner","level"]|    WRITE|     283|     283|
+-------+--------------------+------+-------------------+---------+--------+--------+



### Silver to Golden

In [11]:
silver_df = spark.read.format("delta").load("./silver/")
silver_df.show(2)

+-------+--------------------+------+-----------+---------+--------------------+-------------+-----------------+
|partner|              course|rating|reviewcount|    level|     certificatetype|     duration|crediteligibility|
+-------+--------------------+------+-----------+---------+--------------------+-------------+-----------------+
| Google|Google Cybersecurity|   4.8|    16400.0|Beginner | Professional Cer...| 3 - 6 Months|            FALSE|
| Google|Google Data Analy...|   4.8|   133400.0|Beginner | Professional Cer...| 3 - 6 Months|             TRUE|
+-------+--------------------+------+-----------+---------+--------------------+-------------+-----------------+
only showing top 2 rows



In [12]:
final_df = silver_df\
            .groupby("partner", "level")\
            .agg(F.avg("reviewcount").alias("avg_reviewcount"))\
            .filter(F.col("avg_reviewcount").isNotNull())
final_df.show()



+--------------------+-------------+------------------+
|             partner|        level|   avg_reviewcount|
+--------------------+-------------+------------------+
|University of Vir...|    Beginner |            7000.0|
|                Meta|    Beginner | 5964.705882352941|
|Coursera Project ...|    Beginner |            2800.0|
|     Duke University|    Beginner |            7630.0|
|University of Cal...|    Beginner |           19137.5|
|University of Cal...|    Beginner | 7627.272727272727|
|              Google|    Beginner |           23235.0|
|University of Ill...|    Beginner |            6412.5|
|University of Pen...|       Mixed |           6156.25|
|University of Ill...|Intermediate |4066.6666666666665|
|University of Col...|Intermediate |            3120.0|
|University of Mic...|Intermediate |11342.857142857143|
|University of Pen...|    Beginner |            6605.0|
|                 IBM|    Beginner |25444.897959183672|
|     Rice University|    Beginner |            

                                                                                

In [13]:
final_df.write\
    .partitionBy("partner", "level")\
    .format("delta")\
    .mode("append")\
    .save("./golden/")

                                                                                

#### Delta log for Golden

In [14]:
from delta.tables import DeltaTable

golden_delta_table = DeltaTable.forPath(spark, "./golden//")
golden_operation_df = golden_delta_table.history()
golden_operation_df.show()

+-------+--------------------+------+--------+---------+--------------------+----+--------+---------+-----------+--------------+-------------+--------------------+------------+--------------------+
|version|           timestamp|userId|userName|operation| operationParameters| job|notebook|clusterId|readVersion|isolationLevel|isBlindAppend|    operationMetrics|userMetadata|          engineInfo|
+-------+--------------------+------+--------+---------+--------------------+----+--------+---------+-----------+--------------+-------------+--------------------+------------+--------------------+
|      0|2023-12-09 14:50:...|  null|    null|    WRITE|{mode -> Append, ...|null|    null|     null|       null|  Serializable|         true|{numFiles -> 167,...|        null|Apache-Spark/3.3....|
+-------+--------------------+------+--------+---------+--------------------+----+--------+---------+-----------+--------------+-------------+--------------------+------------+--------------------+



In [15]:
golden_operation_df.select(
    "version",
    "timestamp",
    "operationParameters.mode",
    "operationParameters.partitionBy",
    "operation",
    "operationMetrics.numFiles",
    "operationMetrics.numFiles"
).show(20)

+-------+--------------------+------+-------------------+---------+--------+--------+
|version|           timestamp|  mode|        partitionBy|operation|numFiles|numFiles|
+-------+--------------------+------+-------------------+---------+--------+--------+
|      0|2023-12-09 14:50:...|Append|["partner","level"]|    WRITE|     167|     167|
+-------+--------------------+------+-------------------+---------+--------+--------+



### TimeTravel(Schema Evolution) Example

In [16]:
v1_silver_df = spark.read.format("delta")\
                    .option("versionAsOf", "0")\
                    .load("./silver/")
v1_silver_df.show(2)

+-------+--------------------+------+-----------+---------+--------------------+-------------+-----------------+
|partner|              course|rating|reviewcount|    level|     certificatetype|     duration|crediteligibility|
+-------+--------------------+------+-----------+---------+--------------------+-------------+-----------------+
| Google|Google Cybersecurity|   4.8|    16400.0|Beginner | Professional Cer...| 3 - 6 Months|            FALSE|
| Google|Google Data Analy...|   4.8|   133400.0|Beginner | Professional Cer...| 3 - 6 Months|             TRUE|
+-------+--------------------+------+-----------+---------+--------------------+-------------+-----------------+
only showing top 2 rows



In [17]:
v2_silver_df = v1_silver_df.filter(
    (F.col("certificatetype")!="None") &\
    (F.col("level")!="None") &\
    (F.col("crediteligibility") == True)
)
v2_silver_df = v2_silver_df.withColumn(
                "concat_label", 
                F.concat(F.col("partner"), F.lit("-"), F.col("level")))

v2_silver_df.show()

                                                                                

+--------------------+--------------------+------+-----------+-------------+--------------------+-------------+-----------------+--------------------+
|             partner|              course|rating|reviewcount|        level|     certificatetype|     duration|crediteligibility|        concat_label|
+--------------------+--------------------+------+-----------+-------------+--------------------+-------------+-----------------+--------------------+
|              Google|Google Data Analy...|   4.8|   133400.0|    Beginner | Professional Cer...| 3 - 6 Months|             TRUE|    Google-Beginner |
|              Google|Google Project Ma...|   4.8|    97300.0|    Beginner | Professional Cer...| 3 - 6 Months|             TRUE|    Google-Beginner |
|              Google|   Google IT Support|   4.8|   181400.0|    Beginner | Professional Cer...| 3 - 6 Months|             TRUE|    Google-Beginner |
|              Google|    Google UX Design|   4.8|    73700.0|    Beginner | Professional Cer.

In [18]:
v1_silver_df.count()

                                                                                

1120

In [19]:
v2_silver_df.count()

                                                                                

45

In [20]:
v2_silver_df.write\
    .partitionBy("partner", "level")\
    .option("mergeSchema", "true")\
    .option("userMetadata", "filter-null-value-and-add-label")\
    .format("delta")\
    .mode("append")\
    .save("./silver/")

                                                                                

In [21]:
from delta.tables import DeltaTable

silver_delta_table = DeltaTable.forPath(spark, "./silver/")
silver_operation_df = silver_delta_table.history()
silver_operation_df.show()

+-------+--------------------+------+--------+---------+--------------------+----+--------+---------+-----------+--------------+-------------+--------------------+--------------------+--------------------+
|version|           timestamp|userId|userName|operation| operationParameters| job|notebook|clusterId|readVersion|isolationLevel|isBlindAppend|    operationMetrics|        userMetadata|          engineInfo|
+-------+--------------------+------+--------+---------+--------------------+----+--------+---------+-----------+--------------+-------------+--------------------+--------------------+--------------------+
|      1|2023-12-09 14:52:...|  null|    null|    WRITE|{mode -> Append, ...|null|    null|     null|          0|  Serializable|         true|{numFiles -> 13, ...|filter-null-value...|Apache-Spark/3.3....|
|      0|2023-12-09 14:49:...|  null|    null|    WRITE|{mode -> Append, ...|null|    null|     null|       null|  Serializable|         true|{numFiles -> 283,...|             

In [22]:
silver_operation_df.select(
    "version",
    "timestamp",
    "operationParameters.mode",
    "operationParameters.partitionBy",
    "operation",
    "operationMetrics.numFiles",
    "operationMetrics.numFiles"
).show(20)

+-------+--------------------+------+-------------------+---------+--------+--------+
|version|           timestamp|  mode|        partitionBy|operation|numFiles|numFiles|
+-------+--------------------+------+-------------------+---------+--------+--------+
|      1|2023-12-09 14:52:...|Append|["partner","level"]|    WRITE|      13|      13|
|      0|2023-12-09 14:49:...|Append|["partner","level"]|    WRITE|     283|     283|
+-------+--------------------+------+-------------------+---------+--------+--------+



In [28]:
test_silver_df = spark.read.format("delta")\
                    .option("versionAsOf", "1")\
                    .load("./silver/")
test_silver_df.show(2)

+-------+--------------------+------+-----------+---------+--------------------+-------------+-----------------+------------+
|partner|              course|rating|reviewcount|    level|     certificatetype|     duration|crediteligibility|concat_label|
+-------+--------------------+------+-----------+---------+--------------------+-------------+-----------------+------------+
| Google|Google Cybersecurity|   4.8|    16400.0|Beginner | Professional Cer...| 3 - 6 Months|            FALSE|        null|
| Google|Google Data Analy...|   4.8|   133400.0|Beginner | Professional Cer...| 3 - 6 Months|             TRUE|        null|
+-------+--------------------+------+-----------+---------+--------------------+-------------+-----------------+------------+
only showing top 2 rows



In [27]:
test_silver_df.count()

1165