# Sentiment Analysis using Multiprocessing

Multiprocessing is a useful Python package that enables users to utilize multiple processors on a given machine for more efficient processing. The Pool object allows the exploitation of data parallelism by distributing the work across a pool of processes running the same function. This greatly improves the speed at which the work is done, reducing overall runtime.

Multiprocessing is mainly preferred when calling functions on larger sets of data that exhibit data parallelism. Data parallelism is the concept of breaking a set of data into smaller sets, which are then processed by multiple processes applying the same function without communicating with each other. Joining the output of these processes should produce the same result as if one process had applied the function to the entire dataset.

# Import Libraries

In [6]:
# Libraries for parsing data
import pandas as pd
from io import StringIO
from html.parser import HTMLParser
import os
import spacy
from lxml import etree
import itertools
from itertools import repeat

# Libraries for importing our sentiment analysis models
import pickle
import sklearn
from sklearn import preprocessing
from sentence_transformers import SentenceTransformer as ST

# Libraries for Multiprocessing
import multiprocessing as mp
from multiprocessing import Pool
from time import perf_counter
import numpy as np
from tqdm import tqdm

# Load Data

In [None]:
# Folder holding the documents to analyse. Keep the trailing slash: file
# names are appended to this string directly when a document is opened.
corpus = '/home/ec2-user/SageMaker/data/WSJ_full/'

# Every entry in the folder is treated as one input document
input_files = os.listdir(corpus)

print(f"Loaded {len(input_files)} documents.")

# Check Total Cores
Check the total number of cores on your current device. The following multiprocessing portions will be using this variable. 

In [None]:
# Check core count
# num_cores is read later by parser() and encoding_process() to size their worker pools
num_cores = mp.cpu_count()
print(num_cores)

In [None]:
# Retrieve metadata from XML document
def _first_text(root, *tags):
    """Return ``.text`` of the first tag in ``tags`` found under ``root``, else None."""
    for tag in tags:
        node = root.find(f'.//{tag}')
        if node is not None:
            return node.text
    return None


def getxmlcontent(corpus, file, strip_html=True):
    """Extract selected metadata and the body text from one XML document.

    Args:
        corpus: Directory prefix. It is concatenated directly with ``file``,
            so it must end with a path separator.
        file: Name of the XML file inside ``corpus``.
        strip_html: When true, markup is removed from the body text with
            ``strip_tags``.

    Returns:
        A tuple ``(goid, title, date, publisher, pubid, text)``. Each entry is
        the text of the first matching element, or ``None`` when the element
        is missing. If the file cannot be parsed the error is printed and
        every entry that was not yet extracted is ``None``.
    """
    # Bind every result up front so the return statement is always valid,
    # even when parsing fails part-way through.
    goid = title = date = publisher = pubid = text = None

    try:
        root = etree.parse(corpus + file).getroot()

        goid = _first_text(root, 'GOID')
        title = _first_text(root, 'Title')
        date = _first_text(root, 'NumericDate')
        publisher = _first_text(root, 'PublisherName')
        pubid = _first_text(root, 'MpubId')

        # The body text can live under several tags; take the first one present
        text = _first_text(root, 'FullText', 'HiddenText', 'Text', 'AbsText')

        # Strip html from text portion
        if text is not None and strip_html:
            text = strip_tags(text)
    except Exception as e:
        print(f"Error while parsing file {file}: {e}")

    return goid, title, date, publisher, pubid, text

In [None]:
# Class for cleaning up text
class MLStripper(HTMLParser):
    """HTML parser that throws away markup and collects the text content."""

    def __init__(self):
        # convert_charrefs=True makes the parser hand character references
        # (e.g. &amp;) to handle_data already decoded.
        super().__init__(convert_charrefs=True)
        self.reset()
        self.strict = False
        self.text = StringIO()

    def handle_data(self, d):
        """Append one run of character data to the internal buffer."""
        self.text.write(d)

    def get_data(self):
        """Return everything collected so far as a single string."""
        collected = self.text.getvalue()
        return collected
    
def strip_tags(html):
    """Remove HTML tags from the provided html text.

    Args:
        html: String that may contain HTML markup.

    Returns:
        The text content of ``html`` with all tags removed.
    """
    s = MLStripper()
    s.feed(html)
    # The parser may hold back trailing text (for example text ending in an
    # unterminated "&" such as "R&D") while it waits for more input.
    # close() forces that buffered remainder through handle_data.
    s.close()
    return s.get_data()

In [None]:
# Extract the necessary content from the XML files
# Set up for multiprocessing -- for a split list of files
def parse_xmls(document_list):
    """Split every document in ``document_list`` into sentences.

    Intended to run inside a worker process on one chunk of the corpus.
    Files are read from the module-level ``corpus`` folder.

    Args:
        document_list: Iterable of file names located in ``corpus``.

    Returns:
        Four parallel lists ``(sents, goids, dates, pubids)`` with one entry
        per non-empty sentence. The GOID, date and publication id of a
        document are repeated for each of its sentences.
    """
    sents = []
    goids = []
    dates = []
    pubids = []

    # Loaded once per call, i.e. once per chunk handed to a worker
    nlp = spacy.load('../../Resources/Models/en_core_web_sm-3.2.0/en_core_web_sm/en_core_web_sm-3.2.0')

    for document in document_list:
        # The try sits inside the loop so that one problematic file is
        # reported and skipped instead of discarding the rest of the chunk.
        try:
            # Get necessary metadata
            goid, _title, date, _publisher, pub_id, text = getxmlcontent(corpus, document, strip_html=True)

            if text is None:
                continue

            # Keep only sentences that still contain text after trimming
            sentences = []
            for sent in nlp(text).sents:
                sent_text = (sent.text or "").strip()
                if sent_text:
                    sentences.append(sent_text)

        except AttributeError:
            # Error logging - will show filename if there is a problem processing it
            print("Attribute Error" + document)
            continue

        # Append this document's sentences and repeat its metadata per sentence
        sents.extend(sentences)
        goids.extend([goid] * len(sentences))
        dates.extend([date] * len(sentences))
        pubids.extend([pub_id] * len(sentences))

    return sents, goids, dates, pubids

In [None]:
def parser(input_files):
    """Parse a batch of XML files in parallel and flatten the results.

    The batch is split into one chunk per worker process and every chunk is
    handled by ``parse_xmls``.

    Args:
        input_files: Sequence of file names to parse.

    Returns:
        Four parallel lists
        ``(parsed_sents, parsed_goids, parsed_dates, parsed_pubids)`` with
        one entry per sentence, in the order of ``input_files``.

    Raises:
        Exception: Any error raised while the pool is working is printed
            and then re-raised.
    """
    # Leave one core free, but never ask for fewer than one worker
    # (num_cores - 1 would be 0 on a single-core machine).
    cores_used = max(1, num_cores - 1)

    # One chunk per worker; Pool.map returns results in chunk order
    split = np.array_split(input_files, cores_used)

    # Create the process pool before the try block so the cleanup in
    # `finally` never refers to a pool that was not created.
    p_parse = Pool(processes=cores_used)
    try:
        processed_lists = p_parse.map(parse_xmls, split)

    except Exception as e:
        print(f"Error occurred while parsing documents: {e}")
        # Without results there is nothing meaningful to return
        raise

    finally:
        # When using multiple processes, important to eventually close them
        # to avoid memory/resource leaks
        p_parse.close()
        p_parse.join()

    # processed_lists holds one (sents, goids, dates, pubids) tuple per chunk;
    # regroup by field and flatten each field across chunks.
    parsed_sents, parsed_goids, parsed_dates, parsed_pubids = (
        list(itertools.chain.from_iterable(field)) for field in zip(*processed_lists)
    )

    # Make sure all arrays are the same length
    assert len(parsed_sents) == len(parsed_goids) == len(parsed_dates) == len(parsed_pubids), 'parsed data have unequal lengths'

    return parsed_sents, parsed_goids, parsed_dates, parsed_pubids

# Extract Emotion Encodings

In [None]:
# Emotion labels used by the classifier, kept in sorted order
label_encodings = sorted(
    ["love", "anger", "disgust", "fear", "happiness", "sadness", "surprise", "neutral", "other"]
)

# Fit the encoder on the label names
label_encoder = sklearn.preprocessing.LabelEncoder()
label_encoder.fit(label_encodings)

# Print out the emotions to check it has been loaded successfully
emotions = label_encoder.classes_
print(emotions)

# Load Classifier and Scaler
Load logistic regression classifier and pre-fit scaler and store the models.

In [None]:
# Set classifier_path to location of the logistic regression classifier
classifier_path = '../../Resources/Models/nli-mpnet-base-v2-LR-classifier.pkl'

# Stays None when loading fails, so prediction() can detect a missing model
sentiment_model = None

try:
    # Load and store model in sentiment_model.
    # NOTE: pickle.load can execute arbitrary code - only load trusted files.
    # The with-block closes the file even when unpickling fails, and nothing
    # is closed when open() itself fails.
    with open(classifier_path, 'rb') as file:
        sentiment_model = pickle.load(file)

except Exception as e:
    print(f"Error while opening file: {e}")

In [None]:
# Set scaler_path to location of the pre-fit scaler
scaler_path = '../../Resources/Models/sentimentScaler.pkl'

# Stays None when loading fails, so prediction() can detect a missing scaler
scaler = None

try:
    # Load and store the scaler in scaler.
    # NOTE: pickle.load can execute arbitrary code - only load trusted files.
    # The with-block closes the file even when unpickling fails, and nothing
    # is closed when open() itself fails.
    with open(scaler_path, 'rb') as file:
        scaler = pickle.load(file)

except Exception as e:
    print(f"Error while opening file: {e}")

# Predict Labels with Multiprocessing
Using the SBERT model and the array of parsed sentences, we will now output a corresponding array where each element is a tuple of the predicted label and an array of the raw probabilities for each label. The emotions output should match those of the previous cell.

This process uses multiprocessing to efficiently process large corpora.

In [None]:
# Set sbert_path to location of SBERT model
sbert_path = '../../Resources/Models/nli-mpnet-base-v2'
# Module-level model instance; encode_sentence() calls transformer.encode on it
transformer = ST(sbert_path)

In [None]:
# Encoding through multiprocessing
def encode_sentence(sent):
    """Return the embedding of ``sent`` from the module-level ``transformer``.

    The progress bar is disabled for the encode call.
    """
    return transformer.encode(sent, show_progress_bar=False)

In [None]:
def encoding_process(parsed_sents):
    """Encode every sentence with ``encode_sentence`` using a process pool.

    Args:
        parsed_sents: Sequence of sentences to encode.

    Returns:
        A list with one result of ``encode_sentence`` per input sentence, in
        input order. An empty input yields an empty list.

    Raises:
        Exception: Any error raised while the pool is working is printed
            and then re-raised.
    """
    # Nothing to encode: skip starting worker processes altogether
    if len(parsed_sents) == 0:
        return []

    # Default leaves one core free, but never ask for fewer than one worker
    # (num_cores - 1 would be 0 on a single-core machine). May change the
    # number of processes depending on instance.
    cores_used = max(1, num_cores - 1)

    # Pool.map needs a positive chunksize; plain integer division gives 0
    # whenever there are fewer sentences than workers.
    chunksize = max(1, len(parsed_sents) // cores_used)

    # Create the process pool before the try block so the cleanup in
    # `finally` never refers to a pool that was not created.
    p_encode = Pool(processes=cores_used)
    try:
        # Apply function with Pool to array
        sentence_embeddings = p_encode.map(encode_sentence, parsed_sents, chunksize)

    except Exception as e:
        print(f"Error occurred while encoding sentences: {e}")
        # Without embeddings there is nothing meaningful to return
        raise

    finally:
        # When using multiple processes, important to eventually close them
        # to avoid memory/resource leaks
        p_encode.close()
        p_encode.join()

    return sentence_embeddings

In [None]:
def prediction(sentence_embeddings):
    """Return the classifier's per-label probabilities for the embeddings.

    The embeddings are passed through the module-level ``scaler`` and the
    result is given to ``sentiment_model.predict_proba``.

    Args:
        sentence_embeddings: Embeddings as produced by ``encoding_process``.

    Returns:
        The output of ``sentiment_model.predict_proba`` for the scaled
        embeddings.

    Raises:
        RuntimeError: If ``scaler`` or ``sentiment_model`` is ``None``.
    """
    # Both scaler and sentiment_model should exist before running this cell.
    # Fail with a clear message instead of reaching the return statement
    # with no predictions to return.
    if scaler is None or sentiment_model is None:
        raise RuntimeError("Please load scaler and sentiment model.")

    standardized = scaler.transform(sentence_embeddings)

    # Call the predict function on our sentences
    raw_predictions = sentiment_model.predict_proba(standardized)

    return raw_predictions

# Create Dataframe
Create a dataframe to show the predicted labels and each estimated value for each emotion.

In [None]:
def doc_mean(raw_predictions, parsed_goids, parsed_dates, parsed_pubids, parsed_sents):
    """Average sentence-level emotion scores into one row per document.

    Args:
        raw_predictions: Two-dimensional array indexed as
            ``raw_predictions[:, i]``, where column ``i`` holds the scores
            for ``emotions[i]``.
        parsed_goids: GOID of the document each sentence belongs to.
        parsed_dates: Date of the document each sentence belongs to.
        parsed_pubids: Publication id of the document each sentence belongs to.
        parsed_sents: The sentences themselves.

    Returns:
        A DataFrame indexed by GOID with the document's first ``Date`` value
        and the mean score of every emotion in ``emotions``.
    """
    # Sentence-level table: metadata first, then one column per emotion.
    # The key is 'PubID' (not 'Pub ID') so the publication ids actually end
    # up in their column.
    data = {
        'GOID': parsed_goids,
        'Date': parsed_dates,
        'PubID': parsed_pubids,
        'Sentence': parsed_sents,
    }
    for i, emotion in enumerate(emotions):
        data[emotion] = raw_predictions[:, i]
    results_df = pd.DataFrame(data=data)

    # Create document-level dataframe. The aggregation is derived from
    # `emotions` so it always matches the columns built above.
    aggregations = {'Date': 'first'}
    aggregations.update({emotion: 'mean' for emotion in emotions})
    means_df = results_df.groupby(['GOID'], as_index=True).agg(aggregations)

    return means_df

# Main Code

In [None]:
print(f'total number of docs to process: {len(input_files)}\n')

# Number of documents handled per batch
scale = 1000

# Make sure the output folder exists before the first append
output_dir = 'SA_results'
os.makedirs(output_dir, exist_ok=True)
output_path = os.path.join(output_dir, 'doc_score_full.csv')

# Start offsets of the batches
interval = range(0, len(input_files), scale)

for start in tqdm(interval):
    parsed_sents, parsed_goids, parsed_dates, parsed_pubids = parser(input_files[start : start + scale])

    # A batch without any sentence has nothing to encode or score
    if not parsed_sents:
        continue

    sentence_embeddings = encoding_process(parsed_sents)
    raw_predictions = prediction(sentence_embeddings)

    means_df = doc_mean(raw_predictions, parsed_goids, parsed_dates, parsed_pubids, parsed_sents)

    # Save output to file. means_df is indexed by GOID, so the index must be
    # written, otherwise the rows cannot be matched to their documents.
    # NOTE: results are appended without a header row; remove an old output
    # file before re-running to avoid duplicated rows.
    means_df.to_csv(output_path, mode='a', header=False, index=True)