# Project Overview: Sentiment Analysis on TripAdvisor Hotel Reviews

This project performs sentiment analysis on hotel reviews from TripAdvisor. The dataset is available [here](https://notebook.community/melqkiades/yelp/notebooks/TripAdvisor-Datasets).

## Project Pipeline:

1. **Data Preprocessing with MapReduce:**
   - We employ a MapReduce job for data preprocessing. This step involves aggregating hotel reviews based on their unique identifiers provided in the dataset.
   - All reviews corresponding to each hotel are merged under their respective identifiers to form a consolidated review dataset for each hotel.

2. **Hotel Evaluation:**
   - After aggregation, we rank the hotels using the following measures:
     - **Average Ratings:** Calculated from the collected review data.
     - **Sentiment Scores:** Derived from our Sentiment Analysis on the reviews.
     - **Other Measures:** The number of reviews and the confidence score returned by the sentiment classifier.
  
3. **Data Filtering for Significance:**
   - To keep the computation tractable and the results meaningful, we only score hotels with at least 1,000 reviews (the threshold used in the sentiment analysis cell and in the plot title). Hotels with few reviews are excluded because a handful of opinions can skew their scores.

4. **Ablation study:**
   - We also trained our own sentiment classifier with PySpark ML (logistic regression on hashed term frequencies), using the labels produced by the pretrained classifier, to try out the library.

This approach allows us to effectively assess and rank hotels based on a combination of quantitative ratings and qualitative sentiment scores from user reviews.

## Imports

In [1]:
import shutil
import time
import json 
import os 
import re
import csv 

import plotly.express as px
import pandas as pd 
import numpy as np 

from pyspark import SparkContext, SparkConf
from flair.models import TextClassifier
from flair.data import Sentence 
from tqdm import tqdm
import torch

classifier = TextClassifier.load('en-sentiment')
score_to_value = {'NEGATIVE': 0, 'NEUTRAL': 1, 'POSITIVE': 2}  

## Map function

In [2]:
def map_function(reviews):
    """Map one parsed review record to a (hotel_id, [avg_rating, text]) pair."""
    reviews = dict(reviews)
    hotel_id = reviews['offering_id']
    ratings = reviews['ratings']
    review = reviews['text']

    # Average the rating categories of this single review
    avg_ratings = sum(ratings.values()) / len(ratings)

    # Return a one-element list because the function is applied with flatMap
    return [(hotel_id, [avg_ratings, review])]

## Reduce function 

In [3]:
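def reduce_function(value1, value2):
    """Merge two (rating, reviews) values that belong to the same hotel."""
    # Caveat: averaging two partial averages weights both sides equally, so the
    # result only approximates the mean over all of a hotel's reviews.
    rating = (value1[0] + value2[0]) / 2
    # Join the review texts with a separator that is split on later
    reviews_total = value1[1] + ' ******* ' + value2[1]

    return (rating, reviews_total)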
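
Averaging two partial averages, as the reducer above does, gives the true mean only when both sides cover the same number of reviews, so with `reduceByKey` the result depends on how Spark happens to group the values. A minimal sketch of one alternative, assuming each mapped value carries a running sum and count instead of an average. The names `to_partial` and `merge_partials` are hypothetical, the ratings are made up, and `functools.reduce` stands in for `reduceByKey`:

```python
from functools import reduce

SEPARATOR = " ******* "

def to_partial(avg_rating, review):
    """Wrap one review as (rating_sum, review_count, text)."""
    return (avg_rating, 1, review)

def merge_partials(a, b):
    """Associative merge: add sums and counts, concatenate the texts."""
    return (a[0] + b[0], a[1] + b[1], a[2] + SEPARATOR + b[2])

def pairwise_average(a, b):
    """The pairwise scheme: average of two partial averages."""
    return ((a[0] + b[0]) / 2, a[1] + SEPARATOR + b[1])

# Made-up ratings and reviews for a single hotel
ratings = [5.0, 1.0, 1.0]
reviews = ["great", "bad", "awful"]

total, count, text = reduce(merge_partials, map(to_partial, ratings, reviews))
true_mean = total / count  # divide once, after all partials are merged

pairwise_mean, _ = reduce(pairwise_average, zip(ratings, reviews))

print(true_mean)      # mean of the three ratings (7/3)
print(pairwise_mean)  # ((5 + 1) / 2 + 1) / 2 = 2.0
```

With this layout the final division happens once per hotel after the reduce step, and the review count comes for free.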

## JSON parser function

In [4]:
def parse_json(line):
    """
    Parse a JSON string into a Python dictionary.
    """
    try:
        return json.loads(line)
    except json.JSONDecodeError:
        return None

## MapReduce job

In [5]:
# Initialize Spark context
start = time.time()
print("*"*50)
print("Initializing Spark context...\n")
conf = SparkConf().setAppName("MapReduce")
sc = SparkContext(conf=conf)

# Load input data
input_path = "review.txt" 
# input_path = "test.txt" 

print("*"*50)
print(f"Reading input data from {input_path}...\n")
# Read the text file as an RDD, one JSON record per line
json_rdd = sc.textFile(input_path)

# Parse each JSON string into a Python dictionary, dropping malformed lines
parsed_rdd = json_rdd.map(parse_json).filter(lambda record: record is not None)

# Map transformation
mapped_rdd = parsed_rdd.flatMap(map_function)

# Reduce transformation
reduced_rdd = mapped_rdd.reduceByKey(reduce_function)
output_path = "output_data" 

# Check if directory exists
if os.path.exists(output_path):
    shutil.rmtree(output_path)
    print(f"Removed existing directory {output_path}...\n")

print("*"*50)
print("Saving Resilient Distributed Dataset (RDD) to disk...\n")
# Save the output data
reduced_rdd.saveAsTextFile(output_path)

# Stop SparkContext
print("*"*50)
print("Stopping Spark context...\n")
sc.stop()
end = time.time()
print("Done.")
print(f"Time taken: {round(end - start, 2)} seconds.\n")

**************************************************
Initializing Spark context...



Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/06/13 12:19:55 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


**************************************************
Reading input data from review.txt...

Removed existing directory output_data...

**************************************************
Saving Resilient Distributed Dataset (RDD) to disk...



                                                                                

**************************************************
Stopping Spark context...

Done.
Time taken: 12.85 seconds.



## Additional functions

In [6]:
def parse_line(line):
    # Strip leading/trailing whitespace, then the outer parentheses
    line = line.strip()[1:-1]

    # A value is saved as a list when a hotel has a single review and as a
    # tuple otherwise; drop the brackets so both cases parse the same way.
    # Note: this also removes square brackets inside the review text.
    line = line.replace("[", "").replace("]", "")

    # Split into hotel id, rating, and review text (at most two splits)
    id_part, rating_part, review = line.split(", ", 2)

    # Convert the id to an integer
    numerical_id = int(id_part.strip("("))

    # Convert the rating to a float; fall back to NaN if it cannot be parsed
    rating_part = rating_part.strip("(")
    try:
        float_number = float(rating_part)
    except ValueError:
        print(f"Error: Could not convert {rating_part} to a float, id = {numerical_id}")
        float_number = float("nan")

    # Remove the closing parenthesis first, then the surrounding quotes
    review_string = review.rstrip(")").strip("'\"")

    # Return the numerical ID, float number, and review string as a tuple
    return numerical_id, (float_number, review_string)

def read_file_and_extract_data(file_path):
    data_dict = {}
    
    # Open the file and read each line
    with open(file_path, 'r') as file:
        for line in file:
            # Parse each line
            numerical_id, value_tuple  = parse_line(line)
            
            # Store the parsed data in the dictionary
            data_dict[numerical_id] = value_tuple
    
    # Return the dictionary containing the extracted data
    return data_dict

def write_data_dict_to_file(data_dict, output_file_path):
    # Open the output file in write mode
    with open(output_file_path, 'a') as file:
        # Iterate over each key-value pair in the dictionary
        for numerical_id, value_tuple in data_dict.items():

            float_number, review_string = value_tuple
            line = f"{numerical_id}, ({float_number}, '{review_string}')\n"
            
            # Write the formatted line to the file
            file.write(line)
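
Because `saveAsTextFile` writes each record as its string representation, an alternative to manual string splitting is to parse every line with `ast.literal_eval`. A minimal sketch, assuming each line is a valid Python literal of the form `(id, (rating, 'text'))` or `(id, [rating, 'text'])`. The helper name `parse_line_literal` and the sample records are hypothetical:

```python
import ast

def parse_line_literal(line):
    """Parse one saved record into (hotel_id, (rating, review_text))."""
    hotel_id, value = ast.literal_eval(line.strip())
    rating, review_text = value  # unpacking works for tuple and list values
    return int(hotel_id), (float(rating), review_text)

# Made-up records in the two shapes the reducer can produce
sample_tuple = "(93466, (4.5, 'Nice stay ******* Clean rooms (mostly), friendly staff'))"
sample_list = "(12345, [3.0, \"Wouldn't return\"])"

parsed_tuple = parse_line_literal(sample_tuple)
parsed_list = parse_line_literal(sample_list)
print(parsed_tuple)
print(parsed_list)
```

This keeps brackets, parentheses, and quotes inside the review text intact, at the cost of failing loudly on any line that is not a well-formed literal.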

In [8]:
def find_part_files(directory):
    # Define the pattern to match part files (e.g., "part-00000" to "part-00034")
    pattern = re.compile(r"part-000[0-3][0-9]$")

    # List to hold the part files
    part_files = []

    # Iterate through each file in the directory
    for file_name in os.listdir(directory):
        # Check if the file name matches the pattern
        if pattern.match(file_name):
            # Add the file name to the list
            part_files.append(file_name)
    
    return part_files

directory_path = "/Users/sebastianodarconso/Desktop/università/magistrale_progetti/big_data_final/output_data"  

# Find the part files in the specified directory
part_files = find_part_files(directory_path)
part_files = sorted(part_files)

# Print the list of part files
print(f"Part files found: {len(part_files)}")

Part files found: 35
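
A pattern with a fixed digit range stops matching once a job writes more partitions than the range covers. A minimal sketch of a pattern that does not depend on the partition count, assuming the usual `part-` prefix followed by five digits. The helper `find_all_part_files` and the file names in the demonstration are hypothetical:

```python
import os
import re
import tempfile

PART_FILE_PATTERN = re.compile(r"part-\d{5}$")

def find_all_part_files(directory):
    """Return the sorted names of all part files in `directory`."""
    return sorted(
        name for name in os.listdir(directory)
        if PART_FILE_PATTERN.match(name)
    )

# Demonstration on a temporary directory with made-up file names
with tempfile.TemporaryDirectory() as tmp_dir:
    for name in ["part-00000", "part-00041", "_SUCCESS", ".part-00000.crc"]:
        open(os.path.join(tmp_dir, name), "w").close()
    found = find_all_part_files(tmp_dir)

print(found)  # ['part-00000', 'part-00041']
```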


## Merge all files

In [8]:
output_file_path = "merged_data.txt"

if os.path.exists(output_file_path):
    os.remove(output_file_path)
    print(f"Removed existing directory {output_file_path}...")

print("Merging all files...\n")
for file in tqdm(part_files):
    file_path = os.path.join(directory_path, file)
    try:
        data = read_file_and_extract_data(file_path)
    except Exception as e:
        print(f"Error reading file {file_path}: {e}")
        # Skip this file so stale or undefined data is never written
        continue
    write_data_dict_to_file(data, output_file_path)

Removed existing directory merged_data.txt...
Merging all files...



100%|██████████| 35/35 [00:02<00:00, 13.40it/s]


In [9]:
# Each line of the merged file corresponds to one hotel
with open('merged_data.txt', 'r') as file:
    lines = sum(1 for _ in file)

print(f"We found {lines} hotels.")

We found 3945 hotels.


In [10]:
def extract_info(line):
    # Each merged line looks like: id, (rating, 'review 1 ******* review 2 ...')
    id_part, rating_part, text = line.split(',', 2)
    id_hotel = id_part.replace('(', '').strip().replace("'", '')
    rating = rating_part.replace('(', '').strip().replace("'", '')

    # Drop all quote characters, then split on the separator added by the reducer
    text = text.replace('"', '').replace("'", '').strip()
    reviews = text.split(' ******* ')
    num_rews = len(reviews)

    return id_hotel, rating, num_rews, reviews

In [11]:
import math

def n_sample_size(N):
    """Number of reviews to sample from a hotel with N reviews."""
    Z = 1.96  # Z-score for 95% confidence
    E = 0.05  # Margin of error (5%)
    p = 0.5   # Expected proportion

    # Sample size formula for finite population
    n = (N * Z**2 * p * (1 - p)) / ((E**2 * (N - 1)) + (Z**2 * p * (1 - p)))
    n = math.ceil(n)  # Always round up to the next whole number

    return n
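
The function above implements the standard sample-size estimate for a proportion with a finite population correction. The sketch below restates it with the parameters exposed and shows how slowly the sample grows with the number of reviews. The name `sample_size` and its keyword arguments are illustrative and not part of the notebook:

```python
import math

def sample_size(population, z=1.96, margin=0.05, p=0.5):
    """Sample size for estimating a proportion in a finite population."""
    numerator = population * z**2 * p * (1 - p)
    denominator = margin**2 * (population - 1) + z**2 * p * (1 - p)
    return math.ceil(numerator / denominator)

sizes = {n: sample_size(n) for n in (1_000, 3_000, 100_000)}
print(sizes)  # {1000: 278, 3000: 341, 100000: 383}
```

Under these settings, a hotel with 100,000 reviews needs only about a hundred more sampled reviews than one with 1,000, which is what makes per-hotel sampling cheap.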

## Sentiment analysis

In [None]:
# import random

# line_dict = {}
# review_sentiments = []
# reviewed_hotels = []
# with open('merged_data.txt', 'r') as file:
#     lines = file.readlines()

#     for line in tqdm(lines):
#         id_hotel, rating, num_rews, reviews = extract_info(line)
#         if num_rews < 1000:
#             continue

#         reviewed_hotels.append(id_hotel)

#         sentiments = []
#         sentiment_scores = []
#         for review in random.sample(reviews, n_sample_size(num_rews)):
#             if review == '':
#                 continue

#             sentence = Sentence(review)
#             classifier.predict(sentence)

#             sentiment = sentence.labels[0].value
#             sentiment_score = sentence.labels[0].score

#             sentiments.append(score_to_value[sentiment] * sentiment_score)
#             sentiment_scores.append(sentiment_score)

#             review_sentiments.append((review, sentiment, sentiment_score))

#         avg_sentiment = np.mean(sentiments)
#         avg_sentiment_score = np.mean(sentiment_scores)

#         line_dict[id_hotel] = (round(avg_sentiment, 4),
#                                round(float(rating), 4),
#                                round((avg_sentiment / 2.0) * 5.0, 4),
#                                round(avg_sentiment_score, 4), num_rews)


# with open('review_sentiments.csv', 'w', newline='') as csvfile:
#     csvwriter = csv.writer(csvfile)
#     csvwriter.writerow(['Review', 'Sentiment', 'Sentiment Score'])
#     for review, sentiment, sentiment_score in review_sentiments:
#         csvwriter.writerow([review, score_to_value[sentiment], round(sentiment_score, 4)])
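
The scoring used in the cell above can be illustrated on a few made-up predictions: each label is mapped to 0, 1, or 2, weighted by the classifier's confidence, averaged, and rescaled from the 0 to 2 range to a 0 to 5 scale so that it is comparable with the star rating. A minimal sketch with hypothetical `(label, confidence)` pairs in place of real classifier output:

```python
score_to_value = {"NEGATIVE": 0, "NEUTRAL": 1, "POSITIVE": 2}

# Hypothetical (label, confidence) pairs standing in for classifier predictions
predictions = [("POSITIVE", 0.9), ("POSITIVE", 0.8), ("NEGATIVE", 0.7)]

# Confidence-weighted label values, as in the loop above
weighted = [score_to_value[label] * confidence for label, confidence in predictions]
avg_sentiment = sum(weighted) / len(weighted)

# Rescale from [0, 2] to [0, 5]
final_score = (avg_sentiment / 2.0) * 5.0
avg_confidence = sum(confidence for _, confidence in predictions) / len(predictions)

print(round(avg_sentiment, 4), round(final_score, 4), round(avg_confidence, 4))
# 1.1333 2.8333 0.8
```

Note that under this scheme a NEGATIVE prediction contributes 0 whatever its confidence, so only the confidence of non-negative predictions affects the final score.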


In [None]:
# df = pd.DataFrame.from_dict(line_dict, orient='index', columns=['avg_sentiment', 'avg_rating', 'final_score', 'avg_sentiment_score', 'num_reviews'])
# df['hotel_id'] = df.index
# df = df[['hotel_id', 'avg_sentiment', 'avg_rating', 'final_score', 'avg_sentiment_score', 'num_reviews']]
# df = df.sort_values(by=['num_reviews' ,'avg_sentiment', 'avg_sentiment_score', 'avg_rating', 'final_score'], ascending=[False, False, False, False, False])
# # add a new column with num_reviews clipped to 100
# df['num_reviews_clipped'] = df['num_reviews'].clip(0, 100)
# df.to_csv('final_scores.csv', index=False)

In [22]:
df = pd.read_csv('final_scores.csv')
# Strip stray whitespace from the column names so they can be referenced cleanly
df.columns = df.columns.str.strip()
df = df.sort_values(by=['final_score', 'avg_sentiment', 'avg_rating', 'avg_sentiment_score', 'num_reviews'], ascending=False)
df.to_csv('final_scores_sorted.csv', index=False)

In [23]:
df = pd.read_csv('final_scores_sorted.csv')

fig = px.scatter(df, x='avg_rating', y='final_score', color='avg_rating', size='num_reviews_clipped', hover_data=['hotel_id', 'num_reviews'])
fig.update_traces(marker=dict(size=12,
                              line=dict(width=2,
                                        color='DarkSlateGrey')),
                  selector=dict(mode='markers+text'))

fig.update_layout(
    yaxis_title="Final Score",
    xaxis_title="Average Rating",
    title="Scatter Plot of Final Score vs Average Rating for Hotels with at least 1000 Reviews",
)
fig.show()

In [None]:
## print the top 5 hotels with their average sentiment, average rating, final score, and average classifier confidence
print("Top 5 hotels with at least 1000 reviews:")
df.drop(columns=['num_reviews_clipped']).head(5)

## Sentiment analysis with PySpark ML

In [None]:
from pyspark.sql import SparkSession
# Import only what is used; wildcard imports from pyspark.sql.functions
# shadow Python built-ins such as sum and round
from pyspark.sql.functions import col
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import HashingTF, Tokenizer, StopWordsRemover

In [None]:
appName="Sentiment Analysis Spark"
spark=SparkSession.builder.master('local').appName(appName).getOrCreate()

In [None]:
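## reading the review sentiment csv file
## multiLine and escape are set because the file was written with Python's csv
## module, which doubles embedded quotes and may keep newlines inside a review
review_csv = spark.read.csv('review_sentiments.csv', header=True, inferSchema=True,
                            multiLine=True, escape='"')
review_csv.show(truncate=True, n=3)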

In [None]:
## selecting the review and sentiment columns
data=review_csv.select(
    "Review",col("Sentiment").cast("Int").alias("label"))
data.show(truncate=True, n=5)

In [None]:
## splitting the data into training and testing
split_data = data.randomSplit([0.8, 0.2])
train = split_data[0]

test = split_data[1].withColumnRenamed("label", "true_label")
train_rows = train.count()
test_rows = test.count()

print("Training Rows:", train_rows, " Testing Rows:", test_rows)

In [None]:
## tokenizing the Review column into lowercase words and showing the result
tokenizer = Tokenizer(inputCol="Review", outputCol="SentimentWords")
tokenizedTrain = tokenizer.transform(train)
tokenizedTrain.show(truncate=True, n=5)

In [None]:
## removing stopwords, adding and showing the meaningful words
swr = StopWordsRemover(inputCol=tokenizer.getOutputCol(), 
                       outputCol="MeaningfulWords")
SwRemovedTrain = swr.transform(tokenizedTrain)
SwRemovedTrain.show(truncate=True, n=5)

In [None]:
## hashing the meaningful words and showing the features
hashTF = HashingTF(inputCol=swr.getOutputCol(), outputCol="features")
numericTrain = hashTF.transform(SwRemovedTrain).select(
    'label', 'MeaningfulWords', 'features')
numericTrain.show(truncate=True, n=3)
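
`HashingTF` maps each token to a column index by hashing it, so no vocabulary has to be built or stored. The pure-Python sketch below illustrates the idea only: `zlib.crc32` and 16 buckets are stand-ins chosen for readability, whereas Spark uses its own hash function and a much larger default number of features, so the indices will not match Spark's. The helper name `hashing_tf` is hypothetical:

```python
import zlib
from collections import Counter

def hashing_tf(tokens, num_features=16):
    """Return a sparse term-frequency vector as {bucket_index: count}."""
    counts = Counter()
    for token in tokens:
        # Hash the token and fold it into one of num_features buckets
        index = zlib.crc32(token.encode("utf-8")) % num_features
        counts[index] += 1
    return dict(counts)

tokens = ["clean", "room", "friendly", "staff", "clean"]
vector = hashing_tf(tokens)
print(vector)
```

Different tokens can land in the same bucket; a larger number of features makes such collisions rarer at the cost of wider vectors.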

In [None]:
## training the model
lr = LogisticRegression(labelCol="label", featuresCol="features", 
                        maxIter=50, regParam=0.01)

# drop rows whose label is null before fitting
numericTrain = numericTrain.filter(numericTrain.label.isNotNull())

model = lr.fit(numericTrain)
print ("Training Done")

In [None]:
tokenizedTest = tokenizer.transform(test)
SwRemovedTest = swr.transform(tokenizedTest)
numericTest = hashTF.transform(SwRemovedTest)
numericTest.show(truncate=True, n=2)

In [None]:
#Prediction
raw_prediction = model.transform(numericTest)

In [None]:
Final_prediction = raw_prediction.select("MeaningfulWords", "prediction", "true_label")
Final_prediction.show(n=10, truncate = True)

In [None]:
## class distribution of the labels, most frequent first
from pyspark.sql.functions import col
data.groupBy("label").count().orderBy(col("count").desc()).show()

In [None]:
Total_True=Final_prediction.filter(Final_prediction['prediction']==Final_prediction['true_label']).count()
Alldata=Final_prediction.count()
Accuracy=Total_True/Alldata
print("Accuracy Score is:", Accuracy*100, '%')
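
Accuracy alone can be misleading when one sentiment class dominates the data. A minimal sketch of per-class precision and recall computed from `(prediction, true_label)` pairs, using made-up values in place of rows collected from `Final_prediction`. The helper name `per_class_metrics` is hypothetical:

```python
from collections import Counter

def per_class_metrics(pairs):
    """Compute precision and recall per label from (prediction, true_label) pairs."""
    true_positive, predicted, actual = Counter(), Counter(), Counter()
    for prediction, true_label in pairs:
        predicted[prediction] += 1
        actual[true_label] += 1
        if prediction == true_label:
            true_positive[prediction] += 1
    labels = sorted(set(predicted) | set(actual))
    return {
        label: {
            "precision": true_positive[label] / predicted[label] if predicted[label] else 0.0,
            "recall": true_positive[label] / actual[label] if actual[label] else 0.0,
        }
        for label in labels
    }

# Made-up example in which label 2 (positive) is predicted most of the time
pairs = [(2, 2), (2, 2), (2, 2), (2, 0), (0, 0), (2, 0)]
metrics = per_class_metrics(pairs)
accuracy = sum(p == t for p, t in pairs) / len(pairs)
print(accuracy, metrics)
```

In this toy example the accuracy is about 67%, yet only one in three negative reviews is recovered, which the single accuracy figure hides.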

In [None]:
# ## remove the output_data directory and the merged_data.txt, final_scores.csv, and review_sentiments.csv files
# shutil.rmtree('output_data')
# os.remove('merged_data.txt')
# os.remove('final_scores.csv')
# os.remove('review_sentiments.csv')
# print("All files and directories have been removed...")