In [1]:
from pyspark.sql import SparkSession

# Initialize a Spark session
spark = SparkSession.builder \
    .appName("Movie Reviews Analysis") \
    .getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/08/25 16:11:23 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [9]:
reviews_df = spark.read.csv(
    "../data/reviews_from_movies.csv",
    header=True,
    inferSchema=True,
    multiLine=True,         # Handles multiline fields
    escape='"',             # Escapes quotes within fields
    quote='"',              # Indicates that quotes are used for enclosing fields
    ignoreLeadingWhiteSpace=True,
    ignoreTrailingWhiteSpace=True
)

# Show the DataFrame schema
reviews_df.printSchema()

# Show a preview of the data
reviews_df.show(2, truncate=False)

root
 |-- movie_title: string (nullable = true)
 |-- movie_year: integer (nullable = true)
 |-- user_url: string (nullable = true)
 |-- review_text: string (nullable = true)
 |-- rating: double (nullable = true)
 |-- review_date: string (nullable = true)
 |-- movie_url: string (nullable = true)

+-----------+----------+---------------------------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+------------------+-----------+-----------------------------------+
|movie_title|movie_year|user_url                               |review_text                                                                                                                                                                                                                                           

In [10]:
reviews_df.select("user_url").show(5)

+--------------------+
|            user_url|
+--------------------+
|https://letterbox...|
|https://letterbox...|
|https://letterbox...|
|https://letterbox...|
|https://letterbox...|
+--------------------+
only showing top 5 rows



In [11]:
from pyspark.sql.functions import col, lower, regexp_replace

# Example: Clean text and create a new column
reviews_df = reviews_df.withColumn("cleaned_review", 
                                   lower(col("review_text"))) \
                       .withColumn("cleaned_review", 
                                   regexp_replace(col("cleaned_review"), "[^a-zA-Z\s]", ""))

# Show the cleaned data
reviews_df.select("review_text", "cleaned_review").show(5)


+--------------------+--------------------+
|         review_text|      cleaned_review|
+--------------------+--------------------+
|Addressed my iner...|addressed my iner...|
|Perfeitamente dec...|perfeitamente dec...|
|  senti a dor do ken|  senti a dor do ken|
|it was good for t...|it was good for t...|
|l'univers la da i...|lunivers la da in...|
+--------------------+--------------------+
only showing top 5 rows



In [12]:
from pyspark.ml.feature import Tokenizer, StopWordsRemover, HashingTF, IDF
from pyspark.ml.pipeline import Pipeline

# Tokenize the text
tokenizer = Tokenizer(inputCol="cleaned_review", outputCol="words")

# Remove stop words
remover = StopWordsRemover(inputCol="words", outputCol="filtered")

# Compute term frequencies
hashingTF = HashingTF(inputCol="filtered", outputCol="rawFeatures", numFeatures=5000)

# Compute inverse document frequencies
idf = IDF(inputCol="rawFeatures", outputCol="features")

# Build the pipeline
pipeline = Pipeline(stages=[tokenizer, remover, hashingTF, idf])

# Fit and transform the data
reviews_df = pipeline.fit(reviews_df).transform(reviews_df)

# Show the processed features
reviews_df.select("features").show(5)

                                                                                

+--------------------+
|            features|
+--------------------+
|(5000,[1029,1720,...|
|(5000,[82,154,221...|
|(5000,[154,2208,3...|
|(5000,[366,453,48...|
|(5000,[30,98,480,...|
+--------------------+
only showing top 5 rows



In [14]:
reviews_df.printSchema()

root
 |-- movie_title: string (nullable = true)
 |-- movie_year: integer (nullable = true)
 |-- user_url: string (nullable = true)
 |-- review_text: string (nullable = true)
 |-- rating: double (nullable = true)
 |-- review_date: string (nullable = true)
 |-- movie_url: string (nullable = true)
 |-- cleaned_review: string (nullable = true)
 |-- words: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- filtered: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- rawFeatures: vector (nullable = true)
 |-- features: vector (nullable = true)



In [21]:
from pyspark.sql.functions import when, col

# 1. Categorize ratings into 'bad', 'neutral', 'good'
reviews_df = reviews_df.withColumn(
    "rating_category",
    when(col("rating") <= 0.4, "bad")
    .when((col("rating") > 0.4) & (col("rating") <= 0.7), "neutral")
    .when(col("rating") > 0.7, "good")
)

# 2. Encode these categories into numeric labels
indexer = StringIndexer(inputCol="rating_category", outputCol="rating_category_encoded")
reviews_df = indexer.fit(reviews_df).transform(reviews_df)

# 3. Verify the creation of the encoded column
reviews_df.select("rating", "rating_category", "rating_category_encoded").show(5)

+------------------+---------------+-----------------------+
|            rating|rating_category|rating_category_encoded|
+------------------+---------------+-----------------------+
|0.6000000000000001|        neutral|                    1.0|
|0.7000000000000001|           good|                    0.0|
|               1.0|           good|                    0.0|
|0.7000000000000001|           good|                    0.0|
|0.6000000000000001|        neutral|                    1.0|
+------------------+---------------+-----------------------+
only showing top 5 rows



In [22]:
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import StringIndexer

# Convert labels to index (if necessary)
indexer = StringIndexer(inputCol="rating_category_encoded", outputCol="label")
reviews_df = indexer.fit(reviews_df).transform(reviews_df)

# Split data
train, test = reviews_df.randomSplit([0.8, 0.2], seed=12345)

# Initialize and train the model
lr = LogisticRegression(featuresCol="features", labelCol="label")
model = lr.fit(train)

# Predict on the test set
predictions = model.transform(test)

# Evaluate the model
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print(f"Accuracy: {accuracy}")

24/08/25 17:00:58 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS


Accuracy: 0.36363636363636365


In [23]:
reviews_df

DataFrame[movie_title: string, movie_year: int, user_url: string, review_text: string, rating: double, review_date: string, movie_url: string, cleaned_review: string, words: array<string>, filtered: array<string>, rawFeatures: vector, features: vector, rating_category: string, rating_category_encoded: double, label: double]