In [4]:
from pyspark.sql import SparkSession

In [5]:
# Reuse the running Spark session if one exists; otherwise start one for this analysis.
spark = (
  SparkSession.builder
  .appName('Stock-Data-Analysis')
  .getOrCreate()
)

23/04/15 00:01:52 WARN Utils: Your hostname, PJ-Ubuntu resolves to a loopback address: 127.0.1.1; using 10.0.0.251 instead (on interface wlo1)
23/04/15 00:01:52 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/04/15 00:01:53 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [6]:
# Bare expression: the notebook renders the session object as this cell's output.
spark

In [7]:
# Load the stock CSV, taking column names from its first line, and discard the "_c0" column.
# No schema is supplied or inferred here, so every column is read as a string.
df = spark.read.csv("./Data/FS_sp500_Value.csv", header=True).drop("_c0")

                                                                                

In [5]:
# Analysis functions 

from pyspark.sql import functions as F
from pyspark.sql.window import Window

def calcSimpleMovingAvg(df, col, span):
  """Compute a trailing simple moving average of `col`, per ticker.

  Rows are partitioned by "Ticker" and ordered by "Date". Each row's value is
  the average of `col` over that row and the rows before it, `span` rows in
  total. Rows near the start of a ticker's history average over however many
  rows are available.

  Args:
    df: Spark DataFrame containing "Ticker", "Date" and `col`.
    col: Name of the column to average.
    span: Number of rows in each averaging window, current row included.

  Returns:
    A Python list holding the "moving_avg" value of every row, collected to
    the driver.

  Raises:
    ValueError: If `span` is smaller than 1.
  """
  if span < 1:
    raise ValueError("span must be at least 1")

  # rowsBetween bounds are inclusive on both ends, so a window of `span` rows
  # reaches back span - 1 rows from the current one (not `span` rows).
  window = Window.partitionBy("Ticker").orderBy("Date").rowsBetween(-(span - 1), 0)
  df = df.withColumn("moving_avg", F.avg(col).over(window))

  return df.rdd.map(lambda x: x.moving_avg).collect()

def calcDailyPercentChange(df):
  """Return the magnitude of each row's close-to-close percentage move.

  For every row, Close is compared with the previous row's Close within the
  same "Ticker" (ordered by "Date"). The absolute relative difference is
  scaled to percent and rounded to 3 decimals. Rows where that ratio is null
  (for example the first row of a ticker, which has no previous close) get 0.

  Args:
    df: Spark DataFrame containing "Ticker", "Date" and "Close".

  Returns:
    A Python list with the "change" value of every row, collected to the driver.
  """
  by_ticker_date = Window.partitionBy("Ticker").orderBy("Date")
  with_prev = df.withColumn("prev_close", F.lag(df.Close).over(by_ticker_date))

  # Relative move versus the previous close; null when there is no previous row.
  ratio = (with_prev.Close - with_prev.prev_close) / with_prev.prev_close
  pct_move = F.round(F.abs(ratio) * 100, 3)
  with_change = with_prev.withColumn(
    "change",
    F.when(F.isnull(ratio), 0).otherwise(pct_move),
  )

  return [row.change for row in with_change.collect()]

def calcATR(df):
  """Return the average true range series for the rows of `df`.

  The true range of a row is the greatest of: High - Low, |High - previous
  Close| and |Low - previous Close|, where the previous Close comes from the
  preceding row of the same "Ticker" ordered by "Date". The two gap terms
  count as 0 when there is no previous close. The true range is then averaged
  by calcSimpleMovingAvg with a span of 14.

  Args:
    df: Spark DataFrame containing "Ticker", "Date", "High", "Low" and "Close".

  Returns:
    The list returned by calcSimpleMovingAvg for the "true_range" column.
  """
  by_ticker_date = Window.partitionBy("Ticker").orderBy("Date")
  frame = df.withColumn("prev_close", F.lag(df.Close).over(by_ticker_date))

  # Distances of today's extremes from yesterday's close (null on a ticker's first row).
  high_gap = F.abs(frame.High - frame.prev_close)
  low_gap = F.abs(frame.Low - frame.prev_close)

  frame = (
    frame
    .withColumn("h-l", frame.High - frame.Low)
    .withColumn("h-p", F.when(F.isnull(high_gap), 0).otherwise(high_gap))
    .withColumn("l-p", F.when(F.isnull(low_gap), 0).otherwise(low_gap))
    .withColumn("true_range", F.greatest("h-l", "h-p", "l-p"))
  )

  return calcSimpleMovingAvg(frame, "true_range", 14)

def calcRSI(df, period=14):
  """Compute a relative strength index from trailing average gains and losses.

  The close-to-close change of every row is split into an "up" part (negative
  changes replaced by 0) and a "down" part (positive changes replaced by 0).
  Both parts are averaged over the trailing `period` rows of the same
  "Ticker" (ordered by "Date"), and the index is
  100 * avg_up / (avg_up + |avg_down|), rounded to 4 decimals. Rows where the
  index is null (no movement inside the window) are reported as 0.

  Args:
    df: Spark DataFrame containing "Ticker", "Date" and "Close".
    period: Number of rows, current row included, in each averaging window.

  Returns:
    A Python list with the "rsi" value of every row, collected to the driver.

  Raises:
    ValueError: If `period` is smaller than 1.
  """
  if period < 1:
    raise ValueError("period must be at least 1")

  window = Window.partitionBy("Ticker").orderBy("Date")
  df = df.withColumn("prev_close", F.lag(df.Close).over(window))
  delta = df.Close - df.prev_close
  # The first row of a ticker has no previous close; treat its change as 0.
  df = df.withColumn("change", F.when(F.isnull(delta), 0).otherwise(delta))

  # rowsBetween bounds are inclusive, so `period` rows means reaching back
  # period - 1 rows from the current one.
  trailing = window.rowsBetween(-(period - 1), 0)
  df = df.withColumn("change_up", F.when(df.change < 0, 0).otherwise(df.change))
  df = df.withColumn("change_down", F.when(df.change > 0, 0).otherwise(df.change))
  df = df.withColumn("avg_up", F.avg(df.change_up).over(trailing)).drop("change_up")
  df = df.withColumn("avg_down", F.avg(df.change_down).over(trailing)).drop("change_down")
  df = df.withColumn("rsi", F.round((100 * df.avg_up / (df.avg_up + F.abs(df.avg_down))), 4))
  # Only the returned column needs its nulls replaced.
  df = df.fillna(0, subset=["rsi"])
  return df.rdd.map(lambda x: x.rsi).collect()

def calcVPT(df):
  """Compute the volume price trend as a running total per ticker.

  Each row contributes Volume * (Close - previous Close) / previous Close,
  where the previous Close comes from the preceding row of the same "Ticker"
  ordered by "Date". A row whose contribution is null (for example the first
  row of a ticker) contributes 0. The value for a row is the sum of all
  contributions from the start of the ticker's history up to and including
  that row.

  Args:
    df: Spark DataFrame containing "Ticker", "Date", "Close" and "Volume".

  Returns:
    A Python list with the "vpt" value of every row, collected to the driver.
  """
  window = Window.partitionBy("Ticker").orderBy("Date")
  df = df.withColumn("prev_close", F.lag(df.Close).over(window))

  step = (df.Volume * (df.Close - df.prev_close)) / df.prev_close
  df = df.withColumn("temp_vpt", F.when(F.isnull(step), 0).otherwise(step))

  # Accumulate every contribution so far. Adding only the previous row's
  # contribution would give a two-row sum, not a running total.
  running = window.rowsBetween(Window.unboundedPreceding, Window.currentRow)
  df = df.withColumn("vpt", F.sum(df.temp_vpt).over(running)).drop("temp_vpt")
  return df.rdd.map(lambda x: x.vpt).collect()

def getDates(df):
  """Return the "Date" values of `df`, sorted by "Ticker" and then "Date".

  The indicator functions in this notebook produce their values from windows
  ordered by "Date" within each "Ticker". Sorting here the same way keeps the
  returned dates in step with those value lists instead of relying on the
  row order of the input.

  Args:
    df: Spark DataFrame containing "Ticker" and "Date".

  Returns:
    A Python list of the "Date" values, collected to the driver.
  """
  ordered = df.orderBy("Ticker", "Date")
  return ordered.rdd.map(lambda x: x.Date).collect()

In [8]:
from cassandra.cluster import Cluster

cluster = Cluster(['localhost']) # replace localhost with your Cassandra host IP
session = cluster.connect()

# Create keyspace. IF NOT EXISTS keeps this cell re-runnable: without it a
# second run fails because the keyspace is already there.
session.execute("CREATE KEYSPACE IF NOT EXISTS main WITH replication = {'class':'SimpleStrategy', 'replication_factor': 3};")

# Create stock table: one row per ticker, each indicator stored as a list.
session.execute("CREATE TABLE IF NOT EXISTS main.stock(TICKER text, MOVING_AVG LIST<double>, DAILY_PERCENTAGE LIST<double>, ATR LIST<double>, VPT LIST<double>, RSI LIST<double>, DATES LIST<date>, PRIMARY KEY(TICKER));")

# Create prepared statement, parsed once and reused for every ticker insert.
insert_stock = session.prepare("""
    INSERT INTO main.stock (TICKER, MOVING_AVG, DAILY_PERCENTAGE, ATR, VPT, RSI, DATES) 
    VALUES (?, ?, ?, ?, ?, ?, ?)
    """)

NoHostAvailable: ('Unable to connect to any servers', {'127.0.0.1:9042': ConnectionRefusedError(111, "Tried connecting to [('127.0.0.1', 9042)]. Last error: Connection refused")})

In [7]:
def executeCalc(df):
  """Compute every indicator for one stock's rows and store them in Cassandra.

  The value in the first column of the first row is used as the TICKER key
  (assumes the first column holds the ticker symbol; confirm against the CSV
  layout). The indicator lists and the dates are bound to the prepared
  `insert_stock` statement and written through `session`.

  Args:
    df: Spark DataFrame holding the rows of a single ticker.

  Raises:
    ValueError: If `df` has no rows.
  """
  # first() fetches a single row instead of collecting the whole DataFrame
  # to the driver just to read one cell.
  first_row = df.first()
  if first_row is None:
    raise ValueError("executeCalc received an empty DataFrame")
  ticker = first_row[0]

  params = (
    ticker,
    calcSimpleMovingAvg(df, 'Close', 14),
    calcDailyPercentChange(df),
    calcATR(df),
    calcVPT(df),
    calcRSI(df),
    getDates(df),
  )
  session.execute(insert_stock, params)
  print(ticker + " data entered into Cassandra \n")

In [None]:
from pyspark.sql.functions import col

# Distinct ticker symbols present in the dataset.
tickers = df.select(col("Ticker")).distinct().rdd.flatMap(lambda x: x).collect()
# One filtered DataFrame per ticker.
stock_dfs = [df.where(df["Ticker"] == ticker) for ticker in tickers]

# Use a dedicated loop variable. Naming it `df` would rebind the full dataset
# to the last ticker's slice, so re-running this cell would see only one ticker.
for stock_df in stock_dfs:
  executeCalc(stock_df)
