# Lego Marketing Sentiment

## <em>Using Sentiment Analysis to analyse consumer sentiment towards Lego marketing campaigns.</em>

The language and libraries used for this project include:
- Python v3.9.1
- PySpark v3.4.0
- Emoji
- Instaloader (as example of getting individual Instagram posts)
- Seaborn

## Project Features

- Ingestion of data using structured streaming dataframes
- Logistic Regression classifier model for sentiment analysis
- Sample Dashboard widget demo

## Why Logistic Regression?

- Suited to binary classification (positive or negative)
- Tolerates noisy data well
- Fast to train, with low computational cost
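
To make the binary decision concrete, the following is a minimal sketch of how a logistic regression model turns a weighted sum of features into a positive or negative label. The weights, bias, and feature values are invented for illustration and are not taken from the trained model in this notebook.

```python
import math

def sigmoid(z: float) -> float:
    """Map any real-valued score to a probability in (0, 1)."""
    return 1.0 / (1.0 + math.exp(-z))

def predict(weights, bias, features, threshold=0.5):
    """Return (label, probability) for a single feature vector."""
    score = sum(w * x for w, x in zip(weights, features)) + bias
    probability = sigmoid(score)
    return (1 if probability >= threshold else 0), probability

# Hypothetical weights for two made-up features
weights = [2.0, -3.0]
bias = 0.0
label, probability = predict(weights, bias, [1.0, 0.0])
```

Prediction is a single dot product followed by a sigmoid, which is part of why the model is cheap to run over large volumes of comments.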


> **PLEASE NOTE**
> An official social media API was not used for the following reasons:
>
> - Twitter has changed the data available on its free tier, so streaming tweets wasn't possible.
> - Instagram and Facebook have a manual authorisation process, which would not have been completed before the project due date.
>
> Instead, a CSV dataset of Instagram comments was created and is used to simulate streaming data.

The training set for this project can be found here: [Sentiment140](http://help.sentiment140.com/for-students/)

In [None]:
#Import libraries
from pyspark import SparkContext
from pyspark.sql.session import SparkSession
from pyspark.streaming import StreamingContext
import pyspark.sql.types as tp
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.feature import StopWordsRemover, Word2Vec, RegexTokenizer
from pyspark.ml.classification import LogisticRegression
from pyspark.sql import Row
from pyspark.sql.functions import col, udf, lit
from pyspark.sql.functions import lower, countDistinct
from pyspark.sql.functions import regexp_replace
from pyspark.ml.feature import HashingTF, IDF, Tokenizer, CountVectorizer
from pyspark.ml.evaluation import BinaryClassificationEvaluator
import shutil
import re
import operator
import functools
import requests
import json
import time
import instaloader
from bs4 import BeautifulSoup
import emoji
import os
import sys
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
import nest_asyncio

os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable

#Below prevents errors when running spark sessions in jupyter
nest_asyncio.apply()


In [None]:
# initializing spark session
sc = SparkContext(appName="LegoSentiment")
spark = SparkSession(sc)

## Model Pipeline Definition, Training and Evaluation

See above for the training dataset used.

In [None]:
# define schemas for training and instagram post data
train_schema = tp.StructType([
    tp.StructField(name='polarity', dataType=tp.IntegerType(), nullable=True),
    tp.StructField(name='comment_id', dataType=tp.IntegerType(), nullable=True),
    tp.StructField(name='date', dataType=tp.TimestampType(), nullable=True),
    tp.StructField(name='query', dataType=tp.StringType(), nullable=True),
    tp.StructField(name='username', dataType=tp.StringType(), nullable=True),
    tp.StructField(name='comment', dataType=tp.StringType(), nullable=True),
])

insta_schema = tp.StructType([
    tp.StructField(name='id', dataType=tp.IntegerType(), nullable=True),
    tp.StructField(name='username', dataType=tp.StringType(), nullable=True),
    tp.StructField(name='comment', dataType=tp.StringType(), nullable=True),
    tp.StructField(name='comment_id', dataType=tp.IntegerType(), nullable=True),
    tp.StructField(name='profile_url', dataType=tp.StringType(), nullable=True),
    tp.StructField(name='comment_url', dataType=tp.StringType(), nullable=True),
])

In [None]:
#Load training data
training_data = spark.read.csv('C:/Users/hayle/Desktop/lego_post_data/sentiment_model_data/training_data.csv',schema = train_schema,header=True)

In [None]:
# view the data and deal with null values
df = training_data.na.fill('')
df = df.na.fill(value=0)
df.show(20)

In [None]:
#Drop unnecessary columns and drop usernames to anonymise data
df = df.drop('query')
df = df.drop('username')
df = df.drop('date')
df.show(20)

#How many entries in training data
df.count()

In [None]:
#Split data into training, validation and test sets
(train_set, val_set, test_set) = df.randomSplit([0.98, 0.01, 0.01], seed = 2000)

In [None]:
#Define transformation pipeline for incoming data
#split text into individual word tokens
tokenizer = Tokenizer(inputCol="comment", outputCol="words")
#increase memory efficiency and scalability using hashing
hashtf = HashingTF(numFeatures=2**16, inputCol="words", outputCol='tf')
#Inverse Document Frequency, measure importance of word token in entry
idf = IDF(inputCol='tf', outputCol="features", minDocFreq=5) #minDocFreq: remove sparse terms
#convert any categorical str vals to numerical for ML prep
label_stringIdx = StringIndexer(inputCol = "polarity", outputCol = "label")
#Define pipeline steps
pipeline = Pipeline(stages=[tokenizer, hashtf, idf, label_stringIdx])
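
As a rough illustration of what the `HashingTF` and `IDF` stages compute, here is a pure-Python sketch of the hashing trick followed by inverse document frequency weighting. It assumes the smoothed formula `log((m + 1) / (df + 1))` given in the Spark documentation, and it uses `zlib.crc32` as a stand-in hash (Spark uses MurmurHash3), so bucket indices will not match Spark's. The three example documents are invented.

```python
import math
import zlib
from collections import Counter

NUM_FEATURES = 2 ** 4  # tiny table for illustration; the notebook uses 2**16

def bucket(token: str) -> int:
    # crc32 is a stand-in hash chosen for reproducibility (assumption, not Spark's hash)
    return zlib.crc32(token.encode("utf-8")) % NUM_FEATURES

def term_frequencies(tokens):
    """Count how many tokens of one document land in each bucket."""
    return Counter(bucket(t) for t in tokens)

def inverse_document_frequencies(docs_tf, min_doc_freq=0):
    """idf = log((m + 1) / (df + 1)); buckets seen in fewer than min_doc_freq documents get 0."""
    m = len(docs_tf)
    doc_freq = Counter(b for tf in docs_tf for b in tf)
    return {
        b: (math.log((m + 1) / (df + 1)) if df >= min_doc_freq else 0.0)
        for b, df in doc_freq.items()
    }

# Invented example documents, already tokenised
docs = [["love", "this", "set"], ["hate", "this", "set"], ["love", "lego"]]
docs_tf = [term_frequencies(d) for d in docs]
idf = inverse_document_frequencies(docs_tf)
tfidf = [{b: count * idf[b] for b, count in tf.items()} for tf in docs_tf]
```

Because tokens are mapped to a fixed number of buckets, no vocabulary has to be stored, at the cost of occasional collisions between unrelated words.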

In [None]:
#fit training set data to pipeline processes     
pipelineFit = pipeline.fit(train_set)
print('finished!')

In [None]:
#run pipeline process on train and val set
train_df = pipelineFit.transform(train_set)
val_df = pipelineFit.transform(val_set)

In [None]:
#show transformed training set
train_df.show(50)

In [None]:
#instantiate and train logistic regression model
lr = LogisticRegression(maxIter=100)
lrModel = lr.fit(train_df)
predictions = lrModel.transform(val_df)

In [None]:
#show results
predictions.show(50)

In [None]:
#Evaluate using the area under the ROC curve (AUC), the evaluator's default metric.
#The larger the value, the better the model is at distinguishing between the two classes
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction")
evaluator.evaluate(predictions)
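
For intuition about the metric, AUC can be read as the fraction of (positive, negative) pairs in which the positive example receives the higher score. The sketch below computes it that way in plain Python. It is an illustration of the definition, not how Spark computes it internally, and the labels and scores are invented.

```python
def area_under_roc(labels, scores):
    """Fraction of (positive, negative) pairs ranked correctly; ties count as half."""
    positives = [s for y, s in zip(labels, scores) if y == 1]
    negatives = [s for y, s in zip(labels, scores) if y == 0]
    if not positives or not negatives:
        raise ValueError("AUC needs at least one positive and one negative example")
    wins = 0.0
    for p in positives:
        for n in negatives:
            if p > n:
                wins += 1.0
            elif p == n:
                wins += 0.5
    return wins / (len(positives) * len(negatives))

# Invented labels and scores: three of the four pairs are ranked correctly
labels = [1, 1, 0, 0]
scores = [0.9, 0.4, 0.6, 0.1]
auc = area_under_roc(labels, scores)  # 0.75
```

A value of 0.5 corresponds to random ranking and 1.0 to perfect separation of the two classes.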

In [None]:
#Calculate model accuracy
accuracy = predictions.filter(predictions.label == predictions.prediction).count() / float(val_set.count())
accuracy

## Ingestion, Transformation and Sentiment Analysis of Structured Streaming Instagram Comments

Comments are ingested through a Structured Streaming (unbounded) dataframe, which according to the Spark docs is more fault tolerant and scalable than a socket source.

> **NOTE** An instance of Instaloader was included to show how post data from Lego's Instagram page could be scraped.
> Using the Facebook Graph API with elevated business permissions is recommended for production stability and legal reasons.

In [None]:
#instaloader instance, an example of how to scrape Instagram post information in real time.
#Instagram will sometimes return a 401 if too many requests are made in a short time period, so this is not fault tolerant.
instapost = instaloader.Instaloader()
profile = instaloader.Profile.from_username(instapost.context,'lego')
posts = profile.get_posts()
count = 0
post_list = []

for post in posts:
    if count < 3:
        instapost.download_post(post, 'lego')
        post_params = dict()
        post_params['Url'] = post.url
        post_params['Likes'] = post.likes
        post_params['Caption'] = post.caption
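        post_params['Caption_Hashtags'] = post.caption_hashtags
        #store the collected metadata so it is not discarded on the next iteration
        post_list.append(post_params)
        count=count+1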
        time.sleep(5)
    else:
        break

In [None]:
#Preprocessing of instagram comments
def preprocess_sentence(text):
    # remove html tags
    text = BeautifulSoup(text.encode('utf8'), "html.parser").get_text()
    # convert the text to lower case (str.lower returns a new string, so reassign)
    text = text.lower()
    # convert all urls to string "URL"
    text = re.sub(r'((www\.[^\s]+)|(https?://[^\s]+))', 'URL', text)
    # convert all @username to "AT_USER"
    text = re.sub(r'@[^\s]+', 'AT_USER', text)
    # correct all multiple white spaces to a single white space
    text = re.sub(r'[\s]+', ' ', text)
    # convert "#topic" to just "topic"
    text = re.sub(r'#([^\s]+)', r'\1', text)

    # convert emojis to text, padding each with spaces so adjacent emojis become separate tokens
    # (emoji.get_emoji_regexp() was removed in emoji 2.0, so the split is done via the delimiters)
    text = emoji.demojize(text, delimiters=(" :", ": "))
    # collapse the padding back to single spaces
    text = " ".join(text.split())
    return text
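
The regex steps can be tried in isolation. The sketch below uses only the standard library and leaves out the HTML and emoji handling. `normalise_comment` is a hypothetical helper name, and the sample comment and URL are invented.

```python
import re

def normalise_comment(text: str) -> str:
    """Apply the lower-casing and regex substitutions only (no HTML or emoji handling)."""
    text = text.lower()
    # replace links with the placeholder "URL"
    text = re.sub(r'(www\.\S+)|(https?://\S+)', 'URL', text)
    # replace @mentions with the placeholder "AT_USER"
    text = re.sub(r'@\S+', 'AT_USER', text)
    # collapse runs of whitespace into a single space
    text = re.sub(r'\s+', ' ', text)
    # strip the leading "#" from hashtags
    text = re.sub(r'#(\S+)', r'\1', text)
    return text

example = normalise_comment("LOVE this   set @lego #StarWars https://example.com/x")
# example == "love this set AT_USER starwars URL"
```

Lower-casing happens before the substitutions, so the `URL` and `AT_USER` placeholders keep their capitals and stay distinguishable from ordinary words.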

In [None]:
#create structured streaming dataframe
insta_comment_folder = r'C:/Users/hayle/Desktop/lego_post_data/lego_insta_comments'
insta_comments = spark.readStream.format('csv').schema(insta_schema).option('header', True).option('maxFilesPerTrigger', 1).load(insta_comment_folder)
#dropDuplicates returns a new dataframe, so the result must be reassigned
insta_comments = insta_comments.dropDuplicates(['comment'])

In [None]:
insta_comments.isStreaming

In [None]:
# anonymise comments (GDPR) and drop useless columns
df = insta_comments.drop('username').drop('profile_url')
#Start dataframe streaming and save to memory
query = df.writeStream.format('memory').queryName('temp').outputMode('append').start()

In [None]:
#Due to data restrictions, will simulate streaming conditions by dropping csv into folder 😭
shutil.move('C:/Users/hayle/Desktop/lego_post_data/post_1.csv', 'C:/Users/hayle/Desktop/lego_post_data/lego_insta_comments')

In [None]:
#wrap the preprocessing function as a UDF so it can be applied to the comment column
cleanUDF = udf(preprocess_sentence, tp.StringType())

In [None]:
#Check dataframe creation and data correctly processed.
query_df = spark.sql("SELECT * FROM temp")
query_df = query_df.na.fill('').na.fill(value=0)
query_df = query_df.withColumn("comment",cleanUDF(col("comment")))
query_df.show()

In [None]:
#Run transformation on data
final_df = pipelineFit.transform(query_df)
final_df.show()

In [None]:
#run logistic regression model on data
lego_pred = lrModel.transform(final_df)
lego_pred.show()

In [None]:
#lego_pred = lego_pred.withColumn('words',lego_pred.words.cast(tp.StringType()))
lego_pred = lego_pred.withColumn('tf',lego_pred.tf.cast(tp.StringType()))
lego_pred = lego_pred.withColumn('features',lego_pred.features.cast(tp.StringType()))
#lego_pred = lego_pred.withColumn('rawPrediction',lego_pred.rawPrediction.cast(tp.StringType()))
#lego_pred = lego_pred.withColumn('probability',lego_pred.probability.cast(tp.StringType()))
#lego_pred = lego_pred.withColumn('prediction',lego_pred.prediction.cast(tp.StringType()))
lego_pred.show()

In [None]:
df = lego_pred.toPandas()
df

In [None]:
#spark.conf.set("spark.sql.execution.arrow.enabled", "true")
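#simulate storing for later/historical data analysis
#the CSV writer does not support array or vector columns, so only scalar columns are written
lego_pred.select('id', 'comment_id', 'comment', 'comment_url', 'prediction').write.csv('C:/Users/hayle/Desktop/lego_post_data/test.csv')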

In [None]:
df = df[['comment','prediction']]
df

In [None]:
df_metric = df.groupby('prediction').count()
df_metric['sentiment'] = ['negative', 'positive']
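
One caveat with the hard-coded list above: `StringIndexer` assigns indices by descending label frequency by default, so index 0 is not guaranteed to mean negative, and a batch containing only one class would yield a single row. The sketch below shows one possible way to derive names from the indexer's labels instead. It assumes the Sentiment140 encoding (0 = negative, 4 = positive); `sentiment_counts` is a hypothetical helper, and the `indexer_labels` values are invented stand-ins for the `labels` attribute of the fitted `StringIndexerModel`.

```python
# Sentiment140 encodes polarity as 0 = negative and 4 = positive
POLARITY_NAMES = {"0": "negative", "4": "positive"}

def sentiment_counts(predictions, indexer_labels):
    """Count predictions per sentiment name.

    indexer_labels[i] is the original polarity string for prediction index i,
    for example the `labels` attribute of a fitted StringIndexerModel.
    """
    counts = {name: 0 for name in POLARITY_NAMES.values()}
    for prediction in predictions:
        polarity = indexer_labels[int(prediction)]
        counts[POLARITY_NAMES[polarity]] += 1
    return counts

# Hypothetical values: here the indexer happened to assign index 0 to polarity "4"
counts = sentiment_counts([0.0, 0.0, 1.0], indexer_labels=["4", "0"])
# counts == {"negative": 1, "positive": 2}
```

Both sentiment names are always present in the result, so a dashboard built on it does not break when a batch contains only one class.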

In [None]:
df_metric

In [None]:
#Pie chart for dashboard
palette_color = sns.color_palette('bright')
labels = ['negative', 'positive']
plt.pie(data=df_metric,x= 'comment', labels=labels, colors=palette_color, autopct='%.0f%%')
plt.show()

#Barchart for dashboard
bars = sns.barplot(x='comment',y='sentiment', data=df_metric)
bars.set(xlabel='comment count')
bars