In [1]:
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
from pyspark import SparkConf

In [2]:
from pyspark.ml.feature import Tokenizer, HashingTF, IDF 
from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Pipeline
from pyspark.sql.functions import col

In [3]:
conf = SparkConf().setMaster("local[*]").set("spark.executer.memory", "2g")

sc = SparkContext(conf=conf)
spark = SparkSession(sc).builder.getOrCreate()
spark.sparkContext.setLogLevel("ERROR")

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/04/21 21:34:27 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


###  Loading Reviews Data

In [4]:
df = spark.read.format("csv").option("header", "true").option("multiline","true").load("yelp-dataset-001/yelp_review.csv")
df.printSchema()
df.show(1)


root
 |-- review_id: string (nullable = true)
 |-- user_id: string (nullable = true)
 |-- business_id: string (nullable = true)
 |-- stars: string (nullable = true)
 |-- date: string (nullable = true)
 |-- text: string (nullable = true)
 |-- useful: string (nullable = true)
 |-- funny: string (nullable = true)
 |-- cool: string (nullable = true)

+--------------------+--------------------+--------------------+-----+----------+--------------------+------+-----+----+
|           review_id|             user_id|         business_id|stars|      date|                text|useful|funny|cool|
+--------------------+--------------------+--------------------+-----+----------+--------------------+------+-----+----+
|vkVSCC7xljjrAI4UG...|bv2nCi5Qv5vroFiqK...|AEx2SYEUJmTxVVB18...|    5|2016-05-28|Super simple plac...|     0|    0|   0|
+--------------------+--------------------+--------------------+-----+----------+--------------------+------+-----+----+
only showing top 1 row



In [6]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import lower, regexp_replace, split, col, desc, explode, when, array
from pyspark.ml.feature import StopWordsRemover

def clean_text(df, column_name, clean_column_name):
    # Ensure no null values at input
    df = df.withColumn(clean_column_name, when(col(column_name).isNull(), "").otherwise(col(column_name)))
    # Lowercase all letters
    df = df.withColumn(clean_column_name, lower(col(clean_column_name)))
    # Replace hyphens, remove periods and apostrophes between letters
    df = df.withColumn(clean_column_name, regexp_replace(col(clean_column_name), r'([a-z])-([a-z])', r'\1 \2'))
    df = df.withColumn(clean_column_name, regexp_replace(col(clean_column_name), r'(\w)\.(\w)', r'\1\2'))
    df = df.withColumn(clean_column_name, regexp_replace(col(clean_column_name), r'(\w)\'(\w)', r'\1\2'))
    return df

def remove_punctuation_in_dataframe_column(df, column_name):
    return df.withColumn(column_name, regexp_replace(col(column_name), r'[!"#$%&\'()*+,\-./:;<=>?@[\\]^_`{|}~]', ''))

def remove_stop_words(df, column_name):
    remover = StopWordsRemover(inputCol=column_name, outputCol="filtered_text")
    df = remover.transform(df)
    # Handle potential null values in the output column
    df = df.withColumn(column_name, when(col("filtered_text").isNull(), array()).otherwise(col("filtered_text")))
    return df.drop("filtered_text").withColumnRenamed(column_name, column_name)

def find_top_words(df, column_name):
    df_exploded = df.select(explode(col(column_name)).alias("word"))
    df_word_count = df_exploded.groupBy("word").count()
    top_words = df_word_count.orderBy(desc("count")).limit(10)
    return top_words

# Example usage:
spark = SparkSession.builder.appName("Text Processing Example").getOrCreate()
# Suppose df is your DataFrame already loaded with data
df = clean_text(df, "text", "clean_text_column")
df = remove_punctuation_in_dataframe_column(df, "clean_text_column")
df = df.withColumn("clean_text_column", split(col("clean_text_column"), "\s+"))  # Tokenize text into words
df = remove_stop_words(df, "clean_text_column")
#top_words_df = find_top_words(df, "clean_text_column")
#top_words_df.show()


### Import Cleaned Users Data

In [7]:
dfu = spark.read.format("csv").option("header", "true").option("multiline","true").load("clean_users.csv")


In [8]:
dfu.show()

+--------------------+-------+------------+-------------+--------------------+------+-----+----+----+-----+-------------+--------------+---------------+------------------+---------------+---------------+---------------+----------------+---------------+----------------+-----------------+-----------------+----------+
|             user_id|   name|review_count|yelping_since|             friends|useful|funny|cool|fans|elite|average_stars|compliment_hot|compliment_more|compliment_profile|compliment_cute|compliment_list|compliment_note|compliment_plain|compliment_cool|compliment_funny|compliment_writer|compliment_photos|elite_flag|
+--------------------+-------+------------+-------------+--------------------+------+-----+----+----+-----+-------------+--------------+---------------+------------------+---------------+---------------+---------------+----------------+---------------+----------------+-----------------+-----------------+----------+
|JJ-aSuM4pCFPdkfoZ...|  Chris|          10|   201

### Data Analysis - Elite vs Non - Elite

In [9]:
from pyspark.sql import functions as F

# Filter elite and non-elite users
elite_users = dfu.filter(dfu.elite_flag == 1)
non_elite_users = dfu.filter(dfu.elite_flag == 0)

# Compute summary statistics for review count
elite_review_count_stats = elite_users.select(F.mean("review_count").alias("avg_review_count_elite"),
                                              F.min("review_count").alias("min_review_count_elite"),
                                              F.max("review_count").alias("max_review_count_elite"))

non_elite_review_count_stats = non_elite_users.select(F.mean("review_count").alias("avg_review_count_non_elite"),
                                                      F.min("review_count").alias("min_review_count_non_elite"),
                                                      F.max("review_count").alias("max_review_count_non_elite"))

# Show the summary statistics
elite_review_count_stats.show()
non_elite_review_count_stats.show()


                                                                                

+----------------------+----------------------+----------------------+
|avg_review_count_elite|min_review_count_elite|max_review_count_elite|
+----------------------+----------------------+----------------------+
|    226.87628662567002|                     1|                   999|
+----------------------+----------------------+----------------------+



[Stage 7:>                                                          (0 + 1) / 1]

+--------------------------+--------------------------+--------------------------+
|avg_review_count_non_elite|min_review_count_non_elite|max_review_count_non_elite|
+--------------------------+--------------------------+--------------------------+
|        13.323133499093483|                         0|                       996|
+--------------------------+--------------------------+--------------------------+



                                                                                

In [21]:
from pyspark.sql import functions as F

# Filter elite and non-elite users
elite_users = dfu.filter(dfu.elite_flag == 1)
non_elite_users = dfu.filter(dfu.elite_flag == 0)

# Compute summary statistics for average stars
elite_avg_stars_stats = elite_users.select(F.mean("average_stars").alias("avg_avg_stars_elite"),
                                            F.min("average_stars").alias("min_avg_stars_elite"),
                                            F.max("average_stars").alias("max_avg_stars_elite"))

non_elite_avg_stars_stats = non_elite_users.select(F.mean("average_stars").alias("avg_avg_stars_non_elite"),
                                                    F.min("average_stars").alias("min_avg_stars_non_elite"),
                                                    F.max("average_stars").alias("max_avg_stars_non_elite"))

# Show the summary statistics
elite_avg_stars_stats.show()
non_elite_avg_stars_stats.show()


                                                                                

+-------------------+-------------------+-------------------+
|avg_avg_stars_elite|min_avg_stars_elite|max_avg_stars_elite|
+-------------------+-------------------+-------------------+
|  3.847253937978893|               2.13|                5.0|
+-------------------+-------------------+-------------------+



[Stage 18:>                                                         (0 + 1) / 1]

+-----------------------+-----------------------+-----------------------+
|avg_avg_stars_non_elite|min_avg_stars_non_elite|max_avg_stars_non_elite|
+-----------------------+-----------------------+-----------------------+
|      3.704283740697875|                    1.0|                    5.0|
+-----------------------+-----------------------+-----------------------+



                                                                                

In [23]:
from pyspark.sql import functions as F

# Filter elite and non-elite users
elite_users = dfu.filter(dfu.elite_flag == 1)
non_elite_users = dfu.filter(dfu.elite_flag == 0)

# Compute mean values for each column
elite_avg_stats = elite_users.select(F.mean("average_stars").alias("avg_avg_stars_elite"),
                                     F.mean("review_count").alias("avg_review_count_elite"),
                                     F.mean("useful").alias("avg_useful_reviews_elite"))

non_elite_avg_stats = non_elite_users.select(F.mean("average_stars").alias("avg_avg_stars_non_elite"),
                                             F.mean("review_count").alias("avg_review_count_non_elite"),
                                             F.mean("useful").alias("avg_useful_reviews_non_elite"))

# Show the mean values
elite_avg_stats.show()
non_elite_avg_stats.show()


                                                                                

+-------------------+----------------------+------------------------+
|avg_avg_stars_elite|avg_review_count_elite|avg_useful_reviews_elite|
+-------------------+----------------------+------------------------+
|  3.847253937978893|    226.87628662567002|       476.1755894636456|
+-------------------+----------------------+------------------------+



[Stage 24:>                                                         (0 + 1) / 1]

+-----------------------+--------------------------+----------------------------+
|avg_avg_stars_non_elite|avg_review_count_non_elite|avg_useful_reviews_non_elite|
+-----------------------+--------------------------+----------------------------+
|      3.704283740697875|        13.323133499093483|          7.8670596752344535|
+-----------------------+--------------------------+----------------------------+



                                                                                

### Joining Reviews and Users Data

In [10]:
# Selecting only the user_id and elite_flag columns from dfu
dfu_selected = dfu.select("user_id", "elite_flag")

# Performing the join with selected columns
joined_df = df.join(dfu_selected, on="user_id", how="left")



In [11]:
# Show the resulting DataFrame
joined_df.show()


                                                                                

+--------------------+--------------------+--------------------+-----+----------+--------------------+------+-----+----+--------------------+----------+
|             user_id|           review_id|         business_id|stars|      date|                text|useful|funny|cool|   clean_text_column|elite_flag|
+--------------------+--------------------+--------------------+-----+----------+--------------------+------+-----+----+--------------------+----------+
|u0LXt3Uea_GidxRW1...|ymAUG8DZfQcFTBSOi...|9_CGhHMz8698M9-Pk...|    4|2012-05-11|Who would have gu...|     0|    0|   2|[guess, able, get...|         1|
|u0LXt3Uea_GidxRW1...|8UIishPUD92hXtScS...|gkCorLgPyQLsptTHa...|    4|2015-10-27|Always drove past...|     1|    0|   0|[always, drove, p...|         1|
|u0LXt3Uea_GidxRW1...|w41ZS9shepfO3uEyh...|5r6-G9C4YLbC7Ziz5...|    3|2013-02-09|Not bad!! Love th...|     1|    0|   0|[bad, love, glute...|         1|
|u0LXt3Uea_GidxRW1...|WF_QTN3p-thD74hqp...|fDF_o2JPU8BR1Gya-...|    5|2016-04-06|L

# Sentiment Analysis Polarity and Subjectivity 

In [12]:
# First, ensure you've installed textblob; you might need to run: !pip install textblob
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, concat_ws, col
from pyspark.sql.types import StringType, DoubleType
from textblob import TextBlob



# Define UDF to calculate polarity
def get_polarity(text):
    return TextBlob(text).sentiment.polarity

# Define UDF to calculate subjectivity
def get_subjectivity(text):
    return TextBlob(text).sentiment.subjectivity

# Register UDFs
polarity_udf = udf(get_polarity, DoubleType())
subjectivity_udf = udf(get_subjectivity, DoubleType())

# Assuming 'clean_text_column' needs to be converted from array to string if not already
joined_df = joined_df.withColumn("clean_text_column", concat_ws(", ", col("clean_text_column")))

# Apply the UDFs
joined_df = joined_df.withColumn("polarity", polarity_udf("clean_text_column"))
joined_df = joined_df.withColumn("subjectivity", subjectivity_udf("clean_text_column"))

# Show results
joined_df.show()





+--------------------+--------------------+--------------------+-----+----------+--------------------+------+-----+----+--------------------+----------+--------------------+-------------------+
|             user_id|           review_id|         business_id|stars|      date|                text|useful|funny|cool|   clean_text_column|elite_flag|            polarity|       subjectivity|
+--------------------+--------------------+--------------------+-----+----------+--------------------+------+-----+----+--------------------+----------+--------------------+-------------------+
|u0LXt3Uea_GidxRW1...|ymAUG8DZfQcFTBSOi...|9_CGhHMz8698M9-Pk...|    4|2012-05-11|Who would have gu...|     0|    0|   2|guess, able, get,...|         1|0.011904761904761894| 0.4886904761904761|
|u0LXt3Uea_GidxRW1...|8UIishPUD92hXtScS...|gkCorLgPyQLsptTHa...|    4|2015-10-27|Always drove past...|     1|    0|   0|always, drove, pa...|         1| 0.19285714285714287| 0.5714285714285714|
|u0LXt3Uea_GidxRW1...|w41ZS9sh

                                                                                

In [18]:
small_group_df = joined_df.filter(col("elite_flag") == 1)  
large_group_df = joined_df.filter(col("elite_flag") == 0)



In [21]:
# Calculate the average polarity and subjectivity
average_sentiment_small = small_group_df.agg(
    avg("polarity").alias("avg_polarity"),
    avg("subjectivity").alias("avg_subjectivity")
)


In [22]:
# Show the results
average_sentiment_small.show()



+-------------------+------------------+
|       avg_polarity|  avg_subjectivity|
+-------------------+------------------+
|0.22608805795202244|0.5457307004258998|
+-------------------+------------------+



                                                                                

In [23]:
# Calculate average polarity and subjectivity by elite status
average_sentiment_large = large_group_df.agg(
    avg("polarity").alias("avg_polarity"),
    avg("subjectivity").alias("avg_subjectivity")
)
# Show the results
average_sentiment_large.show()




+------------------+------------------+
|      avg_polarity|  avg_subjectivity|
+------------------+------------------+
|0.2456547673315709|0.5568185331539073|
+------------------+------------------+



24/04/22 02:00:56 ERROR Inbox: Ignoring error                                   
org.apache.spark.SparkException: Exception thrown in awaitResult: 
	at org.apache.spark.util.SparkThreadUtils$.awaitResult(SparkThreadUtils.scala:56)
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:310)
	at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRefByURI(RpcEnv.scala:102)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRef(RpcEnv.scala:110)
	at org.apache.spark.util.RpcUtils$.makeDriverRef(RpcUtils.scala:36)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.driverEndpoint$lzycompute(BlockManagerMasterEndpoint.scala:124)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.org$apache$spark$storage$BlockManagerMasterEndpoint$$driverEndpoint(BlockManagerMasterEndpoint.scala:123)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.isExecutorAlive$lzycompute$1(BlockManagerMasterEndpoint.scala:688)
	at org.apach