# 1. Preparation

In [0]:
# Libs
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, col
from pyspark.sql.types import DateType

# Datalake connection
# The storage account is named once and reused for the Spark credential key and
# for every abfss:// path below, so the account only has to be changed in one place.
STORAGE_ACCOUNT = "datalakeetlproject"
DATALAKE_HOST = f"{STORAGE_ACCOUNT}.dfs.core.windows.net"

# The account key comes from the "myscope" secret scope instead of being hardcoded
blobAccessKey = dbutils.secrets.get(scope = "myscope", key = "accesskey")
spark.conf.set(f"fs.azure.account.key.{DATALAKE_HOST}", blobAccessKey)

# Paths
# Bronze paths glob the raw *.json files; silver paths are the targets of the Delta writes
path_bronze_profile = f"abfss://bronze@{DATALAKE_HOST}/profile/*.json"
path_bronze_historical = f"abfss://bronze@{DATALAKE_HOST}/historical/*.json"
path_silver_profile = f"abfss://silver@{DATALAKE_HOST}/profile/"
path_silver_historical = f"abfss://silver@{DATALAKE_HOST}/historical/"

# 2. Explore tables

## 2.1 Profile data

In [0]:
# Load the raw company-profile JSON files from the bronze layer (no explicit schema,
# so Spark infers one) and print the resulting schema tree
df_bronze_profile = spark.read.format("json").load(path_bronze_profile)
df_bronze_profile.printSchema()

In [0]:
# Show data: preview up to 5 rows of the raw bronze profile DataFrame
display(df_bronze_profile.limit(5))

## 2.2 Historical data

In [0]:
# Load the raw historical JSON files from the bronze layer (still nested: each record
# holds a historicalStockList array) and print the inferred schema tree
df_bronze_historical = spark.read.format("json").load(path_bronze_historical)
df_bronze_historical.printSchema()

In [0]:
# Flatten the nested historical payload into one row per (stock, historical entry).
# The raw JSON is read into its own variable instead of exploding `df_bronze_historical`
# directly: this cell rebinds `df_bronze_historical` to the flattened result, so a
# second run would try to explode `historicalStockList` on a DataFrame that no longer
# has it and fail. Reading from the path keeps the cell safe to re-run, at the cost
# of reading the source files again.
df_bronze_historical_raw = spark.read.json(path_bronze_historical)

# Expand to first level (historicalStockList): one row per stock
df_level1 = df_bronze_historical_raw.withColumn("stock", explode(col("historicalStockList")))

# Expand to second level (historical): one row per stock per historical entry
df_level2 = df_level1.withColumn("data", explode(col("stock.historical")))

# Select fields; later cells expect `df_bronze_historical` to carry these flat columns
df_bronze_historical = df_level2.select(
    col("stock.symbol"),
    col("data.date"),
    col("data.open"),
    col("data.high"),
    col("data.low"),
    col("data.close"),
    col("data.adjClose"),
    col("data.volume"),
    col("data.change"),
    col("data.changePercent"),
    col("data.vwap")
)

# Show data
display(df_bronze_historical.limit(20))

# 3. Data Transformation

## 3.1 Profile

In [0]:
# Bind the bronze 'profile' DataFrame to a silver-stage name (plain assignment; nothing is copied)
df_silver_profile = df_bronze_profile

# Keep only the relevant fields, renaming where a target name is given (None keeps the
# source name), then keep a single row per ticker
df_silver_profile_clean = (
    df_silver_profile
    .select(*[
        col(source) if target is None else col(source).alias(target)
        for source, target in (
            ("symbol", "ticker"),
            ("companyName", "company_name"),
            ("sector", None),
            ("industry", None),
            ("country", None),
            ("ceo", "ceo_name"),
            ("exchange", None),
            ("mktCap", "market_cap"),
            ("price", None),
            ("website", None),
            ("description", None),
        )
    ])
    .dropDuplicates(["ticker"])
)



In [0]:
# display 'profile' data transformed: up to 20 rows of the cleaned, de-duplicated silver DataFrame
display(df_silver_profile_clean.limit(20))

In [0]:
# Persist the cleaned profile table to the silver container in Delta format,
# overwriting whatever was previously written at that path
(
    df_silver_profile_clean.write
    .format("delta")
    .mode("overwrite")
    .save(path_silver_profile)
)

## 3.2 Historical

In [0]:
# Inspect the schema of `df_bronze_historical` as it stands now: the flattened DataFrame
# (symbol, date, open, ...) produced in section 2.2, not the nested raw JSON
df_bronze_historical.printSchema()


In [0]:
# Build the silver 'historical' table from the bronze data flattened in section 2.2.
# `df_bronze_historical` already holds one row per (stock, historical entry) with flat
# columns (symbol, date, open, ...); `historicalStockList` no longer exists on it, so it
# must not be exploded again here - that would fail to resolve the column.
df_silver_historical = df_bronze_historical

# Select relevant fields, rename them to snake_case, cast `date` to DateType,
# and keep a single row per (ticker, date)
df_silver_historical_clean = df_silver_historical.select(
    col("symbol").alias("ticker"),
    col("date").cast(DateType()).alias("date"),
    col("open"),
    col("high"),
    col("low"),
    col("close"),
    col("adjClose").alias("adj_close"),
    col("volume"),
    col("change"),
    col("changePercent").alias("change_percent"),
    col("vwap")
).dropDuplicates(["ticker", "date"])

In [0]:
# display 'historical' data transformed: up to 20 rows of the cleaned, de-duplicated silver DataFrame
display(df_silver_historical_clean.limit(20))

In [0]:
# Persist the cleaned historical table to the silver container in Delta format,
# overwriting whatever was previously written at that path
(
    df_silver_historical_clean.write
    .format("delta")
    .mode("overwrite")
    .save(path_silver_historical)
)