In [1]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as f
from pyspark.sql.types import ArrayType, DoubleType, StringType, IntegerType

from vaderSentiment.vaderSentiment import SentimentIntensityAnalyzer
import numpy as np
import pandas as pd

In [2]:
# Sliding-window configuration used by the windowed aggregations below:
# each window spans `window_duration` and a new window starts every `slide_duration`.
window_duration, slide_duration = '11 day', '1 day'

In [2]:
import pyspark

In [3]:
# Record the PySpark version the outputs in this notebook were produced with.
getattr(pyspark, '__version__')

'3.1.1'

In [3]:
# Local Spark session ('local' master = a single worker thread).
spark = (
    SparkSession.builder
    .master('local')
    .appName('NewsSentimentAnalysis')
    .getOrCreate()
)

### StockPrice

In [84]:
# read the RIOT price csv (all columns arrive as strings) and parse Date as a timestamp
df = (
    spark.read.csv('../data/stock_price/data/alldate_RIOT.csv', header=True)
    .withColumn('Date', f.to_timestamp(f.col('Date'), 'yyyy-MM-dd'))
)

In [85]:
df.show(n=20, truncate=True)  # preview the parsed price rows

+-------------------+------------------+------------------+------+
|               Date|              Open|             Close|Ticker|
+-------------------+------------------+------------------+------+
|2016-03-31 00:00:00| 2.309999942779541| 2.700000047683716|  RIOT|
|2016-04-01 00:00:00|2.5999999046325684| 2.630000114440918|  RIOT|
|2016-04-02 00:00:00| 2.630000114440918| 2.630000114440918|  RIOT|
|2016-04-03 00:00:00| 2.630000114440918| 2.630000114440918|  RIOT|
|2016-04-04 00:00:00| 2.630000114440918| 2.630000114440918|  RIOT|
|2016-04-05 00:00:00| 2.880000114440918|3.0199999809265137|  RIOT|
|2016-04-06 00:00:00|3.0899999141693115|2.9800000190734863|  RIOT|
|2016-04-07 00:00:00|2.9800000190734863| 2.990000009536743|  RIOT|
|2016-04-08 00:00:00| 3.009999990463257|2.9700000286102295|  RIOT|
|2016-04-09 00:00:00| 2.974000024795532| 2.978000020980835|  RIOT|
|2016-04-10 00:00:00|2.9820000171661376|2.9860000133514406|  RIOT|
|2016-04-11 00:00:00| 2.990000009536743|3.0399999618530273|  R

In [86]:
@f.udf(returnType=StringType())
def combine(opening_price, closing_price):
    """
    Spark UDF pairing a row's opening and closing price.

    The UDF is string-typed (Spark's default), so the resulting column holds a
    bracketed, comma-separated text rendering of the pair, not a real array.
    """
    pair = [opening_price, closing_price]
    return pair

In [87]:
df = df.withColumn('Open_Close', combine(df['Open'], df['Close']))
# Build the numeric [Open, Close] pair with native Spark expressions.
# Previously the string produced by the `combine` UDF was parsed back with a regex
# and split(", "). That depends on how the JVM renders a Python list as text and
# pushes every row through a Python worker. The string column above is kept only
# so the displayed columns are unchanged; the windowing step aggregates
# `Open_Close_new`.
df = df.withColumn(
    'Open_Close_new',
    f.array(df['Open'].cast('double'), df['Close'].cast('double')),
)

In [88]:
df.show(n=20, truncate=True)  # preview with the string and numeric Open/Close pairs

+-------------------+------------------+------------------+------+--------------------+--------------------+
|               Date|              Open|             Close|Ticker|          Open_Close|      Open_Close_new|
+-------------------+------------------+------------------+------+--------------------+--------------------+
|2016-03-31 00:00:00| 2.309999942779541| 2.700000047683716|  RIOT|[2.30999994277954...|[2.30999994277954...|
|2016-04-01 00:00:00|2.5999999046325684| 2.630000114440918|  RIOT|[2.59999990463256...|[2.59999990463256...|
|2016-04-02 00:00:00| 2.630000114440918| 2.630000114440918|  RIOT|[2.63000011444091...|[2.63000011444091...|
|2016-04-03 00:00:00| 2.630000114440918| 2.630000114440918|  RIOT|[2.63000011444091...|[2.63000011444091...|
|2016-04-04 00:00:00| 2.630000114440918| 2.630000114440918|  RIOT|[2.63000011444091...|[2.63000011444091...|
|2016-04-05 00:00:00| 2.880000114440918|3.0199999809265137|  RIOT|[2.88000011444091...|[2.88000011444091...|
|2016-04-06 00:00:0

In [89]:
# Group the daily [Open, Close] pairs into overlapping windows of `window_duration`
# that advance by `slide_duration`.
# NOTE: a preceding orderBy is not guaranteed to survive the groupBy shuffle, so
# collect_list alone may return the pairs out of date order. Instead, each pair is
# collected together with its Date, the list is sorted explicitly, and then only
# the pairs are kept.
df = (
    df.groupBy(f.window('Date', window_duration, slide_duration))
    .agg(f.sort_array(f.collect_list(f.struct('Date', 'Open_Close_new'))).alias('sliding_window'))
    # array<struct<Date, Open_Close_new>> -> array<array<double>> in date order
    .withColumn('sliding_window', f.col('sliding_window.Open_Close_new'))
)

# flatten: [[o1, c1], [o2, c2], ...] -> [o1, c1, o2, c2, ...]
df = df.withColumn('sliding_window', f.flatten(df['sliding_window']))

In [90]:
@f.udf(ArrayType(DoubleType()))
def normalize(x):
    """
    Min-max normalize the input to the range between 0 and 1.

    Parameters
    ----------
    x : list of float or None
        The flattened [Open, Close, Open, Close, ...] prices of one window.

    Returns
    -------
    list of float or None
        Element-wise ``(x - min) / (max - min)``. Missing (None/NaN) prices are
        ignored when computing min/max and stay NaN in the output. If all values
        are equal (``max == min``), every value maps to 0.0 instead of the NaNs
        that 0/0 would produce. A null input returns null.
    """
    if x is None:
        return None
    values = np.asarray(x, dtype=float)  # None elements become NaN
    if values.size == 0:
        return []
    lo, hi = np.nanmin(values), np.nanmax(values)
    span = hi - lo
    if span == 0:
        # Constant window: nothing to scale; keep missing values as NaN.
        return np.where(np.isnan(values), np.nan, 0.0).tolist()
    return ((values - lo) / span).tolist()


df = df.withColumn('sliding_window', normalize(df['sliding_window']))

In [91]:
df1 = df.withColumn('array_length', f.size(f.col("sliding_window")))  # values per window

In [92]:
# Keep only complete windows. The price file has a row for every calendar day
# (weekend dates appear in the output above), and each row contributes two values
# (Open, Close). A full window therefore holds 2 * <days in window_duration>
# values, e.g. 22 for '11 day'. Deriving this from the config instead of
# hard-coding 22 keeps the filter in sync if the window size changes.
_window_days, _window_unit = window_duration.split()
assert _window_unit in ('day', 'days'), f'expected a day-based window, got {window_duration!r}'
df1 = df1.filter(df1.array_length == 2 * int(_window_days))

In [93]:
df1 = df1.select('window', f.col('sliding_window').alias('StockPrice'))  # window + normalized prices

In [95]:
df1.sort("window").show(n=20)  # first windows in chronological order

+--------------------+--------------------+
|              window|          StockPrice|
+--------------------+--------------------+
|{2016-03-30 20:00...|[0.0, 0.500000152...|
|{2016-03-31 20:00...|[0.0, 0.061224916...|
|{2016-04-01 20:00...|[0.0, 0.0, 0.0, 0...|
|{2016-04-02 20:00...|[0.0, 0.0, 0.0, 0...|
|{2016-04-03 20:00...|[0.0, 0.0, 0.5434...|
|{2016-04-04 20:00...|[0.0, 0.499999574...|
|{2016-04-05 20:00...|[0.63157815484152...|
|{2016-04-06 20:00...|[0.05263151290346...|
|{2016-04-07 20:00...|[0.21052605161384...|
|{2016-04-08 20:00...|[0.0, 0.021505348...|
|{2016-04-09 20:00...|[0.01111109639391...|
|{2016-04-10 20:00...|[0.05555548196960...|
|{2016-04-11 20:00...|[0.61111030166561...|
|{2016-04-12 20:00...|[0.27777740984800...|
|{2016-04-13 20:00...|[0.16666644590880...|
|{2016-04-14 20:00...|[1.0, 0.479999542...|
|{2016-04-15 20:00...|[0.77241398020961...|
|{2016-04-16 20:00...|[0.80000016442662...|
|{2016-04-17 20:00...|[0.82758634864364...|
|{2016-04-18 20:00...|[0.4594595

### Sentiment Analysis

In [4]:
# Read the news articles; fields are quoted with '"', which is also the escape character.
df2 = spark.read.options(header=True, escape='"').csv('../data/news/data/news_large.csv')

In [5]:
df2.show(n=20)  # preview the raw news rows

+-------------------+-----------+-------------------+--------------------+--------------------+--------------------+--------------------+
|             source|     author|               time|               title|         description|             content|                 url|
+-------------------+-----------+-------------------+--------------------+--------------------+--------------------+--------------------+
|       gulfbusiness|NotProvided|2016-05-30 14:16:02|Dubai looks to im...|Emirate plans to ...|Dubai looks to im...|https://gulfbusin...|
|             forbes|NotProvided|2016-05-26 23:00:00|How Blockchain Te...|Theres a lot of h...|Kevin Durants Bus...|https://www.forbe...|
|      mjvinnovation|NotProvided|2016-05-23 17:01:06|From the Internet...|Have you heard of...|Understand how th...|http://blog.mjvin...|
|            bitcoin|NotProvided|2016-05-22 16:29:34|New Experiment Al...|Just recently a n...|May 22, 2016 New ...|https://news.bitc...|
|          economist|NotProvided|2

In [5]:
# Parse `time` into a timestamp, then derive the hour of day and the calendar day
# used below to assign each article to a date and an Open/Close slot.
df2 = (
    df2.withColumn('time', f.to_timestamp(f.col('time'), 'yyyy-MM-dd H:mm:ss'))
    .withColumn('hour', f.hour('time'))
    .withColumn('Day', f.to_date(f.col('time'), 'yyyy-MM-dd'))
)

In [7]:
df2.show(20, True)  # preview with the derived hour / Day columns

+-------------------+-----------+-------------------+--------------------+--------------------+--------------------+--------------------+----+----------+
|             source|     author|               time|               title|         description|             content|                 url|hour|       Day|
+-------------------+-----------+-------------------+--------------------+--------------------+--------------------+--------------------+----+----------+
|       gulfbusiness|NotProvided|2016-05-30 14:16:02|Dubai looks to im...|Emirate plans to ...|Dubai looks to im...|https://gulfbusin...|  14|2016-05-30|
|             forbes|NotProvided|2016-05-26 23:00:00|How Blockchain Te...|Theres a lot of h...|Kevin Durants Bus...|https://www.forbe...|  23|2016-05-26|
|      mjvinnovation|NotProvided|2016-05-23 17:01:06|From the Internet...|Have you heard of...|Understand how th...|http://blog.mjvin...|  17|2016-05-23|
|            bitcoin|NotProvided|2016-05-22 16:29:34|New Experiment Al...|Ju

In [6]:
@f.udf
def connect_string(a, b):
    """
    Join two text fields with a single space (string-typed Spark UDF).

    Null inputs are skipped instead of raising. The previous ``a + ' ' + b``
    raised a TypeError on the executor whenever a field was null, which aborted
    the whole job. Non-null inputs give exactly ``a + ' ' + b`` as before. If both
    inputs are null, the result is an empty string.
    """
    return ' '.join(part for part in (a, b) if part is not None)

In [7]:
# AllText = "<title> <description> <content>"
df2 = df2.withColumn(
    'AllText',
    connect_string(connect_string(f.col('title'), f.col('description')), f.col('content')),
)

In [10]:
df2.show(truncate=True)  # preview with the combined AllText column

+-------------------+-----------+-------------------+--------------------+--------------------+--------------------+--------------------+----+----------+--------------------+
|             source|     author|               time|               title|         description|             content|                 url|hour|       Day|             AllText|
+-------------------+-----------+-------------------+--------------------+--------------------+--------------------+--------------------+----+----------+--------------------+
|       gulfbusiness|NotProvided|2016-05-30 14:16:02|Dubai looks to im...|Emirate plans to ...|Dubai looks to im...|https://gulfbusin...|  14|2016-05-30|Dubai looks to im...|
|             forbes|NotProvided|2016-05-26 23:00:00|How Blockchain Te...|Theres a lot of h...|Kevin Durants Bus...|https://www.forbe...|  23|2016-05-26|How Blockchain Te...|
|      mjvinnovation|NotProvided|2016-05-23 17:01:06|From the Internet...|Have you heard of...|Understand how th...|http://bl

In [8]:
# Keep only the fields needed downstream. The text column is created as 'AllText';
# selecting it by its exact name avoids relying on Spark's default
# case-insensitive column resolution (spark.sql.caseSensitive=false).
df3 = df2.select('Day', 'hour', 'AllText')

In [9]:
df3.show(n=20)  # preview Day / hour / text

+----------+----+--------------------+
|       Day|hour|             Alltext|
+----------+----+--------------------+
|2016-05-30|  14|Dubai looks to im...|
|2016-05-26|  23|How Blockchain Te...|
|2016-05-23|  17|From the Internet...|
|2016-05-22|  16|New Experiment Al...|
|2016-05-21|   0|Science and techn...|
|2016-05-20|  22|Why The Blockchai...|
|2016-05-20|  18|THE BLOCKCHAIN RE...|
|2016-05-18|   0|Banking with Bloc...|
|2016-05-16|  17|Blockchain announ...|
|2016-05-15|  10|11FS Blockchain C...|
|2016-05-12|   0|How Revolutionary...|
|2016-05-10|  10|New Yorkers Tout ...|
|2016-05-03|  18|CoinReport  Austr...|
|2016-05-03|   0|How can blockchai...|
|2016-05-02|  12|Blockchain  More ...|
|2016-05-01|  11|Blockchain Techno...|
|2016-04-29|  13|The Human Blockch...|
|2016-04-28|  16|ECB Executive Pla...|
|2016-04-28|  11|Russian Central B...|
|2016-04-27|  20|Gem Health Unveil...|
+----------+----+--------------------+
only showing top 20 rows



In [10]:
from pyspark.sql.functions import when

In [11]:
# Articles published at hour >= 16 are attributed to the next calendar day;
# everything else (including a null hour) stays on its publication day.
# NOTE(review): presumably 16:00 is the market close in the time zone of `time`; confirm.
df3 = df3.withColumn(
    'TrueDate',
    when(df3.hour >= 16, f.date_add(df3['Day'], 1)).otherwise(df3['Day']),
)

In [12]:
# 'Open' for articles before 09:00 or from 16:00 onward, 'Close' for 09:00-15:59
# (a null hour also falls through to 'Close').
df3 = df3.withColumn(
    'Type',
    when((df3.hour < 9) | (df3.hour >= 16), 'Open').otherwise('Close'),
)

In [13]:
df3.show(20)  # preview TrueDate / Type assignment

+----------+----+--------------------+----------+-----+
|       Day|hour|             Alltext|  TrueDate| Type|
+----------+----+--------------------+----------+-----+
|2016-05-30|  14|Dubai looks to im...|2016-05-30|Close|
|2016-05-26|  23|How Blockchain Te...|2016-05-27| Open|
|2016-05-23|  17|From the Internet...|2016-05-24| Open|
|2016-05-22|  16|New Experiment Al...|2016-05-23| Open|
|2016-05-21|   0|Science and techn...|2016-05-21| Open|
|2016-05-20|  22|Why The Blockchai...|2016-05-21| Open|
|2016-05-20|  18|THE BLOCKCHAIN RE...|2016-05-21| Open|
|2016-05-18|   0|Banking with Bloc...|2016-05-18| Open|
|2016-05-16|  17|Blockchain announ...|2016-05-17| Open|
|2016-05-15|  10|11FS Blockchain C...|2016-05-15|Close|
|2016-05-12|   0|How Revolutionary...|2016-05-12| Open|
|2016-05-10|  10|New Yorkers Tout ...|2016-05-10|Close|
|2016-05-03|  18|CoinReport  Austr...|2016-05-04| Open|
|2016-05-03|   0|How can blockchai...|2016-05-03| Open|
|2016-05-02|  12|Blockchain  More ...|2016-05-02

In [14]:
# VADER analyzer used by the `calculate_sentiment_score` UDF, which reads it as a
# module-level global (so it must be serializable for Spark to ship to workers).
# Scores are computed on the combined text column (title + description + content),
# not on each field separately.
analyzer = SentimentIntensityAnalyzer()

In [15]:
@f.udf(returnType=DoubleType())
def calculate_sentiment_score(text):
    """
    Return VADER's compound polarity score for ``text`` (DoubleType Spark UDF).

    A null ``text`` yields a null score, which the later ``avg`` ignores. It is
    no longer passed into the analyzer, which expects a string and would fail
    the task.
    """
    if text is None:
        return None
    return analyzer.polarity_scores(text)['compound']

In [16]:
df4 = df3.withColumn('score', calculate_sentiment_score(f.col('Alltext')))  # per-article compound score

In [17]:
df4.show(n=20, truncate=True)  # preview per-article scores

+----------+----+--------------------+----------+-----+-------+
|       Day|hour|             Alltext|  TrueDate| Type|  score|
+----------+----+--------------------+----------+-----+-------+
|2016-05-30|  14|Dubai looks to im...|2016-05-30|Close| 0.9892|
|2016-05-26|  23|How Blockchain Te...|2016-05-27| Open|  0.999|
|2016-05-23|  17|From the Internet...|2016-05-24| Open|  0.933|
|2016-05-22|  16|New Experiment Al...|2016-05-23| Open| 0.9926|
|2016-05-21|   0|Science and techn...|2016-05-21| Open| 0.9794|
|2016-05-20|  22|Why The Blockchai...|2016-05-21| Open|-0.9266|
|2016-05-20|  18|THE BLOCKCHAIN RE...|2016-05-21| Open| 0.9648|
|2016-05-18|   0|Banking with Bloc...|2016-05-18| Open| 0.9981|
|2016-05-16|  17|Blockchain announ...|2016-05-17| Open| 0.7322|
|2016-05-15|  10|11FS Blockchain C...|2016-05-15|Close| 0.9885|
|2016-05-12|   0|How Revolutionary...|2016-05-12| Open| 0.9931|
|2016-05-10|  10|New Yorkers Tout ...|2016-05-10|Close| 0.9486|
|2016-05-03|  18|CoinReport  Austr...|20

In [18]:
# Mean sentiment per (TrueDate, Open/Close slot).
df4 = df4.groupBy('TrueDate', 'Type').agg(f.mean('score').alias('AverageScore'))

In [103]:
df4.show(20)  # unordered preview of the averaged scores

+----------+-----+-------------------+
|  TrueDate| Type|       AverageScore|
+----------+-----+-------------------+
|2021-03-31| Open| 0.6719461538461545|
|2021-04-03| Open| 0.2891708333333335|
|2016-07-05| Open|             0.9879|
|2021-03-29|Close| 0.3657036496350363|
|2021-04-06| Open|  0.646568817204301|
|2016-10-05| Open|             0.9509|
|2016-05-27| Open|              0.999|
|2021-04-14|Close|-0.3307480769230771|
|2021-03-24| Open|0.06080416666666664|
|2016-04-04| Open|             0.9878|
|2016-05-30|Close|             0.9892|
|2021-03-21| Open|             0.8243|
|2021-04-16|Close|          0.6268125|
|2021-04-11| Open| 0.9886999999999999|
|2021-04-01|Close| 0.6732446808510636|
|2021-03-25| Open|  0.349928888888889|
|2016-05-17| Open|             0.7322|
|2016-06-08|Close|             0.8081|
|2016-06-19|Close|             0.9001|
|2021-04-15|Close|             0.9958|
+----------+-----+-------------------+
only showing top 20 rows



In [19]:
# Chronological view; within a day 'Open' sorts before 'Close' (descending string order).
df5 = df4.sort(f.col("TrueDate").asc(), f.col("Type").desc())

In [20]:
df5.show(n=20)  # chronological preview

+----------+-----+------------+
|  TrueDate| Type|AverageScore|
+----------+-----+------------+
|2016-04-02| Open|     -0.1307|
|2016-04-04| Open|      0.9878|
|2016-04-05| Open|     -0.1027|
|2016-04-07| Open|      0.1104|
|2016-04-12|Close|       -0.07|
|2016-04-14| Open|      0.2306|
|2016-04-16| Open|     -0.7894|
|2016-04-18|Close|      0.7964|
|2016-04-23| Open|      0.9896|
|2016-04-23|Close|      0.8387|
|2016-04-25|Close|     -0.5994|
|2016-04-28| Open|       0.974|
|2016-04-28|Close|      0.8905|
|2016-04-29| Open|      0.9699|
|2016-04-29|Close|     -0.5106|
|2016-05-01|Close|      0.9969|
|2016-05-02|Close|     -0.5267|
|2016-05-03| Open|      0.9928|
|2016-05-04| Open|      0.9773|
|2016-05-10|Close|      0.9486|
+----------+-----+------------+
only showing top 20 rows



In [30]:
{column: dtype for column, dtype in df5.dtypes}  # column name -> Spark type

{'TrueDate': 'date', 'Type': 'string', 'AverageScore': 'double'}

##### Generate a full reference dataframe (one Open and one Close row per calendar day)

In [31]:
import datetime

In [33]:
# Inclusive date range covered by the reference frame below.
startdate = datetime.date(year=2016, month=3, day=31)
enddate = datetime.date(year=2021, month=4, day=16)

In [48]:
# One 'Open' and one 'Close' row for every calendar day in [startdate, enddate].
_days = [
    startdate + datetime.timedelta(days=offset)
    for offset in range((enddate - startdate).days + 1)
]
full_dict = {
    'TrueDate': [day for day in _days for _ in range(2)],
    'Type': ['Open', 'Close'] * len(_days),
}

In [49]:
df_ref_pd = pd.DataFrame.from_dict(full_dict)  # columns: TrueDate, Type

In [51]:
df_ref = spark.createDataFrame(data=df_ref_pd)  # date objects become a Spark date column

In [52]:
df_ref.show(n=20)  # preview the reference grid

+----------+-----+
|  TrueDate| Type|
+----------+-----+
|2016-03-31| Open|
|2016-03-31|Close|
|2016-04-01| Open|
|2016-04-01|Close|
|2016-04-02| Open|
|2016-04-02|Close|
|2016-04-03| Open|
|2016-04-03|Close|
|2016-04-04| Open|
|2016-04-04|Close|
|2016-04-05| Open|
|2016-04-05|Close|
|2016-04-06| Open|
|2016-04-06|Close|
|2016-04-07| Open|
|2016-04-07|Close|
|2016-04-08| Open|
|2016-04-08|Close|
|2016-04-09| Open|
|2016-04-09|Close|
+----------+-----+
only showing top 20 rows



#### Left outer join of df_ref with df4 (df5 is only df4 sorted, so either gives the same rows)

In [100]:
# Keep every (TrueDate, Type) slot; slots without news get a null AverageScore.
df6 = df_ref.join(df4, ['TrueDate', 'Type'], 'left_outer')

In [101]:
df6 = df6.fillna(0, subset=['AverageScore'])  # no news -> neutral score 0

In [102]:
df6.show(n=20)  # preview the filled grid

+----------+-----+------------+
|  TrueDate| Type|AverageScore|
+----------+-----+------------+
|2016-03-31| Open|         0.0|
|2016-03-31|Close|         0.0|
|2016-04-01| Open|         0.0|
|2016-04-01|Close|         0.0|
|2016-04-02| Open|     -0.1307|
|2016-04-02|Close|         0.0|
|2016-04-03| Open|         0.0|
|2016-04-03|Close|         0.0|
|2016-04-04| Open|      0.9878|
|2016-04-04|Close|         0.0|
|2016-04-05| Open|     -0.1027|
|2016-04-05|Close|         0.0|
|2016-04-06| Open|         0.0|
|2016-04-06|Close|         0.0|
|2016-04-07| Open|      0.1104|
|2016-04-07|Close|         0.0|
|2016-04-08| Open|         0.0|
|2016-04-08|Close|         0.0|
|2016-04-09| Open|         0.0|
|2016-04-09|Close|         0.0|
+----------+-----+------------+
only showing top 20 rows



In [58]:
from pyspark.sql import Window

In [59]:
# Pivot the (TrueDate, Type) rows into one [Open, Close] score pair per day.
# df6 has exactly one row per (TrueDate, Type): df_ref enumerates each pair once
# and df4 is aggregated on the same keys. So max() over each conditional column
# simply picks that single value, and the [Open, Close] order is explicit.
# This replaces a running collect_list over a window ordered by desc('Type')
# followed by max() over the resulting arrays. That approach only worked because
# 'Open' > 'Close' as strings and because the longer array compares greater.
df7 = df6.groupBy('TrueDate').agg(
    f.array(
        f.max(f.when(f.col('Type') == 'Open', f.col('AverageScore'))),
        f.max(f.when(f.col('Type') == 'Close', f.col('AverageScore'))),
    ).alias('Open_Close')
)

In [73]:
df7.orderBy("TrueDate").show(n=20)  # daily [Open, Close] pairs in date order

+----------+--------------+
|  TrueDate|    Open_Close|
+----------+--------------+
|2016-03-31|    [0.0, 0.0]|
|2016-04-01|    [0.0, 0.0]|
|2016-04-02|[-0.1307, 0.0]|
|2016-04-03|    [0.0, 0.0]|
|2016-04-04| [0.9878, 0.0]|
|2016-04-05|[-0.1027, 0.0]|
|2016-04-06|    [0.0, 0.0]|
|2016-04-07| [0.1104, 0.0]|
|2016-04-08|    [0.0, 0.0]|
|2016-04-09|    [0.0, 0.0]|
|2016-04-10|    [0.0, 0.0]|
|2016-04-11|    [0.0, 0.0]|
|2016-04-12|  [0.0, -0.07]|
|2016-04-13|    [0.0, 0.0]|
|2016-04-14| [0.2306, 0.0]|
|2016-04-15|    [0.0, 0.0]|
|2016-04-16|[-0.7894, 0.0]|
|2016-04-17|    [0.0, 0.0]|
|2016-04-18| [0.0, 0.7964]|
|2016-04-19|    [0.0, 0.0]|
+----------+--------------+
only showing top 20 rows



In [77]:
# `window_duration` / `slide_duration` come from the configuration cell at the top
# and are not redefined here. The stock and news windows must be identical for
# the final join on `window` to line up.
# NOTE: a preceding orderBy is not guaranteed to survive the groupBy shuffle, so
# each (TrueDate, Open_Close) pair is collected and sorted explicitly. That keeps
# NewsScore in chronological order.
df8 = (
    df7.groupBy(f.window('TrueDate', window_duration, slide_duration))
    .agg(f.sort_array(f.collect_list(f.struct('TrueDate', 'Open_Close'))).alias('NewsScore'))
    # array<struct<TrueDate, Open_Close>> -> array<array<double>> (flattened in the next cell)
    .withColumn('NewsScore', f.col('NewsScore.Open_Close'))
)

In [78]:
# flatten [[open, close], ...] into [open, close, open, close, ...]
df8 = df8.withColumn('NewsScore', f.flatten('NewsScore'))

In [79]:
df8.orderBy("window").show(n=20)  # all windows, including partial ones at the edges

+--------------------+--------------------+
|              window|           NewsScore|
+--------------------+--------------------+
|{2016-03-20 20:00...|          [0.0, 0.0]|
|{2016-03-21 20:00...|[0.0, 0.0, 0.0, 0.0]|
|{2016-03-22 20:00...|[0.0, 0.0, 0.0, 0...|
|{2016-03-23 20:00...|[0.0, 0.0, 0.0, 0...|
|{2016-03-24 20:00...|[0.0, 0.0, 0.0, 0...|
|{2016-03-25 20:00...|[0.0, 0.0, 0.0, 0...|
|{2016-03-26 20:00...|[0.0, 0.0, 0.0, 0...|
|{2016-03-27 20:00...|[0.0, 0.0, 0.0, 0...|
|{2016-03-28 20:00...|[0.0, 0.0, 0.0, 0...|
|{2016-03-29 20:00...|[0.0, 0.0, 0.0, 0...|
|{2016-03-30 20:00...|[0.0, 0.0, 0.0, 0...|
|{2016-03-31 20:00...|[0.0, 0.0, -0.130...|
|{2016-04-01 20:00...|[-0.1307, 0.0, 0....|
|{2016-04-02 20:00...|[0.0, 0.0, 0.9878...|
|{2016-04-03 20:00...|[0.9878, 0.0, -0....|
|{2016-04-04 20:00...|[-0.1027, 0.0, 0....|
|{2016-04-05 20:00...|[0.0, 0.0, 0.1104...|
|{2016-04-06 20:00...|[0.1104, 0.0, 0.0...|
|{2016-04-07 20:00...|[0.0, 0.0, 0.0, 0...|
|{2016-04-08 20:00...|[0.0, 0.0,

In [80]:
df8 = df8.withColumn('array_length', f.size(f.col("NewsScore")))  # values per window

In [81]:
# Keep only complete windows. df_ref enumerates every calendar day with one Open
# and one Close slot, so a full window holds 2 * <days in window_duration> values,
# e.g. 22 for '11 day'. Deriving this from the config instead of hard-coding 22
# keeps the filter in sync if the window size changes.
_window_days, _window_unit = window_duration.split()
assert _window_unit in ('day', 'days'), f'expected a day-based window, got {window_duration!r}'
df8 = df8.filter(df8.array_length == 2 * int(_window_days))

In [82]:
df8 = df8.select(f.col('window'), f.col('NewsScore'))  # drop the helper length column

In [83]:
df8.orderBy('window').show(n=20)  # complete windows only

+--------------------+--------------------+
|              window|           NewsScore|
+--------------------+--------------------+
|{2016-03-30 20:00...|[0.0, 0.0, 0.0, 0...|
|{2016-03-31 20:00...|[0.0, 0.0, -0.130...|
|{2016-04-01 20:00...|[-0.1307, 0.0, 0....|
|{2016-04-02 20:00...|[0.0, 0.0, 0.9878...|
|{2016-04-03 20:00...|[0.9878, 0.0, -0....|
|{2016-04-04 20:00...|[-0.1027, 0.0, 0....|
|{2016-04-05 20:00...|[0.0, 0.0, 0.1104...|
|{2016-04-06 20:00...|[0.1104, 0.0, 0.0...|
|{2016-04-07 20:00...|[0.0, 0.0, 0.0, 0...|
|{2016-04-08 20:00...|[0.0, 0.0, 0.0, 0...|
|{2016-04-09 20:00...|[0.0, 0.0, 0.0, 0...|
|{2016-04-10 20:00...|[0.0, 0.0, 0.0, -...|
|{2016-04-11 20:00...|[0.0, -0.07, 0.0,...|
|{2016-04-12 20:00...|[0.0, 0.0, 0.2306...|
|{2016-04-13 20:00...|[0.2306, 0.0, 0.0...|
|{2016-04-14 20:00...|[0.0, 0.0, -0.789...|
|{2016-04-15 20:00...|[-0.7894, 0.0, 0....|
|{2016-04-16 20:00...|[0.0, 0.0, 0.0, 0...|
|{2016-04-17 20:00...|[0.0, 0.7964, 0.0...|
|{2016-04-18 20:00...|[0.0, 0.0,

#### Combine the two: join the stock-price windows (df1) with the news-score windows (df8)

In [97]:
# Attach each stock window's news scores; stock windows without a match keep a null NewsScore.
df_final = df1.join(df8, ['window'], 'left_outer')

In [99]:
df_final.sort('window').show(n=20)  # combined stock/news windows in date order

+--------------------+--------------------+--------------------+
|              window|          StockPrice|           NewsScore|
+--------------------+--------------------+--------------------+
|{2016-03-30 20:00...|[0.0, 0.500000152...|[0.0, 0.0, 0.0, 0...|
|{2016-03-31 20:00...|[0.0, 0.061224916...|[0.0, 0.0, -0.130...|
|{2016-04-01 20:00...|[0.0, 0.0, 0.0, 0...|[-0.1307, 0.0, 0....|
|{2016-04-02 20:00...|[0.0, 0.0, 0.0, 0...|[0.0, 0.0, 0.9878...|
|{2016-04-03 20:00...|[0.0, 0.0, 0.5434...|[0.9878, 0.0, -0....|
|{2016-04-04 20:00...|[0.0, 0.499999574...|[-0.1027, 0.0, 0....|
|{2016-04-05 20:00...|[0.63157815484152...|[0.0, 0.0, 0.1104...|
|{2016-04-06 20:00...|[0.05263151290346...|[0.1104, 0.0, 0.0...|
|{2016-04-07 20:00...|[0.21052605161384...|[0.0, 0.0, 0.0, 0...|
|{2016-04-08 20:00...|[0.0, 0.021505348...|[0.0, 0.0, 0.0, 0...|
|{2016-04-09 20:00...|[0.01111109639391...|[0.0, 0.0, 0.0, 0...|
|{2016-04-10 20:00...|[0.05555548196960...|[0.0, 0.0, 0.0, -...|
|{2016-04-11 20:00...|[0.