# Dataframe assembler

This notebook will:

* Load the original dataset
* Conduct feature engineering for the following columns:
    * Lat/long: add clusters; potentially also neighborhood or other variables
    * Features: explode the feature lists, then cluster into 20 clusters with (bisecting) k-means
    * Manager: add a manager score
    * Description: replace with bag-of-words features; add columns for exclamation marks and punctuation
* This will generate a dataframe with several 'features' columns (e.g. 'features_description', 'features_manager', etc.)
* We will then combine these columns into a single column of feature vectors;
https://scikit-learn.org/0.18/auto_examples/hetero_feature_union.html looks very helpful for doing this

* We then split the data into 20% for testing and 80% for cross-validation, using 5 folds of 16% each to tune the model parameters

    * First model= logistic regression using no engineered features
    * Second model= random forest with no engineered features

    * Third model= logistic regression with engineered features
    * Fourth model= random forest with engineered features
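
The percentages above follow from holding out 20% first and then dividing the remaining 80% into 5 folds: 0.8 / 5 = 16% of the full data per fold. A minimal plain-Python sketch of that bookkeeping is shown here; `split_indices` is a hypothetical helper, not part of this notebook or of Spark, and it shuffles row indices rather than calling `randomSplit`.

```python
import random

def split_indices(n_rows, test_frac=0.2, n_folds=5, seed=1337):
    """Return (test_idx, folds): a held-out test set plus n_folds CV folds."""
    idx = list(range(n_rows))
    random.Random(seed).shuffle(idx)
    n_test = int(round(n_rows * test_frac))
    test_idx, cv_idx = idx[:n_test], idx[n_test:]
    # Deal the remaining rows round-robin so fold sizes differ by at most one
    folds = [cv_idx[i::n_folds] for i in range(n_folds)]
    return test_idx, folds

test_idx, folds = split_indices(1000)
print(len(test_idx), [len(f) for f in folds])  # 200 [160, 160, 160, 160, 160]
```

During cross-validation each fold would take one turn as the validation set while the other four are used for fitting.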


Cross-validation and model comparison are based on log-loss.
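
For reference, multi-class log-loss is the mean negative log of the probability the model assigns to the true class. The sketch below is a plain-Python illustration under assumed names (`multiclass_log_loss` and the example probabilities are made up); the integer coding 0 = low, 1 = medium, 2 = high matches the one used for the manager score in this notebook.

```python
import math

def multiclass_log_loss(y_true, y_prob, eps=1e-15):
    """Mean negative log-probability assigned to the true class."""
    total = 0.0
    for label, probs in zip(y_true, y_prob):
        # Clip so that a predicted probability of exactly 0 or 1 stays finite
        p = min(max(probs[label], eps), 1 - eps)
        total += -math.log(p)
    return total / len(y_true)

# Classes: 0 = low, 1 = medium, 2 = high
y_true = [0, 2, 1]
y_prob = [[0.7, 0.2, 0.1],
          [0.1, 0.3, 0.6],
          [0.2, 0.5, 0.3]]
loss = multiclass_log_loss(y_true, y_prob)
print(round(loss, 4))
```

Lower is better: a model that puts probability 1 on every true class scores 0, and confident wrong predictions are penalized heavily.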
    

In [2]:
# Initialize Spark

from pyspark.sql import SparkSession
from pyspark import SparkContext
from pyspark.sql import SQLContext
import pyspark.sql.functions as F
import seaborn as sns
import matplotlib.pyplot as plt

import pandas as pd

spark = SparkSession \
    .builder \
    .master("local[*]") \
    .appName("Assemble_Data") \
    .config("spark.executor.memory", '4g') \
    .config('spark.executor.cores', '1') \
    .config('spark.cores.max', '1') \
    .config("spark.driver.memory",'1g') \
    .getOrCreate()

sc = spark.sparkContext
sqlCtx = SQLContext(sc)

In [3]:
# Import data
train_data_pd = pd.read_json("data/train.json")
train_data_df = sqlCtx.createDataFrame(train_data_pd)

# Feature Engineering

## Lat/long work:

**Still need to implement pipeline. I had a little trouble with it but will add it soon.**

Also, the clusters are mapped to each observation via a unique id that I added, but the id is not returned. I'm not sure whether this causes problems; if it does, we'll need to add the id to the train df before adding the column.

In [4]:
from pyspark.ml.feature import Bucketizer #For grouping lat and long into buckets for clustering
from pyspark.ml.clustering import KMeans #Clustering coord data
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import StringIndexer #index response variable as numerical
from pyspark.sql.functions import monotonically_increasing_id #Joining clusters back to original data need a unique ID
import numpy #Create splits vector for bucketizing

#Make sure to clean lat and long prior to this. Or I could put the cleaning in the function.
#I'm not sure how to use xining's outlier smoother function and I used a different method in
# my code. 
def get_coord_clusters(df, inputCol1, inputCol2, numClust):
    #Get df of only columns of interest
    partial_df = df.select(inputCol1, inputCol2, 'interest_level')
    
    ###Index interest level(Probably will need to be removed. Not necessary and will be done
    ###in other parts of code.)
    
    #Instantiate and fit indexer and drop old interest_level
    indexer = StringIndexer(inputCol = 'interest_level', outputCol = 'interest_level_index')
    partial_df = indexer.fit(partial_df).transform(partial_df).drop('interest_level')
    
    ###Bucketize latitude and longitude: based on the min and max of their range.
    
    #Get min and max of each col
    min1 = partial_df.agg({inputCol1: 'min'}).collect()[0][0]
    max1 = partial_df.agg({inputCol1: 'max'}).collect()[0][0]
    
    min2 = partial_df.agg({inputCol2: 'min'}).collect()[0][0]
    max2 = partial_df.agg({inputCol2: 'max'}).collect()[0][0]
    
    #Get splits
    splits1 = numpy.arange(min1 - .001, max1 + .001, .001) #widen interval by .001 to make sure all values are bucketized
    splits2 = numpy.arange(min2 - .001, max2 + .001, .001)
    #Instantiate bucketizers
    bucketizer1 = Bucketizer(splits = splits1, inputCol = inputCol1, outputCol = 'bucketed1')
    bucketizer2 = Bucketizer(splits = splits2, inputCol = inputCol2, outputCol = 'bucketed2')
    #Bucketize both columns and add them to our df
    partial_df = bucketizer1.transform(partial_df)
    partial_df = bucketizer2.transform(partial_df)
    
    ###Vectorize our buckets
    
    # This will return a new DF with all the columns + unique id
    partial_df = partial_df.withColumn("id", monotonically_increasing_id())

    #Create('assemble') a vector of longitude and latitude to feed into our model as
    # a single predictor variable
    assembler = VectorAssembler(
        inputCols = ['bucketed1', 'bucketed2'],
        outputCol = 'featuresVec')

    #Run our df through the assembler
    partial_df = assembler.transform(partial_df)

    
    ###KMeans Clustering
    
    #Instantiate
    kmeans = KMeans().setK(numClust).setFeaturesCol('featuresVec')
    model = kmeans.fit(partial_df)
    
    #Get clusters and join them with partial_df
    transformed = model.transform(partial_df).select('prediction', 'id')
    partial_df = transformed.join(partial_df, 'id')
    
    #Rename predictions
    partial_df = partial_df.withColumnRenamed('prediction', 'CoordinateCluster')

    
    return partial_df['CoordinateCluster']
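
To make the bucketing step concrete: with a step of .001, every coordinate is replaced by the index of the interval it falls into, so nearby listings share a bucket before k-means runs. The following is a rough plain-Python stand-in for what `Bucketizer` does with those splits; `make_splits` and `bucket_index` are hypothetical helpers and the latitudes are illustrative values.

```python
import math
from bisect import bisect_right

def make_splits(lo, hi, step=0.001):
    """Evenly spaced edges from lo - step up to at least hi + step."""
    n_buckets = math.ceil((hi - lo) / step) + 2
    return [lo - step + i * step for i in range(n_buckets + 1)]

def bucket_index(value, splits):
    """Index i such that splits[i] <= value < splits[i + 1]."""
    if not splits[0] <= value <= splits[-1]:
        raise ValueError("value outside the split range")
    # The last bucket also includes its upper edge
    return min(bisect_right(splits, value) - 1, len(splits) - 2)

splits = make_splits(40.70, 40.80)
# Two latitudes less than .001 apart and inside the same interval share a bucket
print(bucket_index(40.7402, splits) == bucket_index(40.7408, splits))  # True
# A latitude in the next interval gets the next index
print(bucket_index(40.7415, splits) - bucket_index(40.7402, splits))   # 1
```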



## 'Features' work:

I have commented each line; for more thorough explanations, see `rentalPrice_jonas.ipynb`.

In [12]:
from pyspark.ml.feature import HashingTF, IDF, Tokenizer, CountVectorizer, StopWordsRemover
from pyspark.ml.pipeline import Pipeline
from pyspark.ml.clustering import BisectingKMeans
import pyspark.sql.types as typ

def get_features_column(df, inputCol = "features"):
    
    #joins the list of features into one comma-separated string
    string_assembler = F.udf(lambda x: ','.join(x), typ.StringType())
    df = df.withColumn(inputCol, string_assembler(df[inputCol]))
    #lower case everything
    df = df.withColumn(inputCol, F.lower(df[inputCol]))
    #replaces empty feature strings with "missing features"
    df = df.withColumn(inputCol, 
                             F.when(df[inputCol] == '', 'missing features')
                             .otherwise(df[inputCol]))
    #splits on "," and " * " and stores the result as a new data frame
    feat_df = df.withColumn("features_list", F.split(df[inputCol], r',| \* '))
    #explodes the features into column "ex_features_list"
    feat_df_ex = feat_df.withColumn("ex_features_list", F.explode(feat_df["features_list"]))
    #creates clustering data frame with only column "ex_features_list"
    clustering_df = feat_df_ex[["ex_features_list"]]
    #renames the column
    clustering_df = clustering_df.withColumnRenamed("ex_features_list", "text")

    #creates a tokenizer 
    tokenizer = Tokenizer(inputCol="text", outputCol="tokens")
    #removes stop words
    remover = StopWordsRemover(inputCol="tokens", outputCol="stopWordsRemovedTokens")
    #hashes the features into sparse vectors
    hashingTF = HashingTF(inputCol="stopWordsRemovedTokens", outputCol="rawFeatures", numFeatures=2000)
    #inverse document frequency - importance of the word (kind of)
    idf = IDF(inputCol="rawFeatures", outputCol="features", minDocFreq=5)
    
    #creates and fits the pipeline
    pipeline = Pipeline(stages=[tokenizer, remover, hashingTF, idf])
    pipelined_df = pipeline.fit(clustering_df).transform(clustering_df)
    
    #Set the number of clusters determined in rentalPrice_jonas.ipynb
    num_k = 20
    #creates the bisecting k-means
    km = BisectingKMeans(k = num_k)
    #fits it to the pipelined data frame
    model = km.fit(pipelined_df)
    #transform into the results
    results = model.transform(pipelined_df)
    #changes the name of the column "prediction" to "clusters"
    results = results.withColumnRenamed("prediction", "clusters")
    
    return results

Testing the function

In [13]:
features_col = get_features_column(train_data_df, "features")

In [14]:
features_col

DataFrame[text: string, tokens: array<string>, stopWordsRemovedTokens: array<string>, rawFeatures: vector, features: vector, clusters: int]

In [17]:
features_col.show(10)

+--------------------+--------------------+----------------------+--------------------+--------------------+--------+
|                text|              tokens|stopWordsRemovedTokens|         rawFeatures|            features|clusters|
+--------------------+--------------------+----------------------+--------------------+--------------------+--------+
|    missing features| [missing, features]|   [missing, features]|(2000,[1525,1755]...|(2000,[1525,1755]...|      16|
|             doorman|           [doorman]|             [doorman]|  (2000,[825],[1.0])|(2000,[825],[2.55...|      10|
|            elevator|          [elevator]|            [elevator]|   (2000,[31],[1.0])|(2000,[31],[2.335...|       0|
|      fitness center|   [fitness, center]|     [fitness, center]|(2000,[1173,1966]...|(2000,[1173,1966]...|      14|
|        cats allowed|     [cats, allowed]|       [cats, allowed]|(2000,[1350,1966]...|(2000,[1350,1966]...|       3|
|        dogs allowed|     [dogs, allowed]|       [dogs,
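
The `rawFeatures` and `features` columns above are sparse vectors of size 2000: `HashingTF` hashes each token to an index and counts it, and `IDF` rescales the counts. A small plain-Python sketch of the same idea follows. It assumes `zlib.crc32` as a stand-in hash (Spark uses MurmurHash3, so the indices will not match the output above) and uses the smoothed formula log((n_docs + 1) / (doc_freq + 1)) documented for Spark's `IDF`; the helper names and the four example documents are made up for illustration.

```python
import math
import zlib
from collections import Counter

NUM_FEATURES = 2000  # same vector size as the HashingTF stage

def hashed_tf(tokens, num_features=NUM_FEATURES):
    """Map tokens to {bucket index: count}; crc32 stands in for Spark's hash."""
    counts = Counter(zlib.crc32(t.encode("utf-8")) % num_features for t in tokens)
    return dict(counts)

def idf_weights(docs_tf, min_doc_freq=0):
    """IDF per index: log((n_docs + 1) / (doc_freq + 1)), 0 below min_doc_freq."""
    n_docs = len(docs_tf)
    doc_freq = Counter(idx for tf in docs_tf for idx in tf)
    return {idx: (math.log((n_docs + 1) / (freq + 1)) if freq >= min_doc_freq else 0.0)
            for idx, freq in doc_freq.items()}

docs = [["doorman"], ["elevator"], ["doorman"], ["fitness", "center"]]
docs_tf = [hashed_tf(d) for d in docs]
idf = idf_weights(docs_tf)
# TF-IDF: term count times the IDF weight of its index
tfidf = [{idx: count * idf[idx] for idx, count in tf.items()} for tf in docs_tf]
print(tfidf[0] == tfidf[2])  # True: identical documents get identical vectors
```

Frequent tokens end up with a smaller weight than rare ones, and with `minDocFreq=5` any token seen in fewer than 5 rows is weighted 0.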

## Manager work:

In [18]:
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import Bucketizer
from pyspark.sql import Row
from pyspark.sql.types import StructType, StructField, StringType
from collections import OrderedDict

def get_manager_bucket_column(df, inputColum = "manager_id", dictionary = ""):
    
    if 'interest_level' in df.columns:
        string_indexer = StringIndexer(inputCol = inputColum, outputCol = "manager_idx")
        manager_df = string_indexer.fit(df).transform(df)
    
        manager_join = manager_df[["manager_idx"]].groupBy("manager_idx").count()
    
        manager_df = manager_df.join(manager_join, on = "manager_idx", how = "left")
    
        manager_df = manager_df.withColumn("interest_level", F.when(manager_df["interest_level"] == 'low', 0)
                                         .when(manager_df["interest_level"] == 'medium', 1)
                                         .otherwise(2))
    
        manager_skill = manager_df.groupBy("manager_idx").agg({"interest_level": "mean"})
        manager_skill = manager_skill.withColumnRenamed("avg(interest_level)", "manager_skill")
        
        splits = [0,1,2,float("Inf")]
        buck = Bucketizer(splits = splits, inputCol = "manager_skill", outputCol = "manager_bucket")
        manager_skill_bucket = buck.transform(manager_skill)
        
        manager_dictionary = manager_skill_bucket[["manager_idx", "manager_bucket"]]\
                                            .toPandas()\
                                            .set_index('manager_idx')\
                                            .T.to_dict('index')
        
        return manager_skill_bucket["manager_bucket"], manager_dictionary
    else:
        
        schema = StructType([
                    StructField('manager_idx', StringType(), True),
                    StructField('manager_bucket', StringType(), True)
                    ])

        dtaRDD = spark.sparkContext.parallelize(dictionary) \
                    .map(lambda x: Row(**OrderedDict(sorted(x.items()))))

        dict_df = spark.createDataFrame(dtaRDD, schema)
        
        
        
        temp_df = dict_df.dropDuplicates(["manager_id"])[["manager_id", "manager_bucket"]]
    
        test_df = df.join(temp_df, on = "manager_id", how = "left")
        test_df = test_df.na.fill(0)
    
        return test_df["manager_bucket"]
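
The training branch boils down to three steps: score each listing 0/1/2, average the scores per manager, and bucket the average with the splits [0, 1, 2, inf]. Here is a hedged plain-Python sketch of that logic; `manager_buckets` and the manager ids are hypothetical, and it works on (manager, interest) pairs rather than a Spark DataFrame.

```python
from bisect import bisect_right
from collections import defaultdict

INTEREST_SCORE = {"low": 0, "medium": 1}  # anything else counts as 2
SPLITS = [0, 1, 2, float("inf")]

def manager_buckets(listings):
    """listings: iterable of (manager_id, interest_level) -> {manager_id: bucket}."""
    scores = defaultdict(list)
    for manager_id, interest in listings:
        scores[manager_id].append(INTEREST_SCORE.get(interest, 2))
    buckets = {}
    for manager_id, vals in scores.items():
        skill = sum(vals) / len(vals)  # mean interest score of this manager
        # Bucket i covers SPLITS[i] <= skill < SPLITS[i + 1]
        buckets[manager_id] = min(bisect_right(SPLITS, skill) - 1, len(SPLITS) - 2)
    return buckets

listings = [("m1", "low"), ("m1", "low"), ("m1", "medium"),
            ("m2", "high"), ("m2", "medium"),
            ("m3", "high")]
buckets = manager_buckets(listings)
print(buckets)  # {'m1': 0, 'm2': 1, 'm3': 2}
```

Because the score is a mean of values between 0 and 2, a manager only reaches bucket 2 when every listing is scored 2, which may explain why the dictionary below is dominated by 0.0.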

In [19]:
(train, test) = train_data_df.randomSplit(seed = 1337, weights = [0.8, 0.2])
test = test.drop("manager_skill")

In [20]:
bucket_col, manager_dictionary = get_manager_bucket_column(train)
manager_dictionary

{'manager_bucket': {299.0: 0.0,
  305.0: 0.0,
  496.0: 0.0,
  558.0: 0.0,
  596.0: 0.0,
  692.0: 0.0,
  769.0: 0.0,
  934.0: 0.0,
  1051.0: 0.0,
  1761.0: 0.0,
  2734.0: 0.0,
  2815.0: 0.0,
  2862.0: 2.0,
  147.0: 1.0,
  170.0: 0.0,
  184.0: 0.0,
  576.0: 0.0,
  720.0: 0.0,
  782.0: 0.0,
  810.0: 0.0,
  1369.0: 0.0,
  1587.0: 1.0,
  1765.0: 1.0,
  2382.0: 0.0,
  2523.0: 2.0,
  3029.0: 0.0,
  160.0: 0.0,
  169.0: 0.0,
  608.0: 0.0,
  735.0: 0.0,
  1777.0: 0.0,
  1988.0: 1.0,
  2317.0: 0.0,
  2373.0: 0.0,
  2638.0: 0.0,
  2741.0: 0.0,
  3108.0: 0.0,
  8.0: 0.0,
  67.0: 0.0,
  70.0: 0.0,
  311.0: 0.0,
  379.0: 0.0,
  486.0: 1.0,
  571.0: 0.0,
  878.0: 0.0,
  994.0: 0.0,
  1350.0: 0.0,
  1584.0: 0.0,
  1622.0: 0.0,
  1767.0: 0.0,
  1846.0: 0.0,
  1951.0: 0.0,
  1976.0: 0.0,
  2164.0: 0.0,
  2527.0: 0.0,
  2822.0: 0.0,
  3009.0: 0.0,
  3085.0: 0.0,
  168.0: 0.0,
  702.0: 0.0,
  818.0: 0.0,
  1095.0: 0.0,
  2167.0: 0.0,
  2587.0: 0.0,
  2705.0: 0.0,
  2821.0: 1.0,
  3245.0: 0.0,
  0.0: 0.0,


In [22]:
manager_dictionary.keys()

dict_keys(['manager_bucket'])

In [23]:
manager_dictionary["manager_bucket"]

{299.0: 0.0,
 305.0: 0.0,
 496.0: 0.0,
 558.0: 0.0,
 596.0: 0.0,
 692.0: 0.0,
 769.0: 0.0,
 934.0: 0.0,
 1051.0: 0.0,
 1761.0: 0.0,
 2734.0: 0.0,
 2815.0: 0.0,
 2862.0: 2.0,
 147.0: 1.0,
 170.0: 0.0,
 184.0: 0.0,
 576.0: 0.0,
 720.0: 0.0,
 782.0: 0.0,
 810.0: 0.0,
 1369.0: 0.0,
 1587.0: 1.0,
 1765.0: 1.0,
 2382.0: 0.0,
 2523.0: 2.0,
 3029.0: 0.0,
 160.0: 0.0,
 169.0: 0.0,
 608.0: 0.0,
 735.0: 0.0,
 1777.0: 0.0,
 1988.0: 1.0,
 2317.0: 0.0,
 2373.0: 0.0,
 2638.0: 0.0,
 2741.0: 0.0,
 3108.0: 0.0,
 8.0: 0.0,
 67.0: 0.0,
 70.0: 0.0,
 311.0: 0.0,
 379.0: 0.0,
 486.0: 1.0,
 571.0: 0.0,
 878.0: 0.0,
 994.0: 0.0,
 1350.0: 0.0,
 1584.0: 0.0,
 1622.0: 0.0,
 1767.0: 0.0,
 1846.0: 0.0,
 1951.0: 0.0,
 1976.0: 0.0,
 2164.0: 0.0,
 2527.0: 0.0,
 2822.0: 0.0,
 3009.0: 0.0,
 3085.0: 0.0,
 168.0: 0.0,
 702.0: 0.0,
 818.0: 0.0,
 1095.0: 0.0,
 2167.0: 0.0,
 2587.0: 0.0,
 2705.0: 0.0,
 2821.0: 1.0,
 3245.0: 0.0,
 0.0: 0.0,
 69.0: 0.0,
 206.0: 0.0,
 524.0: 1.0,
 650.0: 0.0,
 758.0: 0.0,
 973.0: 0.0,
 1226.0: 

In [21]:
test_bucket_col = get_manager_bucket_column(train, dictionary = manager_dictionary)
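
At test time the idea is to look each manager up in the dictionary built from the training data and fall back to 0 when the manager was not seen, which is what `na.fill(0)` does in the function above. A plain-Python sketch of that lookup, assuming the nested `{'manager_bucket': {manager_idx: bucket}}` shape shown above; `lookup_buckets`, `sample_dictionary` and the key 9999.0 are hypothetical.

```python
def lookup_buckets(manager_keys, dictionary, default=0.0):
    """Look up each key in the training mapping; unseen managers get `default`."""
    mapping = dictionary["manager_bucket"]
    return [mapping.get(key, default) for key in manager_keys]

# A few entries copied from the output above
sample_dictionary = {"manager_bucket": {299.0: 0.0, 2862.0: 2.0, 147.0: 1.0}}
result = lookup_buckets([147.0, 2862.0, 9999.0], sample_dictionary)
print(result)  # [1.0, 2.0, 0.0]
```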

## Description work:

In [3]:
from pyspark.sql.functions import isnan
from pyspark.sql.functions import when, lit, col
from pyspark.ml.feature import RegexTokenizer, StopWordsRemover, CountVectorizer
from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Pipeline
from pyspark.ml.linalg import DenseVector
from pyspark.sql.types import IntegerType

# Make the replace function
def replace(column, value):
    return when(column != value, column).otherwise(lit("none"))

# Make the full function
def add_description_columns(df):
    # select only the description
    #train_data_df2 = df.select("interest_level","description")
    # clean blanks
    train4 = df.withColumn("description", replace(col("description"), '        '))
    train4 = train4.withColumn("description", replace(col("description"), ""))
    train4 = train4.withColumn("description", replace(col("description"), " "))
    train4 = train4.withColumn("description", replace(col("description"), "           "))
    # regular expression tokenizer
    regexTokenizer = RegexTokenizer(inputCol="description", outputCol="words", pattern="\\W") # \W matches any non-word character, so the text is split on punctuation and whitespace

    # stop words
    add_stopwords = ["a","the","it","of","the","is","and", # standard stop words
     "A","this","in","for"]
    stopwordsRemover = StopWordsRemover(inputCol="words", outputCol="filtered").setStopWords(add_stopwords)

    # bag of words count
    countVectors = CountVectorizer(inputCol="filtered", outputCol="word_features", vocabSize=1000, minDF=5)
    
    pipeline = Pipeline(stages=[regexTokenizer, stopwordsRemover, countVectors])

    # Fit the pipeline to training documents.
    pipelineFit = pipeline.fit(train4)
    dataset = pipelineFit.transform(train4)
    dataset = dataset.withColumn("label", dataset["interest_level"].cast(IntegerType()))
    
    return dataset["word_features"]
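
To illustrate what the pipeline stages do to a single description: the pattern `\W` splits on every non-word character, the stop words are dropped, and the remaining tokens are counted. The sketch below mimics this in plain Python for ASCII text; `tokenize`, `bag_of_words` and the example sentence are assumptions for illustration, and it skips the `vocabSize` and `minDF` filtering that `CountVectorizer` applies.

```python
import re
from collections import Counter

# Lowercased version of the stop-word list used in the cell above
STOP_WORDS = {"a", "the", "it", "of", "is", "and", "this", "in", "for"}

def tokenize(text):
    """Lowercase, split on every non-word character, drop empty tokens."""
    return [tok for tok in re.split(r"\W", text.lower()) if tok]

def bag_of_words(text):
    """Count the tokens that survive stop-word removal."""
    return Counter(tok for tok in tokenize(text) if tok not in STOP_WORDS)

counts = bag_of_words("A sunny 2-bedroom in the heart of SoHo!")
print(sorted(counts.items()))
# [('2', 1), ('bedroom', 1), ('heart', 1), ('soho', 1), ('sunny', 1)]
```

Note that the exclamation mark disappears during tokenization, so a separate column would be needed to keep that signal.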

In [52]:
new_data_3 = add_description_columns(new_df2)

In [4]:
word_features_col = add_description_columns(train_data_df)

In [13]:
word_features_col

Column<b'word_features'>

## Photos work:

In [17]:
# using the df where I made the label into integers
def photo_taker(df):
    train_data_df_p = df.select("interest_level", "photos")
    photo_rdd = train_data_df_p.rdd
    photo_rdd = photo_rdd.map(lambda x: (x[0], len(x[1])))
    photo_df = sqlCtx.createDataFrame(photo_rdd, ["label", "features"])
    return photo_df["features"]

In [18]:
photo_taker(train_data_df)

Column<b'features'>

In [83]:
# Same train/test setup as before
dataset5_rdd = photo_rdd.map(lambda x: (x[0], DenseVector(x[1:])))
dataset5_rdd.take(2)

+-----------+----------+---------+--------+--------------------+-------------------+--------------------+---------------+--------------------+--------+---------+--------------------+--------------------+-----+------------------+--------------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+-----+------------------+--------------------+--------------------+--------------------+-----+
|manager_idx|listing_id|bathrooms|bedrooms|         building_id|            created|         description|display_address|            features|latitude|longitude|          manager_id|              photos|price|    street_address|interest_level|feature_cluster_0|feature_cluster_1|feature_clust

In [None]:
dataset5 = sqlCtx.createDataFrame(dataset5_rdd, ["label", "features"])
dataset5 = dataset5.withColumn("label", dataset5["label"].cast(IntegerType()))
dataset5.printSchema()
dataset5.show(2)

In [None]:
columns_num = [0, 2, 3, 4, 9, 10, 11, 13, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 37, 40]

In [89]:
features_to_combine = ['manager_idx',
                      'bathrooms',
                      'bedrooms',
                      'building_id',
                      'latitude',
                      'longitude',
                      'manager_id',
                      'price',
                      'feature_cluster_0',
                      'feature_cluster_1',
                      'feature_cluster_2',
                      'feature_cluster_3',
                      'feature_cluster_4',
                      'feature_cluster_5',
                      'feature_cluster_6',
                      'feature_cluster_7',
                      'feature_cluster_8',
                      'feature_cluster_9',
                      'feature_cluster_10',
                      'feature_cluster_11',
                      'feature_cluster_12',
                      'feature_cluster_13',
                      'feature_cluster_14',
                      'feature_cluster_15',
                      'feature_cluster_16',
                      'feature_cluster_17',
                      'feature_cluster_18',
                      'feature_cluster_19',
                      'manager_skill'
                      ]


In [94]:
new_data_3b = new_data_3.select((*(col(c).cast("float").alias(c) for c in features_to_combine)), "word_features","label")
new_data_3b

DataFrame[manager_idx: float, bathrooms: float, bedrooms: float, building_id: float, latitude: float, longitude: float, manager_id: float, price: float, feature_cluster_0: float, feature_cluster_1: float, feature_cluster_2: float, feature_cluster_3: float, feature_cluster_4: float, feature_cluster_5: float, feature_cluster_6: float, feature_cluster_7: float, feature_cluster_8: float, feature_cluster_9: float, feature_cluster_10: float, feature_cluster_11: float, feature_cluster_12: float, feature_cluster_13: float, feature_cluster_14: float, feature_cluster_15: float, feature_cluster_16: float, feature_cluster_17: float, feature_cluster_18: float, feature_cluster_19: float, manager_skill: float, word_features: vector, label: int]

In [95]:
# convert to rdd
new_data_3_rdd = new_data_3b.rdd

In [103]:
#new_data_3_rdd.take(1)
columns_num = list(range(29))  # indices of the 29 numeric columns in features_to_combine

In [104]:
input_rdd = new_data_3_rdd.map(lambda x: (x[30], DenseVector([x[i] for i in columns_num])))

In [105]:
input_rdd.take(2)

# Ugh, still need to convert word_features to a dense vector, unlist it, and then combine it with this rdd, I think...
# We also have missing values: positions 3 and 6 (building_id and manager_id) come out as nan after the float cast.

[(0,
  DenseVector([299.0, 1.0, 3.0, nan, 40.7399, -73.9864, nan, 5595.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 1.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.5556])),
 (0,
  DenseVector([299.0, 1.0, 2.0, nan, 40.7399, -73.9864, nan, 3995.0, 1.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.5556]))]
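
One possible way to think about the remaining step, sketched in plain Python: expand the sparse word-count vector to a dense list, append it to the numeric columns, and replace nan with a fill value. The names `sparse_to_dense` and `combine_features`, the 5-word vocabulary, and the choice of 0.0 as fill value are all assumptions, not something this notebook settles.

```python
import math

def sparse_to_dense(size, indices, values):
    """Expand a (size, indices, values) sparse vector into a plain list."""
    dense = [0.0] * size
    for i, v in zip(indices, values):
        dense[i] = float(v)
    return dense

def combine_features(numeric, sparse_words, fill=0.0):
    """Concatenate numeric columns with dense word counts, replacing nan by `fill`."""
    cleaned = [fill if math.isnan(x) else float(x) for x in numeric]
    return cleaned + sparse_to_dense(*sparse_words)

numeric = [299.0, 1.0, 3.0, float("nan"), 40.7399, -73.9864]
words = (5, [0, 3], [2.0, 1.0])  # hypothetical 5-word vocabulary
row = combine_features(numeric, words)
print(row)
# [299.0, 1.0, 3.0, 0.0, 40.7399, -73.9864, 2.0, 0.0, 0.0, 1.0, 0.0]
```

Within Spark, `VectorAssembler` accepts vector columns alongside numeric ones, so it may be possible to assemble `word_features` with the other columns without going through an RDD. Filling with 0.0 is only a placeholder here; for id-like columns, dropping them may be the better choice.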