In [1]:
from pyspark.sql import SparkSession

In [2]:
spark = SparkSession \
            .builder \
            .appName("Python Spark example") \
            .config("spark.some.config.option", "some-value") \
            .getOrCreate()

In [3]:
df = spark.read.format('com.databricks.spark.csv') \
            .options(header='true', inferschema='true') \
            .load("bow_counts_toNum.csv",header=True);

In [4]:
df.select('Restaurant_Name','face','ship','warning','fuss','instruction').show(5)

+---------------+----+----+-------+----+-----------+
+---------------+----+----+-------+----+-----------+
|              1|   0|   0|      0|   0|          0|
|              1|   0|   0|      0|   0|          0|
|              1|   0|   0|      0|   0|          0|
|              1|   0|   0|      0|   0|          0|
|              1|   0|   0|      0|   0|          0|
+---------------+----+----+-------+----+-----------+
only showing top 5 rows



In [5]:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType, DoubleType

In [6]:
# Convert to float format
def string_to_float(x):
    return float(x)

#
def condition(r):
    if (r == 1):
        label = "Ess-a-Bagel"
    elif(r == 2):
        label = "Momofuku Noodle Bar"
    elif(r == 3):
        label = "Totto Ramen"
    elif(r == 4):
        label = "Ippudo Westside"
    elif(r == 5):
        label = "The Halal Guys"
    elif(r == 6):
        label = "Beauty & Essex"
    elif(r == 7):
        label = "Lombardi's Pizza"
    elif(r == 8):
        label = "Carmine's Italian Restaurant - Times Square"
    elif(r == 9):
        label = "Ippudo NY"
    elif(r == 10):
        label = "Buddakan"
    return label

string_to_float_udf = udf(string_to_float, DoubleType())
quality_udf = udf(lambda x: condition(x), StringType())

df = df.withColumn("Restaurant_Name", quality_udf("Restaurant_Name"))
df.select('Restaurant_Name','face','ship','warning','fuss','instruction').show(5)

+---------------+----+----+-------+----+-----------+
+---------------+----+----+-------+----+-----------+
|    Ess-a-Bagel|   0|   0|      0|   0|          0|
|    Ess-a-Bagel|   0|   0|      0|   0|          0|
|    Ess-a-Bagel|   0|   0|      0|   0|          0|
|    Ess-a-Bagel|   0|   0|      0|   0|          0|
|    Ess-a-Bagel|   0|   0|      0|   0|          0|
+---------------+----+----+-------+----+-----------+
only showing top 5 rows



In [7]:
from pyspark.ml.linalg import Vectors # !!!!caution: not from pyspark.mllib.linalg import Vectors
from pyspark.ml import Pipeline
from pyspark.ml.feature import IndexToString,StringIndexer, VectorIndexer
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

def transData(data):
    return data.rdd.map(lambda r: [Vectors.dense(r[:-1]),r[-1]]).toDF(['features','label'])

In [8]:
transformed = transData(df)
transformed.show(5)

+--------------------+-----------+
|            features|      label|
+--------------------+-----------+
|[0.0,0.0,0.0,0.0,...|Ess-a-Bagel|
|[0.0,0.0,0.0,0.0,...|Ess-a-Bagel|
|[0.0,0.0,0.0,0.0,...|Ess-a-Bagel|
|[0.0,0.0,0.0,0.0,...|Ess-a-Bagel|
|[0.0,0.0,0.0,0.0,...|Ess-a-Bagel|
+--------------------+-----------+
only showing top 5 rows



In [9]:
# Index labels, adding metadata to the label column
labelIndexer = StringIndexer(inputCol='label',
                             outputCol='indexedLabel').fit(transformed)
labelIndexer.transform(transformed).show(5, True)

+--------------------+-----------+------------+
|            features|      label|indexedLabel|
+--------------------+-----------+------------+
|[0.0,0.0,0.0,0.0,...|Ess-a-Bagel|         3.0|
|[0.0,0.0,0.0,0.0,...|Ess-a-Bagel|         3.0|
|[0.0,0.0,0.0,0.0,...|Ess-a-Bagel|         3.0|
|[0.0,0.0,0.0,0.0,...|Ess-a-Bagel|         3.0|
|[0.0,0.0,0.0,0.0,...|Ess-a-Bagel|         3.0|
+--------------------+-----------+------------+
only showing top 5 rows



In [13]:
# Automatically identify categorical features, and index them.
featureIndexer =VectorIndexer(inputCol="features", \
                              outputCol="indexedFeatures").fit(transformed)
featureIndexer.transform(transformed).show(5, True)

+--------------------+-----------+--------------------+
|            features|      label|     indexedFeatures|
+--------------------+-----------+--------------------+
|[0.0,0.0,0.0,0.0,...|Ess-a-Bagel|[0.0,0.0,0.0,0.0,...|
|[0.0,0.0,0.0,0.0,...|Ess-a-Bagel|[0.0,0.0,0.0,0.0,...|
|[0.0,0.0,0.0,0.0,...|Ess-a-Bagel|[0.0,0.0,0.0,0.0,...|
|[0.0,0.0,0.0,0.0,...|Ess-a-Bagel|[0.0,0.0,0.0,0.0,...|
|[0.0,0.0,0.0,0.0,...|Ess-a-Bagel|[0.0,0.0,0.0,0.0,...|
+--------------------+-----------+--------------------+
only showing top 5 rows



In [14]:
# Split the data into training and test sets (20% held out for testing)
(trainingData, testData) = transformed.randomSplit([0.8, 0.2])

In [21]:
from time import time
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
t0 = time()
logr = LogisticRegression(featuresCol='indexedFeatures', labelCol='indexedLabel')
# Convert indexed labels back to original labels.
labelConverter = IndexToString(inputCol="prediction", outputCol="predictedLabel",
                               labels=labelIndexer.labels)
# Chain indexers and tree in a Pipeline
pipeline = Pipeline(stages=[labelIndexer, featureIndexer, logr,labelConverter])
# Train model.  This also runs the indexers.
model = pipeline.fit(trainingData)
print("time: ", time()-t0,'seconds.')

# Make predictions.
predictions = model.transform(testData)
# Select example rows to display.
predictions.select("features","label","predictedLabel").show(5)

# Select (prediction, true label) and compute test error
evaluator = MulticlassClassificationEvaluator(
    labelCol="indexedLabel", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Test Accuracy = %g" % (accuracy))


time:  7.071635961532593 seconds.
+--------------------+--------------------+----------------+
|            features|               label|  predictedLabel|
+--------------------+--------------------+----------------+
|[0.0,0.0,0.0,0.0,...|      Beauty & Essex|     Ess-a-Bagel|
|[0.0,0.0,0.0,0.0,...|    Lombardi's Pizza|  The Halal Guys|
|[0.0,0.0,0.0,0.0,...|Carmine's Italian...| Ippudo Westside|
|[0.0,0.0,0.0,0.0,...|    Lombardi's Pizza|Lombardi's Pizza|
|[0.0,0.0,0.0,0.0,...|         Totto Ramen|       Ippudo NY|
+--------------------+--------------------+----------------+
only showing top 5 rows

Test Accuracy = 0.669565


In [22]:
from pyspark.ml.classification import RandomForestClassifier
t0 = time()
# Train a RandomForest model.
rf = RandomForestClassifier(labelCol="indexedLabel", featuresCol="indexedFeatures", numTrees=10)
# Convert indexed labels back to original labels.
labelConverter = IndexToString(inputCol="prediction", outputCol="predictedLabel",
                               labels=labelIndexer.labels)
# Chain indexers and tree in a Pipeline
pipeline = Pipeline(stages=[labelIndexer, featureIndexer, rf,labelConverter])
# Train model.  This also runs the indexers.
model = pipeline.fit(trainingData)
print("time: ", time()-t0,'seconds.')

# Make predictions.
predictions = model.transform(testData)
# Select example rows to display.
predictions.select("features","label","predictedLabel").show(5)
# Select (prediction, true label) and compute test error
evaluator = MulticlassClassificationEvaluator(
    labelCol="indexedLabel", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Test Accuracy = %g" % (accuracy))

rfModel = model.stages[-2]
print(rfModel)  # summary only

time:  13.801464080810547 seconds.
+--------------------+--------------------+--------------------+
|            features|               label|      predictedLabel|
+--------------------+--------------------+--------------------+
|[0.0,0.0,0.0,0.0,...|      Beauty & Essex|Carmine's Italian...|
|[0.0,0.0,0.0,0.0,...|    Lombardi's Pizza|    Lombardi's Pizza|
|[0.0,0.0,0.0,0.0,...|Carmine's Italian...|         Ess-a-Bagel|
|[0.0,0.0,0.0,0.0,...|    Lombardi's Pizza|    Lombardi's Pizza|
|[0.0,0.0,0.0,0.0,...|         Totto Ramen|      The Halal Guys|
+--------------------+--------------------+--------------------+
only showing top 5 rows

Test Accuracy = 0.513043
RandomForestClassificationModel (uid=RandomForestClassifier_79dc05989f74) with 10 trees


In [23]:
from pyspark.ml.classification import NaiveBayes
t0 = time()
nb = NaiveBayes(featuresCol='indexedFeatures', labelCol='indexedLabel')
# Convert indexed labels back to original labels.
labelConverter = IndexToString(inputCol="prediction", outputCol="predictedLabel",
                               labels=labelIndexer.labels)
# Chain indexers and tree in a Pipeline
pipeline = Pipeline(stages=[labelIndexer, featureIndexer, nb,labelConverter])
# Train model.  This also runs the indexers.
model = pipeline.fit(trainingData)
print("time: ", time()-t0,'seconds.')

# Make predictions.
predictions = model.transform(testData)
# Select example rows to display.
predictions.select("features","label","predictedLabel").show(5)
# Select (prediction, true label) and compute test error
evaluator = MulticlassClassificationEvaluator(
    labelCol="indexedLabel", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Test Accuracy = %g" % (accuracy))


time:  7.719007968902588 seconds.
+--------------------+--------------------+--------------------+
|            features|               label|      predictedLabel|
+--------------------+--------------------+--------------------+
|[0.0,0.0,0.0,0.0,...|      Beauty & Essex|Carmine's Italian...|
|[0.0,0.0,0.0,0.0,...|    Lombardi's Pizza|    Lombardi's Pizza|
|[0.0,0.0,0.0,0.0,...|Carmine's Italian...|Carmine's Italian...|
|[0.0,0.0,0.0,0.0,...|    Lombardi's Pizza|    Lombardi's Pizza|
|[0.0,0.0,0.0,0.0,...|         Totto Ramen|         Totto Ramen|
+--------------------+--------------------+--------------------+
only showing top 5 rows

Test Accuracy = 0.773913


In [24]:
from pyspark.ml.classification import DecisionTreeClassifier
t0 = time()
# Train a DecisionTree model
dTree = DecisionTreeClassifier(labelCol='indexedLabel', featuresCol='indexedFeatures')
# Convert indexed labels back to original labels.
labelConverter = IndexToString(inputCol="prediction", outputCol="predictedLabel",
                               labels=labelIndexer.labels)
# Chain indexers and tree in a Pipeline
pipeline = Pipeline(stages=[labelIndexer, featureIndexer, dTree,labelConverter])
# Train model.  This also runs the indexers.
model = pipeline.fit(trainingData)
print("time: ", time()-t0,'seconds.')

# Make predictions.
predictions = model.transform(testData)
# Select example rows to display.
predictions.select("features","label","predictedLabel").show(5)
# Select (prediction, true label) and compute test error
evaluator = MulticlassClassificationEvaluator(
    labelCol="indexedLabel", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Test Accuracy = %g" % (accuracy))

rfModel = model.stages[-2]
print(rfModel)  # summary only

time:  13.733898878097534 seconds.
+--------------------+--------------------+----------------+
|            features|               label|  predictedLabel|
+--------------------+--------------------+----------------+
|[0.0,0.0,0.0,0.0,...|      Beauty & Essex|  Beauty & Essex|
|[0.0,0.0,0.0,0.0,...|    Lombardi's Pizza|Lombardi's Pizza|
|[0.0,0.0,0.0,0.0,...|Carmine's Italian...|  Beauty & Essex|
|[0.0,0.0,0.0,0.0,...|    Lombardi's Pizza|  Beauty & Essex|
|[0.0,0.0,0.0,0.0,...|         Totto Ramen|       Ippudo NY|
+--------------------+--------------------+----------------+
only showing top 5 rows

Test Accuracy = 0.443478
DecisionTreeClassificationModel (uid=DecisionTreeClassifier_f97a68bc056b) of depth 5 with 15 nodes
