# Data Engineering 

The objective here is to transform the original data: cleaning, log transformation, outlier removal, and so on.

Author: Arthur Dimitri <br>
arthur.dimitri@ee.ufcg.edu.br

# Extra Dependency

We need NLTK to perform some transformations on the text fields.

In [0]:
%pip install nltk

# Loading the Data

In [0]:
# File location and type
file_location = "/FileStore/tables/AB_NYC_2019-3.csv"
file_type = "csv"

# CSV options
infer_schema = "true"
first_row_is_header = "true"
delimiter = ","

# The applied options are for CSV files. For other file types, these will be ignored.
data = spark.read.format(file_type) \
  .option("inferSchema", infer_schema) \
  .option("header", first_row_is_header) \
  .option("sep", delimiter) \
  .load(file_location)

display(data)

id,name,host_id,host_name,neighbourhood_group,neighbourhood,latitude,longitude,room_type,price,minimum_nights,number_of_reviews,last_review,reviews_per_month,calculated_host_listings_count,availability_365
2539,Clean & quiet apt home by the park,2787,John,Brooklyn,Kensington,40.64749,-73.97237,Private room,149.0,1.0,9,2018-10-19,0.21,6.0,365.0
2595,Skylit Midtown Castle,2845,Jennifer,Manhattan,Midtown,40.75362,-73.98377,Entire home/apt,225.0,1.0,45,2019-05-21,0.38,2.0,355.0
3647,THE VILLAGE OF HARLEM....NEW YORK !,4632,Elisabeth,Manhattan,Harlem,40.80902,-73.9419,Private room,150.0,3.0,0,,,1.0,365.0
3831,Cozy Entire Floor of Brownstone,4869,LisaRoxanne,Brooklyn,Clinton Hill,40.68514,-73.95976,Entire home/apt,89.0,1.0,270,2019-07-05,4.64,1.0,194.0
5022,Entire Apt: Spacious Studio/Loft by central park,7192,Laura,Manhattan,East Harlem,40.79851,-73.94399,Entire home/apt,80.0,10.0,9,2018-11-19,0.1,1.0,0.0
5099,Large Cozy 1 BR Apartment In Midtown East,7322,Chris,Manhattan,Murray Hill,40.74767,-73.975,Entire home/apt,200.0,3.0,74,2019-06-22,0.59,1.0,129.0
5121,BlissArtsSpace!,7356,Garon,Brooklyn,Bedford-Stuyvesant,40.68688,-73.95596,Private room,60.0,45.0,49,2017-10-05,0.4,1.0,0.0
5178,Large Furnished Room Near B'way,8967,Shunichi,Manhattan,Hell's Kitchen,40.76489,-73.98493,Private room,79.0,2.0,430,2019-06-24,3.47,1.0,220.0
5203,Cozy Clean Guest Room - Family Apt,7490,MaryEllen,Manhattan,Upper West Side,40.80178,-73.96723,Private room,79.0,2.0,118,2017-07-21,0.99,1.0,0.0
5238,Cute & Cozy Lower East Side 1 bdrm,7549,Ben,Manhattan,Chinatown,40.71344,-73.99037,Entire home/apt,150.0,1.0,160,2019-06-09,1.33,4.0,188.0


In [0]:
total_examples = data.count()
# reuse the stored count instead of triggering a second Spark job
print("---------- Initial Number of Training Examples: ", total_examples, "----------------")

In [0]:
# Create a view or table

temp_table_name = "AB_NYC_2019_csv"

data.createOrReplaceTempView(temp_table_name)

In [0]:
%sql

/* Query the created temp table in a SQL cell */

select * from `AB_NYC_2019_csv`

id,name,host_id,host_name,neighbourhood_group,neighbourhood,latitude,longitude,room_type,price,minimum_nights,number_of_reviews,last_review,reviews_per_month,calculated_host_listings_count,availability_365
2539,Clean & quiet apt home by the park,2787,John,Brooklyn,Kensington,40.64749,-73.97237,Private room,149.0,1.0,9,2018-10-19,0.21,6.0,365.0
2595,Skylit Midtown Castle,2845,Jennifer,Manhattan,Midtown,40.75362,-73.98377,Entire home/apt,225.0,1.0,45,2019-05-21,0.38,2.0,355.0
3647,THE VILLAGE OF HARLEM....NEW YORK !,4632,Elisabeth,Manhattan,Harlem,40.80902,-73.9419,Private room,150.0,3.0,0,,,1.0,365.0
3831,Cozy Entire Floor of Brownstone,4869,LisaRoxanne,Brooklyn,Clinton Hill,40.68514,-73.95976,Entire home/apt,89.0,1.0,270,2019-07-05,4.64,1.0,194.0
5022,Entire Apt: Spacious Studio/Loft by central park,7192,Laura,Manhattan,East Harlem,40.79851,-73.94399,Entire home/apt,80.0,10.0,9,2018-11-19,0.1,1.0,0.0
5099,Large Cozy 1 BR Apartment In Midtown East,7322,Chris,Manhattan,Murray Hill,40.74767,-73.975,Entire home/apt,200.0,3.0,74,2019-06-22,0.59,1.0,129.0
5121,BlissArtsSpace!,7356,Garon,Brooklyn,Bedford-Stuyvesant,40.68688,-73.95596,Private room,60.0,45.0,49,2017-10-05,0.4,1.0,0.0
5178,Large Furnished Room Near B'way,8967,Shunichi,Manhattan,Hell's Kitchen,40.76489,-73.98493,Private room,79.0,2.0,430,2019-06-24,3.47,1.0,220.0
5203,Cozy Clean Guest Room - Family Apt,7490,MaryEllen,Manhattan,Upper West Side,40.80178,-73.96723,Private room,79.0,2.0,118,2017-07-21,0.99,1.0,0.0
5238,Cute & Cozy Lower East Side 1 bdrm,7549,Ben,Manhattan,Chinatown,40.71344,-73.99037,Entire home/apt,150.0,1.0,160,2019-06-09,1.33,4.0,188.0


# Investigating the Data

* The dataset includes name, host_id, host_name, and last_review, but they do not seem to be relevant to solving the problem.
* There's a pattern: most of the columns have 185 null values, which seem to belong to the same listings. Removing those rows won't harm the model.
* availability_365 and reviews_per_month have missing values. We are going to impute the mean value computed per neighbourhood_group and room_type.
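
The grouped mean imputation described in the last bullet can be illustrated without Spark. The following is a minimal plain-Python sketch, not the notebook's implementation: the function `impute_group_mean` and the toy rows are hypothetical and exist only to show the idea. A missing value is replaced by the mean of the observed values that share the same group keys, and it stays missing when the group has no observed value at all.

```python
from collections import defaultdict


def impute_group_mean(rows, group_keys, target):
    """Return copies of rows with missing `target` values replaced by the group mean."""
    sums = defaultdict(float)
    counts = defaultdict(int)
    # first pass: accumulate the observed values of each group
    for row in rows:
        if row[target] is not None:
            key = tuple(row[k] for k in group_keys)
            sums[key] += row[target]
            counts[key] += 1
    # second pass: fill the gaps, leaving the input rows untouched
    filled = []
    for row in rows:
        key = tuple(row[k] for k in group_keys)
        value = row[target]
        if value is None and counts[key] > 0:
            value = sums[key] / counts[key]
        filled.append({**row, target: value})
    return filled


# Hypothetical toy rows, not taken from the dataset
listings = [
    {"neighbourhood_group": "Brooklyn", "room_type": "Private room", "reviews_per_month": 1.0},
    {"neighbourhood_group": "Brooklyn", "room_type": "Private room", "reviews_per_month": 3.0},
    {"neighbourhood_group": "Brooklyn", "room_type": "Private room", "reviews_per_month": None},
    {"neighbourhood_group": "Manhattan", "room_type": "Entire home/apt", "reviews_per_month": None},
]
imputed = impute_group_mean(listings, ["neighbourhood_group", "room_type"], "reviews_per_month")
```

Here the third row receives the Brooklyn / Private room mean, while the Manhattan row keeps its missing value because its group has nothing to average.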

In [0]:
from pyspark.sql.functions import isnan, when, count, col
data.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in data.columns]).toPandas().transpose()

Unnamed: 0,0
id,0
name,32
host_id,185
host_name,212
neighbourhood_group,185
neighbourhood,185
latitude,185
longitude,185
room_type,185
price,185


In [0]:
important_columns = ['id','name','host_id','neighbourhood_group','latitude','longitude','room_type','price','number_of_reviews','reviews_per_month','calculated_host_listings_count','availability_365']

# Pipeline

### Customized Stages Description

* MeanImputation: given a list of input columns, it fills the null values of an output column with that column's average within each group of input-column values
* DropRows: given a list of input columns, this stage drops all the rows that have a null value in any of them
* LogTransform: takes an input column and applies a base-10 log transformation
* CleanText: takes an input column, lowercases it, and removes URLs and all special characters from the text
* StemmerTransform: takes an input column of tokens and applies Porter stemming, discarding stems of two characters or fewer
* CleanOutliers: takes an input column and a threshold, and keeps only the dataframe rows whose value is below the threshold
<img src="https://drive.google.com/uc?id=1Fl-80kElIgAOS2SB16IARf48zsKzpo25">
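
As a rough illustration of three of the stages listed above, here is a plain-Python sketch that mirrors their logic on single values. It is an assumption-laden stand-in, not the Spark code: `clean_text` and `keep_below` are hypothetical helper names, the regular expressions are copied from the CleanText stage, and the price of 1000.0 is made up to show the outlier filter.

```python
import math
import re


def clean_text(text):
    """Lowercase, strip a leading 'rt ', drop URLs, then drop special characters."""
    text = text.lower()
    text = re.sub(r"^rt ", "", text)
    text = re.sub(r"https?://\S+", "", text)
    text = re.sub(r"[^a-zA-Z0-9\s]", "", text)
    return text


def keep_below(values, threshold):
    """Keep only the values strictly below the threshold (the outlier rule)."""
    return [v for v in values if v < threshold]


cleaned = clean_text("BlissArtsSpace!")

# 149.0 and 225.0 appear in the sample rows; 1000.0 is a hypothetical outlier
prices = [149.0, 225.0, 1000.0]
kept = keep_below(prices, 500)
log_prices = [math.log10(p) for p in kept]
```

Note that removing a character such as `&` leaves the surrounding spaces in place, so a cleaned name can contain consecutive spaces.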

In [0]:
from pyspark.ml import Pipeline, Transformer
from pyspark.sql import functions as f
from pyspark.sql.functions import avg, col, isnan, log10, lower, regexp_replace, udf, when
from pyspark.sql.types import ArrayType, StringType
from pyspark.sql.window import Window
from nltk.stem.porter import PorterStemmer

class MeanImputation(Transformer):
    """Fills nulls in outputCol with the mean of outputCol within each inputCols group."""
    def __init__(self, inputCols, outputCol):
        super().__init__()
        self.inputCols = inputCols
        self.outputCol = outputCol
    def _transform(self, df):
        # operate within each group defined by the specified input columns
        w = Window.partitionBy(*self.inputCols)
        target = col(self.outputCol)
        # data integrity report: share of the incoming rows whose value is missing
        missing = df.filter((target == "") | target.isNull() | isnan(target)).count()
        print("------------- Percentage Recovered With Mean Imputation - " + self.outputCol + ":", missing / df.count() * 100, "% -----------------")
        # replace nulls with the windowed group average and keep every other value as-is
        return df.withColumn(self.outputCol, when(target.isNull(), avg(target).over(w)).otherwise(target))
      
class DropRows(Transformer):
    """Drops every row that has a null value in any of inputCols."""
    def __init__(self, inputCols):
        super().__init__()
        self.inputCols = inputCols
    def _transform(self, df):
        # dataframe without the rows that have null values in the specified columns
        kept = df.na.drop(subset=self.inputCols)
        # data integrity report: share of rows removed by this stage
        total = df.count()
        print("------------- Percentage Lost in Drop Rows Stage:", (total - kept.count()) / total * 100, "%-----------------")
        return kept
      
class LogTransform(Transformer):
    """Writes the base-10 logarithm of inputCol to outputCol."""
    def __init__(self, inputCol, outputCol):
        super().__init__()
        self.inputCol = inputCol
        self.outputCol = outputCol
    def _transform(self, df):
        # Spark returns null for non-positive inputs, so a later DropRows stage can remove them
        return df.withColumn(self.outputCol, log10(col(self.inputCol)))
      
class CleanText(Transformer):
    """Lowercases inputCol and strips URLs and special characters into outputCol."""
    def __init__(self, inputCol, outputCol):
        super().__init__()
        self.inputCol = inputCol
        self.outputCol = outputCol
    def _transform(self, df):
        def clean_text(c):
            c = lower(c)                                  # lowercase everything
            c = regexp_replace(c, r"^rt ", "")            # leading "rt " marker
            c = regexp_replace(c, r"(https?://)\S+", "")  # URLs
            c = regexp_replace(c, r"[^a-zA-Z0-9\s]", "")  # anything but letters, digits, whitespace
            return c
        # applies the function above to the input column
        return df.withColumn(self.outputCol, clean_text(col(self.inputCol)))
  
class StemmerTransform(Transformer):
    """Applies Porter stemming to an array-of-tokens column."""
    def __init__(self, inputCol, outputCol):
        super().__init__()
        self.inputCol = inputCol
        self.outputCol = outputCol
    def _transform(self, df):
        def stem(in_vec):
            stemmer = PorterStemmer()
            # stem every token and keep only the stems longer than two characters
            return [s for s in (stemmer.stem(t) for t in in_vec) if len(s) > 2]
        stemmer_udf = f.udf(stem, ArrayType(StringType()))
        return df.withColumn(self.outputCol, stemmer_udf(col(self.inputCol)))
  
class CleanOutliers(Transformer):
    """Keeps only the rows whose inputCol value is below threshold."""
    def __init__(self, inputCol, outputCol, threshold):
        super().__init__()
        self.inputCol = inputCol
        self.outputCol = outputCol  # stored but unused: rows are filtered, no new column is created
        self.threshold = threshold
    def _transform(self, df):
        # dataframe restricted to the rows below the outlier threshold
        kept = df.where(f.col(self.inputCol) < self.threshold)
        # data integrity report: share of rows removed by this stage
        total = df.count()
        print("------------- Percentage lost in clean outliers stage: ", (total - kept.count()) / total * 100, "% ---------------------")
        return kept

# Pipeline Execution

As the pipeline runs, each customized stage prints a message about the data integrity after its execution.
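
The data integrity figures boil down to a comparison of row counts before and after a stage. A minimal sketch of that arithmetic follows, assuming a hypothetical helper named `percentage_lost` and made-up counts; it is not part of the notebook.

```python
def percentage_lost(rows_before, rows_after):
    """Share of rows removed by a stage, expressed as a percentage."""
    if rows_before == 0:
        return 0.0  # nothing to lose in an empty dataframe
    return (rows_before - rows_after) / rows_before * 100


# Hypothetical counts: 200 rows enter a stage and 150 leave it
lost = percentage_lost(200, 150)
```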

In [0]:
from pyspark.ml.feature import CountVectorizer, NGram, OneHotEncoder, StopWordsRemover, StringIndexer, Tokenizer, VectorAssembler
from pyspark.sql.functions import col, size

stages = []
# All the customized pipeline stages
mi = MeanImputation(inputCols=['neighbourhood_group','room_type'], outputCol='reviews_per_month')
mi2 = MeanImputation(inputCols=['neighbourhood_group','room_type'], outputCol='availability_365')
dr = DropRows(inputCols=['latitude','longitude','price','name'])
co = CleanOutliers(inputCol='price', outputCol='price_no_outliers', threshold=500)
lt = LogTransform(inputCol='price',outputCol='priceLog') # previously input = price_no_outliers
dr2 = DropRows(inputCols=['priceLog'])
ct = CleanText(inputCol='name', outputCol='clean_name')
stages+=[mi,mi2,dr,co,lt,dr2,ct]

# the categorical columns which we want to one-hot-encode
stage_cols = ['neighbourhood_group','room_type']

for column in stage_cols:
  # index and then one-hot encode each categorical column listed in stage_cols
  stringIndexer = StringIndexer(inputCol = column, outputCol = column + "idx")
  encoder = OneHotEncoder(inputCols = [stringIndexer.getOutputCol()], outputCols = [column + "catVec"])
  stages+=[stringIndexer,encoder]

# we tokenize the cleaned name and remove the default English stop words
tokenizer = Tokenizer(inputCol="clean_name", outputCol="vector")
remover = StopWordsRemover(inputCol="vector", outputCol="vector_ns")
ngram = NGram(n=2, inputCol="vector_ns", outputCol="bigrams")

# we add these stages to the pipeline
stages+=[tokenizer,remover,ngram]
pipeline = Pipeline(stages=stages)
df_mod = pipeline.fit(data).transform(data)
# we keep only the rows whose "bigrams" column holds at least 2 bigrams
df_mod = df_mod.where(size(col("bigrams")) >= 2)


stages=[]
# we apply a count vectorizer to the bigrams column to build a bag-of-words representation of the text field
countVectorizer = CountVectorizer(minDF=2, inputCol="bigrams", outputCol='textBOW')
stages+= [countVectorizer]
pipeline = Pipeline(stages=stages)
df_mod_2 = pipeline.fit(df_mod).transform(df_mod)
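
To make the text steps in the cell above more concrete, the following plain-Python sketch walks three toy names through stop-word removal, bigram construction, and a bag-of-words count with a minimum document frequency of 2. It is only an approximation of what the Spark stages do: the helper names, the tiny stop-word set, and the toy names are assumptions, and the vocabulary is sorted alphabetically here, whereas Spark's CountVectorizer orders terms by frequency.

```python
from collections import Counter


def bigrams(tokens):
    """Join each pair of consecutive tokens with a space."""
    return [" ".join(pair) for pair in zip(tokens, tokens[1:])]


def build_vocabulary(documents, min_df):
    """Keep the terms that appear in at least min_df documents."""
    doc_freq = Counter()
    for doc in documents:
        doc_freq.update(set(doc))  # count each term once per document
    return sorted(term for term, n in doc_freq.items() if n >= min_df)


def bag_of_words(doc, vocabulary):
    """Count how often each vocabulary term occurs in the document."""
    counts = Counter(doc)
    return [counts[term] for term in vocabulary]


STOP_WORDS = {"the", "of", "in", "by"}  # hypothetical tiny list, not Spark's default

# Hypothetical cleaned names
names = ["cozy room in midtown east", "cozy room by the park", "skylit midtown castle"]
tokenised = [[t for t in name.split() if t not in STOP_WORDS] for name in names]
docs = [bigrams(tokens) for tokens in tokenised]
vocab = build_vocabulary(docs, min_df=2)
vectors = [bag_of_words(d, vocab) for d in docs]
```

Only the bigram shared by two names survives the document-frequency cut, which is why rare bigrams end up with no entry in the resulting vectors.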

# Casting Types - Final Dataframe

In [0]:
from pyspark.sql.types import FloatType, IntegerType, StringType, DateType

df_mod_2 = df_mod_2.withColumn('minimum_nights', df_mod_2['minimum_nights'].cast(FloatType()))
df_mod_2 = df_mod_2.withColumn('number_of_reviews', df_mod_2['number_of_reviews'].cast(FloatType()))
df_mod_2 = df_mod_2.withColumn('calculated_host_listings_count', df_mod_2['calculated_host_listings_count'].cast(FloatType()))
df_mod_2 = df_mod_2.withColumn('availability_365', df_mod_2['availability_365'].cast(FloatType()))
df_mod_2 = df_mod_2.withColumn('priceLog', df_mod_2['priceLog'].cast(FloatType()))

# Vector Assembling Features

We assemble the features that are relevant to solving the problem: minimum_nights, number_of_reviews, neighbourhood_groupcatVec, room_typecatVec, and calculated_host_listings_count.
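
Conceptually, assembling concatenates the numeric columns and the one-hot vectors into a single feature vector, in the order the columns are listed. The sketch below shows this in plain Python under simplifying assumptions: `one_hot` and `assemble` are hypothetical helpers, the category list is illustrative, and every category keeps its own slot, whereas Spark's OneHotEncoder drops the last category by default.

```python
def one_hot(value, categories):
    """Encode value as a list with 1.0 in the slot of its category and 0.0 elsewhere."""
    return [1.0 if value == c else 0.0 for c in categories]


def assemble(row, columns, categories):
    """Concatenate numeric values and one-hot encodings in column order."""
    vector = []
    for name in columns:
        if name in categories:
            vector.extend(one_hot(row[name], categories[name]))
        else:
            vector.append(float(row[name]))
    return vector


# Illustrative category list; only room_type is encoded to keep the example short
categories = {"room_type": ["Entire home/apt", "Private room", "Shared room"]}
columns = ["minimum_nights", "number_of_reviews", "room_type", "calculated_host_listings_count"]

# Hypothetical row
row = {
    "minimum_nights": 2.0,
    "number_of_reviews": 38.0,
    "room_type": "Private room",
    "calculated_host_listings_count": 13.0,
}
features = assemble(row, columns, categories)
```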

In [0]:
stages=[]
features = ['minimum_nights','number_of_reviews','neighbourhood_groupcatVec','room_typecatVec','calculated_host_listings_count']
feature = VectorAssembler(inputCols=features, outputCol='fts')
stages +=[feature]
pipeline = Pipeline(stages=stages)
df_mod_3 = pipeline.fit(df_mod_2).transform(df_mod_2)
display(df_mod_3) 

minimum_nights,number_of_reviews,calculated_host_listings_count,availability_365,priceLog,neighbourhood_groupcatVec,room_typecatVec,textBOW,fts
2.0,38.0,13.0,365.0,2.0211892,"Map(vectorType -> sparse, length -> 67, indices -> List(3), values -> List(1.0))","Map(vectorType -> sparse, length -> 76, indices -> List(0), values -> List(1.0))","Map(vectorType -> sparse, length -> 16264, indices -> List(), values -> List())","Map(vectorType -> sparse, length -> 146, indices -> List(0, 1, 5, 69, 145), values -> List(2.0, 38.0, 1.0, 1.0, 13.0))"
30.0,4.0,2.0,346.0,1.9542425,"Map(vectorType -> sparse, length -> 67, indices -> List(3), values -> List(1.0))","Map(vectorType -> sparse, length -> 76, indices -> List(0), values -> List(1.0))","Map(vectorType -> sparse, length -> 16264, indices -> List(8474, 12688, 15254), values -> List(1.0, 1.0, 1.0))","Map(vectorType -> sparse, length -> 146, indices -> List(0, 1, 5, 69, 145), values -> List(30.0, 4.0, 1.0, 1.0, 2.0))"
1.0,197.0,1.0,309.0,1.8864907,"Map(vectorType -> sparse, length -> 67, indices -> List(3), values -> List(1.0))","Map(vectorType -> sparse, length -> 76, indices -> List(0), values -> List(1.0))","Map(vectorType -> sparse, length -> 16264, indices -> List(1, 14, 173, 252, 1595, 3745, 14979), values -> List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0))","Map(vectorType -> sparse, length -> 146, indices -> List(0, 1, 5, 69, 145), values -> List(1.0, 197.0, 1.0, 1.0, 1.0))"
2.0,271.0,2.0,347.0,2.09691,"Map(vectorType -> sparse, length -> 67, indices -> List(3), values -> List(1.0))","Map(vectorType -> sparse, length -> 76, indices -> List(0), values -> List(1.0))","Map(vectorType -> sparse, length -> 16264, indices -> List(733, 2907, 5532), values -> List(1.0, 1.0, 1.0))","Map(vectorType -> sparse, length -> 146, indices -> List(0, 1, 5, 69, 145), values -> List(2.0, 271.0, 1.0, 1.0, 2.0))"
2.0,47.0,1.0,318.0,2.0791812,"Map(vectorType -> sparse, length -> 67, indices -> List(3), values -> List(1.0))","Map(vectorType -> sparse, length -> 76, indices -> List(0), values -> List(1.0))","Map(vectorType -> sparse, length -> 16264, indices -> List(12, 120, 1057, 5055), values -> List(1.0, 1.0, 1.0, 1.0))","Map(vectorType -> sparse, length -> 146, indices -> List(0, 1, 5, 69, 145), values -> List(2.0, 47.0, 1.0, 1.0, 1.0))"
5.0,82.0,1.0,63.0,2.0,"Map(vectorType -> sparse, length -> 67, indices -> List(3), values -> List(1.0))","Map(vectorType -> sparse, length -> 76, indices -> List(0), values -> List(1.0))","Map(vectorType -> sparse, length -> 16264, indices -> List(4, 14, 15030), values -> List(1.0, 1.0, 1.0))","Map(vectorType -> sparse, length -> 146, indices -> List(0, 1, 5, 69, 145), values -> List(5.0, 82.0, 1.0, 1.0, 1.0))"
2.0,189.0,4.0,238.0,1.690196,"Map(vectorType -> sparse, length -> 67, indices -> List(3), values -> List(1.0))","Map(vectorType -> sparse, length -> 76, indices -> List(0), values -> List(1.0))","Map(vectorType -> sparse, length -> 16264, indices -> List(), values -> List())","Map(vectorType -> sparse, length -> 146, indices -> List(0, 1, 5, 69, 145), values -> List(2.0, 189.0, 1.0, 1.0, 4.0))"
1.0,25.0,1.0,311.0,1.7781513,"Map(vectorType -> sparse, length -> 67, indices -> List(3), values -> List(1.0))","Map(vectorType -> sparse, length -> 76, indices -> List(0), values -> List(1.0))","Map(vectorType -> sparse, length -> 16264, indices -> List(), values -> List())","Map(vectorType -> sparse, length -> 146, indices -> List(0, 1, 5, 69, 145), values -> List(1.0, 25.0, 1.0, 1.0, 1.0))"
3.0,67.0,1.0,0.0,1.9242793,"Map(vectorType -> sparse, length -> 67, indices -> List(3), values -> List(1.0))","Map(vectorType -> sparse, length -> 76, indices -> List(0), values -> List(1.0))","Map(vectorType -> sparse, length -> 16264, indices -> List(3283, 6796, 9377, 11074, 11570), values -> List(1.0, 1.0, 1.0, 1.0, 1.0))","Map(vectorType -> sparse, length -> 146, indices -> List(0, 1, 5, 69, 145), values -> List(3.0, 67.0, 1.0, 1.0, 1.0))"
5.0,1.0,1.0,0.0,2.2900345,"Map(vectorType -> sparse, length -> 67, indices -> List(3), values -> List(1.0))","Map(vectorType -> sparse, length -> 76, indices -> List(0), values -> List(1.0))","Map(vectorType -> sparse, length -> 16264, indices -> List(7, 14), values -> List(1.0, 1.0))","Map(vectorType -> sparse, length -> 146, indices -> List(0, 1, 5, 69, 145), values -> List(5.0, 1.0, 1.0, 1.0, 1.0))"


# Exporting the Dataframe

This is the final dataframe that we are going to load in the ML notebooks.

In [0]:
df_mod_3.write.save("/FileStore/tables/finalAIRBNB",format='parquet')