**Data Engineering - NY City AirBnB**

**André Igor Nóbrega da Silva - andre.nobrega@ee.ufcg.edu.br**

In [0]:
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns

from pyspark.ml.pipeline import Transformer
from pyspark.ml import Pipeline
from pyspark.ml.util import Identifiable
from pyspark.sql.types import IntegerType, StringType, DateType, FloatType
from pyspark.sql.functions import isnan, when, count, col, udf, regexp_extract, lit, log10, log1p
from pyspark.ml.feature import Imputer,CountVectorizer
from pyspark.ml.feature import Tokenizer, RegexTokenizer
from pyspark.ml.feature import StopWordsRemover, NGram
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler
import re

**Data Importing and Changing Schema**

In [0]:
# Target Spark type of every column whose schema is rewritten, listed in the
# order the casts are applied. `last_review` is not part of the list and keeps
# the type it was loaded with.
column_types = [
  ("id", IntegerType()),
  ("name", StringType()),
  ("host_id", IntegerType()),
  ("host_name", StringType()),
  ("neighbourhood_group", StringType()),
  ("neighbourhood", StringType()),
  ("latitude", FloatType()),
  ("longitude", FloatType()),
  ("room_type", StringType()),
  ("price", IntegerType()),
  ("minimum_nights", IntegerType()),
  ("number_of_reviews", IntegerType()),
  ("reviews_per_month", FloatType()),
  ("calculated_host_listings_count", IntegerType()),
  ("availability_365", IntegerType()),
]

# Load the raw listings and replace each listed column by its cast version.
df = spark.read.load("/FileStore/tables/nyc_airbnb_parquet")
for column_name, spark_type in column_types:
  df = df.withColumn(column_name, df[column_name].cast(spark_type))
display(df)

id,name,host_id,host_name,neighbourhood_group,neighbourhood,latitude,longitude,room_type,price,minimum_nights,number_of_reviews,last_review,reviews_per_month,calculated_host_listings_count,availability_365
2539.0,Clean & quiet apt home by the park,2787.0,John,Brooklyn,Kensington,40.64749,-73.97237,Private room,149.0,1.0,9.0,2018-10-19,0.21,6.0,365.0
2595.0,Skylit Midtown Castle,2845.0,Jennifer,Manhattan,Midtown,40.75362,-73.98377,Entire home/apt,225.0,1.0,45.0,2019-05-21,0.38,2.0,355.0
3647.0,THE VILLAGE OF HARLEM....NEW YORK !,4632.0,Elisabeth,Manhattan,Harlem,40.80902,-73.9419,Private room,150.0,3.0,0.0,,,1.0,365.0
3831.0,Cozy Entire Floor of Brownstone,4869.0,LisaRoxanne,Brooklyn,Clinton Hill,40.68514,-73.95976,Entire home/apt,89.0,1.0,270.0,2019-07-05,4.64,1.0,194.0
5022.0,Entire Apt: Spacious Studio/Loft by central park,7192.0,Laura,Manhattan,East Harlem,40.79851,-73.94399,Entire home/apt,80.0,10.0,9.0,2018-11-19,0.1,1.0,0.0
5099.0,Large Cozy 1 BR Apartment In Midtown East,7322.0,Chris,Manhattan,Murray Hill,40.74767,-73.975,Entire home/apt,200.0,3.0,74.0,2019-06-22,0.59,1.0,129.0
5121.0,BlissArtsSpace!,7356.0,Garon,Brooklyn,Bedford-Stuyvesant,40.68688,-73.95596,Private room,60.0,45.0,49.0,2017-10-05,0.4,1.0,0.0
5178.0,Large Furnished Room Near B'way,8967.0,Shunichi,Manhattan,Hell's Kitchen,40.76489,-73.98493,Private room,79.0,2.0,430.0,2019-06-24,3.47,1.0,220.0
5203.0,Cozy Clean Guest Room - Family Apt,7490.0,MaryEllen,Manhattan,Upper West Side,40.80178,-73.96723,Private room,79.0,2.0,118.0,2017-07-21,0.99,1.0,0.0
5238.0,Cute & Cozy Lower East Side 1 bdrm,7549.0,Ben,Manhattan,Chinatown,40.71344,-73.99037,Entire home/apt,150.0,1.0,160.0,2019-06-09,1.33,4.0,188.0


**Building Pipeline Transformers**

**Cleaning the Data**

In [0]:
# Clean
class InputError(Exception):
  """Signals that a transformer was configured with malformed input (e.g. a boundary list missing a bound)."""

class CleanData(Transformer):
  """
  Returns a filtered dataframe that keeps only the rows whose values fall
  strictly inside the configured range of every listed column.

  Parameters:

  inputCols: dictionary with the columns to filter on as keys and a
             [lower_boundary, upper_boundary] pair as values. Both boundaries
             are exclusive: a row is kept when lower < value < upper.

  Example:

  inputCols = {'column1': [lower_boundary, upper_boundary]}

  Raises:

  InputError: when a boundary list holds fewer than two elements.
  """

  def __init__(self, inputCols):
    # Initialise the Params state that the inherited copy() relies on.
    super(CleanData, self).__init__()
    self.inputCols = inputCols
    self.uid = "CleanData" + Identifiable._randomUID()

  def copy(self, extra=None):
    """Returns a copy of this stage; inputCols is carried over by the shallow copy."""
    return super(CleanData, self).copy(extra)

  def _transform(self, df):
    """Applies one range filter per configured column and prints the share of rows each one removed."""
    # Validate the whole configuration before triggering any Spark job.
    for input_col, thresh in self.inputCols.items():
      if len(thresh) < 2:
        raise InputError('thresholds parameter must be a list containing the lower and upper bounds')

    before = None
    for input_col, thresh in self.inputCols.items():
      if before is None:
        before = df.count()
      df = df.filter((df[input_col] > thresh[0]) & (df[input_col] < thresh[1]))
      after = df.count()

      # An empty input has nothing to remove; avoid dividing by zero.
      removed = round((100 - after/before*100), 2) if before else 0.0
      print(f'Percentage removed by {input_col}: {removed}%')

      # The rows left by this filter are the starting point of the next one,
      # so the count is reused instead of being recomputed.
      before = after
    return df

**Imputing Values**

In [0]:
class ImputValues(Transformer):
  """
  Imputes the missing values of the listed columns with constants passed by the user.
  Returns the imputed dataframe.

  Parameters:

  inputCols: dictionary with the columns to impute as keys and the
             replacement value of each column as values.

  Example:

  inputCols = {'name': '', 'reviews_per_month': 0}
  """
  def __init__(self, inputCols):
    # Initialise the Params state that the inherited copy() relies on.
    super(ImputValues, self).__init__()
    self.inputCols = inputCols

    self.uid = "ImputValues" + Identifiable._randomUID()

  def copy(self, extra=None):
    """Returns a copy of this stage; inputCols is carried over by the shallow copy."""
    return super(ImputValues, self).copy(extra)

  def _transform(self, df):
    """Fills the nulls of every configured column with its replacement value."""
    # Nothing configured: hand the dataframe back untouched.
    if not self.inputCols:
      return df
    # fillna takes the whole column -> value mapping at once, so a single
    # call replaces the per-column loop.
    return df.fillna(dict(self.inputCols))
  

**Logarithm Transformation**

In [0]:
class LogTransformer(Transformer):
  """
  Applies the log(1 + x) transformation (log1p) to the inputCols.
  Returns a dataframe with one new column per input, named 'log_' + <input column>.

  Parameters:

  inputCols: list with the names of the columns to transform.
  """

  def __init__(self, inputCols):
    # Initialise the Params state that the inherited copy() relies on.
    super(LogTransformer, self).__init__()
    self.inputCols = inputCols

    self.uid = "LogTransformer" + Identifiable._randomUID()

  def copy(self, extra=None):
    """Returns a copy of this stage; inputCols is carried over by the shallow copy."""
    return super(LogTransformer, self).copy(extra)

  def _transform(self, df):
    """Adds the 'log_' columns to the dataframe; the original columns are kept."""
    for inputCol in self.inputCols:
      df = df.withColumn(('log_'+inputCol), log1p(inputCol))
    return df
  

**Cleaning the Text Features**

In [0]:
@udf
def clean_text(text):
    """
    Cleans a text input, removing links, punctuations, line breaks, words containing numbers
    and unnecessary spacings. The text is lower-cased first.

    A null input is returned as an empty string.
    """
    if text is None:
        return ''

    text = text.lower()
    # Links go first: once punctuation is stripped the '://' is gone and the
    # link pattern can no longer match.
    text = re.sub(r'https?://\S+', '', text) # remove links
    text = re.sub('[!"#$%&\'()*+,-./:;<=>?@[\\]^_`{|}~]', '', text) # remove punctuations
    text = re.sub(r'\w*\d\w*', '', text) # remove words containing numbers
    # Spacing is normalised last so the gaps left by the removals above are
    # collapsed as well; '\s' also matches line breaks.
    text = re.sub(r'\s+', ' ', text).strip() # remove line breaks and unnecessary spacings

    return text
  
class CleanText(Transformer):
  """
  Applies the UDF clean_text to a text column of the dataframe.
  Returns a dataframe with a new column containing the cleaned text.

  Parameters:

  inputCol: name of the column holding the raw text (default 'name').
  outputCol: name of the column that receives the cleaned text (default 'clean_text').
  """
  def __init__(self, inputCol='name', outputCol='clean_text'):
      # Initialise the Params state that the inherited copy() relies on.
      super(CleanText, self).__init__()
      self.inputCol = inputCol
      self.outputCol = outputCol
      self.uid = "CleanText" + Identifiable._randomUID()

  def _transform(self, df):
      """Adds the cleaned-text column to the dataframe."""
      return df.withColumn(self.outputCol, clean_text(self.inputCol))

**Assuring Categorical Distributions**

In [0]:
class SecureDistribution(Transformer):
  """
  Filters rows on the test set that have categorical values unseen by the training set.

  Parameters:

  inputCols: list with the names of the categorical columns to check.
  train_df: training dataframe that provides the known values of every column.

  Rows holding a null in a checked column are kept only when the training
  set also holds a null in that column.
  """

  def __init__(self, inputCols, train_df):
    # Initialise the Params state that the inherited copy() relies on.
    super(SecureDistribution, self).__init__()
    self.train_df = train_df
    self.inputCols = inputCols
    self.uid = "SecureDistribution" + Identifiable._randomUID()

  def copy(self, extra=None):
    """Returns a copy of this stage; inputCols and train_df are carried over by the shallow copy."""
    return super(SecureDistribution, self).copy(extra)

  def _transform(self, df):
    """Keeps, column by column, only the rows whose value was seen in the training set."""
    for cat_col in self.inputCols:
      seen = [row[0] for row in self.train_df.select(cat_col).distinct().collect()]
      known = [value for value in seen if value is not None]

      # One membership test per column instead of one filter per unseen value.
      keep = df[cat_col].isin(known)
      # A comparison against null evaluates to null, so nulls need an
      # explicit test to survive the filter.
      if len(known) != len(seen):
        keep = keep | df[cat_col].isNull()
      df = df.filter(keep)

    return df

**Building the Pipeline Object**

In [0]:
# Builds, fits and applies the feature-engineering pipeline:
# range cleaning -> train/test split -> imputation -> log scaling ->
# text cleaning / tokenizing / count vectorizing -> categorical encoding ->
# vector assembling. The order of the entries in `stages` is the order in
# which they run.
stages = []

# Cleaning Stage

# The thresholds were decided by an EDA.
# Each entry is column: [lower, upper]; CleanData keeps the rows strictly between both bounds.
inputCols = {'minimum_nights': [0, 40],  'longitude': [-75.0, -73.712991],'latitude': [40.50, 40.90], 'price': [0, 800]}
clean = CleanData(inputCols)

# The cleaning runs here, before the split, and is not added to `stages`.
clean_df = clean.transform(df)


# Splitting train test so only the train vocabulary is learned by the CountVectorizer

# NOTE(review): the weights 0.70 and 0.20 do not sum to 1. randomSplit is
# documented to normalize its weights, which would make the effective split
# roughly 78% / 22% - confirm the intended proportions.
train, test = clean_df.randomSplit([0.70, 0.20], seed = 1235)

# Securing distribution of categorical columns
categorical_columns = ['neighbourhood_group', 'neighbourhood', 'room_type']

# Drops the test rows whose category never appears in train - presumably so
# the StringIndexer stages fitted on train do not meet unknown labels.
secureDistribution = SecureDistribution(inputCols = categorical_columns, train_df = train)
test = secureDistribution.transform(test)


# Imputer

# Missing names become an empty string and missing reviews_per_month become 0.
imputer = ImputValues({'name': '', 'reviews_per_month':0})

# Log Scaling

# 'price' comes first on purpose: it is skipped below (log_cols[1:]) when the feature list is built.
log_cols = ['price', 'minimum_nights', 'number_of_reviews','reviews_per_month','calculated_host_listings_count']
logt = LogTransformer(log_cols)

# Cleaning
# Cleans the text of the listing name into the 'clean_text' column.
cleaningStage = CleanText()

# Tokenizing
# Splits the cleaned text on non-word characters.
regexTokenizer = RegexTokenizer(inputCol="clean_text", outputCol="words", pattern="\\W")

# Removing stop words
remover = StopWordsRemover(inputCol="words", outputCol="filtered")

# Applying word vector
# minDF = 3 is the minimum document frequency a term needs to enter the vocabulary.
cv = CountVectorizer(inputCol="filtered", outputCol="text_feature", minDF = 3)

stages+= [imputer, logt, cleaningStage, regexTokenizer, remover, cv]


# Categorical features


# One StringIndexer + OneHotEncoder pair per categorical column:
# <col> -> <col>Index -> <col>catVec.
for catCol in categorical_columns:
  stringIndexer = StringIndexer(inputCol = catCol, outputCol = catCol + "Index")
  encoder = OneHotEncoder(inputCols = [stringIndexer.getOutputCol()], outputCols = [catCol + "catVec"])
  
  stages += [stringIndexer, encoder]
  
# Numerical features

# log_cols[1:] leaves 'log_price' out of the feature list.
# 'text_feature' must stay the last entry: the no-text variant below is built by dropping the last element.
num_feat = ['log_' + log_col for log_col in log_cols[1:]] + ['latitude', 'longitude', 'availability_365'] + ['text_feature']
assembleInputs = [c + "catVec" for c in categorical_columns] + num_feat
assembleInputs_NoText = assembleInputs[:-1]


# Vector Assembler

# "features" holds every input; "simple_features" holds the same inputs without the text vector.
assembler = VectorAssembler(inputCols = assembleInputs, outputCol = "features")
assembler_NoText = VectorAssembler(inputCols = assembleInputs_NoText, outputCol = "simple_features")

stages += [assembler, assembler_NoText]
pip = Pipeline(stages = stages)

# Fitting the Pipeline
# Only the train split is used for fitting.
pipe = pip.fit(train)

# Transforming train and test sets
X_test = pipe.transform(test)
X_train = pipe.transform(train)

# Creating a concatenated dataframe
# NOTE(review): this rebinds `df`, which held the raw listings until here.
# Re-running this cell without re-running the loading cell feeds the
# transformed dataframe back into CleanData.
df = X_train.union(X_test)

**Saving the Transformed Dataframes**

In [0]:
# Columns persisted for modelling: both feature vectors and the log-scaled price.
output_columns = ["features", "simple_features", "log_price"]

# (dataframe, destination) pairs, written in this order: full set, train split, test split.
outputs = [
  (df, '/FileStore/tables/airbnb_transf_dataframe_full'),
  (X_train, '/FileStore/tables/airbnb_transf_dataframe_train'),
  (X_test, '/FileStore/tables/airbnb_transf_dataframe_test'),
]

for frame, destination in outputs:
  frame.select(*output_columns).write.save(destination, format='parquet')