This notebook contains the code for doing keyword-focused sentiment analysis of raw
text collected from news articles. 

Note: This notebook was originally developed in a Databricks environment and will be adapted
to run in AWS Glue.

In [None]:
# Setup and check mount to S3
# NOTE(review): the key values below are placeholders; presumably real credentials
# should come from a secret store rather than being typed into the notebook - confirm.
access_key = 'key goes here'
secret_key = 'key goes here'
# '/' inside the secret would be read as a path separator in the URI, so escape it.
encoded_secret_key = secret_key.replace('/', '%2F')
bucket_name = 'bucket name goes here'
mount_name = 's3_data'
mount_resource = f"s3a://{access_key}:{encoded_secret_key}@{bucket_name}"
mount_location = f"/mnt/{mount_name}"

# Mounting onto an existing mount point raises, which made this cell fail on
# every re-run; only mount when the location is not already mounted.
already_mounted = any(
    mount.mountPoint == mount_location for mount in dbutils.fs.mounts()
)
if not already_mounted:
    dbutils.fs.mount(mount_resource, mount_location)

In [None]:
# Display the contents of the mount location as a quick check that the S3 mount works.
display(dbutils.fs.ls(mount_location))

In [None]:
from datetime import datetime, timedelta
# Yesterday's date as 'YYYY-MM-DD'; it is also the name of the incoming folder to read.
yesterday = (datetime.today().date() - timedelta(days=1)).isoformat()
path_to_yesterdays_articles = f"{mount_location}/incoming/{yesterday}"

# Show which files are present in that folder.
display(dbutils.fs.ls(path_to_yesterdays_articles))

In [None]:
# Explore data

In [None]:
from pyspark.sql.functions import input_file_name, lit, regexp_extract, length

# Read the text under yesterday's folder and attach metadata to each row:
# the source file's base name, the batch date and the length of the text.
df = (
    spark.read.text(path_to_yesterdays_articles)
    .withColumn('filename', regexp_extract(input_file_name(), '[^/]+$', 0))
    .withColumn('date', lit(yesterday))
    .withColumnRenamed('value', 'article_text')
    .withColumn('article_text_length', length('article_text'))
    # Keep only rows whose text is longer than 400 characters.
    .where('article_text_length > 400')
)

display(df)


In [None]:
from pyspark import keyword_only
from pyspark.ml import Transformer
from pyspark.ml.param.shared import HasInputCol, HasOutputCol
from pyspark.sql.functions import col, expr, array_remove

In [None]:
class EmptyTokenRemover(Transformer, HasInputCol, HasOutputCol):
    """Pipeline stage that strips empty-string tokens from an array column.

    The array column named by ``inputCol`` is written to ``outputCol`` with
    every ``''`` element removed.
    """

    @keyword_only
    def __init__(self, inputCol=None, outputCol=None):
        """Create the stage; both column names are given as keyword arguments."""
        super().__init__()
        # keyword_only saves the caller's keyword arguments in _input_kwargs.
        self.setParams(**self._input_kwargs)

    @keyword_only
    def setParams(self, inputCol=None, outputCol=None):
        """Apply the supplied keyword params and return the result of ``_set``."""
        return self._set(**self._input_kwargs)

    def _transform(self, dataset):
        """Return ``dataset`` with the cleaned token array in the output column."""
        source_tokens = col(self.getInputCol())
        return dataset.withColumn(
            self.getOutputCol(),
            array_remove(source_tokens, ''),
        )


In [None]:
from pyspark.ml.feature import Tokenizer, StopWordsRemover
from pyspark.ml import Pipeline

In [None]:
# Prepare the article text for term-frequency counting:
# tokenize, drop empty tokens, then remove stop words.
tokenizer = Tokenizer(inputCol='article_text', outputCol='tokens')
# Each later stage reads the column the previous stage wrote.
empty_remover = EmptyTokenRemover(
    inputCol=tokenizer.getOutputCol(),
    outputCol='cleaner_tokens',
)
stop_remover = StopWordsRemover(
    inputCol=empty_remover.getOutputCol(),
    outputCol='stop_removed',
)

prep_stages = [tokenizer, empty_remover, stop_remover]
prep_pipeline = Pipeline(stages=prep_stages)

prep_pipeline_model = prep_pipeline.fit(df)
prepped_df = prep_pipeline_model.transform(df)

In [None]:
from pyspark.sql.functions import col, lit, size, expr

In [None]:
# Get Term Frequency for keyword (keyword e.g., 'pickleball')
keyword = 'keyword goes here'

# The keyword travels as a column and the lambda compares each token against
# that column, so the keyword text is never spliced into the SQL expression
# (a quote character in the keyword used to break the expression).
# lower() is applied because Tokenizer emits lower-cased tokens, so a keyword
# containing capitals could otherwise never match.
tf_df = (
    prepped_df
    .withColumn('keyword', lit(keyword))
    .withColumn(
        'keyword_freq',
        expr("size(filter(stop_removed, token -> token == lower(keyword)))"),
    )
)

# Normalize the raw count by the number of tokens left after stop-word removal.
tf_df = tf_df.withColumn('norm_keyword_freq', col('keyword_freq') / size(col('stop_removed')))

display(tf_df)


In [None]:
# (filename, normalized keyword frequency rounded to 4 places), highest frequency first.
sorted(
    (
        (record['filename'], round(record['norm_keyword_freq'], 4))
        for record in tf_df.select('filename', 'norm_keyword_freq').collect()
    ),
    key=lambda pair: pair[1],
    reverse=True,
)

In [None]:
# Same ranking, limited to rows where the keyword occurs more than twice.
sorted(
    (
        (record['filename'], round(record['norm_keyword_freq'], 4))
        for record in tf_df.select('filename', 'norm_keyword_freq', 'keyword_freq').collect()
        if record['keyword_freq'] > 2
    ),
    key=lambda pair: pair[1],
    reverse=True,
)

In [None]:
# Minimum keyword term frequency a row needs in order to be kept: an absolute
# count and a share of the token count. Both values were chosen somewhat
# arbitrarily by visually inspecting sample articles.

keyword_freq_cutoff = 3
norm_keyword_freq_cutoff = 0.003

# A row must pass both thresholds.
filtered_articles_df = (
    tf_df
    .where(col('keyword_freq') >= keyword_freq_cutoff)
    .where(col('norm_keyword_freq') >= norm_keyword_freq_cutoff)
)

display(filtered_articles_df)

In [None]:
# Sentiment Analysis

In [None]:
from pyspark.sql.functions import explode
from pyspark.sql.types import ArrayType, StringType, FloatType
import nltk
from nltk.tokenize import sent_tokenize
from textblob import TextBlob

# Download NLTK's 'punkt' resource; sent_tokenize (imported above) is used for
# sentence splitting in the next cell.
# NOTE(review): presumably the worker nodes also need this data when the
# tokenizer runs inside a UDF - confirm for multi-node clusters.
nltk.download('punkt')

In [None]:
# udf is not imported by any of the notebook's import cells; import it here so
# the cell does not depend on the runtime pre-defining the name.
from pyspark.sql.functions import udf


def tokenize_sentences(text):
    """Split ``text`` into a list of sentences using NLTK's ``sent_tokenize``.

    Missing or empty text yields an empty list instead of raising inside the
    UDF; ``explode`` then produces no rows for that input.
    """
    if not text:
        return []
    return sent_tokenize(text)


tokenize_sentences_udf = udf(tokenize_sentences, ArrayType(StringType()))

# One output row per sentence; the other columns are repeated on each row.
sentence_tokenized_df = filtered_articles_df.withColumn(
    'sentence',
    explode(tokenize_sentences_udf(filtered_articles_df['article_text'])),
)

In [None]:
# udf is not imported by any of the notebook's import cells; import it (and
# lower, used below) here so the cell does not depend on runtime-provided names.
from pyspark.sql.functions import lower, udf


def sentiment_score(text):
    """Return TextBlob's polarity score for ``text``, or None when text is missing."""
    if text is None:
        return None
    return TextBlob(text).sentiment.polarity


sentence_score_udf = udf(sentiment_score, FloatType())

# Keyword counts were taken on lower-cased tokens, so the sentence match is made
# case-insensitive as well; a case-sensitive contains() missed sentences where
# the keyword is capitalized (e.g. at the start of a sentence).
sentence_sentiment_df = (
    sentence_tokenized_df
    .withColumn('sentiment_score', sentence_score_udf(sentence_tokenized_df['sentence']))
    .withColumn('keyword_in_sentence', lower(col('sentence')).contains(keyword.lower()))
)

display(sentence_sentiment_df.select('filename', 'sentence', 'sentiment_score', 'keyword_in_sentence'))

In [None]:
from pyspark.sql import functions as F

In [None]:
# Mean sentence sentiment per file, over all of its sentences.
sentiment_summary_df_1 = (
    sentence_sentiment_df
    .groupBy('filename')
    .agg(F.mean('sentiment_score').alias('full_article_sentiment'))
)

# The same mean, restricted to sentences flagged as containing the keyword.
sentiment_summary_df_2 = (
    sentence_sentiment_df
    .where(col('keyword_in_sentence'))
    .groupBy('filename')
    .agg(F.mean('sentiment_score').alias('keyword_sentences_sentiment'))
)

# Inner join: only files present in both summaries are kept.
# Both scores are then formatted to 3 decimal places.
sentiment_summary_df = (
    sentiment_summary_df_1
    .join(sentiment_summary_df_2, on='filename', how='inner')
    .withColumn('full_article_sentiment', F.format_number('full_article_sentiment', 3))
    .withColumn('keyword_sentences_sentiment', F.format_number('keyword_sentences_sentiment', 3))
)

display(sentiment_summary_df)

In [None]:
# Load results

In [None]:
# Unmount S3
# Detach the mount point that the setup cell created at mount_location.

dbutils.fs.unmount(mount_location)