In [1]:
import os

import pandas as pd
import pyspark
from dotenv import load_dotenv
from pyspark.sql import SparkSession
from pyspark.sql import types
from pyspark.sql.functions import to_timestamp
from pyspark.sql.functions import expr, regexp_extract, regexp_replace

# Load settings from a .env file (python-dotenv) so that the os.getenv() lookups
# in later cells can find SPARK_PARQUET_PATH and SPARK_REPORT.
load_dotenv()

True

In [2]:
# Start (or reuse) a Spark session named "test" with a local master ("local[*]").
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("test")
    .getOrCreate()
)

In [3]:
# Location of the input Parquet data, taken from the environment
# (None when the variable is not set).
SPARK_PARQUET_PATH = os.environ.get("SPARK_PARQUET_PATH")

In [4]:
# Fail fast when the path is missing: formatting None into an f-string would
# silently ask Spark to read a path literally named "None".
if not SPARK_PARQUET_PATH:
    raise ValueError("Environment variable SPARK_PARQUET_PATH is not set")

# Read the Parquet data into a DataFrame and print its schema.
df = spark.read.parquet(SPARK_PARQUET_PATH)
df.printSchema()

root
 |-- Hotel_Address: string (nullable = true)
 |-- Additional_Number_of_Scoring: integer (nullable = true)
 |-- Review_Date: timestamp (nullable = true)
 |-- Average_Score: double (nullable = true)
 |-- Hotel_Name: string (nullable = true)
 |-- Reviewer_Nationality: string (nullable = true)
 |-- Negative_Review: string (nullable = true)
 |-- Review_Total_Negative_Word_Counts: integer (nullable = true)
 |-- Total_Number_of_Reviews: integer (nullable = true)
 |-- Positive_Review: string (nullable = true)
 |-- Review_Total_Positive_Word_Counts: integer (nullable = true)
 |-- Total_Number_of_Reviews_Reviewer_Has_Given: integer (nullable = true)
 |-- Reviewer_Score: double (nullable = true)
 |-- Tags: string (nullable = true)
 |-- days_since_review: string (nullable = true)
 |-- lat: double (nullable = true)
 |-- lng: double (nullable = true)



In [5]:
# List (column name, Spark type) pairs as a compact view of the schema.
df.dtypes

[('Hotel_Address', 'string'),
 ('Additional_Number_of_Scoring', 'int'),
 ('Review_Date', 'timestamp'),
 ('Average_Score', 'double'),
 ('Hotel_Name', 'string'),
 ('Reviewer_Nationality', 'string'),
 ('Negative_Review', 'string'),
 ('Review_Total_Negative_Word_Counts', 'int'),
 ('Total_Number_of_Reviews', 'int'),
 ('Positive_Review', 'string'),
 ('Review_Total_Positive_Word_Counts', 'int'),
 ('Total_Number_of_Reviews_Reviewer_Has_Given', 'int'),
 ('Reviewer_Score', 'double'),
 ('Tags', 'string'),
 ('days_since_review', 'string'),
 ('lat', 'double'),
 ('lng', 'double')]

In [6]:
# Preview the first rows; show() truncates long cell values by default.
df.show()

+--------------------+----------------------------+-------------------+-------------+--------------------+--------------------+--------------------+---------------------------------+-----------------------+--------------------+---------------------------------+------------------------------------------+--------------+--------------------+-----------------+----------+----------+
|       Hotel_Address|Additional_Number_of_Scoring|        Review_Date|Average_Score|          Hotel_Name|Reviewer_Nationality|     Negative_Review|Review_Total_Negative_Word_Counts|Total_Number_of_Reviews|     Positive_Review|Review_Total_Positive_Word_Counts|Total_Number_of_Reviews_Reviewer_Has_Given|Reviewer_Score|                Tags|days_since_review|       lat|       lng|
+--------------------+----------------------------+-------------------+-------------+--------------------+--------------------+--------------------+---------------------------------+-----------------------+--------------------+-----------

In [7]:
# 1) Shorten 'United Kingdom' to 'UK' inside Hotel_Address.
# 2) Take the last word of the address as the new Hotel_Country column.
#    The second withColumn resolves "Hotel_Address" against the output of the
#    first, so UK addresses yield 'UK' rather than 'Kingdom'.
df = (
    df
    .withColumn("Hotel_Address", regexp_replace("Hotel_Address", "United Kingdom", "UK"))
    .withColumn("Hotel_Country", regexp_extract("Hotel_Address", r"\b(\w+)$", 1))
)

# Display the transformed rows without truncating long cell values.
df.show(truncate=False)

+-----------------------------------------------------------------+----------------------------+-------------------+-------------+-------------------------------+--------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------------------------------+-----------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------------------------------+-------------------

In [8]:
# Keep a subset of columns for the reporting step and preview the result.
selected_columns = [
    "Hotel_Address",
    "Hotel_Country",
    "Hotel_Name",
    "Review_Date",
    "Average_Score",
    "Reviewer_Nationality",
    "Reviewer_Score",
]
df_selected = df.select(*selected_columns)

# show() truncates long values by default, same as truncate=True.
df_selected.show()

+--------------------+-------------+--------------------+-------------------+-------------+--------------------+--------------+
|       Hotel_Address|Hotel_Country|          Hotel_Name|        Review_Date|Average_Score|Reviewer_Nationality|Reviewer_Score|
+--------------------+-------------+--------------------+-------------------+-------------+--------------------+--------------+
|1 Rue Du G n ral ...|       France| Gardette Park Hotel|2015-09-30 00:00:00|          8.2| United States of...|           9.6|
|1 Inverness Terra...|           UK|Grand Royale Lond...|2016-04-24 00:00:00|          7.7|              Qatar |           8.3|
|10 Palace Place W...|           UK| The Nadler Victoria|2016-09-01 00:00:00|          9.3|     United Kingdom |          10.0|
|1 Inverness Terra...|           UK|Grand Royale Lond...|2017-01-02 00:00:00|          7.7|     United Kingdom |           6.3|
|1 2 Serjeant s In...|           UK|Apex Temple Court...|2016-10-18 00:00:00|          9.2|        Switz

In [9]:
# Expose the selected columns to Spark SQL under the name "hotel_reviews".
df_selected.createOrReplaceTempView("hotel_reviews")

# Average reviewer score per hotel, listed country by country with the
# best-rated hotels first.
# FORMAT_NUMBER returns a STRING, so ordering by the Avg_Reviewer_Score alias
# would compare text ("10.00" sorts below "9.59" in descending order).
# Order by the numeric aggregate instead; the output columns are unchanged.
report_query = """
                SELECT
                    Hotel_Country,
                    Hotel_Name,
                    FORMAT_NUMBER(AVG(Reviewer_Score), 2) AS Avg_Reviewer_Score
                FROM
                    hotel_reviews
                GROUP BY
                    Hotel_Country, Hotel_Name
                ORDER BY
                    Hotel_Country, AVG(Reviewer_Score) DESC
"""

# Run the query against the temporary view.
report_df = spark.sql(report_query)

# Preview the first rows of the report.
report_df.show()

+-------------+--------------------+------------------+
|Hotel_Country|          Hotel_Name|Avg_Reviewer_Score|
+-------------+--------------------+------------------+
|      Austria|   Hotel Sacher Wien|              9.59|
|      Austria|Hollmann Beletage...|              9.55|
|      Austria|Hotel Am Stephans...|              9.52|
|      Austria|Palais Coburg Res...|              9.51|
|      Austria|Hotel Sans Souci ...|              9.51|
|      Austria|Best Western Prem...|              9.50|
|      Austria|Boutiquehotel Das...|              9.50|
|      Austria|The Guesthouse Vi...|              9.48|
|      Austria|Hotel K nig von U...|              9.45|
|      Austria|Small Luxury Hote...|              9.42|
|      Austria|Hotel Imperial A ...|              9.40|
|      Austria|Hotel Rathaus Wei...|              9.38|
|      Austria|         Hotel J ger|              9.36|
|      Austria|Appartement Hotel...|              9.28|
|      Austria|    Hotel Capricorno|            

In [10]:
# Output location for the report, read from the environment
# (None when the variable is not set) and echoed for a quick visual check.
SPARK_REPORT = os.environ.get("SPARK_REPORT")
print(SPARK_REPORT)

C:\Users\bmart\OneDrive\12_Data_Engineering\de-hotel-reviews\data\spark\spark_report


In [11]:
# Refuse to write when the target is missing: formatting None into an f-string
# would overwrite a directory literally named "None" in the working directory.
if not SPARK_REPORT:
    raise ValueError("Environment variable SPARK_REPORT is not set")

# Collapse the report into a single partition before writing.
report_df_single_partition = report_df.coalesce(1)

# Write the report as Parquet, replacing any previous output at that location.
report_df_single_partition.write.mode("overwrite").parquet(SPARK_REPORT)

In [12]:
# Uncomment to shut down the Spark session and release its resources when finished:
# spark.stop()