## GOLD LAYER TRANSFORMATIONS

### CONNECT TO DATA LAKE

In [0]:

# Register the OAuth client-credentials settings on the Spark session for the
# storage account's dfs endpoint. Pairs are applied in the order listed.
# NOTE(review): the client secret is written inline in the notebook source;
# consider loading it from a secret store instead — confirm what is available.
for conf_key, conf_value in (
    ("fs.azure.account.auth.type.<storage-account>.dfs.core.windows.net",
     "OAuth"),
    ("fs.azure.account.oauth.provider.type.<storage-account>.dfs.core.windows.net",
     "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider"),
    ("fs.azure.account.oauth2.client.id.<storage-account>.dfs.core.windows.net",
     "<ms-entra-app-id>"),
    ("fs.azure.account.oauth2.client.secret.<storage-account>.dfs.core.windows.net",
     "<secret-value>"),
    ("fs.azure.account.oauth2.client.endpoint.<storage-account>.dfs.core.windows.net",
     "https://login.microsoftonline.com/<tenant-id>/oauth2/token"),
):
    spark.conf.set(conf_key, conf_value)

### IMPORT MODULES

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lag, avg, sum as sum_, abs as abs_, when
from pyspark.sql.window import Window
import os

### APPLY TRANSFORMATIONS

In [0]:

# Reuse the active Spark session, or create one, under a fixed application name.
spark = SparkSession.builder.appName("StockDataPipeline").getOrCreate()

# For each symbol: read its silver-layer CSV, add derived price/volume columns,
# and overwrite its gold-layer Parquet output.
for crypto in ['btc', 'eth', 'doge', 'xrp', 'usdt']:
    print(f"Processing {crypto}...")

    silver_path = f"abfss://silver@<storage-account>.dfs.core.windows.net/crypto/{crypto}.csv"
    gold_path = f"abfss://gold@<storage-account>.dfs.core.windows.net/crypto/{crypto}.parquet"
    df = spark.read.option("header", "true").csv(silver_path)

    # Cast types (no schema inference is requested, so the CSV columns arrive as strings).
    #   close  -> double: a 32-bit float keeps only ~7 significant digits, which
    #             rounds prices before the derived columns are computed.
    #   volume -> long:   a 32-bit int tops out at 2,147,483,647; a larger value
    #             becomes NULL (or raises when ANSI mode is enabled) and would
    #             then distort rolling_volume_7d.
    df = df.withColumn("date", col("date").cast("date")) \
           .withColumn("close", col("close").cast("double")) \
           .withColumn("volume", col("volume").cast("long"))

    # Define windows: rows sharing the same "name", ordered by date.
    w = Window.partitionBy("name").orderBy("date")
    # Current row plus up to six preceding rows. This is row-based, so it spans
    # seven calendar days only if there is exactly one row per day — TODO confirm.
    w_last_7_rows = w.rowsBetween(-6, 0)

    # Add enrichments
    # prev_close is NULL on the first row of each partition, so pct_change is NULL there too.
    df = df.withColumn("prev_close", lag("close", 1).over(w))
    df = df.withColumn("pct_change", (col("close") - col("prev_close")) / col("prev_close"))
    df = df.withColumn("rolling_avg_7d", avg("close").over(w_last_7_rows))
    df = df.withColumn("rolling_volume_7d", sum_("volume").over(w_last_7_rows))
    # Flag rows whose close deviates from the rolling average by more than 10%.
    # A NULL comparison falls through to otherwise(0).
    df = df.withColumn("price_spike_flag", when(
        abs_(col("close") - col("rolling_avg_7d")) / col("rolling_avg_7d") > 0.10, 1).otherwise(0)
    )

    # Drop temporary column (only needed to derive pct_change)
    df = df.drop("prev_close")

    # Write to gold layer; "overwrite" replaces any existing output at gold_path.
    df.write.mode("overwrite").parquet(gold_path)

    print(f"✅ {crypto} written to gold layer.")

Processing btc...
✅ btc written to gold layer.
Processing eth...
✅ eth written to gold layer.
Processing doge...
✅ doge written to gold layer.
Processing xrp...
✅ xrp written to gold layer.
Processing usdt...
✅ usdt written to gold layer.


In [0]:
# Create public csv for PowerBI access
from functools import reduce

print("🔁 Combining all crypto into one DataFrame...")

# Load the enriched Parquet output of every symbol, in a fixed order.
dfs = []
for crypto in ['btc', 'eth', 'doge', 'xrp', 'usdt']:
    dfs.append(
        spark.read.parquet(f"abfss://gold@<storage-account>.dfs.core.windows.net/crypto/{crypto}.parquet")
    )

# Fold the list left to right into one DataFrame, matching columns by name.
merged_df = reduce(lambda combined, nxt: combined.unionByName(nxt), dfs)

# coalesce(1) collapses the data to one partition before it is written, with a
# header row, into the public folder; any existing output there is overwritten.
single_partition_writer = merged_df.coalesce(1).write.mode("overwrite").option("header", "true")
single_partition_writer.csv("abfss://gold@<storage-account>.dfs.core.windows.net/public/gold_merged_temp")

print("✅ Merged CSV written to temporary folder.")

🔁 Combining all crypto into one DataFrame...
✅ Merged CSV written to temporary folder.
