In [None]:
# import required libraries
import nltk
from nltk import word_tokenize
from nltk.corpus import stopwords
from nltk.stem import WordNetLemmatizer

from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.model_selection import train_test_split

import pandas as pd
import string

# Fetch the NLTK data packages this notebook relies on: the Punkt tokenizer
# models, the stopword lists, and the WordNet / Open Multilingual WordNet data
# used by the lemmatizer. The last call is left as the cell's final expression
# so its return value is still displayed.
for nltk_package in ("punkt", "stopwords", "wordnet"):
    nltk.download(nltk_package)
nltk.download("omw-1.4")

[nltk_data] Downloading package punkt to /root/nltk_data...
[nltk_data]   Unzipping tokenizers/punkt.zip.
[nltk_data] Downloading package stopwords to /root/nltk_data...
[nltk_data]   Unzipping corpora/stopwords.zip.
[nltk_data] Downloading package wordnet to /root/nltk_data...
[nltk_data] Downloading package omw-1.4 to /root/nltk_data...


True

In [None]:
import os
# Find the latest version of spark 3.2  from http://www.apache.org/dist/spark/ and enter as the spark version
# For example:
spark_version = 'spark-3.2.3'
# spark_version = 'spark-3.<enter version>'
os.environ['SPARK_VERSION']=spark_version

# Install Spark and Java
!apt-get update
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://www.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop2.7.tgz
!tar xf $SPARK_VERSION-bin-hadoop2.7.tgz
!pip install -q findspark

# Set Environment Variables
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop2.7"

# Start a SparkSession
import findspark
findspark.init()

0% [Working]            Hit:1 http://archive.ubuntu.com/ubuntu bionic InRelease
0% [Waiting for headers] [Connecting to security.ubuntu.com (185.125.190.36)] [                                                                               Get:2 http://archive.ubuntu.com/ubuntu bionic-updates InRelease [88.7 kB]
Get:3 https://cloud.r-project.org/bin/linux/ubuntu bionic-cran40/ InRelease [3,626 B]
Get:4 http://security.ubuntu.com/ubuntu bionic-security InRelease [88.7 kB]
Ign:5 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  InRelease
Get:6 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  InRelease [1,581 B]
Hit:7 http://ppa.launchpad.net/c2d4u.team/c2d4u4.0+/ubuntu bionic InRelease
Hit:8 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  Release
Get:9 http://archive.ubuntu.com/ubuntu bionic-backports InRelease [83.3 kB]
Hit:10 http://ppa.launchpad.net/cran/libgit2/ubuntu bionic I

In [None]:
# Start (or reuse) the Spark session used by the pipeline cells below.
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("NaiveBayes").getOrCreate()
from pyspark import SparkFiles

import os

# Load the tweets into pandas. Prefer a local copy (e.g. one uploaded to
# Colab's /content); otherwise read the CSV from its S3 URL so the notebook
# still runs on a fresh runtime.
# NOTE(review): assumes the local Tweets.csv and the S3 object are the same file — confirm.
url = "https://tweet-2022.s3.amazonaws.com/Tweets.csv"
local_tweets_path = "/content/Tweets.csv"
tweets_df = pd.read_csv(local_tweets_path if os.path.exists(local_tweets_path) else url)

In [None]:
# Get the dataframe ready for processing: keep only the label (renamed to
# 'class') and the tweet text.
# - Rows with missing text are dropped; astype(str) alone would turn them into
#   the literal string "nan" and feed that to the model as a tweet.
# - Selecting the two wanted columns (instead of dropping 'textID' /
#   'selected_text' by name) and rename()'s default of ignoring absent columns
#   keep the cell safe to re-run.
tweets_df = (
    tweets_df
    .rename(columns={'sentiment': 'class'})
    .dropna(subset=['text'])
    .astype({'text': 'str'})
    .loc[:, ['class', 'text']]
    .reset_index(drop=True)
)


In [None]:
def nb_process_tweets(tweet, lemmatizer=None):
    """Normalize one tweet for the bag-of-words Naive Bayes pipeline.

    Steps: lowercase, drop URL tokens (any whitespace-separated token that
    contains "http"), strip ASCII punctuation from the remaining tokens,
    lemmatize each word, and re-join the words with single spaces.

    Args:
        tweet: Raw tweet text.
        lemmatizer: Object exposing ``lemmatize(word) -> str``. Defaults to a
            new ``WordNetLemmatizer`` (needs the NLTK WordNet data downloaded).

    Returns:
        The cleaned tweet as a space-separated string ("" if nothing remains).
    """
    if lemmatizer is None:
        lemmatizer = WordNetLemmatizer()

    # Operate on words, not characters: a character-level loop can never
    # contain "http" and would lemmatize single letters, i.e. do nothing.
    words = tweet.lower().split()

    # Remove URLs before stripping punctuation so each link is still one token.
    words = [word for word in words if 'http' not in word]

    # Strip punctuation inside each token; tokens that were only punctuation vanish.
    punctuation_table = str.maketrans('', '', string.punctuation)
    words = [word.translate(punctuation_table) for word in words]
    words = [word for word in words if word]

    return " ".join(lemmatizer.lemmatize(word) for word in words)

In [None]:
# Clean every tweet with nb_process_tweets, then add the character length of
# the cleaned text as an extra numeric feature. The vectorized str.len() yields
# an integer column directly, which VectorAssembler accepts as a numeric input.
tweets_df['text'] = tweets_df['text'].apply(nb_process_tweets)
tweets_df['length'] = tweets_df['text'].str.len()
tweets_df.head()

Unnamed: 0,class,text,length
0,neutral,id have responded if i were going,34
1,negative,sooo sad i will miss you here in san diego,43
2,negative,my boss is bullying me,22
3,negative,what interview leave me alone,30
4,negative,sons of why couldnt they put them on the rel...,69
...,...,...,...
27476,negative,wish we could come see u on denver husband l...,76
27477,negative,ive wondered about rake to the client has ma...,115
27478,positive,yay good for both of you enjoy the break you...,109
27479,positive,but it was worth it,22


In [None]:
from pyspark.ml.feature import Tokenizer, StopWordsRemover, HashingTF, IDF, StringIndexer
# Text-to-vector stages, in the order the pipeline runs them:
#   text -> token_text (words) -> stop_tokens (stop words removed)
#   stop_tokens -> hash_token (hashed term frequencies) -> idf_token (TF-IDF)
tokenizer = Tokenizer(outputCol="token_text", inputCol="text")
stopremove = StopWordsRemover(outputCol="stop_tokens", inputCol="token_text")
hashingTF = HashingTF(outputCol="hash_token", inputCol="stop_tokens")
idf = IDF(outputCol="idf_token", inputCol="hash_token")

# Label stage: string 'class' -> numeric 'label'.
# NOTE(review): the data has neutral/negative/positive classes, so this
# indexer maps three values despite the "pos_neg" name.
pos_neg_to_num = StringIndexer(outputCol="label", inputCol="class")


In [None]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.linalg import Vector

# Concatenate the TF-IDF vector and the tweet length column into the single
# 'features' vector column the classifier reads.
assembler_inputs = ['idf_token', 'length']
clean_up = VectorAssembler(outputCol='features', inputCols=assembler_inputs)

In [None]:
from pyspark.ml import Pipeline
# Every preprocessing step in execution order: index the label, tokenize,
# drop stop words, hash term frequencies, reweight by IDF, assemble features.
prep_stages = [
    pos_neg_to_num,
    tokenizer,
    stopremove,
    hashingTF,
    idf,
    clean_up,
]
data_prep_pipeline = Pipeline(stages=prep_stages)

In [None]:
from pyspark.sql import SQLContext
# Convert the pandas frame into a Spark DataFrame. SparkSession.createDataFrame
# replaces the deprecated SQLContext wrapper, which expects a SparkContext
# rather than a SparkSession as its first argument.
spark_session = SparkSession.builder.getOrCreate()
spark_tweets = spark_session.createDataFrame(tweets_df)

# Fit the feature pipeline and transform every row.
# NOTE(review): this fit sees all rows, including any later used for testing;
# do not evaluate a model on rows drawn from `cleaned`.
cleaner = data_prep_pipeline.fit(spark_tweets)
cleaned = cleaner.transform(spark_tweets)



In [None]:
from pyspark.ml.classification import NaiveBayes

# Split the raw rows first (seeded for reproducibility), then fit the feature
# pipeline on the training rows only. Fitting IDF and the label indexer on all
# rows before splitting would leak test-set statistics into the features.
raw_training, raw_testing = spark_tweets.randomSplit([0.80, 0.20], seed=42)
prep_model = data_prep_pipeline.fit(raw_training)
training = prep_model.transform(raw_training)
testing = prep_model.transform(raw_testing)

# Create a multinomial Naive Bayes model and fit it on the training features.
nb = NaiveBayes(labelCol="label", featuresCol="features", modelType="multinomial")
predictor = nb.fit(training)

In [None]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Score the held-out tweets with the fitted Naive Bayes model.
test_results = predictor.transform(testing)

# MulticlassClassificationEvaluator defaults to metricName="f1" (weighted F1);
# request accuracy explicitly so the printed number matches its label.
acc_eval = MulticlassClassificationEvaluator(
    labelCol="label", predictionCol="prediction", metricName="accuracy"
)
acc = acc_eval.evaluate(test_results)
print("Accuracy of model at predicting tweet sentiment was: %f" % acc)

Accuracy of model at predicting reviews was: 0.574141
