# Assignment: Scalable Processing
## Yelp Reviews and Authenticity

Large Scale Data Analysis | by Petya Petrova |pety@itu.dk | 27-02-2024

## Connecting to the Spark Cluster job using the two JobParameters.json

To connect this Jupyter notebook to your Spark cluster, we need to tell Jupyter how to access the cluster. The code below accomplishes that.

In [None]:
#####################################################################
# Connecting to the Spark Cluster job using the two JobParameters.json
#####################################################################

# Only execute this cell once: a second run would stop and rebuild the session.
if '_EXECUTED_' in globals():
    # '_EXECUTED_' is set at the very end of the first successful run
    print("Already been executed once, not running again!")
else:
    print("Cell has not been executed before, running...")
    import os
    import json
    import pyspark
    from pyspark.conf import SparkConf
    from pyspark.sql import SparkSession, functions as F

    # /work/JobParameters.json is read twice: once with the json module to
    # find the hostname of the cluster, and once through a temporary Spark
    # session to extract the machine sizing.

    MASTER_HOST_NAME = None

    # Open the parameters Jupyter Lab app was launched with
    with open('/work/JobParameters.json', 'r') as file:
        JUPYTER_LAB_JOB_PARAMS = json.load(file)

    # The last resource entry that carries a 'hostname' key wins.
    for resource in JUPYTER_LAB_JOB_PARAMS['request']['resources']:
        if 'hostname' in resource.keys():
            MASTER_HOST_NAME = resource['hostname']

    # Fail fast instead of trying to connect to "spark://None:7077".
    if MASTER_HOST_NAME is None:
        raise RuntimeError(
            "No resource with a 'hostname' entry found in /work/JobParameters.json; "
            "cannot determine the Spark master."
        )

    MASTER_HOST = f"spark://{MASTER_HOST_NAME}:7077"

    # Temporary session, used only to read the cluster parameters.
    conf = SparkConf().setAll([
            ("spark.app.name", 'reading_job_params_app'),
            ("spark.master", MASTER_HOST),
        ])
    spark = SparkSession.builder.config(conf=conf)\
                                .getOrCreate()

    CLUSTER_PARAMETERS_JSON_DF = spark.read.option("multiline","true").json('/work/JobParameters.json')

    # Extract cluster info from the specific JobParameters.json
    NODES = CLUSTER_PARAMETERS_JSON_DF.select("request.replicas").first()[0]
    # One CPU per node is left out of the budget.
    CPUS_PER_NODE = CLUSTER_PARAMETERS_JSON_DF.select("machineType.cpu").first()[0] - 1
    MEM_PER_NODE = CLUSTER_PARAMETERS_JSON_DF.select("machineType.memoryInGigs").first()[0]

    # With a single-CPU machine type the budget is 0 cores, which would
    # otherwise surface below as an opaque ZeroDivisionError.
    if CPUS_PER_NODE < 1:
        raise RuntimeError(
            "Machine type must have at least 2 CPUs per node to size Spark executors."
        )

    CLUSTER_CORES_MAX = CPUS_PER_NODE * NODES
    CLUSTER_MEMORY_MAX = MEM_PER_NODE * NODES

    if CPUS_PER_NODE > 1:
        EXECUTOR_CORES = CPUS_PER_NODE - 1  # set cores per executor on worker node
    else:
        EXECUTOR_CORES = CPUS_PER_NODE

    # Half of the node memory, split across the executors that fit on a node.
    EXECUTOR_MEMORY = int(
        MEM_PER_NODE / (CPUS_PER_NODE / EXECUTOR_CORES) * 0.5
    )  # set executor memory in GB on each worker node

    # Make sure there is a dir for spark logs
    os.makedirs('spark_logs', exist_ok=True)

    conf = SparkConf().setAll(
        [
            ("spark.app.name", 'spark_assignment'), # Change to your liking
            ("spark.sql.caseSensitive", False), # Optional: False keeps SQL identifier resolution case-insensitive
            ("spark.master", MASTER_HOST),
            ("spark.cores.max", CLUSTER_CORES_MAX),
            ("spark.executor.cores", EXECUTOR_CORES),
            ("spark.executor.memory", str(EXECUTOR_MEMORY) + "g"),
            ("spark.eventLog.enabled", True),
            ("spark.eventLog.dir", "spark_logs"),
            ("spark.history.fs.logDirectory", "spark_logs"),
            # NOTE(review): the documented property is "spark.submit.deployMode";
            # confirm whether this key has any effect.
            ("spark.deploy.mode", "cluster"),
        ]
    )

    ## check executor memory, taking into account 10% of memory overhead (minimum 384 MiB)
    CHECK = (CLUSTER_CORES_MAX / EXECUTOR_CORES) * (
        EXECUTOR_MEMORY + max(EXECUTOR_MEMORY * 0.10, 0.403)
    )

    assert (
        int(CHECK) <= CLUSTER_MEMORY_MAX
    ), "Executor memory larger than cluster total memory!"

    # Stop previous session that was just for loading cluster params
    spark.stop()

    # Start new session with above config, that has better resource handling
    spark = SparkSession.builder.config(conf=conf)\
                                .getOrCreate()
    sc = spark.sparkContext
    _EXECUTED_ = True
    print("Success!")

Click on the "SparkMonitor" tab at the top in Jupyter Lab to see the status of running code on the cluster.

## Loading the data
Here we specify where the Yelp datasets are located on UCloud and read them using the Spark session.

In [2]:
# Load the shared Yelp datasets that were mounted as an input folder when the
# Spark cluster job was submitted. The file:/// prefix tells Spark to read
# from the cluster's filesystem.
#
# business and reviews are persisted because later computations reuse them:
# https://sparkbyexamples.com/spark/spark-difference-between-cache-and-persist/
business = spark.read.json('file:////work/yelp/yelp_academic_dataset_business.json').persist()
reviews = spark.read.json('file:////work/yelp/yelp_academic_dataset_review.json').persist()

# The user dataset is loaded without persisting it.
users = spark.read.json("file:////work/yelp/yelp_academic_dataset_user.json")

In [None]:
# Count every row of the reviews DataFrame (no sampling applied).
# As the last expression of the cell, the result is displayed by the notebook.
reviews.count()

In [None]:
# Fetch a single review row (as a one-element list) to inspect its fields.
# business.show() or reviews.columns give a broader overview when needed.
first_row = reviews.take(1)
print(first_row)


In [None]:
# Arizona businesses whose categories mention "Mexican"
in_arizona = business.state == "AZ"
is_mexican = business.categories.rlike("Mexican")
az_mex = business.where(in_arizona & is_mexican).select("business_id", "name")

# Attach the reviews written for those businesses
az_mex_rs = reviews.join(az_mex, on="business_id", how="inner")

# Keep the 5-star reviews and only the columns of interest
five_star_rs = az_mex_rs.where(az_mex_rs.stars == 5)
good_az_mex_rs = five_star_rs.select("name", "text")

# Print the top 20 rows of the DataFrame
good_az_mex_rs.show()

# Bring the result to the driver as a pandas DataFrame and save it locally
local_reviews = good_az_mex_rs.toPandas()
local_reviews.to_csv("good_az_reviews.csv", header=True, index=False, encoding='utf-8')


In [None]:
#### Beginning of the project:

In [7]:
# Short aliases used throughout the project part of the notebook.
# bu and re point at the already-loaded, persisted business and reviews
# DataFrames. Re-reading the JSON files first was wasted work, because the
# freshly read DataFrames were overwritten on the very next line.
bu = business.persist()

us = spark.read.json("file:////work/yelp/yelp_academic_dataset_user.json")

# NOTE: `re` shadows the name of the standard-library regex module in this notebook.
re = reviews.persist()

In [None]:
# EDA: print a single row of the business DataFrame to inspect its columns.
bu.show(1)
# Uncomment to inspect the user and review DataFrames the same way:
#us.show(1)
#re.show(1)


In [12]:
#3.1 Specific DataFrame Queries

In [None]:
#1. Find the total number of reviews for all businesses. The output should be in the form of a Spark Table/DataFrame with one value representing the count.

from pyspark.sql import SparkSession
from pyspark.sql.functions import sum as spark_sum

# Sum the review_count column; the result is a one-row, one-column DataFrame,
# which is the output format the task asks for.
total_review_count_df = bu.agg(spark_sum("review_count").alias("total_review_count"))
total_review_count_df.show()

# Also keep the plain number available for printing / later use
total_review_count = total_review_count_df.first()[0]

# Print the result
print("Total Review Count:", total_review_count)


In [None]:
#2. Businesses rated 5 stars that have been reviewed by 500 or more users,
#   as a DataFrame of (name, stars, review count).
from pyspark.sql.functions import col

is_five_star = col("stars") == 5
has_500_reviews = col("review_count") >= 500
businesses = bu.where(is_five_star & has_500_reviews).select("name", "stars", "review_count")

# Display the matching businesses
businesses.show()

# How many businesses match both conditions
count_businesses = businesses.count()
print("Number of businesses with 5 stars and 500 or more reviews:", count_businesses)



In [None]:
#3. Influencers: users who have written more than 1000 reviews,
#   as a DataFrame of user ids.
import pyspark.sql.functions as f

wrote_over_1000 = col("review_count") > 1000
influencers = us.where(wrote_over_1000).select("user_id")

# Number of influencers found
count_influencers = influencers.count()
print("Number of influencers who have written more than 1000 reviews:", count_influencers)

# Display the influencer ids
influencers.show()

In [None]:
# Attach reviews to businesses, then keep only reviews written by influencers
reviews_per_business = bu.join(re, "business_id")
business_reviews_users = reviews_per_business.join(influencers, "user_id")

# Number of distinct influencers that reviewed each business
distinct_influencers = f.countDistinct("user_id").alias("infl_count")
business_count = business_reviews_users.groupby("business_id").agg(distinct_influencers)

# Businesses reviewed by more than 5 distinct influencers
bus_5_infl = business_count.where(business_count["infl_count"] > 5).select("business_id")

bus_5_infl.show()

In [None]:
# Same pipeline as the influencer/business join, ending in a row count
reviews_per_business = bu.join(re, "business_id")
business_reviews_users = reviews_per_business.join(influencers, "user_id")

# Number of distinct influencers that reviewed each business
distinct_influencers = f.countDistinct("user_id").alias("infl_count")
business_count = business_reviews_users.groupby("business_id").agg(distinct_influencers)

# Count the businesses reviewed by more than 5 distinct influencers
bus_5_infl = business_count.where(business_count["infl_count"] > 5).select("business_id")
bus_5_infl.count()

In [None]:
#3. Part one: influencers are users who have written more than 1000 reviews
#   (output: DataFrame of user ids).
import pyspark.sql.functions as f

wrote_over_1000 = col("review_count") > 1000
influencers = us.where(wrote_over_1000).select("user_id")

# Number of influencers found
count_influencers = influencers.count()
print("Number of influencers who have written more than 1000 reviews:", count_influencers)

#3. Part two: businesses that have been reviewed by more than 5 influencer users.

# Inner joins: business -> its reviews -> only the reviews written by influencers
reviews_per_business = business.join(reviews, "business_id")
business_review_influencers = reviews_per_business.join(influencers, "user_id")

# Distinct influencers per business
distinct_influencers = f.countDistinct("user_id").alias("count_influencers")
business_influencer_count = business_review_influencers.groupby("business_id").agg(distinct_influencers)

# Keep the ids of businesses with more than 5 distinct influencers
over_five = business_influencer_count["count_influencers"] > 5
businesses_with_more_than_5_influencers = business_influencer_count.where(over_five).select("business_id")

# How many such businesses there are
count_businesses = businesses_with_more_than_5_influencers.count()

print(f'Number of businesses with more than 5 unique influencers: {count_businesses}')


In [None]:
#4. Ordered list of users by the average number of stars they gave across all their reviews.

# Inner-join reviews with users, then average the review stars per user
reviews_with_users = re.join(us, "user_id")
avg_stars_per_user = reviews_with_users.groupby("user_id").mean("stars")

# Highest average first
review_user_avg_stars = avg_stars_per_user.orderBy("avg(stars)", ascending=False)

# Display the top 5 users
review_user_avg_stars.show(5)



In [None]:
#3.2.1 Data Exploration
#1.  A) What is the percentage of reviews containing a variant of the word "authentic"?

# Keep only businesses whose categories mention restaurants, then attach their reviews
restaurants = business[business.categories.contains('Restaurants')]
restaurant_reviews = restaurants.join(reviews, "business_id")

# Count the joined reviews once and reuse the number: every count() runs a
# Spark job over the join, and this total is needed for both percentages.
total_restaurant_reviews = restaurant_reviews.count()

# Reviews whose text contains "authentic" as a substring (so "authenticity",
# "inauthentic", ... also match).
# NOTE(review): rlike is case-sensitive, so "Authentic" is not counted — confirm this is intended.
authentic_reviews = restaurant_reviews.filter(restaurant_reviews.text.rlike('authentic'))

# Get the percentage of such reviews
authentic_percentage = (authentic_reviews.count() / total_restaurant_reviews) * 100

# Print the percentage
print(f'{round(authentic_percentage, 2)} percentage of reviews contain the word "authentic"')


#1. B) How many reviews contain the string "legitimate" grouped by type of cuisine?
# Reviews whose text contains "legitimate" (not yet grouped by cuisine here)
legitimate_reviews = restaurant_reviews.filter(restaurant_reviews.text.rlike('legitimate'))
count_legitimate_reviews = legitimate_reviews.count()

print(f'Reviews containing the word "legitimate": {count_legitimate_reviews}')

# Calculate the percentage of reviews containing the word "legitimate"
legitimate_percentage = (count_legitimate_reviews / total_restaurant_reviews) * 100

print(f'{round(legitimate_percentage, 2)} percentage of reviews contain the word "legitimate"')


In [None]:
############# QUESTION
def _count_reviews_per_category(df, count_name):
    """Explode the comma-separated `categories` column and count rows per category.

    Args:
        df: DataFrame with a string `categories` column of ', '-separated names.
        count_name: Name to give the resulting count column.

    Returns:
        DataFrame with columns (`category`, `count_name`).
    """
    exploded = df.withColumn('category', F.explode(F.split(F.col('categories'), ', ')))
    return exploded.groupBy('category').count().withColumnRenamed("count", count_name)


# Add a column indicating whether the review contains authenticity language
restaurant_reviews = restaurant_reviews.withColumn("contains_authenticity", restaurant_reviews.text.rlike('authentic|legitimate'))
restaurant_reviews_cube = restaurant_reviews.cube("state", "city", "contains_authenticity").count().orderBy("count", ascending=False)

# Add column mentioning whether the review includes some NEGATIVE words.
# The alternation must not contain spaces: in a regex they are literal
# characters, so '(dirty) |(cheap) | (rude)' only matched "dirty ", "cheap "
# and " rude" rather than the words themselves.
restaurant_reviews = restaurant_reviews.withColumn("contains_negative", restaurant_reviews.text.rlike('dirty|cheap|rude'))

# Add column mentioning whether the review includes some POSITIVE words
# ("eleg" is a stem, matching e.g. "elegant").
restaurant_reviews = restaurant_reviews.withColumn("contains_positive", restaurant_reviews.text.rlike('nice|fresh|eleg'))

# Get reviews with authentic language and negative/positive language
restaurant_reviews_neg = restaurant_reviews.filter((restaurant_reviews.contains_authenticity) & (restaurant_reviews.contains_negative))
restaurant_reviews_pos = restaurant_reviews.filter((restaurant_reviews.contains_authenticity) & (restaurant_reviews.contains_positive))

# Look at the categories
# * All: number of reviews per category, used as the normalisation base
category_count_all = _count_reviews_per_category(restaurant_reviews, "count_all")

# * Negative: share (in %) of each category's reviews that are authentic + negative
restaurant_reviews_neg_count = _count_reviews_per_category(restaurant_reviews_neg, "count_neg")
restaurant_reviews_neg_count = restaurant_reviews_neg_count.join(category_count_all, "category") \
    .withColumn("normalized", (F.col("count_neg") / F.col("count_all")) * 100)

# * Positive: share (in %) of each category's reviews that are authentic + positive
restaurant_reviews_pos_count = _count_reviews_per_category(restaurant_reviews_pos, "count_pos")
restaurant_reviews_pos_count = restaurant_reviews_pos_count.join(category_count_all, "category") \
    .withColumn("normalized", (F.col("count_pos") / F.col("count_all")) * 100)

# Show the results, lowest normalized share first
restaurant_reviews_neg_count.orderBy("normalized", ascending=True).show()
restaurant_reviews_pos_count.orderBy("normalized", ascending=True).show()



In [None]:
#2. Is there a difference in the amount of authenticity language used in the different areas?

# Flag each review that uses authenticity language
uses_authenticity_language = restaurant_reviews.text.rlike('authentic|legitimate')
restaurant_reviews = restaurant_reviews.withColumn("contains_authenticity", uses_authenticity_language)

# Per (state, city): number of flagged reviews and total number of reviews
flagged_reviews = f.sum(f.col("contains_authenticity").cast("int")).alias("authenticity_count")
all_reviews = f.count("*").alias("total_reviews")
counts_per_region = restaurant_reviews.groupBy("state", "city").agg(flagged_reviews, all_reviews)

# Share (in %) of reviews with authenticity language in each region
region_authenticity_counts = counts_per_region.withColumn(
    "authenticity_percentage",
    f.col("authenticity_count") / f.col("total_reviews") * 100
)

# Display the first 10 regions
region_authenticity_counts.show(10)




In [None]:
#3.2.2 Hypothesis Testing
# Null Hypothesis
#(H0):There is no significant difference in the relationship between authenticity language and typically negative words in restaurants serving South American or Asian cuisine compared to restaurants serving European cuisine.

#(H1):There is a significant difference in the relationship between authenticity language and typically negative words in restaurants serving South American or Asian cuisine compared to restaurants serving European cuisine.


In [None]:
# count and when are used below and must be imported alongside col
from pyspark.sql.functions import col, count, when

# The reviews DataFrame alone has no `categories` column, so the filter has to
# run on reviews joined with their restaurant (restaurant_reviews).
# Filter the DataFrame for South American, Asian, and European cuisines
cuisine_filtered = restaurant_reviews.filter((col('categories').contains('South American')) |
                                             (col('categories').contains('Asian')) |
                                             (col('categories').contains('European')))

# Denominator for the percentages, computed once.
# NOTE(review): the original referenced an undefined `rest_rs`; presumably the
# full set of restaurant reviews was meant — confirm.
total_restaurant_reviews = restaurant_reviews.count()

# Count the number of reviews containing the word "authentic" for each cuisine type.
# when(...) yields NULL for non-matching rows, and count() skips NULLs.
authentic_counts_by_cuisine = cuisine_filtered.groupBy('categories').agg(
    count(when(col('text').rlike('authentic'), 1)).alias('authentic_count_by_cuisine')
)

# Count the number of reviews containing the word "legitimate" for each cuisine type
legitimate_counts_by_cuisine = cuisine_filtered.groupBy('categories').agg(
    count(when(col('text').rlike('legitimate'), 1)).alias('legitimate_count_by_cuisine')
)

# Join the counts for both words by cuisine type
counts_by_cuisine = authentic_counts_by_cuisine.join(legitimate_counts_by_cuisine, 'categories')

# Calculate the percentage of reviews containing each word for each cuisine type
counts_by_cuisine = counts_by_cuisine.withColumn('authentic_percentage',
                                                 (col('authentic_count_by_cuisine') / total_restaurant_reviews) * 100)
counts_by_cuisine = counts_by_cuisine.withColumn('legitimate_percentage',
                                                 (col('legitimate_count_by_cuisine') / total_restaurant_reviews) * 100)

# Display the results
counts_by_cuisine.show(10)


In [None]:
# Businesses whose categories mention "Restaurants"; display the first rows
mentions_restaurants = bu.categories.rlike("Restaurants")
restaurants = bu.where(mentions_restaurants)
restaurants.show()

In [None]:
# List the column names of the restaurants DataFrame
restaurants.columns

In [None]:
# Display the postal_code column of the first restaurant rows
restaurants.select('postal_code').show()

In [2]:

# Re-read the three Yelp datasets from the cluster's filesystem into the
# short aliases. Unlike before, the DataFrames are not persisted here
# (the persist lines are commented out).
bu = spark.read.json('file:////work/yelp/yelp_academic_dataset_business.json') 
#bu = business.persist()

us = spark.read.json("file:////work/yelp/yelp_academic_dataset_user.json")

re = spark.read.json('file:////work/yelp/yelp_academic_dataset_review.json')
#re = reviews.persist()

In [3]:
# Businesses whose categories mention "Restaurants"
restaurants = bu.where(bu.categories.rlike("Restaurants"))


In [4]:
# Build the working DataFrame: reviews of restaurants, written by known users.
# Both joins are inner joins, so reviews without a matching restaurant or
# user are dropped.
restaurant_match = re['business_id'] == restaurants['business_id']
user_match = re['user_id'] == us['user_id']
reviews_of_restaurants = re.join(restaurants, restaurant_match, 'inner')
reviews_with_users = reviews_of_restaurants.join(us, user_match, 'inner')

# Keep the review rating and text plus the restaurant's categories and location
all_df = reviews_with_users.select(
    re['stars'], re['text'], restaurants['categories'], restaurants['state'], restaurants['city']
)


In [None]:
#all_df.show()

In [5]:
# Compare usage of the words "authentic" and "legitimate":
# keep reviews whose text contains either of the two words.

from pyspark.sql.functions import col

mentions_authentic = col("text").contains("authentic")
mentions_legitimate = col("text").contains("legitimate")
filtered_df = all_df.where(mentions_authentic | mentions_legitimate)

# Show the filtered DataFrame
#filtered_df.show()


In [6]:
# Count the filtered reviews for every (state, city) combination. cube() also
# adds subtotal rows, in which state and/or city are null.
from pyspark.sql.functions import col, count

review_count = count("*").alias("count")
cube_df = filtered_df.cube("state", "city").agg(review_count)

In [None]:
#cube_df.show()

In [7]:
# Largest counts first
sorted_cube_df = cube_df.sort(col("count").desc())
#sorted_cube_df.show()

In [8]:
# Drop the rows where state or city is null, keeping only concrete
# (state, city) pairs
has_state = col("state").isNotNull()
has_city = col("city").isNotNull()
filtered_sorted_cube_df = sorted_cube_df.where(has_state & has_city)

In [None]:
#filtered_sorted_cube_df.show()

In [9]:
# Exclude the single (state="CA", city="Santa Barbara") row; every other row is kept
is_santa_barbara_ca = (col("state") == "CA") & (col("city") == "Santa Barbara")
filtered_sorted_cube_df = filtered_sorted_cube_df.where(~is_santa_barbara_ca)


In [None]:
# Display the first rows of the joined review/restaurant DataFrame
all_df.show()

In [None]:
# Fetch only the first two rows. collect() would pull the entire categories
# column to the driver just to read its second element.
first_two_category_rows = all_df.select("categories").take(2)

first_row_categories = first_two_category_rows[0][0]
#print(first_row_categories)
# Take the second row of categories
second_row_categories = first_two_category_rows[1][0]

# Print the second row of categories
#print("Second row of categories:", second_row_categories)

In [None]:
#filtered_df.show()


In [None]:
def _mentions_any_category(*keywords):
    """Build a condition that is true when `categories` contains any of the keywords."""
    condition = col("categories").contains(keywords[0])
    for keyword in keywords[1:]:
        condition = condition | col("categories").contains(keyword)
    return condition


# Reviews of French, Italian, Mediterranean, or European restaurants
european_df = filtered_df.filter(
    _mentions_any_category("French", "Italian", "Mediterranean", "European")
)

# Reviews of Thai, Japanese, Chinese, Korean, Indian, Mexican, Soul,
# Latin American, and Asian restaurants
asian_df = filtered_df.filter(
    _mentions_any_category(
        "Thai", "Japanese", "Chinese", "Korean", "Indian",
        "Mexican", "Soul", "Latin American", "Asian",
    )
)

# Only the heading is printed; the show() calls are kept disabled
print("European DataFrame:")
#european_df.show()

#print("Asian DataFrame:")
#asian_df.show(truncate=False)


In [11]:

# Sample each group without replacement at a 10% fraction, then cap the
# sample at 50 rows. No seed is given, so the sample differs between runs.
sampled_european_df = european_df.sample(False, 0.1).limit(50)
sampled_asian_df = asian_df.sample(False, 0.1).limit(50)




In [None]:
# Previously computed average sentiment scores, hard-coded for reporting
asian_positive, asian_negative = 0.20496368239812873, -0.017286704808065025
european_positive, european_negative = 0.27702170666841436, -0.029861111111111116

print("Average sentiment scores for Asian cuisines")
print("Positive score: {}".format(asian_positive))
print("Negative score: {}".format(asian_negative))
print("\nAverage sentiment scores for European cuisines:")
print("Positive score: {}".format(european_positive))
print("Negative score: {}".format(european_negative))


In [None]:
# List the column names of the sampled European DataFrame
sampled_european_df.columns

In [23]:
from pyspark.sql.functions import col

# Keep only the review text of each sample
blob_data_asian_df = sampled_asian_df.select("text")
blob_data_european_df = sampled_european_df.select("text")


In [24]:
# Spark writes each output as a directory of part files at the given path.
# mode("overwrite") lets the cell be re-run: with the default save mode the
# write fails as soon as the path already exists.

# Save European DataFrame text data to a text file
blob_data_european_df.write.mode("overwrite").text("european_text_data.txt")

# Save Asian DataFrame text data to a text file
blob_data_asian_df.write.mode("overwrite").text("asian_text_data.txt")


In [19]:
# Save sampled European DataFrame to CSV
#sampled_european_df.write.csv("sampled_european.csv", header=True, mode="overwrite")

# Save sampled Asian DataFrame to CSV
#sampled_asian_df.write.csv("sampled_asian.csv", header=True, mode="overwrite")


In [12]:
#from textblob import TextBlob


In [12]:
import nltk
#nltk.download('vader_lexicon')


In [13]:
from pyspark.sql.functions import col, udf
from pyspark.sql.types import FloatType
from nltk.sentiment.vader import SentimentIntensityAnalyzer

In [14]:
# Function to calculate polarity score using VADER
def get_sentiment_score(text):
    """Return the VADER compound polarity score for ``text``.

    Args:
        text: Review text to score, or ``None`` for a missing value.

    Returns:
        The ``"compound"`` entry of ``polarity_scores(text)``, or ``None``
        when ``text`` is ``None``.
    """
    # When used as a Spark UDF, null column values arrive as None; pass them
    # through as null instead of letting the analyzer raise on a non-string.
    if text is None:
        return None
    sid = SentimentIntensityAnalyzer()
    sentiment = sid.polarity_scores(text)
    return sentiment["compound"]


In [15]:
# Wrap the scoring function as a Spark UDF that returns a float column
sentiment_udf = udf(get_sentiment_score, FloatType())

# The same column expression scores the review text of either DataFrame
text_sentiment = sentiment_udf(col("text"))

# Add the polarity score to the European and Asian DataFrames
european_df_with_sentiment = european_df.withColumn("sentiment_score", text_sentiment)
asian_df_with_sentiment = asian_df.withColumn("sentiment_score", text_sentiment)


In [127]:
# Keep at most 1500 rows of the joined DataFrame as a smaller working set
sampled = all_df.limit(1500)


In [None]:
#pip install sklearn

In [None]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import Tokenizer, HashingTF, IDF
from pyspark.ml.feature import VectorAssembler
from pyspark.ml. regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.sql. functions import col
#from sklearn.feature_extraction.text import TfidfVectorizer, HashingVectorizer


In [128]:
# Split each review text into a list of tokens stored in a new "tokens" column
tokenizer = Tokenizer(outputCol="tokens", inputCol="text")
reviews_tokenized = tokenizer.transform(all_df)

In [129]:
from pyspark.ml.feature import HashingTF, IDF

In [156]:
# Token to Numerical Features
tokens2numb = HashingTF(inputCol="tokens", outputCol="NewFeatures", numFeatures=500)
#transformer = IDF(inputCol="NewFeatures", outputCol="NewFeatures1")


In [None]:

# `transformer` is defined here so that this cell is self-contained; without
# a definition in scope, the fit below raises a NameError.
transformer = IDF(inputCol="NewFeatures", outputCol="NewFeatures1")

# Hash the tokens once and reuse the result for both fitting and transforming
hashed_reviews = tokens2numb.transform(reviews_tokenized)
model = transformer.fit(hashed_reviews)
transformed_data = model.transform(hashed_reviews)



In [163]:
# Drop the "NewFeatures1" column from the transformed data
transformed_data= transformed_data.drop("NewFeatures1")

In [None]:
# Display the first rows of the transformed data
transformed_data.show()

In [167]:
# Wrap the hashed term-frequency column into the "features_vector1" vector column
assembler = VectorAssembler(outputCol="features_vector1", inputCols=["NewFeatures"])
transformed_data = assembler.transform(transformed_data)

In [None]:
# Display the assembled feature vectors
transformed_data.select("features_vector1").show()


In [145]:
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Random 60% / 20% / 20% split into train, test and dev sets; the fixed seed
# is passed so the split can be repeated
split_weights = [0.6, 0.2, 0.2]
data_splits = transformed_data.randomSplit(split_weights, seed=42)
train_data, test_data, dev_data = data_splits



In [None]:
# List the column names of the training split
train_data.columns

In [None]:
# Display the first rows of the training split
train_data.show()

In [None]:
# Select all columns except the target variable (stars), NewFeatures, and features
feature_columns = [col for col in train_data.columns if col not in ["stars", "NewFeatures", "features","tokens","text"]]

# Use selected columns as features
train_data.select(feature_columns).show()


In [None]:
from pyspark.ml.feature import VectorAssembler

# Combine every column listed in feature_columns into one vector column.
# NOTE(review): VectorAssembler accepts numeric, boolean and vector inputs;
# verify that feature_columns contains no string columns before running.
assembler = VectorAssembler(outputCol="Feat_vector1", inputCols=feature_columns)
assembled_train = assembler.transform(train_data)

# Remove the intermediate columns that are no longer needed
final_data = assembled_train.drop("tokens", "NewFeatures", "features")

final_data.show()


In [150]:
# Remove the assembled "Feat_vector1" column again
final_data = final_data.drop("Feat_vector1")


In [None]:
# Display the first rows of final_data
final_data.show()

In [142]:
# Initialize DecisionTreeClassifier
dt = DecisionTreeClassifier(featuresCol="features_vector1", labelCol="stars")

# Train the decision tree model on the training data
dt_model = dt.fit(train_data)

# Get feature importances from the trained model
feature_importances = dt_model.featureImportances.toArray()



In [None]:
# Plot the feature importances
import matplotlib.pyplot as plt

plt.figure(figsize=(10, 6))
plt.bar(range(len(feature_importances)), feature_importances)
plt.xlabel('Feature Index')
plt.ylabel('Importance')
plt.title('Feature Importances')
plt.show()

In [None]:
# Create a list of (feature_index, importance) tuples
feature_importance_list = [(index, importance) for index, importance in enumerate(feature_importances)]

# Sort the features by importance in descending order
sorted_features = sorted(feature_importance_list, key=lambda x: x[1], reverse=True)

# Print the sorted features
print("Sorted Features by Importance:")
for feature in sorted_features:
    print(f"Feature Index: {feature[0]}, Importance: {feature[1]}")

In [170]:
# Hand-picked feature indices and their display names ("feature_<index>")
feature_indices = [105, 405, 250, 204]
selected_feature_names = ["feature_%d" % index for index in feature_indices]

In [None]:
# Train linear regression model
lr = LinearRegression(featuresCol="features", labelCol="stars")
lr_model = lr.fit(train_data)

# Make predictions on dev data
dev_predictions = lr_model.transform(dev_data)

# Evaluate the model
evaluator = RegressionEvaluator(labelCol="stars", predictionCol="prediction", metricName="mse")
mse = evaluator.evaluate(dev_predictions)
print("Mean Squared Error on Dev Data for Linear Regression:", mse)

In [None]:
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.evaluation import RegressionEvaluator

# Train Random Forest model.
# The feature vector assembled for the splits is "features_vector1" (the same
# column the decision tree uses); no column named "features" is created.
rf = RandomForestRegressor(featuresCol="features_vector1", labelCol="stars")
rf_model = rf.fit(train_data)

# Make predictions on dev data
dev_predictions = rf_model.transform(dev_data)

# Evaluate the model with mean squared error between "stars" and "prediction"
evaluator = RegressionEvaluator(labelCol="stars", predictionCol="prediction", metricName="mse")
mse = evaluator.evaluate(dev_predictions)
print("Mean Squared Error on Dev Data for Random Forest:", mse)


In [None]:
# Score the held-out test split with the linear regression model
test_predictions = lr_model.transform(test_data)

# Evaluate with the evaluator configured for the dev-set evaluation
mse_test = evaluator.evaluate(test_predictions)

print("Mean Squared Error on Test Data for Linear Regression:", mse_test)