In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import regexp_replace
from pyspark.sql.functions import col
from pyspark.sql.types import IntegerType

In [2]:
spark = SparkSession.builder.appName('Seattle').getOrCreate()

In [3]:
# Read dataset as Spark dataframe
url = 'https://raw.githubusercontent.com/ZacksAmber/Kaggle-Seattle-Airbnb/main/data/201601/listings.csv'
spark_df = spark.read.options(inferSchema=True, header=True, sep=',', quote="\"", escape="\"", multiLine=True).csv(url)

Py4JJavaError: An error occurred while calling o32.csv.
: java.lang.UnsupportedOperationException
	at org.apache.hadoop.fs.http.AbstractHttpFileSystem.listStatus(AbstractHttpFileSystem.java:91)
	at org.apache.hadoop.fs.http.HttpsFileSystem.listStatus(HttpsFileSystem.java:23)
	at org.apache.spark.util.HadoopFSUtils$.listLeafFiles(HadoopFSUtils.scala:225)
	at org.apache.spark.util.HadoopFSUtils$.$anonfun$parallelListLeafFilesInternal$1(HadoopFSUtils.scala:95)
	at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at scala.collection.TraversableLike.map(TraversableLike.scala:238)
	at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
	at scala.collection.AbstractTraversable.map(Traversable.scala:108)
	at org.apache.spark.util.HadoopFSUtils$.parallelListLeafFilesInternal(HadoopFSUtils.scala:85)
	at org.apache.spark.util.HadoopFSUtils$.parallelListLeafFiles(HadoopFSUtils.scala:69)
	at org.apache.spark.sql.execution.datasources.InMemoryFileIndex$.bulkListLeafFiles(InMemoryFileIndex.scala:158)
	at org.apache.spark.sql.execution.datasources.InMemoryFileIndex.listLeafFiles(InMemoryFileIndex.scala:131)
	at org.apache.spark.sql.execution.datasources.InMemoryFileIndex.refresh0(InMemoryFileIndex.scala:94)
	at org.apache.spark.sql.execution.datasources.InMemoryFileIndex.<init>(InMemoryFileIndex.scala:66)
	at org.apache.spark.sql.execution.datasources.DataSource.createInMemoryFileIndex(DataSource.scala:578)
	at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:416)
	at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:326)
	at org.apache.spark.sql.DataFrameReader.$anonfun$load$3(DataFrameReader.scala:308)
	at scala.Option.getOrElse(Option.scala:189)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:308)
	at org.apache.spark.sql.DataFrameReader.csv(DataFrameReader.scala:796)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.base/java.lang.Thread.run(Thread.java:829)


In [None]:
# Remove '$' in the following columns, then format the column from string to int
dollar_cols = ['price', 'weekly_price', 'monthly_price', 'extra_people', 'security_deposit', 'cleaning_fee']
for dollar_col in dollar_cols:
    spark_df = spark_df.withColumn(dollar_col, regexp_replace(dollar_col, '\$', ''))
    spark_df = spark_df.withColumn(dollar_col, col(dollar_col).cast("Integer"))

In [None]:
# Create a SQL view for Spark SQL query
spark_df.createOrReplaceTempView('spark_view')
# Export spark dataframe to pandas dataframe
pandas_df = spark_df.toPandas()

# Now we have 1 Spark dataframe, 1 Spark view, 1 Pandas dataframe
# We can query data from all of the three data source

In [None]:
# Query data from Spark view
sql = """
SELECT property_type, AVG(price) AS avg_price
FROM spark_view
GROUP BY property_type
"""

spark.sql(sql).show(5)

In [None]:
# Query data from Pandas dataframe
pandas_df.groupby('property_type')['price'].mean().head()

In [None]:
# Query data from Spark dataframe
spark_df.groupby('property_type').mean('price').toPandas().head()

In [None]:
# Query data from Spark view
sql = """
SELECT 
    property_type, 
    SUM(price) AS sum_price,
    MIN(price) AS min_price,
    AVG(price) AS avg_price,
    MAX(price) AS max_price
FROM spark_view
GROUP BY property_type
ORDER BY 4 DESC
"""

spark.sql(sql).show(5)

In [None]:
# Query data from Pandas dataframe
pandas_df.groupby('property_type')['price'].agg(['sum', 'min', 'mean', 'max']).sort_values(by='mean', ascending=False).head()

In [None]:
# Query data from Spark dataframe
import pyspark.sql.functions as F

spark_df.groupby('property_type') \
    .agg(F.sum('price').alias('sum_price'), \
         F.min('price').alias('min_price'), \
         F.mean('price').alias('avg_price'), \
         F.max('price').alias('max_price') \
    ).sort('avg_price', ascending=False).show(5)