## Imports

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, countDistinct, sqrt, pow
import pyspark.sql.functions as f

## Config

In [63]:
# Spark session running locally on all available cores ("local[*]").
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("app")
    .getOrCreate()
)

# Weather input files; they are loaded and unioned in this order.
# (Presumably one file per year, going by the file names.)
weather_dataset_paths = [
    "out_2017.txt",
    "out_2018.txt",
    "out_2019.txt",
    "out_2020.txt",
]

# Stock price file; it is read without a header row further down.
stocks_dataset_path = "MS1.txt"

# Upper bound passed to limit() when selecting weather stations and stocks.
number_stocks_and_weather = 1000

## Functions

In [4]:
def weather_transforms(path):
    """Load one weather CSV file and trim it to the columns used downstream.

    Args:
        path: Location of a CSV file with a header row. The file must
            contain a ``TMP`` column.

    Returns:
        A Spark DataFrame with an added ``TEMP`` column (the part of ``TMP``
        before its first space) and without the helper columns named in the
        ``drop`` call below. No schema is supplied or inferred here, so the
        columns read from the file are strings.
    """
    # Read data. In Spark datetime patterns "MM" is month-of-year, while
    # "mm" (used previously) is minute-of-hour.
    df = (
        spark.read
        .option("header", "true")
        .option("dateFormat", "yyyy-MM-dd")
        .csv(path)
    )
    # Split temperature into variables: keep the first space-separated token.
    df = df.withColumn('TEMP', f.split(df['TMP'], ' ').getItem(0))
    # Drop unnecessary columns
    df = df.drop(
        "splitcount", "LOCATION", "WIND", "TMP", "DEW", "SLP", "tmp_quality"
    )

    return df

# Weather data

In [64]:
from pyspark.sql import Window
from pyspark.sql.types import *

import sys

# Load every weather file and stack them into one DataFrame.
union = weather_transforms(weather_dataset_paths[0])
for weather_dataset_path in weather_dataset_paths[1:]:
    union = union.union(weather_transforms(weather_dataset_path))

# Keep stations that have a record on the first day of the range, capped at
# `number_stocks_and_weather` rows.
weather_df = (
    union
    .filter(union.DATE == "2017-01-01")
    .select('STATION_ID')
    .limit(number_stocks_and_weather)
)

print(f"Number of rows: {weather_df.count()}")

# Restrict the complete data set to the selected stations.
filtered_union = weather_df.join(union, 'STATION_ID')

# Build a full daily calendar per selected station: attach the first and last
# day (2017-01-01 through 2019-12-31), expand to one row per day, then remove
# the two helper columns again.
weather_df = (
    weather_df
    .withColumn("min_date", f.lit("2017-01-01").cast('date'))
    .withColumn("max_date", f.lit("2019-12-31").cast('date'))
    .withColumn('DATE', f.explode(f.expr('sequence(min_date, max_date, interval 1 day)')))
    .drop("min_date", "max_date")
)

# Left join so that every calendar day is present; days without a reading
# end up with a null TEMP.
weather_df = (
    weather_df
    .join(filtered_union, ["STATION_ID", "DATE"], "left")
    .sort(['STATION_ID', 'DATE'])
)
# TEMP is text at this point: make it numeric and scale it down by 10.
weather_df = weather_df.withColumn("TEMP", weather_df.TEMP.cast('float') / 10.0)

# Turn the values 999 and 9999 into nulls.
# NOTE(review): this runs after the division by 10 - confirm these are the
# values the missing-reading marker has at this point.
weather_df = weather_df.replace([999, 9999], None)

# Frame for forward filling: all rows of a station, in DATE order, from the
# first row up to and including the current one.
window_ff = (
    Window.partitionBy('STATION_ID')
    .orderBy('DATE')
    .rowsBetween(Window.unboundedPreceding, Window.currentRow)
)

# Frame for backward filling: all rows of a station, in DATE order, from the
# current row to the last one.
window_bf = (
    Window.partitionBy('STATION_ID')
    .orderBy('DATE')
    .rowsBetween(Window.currentRow, Window.unboundedFollowing)
)
               

def interpol_middle(y, y_prev, y_next):
    """Combine the neighbouring known values into one imputed value.

    Args:
        y: Value of the current row. It is accepted but not used by the
            calculation.
        y_prev: Neighbouring value on the preceding side, or None.
        y_next: Neighbouring value on the following side, or None.

    Returns:
        The midpoint of ``y_prev`` and ``y_next`` when both are present, the
        one that is present when only one is, and 0.0 when neither is.
    """
    has_prev = y_prev is not None
    has_next = y_next is not None

    if has_prev and has_next:
        return (y_prev + y_next) / 2.0
    if has_next:
        return y_next
    if has_prev:
        return y_prev
    # No neighbour on either side: fall back to zero.
    return 0.0

# Forward and backward filling, takes around 6 minutes to complete.
# read_last: latest non-null TEMP of the station up to and including this row.
# read_next: earliest non-null TEMP of the station from this row onwards.
read_last = f.last(weather_df['TEMP'], ignorenulls=True).over(window_ff)
read_next = f.first(weather_df['TEMP'], ignorenulls=True).over(window_bf)

# Add the two neighbouring values as helper columns.
df_filled = (
    weather_df
    .withColumn('readvalue_ff', read_last.cast('float'))
    .withColumn('readvalue_bf', read_next.cast('float'))
)

# Only the midpoint UDF is registered. A second registration,
# f.udf(interpol, FloatType()), used to sit here, but no function named
# `interpol` is defined in this notebook, so it raised NameError on a fresh
# kernel; nothing in the executed code used it.
interpol_middle_udf = f.udf(interpol_middle, FloatType())

# No longer needed by this cell; retained in case other cells rely on the name.
from pyspark.sql.functions import unix_timestamp

# Replace TEMP by the value derived from its neighbours, then remove the
# helper columns so only STATION_ID, DATE and TEMP remain.
df_filled = (
    df_filled
    .withColumn('TEMP', interpol_middle_udf('TEMP', 'readvalue_ff', 'readvalue_bf'))
    .drop('readvalue_ff', 'readvalue_bf')
)

# Cache the result: it is evaluated by both show() calls below.
df_filled.persist()

df_filled.limit(10).show()
# Sanity check: no null TEMP should be left after filling.
df_filled.filter(df_filled.TEMP.isNull()).limit(10).show()

Number of rows: 1000
+-----------+----------+-----+
| STATION_ID|      DATE| TEMP|
+-----------+----------+-----+
|01367099999|2017-01-01| -0.8|
|01367099999|2017-01-02| -2.8|
|01367099999|2017-01-03| -6.6|
|01367099999|2017-01-04| -7.9|
|01367099999|2017-01-05|-13.7|
|01367099999|2017-01-06|-15.1|
|01367099999|2017-01-07| -9.1|
|01367099999|2017-01-08| -9.2|
|01367099999|2017-01-09| -4.8|
|01367099999|2017-01-10|  1.9|
+-----------+----------+-----+

+----------+----+----+
|STATION_ID|DATE|TEMP|
+----------+----+----+
+----------+----+----+



In [65]:
# Collapse to a single partition and write the frame as CSV.
(
    df_filled
    .coalesce(1)
    .write
    .csv('WEATHER_DATA_IMPUTED7.csv')
)

# Stock data

In [58]:
# Load the stock file. It is read without a header row, so the columns are
# addressed by their positional names _c0 .. _c3 below.
stocks_df = (
    spark.read
    .option("header", "false")
    .csv(stocks_dataset_path)
)

# Give the columns meaningful names, then discard the volume.
stocks_df = stocks_df.selectExpr(
    '_c0 AS STOCK',
    '_c1 AS DATE',
    '_c2 AS PRICE',
    '_c3 AS VOLUME',
).drop("VOLUME")

# Keep rows whose DATE text contains 2017, 2018 or 2019.
df_stocks_selection = stocks_df.filter(
    stocks_df.DATE.contains("2017")
    | stocks_df.DATE.contains("2018")
    | stocks_df.DATE.contains("2019")
)

# Stocks that have a row dated "01/02/2017", capped at
# `number_stocks_and_weather` rows.
df_stocks_selection_filtered = (
    df_stocks_selection
    .filter(df_stocks_selection.DATE == "01/02/2017")
    .select('STOCK')
    .limit(number_stocks_and_weather)
)

# All 2017-2019 rows belonging to the selected stocks.
selected_stocks = df_stocks_selection_filtered.join(df_stocks_selection, 'STOCK')

# Build a full daily calendar per selected stock (2017-01-01 through
# 2019-12-31): attach both bounds, expand to one row per day, drop the bounds.
stocks_df = (
    df_stocks_selection_filtered
    .withColumn("min_date", f.lit("2017-01-01").cast('date'))
    .withColumn("max_date", f.lit("2019-12-31").cast('date'))
    .withColumn('DATE', f.explode(f.expr('sequence(min_date, max_date, interval 1 day)')))
    .drop("min_date", "max_date")
)

# Parse the MM/dd/yyyy date text into real dates and keep one row per
# (stock, day).
modifiedDF = (
    selected_stocks
    .withColumn("DATE", f.to_date("DATE", "MM/dd/yyyy"))
    .dropDuplicates(["STOCK", "DATE"])
)

# Left join onto the calendar so that every day is present; days without a
# quote end up with a null PRICE.
# NOTE(review): the join key is spelled "Stock" while the column is STOCK -
# this only resolves while column-name matching is case-insensitive.
stocks_df = (
    stocks_df
    .join(modifiedDF, ["Stock", "DATE"], "left")
    .sort(['STOCK', 'DATE'])
)

#stocks_df = stocks_df.withColumn("PRICE", stocks_df.PRICE.cast("int"))

# Forward and backward filling, takes around 6 minutes to complete.
# Per-stock frames in date order: everything up to and including the current
# row, and everything from the current row onwards.
w_forward = (
    Window.partitionBy('Stock')
    .orderBy('Date')
    .rowsBetween(Window.unboundedPreceding, Window.currentRow)
)
w_backward = (
    Window.partitionBy('Stock')
    .orderBy('Date')
    .rowsBetween(Window.currentRow, Window.unboundedFollowing)
)

# Latest non-null price so far / earliest non-null price from here on.
read_last_price = f.last(stocks_df['Price'], ignorenulls=True).over(w_forward)
read_next_price = f.first(stocks_df['Price'], ignorenulls=True).over(w_backward)

# Attach both neighbours as helper columns, replace the price by the value
# derived from them, then remove the helpers again.
# NOTE(review): `df_filled` is also the name of the imputed weather frame in
# the weather cell; after this cell it refers to the stock data.
df_filled = (
    stocks_df
    .withColumn('read_last_price', read_last_price.cast('float'))
    .withColumn('read_next_price', read_next_price.cast('float'))
    .withColumn('Price', interpol_middle_udf('Price', 'read_last_price', 'read_next_price'))
    .drop('read_last_price', 'read_next_price')
)

# Cache the result: it is evaluated by both show() and count() below.
df_filled.persist()

df_filled.show()
print(f"Number of rows: {df_filled.count()}")

+--------------------+----------+------+
|               STOCK|      DATE| Price|
+--------------------+----------+------+
|19272.Europa_Deut...|2017-01-01| 6.072|
|19272.Europa_Deut...|2017-01-02| 6.072|
|19272.Europa_Deut...|2017-01-03| 6.115|
|19272.Europa_Deut...|2017-01-04| 6.111|
|19272.Europa_Deut...|2017-01-05| 6.348|
|19272.Europa_Deut...|2017-01-06| 6.295|
|19272.Europa_Deut...|2017-01-07|6.3005|
|19272.Europa_Deut...|2017-01-08|6.3005|
|19272.Europa_Deut...|2017-01-09| 6.306|
|19272.Europa_Deut...|2017-01-10| 6.292|
|19272.Europa_Deut...|2017-01-11| 6.296|
|19272.Europa_Deut...|2017-01-12| 6.291|
|19272.Europa_Deut...|2017-01-13| 6.313|
|19272.Europa_Deut...|2017-01-14|6.2995|
|19272.Europa_Deut...|2017-01-15|6.2995|
|19272.Europa_Deut...|2017-01-16| 6.286|
|19272.Europa_Deut...|2017-01-17| 6.292|
|19272.Europa_Deut...|2017-01-18| 6.301|
|19272.Europa_Deut...|2017-01-19|  6.31|
|19272.Europa_Deut...|2017-01-20| 6.155|
+--------------------+----------+------+
only showing top

In [59]:
# Sanity check: list up to ten rows that still have a null price.
rows_without_price = df_filled.filter(df_filled.Price.isNull())
rows_without_price.limit(10).show()

# Collapse to a single partition and write the frame as CSV.
(
    df_filled
    .coalesce(1)
    .write
    .csv('STOCKS_DATA_IMPUTED5.csv')
)

+-----+----+-----+
|STOCK|DATE|Price|
+-----+----+-----+
+-----+----+-----+



# Part 2

In [None]:
"""
SQL query used:

SELECT sum(X*Y') / (sqrt(sum(X*X))*sqrt(sum(Y'*Y')))
FROM (
    SELECT S.DATE, S.STOCK, W.STATION_ID_1, W.STATION_ID_2, S.PRICE AS X, W.Y'
    FROM stocks AS S, (
        SELECT W1.DATE, W1.STATION_ID AS STATION_ID_1, W2.STATION_ID AS STATION_ID_2, avg|max|min(W1.TEMP, W2.TEMP) AS Y'
        FROM weather AS W1, weather AS W2
        WHERE W1.DATE = W2.DATE
    ) AS W
    WHERE S.DATE = W.DATE
)
GROUP BY STOCK, STATION_ID_1, STATION_ID_2

"""

In [74]:
# How the two station temperatures of a pair are combined into Y'.
# Alternatives that take the same two column names:
#   agg_function = f.greatest
#   agg_function = f.least
def agg_function(col1, col2):
    """Return a Column holding the mean of the two named columns."""
    return (col(col1) + col(col2)) / 2


# Pair every station with every station on the same DATE (self-join of
# weather_df), combine the two temperatures into Y' and drop the inputs.
# NOTE(review): this reads `weather_df`, not the imputed frame produced by
# the weather cell - confirm that is intended.
Y_df = (
    weather_df
    .withColumnRenamed("STATION_ID", "STATION_ID_1")
    .withColumnRenamed("TEMP", "TEMP_1")
    .join(
        weather_df
        .withColumnRenamed("STATION_ID", "STATION_ID_2")
        .withColumnRenamed("TEMP", "TEMP_2"),
        "DATE",
    )
    .withColumn("Y'", agg_function("TEMP_1", "TEMP_2"))
    .drop("TEMP_1", "TEMP_2")
)
Y_df.persist()
Y_df.show()
print(f"Number of rows: {Y_df.count()}")

+----------+------------+------------+------+
|      DATE|STATION_ID_1|STATION_ID_2|    Y'|
+----------+------------+------------+------+
|2017-01-01| 02265099999| 94804099999|  85.0|
|2017-01-01| 02265099999| 94677099999|  95.5|
|2017-01-01| 02265099999| 94584099999| 156.5|
|2017-01-01| 02265099999| 83650099999| 127.5|
|2017-01-01| 02265099999| 76749399999| 137.5|
|2017-01-01| 02265099999| 71889099999| -22.0|
|2017-01-01| 02265099999| 71453099999| -20.0|
|2017-01-01| 02265099999| 71450099999| -37.0|
|2017-01-01| 02265099999| 71322099999|-171.5|
|2017-01-01| 02265099999| 70333325518|  31.0|
|2017-01-01| 02265099999| 70259526559| -27.5|
|2017-01-01| 02265099999| 68487099999|  90.0|
|2017-01-01| 02265099999| 63708099999| 112.5|
|2017-01-01| 02265099999| 62398099999|  37.5|
|2017-01-01| 02265099999| 54292099999| -76.0|
|2017-01-01| 02265099999| 43278099999| 115.5|
|2017-01-01| 02265099999| 41862199999|  63.0|
|2017-01-01| 02265099999| 28552099999| -17.0|
|2017-01-01| 02265099999| 06022499

In [79]:
# Attach every station-pair value Y' to every stock price X of the same day.
combined_df = stocks_df.withColumnRenamed("PRICE", "X").join(Y_df, "DATE")

# Cosine similarity per (stock, station pair): dot product of X and Y'
# divided by the product of their Euclidean norms. The three intermediate
# aggregates are removed once the ratio has been computed.
# NOTE(review): each sum skips null inputs independently, so rows with a null
# X or Y' affect the three aggregates differently - confirm the inputs are
# expected to be null-free here.
combined_df_grouped = (
    combined_df
    .groupBy("STOCK", "STATION_ID_1", "STATION_ID_2")
    .agg(
        f.sqrt(f.sum(f.pow("X", 2))).alias("X_norm"),
        f.sqrt(f.sum(f.pow("Y'", 2))).alias("Y_norm"),
        f.sum(col("X") * col("Y'")).alias("XY"),
    )
    .withColumn("COSINE_SIM", col("XY") / (col("Y_norm") * col("X_norm")))
    .drop("X_norm", "Y_norm", "XY")
)
combined_df_grouped.persist()
combined_df_grouped.show()

print(f"Number of rows: {combined_df_grouped.count()}")

+--------------------+------------+------------+-------------------+
|               STOCK|STATION_ID_1|STATION_ID_2|         COSINE_SIM|
+--------------------+------------+------------+-------------------+
|42814.Nordamerika...| 02265099999| 94677099999| 0.8642378548973022|
|4438.Asien--Austr...| 02265099999| 76749399999| 0.7725067131938536|
|43297.Nordamerika...| 02265099999| 43278099999| 0.8487977948316912|
|13784.Europa_Deut...| 06022499999| 94677099999| 0.5193424493119109|
|28549.Futures--In...| 28552099999| 94804099999| 0.7120823821302918|
|32843.Nordamerika...| 28552099999| 62398099999| 0.6929047123707867|
|31474.Nordamerika...| 28552099999| 54292099999|0.11055468034407347|
|41574.Nordamerika...| 41862199999| 71450099999|0.45457949948720755|
|34271.Nordamerika...| 43278099999| 94584099999| 0.5726049863036856|
|13784.Europa_Deut...| 43278099999| 71453099999| 0.4919507413036301|
|38565.Nordamerika...| 62398099999| 71450099999| 0.6591657325739723|
|37239.Nordamerika...| 68487099999

In [62]:
# stocks_df.filter(col("PRICE").isNotNull()).count()