# Video Game Playability Analysis Based on Players’ Reviews with PySpark

## Big Data Computing final project - A.Y. 2022-2023

Prof. Gabriele Tolomei

MSc in Computer Science

La Sapienza, University of Rome

### Author

Ilaria De Sio - [desio.2064970@studenti.uniroma1.it](mailto:desio.2064970@studenti.uniroma1.it)

The project is based on the paper entitled *A Data-Driven Approach for Video Game
Playability Analysis Based on Players’ Reviews* in this case study, the definition of
playability analyzed consists of three basic concepts ”**functionality**, **usability**, and
**gameplay**” defined by the *framework of Paavilainen*.

The goal is to obtain an explicit
and simplified framework so that not only the intuitively quantified assessment of the
overall playability of the chosen game is obtained but also to analyze and be able
to view the positive and negative aspects of it, and while classifying the information
that can be ”playability-informative” and ”non-playability-informative” divided into
the classes listed above.

## Define some global constants

In [1]:
PATH="/Users/ilariadesio/Desktop/Computerscience/Firstyear/Secondsemester/BigData/Projects/Video_Game_Playability_Analysis/input/data_clean.csv"

## Import PySpark packages and other dependencies

In [2]:
!pip install pyspark
!pip install sparknlp



In [3]:
import pyspark
from pyspark.sql import *
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark import SparkContext, SparkConf

from sparknlp.base import *
from sparknlp.annotator import *
from pyspark.ml import Pipeline

import re

import nltk
from nltk import *

import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt

In [4]:
# Create the session
conf = SparkConf().\
                set('spark.ui.port', "4050").\
                set('spark.executor.memory', '4G').\
                set('spark.driver.memory', '45G').\
                set('spark.driver.maxResultSize', '10G').\
                setAppName("ProjectBigData").\
                setMaster("local[*]")

# Create the context
sc = pyspark.SparkContext(conf=conf)
spark = SparkSession.builder.getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/06/21 13:00:04 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


## 1.  Dataset initialization
I chose to use the dataset [https://doi.org/10.6084/m9.figshare.14222531.v1](https://doi.org/10.6084/m9.figshare.14222531.v1) directly provided by the authors of the paper containing the review data from Steam for **No Man’s Sky** in terms of playability by users.
This case of study is really interesting because this game was released on 2016, before which a social media “hype” had been evoked leading to an unprecedentedly high expectation.
Unexpectedly the release was disastrous, but for the last four years, the
game has been continuously maintained with its quality gradually increasing, which makes it a unique case where the changes in game quality is observable.



In [5]:
game_dataset = spark.read.load(PATH,
                               format="csv",
                               sep=",",
                               inferSchema="true",
                               header="true")

                                                                                

In [6]:
game_dataset=pd.read_csv("/Users/ilariadesio/Desktop/Computerscience/Firstyear/Secondsemester/BigData/Projects/Video_Game_Playability_Analysis/input/data_clean.csv")
game_dataset.head()

Unnamed: 0,recommendationid,language,review,timestamp_created,timestamp_updated,voted_up,votes_up,votes_funny,weighted_vote_score,comment_count,steam_purchase,received_for_free,written_during_early_access,author_num_games_owned,author_num_reviews,author_playtime_forever,author_playtime_last_two_weeks,author_last_played
0,70427607,english,This game has the elements of many games sewn ...,2020-06-07 07:33:21,2020-06-07 07:33:21,True,0,0,0.0,0,False,False,False,143,1,14368,1041,2020-06-02 07:05:32
1,70426209,english,game is k. random gen from presets. no voice a...,2020-06-07 06:48:46,2020-06-07 06:48:46,True,0,0,0.0,0,False,False,False,133,5,3927,51,2020-06-02 11:31:12
2,70425814,english,I first played 2 years ago and it was fun for ...,2020-06-07 06:35:34,2020-06-07 06:35:34,True,0,0,0.0,0,True,False,False,670,31,1942,987,2020-06-07 06:21:38
3,70425169,english,I wasn't sure if I'd like this--survival stuff...,2020-06-07 06:13:16,2020-06-07 06:13:16,True,0,0,0.0,0,False,False,False,98,37,121,121,2020-06-07 01:31:17
4,70425032,english,"This is an amazing game, Where to start?\r\nYo...",2020-06-07 06:08:28,2020-06-07 06:08:28,True,0,0,0.0,0,True,False,False,16,6,5887,5887,2020-06-07 07:18:45


## 1.1 Dataset Shape and Scheme

The dataset contains approximately 99k records of Steam's reviews.


* ```recommendationid```: The review ID;
* ```language```: Review language;
* ```review```: The text of user review;
* ```timestamp_created ```: The date a review is posted;
* ```timestamp_updated```: Update date of a review;
* ```voted_up```: True means it was a positive recommendation;
* ```votes_up```: The number of other users who found this review helpful;
* ```votes_funny```: How many other player think the review is funny;
* ```weighted_cote_score```: Helpfulness score;
* ```comment_count```: How many other player comment the review;
* ```steam_purchase```: Game purchased on steam or not;
* ```received_for_free```: Game received for free or not;
* ```written_during_early_access```:
* ```author_num_games_owned```: Number of games owned by the author;
* ```author_num_reviews```: How many other reviews has this user done;
* ```author_playtime_forever```: Number of total hours played by the author;
* ```author_playtime_last_two_weeks```: Number of hours played by the author in the last two weeks;
* ```author_last_played```:

-------





Initially in this more visual phase the dataframe provided by pandas will be used, later in text processing and for the rest of the project it will fall back to the spark dataframe

In [7]:
print(type(game_dataset))

<class 'pandas.core.frame.DataFrame'>


## 2. Data Pre-processing
In this phase involves cleaning and transforming the raw data to ensure its quality and compatibility with the analysis.



## 2.1 Data Cleaning

From the data info above, we can already notice that there are missing values in review. Since our work is going to be heavily relying on this column, we have to clean it from these missing values. In addition, we also need to check for duplicated values following the standard data cleaning procedure.

In [8]:
game_dataset[game_dataset['review'].isna()]


Unnamed: 0,recommendationid,language,review,timestamp_created,timestamp_updated,voted_up,votes_up,votes_funny,weighted_vote_score,comment_count,steam_purchase,received_for_free,written_during_early_access,author_num_games_owned,author_num_reviews,author_playtime_forever,author_playtime_last_two_weeks,author_last_played
5115,66456723,english,,2020-04-02 22:35:41,2020-04-02 22:35:41,True,0,0,0.0,0,True,False,False,29,1,28002,4295,2020-06-07 06:16:39
5191,66346015,english,,2020-04-01 14:17:30,2020-04-01 14:17:30,True,0,0,0.0,0,False,False,False,318,3,9758,0,2020-04-30 03:09:27
5232,66276955,english,,2020-03-31 18:23:38,2020-03-31 18:23:38,True,0,0,0.0,0,True,False,False,91,3,6001,0,2020-03-31 18:45:40
5541,65678049,english,,2020-03-24 03:33:57,2020-03-24 03:33:57,True,0,0,0.0,0,True,False,False,240,5,8316,0,2020-04-26 03:39:39
5636,65449433,english,,2020-03-21 05:37:26,2020-03-21 05:37:26,True,0,0,0.0,0,True,False,False,109,6,601,0,2020-03-21 16:45:18
5865,65060556,english,,2020-03-15 02:27:56,2020-03-15 02:27:56,True,0,0,0.0,0,True,False,False,38,3,1292,74,2020-05-26 20:24:50
6190,64664205,english,,2020-03-07 18:10:21,2020-03-07 18:10:21,True,0,0,0.0,0,False,False,False,33,2,4114,0,2020-03-17 19:50:32
6618,64241196,english,,2020-02-28 12:35:57,2020-02-28 12:35:57,True,1,0,0.525862,0,True,False,False,22,1,3288,46,2020-06-03 23:05:22
6675,64210477,english,,2020-02-27 20:27:52,2020-02-27 20:27:52,True,0,0,0.0,0,True,False,False,58,1,5675,0,2020-04-09 01:00:06
6840,64111598,english,,2020-02-25 19:02:33,2020-02-25 19:02:33,True,0,0,0.0,0,True,False,False,57,11,6720,0,2020-04-20 15:29:47


In [9]:
game_dataset.isna().sum()

recommendationid                   0
language                           0
review                            36
timestamp_created                  0
timestamp_updated                  0
voted_up                           0
votes_up                           0
votes_funny                        0
weighted_vote_score                0
comment_count                      0
steam_purchase                     0
received_for_free                  0
written_during_early_access        0
author_num_games_owned             0
author_num_reviews                 0
author_playtime_forever            0
author_playtime_last_two_weeks     0
author_last_played                 0
dtype: int64

In [10]:
# Drop rows with missing reviews
game_dataset.dropna(inplace=True)

# Sanity check
game_dataset.isna().sum()

recommendationid                  0
language                          0
review                            0
timestamp_created                 0
timestamp_updated                 0
voted_up                          0
votes_up                          0
votes_funny                       0
weighted_vote_score               0
comment_count                     0
steam_purchase                    0
received_for_free                 0
written_during_early_access       0
author_num_games_owned            0
author_num_reviews                0
author_playtime_forever           0
author_playtime_last_two_weeks    0
author_last_played                0
dtype: int64

In [11]:
game_dataset.count()

recommendationid                  99957
language                          99957
review                            99957
timestamp_created                 99957
timestamp_updated                 99957
voted_up                          99957
votes_up                          99957
votes_funny                       99957
weighted_vote_score               99957
comment_count                     99957
steam_purchase                    99957
received_for_free                 99957
written_during_early_access       99957
author_num_games_owned            99957
author_num_reviews                99957
author_playtime_forever           99957
author_playtime_last_two_weeks    99957
author_last_played                99957
dtype: int64

Rows with null values have been deleted correctly, now the rows are 99957.
Now let's check for duplicates.

In [12]:
game_dataset.duplicated().sum()

0

It seems that there are no duplicated rows. But are there duplicated reviews?

In [13]:
game_dataset.duplicated(subset='review').sum()

3903

In [14]:
game_dataset[game_dataset.duplicated(subset='review',keep=False)].sample(10)

Unnamed: 0,recommendationid,language,review,timestamp_created,timestamp_updated,voted_up,votes_up,votes_funny,weighted_vote_score,comment_count,steam_purchase,received_for_free,written_during_early_access,author_num_games_owned,author_num_reviews,author_playtime_forever,author_playtime_last_two_weeks,author_last_played
52209,26126311,english,No.,2016-10-20 08:06:52,2016-10-20 08:06:52,False,7,0,0.0,0,True,False,False,189,4,9832,0,2020-02-25 06:04:30
49222,27763203,english,Lies,2016-11-27 02:31:41,2016-11-27 02:31:41,False,1,0,0.48915,0,True,False,False,831,5,8943,48,2020-06-05 18:39:47
54350,25872674,english,No Man's Lie.,2016-10-05 17:35:07,2016-10-05 17:35:07,False,17,1,0.490232,0,True,False,False,173,1,499,0,2016-08-15 01:54:02
48813,27913543,english,It's getting there,2016-11-28 03:56:40,2016-11-28 03:56:40,True,2,0,0.504201,2,True,False,False,345,7,4322,0,2020-04-08 18:08:53
51744,26242149,english,Just no.,2016-10-27 07:57:24,2016-10-27 07:57:24,False,4,0,0.481752,0,True,False,False,89,1,938,0,2018-05-09 12:48:10
74128,24996583,english,.,2016-08-18 17:42:03,2016-08-18 17:42:03,False,10,1,0.411442,0,True,False,False,30,2,7709,0,2020-04-25 16:16:10
3314,67336813,english,DO IT.,2020-04-15 02:54:39,2020-04-15 02:54:39,True,0,0,0.0,0,True,False,False,61,2,9172,0,2020-05-05 16:58:25
26999,46077908,english,Good,2018-11-21 23:20:09,2018-11-21 23:20:09,True,1,0,0.52381,0,True,False,False,93,2,7109,0,2020-02-25 15:34:23
60210,25274215,english,Terrible,2016-08-31 19:43:26,2016-08-31 19:43:26,False,6,0,0.501744,0,True,False,False,164,3,1551,0,2019-08-25 16:56:48
11088,60573620,english,fun with friends,2019-12-26 05:35:53,2019-12-26 05:35:53,True,1,0,0.5,0,True,False,False,54,19,1570,0,2020-01-19 22:32:11


As we can see there are not actually equal reviews but with similar terms, most of them are very short reviews such as 'good' or 'amazing'. These reviews are still important for our classification task, so we will not drop them.

##Text-processing
We may note that some reviews may also be written only by special characters, these types of reviews should be removed, because there may be smilies or special characters are not significant and also that may have multiple or ambiguous meanings, making accurate interpretation difficult.

In [15]:
spark_df = spark.createDataFrame(game_dataset)
spark_df.select("review").show(n=3,truncate=False)


23/06/21 13:00:34 WARN TaskSetManager: Stage 2 contains a task of very large size (4301 KiB). The maximum recommended task size is 1000 KiB.


[Stage 2:>                                                          (0 + 1) / 1]

23/06/21 13:00:40 WARN PythonRunner: Detected deadlock while completing task 0.0 in stage 2 (TID 9): Attempting to kill Python Worker
+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

                                                                                

###Convert all the text of the review to lowercase, Tokenization-into-sentences and Removal of extra-spaces and some special characters
In this phase divide each review item from the DataFrame into sentence-level review instances, due to the fact that each review with multiple sentences can contain multiple topics and various sentiments.

In [None]:
from pyspark.sql.functions import lower, regexp_replace, trim
from pyspark.sql.types import StringType

# Convert all the text of the review to lowercase
to_lower = udf(lambda x: x.lower(), StringType())

# Removal of extra-spaces and some special characters
def clean_and_split_sentences(text):
    cleaned_text = re.sub(r'[^\w\s.]', '', text)
    cleaned_text = re.sub(r'\s+', ' ', cleaned_text.strip())
    sentences = sent_tokenize(cleaned_text)
    return sentences

spark_df = spark_df.withColumn('review', to_lower(spark_df['review']))

# Apply the function to the column
clean_and_split_udf = udf(clean_and_split_sentences, ArrayType(StringType()))
spark_df = spark_df.withColumn('sentences', clean_and_split_udf(spark_df['review']))


spark_df.select("review",'sentences').show(n=1,truncate=False)
spark_df.printSchema()

###Sentence Tokenization Vecchia

In [None]:
nltk.download('punkt')
def sent_tokenize(text):
    return nltk.sent_tokenize(text)

# Creating the UDF function for the phrase tokenizer.
sent_tokenize_udf = udf(sent_tokenize, ArrayType(StringType()))

# Applying the phrase tokenizer to the 'review' column of the DataFrame.
spark_df = spark_df.withColumn('sentences', sent_tokenize_udf(spark_df['review']))

columns = spark_df.columns
columns.remove('sentences')
spark_df = spark_df.select(columns[:3] + ['sentences'] + columns[3:])

first_row = spark_df.select('sentences').show(n=5, truncate=False)
print(first_row)


# Prove

In [None]:
from pyspark.sql.functions import lit

# Aggiungi una colonna chiamata 'target' con valori numerici costanti
spark_df = spark_df.withColumn('target', lit(0.0))  # Cambia 0.0 con il valore numerico desiderato

# Mostra il DataFrame risultante
spark_df.show()


In [None]:
train_df,test_df=spark_df.randomSplit([0.7,0.3],seed=42)

In [None]:
from pyspark.ml.feature import Tokenizer, StopWordsRemover, CountVectorizer
from pyspark.ml.clustering import KMeans

# Tokenizzazione del testo delle recensioni
tokenizer = Tokenizer(inputCol="review", outputCol="tokenized_review")
spark_df = tokenizer.transform(spark_df)

# Rimozione delle stopwords
stopwords_remover = StopWordsRemover(inputCol="tokenized_review", outputCol="clean_review")
spark_df = stopwords_remover.transform(spark_df)

# Estrazione delle caratteristiche del testo
count_vectorizer = CountVectorizer(inputCol="clean_review", outputCol="features")
model = count_vectorizer.fit(spark_df)
spark_df = model.transform(spark_df)

# Addestramento del modello di clustering
k = 2  # Numero di cluster desiderati
kmeans = KMeans(featuresCol="features", predictionCol="cluster", k=k)
kmeans_model = kmeans.fit(spark_df)

# Previsione dei cluster per il DataFrame originale
predicted_df = kmeans_model.transform(spark_df)

# Visualizza i risultati
predicted_df.select("review", "cluster").show()


In [None]:
import matplotlib.pyplot as plt
import seaborn as sns
import pandas as pd

# Converti il DataFrame Spark delle previsioni dei cluster in un DataFrame Pandas
predicted_df_pandas = predicted_df.select("review", "cluster").toPandas()

# Conta il numero di recensioni in ciascun cluster
cluster_counts = predicted_df_pandas["cluster"].value_counts()

# Visualizza i cluster con un grafico a barre
plt.figure(figsize=(8, 6))
sns.barplot(x=cluster_counts.index, y=cluster_counts.values)
plt.xlabel("Cluster")
plt.ylabel("Numero di recensioni")
plt.title("Distribuzione dei cluster")
plt.show()


# mantieni prova hypothesis

###Convert all the text of the review to lowercase

In [None]:
def convert_to_lowercase(text):
    return text.lower()


convert_to_lowercase_udf = udf(convert_to_lowercase, StringType())

# Application of the UDF to the 'review' column of the DataFrame
spark_df = spark_df.withColumn('review_lower', convert_to_lowercase_udf(spark_df['review']))

"""
Since it is not possible to directly assign the UDF result to the same input column in Spark (this is due to the fact that Spark's DataFrames are immutable,
which means that they cannot be changed directly),
a new column called lower_reviews will be created and then replaced with the original reviews and deleted.
"""
spark_df = spark_df.withColumn(spark_df.columns[2], col('review_lower'))
spark_df=spark_df.drop('review_lower')

##Removal of extra-spaces

###Removal of non-ASCII characters
For example, the dollar sign ($), accented letters such as à, é, ô, also there are many special symbols such as ☺ (smiley face) and others that are not included in standard ASCII

In [None]:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
import re

def remove_non_ascii(text):
    # Utilizziamo un'espressione regolare per trovare tutti i caratteri non ASCII
    non_ascii_regex = re.compile('[^\x00-\x7F]')
    # Sostituiamo i caratteri non ASCII con una stringa vuota
    cleaned_text = non_ascii_regex.sub('', text)
    return cleaned_text

# Definizione della funzione UDF (User-Defined Function)
remove_non_ascii_udf = udf(remove_non_ascii, StringType())

# Applicazione della funzione UDF alla colonna 'review'
spark_df = spark_df.withColumn('cleaned_review', remove_non_ascii_udf(spark_df['review']))

spark_df = spark_df.withColumn(spark_df.columns[2], col('cleaned_review'))
spark_df=spark_df.drop('cleaned_review')


first_row = spark_df.select('review').show(n=5, truncate=False)
print(first_row)


### Remove Stop-words (NOT USED by Project Choice)
I initially tried to apply to the removal of stop words, but seeing the results, I noticed that it might result in the loss of some relevant information about sentence structure, so I decided not to use it in this case.

## 2.2 Data Exploration
At this phase I will analyze different hypotheses of correlation tar the variables to actually test whether or not they are correlated according to the hypothesis provided.

### 2.2.1 First Hypothesis
**Does there exist a correlation between the number of hours a person played a game and the sentiment of the review?**

In [None]:
import nltk

nltk.download("vader_lexicon")



In [None]:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
from nltk.sentiment import SentimentIntensityAnalyzer

# Converte il DataFrame Pandas in un DataFrame PySpark
spark_df = spark.createDataFrame(game_dataset)

# Inizializza il SentimentIntensityAnalyzer di NLTK
sia = SentimentIntensityAnalyzer()

# Definisci la funzione per l'analisi del sentiment
def analyze_sentiment(review):
    # Calcola il sentiment della recensione utilizzando il SentimentIntensityAnalyzer di NLTK
    sentiment = sia.polarity_scores(review)["compound"]

    # Determina se la recensione è positiva o negativa in base al valore del sentiment
    if sentiment > 0:
        return "positive"
    elif sentiment < 0:
        return "negative"
    else:
        return "neutral"

# Registra la funzione come UDF (User Defined Function)
sentiment_udf = udf(analyze_sentiment, StringType())

# Applica la sentiment analysis al DataFrame
classified_df = spark_df.withColumn("sentiment", sentiment_udf(spark_df["review"]))

# Dividi il DataFrame in due DataFrame separati per le recensioni positive e negative
positive_reviews = classified_df.filter(classified_df["sentiment"] == "positive")
negative_reviews = classified_df.filter(classified_df["sentiment"] == "negative")


# Mostra i risultati del DataFrame positive_reviews come DataFrame Pandas
positive_reviews_pandas = positive_reviews.toPandas().head(5)
print("Positive Reviews:")
print(positive_reviews_pandas)

# Mostra i risultati del DataFrame negative_reviews come DataFrame Pandas
negative_reviews_pandas = negative_reviews.toPandas().head(5)
print("Negative Reviews:")
print(negative_reviews_pandas)

#WordCloud DA FARE PER PLAYABILITY INFORMATIVE E NON

In [None]:
from pyspark.sql.functions import explode, split, col
import matplotlib.pyplot as plt
from wordcloud import WordCloud

# Split delle parole all'interno della colonna 'review'
words = spark_df.select(explode(split(col("review"), " ")).alias("word"))
# Rimuovi le parole vuote o con lunghezza inferiore a 3
words = words.filter((col("word") != "") & (length(col("word")) > 2))

word_counts = words.groupBy("word").count()

wordcloud_data = word_counts.rdd.collectAsMap()

wordcloud = WordCloud(width=800, height=400, max_words=200, background_color="white").generate_from_frequencies(wordcloud_data)

# Plot del word cloud
plt.figure(figsize=(10, 6))
plt.imshow(wordcloud, interpolation="bilinear")
plt.axis("off")
plt.show()


# prova 2

In [None]:
from pyspark.ml.feature import Tokenizer, StopWordsRemover, CountVectorizer
from pyspark.ml.feature import PCA
from pyspark.ml.clustering import KMeans

train_df,test_df=spark_df.randomSplit([0.7,0.3],seed=42)

# Fase 2: Estrazione delle features
tokenizer = Tokenizer(inputCol='review', outputCol='words')
test_df = tokenizer.transform(test_df)

remover = StopWordsRemover(inputCol='words', outputCol='filtered_words')
test_df = remover.transform(test_df)

vectorizer = CountVectorizer(inputCol='filtered_words', outputCol='features')
vectorizer_model = vectorizer.fit(test_df)
test_df = vectorizer_model.transform(test_df)

# Fase 3: Riduzione della dimensionalità
pca = PCA(k=100, inputCol='features', outputCol='pca_features')
pca_model = pca.fit(test_df)
test_df = pca_model.transform(test_df)

# Fase 4: Clustering
kmeans = KMeans(k=3, seed=42, featuresCol='pca_features', predictionCol='cluster')
kmeans_model = kmeans.fit(test_df)
test_df = kmeans_model.transform(test_df)


# Fase 5: Analisi dei cluster e assegnazione delle etichette
cluster_labels = {
    0: "Alta giocabilità",
    1: "Media giocabilità",
    2: "Bassa giocabilità"
}

# Calcola le statistiche dei cluster
cluster_stats = test_df.groupBy('cluster').count().alias('count')

# Stampa le statistiche dei cluster
cluster_stats.show()

# Assegna le etichette ai cluster
def assign_label(cluster):
    return cluster_labels[cluster]

assign_label_udf = F.udf(assign_label, StringType())
test_df = test_df.withColumn('cluster_label', assign_label_udf('cluster'))



import matplotlib.pyplot as plt
import pandas as pd

# Converti il dataframe Spark in un dataframe Pandas per la visualizzazione
pandas_df = test_df.select('pca_features', 'cluster_label').toPandas()

# Visualizza i cluster utilizzando un grafico a dispersione (scatter plot)
plt.figure(figsize=(8, 6))
plt.scatter(pandas_df['pca_features'].apply(lambda x: x[0]), pandas_df['pca_features'].apply(lambda x: x[1]),
            c=pandas_df['cluster_label'].astype('category').cat.codes, cmap='viridis')
plt.title('Visualizzazione dei cluster')
plt.xlabel('Componente principale 1')
plt.ylabel('Componente principale 2')
plt.show()


In [None]:
from pyspark.ml.feature import Tokenizer,StopWordsRemover,CountVectorizer,IDF
from pyspark.ml.feature import StringIndexer


vectorizer = CountVectorizer(inputCol='sentences',outputCol='rawFeatures')
idf = IDF(inputCol='rawFeatures',outputCol='vectorizedFeatures')

labelEncoder = StringIndexer(inputCol='subject',outputCol='label').fit(spark_df)

#New hypothesis
**Does there exist a relationship between the text of the review and the number of votes up that it receives? Can we predict this value with a Machine Learning model?**

In [16]:

# Calcola il valore minimo e massimo della colonna desiderata
min_value = spark_df.agg(min(spark_df['votes_up'])).collect()[0][0]
max_value = spark_df.agg(max(spark_df['votes_up'])).collect()[0][0]

# Visualizza il valore minimo e massimo
print("Valore minimo:", min_value)
print("Valore massimo:", max_value)


23/06/21 13:01:02 WARN TaskSetManager: Stage 3 contains a task of very large size (4301 KiB). The maximum recommended task size is 1000 KiB.


                                                                                

23/06/21 13:01:04 WARN TaskSetManager: Stage 6 contains a task of very large size (4301 KiB). The maximum recommended task size is 1000 KiB.
Valore minimo: 0
Valore massimo: 16993


In [17]:
votes_up_for_useful_review = spark_df.approxQuantile("votes_up", [0.99], 0)[0]

23/06/21 13:01:07 WARN TaskSetManager: Stage 9 contains a task of very large size (4301 KiB). The maximum recommended task size is 1000 KiB.


                                                                                

In [18]:

# Selezionare le colonne 'review' e 'votes_up' e limitare ai primi 5 record
limited_df = spark_df.select('review', 'votes_up').limit(5)

# Convertire il DataFrame Spark in un DataFrame Pandas
pandas_df = limited_df.toPandas()

# Impostare l'opzione per consentire operazioni su frame di dati diversi
spark.conf.set('spark.sql.compute.ops_on_diff_frames', 'true')

# Creare un DataFrame Spark con recensioni utili basate sul valore del quantile
useful_reviews_df = spark_df.filter(col('votes_up') >= votes_up_for_useful_review)

# Campionamento di un sottoinsieme di recensioni non utili basato sulla proporzione
not_useful_reviews_df = spark_df.filter(col('votes_up') < votes_up_for_useful_review).sample(fraction=(useful_reviews_df.count() / spark_df.count()), seed=0)

# Reimpostare l'opzione per le operazioni su frame di dati diversi
spark.conf.unset('spark.sql.compute.ops_on_diff_frames')

# Unire i DataFrame di recensioni utili e non utili
restricted_df = useful_reviews_df.union(not_useful_reviews_df)


23/06/21 13:01:10 WARN TaskSetManager: Stage 11 contains a task of very large size (4301 KiB). The maximum recommended task size is 1000 KiB.
23/06/21 13:01:10 WARN TaskSetManager: Stage 12 contains a task of very large size (4301 KiB). The maximum recommended task size is 1000 KiB.


                                                                                

23/06/21 13:01:11 WARN TaskSetManager: Stage 15 contains a task of very large size (4301 KiB). The maximum recommended task size is 1000 KiB.


                                                                                

In [19]:

# Definizione della funzione UDF per assegnare l'etichetta di output
udf_y = udf(lambda x: 0 if x < votes_up_for_useful_review else 1, IntegerType())

# Aggiunta della colonna 'useful' al DataFrame Spark
restricted_df = restricted_df.withColumn('useful', udf_y(col('votes_up')))

# Conversione del DataFrame Spark in un DataFrame Pandas on Spark
restricted_df = restricted_df.limit(5).toPandas()

# Stampa delle colonne 'review', 'votes_up' e 'useful' del DataFrame
print(restricted_df[['review', 'votes_up', 'useful']])




23/06/21 13:01:19 WARN TaskSetManager: Stage 18 contains a task of very large size (4301 KiB). The maximum recommended task size is 1000 KiB.


[Stage 18:>                                                         (0 + 1) / 1]

                                              review  votes_up  useful
0  I've never written a review before for anythin...       167       1
1  ---{Graphics}---\r\n☐ You forget what reality ...       395       1
2  I was so hopeful that this game had turned aro...        60       1
3  I've been playing No Mans Sky again to get the...       104       1
4  [url=https://i.imgur.com/SScxDZS.png] No Man's...        31       1


                                                                                

# Obtain the Bag-Of-Words and predict, using the review text, the votes_up categorical values

In [20]:

# Convertire il DataFrame in un DataFrame Spark
restricted_df = spark.createDataFrame(restricted_df)

# Remove not characters (and \n) in written reviews and transform the text into lowercase
def clean(input):
    import re
    return ''.join(list(filter(lambda ele: re.search("[a-zA-Z\s]+", ele) is not None, list(re.sub('\n', ' ', input.lower())))))

# Creare una funzione UDF per applicare la pulizia del testo
udf_remove_not_characters = udf(lambda x: clean(x), StringType())

# Applicare la pulizia del testo direttamente sulla colonna 'review'
clean_df = restricted_df.withColumn('cleaned_review', udf_remove_not_characters(col('review')))


In [21]:
from pyspark.ml.feature import Tokenizer, StopWordsRemover
from pyspark.sql.functions import col, size

# Tokenization - break review text into list of its individual terms (words in this case)
tokenizer = Tokenizer(inputCol='cleaned_review', outputCol='review_words')
wordsData = tokenizer.transform(clean_df)

# Remove review having no words after filtering
wordsData = wordsData.filter(size(col('review_words')) > 0)

# Removing StopWords
remover = StopWordsRemover(inputCol='review_words', outputCol='no_stop_words')
filteredData = remover.transform(wordsData)


23/06/21 13:01:32 WARN StopWordsRemover: Default locale set was [en_IT]; however, it was not found in available locales in JVM, falling back to en_US locale. Set param `locale` in order to respect another locale.


In [22]:
from pyspark.ml.feature import CountVectorizer
from pyspark.sql.functions import col, udf
from pyspark.sql.types import ArrayType, StringType
import nltk
nltk.download('snowball_data')


spark = SparkSession.builder \
    .appName("ProjectBigData") \
    .getOrCreate()

# Applicazione della stemmatizzazione
stemmer = nltk.stem.SnowballStemmer('english')
udf_stemming = udf(lambda tokens: [stemmer.stem(token) for token in tokens], ArrayType(StringType()))
stemmed_df = filteredData.withColumn('stemmed_review', udf_stemming(col('no_stop_words')))

# Applicazione di CountVectorizer: converte la lista di token in un vettore di conteggi di token
count = CountVectorizer(inputCol='no_stop_words', outputCol='BoW')
model = count.fit(stemmed_df)
featurizedData = model.transform(stemmed_df)

featurizedData[['review', 'cleaned_review', 'review_words', 'no_stop_words', 'stemmed_review', 'BoW', 'votes_up', 'useful']].limit(5).toPandas()


[nltk_data] Downloading package snowball_data to
[nltk_data]     /Users/ilariadesio/nltk_data...
[nltk_data]   Package snowball_data is already up-to-date!

23/06/21 13:01:52 ERROR Executor: Exception in task 4.0 in stage 19.0 (TID 62)
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/var/folders/pt/3cv5yjcs1rv1hdw6xwxdchz00000gn/T/ipykernel_81464/2365613367.py", line 10, in <lambda>
  File "/var/folders/pt/3cv5yjcs1rv1hdw6xwxdchz00000gn/T/ipykernel_81464/2365613367.py", line 7, in clean
  File "/Users/ilariadesio/opt/anaconda3/lib/python3.9/site-packages/pyspark/python/lib/pyspark.zip/pyspark/sql/functions.py", line 4915, in filter
    return _invoke_higher_order_function("ArrayFilter", [col], [f])
  File "/Users/ilariadesio/opt/anaconda3/lib/python3.9/site-packages/pyspark/python/lib/pyspark.zip/pyspark/sql/functions.py", line 4715, in _invoke_higher_order_function
    assert sc is not None and sc._jvm is not None
AssertionError

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:559)
	at org.apache.spark.sql.execution.python.PythonUDFRunner$$ano

PythonException: 
  An exception was thrown from the Python worker. Please see the stack trace below.
Traceback (most recent call last):
  File "/var/folders/pt/3cv5yjcs1rv1hdw6xwxdchz00000gn/T/ipykernel_81464/2365613367.py", line 10, in <lambda>
  File "/var/folders/pt/3cv5yjcs1rv1hdw6xwxdchz00000gn/T/ipykernel_81464/2365613367.py", line 7, in clean
  File "/Users/ilariadesio/opt/anaconda3/lib/python3.9/site-packages/pyspark/python/lib/pyspark.zip/pyspark/sql/functions.py", line 4915, in filter
    return _invoke_higher_order_function("ArrayFilter", [col], [f])
  File "/Users/ilariadesio/opt/anaconda3/lib/python3.9/site-packages/pyspark/python/lib/pyspark.zip/pyspark/sql/functions.py", line 4715, in _invoke_higher_order_function
    assert sc is not None and sc._jvm is not None
AssertionError
