### Classification
### Prediction for FOOD related business
To make it clearer which feature columns contribute to the prediction, businesses can be broadly grouped into categories such as Food, Entertainment, Medical, Services, Shopping, and Education. To predict the popularity of a Yelp business, we chose food-related businesses and their feature columns.

Subcategories under the Food category include 'Wine Bars', 'Vietnamese', 'vegetarian', 'vegan', 'Turkish', 'Thai', 'Tex-Mex', 'Tea Rooms', 'Tapas/Small Plates', 'Tapas Bars', 'Taiwanese', 'Szechuan', 'Sushi Bars', 'Steakhouses', 'Soup', 'Soul Food', 'Seafood', 'Sandwiches', 'Salad', 'Russian', 'Restaurants', 'restaurant', etc.

The feature columns related to food are review_count, stars, Take-out, GoodFor_lunch, GoodFor_dinner, GoodFor_breakfast, Noise_Level, Takes_Reservations, Delivery, Parking_lot, WheelchairAccessible, Alcohol, WaiterService, and Wi-Fi.

California State University, Los Angeles

Author: Ruchi Singh

Instructor: Jongwook Woo

Date: 05/20/2017

### Download Data
Download the "Business-Food.csv" file and upload it to Databricks (Data -> default -> Create Table). Rename the table to "Food2" and check the data type of every column.

This is the data to be used for training the machine learning algorithm.

### Logistic regression
The logistic regression classification model is used to predict the stars (popularity) of a business. The assumption made here is that a business is unpopular if its star rating is less than 3 and popular if its star rating is more than 3.
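
As a rough illustration of the labeling rule described above, the following plain-Python sketch turns a star rating into a binary label. The function name `popularity_label` and the handling of a rating of exactly 3 are assumptions; the text only defines the cases below and above 3.

```python
from typing import Optional


def popularity_label(stars: float, cutoff: float = 3.0) -> Optional[int]:
    """Return 1 (popular) above the cutoff and 0 (unpopular) below it.

    The text does not say how a rating of exactly 3 is treated,
    so this sketch returns None for it (an assumption).
    """
    if stars > cutoff:
        return 1
    if stars < cutoff:
        return 0
    return None


labels = [popularity_label(s) for s in (1.5, 3.0, 4.5)]
print(labels)  # [0, None, 1]
```

In practice the undefined case would have to be assigned to one of the two classes or dropped before training.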

### Prepare the Data
First, import the libraries you will need and prepare the training and test data:


In [1]:
import findspark
findspark.init()
from pyspark.sql import SparkSession 
spark = SparkSession.builder.master("local").appName("classification").getOrCreate()

In [2]:
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import StringIndexer
from pyspark.ml.tuning import ParamGridBuilder, TrainValidationSplit, CrossValidator
from pyspark.ml.evaluation import BinaryClassificationEvaluator, RegressionEvaluator
from pyspark.sql import SQLContext

### Load Food table
The food data is now loaded into Spark and queried with SQL:

In [3]:
sc = spark.sparkContext
sqlContext = SQLContext(sc)

In [51]:
data = spark.read.csv('Business-Food.csv', inferSchema=True , header = True)

In [52]:
data.createOrReplaceTempView("food2")  # replaces registerTempTable, which is deprecated since Spark 2.0
csv = sqlContext.sql("Select * from food2")

In [53]:
data = csv.select("review_count","Take-out", "GoodFor_lunch", "GoodFor_dinner", "GoodFor_breakfast"
                  ,"Noise_Level", "Takes_Reservations","Delivery","Parking_lot", "WheelchairAccessible"
                  ,"Alcohol", "WaiterService","Wi-Fi","stars")

In [13]:
data.show(5)

+------------+--------+-------------+--------------+-----------------+-----------+------------------+--------+-----------+--------------------+--------+-------------+-----+-----+
|review_count|Take-out|GoodFor_lunch|GoodFor_dinner|GoodFor_breakfast|Noise_Level|Takes_Reservations|Delivery|Parking_lot|WheelchairAccessible| Alcohol|WaiterService|Wi-Fi|stars|
+------------+--------+-------------+--------------+-----------------+-----------+------------------+--------+-----------+--------------------+--------+-------------+-----+-----+
|           4|    TRUE|        FALSE|         FALSE|            FALSE|    average|             FALSE|   FALSE|      FALSE|                 N/A|    none|        FALSE|  N/A|    1|
|          20|    TRUE|         TRUE|         FALSE|            FALSE|    average|             FALSE|   FALSE|      FALSE|                 N/A|full_bar|         TRUE|   no|    1|
|          21|    TRUE|        FALSE|          TRUE|            FALSE|       loud|             FALSE|   F

### String Indexer
StringIndexer encodes a string column of labels to a column of label indices.
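
To make the idea concrete, here is a minimal plain-Python sketch of what a string indexer does, assuming the default behaviour of ordering labels by descending frequency so that the most frequent label gets index 0. The helper `fit_string_index` and the sample values are made up for illustration; this is not Spark's implementation.

```python
from collections import Counter


def fit_string_index(values):
    """Map each distinct string to an index, most frequent value first."""
    counts = Counter(values)
    # Sort by descending count; break ties alphabetically so the result is deterministic
    ordered = sorted(counts, key=lambda v: (-counts[v], v))
    return {value: float(i) for i, value in enumerate(ordered)}


# Hypothetical Noise_Level values
noise = ["average", "loud", "average", "quiet", "average", "loud"]
mapping = fit_string_index(noise)
indexed = [mapping[v] for v in noise]

print(mapping)  # {'average': 0.0, 'loud': 1.0, 'quiet': 2.0}
print(indexed)  # [0.0, 1.0, 0.0, 2.0, 0.0, 1.0]
```

The resulting indices carry no numeric meaning on their own, which is why a one-hot encoding step usually follows.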

In [14]:
def indexStringColumns(df, cols):
    # newdata is reassigned on each loop iteration
    newdata = df
    for c in cols:
        si = StringIndexer(inputCol=c, outputCol=c+"-x")
        sm = si.fit(newdata)
        newdata = sm.transform(newdata).drop(c)
        newdata = newdata.withColumnRenamed(c+"-x", c)
    return newdata

In [15]:
dfnumeric = indexStringColumns(data, ["Take-out","GoodFor_lunch", "GoodFor_dinner", "GoodFor_breakfast"
                                      ,"Noise_Level", "Takes_Reservations","Delivery","Parking_lot",
                                      "WheelchairAccessible","Alcohol", "WaiterService","Wi-Fi"])

In [16]:
dfnumeric.show(25)

+------------+-----+--------+-------------+--------------+-----------------+-----------+------------------+--------+-----------+--------------------+-------+-------------+-----+
|review_count|stars|Take-out|GoodFor_lunch|GoodFor_dinner|GoodFor_breakfast|Noise_Level|Takes_Reservations|Delivery|Parking_lot|WheelchairAccessible|Alcohol|WaiterService|Wi-Fi|
+------------+-----+--------+-------------+--------------+-----------------+-----------+------------------+--------+-----------+--------------------+-------+-------------+-----+
|           4|    1|     0.0|          0.0|           0.0|              0.0|        0.0|               0.0|     0.0|        0.0|                 0.0|    1.0|          2.0|  0.0|
|          20|    1|     0.0|          2.0|           0.0|              0.0|        0.0|               0.0|     0.0|        0.0|                 0.0|    2.0|          0.0|  1.0|
|          21|    1|     0.0|          0.0|           2.0|              0.0|        3.0|               0.0|   

### Encoder
One-hot encoding maps a column of label indices to a column of binary vectors, each with at most a single one-value. This encoding allows algorithms that expect continuous features, such as logistic regression, to use categorical features.
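
The sketch below shows, in plain Python, how a single category index becomes a one-hot vector in the sparse `(size, [positions], [values])` notation that Spark prints. It assumes every category keeps its own slot (the `dropLast=False` setting used in this notebook); the helper names are hypothetical.

```python
def one_hot_sparse(index, num_categories):
    """Return (size, [active position], [1.0]) for one category index."""
    position = int(index)
    if not 0 <= position < num_categories:
        raise ValueError("index out of range")
    return (num_categories, [position], [1.0])


def to_dense(sparse):
    """Expand the sparse triple into a plain list of floats."""
    size, positions, values = sparse
    dense = [0.0] * size
    for pos, val in zip(positions, values):
        dense[pos] = val
    return dense


encoded = one_hot_sparse(2.0, 3)
print(encoded)            # (3, [2], [1.0])
print(to_dense(encoded))  # [0.0, 0.0, 1.0]
```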

In [23]:
def oneHotEncodeColumns(df, cols):
    from pyspark.ml.feature import OneHotEncoder
    newdf = df
    for c in cols:
        onehotenc = OneHotEncoder(inputCol=c, outputCol=c+"-onehot", dropLast=False)
        onehotenc = onehotenc.fit(newdf)
        newdf = onehotenc.transform(newdf).drop(c)
        newdf = newdf.withColumnRenamed(c+"-onehot", c)
    return newdf

In [24]:
dfhot = oneHotEncodeColumns(dfnumeric, ["Take-out","GoodFor_lunch", "GoodFor_dinner", "GoodFor_breakfast"
                                        ,"Noise_Level", "Takes_Reservations","Delivery","Parking_lot"
                                        , "WheelchairAccessible","Alcohol", "WaiterService","Wi-Fi"])

In [25]:
dfhot.show(25)

+------------+-----+-------------+-------------+--------------+-----------------+-------------+------------------+-------------+-------------+--------------------+-------------+-------------+-------------+
|review_count|stars|     Take-out|GoodFor_lunch|GoodFor_dinner|GoodFor_breakfast|  Noise_Level|Takes_Reservations|     Delivery|  Parking_lot|WheelchairAccessible|      Alcohol|WaiterService|        Wi-Fi|
+------------+-----+-------------+-------------+--------------+-----------------+-------------+------------------+-------------+-------------+--------------------+-------------+-------------+-------------+
|           4|    1|(3,[0],[1.0])|(3,[0],[1.0])| (3,[0],[1.0])|    (3,[0],[1.0])|(5,[0],[1.0])|     (3,[0],[1.0])|(3,[0],[1.0])|(3,[0],[1.0])|       (3,[0],[1.0])|(4,[1],[1.0])|(3,[2],[1.0])|(4,[0],[1.0])|
|          20|    1|(3,[0],[1.0])|(3,[2],[1.0])| (3,[0],[1.0])|    (3,[0],[1.0])|(5,[0],[1.0])|     (3,[0],[1.0])|(3,[0],[1.0])|(3,[0],[1.0])|       (3,[0],[1.0])|(4,[2],[1.0])

### Vector Assembler
VectorAssembler is a transformer that combines a given list of columns into a single vector column. It is useful for combining raw features and features generated by different feature transformers into a single feature vector, in order to train ML models like logistic regression.
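
As a quick sanity check on the assembled vector, the arithmetic below adds up the sizes of the one-hot vectors shown in the first row of the `dfhot.show()` output, plus one slot for the numeric `review_count` column. The total should match the length of the feature vectors printed as `(41,[...])` in the prediction output. This is only a bookkeeping sketch and does not call Spark.

```python
# One-hot vector sizes read from the first row of the dfhot.show() output
onehot_sizes = {
    "Take-out": 3, "GoodFor_lunch": 3, "GoodFor_dinner": 3, "GoodFor_breakfast": 3,
    "Noise_Level": 5, "Takes_Reservations": 3, "Delivery": 3, "Parking_lot": 3,
    "WheelchairAccessible": 3, "Alcohol": 4, "WaiterService": 3, "Wi-Fi": 4,
}
numeric_columns = ["review_count"]  # each numeric column contributes one slot

total_dim = len(numeric_columns) + sum(onehot_sizes.values())
print(total_dim)  # 41
```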

In [26]:
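# Note: set() does not preserve order, so the position of each column inside "features" may differ between runs
va = VectorAssembler(outputCol="features", inputCols=list(set(dfhot.columns)-set(['stars'])))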
lpoints = va.transform(dfhot).select("features", "stars").withColumnRenamed("stars","label")

### Data Split
Split the data into training and test data in the ratio 80:20 using a random split.

In [27]:
splits = lpoints.randomSplit([0.8, 0.2])
adulttrain = splits[0].cache()
adultvalid = splits[1].cache()

### Define the Pipeline
Now train a classification model on the assembled feature vectors and apply it to the held-out data.

In [28]:
lr = LogisticRegression(regParam=0.01, maxIter=1000, fitIntercept=True)
lrmodel = lr.fit(adulttrain)
# Refit with a lower iteration cap; this model replaces the one fitted above
lrmodel = lr.setParams(regParam=0.01, maxIter=500, fitIntercept=True).fit(adulttrain)
lrmodel.intercept
# Score the held-out 20% split
validpredicts = lrmodel.transform(adultvalid)

In [29]:
validpredicts.show(5)

+--------------------+-----+--------------------+--------------------+----------+
|            features|label|       rawPrediction|         probability|prediction|
+--------------------+-----+--------------------+--------------------+----------+
|(41,[0,3,6,9,10,1...|    0|[-1.0590798658517...|[0.25748533279105...|       1.0|
|(41,[0,3,6,9,10,1...|    1|[-1.0590798658517...|[0.25748533279105...|       1.0|
|(41,[0,3,6,9,10,1...|    1|[-1.0836492318831...|[0.2528160548031,...|       1.0|
|(41,[0,3,6,9,10,1...|    1|[-0.7249206686152...|[0.32631033987436...|       1.0|
|(41,[0,3,6,9,10,1...|    1|[-0.0956151753622...|[0.47611440075399...|       1.0|
+--------------------+-----+--------------------+--------------------+----------+
only showing top 5 rows



### Evaluate the model
The classification model is evaluated on the held-out data using a BinaryClassificationEvaluator.

In [30]:
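from pyspark.ml.evaluation import BinaryClassificationEvaluator
bceval = BinaryClassificationEvaluator()
bceval.evaluate(validpredicts)       # default metric: areaUnderROC
bceval.getMetricName()
bceval.setMetricName("areaUnderPR")  # switch to area under the precision-recall curve
bceval.evaluate(validpredicts)       # only this last value is displayed by the notebook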

0.9256845639198134

In [31]:
display(validpredicts)

DataFrame[features: vector, label: int, rawPrediction: vector, probability: vector, prediction: double]

### Cross validation
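Cross-validation splits the training data into k folds and trains the model k times, each time holding out a different fold for evaluation. This ensures that every example from the original dataset has the same chance of appearing in the training and testing sets.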
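
The fold rotation can be sketched in plain Python as follows. The helper `k_fold_indices` is hypothetical, and it uses contiguous folds for readability, whereas Spark's CrossValidator assigns rows to folds randomly.

```python
def k_fold_indices(n_rows, k):
    """Yield (train_indices, test_indices) for each of k contiguous folds."""
    # Spread any remainder over the first folds so sizes differ by at most one
    fold_sizes = [n_rows // k + (1 if i < n_rows % k else 0) for i in range(k)]
    start = 0
    for size in fold_sizes:
        test = list(range(start, start + size))
        train = [i for i in range(n_rows) if i < start or i >= start + size]
        yield train, test
        start += size


folds = list(k_fold_indices(6, 2))
for train, test in folds:
    print("train", train, "test", test)
# train [3, 4, 5] test [0, 1, 2]
# train [0, 1, 2] test [3, 4, 5]
```

Every row index appears in exactly one test fold, which is the property the paragraph above describes.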

In [32]:
from pyspark.ml.tuning import CrossValidator
cv = CrossValidator().setEstimator(lr).setEvaluator(bceval).setNumFolds(2)
paramGrid = ParamGridBuilder().addGrid(lr.maxIter, 
                                       [1000]).addGrid(lr.regParam, 
                                                       [0.0001, 0.001, 0.005, 0.01, 0.05, 0.1, 0.5]).build()
cv.setEstimatorParamMaps(paramGrid)
cvmodel = cv.fit(adulttrain)
BinaryClassificationEvaluator().evaluate(cvmodel.bestModel.transform(adultvalid))

0.7094784502275469

### Tune Parameters
You can tune parameters to find the best model for your data. A simple way to do this is to use **TrainValidationSplit** to evaluate each combination of parameters defined in a parameter grid (built with **ParamGridBuilder**) against a subset of the training data in order to find the best-performing parameters.
#### Regularization 
Regularization is a way of avoiding overfitting, where the model fits the training data so closely that it works really well on the training data but does not generalize well to other data. A **regularization parameter** (regParam) controls how strongly large coefficients are penalized, and so how the model balances fit against generality.
#### Training ratio of 0.8
TrainValidationSplit uses 80% of the data in its training set to train the model and the remaining 20% to validate the trained model.

In **ParamGridBuilder**, all possible combinations are generated from regParam, maxIter, and threshold. Each combination of parameters is tried with 80% of the data used to train the model and 20% used to validate it.
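
To see how many candidate models this produces, the plain-Python sketch below enumerates the same Cartesian product with `itertools.product`, using the values from the grid in this notebook. It only illustrates the combinatorics and does not use Spark.

```python
from itertools import product

# Candidate values, taken from the grid used in this notebook
grid = {
    "regParam": [0.3, 0.1, 0.01],
    "maxIter": [10, 5],
    "threshold": [0.35, 0.30],
}

names = list(grid)
combinations = [dict(zip(names, values)) for values in product(*grid.values())]

print(len(combinations))  # 12
print(combinations[0])    # {'regParam': 0.3, 'maxIter': 10, 'threshold': 0.35}
```

With 3 × 2 × 2 = 12 combinations, the training cost grows multiplicatively with every parameter added to the grid.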

In [33]:
paramGrid = ParamGridBuilder().addGrid(lr.regParam, [0.3, 0.1, 0.01]).addGrid(lr.maxIter,
                                                                              [10, 5]).addGrid(lr.threshold, 
                                                                                        [0.35, 0.30]).build()
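# Note: RegressionEvaluator scores each parameter combination by RMSE between prediction and label;
# BinaryClassificationEvaluator is the more usual choice for a classifier.
tvs = TrainValidationSplit(estimator=lr, evaluator=RegressionEvaluator(), estimatorParamMaps=paramGrid, 
                           trainRatio=0.8)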
model = tvs.fit(adulttrain)

### Test the Model
Now you're ready to apply the model to the test data.

In [34]:
prediction = model.transform(adultvalid)
# LogisticRegression
predicted = prediction.select("features", "prediction", "probability", "label")

predicted.show(100)

+--------------------+----------+--------------------+-----+
|            features|prediction|         probability|label|
+--------------------+----------+--------------------+-----+
|(41,[0,3,6,9,10,1...|       1.0|[0.25938459523370...|    0|
|(41,[0,3,6,9,10,1...|       1.0|[0.25938459523370...|    1|
|(41,[0,3,6,9,10,1...|       1.0|[0.25516207088584...|    1|
|(41,[0,3,6,9,10,1...|       1.0|[0.34044522478200...|    1|
|(41,[0,3,6,9,10,1...|       1.0|[0.47351113031675...|    1|
|(41,[0,3,6,9,10,1...|       1.0|[0.46983994129986...|    1|
|(41,[0,3,6,9,10,1...|       1.0|[0.24072712167424...|    1|
|(41,[0,3,6,9,10,1...|       1.0|[0.21104595990405...|    1|
|(41,[0,3,6,9,10,1...|       1.0|[0.19627875559802...|    1|
|(41,[0,3,6,9,10,1...|       1.0|[0.27189534348392...|    1|
|(41,[0,3,6,9,10,1...|       1.0|[0.26754272839388...|    0|
|(41,[0,3,6,9,10,1...|       1.0|[0.17189082017934...|    1|
|(41,[0,3,6,9,10,1...|       1.0|[0.16484135361452...|    1|
|(41,[0,3,6,9,10,1...|  

### Compute Confusion Matrix Metrics: Only for Classification Logistic Regression not for Linear Regression
Classifiers are typically evaluated by creating a confusion matrix, which indicates the number of:

1. True Positives
2. True Negatives
3. False Positives
4. False Negatives

From these core measures, other evaluation metrics such as precision and recall can be calculated.

### Result
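Precision (0.8464591933947285), Recall (1.0): precision becomes a little lower, but recall becomes much higher than in the previous untuned example. Note that with TN = 2 and FN = 0, the tuned model labels almost every business as popular, which is what drives recall to 1.0.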

In [36]:
tp = float(predicted.filter("prediction == 1.0 AND label == 1").count())
fp = float(predicted.filter("prediction == 1.0 AND label == 0").count())
tn = float(predicted.filter("prediction == 0.0 AND label == 0").count())
fn = float(predicted.filter("prediction == 0.0 AND label == 1").count())
metrics = spark.createDataFrame([
 ("TP", tp),
 ("FP", fp),
 ("TN", tn),
 ("FN", fn),
 ("Precision", tp / (tp + fp)),
 ("Recall", tp / (tp + fn))],["metric", "value"])
metrics.show()

+---------+------------------+
|   metric|             value|
+---------+------------------+
|       TP|            5331.0|
|       FP|             967.0|
|       TN|               2.0|
|       FN|               0.0|
|Precision|0.8464591933947285|
|   Recall|               1.0|
+---------+------------------+
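
The same figures can be reproduced from the four counts in plain Python. Specificity and accuracy are additions that the notebook does not compute; they are included here only to show how rarely the model predicts the negative class. The helper `safe_ratio` is hypothetical and guards against a zero denominator.

```python
def safe_ratio(numerator, denominator):
    """Return numerator / denominator, or None when the denominator is zero."""
    return numerator / denominator if denominator else None


tp, fp, tn, fn = 5331.0, 967.0, 2.0, 0.0  # counts from the table above

precision = safe_ratio(tp, tp + fp)
recall = safe_ratio(tp, tp + fn)
specificity = safe_ratio(tn, tn + fp)              # share of label-0 rows predicted as 0
accuracy = safe_ratio(tp + tn, tp + fp + tn + fn)  # share of all rows predicted correctly

print(precision)  # 0.8464591933947285
print(recall)     # 1.0
print(specificity, accuracy)
```

A specificity close to zero alongside a recall of 1.0 suggests the classifier is behaving almost like a constant "popular" predictor on this split.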



### Review the Area Under ROC: Only for Classification Logistic Regression 
Another way to assess the performance of a classification model is to measure the area under the ROC curve for the model. The spark.ml library includes a **BinaryClassificationEvaluator** class that you can use to compute this.
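
For intuition, the area under the ROC curve equals the fraction of (positive, negative) pairs in which the positive example receives the higher score, with ties counted as one half. The plain-Python sketch below uses that pairwise definition on made-up scores; it is not how Spark computes the metric internally, and the function name is hypothetical.

```python
def area_under_roc(scores, labels):
    """AUC as the fraction of (positive, negative) pairs ranked correctly; ties count 0.5."""
    positives = [s for s, y in zip(scores, labels) if y == 1]
    negatives = [s for s, y in zip(scores, labels) if y == 0]
    if not positives or not negatives:
        raise ValueError("need at least one positive and one negative example")
    wins = 0.0
    for p in positives:
        for n in negatives:
            if p > n:
                wins += 1.0
            elif p == n:
                wins += 0.5
    return wins / (len(positives) * len(negatives))


# Made-up scores: probability of the positive class for five examples
scores = [0.9, 0.8, 0.7, 0.6, 0.2]
labels = [1, 1, 0, 1, 0]
print(area_under_roc(scores, labels))     # 0.8333333333333334

# A model that predicts 1.0 for every row cannot rank anything, so it scores 0.5
print(area_under_roc([1.0] * 5, labels))  # 0.5
```

The second call shows why an evaluator fed hard 0/1 predictions that are almost all 1 lands near 0.5.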

In [37]:
display(metrics)

DataFrame[metric: string, value: double]

In [40]:
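# Note: rawPredictionCol is set to the hard 0/1 "prediction" column here, so the ROC curve has a single threshold point;
# using the "rawPrediction" column would score the model's ranking instead.
evaluator = BinaryClassificationEvaluator(labelCol="label", rawPredictionCol="prediction", metricName="areaUnderROC")
aur = evaluator.evaluate(validpredicts)
print("AUR = ", aur)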

AUR =  0.5050194173573229
