### Student's Contact Information:
**Name:** Gloria Moraa Angwenyi

**Email:** gloriaa@arizona.edu

## Part 0: Configure Spark and Download Data

In [46]:
!pip install pyspark



In [47]:
# Create (or reuse) the SparkContext and SparkSession used by this notebook.
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession

# Configuration for a local run of this assignment.
conf = SparkConf().setMaster("local").setAppName("MIS584_Lab_Assignment2")

# Get or create a SparkContext. The configuration is passed in so that it is
# actually applied when a new context has to be created; getOrCreate() also
# avoids an error when the cell is re-run while a context is still alive.
sc = SparkContext.getOrCreate(conf=conf)

# Get or create the SparkSession for the DataFrame / SQL parts.
spark = SparkSession.builder.appName("MIS584_Lab_Assignment2").getOrCreate()

## Part 1: Practice of PySpark RDD

In [48]:
# Mount Google Drive at /content/drive/ so the data files stored there can be
# opened through ordinary file paths.
from google.colab import drive
drive.mount('/content/drive/')

Drive already mounted at /content/drive/; to attempt to forcibly remount, call drive.mount("/content/drive/", force_remount=True).


In [49]:
# Path of the reviews text file inside the mounted Drive folder.
text_file_path = "/content/drive/MyDrive/Colab Notebooks/text_reviews.txt"

In [50]:
# Read the text file into an RDD; the elements are later processed one line
# at a time by the mapper function.
lines_rdd = sc.textFile(text_file_path)


In [51]:
import pandas as pd

# Mapper that turns one raw input line into a (user_id, review_text) pair
def extract_user(lines):
    """Parse a single tab-delimited line into ``(user_id, review_text)``.

    Args:
        lines: One line of the input file, as a string.

    Returns:
        A tuple ``(user_id, review_text)`` built from the first two
        tab-separated fields, each stripped of surrounding whitespace, or
        ``None`` when the line has fewer than two fields.
    """
    fields = lines.split('\t')

    # Guard clause: a line without a tab has no review text to pair up
    if len(fields) < 2:
        return None

    return (fields[0].strip(), fields[1].strip())

# Use the map transformation to apply the custom mapper function, then drop
# the None records that extract_user returns for lines without a tab, so that
# every element downstream is a well-formed (user_id, review_text) pair.
user_id_review_rdd = (
    lines_rdd
    .map(extract_user)
    .filter(lambda record: record is not None)
)

# Print the first 10 records:
for record in user_id_review_rdd.take(10):
    print(record)

# Convert the RDD to a DataFrame using toDF()
columns = ["user_id", "review_text"]
df = user_id_review_rdd.toDF(columns)

# Show the DataFrame
df.show()

# Use distinct() to get the unique user IDs.
unique_users = df.select('user_id').distinct()

# Count the number of unique users
num_unique_users = unique_users.count()

# Print the number of unique users
print("Number of unique users:", num_unique_users)

# Count reviews per user: emit (user_id, 1) for every review and sum the ones
# with reduceByKey. (Reducing the raw pairs would concatenate the review
# texts instead of counting them.)
user_review_counts = (
    user_id_review_rdd
    .map(lambda record: (record[0], 1))
    .reduceByKey(lambda a, b: a + b)
)

# Sort the RDD by total number of reviews in descending order
sorted_user_review_counts = user_review_counts.sortBy(lambda x: x[1], ascending=False)

# Take the top 10 users with the largest number of reviews
top_10_users = sorted_user_review_counts.take(10)

# Print the top 10 users with large number of reviews
print("Top 10 users with largest number of reviews:")
for user_id, review_count in top_10_users:
    print(f"User {user_id}: {review_count} reviews")


# Total review length per user: emit (user_id, len(review_text)) for every
# review and sum the lengths with reduceByKey. Measuring the length BEFORE
# reducing is what makes the ordering numeric; reducing the raw text would
# concatenate strings and order users alphabetically by their reviews.
user_total_length = (
    user_id_review_rdd
    .map(lambda record: (record[0], len(record[1])))
    .reduceByKey(lambda a, b: a + b)
)

# Take the 10 users with the lowest total length of reviews. The user id is
# the secondary sort key so ties are resolved deterministically.
top_10_quiet_users = user_total_length.takeOrdered(
    10, key=lambda pair: (pair[1], pair[0])
)

# Dictionary view (user_id -> total length) of those ten users, in ascending
# order of total length
sorted_user_total_length = dict(top_10_quiet_users)

# Print the 10 most quiet users and their total length of reviews
print("10 most quiet users:")
for user_id, total_length in top_10_quiet_users:
    print(f"User {user_id}: Length of Reviews: {total_length} characters")

# Stop the SparkContext held in `sc` now that the RDD steps above have run.
sc.stop()



('CMYCfKoEu0WF9_43zRgr8g', "We love this little restaurant! It's not as overrated  and loud compared to other places in Tucson. Sushi is awesome and for the right price.")
('CMYCfKoEu0WF9_43zRgr8g', "We came here for dinner this evening and was absolutely delicious! Truly I was blown away. I ordered a fish special at market price and couldn't believe how yummy it was. Can't recall the name of the fish but it was in the grouper family. It tasted very identical to crab meat.   I had my one year old daughter with and couldn't get her to eat anything, but she loved my fish! Service was friendly. Love it here, but it is pricey! $100+ for two people, however every penny was well spent.")
('CMYCfKoEu0WF9_43zRgr8g', "Just a heads up the owner, Roya, will not give refunds or respond to clients. I paid $500+ to reserve hair and makeup for my bridal party in September.   I was not informed that the business was closed and haven't received any responses regarding to getting a refund. Roya is a tot

## Part 2: Practice of PySpark SQL

In [None]:
from pyspark.sql.functions import col, length
from pyspark.sql.functions import round
from pyspark.sql.window import Window
from pyspark.sql import functions as F

# Obtain the SparkSession for the SQL exercises via the builder's getOrCreate()
spark = (
    SparkSession.builder
    .appName("Read CSV and Show Schema")
    .getOrCreate()
)

# Location of the user reviews CSV file
csv_file_path = "/content/drive/MyDrive/Colab Notebooks/user_reviews.csv"

# Load the CSV into a DataFrame, treating the first row as the header and
# letting Spark infer the column types
df1 = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv(csv_file_path)
)

# Display the inferred schema, followed by the first 20 rows
df1.printSchema()
df1.show(20)


# Replace null values with the placeholder string "missing"
df_filled = df1.na.fill("missing")
df_filled.show(10)

# Keep only the rows whose review_id and business_id are not the placeholder
df_filtered = df_filled.where(
    (col("review_id") != "missing") & (col("business_id") != "missing")
)
df_filtered.show(20)

# Report how many rows survived the filter
row_count = df_filtered.count()
print("Number of rows after removing missing values:", row_count)

# Append 'review_text_length', holding the length of each 'review_text' value
df_with_length = df_filtered.withColumn(
    "review_text_length",
    F.length("review_text"),
)

# Preview the first 10 rows including the new column
df_with_length.show(10)

# Per-user aggregates, each rounded to two decimals: mean star rating,
# mean review length, and total number of useful votes
grouped_df = (
    df_filtered
    .groupBy("user_id")
    .agg(
        F.round(F.avg("review_stars"), 2).alias("avg_star_rating"),
        F.round(F.avg(F.length("review_text")), 2).alias("avg_review_length"),
        F.round(F.sum("useful"), 2).alias("total_useful_votes"),
    )
)

# Two orderings of the same aggregate table:
# most useful votes first, and lowest average rating first
sorted_df = grouped_df.orderBy(F.desc("total_useful_votes"))
sorted_df2 = grouped_df.orderBy(F.asc("avg_star_rating"))

# Show the first 20 rows of each ordering
sorted_df.show(20)
sorted_df2.show(20)



# Stop the Spark session now that all of the Part 2 steps above have run.
spark.stop()



root
 |-- review_id: string (nullable = true)
 |-- user_id: string (nullable = true)
 |-- business_id: string (nullable = true)
 |-- review_stars: double (nullable = true)
 |-- useful: integer (nullable = true)
 |-- review_text: string (nullable = true)

+--------------------+--------------------+--------------------+------------+------+--------------------+
|           review_id|             user_id|         business_id|review_stars|useful|         review_text|
+--------------------+--------------------+--------------------+------------+------+--------------------+
|FTcRb7TUjE-K6spSj...|CMYCfKoEu0WF9_43z...|5Ce3lZksYVkCbrihq...|         5.0|     2|We love this litt...|
|oyxS126nYDZOL0qwP...|CMYCfKoEu0WF9_43z...|CA5BOxKRDPGJgdUQ8...|         5.0|     1|We came here for ...|
|KbFlOy2PN2dXBjdk4...|CMYCfKoEu0WF9_43z...|1MAQQhmUNU0uzHw3K...|         1.0|     4|Just a heads up t...|
|mslt0F7LpdBMQmKGk...|CMYCfKoEu0WF9_43z...|QXB4E78FXn3eotalX...|         1.0|     9|Came in to get my...|
|5S