In [1]:
# Fetching real-time daily stock data from an API
import requests
import pandas as pd

def fetch_daily_stock_data(api_key, symbol):
    url = "https://www.alphavantage.co/query"
    params = {
        'function': 'TIME_SERIES_DAILY',
        'symbol': symbol,
        'apikey': api_key
    }

    try:
        response = requests.get(url, params=params)
        response.raise_for_status()
        data = response.json()

        # Retrieving the time series information
        time_series_data = data['Time Series (Daily)']

        # Transforming into a DataFrame
        df = pd.DataFrame.from_dict(time_series_data, orient='index')
        # Converting the index to datetime and sorting by date
        df.index = pd.to_datetime(df.index)
        df.sort_index(inplace=True)

        # Optionally, renaming columns for improved readability
        df.columns = ['Open', 'High', 'Low', 'Close', 'Volume']

        # Converting column data types
        df = df.astype({
            'Open': 'float64',
            'High': 'float64',
            'Low': 'float64',
            'Close': 'float64',
            'Volume': 'int64'
        })

        return df

    except requests.exceptions.HTTPError as errh:
        print(f"HTTP Error: {errh}")
    except requests.exceptions.ConnectionError as errc:
        print(f"Error Connecting: {errc}")
    except requests.exceptions.Timeout as errt:
        print(f"Timeout Error: {errt}")
    except requests.exceptions.RequestException as err:
        print(f"Other Error: {err}")

# API credentials
api_key = "SS94UTZVB6T3KCNX"
stock_symbol = "META"
df = fetch_daily_stock_data(api_key, stock_symbol)

# Displaying the initial rows of the DataFrame
print(df.head())

              Open    High       Low   Close    Volume
2023-07-19  313.03  318.68  310.5200  316.01  21763688
2023-07-20  313.50  315.54  302.2200  302.52  23836876
2023-07-21  304.57  305.46  291.2000  294.26  42139259
2023-07-24  295.78  297.52  288.3001  291.61  24949405
2023-07-25  295.19  298.30  291.8600  294.47  19585584


In [2]:
df = df.reset_index().rename(columns={'index': 'Date'})

In [3]:
df

Unnamed: 0,Date,Open,High,Low,Close,Volume
0,2023-07-19,313.03,318.68,310.5200,316.01,21763688
1,2023-07-20,313.50,315.54,302.2200,302.52,23836876
2,2023-07-21,304.57,305.46,291.2000,294.26,42139259
3,2023-07-24,295.78,297.52,288.3001,291.61,24949405
4,2023-07-25,295.19,298.30,291.8600,294.47,19585584
...,...,...,...,...,...,...
95,2023-12-01,325.48,326.86,320.7600,324.82,15276375
96,2023-12-04,317.29,320.86,313.6600,320.02,19037080
97,2023-12-05,318.98,321.88,315.3900,318.29,16952128
98,2023-12-06,321.93,322.25,317.0400,317.45,11264491


In [4]:
# Setting up the environment
from pyspark.sql import SparkSession
from pyspark.streaming import StreamingContext

# Initiating the Spark Session
spark = SparkSession.builder.appName("StockDataApp").getOrCreate()

# Obtaining SparkContext from the SparkSession
sc = spark.sparkContext

# Initializing StreamingContext with a batch duration of 1 second
ssc = StreamingContext(sc, batchDuration=1)

# 'spark' can now be used for DataFrame operations, and 'ssc' for streaming



In [5]:
from pyspark.sql import SQLContext
from pyspark import SparkContext
import pandas as pd

import os

# Ensuring single initialization of SparkContext
if 'sc' not in globals():
    print("creating context")
    # Rectifying the initialization of SparkContext
    sc = SparkContext('local', 'StockDataApp')  # 'local' specifies to run Spark locally
sqlContext = SQLContext(sc)


# Assuming 'df' is the supplied pandas DataFrame
data = df

# Converting pandas DataFrame to Spark DataFrame
sdf = sqlContext.createDataFrame(data)
print(sdf)
# If there are challenges with collect(), consider an alternative approach
try:
    # Simulating streaming data
    rdd_queue = []
    for row in sdf.rdd.toLocalIterator():
        rdd_queue.append(sc.parallelize([row]))

    input_stream = ssc.queueStream(rdd_queue)
except Exception as e:
    print(f"Error: {e}")



DataFrame[Date: timestamp, Open: double, High: double, Low: double, Close: double, Volume: bigint]


In [6]:
def process(time, rdd):
    if not rdd.isEmpty():
        df = rdd.toDF()
        # Conducting Exploratory Data Analysis (EDA), such as presenting fundamental statistics
        df.describe().show()

        # Implementing window operations, for instance, calculating the moving average
        windowed_df = df.withColumn("MovingAverage", avg("Close").over(Window.orderBy("Date").rangeBetween(-2, 0)))
        windowed_df.show()

input_stream.foreachRDD(process)

In [7]:
print(df.describe())

             Open        High         Low       Close        Volume
count  100.000000  100.000000  100.000000  100.000000  1.000000e+02
mean   310.370630  314.387626  306.063775  310.168000  2.059178e+07
std     14.014033   13.768451   14.547000   14.410074  9.424891e+06
min    279.030000  285.690000  274.380000  283.250000  5.467488e+06
25%    299.280000  303.660000  295.922575  299.147500  1.567382e+07
50%    309.715000  313.250000  302.535000  308.105000  1.862845e+07
75%    319.015000  324.234925  315.422500  319.840000  2.186197e+07
max    340.130000  342.920000  338.580000  341.490000  6.668414e+07


In [8]:
df.isnull()

Unnamed: 0,Date,Open,High,Low,Close,Volume
0,False,False,False,False,False,False
1,False,False,False,False,False,False
2,False,False,False,False,False,False
3,False,False,False,False,False,False
4,False,False,False,False,False,False
...,...,...,...,...,...,...
95,False,False,False,False,False,False
96,False,False,False,False,False,False
97,False,False,False,False,False,False
98,False,False,False,False,False,False


In [9]:
sdf.printSchema()

root
 |-- Date: timestamp (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Volume: long (nullable = true)



In [10]:
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import VectorAssembler

# Assuming 'Close' serves as the target variable, and the rest are features
vectorAssembler = VectorAssembler(inputCols=['Open', 'High', 'Low', 'Volume'], outputCol='features')
df_vector = vectorAssembler.transform(sdf).select(['features', 'Close'])

# Dividing the data into training and testing sets
train_data, test_data = df_vector.randomSplit([0.7, 0.3])

# Establishing and training the model
lr = LinearRegression(featuresCol='features', labelCol='Close')
lr_model = lr.fit(train_data)

# Assessing the model's performance
predictions = lr_model.transform(test_data)
predictions.select("prediction", "Close").show()

+------------------+------+
|        prediction| Close|
+------------------+------+
| 295.2616188615741|294.47|
|297.64707641452526|295.73|
|296.36272927554126|298.57|
|307.26969720843374|307.56|
| 303.2223775417529|305.07|
|302.48190594098816|301.64|
|303.09049533349935|299.67|
|301.90077655237076|301.95|
| 303.0949284917445|301.66|
| 310.1982218305071|311.72|
|314.43178532961224|313.19|
| 305.6789041944862|305.21|
| 306.4292793264588|302.52|
| 316.0937270706587|311.71|
|303.08103871198824|303.96|
|312.22634233937754|315.43|
| 310.1737974322919|311.85|
|305.32089196048395|306.82|
| 303.9991101220918|300.21|
| 318.1857811375818|318.36|
+------------------+------+
only showing top 20 rows



In [11]:
from pyspark.ml.evaluation import RegressionEvaluator

# Generating an evaluator for Root Mean Square Error (RMSE)
rmse_evaluator = RegressionEvaluator(predictionCol="prediction", labelCol="Close", metricName="rmse")

# Computing the RMSE
rmse = rmse_evaluator.evaluate(predictions)
print(f"Root Mean Square Error (RMSE) on test data = {rmse}")

# Establishing an evaluator for R-squared
r2_evaluator = RegressionEvaluator(predictionCol="prediction", labelCol="Close", metricName="r2")

# Calculating the R-squared value
r2 = r2_evaluator.evaluate(predictions)
print(f"R-squared on test data = {r2}")

Root Mean Square Error (RMSE) on test data = 2.0002979175113866
R-squared on test data = 0.9720975667929075
