# 1. Setup Your PySpark Notebook

In [16]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("AirbnbMLPipeline") \
    .config("spark.jars", "/home/jovyan/work/jars/postgresql-42.7.7.jar") \
    .config("spark.driver.extraClassPath", "/home/jovyan/work/jars/postgresql-42.7.7.jar") \
    .getOrCreate()

# Verify configuration
print("Configured jars:")
print("spark.jars:", spark.conf.get("spark.jars"))
print("spark.driver.extraClassPath:", spark.conf.get("spark.driver.extraClassPath"))

# Load data from PostgreSQL
df = spark.read \
    .format("jdbc") \
    .option("url", "jdbc:postgresql://postgres:5432/airbnb_amsterdam") \
    .option("dbtable", "airbnb_cleaned_listings") \
    .option("user", "airbnb") \
    .option("password", "airbnb123") \
    .option("driver", "org.postgresql.Driver") \
    .load()

# Show the schema and some data
df.printSchema()
df.show(5)

Configured jars:
spark.jars: /home/jovyan/work/jars/postgresql-42.7.7.jar
spark.driver.extraClassPath: /home/jovyan/work/jars/postgresql-42.7.7.jar


Py4JJavaError: An error occurred while calling o147.load.
: java.lang.ClassNotFoundException: org.postgresql.Driver
	at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:445)
	at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:592)
	at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:525)
	at org.apache.spark.sql.execution.datasources.jdbc.DriverRegistry$.register(DriverRegistry.scala:46)
	at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions.$anonfun$driverClass$1(JDBCOptions.scala:103)
	at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions.$anonfun$driverClass$1$adapted(JDBCOptions.scala:103)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions.<init>(JDBCOptions.scala:103)
	at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions.<init>(JDBCOptions.scala:41)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.createRelation(JdbcRelationProvider.scala:34)
	at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:346)
	at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:229)
	at org.apache.spark.sql.DataFrameReader.$anonfun$load$2(DataFrameReader.scala:211)
	at scala.Option.getOrElse(Option.scala:189)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:211)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:172)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:568)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:833)


# 2. Data Preparation

In [None]:
from pyspark.sql.functions import col

# Drop rows with nulls in relevant columns
df_ml = df.select(
    col("price").cast("double"),
    col("availability_365").cast("double"),
    col("number_of_reviews").cast("double"),
    col("minimum_nights").cast("double")
).dropna()

# 3. Feature Engineering

In [None]:
from pyspark.ml.feature import VectorAssembler, StandardScaler

assembler = VectorAssembler(
    inputCols=["availability_365", "number_of_reviews", "minimum_nights"],
    outputCol="unscaled_features"
)
df_assembled = assembler.transform(df_ml)

scaler = StandardScaler(inputCol="unscaled_features", outputCol="features", withStd=True, withMean=False)
df_scaled = scaler.fit(df_assembled).transform(df_assembled)

# 4. Clustering Model: KMeans

In [None]:
from pyspark.ml.clustering import KMeans

kmeans = KMeans(k=4, seed=1, featuresCol="features", predictionCol="cluster")
model = kmeans.fit(df_scaled)
df_clustered = model.transform(df_scaled)

df_clustered.select("price", "cluster").groupBy("cluster").avg("price").show()

# 5. Predictive Model: Linear Regression for Price

In [None]:
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator

# Split into train/test
train_data, test_data = df_scaled.randomSplit([0.8, 0.2], seed=42)

lr = LinearRegression(featuresCol="features", labelCol="price")
lr_model = lr.fit(train_data)

predictions = lr_model.transform(test_data)
evaluator = RegressionEvaluator(predictionCol="prediction", labelCol="price", metricName="rmse")

rmse = evaluator.evaluate(predictions)
print(f"RMSE: {rmse:.2f}")

# 6. Save Models

In [None]:
model.save("/home/jovyan/work/ml/kmeans_model")
lr_model.save("/home/jovyan/work/ml/linear_regression_model")

# Export Final Datasets

## 1. Clustered Listings (from KMeans)

In [None]:
df_clustered.toPandas().to_csv("/home/jovyan/work/data/clustered_listings.csv", index=False)

## 2. Regression Predictions (Price Prediction)

In [None]:
predictions.select("price", "prediction", "availability_365", "number_of_reviews", "minimum_nights") \
    .toPandas().to_csv("/home/jovyan/work/data/ml_predictions.csv", index=False)