In [2]:
# Run findspark's init step before the pyspark imports in the next cell.
import findspark
findspark.init()

In [3]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import GBTClassifier
from pyspark.ml.feature import StringIndexer, VectorIndexer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.sql import SparkSession

# Obtain the SparkSession that the later cells use to read the input file.
spark = (
    SparkSession.builder
    .appName("Classificiation")
    .getOrCreate()
)

In [None]:
# Load the data stored in LIBSVM format as a DataFrame.
data = spark.read.format("libsvm").load("lib.txt")

# Index labels, adding metadata to the label column.
# Fit on whole dataset to include all labels in index.
labelIndexer = StringIndexer(inputCol="label", outputCol="indexedLabel").fit(data)
# Automatically identify categorical features, and index them.
# Set maxCategories so features with > 32 distinct values are treated as continuous.
featureIndexer =\
    VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=32).fit(data)

# Split the data into training and test sets (30% held out for testing).
# The fixed seed makes the split, and therefore the reported test error,
# repeatable across runs instead of changing on every execution.
(trainingData, testData) = data.randomSplit([0.7, 0.3], seed=42)


# Train a GBT model.
gbt = GBTClassifier(labelCol="indexedLabel", featuresCol="indexedFeatures", maxIter=10)

# Chain indexers and GBT in a Pipeline
pipeline = Pipeline(stages=[labelIndexer, featureIndexer, gbt])

# Train model.  This also runs the indexers.
model = pipeline.fit(trainingData)

# Make predictions.
predictions = model.transform(testData)

# Select example rows to display.
predictions.select("prediction", "indexedLabel", "features").show(50)

# Select (prediction, true label) and compute test error
evaluator = MulticlassClassificationEvaluator(
    labelCol="indexedLabel", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Test Error = %g" % (1.0 - accuracy))

+----------+------------+--------------------+
|prediction|indexedLabel|            features|
+----------+------------+--------------------+
|       0.0|         0.0|(58,[0,1,2,3,4,5,...|
|       0.0|         0.0|(58,[0,1,2,3,4,5,...|
|       0.0|         0.0|(58,[0,1,2,3,4,5,...|
|       0.0|         0.0|(58,[0,1,2,3,4,5,...|
|       1.0|         0.0|(58,[0,1,2,3,4,5,...|
|       0.0|         0.0|(58,[0,1,2,3,4,5,...|
|       0.0|         0.0|(58,[0,1,2,3,4,5,...|
|       0.0|         0.0|(58,[0,1,2,3,4,5,...|
|       0.0|         0.0|(58,[0,1,2,3,4,5,...|
|       0.0|         0.0|(58,[0,1,2,3,4,5,...|
|       0.0|         0.0|(58,[0,1,2,3,4,5,...|
|       0.0|         0.0|(58,[0,1,2,3,4,5,...|
|       0.0|         0.0|(58,[0,1,2,3,4,5,...|
|       1.0|         0.0|(58,[0,1,2,3,4,5,...|
|       0.0|         0.0|(58,[0,1,2,3,4,5,...|
|       0.0|         0.0|(58,[0,1,2,3,4,5,...|
|       0.0|         0.0|(58,[0,1,2,3,4,5,...|
|       0.0|         0.0|(58,[0,1,2,3,4,5,...|
|       0.0| 

In [4]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.feature import StringIndexer, VectorIndexer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Load the data stored in LIBSVM format as a DataFrame.
data = spark.read.format("libsvm").load("lib.txt")

# Index labels, adding metadata to the label column.
# Fit on whole dataset to include all labels in index.
labelIndexer = StringIndexer(inputCol="label", outputCol="indexedLabel").fit(data)
# Automatically identify categorical features, and index them.
# We specify maxCategories so features with > 4 distinct values are treated as continuous.
featureIndexer =\
    VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=4).fit(data)

# Split the data into training and test sets (30% held out for testing).
# The fixed seed makes the split, and therefore the reported test error,
# repeatable across runs instead of changing on every execution.
(trainingData, testData) = data.randomSplit([0.7, 0.3], seed=42)

# Train a DecisionTree model.
dt = DecisionTreeClassifier(labelCol="indexedLabel", featuresCol="indexedFeatures")

# Chain indexers and tree in a Pipeline
pipeline = Pipeline(stages=[labelIndexer, featureIndexer, dt])

# Train model.  This also runs the indexers.
model = pipeline.fit(trainingData)

# Make predictions.
predictions = model.transform(testData)

# Select example rows to display.
predictions.select("prediction", "indexedLabel", "features").show(5)

# Select (prediction, true label) and compute test error
evaluator = MulticlassClassificationEvaluator(
    labelCol="indexedLabel", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Test Error = %g " % (1.0 - accuracy))

+----------+------------+--------------------+
|prediction|indexedLabel|            features|
+----------+------------+--------------------+
|       0.0|         0.0|(58,[0,1,2,3,4,5,...|
|       0.0|         0.0|(58,[0,1,2,3,4,5,...|
|       0.0|         0.0|(58,[0,1,2,3,4,5,...|
|       0.0|         0.0|(58,[0,1,2,3,4,5,...|
|       0.0|         0.0|(58,[0,1,2,3,4,5,...|
+----------+------------+--------------------+
only showing top 5 rows

Test Error = 0.0387331 


In [7]:
from pyspark.ml.classification import LogisticRegression, OneVsRest
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Load the LIBSVM-formatted data file as a DataFrame.
inputData = spark.read.format("libsvm") \
    .load("lib.txt")

# Generate the train/test split (30% held out for testing).
# The fixed seed makes the split, and therefore the reported test error,
# repeatable across runs instead of changing on every execution.
(train, test) = inputData.randomSplit([0.7, 0.3], seed=42)

# Instantiate the base classifier.
lr = LogisticRegression(maxIter=10, tol=1E-6, fitIntercept=True)

# Instantiate the One Vs Rest classifier around the base classifier.
ovr = OneVsRest(classifier=lr)

# Train the multiclass model.
ovrModel = ovr.fit(train)

# Score the model on test data.
predictions = ovrModel.transform(test)

# Obtain an evaluator that reports accuracy.
evaluator = MulticlassClassificationEvaluator(metricName="accuracy")

# Compute the classification error on test data.
accuracy = evaluator.evaluate(predictions)
print("Test Error = %g" % (1.0 - accuracy))

Test Error = 0.0378368
