# Spark DataFrame 

In [111]:
import os
from dotenv import load_dotenv
import requests
import json
import findspark
# Point findspark at the Spark installation. SPARK_HOME takes precedence when
# it is set, so the notebook is not tied to one machine; otherwise fall back
# to the original local install. The raw string keeps the Windows backslashes
# literal: "\B" and "\s" are invalid escape sequences in a normal string
# literal and trigger a warning on recent Python versions.
findspark.init(os.environ.get("SPARK_HOME") or r'C:\BigData\spark-3.1.2-bin-hadoop3.2')

In [112]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, explode, to_date
from pyspark.sql.functions import lit
from pyspark.sql.types import StructType, StructField, StringType, MapType, DoubleType
from pyspark.sql.functions import count, sum, avg, min, max

In [113]:
# Load variables from a local .env file into the process environment, then
# read the Alpha Vantage key from it (None when the variable is not set).
load_dotenv()
api_key = os.environ.get("ALPHA_VANTAGE_API_KEY")

In [114]:
# Fetch the full daily price history for JPM from the Alpha Vantage API.
# The query is passed through `params` so that the API key is really
# substituted and URL-encoded; the previous plain (non-f) string sent the
# literal text "{api_key}" as the key.
url = "https://www.alphavantage.co/query"
query_params = {
    "function": "TIME_SERIES_DAILY",
    "symbol": "JPM",
    "apikey": api_key,
    "outputsize": "full",
}
response = requests.get(url, params=query_params, timeout=30)
response.raise_for_status()  # surface HTTP-level failures instead of parsing an error body
json_data = response.json()

# Fail fast when the payload lacks the price series that the cells below read;
# otherwise the DataFrame would silently come out empty.
if "Time Series (Daily)" not in json_data:
    raise RuntimeError(
        f"Unexpected Alpha Vantage response: {json.dumps(json_data)[:500]}"
    )

# Print a portion of the JSON data to understand its structure
print(json.dumps(json_data, indent=4)[:2000])

{
    "Meta Data": {
        "1. Information": "Daily Prices (open, high, low, close) and Volumes",
        "2. Symbol": "JPM",
        "3. Last Refreshed": "2024-07-12",
        "4. Output Size": "Full size",
        "5. Time Zone": "US/Eastern"
    },
    "Time Series (Daily)": {
        "2024-07-12": {
            "1. open": "204.0000",
            "2. high": "207.4500",
            "3. low": "202.1000",
            "4. close": "204.9400",
            "5. volume": "15443441"
        },
        "2024-07-11": {
            "1. open": "206.2100",
            "2. high": "208.1000",
            "3. low": "205.3800",
            "4. close": "207.4500",
            "5. volume": "10666937"
        },
        "2024-07-10": {
            "1. open": "206.1400",
            "2. high": "207.9700",
            "3. low": "205.5800",
            "4. close": "207.8000",
            "5. volume": "8328493"
        },
        "2024-07-09": {
            "1. open": "205.6300",
            "2. high": "20

In [51]:
# Explicit schema for the TIME_SERIES_DAILY payload. Every leaf field is read
# as a nullable string.
meta_field_names = [
    "1. Information",
    "2. Symbol",
    "3. Last Refreshed",
    "4. Output Size",
    "5. Time Zone",
]
quote_field_names = ["1. open", "2. high", "3. low", "4. close", "5. volume"]

meta_struct = StructType(
    [StructField(field_name, StringType(), True) for field_name in meta_field_names]
)
quote_struct = StructType(
    [StructField(field_name, StringType(), True) for field_name in quote_field_names]
)

schema = StructType([
    StructField("Meta Data", meta_struct, True),
    # Map from a string key to that entry's quote struct.
    StructField("Time Series (Daily)", MapType(StringType(), quote_struct), True),
])

In [44]:
# Start the Spark session for this notebook, or reuse one that already exists
spark = (
    SparkSession.builder
    .appName("StockMarketDataAnalysis")
    .getOrCreate()
)

In [52]:
# Convert the JSON payload to a Spark DataFrame using the explicit schema.
# The dict is serialised with json.dumps first: when the RDD holds a raw dict,
# PySpark falls back to str(dict), which is a Python repr rather than JSON.
# Spark only tolerates that because single-quoted strings are accepted by
# default, and it breaks as soon as the payload contains None/True/False.
json_rdd = spark.sparkContext.parallelize([json.dumps(json_data)])
df = spark.read.schema(schema).json(json_rdd)

In [53]:
# Print the DataFrame's schema tree to check it matches the explicit `schema` given to the reader
df.printSchema()

root
 |-- Meta Data: struct (nullable = true)
 |    |-- 1. Information: string (nullable = true)
 |    |-- 2. Symbol: string (nullable = true)
 |    |-- 3. Last Refreshed: string (nullable = true)
 |    |-- 4. Output Size: string (nullable = true)
 |    |-- 5. Time Zone: string (nullable = true)
 |-- Time Series (Daily): map (nullable = true)
 |    |-- key: string
 |    |-- value: struct (valueContainsNull = true)
 |    |    |-- 1. open: string (nullable = true)
 |    |    |-- 2. high: string (nullable = true)
 |    |    |-- 3. low: string (nullable = true)
 |    |    |-- 4. close: string (nullable = true)
 |    |    |-- 5. volume: string (nullable = true)



In [None]:
#df.show(truncate=False)

In [68]:
# Flatten the nested structure: explode the map into one row per entry,
# naming the map key "date" and the map value "data".
daily_series = col("Time Series (Daily)")
df_flat = df.select(explode(daily_series).alias("date", "data"))

In [69]:
# Project the quote struct into flat double columns. The source field names
# contain dots and spaces, so each one is quoted with backticks in the path.
quote_columns = [
    ("1. open", "open"),
    ("2. high", "high"),
    ("3. low", "low"),
    ("4. close", "close"),
    ("5. volume", "volume"),
]
numeric_columns = [
    col(f"data.`{source_field}`").alias(short_name).cast(DoubleType())
    for source_field, short_name in quote_columns
]
df_flat = df_flat.select(col("date"), *numeric_columns)

df_flat.show()

+----------+------+------+--------+------+-----------+
|      date|  open|  high|     low| close|     volume|
+----------+------+------+--------+------+-----------+
|2024-07-12| 204.0|207.45|   202.1|204.94|1.5443441E7|
|2024-07-11|206.21| 208.1|  205.38|207.45|1.0666937E7|
|2024-07-10|206.14|207.97|  205.58| 207.8|  8328493.0|
|2024-07-09|205.63|209.76|  205.45|207.63|  9060258.0|
|2024-07-08|205.04| 206.9|  203.97|205.17|  8706967.0|
|2024-07-05|206.99|207.37|  204.52|204.79|  8093096.0|
|2024-07-03|209.55|210.38|  207.65|208.69|  5560925.0|
|2024-07-02|205.29|208.86|  204.77|208.83|  7802936.0|
|2024-07-01|202.84|207.09|  202.66|205.45|1.0205836E7|
|2024-06-28|200.01| 202.6|199.3018|202.26|1.5307616E7|
|2024-06-27|197.44|199.86|   196.9|199.17|  7913453.0|
|2024-06-26|197.45|197.94| 196.275|197.43|  7758582.0|
|2024-06-25|198.09|200.07|  197.74|198.07|  6915909.0|
|2024-06-24|197.81|199.23|   197.1|198.88|  9785929.0|
|2024-06-21|196.71|197.17|  194.22| 196.3|2.0972495E7|
|2024-06-2

In [70]:
# Print the flattened schema to confirm the column types after the casts
df_flat.printSchema()

root
 |-- date: string (nullable = false)
 |-- open: double (nullable = true)
 |-- high: double (nullable = true)
 |-- low: double (nullable = true)
 |-- close: double (nullable = true)
 |-- volume: double (nullable = true)



In [73]:
# Tag every row with a constant ticker symbol column.
# Note: this cell does not touch `date`; it keeps whatever type it already has.
symbol_column = lit("JPM")
df_flat = df_flat.withColumn("symbol", symbol_column)

In [74]:
# Summary statistics (count, mean, stddev, min, max) for the flattened frame
summary_stats = df_flat.describe()
summary_stats.show()

+-------+----------+-----------------+-----------------+-----------------+-----------------+--------------------+------+
|summary|      date|             open|             high|              low|            close|              volume|symbol|
+-------+----------+-----------------+-----------------+-----------------+-----------------+--------------------+------+
|  count|      6213|             6213|             6213|             6213|             6213|                6213|  6213|
|   mean|      null| 69.9840859488169|70.73050180267185|69.20185910188319|69.97511115403191|1.9603391131981328E7|  null|
| stddev|      null|41.86119984262006|42.17952110198497|41.59444170915082| 41.8983194337179|1.8591209702773754E7|  null|
|    min|1999-11-01|            15.35|            16.35|            14.96|            15.45|            773600.0|   JPM|
|    max|2024-07-12|           209.55|           210.38|           207.65|           208.83|          2.172942E8|   JPM|
+-------+----------+------------

In [75]:
# Pivot: one row per date, one column per distinct symbol, cell = summed close
rows_by_date = df_flat.groupBy("date")
pivot_df = rows_by_date.pivot("symbol").sum("close")
pivot_df.show()

+----------+-------+
|      date|    JPM|
+----------+-------+
|1999-11-18|  83.37|
|2020-02-26| 126.64|
|2019-08-23| 106.02|
|2016-08-17|  65.89|
|2009-06-23|  33.57|
|2008-11-19|  28.47|
|2014-05-27|  55.14|
|2022-10-05| 110.39|
|2007-09-13|   45.6|
|2002-05-13|  36.34|
|2010-09-24|39.7525|
|2007-03-06|  48.52|
|2003-05-29|  32.27|
|2019-08-22| 108.72|
|2017-12-05| 105.72|
|2001-11-16|  39.41|
|2023-05-18|  139.5|
|2008-12-03|  30.25|
|2002-06-21|  32.99|
|2009-12-04|  41.74|
+----------+-------+
only showing top 20 rows



In [76]:
# Average close price for each date
avg_close_column = avg("close").alias("avg_close")
agg_df = df_flat.groupBy("date").agg(avg_close_column)
agg_df.show()

+----------+---------+
|      date|avg_close|
+----------+---------+
|2024-01-19|   170.31|
|2023-05-18|    139.5|
|2023-05-01|    141.2|
|2022-10-05|   110.39|
|2021-11-03|   170.53|
|2020-04-13|    98.19|
|2020-02-26|   126.64|
|2019-08-23|   106.02|
|2019-08-22|   108.72|
|2019-08-08|   109.86|
|2017-12-05|   105.72|
|2016-08-17|    65.89|
|2015-05-01|    63.61|
|2014-05-27|    55.14|
|2013-03-14|     51.0|
|2010-09-24|  39.7525|
|2010-02-12|    38.95|
|2009-12-04|    41.74|
|2009-06-23|    33.57|
|2008-12-03|    30.25|
+----------+---------+
only showing top 20 rows



In [60]:
# Keep only the rows whose close is strictly above 143.00
close_above_threshold = df_flat["close"] > 143.00
df_filtered = df_flat.where(close_above_threshold)
df_filtered.show()

+----------+--------+--------+--------+------+--------+
|      date|    open|    high|     low| close|  volume|
+----------+--------+--------+--------+------+--------+
|2024-07-12|204.0000|207.4500|202.1000|204.94|15443441|
|2024-07-11|206.2100|208.1000|205.3800|207.45|10666937|
|2024-07-10|206.1400|207.9700|205.5800| 207.8| 8328493|
|2024-07-09|205.6300|209.7600|205.4500|207.63| 9060258|
|2024-07-08|205.0400|206.9000|203.9700|205.17| 8706967|
|2024-07-05|206.9900|207.3700|204.5200|204.79| 8093096|
|2024-07-03|209.5500|210.3800|207.6500|208.69| 5560925|
|2024-07-02|205.2900|208.8600|204.7700|208.83| 7802936|
|2024-07-01|202.8400|207.0900|202.6600|205.45|10205836|
|2024-06-28|200.0100|202.6000|199.3018|202.26|15307616|
|2024-06-27|197.4400|199.8600|196.9000|199.17| 7913453|
|2024-06-26|197.4500|197.9400|196.2750|197.43| 7758582|
|2024-06-25|198.0900|200.0700|197.7400|198.07| 6915909|
|2024-06-24|197.8100|199.2300|197.1000|198.88| 9785929|
|2024-06-21|196.7100|197.1700|194.2200| 196.3|20

In [61]:
# Drop the rows whose close value is null
close_is_present = df_filtered["close"].isNotNull()
df_filtered = df_filtered.where(close_is_present)
df_filtered.show()

+----------+--------+--------+--------+------+--------+
|      date|    open|    high|     low| close|  volume|
+----------+--------+--------+--------+------+--------+
|2024-07-12|204.0000|207.4500|202.1000|204.94|15443441|
|2024-07-11|206.2100|208.1000|205.3800|207.45|10666937|
|2024-07-10|206.1400|207.9700|205.5800| 207.8| 8328493|
|2024-07-09|205.6300|209.7600|205.4500|207.63| 9060258|
|2024-07-08|205.0400|206.9000|203.9700|205.17| 8706967|
|2024-07-05|206.9900|207.3700|204.5200|204.79| 8093096|
|2024-07-03|209.5500|210.3800|207.6500|208.69| 5560925|
|2024-07-02|205.2900|208.8600|204.7700|208.83| 7802936|
|2024-07-01|202.8400|207.0900|202.6600|205.45|10205836|
|2024-06-28|200.0100|202.6000|199.3018|202.26|15307616|
|2024-06-27|197.4400|199.8600|196.9000|199.17| 7913453|
|2024-06-26|197.4500|197.9400|196.2750|197.43| 7758582|
|2024-06-25|198.0900|200.0700|197.7400|198.07| 6915909|
|2024-06-24|197.8100|199.2300|197.1000|198.88| 9785929|
|2024-06-21|196.7100|197.1700|194.2200| 196.3|20

In [63]:
# Daily trading range: high minus low, added as a new "price_range" column
high_minus_low = df_filtered["high"] - df_filtered["low"]
df_with_range = df_filtered.withColumn("price_range", high_minus_low)
df_with_range.show()

+----------+--------+--------+--------+------+--------+------------------+
|      date|    open|    high|     low| close|  volume|       price_range|
+----------+--------+--------+--------+------+--------+------------------+
|2024-07-12|204.0000|207.4500|202.1000|204.94|15443441| 5.349999999999994|
|2024-07-11|206.2100|208.1000|205.3800|207.45|10666937| 2.719999999999999|
|2024-07-10|206.1400|207.9700|205.5800| 207.8| 8328493|2.3899999999999864|
|2024-07-09|205.6300|209.7600|205.4500|207.63| 9060258| 4.310000000000002|
|2024-07-08|205.0400|206.9000|203.9700|205.17| 8706967| 2.930000000000007|
|2024-07-05|206.9900|207.3700|204.5200|204.79| 8093096|2.8499999999999943|
|2024-07-03|209.5500|210.3800|207.6500|208.69| 5560925|2.7299999999999898|
|2024-07-02|205.2900|208.8600|204.7700|208.83| 7802936| 4.090000000000003|
|2024-07-01|202.8400|207.0900|202.6600|205.45|10205836| 4.430000000000007|
|2024-06-28|200.0100|202.6000|199.3018|202.26|15307616|3.2982000000000085|
|2024-06-27|197.4400|199.

In [65]:
# Per-date row count plus sum / average / min / max of close.
# `sum`, `min` and `max` here are the pyspark.sql.functions imported earlier,
# not the Python builtins they shadow.
close_stats = [
    count("*").alias("count"),
    sum("close").alias("sum_close"),
    avg("close").alias("avg_close"),
    min("close").alias("min_close"),
    max("close").alias("max_close"),
]
df_flat.groupBy("date").agg(*close_stats).show()

+----------+-----+---------+---------+---------+---------+
|      date|count|sum_close|avg_close|min_close|max_close|
+----------+-----+---------+---------+---------+---------+
|2023-06-22|    1|   139.58|   139.58|   139.58|   139.58|
|2022-03-28|    1|   140.87|   140.87|   140.87|   140.87|
|2021-10-11|    1|   166.64|   166.64|   166.64|   166.64|
|2021-08-27|    1|   163.05|   163.05|   163.05|   163.05|
|2021-06-22|    1|   150.21|   150.21|   150.21|   150.21|
|2021-01-27|    1|   127.86|   127.86|   127.86|   127.86|
|2020-08-24|    1|   100.06|   100.06|   100.06|   100.06|
|2019-06-04|    1|   109.74|   109.74|   109.74|   109.74|
|2019-05-08|    1|   112.61|   112.61|   112.61|   112.61|
|2018-08-10|    1|   115.73|   115.73|   115.73|   115.73|
|2017-09-11|    1|    89.79|    89.79|    89.79|    89.79|
|2017-08-11|    1|    91.42|    91.42|    91.42|    91.42|
|2016-03-01|    1|     59.2|     59.2|     59.2|     59.2|
|2015-05-19|    1|    67.01|    67.01|    67.01|    67.0

# Machine Learning with Spark MLlib

In [91]:
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

In [92]:
# Remove every row that has a null in any column before building features
df_flat = df_flat.na.drop()

In [94]:
# Feature engineering: pack the numeric inputs into a single "features" vector
feature_columns = ["open", "high", "low", "volume"]
assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")
# Dropping first lets the cell be re-run; DataFrame.drop is a no-op when the
# column is absent, so no explicit membership check is needed.
df_flat = assembler.transform(df_flat.drop("features"))

In [96]:
# Scale features with StandardScaler, writing the result to "scaledFeatures".
# NOTE(review): the scaler is fitted on the whole of df_flat, i.e. before the
# train/test split performed in a later cell, so test rows contribute to the
# scaling statistics. Confirm whether that is acceptable for this analysis.
scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures")
# DataFrame.drop is a no-op when the column is absent; this keeps the cell re-runnable.
unscaled_df = df_flat.drop("scaledFeatures")
scaler_model = scaler.fit(unscaled_df)
df_flat = scaler_model.transform(unscaled_df)

In [99]:
# Random 80/20 train/test split with a fixed seed
split_weights = [0.8, 0.2]
train_data, test_data = df_flat.randomSplit(split_weights, seed=42)

In [104]:
# Remove any existing "scaledFeatures" column from the training frame.
# DataFrame.drop is a no-op when the column is absent.
train_data = train_data.drop("scaledFeatures")

In [105]:
# Linear regression on the standardised features, predicting the close price.
lr = LinearRegression(featuresCol="scaledFeatures", labelCol="close")

# 3 x 3 grid over regularisation strength and elastic-net mixing.
param_grid = (
    ParamGridBuilder()
    .addGrid(lr.regParam, [0.01, 0.1, 1.0])
    .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0])
    .build()
)

# 3-fold cross-validation. The explicit seed makes the fold assignment
# reproducible across kernel restarts.
crossval = CrossValidator(estimator=lr,
                          estimatorParamMaps=param_grid,
                          evaluator=RegressionEvaluator(labelCol="close"),
                          numFolds=3,
                          seed=42)

# Add the scaled column only when it is missing. Transforming a frame that
# already has "scaledFeatures" raises because the output column exists, which
# previously made this cell fail on a second run or when the preceding
# drop cell had not been executed.
if "scaledFeatures" not in train_data.columns:
    train_data = scaler_model.transform(train_data)

cv_model = crossval.fit(train_data)
best_model = cv_model.bestModel

In [106]:
# Score the held-out rows with the best cross-validated model.
# (`lr_model` was never defined in this notebook and raised NameError on a
# fresh kernel; `best_model` is the model produced by the cross-validation cell.)
predictions = best_model.transform(test_data)
predictions.select("features", "close", "prediction").show()

+--------------------+-----+-----------------+
|            features|close|       prediction|
+--------------------+-----+-----------------+
|[83.62,84.0,82.19...|82.44|82.83237680222356|
|[82.38,84.12,81.9...|83.37|83.41690041734047|
|[82.5,82.56,80.69...|81.87|81.17370778982134|
|[79.75,79.94,78.7...|79.56|79.13507468370776|
|[77.13,78.0,76.56...|77.39|77.37954473045431|
|[80.94,83.37,80.8...| 82.0| 82.7587705661341|
|[78.5,79.19,77.37...|78.63|78.18594276435968|
|[78.12,79.25,77.7...|78.53|78.72642147872777|
|[72.63,73.5,71.0,...|72.75|72.08758316950515|
|[75.38,79.0,75.0,...|78.44|77.96111994700654|
|[79.44,81.5,78.56...|80.69|80.40265816580782|
|[83.5,84.87,80.25...|83.19|82.13642209650591|
|[83.13,83.75,81.3...|83.13|82.29323448514629|
|[77.25,77.56,74.6...| 76.0| 75.5110520632515|
|[79.88,82.0,78.87...| 82.0|80.79389053793474|
|[82.94,84.38,82.2...|83.44|83.55023622206548|
|[80.5,80.94,78.5,...| 79.0|79.33492438929345|
|[90.0,90.5,84.5,5...|86.56|86.25613355106269|
|[97.0,100.75

In [108]:
# Evaluate the model
from pyspark.ml.evaluation import RegressionEvaluator
# Score the test set with the best cross-validated model.
predictions = best_model.transform(test_data)

# Root mean squared error between "close" and the evaluator's default prediction column.
evaluator_rmse = RegressionEvaluator(labelCol="close", metricName="rmse")
rmse = evaluator_rmse.evaluate(predictions)
print(f"Root Mean Squared Error (RMSE): {rmse}")

# Coefficient of determination on the same predictions.
evaluator_r2 = RegressionEvaluator(labelCol="close", metricName="r2")
r2 = evaluator_r2.evaluate(predictions)
print(f"R²: {r2}")

# "Accuracy" here is 1 - RMSE / mean(close): one minus the RMSE relative to
# the average close over all of df_flat. It is not a classification accuracy.
# The mean is computed with the built-in DataFrame aggregate instead of
# shipping every row through a Python lambda on an RDD.
mean_close = df_flat.agg(avg("close")).first()[0]
accuracy = 1 - rmse / mean_close
print(f"Accuracy: {accuracy}")

Root Mean Squared Error (RMSE): 0.6349903628665771
R²: 0.9997737308447905
Accuracy: 0.990925482612399
