In [1]:
import numpy as np
from pyspark.sql import SparkSession, Window
from datetime import date, timedelta, datetime
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.stat import Correlation
from pyspark.sql.functions import col, udf, collect_list, dayofyear, lag, round, to_date
from pyspark.sql.types import *

bucket_name = "web-app-project"
spark = SparkSession\
    .builder\
    .appName("write_nodes")\
    .getOrCreate()

today = date.today().isoformat()
prices_path = "s3://" + bucket_name + "/price-data-" + today + ".csv" 
df = spark.read.csv(prices_path, header=True)

VBox()

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
10,application_1604597248464_0015,pyspark,idle,Link,Link,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [2]:
df.show(5)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+------+-------------------+----+----+----+-----+------+
|symbol|          timeframe|open|high| low|close|volume|
+------+-------------------+----+----+----+-----+------+
|    GE|               time|open|high| low|close|volume|
|    GE|2020-04-27 20:00:00|6.44|6.44|6.44| 6.44|  7400|
|    GE|2020-04-27 19:59:00|6.44|6.44|6.44| 6.44|  2100|
|    GE|2020-04-27 19:58:00|6.44|6.44|6.44| 6.44|  4625|
|    GE|2020-04-27 19:57:00|6.44|6.44|6.44| 6.44| 10600|
+------+-------------------+----+----+----+-----+------+
only showing top 5 rows

In [3]:
# filter data from last two weeks
# two_weeks_ago = (date.today() - timedelta(days=13))
# df = df.filter(to_date(df["timeframe"]) > two_weeks_ago)

# extract day of year from timestamp and aggregate by day
df = df.withColumn("timeframe", col("timeframe").cast(TimestampType())) \
.withColumn("dayofyear", dayofyear("timeframe")) \
.groupBy("symbol", "dayofyear") \
.agg({"open": "avg"}) \
.orderBy("symbol", "dayofyear", ascending=[1, 1])

# We compute the return percent for the open price between each consecutive day 
window = Window.partitionBy("symbol").orderBy("dayofyear")
df = df.filter(df.dayofyear.isNotNull()) \
.withColumn("prev", lag(col("avg(open)"), 1).over(window)) 
df = df.withColumn("percent_change", round((col("avg(open)") - col("prev")) / col("prev") * 100, 2)) \
.filter(df.prev.isNotNull())

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [4]:
df.show(5)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+------+---------+------------------+------------------+--------------+
|symbol|dayofyear|         avg(open)|              prev|percent_change|
+------+---------+------------------+------------------+--------------+
|   BMY|        3|62.991150684931505|  63.4208204225352|         -0.68|
|   BMY|        6|62.984082949308785|62.991150684931505|         -0.01|
|   BMY|        7| 63.80465313901342|62.984082949308785|           1.3|
|   BMY|        8| 64.31898832116785| 63.80465313901342|          0.81|
|   BMY|        9| 65.34006199524941| 64.31898832116785|          1.59|
+------+---------+------------------+------------------+--------------+
only showing top 5 rows

In [5]:
# We pivot the dataframe to compute a correlation matrix
df = df.groupBy("dayofyear") \
.pivot("symbol") \
.agg({"percent_change": "sum"}) \
.orderBy("dayofyear") \
.drop("dayofyear")
df = df.fillna(0)
symbols = df.columns

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [6]:
df.select(df.columns[:10]).show(10)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+
| AAPL| ABBV|  AES|  AFL|  AIG| AMAT|  AMD| AMZN| ATVI|   BA|
+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+
|-0.05| -0.3|-0.42| -0.1|-0.41|-1.17| 0.63|-0.26| -0.7| 0.22|
|-0.23| 0.02| 0.18|-0.38| 0.29|-2.28|-0.28| 0.84| 1.71|-0.26|
| 0.41| -0.1| 0.26| -0.6|-0.64| 2.43| 0.07| 0.49|  0.5| 2.02|
| 0.68| 1.13| 0.41| 0.37| 1.46| 0.68|-1.03| 0.06| 1.15| -1.6|
| 2.57| 0.15| 1.12|-0.46| 0.54| 0.22| 2.64| 0.24|-1.96|  1.1|
| 0.81|-0.29|-0.31|-0.07| 0.34|-0.09|-1.05| -0.7|-0.25|-1.01|
|  1.0|-1.17| 0.35|-0.23| 0.01|-0.24|  0.1|-0.01| 0.21|-0.04|
| 0.25|-0.95| 0.09|  0.1| 0.87| 1.48|-0.49|-1.09| 1.15| 0.03|
|-0.62| 1.55|  0.8| 0.08|-1.87|-1.42| 0.39|-0.34| -0.4|-0.33|
| 0.39| 0.27|-0.16| 0.64| 1.25| 1.88| 1.89| 0.42| 0.63| 0.05|
+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+
only showing top 10 rows

In [7]:
# We compute the correlation matrix
assembler = VectorAssembler(inputCols=df.columns, outputCol="features")
df = assembler.transform(df).select("features")
matrix = Correlation.corr(df, "features")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [8]:
n = len(symbols)
values = np.array(matrix.collect()[0][0].values).reshape(n,n)
schema = StructType([
    StructField("source", StringType(), True),
    StructField("target", StringType(), True), 
    StructField("value", FloatType(), True)
    ])

# The nodes values are in a symmetric matrix 
# Thus, we only keep the upper right triangle of the matrix
rows = []
for i in range(n):
    for j in range(i+1, n):
        rows.append((symbols[i], symbols[j], float(values[i][j])))
        
nodes_df = spark.createDataFrame(rows, schema)
#test_df.count() == int((n * (n-1)) / 2)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [9]:
nodes_df.show(5)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+------+------+------------+
|source|target|       value|
+------+------+------------+
|  AAPL|  ABBV|  -0.0431814|
|  AAPL|   AES| -0.17248283|
|  AAPL|   AFL|    0.646242|
|  AAPL|   AIG|   0.6827023|
|  AAPL|  AMAT|-0.054861702|
+------+------+------------+
only showing top 5 rows

In [10]:
# We change to ordinal values : 1 mildly correlated, 2 strongly correlated and vice versa
def ordinal_encoder(val):
    if -0.5 <= val <= 0.5: return 0
    if -0.7 <= val < -0.5: return -1
    if val < -0.7: return -2
    if 0.5 < val <= 0.7: return 1
    if val > 0.7: return 2

ordinal_encoder = udf(ordinal_encoder, StringType())
nodes_df = nodes_df.withColumn("value", ordinal_encoder("value"))
# we remove the low correlated nodes
nodes_df = nodes_df.filter(nodes_df.value != 0)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [11]:
nodes_df.show(5)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+------+------+-----+
|source|target|value|
+------+------+-----+
|  AAPL|   AFL|    1|
|  AAPL|   AIG|    1|
|  AAPL|   AMD|   -1|
|  AAPL|  AMZN|   -1|
|  AAPL|  ATVI|   -1|
+------+------+-----+
only showing top 5 rows