### MSc Data Analytics 

##### CA2 - Integrated Assignment sem2

#### 2020274 - Clarissa Cardoso





This Notebook contains experimental features for CA2 using different databases to store and retreat files from. The goal of this project is to combine language processing techniques and a time series forecasting to predict the average sentiment of tweets for a certein period of time after the apropriate data cleaning and processing tecniques are applied.

Fpr a better understanding the project will be divided into the following sections:

- First section consists of importing dataset from various databases, and an attemp to evaluate their performance and usability, helping to select the most suitable dataset for the analysis.

- Second section will focus on data cleaning and preprocessing the dataset

- Third section focus on deeper EDA features and Natural Language Processing to undertand the dataset better prior to modeling and extract the sentiment from tweets given.

- Section four centers on creating the time-series model and selection of apropriate parametrers and hyperparameters to run it.

- Fifth section relies on training the model and validating/reacessing features that can be modified for better performance and compare model's results. 






#### Introduction


The goal of this project is to perform an analysis of the given dataset containing several tweets while experimenting with different databases to store data as well as creating a  time series forecast of the sentiment of the dataset. 

For the initial experimentation, after instalation of different noSQL databases as seen in class tutorials, I have decided to start with Hbase. One of the reasons why this was the first database used for the project is that it is built on top of HDFS as a part of Hadoop environment and provides a faster lookup on files while displaying lower latency for queries. 



### Libraries required for project


In [7]:
# importing necessary libraries to deploy pyspark functions

from pyspark.sql import SparkSession
from pyspark.ml.stat import Correlation
from pyspark.ml.feature import VectorAssembler

from pyspark.sql.functions import count # Funcion to get the "size" of the data.
from pyspark.sql.functions import when # When function.
from pyspark.sql.functions import col # Function column.
from pyspark.sql.functions import mean, min, max, stddev # Imports function for statistical features. 
from pyspark.sql import functions as F # Data processing framework.
from pyspark.sql.functions import size, split # Imports function size and split.
from pyspark.ml.feature import Tokenizer # Importing Tokenizer.
from pyspark.sql.functions import regexp_replace # Remove / Replace function.
from pyspark.sql.types import StructField, StructType # Importing features for Schema.
from pyspark.sql.types import IntegerType, StringType, TimestampType # Tools to create the schema.
from pyspark.sql.functions import udf # Imports function UDF (user defined functions).
from pyspark.sql.types import ArrayType, StringType
from pyspark.ml.linalg import VectorUDT
from pyspark.sql.functions import max as max_


from pyspark.ml import Pipeline
from pyspark.ml.feature import HashingTF, IDF, Tokenizer, StringIndexer, CountVectorizer, NGram, VectorAssembler, ChiSqSelector

from pyspark.ml.feature import Tokenizer, StopWordsRemover
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, StringType
from bs4 import BeautifulSoup  # For HTML parsing
from pyspark.sql.functions import lower

import numpy as np # for numerical operations.
import pandas as pd

import matplotlib.pyplot as plt # visualization
%matplotlib inline 

import warnings # Ignore warnings.
warnings.filterwarnings("ignore")

ModuleNotFoundError: No module named 'pandas'

#### Importing dataset from HDFS

My initial idea, once the dataset given was succesfuly stored in HDFS directory alocated for the CA development ("CA2/ProjectTweets.csv"), was to import it straighaway to a noSQL database and perform initial queries inside the HBase enviroment/shell to verify functionality.

However my VM had continuous crashes during this process, and the HMaster node managed by Zookeeper kept showing slower times for initializing the commands. After a few seconds the Zookeeper Connection with HDFS and Hase nodes was lost and it was taking me a longer time span to find an alternative. Since the csv file was already in hadoop, I decided to first import from HDFS and the perform some initial cleaning and EDA using Spark framework to process the data to then store the cleneaded data back to HBAse through a connector between Pyspark and the database.


- HDFS (Hadoop Distributed File System) is the primary storage system used by Hadoop applications. This open source framework works by rapidly transferring data between nodes. It's often used by companies who need to handle and store big data. <https://www.databricks.com/glossary>

## Import modules, create Spark Session and read file into dataframe

First step is to perform some basic exploratory data analysis to get a sense of the data. 

#### Check the first few rows of the dataset with .show()

File was imported with a header marked as 'false' so pyspark will input labels insted of using the first row. This makes room to rename the labels in coming steps. set up schema as true so pyspark utilises the same scema present in the original file, without overlapping the columns.



In [8]:
from pyspark.sql import SparkSession

# Initialize SparkSession including Legacy for timestamp
spark = SparkSession.builder.appName("Test Tweets")\
    .config("spark.sql.legacy.timeParserPolicy", "LEGACY")\
    .config("spark.jars.packages", "com.johnsnowlabs.nlp:spark-nlp_2.12:5.1.4")\
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
    .config("spark.kryo.registrator", "com.johnsnowlabs.nlp.serialization.SparkNLPKryoRegistrator") \
    .getOrCreate()

# Define the file path in HDFS
file_path = "hdfs:///user/hduser/CA2/ProjectTweets.csv"

# Read the CSV file
tweets_test = spark.read.csv(file_path, header=False, inferSchema=True)

# Show the DataFrame (optional)
tweets_test.show()


                                                                                

+---+----------+--------------------+--------+---------------+--------------------+
|_c0|       _c1|                 _c2|     _c3|            _c4|                 _c5|
+---+----------+--------------------+--------+---------------+--------------------+
|  0|1467810369|Mon Apr 06 22:19:...|NO_QUERY|_TheSpecialOne_|@switchfoot http:...|
|  1|1467810672|Mon Apr 06 22:19:...|NO_QUERY|  scotthamilton|is upset that he ...|
|  2|1467810917|Mon Apr 06 22:19:...|NO_QUERY|       mattycus|@Kenichan I dived...|
|  3|1467811184|Mon Apr 06 22:19:...|NO_QUERY|        ElleCTF|my whole body fee...|
|  4|1467811193|Mon Apr 06 22:19:...|NO_QUERY|         Karoli|@nationwideclass ...|
|  5|1467811372|Mon Apr 06 22:20:...|NO_QUERY|       joy_wolf|@Kwesidei not the...|
|  6|1467811592|Mon Apr 06 22:20:...|NO_QUERY|        mybirch|         Need a hug |
|  7|1467811594|Mon Apr 06 22:20:...|NO_QUERY|           coZZ|@LOLTrish hey  lo...|
|  8|1467811795|Mon Apr 06 22:20:...|NO_QUERY|2Hood4Hollywood|@Tatiana_K nop

#### Checking the schema of the dataset

From this function we see most of the data is composed by strings, which makes sense, since we are working with mostly text. However, on the third column, with the dates of each tweet, we must have a datetime datatype in order to perform the timeseries analysis on further stages. 



In [9]:
# print schema
tweets_test.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- _c1: long (nullable = true)
 |-- _c2: string (nullable = true)
 |-- _c3: string (nullable = true)
 |-- _c4: string (nullable = true)
 |-- _c5: string (nullable = true)



In [10]:
# get summary statistics
tweets_test.describe().show()



+-------+------------------+--------------------+--------------------+--------+--------------------+--------------------+
|summary|               _c0|                 _c1|                 _c2|     _c3|                 _c4|                 _c5|
+-------+------------------+--------------------+--------------------+--------+--------------------+--------------------+
|  count|           1600000|             1600000|             1600000| 1600000|             1600000|             1600000|
|   mean|          799999.5|1.9988175522956276E9|                null|    null| 4.325887521835714E9|                null|
| stddev|461880.35968924535|1.9357607362267256E8|                null|    null|5.162733218454889E10|                null|
|    min|                 0|          1467810369|Fri Apr 17 20:30:...|NO_QUERY|        000catnap000|                 ...|
|    max|           1599999|          2329205794|Wed May 27 07:27:...|NO_QUERY|          zzzzeus111|ï¿½ï¿½ï¿½ï¿½ï¿½ß§...|
+-------+---------------

                                                                                

### Part I : Data Pre-Processing and Cleaning


Rename cols and drop c1

Convert the date column to a timestamp format

In [11]:
from pyspark.sql.functions import to_timestamp

tweets_test = tweets_test.drop("_c1") \
           .withColumnRenamed("_c0", "index") \
           .withColumnRenamed("_c2", "date") \
           .withColumnRenamed("_c3", "query_flag") \
           .withColumnRenamed("_c4", "user") \
           .withColumnRenamed("_c5", "text") \

tweets_test.show()

+-----+--------------------+----------+---------------+--------------------+
|index|                date|query_flag|           user|                text|
+-----+--------------------+----------+---------------+--------------------+
|    0|Mon Apr 06 22:19:...|  NO_QUERY|_TheSpecialOne_|@switchfoot http:...|
|    1|Mon Apr 06 22:19:...|  NO_QUERY|  scotthamilton|is upset that he ...|
|    2|Mon Apr 06 22:19:...|  NO_QUERY|       mattycus|@Kenichan I dived...|
|    3|Mon Apr 06 22:19:...|  NO_QUERY|        ElleCTF|my whole body fee...|
|    4|Mon Apr 06 22:19:...|  NO_QUERY|         Karoli|@nationwideclass ...|
|    5|Mon Apr 06 22:20:...|  NO_QUERY|       joy_wolf|@Kwesidei not the...|
|    6|Mon Apr 06 22:20:...|  NO_QUERY|        mybirch|         Need a hug |
|    7|Mon Apr 06 22:20:...|  NO_QUERY|           coZZ|@LOLTrish hey  lo...|
|    8|Mon Apr 06 22:20:...|  NO_QUERY|2Hood4Hollywood|@Tatiana_K nope t...|
|    9|Mon Apr 06 22:20:...|  NO_QUERY|        mimismo|@twittera que me ...|

View a sample of the 'date' column, using the sample() function to double check the timezone used before conversion.


In [12]:
# see 10% of date row to see correct timezone before converting
tweets_test.select("date").sample(False, 0.1, seed=42).show()


+--------------------+
|                date|
+--------------------+
|Mon Apr 06 22:20:...|
|Mon Apr 06 22:20:...|
|Mon Apr 06 22:20:...|
|Mon Apr 06 22:22:...|
|Mon Apr 06 22:22:...|
|Mon Apr 06 22:23:...|
|Mon Apr 06 22:23:...|
|Mon Apr 06 22:23:...|
|Mon Apr 06 22:25:...|
|Mon Apr 06 22:26:...|
|Mon Apr 06 22:26:...|
|Mon Apr 06 22:26:...|
|Mon Apr 06 22:26:...|
|Mon Apr 06 22:26:...|
|Mon Apr 06 22:26:...|
|Mon Apr 06 22:27:...|
|Mon Apr 06 22:27:...|
|Mon Apr 06 22:28:...|
|Mon Apr 06 22:28:...|
|Mon Apr 06 22:31:...|
+--------------------+
only showing top 20 rows



In [13]:
# limit to 10 rows of date col.
sample_date_values = tweets_test.select("date").limit(10).collect()
for row in sample_date_values:
    print(row.date)

    #with this we can confirm the PDT - Pacific Day Time for apropriate conversion to timestamp.
    # this may influence further analysis. 

Mon Apr 06 22:19:45 PDT 2009
Mon Apr 06 22:19:49 PDT 2009
Mon Apr 06 22:19:53 PDT 2009
Mon Apr 06 22:19:57 PDT 2009
Mon Apr 06 22:19:57 PDT 2009
Mon Apr 06 22:20:00 PDT 2009
Mon Apr 06 22:20:03 PDT 2009
Mon Apr 06 22:20:03 PDT 2009
Mon Apr 06 22:20:05 PDT 2009
Mon Apr 06 22:20:09 PDT 2009


it's important to account for the PDT timezone used. When converting to datetime, the new schema was in the apropriate datatypes, however when i tried to sample the 'date' rows again i got an error as seen below: 

> <font color='red'> <b>Py4JJavaError:</b> An error occurred while calling o100.showString.
: org.apache.spark.SparkUpgradeException: You may get a different result due to the upgrading of Spark 3.0: Fail to recognize 'EEE MMM dd HH:mm:ss z yyyy' pattern in the DateTimeFormatter. 1) You can set spark.sql.legacy.timeParserPolicy to LEGACY to restore the behavior before Spark 3.0. 2) You can form a valid datetime pattern with the guide from https://spark.apache.org/docs/latest/sql-ref-datetime-pattern.html </font> 

So according to Apache Spark documentation, I added a date parsing from java with SimpleDateFormat class to allow customization of the date format of the strings. For that the timezone needs to be specified to avoid any discrepancies. In this case, PDT is UTC-7 which is represented by 'z' in the Apache datetime patterns doc. 


In [14]:
from pyspark.sql.functions import to_timestamp

tweets_test = tweets_test.withColumn("date", to_timestamp(tweets_test.date, "EEE MMM dd HH:mm:ss z yyyy"))


In [15]:
# print schema
tweets_test.printSchema()

root
 |-- index: integer (nullable = true)
 |-- date: timestamp (nullable = true)
 |-- query_flag: string (nullable = true)
 |-- user: string (nullable = true)
 |-- text: string (nullable = true)



#### Checking for missing values and shape


In [16]:
from pyspark.sql.functions import count, when, col
# Check for missing values in each column
tweets_test.select([count(when(col(c).isNull(), c)).alias(c) for c in tweets_test.columns]).show()



+-----+----+----------+----+----+
|index|date|query_flag|user|text|
+-----+----+----------+----+----+
|    0|   0|         0|   0|   0|
+-----+----+----------+----+----+



                                                                                

In [17]:
# print the sahpe of the dataset
num_rows = tweets_test.count()
num_cols = len(tweets_test.columns)

print(f"Number of Rows: {num_rows}")
print(f"Number of Columns: {num_cols}")

Number of Rows: 1600000
Number of Columns: 5


                                                                                

In [18]:
tweets_test.dropna()  # Drop rows containing NaN values for simplicity
tweets_test.show(5)

+-----+-------------------+----------+---------------+--------------------+
|index|               date|query_flag|           user|                text|
+-----+-------------------+----------+---------------+--------------------+
|    0|2009-04-07 05:19:45|  NO_QUERY|_TheSpecialOne_|@switchfoot http:...|
|    1|2009-04-07 05:19:49|  NO_QUERY|  scotthamilton|is upset that he ...|
|    2|2009-04-07 05:19:53|  NO_QUERY|       mattycus|@Kenichan I dived...|
|    3|2009-04-07 05:19:57|  NO_QUERY|        ElleCTF|my whole body fee...|
|    4|2009-04-07 05:19:57|  NO_QUERY|         Karoli|@nationwideclass ...|
+-----+-------------------+----------+---------------+--------------------+
only showing top 5 rows



#### Pyspark has some inbuilt functions for starting tne text processing, such as lowercasing, removing special characters and stopwords. 

The following text data preprocessing steps using PySpark functions are:

Lowercasing: We use the lower() function to convert all text to lowercase.

Removing Special Characters: We use regexp_replace() to remove any characters that are not alphanumeric or whitespace.

Removing Stopwords: We use the StopWordsRemover from the pyspark.ml.feature module to remove common stopwords.

The resulting DataFrame tweets_test will have the preprocessed text in the 'text' column.

PySpark doesn't have built-in support for stemming or lemmatization. To implement these kind of techniques, external libraries such as nltk can be implemented/imported. 
<b>NLTK<b/> provides a wide range of tools and resources for working with human language data, and it can complement Spark's capabilities in certain scenarios. 

However, when i tried to import the nltk functions to my vm, i encountered a series of incompatibility issues. Even after creating a virtual environment i was not able to install the library, and the same happened when i tried to install <b>Sparknlp<b/>, which is the language processing tool whitin the Spark enviroment considered to be the state of the art for a number of functionalities in the NLP area. 

There was also an attempt, as suggested by SparkNLP documentaroin, to inicialize pyspark with the additional packages for language processing but when i tried to import it back to the notebook it would not find the module installed, even when i apply the same command (pyspark --packages com.johnsnowlabs.nlp:spark-nlp_2.12:5.1.4) to the virtual env the packages were installed. A third nlp library was also atempted to import via pip: <b>textblob<b/>
    
#error: externally-managed-environment (#error: externally-managed-environment)

With the command found in stackoverflow (sudo apt install python3-nltk) available at https://askubuntu.com/questions/996185/how-can-i-install-nltk-for-python-3

i was able to go back to my original choice of applying nltk tools for extracting the sentiment of the tweets given.

In [19]:
#pip install beautifulsoup4 using virtual enviroment 'myenv'

Tokenization is performed using the Tokenizer class.

HTML parsing is done using the BeautifulSoup library, and a user-defined function (parse_html_udf) is registered and applied to create a new column named "cleaned_text."

A user-defined function is applied to remove special characters and numbers from the "cleaned_text" column.
Stop words are removed using the StopWordsRemover class.

In [20]:


# Convert text to lowercase
#df_cleaned = df_cleaned.withColumn('cleaned_words', lower('cleaned_words'))

# Remove duplicate rows based on the 'cleaned_words' column
#df_cleaned = df_cleaned.dropDuplicates(['cleaned_words'])

# Remove rows with empty 'cleaned_words'
#df_cleaned = df_cleaned.filter(df_cleaned.cleaned_words != '')

# Show the result
#df_cleaned.show(5,truncate=False)

Removing noise from data: 
    -Stop words
    -Special characters
    -transform all to lower case letters
    -remove numbers, duplicate characters and punctuation

In [21]:
# Remove URLs
tweets_test = tweets_test.withColumn("text", F.regexp_replace(F.col("text"), "http(s)?://[^\\s]+", ""))

# Remove HTML tags
tweets_test = tweets_test.withColumn("text", F.regexp_replace(F.col("text"), "<[^>]+>", ""))

# Remove mentions (i.e., @username)
tweets_test = tweets_test.withColumn("text", F.regexp_replace(F.col("text"), "@\\w+", ""))

# Convert to lowercase
tweets_test = tweets_test.withColumn('text', lower(tweets_test['text']))

# Remove numbers from the "text" column
tweets_test = tweets_test.withColumn('text', regexp_replace(tweets_test['text'], r'\d+', ''))

# Reduce excessive characters (more than two of the same in a row)
tweets_test = tweets_test.withColumn('text', regexp_replace('text', r'(.)\1{2,}', r'\1\1'))

# Remove punctuation
tweets_test = tweets_test.withColumn('text', regexp_replace(tweets_test['text'], r"[^\w\s]", ""))

tweets_test.select("text").show(truncate=False)

+-----------------------------------------------------------------------------------------------------------+
|text                                                                                                       |
+-----------------------------------------------------------------------------------------------------------+
|   a11 thats a bummer  you shoulda got david carr of third day to do it d                                  |
|is upset that he cant update his facebook by texting it11 and might cry as a result  school today also blah|
| i dived many times for the ball managed to save   the rest go out of bounds                               |
|my whole body feels itchy and like its on fire                                                             |
| no its not behaving at all im mad why am i here because i cant see you all over there                     |
| not the whole crew                                                                                        |
|need a hu

In [22]:
# Drop the 'flag' and 'user' columns
tweets_test = tweets_test.drop('query_flag', 'user')

# Show the result
tweets_test.show(5,truncate=False)

+-----+-------------------+-----------------------------------------------------------------------------------------------------------+
|index|date               |text                                                                                                       |
+-----+-------------------+-----------------------------------------------------------------------------------------------------------+
|0    |2009-04-07 05:19:45|   a11 thats a bummer  you shoulda got david carr of third day to do it d                                  |
|1    |2009-04-07 05:19:49|is upset that he cant update his facebook by texting it11 and might cry as a result  school today also blah|
|2    |2009-04-07 05:19:53| i dived many times for the ball managed to save   the rest go out of bounds                               |
|3    |2009-04-07 05:19:57|my whole body feels itchy and like its on fire                                                             |
|4    |2009-04-07 05:19:57| no its not behaving 

### Tokenization



In [23]:
# Tokenization
tokenizer = Tokenizer(inputCol="text", outputCol="words")
tweets_df = tokenizer.transform(tweets_test)

# Show the result
tweets_df.show(5,truncate=False)

+-----+-------------------+-----------------------------------------------------------------------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------+
|index|date               |text                                                                                                       |words                                                                                                                             |
+-----+-------------------+-----------------------------------------------------------------------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------+
|0    |2009-04-07 05:19:45|   a11 thats a bummer  you shoulda got david carr of third day to do it d                                  |[, , , a11, thats, a, bummer, , you, shoulda, got, david, carr, 

In [24]:
# Show the result
tweets_test.show(5,truncate=False)

+-----+-------------------+-----------------------------------------------------------------------------------------------------------+
|index|date               |text                                                                                                       |
+-----+-------------------+-----------------------------------------------------------------------------------------------------------+
|0    |2009-04-07 05:19:45|   a11 thats a bummer  you shoulda got david carr of third day to do it d                                  |
|1    |2009-04-07 05:19:49|is upset that he cant update his facebook by texting it11 and might cry as a result  school today also blah|
|2    |2009-04-07 05:19:53| i dived many times for the ball managed to save   the rest go out of bounds                               |
|3    |2009-04-07 05:19:57|my whole body feels itchy and like its on fire                                                             |
|4    |2009-04-07 05:19:57| no its not behaving 

In [36]:
from sparknlp.pretrained import PretrainedPipeline

#pipeline = PretrainedPipeline("analyze_sentiment")

In [37]:
from pyspark.sql.functions import lit

# Assuming your existing DataFrame is named `tweets_df` and tokenized words are in the column "words"
# Add a label column (e.g., 1 for positive, 0 for negative)
labeled_df = tweets_df.withColumn("label", lit(1))  # You can customize this based on your sentiment classes

In [38]:
from pyspark.sql.functions import concat_ws

# Combine tokenized words into a single column
labeled_df = labeled_df.withColumn("combined_text", concat_ws(" ", "words"))

In [43]:
from sparknlp.annotator import SentimentDetector

# SentimentDetector setup
sentiment_detector = SentimentDetector() \
    .setInputCols(["combined_text", "words"]) \
    .setOutputCol("sentiment")

# Create a new pipeline with the SentimentDetector
pipeline_sentiment = Pipeline(stages=[sentiment_detector])

# Apply sentiment detection
analyzed_df = pipeline_sentiment.fit(labeled_df).transform(labeled_df)

# Show the results
analyzed_df.select("combined_text", "sentiment.result", "label").show(truncate=False)


Py4JJavaError: An error occurred while calling o747.fit.
: java.util.NoSuchElementException: Failed to find a default value for dictionary
	at org.apache.spark.ml.param.Params.$anonfun$getOrDefault$2(params.scala:758)
	at scala.Option.getOrElse(Option.scala:189)
	at org.apache.spark.ml.param.Params.getOrDefault(params.scala:758)
	at org.apache.spark.ml.param.Params.getOrDefault$(params.scala:755)
	at org.apache.spark.ml.PipelineStage.getOrDefault(Pipeline.scala:41)
	at org.apache.spark.ml.param.Params.$(params.scala:764)
	at org.apache.spark.ml.param.Params.$$(params.scala:764)
	at org.apache.spark.ml.PipelineStage.$(Pipeline.scala:41)
	at com.johnsnowlabs.nlp.annotators.sda.pragmatic.SentimentDetector.train(SentimentDetector.scala:296)
	at com.johnsnowlabs.nlp.annotators.sda.pragmatic.SentimentDetector.train(SentimentDetector.scala:126)
	at com.johnsnowlabs.nlp.AnnotatorApproach._fit(AnnotatorApproach.scala:69)
	at com.johnsnowlabs.nlp.AnnotatorApproach.fit(AnnotatorApproach.scala:75)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.lang.Thread.run(Thread.java:750)


transform these tokens into numerical feature vectors using HashingTF: Hashing Term Frequency. The output of this function is a sparse vector of term frequency counts for each string.

In [19]:
hashtf = HashingTF(inputCol="words", outputCol='tf')

After obtaining this frequency vector, we need to pass it into the IDF function. This function will add weights to each word. The more frequent the word is, the lower its weight, and vice versa. This allows us to compensate for the bias in a large corpus of text like this

In [20]:
idf = IDF(inputCol='tf', outputCol="features")

In [21]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
import nltk
from nltk.sentiment.vader import SentimentIntensityAnalyzer

# Create a SentimentIntensityAnalyzer instance
sid = SentimentIntensityAnalyzer()

# Define a UDF to apply sentiment analysis
def analyze_sentiment(text):
    return sid.polarity_scores(text)['compound']

# Register the UDF
analyze_sentiment_udf = udf(analyze_sentiment, StringType())

# Apply sentiment analysis to the DataFrame
tweets_test = tweets_test.withColumn("sentiment_score", analyze_sentiment_udf("text"))

# Show the result
tweets_test.show(5,truncate=False)

ModuleNotFoundError: No module named 'nltk'

In [22]:
# Broadcast the NLTK library to all worker nodes
nltk.download('vader_lexicon')
broadcast_nltk = spark.sparkContext.broadcast(nltk)

# Create a SentimentIntensityAnalyzer instance using the broadcasted NLTK
sid = broadcast_nltk.value.sentiment.vader.SentimentIntensityAnalyzer()

# Define a UDF to apply sentiment analysis
def analyze_sentiment(text):
    return sid.polarity_scores(text)['compound']

# Register the UDF
analyze_sentiment_udf = udf(analyze_sentiment, StringType())

# Apply sentiment analysis to the DataFrame
tweets_test = tweets_test.withColumn("sentiment_score", analyze_sentiment_udf("text"))

# Show the result
tweets_test.show(5, truncate=False)

NameError: name 'nltk' is not defined

In [23]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
import nltk
from nltk.sentiment.vader import SentimentIntensityAnalyzer

ModuleNotFoundError: No module named 'nltk'

In [27]:
#!pip install textblob
#error: externally-managed-environment

#!pip3 install sparknlp

#### Sentiment extration using SparkNLP library/pipeline

The sentiment analysis results will be stored in the 'sentiment.result' column once the data is preprocessed.

This pipeline uses the DocumentAssembler to assemble the words into documents, which is required for the Spark NLP SentimentDetector. Then, it applies the sentiment analysis using the trained model.

In [25]:
#!pip install sparknlp

In [26]:
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from sparknlp.annotator import SentimentDetector
from sparknlp.base import DocumentAssembler
from pyspark.sql.functions import col

# Create a Spark session
spark = SparkSession.builder.appName("SentimentAnalysis").getOrCreate()

# 
# Create a DocumentAssembler
document_assembler = DocumentAssembler().setInputCol("words").setOutputCol("document")

# Sentiment analysis
sentiment_detector = SentimentDetector().setInputCols(["document"]).setOutputCol("sentiment")

# Create a pipeline
pipeline = Pipeline(stages=[document_assembler, sentiment_detector])

# Fit the pipeline on your DataFrame
model = pipeline.fit(tweets_df)

# Transform the DataFrame
result = model.transform(tweets_df)

# Show the result
result.select("text", "sentiment.result").show(truncate=False)

TypeError: setInputCols in SentimentDetector_43d6bdaa0c1c expecting 2 columns. Provided column amount: 1. Which should be columns from the following annotators: ['token', 'document']

In [None]:
#!pip3 install vaderSentiment
#error: externally-managed-environment

the main differences between the Pandas & PySpark, operations on Pyspark run faster than Pandas due to its distributed nature and parallel execution on multiple cores and machines.

In other words, pandas run operations on a single node whereas PySpark runs on multiple machines. If you are working on a Machine Learning application where you are dealing with larger datasets, PySpark processes operations many times faster than pandas.

scaling a cluster involves a trade-off between performance and cost. Adding more resources may improve performance, but it also increases infrastructure costs. It's essential to find the right balance based on your application's requirements and budget constraints.

In [None]:
#tweets_df = tweets_test.toPandas()
#print(tweets_df)

In [None]:
#tweets_df

In [None]:
import nltk
nltk.download('punkt')
from nltk.stem import WordNetLemmatizer
nltk.download('wordnet')
nltk.download('vader_lexicon')

In [None]:
from nltk.sentiment import SentimentIntensityAnalyzer

sid = SentimentIntensityAnalyzer()
pandas_df['sentiment'] = pandas_df['cleaned_text'].apply(lambda x: 1 if sid.polarity_scores(str(x))['compound'] > 0 else 0)

In [None]:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

@udf(StringType())
def analyze_sentiment(tokens):
    # Import NLTK and perform sentiment analysis
    from nltk.sentiment import SentimentIntensityAnalyzer
    sid = SentimentIntensityAnalyzer()
    
    # Combine tokens into a string (assuming 'tokens' is a list of words)
    text = " ".join(tokens)

    # Perform sentiment analysis
    sentiment_score = sid.polarity_scores(text)

    # Return sentiment label based on the compound score
    if sentiment_score['compound'] >= 0.05:
        return 'positive'
    elif sentiment_score['compound'] <= -0.05:
        return 'negative'
    else:
        return 'neutral'


In [None]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("SentimentAnalysis").getOrCreate()

# Assuming 'df' is your tokenized DataFrame with a column named 'tokens'
tweets_test = tweets_test.withColumn("sentiment", analyze_sentiment(tweets_test.tokens))

The VADER sentiment analyzer is a simple rule-based model that works well for social media text. If you need more advanced sentiment analysis, you might want to explore machine learning-based approaches, such as using pre-trained models like BERT or spaCy.

"By far the most popular and comprehensive library, to my knowledge, for Spark-native distributed NLP, is spark-nlp from John Snow Labs. https://nlp.johnsnowlabs.com/ It is open source (but with commercial support options) and has a whole lot of functionality.

You can also use spacy, nltk, and other non-Spark NLP libraries with Spark, by writing pandas UDFs that leverage these libraries, then applying them to data with Spark."https://community.databricks.com/t5/machine-learning/what-are-best-nlp-libraries-to-use-with-spark/td-p/24033#:~:text=You%20can%20also%20use%20spacy,them%20to%20data%20with%20Spark.


In [None]:
import nltk
from nltk.sentiment.vader import SentimentIntensityAnalyzer
sid = SentimentIntensityAnalyzer()

- store cleaned dataset back to hbase/mysql


#### Importing dataset from HBase using a Connector.

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
import nltk
from nltk.sentiment.vader import SentimentIntensityAnalyzer

# Initialize Spark Session
spark = SparkSession.builder \
    .appName("ProjectTweets") \
    .master("local[*]") \
    .config("spark.jars.packages", "com.johnsnowlabs.nlp:spark-nlp_2.12:5.1.4") \
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
    .config("spark.kryo.registrator", "com.johnsnowlabs.nlp.serialization.SparkNLPKryoRegistrator") \
    .getOrCreate()

# Download NLTK data
nltk.download('vader_lexicon')

# Broadcast the NLTK library to all worker nodes
broadcast_nltk = spark.sparkContext.broadcast(nltk)

# Define a UDF to apply sentiment analysis
def analyze_sentiment(text):
    # Access the SentimentIntensityAnalyzer from the broadcasted NLTK
    sid = SentimentIntensityAnalyzer()
    return sid.polarity_scores(text)['compound']

# Register the UDF
analyze_sentiment_udf = udf(analyze_sentiment, StringType())

# Apply sentiment analysis to the DataFrame
tweets_test = tweets_test.withColumn("sentiment_score", analyze_sentiment_udf("text"))

# Show the result
tweets_test.show(5, truncate=False)