# Dependency Installation

In [None]:
!pip install delta-spark==2.4.0 yfinance==0.2.55

In [None]:
from pyspark import SparkConf
from pyspark.sql import SparkSession
from delta.tables import *
from pyspark.sql.functions import *
import yfinance as yf
import pandas as pd

# Spark Session

In [None]:
# Build a local Spark session with Delta Lake, the S3A connector
# (hadoop-aws) and an external Hive metastore.
#
# In local mode the number of worker threads comes from the master URL:
# plain "local" means ONE thread. `spark.driver.cores` only takes effect in
# cluster deploy mode, and `spark.cores` is not a Spark property, so neither
# of them raises local parallelism. The core count therefore goes into
# "local[N]".
LOCAL_CORES = 8

spark_conf = SparkConf()
# Jars resolved from Maven at startup: Delta Lake 2.4.0 (Scala 2.12 build),
# plus hadoop-aws and the AWS SDK bundle needed for the s3a:// paths below.
spark_conf.set("spark.jars.packages", "io.delta:delta-core_2.12:2.4.0,com.amazonaws:aws-java-sdk-bundle:1.11.1026,org.apache.hadoop:hadoop-aws:3.3.4")
# Register Delta's SQL extension and make the session catalog Delta-aware so
# Delta tables can be created and managed through spark.sql / the metastore.
spark_conf.set("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
spark_conf.set("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
# Kept for cluster deployments; has no effect with a local master.
spark_conf.set("spark.driver.cores", str(LOCAL_CORES))
# Database/table metadata lives in this external Hive metastore.
spark_conf.set("hive.metastore.uris", "thrift://hive-metastore:9083")

spark = (
    SparkSession.builder
    .master(f"local[{LOCAL_CORES}]")
    .appName("Example")
    .config(conf=spark_conf)
    .enableHiveSupport()
    .getOrCreate()
)

# Database creation

In [None]:
# Register the `datalake` database in the metastore, rooted at the S3 bucket.
# The statement needs no interpolation, so a plain (non-f) string is used, and
# the trailing ';' is dropped because spark.sql runs a single statement.
spark.sql("""
    CREATE DATABASE IF NOT EXISTS datalake
    LOCATION 's3a://raw.datalake.mydomain.com/'
""").show()

# Download financial data from Yahoo Finance

In [None]:
# Download PETR4.SA quotes from Yahoo Finance starting 2024-01-01.
# auto_adjust=True is passed explicitly instead of relying on the library
# default, because the next cell expects exactly the columns
# Date, Close, High, Low, Open, Volume (with auto_adjust=False yfinance also
# returns an "Adj Close" column).
data = yf.download('PETR4.SA', start='2024-01-01', auto_adjust=True)

# yfinance reports a failed download by returning an empty frame rather than
# raising; stop here with a clear message instead of failing further down.
if data.empty:
    raise RuntimeError("No data downloaded from Yahoo Finance for PETR4.SA")

# yfinance can return (Price, Ticker) MultiIndex columns even for a single
# ticker; keep only the price level so columns can be addressed by name.
if isinstance(data.columns, pd.MultiIndex):
    data.columns = data.columns.get_level_values(0)

# Turn the Date index into a regular column.
data = data.reset_index()

In [None]:
# Expose the downloaded quotes to Spark SQL as the temp view `tmp_stocks`.
# Columns are selected by name rather than relabelling the positional
# `data.values` array. This means a change in yfinance's column order cannot
# silently mislabel prices. It also keeps the pandas dtypes, whereas
# `.values` on mixed dtypes yields an `object` array.
stock_columns = ['Date', 'Close', 'High', 'Low', 'Open', 'Volume']
stocks_pdf = data.copy()  # don't mutate the notebook-level `data`
# yfinance may return (Price, Ticker) MultiIndex columns even for one ticker.
if isinstance(stocks_pdf.columns, pd.MultiIndex):
    stocks_pdf.columns = stocks_pdf.columns.get_level_values(0)
spark.createDataFrame(stocks_pdf[stock_columns]).createOrReplaceTempView('tmp_stocks')

In [None]:
# Inspect the column types Spark inferred for the staged quotes.
staged_quotes = spark.table("tmp_stocks")
staged_quotes.printSchema()

In [None]:
# Shape the staged quotes into the target table layout: a constant ticker
# column, the Close/High/Low/Open/Volume columns, and `Date` cast to a date
# column named `dt`.
stock_history_query = "SELECT 'PETR4.SA' AS ticker, `Close`, `High`, `Low`, `Open`, `Volume`, DATE(`Date`) AS dt FROM tmp_stocks"
df = spark.sql(stock_history_query)

# Write table to datalake

In [None]:
# Persist the prepared rows as the Delta table datalake.stock_price_history,
# replacing any existing contents.
df.write.saveAsTable(
    "datalake.stock_price_history",
    format="delta",
    mode="overwrite",
)

# Query examples

In [None]:
# List the tables registered in the `datalake` database.
datalake_tables = spark.sql("show tables from datalake")
datalake_tables.show()

In [None]:
# Count the rows currently in the table.
row_count = spark.sql("select count(1) from datalake.stock_price_history")
row_count.show()

# Delete rows from the Delta Lake table (DML)

In [None]:
# Delta supports SQL DML: delete every row whose volume is an exact multiple
# of 1000. `volume` resolves to the `Volume` column because Spark SQL
# identifiers are case-insensitive by default.
delete_result = spark.sql("DELETE FROM datalake.stock_price_history WHERE (volume % 1000) = 0")
delete_result.show()

# Delta Table Vacuum

In [None]:
# Handle to the Delta table for maintenance operations (vacuum below).
# The table is resolved by its metastore name instead of a hard-coded S3
# path. A path would duplicate the database LOCATION and assume the table's
# sub-directory name, and it would break if either changed.
table = DeltaTable.forName(spark, "datalake.stock_price_history")

In [None]:
# Delete data files that the current table version no longer references and
# that are older than the retention threshold. retentionHours=None is the
# same as calling vacuum() with no argument and uses Delta's configured
# default retention (7 days per the Delta docs). Files made obsolete by the
# DELETE just now are therefore kept, and older versions remain queryable.
table.vacuum(retentionHours=None)

# Query a specific table version (time travel)

In [None]:
# Time travel: row count as of version 0, the table's first commit (the
# initial write when this notebook runs against a fresh table).
version_0_total = spark.sql("SELECT count(1) as total FROM datalake.stock_price_history FOR VERSION AS OF 0")
version_0_total.show()

In [None]:
# Time travel: row count as of version 1, the second commit (the DELETE above
# when this notebook runs against a fresh table).
version_1_total = spark.sql("SELECT count(1) as total FROM datalake.stock_price_history FOR VERSION AS OF 1")
version_1_total.show()