In [None]:
!pip install pyspark pandas

Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.0/317.0 MB[0m [31m1.4 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488490 sha256=024e5f53e06cce61b44140cb714d5178f9f016986027b157f2464e5d80cd7fb2
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.1


In [None]:
import pyspark
import pandas as pd
import requests
from datetime import datetime, timedelta, date
from pyspark.sql import SparkSession
from pyspark.context import SparkContext as sc
import os
import numpy as np
import statistics as st

# Create the notebook-wide SparkSession, or reuse one that is already active.
spark = SparkSession.builder.getOrCreate()

In [None]:
# Ticker symbols processed by the main loop of this notebook.
STOCK_SYMBOLS = ['TSLA', 'NVDA', 'MSFT', 'IBM']
# Single-symbol alternative, left disabled:
# STOCK_SYMBOLS = ['IBM']
# Directory holding the per-symbol '<SYMBOL>_cleaned.csv' cache files
# (presumably a mounted Google Drive folder -- confirm before running elsewhere).
csv_data_root = '/content/drive/MyDrive/isdl_data'

In [None]:
def fetch_api_data(stock_symbol, api_key=None, timeout=30):
  """Download 5-minute intraday bars for one symbol from Alpha Vantage.

  Args:
    stock_symbol: Ticker symbol sent as the ``symbol`` query parameter.
    api_key: Alpha Vantage API key. When omitted, the key is read from the
      ``ALPHAVANTAGE_API_KEY`` environment variable so that no credential
      has to be stored in the notebook.
    timeout: Seconds to wait for the HTTP response before giving up.

  Returns:
    The frame produced by ``clean_raw_df`` (columns Date, Open, High, Low,
    Close, Volume), sorted by Date ascending on a fresh 0..n-1 index.

  Raises:
    RuntimeError: If no API key is supplied and the environment variable
      is not set.
    requests.HTTPError: If the server answers with an error status.
    KeyError: If the response carries no 'Time Series (5min)' section; the
      message includes whatever explanation the API returned.
  """
  if api_key is None:
    api_key = os.environ.get('ALPHAVANTAGE_API_KEY')
  if not api_key:
    raise RuntimeError(
        'No Alpha Vantage API key: pass api_key=... or set the '
        'ALPHAVANTAGE_API_KEY environment variable')

  stock_response_obj = requests.get(
      'https://www.alphavantage.co/query',
      params={
          'function': 'TIME_SERIES_INTRADAY',
          'symbol': stock_symbol,
          'interval': '5min',
          'outputsize': 'full',
          'apikey': api_key,
      },
      timeout=timeout,
  )
  stock_response_obj.raise_for_status()

  payload = stock_response_obj.json()
  raw_data = payload.get('Time Series (5min)') if isinstance(payload, dict) else None
  if not raw_data:
    # Throttling / bad key / unknown symbol are reported inside a 200 response
    # as a message field instead of the time series, so surface that message.
    detail = None
    if isinstance(payload, dict):
      detail = (payload.get('Note') or payload.get('Information')
                or payload.get('Error Message'))
    raise KeyError(
        f"No 'Time Series (5min)' data returned for {stock_symbol}: "
        f"{detail if detail else payload}")

  # The payload maps timestamp -> {field: value}; transpose so rows are bars.
  pandas_df = pd.DataFrame(raw_data).transpose()
  pandas_df = pandas_df.reset_index().rename(columns={
      "index":"Date", '1. open': 'Open', '2. high': 'High','3. low': 'Low', '4. close': 'Close', '5. volume': 'Volume'
      })
  cleaned_pandas_df = clean_raw_df(pandas_df)
  # Do not rely on the order of the API payload: the indicator functions in
  # this notebook walk the rows positionally and need oldest-first data.
  return cleaned_pandas_df.sort_values(by='Date').reset_index(drop=True)


In [None]:
def clean_raw_df(pandas_df):
  """Return a typed copy of a raw price frame.

  Open/High/Low/Close are cast to float, Volume to int, and Date is parsed
  into datetimes. The frame passed in is left unmodified.
  """
  dtype_by_column = {name: float for name in ('Open', 'High', 'Low', 'Close')}
  dtype_by_column['Volume'] = int

  typed_df = pandas_df.astype(dtype_by_column)
  parsed_dates = pd.to_datetime(typed_df['Date'])
  typed_df['Date'] = parsed_dates
  return typed_df

In [None]:
def read_or_create_stock_file(stock_symbol: str):
  """Load the cached bars for a symbol, refreshing the cache when it is stale.

  The cache lives at ``{csv_data_root}/{stock_symbol}_cleaned.csv``.

  * No (or an empty) cache file: fetch from the API, write the file, return it.
  * Cache whose newest bar is from yesterday or later: return it untouched,
    without calling the API.
  * Otherwise: fetch, keep only bars newer than the newest cached timestamp,
    append them, rewrite the file and return the merged frame.

  Args:
    stock_symbol: Ticker symbol; used for the file name and the API request.

  Returns:
    A pandas DataFrame with a datetime ``Date`` column, sorted oldest-first on
    a fresh 0..n-1 index, so that Series assigned by label line up with rows.
  """
  file_name = f'{csv_data_root}/{stock_symbol}_cleaned.csv'

  existing_stock_df = None
  if os.path.isfile(file_name):
    # parse_dates keeps Date comparable with the Timestamps of fetched rows.
    existing_stock_df = _sort_stock_frame_by_date(
        pd.read_csv(file_name, parse_dates=['Date']))

  # No usable cache (missing or empty file): fetch everything and create it.
  if existing_stock_df is None or existing_stock_df.empty:
    cleaned_new_df = _sort_stock_frame_by_date(fetch_api_data(stock_symbol))
    os.makedirs(csv_data_root, exist_ok=True)
    cleaned_new_df.to_csv(file_name, index=False)
    return cleaned_new_df

  last_timestamp_in_existing_data = existing_stock_df['Date'].iloc[-1]
  # Fresh enough: the newest cached bar is from yesterday or today.
  if last_timestamp_in_existing_data.date() >= date.today() - timedelta(days=1):
    return existing_stock_df

  # Stale: append only bars strictly newer than the newest cached one.
  # Comparing full timestamps (not calendar dates) keeps the remaining bars
  # of a day that was only partially cached.
  cleaned_new_df = fetch_api_data(stock_symbol)
  is_new_bar = pd.to_datetime(cleaned_new_df['Date']) > last_timestamp_in_existing_data
  updated_stock_df = _sort_stock_frame_by_date(
      pd.concat([existing_stock_df, cleaned_new_df[is_new_bar]], ignore_index=True))
  updated_stock_df.to_csv(file_name, index=False)
  return updated_stock_df


def _sort_stock_frame_by_date(stock_df):
  """Return a copy with datetime Dates, unique timestamps, oldest-first, index 0..n-1."""
  return (
      stock_df
      .assign(Date=pd.to_datetime(stock_df['Date']))
      # If a timestamp appears twice, keep the row that was appended last.
      .drop_duplicates(subset='Date', keep='last')
      .sort_values(by='Date')
      .reset_index(drop=True)
  )

In [None]:
def calculate_ema(series, period):
  """Exponential moving average seeded with a simple average.

  Rows are processed by position, so the index labels of ``series`` are
  irrelevant. The value at position ``period - 1`` is the mean of the first
  ``period`` observations; every later value is
  ``(x[i] - ema[i-1]) * 2 / (period + 1) + ema[i-1]``.

  Args:
    series: pandas Series of numeric observations, oldest first.
    period: Number of observations in the smoothing window (>= 1).

  Returns:
    A float Series indexed 0..len(series)-1 whose first ``period - 1`` entries
    are NaN. If ``series`` has fewer than ``period`` rows the result is all
    NaN and still has exactly ``len(series)`` entries.

  Raises:
    ValueError: If ``period`` is smaller than 1.
  """
  if period < 1:
    raise ValueError('period must be a positive integer')

  n_rows = series.shape[0]
  ema_values = np.full(n_rows, np.nan)
  # Without this guard the seed would be written past the end of the result.
  if n_rows >= period:
    observations = series.to_numpy(dtype=float)
    multiplier = 2 / (period + 1)
    ema_values[period - 1] = series.iloc[:period].sum() / period
    # Plain ndarray access keeps the recurrence cheap on long intraday series.
    for i in range(period, n_rows):
      previous = ema_values[i - 1]
      ema_values[i] = (observations[i] - previous) * multiplier + previous

  return pd.Series(ema_values, index=np.arange(n_rows))

In [None]:
def calculate_macd(series, fast_period=12, slow_period=26, signal_period=9):
  """Compute the MACD line, its signal line and the histogram.

  The MACD line is the fast EMA minus the slow EMA of ``series``. The signal
  line is an EMA (``signal_period``) of the MACD line, computed from position
  ``slow_period - 1`` onwards and NaN before that. The histogram is the MACD
  line minus the signal line.

  Returns:
    Tuple ``(macd_line, signal_line, macd_histogram)`` of Series.
  """
  row_count = series.shape[0]
  warmup_end = slow_period - 1

  fast_ema = calculate_ema(series, fast_period)
  slow_ema = calculate_ema(series, slow_period)
  macd_line = fast_ema - slow_ema

  # Smooth only the part of the MACD line that lies past the slow warm-up.
  smoothed_tail = calculate_ema(macd_line[warmup_end:], signal_period)
  signal_line = pd.Series(np.nan, index=np.arange(row_count))
  signal_line[warmup_end:] = smoothed_tail

  macd_histogram = macd_line - signal_line
  return macd_line, signal_line, macd_histogram

In [None]:
# considering df['Close'] passed as series over here:
def calculate_rsi(series, period = 14):
  """Relative Strength Index with smoothed average gains and losses.

  Rows are processed by position, oldest first. The first value appears at
  position ``period``: it uses the mean gain and the mean loss of the first
  ``period`` price changes (changes 1..period). Later positions update both
  averages with ``(previous * (period - 1) + current) / period``.

  RSI is computed as ``100 * avg_gain / (avg_gain + avg_loss)``, which equals
  ``100 - 100 / (1 + avg_gain / avg_loss)`` but yields 100 instead of a
  division by zero when there were no losses. When both averages are zero
  (no price movement at all) the value is NaN.

  Args:
    series: pandas Series of prices, oldest first.
    period: Look-back length in price changes (>= 1).

  Returns:
    A float Series indexed 0..len(series)-1; the first ``period`` entries are
    NaN. A series with ``period`` rows or fewer gives an all-NaN result of the
    same length as the input.

  Raises:
    ValueError: If ``period`` is smaller than 1.
  """
  if period < 1:
    raise ValueError('period must be a positive integer')

  n_rows = series.shape[0]
  rsi_values = np.full(n_rows, np.nan)
  # Need `period` changes, i.e. period + 1 prices, before anything is defined.
  if n_rows <= period:
    return pd.Series(rsi_values, index=np.arange(n_rows))

  closes = series.to_numpy(dtype=float)
  changes = np.diff(closes)  # changes[k] is the move from row k to row k + 1
  # np.maximum propagates NaN, so a missing price does not look like "no move".
  gains = np.maximum(changes, 0.0)
  losses = np.maximum(-changes, 0.0)

  def _rsi(avg_gain, avg_loss):
    total = avg_gain + avg_loss
    if total == 0:
      return np.nan
    return 100 * avg_gain / total

  # Seed: gains and losses are averaged separately over the first window.
  avg_gain = gains[:period].mean()
  avg_loss = losses[:period].mean()
  rsi_values[period] = _rsi(avg_gain, avg_loss)

  for i in range(period + 1, n_rows):
    avg_gain = (avg_gain * (period - 1) + gains[i - 1]) / period
    avg_loss = (avg_loss * (period - 1) + losses[i - 1]) / period
    rsi_values[i] = _rsi(avg_gain, avg_loss)

  return pd.Series(rsi_values, index=np.arange(n_rows))

In [None]:
def calculate_bollinger_bands(series, period=20, num_std=2):
  """Bollinger Bands over a rolling window that ends at the current row.

  For every position ``i >= period - 1`` the window holds the ``period``
  observations ending at (and including) row ``i``. The middle band is the
  window mean; the upper and lower bands lie ``num_std`` sample standard
  deviations (n - 1 denominator) above and below it.

  Args:
    series: pandas Series of prices, oldest first; processed by position.
    period: Window length (>= 2, a standard deviation needs two points).
    num_std: Band width in standard deviations.

  Returns:
    Tuple ``(middle_band, upper_band, lower_band)`` of float Series indexed
    0..len(series)-1; the first ``period - 1`` entries of each are NaN.

  Raises:
    ValueError: If ``period`` is smaller than 2.
  """
  if period < 2:
    raise ValueError('period must be at least 2')

  n_rows = series.shape[0]
  # Re-index positionally so the result lines up with the other indicators.
  closes = pd.Series(series.to_numpy(dtype=float), index=np.arange(n_rows))

  window = closes.rolling(window=period)
  middle_band = window.mean()
  band_offset = window.std() * num_std  # rolling std defaults to ddof=1

  upper_band = middle_band + band_offset
  lower_band = middle_band - band_offset
  return middle_band, upper_band, lower_band

In [None]:
# main running tab
# Builds one frame per symbol: price bars plus MACD, RSI and Bollinger columns.

dict_of_spark_dfs = {}  # symbol -> pandas DataFrame (name kept for later cells)
for stock in STOCK_SYMBOLS:
  stock_data_df = read_or_create_stock_file(stock)

  # The indicator functions walk the rows by position and return Series indexed
  # 0..n-1, while column assignment aligns on index labels. Put the frame in
  # chronological order on a fresh 0..n-1 index so each value lands on its row.
  stock_data_df = (
      stock_data_df
      .assign(Date=pd.to_datetime(stock_data_df['Date']))
      .sort_values(by='Date')
      .reset_index(drop=True)
  )
  close_prices = stock_data_df['Close']

  macd_line, signal_line, macd_histogram = calculate_macd(close_prices, 12, 26, 9)
  stock_data_df['MACD_histogram'] = macd_histogram
  stock_data_df['MACD_line'] = macd_line
  stock_data_df['MACD_signal'] = signal_line

  rsi = calculate_rsi(close_prices,14)
  stock_data_df['RSI'] = rsi

  middle_band, upper_band, lower_band = calculate_bollinger_bands(close_prices)
  stock_data_df['Middle_band'] = middle_band
  stock_data_df['Upper_band'] = upper_band
  stock_data_df['Lower_band'] = lower_band

  dict_of_spark_dfs[stock] = stock_data_df


In [None]:
# Show the processed IBM frame (the values stored in this dict are pandas DataFrames).
dict_of_spark_dfs['IBM']

Unnamed: 0,Date,Open,High,Low,Close,Volume,MACD_histogram,MACD_line,MACD_signal,RSI,Middle_band,Upper_band,Lower_band
0,2024-06-24 04:00:00,172.05,173.59,172.05,172.20,10,,,,,,,
1,2024-06-24 04:10:00,172.46,172.90,172.46,172.90,243,,,,,,,
2,2024-06-24 04:15:00,173.23,173.61,173.23,173.50,1335,,,,,,,
3,2024-06-24 04:30:00,173.48,173.49,173.21,173.49,16,,,,,,,
4,2024-06-24 04:35:00,173.22,173.59,173.20,173.50,1098,,,,,,,
...,...,...,...,...,...,...,...,...,...,...,...,...,...
3833,2024-07-29 19:10:00,191.00,191.00,191.00,191.00,1,-0.027172,-0.040410,-0.013239,42.986108,192.06015,192.485190,191.635110
3832,2024-07-29 19:20:00,191.01,191.01,191.01,191.01,49,-0.016748,-0.023194,-0.006446,47.601012,192.06165,192.485560,191.637740
3831,2024-07-29 19:30:00,191.00,191.00,191.00,191.00,3,-0.016950,-0.019209,-0.002259,47.601012,192.06565,192.487423,191.643877
3830,2024-07-29 19:45:00,190.50,190.50,190.50,190.50,6,-0.015822,-0.013843,0.001979,51.868973,192.07665,192.516575,191.636725
