In [None]:
## NEXT STEPS:
##      TAKE OUT SPECIAL CHARACTERS --> DONE
##      CONVERT TEXT INTO LOWERCASE --> DONE
##      REMOVE PUNCTUATION --> DONE
##      REMOVE STOPWORDS --> TODO (the cleaning cell below only strips punctuation/special characters)
##      ADD LABEL --> LATER
##      READ ALL FILES FROM FOLDER AND COMBINE THEM INTO A SINGLE CSV --> DONE
##      READ FILES FROM S3
##      CONVERT TO GLUE CODE

### EXTRA
##      TOKENIZATION
##      STEMMING/LEMMATIZATION

In [8]:
import os
import re

from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    col,
    explode,
    lit,
    lower,
    regexp_replace,
    size,
    trim,
    udf,
    when,
)
from pyspark.sql.types import (
    ArrayType,
    IntegerType,
    MapType,
    StringType,
    StructField,
    StructType,
)

# Initialize the Spark session (getOrCreate reuses one that is already running).
spark = (
    SparkSession.builder
    .appName("HotelDataProcessing")
    .getOrCreate()
)

# Local folder holding the *.json inputs, and the folder the CSV output is
# written to. The defaults are the original relative paths; set the
# environment variables to point the notebook elsewhere without editing code.
json_folder_path = os.environ.get("HOTEL_JSON_INPUT_PATH", "../input/")
output_path = os.environ.get("HOTEL_CSV_OUTPUT_PATH", "../output/")

# Explicit read schema for one hotel record. Supplying it up front avoids
# Spark's schema-inference pass and keeps column types identical across files.
# NOTE(review): "rating" and reviews[].rating are read as strings; cast them
# downstream if numeric comparisons or aggregations are needed.
schema = StructType([
    StructField("hotel_name", StringType(), True),
    StructField("location", StringType(), True),
    StructField("rating", StringType(), True),
    StructField("user_ratings_total", IntegerType(), True),
    StructField("max_number_of_people", IntegerType(), True),
    StructField("address", StringType(), True),
    StructField("business_status", StringType(), True),
    StructField("place_id", StringType(), True),
    StructField("amenities", MapType(StringType(), StringType()), True),
    # One entry per photo attached to the hotel.
    StructField("photos", ArrayType(
        StructType([
            StructField("photo_reference", StringType(), True),
            StructField("s3_url", StringType(), True),
            StructField("html_attributions", ArrayType(StringType()), True),
        ])
    ), True),
    # One entry per user review; "date" holds relative text such as "2 weeks ago".
    StructField("reviews", ArrayType(
        StructType([
            StructField("user", StringType(), True),
            StructField("rating", StringType(), True),
            StructField("date", StringType(), True),
            StructField("review", StringType(), True),
        ])
    ), True),
    StructField("source", StringType(), True),
])

# Relative-date units recognised by convert_to_days, checked in this order
# (the first unit found in the string wins), with approximate days per unit.
# Months and years use fixed 30/365-day approximations.
_RELATIVE_UNIT_DAYS = (("day", 1), ("week", 7), ("month", 30), ("year", 365))
_FIRST_NUMBER = re.compile(r"\d+")
# A standalone "a"/"an", as in "a week ago" or "an hour ago".
_SINGULAR_ARTICLE = re.compile(r"\b(?:a|an)\b")


def convert_to_days(date_str):
    """Convert a relative date string such as "3 weeks ago" to a day count.

    This is the plain-Python function wrapped by ``convert_to_days_udf``.

    Args:
        date_str: Relative date text, e.g. "2 days ago" or "a month ago".
            May be None (null column value).

    Returns:
        count * days-per-unit, where the count is the first integer in the
        string, or 1 when there is no digit but a standalone "a"/"an"
        ("a year ago" -> 365). Returns 0 when no day/week/month/year unit is
        present or no count can be found, and None when ``date_str`` is None.
    """
    if date_str is None:
        # Keep nulls null; ``"day" in None`` would raise TypeError and fail
        # the whole Spark task.
        return None

    text = date_str.lower()  # so "2 Days ago" is recognised too
    days_per_unit = next(
        (factor for unit, factor in _RELATIVE_UNIT_DAYS if unit in text), None
    )
    if days_per_unit is None:
        return 0

    number = _FIRST_NUMBER.search(text)
    if number is not None:
        return int(number.group()) * days_per_unit
    if _SINGULAR_ARTICLE.search(text):
        # "a week ago" means one unit, not zero.
        return days_per_unit
    return 0

# Wrap the plain-Python converter so it can be applied to Spark columns.
convert_to_days_udf = udf(convert_to_days, IntegerType())

# Target schema of the combined output: one row per (hotel, review) pair.
# Starting the union from an empty frame with this schema keeps the write
# below well-defined even when no JSON files are found.
empty_df_schema = StructType([
    StructField("region", StringType(), True),
    StructField("hotel_name", StringType(), True),
    StructField("rating", StringType(), True),
    StructField("user_ratings_total", IntegerType(), True),
    StructField("number_of_photos", IntegerType(), True),
    StructField("address", StringType(), True),
    StructField("business_status", StringType(), True),
    StructField("review_user", StringType(), True),
    StructField("review_rating", StringType(), True),
    StructField("review_date_in_days", IntegerType(), True),
    StructField("review_text", StringType(), True),
])

combined_df = spark.createDataFrame([], empty_df_schema)

# Review text cleanup (loop-invariant, so built once):
#   1. keep only Unicode letters, digits and whitespace, so accented words
#      such as "café" are kept whole instead of being truncated;
#   2. collapse every whitespace run (newlines, \r, tabs, the gaps left by
#      removed punctuation) into a single space;
#   3. trim and lowercase.
# Raw strings are used so the regex backslashes reach Spark's (Java) regex
# engine unchanged.
cleaned_review_text = regexp_replace(col("review.review"), r"[^\p{L}\p{N}\s]", "")
cleaned_review_text = regexp_replace(cleaned_review_text, r"\s+", " ")
cleaned_review_text = lower(trim(cleaned_review_text))

try:
    # os.listdir() returns entries in arbitrary order; sorting makes the union
    # order, and therefore the output row order, reproducible between runs.
    for file_name in sorted(os.listdir(json_folder_path)):
        if not file_name.endswith('.json'):
            continue

        # Region is the part of the file name before the first "_". The
        # extension is stripped first so a name without "_" does not produce
        # a region ending in ".json".
        region = os.path.splitext(file_name)[0].split('_')[0]
        json_file_path = os.path.join(json_folder_path, file_name)

        # Read the (multi-line) JSON file with the explicit schema.
        df = spark.read.schema(schema).option("multiline", "true").json(json_file_path)

        # One row per review; hotel-level columns repeat on every row.
        # explode() drops hotels whose reviews array is null or empty.
        exploded_df = df.select(
            col("hotel_name"),
            col("rating"),
            col("user_ratings_total"),
            col("address"),
            col("business_status"),
            # size(null) can return -1 depending on Spark settings; report
            # null instead when the photos array is missing.
            when(col("photos").isNotNull(), size(col("photos"))).alias("number_of_photos"),
            explode(col("reviews")).alias("review"),
        )

        # Flatten the review struct, tag rows with the region, convert the
        # relative review date to days, and clean the review text.
        flattened_df = exploded_df.select(
            lit(region).alias("region"),
            col("hotel_name"),
            col("rating"),
            col("user_ratings_total"),
            col("number_of_photos"),
            col("address"),
            col("business_status"),
            col("review.user").alias("review_user"),
            col("review.rating").alias("review_rating"),
            convert_to_days_udf(col("review.date")).alias("review_date_in_days"),
            cleaned_review_text.alias("review_text"),
        )

        combined_df = combined_df.unionByName(flattened_df)

    # repartition(1) rather than coalesce(1): a drastic coalesce may pull all
    # upstream work (JSON parsing, the Python UDF) into a single task, while
    # repartition keeps it parallel and only shuffles the final rows into one
    # partition. cache() avoids recomputing everything for both show() and
    # the write.
    combined_df = combined_df.repartition(1).cache()

    print("Combined DataFrame:")
    combined_df.show(truncate=False)

    # Spark writes a directory of part-*.csv files (one part file here because
    # of the single partition), not one named CSV file.
    # WARNING: mode("overwrite") replaces the entire contents of output_path.
    combined_df.write.mode("overwrite").csv(output_path, header=True)
finally:
    # Release the Spark session even if reading or writing fails.
    spark.stop()

Combined DataFrame:


                                                                                

+----------------+----------------------------+------+------------------+----------------+--------------------------------------------------------------+---------------+-----------------------------+-------------+-------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------