In [None]:
import os

from pyspark.sql import SparkSession
from pyspark.sql.types import *
import pyspark.sql.functions as F
from pyspark.ml.feature import Tokenizer, StopWordsRemover, CountVectorizer, IDF
from pyspark.ml import PipelineModel
from pyspark.ml.functions import vector_to_array

# Import stemmer library
from nltk.stem.porter import PorterStemmer


In [None]:
# Prerequisite: the S3 bucket must already be mounted on DBFS; list the mount point to confirm it is reachable.
%fs ls /mnt/my_twitter_data_project/

In [None]:
# Register the zipped sparkLDA package with the SparkContext so that the
# `sparkLDA.*` imports in the following cell can be resolved.
# NOTE(review): `spark` is not defined in this notebook; presumably it is the
# session object provided by the runtime — confirm.
sc = spark.sparkContext
sc.addPyFile("dbfs:/mnt/my_twitter_data_project/sparkLDA.zip")

In [None]:
from sparkLDA.config import extra_for_stemmed, seedNum, file_schema_str
from sparkLDA.config import cols_select
from sparkLDA.utils import show_topics, evaluate
from sparkLDA.processing import preprocess_text

In [None]:
from textblob import TextBlob

def get_sentiment_vectorized(texts):
    """Return the TextBlob sentiment polarity of every entry in a pandas Series.

    Parameters
    ----------
    texts : pandas.Series
        Tweet texts. Entries that are not strings (for example ``None`` or
        ``NaN`` coming from a record with a missing text field) are tolerated.

    Returns
    -------
    pandas.Series
        One polarity value per input entry, aligned on the input index.
        Non-string entries yield ``NaN`` instead of raising, so a single
        malformed record cannot fail the whole batch.
    """
    def _polarity(text):
        # TextBlob rejects anything that is not a string with a TypeError.
        if not isinstance(text, str):
            return float("nan")
        return TextBlob(text).sentiment.polarity

    return texts.apply(_polarity)

In [None]:
def parse_tweets(raw_records):
  """Split tab-separated records into one column per field.

  `raw_records` is an iterable of pandas DataFrames that each carry a string
  column named ``data``. For every incoming frame, one frame is yielded whose
  columns are the tab-separated fields of ``data`` (positional integer labels).
  """
  for batch in raw_records:
    fields = batch["data"].str.split(pat="\t", expand=True)
    yield fields

In [None]:
# Expose the vectorized polarity scorer as a pandas UDF that returns a Spark float column.
getSentiment = F.pandas_udf(f=get_sentiment_vectorized, returnType=FloatType())

In [None]:
# Location of the saved pipeline model that is loaded with PipelineModel.load further down.
# NOTE(review): the directory name suggests a 5-topic model trained with maxIter=40 — confirm it matches n_topics.
pipelinePath = "dbfs:/mnt/my_twitter_data_project/ML_models/LDA-pipeline-model_Nov_Data/ntopics_5_maxIter_40/"

In [None]:
# Number of per-topic columns (topic_0 ... topic_{n-1}) extracted from the topic distribution.
# NOTE(review): presumably must equal the topic count of the model at pipelinePath ("ntopics_5") — confirm.
n_topics = 5

In [None]:
# Show the column list imported from sparkLDA.config; these columns are carried through to the saved output.
print(cols_select)

In [None]:
# Name of the Kinesis stream.
# NOTE(review): holds the same value as kinesisStreamName in the following cell and is
# not referenced by any visible cell — confirm whether it is still needed.
stream_name = "twitter-data-kinesis"

In [None]:
# Kinesis source settings used by the stream reader.
kinesisStreamName = "twitter-data-kinesis"
kinesisRegion = "us-east-1"

# Credentials are read from the environment so that they never have to be pasted
# into (and saved with) the notebook. An unset variable falls back to "", which
# is the value these settings had before.
awsAccessKeyId = os.environ.get("AWS_ACCESS_KEY_ID", "")
awsSecretKey = os.environ.get("AWS_SECRET_ACCESS_KEY", "")

In [None]:
# Streaming reader over the Kinesis stream; every source setting is passed as a
# reader option, and reading starts from the "latest" position.
kinesisDF = (
    spark.readStream
    .format("kinesis")
    .options(
        streamName=kinesisStreamName,
        region=kinesisRegion,
        awsAccessKey=awsAccessKeyId,
        awsSecretKey=awsSecretKey,
        initialPosition="latest",
        format="json",
        inferSchema="true",
    )
    .load()
)

A formal test for our data transformation

In [None]:
# Display the streaming DataFrame as a quick check that the source was created.
kinesisDF

In [None]:
# NOTE(review): n_topics is already assigned the same value in an earlier cell; this repeats it.
n_topics = 5
# Columns written to the output: the configured columns plus "tweet_text" and
# "stemmed_rm" (the output column of the stop-word remover applied further down).
cols_to_save = cols_select + ["tweet_text", "stemmed_rm"]
print(cols_to_save)

In [None]:
# Keep only the record payload and cast it to a string so it can be split on tabs.
tweets = kinesisDF.select(F.col("data").cast(StringType()).alias("data"))

# Split every record into the columns declared by file_schema_str, then run the
# project's text preprocessing.
# NOTE(review): presumably preprocess_text adds the "stemmed" column consumed below — confirm.
df = tweets.mapInPandas(parse_tweets, schema = file_schema_str)
df = preprocess_text(df)

# Filter the project-specific stop words out of the stemmed tokens.
stopword_remover_stem = StopWordsRemover(inputCol="stemmed", outputCol="stemmed_rm")
stopword_remover_stem.setStopWords(extra_for_stemmed)

df = stopword_remover_stem.transform(df)

# Apply the previously saved pipeline model to the stream.
# NOTE(review): presumably this adds the "topicDistribution" vector column read below — confirm.
savedPipelineModel = PipelineModel.load(pipelinePath)

df_with_topics = savedPipelineModel.transform(df)


def to_array(col):
    """Convert an ML vector column into an ``array<float>`` column.

    Relies on Spark's built-in ``vector_to_array`` rather than a Python UDF, so
    the conversion is not executed row by row in a Python worker.

    Parameters
    ----------
    col : str or pyspark.sql.Column
        Name of, or reference to, the vector column.

    Returns
    -------
    pyspark.sql.Column
        The vector's values as an array of 32-bit floats.
    """
    column = F.col(col) if isinstance(col, str) else col
    return vector_to_array(column, dtype="float32")


df_with_topics = df_with_topics.withColumn("topicDistributionArray",
                                         to_array("topicDistribution"))

# Keep the columns to save and spread the topic distribution over one column per
# topic (topic_0 ... topic_{n_topics-1}).
topic_columns = [F.col("topicDistributionArray")[i].alias("topic_"+str(i))
                 for i in range(n_topics)]
df_with_topics = (
    df_with_topics
    .select(*cols_to_save, *topic_columns)
    # Join the token array into one comma-separated string
    # (presumably because the CSV sink cannot write array columns — confirm).
    .withColumn("stemmed_rm", F.concat_ws(",", F.col("stemmed_rm")))
)

# Score each tweet's polarity, then format it as a string rounded to 3 decimal places.
df_with_topics_sentiment = (
    df_with_topics
    .withColumn("sentiment", getSentiment(F.col("tweet_text")))
    .withColumn("sentiment", F.format_number("sentiment", 3))
)

In [None]:
# Destination directory of the streaming sink and the checkpoint directory of the streaming query.
# NOTE(review): the checkpoint directory sits inside the output directory — confirm this is intended.
outputPath = "dbfs:/mnt/my_twitter_data_project/spark_streaming_test_4"
ckp_location = "dbfs:/mnt/my_twitter_data_project/spark_streaming_test_4/checkpoint"

In [None]:
# Print the schema of the final streaming DataFrame before the sink is started.
df_with_topics_sentiment.printSchema()

In [None]:
# Start the streaming sink: append-mode, tab-separated CSV files under outputPath,
# one micro-batch every 15 minutes, with progress tracked in ckp_location.
query = (
    df_with_topics_sentiment.writeStream
    .format("csv")
    .outputMode("append")
    .trigger(processingTime="15 minutes")
    .option("sep", "\t")
    .option("checkpointLocation", ckp_location)
    .option("path", outputPath)
    .start()
)

# Block this cell until the streaming query stops or fails.
query.awaitTermination()

In [None]:
# NOTE(review): looks like a leftover check of how "\t" is rendered when printed — confirm whether this cell can be removed.
print("hello\tworld")