<a href="https://colab.research.google.com/github/Maksym-Tymchenko/johnsnow/blob/main/CLASS_FOR_SENTIMENT_DETECTION_USING_SNOW_LABS_PIPELINES.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# **Class for Sentiment Analysis for News Articles**

## Colab Setup

In [None]:
# Install PySpark and Spark NLP
! pip install -q pyspark==3.1.2 spark-nlp

# Install Spark NLP Display lib
! pip install --upgrade -q spark-nlp-display

In [None]:
import sparknlp
import pandas as pd
from pyspark.ml import Pipeline
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from tabulate import tabulate
import sparknlp
from sparknlp.annotator import *
from sparknlp.base import *
from sparknlp.pretrained import PretrainedPipeline
from sparknlp_display import NerVisualizer

spark = sparknlp.start()

print("Spark NLP version: ", sparknlp.version())
print("Apache Spark version: ", spark.version)

Spark NLP version:  3.4.1
Apache Spark version:  3.1.2


## Define a News Article

In [None]:
article = [ # two strings - headline & article body
"""Google sued in US over 'deceptive' location tracking""", # headline
"""Google is being sued in the US over accusations it deceived people about how to control location tracking.

The legal action refers to a widely reported 2018 revelation turning off one location-tracking setting in its apps was insufficient to fully disable the feature.

It accuses Google of using so-called dark patterns, marketing techniques that deliberately confuse.

Google said the claims were inaccurate and outdated.

'Unfair practices'
The legal action was filed in the District of Columbia. Similar ones were also filed in Texas, Indiana and Washington state.

It refers to an Associated Press revelation turning off Location History when using Google Maps or Search was insufficient - as a separate setting, Web and App Activity, continued to log location and other personal data.

The study, with researchers at Princeton University, found up to two billion Android and Apple devices could be affected.

"Google has relied on, and continues to rely on, deceptive and unfair practices that make it difficult for users to decline location tracking or to evaluate the data collection and processing to which they are purportedly consenting," the legal action alleges.

'Robust controls'
Google told BBC News the case was based "on inaccurate claims and outdated assertions about our settings".

A representative added: "We have always built privacy features into our products and provided robust controls for location data.

"We will vigorously defend ourselves and set the record straight."

Visual misdirection
The legal action claims Google's policies contained other "misleading, ambiguous and incomplete descriptions... but guarantee that consumers will not understand when their location is collected and retained by Google or for what purposes".

It refers to dark patterns, design choices that alter users' decision-making for the designer's benefit - such as, complicated navigation menus, visual misdirection, confusing wording and repeated nudging towards a particular outcome.

Data regulators are increasingly focusing on these practices.

Google faces a raft of other legal actions in the US, including:

In May 2020, Arizona filed a legal action over the same issue
In December 2020, multiple US states sued over the price and process of advertising auctions
In October 2020, the US Justice Department alleged Google had a monopoly over search and search advertising"""]



## Define the Brand Identification Class

In [None]:
class BrandIdentification:
    def __init__(self, MODEL_NAME):
        self.MODEL_NAME = MODEL_NAME

        # Define Spark NLP pipeline 
        documentAssembler = DocumentAssembler() \
            .setInputCol('text') \
            .setOutputCol('document')

        tokenizer = Tokenizer() \
            .setInputCols(['document']) \
            .setOutputCol('token')

        # ner_dl and onto_100 model are trained with glove_100d, so the embeddings in the pipeline should match
        if (self.MODEL_NAME == "ner_dl") or (self.MODEL_NAME == "onto_100"):
            embeddings = WordEmbeddingsModel.pretrained('glove_100d') \
                .setInputCols(["document", 'token']) \
                .setOutputCol("embeddings")

        # Bert model uses Bert embeddings
        elif self.MODEL_NAME == "ner_dl_bert":
            embeddings = BertEmbeddings.pretrained(name='bert_base_cased', lang='en') \
                .setInputCols(['document', 'token']) \
                .setOutputCol('embeddings')

        ner_model = NerDLModel.pretrained(MODEL_NAME, 'en') \
            .setInputCols(['document', 'token', 'embeddings']) \
            .setOutputCol('ner')

        ner_converter = NerConverter() \
            .setInputCols(['document', 'token', 'ner']) \
            .setOutputCol('ner_chunk')

        nlp_pipeline = Pipeline(stages=[
            documentAssembler, 
            tokenizer,
            embeddings,
            ner_model,
            ner_converter
        ])
        
        # Create the pipeline model
        empty_df = spark.createDataFrame([['']]).toDF('text')
        self.pipeline_model = nlp_pipeline.fit(empty_df)


    def create_ranked_result_df(self, text):
        # Run the pipeline for the text
        text_df = spark.createDataFrame(pd.DataFrame({'text': text}, index = [0]))
        result = self.pipeline_model.transform(text_df)
        
        # Tabulate results
        df = result.select(F.explode(F.arrays_zip('document.result', 'ner_chunk.result',"ner_chunk.metadata")).alias("cols")).select(\
        F.expr("cols['1']").alias("chunk"),
        F.expr("cols['2'].entity").alias('result'))
        
        # Rank the identified ORGs by frequencies
        ranked_df = df.filter(df.result == 'ORG').groupBy(df.chunk).count().orderBy('count', ascending=False)

        return ranked_df


    def predict_by_headline(self, headline):
        ranked_df_hl = self.create_ranked_result_df(headline)
        ranked_df_hl.show(100, truncate=False)

        # If only one ORG appears in headline, return it 
        if ranked_df_hl.count() == 1:
            return ranked_df_hl.first()[0] 
        else: # If no ORG appears, or multiple ORGs all appear once, return no brand
            return None


    def predict(self, body):
        ranked_df = self.create_ranked_result_df(body)
        ranked_df.show(100, truncate=False)

        # Return the ORG with highest freq (at least greater than 2)
        if ranked_df.first()[1] > 2: 
            return ranked_df.first()[0] 
        else:
            return None
        # TO DO: break even - Wikidata#


    # def visualise(self, ranked_df, result):
        # Visualise ORG names in text
        # NerVisualizer().display(
            # result = result.collect()[0],
            # label_col = 'ner_chunk',
            # document_col = 'document',
            # labels=['ORG']
        #)


## Define the Senitment Identification Class

In [None]:
class SentimentIdentification:

    def __init__(self, MODEL_NAME):
        """Creates a class for sentiment identication using specified model.

        Args:
          MODEL_NAME: Name of the Spark NLP pretrained pipeline.
        """

        # Create the pipeline instance
        self.MODEL_NAME = MODEL_NAME

        if self.MODEL_NAME == "custom_pipeline": # https://nlp.johnsnowlabs.com/2021/11/03/bert_sequence_classifier_finbert_en.html
          document_assembler = DocumentAssembler() \
              .setInputCol('text') \
              .setOutputCol('document')

          tokenizer = Tokenizer() \
              .setInputCols(['document']) \
              .setOutputCol('token')

          sequenceClassifier = BertForSequenceClassification \
                .pretrained('bert_sequence_classifier_finbert', 'en') \
                .setInputCols(['token', 'document']) \
                .setOutputCol('class') \
                .setCaseSensitive(True) \
                .setMaxSentenceLength(512)

          pipeline = Pipeline(stages=[
              document_assembler,
              tokenizer,
              sequenceClassifier
          ])

          self.pipeline_model = LightPipeline(pipeline.fit(spark.createDataFrame([['']]).toDF("text")))

        else:
          self.pipeline_model = PretrainedPipeline(self.MODEL_NAME, lang = 'en')



    def predict(self, text):
        """Predicts sentiment of the input string..

        Args:
          text: String to classify.
        """
        self.text = text

        # Annotate input text using pretrained model
        annotations =  self.pipeline_model.annotate(self.text)

        # Depending on the chosen pipeline the outputs will be slightly different
        if self.MODEL_NAME == "analyze_sentimentdl_glove_imdb":
          # print(f"{annotations['sentiment']} {annotations['document']}")

          if isinstance(self.text, list):
            return [annotation['sentiment'][0] for annotation in annotations] # Return the sentiment list of strings
          else:
            return annotations['sentiment'][0] # Return the sentiment string

        else:
          # print(f"{annotations['class']} {annotations['document']}")

          if isinstance(self.text, list):
            return [annotation['class'][0] for annotation in annotations] # Return the sentiment list of strings
          else:
            return annotations['class'][0] # Return the sentiment string

## Identify Brand in news article


In [None]:
MODEL_NAME = "ner_dl_bert" # MODEL_NAME = "onto_100"

brand_identifier = BrandIdentification(MODEL_NAME)
headline, body = article

headline_brand = brand_identifier.predict_by_headline(headline)
print(headline)
print(headline_brand)

# Only use article body if no brand identified in the headline
if headline_brand == None:
    brand = brand_identifier.predict(body)
    print(brand)

bert_base_cased download started this may take some time.
Approximate size to download 389.1 MB
[OK!]
ner_dl_bert download started this may take some time.
Approximate size to download 15.4 MB
[OK!]
+------+-----+
|chunk |count|
+------+-----+
|Google|1    |
+------+-----+

Google sued in US over 'deceptive' location tracking
Google


## Classify article using chosen pipeline

In [None]:
# identifier = SentimentIdentification(MODEL_NAME =  "analyze_sentimentdl_glove_imdb")
# identifier = SentimentIdentification(MODEL_NAME =  "classifierdl_bertwiki_finance_sentiment_pipeline")
identifier = SentimentIdentification(MODEL_NAME = "custom_pipeline") # Uses https://nlp.johnsnowlabs.com/2021/11/03/bert_sequence_classifier_finbert_en.html

# Predict by headline
headline = article[0]
identifier.predict(headline)

# Predict by body
body = article[1]
identifier.predict(body)


bert_sequence_classifier_finbert download started this may take some time.
Approximate size to download 390.9 MB
[OK!]


'negative'

## Test the accuracy of sentiment using the Financial News Headline Dataset

In [None]:
# Data downloaded from here: https://www.kaggle.com/ankurzing/sentiment-analysis-for-financial-news/version/5
# Upload data from local machine
# from google.colab import files
# uploaded = files.upload()

### Alternatively download directly from kaggle using api keys

In [None]:
!pip install opendatasets
import opendatasets as od
od.download("https://www.kaggle.com/ankurzing/sentiment-analysis-for-financial-news?select=all-data.csv")

# Input the following username and key when prompted:
# username: maxtimm
# Key: e9e0955b8d40b7d939e21febfbea8d15

Skipping, found downloaded files in "./sentiment-analysis-for-financial-news" (use force=True to force download)


## Convert Kaggle to dataframe and preprocess

In [None]:
import io
import time

# Store data in a Pandas Dataframe
df_pandas = pd.read_csv("./sentiment-analysis-for-financial-news/all-data.csv", encoding='latin-1')

# Change column names (pipelines require a "text" column to predict)
df_pandas.columns = ['True_Sentiment', 'text']

# shuffle the DataFrame rows
df_pandas = df_pandas.sample(frac = 1)

# Make dataset smaller for faster runtime
num_sentences = 10
total_num_sentences = df_pandas.shape[0]
df_pandas.drop(df_pandas.index[num_sentences:total_num_sentences], inplace=True)

## Classify sentences one by one

In [None]:
# Create the identifier object
# identifier = SentimentIdentification(MODEL_NAME = "custom_pipeline") # 90.2% accuracy on 500 sentences 89.8% on 1000 sentences
# identifier = SentimentIdentification(MODEL_NAME =  "classifierdl_bertwiki_finance_sentiment_pipeline") # Alternative pretrained pipeline 90.0% accuracy on 500 sentences

preds = []
target = []
ignored_idxs = []
sentiment_to_ignore = "" # e.g. neutral

# Measure how long it takes
start = time.time()

# Collect predicted sentiment for each headline - take three minutes to run
for idx, hl in enumerate(df_pandas['text']):

    # Only append the sentiment if it is not the sentiment to ignore (e.g. neutral)
    target_sentiment = df_pandas["True_Sentiment"][df_pandas.index[idx]]

    if target_sentiment != sentiment_to_ignore:
      preds.append(identifier.predict(hl))
    else:
      ignored_idxs.append(idx)

    # Print progress
    if idx % 25 == 0:
      print(f"Classification {100*idx/num_sentences}% done.")

# Remove all ignored entries from dataset
df_pandas.drop(df_pandas.index[ignored_idxs], inplace=True)

df_pandas['Predicted_Sentiment'] = preds

# Measure how long it takes
end = time.time()
print(f"{end-start} seconds elapsed to classify {num_sentences} sentences.")

# Modify predicted labels to match with true labels
# df = df.replace({'Predicted Sentiment': {'pos' : 'positive', 'neg' : 'negative'}})

df_pandas

Classification 0.0% done.
2.318124771118164 seconds elapsed to classify 10 sentences.


Unnamed: 0,True_Sentiment,text,Predicted_Sentiment
2090,positive,Diluted loss per share stood at EUR 0.15 versu...,negative
255,positive,"Operating profit totaled EUR 5.5 mn , up from ...",positive
1991,positive,Markets had been expecting a poor performance ...,positive
4685,negative,"( ADP News ) - Feb 4 , 2009 - Finnish broadban...",negative
2711,neutral,"Swedbank Hypotek - Is to issue a benchmark , f...",neutral
825,positive,BasWare 's CEO Ilkka Sihvo comments in conjunc...,positive
300,positive,The stock rose for a third day on Tuesday brin...,positive
1220,neutral,Finnish consumers prefer to buy the cheapest b...,neutral
2036,neutral,The Board of Directors proposes to the Shareho...,neutral
561,positive,Kazgiprotsvetmet and Outotec Finland have sign...,positive


## Measure the Accuracy

In [None]:
from sklearn.metrics import classification_report

y_true = df_pandas['True_Sentiment'].to_numpy()
y_pred = df_pandas['Predicted_Sentiment'].to_numpy()

print(f"The accuracy is {100* sum(y_true==y_pred)/len(y_true)}%. \n")

target_names = ['positive', 'neutral', 'negative']

# Compute classification metrics - poor accuracy
print(classification_report(y_true, y_pred, target_names=target_names))

The accuracy is 90.0%. 

              precision    recall  f1-score   support

    positive       0.50      1.00      0.67         1
     neutral       1.00      1.00      1.00         3
    negative       1.00      0.83      0.91         6

    accuracy                           0.90        10
   macro avg       0.83      0.94      0.86        10
weighted avg       0.95      0.90      0.91        10



### Classify using Spark Dataframe as input

In [1]:
from pyspark.sql.functions import array_join
from pyspark.sql.functions import col, explode, expr, greatest

# Define pretrained pipeline
# pipeline = PretrainedPipeline("classifierdl_bertwiki_finance_sentiment_pipeline", lang = 'en')

# Convert to spark dataframe for faster prediction
df_spark = spark.createDataFrame(df_pandas) 

# Measure how long it takes
start = time.time()

# Predict the sentiment
df_spark = pipeline.transform(df_spark)

# print(df_spark.first()['class'])
# df_spark.printSchema()

#Extract sentiment score
df_spark_scores = df_spark.select(explode(col("class.metadata")).alias("metadata")).select(col("metadata")["positive"].alias("positive"),
                                                                                    col("metadata")["neutral"].alias("neutral"),
                                                                                    col("metadata")["negative"].alias("negative"),)
df_pandas_scores = df_spark_scores.toPandas()

# df_spark_scores = df_spark_scores.withColumn('max_val', greatest('positive', 'negative', 'neutral')) # Doesn't work because of scientific notation

# df_spark_scores.show()


# Extract only targets and labels
df_spark = df_spark.select("text", "True_Sentiment", "class.result")

# Rename to Predicted Sentiment
df_spark = df_spark.withColumnRenamed("result","Predicted_Sentiment")

# Convert sentiment from a list to a string
df_spark = df_spark.withColumn("Predicted_Sentiment", array_join("Predicted_Sentiment", ""))

# Convert to pandas dataframe for postprocessing (https://towardsdatascience.com/text-classification-in-spark-nlp-with-bert-and-universal-sentence-encoders-e644d618ca32)
df_pandas = df_spark.toPandas()

# df_pandas["Predicted_Sentiment"] = df_pandas["Predicted_Sentiment"].apply(lambda x: x[0]) # Alternative to convert list to string

end = time.time()

print(f"{end-start} seconds elapsed to classify {num_sentences} sentences.")

df_pandas


ModuleNotFoundError: ignored

## Compute the Accuracy

In [None]:
from sklearn.metrics import classification_report, accuracy_score

# Compute the accuracy
accuracy = accuracy_score(df_pandas["True_Sentiment"], df_pandas["Predicted_Sentiment"])
print(f"The accuracy is {accuracy*100}%.")
print(classification_report(df_pandas["True_Sentiment"], df_pandas["Predicted_Sentiment"]))

# # Alternatively if not converted to pandas dataframe, use the following for the accuracy
# # Compute accuracy by comparing each true label with predicted label
# start = time.time()
# accuracy = df_spark.filter(df_spark.Predicted_Sentiment == df_spark.True_Sentiment).count()/ num_sentences
# end = time.time()
# print(f"{end-start} seconds elapsed to calculate accuracy of {num_sentences} sentences.")
# print(f"The accuracy is {accuracy*100}%.")

The accuracy is 90.0%.
              precision    recall  f1-score   support

     neutral       1.00      0.83      0.91         6
    positive       0.80      1.00      0.89         4

    accuracy                           0.90        10
   macro avg       0.90      0.92      0.90        10
weighted avg       0.92      0.90      0.90        10



## Alternatively extract predictions as strings (takes much longer)

In [None]:
# # Extract the predictions from the dataframe
# annotations_list = result.select("class.result").collect()
# sentiment_list = [annotations_list[i].result[0] for i in range(num_sentences)]

# # Annotate previous dataframe for visualization
# df_pandas['Predicted Sentiment'] = sentiment_list

# # Move text column to the beginning
# text_column = df_pandas.pop('text')
# df_pandas.insert(0, 'Headline', text_column)

# display(df_pandas)

# y_true = df_pandas['True Sentiment'].to_numpy()
# y_pred = df_pandas['Predicted Sentiment'].to_numpy()

# print(f"The accuracy is {100* sum(y_true==y_pred)/len(y_true)}%. \n")