In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, split, col, avg


spark = SparkSession.builder.appName("AmenitiesWordCount").getOrCreate()
df = spark.read.csv('airbnb-listings.csv', header=True, inferSchema=True)

# group by Amenity
df_exploded = df.withColumn('Amenity', explode(split(col('Amenities'), ',')))

# wordcount
word_counts = df_exploded.groupBy('Amenity').count().orderBy(col('count').desc())

# top 10 frequent word
top_words = word_counts.limit(10)

# top 10's avg price
average_prices_ame = df_exploded.groupBy('Amenity').agg(avg(col('Price')).alias('AveragePrice'))


top_words_with_price = top_words.join(average_prices_ame, 'Amenity').select('Amenity', 'count', 'AveragePrice')
top_words_with_price.orderBy(col('count').desc()).limit(10).show()

# 停止 SparkSession
spark.stop()

+-----------------+------+------------------+
|          Amenity| count|      AveragePrice|
+-----------------+------+------------------+
|Wireless Internet|301897|  139.894383855269|
|          Kitchen|296850|141.42693088455155|
|          Heating|285280|138.53689722531155|
|       Essentials|272512|140.51520093750932|
|           Washer|235544|142.11130688508692|
|               TV|225280|  154.405368044634|
|         Internet|190759| 144.2520076196168|
|          Hangers|183868| 138.5601776658159|
|          Shampoo|183126| 141.3421657196129|
|   Smoke detector|177721|147.89176864200925|
+-----------------+------+------------------+



In [2]:
spark = SparkSession.builder.appName("AmenitiesWordCount").getOrCreate()
df = spark.read.csv('airbnb-listings.csv', header=True, inferSchema=True)

# group by Transit
df_exploded = df.withColumn('Transit', explode(split(col('Transit'), ',')))

# wordcount
word_counts = df_exploded.groupBy('Transit').count().orderBy(col('count').desc())

# top 10 frequent word
top_words = word_counts.limit(10)

# top 10's avg price
average_prices_transit = df_exploded.groupBy('Transit').agg(avg(col('Price')).alias('AveragePrice'))


top_words_with_price = top_words.join(average_prices_transit, 'Transit').select('Transit', 'count', 'AveragePrice')
top_words_with_price.orderBy(col('count').desc()).limit(10).show()

# 停止 SparkSession
spark.stop()

+---------------+-----+------------------+
|        Transit|count|      AveragePrice|
+---------------+-----+------------------+
|          phone|13248| 9.730272202364587|
|          email|13087| 9.733401240855635|
|        reviews|12718| 9.731131643948403|
|          jumio| 7965| 9.738031119090365|
|  United States| 5892| 9.904449741756059|
|            1.0| 5706| 9.570974576271187|
| United Kingdom| 3762|  9.66472602739726|
|          Spain| 3389| 9.617452440033086|
|       facebook| 3288| 9.789454545454545|
|         France| 3044|10.288540534253647|
+---------------+-----+------------------+



In [8]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import to_date
from pyspark.sql.types import IntegerType, FloatType

# Create a SparkSession
spark = SparkSession.builder \
    .appName("AirbnbDataAnalysis") \
    .getOrCreate()

# Read the CSV file into a DataFrame
df = spark.read \
    .option("header", "true") \
    .option("sep", ",") \
    .option("quote", "\"") \
    .option("escape", "\"") \
    .csv("airbnb-listings.csv")

df = df.dropna(subset=["Host Response Time"])
# Cast the 'Number of Reviews' to Integer and 'Last Review' to Date
df = df.withColumn("Number of Reviews", df["Number of Reviews"].cast(IntegerType()))
df = df.withColumn("Last Review", to_date(df["Last Review"], "yyyy-MM-dd"))
df = df.withColumn("Review Scores Rating", df["Review Scores Rating"].cast(FloatType()))

# Create a temporary view to run SQL queries
df.createOrReplaceTempView("listings")

# Now run the SQL query
query = """
SELECT 
    `Host Response Time`,
    COUNT(`Number of Reviews`) AS Count_Number_of_Reviews,
    AVG(`Review Scores Rating`) AS Avg_Review_Scores_Rating
FROM listings
WHERE `Number of Reviews` > 50 AND `Last Review` > '2016-01-01'
GROUP BY `Host Response Time`
ORDER BY AVG(`Review Scores Rating`) DESC
"""

result = spark.sql(query)
result.show(4)

# Stop the Spark session
spark.stop()

+------------------+-----------------------+------------------------+
|Host Response Time|Count_Number_of_Reviews|Avg_Review_Scores_Rating|
+------------------+-----------------------+------------------------+
|within a few hours|                   5144|       93.33547257876313|
|    within an hour|                  18333|       92.95301757066463|
|      within a day|                   2047|       92.86112469437653|
|a few days or more|                    119|        89.8655462184874|
+------------------+-----------------------+------------------------+
only showing top 4 rows



In [32]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, avg
from pyspark.sql.window import Window
from pyspark.sql import functions as F


spark = SparkSession.builder.appName("TopCitiesNeighbourhoodAnalysis").getOrCreate()

df = spark.read \
    .option("header", "true") \
    .option("sep", ",") \
    .option("quote", "\"") \
    .option("escape", "\"") \
    .csv("airbnb-listings.csv")


df = df.na.drop(subset=["City", "Neighbourhood", "Property Type", "Review Scores Rating"])

df = df.withColumn("Review Scores Rating", col("Review Scores Rating").cast("float"))

df.createOrReplaceTempView("listings")

query = """
WITH CityFrequency AS (
    SELECT City, COUNT(*) AS Listings
    FROM listings
    GROUP BY City
    ORDER BY Listings DESC
    LIMIT 10
),
NeighbourhoodFrequency AS (
    SELECT City, Neighbourhood, COUNT(*) AS Listings
    FROM listings
    WHERE City IN (SELECT City FROM CityFrequency)
    GROUP BY City, Neighbourhood
),
MaxNeighbourhoodPerCity AS (
    SELECT City, MAX(Listings) AS MaxListings
    FROM NeighbourhoodFrequency
    GROUP BY City
),
TopNeighbourhoods AS (
    SELECT nf.City, nf.Neighbourhood
    FROM NeighbourhoodFrequency nf
    INNER JOIN MaxNeighbourhoodPerCity mnc
    ON nf.City = mnc.City AND nf.Listings = mnc.MaxListings
)
SELECT tn.City, tn.Neighbourhood, l.`Property Type`, AVG(l.`Review Scores Rating`) AS AvgRating, COUNT(tn.Neighbourhood) AS COUNTS
FROM TopNeighbourhoods tn
JOIN listings l ON tn.City = l.City AND tn.Neighbourhood = l.Neighbourhood
GROUP BY tn.City, tn.Neighbourhood, l.`Property Type`
HAVING COUNTS >= 100
"""

result = spark.sql(query)
result.orderBy(col('City').asc()).show(100)

+-----------+----------------+---------------+-----------------+------+
|       City|   Neighbourhood|  Property Type|        AvgRating|COUNTS|
+-----------+----------------+---------------+-----------------+------+
|  Amsterdam|        Oud-West|      Apartment|94.33739130434783|  1150|
|     Berlin|        Neukölln|      Apartment| 93.6020482809071|  1367|
|   Brooklyn|    Williamsburg|      Apartment|93.89115646258503|  1617|
|   Brooklyn|    Williamsburg|           Loft| 94.4014598540146|   137|
|  København|        Nørrebro|      Apartment|94.19538572458544|  1387|
|     London| LB of Islington|          House|92.35036496350365|   137|
|     London| LB of Islington|      Apartment|92.46984924623115|   398|
|Los Angeles|    Mid-Wilshire|      Apartment|92.56801195814649|   669|
|Los Angeles|    Mid-Wilshire|          House|94.08243727598567|   279|
|   New York| Upper West Side|      Apartment|93.18666666666667|   900|
|      Paris|      Montmartre|      Apartment|92.30014124293785|