In [1]:
import findspark
findspark.init()


In [2]:
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-streaming-kafka-0-10_2.12:3.1.1,org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.1 pyspark-shell'


In [3]:
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, split, udf
from pyspark.sql.types import StringType, TimestampType, FloatType

import json
import datetime

from afinn import Afinn
spark = SparkSession.builder \
                    .master("local[3]") \
                    .config('spark.executor.instances', 3) \
                    .config("spark.sql.catalogImplementation", "hive")\
                    .appName('tweets') \
                    .getOrCreate()
ssc = StreamingContext(spark.sparkContext, 1)


:: loading settings :: url = jar:file:/usr/local/Cellar/apache-spark/3.1.1/libexec/jars/ivy-2.4.0.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /Users/yhua/.ivy2/cache
The jars for the packages stored in: /Users/yhua/.ivy2/jars
org.apache.spark#spark-streaming-kafka-0-10_2.12 added as a dependency
org.apache.spark#spark-sql-kafka-0-10_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-c64695f8-b08c-4b03-b594-3a147712b789;1.0
	confs: [default]
	found org.apache.spark#spark-streaming-kafka-0-10_2.12;3.1.1 in central
	found org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.1.1 in central
	found org.apache.kafka#kafka-clients;2.6.0 in central
	found com.github.luben#zstd-jni;1.4.8-1 in central
	found org.lz4#lz4-java;1.7.1 in central
	found org.xerial.snappy#snappy-java;1.1.8.2 in central
	found org.slf4j#slf4j-api;1.7.30 in central
	found org.spark-project.spark#unused;1.0.0 in central
	found org.apache.spark#spark-sql-kafka-0-10_2.12;3.1.1 in central
	found org.apache.commons#commons-pool2;2.6.2 in central
:: resolution report :: resolve 316ms :: artifacts

In [4]:
spark.sparkContext.setLogLevel("ERROR")

In [5]:
tweetsDfRaw = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "localhost:9092") \
  .option("subscribe", "test1") \
  .load()

In [6]:
tweetsDf = tweetsDfRaw.selectExpr("CAST(value AS STRING) as tweet_info")

In [7]:
def extract_tags(word: str):
    if word.lower().startswith("#"):
        return word
    else:
        return "nonTag"

In [8]:
def extract_tweet_text(text):
    tweet_info = json.loads(text)
    return tweet_info["tweet"]

def extract_timestamp(text):
    tweet_info = json.loads(text)
    created_at = tweet_info["created_at"]
    return datetime.strptime(created_at, '%a %b %d %H:%M:%S %z %Y')

In [9]:
afinn = Afinn()

def add_sentiment_score(text):
    sentiment_score = afinn.score(text)
    return sentiment_score

def add_sentiment_status(text):
    sentiment_score = afinn.score(text)

    try:
        if sentiment_score < 0:
            return "negative"
        elif sentiment_score == 0:
            return "neutral"
        else:
            return "positive"

    except TypeError:
        return "error"
        

In [10]:

# extract_tags_udf = udf(extract_tags, StringType())

# resultDf = words.filter(words.word.isNotNull()).withColumn("tags", extract_tags_udf(words.word))

# hashtagCountsDf = resultDf.filter(resultDf.tags != "nonTag")\
#                             .groupBy("tags")\
#                             .count()\
#                             .orderBy("count", ascending=False)

In [11]:
extract_tweet_text_udf = udf(extract_tweet_text, StringType())
extract_timestamp_udf = udf(extract_timestamp, TimestampType())

add_sentiment_score_udf = udf(add_sentiment_score, FloatType())
add_sentiment_status_udf = udf(add_sentiment_status, StringType())

In [12]:
tweetsDf = tweetsDf.withColumn(
    "tweet",
    extract_tweet_text_udf(tweetsDf.tweet_info)
)\
    .withColumn(
        "event_time",
        extract_timestamp_udf(tweetsDf.tweet_info)
    )\
    .withColumn(
        "sentiment_status",
        add_sentiment_status_udf(tweetsDf.tweet_info)
    )\
    .withColumn(
        "sentiment_score",
        add_sentiment_score_udf(tweetsDf.tweet_info)
    )
newDf = tweetsDf.select("tweet", "event_time")

In [13]:
query = newDf.writeStream \
.outputMode("append") \
.format("console")\
.option("truncate", "false")\
.start()\
.awaitTermination()


AnalysisException: Complete output mode not supported when there are no streaming aggregations on streaming DataFrames/Datasets;
Project [tweet#24, event_time#28]
+- Project [tweet_info#21, tweet#24, event_time#28, sentiment_status#33, add_sentiment_score(tweet_info#21) AS sentiment_score#39]
   +- Project [tweet_info#21, tweet#24, event_time#28, add_sentiment_status(tweet_info#21) AS sentiment_status#33]
      +- Project [tweet_info#21, tweet#24, extract_timestamp(tweet_info#21) AS event_time#28]
         +- Project [tweet_info#21, extract_tweet_text(tweet_info#21) AS tweet#24]
            +- Project [cast(value#8 as string) AS tweet_info#21]
               +- StreamingRelationV2 org.apache.spark.sql.kafka010.KafkaSourceProvider@1bb27a9c, kafka, org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaTable@682f08ef, [kafka.bootstrap.servers=localhost:9092, subscribe=test1], [key#7, value#8, topic#9, partition#10, offset#11L, timestamp#12, timestampType#13], StreamingRelation DataSource(org.apache.spark.sql.SparkSession@6041f9ec,kafka,List(),None,List(),None,Map(kafka.bootstrap.servers -> localhost:9092, subscribe -> test1),None), kafka, [key#0, value#1, topic#2, partition#3, offset#4L, timestamp#5, timestampType#6]
