In [1]:
# Using  API to fetch real time stock data
import requests
import pandas as pd

def fetch_daily_stock_data(api_key, symbol):
    url = "https://www.alphavantage.co/query"
    params = {
        'function': 'TIME_SERIES_DAILY',
        'symbol': symbol,
        'apikey': api_key
    }

    try:
        response = requests.get(url, params=params)
        response.raise_for_status()
        data = response.json()

        # Extraction of Time Series Data
        time_series_data = data['Time Series (Daily)']

        # Dataframe conversion
        df = pd.DataFrame.from_dict(time_series_data, orient='index')
        # Convertion of index to datetime and sort by date
        df.index = pd.to_datetime(df.index)
        df.sort_index(inplace=True)

        # Optional, rename columns to more readable names
        df.columns = ['Open', 'High', 'Low', 'Close', 'Volume']

        # Data type conversion
        df = df.astype({
            'Open': 'float64',
            'High': 'float64',
            'Low': 'float64',
            'Close': 'float64',
            'Volume': 'int64'
        })

        return df

    except requests.exceptions.HTTPError as errh:
        print(f"HTTP Error: {errh}")
    except requests.exceptions.ConnectionError as errc:
        print(f"Error Connecting: {errc}")
    except requests.exceptions.Timeout as errt:
        print(f"Timeout Error: {errt}")
    except requests.exceptions.RequestException as err:
        print(f"Other Error: {err}")

# API credentials
api_key = "Q66H35ZTGXJ7NKG0"
stock_symbol = "AAPL"
df = fetch_daily_stock_data(api_key, stock_symbol)


print(df.head())

              Open    High      Low   Close    Volume
2023-07-19  193.10  198.23  192.650  195.10  80507323
2023-07-20  195.09  196.47  192.495  193.13  59581196
2023-07-21  194.10  194.97  191.230  191.94  71951683
2023-07-24  193.41  194.91  192.250  192.75  45505097
2023-07-25  193.33  194.44  192.915  193.62  37283201


In [13]:
df = df.reset_index().rename(columns={'index': 'Date'})

In [2]:
df

Unnamed: 0,Open,High,Low,Close,Volume
2023-07-19,193.10,198.23,192.6500,195.10,80507323
2023-07-20,195.09,196.47,192.4950,193.13,59581196
2023-07-21,194.10,194.97,191.2300,191.94,71951683
2023-07-24,193.41,194.91,192.2500,192.75,45505097
2023-07-25,193.33,194.44,192.9150,193.62,37283201
...,...,...,...,...,...
2023-12-01,190.33,191.56,189.2300,191.24,45704823
2023-12-04,189.98,190.05,187.4511,189.43,43389519
2023-12-05,190.21,194.40,190.1800,193.42,66628398
2023-12-06,194.45,194.76,192.1100,192.32,40895115


In [4]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!pip install pyspark


Collecting pyspark
  Downloading pyspark-3.5.0.tar.gz (316.9 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m316.9/316.9 MB[0m [31m4.2 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.0-py2.py3-none-any.whl size=317425345 sha256=8b845c969e3fcf7041be9dc7fe752464fc78fc4c736000f21ce08ef0dad4e168
  Stored in directory: /root/.cache/pip/wheels/41/4e/10/c2cf2467f71c678cfc8a6b9ac9241e5e44a01940da8fbb17fc
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.0


In [14]:
# Environment setup
from pyspark.sql import SparkSession
from pyspark.streaming import StreamingContext

# Initializing Spark Session
spark = SparkSession.builder.appName("StockDataApp").getOrCreate()

# SparkContext from SparkSession
sc = spark.sparkContext

# Initializing StreamingContext with a batch duration of 5 seconds
ssc = StreamingContext(sc, batchDuration=5)



In [6]:
from pyspark.sql import SQLContext
from pyspark import SparkContext
import pandas as pd

import os



# Ensuring SparkContext is initialized only once
if 'sc' not in globals():
    print("creating context")
    # SparkContext initialization correction
    sc = SparkContext('local', 'StockDataApp')
sqlContext = SQLContext(sc)


# Assuming 'df' is your provided pandas DataFrame
data = df

# Pandas DataFrame to Spark DataFrame conversion
sdf = sqlContext.createDataFrame(data)
print(sdf)
# different approach if collect() is causing issues
try:
    # Simulating streaming data
    rdd_queue = []
    for row in sdf.rdd.toLocalIterator():
        rdd_queue.append(sc.parallelize([row]))

    input_stream = ssc.queueStream(rdd_queue)
except Exception as e:
    print(f"Error: {e}")



DataFrame[Open: double, High: double, Low: double, Close: double, Volume: bigint]


In [7]:
def process(time, rdd):
    if not rdd.isEmpty():
        df = rdd.toDF()
        # Perform EDA, like showing basic statistics
        df.describe().show()

        # Window operations (e.g., moving average)
        windowed_df = df.withColumn("MovingAverage", avg("Close").over(Window.orderBy("Date").rangeBetween(-2, 0)))
        windowed_df.show()

input_stream.foreachRDD(process)

In [8]:
print(df.describe())

            Open        High         Low       Close        Volume
count  100.00000  100.000000  100.000000  100.000000  1.000000e+02
mean   181.51774  183.055909  180.079278  181.583750  5.655269e+07
std      8.04823    7.810586    8.075050    7.825078  1.567387e+07
min    166.91000  168.960000  165.670000  166.890000  2.404834e+07
25%    175.27750  177.244750  173.850000  175.677500  4.627709e+07
50%    179.37000  180.285000  177.465000  179.030000  5.339191e+07
75%    189.79500  190.335000  188.352500  189.692500  6.143993e+07
max    196.23500  198.230000  195.280000  196.450000  1.159568e+08


In [9]:
df.isnull()

Unnamed: 0,Open,High,Low,Close,Volume
2023-07-19,False,False,False,False,False
2023-07-20,False,False,False,False,False
2023-07-21,False,False,False,False,False
2023-07-24,False,False,False,False,False
2023-07-25,False,False,False,False,False
...,...,...,...,...,...
2023-12-01,False,False,False,False,False
2023-12-04,False,False,False,False,False
2023-12-05,False,False,False,False,False
2023-12-06,False,False,False,False,False


In [10]:
sdf.printSchema()

root
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Volume: long (nullable = true)



In [11]:
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import VectorAssembler


vectorAssembler = VectorAssembler(inputCols=['Open', 'High', 'Low', 'Volume'], outputCol='features')
df_vector = vectorAssembler.transform(sdf).select(['features', 'Close'])

# Data splitting into train & test sets
train_data, test_data = df_vector.randomSplit([0.7, 0.3])

# Model creating & training
lr = LinearRegression(featuresCol='features', labelCol='Close')
lr_model = lr.fit(train_data)

# Model Evaluation
predictions = lr_model.transform(test_data)
predictions.select("prediction", "Close").show()

+------------------+------+
|        prediction| Close|
+------------------+------+
|175.80163947288787|177.56|
|177.19363389106456|177.23|
| 174.6384576562723| 174.0|
| 177.8684217152933|177.79|
| 178.9205136801972|179.07|
|180.67465262662583|181.12|
|178.13462034914633|177.45|
| 179.0553474232227|177.97|
| 176.1377017113623| 176.3|
|178.82684134153766|178.85|
| 183.5405207236848|181.99|
|188.42369902465745|187.87|
|191.32435150354732|191.17|
|192.18447835986126|191.94|
| 193.8128733984012|193.13|
| 195.7094405670107|196.45|
|167.57588598568444|166.89|
|172.86394226526195|173.97|
|173.48550002607305|173.75|
|172.61981999233387|173.44|
+------------------+------+
only showing top 20 rows



In [12]:
from pyspark.ml.evaluation import RegressionEvaluator

# RMSE evaluator
rmse_evaluator = RegressionEvaluator(predictionCol="prediction", labelCol="Close", metricName="rmse")

# RMSE calculation
rmse = rmse_evaluator.evaluate(predictions)
print(f"Root Mean Square Error (RMSE) on test data = {rmse}")

# R-squared evaluator
r2_evaluator = RegressionEvaluator(predictionCol="prediction", labelCol="Close", metricName="r2")

# R-squared Calculation
r2 = r2_evaluator.evaluate(predictions)
print(f"R-squared on test data = {r2}")

Root Mean Square Error (RMSE) on test data = 0.7281846575358627
R-squared on test data = 0.9906381979944923
