In [0]:
# Implementing Naive Bayes Classifier using Spark MapReduce

In [0]:
import requests
import pandas as pd
import io
# I will perform sentiment analysis on financial data

url = "https://drive.google.com/file/d/1uDCTfXfE7mrvb1U9ta7jzvgWt551H7br/view?usp=sharing"

file_id=url.split('/')[-2]
dwn_url='https://drive.google.com/uc?id=' + file_id

# Download the CSV file
response = requests.get(dwn_url)
financial_csv_data = response.content.decode('utf-8')

# Replace '\t' with '\n' to split lines
financial_csv_data = financial_csv_data.replace('\t', '\n')

In [0]:
# Load CSV data into Pandas DataFrame
financial_data_df = pd.read_csv(io.StringIO(financial_csv_data))

# show full width
pd.set_option('display.max_colwidth', None)

# Display 10 rows
pd.set_option('display.max_rows', 10)

In [0]:
# show the dataframe
financial_data_df

Unnamed: 0,Sentence,Sentiment
0,"The GeoSolutions technology will leverage Benefon 's GPS solutions by providing Location Based Search Technology , a Communities Platform , location relevant multimedia content and a new and powerful commercial model .",positive
1,"$ESI on lows, down $1.50 to $2.50 BK a real possibility",negative
2,"For the last quarter of 2010 , Componenta 's net sales doubled to EUR131m from EUR76m for the same period a year earlier , while it moved to a zero pre-tax profit from a pre-tax loss of EUR7m .",positive
3,"According to the Finnish-Russian Chamber of Commerce , all the major construction companies of Finland are operating in Russia .",neutral
4,"The Swedish buyout firm has sold its remaining 22.4 percent stake , almost eighteen months after taking the company public in Finland .",neutral
...,...,...
5837,RISING costs have forced packaging producer Huhtamaki to axe 90 jobs at its Hampshire manufacturing plant .,negative
5838,Nordic Walking was first used as a summer training method by cross-country skiers .,neutral
5839,"According shipping company Viking Line , the EU decision will have a significant financial impact .",neutral
5840,"In the building and home improvement trade , sales decreased by 22.5 % to EUR 201.4 mn .",neutral


In [0]:
# Convert to spark dataframe
spark_df = spark.createDataFrame(financial_data_df)

# Convert Spark DataFrame to RDD
financial_data_rdd = spark_df.rdd
print(financial_data_rdd.take(1))

[Row(Sentence="The GeoSolutions technology will leverage Benefon 's GPS solutions by providing Location Based Search Technology , a Communities Platform , location relevant multimedia content and a new and powerful commercial model .", Sentiment='positive')]


In [0]:
# 1. Data Preprocessing: The first step would involve pre-processing a large text corpus of your choice using PySpark. This would involve steps such as tokenization, stemming, stop word removal. Remember to use a dataset that corresponds to classification problem.

import nltk
from nltk.corpus import stopwords
from nltk.tokenize import word_tokenize
from nltk.stem import PorterStemmer
import re

# Download NLTK resources if not already downloaded
nltk.download('punkt')
nltk.download('stopwords')

# Initialize the PorterStemmer
stemmer = PorterStemmer()

# get english stop words
stop_words = set(stopwords.words("english"))

# list of punctuation characters to remove
numeric_pattern = re.compile(r'^[0-9]+$')

def pre_process_sentence(sentence):
    # strip sentence
    sentence = sentence.strip()

    # Remove punctuation and non-word characters
    sentence = re.sub(r'[^\w\s]', '', sentence)

    # tokenize the sentence
    tokens = word_tokenize(sentence)

    # stemming (reducing word to their root base) and removing english stop words
    filtered_tokens = []
    # Iterate over each word in the tokens list
    for word in tokens:
      # filter out numbers
      if not numeric_pattern.match(word):
        # stemming word
        lowercase_word = word.lower()
        #stemmed_word = stemmer.stem(lowercase_word) # commented because hurting accuracy

        # Convert the word to lowercase and check if it's not in the stop_words set
        if lowercase_word not in stop_words:
            # If the word passes the condition, include it in the filtered_tokens list
            filtered_tokens.append(lowercase_word)
    return filtered_tokens

financial_data_rdd = financial_data_rdd.map(lambda x: (pre_process_sentence(x[0]), x[1]))

print(financial_data_rdd.take(1))

[(['geosolutions', 'technology', 'leverage', 'benefon', 'gps', 'solutions', 'providing', 'location', 'based', 'search', 'technology', 'communities', 'platform', 'location', 'relevant', 'multimedia', 'content', 'new', 'powerful', 'commercial', 'model'], 'positive')]


[nltk_data] Downloading package punkt to /root/nltk_data...
[nltk_data]   Package punkt is already up-to-date!
[nltk_data] Downloading package stopwords to /root/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!


In [0]:
#2. Splitting the dataset: Split the preprocessed dataset into a training set and a testing set.
training_rdd, testing_rdd = financial_data_rdd.randomSplit([0.8, 0.2]) # 80/20 split
print(training_rdd.take(1))
print("")
print(testing_rdd.take(1))

[(['last', 'quarter', 'componenta', 'net', 'sales', 'doubled', 'eur131m', 'eur76m', 'period', 'year', 'earlier', 'moved', 'zero', 'pretax', 'profit', 'pretax', 'loss', 'eur7m'], 'positive')]

[(['geosolutions', 'technology', 'leverage', 'benefon', 'gps', 'solutions', 'providing', 'location', 'based', 'search', 'technology', 'communities', 'platform', 'location', 'relevant', 'multimedia', 'content', 'new', 'powerful', 'commercial', 'model'], 'positive')]


In [0]:
#3. Training the Naive Bayes model: Implement the Naive Bayes algorithm in PySpark using MapReduce to train the model on the training set.

In [0]:
#3a. calculate the probability of each class (i.e the class priors), in other words class / total rows
# Get the total count of all rows
total_class_count = training_rdd.count()

class_priors = training_rdd.map(lambda x: (x[1], 1))
class_priors = class_priors.reduceByKey(lambda x, y: x + y)
class_priors_collected = class_priors.collect()
# print the class priors
print(class_priors_collected)

negative_class_prior = class_priors_collected[0][1] / total_class_count
print(negative_class_prior)

positive_class_prior = class_priors_collected[1][1] / total_class_count
print(positive_class_prior)

neutral_class_prior = class_priors_collected[2][1] / total_class_count
print(neutral_class_prior)

[('neutral', 2521), ('positive', 1477), ('negative', 686)]
0.5382152006831767
0.31532877882152005
0.14645602049530315


In [0]:
#3b. For each word we will calculate the conditional probability for each class

# To do this we first have to make a vocabulary of the words we have:
vocabulary_rdd = training_rdd.flatMap(lambda x: x[0]).distinct()

# Step 4: Collect the vocabulary
print(vocabulary_rdd.collect())
print("")

# -----for positive class-----
# Map Step: Extract words from the first part of the tuple where the second tuple element is "positive"
positive_words_rdd = training_rdd.filter(lambda x: x[1] == 'positive').flatMap(lambda x: x[0])

# Count the frequency of each word
positive_words_count_rdd = positive_words_rdd.map(lambda word: (word, 1)).reduceByKey(lambda a, b: a + b)

# Count the number of words in the positive class
positive_words_count = positive_words_count_rdd.count()

# Collect the words from positive class into dict
positive_words_class_dict = positive_words_count_rdd.collectAsMap()


# -----for negative class-----
# Map Step: Extract words from the first part of the tuple where the second tuple element is "negative"
negative_words_rdd = training_rdd.filter(lambda x: x[1] == 'negative').flatMap(lambda x: x[0])

# Count the frequency of each word
negative_words_count_rdd = negative_words_rdd.map(lambda word: (word, 1)).reduceByKey(lambda a, b: a + b)

# Count the number of words in the negative class
negative_words_count = negative_words_count_rdd.count()

# Collect the words from negative class into dict
negative_words_class_dict = negative_words_count_rdd.collectAsMap()


# -----for neutral class-----
# Map Step: Extract words from the first part of the tuple where the second tuple element is "neutral"
neutral_words_rdd = training_rdd.filter(lambda x: x[1] == 'neutral').flatMap(lambda x: x[0])

# Count the frequency of each word
neutral_words_count_rdd = neutral_words_rdd.map(lambda word: (word, 1)).reduceByKey(lambda a, b: a + b)

# Count the number of words in the neutral class
neutral_words_count = neutral_words_count_rdd.count()

# Collect the words from neutral class into dict
neutral_words_class_dict = neutral_words_count_rdd.collectAsMap()


['quarter', 'period', 'remaining', 'months', 'meets', 'corp', 'updates', 'results', 'department', 'store', 'floor', 'space', 'q1', 'adbe', 'nasdaq', 'listed', 'two', 'unnamed', 'water', 'staff', 'depending', 'httpstcofa5cnh2t0t', 'gross', 'olavi', 'work', 'led', 'dutch', 'service', 'assa', 'abloy', 'item', 'eur13', 'stood', 'ton', 'money', 'jump', 'declare', 'invalid', 'tendering', 'investments', 'lagardere', 'result', 'strengthened', 'skogberg', 'improved', 'obtained', 'technology', 'aluminium', 'sintering', 'like', 'generated', 'authorities', 'httpstksco1itr', 'crazy', 'stocks', 'gdx', 'inc', 'r', 'dissolved', 'transferred', 'apriljune', 'bn', 'delight', 'consumers', 'afe1v', 'gazpromshell', 'however', 'ceo', 'tougher', 'bread', 'connection', 'clients', 'performance', 'volumes', 'quality', 'lte', 'start', 'oy', 'production', 'grocer', 'sfo', 'investigation', 'lember', 'matter', 'topical', 'social', 'ministry', 'would', 'a1', 'aspocomp', 'decision', 'layoffs', 'orders', 'smarket', 'ma

In [0]:
# 3c. Calculate conditional probability for all words in the vocabulary (for all classes positive, negative, and neutral)

# ----- naive bayes with laplace-----
size_of_vocab = vocabulary_rdd.count()

# Calculate conditional probability
def conditional_probability(word, class_label, alpha = 1):
  if class_label == "positive":
    class_dict = positive_words_class_dict
    class_words_count = positive_words_count
  elif class_label == "negative":
    class_dict = negative_words_class_dict
    class_words_count = negative_words_count
  elif class_label == "neutral":
    class_dict = neutral_words_class_dict
    class_words_count = neutral_words_count

  if word in class_dict:
    conditional_probability = (class_dict[word] + alpha) / (class_words_count + alpha * size_of_vocab)
  else:
    conditional_probability = alpha / (class_words_count + alpha * size_of_vocab)

  return (word, conditional_probability)

# -----Calculate Positive Conditional Probabilities------
positive_cond_prob_rdd = vocabulary_rdd.map(lambda x: conditional_probability(x, "positive"))
positive_cond_prob_dict = positive_cond_prob_rdd.collectAsMap()

# -----Calculate Negative Conditional Probabilities------
negative_cond_prob_rdd = vocabulary_rdd.map(lambda x: conditional_probability(x, "negative"))
negative_cond_prob_dict = negative_cond_prob_rdd.collectAsMap()

# -----Calculate Neutral Conditional Probabilities------
neutral_cond_prob_rdd = vocabulary_rdd.map(lambda x: conditional_probability(x, "neutral"))
neutral_cond_prob_dict = neutral_cond_prob_rdd.collectAsMap()


In [0]:
#4. Testing the model: Use the trained model to classify the documents in the testing set and evaluate the performance of the model.

# ----- naive bayes with laplace-----

def test_sentence(sentence, alpha = 1):
  positive_posterior_probability = 1
  negative_posterior_probability = 1
  neutral_posterior_probability = 1

  for word in sentence:
    # Positive probability
    if word in positive_cond_prob_dict:
      positive_posterior_probability *= positive_cond_prob_dict[word]
    else:
      positive_posterior_probability *= alpha / (positive_words_count + alpha * size_of_vocab)

    # Negative probability
    if word in negative_cond_prob_dict:
      negative_posterior_probability *= negative_cond_prob_dict[word]
    else:
      negative_posterior_probability *= alpha / (negative_words_count + alpha * size_of_vocab)

    # Neutral probability
    if word in neutral_cond_prob_dict:
      neutral_posterior_probability *= neutral_cond_prob_dict[word]
    else:
      neutral_posterior_probability *= alpha / (neutral_words_count + alpha * size_of_vocab)

  # multiply each by prior
  positive_posterior_probability *= positive_class_prior
  negative_posterior_probability *= negative_class_prior
  neutral_posterior_probability *= neutral_class_prior

  if positive_posterior_probability > negative_posterior_probability and positive_posterior_probability > neutral_posterior_probability:
    return "positive"
  elif negative_posterior_probability > positive_posterior_probability and negative_posterior_probability > neutral_posterior_probability:
    return "negative"
  else:
    return "neutral"


test_predictions_rdd = testing_rdd.map(lambda x: (x[0], x[1], test_sentence(x[0])))


In [0]:
class_priors = [(positive_class_prior, negative_class_prior, neutral_class_prior)]

# Create a Pandas DataFrame
class_priors_df = pd.DataFrame(class_priors, columns=["positive_class_prior", "negative_class_prior", "neutral_class_prior"])

class_priors_df

Unnamed: 0,positive_class_prior,negative_class_prior,neutral_class_prior
0,0.315329,0.538215,0.146456


In [0]:
# Convert RDD to DataFrame
# Collect the data from the RDD
test_predictions_collected = test_predictions_rdd.collect()

# Create a Pandas DataFrame
test_predictions_df = pd.DataFrame(test_predictions_collected, columns=["Sentence (Features)", "Actual", "Prediction"])

test_predictions_df

Unnamed: 0,Sentence (Features),Actual,Prediction
0,"[geosolutions, technology, leverage, benefon, gps, solutions, providing, location, based, search, technology, communities, platform, location, relevant, multimedia, content, new, powerful, commercial, model]",positive,neutral
1,"[esi, lows, bk, real, possibility]",negative,positive
2,"[kone, net, sales, rose, yearonyear, first, nine, months]",positive,positive
3,"[circulation, revenue, increased, finland, sweden]",positive,positive
4,"[fb, gone, green, day]",positive,positive
...,...,...,...
1153,"[cern, consolidating, nice, long, entry, w, stop, targeting, area]",positive,positive
1154,"[pay, 2nd, time, test, bo, zone, still, breaking, moved, stop]",neutral,positive
1155,"[therefore, phase, iii, research, conducted, abbott]",neutral,neutral
1156,"[newly, formed, company, yit, stavo, local, contact, network, expertise, market, know, euro, stavokonsult, combined, yit, housing, concept, said, juha, kostiainen, yit, vice, president, corporate, communications, business, development]",positive,neutral


In [0]:
# Count the number of correctly classified documents
correct_predictions = test_predictions_rdd.filter(lambda x: x[1] == x[2]).count()

# Calculate total number of documents
total_predictions = test_predictions_rdd.count()

# Calculate accuracy
accuracy = correct_predictions / total_predictions

print("Accuracy:", accuracy)

Accuracy: 0.6606217616580311
