<a href="https://colab.research.google.com/github/Brand-Sentiment-Tracking/dev-sentiment-package/blob/main/johnsnow/Package_notebook_with_tuples.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Imports

In [None]:
# Install PySpark and Spark NLP
! pip install -q pyspark==3.1.2 spark-nlp

# Install Spark NLP Display lib
! pip install --upgrade -q spark-nlp-display


import nltk.data
import os
import json

import os
import time
import random
import sparknlp
import pandas as pd
from pyspark import SparkFiles
from pyspark.ml import Pipeline
from pyspark.sql import SparkSession
from pyspark.sql.types import StringType, ArrayType
import pyspark.sql.functions as F
import sparknlp
from sparknlp.annotator import *
from sparknlp.base import *
from sparknlp.pretrained import PretrainedPipeline
from sparknlp_display import NerVisualizer

import sparknlp
import pandas as pd
from pyspark.ml import Pipeline
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
# from tabulate import tabulate
from sparknlp.annotator import *
from sparknlp.base import *
from sparknlp.pretrained import PretrainedPipeline

# Import functions to manipulate dataframe
from pyspark.sql.functions import array_join
from pyspark.sql.functions import col, explode, expr, greatest
from pyspark.sql.window import Window
from pyspark.sql.functions import monotonically_increasing_id, row_number
from pyspark import SparkFiles
from sklearn.metrics import classification_report, accuracy_score

import time
from IPython.display import display

# Unzip data

In [None]:
# Mount Google Drive at /content/drive so the article archive can be read from it
from google.colab import drive
drive.mount('/content/drive')

# Before the next step, upload "cc_download_articles.zip" to the root folder of your Google Drive,
# then uncomment the line below to unzip it into ./drive/MyDrive/group_nlp_data
# !unzip ./drive/MyDrive/cc_download_articles.zip -d ./drive/MyDrive/group_nlp_data


Mounted at /content/drive


In [None]:
# Root folder of the unzipped articles. Keep the trailing slash: the name of a
# source sub-folder (e.g. 'cyprus-mail.com') is concatenated directly onto this string.
articles_folder = "./drive/MyDrive/group_nlp_data/cc_download_articles/"

# Define Extraction Class

In [None]:
class ArticleExtraction:
    """Read article text and headlines from files on disk.

    Headlines loaded through ``import_one_headline_json`` and
    ``import_folder_headlines`` accumulate in ``self.headlines`` for the
    lifetime of the instance.
    """

    def __init__(self):
        # Titles collected so far, in the order they were imported
        self.headlines = []

    def import_one_article(self, filepath):
        """Return the content of the file at ``filepath`` decoded as UTF-8."""
        with open(filepath, "rb") as f:
            return f.read().decode('utf-8')

    def import_one_headline_json(self, filepath):
        """Append the ``title`` field of one JSON file to ``self.headlines``.

        Raises:
          KeyError: if the JSON object has no ``title`` key.
        """
        # Read as UTF-8 explicitly (same encoding import_one_article uses)
        # instead of relying on the platform's default locale encoding.
        with open(filepath, 'r', encoding='utf-8') as f:
            data = json.load(f)
        self.headlines.append(data['title'])

    def article_to_sentences(self, article):
        """Split ``article`` into sentences with the NLTK punkt tokenizer.

        Returns:
          A single string with the sentences joined by a "-----" line.
        """
        tokenizer = nltk.data.load('tokenizers/punkt/english.pickle')
        return '\n-----\n'.join(tokenizer.tokenize(article))

    def import_folder_headlines(self, folderpath):
        """Import the headline of every file directly inside ``folderpath``.

        Files are processed in sorted name order so the result is
        reproducible across runs; sub-directories are skipped.

        Returns:
          ``self.headlines`` (the instance's accumulated list, including
          headlines imported by earlier calls).
        """
        # os.listdir returns entries in arbitrary order, hence the sort
        for filename in sorted(os.listdir(folderpath)):
            filepath = os.path.join(folderpath, filename)
            if not os.path.isfile(filepath):
                continue  # a directory cannot be opened as a JSON file
            self.import_one_headline_json(filepath)
        return self.headlines

# Define Brand Identification Class

In [None]:
# The spark udf function that has to be defined outside the class
def get_brand(row_list):
    """Turn a list of NER chunk annotations into [entity, type] pairs.

    Args:
      row_list: Annotations exposing ``result`` (the entity text) and
        ``metadata['entity']`` (the entity type); may be empty or None.

    Returns:
      A list of two-element lists, one per annotation; an empty list when
      nothing was detected.
    """
    pairs = []
    # A falsy input (None or an empty list) simply yields no pairs
    for annotation in row_list or ():
        pairs.append([annotation.result, annotation.metadata['entity']])
    return pairs

            
class BrandIdentification:
    """Named-entity recogniser built on a Spark NLP pipeline.

    The pipeline is: document assembler -> tokenizer -> word embeddings ->
    pretrained NER model -> NER converter (output column "ner_chunk").

    Args:
      MODEL_NAME: Name of the pretrained NER model. Supported values are
        "ner_dl" and "onto_100" (paired with glove_100d embeddings) and
        "ner_dl_bert" (paired with bert_base_cased embeddings).

    Raises:
      ValueError: if MODEL_NAME is not one of the supported names.
    """

    # The embeddings stage must match the embeddings each NER model was trained with
    _GLOVE_MODELS = ("ner_dl", "onto_100")
    _BERT_MODELS = ("ner_dl_bert",)

    def __init__(self, MODEL_NAME):
        supported = self._GLOVE_MODELS + self._BERT_MODELS
        # Fail fast, before starting Spark or downloading any model: without a
        # matching embeddings stage the pipeline cannot be assembled at all.
        if MODEL_NAME not in supported:
            raise ValueError(
                f"Unsupported MODEL_NAME {MODEL_NAME!r}; expected one of {supported}"
            )

        self.MODEL_NAME = MODEL_NAME
        spark = sparknlp.start()

        # Define Spark NLP pipeline
        documentAssembler = DocumentAssembler() \
            .setInputCol('text') \
            .setOutputCol('document')

        tokenizer = Tokenizer() \
            .setInputCols(['document']) \
            .setOutputCol('token')

        if MODEL_NAME in self._GLOVE_MODELS:
            # ner_dl and onto_100 are trained with glove_100d
            embeddings = WordEmbeddingsModel.pretrained('glove_100d') \
                .setInputCols(["document", 'token']) \
                .setOutputCol("embeddings")
        else:
            # The BERT NER model uses BERT embeddings
            embeddings = BertEmbeddings.pretrained(name='bert_base_cased', lang='en') \
                .setInputCols(['document', 'token']) \
                .setOutputCol('embeddings')

        ner_model = NerDLModel.pretrained(MODEL_NAME, 'en') \
            .setInputCols(['document', 'token', 'embeddings']) \
            .setOutputCol('ner')

        ner_converter = NerConverter() \
            .setInputCols(['document', 'token', 'ner']) \
            .setOutputCol('ner_chunk')

        nlp_pipeline = Pipeline(stages=[
            documentAssembler,
            tokenizer,
            embeddings,
            ner_model,
            ner_converter
        ])

        # Every stage is pretrained, so fitting on a one-row placeholder frame
        # with a "text" column is enough to obtain the pipeline model.
        empty_df = spark.createDataFrame([['']]).toDF('text')
        self.pipeline_model = nlp_pipeline.fit(empty_df)

    def predict_brand(self, text):
        """Detect entities in ``text``.

        Args:
          text: A pandas or Spark dataframe with a "text" column, a list of
            strings, or a single string.

        Returns:
          A Spark dataframe with columns "text" and "Predicted_Brand", the
          latter being a list of [entity, type] pairs. Rows in which no
          entity was detected are removed.
        """
        spark = sparknlp.start()

        if isinstance(text, pd.DataFrame):
            text_df = spark.createDataFrame(text)
        elif isinstance(text, str):
            text_df = spark.createDataFrame([(text,)], "text string")
        elif isinstance(text, list):
            # An explicit schema keeps an empty list from failing schema inference
            text_df = spark.createDataFrame([(item,) for item in text], "text string")
        else:
            text_df = text  # anything else is passed to the pipeline unchanged

        df_spark = self.pipeline_model.transform(text_df)

        # get_brand is a module-level function (not a method) so it can be used as a Spark UDF
        pred_brand = F.udf(get_brand, ArrayType(ArrayType(StringType())))

        df_spark_combined = df_spark \
            .withColumn('Predicted_Brand', pred_brand('ner_chunk')) \
            .select("text", "Predicted_Brand")

        # Only keep rows whose entity list has at least one entry
        return df_spark_combined.filter(F.size(F.col("Predicted_Brand")) > 0)

# Define Sentiment Class

In [None]:
# The spark udf function that has to be defined outside the class
def append_sentiment(pair_list, sentiment):
    """Return the entries of ``pair_list`` with ``sentiment`` appended to each.

    The input is left untouched: every entry is copied into a new list, so
    entries may be lists or tuples.

    Args:
      pair_list: Sequence of [entity, type] pairs; may be empty or None.
      sentiment: Value to append to every pair.

    Returns:
      A new list of lists, e.g. [[entity, type, sentiment], ...]; an empty
      list when ``pair_list`` is empty or None.
    """
    if not pair_list:
        return []
    return [list(pair) + [sentiment] for pair in pair_list]


class SentimentIdentification:
    """Sentiment classifier backed by a Spark NLP pipeline."""

    def __init__(self, MODEL_NAME):
        """Creates a class for sentiment identification using the specified model.
        Args:
          MODEL_NAME: Name of the Spark NLP pretrained pipeline, or
            "custom_pipeline" to build a pipeline around the pretrained
            'bert_sequence_classifier_finbert' sequence classifier.
        """
        self.MODEL_NAME = MODEL_NAME
        spark = sparknlp.start()

        # Create a custom pipeline if requested
        if self.MODEL_NAME == "custom_pipeline":  # https://nlp.johnsnowlabs.com/2021/11/03/bert_sequence_classifier_finbert_en.html
            document_assembler = DocumentAssembler() \
                .setInputCol('text') \
                .setOutputCol('document')

            tokenizer = Tokenizer() \
                .setInputCols(['document']) \
                .setOutputCol('token')

            sequenceClassifier = BertForSequenceClassification \
                .pretrained('bert_sequence_classifier_finbert', 'en') \
                .setInputCols(['token', 'document']) \
                .setOutputCol('class') \
                .setCaseSensitive(True) \
                .setMaxSentenceLength(512)

            pipeline = Pipeline(stages=[
                document_assembler,
                tokenizer,
                sequenceClassifier
            ])

            # All stages are pretrained; a one-row placeholder frame is enough to fit
            self.pipeline_model = pipeline.fit(spark.createDataFrame([['']]).toDF("text"))

        else:
            self.pipeline_model = PretrainedPipeline(self.MODEL_NAME, lang='en')

    def predict_dataframe(self, df):
        """Annotates the input dataframe with the classification results.
        Args:
          df: Pandas or Spark dataframe to classify. Must contain a "text"
            column and a "Predicted_Brand" column (list of [entity, type] pairs).
        Returns:
          Spark dataframe with columns "text", "Predicted_Brand" (each pair
          extended with the predicted sentiment), "Predicted_Sentiment",
          "positive", "neutral", "negative" and "score" (positive - negative).
          The first 100 rows are also printed.
        """
        spark = sparknlp.start()

        if isinstance(df, pd.DataFrame):
            # Convert to spark dataframe for faster prediction
            df_spark = spark.createDataFrame(df)
        else:
            df_spark = df

        # Annotate dataframe with classification results
        df_spark = self.pipeline_model.transform(df_spark)

        # The custom pipeline stores its scores under keys wrapped as
        # "Some(<label>)"; the pretrained pipelines use the bare label.
        if self.MODEL_NAME == "custom_pipeline":
            key_format = "Some({})"
        else:
            key_format = "{}"

        # Read label and scores from the same row of the transformed frame.
        # This avoids re-joining two separately derived frames on a generated
        # row number, which moves all rows into one partition and is only
        # correct when both sides happen to produce rows in the same order.
        class_metadata = col("class.metadata")[0]
        score_columns = [
            class_metadata[key_format.format(label)].alias(label)
            for label in ("positive", "neutral", "negative")
        ]

        df_spark_combined = df_spark.select(
            "text",
            "Predicted_Brand",  # This is to run main.py
            # Convert sentiment from a list to a string
            array_join(col("class.result"), "").alias("Predicted_Sentiment"),
            *score_columns
        ).withColumn("score", col("positive") - col("negative"))

        # Append sentiment to each entry in pred brand list
        append_sent = F.udf(append_sentiment, ArrayType(ArrayType(StringType())))  # Output a list of lists
        df_spark_combined = df_spark_combined.withColumn(
            'Predicted_Brand', append_sent('Predicted_Brand', 'Predicted_Sentiment'))

        df_spark_combined.show(100)

        return df_spark_combined

    def predict_string_list(self, string_list):
        """Predicts sentiment of the input list of strings.
        Args:
          string_list: List of strings to classify. A single string is
            treated as a one-element list.
        Returns:
          List with the first predicted class of each input string.
        """
        if isinstance(string_list, str):
            string_list = [string_list]

        if self.MODEL_NAME == "custom_pipeline":
            pipeline_annotator = LightPipeline(self.pipeline_model)  # Convert the pipeline to an annotator
        else:
            pipeline_annotator = self.pipeline_model

        annotations = pipeline_annotator.annotate(string_list)

        return [annotation['class'][0] for annotation in annotations]

    def compute_accuracy(self, df_pandas_postprocessed):
        """Computes accuracy by comparing labels of input dataframe.
        Args:
          df_pandas_postprocessed: pandas dataframe containing "True_Sentiment" and "Predicted_Sentiment" columns
        Returns:
          Tuple (accuracy in percent, scikit-learn classification report).
        """
        from sklearn.metrics import classification_report, accuracy_score

        true_labels = df_pandas_postprocessed["True_Sentiment"]
        predicted_labels = df_pandas_postprocessed["Predicted_Sentiment"]

        accuracy = accuracy_score(true_labels, predicted_labels) * 100
        # Bound to a separate name so the imported function is not shadowed
        report = classification_report(true_labels, predicted_labels)

        return accuracy, report

# Run test

In [None]:
# Start Spark NLP with GPU support requested
spark = sparknlp.start(gpu=True)

# Build the three helpers: entity detection, sentiment classification, headline loading.
# The sentiment model can also be a pretrained pipeline name, e.g.
# "classifierdl_bertwiki_finance_sentiment_pipeline", instead of "custom_pipeline".
brand_identifier = BrandIdentification("ner_dl_bert")
sentimentiser = SentimentIdentification("custom_pipeline")
article_extractor = ArticleExtraction()

# Load the headlines of one source folder (the original note also mentions www.cnn.com)
headline_source = articles_folder + 'cyprus-mail.com'
list_of_headlines = article_extractor.import_folder_headlines(headline_source)

# Step 1: detect entities in every headline
brand_spark_df = brand_identifier.predict_brand(list_of_headlines)

# Step 2: add the predicted sentiment and the sentiment scores
spark_df_final = sentimentiser.predict_dataframe(brand_spark_df)

bert_base_cased download started this may take some time.
Approximate size to download 389.1 MB
[OK!]
ner_dl_bert download started this may take some time.
Approximate size to download 15.4 MB
[OK!]
bert_sequence_classifier_finbert download started this may take some time.
Approximate size to download 390.9 MB
[OK!]
+--------------------+--------------------+-------------------+-----------+-----------+-----------+--------------------+
|                text|     Predicted_Brand|Predicted_Sentiment|   positive|    neutral|   negative|               score|
+--------------------+--------------------+-------------------+-----------+-----------+-----------+--------------------+
|Cyprus Mail News ...|[[Cyprus Mail New...|            neutral|0.038595352|  0.9248994|0.036505252|0.002090099999999...|
|Man United spoil ...|[[Man United, ORG...|            neutral|  0.1830672| 0.80010974|0.016823042|         0.166244158|
|Forty hijack pass...|[[Egypt, LOC, neg...|           negative|0.015288657| 0

Unnamed: 0,text,Predicted_Brand,Predicted_Sentiment,positive,neutral,negative,score
0,Cyprus Mail News and More,"[[Cyprus Mail News and More, ORG, neutral]]",neutral,0.038595352,0.9248994,0.036505252,0.00209
1,Man United spoil West Ham party to reach FA Cu...,"[[Man United, ORG, neutral], [West Ham, ORG, n...",neutral,0.1830672,0.80010974,0.016823042,0.166244
2,"Forty hijack passengers flown back to Egypt, c...","[[Egypt, LOC, negative]]",negative,0.015288657,0.10679633,0.877915,-0.862626
3,"Pakistan detained more than 5,000 after Easter...","[[Pakistan, LOC, negative]]",negative,0.013677679,0.09166545,0.8946569,-0.880979
4,Papadopoulos wants Cyprus problem issues in pa...,"[[Papadopoulos, PER, neutral], [Cyprus, LOC, n...",neutral,0.033094272,0.6027314,0.3641743,-0.33108
5,Turkey signals no quick end to Syria incursion...,"[[Turkey, LOC, negative], [Syria, LOC, negative]]",negative,0.016433084,0.070145525,0.9134214,-0.896988
6,APOEL face Olympiacos in Europa League,"[[APOEL, ORG, neutral], [Olympiacos, ORG, neut...",neutral,0.054285347,0.9071312,0.03858346,0.015702
7,Cabinet reappoints Kalogirou as CySEC chairwoman,"[[Kalogirou, PER, neutral], [CySEC, ORG, neutr...",neutral,0.045380227,0.92480344,0.02981633,0.015564
8,UEFA keeps it in the family as Champions Leagu...,"[[UEFA, ORG, neutral], [Champions League, MISC...",neutral,0.037894357,0.9407692,0.02133645,0.016558
9,Geeks rule at CyprusComicCon,"[[CyprusComicCon, ORG, neutral]]",neutral,0.022995394,0.91113853,0.06586606,-0.042871


# Display results nicely

In [None]:
# Convert the Spark result to a pandas dataframe, which displays more readably in the notebook
display(spark_df_final.toPandas())

Unnamed: 0,text,Predicted_Brand,Predicted_Sentiment,positive,neutral,negative,score
0,Cyprus Mail News and More,"[[Cyprus Mail News and More, ORG, neutral]]",neutral,0.038595352,0.9248994,0.036505252,0.00209
1,Man United spoil West Ham party to reach FA Cu...,"[[Man United, ORG, neutral], [West Ham, ORG, n...",neutral,0.1830672,0.80010974,0.016823042,0.166244
2,"Forty hijack passengers flown back to Egypt, c...","[[Egypt, LOC, negative]]",negative,0.015288657,0.10679633,0.877915,-0.862626
3,"Pakistan detained more than 5,000 after Easter...","[[Pakistan, LOC, negative]]",negative,0.013677679,0.09166545,0.8946569,-0.880979
4,Papadopoulos wants Cyprus problem issues in pa...,"[[Papadopoulos, PER, neutral], [Cyprus, LOC, n...",neutral,0.033094272,0.6027314,0.3641743,-0.33108
5,Turkey signals no quick end to Syria incursion...,"[[Turkey, LOC, negative], [Syria, LOC, negative]]",negative,0.016433084,0.070145525,0.9134214,-0.896988
6,APOEL face Olympiacos in Europa League,"[[APOEL, ORG, neutral], [Olympiacos, ORG, neut...",neutral,0.054285347,0.9071312,0.03858346,0.015702
7,Cabinet reappoints Kalogirou as CySEC chairwoman,"[[Kalogirou, PER, neutral], [CySEC, ORG, neutr...",neutral,0.045380227,0.92480344,0.02981633,0.015564
8,UEFA keeps it in the family as Champions Leagu...,"[[UEFA, ORG, neutral], [Champions League, MISC...",neutral,0.037894357,0.9407692,0.02133645,0.016558
9,Geeks rule at CyprusComicCon,"[[CyprusComicCon, ORG, neutral]]",neutral,0.022995394,0.91113853,0.06586606,-0.042871
