Data Source: Go to http://insideairbnb.com/get-the-data.html and search London, UK to get the data for 06 September, 2023.
#####Important considerations:##### 
1. You are required to ask 10 questions from the dataset and answer them. 
2. Make sure 6-7 out of these 10 questions are quantifiable i.e there should be a definite numerical output of that question. Other 3-4 questions could be non-quantifiable questions (For eg: Answering how the price and reviews vary with respect to the type of accommodation, is a non quantifiable question.) 
3. Divide your code for each question into 2 parts: 
    1. The first part does any pre-processing that is required for that question. You are required to put all the pre-processing like data cleaning etc. in 1 block of code. The output of this block should be a clean dataset on which the final analysis would be done. 
    2. The second part of the code would use the cleaned data from step (a) and run the core analysis code. Put all the required analysis in 1 block of code. 
4. Your analysis will be graded based on 
    1. Business value of the finding
    2. How non-obvious the questions and the insights are.
    3. Correctness of the questions and answers. 
    4. Intermediate steps to demonstrate your thought process from questions to answers. 
5. Both positive as well as negative results are interesting to us if you can justify your hypothesis and insights. 
6. You are required to submit the solution as an ipython notebook to ivana.kruhoberec@turing.com.

In [1]:
from pyspark.sql import SparkSession

from pyspark.sql.functions import *

from pyspark.sql.types import *

# start spark session
spark = (
    SparkSession.builder
    .master('local')
    .appName('PySpark_01')
    .getOrCreate()
    )

23/11/08 21:57:08 WARN Utils: Your hostname, pop-os resolves to a loopback address: 127.0.1.1; using 192.168.15.84 instead (on interface enp4s0)
23/11/08 21:57:08 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/11/08 21:57:08 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


# Exploratory Analysis

## Schema Overview :

In [2]:
listings = spark.read.csv('data/listings-detailed.csv', header=True, inferSchema=True, multiLine=True, escape='"')
reviews = spark.read.csv('data/reviews-detailed.csv', header=True, inferSchema=True, multiLine=True, escape='"')
listings.printSchema()
reviews.printSchema()

[Stage 3:>                                                          (0 + 1) / 1]

root
 |-- id: long (nullable = true)
 |-- listing_url: string (nullable = true)
 |-- scrape_id: long (nullable = true)
 |-- last_scraped: date (nullable = true)
 |-- source: string (nullable = true)
 |-- name: string (nullable = true)
 |-- description: string (nullable = true)
 |-- neighborhood_overview: string (nullable = true)
 |-- picture_url: string (nullable = true)
 |-- host_id: integer (nullable = true)
 |-- host_url: string (nullable = true)
 |-- host_name: string (nullable = true)
 |-- host_since: date (nullable = true)
 |-- host_location: string (nullable = true)
 |-- host_about: string (nullable = true)
 |-- host_response_time: string (nullable = true)
 |-- host_response_rate: string (nullable = true)
 |-- host_acceptance_rate: string (nullable = true)
 |-- host_is_superhost: string (nullable = true)
 |-- host_thumbnail_url: string (nullable = true)
 |-- host_picture_url: string (nullable = true)
 |-- host_neighbourhood: string (nullable = true)
 |-- host_listings_count: int

                                                                                

In [3]:
listings_count = listings.count()
reviews_count = reviews.count()
print(f"Listings Count: ", listings_count)
print(f"Reviews Count: ", reviews_count)

23/11/08 21:57:24 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors


Listings Count:  87946
Reviews Count:  1581033


                                                                                

# Questions

## 1. How many distinct neighbourhoods are there?

### Preprocessing

In [4]:
df = listings.select("id", "neighbourhood_cleansed")

### Core analysis

In [5]:
distinct_nh = df.select("neighbourhood_cleansed").distinct()
print(f"There are {distinct_nh.count()} distinct neighbourhoods")
distinct_nh.show(truncate=False)

                                                                                

There are 33 distinct neighbourhoods


                                                                                

+----------------------+
|neighbourhood_cleansed|
+----------------------+
|Wandsworth            |
|Croydon               |
|Bexley                |
|Lambeth               |
|Barking and Dagenham  |
|Camden                |
|Greenwich             |
|Newham                |
|Tower Hamlets         |
|Barnet                |
|Hounslow              |
|Harrow                |
|Kensington and Chelsea|
|Islington             |
|Brent                 |
|Haringey              |
|Bromley               |
|Merton                |
|Westminster           |
|Southwark             |
+----------------------+
only showing top 20 rows



## 2. What is the average listing price (per neighbourhood) ?

### Preprocessing

In [6]:
df = listings.select("id", "neighbourhood_cleansed", "price")

# cast price to float
get_price = udf(lambda x: float(x.replace("$", "").replace(",", "")))
df = df.withColumn("price", get_price("price").cast(FloatType()))

### Core analysis

In [7]:
grouped = df.groupBy("neighbourhood_cleansed").agg(mean('price'))
grouped = grouped.sort(col("avg(price)").desc())
grouped.show(truncate=False)

[Stage 19:>                                                         (0 + 1) / 1]

+----------------------+------------------+
|neighbourhood_cleansed|avg(price)        |
+----------------------+------------------+
|Westminster           |320.59137309847875|
|Kensington and Chelsea|307.43279477483674|
|Barking and Dagenham  |245.8404255319149 |
|City of London        |243.72201492537314|
|Camden                |203.2881709223554 |
|Hounslow              |182.45570630486833|
|Southwark             |180.60518444666002|
|Brent                 |179.07919708029198|
|Hammersmith and Fulham|177.2308300395257 |
|Islington             |173.16919786096256|
|Wandsworth            |167.57320660782983|
|Richmond upon Thames  |163.40254574383454|
|Barnet                |160.5233437083148 |
|Merton                |153.22092222986925|
|Newham                |151.51016875811337|
|Haringey              |149.82787573467675|
|Lambeth               |141.03875157100964|
|Kingston upon Thames  |134.80400572246066|
|Tower Hamlets         |133.06963276836157|
|Hackney               |132.9322

                                                                                

## 3. What is the average minimum stay (per neighbourhood) ?

### Preprocessing

In [8]:
df = listings.select("id", "neighbourhood_cleansed", "minimum_nights")

### Core Analysis

In [9]:
grouped = df.groupBy("neighbourhood_cleansed").agg(round(mean('minimum_nights')).cast(IntegerType()).alias("avg_min_stay"))
grouped = grouped.sort(col("avg_min_stay").desc())
grouped.show(truncate=False)

[Stage 22:>                                                         (0 + 1) / 1]

+----------------------+------------+
|neighbourhood_cleansed|avg_min_stay|
+----------------------+------------+
|Kensington and Chelsea|8           |
|Greenwich             |7           |
|Tower Hamlets         |7           |
|Barking and Dagenham  |6           |
|Camden                |6           |
|Harrow                |6           |
|Westminster           |6           |
|Richmond upon Thames  |6           |
|Wandsworth            |5           |
|Croydon               |5           |
|Bexley                |5           |
|Lambeth               |5           |
|Newham                |5           |
|Barnet                |5           |
|Islington             |5           |
|Brent                 |5           |
|Haringey              |5           |
|Southwark             |5           |
|Hackney               |5           |
|Ealing                |5           |
+----------------------+------------+
only showing top 20 rows



                                                                                

## 4. How many listings for each type of rooms are there ?

### Preprocessing

In [10]:
room_types = listings.select("id", "room_type")

### Core analysis

In [11]:
room_types.groupBy("room_type").agg(count("room_type").alias("count")).show()

[Stage 25:>                                                         (0 + 1) / 1]

+---------------+-----+
|      room_type|count|
+---------------+-----+
|    Shared room|  441|
|     Hotel room|  219|
|Entire home/apt|54575|
|   Private room|32711|
+---------------+-----+



                                                                                

## 5. What is the average minimum monthly income per type of room ?
Used number of reviews per month to estimate average monthly income per type of room

### Preprocessing

In [12]:
listings_df = listings.select("id", "room_type", "price", "minimum_nights")
# cast price to float
get_price = udf(lambda x: float(x.replace("$", "").replace(",", "")))
listings_df = listings_df.withColumn("price", get_price("price").cast(FloatType()))

reviews_df = reviews.select("listing_id", "date")
reviews_df = reviews_df.withColumn("year", year("date"))
reviews_df = reviews_df.withColumn("month", month("date"))

### Core analysis

In [13]:
reviews_count = reviews_df.groupBy("listing_id", "year", "month").agg(count("month").alias("count"))

# join reviews_count with listings
joined = listings_df.join(reviews_count, listings_df['id'] == reviews_count['listing_id'])
joined = joined.withColumn("income", col("price") * col("minimum_nights") * col("count"))

output = joined.groupBy("room_type").agg(mean("income"))

## 6. What is the average minimum monthly income per neighbourhood ?
Used number of reviews per month to estimate average monthly income per neighbourhood

### Preprocessing

In [14]:
listings_df = listings.select("id", "neighbourhood_cleansed", "price", "minimum_nights")
# cast price to float
get_price = udf(lambda x: float(x.replace("$", "").replace(",", "")))
listings_df = listings_df.withColumn("price", get_price("price").cast(FloatType()))

reviews_df = reviews.select("listing_id", "date")
reviews_df = reviews_df.withColumn("year", year("date"))
reviews_df = reviews_df.withColumn("month", month("date"))

### Core analysis

In [15]:
reviews_count = reviews_df.groupBy("listing_id", "year", "month").agg(count("month").alias("count"))

# join reviews_count with listings
joined = listings_df.join(reviews_count, listings_df['id'] == reviews_count['listing_id'])
joined = joined.withColumn("income", col("price") * col("minimum_nights") * col("count"))

output = joined.groupBy("neighbourhood_cleansed").agg(mean("income").alias("avg_income"))
output = output.orderBy(col("avg_income").desc())
output.show(truncate=False)

[Stage 31:>                                                         (0 + 1) / 1]

+----------------------+------------------+
|neighbourhood_cleansed|avg_income        |
+----------------------+------------------+
|Haringey              |4449.199930839721 |
|Tower Hamlets         |3987.895461054729 |
|City of London        |2973.5594537815127|
|Kensington and Chelsea|2638.23090876111  |
|Westminster           |2193.1321252898365|
|Camden                |1901.4151713992605|
|Richmond upon Thames  |1851.3892176732875|
|Islington             |1743.6111081555568|
|Brent                 |1563.6519673268913|
|Hackney               |1457.3300889940294|
|Hammersmith and Fulham|1421.567107875579 |
|Southwark             |1289.194051026758 |
|Barking and Dagenham  |1222.826417967263 |
|Havering              |1211.5052631578947|
|Greenwich             |1202.5114285714285|
|Newham                |1129.5178279126042|
|Lambeth               |1052.8567109436956|
|Wandsworth            |973.7577339396656 |
|Bexley                |953.9405548216645 |
|Barnet                |879.8700

                                                                                

## 7. Who are the best rated hosts ?

### Preprocessing

In [16]:
df = listings.select("host_id", "host_name", "review_scores_rating")
df = df.fillna(0)

### Core analysis

In [17]:
avg_ratings = df.groupBy("host_id").agg(mean("review_scores_rating").alias("avg_rating"))
output = df.join(avg_ratings, df["host_id"] == avg_ratings['host_id']).select("host_name", "avg_rating").orderBy(col("avg_rating").desc(), "host_name")
output.show()

[Stage 38:>                                                         (0 + 1) / 1]

+-----------------+----------+
|        host_name|avg_rating|
+-----------------+----------+
|           5 Star|       5.0|
| 5 The Appartment|       5.0|
|56 Welbeck Street|       5.0|
|                A|       5.0|
|                A|       5.0|
|               A.|       5.0|
|              A.J|       5.0|
|             ALex|       5.0|
|            Aadil|       5.0|
|            Aadil|       5.0|
|        Aakanksha|       5.0|
|        Aakanksha|       5.0|
|          Aaliyah|       5.0|
|             Aapo|       5.0|
|            Aarav|       5.0|
|            Aarav|       5.0|
|            Aaron|       5.0|
|            Aaron|       5.0|
|            Aaron|       5.0|
|            Aaron|       5.0|
+-----------------+----------+
only showing top 20 rows



                                                                                

## 8. Does being a super host affects ratings?

### Preprocessing

In [18]:
df = listings.select("host_id", "host_name", "review_scores_rating", "host_is_superhost").dropna()



### Core analysis

In [19]:
df.groupBy("host_is_superhost").agg(mean("review_scores_rating").alias("avg_rating"),std("review_scores_rating").alias("std")).show()

[Stage 39:>                                                         (0 + 1) / 1]

+-----------------+-----------------+-------------------+
|host_is_superhost|       avg_rating|                std|
+-----------------+-----------------+-------------------+
|                f|4.534971122346671| 0.8182240058825446|
|                t| 4.85830605564648|0.21157671281516274|
+-----------------+-----------------+-------------------+



                                                                                

Answer : There is evidence that being a superhost impacts on the ratings

## 9. Does being a super host affects monthly income?

### Preprocessing

In [20]:
listings_df = listings.select("id", "host_is_superhost", "price", "minimum_nights").dropna()
# cast price to float
get_price = udf(lambda x: float(x.replace("$", "").replace(",", "")))
listings_df = listings_df.withColumn("price", get_price("price").cast(FloatType()))

reviews_df = reviews.select("listing_id", "date")
reviews_df = reviews_df.withColumn("year", year("date"))
reviews_df = reviews_df.withColumn("month", month("date"))

### Core analysis

In [21]:
# join review_counts by listing
reviews_count = reviews_df.groupBy("listing_id", "year", "month").agg(count("month").alias("count"))

# join reviews_count with listings
joined = listings_df.join(reviews_count, listings_df['id'] == reviews_count['listing_id'])
joined = joined.withColumn("income", col("price") * col("minimum_nights") * col("count")).filter(col("income") != 0)

output = joined.groupBy("host_is_superhost").agg(mean("income").alias("avg_income"), std("income").alias("std"))
output = output.orderBy(col("avg_income").desc())
output.show(truncate=False)

                                                                                

+-----------------+------------------+-----------------+
|host_is_superhost|avg_income        |std              |
+-----------------+------------------+-----------------+
|f                |1908.7607457790778|66911.17855052992|
|t                |1400.2666391710059|5325.1437143836  |
+-----------------+------------------+-----------------+



Answer: Inconclusive analysis, the standard deviation is too high to afirm anything.

## 10. What are the best rated neighbourhoods ? 

### Preprocessing

In [22]:
df = listings.select("id", "neighbourhood_cleansed", "review_scores_rating").dropna()

### Core analysis

In [23]:
output = df.groupBy("neighbourhood_cleansed").agg(mean("review_scores_rating").alias("avg_rating"), std("review_scores_rating").alias("std"))
output.orderBy(col("avg_rating").desc()).show()

[Stage 49:>                                                         (0 + 1) / 1]

+----------------------+------------------+------------------+
|neighbourhood_cleansed|        avg_rating|               std|
+----------------------+------------------+------------------+
|  Kingston upon Thames| 4.737269303201508|0.6098647243905227|
|  Richmond upon Thames| 4.706460732984299|0.7499652244051536|
|               Bromley| 4.685929054054057| 0.660433494796809|
|        Waltham Forest| 4.678620414673047|0.6592823696042672|
|               Hackney| 4.674173064951204|0.7113163417768752|
|                Merton|4.6691809523809535|0.7351294741568429|
|              Havering| 4.668877192982454|0.6434776791759963|
|                Sutton| 4.665501730103805|0.6286619174251032|
|            Wandsworth|4.6621602892437375|0.7921210511736919|
|               Lambeth| 4.654759473259864|0.7298239236997204|
|              Haringey|  4.64158426966292|0.7423362738955056|
|             Southwark| 4.627599586135534|0.6965096356018664|
|              Lewisham|4.6226882263769555|0.8050307251

                                                                                