In [None]:
# install spark-nlp: recommended to use this command in bash
# !conda install --yes -c johnsnowlabs spark-nlp

In [40]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql import DataFrame
from sparknlp.pretrained import PretrainedPipeline

from pyspark.ml import Pipeline
from sparknlp.annotator import *
from sparknlp.base import *
from pyspark.sql.functions import explode, col
from functools import reduce

import os
import re
import json
import sparknlp
import pandas as pd
import numpy as np

os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["PATH"] = os.environ["JAVA_HOME"] + "/bin:" + os.environ["PATH"]

## 1. Configure and start Spark Session: ##
* master('local[*]') -> as many workers as logical cores on machine
* arrow szhould be enabled to use pandas_udf
* spark-nlp enabled (alternative: use spark = sparknlp.start())
* spark.rpc.message.maxSize - 128 is default, increase when java errors observed with rpc size

In [2]:
%%time
# def start_spark():
#     builder = SparkSession.builder \
#     .appName("tsmp - social media based stock market value prediction")\
#     .master('local[5]') \
#     .config("spark.driver.memory","16G")\
#     .config("spark.driver.maxResultSize", "0") \
#     .config("spark.kryoserializer.buffer.max", "2000M")\
#     .config("spark.jars.packages", "com.johnsnowlabs.nlp:spark-nlp_2.12:3.1.0")\      
#     .config("spark.executor.memory", "2G") \
#     .config('spark.sql.execution.arrow.enabled', 'true') \
#     .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
#     .config("spark.kryoserializer.buffer.max", "1000M")\
#     .config("spark.rpc.message.maxSize", "1000")
#     return builder.getOrCreate()

# spark = start_spark()

spark = sparknlp.start()

CPU times: user 23.2 ms, sys: 42.8 ms, total: 66.1 ms
Wall time: 8.58 s


## 2. Tweets data preprocessing ##

In [3]:
# tweets_path = '/home/jovyan/work/input_data/json/tweets_merged/'
tweets_path = '/home/jovyan/my_files/json/tweets_merged/'

In [57]:
# Initial preprocessing of tweets json data
def json_tweets_preprocessing(tweets_json_df):
    
    # explode data array to rows of twitter data
    tweets_rows = tweets_json_df \
        .select(explode("data").alias("tweets"))
        # .limit(4500)
       
    # remove tweets with language different than English
    # select only columns used in further processing 
    # leave only distinct values (remove obvious duplicates)
    tweets_data = tweets_rows \
        .filter(
            col("tweets.lang") == "en"
        ) \
        .filter(
            (col("tweets.public_metrics.like_count") >= 10) | \
            (col("tweets.public_metrics.quote_count") >= 10) | \
            (col("tweets.public_metrics.reply_count") >= 10) | \
            (col("tweets.public_metrics.retweet_count") >= 10)
        ) \
        .select(
            col("tweets.id").alias("id") 
          , col("tweets.text").alias("text")
          , col("tweets.public_metrics.like_count").alias("like_count")
          , col("tweets.public_metrics.quote_count").alias("quote_count")
          , col("tweets.public_metrics.reply_count").alias("reply_count")
          , col("tweets.public_metrics.retweet_count").alias("retweet_count")
          , col("tweets.author_id").alias("author_id")
          , col("tweets.created_at").alias("created_at")
          , col("tweets.conversation_id").alias("conversation_id")
          , col("tweets.in_reply_to_user_id").alias("in_reply_to_user_id")
        ) \
        .distinct() \
        .withColumn("created_at_timestamp",F.from_utc_timestamp(col("created_at"), "GMT")) \
        .withColumn("created_at_date", F.to_date(col("created_at"))) \
        .orderBy("created_at_timestamp", ascending=True)


    
    return tweets_data

In [58]:
%%time
tweets_json_file_paths = []
for file in os.listdir(tweets_path):
    if file.startswith("tweets") and file.endswith(".json"): 
        tweets_json_file_paths.append(os.path.join(tweets_path, file))
print(f'Files to analyze: {len(tweets_json_file_paths)}')

Files to analyze: 10
CPU times: user 1.53 ms, sys: 809 µs, total: 2.33 ms
Wall time: 1.31 ms


In [59]:
%%time
for tweets_json_file_path in tweets_json_file_paths:
    company_code_pattern = "merged\/tweets_(.*?)_merged"
    company_code = re.search(company_code_pattern, tweets_json_file_path).group(1)
    print(f'file found: {tweets_json_file_path} for company {company_code}')

file found: /home/jovyan/my_files/json/tweets_merged/tweets_AMZN_merged_1611788377913.json for company AMZN
file found: /home/jovyan/my_files/json/tweets_merged/tweets_MSFT_merged_1611788415837.json for company MSFT
file found: /home/jovyan/my_files/json/tweets_merged/tweets_MCD_merged_1611788399786.json for company MCD
file found: /home/jovyan/my_files/json/tweets_merged/tweets_INTC_merged_1611788389654.json for company INTC
file found: /home/jovyan/my_files/json/tweets_merged/tweets_NFLX_merged_1611788430702.json for company NFLX
file found: /home/jovyan/my_files/json/tweets_merged/tweets_TSLA_merged_1611788475071.json for company TSLA
file found: /home/jovyan/my_files/json/tweets_merged/tweets_AMD_merged_1611788368385.json for company AMD
file found: /home/jovyan/my_files/json/tweets_merged/tweets_PFE_merged_1611788444379.json for company PFE
file found: /home/jovyan/my_files/json/tweets_merged/tweets_SBUX_merged_1611788459996.json for company SBUX
file found: /home/jovyan/my_files/

## 3. Tweets sentiment analysis ##

Configuration for tweets sentiment analysis. 
For performance puproses we create pretrained pipeline once (~950 MB download, issues observed with model loaded from disk and checking that model is loaded takes time...)

In [44]:
%%time
sentiment_analysis_pipeline_dl_use_twitter = PretrainedPipeline("analyze_sentimentdl_use_twitter", lang = "en")

analyze_sentimentdl_use_twitter download started this may take some time.
Approx size to download 935.1 MB
[OK!]
CPU times: user 51.7 ms, sys: 30.6 ms, total: 82.3 ms
Wall time: 4.01 s


In [60]:
from pyspark.sql.functions import when

# analyze sentiment using pretrained pipeline from Spark NLP project
# sentiment_dl_sparknlp_use_twitter https://nlp.johnsnowlabs.com/2021/01/18/analyze_sentimentdl_use_twitter_en.html
# pipeline based on Universal Sentence Encoder
# pretrained with data from Twitter
# is_sentiment_positive & is_sentiment_negative added for easier data manipulation
def analyze_sentiment_pretrained_pipeline(text_list, sentiment_analysis_pipeline):
    pipeline = sentiment_analysis_pipeline
        
    sentiment_result = pipeline.fullAnnotate(text_list,'sentiment_dl_sparknlp_use_twitter')
    
    resultdf = sentiment_result.select(F.explode(F.arrays_zip('document.result', 'sentiment.result')).alias("cols")) \
        .select(F.expr("cols['0']").alias("text"),
                F.expr("cols['1']").alias("sentiment_use_twitter")) \
        .withColumn("is_sentiment_positive", when(col("sentiment_use_twitter") == "positive",1).otherwise(0)) \
        .withColumn("is_sentiment_negative", when(col("sentiment_use_twitter") == "negative",1).otherwise(0))
    return resultdf

Iterate twitter data json files.
Create tweets_with_sentiment_dfs dictionary of DataFrames for further processing.

In [61]:
%%time

tweets_with_sentiment_dfs = {}

for tweets_json_file_path in tweets_json_file_paths:
    company_code_pattern = "merged\/tweets_(.*?)_merged"
    company_code = re.search(company_code_pattern, tweets_json_file_path).group(1)
    
    # for testing purpose we will process only one company file: AMZN
    #if company_code != 'AMZN':
    #    continue
    
    print(f'reading {tweets_json_file_path} for company {company_code}')
    tweets_json_df = spark.read.option("multiline", True).json(tweets_json_file_path)
    tweets_data = json_tweets_preprocessing(tweets_json_df)
       
    print(f'analyzing sentiment...')
    sentiment_resultdf = analyze_sentiment_pretrained_pipeline\
        (tweets_data.select('text'),sentiment_analysis_pipeline_dl_use_twitter)
     
    print(f'merging sentiment data with tweets data...')
    tweets_with_sentiment_dfs[company_code] = tweets_data.join(sentiment_resultdf, on="text", how="outer")

reading /home/jovyan/my_files/json/tweets_merged/tweets_AMZN_merged_1611788377913.json for company AMZN
analyzing sentiment...
merging sentiment data with tweets data...
reading /home/jovyan/my_files/json/tweets_merged/tweets_MSFT_merged_1611788415837.json for company MSFT
analyzing sentiment...
merging sentiment data with tweets data...
reading /home/jovyan/my_files/json/tweets_merged/tweets_MCD_merged_1611788399786.json for company MCD
analyzing sentiment...
merging sentiment data with tweets data...
reading /home/jovyan/my_files/json/tweets_merged/tweets_INTC_merged_1611788389654.json for company INTC
analyzing sentiment...
merging sentiment data with tweets data...
reading /home/jovyan/my_files/json/tweets_merged/tweets_NFLX_merged_1611788430702.json for company NFLX
analyzing sentiment...
merging sentiment data with tweets data...
reading /home/jovyan/my_files/json/tweets_merged/tweets_TSLA_merged_1611788475071.json for company TSLA
analyzing sentiment...
merging sentiment data wi

In [10]:
tweets_with_sentiment_dfs['TSLA'].describe()

DataFrame[summary: string, text: string, id: string, like_count: string, quote_count: string, reply_count: string, retweet_count: string, author_id: string, created_at: string, conversation_id: string, in_reply_to_user_id: string, sentiment_use_twitter: string, is_sentiment_positive: string, is_sentiment_negative: string]

Example stats of sentiment (very slow due to usage of group by, better to be used with window functions):

In [62]:
%%time
for company_code in tweets_with_sentiment_dfs.keys():
    
#     if company_code != 'TSLA':
#         continue
    
    print(f'Sentiment stats for {company_code}')
    tweets_with_sentiment_dfs[company_code].groupBy('sentiment_use_twitter').count().show()
    # print(f'Data time window start: {tweets_with_sentiment_dfs[key].first()}')
    # print(f'Data time window end: {tweets_with_sentiment_dfs[key].last().}')

Sentiment stats for AMZN
+---------------------+-----+
|sentiment_use_twitter|count|
+---------------------+-----+
|             positive|  825|
|              neutral|   32|
|             negative|  365|
+---------------------+-----+

Sentiment stats for MSFT
+---------------------+-----+
|sentiment_use_twitter|count|
+---------------------+-----+
|             positive| 1001|
|              neutral|   56|
|             negative|  436|
+---------------------+-----+

Sentiment stats for MCD
+---------------------+-----+
|sentiment_use_twitter|count|
+---------------------+-----+
|             positive|  992|
|              neutral|  122|
|             negative| 1182|
+---------------------+-----+

Sentiment stats for INTC
+---------------------+-----+
|sentiment_use_twitter|count|
+---------------------+-----+
|             positive|  770|
|              neutral|   37|
|             negative|  250|
+---------------------+-----+

Sentiment stats for NFLX
+---------------------+-----+
|s

In [10]:
%%time
tweets_with_sentiment_dfs['AMZN'].printSchema()

root
 |-- text: string (nullable = true)
 |-- id: string (nullable = true)
 |-- like_count: long (nullable = true)
 |-- quote_count: long (nullable = true)
 |-- reply_count: long (nullable = true)
 |-- retweet_count: long (nullable = true)
 |-- author_id: string (nullable = true)
 |-- created_at: string (nullable = true)
 |-- conversation_id: string (nullable = true)
 |-- in_reply_to_user_id: string (nullable = true)
 |-- created_at_timestamp: timestamp (nullable = true)
 |-- created_at_date: date (nullable = true)
 |-- sentiment_use_twitter: string (nullable = true)
 |-- is_sentiment_positive: integer (nullable = true)
 |-- is_sentiment_negative: integer (nullable = true)

CPU times: user 3.25 ms, sys: 0 ns, total: 3.25 ms
Wall time: 11.4 ms


In [None]:
%%time
tweets_with_sentiment_dfs['AMZN'].show(10)

Write results to disk as parquet (serialize for further use), 
```.coalesce(1).cache()``` 
was faster when saving to database


In [None]:
# %%time
# for key in tweets_with_sentiment_dfs.keys():
#     file_name = f'merged_json_run_files/tweets_with_sentiment_{key}_parquet'
#     print(f'Saving {key} data to parquet file: {file_name}, total sentiment stats for this file:')
#     tweets_with_sentiment_dfs[key] \
#         .cache() \
#         .write.mode('overwrite') \
#         .partitionBy('created_at_date') \
#         .parquet(file_name)

Write results to hive/database (for testing purposes)

In [None]:
# %%time
# for key in tweets_with_sentiment_dfs.keys():
#     print(f'saving {key} data to table')
#     tweets_with_sentiment_dfs[key] \
#         .cache() \
#         .write.mode('overwrite') \
#         .partitionBy('created_at_date') \
#         .saveAsTable(f'tweets_with_sentiment_{key}_table')

In [None]:
# spark.catalog.listDatabases()

In [None]:
# spark.catalog.listTables()

## 4. Stock data preprocessing ##

Settings for stock data

In [11]:
yfinance_data_path = '/home/jovyan/work/input_data/json/stock/'

Functions used by stock data preprocessing

In [12]:
from pyspark.sql.functions import round
from pyspark.sql.functions import when

# avg_tweet_lifespan = 18
# alternative_tweet_adjust = 1

def yfinance_json_to_df(file_name, yfinance_data_path, yfinance_data_dfs):
    
    file_path = os.path.join(yfinance_data_path, file_name)
    yfinance_json = spark.read.json(file_path)
    
    company_code_pattern = "stock_yfinance_(.*?)_"
    company_code = re.search(company_code_pattern, file_name).group(1)
    
    ticker_interval_pattern = "_ti(.*?)m_"
    ticker_interval =  re.search(ticker_interval_pattern, file_name).group(1)
    
    interval_expr = ''
    if ticker_interval == "15" or "30" or "60":
        interval_expr = f'INTERVAL {ticker_interval} MINUTES'
    else:
        raise Exception("Ticker not found or not supported")
    
    # adj_start_interval_expr = f'INTERVAL {alternative_tweet_adjust} MINUTES'
    
    # explode data array to rows of yahoo finance data
    yfinance_rows = yfinance_json.select(explode("data").alias("records"))
    
    # round values for close/open/high/low to $0.001
    # add columns with more descriptive data about stock result (gain/loss/no_change)
    yfinance_data_df = yfinance_rows \
        .select(
            col("records.Datetime").alias("records_datetime")
          , F.round(col("records.Close"),3).alias("close") 
          , F.round(col("records.Open"),3).alias("open")
          , F.round(col("records.High"),3).alias("high")
          , F.round(col("records.Low"),3).alias("low")
          , col("records.Volume").alias("volume")
          , col("records.Dividends").alias("dividends")
          , col("records.Stock Splits").alias("stock_splits")
        ) \
        .withColumn("records_date", F.to_date(col("records_datetime"))) \
        .withColumn("open_timestamp", F.from_utc_timestamp(col("records_datetime"), "GMT")) \
        .withColumn("close_timestamp", col("open_timestamp")+F.expr(interval_expr)) \
        .withColumn("result_percent", F.round((col("close")-col("open"))/col("open"),5)) \
        .withColumn("result_numeric", when( col("close") >= col("open"), 1) \
            .otherwise(0)
        ) \
        .withColumn("result", when( col("close") >= col("open"), "gain") \
            .otherwise("loss")
        ) \
        .orderBy("open_timestamp", ascending=True)

        #.withColumn("result_numeric", when( col("close") > col("open"), 1) \
        #    .when( col("close") < col("open"), -1) \
        #    .otherwise(0)
    
        #.withColumn("result", when( col("close") > col("open"), "gain") \
        #    .when( col("close") < col("open"), "loss") \
        #    .otherwise("no_change")
        #.withColumn("adjusted_tweet_start_timestamp", col("open_timestamp")-F.expr(adj_start_interval_expr)) \
    
    if company_code not in yfinance_data_dfs.keys():
        yfinance_data_dfs[company_code] = {}
        
    print(f'ticker time: {ticker_interval}, company code: {company_code}')
    
    yfinance_data_dfs[company_code][ticker_interval] = yfinance_data_df

In [13]:
%%time
yfinance_data_dfs = {}

for file_name in os.listdir(yfinance_data_path):
    if file_name.startswith("stock_yfinance") and file_name.endswith('.json'):
        yfinance_json_to_df(file_name, yfinance_data_path, yfinance_data_dfs)         

ticker time: 15, company code: AMD
ticker time: 30, company code: AMD
ticker time: 60, company code: AMD
ticker time: 15, company code: AMZN
ticker time: 30, company code: AMZN
ticker time: 60, company code: AMZN
ticker time: 15, company code: INTC
ticker time: 30, company code: INTC
ticker time: 60, company code: INTC
ticker time: 15, company code: MCD
ticker time: 30, company code: MCD
ticker time: 60, company code: MCD
ticker time: 15, company code: MSFT
ticker time: 30, company code: MSFT
ticker time: 60, company code: MSFT
ticker time: 15, company code: NFLX
ticker time: 30, company code: NFLX
ticker time: 60, company code: NFLX
ticker time: 15, company code: PFE
ticker time: 30, company code: PFE
ticker time: 60, company code: PFE
ticker time: 15, company code: QCOM
ticker time: 30, company code: QCOM
ticker time: 60, company code: QCOM
ticker time: 15, company code: SBUX
ticker time: 30, company code: SBUX
ticker time: 60, company code: SBUX
ticker time: 15, company code: TSLA
t

In [None]:
len(yfinance_data_dfs)

More about data from Yahoo Finance:

In [14]:
%%time
for company_code in sorted(yfinance_data_dfs.keys()):
    for ticker_interval in sorted(yfinance_data_dfs[company_code].keys()):
        print(f'company_code: {company_code}, ticker_interval: {ticker_interval}')

company_code: AMD, ticker_interval: 15
company_code: AMD, ticker_interval: 30
company_code: AMD, ticker_interval: 60
company_code: AMZN, ticker_interval: 15
company_code: AMZN, ticker_interval: 30
company_code: AMZN, ticker_interval: 60
company_code: INTC, ticker_interval: 15
company_code: INTC, ticker_interval: 30
company_code: INTC, ticker_interval: 60
company_code: MCD, ticker_interval: 15
company_code: MCD, ticker_interval: 30
company_code: MCD, ticker_interval: 60
company_code: MSFT, ticker_interval: 15
company_code: MSFT, ticker_interval: 30
company_code: MSFT, ticker_interval: 60
company_code: NFLX, ticker_interval: 15
company_code: NFLX, ticker_interval: 30
company_code: NFLX, ticker_interval: 60
company_code: PFE, ticker_interval: 15
company_code: PFE, ticker_interval: 30
company_code: PFE, ticker_interval: 60
company_code: QCOM, ticker_interval: 15
company_code: QCOM, ticker_interval: 30
company_code: QCOM, ticker_interval: 60
company_code: SBUX, ticker_interval: 15
company_c

In [15]:
%%time
yfinance_data_dfs["AMZN"]["60"].printSchema()

root
 |-- records_datetime: string (nullable = true)
 |-- close: double (nullable = true)
 |-- open: double (nullable = true)
 |-- high: double (nullable = true)
 |-- low: double (nullable = true)
 |-- volume: long (nullable = true)
 |-- dividends: long (nullable = true)
 |-- stock_splits: long (nullable = true)
 |-- records_date: date (nullable = true)
 |-- open_timestamp: timestamp (nullable = true)
 |-- close_timestamp: timestamp (nullable = true)
 |-- result_percent: double (nullable = true)
 |-- result_numeric: integer (nullable = false)
 |-- result: string (nullable = false)

CPU times: user 1.28 ms, sys: 683 µs, total: 1.96 ms
Wall time: 1.01 ms


In [None]:
%%time
yfinance_data_dfs["AMZN"]["15"].show(10)

## 5. Merge tweets data with stock data ##

Stock data are in this case significantly smaller than tweets data (for single company and single week it is tens of kB vs hundreds of MB, about 1000 times smaller). That is the reason why *broadcast hash join* was used.

In [16]:
%%time
from pyspark.sql.functions import broadcast
from pyspark.sql.functions import desc


tweets_with_sentiment_and_stock_results_df = {}

for company_code in tweets_with_sentiment_dfs:
    
    # when we do not have stock data for this company we check another one
    if company_code not in yfinance_data_dfs:
        continue
    
    if company_code not in tweets_with_sentiment_and_stock_results_df:
        tweets_with_sentiment_and_stock_results_df[company_code] = {}
    
    for ticker_interval in yfinance_data_dfs[company_code]:
        
        # if company_code != 'AMZN':
        #    continue
        
        print(f'company code: {company_code}, ticker interval: {ticker_interval}')
        
        stock_data_df = yfinance_data_dfs[company_code][ticker_interval]
        tweets_sentiment_df = tweets_with_sentiment_dfs[company_code]
        
        result_df = tweets_sentiment_df \
            .join(broadcast(yfinance_data_dfs[company_code][ticker_interval]), \
                  (stock_data_df.open_timestamp <= tweets_sentiment_df.created_at_timestamp) \
                  & (tweets_sentiment_df.created_at_timestamp < stock_data_df.close_timestamp), \
                  how='inner')
#         result_df = tweets_sentiment_df \
#             .join(broadcast(yfinance_data_dfs[company_code][ticker_interval]), \
#                   (stock_data_df.adjusted_tweet_start_timestamp <= tweets_sentiment_df.created_at_timestamp) \
#                   & (tweets_sentiment_df.created_at_timestamp < stock_data_df.close_timestamp), \
#                   how='inner')
        
        tweets_with_sentiment_and_stock_results_df[company_code][ticker_interval] = result_df

company code: AMZN, ticker interval: 15
company code: AMZN, ticker interval: 30
company code: AMZN, ticker interval: 60
company code: MSFT, ticker interval: 15
company code: MSFT, ticker interval: 30
company code: MSFT, ticker interval: 60
company code: MCD, ticker interval: 15
company code: MCD, ticker interval: 30
company code: MCD, ticker interval: 60
company code: INTC, ticker interval: 15
company code: INTC, ticker interval: 30
company code: INTC, ticker interval: 60
company code: NFLX, ticker interval: 15
company code: NFLX, ticker interval: 30
company code: NFLX, ticker interval: 60
company code: TSLA, ticker interval: 15
company code: TSLA, ticker interval: 30
company code: TSLA, ticker interval: 60
company code: AMD, ticker interval: 15
company code: AMD, ticker interval: 30
company code: AMD, ticker interval: 60
company code: PFE, ticker interval: 15
company code: PFE, ticker interval: 30
company code: PFE, ticker interval: 60
company code: SBUX, ticker interval: 15
company c

In [20]:
tweets_with_sentiment_dfs

{'AMZN': DataFrame[text: string, id: string, like_count: bigint, quote_count: bigint, reply_count: bigint, retweet_count: bigint, author_id: string, created_at: string, conversation_id: string, in_reply_to_user_id: string, created_at_timestamp: timestamp, created_at_date: date, sentiment_use_twitter: string, is_sentiment_positive: int, is_sentiment_negative: int],
 'MSFT': DataFrame[text: string, id: string, like_count: bigint, quote_count: bigint, reply_count: bigint, retweet_count: bigint, author_id: string, created_at: string, conversation_id: string, in_reply_to_user_id: string, created_at_timestamp: timestamp, created_at_date: date, sentiment_use_twitter: string, is_sentiment_positive: int, is_sentiment_negative: int],
 'MCD': DataFrame[text: string, id: string, like_count: bigint, quote_count: bigint, reply_count: bigint, retweet_count: bigint, author_id: string, created_at: string, conversation_id: string, in_reply_to_user_id: string, created_at_timestamp: timestamp, created_at_

In [17]:
for company_code in tweets_with_sentiment_and_stock_results_df:
    for ticker_interval in tweets_with_sentiment_and_stock_results_df[company_code]:
        print(f"tweets_with_sentiment_and_stock_results_df has company_code:{company_code} ticker_interval:{ticker_interval}")

tweets_with_sentiment_and_stock_results_df has company_code:AMZN ticker_interval:15
tweets_with_sentiment_and_stock_results_df has company_code:AMZN ticker_interval:30
tweets_with_sentiment_and_stock_results_df has company_code:AMZN ticker_interval:60
tweets_with_sentiment_and_stock_results_df has company_code:MSFT ticker_interval:15
tweets_with_sentiment_and_stock_results_df has company_code:MSFT ticker_interval:30
tweets_with_sentiment_and_stock_results_df has company_code:MSFT ticker_interval:60
tweets_with_sentiment_and_stock_results_df has company_code:MCD ticker_interval:15
tweets_with_sentiment_and_stock_results_df has company_code:MCD ticker_interval:30
tweets_with_sentiment_and_stock_results_df has company_code:MCD ticker_interval:60
tweets_with_sentiment_and_stock_results_df has company_code:INTC ticker_interval:15
tweets_with_sentiment_and_stock_results_df has company_code:INTC ticker_interval:30
tweets_with_sentiment_and_stock_results_df has company_code:INTC ticker_interva

In [22]:
tweets_with_sentiment_and_stock_results_df["TSLA"]["15"].printSchema()

root
 |-- text: string (nullable = true)
 |-- id: string (nullable = true)
 |-- like_count: long (nullable = true)
 |-- quote_count: long (nullable = true)
 |-- reply_count: long (nullable = true)
 |-- retweet_count: long (nullable = true)
 |-- author_id: string (nullable = true)
 |-- created_at: string (nullable = true)
 |-- conversation_id: string (nullable = true)
 |-- in_reply_to_user_id: string (nullable = true)
 |-- created_at_timestamp: timestamp (nullable = true)
 |-- created_at_date: date (nullable = true)
 |-- sentiment_use_twitter: string (nullable = true)
 |-- is_sentiment_positive: integer (nullable = true)
 |-- is_sentiment_negative: integer (nullable = true)
 |-- records_datetime: string (nullable = true)
 |-- close: double (nullable = true)
 |-- open: double (nullable = true)
 |-- high: double (nullable = true)
 |-- low: double (nullable = true)
 |-- volume: long (nullable = true)
 |-- dividends: long (nullable = true)
 |-- stock_splits: long (nullable = true)
 |-- reco

In [63]:
%%time
for company_code in tweets_with_sentiment_and_stock_results_df.keys():
    for ticker_interval in tweets_with_sentiment_and_stock_results_df[company_code].keys():
        
        if company_code != 'TSLA' or ticker_interval != "15":
            continue
            
        tweets_with_sentiment_and_stock_results_df[company_code][ticker_interval].show(10)

+--------------------+-------------------+----------+-----------+-----------+-------------+-------------------+--------------------+-------------------+-------------------+--------------------+---------------+---------------------+---------------------+---------------------+--------------------+-------+-------+------+-------+-------+---------+------------+------------+-------------------+-------------------+--------------+--------------+------+
|                text|                 id|like_count|quote_count|reply_count|retweet_count|          author_id|          created_at|    conversation_id|in_reply_to_user_id|created_at_timestamp|created_at_date|sentiment_use_twitter|is_sentiment_positive|is_sentiment_negative|    records_datetime|  close|   open|  high|    low| volume|dividends|stock_splits|records_date|     open_timestamp|    close_timestamp|result_percent|result_numeric|result|
+--------------------+-------------------+----------+-----------+-----------+-------------+-----------

In [None]:
## 6. Window operations ## 
# %%time
# from pyspark.sql.window import Window

# windowSpec = Window.partitionBy("close_timestamp").orderBy("created_at_timestamp")

# for key in tweets_with_sentiment_and_stock_results_df.keys():
#     for subkey in tweets_with_sentiment_and_stock_results_df[key].keys():
        
#         if key != 'AMZN' or subkey != "60":
#             continue
        
#         the_data = tweets_with_sentiment_and_stock_results_df[key][subkey]
        
#         the_data.withColumn("rank", F.rank().over(windowSpec)) \
#             .withColumn("dense_rank", F.dense_rank().over(windowSpec)) \
#             .withColumn("row_num", F.row_number().over(windowSpec)) \
#             .withColumn("popularity_factor", F.count(the_data['id']).over(windowSpec)) \
#             .withColumn("cum_likes", F.sum(the_data['like_count']).over(windowSpec)) \
#             .withColumn("cum_quotes", F.sum(the_data['quote_count']).over(windowSpec)) \
#             .withColumn("cum_replies", F.sum(the_data['reply_count']).over(windowSpec)) \
#             .withColumn("cum_retweets", F.sum(the_data['retweet_count']).over(windowSpec)) \
#             .withColumn("authors_count", F.count(the_data['author_id']).over(windowSpec)) \
#             .withColumn("conversations_count", F.count(the_data['conversation_id']).over(windowSpec)) \
#             .withColumn("cum_retweet_count", F.sum(the_data['retweet_count']).over(windowSpec)) \
#             .withColumn("positive_sentiment_factor", F.avg(the_data['is_sentiment_positive']).over(windowSpec)) \
#             .show(10)


## 6. GroupBy data in time intervals  ##

Group data, count aggregated tweet stats for each time interval, merge with stock data

In [64]:
%%time
from pyspark.sql.functions import length

aggregated_tweets_and_stock_df = {}

for company_code in tweets_with_sentiment_and_stock_results_df.keys():
    aggregated_tweets_and_stock_df[company_code] = {}
    
    for ticker_interval in tweets_with_sentiment_and_stock_results_df[company_code].keys():
               
        if company_code != 'TSLA' or ticker_interval != "15":
            continue
        
        aggregated_data = tweets_with_sentiment_and_stock_results_df[company_code][ticker_interval] \
            .groupBy("close_timestamp") \
            .agg( \
                F.count('id'), \
                F.sum('like_count'), \
                F.avg('like_count'), \
                F.sum('quote_count'), \
                F.avg('quote_count'), \
                F.sum('reply_count'), \
                F.avg('reply_count'), \
                F.sum('retweet_count'), \
                F.avg('retweet_count'), \
                F.countDistinct('author_id'), \
                F.countDistinct('conversation_id'), \
                F.avg(length('text')) , \
                F.sum('is_sentiment_positive'), \
                F.sum('is_sentiment_negative'), \
                F.avg('is_sentiment_positive'), \
                F.avg('is_sentiment_negative') \
                ) \
            .orderBy("close_timestamp", ascending=True)
        
        # F.sum('is_sentiment_positive') / F.sum('is_sentiment_negative'), \
        
        aggregated_tweets_and_stock_df[company_code][ticker_interval] = aggregated_data \
            .join(yfinance_data_dfs[company_code][ticker_interval], on="close_timestamp", how="outer") \
            .orderBy("close_timestamp", ascending=True) \
            .dropna() \
            
        aggregated_tweets_and_stock_df[company_code][ticker_interval].show(10)

+-------------------+---------+---------------+------------------+----------------+------------------+----------------+------------------+------------------+------------------+----------------+----------------------+------------------+--------------------------+--------------------------+--------------------------+--------------------------+--------------------+-------+-------+-------+-------+-------+---------+------------+------------+-------------------+--------------+--------------+------+
|    close_timestamp|count(id)|sum(like_count)|   avg(like_count)|sum(quote_count)|  avg(quote_count)|sum(reply_count)|  avg(reply_count)|sum(retweet_count)|avg(retweet_count)|count(author_id)|count(conversation_id)| avg(length(text))|sum(is_sentiment_positive)|sum(is_sentiment_negative)|avg(is_sentiment_positive)|avg(is_sentiment_negative)|    records_datetime|  close|   open|   high|    low| volume|dividends|stock_splits|records_date|     open_timestamp|result_percent|result_numeric|result|
+---

In [54]:
aggregated_tweets_and_stock_df["TSLA"]["15"].printSchema()

root
 |-- close_timestamp: timestamp (nullable = true)
 |-- count(id): long (nullable = true)
 |-- sum(like_count): long (nullable = true)
 |-- avg(like_count): double (nullable = true)
 |-- sum(quote_count): long (nullable = true)
 |-- avg(quote_count): double (nullable = true)
 |-- sum(reply_count): long (nullable = true)
 |-- avg(reply_count): double (nullable = true)
 |-- sum(retweet_count): long (nullable = true)
 |-- avg(retweet_count): double (nullable = true)
 |-- count(author_id): long (nullable = true)
 |-- count(conversation_id): long (nullable = true)
 |-- avg(length(text)): double (nullable = true)
 |-- sum(is_sentiment_positive): long (nullable = true)
 |-- sum(is_sentiment_negative): long (nullable = true)
 |-- avg(is_sentiment_positive): double (nullable = true)
 |-- avg(is_sentiment_negative): double (nullable = true)
 |-- records_datetime: string (nullable = true)
 |-- close: double (nullable = true)
 |-- open: double (nullable = true)
 |-- high: double (nullable = tr

In [66]:
%%time
print((aggregated_tweets_and_stock_df["TSLA"]["15"].count(), len(aggregated_tweets_and_stock_df["TSLA"]["15"].columns)))

(126, 30)
CPU times: user 4.42 ms, sys: 0 ns, total: 4.42 ms
Wall time: 14.3 s


In [None]:
aggregated_tweets_and_stock_df

In [55]:
%%time
for company_code in tweets_with_sentiment_and_stock_results_df.keys():  
    for ticker_interval in tweets_with_sentiment_and_stock_results_df[company_code].keys():
        
        if company_code != 'TSLA' or ticker_interval != "15":
            continue
        
        aggregated_tweets_and_stock_df[company_code][ticker_interval] \
            .select(
                    col("close_timestamp"),\
                    col("avg(is_sentiment_positive)"),\
                    col("avg(is_sentiment_negative)"),\
                    col("count(id)"),\
                    col("sum(like_count)"),\
                    col("avg(like_count)"),\
                    col("sum(quote_count)"),\
                    col("avg(quote_count)"),\
                    col("sum(reply_count)"),\
                    col("avg(reply_count)"),\
                    col("sum(retweet_count)"),\
                    col("avg(retweet_count)"),\
                    col("avg(length(text))"),\
                    col("count(author_id)"),\
                    col("count(conversation_id)"),\
                    col("result_percent"),\
                    col("result_numeric"))\
            .show(100)

+-------------------+--------------------------+--------------------------+---------+---------------+------------------+----------------+-------------------+----------------+------------------+------------------+------------------+------------------+----------------+----------------------+--------------+--------------+
|    close_timestamp|avg(is_sentiment_positive)|avg(is_sentiment_negative)|count(id)|sum(like_count)|   avg(like_count)|sum(quote_count)|   avg(quote_count)|sum(reply_count)|  avg(reply_count)|sum(retweet_count)|avg(retweet_count)| avg(length(text))|count(author_id)|count(conversation_id)|result_percent|result_numeric|
+-------------------+--------------------------+--------------------------+---------+---------------+------------------+----------------+-------------------+----------------+------------------+------------------+------------------+------------------+----------------+----------------------+--------------+--------------+
|2021-01-11 14:45:00|        0.333333

## 7. Prepare corelation matrix ##

In [56]:
%%time
from pyspark.ml.stat import Correlation
from pyspark.ml.feature import VectorAssembler

for company_code in tweets_with_sentiment_and_stock_results_df.keys():  
    for ticker_interval in tweets_with_sentiment_and_stock_results_df[company_code].keys():
               
        if company_code != 'TSLA' or ticker_interval != "15":
            continue
        
        #aggregated_tweets_and_stock_df[company_code][ticker_interval].printSchema()
        
        df = aggregated_tweets_and_stock_df[company_code][ticker_interval]\
            .select(\
                    col("avg(is_sentiment_positive)"),\
                    col("avg(is_sentiment_negative)"),\
                    col("count(id)"),\
                    col("sum(like_count)"),\
                    col("avg(like_count)"),\
                    col("sum(quote_count)"),\
                    col("avg(quote_count)"),\
                    col("sum(reply_count)"),\
                    col("avg(reply_count)"),\
                    col("sum(retweet_count)"),\
                    col("avg(retweet_count)"),\
                    col("avg(length(text))"),\
                    col("count(author_id)"),\
                    col("count(conversation_id)"),\
                    col("result_percent"),\
                    col("result_numeric")\
                    )
        
#                     col("sum(is_sentiment_positive)"),\
#                     col("sum(is_sentiment_negative)"),\
#                     col("(sum(is_sentiment_positive) / sum(is_sentiment_negative))"),\
#                     col("close"),\
#                     col("open"),\
#                     col("high"),\
#                     col("low"),\
#                     col("volume"),\
        
        assembler = VectorAssembler(inputCols = df.columns, outputCol = "features")
        assembled = assembler.transform(df)
        
        print(f'company code: {company_code}, ticker time: {ticker_interval}')
        
        print("Pearson Correlation:")
        pearson_corr = Correlation.corr(assembled, "features", "pearson")
        corr_list = pearson_corr.head()[0].toArray().tolist()
        pearson_corr_df = (spark.createDataFrame(corr_list)).toDF(*df.columns)
        pearson_corr_df.show(truncate=False)
        
        print("Spearman Correlation:")
        spearman_corr = Correlation.corr(assembled, "features", "spearman")
        corr_list = spearman_corr.head()[0].toArray().tolist()
        spearman_corr_df = (spark.createDataFrame(corr_list)).toDF(*df.columns)
        spearman_corr_df.show(truncate=False)

company code: TSLA, ticker time: 15
Pearson Correlation:
+--------------------------+--------------------------+---------------------+---------------------+---------------------+---------------------+---------------------+---------------------+---------------------+---------------------+---------------------+---------------------+---------------------+----------------------+--------------------+---------------------+
|avg(is_sentiment_positive)|avg(is_sentiment_negative)|count(id)            |sum(like_count)      |avg(like_count)      |sum(quote_count)     |avg(quote_count)     |sum(reply_count)     |avg(reply_count)     |sum(retweet_count)   |avg(retweet_count)   |avg(length(text))    |count(author_id)     |count(conversation_id)|result_percent      |result_numeric       |
+--------------------------+--------------------------+---------------------+---------------------+---------------------+---------------------+---------------------+---------------------+---------------------+------

In [None]:
pearson_corr_df.printSchema()

In [None]:
spearman_corr_df.printSchema()

## 8. Experiments with Clasifiers ##

Classifiers based on: https://towardsdatascience.com/machine-learning-with-pyspark-and-mllib-solving-a-binary-classification-problem-96396065d2aa

In [21]:
from functools import reduce 
from pyspark.sql import DataFrame

# warning - slow :(
def unionAll(*dfs):
    first, *_ = dfs 
    return first.sql_ctx.createDataFrame(
        first.sql_ctx._sc.union([df.rdd for df in dfs]),
        first.schema
    )

# extremely slow
# def unionAll(*dfs):
#     return reduce(DataFrame.unionAll, dfs)

In [22]:
%%time
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler

        
# df = aggregated_tweets_and_stock_df["TSLA"]["15"]\
#     .select(\
#         col("avg(is_sentiment_positive)"),\
#         col("avg(length(text))"),\
#         col("count(id)"),\
#         col("result")\
#     )

df = unionAll(
    aggregated_tweets_and_stock_df['AMZN']["15"],\
    aggregated_tweets_and_stock_df['AMD']["15"],\
    aggregated_tweets_and_stock_df['INTC']["15"],\
    aggregated_tweets_and_stock_df['MCD']["15"],\
    aggregated_tweets_and_stock_df['MSFT']["15"],\
    aggregated_tweets_and_stock_df['NFLX']["15"],\
    aggregated_tweets_and_stock_df['PFE']["15"],\
    aggregated_tweets_and_stock_df['QCOM']["15"],\
    aggregated_tweets_and_stock_df['SBUX']["15"],\
    aggregated_tweets_and_stock_df['TSLA']["15"]\
    )\
    .select(\
        col("avg(is_sentiment_positive)"),\
        col("avg(length(text))"),\
        col("count(id)"),\
        col("result")\
    )

CPU times: user 44.7 ms, sys: 0 ns, total: 44.7 ms
Wall time: 2min 24s


In [23]:
%%time

categoricalColumns = []
stages = []
cols = df.columns

for categoricalCol in categoricalColumns:
    stringIndexer = StringIndexer(inputCol = categoricalCol, outputCol = categoricalCol + 'Index')
    encoder = OneHotEncoderEstimator(inputCols=[stringIndexer.getOutputCol()], outputCols=[categoricalCol + "classVec"])
    stages += [stringIndexer, encoder]

label_stringIdx = StringIndexer(inputCol = 'result', outputCol = 'label')
stages += [label_stringIdx]
numericCols = ['avg(is_sentiment_positive)', 'avg(length(text))', 'count(id)']
assemblerInputs = [c + "classVec" for c in categoricalColumns] + numericCols
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")
stages += [assembler]

CPU times: user 6.22 ms, sys: 0 ns, total: 6.22 ms
Wall time: 21.6 ms


In [24]:
%%time
print((df.count(), len(df.columns)))

(1225, 4)
CPU times: user 140 ms, sys: 91.2 ms, total: 231 ms
Wall time: 18.3 s


In [25]:
%%time
from pyspark.ml import Pipeline
pipeline = Pipeline(stages = stages)
pipelineModel = pipeline.fit(df)
df = pipelineModel.transform(df)
selectedCols = ['label', 'features'] + cols
df = df.select(selectedCols)
df.printSchema()

root
 |-- label: double (nullable = false)
 |-- features: vector (nullable = true)
 |-- avg(is_sentiment_positive): double (nullable = true)
 |-- avg(length(text)): double (nullable = true)
 |-- count(id): long (nullable = true)
 |-- result: string (nullable = true)

CPU times: user 184 ms, sys: 54.6 ms, total: 239 ms
Wall time: 12.6 s


In [26]:
%%time
train, test = df.randomSplit([0.7, 0.3], seed = 2021)
print("Training Dataset Count: " + str(train.count()))
print("Test Dataset Count: " + str(test.count()))

Training Dataset Count: 861
Test Dataset Count: 364
CPU times: user 316 ms, sys: 98.7 ms, total: 415 ms
Wall time: 24.9 s


In [27]:
%%time
from pyspark.ml.classification import LogisticRegression
lr = LogisticRegression(featuresCol = 'features', labelCol = 'label', maxIter=10)
lrModel = lr.fit(train)

CPU times: user 2.2 s, sys: 1.07 s, total: 3.27 s
Wall time: 2min 33s


In [28]:
%%time
predictions_lr = lrModel.transform(test)
predictions_lr.show(10)

+-----+--------------------+--------------------------+------------------+---------+------+--------------------+--------------------+----------+
|label|            features|avg(is_sentiment_positive)| avg(length(text))|count(id)|result|       rawPrediction|         probability|prediction|
+-----+--------------------+--------------------------+------------------+---------+------+--------------------+--------------------+----------+
|  1.0|[0.5,198.66666666...|                       0.5|198.66666666666666|        6|  gain|[0.15484569788143...|[0.53863426021780...|       0.0|
|  1.0| [0.375,164.375,8.0]|                     0.375|           164.375|        8|  gain|[0.16795071738127...|[0.54188925986124...|       0.0|
|  0.0|   [0.75,153.25,4.0]|                      0.75|            153.25|        4|  loss|[0.13513513673790...|[0.53373246600133...|       0.0|
|  0.0|   [0.625,221.0,8.0]|                     0.625|             221.0|        8|  loss|[0.07418347418833...|[0.51853736810847.

In [29]:
%%time
from pyspark.ml.evaluation import BinaryClassificationEvaluator

bc_evaluator = BinaryClassificationEvaluator()
print('Test Area Under ROC', bc_evaluator.evaluate(predictions_lr))

Test Area Under ROC 0.5149098160065261
CPU times: user 134 ms, sys: 111 ms, total: 245 ms
Wall time: 15.7 s


In [30]:
%%time
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

mc_evaluator = MulticlassClassificationEvaluator(predictionCol='prediction', labelCol="label", metricName="f1")
print(f"Calculated f1: {mc_evaluator.evaluate(predictions_lr)}")

Calculated f1: 0.3760809123863192
CPU times: user 136 ms, sys: 115 ms, total: 252 ms
Wall time: 13.9 s


In [31]:
%%time
from pyspark.ml.classification import RandomForestClassifier

rf = RandomForestClassifier(featuresCol = 'features', labelCol = 'label')
rfModel = rf.fit(train)

CPU times: user 947 ms, sys: 636 ms, total: 1.58 s
Wall time: 1min 29s


In [32]:
%%time
predictions_rf = rfModel.transform(test)
predictions_rf.show(10)

+-----+--------------------+--------------------------+------------------+---------+------+--------------------+--------------------+----------+
|label|            features|avg(is_sentiment_positive)| avg(length(text))|count(id)|result|       rawPrediction|         probability|prediction|
+-----+--------------------+--------------------------+------------------+---------+------+--------------------+--------------------+----------+
|  1.0|[0.5,198.66666666...|                       0.5|198.66666666666666|        6|  gain|[11.4070666002165...|[0.57035333001082...|       0.0|
|  1.0| [0.375,164.375,8.0]|                     0.375|           164.375|        8|  gain|[10.4718343368009...|[0.52359171684004...|       0.0|
|  0.0|   [0.75,153.25,4.0]|                      0.75|            153.25|        4|  loss|[10.8962172032807...|[0.54481086016403...|       0.0|
|  0.0|   [0.625,221.0,8.0]|                     0.625|             221.0|        8|  loss|[11.0271511108962...|[0.55135755554481.

In [33]:
%%time
bc_evaluator = BinaryClassificationEvaluator()
print("Test Area Under ROC: " + str(bc_evaluator.evaluate(predictions_rf, {bc_evaluator.metricName: "areaUnderROC"})))

Test Area Under ROC: 0.514728541647784
CPU times: user 170 ms, sys: 79.3 ms, total: 250 ms
Wall time: 15.3 s


In [34]:
%%time
mc_evaluator = MulticlassClassificationEvaluator(predictionCol='prediction', labelCol="label", metricName="f1")
print(f"Calculated f1: {mc_evaluator.evaluate(predictions_rf)}")

Calculated f1: 0.48955225376630057
CPU times: user 143 ms, sys: 95.5 ms, total: 238 ms
Wall time: 13.5 s


In [35]:
%%time
from pyspark.ml.classification import GBTClassifier
gbt = GBTClassifier(maxIter=10)
gbtModel = gbt.fit(train)

CPU times: user 6.86 s, sys: 3.62 s, total: 10.5 s
Wall time: 8min 43s


In [36]:
%%time
predictions_gbt = gbtModel.transform(test)
predictions_gbt.show(10)

+-----+--------------------+--------------------------+------------------+---------+------+--------------------+--------------------+----------+
|label|            features|avg(is_sentiment_positive)| avg(length(text))|count(id)|result|       rawPrediction|         probability|prediction|
+-----+--------------------+--------------------------+------------------+---------+------+--------------------+--------------------+----------+
|  1.0|[0.5,198.66666666...|                       0.5|198.66666666666666|        6|  gain|[0.11080656558659...|[0.55517764198153...|       0.0|
|  1.0| [0.375,164.375,8.0]|                     0.375|           164.375|        8|  gain|[0.12772553356781...|[0.56351773639138...|       0.0|
|  0.0|   [0.75,153.25,4.0]|                      0.75|            153.25|        4|  loss|[0.08221245914211...|[0.54101386845980...|       0.0|
|  0.0|   [0.625,221.0,8.0]|                     0.625|             221.0|        8|  loss|[-0.1830218737459...|[0.40949733643256.

In [37]:
%%time
bc_evaluator = BinaryClassificationEvaluator()
print("Test Area Under ROC: " + str(bc_evaluator.evaluate(predictions_gbt, {bc_evaluator.metricName: "areaUnderROC"})))

Test Area Under ROC: 0.5283543309465542
CPU times: user 127 ms, sys: 107 ms, total: 234 ms
Wall time: 14.6 s


In [38]:
%%time
mc_evaluator = MulticlassClassificationEvaluator(predictionCol='prediction', labelCol="label", metricName="f1")
print(f"Calculated f1: {mc_evaluator.evaluate(predictions_gbt)}")

Calculated f1: 0.5035172625931702
CPU times: user 79.8 ms, sys: 143 ms, total: 223 ms
Wall time: 13.5 s
