In [0]:
pip install yfinance

In [0]:
pip install s3fs

In [0]:
# Restart the notebook's Python process (run after the pip installs above so
# the newly installed packages can be imported). Python state is reset here.
dbutils.library.restartPython()

In [0]:
import yfinance as yf
import pandas as pd
from pathlib import PurePosixPath

In [0]:
# List the entries at the root of the project's S3 bucket (sanity check that
# the bucket is reachable from this cluster).
dbutils.fs.ls("s3a://databricks-stock-project-2025-10-02/")

In [0]:
import yfinance as yf
import pandas as pd
import os
import time
from datetime import datetime
import boto3
from botocore.exceptions import ClientError

# === CONFIG ===
# Destination bucket / key prefix of the raw layer, the ticker universe to
# ingest, and the first date of price history to request.
bucket = "databricks-stock-project-2025-10-02"
prefix = "raw/stocks"
tickers = [
    "AAPL", "MSFT", "AMZN", "GOOGL", "META",
    "NVDA", "TSLA", "NFLX", "AVGO", "AMD",
]
start = "2015-01-01"

# === AWS S3 Client ===
# Shared boto3 client used by s3_exists() and the upload loop.
s3 = boto3.client("s3")

def s3_exists(bucket, key):
    """Return True if the object ``key`` exists in ``bucket``.

    Issues a HEAD request through the module-level ``s3`` client.

    Args:
        bucket: Name of the S3 bucket.
        key: Object key inside the bucket.

    Returns:
        True when the HEAD request succeeds. False when S3 answers with a
        "not found" error code. For any other ``ClientError`` (e.g. access
        denied, throttling) the existence is unknown: the error is printed so
        it is not mistaken for a plain "missing object", and False is returned
        so callers keep the original never-raises-ClientError contract.
    """
    try:
        s3.head_object(Bucket=bucket, Key=key)
        return True
    except ClientError as err:
        code = str(err.response.get("Error", {}).get("Code", ""))
        if code not in ("404", "NoSuchKey", "NotFound"):
            # Not a simple "object is missing" answer -> surface it.
            print(f"Could not check s3://{bucket}/{key} (error code {code}): {err}")
        return False

# === MAIN LOOP ===
# For every ticker: download daily bars from Yahoo Finance, normalise the
# column names, add metadata columns, stage the frame as a CSV under /tmp and
# upload it to s3://{bucket}/{prefix}/ticker={T}/stocks_{T}.csv.
# A failure for one ticker is reported and the loop moves on to the next one.

# Loop-invariant output schema (price columns first, then metadata).
required_cols = ["date", "open", "high", "low", "close", "adj_close", "volume"]
metadata_cols = ["ticker", "source", "interval", "ingestion_date", "is_valid"]

for t in tickers:
    print(f"\n⬇️ Downloading {t} ...")

    # --- Download Yahoo Finance Data ---
    # Only the network call is guarded; the emptiness check lives outside the
    # try so that an unrelated error is not reported as a download failure.
    try:
        df = yf.download(t, start=start, interval="1d", auto_adjust=True, progress=False)
    except Exception as e:
        print(f"Error occurred while downloading {t}: {e}")
        continue

    if df is None or df.empty:
        print(f" {t} No data, pass")
        continue

    df = df.reset_index()

    # --- Flatten MultiIndex Column ---
    # e.g. ("Close", "AAPL") -> "close_aapl"; empty levels are skipped.
    if isinstance(df.columns, pd.MultiIndex):
        df.columns = [
            "_".join([str(level) for level in levels if level]).strip().lower().replace(" ", "_")
            for levels in df.columns
        ]
    else:
        df.columns = [str(c).lower().replace(" ", "_") for c in df.columns]

    # --- Standardize column names (remove ticker suffix, e.g., close_nvda → close)---
    # Strip the ticker only when it is a trailing suffix, so a column name
    # that merely contains the same text elsewhere is left untouched.
    suffix = f"_{t.lower()}"
    df.columns = [
        c[: -len(suffix)] if c.endswith(suffix) else c
        for c in df.columns
    ]

    # --- Ensure all required columns exist ---
    # NOTE(review): with auto_adjust=True yfinance presumably returns no
    # separate adjusted-close column, so "adj_close" ends up all-None here.
    for col_name in required_cols:
        if col_name not in df.columns:
            df[col_name] = None  # Fill with None if column doesn't exist

    # --- Add metadata column ---
    df["ticker"] = t
    df["source"] = "yfinance"
    df["interval"] = "1d"
    df["ingestion_date"] = datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S")
    df["is_valid"] = True

    # --- Reorder columns ---
    df = df[required_cols + metadata_cols]

    # --- Save temporary CSV locally ---
    local_file = f"/tmp/stocks_{t}.csv"
    df.to_csv(local_file, index=False)

    # --- Define S3 target path ---
    s3_key = f"{prefix}/ticker={t}/stocks_{t}.csv"

    # --- Check if file already exists on S3 ---
    if s3_exists(bucket, s3_key):
        print(f"{t} already exists in S3, will overwrite.")

    # --- Upload file to S3 ---
    try:
        s3.upload_file(local_file, bucket, s3_key)
        print(f"Successfully uploaded to s3://{bucket}/{s3_key}")
    except Exception as e:
        print(f"Upload failed for {t}: {e}")
        continue
    finally:
        # Remove temporary file (runs on success and on failure alike)
        os.remove(local_file)

    # --- Sleep to prevent API rate limiting ---
    time.sleep(1)


In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, year

spark = SparkSession.builder.getOrCreate()

# 1. Read raw data
# With only header=True every CSV column is loaded as a string.
df = spark.read.option("header", True).csv("s3://databricks-stock-project-2025-10-02/raw/stocks/")

# 2. Data cleaning and type casting
# Cast first, filter afterwards: a value that cannot be parsed becomes NULL
# only after the cast, so dropping NULLs before casting would let those rows
# slip through into the processed layer.
df_clean = (
    df
    .withColumn("date", col("date").cast("date"))      # real date type -> chronological ordering downstream
    .withColumn("close", col("close").cast("double"))
    .withColumn("open", col("open").cast("double"))
    .withColumn("high", col("high").cast("double"))
    .withColumn("low", col("low").cast("double"))
    .withColumn("volume", col("volume").cast("bigint"))
    .withColumn("year", year(col("date")))             # Extract year for partitioning
    .dropna(subset=["date", "close"])                  # Drop rows without a usable date or close price
)

# 3. Write processed data (Parquet format + partitioned)
# mode("overwrite") replaces whatever is already stored under this path.
df_clean.write.mode("overwrite") \
    .partitionBy("ticker", "year") \
    .parquet("s3://databricks-stock-project-2025-10-02/processed/stocks/")


In [0]:
# Print the column names and types of `df`
# (presumably the raw CSV frame loaded by the preceding cell — depends on run order).
df.printSchema()

In [0]:
# Reload the raw CSV layer and peek at the first few ticker/date/close rows.
df = spark.read.csv(
    "s3://databricks-stock-project-2025-10-02/raw/stocks/",
    header=True,
)
df.select("ticker", "date", "close").show(5)

In [0]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Load the raw layer straight from S3 (header row supplies the column names),
# then show its schema followed by a five-row sample.
df = spark.read.csv(
    "s3://databricks-stock-project-2025-10-02/raw/stocks/",
    header=True,
)

df.printSchema()
df.show(5)

In [0]:
# Load the processed (Parquet) layer back from S3.
df_clean = (
    spark.read
    .format("parquet")
    .load("s3://databricks-stock-project-2025-10-02/processed/stocks/")
)

# Schema first, then a five-row preview.
df_clean.printSchema()
df_clean.show(5)

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.window import Window
from pyspark.sql.functions import (
    col, avg, stddev, lag, when
)

spark = SparkSession.builder.getOrCreate()

# Step 1. Read the processed layer (df_clean)
# This cell consumes the `df_clean` frame defined by an earlier cell; to run
# it stand-alone, reload the processed layer first:
# df_clean = spark.read.parquet("s3://databricks-stock-project-2025-10-02/processed/stocks/")

# === Define window specifications ===
# All windows are per ticker, ordered by date, and look back N-1 rows plus the
# current row. Near the start of a ticker's history a frame simply contains
# fewer rows, so the early values are averages over a partial window.
w_ticker = Window.partitionBy("ticker").orderBy("date")
window_sma20 = w_ticker.rowsBetween(-19, 0)
window_sma50 = w_ticker.rowsBetween(-49, 0)
window_sma200 = w_ticker.rowsBetween(-199, 0)
window_rsi = w_ticker.rowsBetween(-13, 0)
window_vol = w_ticker.rowsBetween(-19, 0)
window_macd12 = w_ticker.rowsBetween(-11, 0)
window_macd26 = w_ticker.rowsBetween(-25, 0)
window_signal = w_ticker.rowsBetween(-8, 0)

# Step 2.
# === Technical indicator calculations ===

# Moving averages (SMA)
df_feat = (
    df_clean
    .withColumn("sma_20", avg(col("close")).over(window_sma20))
    .withColumn("sma_50", avg(col("close")).over(window_sma50))
    .withColumn("sma_200", avg(col("close")).over(window_sma200))
)

# 20-day volatility: rolling standard deviation of the simple daily return
df_feat = df_feat.withColumn("prev_close", lag("close").over(w_ticker))
df_feat = df_feat.withColumn("return", (col("close") - col("prev_close")) / col("prev_close"))
df_feat = df_feat.withColumn("volatility_20", stddev(col("return")).over(window_vol))

# RSI(14) — rolling-mean variant
# prev_close already holds lag(close), so it is reused for the daily change.
# On the first row of a ticker delta is NULL and both gain and loss become 0.0.
df_feat = df_feat.withColumn("delta", col("close") - col("prev_close"))
df_feat = df_feat.withColumn("gain", when(col("delta") > 0, col("delta")).otherwise(0.0))
df_feat = df_feat.withColumn("loss", when(col("delta") < 0, -col("delta")).otherwise(0.0))
df_feat = df_feat.withColumn("avg_gain", avg(col("gain")).over(window_rsi))
df_feat = df_feat.withColumn("avg_loss", avg(col("loss")).over(window_rsi))
# Guard the division: a window without any losing day has avg_loss == 0, which
# would otherwise produce NULL (or an error when ANSI mode is enabled).
df_feat = df_feat.withColumn(
    "rs",
    when(col("avg_loss") != 0, col("avg_gain") / col("avg_loss")),
)
# No losses but some gains -> RSI is 100 by definition. When gains and losses
# are both zero the indicator is undefined and stays NULL.
df_feat = df_feat.withColumn(
    "rsi",
    when((col("avg_loss") == 0) & (col("avg_gain") > 0), 100.0)
    .otherwise(100 - (100 / (1 + col("rs")))),
)

# MACD(12,26,9) — simplified version using rolling mean
# ("ema12"/"ema26" are simple rolling means here, not exponential averages)
df_feat = df_feat.withColumn("ema12", avg(col("close")).over(window_macd12))
df_feat = df_feat.withColumn("ema26", avg(col("close")).over(window_macd26))
df_feat = df_feat.withColumn("macd", col("ema12") - col("ema26"))
df_feat = df_feat.withColumn("signal_line", avg(col("macd")).over(window_signal))

# Bollinger Bands: 20-row mean +/- 2 standard deviations
df_feat = df_feat.withColumn("bb_mid", avg(col("close")).over(window_sma20))
df_feat = df_feat.withColumn("bb_std", stddev(col("close")).over(window_sma20))
df_feat = df_feat.withColumn("bollinger_upper", col("bb_mid") + 2 * col("bb_std"))
df_feat = df_feat.withColumn("bollinger_lower", col("bb_mid") - 2 * col("bb_std"))

# Volume moving average
df_feat = df_feat.withColumn("vol_ma_20", avg(col("volume")).over(window_vol))

# Simple buy/sell flags
df_feat = df_feat.withColumn("buy_flag", (col("sma_20") > col("sma_50")) & (col("rsi") < 30))
df_feat = df_feat.withColumn("sell_flag", (col("sma_20") < col("sma_50")) & (col("rsi") > 70))

# Golden Cross / Death Cross: SMA50 crossing SMA200 between the previous row
# and the current one.
df_feat = df_feat.withColumn("prev_sma50", lag("sma_50").over(w_ticker))
df_feat = df_feat.withColumn("prev_sma200", lag("sma_200").over(w_ticker))
df_feat = df_feat.withColumn(
    "golden_cross",
    (col("sma_50") > col("sma_200")) & (col("prev_sma50") <= col("prev_sma200"))
)
df_feat = df_feat.withColumn(
    "death_cross",
    (col("sma_50") < col("sma_200")) & (col("prev_sma50") >= col("prev_sma200"))
)

# Step 3. Write to the curated layer
# mode("overwrite") replaces whatever is already stored under this path.
df_feat.write.mode("overwrite") \
    .partitionBy("ticker", "year") \
    .parquet("s3://databricks-stock-project-2025-10-02/curated/stocks_features/")

print("Feature engineering completed!")


In [0]:
# Preview a few engineered columns from the in-memory feature frame.
# To inspect the persisted copy instead, reload it first:
# df_feat = spark.read.parquet("s3://databricks-stock-project-2025-10-02/curated/stocks_features/")
(
    df_feat
    .select("ticker", "date", "close", "sma_20", "rsi", "macd", "bollinger_upper")
    .show(10)
)


In [0]:
# Raw-layer sanity check: which tickers landed, and how many rows in total.
df_raw = spark.read.csv(
    "s3://databricks-stock-project-2025-10-02/raw/stocks/",
    header=True,
)
df_raw.select("ticker").distinct().show()
df_raw.count()

In [0]:
# Processed-layer sanity check: schema plus the ticker/year combinations present.
df_clean = (
    spark.read
    .format("parquet")
    .load("s3://databricks-stock-project-2025-10-02/processed/stocks/")
)
df_clean.printSchema()
(
    df_clean
    .select("ticker", "year")
    .distinct()
    .orderBy("ticker", "year")
    .show()
)

In [0]:
# Reload the curated feature layer from S3 and preview a few indicator columns.
df_feat = (
    spark.read
    .format("parquet")
    .load("s3://databricks-stock-project-2025-10-02/curated/stocks_features/")
)
(
    df_feat
    .select("ticker", "date", "close", "sma_20", "rsi", "macd")
    .show(10)
)

In [0]:
# Check RSI: summary statistics of the rsi column
(
    df_feat
    .select("rsi")
    .summary()
    .show()
)

In [0]:
# Check SMA: first AAPL rows by date with the 20- and 50-row averages
(
    df_feat
    .where(col("ticker") == "AAPL")
    .select("date", "close", "sma_20", "sma_50")
    .orderBy("date")
    .show(10)
)

In [0]:
# Check the golden-cross flag: NVDA rows where golden_cross is true
(
    df_feat
    .where(col("ticker") == "NVDA")
    .where(col("golden_cross") == True)
    .select("date", "sma_50", "sma_200")
    .show(5)
)

In [0]:
# Check Bollinger Bands: TSLA close against the upper and lower band
(
    df_feat
    .where(col("ticker") == "TSLA")
    .select("date", "close", "bollinger_upper", "bollinger_lower")
    .show(10)
)

In [0]:
# Check total count: number of rows per ticker
(
    df_feat
    .groupBy("ticker")
    .count()
    .orderBy("ticker")
    .show()
)

In [0]:
# Number of distinct tickers in the feature frame
(
    df_feat
    .select("ticker")
    .distinct()
    .count()
)

In [0]:
# Number of feature rows for AAPL
(
    df_feat
    .where(col("ticker") == "AAPL")
    .count()
)

In [0]:
# Check missing values: one output column per input column, holding the
# number of rows in which that column is NULL.
from pyspark.sql.functions import count, when, isnan
df_feat.select(
    [
        count(when(col(name).isNull(), name)).alias(name)
        for name in df_feat.columns
    ]
).show()

In [0]:
# Simple visualization: NVDA close price with its 20- and 50-row moving averages
display(
    df_feat
    .where(col("ticker") == "NVDA")
    .select("date", "close", "sma_20", "sma_50")
)