# Airbnb Data Transformation Pipeline

This notebook implements a data transformation pipeline for Airbnb listing data using PySpark and Spark NLP.

In [None]:
import time
import functools
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.types import IntegerType, DoubleType, StringType
import sparknlp
from sparknlp.base import *
from sparknlp.annotator import *
from pyspark.ml import Pipeline

# Build (or reuse) the notebook-wide SparkSession that every cell below relies on.
#   * spark.jars.packages names the Spark NLP 5.1.4 artifact (Scala 2.12 build)
#     so the JVM side of Spark NLP is available to the session.
#   * spark.driver.memory asks for a 4g driver heap, to leave room for NLP models.
# NOTE(review): getOrCreate() hands back an existing session when one is already
# running; presumably the settings above are then not re-applied -- confirm by
# running this cell on a fresh kernel.
spark = SparkSession.builder \
    .appName("AirbnbDataTransformation") \
    .config("spark.jars.packages", "com.johnsnowlabs.nlp:spark-nlp_2.12:5.1.4") \
    .config("spark.driver.memory", "4g") \
    .getOrCreate()

# Bare expression: in a notebook this renders the session as the cell output.
spark

## Helper Functions

In [None]:
def time_execution(func):
    """Decorate *func* so that each successful call prints how long it took.

    The duration is measured with ``time.perf_counter()``, a monotonic
    high-resolution clock, so the reported figure cannot be skewed (or go
    negative) when the system clock is adjusted while the call is running.

    Args:
        func: The callable to wrap. Its name, docstring and other metadata
            are preserved on the wrapper via ``functools.wraps``.

    Returns:
        A wrapper that forwards all positional and keyword arguments to
        *func*, prints ``"<name> took <seconds> seconds"`` and returns
        *func*'s result unchanged. If *func* raises, the exception propagates
        and nothing is printed.
    """
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        started = time.perf_counter()
        result = func(*args, **kwargs)
        elapsed = time.perf_counter() - started
        print(f"{func.__name__} took {elapsed:.4f} seconds")
        return result
    return wrapper

## Data Loading

In [None]:
DATA_PATH = "airbnb.csv"

def load_data(path, sample_fraction=None):
    """Read the listings CSV at *path* into a Spark DataFrame.

    The reader takes column names from the header row, lets quoted fields
    span several physical lines (``multiLine``) and uses the double quote as
    the escape character inside quoted values. No schema or ``inferSchema``
    option is set here.

    Args:
        path: Location of the CSV file to read.
        sample_fraction: Optional fraction handed to ``DataFrame.sample``
            (fixed seed 42). ``None`` (the default) disables sampling.

    Returns:
        The loaded DataFrame, sampled when ``sample_fraction`` is given.
    """
    reader = (
        spark.read
        .option("header", "true")
        .option("multiLine", "true")
        .option("escape", "\"")
    )
    df = reader.csv(path)
    # Compare against None rather than relying on truthiness: an explicit
    # fraction of 0 / 0.0 is falsy and used to be silently treated as
    # "no sampling", returning the full dataset.
    if sample_fraction is not None:
        df = df.sample(fraction=sample_fraction, seed=42)
    return df

# Load a 1% sample for development; load_data applies the sampling with a fixed seed.
raw_df = load_data(DATA_PATH, sample_fraction=0.01)
# count() is a Spark action, so this line is where the file is actually read.
print(f"Loaded {raw_df.count()} rows for development.")
# Print the column names and types as read from the CSV.
raw_df.printSchema()

## Transformations

In [None]:
# Select and Rename Columns
@time_execution
def initial_selection(df):
    """Project *df* onto the columns used downstream, renaming two of them.

    ``is_supperhost`` is exposed as ``is_superhost`` and ``hosts_year`` as
    ``host_year``; every other column keeps its name. A source column that is
    absent from *df* is skipped with a printed warning instead of raising, so
    slightly different input schemas still load.

    Returns:
        A DataFrame containing only the requested columns that exist, in the
        order listed below.
    """
    # (source column, output column) pairs, in output order.
    wanted = (
        ("name", "name"),
        ("price", "price"),
        ("currency", "currency"),
        ("reviews", "reviews"),
        ("ratings", "ratings"),
        ("location", "location"),
        ("lat", "lat"),
        ("long", "long"),
        ("guests", "guests"),
        ("description_items", "description_items"),
        ("category_rating", "category_rating"),
        ("is_supperhost", "is_superhost"),
        ("host_number_of_reviews", "host_number_of_reviews"),
        ("host_rating", "host_rating"),
        ("hosts_year", "host_year"),
        ("host_response_rate", "host_response_rate"),
        ("property_number_of_reviews", "property_number_of_reviews"),
    )

    available = df.columns

    # Report every requested column the input does not provide.
    for source, _ in wanted:
        if source not in available:
            print(f"Warning: Column {source} not found in input data.")

    projection = [
        F.col(source).alias(target)
        for source, target in wanted
        if source in available
    ]
    return df.select(*projection)

In [None]:
# Name Transformation: Extract bedrooms, beds, baths
@time_execution
def transform_name(df):
    """Derive ``bedrooms``, ``beds`` and ``baths`` from the free-text ``name`` column.

    * ``bedrooms`` (int): 0 when the name mentions "studio" (any case),
      otherwise the number preceding "bedroom".
    * ``beds`` (int): the number preceding the standalone word "bed"/"beds".
    * ``baths`` (double): the (possibly fractional) number preceding "bath".

    When a pattern does not match, ``regexp_extract`` yields an empty string
    and the value relies on the cast turning that into null.

    Args:
        df: DataFrame with a string ``name`` column.

    Returns:
        *df* with the three columns added.
    """
    # (?i) makes the unit words case-insensitive, in line with the
    # case-insensitive studio check ("2 Bedrooms" now parses as well).
    bedrooms_pattern = r"(?i)(\d+)\s+bedroom"
    # The trailing \b (after an optional plural "s") stops "bed" from matching
    # the beginning of "bedroom(s)". Without it, a name such as
    # "2 bedrooms, 3 beds" reported beds = 2 instead of 3.
    beds_pattern = r"(?i)(\d+)\s+beds?\b"
    baths_pattern = r"(?i)(\d+(\.\d+)?)\s+bath"

    is_studio = F.lower(F.col("name")).rlike("studio")
    df = df.withColumn(
        "bedrooms",
        F.when(is_studio, 0)
         .otherwise(F.regexp_extract("name", bedrooms_pattern, 1).cast("int")),
    )
    df = df.withColumn("beds", F.regexp_extract("name", beds_pattern, 1).cast("int"))
    df = df.withColumn("baths", F.regexp_extract("name", baths_pattern, 1).cast("double"))

    return df

In [None]:
# Price Transformation: Clean and Normalize
@time_execution
def transform_price(df):
    """Drop rows without a price and add a numeric ``price_cleaned`` column.

    Every character other than a digit or a dot is stripped from ``price``
    (for example "$1,200.00" becomes "1200.00") and the remainder is cast to
    double. The ``currency`` column is not consulted: for this exercise the
    cleaned value is treated as the USD-normalised price. Applying
    per-currency conversion rates would be a later enhancement.

    Returns:
        The rows of *df* whose ``price`` is not null, with ``price_cleaned``
        appended.
    """
    priced = df.dropna(subset=["price"])
    digits_only = F.regexp_replace("price", "[^0-9.]", "")
    return priced.withColumn("price_cleaned", digits_only.cast("double"))

In [None]:
# Reviews Transformation: Sentiment Analysis
@time_execution
def transform_reviews(df):
    """Add ``reviews_clean`` and a ``sentiment_output`` label derived from it.

    ``reviews_clean`` is ``reviews`` with square brackets and single/double
    quote characters removed. Sentiment comes from the pretrained Spark NLP
    model ``sentimentdl_use_twitter``, fed with Universal Sentence Encoder
    embeddings of the cleaned text.

    If building or applying the Spark NLP pipeline raises (for example the
    pretrained models cannot be downloaded), the failure is printed and
    ``sentiment_output`` is filled with the literal ``unknown`` so the rest of
    the pipeline can still run.

    Args:
        df: DataFrame with a string ``reviews`` column.

    Returns:
        *df* with ``reviews_clean`` and a string ``sentiment_output`` column.
    """
    # Clean reviews column: remove the [ ] ' " characters of a stringified array.
    df = df.withColumn("reviews_clean", F.regexp_replace("reviews", "[\\[\\]'\"]", ""))

    try:
        document = DocumentAssembler() \
            .setInputCol("reviews_clean") \
            .setOutputCol("document")

        # SentimentDLModel consumes sentence embeddings, not document/token
        # annotations. Wiring it to ["document", "token"] was rejected by
        # Spark NLP, and the except branch below turned that into an
        # all-"unknown" column. The encoder stage supplies the required input;
        # the tokenizer is dropped because nothing consumed it.
        embeddings = UniversalSentenceEncoder.pretrained(name="tfhub_use", lang="en") \
            .setInputCols(["document"]) \
            .setOutputCol("sentence_embeddings")

        sentiment = SentimentDLModel.pretrained(name="sentimentdl_use_twitter", lang="en") \
            .setInputCols(["sentence_embeddings"]) \
            .setOutputCol("sentiment")

        # Emit a plain string rather than an array so the column has the same
        # type as the "unknown" placeholder used in the fallback branch.
        finisher = Finisher() \
            .setInputCols(["sentiment"]) \
            .setOutputCols("sentiment_output") \
            .setOutputAsArray(False)

        pipeline = Pipeline(stages=[document, embeddings, sentiment, finisher])

        # Every stage is already a transformer, so fit() only assembles the
        # PipelineModel; no training happens here.
        model = pipeline.fit(df)
        df = model.transform(df)

    except Exception as e:
        # Deliberate best-effort: keep the pipeline usable without the model.
        print(f"SparkNLP Sentiment Model failed to load (likely network/size): {e}")
        print("Using simplified placeholder for sentiment.")
        df = df.withColumn("sentiment_output", F.lit("unknown"))

    return df

In [None]:
# Location Transformation: Extract City
@time_execution
def transform_location(df):
    """Add ``city_extracted``: the part of ``location`` before the first comma.

    The text is taken as-is (no trimming). A location without any comma is
    copied through whole.
    """
    location_parts = F.split(F.col("location"), ",")
    return df.withColumn("city_extracted", location_parts.getItem(0))

In [None]:
# Description Items Transformation: Extract and Encode
@time_execution
def transform_description(df):
    """Add the first description item and an integer code derived from it.

    ``desc_first_item`` is the first run of characters enclosed in single or
    double quotes within ``description_items`` (so for ["item1", "item2"] it
    is item1). ``desc_category_id`` is Spark's ``hash`` of that text, used as
    a simple category encoding.
    """
    first_quoted = F.regexp_extract("description_items", r"['\"]([^'\"]+)['\"]", 1)
    with_first_item = df.withColumn("desc_first_item", first_quoted)
    return with_first_item.withColumn("desc_category_id", F.hash("desc_first_item"))

In [None]:
# Category Rating Transformation
@time_execution
def transform_category_rating(df):
    """Placeholder for parsing ``category_rating``; currently returns *df* unchanged."""
    # Not implemented yet: the exact format of 'category_rating' has not been
    # confirmed. It is assumed to be either "Map(Key -> Value, ...)" or
    # JSON-like -- TODO confirm against real data.
    # Planned approach: pull out "name=value" pairs with a regex such as
    #   "(\w+)=(\d+(\.\d+)?)"
    # and expose each numeric value (e.g. an overall rating) as its own column.
    # Until then the column is passed through untouched.
    return df

In [None]:
# Superhost Transformation
@time_execution
def transform_superhost(df):
    """Add ``is_superhost_binary``: 1 for superhosts, 0 for everything else.

    A row counts as superhost when ``is_superhost``, lower-cased, equals
    "t", "true" or "1". Any other value, including null, maps to 0.
    """
    truthy_values = ("t", "true", "1")
    flagged = F.lower(F.col("is_superhost")).isin(*truthy_values)
    binary_flag = F.when(flagged, 1).otherwise(0)
    return df.withColumn("is_superhost_binary", binary_flag)

## Pipeline Execution

In [None]:
def apply_pipeline(df):
    """Run every transformation step on *df*, in order, and return the result.

    Order matters: ``initial_selection`` goes first because it produces the
    renamed columns (for example ``is_superhost``) that later steps read.
    """
    stages = (
        initial_selection,
        transform_name,
        transform_price,
        transform_reviews,
        transform_location,
        transform_description,
        transform_category_rating,
        transform_superhost,
    )
    for stage in stages:
        df = stage(df)
    return df

# Split Data: 70% train / 15% validation / 15% test, with a fixed seed.
train_data, val_data, test_data = raw_df.randomSplit([0.7, 0.15, 0.15], seed=42)

# Apply Transformations: each split goes through apply_pipeline independently.
print("Transforming Training Data...")
train_transformed = apply_pipeline(train_data)

print("Transforming Validation Data...")
val_transformed = apply_pipeline(val_data)

print("Transforming Test Data...")
test_transformed = apply_pipeline(test_data)

# Show Sample: first 5 transformed training rows, then the resulting schema.
train_transformed.show(5)
train_transformed.printSchema()