# Project 3: Airbnb
**This is the third of three mandatory projects to be handed in as part of the assessment for the course 02807 Computational Tools for Data Science at Technical University of Denmark, autumn 2019.**

#### Practical info
- **The project is to be done in groups of at most 3 students**
- **Each group has to hand in _one_ Jupyter notebook (this notebook) with their solution**
- **The hand-in of the notebook is due 2019-12-05, 23:59 on DTU Inside**

#### Your solution
- **Your solution should be in Python/PySpark**
- **For each question you may use as many cells for your solution as you like**
- **You should not remove the problem statements**
- **Your notebook should be runnable, i.e., clicking [>>] in Jupyter should generate the result that you want to be assessed**
- **You are not expected to use machine learning to solve any of the exercises**

# Introduction
[Airbnb](http://airbnb.com) is an online marketplace for arranging or offering lodgings. In this project you will use Spark to analyze data obtained from the Airbnb website. The purpose of the analysis is to extract information about trends and patterns from the data.

The project has two parts.

### Part 1: Loading, describing and preparing the data
There's quite a lot of data. Make sure that you can load and correctly parse the data, and that you understand what the dataset contains. You should also prepare the data for the analysis in part two. This means cleaning it and staging it so that subsequent queries are fast.

### Part 2: Analysis
In this part your goal is to learn about trends and usage patterns from the data. You should give solutions to the tasks defined in this notebook, and you should use Spark to do the data processing. You may use other libraries like for instance Pandas and matplotlib for visualisation.

## Guidelines
- Processing data should be done using Spark. Once data has been reduced to aggregate form, you may use collect to extract it into Python for visualisation.
- Your solutions will be evaluated by correctness, code quality and interpretability of the output. This means that you have to write clean and efficient Spark code that will generate sensible execution plans, and that the tables and visualisations that you produce are meaningful and easy to read.
- You may add more cells for your solutions, but you should not modify the notebook otherwise.

### Create Spark session and define imports

In [None]:
from pyspark.sql import *
from pyspark.sql import functions as f
from pyspark.sql.types import *
import matplotlib.pyplot as plt
%matplotlib inline

spark = SparkSession.builder.appName("Project3_ys").getOrCreate()

# Part 1: Loading, describing and preparing the data
The data comes in two files. Start by downloading the files and putting them in your `data/` folder.

- [Listings](https://files.dtu.dk/u/siPzAasj8w2gI_ME/listings.csv?l) (5 GB)
- [Reviews](https://files.dtu.dk/u/k3oaPYp6GjKBeho4/reviews.csv?l) (9.5 GB)

### Load the data
The data has multiline rows (rows that span multiple lines in the file). To correctly parse these you should use the `multiline` option and set the `escape` character to be `"`.
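The `multiLine` and `escape` options matter because a quoted field may contain line breaks and doubled quotes (`""` standing for a literal `"`). As an illustration only, the sketch below uses Python's standard `csv` module (not Spark), which follows the same quoting convention by default. The two records are made-up sample data, not rows from the Airbnb files.

```python
import csv
import io

# Hypothetical CSV in the same style as the Airbnb dump: the description of
# record 1 spans two physical lines, and "" inside a quoted field is a literal ".
raw = (
    'id,description\n'
    '1,"Cosy flat\nnear the ""lakes"""\n'
    '2,"Quiet room"\n'
)

# Splitting on newlines sees 4 lines (header + 3), cutting record 1 in two.
naive_lines = raw.strip().split('\n')

# A quote-aware parser keeps the multiline field together and turns "" into ".
records = list(csv.reader(io.StringIO(raw)))

print(len(naive_lines))  # 4
print(records[1])        # ['1', 'Cosy flat\nnear the "lakes"']
```

Reading the same file without `multiLine` would make Spark split on physical lines, roughly like `naive_lines` above.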

In [None]:
listings = spark.read.option('header', True).\
    option('inferSchema', True).\
    option('multiLine', True).\
    option('escape', "\"").csv('../data/listings.csv')

In [None]:
reviews = spark.read.option('header', True).\
    option('inferSchema', True).\
    option('multiLine', True).\
    option('escape', "\"").csv('../data/reviews.csv')

### Describe the data
List the features (schema) and sizes of the datasets.

In [None]:
# Schema for listings.csv
listings.printSchema()

In [None]:
# size of listings.csv
print("The number of rows in listings.csv : {}".format(listings.count()))
print("The number of columns in listings.csv : {}".format(len(listings.columns)))

In [None]:
# Schema for reviews.csv
reviews.printSchema()

In [None]:
# size of reviews.csv
print("The number of rows in reviews.csv : {}".format(reviews.count()))
print("The number of columns in reviews.csv : {}".format(len(reviews.columns)))

### Prepare the data for analysis
You should prepare two dataframes to be used in the analysis part of the project. You should not be concerned with cleaning the data. There's a lot of it, so it will be sufficient to drop rows that have bad values. You may want to go back and refine this step at a later point when doing the analysis.

You may also want to consider if you can stage your data so that subsequent processing is more efficient (this is not strictly necessary for Spark to run, but you may be able to decrease the time you sit around waiting for Spark to finish things).

In [None]:
# drop all NA values in listings and count the number of rows omitting those null values
listings_na=listings.dropna()
listings_na.count()

In [None]:
# listings_filtered = listings.filter(f.col('price').isNotNull())

In [None]:
# drop all NA values in reviews and count the number of rows omitting those null values
reviews_na = reviews.dropna()
reviews_na.count()

In [None]:
# We sampled 10% of the listings.csv and 5% of the reviews.csv
# We write them into local disk, listings_sample.csv and reviews_sample.csv

# No need to run now
# sample=df.sample(False, 0.05, 33)
# sample.coalesce(1).write.csv('data/names_here.csv')

# Part 2: Analysis
Use Spark and your favorite tool for data visualization to solve the following tasks.

## The basics
Compute and show a table with the number of listings and neighbourhoods per city.

In [None]:
groupby_city=listings.groupBy('city').\
    agg(f.countDistinct('id').alias("Distinct Listings"),\
        f.countDistinct('neighbourhood_cleansed').alias("Distinct Neighbourhood")).cache()

In [None]:
groupby_city.show()

Based on the table above, you should choose a city that you want to continue your analysis for. The city should have multiple neighbourhoods with listings in them.

Compute and visualize the number of listings of different property types per neighbourhood in your city.

In [None]:
# Let's check if Copenhagen fulfills the requirement:

In [None]:
groupby_city.filter(f.col("city") == "Copenhagen").show()

In [None]:
# Yup, it does. We will choose Copenhagen for further analysis.

In [None]:
# extract listings that are in Copenhagen
copenhagen=listings.filter(f.col("city") == "Copenhagen").cache()

In [None]:
# compute and show with a table the number of listings per neighbourhood per property type in Copenhagen.
copenhagen_groupby=copenhagen.groupBy('neighbourhood_cleansed','property_type').\
                    agg(f.countDistinct('id').alias("Listings Count")).\
                    orderBy('neighbourhood_cleansed', ascending=True)

In [None]:
copenhagen_groupby.show()

## Prices
Compute the minimum, maximum and average listing price in your city. 
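Because the price column is read as text with a leading `$` and thousands separators, it has to be cleaned before any numeric aggregation. The plain-Python sketch below mirrors that cleanup on single values; the helper name `parse_price` and the sample strings are hypothetical, and returning `None` for unparseable input is meant to resemble Spark's cast producing null.

```python
import re

def parse_price(raw):
    """Turn a price string such as '$1,200.00' into a float.

    Strips '$' and ',' (the same characters removed in Spark below) and
    converts the rest. Returns None for missing or unparseable values.
    """
    if raw is None:
        return None
    cleaned = re.sub(r'[$,]', '', raw).strip()
    try:
        return float(cleaned)
    except ValueError:
        return None

print(parse_price('$1,200.00'))  # 1200.0
print(parse_price('$85.00'))     # 85.0
print(parse_price('n/a'))        # None
```

In Spark, the single character class `[$,]` lets one `regexp_replace` call remove both characters.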

In [None]:
# from the schema, it can be seen that the price column is of type string (with a leading "$" and thousands separators)
# remove the "$" and "," characters with a regular expression, then cast the column to float
copenhagen_cleanPrice = copenhagen.\
        withColumn('price', f.regexp_replace('price', r'[$,]', '')).\
        withColumn('price', f.col('price').cast('float'))

In [None]:
# compute the minimum, maximum and average listing price in Copenhagen
copenhagen_cleanPrice.select(f.min('price'), f.max('price'), f.avg('price')).show()

Compute and visualize the distribution of listing prices in your city.

In [None]:
# use groupby, count the number of listings at each price
price_distribution = copenhagen_cleanPrice.groupby("price").\
                        agg(f.countDistinct('id').alias("count"))

In [None]:
price_distribution.show()

In [None]:
# create a price list for histogram
price_list=[]
for row in price_distribution.collect():
    price_list.extend([row['price']] * int(row['count']))
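Expanding every `(price, count)` row into one Python value per listing grows with the number of listings. A sketch of an alternative, assuming the rows come from `price_distribution.collect()`: bin the aggregated counts directly. The helper `bin_counts` and the sample rows are hypothetical; the bin convention copies numpy/matplotlib (half-open bins, last bin closed).

```python
from bisect import bisect_right

def bin_counts(price_counts, edges):
    """Histogram of (price, count) pairs without expanding them.

    Bins are [e0, e1), [e1, e2), ..., with the last bin closed, [e_{n-1}, e_n],
    as in numpy.histogram. Prices outside [edges[0], edges[-1]] are dropped,
    which is what happens to the long tail in the custom-bin plots.
    """
    totals = [0] * (len(edges) - 1)
    for price, count in price_counts:
        if price is None or price < edges[0] or price > edges[-1]:
            continue
        # bin whose left edge is the largest edge <= price
        i = bisect_right(edges, price) - 1
        if i == len(totals):  # price equal to the last edge goes into the last bin
            i -= 1
        totals[i] += count
    return totals

# hypothetical aggregated rows: (price, number of listings at that price)
sample_rows = [(150.0, 3), (200.0, 5), (399.0, 2), (400.0, 4), (4000.0, 1), (12000.0, 7)]
print(bin_counts(sample_rows, [200, 400, 600, 2000, 4000]))  # [7, 4, 0, 1]
```

Alternatively, `plt.hist` accepts a `weights` argument, so passing the distinct prices as values and their counts as weights gives the same histogram without building `price_list`.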

Visualize price distribution using histogram. Adjust the bins and bin size:

In [None]:
# 1. 10 equal-width bins spanning the full price range
plt.hist(price_list,bins=10)
plt.title("Price distribution (10 bins)")
plt.xlabel("Price")
plt.ylabel("Number of listings")
plt.show()

In [None]:
# 2. 13 bins: 200 wide up to 2000, then 2000 wide up to 10000; prices below 200 or above 10000 are left out
plt.hist(price_list,
         bins = [200,400,600,800,1000,1200,1400,1600,1800,2000,4000,6000,8000,10000])
plt.title("Price distribution (13 bins, 200 <= price <= 10000)")
plt.xlabel("Price")
plt.ylabel("Number of listings")
plt.show()

In [None]:
# 3. 10 bins: 200 wide up to 2000, plus one 2000-4000 bin; prices below 200 or above 4000 are left out
plt.hist(price_list,
         bins = [200,400,600,800,1000,1200,1400,1600,1800,2000,4000])
plt.title("Price distribution (10 bins, 200 <= price <= 4000)")
plt.xlabel("Price")
plt.ylabel("Number of listings")
plt.show()

The value of a listing is its rating divided by its price.

Compute and show a dataframe with the 3 highest valued listings in each neighbourhood.
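To make the definition concrete, here is a plain-Python sketch, on hypothetical listings, of what the window-and-rank cells compute: `value = rating / price`, ranked within each neighbourhood with SQL `RANK()` semantics. Note that tied values share a rank, so a neighbourhood can return more than three rows. Skipping listings without a rating is a simplification of this sketch; in Spark their value is null and, with `f.desc`, they sort last but can still land in the top 3 of a neighbourhood with few rated listings.

```python
from collections import defaultdict

# hypothetical listings: (id, neighbourhood, rating, price)
sample_listings = [
    (1, 'Vesterbro', 96.0, 800.0),
    (2, 'Vesterbro', 90.0, 450.0),
    (3, 'Vesterbro', 98.0, 1200.0),
    (4, 'Vesterbro', 80.0, 400.0),
    (5, 'Valby', 95.0, 500.0),
    (6, 'Valby', None, 300.0),   # missing rating
]

def top_valued(rows, k=3):
    """Return {neighbourhood: [(id, value, rank), ...]} for ranks <= k."""
    groups = defaultdict(list)
    for lid, hood, rating, price in rows:
        if rating is None or not price:
            continue  # simplification: rows without a usable value are skipped
        groups[hood].append((lid, rating / price))
    result = {}
    for hood, items in groups.items():
        items.sort(key=lambda x: x[1], reverse=True)
        ranked, prev_value, rank = [], None, 0
        for pos, (lid, value) in enumerate(items, start=1):
            if value != prev_value:   # RANK(): ties share a rank, next rank skips
                rank, prev_value = pos, value
            if rank <= k:
                ranked.append((lid, value, rank))
        result[hood] = ranked
    return result

print(top_valued(sample_listings))
```

Here listings 2 and 4 both have value 0.2 and share rank 1, so listing 1 gets rank 3 and listing 3 (rank 4) is excluded.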

In [None]:
# cast the rating column to float and compute value = rating / price in a new column
# (reuses copenhagen_cleanPrice, whose price column has already been cleaned and cast to float)
copenhagen_value=copenhagen_cleanPrice.\
        withColumn('review_scores_rating', f.col('review_scores_rating').cast('float')).\
        withColumn('value', f.col('review_scores_rating')/f.col('price'))

In [None]:
# rank the listings within each neighbourhood by value, highest first (tied values share a rank)
value_window = Window.partitionBy('neighbourhood_cleansed').orderBy(f.desc('value'))
ranked_df = copenhagen_value.withColumn('valueRank', f.rank().over(value_window))

In [None]:
# show the top 3 valued listings in each neighbourhood
ranked_df.filter(f.col('valueRank') <= 3).\
    orderBy('neighbourhood_cleansed', f.desc('value')).\
    select('id','neighbourhood_cleansed','value','valueRank').show()

## Trends
Now we want to analyze the "popularity" of your city. The data does not contain the number of bookings per listing, but we have a large number of reviews, and we will assume that this is a good indicator of activity on listings.

Compute and visualize the popularity (i.e., number of reviews) of your city over time.
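Counting reviews per year is a group-by on the year part of the review date. The plain-Python sketch below, on hypothetical dates in the `yyyy-MM-dd` form, shows the same idea; it assumes the dates are ISO strings, as the `substr` calls below do. In Spark, `f.year('date')` and `f.month('date')` would return integers directly and make the later `cast('Integer')` steps unnecessary.

```python
from collections import Counter

# hypothetical review dates in yyyy-MM-dd form
review_dates = ['2014-07-02', '2015-08-15', '2015-12-30',
                '2016-01-03', '2016-06-21', '2016-06-22']

# d[:4] is the Python counterpart of f.col('date').substr(1, 4):
# Spark's substr is 1-based, Python slicing is 0-based
reviews_per_year = Counter(int(d[:4]) for d in review_dates)

for yr in sorted(reviews_per_year):
    print(yr, reviews_per_year[yr])
```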

In [None]:
# join reviews and listings tables on listing id, and keep date and neighbourhood columns
joined=reviews.select('listing_id','date').\
            join(copenhagen.select('id','neighbourhood_cleansed'), 
                 f.col('listing_id') == f.col('id'))

In [None]:
# extract the year, month and day from the date column and store each in a separate column
joined_clean=joined.withColumn('year', f.col('date').substr(1,4)).\
                    withColumn('month', f.col('date').substr(6,2)).\
                    withColumn('day', f.col('date').substr(9,2)).\
                    drop('id')

In [None]:
joined_clean.show()

In [None]:
# create a cache for faster queries and operations
joined_clean.cache()

In [None]:
# count the number of reviews over the years
gb_year=joined_clean.groupBy('year').count().\
                withColumn('year', f.col('year').cast('Integer')).\
                orderBy('year', ascending=True)

In [None]:
gb_year.show()

In [None]:
# export and visualize: collect once, then split into x and y values
year_rows = gb_year.collect()
year = [row['year'] for row in year_rows]
review_count = [row['count'] for row in year_rows]

In [None]:
plt.plot(year,review_count)
plt.title("Number of reviews in Copenhagen (by year)")
plt.xlabel("Year")
plt.ylabel("Review Count")
plt.show()

Compute and visualize the popularity of neighbourhoods over time. If there are many neighbourhoods in your city, you should select a few interesting ones for comparison.

In [None]:
# count the number of reviews in each neighbourhood in each year
gb_neighbor_year=joined_clean.groupBy('neighbourhood_cleansed', 'year').count().\
                withColumn('year', f.col('year').cast('Integer')).\
                orderBy(['neighbourhood_cleansed','year'], ascending=[1,1])

In [None]:
gb_neighbor_year.show()

In [None]:
# create a dictionary to store the information, where key is name of the neighbourhood and value is a list of tuples 
# {neighbourhood: [(year1, count1), ...], ...}
dic={}
for row in gb_neighbor_year.collect():
    if row['neighbourhood_cleansed'] in dic:
        dic[row['neighbourhood_cleansed']] += [(row['year'],row['count'])]
    else:
        dic[row['neighbourhood_cleansed']] = [(row['year'],row['count'])]

In [None]:
# plot the number of reviews in each neighbourhood in each year
# (one panel per neighbourhood on a 4 x 3 grid, so at most 12 neighbourhoods fit)
def plot():
    fig, ax = plt.subplots(4, 3, figsize=(30,20))
    plt.subplots_adjust(wspace = 0.2, hspace = 0.7)
    
    for index,key in enumerate(dic.keys()):
        row = index//3
        col = index%3      
        ax[row, col].set_title(key)
        ax[row, col].set_xlabel("Year")
        ax[row, col].set_ylabel("Review Count")
        ax[row, col].plot([x[0] for x in dic[key]], [x[1] for x in dic[key]])

In [None]:
plot()

<font color="navy">Comparing the yearly trends of the reviews, most neighbourhoods show a rise-then-fall pattern: the number of reviews peaked in 2015-2016 and has been declining since then. The only exception is Valby, where the number of reviews increased steadily from 2013 to 2019.</font>

Compute and visualize the popularity of your city by season. For example, visualize the popularity of your city per month.

In [None]:
# count the number of reviews by month
gb_month=joined_clean.groupBy('month').count().\
                withColumn('month', f.col('month').cast('Integer')).\
                orderBy('month', ascending=True)

In [None]:
gb_month.show()

In [None]:
# export and visualize: collect once, then split into x and y values
month_rows = gb_month.collect()
month = [row['month'] for row in month_rows]
review_count = [row['count'] for row in month_rows]

In [None]:
plt.plot(month,review_count)
plt.title("Number of reviews in Copenhagen (by month)")
plt.xlabel("Month")
plt.ylabel("Review Count")
plt.show()

In [None]:
# to see if there is any seasonality effects,
# use groupby to count the number of reviews per month per year
gb_year_month=joined_clean.groupBy('year','month').count().\
                withColumn('year', f.col('year').cast('Integer')).\
                withColumn('month', f.col('month').cast('Integer')).\
                orderBy(['year','month'], ascending=[1,1])

In [None]:
gb_year_month.show()

In [None]:
# export and visualize
review_count=[row['count'] for row in gb_year_month.collect()]
plt.title("Number of reviews in Copenhagen (by year & month)")
plt.xlabel("Month")
plt.ylabel("Review Count")
plt.plot(review_count)
plt.show()

<font color="navy">As can be seen from the plot, there is no clear seasonality in the first 40 months, perhaps because there were few users while Airbnb was still new. From the 40th month onwards the seasonality becomes more pronounced, and a cyclic pattern is observed.</font>
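Reading "the 40th month" off the year-and-month plot requires mapping a running month index back to a calendar month. A small sketch of that conversion follows; the helper names and the starting month (2009, 9) are hypothetical, so substitute the first `(year, month)` row of `gb_year_month`. It also assumes no month is missing from the grouped table, since the plot uses row positions.

```python
def month_index(year, month, first_year, first_month):
    """Number of months since (first_year, first_month), starting at 0."""
    return (year - first_year) * 12 + (month - first_month)

def calendar_month(index, first_year, first_month):
    """Inverse of month_index: map a running index back to (year, month)."""
    total = first_year * 12 + (first_month - 1) + index
    return total // 12, total % 12 + 1

# hypothetical start month: (2009, 9)
print(month_index(2012, 12, 2009, 9))  # 39
print(calendar_month(40, 2009, 9))     # (2013, 1)
```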

## Reviews
In this part you should determine which words used in reviews are the most positive.

The individual reviews do not have a rating of the listing, so we will assume that each review gave the average rating to the listing, i.e., the one on the listing.

You should assign a positivity weight to each word seen in reviews and list the words with the highest weight. It is up to you to decide what the weight should be. For example, it can be a function of the rating on the listing on which it occurs, the number of reviews it occurs in, and the number of unique listings for which it was used to review.

Depending on your choice of weight function, you may also want to do some filtering of words. For example, remove words that only occur in a few reviews.
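As one possible weight, a sketch combining the three quantities mentioned above: for each word, the average listing rating over the reviews it appears in, the number of reviews it appears in, and the number of distinct listings. Counting each word once per review (`set(tokens)`) is a design choice of this sketch; a pure occurrence count, as in the dictionary approach below, would count "great great" twice. The rows and the threshold of 2 reviews are hypothetical.

```python
from collections import defaultdict

# hypothetical rows: (listing_id, listing_rating, review_tokens)
review_rows = [
    (1, 96, ['great', 'location', 'great', 'host']),
    (1, 96, ['great', 'view']),
    (2, 80, ['noisy', 'location']),
    (3, 90, ['great', 'host']),
]

def word_stats(rows):
    """word -> (average rating, number of reviews, number of distinct listings)."""
    rating_sum = defaultdict(float)
    n_reviews = defaultdict(int)
    listings_seen = defaultdict(set)
    for listing_id, rating, tokens in rows:
        for word in set(tokens):  # each review counts once per word
            rating_sum[word] += rating
            n_reviews[word] += 1
            listings_seen[word].add(listing_id)
    return {w: (rating_sum[w] / n_reviews[w], n_reviews[w], len(listings_seen[w]))
            for w in n_reviews}

stats = word_stats(review_rows)
# filter out rare words, then sort by average rating, highest first
kept = {w: s for w, s in stats.items() if s[1] >= 2}
for word, (avg, n_rev, n_list) in sorted(kept.items(), key=lambda kv: kv[1][0], reverse=True):
    print(word, round(avg, 2), n_rev, n_list)
```

A weight could then combine these, for example by ranking on average rating only among words seen in many reviews and listings.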

<font color="navy">**To fulfil the above requirements, our group has come up with a plan that has the following steps:**

1. join reviews and listings on id, keeping the comments and ratings columns
<br>
<br>
2. clean and tokenize the comments column
<br>
<br>
3. for each word token, compute its average rating and store this information in a dictionary. This is achieved by:
    1. initializing a dictionary that looks like this: `{token1 : [sum_ratings, total_num_occurrences], ...}`
    2. iterating through every token and updating its entry accordingly
        - e.g. we start off by having a dictionary {"good": (70, 10)}. This means that so far, the word "good" has a cumulative rating of 70 and has occurred 10 times.
        - Should we encounter the word "good" again in a review whose rating is 8, we will update the dictionary accordingly, so that the new dictionary will become {"good":(78, 11)}
    3. finally, computing the average rating of each token by using sum_ratings/total_num_occurrences
        - e.g. in the previous example {"good":(78, 11)}, the average rating of the word "good" will be 78/11 = 7.09
<br>
<br>
4. sort the dictionary and return words with the highest ratings</font>
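Steps 3 and 4 of the plan can be sketched in plain Python, reproducing the "good" example above. The helper names `add_occurrence` and `average_ratings` are hypothetical; the notebook's own implementations below build `ratings_dic` inline instead.

```python
def add_occurrence(ratings, token, rating):
    """Record one occurrence of `token` in a review whose listing has `rating`.

    ratings maps token -> [sum_ratings, total_num_occurrences].
    """
    entry = ratings.setdefault(token, [0, 0])
    entry[0] += rating
    entry[1] += 1

def average_ratings(ratings, min_occurrences=1):
    """token -> sum_ratings / total_num_occurrences, dropping rare tokens."""
    return {t: s / n for t, (s, n) in ratings.items() if n >= min_occurrences}

# the example from step 3: "good" has a cumulative rating of 70 over 10 occurrences
ratings = {'good': [70, 10]}
add_occurrence(ratings, 'good', 8)
print(ratings['good'])                             # [78, 11]
print(round(average_ratings(ratings)['good'], 2))  # 7.09

# step 4: sort the tokens by average rating, highest first
top_words = sorted(average_ratings(ratings).items(), key=lambda kv: kv[1], reverse=True)
```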

In [None]:
# Install pip packages gensim in the current Jupyter kernel
# gensim is a Natural Language Processing library
import sys
!{sys.executable} -m pip install gensim

In [None]:
import gensim.parsing.preprocessing as gsp
from gensim import utils
from pyspark.ml import Pipeline
from pyspark.ml.feature import Tokenizer

In [None]:
# set up filters for text cleaning
filters = [
           gsp.strip_tags,  # Removal of HTML tags
           gsp.strip_punctuation, # Removal of punctuations
           gsp.strip_multiple_whitespaces, # Removal of extra spaces
           gsp.strip_numeric, # Removal of integers, numbers
           gsp.remove_stopwords, # Removal of stop words (like ‘and’, ‘to’, ‘the’ etc)
           # gsp.stem_text # Stemming (Conversion of words to root form)
          ]

In [None]:
# join reviews and listings tables on listing id, and keep rating and comments columns
pos_table = listings.select('id','review_scores_rating').\
                join(reviews.select('listing_id','comments'), 
                     f.col('id') == f.col('listing_id')).drop('listing_id')

In [None]:
# drop rows where either rating or comments is null
pos_table = pos_table.dropna().cache()

In [None]:
# define a function to preprocess the raw comments
def preprocess_comments(x):
    x = x.lower() # set to lower case
    x = utils.to_unicode(x) # convert to unicode
    for filt in filters: # 'filt' avoids reusing the name of the pyspark.sql.functions alias 'f'
        x = filt(x)
    return x

In [None]:
# use preprocess_comments() as a udf (the result is a string column)
preprocess_comments_udf = f.udf(preprocess_comments)

# <font color="purple">YS's implementation</font>

In [None]:
# sample = pos_table.sample(False, 0.01, 33)

In [None]:
# apply preprocess_comments_udf to the comments column of the spark dataframe
pos_table_processed = pos_table.withColumn('processed_comments', 
                                preprocess_comments_udf(f.col('comments')))

In [None]:
# using Apache Spark to tokenize the words
tokenizer = Tokenizer(inputCol="processed_comments", outputCol="tokens")
pos_table_tokens = tokenizer.transform(pos_table_processed)

In [None]:
# prepare a dictionary that is used for storing and computing the positivity score for each word
ratings_dic = {}

In [None]:
def compute_ratings(row):
    rating = row['review_scores_rating'] # look up columns by name instead of by position
    tokens = row['tokens']
    for token in tokens:
        if token not in ratings_dic:
            ratings_dic[token] = [0,0] # [m, n] where m is the sum of scores & n is the number of occurrences of this token
        ratings_dic[token][0] += int(rating)
        ratings_dic[token][1] += 1

In [None]:
# toLocalIterator() fetches one partition at a time instead of collecting all rows at once
for row in pos_table_tokens.toLocalIterator():
    compute_ratings(row)

In [None]:
import pickle
with open('rating_dictionary.p', 'wb') as fp:
    pickle.dump(ratings_dic, fp)

# <font color="purple">ZY's implementation</font>

In [None]:
# apply preprocess_comments_udf to the comments column of the spark dataframe
pos_table = pos_table.withColumn('processed_comments', preprocess_comments_udf(f.col('comments')))

In [None]:
# using Apache Spark to tokenize the words
tokenizer = Tokenizer(inputCol="processed_comments", outputCol="tokens")
pos_table = tokenizer.transform(pos_table)

In [None]:
pos_table.cache()

In [None]:
pos_table.show()

In [None]:
# extract the scores of each list of tokens into a list
scores_list = pos_table.select("review_scores_rating").rdd.flatMap(lambda x: x).collect()

In [None]:
# convert ratings in scores_list from string to integer
for index, score in enumerate(scores_list):
    try:
        scores_list[index] = int(score)

    # if the score cannot be converted to an integer (e.g. None or a malformed string), use the dummy value -999
    except (TypeError, ValueError):
        scores_list[index] = -999

In [None]:
import pickle
with open('scores_list.p', 'wb') as fp:
    pickle.dump(scores_list, fp)

In [None]:
import pickle
with open('scores_list.p', 'rb') as fp:
    scores_list = pickle.load(fp)

In [None]:
# extract the tokens into a list of lists
tokens_list = pos_table.select("tokens").rdd.flatMap(lambda x: x).collect()

In [None]:
# prepare a dictionary that is used for storing and computing the positivity score for each word
ratings_dic = {}

In [None]:
# iterate through the tokens and build a dictionary that stores each token's sum of scores and its number of occurrences
for index, tokens in enumerate(tokens_list):
    score = scores_list[index]
    if score == -999: # bad entries
        continue # ignore this record and proceed with the next
    for token in tokens:
        if token not in ratings_dic:
            ratings_dic[token] = [0,0] # [m, n] where m is the sum of scores & n is the number of occurrences of this token
        ratings_dic[token][0] += score # add this review's score to the token's total score
        ratings_dic[token][1] += 1 # increment the occurrences of this token by 1

In [None]:
# ratings_dic

In [None]:
# prepare a dictionary that is used for storing the average rating for each word
ave_ratings_dic = {}

# remove words that occur fewer than 500 times across all reviews
threshold = 500

# iterate through the key-value pairs in ratings_dic and build a dictionary that stores each token's average rating
for key, value in ratings_dic.items():
    if value[1] < threshold:
        continue

    ave_rating = value[0]/value[1]
    ave_ratings_dic[key] = ave_rating

In [None]:
# sort the dictionary based on each token's average rating in descending order
# output a list of tuples, with the first element of each tuple being the token, and the second being its average score
sorted_ave_ratings = sorted(ave_ratings_dic.items(), key=lambda x: x[1], reverse=True)

In [None]:
# show the top 100 most positive words in the reviews
sorted_ave_ratings[:100]

1. select id and rating from listings
2. select id and comment from reviews
3. join two on id
4. define a user-defined function (UDF) for cleaning the comments
5. use the UDF to clean the comment column: remove stop words and very common words, and possibly apply stemming (stemming may be too computationally expensive, so we could omit it)

6. store the comments in a dictionary with key == rating and value == comments; comments with the same rating are concatenated, so after all iterations each value will be very long (many comments concatenated)

7. tokenize the comments, assign the rating to each token and store the average rating for each word in a new dictionary. Let's say the original dictionary is {7 : "it is good", 8: "it looks good and fantastic"}; then the new dictionary will be {'good' : (7+8)/2 = 7.5, "fantastic" : 8}
    - <font color="red">searching through each word in the original dictionary ({7 : "it is good", 8: "it looks good and fantastic"}) and computing its average rating might be computationally expensive. I'm wondering whether we can obtain the new dictionary ({'good' : (7+8)/2 = 7.5, "fantastic" : 8}) directly from the raw text.
    - So basically we tokenize a review first, and for every token that is an adjective (see my comment under concern #2 below), add it to a dictionary with its appropriate score and occurrence count. E.g. we have a dictionary {"good": (70, 10)}, meaning that so far the word "good" has a cumulative rating of 70 and has occurred 10 times. If we encounter the word "good" again in a review whose rating is 8, we update the dictionary accordingly, so that it becomes {"good":(78, 11)}. Lookup is O(1), and from this dictionary we can output the final positivity score of a word directly as 78/11. What do you think?</font>
<br>
<br>
8. return the highest score words

A few concerns:
1. too many words are involved; I am not sure whether a dictionary is appropriate. Maybe we should use a streaming sketch, I do not know.
    - <font color="red">Agreed. In any case, the preprocessing needs to be done in Spark, so there is no choice there.</font>
2. only the listing rating is considered in the positivity weight. "For example, it can be a function of the rating on the listing on which it occurs, the number of reviews it occurs in, and the number of unique listings for which it was used to review." Maybe we should consider the others too: keep a counter of how many times each word appears, and drop words below a threshold. This may help with point 1.
    - <font color="red">I'm not sure if this is what you mean, but to me it sounds a bit like the TF part of TF-IDF, right? To reduce memory use and also increase accuracy, I'm thinking of only retaining the adjectives in each review, as they are the ones that give insight into how positive the listing is. I've done part-of-speech tagging on Chinese text using jieba, and I'm pretty sure there are libraries that perform similar tasks on English text too. I will do a bit of research on this part later on.</font>
3. yes, memory space is a concern... can we store so many words in memory?
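The "streaming sketch" idea from concern 1 could look like a count-min sketch: a fixed-size table of counters that estimates how often each word occurs, using the same memory however large the vocabulary gets. This is a minimal illustration, not part of the notebook's pipeline; the class, the salted `blake2b` hashing and the table sizes are choices made for this sketch.

```python
import hashlib


class CountMinSketch:
    """Fixed-memory approximate counter (count-min sketch).

    Memory is width * depth integers regardless of how many distinct words
    are added. Hash collisions can only inflate a counter, so estimate()
    never reports less than the true count.
    """

    def __init__(self, width=2048, depth=4):
        self.width = width
        self.depth = depth
        self.table = [[0] * width for _ in range(depth)]

    def _cells(self, item):
        # one hash per row: blake2b salted with the row number
        for row in range(self.depth):
            digest = hashlib.blake2b(item.encode('utf-8'), digest_size=8,
                                     salt=row.to_bytes(8, 'little')).digest()
            yield row, int.from_bytes(digest, 'little') % self.width

    def add(self, item, amount=1):
        for row, col in self._cells(item):
            self.table[row][col] += amount

    def estimate(self, item):
        # the smallest counter is the one least inflated by collisions
        return min(self.table[row][col] for row, col in self._cells(item))


# hypothetical first pass: count token occurrences in fixed memory
word_counts = CountMinSketch(width=1024, depth=4)
for review_tokens in [['great', 'host'], ['great', 'view'], ['great']]:
    for token in review_tokens:
        word_counts.add(token)

print(word_counts.estimate('great'))  # at least 3
```

Because estimates never undercount, any word whose estimate is below the occurrence threshold (500 in the notebook) is guaranteed to be rare. A second pass could therefore build the exact `[sum, count]` dictionary only for the remaining words.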