In [2]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import *

from pyspark.ml import Pipeline
from pyspark.ml import PipelineModel
from pyspark.ml.feature import VectorAssembler,StringIndexer,VectorIndexer,IndexToString
from pyspark.ml.classification import MultilayerPerceptronClassifier,MultilayerPerceptronClassificationModel
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [3]:
spark = SparkSession.builder.appName("RN").getOrCreate()

In [4]:
def condition(r):
    if (r == 1):
        label = "aprobado"    
    else:
        label = "no_aprobado"
    return label

return_condition_udf = F.udf(lambda x: condition(x), StringType())

In [6]:
def get_df_columns_train(df):
    df_train = df.select(F.col("v_0").cast("String"),
                   F.col("v_1").cast("Double"),
                   F.col("v_2").cast("Double"),
                   F.col("v_3").cast("Double"),
                   F.col("v_4").cast("Double"),
                   F.col("v_5").cast("Double"),
                   F.col("v_6").cast("Double"),
                   F.col("v_7").cast("Double"),
                   F.col("v_8").cast("Double"),
                   F.col("v_9").cast("Double"), 
                   F.col("v_10").cast("Double"),
                   F.col("v_11").cast("Double"),
                   F.col("v_12").cast("double").alias("label"))
    df_train = df_train.withColumn("label", return_condition_udf(F.col("label")))
    return df_train

def get_df_columns_test(df):
    df_test = df.select(F.col("v_0").cast("String"),
                   F.col("v_1").cast("Double"),
                   F.col("v_2").cast("Double"),
                   F.col("v_3").cast("Double"),
                   F.col("v_4").cast("Double"),
                   F.col("v_5").cast("Double"),
                   F.col("v_6").cast("Double"),
                   F.col("v_7").cast("Double"),
                   F.col("v_8").cast("Double"),
                   F.col("v_9").cast("Double"), 
                   F.col("v_10").cast("Double"),
                   F.col("v_11").cast("Double"))
    return df_test

In [7]:
def df_train():
    df = spark.read.format("com.databricks.spark.csv")\
               .option("header", "true")\
               .load("data/train.csv") 
            
    my_df = get_df_columns_train(df)
    
    my_df = get_df_columns_train(df)
    feature_columns = my_df.columns[1:-1]
    
    #data preparations
    assembler = VectorAssembler(inputCols=feature_columns, outputCol='features')   
    dataset = assembler.transform(df)
    
    labelIndexer   = StringIndexer(inputCol="label", outputCol="indexedLabel").fit(dataset)
    featureIndexer  = VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=2).fit(dataset)
    
    (trainingData, testData) = dataset.randomSplit([0.8, 0.2])
    
    layers = [12, 5, 4, 4, 7]
    
    trainer = MultilayerPerceptronClassifier(labelCol="indexedLabel", featuresCol="indexedFeatures",\
                                         maxIter=100, layers=layers, blockSize=128, seed=1234)
    labelConverter  = IndexToString(inputCol="prediction", outputCol="predictedLabel",
                                labels=labelIndexer.labels)

    pipeline = Pipeline(stages=[labelIndexer, featureIndexer, trainer, labelConverter])
    model = pipeline.fit(trainingData)
    predictions = model.transform(testData)
    model.write().overwrite().save("data/model/RN")
    predictions.select("prediction", "indexedLabel", "features").show(5)
    evaluator = MulticlassClassificationEvaluator(
    labelCol="indexedLabel", predictionCol="prediction", metricName="accuracy")
    accuracy = evaluator.evaluate(predictions)
    print("Predictions accuracy = %g, Test Error = %g" % (accuracy,(1.0 - accuracy)))
    
    return model

In [8]:
def df_test():
    df = spark.read.format("com.databricks.spark.csv")\
              .option("header", "true")\
              .load("test.csv") 
    df = get_df_columns_test(df)

    assembler = VectorAssembler(inputCols=df.columns[1:], outputCol='features')
    pipeline = Pipeline(stages=[assembler])
    pipelineModel = pipeline.fit(df)
    
    dataset = pipelineModel.transform(df)
    
     
    loadedPipeline = PipelineModel.read().load("/jonathan/RN")
    predictions = loadedPipeline.transform(dataset)

    return predictions

In [9]:
my_model = df_train()

+----------+------------+--------------------+
|prediction|indexedLabel|            features|
+----------+------------+--------------------+
|       0.0|         0.0|(16,[0,3,7,8,9,12...|
|       0.0|         0.0|(16,[0,3,7,8,9,12...|
|       2.0|         2.0|(16,[2,5,7,8,11,1...|
|       3.0|         3.0|(16,[2,5,6,12],[1...|
|       0.0|         0.0|(16,[3,5,6,7,8,9,...|
+----------+------------+--------------------+
only showing top 5 rows

Predictions accuracy = 0.823529, Test Error = 0.176471


In [10]:
df2 = df_test()

In [11]:
df2_select = df2.select("animal_name", "predictedLabel")

In [12]:
df2_select.toPandas()

Unnamed: 0,animal_name,predictedLabel
0,skimmer,Bird
1,skua,Bird
2,slowworm,Fish
3,slug,Fish
4,sole,Fish
5,sparrow,Bird
6,squirrel,Mamal
7,starfish,Invertebrate
8,stingray,Fish
9,swan,Bird


In [195]:
model2 = PipelineModel.read().load("RN/mlp")
treeModel = model.stages[2]
treeModel.layers
treeModel.weights
treeModel.params