# Delta Lake Data Exploration
This notebook explores the stock market data and tests the steps before implementing them in Python files.

In [None]:
import pyspark
from pyspark.sql import SparkSession
from delta import *
import pandas as pd
import os
from dotenv import load_dotenv

# Load environment variables from a .env file when one is present.
# load_dotenv() does not raise when the file is missing; it returns False when
# nothing was loaded, so check the return value instead of catching exceptions.
# An unreadable .env is still treated as best-effort, but only OS-level errors
# are caught so that unrelated failures are not silently swallowed.
try:
    dotenv_loaded = load_dotenv()
except OSError as exc:
    dotenv_loaded = False
    print(f"Could not read .env file: {exc}")
if not dotenv_loaded:
    print("No .env file found, using default paths")

# Define data paths using environment variables or defaults
DATA_DIR = os.getenv("DATA_DIR", "data")
CSV_FILE_PATH = os.getenv("CSV_FILE_PATH", os.path.join(DATA_DIR, "tech_stocks.csv"))
DELTA_TABLE_PATH = os.getenv("DELTA_TABLE_PATH", os.path.join(DATA_DIR, "delta_tables/tech_stocks"))

# Ensure directories exist.
# os.path.dirname() returns "" for a bare filename and os.makedirs("") raises
# FileNotFoundError, so only create the CSV directory when there is one.
csv_parent_dir = os.path.dirname(CSV_FILE_PATH)
if csv_parent_dir:
    os.makedirs(csv_parent_dir, exist_ok=True)
os.makedirs(DELTA_TABLE_PATH, exist_ok=True)

In [2]:
# Create the Delta-enabled Spark session.
# configure_spark_with_delta_pip (from the `delta` package imported above) sets
# spark.jars.packages to the Delta artifact that matches the installed
# delta-spark Python package, so the jar version is no longer hard-coded and
# cannot drift from the Python-side library.
spark = configure_spark_with_delta_pip(
    SparkSession.builder
    .appName("DeltaLakeProcessing")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
).getOrCreate()

25/05/01 19:12:06 WARN Utils: Your hostname, abdelhalim resolves to a loopback address: 127.0.1.1; using 192.168.1.6 instead (on interface enp8s0)
25/05/01 19:12:06 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


:: loading settings :: url = jar:file:/home/abdelhalim/Desktop/Temp%20/StockMarketAnalysis/stock/lib/python3.12/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/abdelhalim/.ivy2/cache
The jars for the packages stored in: /home/abdelhalim/.ivy2/jars
io.delta#delta-core_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-eac916b9-324a-40ff-8a75-23f68dcc00d3;1.0
	confs: [default]
	found io.delta#delta-core_2.12;2.1.0 in central
	found io.delta#delta-storage;2.1.0 in central
	found io.delta#delta-core_2.12;2.1.0 in central
	found io.delta#delta-storage;2.1.0 in central
	found org.antlr#antlr4-runtime;4.8 in central
	found org.codehaus.jackson#jackson-core-asl;1.9.13 in central
	found org.antlr#antlr4-runtime;4.8 in central
	found org.codehaus.jackson#jackson-core-asl;1.9.13 in central
downloading https://repo1.maven.org/maven2/io/delta/delta-core_2.12/2.1.0/delta-core_2.12-2.1.0.jar ...
downloading https://repo1.maven.org/maven2/io/delta/delta-core_2.12/2.1.0/delta-core_2.12-2.1.0.jar ...
	[SUCCESSFUL ] io.delta#delta-core_2.12;2.1.0!delta-core_2.12.jar (1188ms)
downloading h

In [3]:
# Resolve the data locations from the environment instead of pinning them to
# one machine's absolute home-directory path, so the notebook runs from any
# checkout. Set CSV_FILE_PATH / DELTA_TABLE_PATH (e.g. in .env) to override.
# The relative defaults mirror the ones used in the setup cell.
_data_dir = os.getenv("DATA_DIR", "data")
DELTA_TABLE_PATH = os.getenv("DELTA_TABLE_PATH", os.path.join(_data_dir, "delta_tables/tech_stocks"))
CSV_FILE_PATH = os.getenv("CSV_FILE_PATH", os.path.join(_data_dir, "tech_stocks.csv"))

In [None]:
# Read the raw stock CSV: the first row supplies the column names and Spark
# infers each column's type from the data.
print(f"Loading data from {CSV_FILE_PATH}")
df = spark.read.csv(CSV_FILE_PATH, header=True, inferSchema=True)
df.show(n=5)

25/05/01 19:12:29 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors


+----------+------------------+------------------+------------------+------------------+--------+------+
|      Date|              Open|              High|               Low|             Close|  Volume|Ticker|
+----------+------------------+------------------+------------------+------------------+--------+------+
|2024-03-01| 178.7061901764464| 179.6815803178056|176.54639012704013|  178.815673828125|73488000|  AAPL|
|2024-03-04|175.32213749041347| 176.0686126971914|172.97322823226915|174.27708435058594|81510100|  AAPL|
|2024-03-05|169.95748724050745| 171.2314705105848|168.82284541599856|169.32049560546875|95132400|  AAPL|
|2024-03-06|  170.256065044738|170.43522699407845|   167.88724549652|168.32518005371094|68587700|  AAPL|
|2024-03-07|168.35505375316498|169.92763018116116| 167.6981670423948| 168.2057647705078|71765100|  AAPL|
+----------+------------------+------------------+------------------+------------------+--------+------+
only showing top 5 rows



In [None]:
from pyspark.sql.functions import col

# Add a "YYYY-MM" YearMonth column for partitioning.
# Column.substr is 1-based, so the first character is position 1 (not 0).
# The explicit cast makes the string conversion visible instead of relying on
# an implicit cast of whatever type inferSchema assigned to Date.
df = df.withColumn("YearMonth", col("Date").cast("string").substr(1, 7))

# Write to Delta Lake, partitioned by ticker and month; "overwrite" replaces
# any table already stored at DELTA_TABLE_PATH.
print(f"Saving data to {DELTA_TABLE_PATH}")
df.write.format("delta").partitionBy("Ticker", "YearMonth").mode("overwrite").save(DELTA_TABLE_PATH)
print("✅ Data partitioned and saved in Delta Lake format.")

In [None]:
# Read the Delta table back from disk to confirm the write succeeded:
# report how many rows it holds and preview the first few.
delta_reader = spark.read.format("delta")
df_read = delta_reader.load(DELTA_TABLE_PATH)
row_count = df_read.count()
print(f"Row count: {row_count}")
df_read.show(n=5)