In [1]:
import pandas as pd
import findspark
findspark.init()
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.sql import SparkSession, Row
from pyspark.sql.window import Window
from pyspark.sql.functions import col, row_number, regexp_replace, sum, when, count, round as sparkrnd, max as sparkmax, min as sparkmin, year, avg as sparkavg, stddev as sparkstd
from pyspark.sql.functions import regexp_replace
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType, DateType

In [2]:
spark = SparkSession.builder.getOrCreate()

24/06/25 09:11:10 WARN Utils: Your hostname, dsbda-vm resolves to a loopback address: 127.0.1.1; using 192.168.64.2 instead (on interface enp0s1)
24/06/25 09:11:10 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/06/25 09:11:11 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [8]:
#Preparing tables for spark join

file_path_listings = "/home/ubuntu/DATASET/listings_details.csv"
file_path_reviews = "/home/ubuntu/DATASET/reviews_details.csv"


df_listings = spark.read.csv(file_path_listings, header=True, inferSchema=True, sep=",", quote= '"', escape= '"', multiLine=True, mode="DROPMALFORMED")
df_cleaned_listings = df_listings.withColumn("price", regexp_replace(col("price"), r"[\$,]", "").cast("double"))
for col_name in df_cleaned_listings.columns:
        df_cleaned_listings = df_cleaned_listings.withColumn(col_name, regexp_replace(col(col_name), "[^a-zA-Z0-9 .]", ""))
df_filtered_listings = df_cleaned_listings.filter( col("id").cast(IntegerType()).isNotNull()
)

#df_filtered_listings.printSchema()
df_filtered_listings = df_filtered_listings.select("id", "name", "host_name", "neighbourhood_cleansed", "latitude", "longitude", 
                                 "property_type", "room_type", "price", "review_scores_rating", "review_scores_accuracy", 
                                 "review_scores_cleanliness", "review_scores_checkin", "review_scores_communication", 
                                 "review_scores_location", "review_scores_value", "reviews_per_month")



schema_rev = StructType([
    StructField("listing_id", StringType(), True),
    StructField("id", StringType(), True),
    StructField("date", StringType(), True),
    StructField("reviewer_id", StringType(), True),
    StructField("reviewer_name", StringType(), True),
    StructField("comments", StringType(), True)
])



df_reviews = spark.read.csv(file_path_reviews, header=True, schema=schema_rev, sep=",", quote= '"', escape= '"', multiLine=True, mode="DROPMALFORMED")
for col_name in df_reviews.columns:
        if col_name == "date":
            df_reviews= df_reviews.withColumn(col_name, regexp_replace(col(col_name), "[^0-9 -]", ""))
        else:
            df_reviews = df_reviews.withColumn(col_name, regexp_replace(col(col_name), "[^a-zA-Z0-9 .]", ""))
df_reviews = df_reviews.filter(col("listing_id").cast("int").isNotNull())

#df_reviews.printSchema()

#I have to limit the output because on spark it's costly!
#df_reviews.show(20)

#df_reviews.select("listing_id", "date", "comments").show()
df_reviews = df_reviews.select("listing_id", "date", "comments")

In [35]:
remapped_schema = {"id":"Listing_ID", "name":"Listing_name", "host_name":"Hostname", "neighbourhood_cleansed":"Neighborhood",
                   "latitude":"Latitude", "longitude":"Longitude", "property_type":"PropertyType", "room_type":"RoomType",
                   "price":"Price", "review_scores_rating":"ReviewScoresRating", "review_scores_accuracy":"ReviewScoresAccuracy",
                   "review_scores_cleanliness":"ReviewScoresCleanliness", "review_scores_checkin":"ReviewScoresCheckin",
                   "review_scores_communication":"ReviewScoresCommunication", "review_scores_location":"ReviewScoresLocation",
                   "review_scores_value":"ReviewScoresValue", "reviews_per_month":"ReviewPerMonth", "date":"Date", "comments":"Comment"}

joined_df = df_filtered_listings.join(df_reviews, df_filtered_listings.id == df_reviews.listing_id, 'inner').drop(df_reviews.listing_id)

for old_name, new_col in remapped_schema.items():
    joined_df = joined_df.withColumnRenamed(old_name, new_col)


joined_df = joined_df.filter(col("Listing_ID").isNotNull()).filter(col("Comment").isNotNull())
joined_df.printSchema()

num_rows = joined_df.count()

# Stampa il numero di righe
print(f"spark dataframe length: {num_rows}")
#joined_df.show(5)



root
 |-- Listing_ID: string (nullable = true)
 |-- Listing_name: string (nullable = true)
 |-- Hostname: string (nullable = true)
 |-- Neighborhood: string (nullable = true)
 |-- Latitude: string (nullable = true)
 |-- Longitude: string (nullable = true)
 |-- PropertyType: string (nullable = true)
 |-- RoomType: string (nullable = true)
 |-- Price: string (nullable = true)
 |-- ReviewScoresRating: string (nullable = true)
 |-- ReviewScoresAccuracy: string (nullable = true)
 |-- ReviewScoresCleanliness: string (nullable = true)
 |-- ReviewScoresCheckin: string (nullable = true)
 |-- ReviewScoresCommunication: string (nullable = true)
 |-- ReviewScoresLocation: string (nullable = true)
 |-- ReviewScoresValue: string (nullable = true)
 |-- ReviewPerMonth: string (nullable = true)
 |-- Date: string (nullable = true)
 |-- Comment: string (nullable = true)



[Stage 38:>                                                         (0 + 1) / 1]

spark dataframe length: 431311


                                                                                

In [36]:
from pyspark.sql.functions import udf
from pyspark.sql.types import FloatType, StringType
from vaderSentiment.vaderSentiment import SentimentIntensityAnalyzer

In [37]:
def analyze_sentiment(text):
    scores = analyzer.polarity_scores(text)
    return scores['compound']

In [38]:
#Score computation of the reviews

analyzer = SentimentIntensityAnalyzer()
sentiment_udf = udf(analyze_sentiment, FloatType()) #create a function
sdf_with_score = joined_df.withColumn("SentimentScore", sentiment_udf(joined_df["Comment"]))
sdf_with_score = sdf_with_score.dropna(subset=["SentimentScore"])
num_rows = sdf_with_score.count()

# Stampa il numero di righe
print(f"spark dataframe length: {num_rows}")
#Limit for the print since I'm working with spark df
#sdf_with_score.show(5)

[Stage 42:>                                                         (0 + 1) / 1]

spark dataframe length: 431311


                                                                                

In [39]:
bins = [-1, -0.75, -0.25, 0.25, 0.75, 1]
labels = [1, 2, 3, 4, 5]

In [40]:
sdf_with_score = sdf_with_score.withColumn(
    "BinSentimentScore",
    when(col("SentimentScore") <= bins[1], labels[0])
    .when((col("SentimentScore") > bins[1]) & (col("SentimentScore") <= bins[2]), labels[1])
    .when((col("SentimentScore") > bins[2]) & (col("SentimentScore") <= bins[3]), labels[2])
    .when((col("SentimentScore") > bins[3]) & (col("SentimentScore") <= bins[4]), labels[3])
    .otherwise(labels[4])
)

In [41]:
sdf_with_score.printSchema()

root
 |-- Listing_ID: string (nullable = true)
 |-- Listing_name: string (nullable = true)
 |-- Hostname: string (nullable = true)
 |-- Neighborhood: string (nullable = true)
 |-- Latitude: string (nullable = true)
 |-- Longitude: string (nullable = true)
 |-- PropertyType: string (nullable = true)
 |-- RoomType: string (nullable = true)
 |-- Price: string (nullable = true)
 |-- ReviewScoresRating: string (nullable = true)
 |-- ReviewScoresAccuracy: string (nullable = true)
 |-- ReviewScoresCleanliness: string (nullable = true)
 |-- ReviewScoresCheckin: string (nullable = true)
 |-- ReviewScoresCommunication: string (nullable = true)
 |-- ReviewScoresLocation: string (nullable = true)
 |-- ReviewScoresValue: string (nullable = true)
 |-- ReviewPerMonth: string (nullable = true)
 |-- Date: string (nullable = true)
 |-- Comment: string (nullable = true)
 |-- SentimentScore: float (nullable = true)
 |-- BinSentimentScore: integer (nullable = false)



In [42]:
sdf_with_score = sdf_with_score.dropna(subset=["Comment", "Date", "BinSentimentScore"])

In [43]:
sdf_with_score.show(10)

[Stage 50:>                                                         (0 + 1) / 1]

+----------+--------------------+--------+------------+-----------------+----------------+------------+--------------+-----+------------------+--------------------+-----------------------+-------------------+-------------------------+--------------------+-----------------+--------------+----------+--------------------+--------------+-----------------+
|Listing_ID|        Listing_name|Hostname|Neighborhood|         Latitude|       Longitude|PropertyType|      RoomType|Price|ReviewScoresRating|ReviewScoresAccuracy|ReviewScoresCleanliness|ReviewScoresCheckin|ReviewScoresCommunication|ReviewScoresLocation|ReviewScoresValue|ReviewPerMonth|      Date|             Comment|SentimentScore|BinSentimentScore|
+----------+--------------------+--------+------------+-----------------+----------------+------------+--------------+-----+------------------+--------------------+-----------------------+-------------------+-------------------------+--------------------+-----------------+--------------+----

                                                                                

In [44]:
sdf_with_year = sdf_with_score.withColumn("Year", year(sdf_with_score["date"]))

In [45]:
sdf_with_year.head(1)

                                                                                

[Row(Listing_ID='588116', Listing_name='Sweet Amsterdam ap. with garden...', Hostname='Fanneke', Neighborhood='OudOost', Latitude='52.35045235319899', Longitude='4.91614014872195', PropertyType='Apartment', RoomType='Entire homeapt', Price='90.0', ReviewScoresRating='96', ReviewScoresAccuracy='10', ReviewScoresCleanliness='10', ReviewScoresCheckin='10', ReviewScoresCommunication='10', ReviewScoresLocation='9', ReviewScoresValue='10', ReviewPerMonth='0.57', Date='2013-08-10', Comment='I had a great stay in Fannekes appartment. Its very well located  in a quiet street and next to a canal  and you kind of get an insight in a normalauthentic neighbourhood without all the touriststuff but youre very close to everywhere by bike. The place itself is spacious perfect for 2 people lovely decorated you got everything you need and on top of that a beautiful garden and a cute little cat. I felt very comfortable there from the day I arrived. I met Fanneke just for half an hour because we both were 

In [46]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, to_date, year, when, lit
import pandas as pd
from scipy.stats import mannwhitneyu

In [47]:
def mannwhitneyu_test(before_scores, after_scores):
    stat, p_val = mannwhitneyu(before_scores, after_scores, alternative='two-sided')
    return float(p_val)

def analyze_difference(before_scores, after_scores, datetime1, datetime2, year, alpha=0.05):
    len_before = len(before_scores)
    len_after = len(after_scores)
    
    if len_before > 0 and len_after > 0:
        stat, p_val = mannwhitneyu(before_scores, after_scores, alternative='two-sided')
        
        print(f"Comparison: {datetime1} vs {datetime2}")
        print(f"P-value: {p_val}")
        
        if p_val < alpha:
            print("Significative difference between the sentiment scores before and after (Reject the null hypothesis).")
        else:
            print("Non-Significative difference between the sentiment scores before and after (Accept the null hypothesis).")
            

In [48]:
inner = ["CentrumWest", "CentrumOost", "Westerpark", "De Baarsjes  OudWest", 
         "Oostelijk Havengebied  Indische Buurt", "OudOost", 
         "De Pijp  Rivierenbuurt"]



alpha = 0.05

for year_val in range(2010, 2018):

    date_str_prev = f"{year_val}-06-01"
    date_str_next = f"{year_val+1}-06-01"

    date_prev = pd.to_datetime(date_str_prev)
    date_next = pd.to_datetime(date_str_next)

    next_year = year_val+1

    df_filtered = sdf_with_year.filter(~col("Neighborhood").isin(inner))
    df_out_center_prev = df_filtered.filter(col("Year") == year_val)
    df_out_center_next = df_filtered.filter(col("Year") == year_val + 1)

    sentiment_scores_outer_before_prev = df_out_center_prev.filter(col("Date") < lit(date_str_prev)).select("SentimentScore")
    sentiment_scores_outer_after_prev = df_out_center_prev.filter(col("Date") >= lit(date_str_prev)).select("SentimentScore")
    sentiment_scores_outer_before_next = df_out_center_next.filter(col("Date") < lit(date_str_next)).select("SentimentScore")
    sentiment_scores_outer_after_next = df_out_center_next.filter(col("Date") >= lit(date_str_next)).select("SentimentScore")

    #Since on spark works on spark dataframes I tried to use the mann-whitney U test, but it wasn't available :c -> sorry :C, but the function was not available in DataFrameStatFunctions
    before_prev_list = [row.SentimentScore for row in sentiment_scores_outer_before_prev.collect()]
    after_prev_list = [row.SentimentScore for row in sentiment_scores_outer_after_prev.collect()]
    before_next_list = [row.SentimentScore for row in sentiment_scores_outer_before_next.collect()]
    after_next_list = [row.SentimentScore for row in sentiment_scores_outer_after_next.collect()]

    #Differences between current year and next year
    analyze_difference(before_prev_list, after_prev_list, str(year_val) + " first semester", str(next_year) + " first semester", year_val, alpha)
    analyze_difference(before_next_list, after_next_list, str(year_val) + " second semester", str(next_year) + " second semester", year_val, alpha)

    #Cross analysis between the two semesters of the same year
    if len(before_prev_list) > 0 and len(before_next_list) > 0:
        analyze_difference(before_prev_list, before_next_list, str(year_val) + " first semester", str(year_val) + " second semester", year_val, alpha)


                                                                                

Comparison: 2010 first semester vs 2011 first semester
P-value: 0.875
Non-Significative difference between the sentiment scores before and after (Accept the null hypothesis).
Comparison: 2010 second semester vs 2011 second semester
P-value: 0.3176287929810796
Non-Significative difference between the sentiment scores before and after (Accept the null hypothesis).
Comparison: 2010 first semester vs 2010 second semester
P-value: 0.7450173057662387
Non-Significative difference between the sentiment scores before and after (Accept the null hypothesis).


                                                                                

Comparison: 2011 first semester vs 2012 first semester
P-value: 0.3176287929810796
Non-Significative difference between the sentiment scores before and after (Accept the null hypothesis).
Comparison: 2011 second semester vs 2012 second semester
P-value: 0.759175444693613
Non-Significative difference between the sentiment scores before and after (Accept the null hypothesis).
Comparison: 2011 first semester vs 2011 second semester
P-value: 0.961419656926375
Non-Significative difference between the sentiment scores before and after (Accept the null hypothesis).


                                                                                

Comparison: 2012 first semester vs 2013 first semester
P-value: 0.759175444693613
Non-Significative difference between the sentiment scores before and after (Accept the null hypothesis).
Comparison: 2012 second semester vs 2013 second semester
P-value: 0.6354308490354297
Non-Significative difference between the sentiment scores before and after (Accept the null hypothesis).
Comparison: 2012 first semester vs 2012 second semester
P-value: 0.9166001150990198
Non-Significative difference between the sentiment scores before and after (Accept the null hypothesis).


                                                                                

Comparison: 2013 first semester vs 2014 first semester
P-value: 0.6354308490354297
Non-Significative difference between the sentiment scores before and after (Accept the null hypothesis).
Comparison: 2013 second semester vs 2014 second semester
P-value: 2.044540868821777e-06
Significative difference between the sentiment scores before and after (Reject the null hypothesis).
Comparison: 2013 first semester vs 2013 second semester
P-value: 0.3152465101101586
Non-Significative difference between the sentiment scores before and after (Accept the null hypothesis).


                                                                                

Comparison: 2014 first semester vs 2015 first semester
P-value: 2.044540868821777e-06
Significative difference between the sentiment scores before and after (Reject the null hypothesis).
Comparison: 2014 second semester vs 2015 second semester
P-value: 0.0004467794465107559
Significative difference between the sentiment scores before and after (Reject the null hypothesis).
Comparison: 2014 first semester vs 2014 second semester
P-value: 1.2260366170514692e-15
Significative difference between the sentiment scores before and after (Reject the null hypothesis).


                                                                                

Comparison: 2015 first semester vs 2016 first semester
P-value: 0.0004467794465107559
Significative difference between the sentiment scores before and after (Reject the null hypothesis).
Comparison: 2015 second semester vs 2016 second semester
P-value: 8.411176590343428e-12
Significative difference between the sentiment scores before and after (Reject the null hypothesis).
Comparison: 2015 first semester vs 2015 second semester
P-value: 0.58776804427465
Non-Significative difference between the sentiment scores before and after (Accept the null hypothesis).


                                                                                

Comparison: 2016 first semester vs 2017 first semester
P-value: 8.411176590343428e-12
Significative difference between the sentiment scores before and after (Reject the null hypothesis).
Comparison: 2016 second semester vs 2017 second semester
P-value: 0.3125231974366046
Non-Significative difference between the sentiment scores before and after (Accept the null hypothesis).
Comparison: 2016 first semester vs 2016 second semester
P-value: 2.420560929498966e-55
Significative difference between the sentiment scores before and after (Reject the null hypothesis).


[Stage 120:>                                                        (0 + 1) / 1]

Comparison: 2017 first semester vs 2018 first semester
P-value: 0.3125231974366046
Non-Significative difference between the sentiment scores before and after (Accept the null hypothesis).
Comparison: 2017 second semester vs 2018 second semester
P-value: 6.707857903983934e-25
Significative difference between the sentiment scores before and after (Reject the null hypothesis).
Comparison: 2017 first semester vs 2017 second semester
P-value: 2.9185595992502613e-16
Significative difference between the sentiment scores before and after (Reject the null hypothesis).


                                                                                

In [109]:
spark.stop()