In [ ]:
%pyspark
from pyspark.sql import SparkSession

# Obtain the SparkSession for this notebook, creating it on first use.
spark = (
    SparkSession.builder
    .appName("Sentiment Analysis")
    .getOrCreate()
)

In [ ]:
%pyspark
import pyspark.sql.functions as F
from pyspark.sql.types import StringType, IntegerType

# Read dialogue data from S3 (first row is the header; column types inferred).
dataframe = spark \
    .read \
    .load("s3://strata-demo-data/nlp_data/Dialogue-dataset/trainline_dialogue.csv",
          format="csv", inferSchema="true", header="true")

# Speaker code 'A' is the call agent; any other value (including null) is the
# customer. The UDF form is kept defined in case other cells reference it.
change_speaker_name_udf = F.udf(lambda speaker: 'Call Agent' if speaker == 'A' else 'Customer', StringType())

# Same mapping as change_speaker_name_udf, expressed as a native column
# expression so rows are not shipped to a Python worker. A null speaker makes
# the comparison null, which falls through to otherwise('Customer') - exactly
# what the lambda returns for None.
dataframe = dataframe.withColumn(
    'speaker',
    F.when(F.col('speaker') == 'A', 'Call Agent').otherwise('Customer'),
)

dataframe.show(5)

# Only keep rows with sentiment data for modelling
filtered_dataframe = dataframe.na.drop(subset=['polarity'])

In [ ]:
%pyspark
# Preview the first ten responses with their speaker and sentiment label.
first_ten_examples = dataframe.select(F.col('speaker'), F.col('response'), F.col('polarity')).take(10)
print('First 10 responses:')
for example in first_ten_examples:
    # print() with a single argument prints identically on Python 2 and 3,
    # unlike the Python-2-only print statement.
    print('{} - "{} ({})"'.format(example.speaker, example.response, example.polarity))

## Distribution of Sentiment within the Dataset


In [ ]:
%pyspark
import matplotlib.pyplot as plt
import numpy as np
from pyspark.sql.functions import desc

# Collect each aggregation exactly once so that category labels and their
# counts come from the same result set. Two separate collect() calls on an
# unordered groupBy are not guaranteed to return rows in the same order.
grouped_dataframe = dataframe.groupBy('polarity').count()
# row['count'] (not row.count, which is the tuple method) reads the column.
overall_counts = {row['polarity']: row['count'] for row in grouped_dataframe.collect()}

# Fix a single category order (most frequent first) and reuse it for every
# series plotted below.
polarity_order = sorted(overall_counts, key=lambda polarity: overall_counts[polarity], reverse=True)
sentiments = ['null' if polarity is None else polarity for polarity in polarity_order]
counts = [overall_counts[polarity] for polarity in polarity_order]

grouped_dataframe_speaker = dataframe.groupBy(['speaker', 'polarity']).count()
speaker_counts = {
    (row['speaker'], row['polarity']): row['count']
    for row in grouped_dataframe_speaker.collect()
}

# Align each speaker's counts with the x-axis categories. A speaker with no
# responses in a category contributes 0 instead of shifting every later bar
# (or producing a list whose length does not match `sentiments`).
speaker_a_counts = [speaker_counts.get(('Call Agent', polarity), 0) for polarity in polarity_order]
speaker_b_counts = [speaker_counts.get(('Customer', polarity), 0) for polarity in polarity_order]

fig, axarr = plt.subplots(2, sharex=True)
fig.suptitle('Breakdown of Sentiment within Dialogues', fontsize=15)

axarr[0].bar(x=sentiments, height=counts)
axarr[0].set_title('(All Responses)')
axarr[0].set_ylabel('Total Responses', fontsize=13)

# Stacked bars: customer counts sit on top of call-agent counts.
p1 = axarr[1].bar(x=sentiments, height=speaker_a_counts)
p2 = axarr[1].bar(x=sentiments, height=speaker_b_counts, bottom=speaker_a_counts)

axarr[1].set_title('(Responses Split by Speaker)')
axarr[1].set_xlabel('Sentiment', fontsize=13)
axarr[1].set_ylabel('Total Responses', fontsize=13)
axarr[1].legend((p1[0], p2[0]), ('Call Agent', 'Customer'))

plt.show()

In [ ]:
%pyspark
def _take_examples(polarity, speaker, n=5):
    """Return up to ``n`` (speaker, response, polarity) rows for one polarity and speaker."""
    return dataframe \
        .select(F.col('speaker'), F.col('response'), F.col('polarity')) \
        .filter((F.col('polarity') == polarity) & (F.col('speaker') == speaker)) \
        .take(n)


five_positive_examples_a = _take_examples('positive', 'Call Agent')
five_positive_examples_b = _take_examples('positive', 'Customer')
five_negative_examples_a = _take_examples('negative', 'Call Agent')
five_negative_examples_b = _take_examples('negative', 'Customer')

print('Positive examples (Call Agent):')
for example in five_positive_examples_a:
    # Single-argument print() behaves the same on Python 2 and 3.
    print('{} - "{} ({})"'.format(example.speaker, example.response, example.polarity))


In [ ]:
%pyspark
print('Negative examples (Call Agent):')
for example in five_negative_examples_a:
    # Single-argument print() behaves the same on Python 2 and 3.
    print('{} - "{} ({})"'.format(example.speaker, example.response, example.polarity))

In [ ]:
%pyspark
print('Positive examples (Customer):')
for example in five_positive_examples_b:
    # Single-argument print() behaves the same on Python 2 and 3.
    print('{} - "{} ({})"'.format(example.speaker, example.response, example.polarity))

In [ ]:
%pyspark
print('Negative examples (Customer):')
for example in five_negative_examples_b:
    # Single-argument print() behaves the same on Python 2 and 3.
    print('{} - "{} ({})"'.format(example.speaker, example.response, example.polarity))

## Analysis of Topic Distributions for Negative Responses

In [ ]:
%pyspark
# Five topics with the most 'negative' responses, largest first.
top_five_negative_topics = dataframe \
    .filter(dataframe.polarity == 'negative') \
    .groupBy('topic') \
    .count() \
    .orderBy(desc('count')) \
    .limit(5)

# Run the query once and derive both labels and heights from the same rows.
# Two separate actions re-execute the query and may resolve ties at the
# fifth place differently, mismatching labels and bar heights.
top_topic_rows = top_five_negative_topics.collect()
topics = ['null' if row.topic is None else row.topic for row in top_topic_rows]
# row['count'] (not row.count, which is the tuple method) reads the column.
topic_counts = [row['count'] for row in top_topic_rows]

plt.bar(x=topics, height=topic_counts)
plt.title('Top 5 \'Negative\' Topics within Dialogues', fontsize=15)
plt.xlabel('Topics', fontsize=13)
plt.ylabel('Total Negative Responses', fontsize=13)
plt.show()

In [ ]:
%pyspark
import string


def _remove_punctuation(response):
    """Return ``response`` with '.', ',', '!' and '?' removed.

    A null response becomes '' so the UDF does not fail the Spark task with an
    AttributeError on ``None.replace`` (the Tokenizer downstream also needs a
    non-null string).
    """
    if response is None:
        return ''
    return response.replace('.', '').replace(',', '').replace('!', '').replace('?', '')


remove_punctuation_udf = F.udf(_remove_punctuation, StringType())
# Binary target: 1 for 'positive', 0 for every other polarity value.
create_labels_udf = F.udf(lambda polarity: 1 if polarity == 'positive' else 0, IntegerType())

cleaned_dataframe = filtered_dataframe \
    .withColumn('cleaned response', remove_punctuation_udf(filtered_dataframe.response)) \
    .withColumn('label', create_labels_udf(filtered_dataframe.polarity)) \
    .select(F.col('id'), F.col('conversation id'), F.col('turn number'), F.col('speaker'),  F.col('response'), F.col('cleaned response'), F.col('topic'), F.col('polarity'), F.col('label'))

## Plotting Word Distributions for Positive and Negative Responses

In [ ]:
%pyspark
import itertools
from collections import Counter

import pandas as pd
# Imported here because this cell uses it; previously it was only imported in
# a later cell, so running the notebook top to bottom raised NameError.
import requests
from pyspark.ml.feature import Tokenizer, StopWordsRemover
from pyspark.ml import Pipeline

# Filter with cleaned_dataframe's own column handle rather than one taken from
# `dataframe`, a different DataFrame.
all_positive_responses = cleaned_dataframe.filter(cleaned_dataframe.polarity == 'positive')
all_negative_responses = cleaned_dataframe.filter(cleaned_dataframe.polarity == 'negative')

# Tokenize responses
tokenizer = Tokenizer().setInputCol('cleaned response').setOutputCol('words')

# Get stopwords: a published English list plus filler sounds and the numbers 0-99.
stop_words_response = requests.get('https://raw.githubusercontent.com/igorbrigadir/stopwords/master/en/bow_short.txt')
# Fail loudly on an HTTP error instead of treating an error page as stop words.
stop_words_response.raise_for_status()
stop_words = stop_words_response.text.split()
additional_stopwords = ['uh', 'um', 'uh-huh', 'um-hum', 'em', 'ah', 'er', 'erm']
additional_stopwords.extend([str(i) for i in range(100)])
stop_words.extend(additional_stopwords)

# Prepare stopwords remover
sw_filter = StopWordsRemover() \
    .setStopWords(stop_words) \
    .setCaseSensitive(False) \
    .setInputCol('words') \
    .setOutputCol('filtered')

# Neither stage learns anything from the data, so one fitted pipeline serves
# both subsets.
tokenization_pipeline = Pipeline(stages=[tokenizer, sw_filter]).fit(cleaned_dataframe)
tokenized_positive_responses = tokenization_pipeline.transform(all_positive_responses)
tokenized_negative_responses = tokenization_pipeline.transform(all_negative_responses)


def _top_word_counts(tokenized_df, n=10):
    """Return the ``n`` most frequent non-empty tokens in ``tokenized_df.filtered``.

    The result is a pandas DataFrame with ``word`` and ``count`` columns,
    most frequent first.
    """
    rows = tokenized_df.select('filtered').collect()
    tokens = itertools.chain.from_iterable(row.filtered for row in rows)
    # One counting pass instead of list.count() per distinct word (quadratic).
    counter = Counter(word for word in tokens if word != '')
    return pd.DataFrame(counter.most_common(n), columns=['word', 'count'])


positive_wordcount_df = _top_word_counts(tokenized_positive_responses)
negative_wordcount_df = _top_word_counts(tokenized_negative_responses)

fig, axarr = plt.subplots(2, sharex=False)
fig.suptitle('Word Distributions for Positive and Negative Responses')

axarr[0].bar(x=positive_wordcount_df.word, height=positive_wordcount_df['count'], color='green')
axarr[0].set_ylabel('Positive Word Count', fontsize=13)
axarr[0].legend(['Positive'])

axarr[1].bar(x=negative_wordcount_df.word, height=negative_wordcount_df['count'], color='red')
axarr[1].set_ylabel('Negative Word Count', fontsize=13)
axarr[1].set_xlabel('Word', fontsize=13)
axarr[1].legend(['Negative'])

plt.show()

 
## Spark ML Pipeline
![image](https://s3.eu-west-2.amazonaws.com/strata-demo-data/nlp_data/Dialogue-dataset/Pipeline.png)


## Data Pre-processing


In [ ]:
%pyspark
# Preview five rows of the cleaned, labelled responses.
cleaned_dataframe.show(n=5)

In [ ]:
%pyspark
import requests
from pyspark.ml.feature import CountVectorizer, IDF

# Term-frequency vectors over the stop-word-filtered tokens; a term enters the
# vocabulary only if it occurs in at least 5 responses ('docs').
cv = CountVectorizer(inputCol='filtered', outputCol='tf', minTF=1., minDF=5)

# NOTE(review): the vocabulary here and the IDF weights below are fitted on the
# whole of cleaned_dataframe, which the logistic-regression cell later splits
# into training and test sets, so test rows influence feature construction.
# Fitting these stages on the training split only would avoid that leakage.
cv_pipeline = Pipeline(stages=[tokenizer, sw_filter, cv]).fit(cleaned_dataframe)

# Re-weight term frequencies by inverse document frequency.
idf = IDF(inputCol='tf', outputCol='tfidf')
idf_pipeline = Pipeline(stages=[cv_pipeline, idf]).fit(cleaned_dataframe)

featurised_preview = idf_pipeline.transform(cleaned_dataframe)
featurised_preview.show(5)


## Logistic Regression

In [ ]:
%pyspark
import pyspark.sql.functions as fn
from pyspark.ml.classification import LogisticRegression

# 70/30 train/test split; the fixed seed makes the split reproducible.
training_df, testing_df = cleaned_dataframe.randomSplit([0.7, 0.3], seed=0)
# Single-argument print() behaves the same on Python 2 and 3.
print('Total Rows in Training Set: {}'.format(training_df.count()))
print('Total Rows in Test Set: {}'.format(testing_df.count()))

# Unregularised baseline (regParam=0, elasticNetParam=0) on TF-IDF features.
lr = LogisticRegression() \
    .setLabelCol('label') \
    .setFeaturesCol('tfidf') \
    .setRegParam(0.0) \
    .setMaxIter(100) \
    .setElasticNetParam(0.)

# Create logistic regression pipeline
print('Fitting Logistic Regression model on training data set...')
lr_pipeline = Pipeline(stages=[idf_pipeline, lr]).fit(training_df)

print('Transforming validation data set...')
print('Calculating accuracy...')
# Accuracy = mean of 1.0/0.0 "prediction equals label" indicators.
lr_accuracy = lr_pipeline.transform(testing_df) \
    .select(fn.expr('float(prediction = label)').alias('correct')) \
    .select(fn.avg('correct')).collect()
print(lr_accuracy[0]['avg(correct)'])

In [ ]:
%pyspark
import pandas as pd

# idf_pipeline.stages[0] is the fitted tokenize/stop-word/count pipeline; its
# last stage is the fitted CountVectorizer holding the vocabulary.
vocabulary = idf_pipeline.stages[0].stages[-1].vocabulary
# Coefficients of the fitted logistic regression (last pipeline stage), paired
# position-by-position with the vocabulary terms.
weights = lr_pipeline.stages[-1].coefficients.toArray()
coeffs_df = pd.DataFrame({'word': vocabulary, 'coefficient': weights})

# Single-argument print() behaves the same on Python 2 and 3.
print('Generating Top 15 Negative Words')
# Top 15 'Negative' words: the most negative weights push towards label 0.
negative_coeffs_df = coeffs_df.sort_values('coefficient').head(15)
negative_coeffs_spark_df = spark.createDataFrame(negative_coeffs_df)
negative_coeffs_spark_df.show()

In [ ]:
%pyspark

# Single-argument print() behaves the same on Python 2 and 3.
print('Generating Top 15 Positive Words')
# Top 15 'Positive' words: the largest weights push towards label 1.
positive_coeffs_df = coeffs_df.sort_values('coefficient', ascending=False).head(15)
positive_coeffs_spark_df = spark.createDataFrame(positive_coeffs_df)
positive_coeffs_spark_df.show()

## Parameter Tuning: Regularisation and Parameter Grid


In [ ]:
%pyspark
import numpy as np
from pyspark.ml.tuning import ParamGridBuilder

# Initial elastic-net settings; every grid point below overrides both.
lambda_par = 0.02
alpha_par = 0.3

en_lr = LogisticRegression() \
    .setLabelCol('label') \
    .setFeaturesCol('tfidf') \
    .setRegParam(lambda_par) \
    .setMaxIter(100) \
    .setElasticNetParam(alpha_par)

en_lr_estimator = Pipeline(stages=[idf_pipeline, en_lr])

# 6 x 6 grid over regularisation strength and L1/L2 mix.
grid = ParamGridBuilder() \
    .addGrid(en_lr.regParam, [0., 0.01, 0.02, 0.03, 0.04, 0.05]) \
    .addGrid(en_lr.elasticNetParam, [0., 0.2, 0.4, 0.6, 0.8, 1.0]) \
    .build()

# Each grid point is fitted on training_df and scored on testing_df. Caching
# both splits avoids recomputing the S3 read and UDF cleaning for every model.
training_df.cache()
testing_df.cache()

all_models = []
param_grid_length = len(grid)
for j, param_map in enumerate(grid):
    # Single-argument print() behaves the same on Python 2 and 3.
    print('Fitting model {} of {}'.format(j + 1, param_grid_length))
    all_models.append(en_lr_estimator.fit(training_df, param_map))

# NOTE(review): the best configuration is selected by accuracy on testing_df
# and that same accuracy is reported below, so the figure is optimistic. A
# separate validation split (or CrossValidator) would give an unbiased estimate.
accuracies = [m \
    .transform(testing_df) \
    .select(fn.avg(fn.expr('float(prediction = label)')).alias('accuracy')) \
    .first() \
    .accuracy for m in all_models]

best_model_idx = np.argmax(accuracies)

print('Determining best model...')
best_model = all_models[best_model_idx]

print('Calculating accuracy for best model...')
print(accuracies[best_model_idx])
    

In [ ]:
%pyspark
# Get Top 15 negative and positive words based on weights for best model.
# The grid search already fitted this configuration on training_df; reuse that
# model rather than training the identical configuration a second time.
en_lr_best_pipeline = all_models[best_model_idx]
en_weights = en_lr_best_pipeline.stages[-1].coefficients.toArray()
# `vocabulary` comes from the fitted CountVectorizer inside idf_pipeline, which
# is shared by every grid-search model, so the positions line up.
en_coeffs_df = pd.DataFrame({'word': vocabulary, 'coefficient': en_weights})

# Single-argument print() behaves the same on Python 2 and 3.
print('Top 15 Negative Words from Best Model')
# Top 15 'Negative' words based on weights of trained model
negative_en_coeffs_df = en_coeffs_df.sort_values('coefficient').head(15)
negative_en_coeffs_spark_df = spark.createDataFrame(negative_en_coeffs_df)
negative_en_coeffs_spark_df.show()

In [ ]:
%pyspark
# Single-argument print() behaves the same on Python 2 and 3.
print('Top 15 Positive Words from Best Model')
# Top 15 'Positive' words based on weights of trained model
positive_en_coeffs_df = en_coeffs_df.sort_values('coefficient', ascending=False).head(15)
positive_en_coeffs_spark_df = spark.createDataFrame(positive_en_coeffs_df)
positive_en_coeffs_spark_df.show()

## Classifying Responses with Missing Sentiment


In [ ]:
%pyspark
import random

# Responses that have no sentiment label in the source data.
missing_sentiment_dataframe = dataframe \
    .filter(dataframe.polarity.isNull())

# Same cleaning and column layout as cleaned_dataframe, so the fitted pipeline
# can transform it.
cleaned_missing_sentiment_dataframe = missing_sentiment_dataframe \
    .withColumn('cleaned response', remove_punctuation_udf(missing_sentiment_dataframe.response)) \
    .withColumn('label', create_labels_udf(missing_sentiment_dataframe.polarity)) \
    .select(F.col('id'), F.col('conversation id'), F.col('turn number'), F.col('speaker'),  F.col('response'), F.col('cleaned response'), F.col('topic'), F.col('polarity'), F.col('label'))

# Single-argument print() behaves the same on Python 2 and 3.
print('Classifying responses with missing sentiment...')
missing_sentiment_prediction_df = en_lr_best_pipeline.transform(cleaned_missing_sentiment_dataframe)

# Run the prediction pipeline once and split the rows by predicted class in
# Python, instead of executing it separately for each class.
prediction_rows = missing_sentiment_prediction_df.select(F.col('response'), F.col('prediction')).collect()
positive_predictions = [row.response for row in prediction_rows if row.prediction == 1]
negative_predictions = [row.response for row in prediction_rows if row.prediction == 0]

sample_size = 10

print('Sampling positive and negative predictions...')
# random.sample raises ValueError if asked for more items than the population
# holds, so cap each sample at the number of available predictions.
positive_prediction_sample = random.sample(positive_predictions, min(sample_size, len(positive_predictions)))
negative_prediction_sample = random.sample(negative_predictions, min(sample_size, len(negative_predictions)))

# pandas requires equal-length columns; pad the shorter sample with '' so both
# columns stay string-typed for spark.createDataFrame.
rows_needed = max(len(positive_prediction_sample), len(negative_prediction_sample))
positive_column = positive_prediction_sample + [''] * (rows_needed - len(positive_prediction_sample))
negative_column = negative_prediction_sample + [''] * (rows_needed - len(negative_prediction_sample))

if rows_needed == 0:
    # Spark cannot infer a schema from an empty dataset, so skip the display.
    print('No responses with missing sentiment to display.')
else:
    print('Outputting prediction samples to dataframe...')
    prediction_sample_dataframe = pd.DataFrame({'Positive Responses': positive_column, 'Negative Responses': negative_column})
    z.show(spark.createDataFrame(prediction_sample_dataframe))






In [ ]:
%pyspark
