<a href="https://colab.research.google.com/github/dnmuturi/SIT742/blob/main/SIT742Task2code.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

**END TERM ASSIGNMENT**

 **GROUP C30 Members**

1. David Muturi - S225177509
2. Nhlanhla Matukane - S225177376
3. Vincent Nwobi -

### **Part 1**

**Answer 1.1**

In [2]:
#install the spark library
!pip install pyspark



In [3]:
#load necessary libraries
from google.colab import drive
from pyspark.sql import SparkSession
from pyspark.sql.types import DoubleType
from pyspark.sql.functions import col, from_unixtime, to_date, when, lit, trim
from google.colab import drive

In [4]:
# Mount Google Drive
drive.mount('/content/drive')
# Define the path to the CSV file
csv_path = '/content/drive/My Drive/SIT742/review.csv'

Mounted at /content/drive


In [5]:
# Create a SparkSession
spark = SparkSession.builder.appName("ReadCSVCorrectly").getOrCreate()
# Read the CSV file into a PySpark DataFrame with options to improve parsing
try:
    df_review= spark.read.csv(
        csv_path,
        header=True,
        inferSchema=True,
        sep=',',  # Assuming comma is the delimiter
        quote='"', # Assuming double quotes are used for quoting fields
        escape='"', # Assuming double quotes are escaped by double quotes
        multiLine=True # Set to true if text column contains newline characters
    )
    print("CSV file loaded successfully!")
    df_review.show(5, truncate=False) # Display the first five rows without truncating
except Exception as e:
    print(f"An error occurred: {e}")

CSV file loaded successfully!
+---------------------+-----------------+-------------+------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+----+----+-------------------------------------+
|user_id              |name             |time         |rating|text                                                

In [6]:
# show data tpes
df_review.printSchema()

root
 |-- user_id: double (nullable = true)
 |-- name: string (nullable = true)
 |-- time: long (nullable = true)
 |-- rating: integer (nullable = true)
 |-- text: string (nullable = true)
 |-- pics: string (nullable = true)
 |-- resp: string (nullable = true)
 |-- gmap_id: string (nullable = true)



In [7]:
#show summary statistics
df_review.describe().show()

+-------+--------------------+------------+--------------------+------------------+--------------------+--------------------+--------------------+--------------------+
|summary|             user_id|        name|                time|            rating|                text|                pics|                resp|             gmap_id|
+-------+--------------------+------------+--------------------+------------------+--------------------+--------------------+--------------------+--------------------+
|  count|              521515|      521515|              521515|            521515|              298257|               20851|               44232|              521515|
|   mean|1.091263946357014...|1.59456176E8|1.550301924036441E12| 4.283750227701984|                NULL|                NULL|                NULL|                NULL|
| stddev|5.306023249666912...|         0.0|  3.8404430568757E10|1.0356221247508024|                NULL|                NULL|                NULL|              

**Answer 1.1.1**

In [8]:
#function to count rows that are none or null
def show_empty_rows(df):
  # Filter rows where 'text' column is null or an empty string
  null_or_empty_text_count = df.filter(
    col("text").isNull() | (trim(col("text")) == "")
  ).count()
  return null_or_empty_text_count

In [9]:
#show count of empty rows in text field before filling in with no review
print(f"The number of empty rows in the text column before replacing with 'no review' is: {show_empty_rows(df_review)}")

The number of empty rows in the text column before replacing with 'no review' is: 223258


In [10]:
# Replace null values with "No review"
df_review = df_review.fillna({'text': 'no review'})

# Replace empty strings (after trimming whitespace) with "No review"
df_review = df_review.withColumn("text",
    when(trim(col("text")) == "", lit("no review")).otherwise(col("text"))
)

# Show the count of text column with  to verify the changes
print(f"The number of empty rows in the text column after replacing with no review is: {show_empty_rows(df_review)}")

# Count rows where the 'text' column is 'no review'
no_review_count = df_review.filter(col("text") == "no review").count()

print(f"The number of reviews with 'no review' in the text column is: {no_review_count}")


The number of empty rows in the text column after replacing with no review is: 0
The number of reviews with 'no review' in the text column is: 223258


**Answer 1.1.2**

In [11]:
# Set the time parser policy to legacy to handle potential parsing issues
spark.conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY")

# Convert 'time' from epoch milliseconds to a timestamp, then to a date string
df_review = df_review.withColumn(
    "newtime",
    to_date(from_unixtime(col("time") / 1000), "yyyy-MM-dd")
)

# Display the first few rows with the new column
df_review.select("user_id","name","rating","time", "newtime","text","gmap_id").show(5)

+--------------------+-----------------+------+-------------+----------+--------------------+--------------------+
|             user_id|             name|rating|         time|   newtime|                text|             gmap_id|
+--------------------+-----------------+------+-------------+----------+--------------------+--------------------+
|1.091298048426862E20|       Nicki Gore|     5|1566331951619|2019-08-20|We always stay he...|0x56b646ed2220b77...|
|1.132409264057589...|    Allen Ratliff|     5|1504917982385|2017-09-09|Great campground ...|0x56b646ed2220b77...|
|1.130448378911412...|Jonathan Tringali|     4|1474765901185|2016-09-25|We tent camped he...|0x56b646ed2220b77...|
|1.103291551475920...|           S Blad|     4|1472858535682|2016-09-02|This place is jus...|0x56b646ed2220b77...|
| 1.08989634908602E20|   Daniel Formoso|     5|1529649811341|2018-06-22|Probably the nice...|0x56b646ed2220b77...|
+--------------------+-----------------+------+-------------+----------+--------

**Answer 1.2**

In [12]:
from pyspark.sql.functions import count, col
from pyspark.sql.types import DoubleType

# Calculate the number of reviews per unique gmap_id
reviews_per_gmap = df_review.groupBy("gmap_id").agg(count("*").alias("review_count"))

# Cast the review_count to float type
reviews_per_gmap = reviews_per_gmap.withColumn("review_count", col("review_count").cast(DoubleType()))

# Show the top 5 results
print("Number of reviews per unique gmap_id (Top 5):")
reviews_per_gmap.orderBy(col("review_count").desc()).show(5)

Number of reviews per unique gmap_id (Top 5):
+--------------------+------------+
|             gmap_id|review_count|
+--------------------+------------+
|0x56c897b9ce6000d...|      2833.0|
|0x56c899d05892048...|      2594.0|
|0x56c897c63697ee3...|      2258.0|
|0x56c8965ee2fb87a...|      2237.0|
|0x56c89629bde7481...|      2219.0|
+--------------------+------------+
only showing top 5 rows

