In [10]:
# Mount Google Drive into the Colab filesystem so later cells can read and
# write files below the mount point.
from google.colab import drive

DRIVE_MOUNT_POINT = '/content/drive'
drive.mount(DRIVE_MOUNT_POINT)

Mounted at /content/drive


In [8]:
!apt-get update -qq > /dev/null
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://dlcdn.apache.org/spark/spark-3.3.3/spark-3.3.3-bin-hadoop3.tgz
!tar xf spark-3.3.3-bin-hadoop3.tgz
!pip install -q findspark

In [93]:
import os
import random

# Point the tooling at the Java runtime and the unpacked Spark distribution
# installed by the setup cell.
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.3.3-bin-hadoop3"

# findspark.init() is called before any pyspark import, using the environment
# variables set above.
import findspark
findspark.init()

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
# NOTE: `abs` and `sum` imported here rebind the Python built-ins of the same
# name for the rest of the notebook; they now operate on Spark columns.
from pyspark.sql.functions import lag, when, avg, abs, col, sum
from pyspark.sql.window import Window

# Local session with two worker threads; the UI port is picked at random from
# the range 4000-4999.
ui_port = random.randrange(4000, 5000)
spark = (
    SparkSession.builder
    .appName("YourTest")
    .master("local[2]")
    .config('spark.ui.port', ui_port)
    .getOrCreate()
)

In [102]:
## Change TICKER to process a different stock; the input and output paths are
## both derived from it so they cannot drift apart.
TICKER = "LYFT"
DATA_DIR = "/content/drive/MyDrive"
PERIOD = 7  # look-back length in rows, shared by RSI and SMA

df = spark.read.csv(f"{DATA_DIR}/{TICKER}.csv", sep=',', inferSchema=True, header=True)

# Every window is explicitly ordered by Date so the results are deterministic.
# There is no partitionBy, so Spark evaluates these windows on a single
# partition -- acceptable for one ticker's daily history.
by_date = Window.orderBy("Date")
# The current row plus the PERIOD-1 rows before it. This yields the same values
# as the earlier "rowsBetween(-7, -1) then shift back one row" approach, but
# the last (most recent) row also gets a value instead of null.
trailing = by_date.rowsBetween(-(PERIOD - 1), 0)

# Day-over-day change in Close, materialised once as a helper column and
# reused by RSI and OBV. It is null on the first row (no previous close).
df = df.withColumn("_diff", col("Close") - lag("Close", 1).over(by_date))
price_diff = col("_diff")

# RSI: 100 - 100 / (1 + avg_gain / avg_loss) over the trailing PERIOD changes.
gain = when(price_diff > 0, price_diff).otherwise(0)
loss = when(price_diff < 0, -price_diff).otherwise(0)  # magnitude of the drop
avg_gain = avg(gain).over(trailing)
avg_loss = avg(loss).over(trailing)
# With no losses in the window the ratio is a division by zero, which Spark
# evaluates to null in non-ANSI mode; by convention RSI is 100 in that case.
rsi = when(avg_loss == 0, 100.0).otherwise(100 - (100 / (1 + (avg_gain / avg_loss))))
# `_c0` is the 0-based row index column of the input CSV. The first PERIOD rows
# do not yet have PERIOD real price changes behind them, so RSI is left null.
df = df.withColumn("RSI", when(col("_c0") < PERIOD, None).otherwise(rsi))

# SMA: mean Close over the trailing PERIOD rows; null until PERIOD closes exist.
sma = avg(col("Close")).over(trailing)
df = df.withColumn("SMA", when(col("_c0") < PERIOD - 1, None).otherwise(sma))

# OBV: running total of volume, added on up days and subtracted on down days.
signed_volume = (
    when(price_diff > 0, col("Volume"))
    .when(price_diff < 0, -col("Volume"))
    .otherwise(0)
)
cumulative_sum = F.sum(signed_volume).over(by_date)
# The first row has no previous close to compare against, so OBV is null there.
df = df.withColumn("OBV", when(col("_c0") < 1, None).otherwise(cumulative_sum))

# Remove the helper column so the displayed and written schema is unchanged.
df = df.drop("_diff")
df.show()

# Spark writes a directory of part files at this path. mode="overwrite" lets
# the cell be re-run; the default mode raises if the path already exists.
df.write.csv(f"{DATA_DIR}/{TICKER}_processed.csv", header=True, sep=',', mode="overwrite")


+---+-------------------+------------------+------------------+------------------+------------------+------------------+-------+------------------+------------------+--------+
|_c0|               Date|              Open|              High|               Low|             Close|         Adj Close| Volume|               RSI|               SMA|     OBV|
+---+-------------------+------------------+------------------+------------------+------------------+------------------+-------+------------------+------------------+--------+
|  0|2020-01-02 00:00:00|43.220001220703125| 44.06999969482422|43.165000915527344| 43.58000183105469| 43.58000183105469|3912100|              null|              null|    null|
|  1|2020-01-03 00:00:00|42.849998474121094| 43.36000061035156|42.233001708984375|43.189998626708984|43.189998626708984|4407200|              null|              null|-4407200|
|  2|2020-01-06 00:00:00|42.790000915527344|43.810001373291016|42.380001068115234|43.099998474121094|43.099998474121094|