In [None]:
%%bash
# Download and unpack the Inside Airbnb NYC snapshot (2019-06-02) on first run.
# The guard only tests for the directory, so after a failed download the
# "DataSet" directory must be removed by hand before re-running this cell.
if [ ! -d "DataSet" ] ; then
    mkdir "DataSet"
    cd "DataSet"
    # -f makes curl exit non-zero on an HTTP error instead of saving the error
    # page under the archive's name. The {a,b} list is expanded by curl itself
    # into one request per file. The second archive is reviews.csv.gz, which
    # unpacks to the DataSet/reviews.csv file the notebook reads later.
    curl -f -O "http://data.insideairbnb.com/united-states/ny/new-york-city/2019-06-02/data/{listings.csv.gz,reviews.csv.gz}" \
        && gunzip *.gz
fi

In [None]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import pyspark
# Local Spark application named "Airbnb" running on three worker threads.
conf = pyspark.SparkConf().setMaster("local[3]").setAppName("Airbnb")

# getOrCreate() reuses a context that is already running in this kernel, so the
# cell can be re-executed without raising "Cannot run multiple SparkContexts at
# once". When a context already exists, `conf` is not re-applied to it.
sc = pyspark.SparkContext.getOrCreate(conf=conf)

# The builder attaches the session to the active context obtained above.
spark = pyspark.sql.SparkSession.builder.getOrCreate()

In [None]:
from pyspark.sql.types import DoubleType,IntegerType,StringType,ShortType,DateType
import pyspark.sql.functions as F
from nltk.sentiment.vader import SentimentIntensityAnalyzer
from nltk import tokenize

def PercentStrip(text):
    """Remove trailing percent signs from a string.

    Args:
        text: The raw cell value, e.g. "95%". May be None for a missing cell.

    Returns:
        `text` without any trailing "%" characters, or None when `text` is
        not a string (including None).
    """
    # Explicit guard instead of a bare `except:` so that only the expected
    # "not a string" case maps to None and nothing else is silently swallowed.
    if not isinstance(text, str):
        return None
    return text.rstrip("%")
# Expose PercentStrip to Spark as a string-returning UDF; the handle returned
# by register() is what the DataFrame column expressions call.
udf_PercentStrip = spark.udf.register(
    "PercentStrip",
    PercentStrip,
    returnType=StringType(),
)


def DollarSkip(text):
    """Strip currency formatting from a price string.

    Leading "$" characters are removed and every "," is deleted, so
    "$1,250.00" becomes "1250.00".

    Args:
        text: The raw cell value. May be None for a missing cell.

    Returns:
        The cleaned string, or None when `text` is not a string
        (including None).
    """
    # Explicit guard instead of a bare `except:` so that only the expected
    # "not a string" case maps to None and nothing else is silently swallowed.
    if not isinstance(text, str):
        return None
    return text.lstrip("$").replace(",", "")
# Expose DollarSkip to Spark as a string-returning UDF; the handle returned
# by register() is what the DataFrame column expressions call.
udf_DollarSkip = spark.udf.register(
    "DollarSkip",
    DollarSkip,
    returnType=StringType(),
)



def DropTextNA(text):
    """Map the literal placeholder "N/A" to None; return any other value unchanged."""
    if text == "N/A":
        return None
    return text
# Expose DropTextNA to Spark as a string-returning UDF; the handle returned
# by register() is what the DataFrame column expressions call.
udf_DropTextNA = spark.udf.register(
    "DropTextNA",
    DropTextNA,
    returnType=StringType(),
)

def NumAmenities(text):
    """Derive an amenity count from a comma-separated amenities string.

    The string is split on "," and the number of pieces is halved, rounding
    down.

    NOTE(review): the reason for halving the piece count is not visible in
    this notebook; it is preserved as-is. Confirm it is intended.

    Args:
        text: The raw amenities cell. May be None for a missing cell.

    Returns:
        Half the number of comma-separated pieces (floored), or None when
        `text` is None.
    """
    # A null cell would otherwise raise AttributeError inside the UDF and
    # abort the Spark job; propagate it as a null count instead.
    if text is None:
        return None
    return len(text.split(",")) // 2
# Expose NumAmenities to Spark as an integer-returning UDF; the handle returned
# by register() is what the DataFrame column expressions call.
udf_NumAmenities = spark.udf.register(
    "NumAmenities",
    NumAmenities,
    returnType=IntegerType(),
)



# Build the VADER analyzer once; the UDF below closes over it.
try:
    sid = SentimentIntensityAnalyzer()
except LookupError:
    # The analyzer needs the "vader_lexicon" NLTK resource. On a machine where
    # it has not been installed yet, fetch it once and retry so the notebook
    # still runs top to bottom.
    import nltk
    nltk.download("vader_lexicon")
    sid = SentimentIntensityAnalyzer()


def Sentiment_Score(text):
    """Return the VADER "compound" polarity score for `text`.

    Args:
        text: The review text to score.

    Returns:
        The value stored under the "compound" key of
        `sid.polarity_scores(text)`.
    """
    return sid.polarity_scores(text)["compound"]


# Registered under the SQL name "Sentiment_Score_Function"; the returned handle
# is used in DataFrame column expressions and yields a double.
udf_SC=spark.udf.register("Sentiment_Score_Function", Sentiment_Score, DoubleType())

In [None]:
# Read the listings file and keep only the columns used downstream.
# escape='"' sets the double quote as the escape character inside quoted
# fields, and multiline=True lets a quoted field span several physical lines.
# No schema is supplied, so every column is read as a string and cast below.
listings_raw = (
    spark.read
    .option("escape", '"')
    .option("multiline", True)
    .csv("DataSet/listings.csv", header=True)
    .select(
        "id", "host_is_superhost", "host_has_profile_pic",
        "host_identity_verified", "latitude", "longitude",
        "is_location_exact", "price", "review_scores_rating",
        "reviews_per_month", "host_response_time", "host_response_rate",
        "neighbourhood", "zipcode", "room_type", "accommodates",
        "bathrooms", "bedrooms", "beds", "amenities",
    )
)

# Convert each string column to its working type. Column expressions refer to
# `listings_raw`, i.e. the values as read from the file.
df_Listings = (
    listings_raw
    .withColumn("id", listings_raw["id"].cast(IntegerType()))
    # "t"/"f" flags become booleans (a null flag stays null).
    .withColumn("host_is_superhost", listings_raw.host_is_superhost == "t")
    .withColumn("host_has_profile_pic", listings_raw.host_has_profile_pic == "t")
    .withColumn("host_identity_verified", listings_raw.host_identity_verified == "t")
    .withColumn("latitude", listings_raw.latitude.cast(DoubleType()))
    .withColumn("longitude", listings_raw.longitude.cast(DoubleType()))
    .withColumn("is_location_exact", listings_raw.is_location_exact == "t")
    # Price: drop the "$" prefix and thousands separators, then parse as double.
    .withColumn("price", udf_DollarSkip(listings_raw.price).cast(DoubleType()))
    .withColumn("review_scores_rating", listings_raw.review_scores_rating.cast(ShortType()))
    .withColumn("reviews_per_month", listings_raw.reviews_per_month.cast(DoubleType()))
    .withColumnRenamed("id", "Listing_ID")
    # Response rate: drop the trailing "%", then parse as integer.
    .withColumn("host_response_rate", udf_PercentStrip(listings_raw.host_response_rate).cast(IntegerType()))
    .withColumn("zipcode", listings_raw.zipcode.cast(IntegerType()))
    # The placeholder text "N/A" becomes a real null.
    .withColumn("host_response_time", udf_DropTextNA(listings_raw.host_response_time))
    .withColumn("accommodates", listings_raw.accommodates.cast(IntegerType()))
    # Double rather than integer: an integer cast would truncate fractional
    # bathroom counts such as 1.5 down to 1.
    .withColumn("bathrooms", listings_raw.bathrooms.cast(DoubleType()))
    .withColumn("bedrooms", listings_raw.bedrooms.cast(IntegerType()))
    .withColumn("beds", listings_raw.beds.cast(IntegerType()))
    # Rows without a usable price are discarded.
    .na.drop(subset="price")
    # The amenities text is replaced by a count derived from it.
    .withColumn("amenities", udf_NumAmenities(listings_raw.amenities))
)



In [None]:
# Read the reviews file with the same CSV options as the listings file:
# escape='"' sets the double quote as the escape character inside quoted
# fields, and multiline=True lets a quoted field (the free-text comment) span
# several physical lines. Without them, a comment containing a line break is
# split into separate, malformed rows. Rows with any null are dropped.
reviews_raw = (
    spark.read
    .option("escape", '"')
    .option("multiline", True)
    .csv("DataSet/reviews.csv", header=True)
    .na.drop()
)

# Score each review, then reduce to one row per listing holding the mean
# sentiment score and the mean comment length.
df_Reviews = (
    reviews_raw
    .withColumn("date", reviews_raw["date"].cast(DateType()))
    .withColumn("listing_id", reviews_raw.listing_id.cast(IntegerType()))
    .withColumn("Sentiment_Score", udf_SC(reviews_raw["comments"]))
    .withColumn("Comment_Length", F.length(reviews_raw["comments"]))
    .drop(*["id","reviewer_id","comments"])
    # Second null filter: removes rows whose casts above produced nulls.
    .na.drop()
    .selectExpr("listing_id as Listing_ID", "date as Date","Sentiment_Score","Comment_Length")
    .groupby("Listing_ID")
    .agg({"Sentiment_Score":"mean","Comment_Length":"mean"})
)

In [None]:
# Keep only listings that have at least one aggregated review row.
Joint_Set = df_Listings.join(df_Reviews, on="Listing_ID", how="inner")

# Collect the joined result to the driver as a pandas frame.
DataFrame = Joint_Set.toPandas()

# Persist the same frame in two formats next to the raw data.
DataFrame.to_pickle("DataSet/DataSet.pkl")
DataFrame.to_feather("DataSet/DataSet.feather")

# All Spark work is finished; shut the context down.
sc.stop()