# Tweets streaming - Consumer
Read the tweet stream into a DataFrame, add timestamp, sentiment and price columns, and save the aggregated windows.

Version 12: as a workaround, the aggregated stream is first written to an in-memory table and only then saved as Parquet.

In [0]:
#!pip install textblob
#!pip install pycountry
#!pip install bs4

In [0]:
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.sql import SparkSession
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql import functions as F
from textblob import TextBlob
import re
import pycountry
from datetime import datetime
import requests
import pandas as pd
import json
import time
from bs4 import BeautifulSoup

In [0]:
# Functions to split the timestamp off the tweet message
@udf
def split_start(text):
    """Return the tweet text, i.e. *text* without its last 27 characters.

    The trailing 27 characters are presumed to hold the timestamp suffix
    that ``split_back`` extracts from — TODO confirm against the producer's
    message format.
    """
    return text[:-27]
@udf
def split_back(text):
    """Return the timestamp part of *text*: characters -21 up to -4.

    The final 4 characters are skipped so the timestamp is read without its
    milliseconds.
    """
    return text[-21:-4]


In [0]:
# Additional cleaning and splitting of the incoming text
def preprocessing(lines):
    """Split the raw socket stream into messages and clean their text.

    Each incoming ``value`` may contain several messages separated by the
    literal marker ``"t_end"``; they are exploded into one row per message
    (column ``word``). Empty fragments are dropped, then URLs, @mentions,
    '#' characters, the standalone token ``RT`` and every ':' are removed.
    Finally the message is split into ``tweet_txt`` and ``stamp`` with the
    ``split_start`` / ``split_back`` UDFs.

    Args:
        lines: streaming DataFrame with a string column ``value``
            (as produced by the socket source).

    Returns:
        DataFrame with columns ``word``, ``tweet_txt`` and ``stamp``.
    """
    words = lines.select(explode(split(lines.value, "t_end")).alias("word"))
    # Splitting on the marker leaves '' fragments; turn them into nulls so
    # na.drop() removes them.
    words = words.na.replace('', None)
    words = words.na.drop()
    words = words.withColumn('word', F.regexp_replace('word', r'http\S+', ''))
    # Raw string: '\w' in a plain literal is an invalid escape sequence
    # (SyntaxWarning on Python 3.12+).
    words = words.withColumn('word', F.regexp_replace('word', r'@\w+', ''))
    words = words.withColumn('word', F.regexp_replace('word', '#', ''))
    # Remove 'RT' only as a standalone token; a bare 'RT' pattern also
    # mangled ordinary words such as "START" or "ART".
    words = words.withColumn('word', F.regexp_replace('word', r'\bRT\b', ''))
    # Removing ':' also strips the separators from the trailing timestamp,
    # which is why the timestamp is later parsed with '%H%M%S' (no colons).
    words = words.withColumn('word', F.regexp_replace('word', ':', ''))
    words = words.withColumn('tweet_txt', split_start('word'))
    words = words.withColumn('stamp', split_back('word'))
    return words
  

In [0]:
# Apply Textblob Sentiment Analysis
def polarity_detection(text):
    """Return the sentiment polarity score TextBlob computes for *text*."""
    sentiment = TextBlob(text).sentiment
    return sentiment.polarity

def text_classification(words):
    """Add a ``polarity`` column with the TextBlob score of ``tweet_txt``.

    The UDF is declared with StringType, so the score is stored as a string
    (downstream code compares it with "0.0" and later casts it to float).
    """
    polarity_udf = udf(polarity_detection, StringType())
    return words.withColumn("polarity", polarity_udf("tweet_txt"))


In [0]:
# Language Detection
def language_detection(text):
    """Return the English name of the language TextBlob detects in *text*.

    Detection is best-effort: if the TextBlob/pycountry lookup fails for any
    reason, or pycountry has no entry for the returned two-letter
    (``alpha_2``) code, the string ``'no language detected'`` is returned.

    NOTE(review): ``TextBlob.detect_language()`` is deprecated/removed in
    newer TextBlob releases; on such versions every call ends in the
    fallback and the downstream "English" filter drops every tweet.
    Confirm the installed TextBlob version.
    """
    fallback = 'no language detected'
    try:
        iso_code = TextBlob(text).detect_language()
        language = pycountry.languages.get(alpha_2=iso_code)
    except Exception:  # not a bare except: let KeyboardInterrupt/SystemExit propagate
        return fallback
    # Depending on the pycountry version, an unknown code yields None
    # (newer) or raises KeyError (older, handled above).
    if language is None:
        return fallback
    return language.name

def apply_language(words):
    """Add a ``language`` column holding the language name detected for ``tweet_txt``."""
    detect_udf = udf(language_detection, StringType())
    return words.withColumn("language", detect_udf("tweet_txt"))
  

In [0]:
# Function to fetch the current price of the given cryptocurrency (bitcoin in this notebook)
@udf
def get_actual_crypto_price(crypto):
    """Scrape coinmarketcap.com and return the current price of *crypto*.

    The landing page embeds its data as JSON in the ``__NEXT_DATA__``
    script tag. The listing whose ``name`` equals *crypto*
    (case-insensitively) is looked up there. If several listings match, the
    last one wins.

    Args:
        crypto: coin name, e.g. ``'bitcoin'``.

    Returns:
        The ``price`` of the matching listing's ``quotes[2]`` entry, or
        ``None`` (a null in Spark) when *crypto* is None or not listed,
        instead of failing the streaming query with a KeyError. No
        returnType is given to ``@udf``, so Spark stores the value as a
        string; the caller casts it to float.

    Raises:
        requests.RequestException: on connection errors, timeouts or a
            non-2xx HTTP response.
        ValueError: if the page no longer contains the ``__NEXT_DATA__`` tag.
    """
    if crypto is None:
        return None

    base_url = 'https://coinmarketcap.com'
    # Without a timeout a stalled HTTP call would block the Spark task forever.
    response = requests.get(base_url, timeout=10)
    response.raise_for_status()
    soup = BeautifulSoup(response.content, 'html.parser')
    data = soup.find('script', id="__NEXT_DATA__", type="application/json")
    if data is None:
        raise ValueError('coinmarketcap.com page has no __NEXT_DATA__ script tag')

    coin_data = json.loads(data.contents[0])
    listings = coin_data['props']['initialState']['cryptocurrency']['listingLatest']['data']

    wanted = crypto.lower()
    price = None
    for listing in listings:
        if listing['name'].lower() == wanted:
            # NOTE(review): quotes[2] is a hard-coded index, presumably the
            # quote in the desired fiat currency; confirm against the page JSON.
            price = listing['quotes'][2]['price']
    return price


In [0]:
# Create (or reuse) the Spark session
spark = (
    SparkSession.builder
    .appName("TwitterSentimentAnalysis")
    .getOrCreate()
)

# Read the raw tweet stream from the local socket
lines = (
    spark.readStream
    .format("socket")
    .option("host", "localhost")
    .option("port", 9997)
    .load()
)

# Clean the text, then enrich each tweet with its polarity, its language
# and a constant currency label ('bitcoin') used later for the price lookup.
words = preprocessing(lines)
words = text_classification(words)
words = apply_language(words)
words = words.withColumn("crypto", lit('bitcoin'))


In [0]:
# Keep English tweets that carry some sentiment. Polarity is still a string
# column here, so neutral tweets appear as "0.0".
words = words.filter(
    (words.language == "English")
    & (words.polarity != "0.0")
)


In [0]:
# Re-format timestamp from string to TimestampType.
# preprocessing() removes every ':', so hour, minute and second have no
# separators. Named TIMESTAMP_FORMAT because the former name `format`
# shadowed the builtin.
TIMESTAMP_FORMAT = '%Y-%m-%dT%H%M%S'


def _parse_stamp(stamp):
    """Parse *stamp* with TIMESTAMP_FORMAT; return None if missing or malformed.

    Returning None instead of raising keeps a single garbled socket line
    from failing the whole streaming query.
    """
    if stamp is None:
        return None
    try:
        return datetime.strptime(stamp, TIMESTAMP_FORMAT)
    except ValueError:
        return None


time_udf = udf(_parse_stamp, TimestampType())
words = words.withColumn('t_stamp', time_udf('stamp'))
# Rows without a parseable timestamp cannot be placed in an event-time window.
words = words.filter(col('t_stamp').isNotNull())

# Re-format polarity to float
words = words.withColumn('polarity', col('polarity').cast('float'))


In [0]:
# Create a 6 second window as basis for all subsequent analysis (since version 11 with watermark)
windowedStream = (
    words
    .withWatermark('t_stamp', '6 seconds')
    .groupBy(F.window('t_stamp', '6 seconds', '6 seconds'))
)

# previous versions without watermark:
#windowedStream = words.groupBy(window('t_stamp', '6 seconds', '6 seconds'))

# Per window: tweet count, mean polarity, the price looked up via the
# window's crypto label, and the latest tweet timestamp in the window.
aggregationsStream = windowedStream.agg(
    F.count('tweet_txt').alias('count_tweets'),
    F.avg('polarity').alias('pol_avg'),
    get_actual_crypto_price(F.first(F.col('crypto'))).cast('float').alias('window_price'),
    F.max('t_stamp').alias('t_stamp'),
)


In [0]:
# Both the words-Stream as well as the aggregationsStream can be displayed
# display(words)
# display(aggregationsStream)

In [0]:
# Attempt to write aggregationsStream directly to Parquet in 'append' mode
# (the watermark is set in command 12, the windowing cell).
# --> This did not work: the Parquet output is created with the right schema
#     but contains no records; we did not find a solution to make this work.
# NOTE(review): two things worth checking before retrying:
#   * in append mode a window is only emitted once the watermark (latest
#     event time seen minus 6 seconds) has passed the window's end, so the
#     newest windows appear only after later tweets arrive;
#   * both paths below contain stray spaces ("/experiment_results /original",
#     "/ experiment_results /check").
# The code is kept as an inert string literal so it does not run.
'''
aggregationsStream \
    .writeStream \
    .format("parquet")\
    .option("path", "/experiment_results /original")\
    .option("checkpointLocation", "/ experiment_results /check")\
    .outputMode("append")\
    .start()
'''

In [0]:
# Workaround for the Parquet problem: sink the aggregated stream (append
# mode) into an in-memory table named "aggDF" that can be queried with SQL.
streamingETLQuery = (
    aggregationsStream.writeStream
    .format("memory")
    .queryName("aggDF")
    .outputMode("append")
    .start()
)


In [0]:
# Print how many aggregated windows the in-memory table "aggDF" holds so far
(
    spark
    .sql("select count(*) from aggDF")
    .show()
)

In [0]:
# Show the aggregated windows collected so far, ordered by their latest tweet time
spark.sql("select * from aggDF").orderBy('t_stamp').show()


In [0]:
# Persist the in-memory results as Parquet. Spark's default save mode is
# 'errorifexists', so the target must be removed first (see the `%fs rm`
# cell below) when re-running.
spark.sql("select * from aggDF").write.format("parquet").save('dbfs:/FileStore/original/tweet_sparksql')

In [0]:
#%fs ls dbfs:/FileStore/

In [0]:
#%fs rm -r dbfs:/FileStore/original/

In [0]:
#%fs rm -r dbfs:/local_disk0/tmp/