# Project 3: Airbnb
**This is the third of three mandatory projects to be handed in as part of the assessment for the course 02807 Computational Tools for Data Science at Technical University of Denmark, autumn 2019.**

#### Practical info
- **The project is to be done in groups of at most 3 students**
- **Each group has to hand in _one_ Jupyter notebook (this notebook) with their solution**
- **The hand-in of the notebook is due 2019-12-05, 23:59 on DTU Inside**

#### Your solution
- **Your solution should be in Python/PySpark**
- **For each question you may use as many cells for your solution as you like**
- **You should not remove the problem statements**
- **Your notebook should be runnable, i.e., clicking [>>] in Jupyter should generate the result that you want to be assessed**
- **You are not expected to use machine learning to solve any of the exercises**

# Introduction
[Airbnb](http://airbnb.com) is an online marketplace for arranging or offering lodgings. In this project you will use Spark to analyze data obtained from the Airbnb website. The purpose of the analysis is to extract information about trends and patterns from the data.

The project has two parts.

### Part 1: Loading, describing and preparing the data
There's quite a lot of data. Make sure that you can load and correctly parse the data, and that you understand what the dataset contains. You should also prepare the data for the analysis in part two. This means cleaning it and staging it so that subsequent queries are fast.

### Par 2: Analysis
In this part your goal is to learn about trends and usage patterns from the data. You should give solutions to the tasks defined in this notebook, and you should use Spark to do the data processing. You may use other libraries like for instance Pandas and matplotlib for visualisation.

## Guidelines
- Processing data should be done using Spark. Once data has been reduced to aggregate form, you may use collect to extract it into Python for visualisation.
- Your solutions will be evaluated by correctness, code quality and interpretability of the output. This means that you have to write clean and efficient Spark code that will generate sensible execution plans, and that the tables and visualisations that you produce are meaningful and easy to read.
- You may add more cells for your solutions, but you should not modify the notebook otherwise.

### Create Spark session and define imports

In [None]:
from pyspark.sql import *
from pyspark.sql import functions as f
from pyspark.sql.types import *
from dateutil.parser import parse
from pyspark.sql.functions import to_date
from pyspark.sql.functions import unix_timestamp
from pyspark.sql.functions import from_unixtime
from pyspark.sql.types import DateType
from pyspark.sql.types import DoubleType
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sb
from pyspark.sql.functions import col, size, asc, desc
from pyspark.sql.functions import explode
from pyspark.sql.functions import avg

In [None]:
spark = SparkSession.builder.appName("Project Three").getOrCreate()

# Part 1: Loading, describing and preparing the data
The data comes in two files. Start by downloading the files and putting them in your `data/` folder.

- [Listings](https://files.dtu.dk/u/siPzAasj8w2gI_ME/listings.csv?l) (5 GB)
- [Reviews](https://files.dtu.dk/u/k3oaPYp6GjKBeho4/reviews.csv?l) (9.5 GB)

### Load the data
The data has multiline rows (rows that span multiple lines in the file). To correctly parse these you should use the `multiline` option and set the `escape` character to be `"`.

In [None]:
df = spark.read.option('header', True).option('inferSchema', True).option('multiline', True).option('escape','"').csv('listings.csv')

review = spark.read.option('header', True).option('inferSchema', True).option('multiline', True).option('escape','"').csv('reviews.csv')

### Describe the data
List the features (schema) and sizes of the datasets.

In [None]:
print('Listing Data:')
print('number of rows:'+ str(listing.count()))
print('number of columns:'+ str(len(listing.columns)))
print('Review Data:')
#print('number of rows:'+ str(review.count()))
#print('number of columns:'+ str(len(review.columns)))


In [None]:
# Types of Columns:
listing.printSchema()

In [None]:
# Types of Columns:
review.printSchema()

### Prepare the data for analysis
You should prepare two dataframes to be used in the analysis part of the project. You should not be concerned with cleaning the data. There's a lot of it, so it will be sufficient to drop rows that have bad values. You may want to go back and refine this step at a later point when doing the analysis.

You may also want to consider if you can stage your data so that subsequent processing is more efficient (this is not strictly necessary for Spark to run, but you may be able to decrease the time you sit around waiting for Spark to finish things)

### Note
First, we notice that the type of some column could be converted to the desired one instead of the string. This is the initial preparation, but we do other preparation and cleaning of the data through the process as we go the further steps.

In [None]:
from pyspark.sql.functions import udf
charReplace=udf(lambda x: x.replace('$',''))
listing_pre_=df.withColumn('price',charReplace('price'))

In [None]:
listing_1= listing.withColumn("host_since",listing['host_since'].cast(DateType()))
listing_2 = listing_1.withColumn("price", listing['price'].cast(FloatType()))
listing_3 = listing_2.withColumn("review_scores_rating", listing['review_scores_rating'].cast(FloatType()))
df0 = listing_3.withColumn("host_total_listings_count", listing['host_total_listings_count'].cast(FloatType()))
listing_pre.printSchema()

In [None]:
review_pre= review.withColumn("date",review['date'].cast(DateType()))
review_pre.printSchema()

# Part 2: Analysis
Use Spark and your favorite tool for data visualization to solve the following tasks.

## The basics
Compute and show a dataframe with the number of listings and neighbourhoods per city.

In [None]:
df1 = df0.select('city', 'host_total_listings_count', 'property_type','neighbourhood','review_scores_rating','host_since','price')

In [None]:
df_listings_neighhood_per_city= df1.groupBy('city').agg(f.sum('host_total_listings_count'))

In [None]:
df_listings_neighhood_per_city.show(100)

Based on the table above, you should choose a city that you want to continue your analysis for. The city should have mulitple neighbourhoods with listings in them.

Compute and visualize the number of listings of different property types per neighbourhood in your city.

In [None]:
df_selected_city = df1.filter(f.col( 'city') == 'Stockholm') 

In [None]:
listing_basics_per_property.show(10)
df_selected_city.show(10)
df_neighhood_property_type = df1.groupBy('neighbourhood').agg(f.count('property_type'))

In [None]:
dat1  = df_neighhood_property_type.toPandas()

In [None]:
sb.barplot(x = dat1['neighbourhood'], y=dat1['count(property_type)'])
plt.xlabel('neighbourhood')
plt.ylabel('count')
plt.figure(figsize=(20, 10))
plt.show()

## Prices
Compute the minimum, maximum and average listing price in your city. 

In [None]:
#Select the price attribute
df_price_column_selected = df_selected_city.select('price').dropna()

In [None]:
df_price_column_selected.show(10)

In [None]:
df_price_column_selected.select(f.min('price'),f.max('price'),f.avg('price')).show()


In [None]:
dat2 = df_price_column_selected.toPandas()
dat2.hist(column='price')

In [None]:
sb.kdeplot(dat2.price, shade =True)

In [None]:
df_review_ratings = df1.dropna(subset=('review_scores_rating','price'))
df_review_ratings.show(10)

In [None]:
df_value = df_review_ratings.withColumn('value', f.col('review_scores_rating')/f.col('price')).select('neighbourhood', 'value')

In [None]:
df_window = Window.partitionBy('neighbourhood').orderBy(f.desc('value'))
df_rank = df_value.withColumn('NeighbourhoodValue', f.rank().over(df_window))

Compute and visualize the distribution of listing prices in your city.

In [None]:
df_rank_top_3 = df_rank.filter(f.col('NeighbourhoodValue') <= 3).drop('NeighbourhoodValue').orderBy('neighbourhood', f.desc('value')).show()

The value of a listing is its rating divided by its price.

Compute and show a dataframe with the 3 highest valued listings in each neighbourhood.

In [None]:
df_review_column_selected = df_selected_city.select('review_scores_rating').dropna()

## Trends
Now we want to analyze the "popularity" of your city. The data does not contain the number of bookings per listing, but we have a large number of reviews, and we will assume that this is a good indicator of activity on listings.

Compute and visualize the popularity (i.e., number of reviews) of your city over time.

Compute and visualize the popularity of neighbourhoods over time. If there are many neighbourhoods in your city, you should select a few interesting ones for comparison.

Compute and visualize the popularity of your city by season. For example, visualize the popularity of your city per month.

## Reviews
In this part you should determine which words used in reviews that are the most positive. 

The individual reviews do not have a rating of the listing, so we will assume that each review gave the average rating to the listing, i.e., the one on the listing.

You should assign a positivity weight to each word seen in reviews and list the words with the highest weight. It is up to you to decide what the weight should be. For example, it can be a function of the rating on the listing on which it occurs, the number of reviews it occurs in, and the number of unique listings for which it was used to review.

Depending on your choice of weight function, you may also want to do some filtering of words. For example, remove words that only occur in a few reviews.

In [None]:
df_review = spark.read.option('header', True).option('inferSchema', True).option('multiline', True).option('escape', '"').csv('reviews.csv')

In [None]:
df_review.groupby('listing_id').count().show(100)

In [None]:
df_rev_ana = df0.select('id', 'review_scores_rating')
df_rev_ana.show(10)
df_review_ana = df_review.select('listing_id','comments')
df_review_ana.show(10)
ta = df_rev_ana.alias('ta')
tb = df_review_ana.alias('tb')
inner_join = ta.join(tb, ta.id == tb.listing_id)
inner_join.show()

In [None]:
inner_join.describe('review_scores_rating').show()

In [None]:
table = inner_join.withColumn("review_scores_rating", df0['review_scores_rating'].cast(FloatType()))

In [None]:
splits = table.randomSplit([0.8, 0.2], 24)
splits[0].show(10)

In [None]:
train=splits[0]
test = splits[1]

In [None]:
from pyspark.ml.feature import Tokenizer, RegexTokenizer
regexTokenizer = RegexTokenizer(inputCol="comments", outputCol="words", pattern="\\s+|[,.()\"]")

In [None]:
data_words = regexTokenizer.transform(train)

In [None]:
from pyspark.ml.feature import StopWordsRemover
remover = StopWordsRemover(inputCol="words", outputCol="filtered")
data_words2 = remover.transform(data_words)

In [None]:
from pyspark.ml.feature import CountVectorizer
cv = CountVectorizer(inputCol="filtered", outputCol="outword")


In [None]:
data3 = cv.fit(data_words2).transform(data_words2)

In [None]:
from pyspark.ml.feature import RegexTokenizer, StopWordsRemover, CountVectorizer, IDF
idf = IDF(inputCol='outword', outputCol='newIDF') 


In [None]:
from pyspark.mllib.feature import HashingTF, IDF


In [None]:
idf = IDF()
data3 = IDF.fit(data_words2).transform(data_words2)

In [None]:
idf = IDF(inputCol = 'cv', outputCol = 'features')