In [0]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F 
from delta import *

In [0]:
#Initialize a Spark Session which loads up the Delta Lake Extensions
spark = SparkSession.builder.appName('stock-prices')\
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")\
        .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")\
            .getOrCreate()

In [0]:
%fs ls dbfs:/FileStore/tables/ 

path,name,size,modificationTime
dbfs:/FileStore/tables/AAPL.csv,AAPL.csv,712282,1700921500000
dbfs:/FileStore/tables/ADBE.csv,ADBE.csv,676979,1700921500000
dbfs:/FileStore/tables/AMZN.csv,AMZN.csv,693298,1700921504000
dbfs:/FileStore/tables/GOOGL.csv,GOOGL.csv,563457,1700921503000
dbfs:/FileStore/tables/META.csv,META.csv,327826,1700921505000
dbfs:/FileStore/tables/MSFT.csv,MSFT.csv,703911,1700921507000
dbfs:/FileStore/tables/NVDA.csv,NVDA.csv,701824,1700921509000
dbfs:/FileStore/tables/TSLA.csv,TSLA.csv,392793,1700921509000
dbfs:/FileStore/tables/nflx.csv,nflx.csv,612464,1700921508000


In [0]:
# Define the path to your dataset
dataset_path_nflx = "dbfs:/FileStore/tables/nflx.csv"
dataset_path_appl = "dbfs:/FileStore/tables/AAPL.csv"
dataset_path_amzn = "dbfs:/FileStore/tables/AMZN.csv"
dataset_path_google = "dbfs:/FileStore/tables/GOOGL.csv"
dataset_path_meta = "dbfs:/FileStore/tables/META.csv"
dataset_path_nvda = "dbfs:/FileStore/tables/NVDA.csv"
dataset_path_tsla = "dbfs:/FileStore/tables/TSLA.csv"
dataset_path_msft = "dbfs:/FileStore/tables/MSFT.csv"

### Data Preparation: Examine the data strucutre and decide on the schema for the Delta Lake Table

In [0]:
# Read the dataset 
nflx_df = spark.read.csv(
    path=dataset_path_nflx,
    sep=",", 
    header=True, 
    inferSchema=True,
    )
nflx_df.show(5)


+-------------------+------------------+------------------+------------------+------------------+---------+---------+------------+
|               Date|              Open|              High|               Low|             Close|   Volume|Dividends|Stock Splits|
+-------------------+------------------+------------------+------------------+------------------+---------+---------+------------+
|2002-05-23 04:00:00|1.1564290523529053|1.2428569793701172|1.1457140445709229|1.1964290142059326|104790000|      0.0|         0.0|
|2002-05-24 04:00:00| 1.214285969734192| 1.225000023841858|1.1971429586410522|1.2100000381469727| 11104800|      0.0|         0.0|
|2002-05-28 04:00:00|1.2135709524154663|1.2321430444717407| 1.157142996788025| 1.157142996788025|  6609400|      0.0|         0.0|
|2002-05-29 04:00:00|1.1642860174179077|1.1642860174179077|1.0857139825820923|1.1035710573196411|  6757800|      0.0|         0.0|
|2002-05-30 04:00:00|1.1078569889068604|1.1078569889068604|1.0714290142059326|1.071

In [0]:
nflx_df.printSchema()

root
 |-- Date: timestamp (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Volume: integer (nullable = true)
 |-- Dividends: double (nullable = true)
 |-- Stock Splits: double (nullable = true)



##### Correct the schema

In [0]:
# Based on the examination, define the schema explicitly if needed
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType, DateType

schema = StructType([
    StructField("Date", DateType(), True),
    StructField("Open", FloatType(), True),
    StructField("High", FloatType(), True),
    StructField("Low", FloatType(), True),
    StructField("Close", FloatType(), True),
    StructField("Volume", FloatType(), True),
    StructField("Dividends", FloatType(), True),
    StructField("Stock Splits", FloatType(), True),  
])

In [0]:
# Read the dataset again with the defined schema
nflx = spark.read.csv(
    path=dataset_path_nflx,
    sep=",", 
    header=True, 
    schema=schema
    )
    
apple = spark.read.csv(
    path=dataset_path_appl,
    sep=",", 
    header=True, 
    schema=schema
    )

amazon = spark.read.csv(
    path=dataset_path_amzn,
    sep=",", 
    header=True, 
    schema=schema
    )

meta = spark.read.csv(
    path=dataset_path_meta,
    sep=",", 
    header=True, 
    schema=schema
    )

msft = spark.read.csv(
    path=dataset_path_msft,
    sep=",", 
    header=True, 
    schema=schema
    )

google = spark.read.csv(
    path=dataset_path_google,
    sep=",", 
    header=True, 
    schema=schema
    )

nvda = spark.read.csv(
    path=dataset_path_nvda,
    sep=",", 
    header=True, 
    schema=schema
    )

tsla = spark.read.csv(
    path=dataset_path_tsla,
    sep=",", 
    header=True, 
    schema=schema
    )

#### Data Cleaning & Transformation

In [0]:
nflx = nflx.withColumn('ticker', F.lit('NFLX'))
apple = apple.withColumn('ticker', F.lit('AAPL'))
amazon = amazon.withColumn('ticker', F.lit('AMZN'))
meta = meta.withColumn('ticker', F.lit('META'))
google = google.withColumn('ticker', F.lit('GOOGL'))
msft = msft.withColumn('ticker', F.lit('MSFT'))
nvda = nvda.withColumn('ticker', F.lit('NVDA'))
tsla = tsla.withColumn('ticker', F.lit('TSLA'))

dataframes = [nflx, apple, amazon, meta, google, msft, nvda, tsla]

df = dataframes[0]

for i in dataframes[1:]:
    df = df.union(i)


df.show()

+----------+--------+--------+--------+--------+---------+---------+------------+------+
|      Date|    Open|    High|     Low|   Close|   Volume|Dividends|Stock Splits|ticker|
+----------+--------+--------+--------+--------+---------+---------+------------+------+
|2002-05-23|1.156429|1.242857|1.145714|1.196429| 1.0479E8|      0.0|         0.0|  NFLX|
|2002-05-24|1.214286|   1.225|1.197143|    1.21|1.11048E7|      0.0|         0.0|  NFLX|
|2002-05-28|1.213571|1.232143|1.157143|1.157143|6609400.0|      0.0|         0.0|  NFLX|
|2002-05-29|1.164286|1.164286|1.085714|1.103571|6757800.0|      0.0|         0.0|  NFLX|
|2002-05-30|1.107857|1.107857|1.071429|1.071429|1.01542E7|      0.0|         0.0|  NFLX|
|2002-05-31|1.078571|1.078571|1.071429|1.076429|8464400.0|      0.0|         0.0|  NFLX|
|2002-06-03|    1.08|1.149286|1.076429|1.128571|3151400.0|      0.0|         0.0|  NFLX|
|2002-06-04|1.135714|    1.14|1.110714|1.117857|3105200.0|      0.0|         0.0|  NFLX|
|2002-06-05|1.110714|

##### Remove Unnecessary Columns

In [0]:
df = df.drop('Stock Splits')
df.show(5)

+----------+--------+--------+--------+--------+---------+---------+------+
|      Date|    Open|    High|     Low|   Close|   Volume|Dividends|ticker|
+----------+--------+--------+--------+--------+---------+---------+------+
|2002-05-23|1.156429|1.242857|1.145714|1.196429| 1.0479E8|      0.0|  NFLX|
|2002-05-24|1.214286|   1.225|1.197143|    1.21|1.11048E7|      0.0|  NFLX|
|2002-05-28|1.213571|1.232143|1.157143|1.157143|6609400.0|      0.0|  NFLX|
|2002-05-29|1.164286|1.164286|1.085714|1.103571|6757800.0|      0.0|  NFLX|
|2002-05-30|1.107857|1.107857|1.071429|1.071429|1.01542E7|      0.0|  NFLX|
+----------+--------+--------+--------+--------+---------+---------+------+
only showing top 5 rows



##### Handle Missing Values

In [0]:
#Show missing values 
from functools import reduce
null_values = df.filter(reduce(lambda a, b: a | b, [F.col(c).isNull() for c in df.columns]))
null_values.show()

+----+----+----+---+-----+------+---------+------+
|Date|Open|High|Low|Close|Volume|Dividends|ticker|
+----+----+----+---+-----+------+---------+------+
+----+----+----+---+-----+------+---------+------+



### Save to Delta Lake Table

In [0]:
df.write.format("delta").save('dbfs:/FileStore/tables/deltaData')
print(f"The number of file is: {df.rdd.getNumPartitions()}")

The number of file is: 8


In [0]:
 %fs ls dbfs:/FileStore/tables/deltaData

path,name,size,modificationTime
dbfs:/FileStore/tables/deltaData/_delta_log/,_delta_log/,0,0
dbfs:/FileStore/tables/deltaData/part-00000-e31887d5-8463-4301-8365-6cba0a1275c3-c000.snappy.parquet,part-00000-e31887d5-8463-4301-8365-6cba0a1275c3-c000.snappy.parquet,131715,1700921555000
dbfs:/FileStore/tables/deltaData/part-00001-2e7904f4-34d5-461f-b98e-33ebde12b22b-c000.snappy.parquet,part-00001-2e7904f4-34d5-461f-b98e-33ebde12b22b-c000.snappy.parquet,146702,1700921555000
dbfs:/FileStore/tables/deltaData/part-00002-e68be46a-a60a-4a6a-a988-566708752548-c000.snappy.parquet,part-00002-e68be46a-a60a-4a6a-a988-566708752548-c000.snappy.parquet,145062,1700921555000
dbfs:/FileStore/tables/deltaData/part-00003-f3af5ff7-6d2f-488a-9acd-7a6e5e18ca34-c000.snappy.parquet,part-00003-f3af5ff7-6d2f-488a-9acd-7a6e5e18ca34-c000.snappy.parquet,71850,1700921555000
dbfs:/FileStore/tables/deltaData/part-00004-6d543892-c380-47fd-9561-1288f0d57a1f-c000.snappy.parquet,part-00004-6d543892-c380-47fd-9561-1288f0d57a1f-c000.snappy.parquet,118781,1700921555000
dbfs:/FileStore/tables/deltaData/part-00005-8e2d7944-3f58-47c4-8efe-104ecf46d7e9-c000.snappy.parquet,part-00005-8e2d7944-3f58-47c4-8efe-104ecf46d7e9-c000.snappy.parquet,147029,1700921555000
dbfs:/FileStore/tables/deltaData/part-00006-24f8e63b-26a2-478d-927e-65d0173022fa-c000.snappy.parquet,part-00006-24f8e63b-26a2-478d-927e-65d0173022fa-c000.snappy.parquet,146856,1700921555000
dbfs:/FileStore/tables/deltaData/part-00007-09072444-11f5-48d4-b0d8-fd53197100f9-c000.snappy.parquet,part-00007-09072444-11f5-48d4-b0d8-fd53197100f9-c000.snappy.parquet,83039,1700921555000


In [0]:
 %fs ls dbfs:/FileStore/tables/deltaData/_delta_log

path,name,size,modificationTime
dbfs:/FileStore/tables/deltaData/_delta_log/.s3-optimization-0,.s3-optimization-0,0,1700921557000
dbfs:/FileStore/tables/deltaData/_delta_log/.s3-optimization-1,.s3-optimization-1,0,1700921557000
dbfs:/FileStore/tables/deltaData/_delta_log/.s3-optimization-2,.s3-optimization-2,0,1700921557000
dbfs:/FileStore/tables/deltaData/_delta_log/00000000000000000000.crc,00000000000000000000.crc,8895,1700921569000
dbfs:/FileStore/tables/deltaData/_delta_log/00000000000000000000.json,00000000000000000000.json,7923,1700921557000


## Time Travel Feature Exploration

##### Add a stock to the list 

In [0]:
adbe = spark.read.csv(
    path="dbfs:/FileStore/tables/ADBE.csv",
    sep=",", 
    header=True, 
    schema=schema
    )

In [0]:
adbe = adbe.withColumn('ticker', F.lit('ADBE')).drop('Stock Splits')
null_values = adbe.filter(reduce(lambda a, b: a | b, [F.col(c).isNull() for c in adbe.columns]))
null_values.show()

+----+----+----+---+-----+------+---------+------+
|Date|Open|High|Low|Close|Volume|Dividends|ticker|
+----+----+----+---+-----+------+---------+------+
+----+----+----+---+-----+------+---------+------+



In [0]:
adbe.write.format("delta").mode("append").save('dbfs:/FileStore/tables/deltaData')

In [0]:
 %fs ls dbfs:/FileStore/tables/deltaData

path,name,size,modificationTime
dbfs:/FileStore/tables/deltaData/_delta_log/,_delta_log/,0,0
dbfs:/FileStore/tables/deltaData/part-00000-af8d539f-b948-49c3-99c3-33d678b215ce-c000.snappy.parquet,part-00000-af8d539f-b948-49c3-99c3-33d678b215ce-c000.snappy.parquet,146704,1700921575000
dbfs:/FileStore/tables/deltaData/part-00000-e31887d5-8463-4301-8365-6cba0a1275c3-c000.snappy.parquet,part-00000-e31887d5-8463-4301-8365-6cba0a1275c3-c000.snappy.parquet,131715,1700921555000
dbfs:/FileStore/tables/deltaData/part-00001-2e7904f4-34d5-461f-b98e-33ebde12b22b-c000.snappy.parquet,part-00001-2e7904f4-34d5-461f-b98e-33ebde12b22b-c000.snappy.parquet,146702,1700921555000
dbfs:/FileStore/tables/deltaData/part-00002-e68be46a-a60a-4a6a-a988-566708752548-c000.snappy.parquet,part-00002-e68be46a-a60a-4a6a-a988-566708752548-c000.snappy.parquet,145062,1700921555000
dbfs:/FileStore/tables/deltaData/part-00003-f3af5ff7-6d2f-488a-9acd-7a6e5e18ca34-c000.snappy.parquet,part-00003-f3af5ff7-6d2f-488a-9acd-7a6e5e18ca34-c000.snappy.parquet,71850,1700921555000
dbfs:/FileStore/tables/deltaData/part-00004-6d543892-c380-47fd-9561-1288f0d57a1f-c000.snappy.parquet,part-00004-6d543892-c380-47fd-9561-1288f0d57a1f-c000.snappy.parquet,118781,1700921555000
dbfs:/FileStore/tables/deltaData/part-00005-8e2d7944-3f58-47c4-8efe-104ecf46d7e9-c000.snappy.parquet,part-00005-8e2d7944-3f58-47c4-8efe-104ecf46d7e9-c000.snappy.parquet,147029,1700921555000
dbfs:/FileStore/tables/deltaData/part-00006-24f8e63b-26a2-478d-927e-65d0173022fa-c000.snappy.parquet,part-00006-24f8e63b-26a2-478d-927e-65d0173022fa-c000.snappy.parquet,146856,1700921555000
dbfs:/FileStore/tables/deltaData/part-00007-09072444-11f5-48d4-b0d8-fd53197100f9-c000.snappy.parquet,part-00007-09072444-11f5-48d4-b0d8-fd53197100f9-c000.snappy.parquet,83039,1700921555000


In [0]:
 %fs ls dbfs:/FileStore/tables/deltaData/_delta_log

path,name,size,modificationTime
dbfs:/FileStore/tables/deltaData/_delta_log/.s3-optimization-0,.s3-optimization-0,0,1700921557000
dbfs:/FileStore/tables/deltaData/_delta_log/.s3-optimization-1,.s3-optimization-1,0,1700921557000
dbfs:/FileStore/tables/deltaData/_delta_log/.s3-optimization-2,.s3-optimization-2,0,1700921557000
dbfs:/FileStore/tables/deltaData/_delta_log/00000000000000000000.crc,00000000000000000000.crc,8895,1700921569000
dbfs:/FileStore/tables/deltaData/_delta_log/00000000000000000000.json,00000000000000000000.json,7923,1700921557000
dbfs:/FileStore/tables/deltaData/_delta_log/00000000000000000001.crc,00000000000000000001.crc,9703,1700921580000
dbfs:/FileStore/tables/deltaData/_delta_log/00000000000000000001.json,00000000000000000001.json,1331,1700921576000


### Update Operations

In [0]:
# Load Delta Table
delta_table_path = "dbfs:/FileStore/tables/deltaData"
deltaTable = DeltaTable.forPath(spark, delta_table_path)

In [0]:
#Update a specific row 
condition = "Date = '2002-05-23' AND ticker = 'NFLX'"
new_close_price = 1.20
deltaTable.update(condition, {"Close": str(new_close_price)})

In [0]:
#View the data after the modificatons
updated_df = spark.read.format("delta").load("dbfs:/FileStore/tables/deltaData")
updated_df.filter(F.col('ticker') == 'NFLX').show()

+----------+--------+--------+--------+--------+---------+---------+------+
|      Date|    Open|    High|     Low|   Close|   Volume|Dividends|ticker|
+----------+--------+--------+--------+--------+---------+---------+------+
|2002-05-23|1.156429|1.242857|1.145714|     1.2| 1.0479E8|      0.0|  NFLX|
|2002-05-24|1.214286|   1.225|1.197143|    1.21|1.11048E7|      0.0|  NFLX|
|2002-05-28|1.213571|1.232143|1.157143|1.157143|6609400.0|      0.0|  NFLX|
|2002-05-29|1.164286|1.164286|1.085714|1.103571|6757800.0|      0.0|  NFLX|
|2002-05-30|1.107857|1.107857|1.071429|1.071429|1.01542E7|      0.0|  NFLX|
|2002-05-31|1.078571|1.078571|1.071429|1.076429|8464400.0|      0.0|  NFLX|
|2002-06-03|    1.08|1.149286|1.076429|1.128571|3151400.0|      0.0|  NFLX|
|2002-06-04|1.135714|    1.14|1.110714|1.117857|3105200.0|      0.0|  NFLX|
|2002-06-05|1.110714|1.159286|1.107143|1.147143|1531600.0|      0.0|  NFLX|
|2002-06-06|    1.15|1.232143|1.148571|1.182143|2305800.0|      0.0|  NFLX|
|2002-06-07|

### Delete Operations

In [0]:
updated_df.filter(F.col('ticker') == 'ADBE').show(5)

+----------+---------+---------+---------+---------+---------+---------+------+
|      Date|     Open|     High|      Low|    Close|   Volume|Dividends|ticker|
+----------+---------+---------+---------+---------+---------+---------+------+
|2000-01-03| 16.69356|16.755617|15.948866| 16.27467|7384400.0|      0.0|  ADBE|
|2000-01-04| 15.63858|16.336731|14.878372|14.909401|7813200.0|      0.0|  ADBE|
|2000-01-05|14.459477|15.576518|14.459477|15.204171|1.49272E7|      0.0|  ADBE|
|2000-01-06|15.250718|15.545492|15.049029| 15.32829|1.02212E7|      0.0|  ADBE|
|2000-01-07|15.281749|16.072987| 15.11109|16.072987|8253200.0|      0.0|  ADBE|
+----------+---------+---------+---------+---------+---------+---------+------+
only showing top 5 rows



In [0]:
#Delete the ADBE stock from the dataframe
del_condition = "ticker = 'ADBE'"
deltaTable.delete(del_condition)

In [0]:
#View the data after the delete modificatons
updated_df1 = spark.read.format("delta").load("dbfs:/FileStore/tables/deltaData")
updated_df1.filter(F.col('ticker') == 'ADBE').show()

+----+----+----+---+-----+------+---------+------+
|Date|Open|High|Low|Close|Volume|Dividends|ticker|
+----+----+----+---+-----+------+---------+------+
+----+----+----+---+-----+------+---------+------+



In [0]:
#Show the history
deltaTable.history().show()

+-------+-------------------+---------------+-----------------+---------+--------------------+----+------------------+--------------------+-----------+-----------------+-------------+--------------------+------------+--------------------+
|version|          timestamp|         userId|         userName|operation| operationParameters| job|          notebook|           clusterId|readVersion|   isolationLevel|isBlindAppend|    operationMetrics|userMetadata|          engineInfo|
+-------+-------------------+---------------+-----------------+---------+--------------------+----+------------------+--------------------+-----------+-----------------+-------------+--------------------+------------+--------------------+
|      3|2023-11-25 14:13:26|192325061983218|wiajayi@gmail.com|   DELETE|{predicate -> ["(...|null|{3208108577835549}|1125-134628-7x4wuv1m|          2|WriteSerializable|        false|{numRemovedFiles ...|        null|Databricks-Runtim...|
|      2|2023-11-25 14:13:14|192325061983218

In [0]:
# Go back before the DELETE Operation 
version_02_df = spark.read.format("delta").option("versionAsOf", 2).load("dbfs:/FileStore/tables/deltaData")
version_02_df.filter(F.col('ticker') == 'ADBE').show()

+----------+---------+----------+----------+---------+---------+---------+------+
|      Date|     Open|      High|       Low|    Close|   Volume|Dividends|ticker|
+----------+---------+----------+----------+---------+---------+---------+------+
|2000-01-03| 16.69356| 16.755617| 15.948866| 16.27467|7384400.0|      0.0|  ADBE|
|2000-01-04| 15.63858| 16.336731| 14.878372|14.909401|7813200.0|      0.0|  ADBE|
|2000-01-05|14.459477| 15.576518| 14.459477|15.204171|1.49272E7|      0.0|  ADBE|
|2000-01-06|15.250718| 15.545492| 15.049029| 15.32829|1.02212E7|      0.0|  ADBE|
|2000-01-07|15.281749| 16.072987|  15.11109|16.072987|8253200.0|      0.0|  ADBE|
|2000-01-10|16.228132| 16.693565| 15.793726|16.693565|1.09872E7|      0.0|  ADBE|
|2000-01-11| 16.67805| 16.693563| 15.483436|15.545493|9616000.0|      0.0|  ADBE|
|2000-01-12|15.576523|15.7316675| 15.328291|15.467921|8051200.0|      0.0|  ADBE|
|2000-01-13|15.592037| 16.383274| 15.576523|16.290188|5527200.0|      0.0|  ADBE|
|2000-01-14|16.5