In [1]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
import json

In [2]:
# Create (or reuse, via getOrCreate) the Spark session used by every later cell.
spark = (
    SparkSession.builder
    .appName('Amenities Processing')
    .getOrCreate()
)

25/01/24 17:57:39 WARN Utils: Your hostname, w3e101 resolves to a loopback address: 127.0.1.1; using 192.168.0.231 instead (on interface enp2s0)
25/01/24 17:57:39 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/01/24 17:57:40 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
25/01/24 17:57:42 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [3]:
# Load the raw lodging-amenities records (one JSON object per line).
# No schema is supplied, so Spark infers it by scanning the input.
df = spark.read.format("json").load("input/expedia-lodging-amenities-en_us-1-all.jsonl")

25/01/24 17:58:09 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


In [4]:
# Preview the first five raw records without truncating the wide nested columns.
df.show(n=5, truncate=False)

+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

                                                                                

In [5]:
# Load the amenity -> category mapping and turn it into a Spark map literal.
# JSON values are either a single category or a list of categories. A Spark map
# holds one value per key, so a list collapses to ONE category (see below), and an
# empty list produces no entry at all.
with open("input/amenity_category.json", "r", encoding="utf-8") as f:
    amenity_category_dict = json.load(f)

flat_amenity_category_dict = {}
for amenity, categories in amenity_category_dict.items():
    if isinstance(categories, list):
        if categories:
            # Same outcome as overwriting the key once per list element: the LAST
            # listed category wins.
            # NOTE(review): confirm the last (not the first) category is intended.
            flat_amenity_category_dict[amenity] = categories[-1]
    else:
        flat_amenity_category_dict[amenity] = categories

# create_map takes alternating literals: key1, value1, key2, value2, ...
# A flat comprehension avoids the quadratic tuple concatenation of sum(items, ()).
mapping_expr = F.create_map(
    *[F.lit(part) for pair in flat_amenity_category_dict.items() for part in pair]
)

In [6]:
# Amenity groups to merge: room-level fields are read from the `roomAmenities`
# struct and property-level fields from the `propertyAmenities` struct.
room_arrays = [
    'BATHROOM', 'BEDROOM', 'ENTERTAINMENT',
    'FOOD_AND_DRINK', 'MORE', 'OUTDOOR_SPACE'
]

property_arrays = [
    'ACCESSIBILITY', 'ACTIVITIES_NEARBY', 'BATHROOMS', 'BEACH', 'BEDROOM',
    'BUSINESS_SERVICES', 'CLIMATE_CONTROL', 'CONVENIENCES', 'DINING',
    'ENTERTAINMENT', 'FAMILY_FRIENDLY', 'FOOD_AND_DRINK', 'GENERAL',
    'GUEST_SERVICES', 'INTERNET', 'KITCHEN', 'LANGS_SPOKEN', 'LAUNDRY',
    'LIVING_SPACES', 'LOCATION_HIGHLIGHTS', 'MORE', 'OUTDOOR', 'OUTDOORS',
    'PARKING', 'PETS', 'POOL/SPA', 'SAFETY', 'SERVICES_AND_CONVENIENCES',
    'SKI', 'SPA', 'SUITABILITY/ACCESSIBILITY', 'THINGS_TO_DO', 'WORKSPACES'
]


def _non_null_amenities(struct_name, field):
    """Return the array column `struct_name.field` with null elements removed.

    A null array is replaced by an empty array first (coalesce), so a missing
    group contributes nothing rather than a null.
    """
    return F.filter(
        F.coalesce(F.col(f"{struct_name}.{field}"), F.array()),
        lambda x: x.isNotNull(),
    )


# Fold every room-level, then every property-level, array into one column
# expression; array_union removes duplicates as it merges.
amenity_sources = (
    [("roomAmenities", field) for field in room_arrays]
    + [("propertyAmenities", field) for field in property_arrays]
)
result = _non_null_amenities(*amenity_sources[0])
for struct_name, field in amenity_sources[1:]:
    result = F.array_union(result, _non_null_amenities(struct_name, field))


def _to_snake_case(amenity):
    """Normalise one amenity label (a string Column) to a snake_case key.

    Steps: trim surrounding spaces; drop parenthetical qualifiers together with
    any whitespace before them; replace each run of spaces, hyphens, slashes and
    underscores with a single "_"; lower-case.
    e.g. "Wi-Fi / Internet (free)" -> "wi_fi_internet".
    Presumably this matches the key style of amenity_category.json -- TODO confirm.
    """
    # Raw Python strings: the regex reaches Java verbatim, with no invalid
    # Python escape sequences (\s, \( ...) as in a plain string literal.
    without_qualifiers = F.regexp_replace(F.trim(amenity), r"\s*\([^)]*\)", "")
    return F.lower(F.regexp_replace(without_qualifiers, r"[ /_-]+", "_"))


# NOTE(review): this cell rebinds `df` to the projected frame, so running it a
# second time fails (propertyId / roomAmenities are no longer present).
# Re-run from the load cell instead.
df = (
    df
    .withColumn("expedia_id", F.col("propertyId.expedia"))
    # `result` is the union of all amenity arrays; array_distinct is a cheap safeguard.
    .withColumn("amenities", F.array_distinct(result))
    # size() of the (non-null) amenities array; coalesce is a defensive fallback.
    .withColumn("amenities_count", F.coalesce(F.size(F.col("amenities")), F.lit(0)))
    .withColumn("amenities_snake_case", F.transform(F.col("amenities"), _to_snake_case))
    # Look each normalised amenity up in the category map; unmapped -> "Other".
    .withColumn(
        "amenity_categories",
        F.array_distinct(
            F.transform(
                F.col("amenities_snake_case"),
                lambda amenity: F.coalesce(F.element_at(mapping_expr, amenity), F.lit("Other")),
            )
        ),
    )
    # Title-case popularAmenities tags (underscores -> spaces); null list -> empty array.
    .withColumn(
        "themes",
        F.coalesce(F.expr("transform(popularAmenities, x -> initcap(replace(x, '_', ' ')))"), F.array()),
    )
    .select(
        "expedia_id",
        "amenities",
        "amenities_count",
        "amenity_categories",
        "themes",
    )
)

df.show(5, truncate=False)



  """


+----------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [7]:
# Inspect the column types and nullability of the projected amenities frame.
df.printSchema()

root
 |-- expedia_id: string (nullable = true)
 |-- amenities: array (nullable = false)
 |    |-- element: string (containsNull = true)
 |-- amenities_count: integer (nullable = false)
 |-- amenity_categories: array (nullable = false)
 |    |-- element: string (containsNull = false)
 |-- themes: array (nullable = false)
 |    |-- element: string (containsNull = true)

