 requests – Calls the stock API to get real-time data
🔹 pandas – Stores stock data before processing
🔹 PySpark (SparkSession, functions) – Handles big data processing
🔹 Plotly Express – Creates interactive visualizations

In [0]:
import requests
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, avg, window
import plotly.express as px
import time

# Initialize Spark Session
spark = SparkSession.builder.appName("StockMarketPipeline").getOrCreate()

# Alpha Vantage API details
API_KEY = "YOUR_ALPHA_VANTAGE_API_KEY"
STOCK_SYMBOL = "AAPL"
BASE_URL = "https://www.alphavantage.co/query"


In [0]:
# Function to fetch real-time stock data
def fetch_stock_data(symbol, api_key):
    params = {
        "function": "TIME_SERIES_INTRADAY",
        "symbol": symbol,
        "interval": "5min",
        "apikey": api_key
    }
    response = requests.get(BASE_URL, params=params)
    data = response.json()
    
    if "Time Series (5min)" not in data:
        return None
    
    time_series = data["Time Series (5min)"]
    records = []
    for timestamp, values in time_series.items():
        records.append({
            "datetime": timestamp,
            "open": float(values["1. open"]),
            "high": float(values["2. high"]),
            "low": float(values["3. low"]),
            "close": float(values["4. close"]),
            "volume": int(values["5. volume"])
        })
    return pd.DataFrame(records)


In [0]:
# Fetch data
stock_df = fetch_stock_data(STOCK_SYMBOL, API_KEY)
if stock_df is not None:
    print("Fetched Stock Data Successfully!")
else:
    print("Failed to fetch stock data.")


Fetched Stock Data Successfully!


In [0]:
# Convert Pandas DataFrame to Spark DataFrame
spark_df = spark.createDataFrame(stock_df)

In [0]:
# Process data in Databricks using PySpark
processed_df = spark_df.withColumn("close", col("close").cast("double"))
windowed_df = processed_df.groupBy(window(col("datetime"), "5 minutes")).agg(avg("close").alias("avg_close"))

# Show processed data
windowed_df.show()


+--------------------+---------+
|              window|avg_close|
+--------------------+---------+
|{2025-03-25 19:15...|   224.07|
|{2025-03-25 19:10...|   224.07|
|{2025-03-25 19:50...|    224.1|
|{2025-03-25 19:30...|    224.1|
|{2025-03-25 19:35...| 224.0701|
|{2025-03-25 19:00...|   224.02|
|{2025-03-25 19:25...|    224.1|
|{2025-03-25 19:20...|   224.02|
|{2025-03-25 19:40...|   224.07|
|{2025-03-25 19:55...|   224.24|
|{2025-03-25 19:45...|   224.28|
|{2025-03-25 19:05...|   224.07|
|{2025-03-25 18:50...|   224.07|
|{2025-03-25 18:30...|   223.96|
|{2025-03-25 18:10...| 223.9999|
|{2025-03-25 18:15...|    224.1|
|{2025-03-25 18:00...|    223.9|
|{2025-03-25 18:40...|   224.07|
|{2025-03-25 17:55...|    223.9|
|{2025-03-25 18:25...|    224.0|
+--------------------+---------+
only showing top 20 rows



In [0]:

# Convert Spark DataFrame to Pandas for visualization
pandas_df = windowed_df.toPandas()

In [0]:
import plotly.express as px

pandas_df.dropna(subset=["window", "avg_close"], inplace=True)  # Remove missing values
pandas_df["window"] = pandas_df["window"].astype(str)  # Ensure window is string

fig = px.line(pandas_df, x="window", y="avg_close", title=f"{STOCK_SYMBOL} Stock Price Trends", markers=True)
fig.show()
