In [1]:
from pyspark import SparkContext
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.tree import RandomForest

sc = SparkContext(appName="CTRPredictor")

In [20]:
sample = 'file:/root/notebooks/CTRPrediction/dac_sample.txt'
sample_data = sc.textFile(sample, 2).map(lambda x: x.replace('\t', ',')) 


print "Data size is {}".format(sample_data.count())
sample_data.take(2)


Data size is 100000


[u'0,1,1,5,0,1382,4,15,2,181,1,2,,2,68fd1e64,80e26c9b,fb936136,7b4723c4,25c83c98,7e0ccccf,de7995b8,1f89b562,a73ee510,a8cd5504,b2cb9c98,37c9c164,2824a5f6,1adce6ef,8ba8b39a,891b62e7,e5ba7672,f54016b9,21ddcdc9,b1252a9d,07b5194c,,3a171ecb,c5c50484,e8b83407,9727dd16',
 u'0,2,0,44,1,102,8,2,2,4,1,1,,4,68fd1e64,f0cf0024,6f67f7e5,41274cd7,25c83c98,fe6b92e5,922afcc0,0b153874,a73ee510,2b53e5fb,4f1b46f3,623049e6,d7020589,b28479f6,e6c5b5cd,c92f3b61,07c540c4,b04e4670,21ddcdc9,5840adea,60f6221e,,3a171ecb,43f13e8b,e8b83407,731c3655']

In [21]:
# Use randomSplit with weights and seed to get training, test and validation data sets
weights = [.8, .1, .1]
seed = 42

rawTrainData, rawValidationData, rawTestData = sample_data.randomSplit(weights, seed)
# Cache the data
rawTrainData.cache()
rawValidationData.cache()
rawTestData.cache()

nTrain = (rawTrainData.count())
nVal = rawValidationData.count()
nTest = rawTestData.count()
print nTrain, nVal, nTest, nTrain + nVal + nTest


79911 10075 10014 100000


In [5]:
# Import SQLContext 
from pyspark.sql import SQLContext, Row
from pyspark.sql.types import *
import math

# Parse data and create data frames
def parseData(data, sqlContext):
 
    #Split the csv file by comma and convert each line to a tuple.
    parts = data.map(lambda l: l.split(",", -1))
    features = parts.map(lambda p: Row(Label=(p[0]), IntFeature1=(p[1]), IntFeature2=(p[2]), IntFeature3=p[3],
                                IntFeature4=(p[4]), IntFeature5=(p[5]), IntFeature6=p[6], IntFeature7=p[7], 
                                IntFeature8=(p[8]), IntFeature9=(p[9]), IntFeature10=p[10], IntFeature11=p[11],
                                IntFeature12=(p[12]), IntFeature13=(p[13]), CatFeature1=p[14], CatFeature2=p[15],
                                CatFeature3=p[16], CatFeature4=p[17],CatFeature5=p[18], CatFeature6=p[19],
                                CatFeature7=p[20], CatFeature8=p[21],CatFeature9=p[22], CatFeature10=p[23],
                                CatFeature11=p[24], CatFeature12=p[25],CatFeature13=p[26]))
                                 

    # Apply the schema to the RDD.
    return sqlContext.createDataFrame(features)

# sc is an existing SparkContext.
sqlContext = SQLContext(sc)

# Register the DataFrame as a table.
schemaClicks = parseData(sample_data, sqlContext)
schemaClicks.registerTempTable("clicks")
schemaClicks.printSchema()

schemaClicksTrain = parseData(rawTrainData, sqlContext)
schemaClicksTrain.registerTempTable("clicksTrain")


schemaClicksTest = parseData(rawTestData, sqlContext)
schemaClicksTest.registerTempTable("clicksTest")


schemaClicksValidation = parseData(rawValidationData, sqlContext)
schemaClicksValidation.registerTempTable("clicksValidation")



root
 |-- CatFeature1: string (nullable = true)
 |-- CatFeature10: string (nullable = true)
 |-- CatFeature11: string (nullable = true)
 |-- CatFeature12: string (nullable = true)
 |-- CatFeature13: string (nullable = true)
 |-- CatFeature2: string (nullable = true)
 |-- CatFeature3: string (nullable = true)
 |-- CatFeature4: string (nullable = true)
 |-- CatFeature5: string (nullable = true)
 |-- CatFeature6: string (nullable = true)
 |-- CatFeature7: string (nullable = true)
 |-- CatFeature8: string (nullable = true)
 |-- CatFeature9: string (nullable = true)
 |-- IntFeature1: string (nullable = true)
 |-- IntFeature10: string (nullable = true)
 |-- IntFeature11: string (nullable = true)
 |-- IntFeature12: string (nullable = true)
 |-- IntFeature13: string (nullable = true)
 |-- IntFeature2: string (nullable = true)
 |-- IntFeature3: string (nullable = true)
 |-- IntFeature4: string (nullable = true)
 |-- IntFeature5: string (nullable = true)
 |-- IntFeature6: string (nullable = true

In [6]:
# Cast numeric features to double 
df=schemaClicks .withColumn("IntFeature1tmp",schemaClicks.IntFeature1.cast('double')).drop("IntFeature1") .withColumnRenamed("IntFeature1tmp","IntFeature1").withColumn("IntFeature2tmp",schemaClicks.IntFeature2.cast('double')).drop("IntFeature2") .withColumnRenamed("IntFeature2tmp","IntFeature2").withColumn("IntFeature3tmp",schemaClicks.IntFeature3.cast('double')).drop("IntFeature3") .withColumnRenamed("IntFeature3tmp","IntFeature3").withColumn("IntFeature4tmp",schemaClicks.IntFeature4.cast('double')).drop("IntFeature4") .withColumnRenamed("IntFeature4tmp","IntFeature4").withColumn("IntFeature5tmp",schemaClicks.IntFeature5.cast('double')).drop("IntFeature5") .withColumnRenamed("IntFeature5tmp","IntFeature5").withColumn("IntFeature6tmp",schemaClicks.IntFeature6.cast('double')).drop("IntFeature6") .withColumnRenamed("IntFeature6tmp","IntFeature6").withColumn("IntFeature7tmp",schemaClicks.IntFeature7.cast('double')).drop("IntFeature7") .withColumnRenamed("IntFeature7tmp","IntFeature7").withColumn("IntFeature8tmp",schemaClicks.IntFeature8.cast('double')).drop("IntFeature8") .withColumnRenamed("IntFeature8tmp","IntFeature8").withColumn("IntFeature9tmp",schemaClicks.IntFeature9.cast('double')).drop("IntFeature9") .withColumnRenamed("IntFeature9tmp","IntFeature9").withColumn("IntFeature10tmp",schemaClicks.IntFeature10.cast('double')).drop("IntFeature10") .withColumnRenamed("IntFeature10tmp","IntFeature10").withColumn("IntFeature11tmp",schemaClicks.IntFeature11.cast('double')).drop("IntFeature11") .withColumnRenamed("IntFeature11tmp","IntFeature11").withColumn("IntFeature12tmp",schemaClicks.IntFeature12.cast('double')).drop("IntFeature12") .withColumnRenamed("IntFeature12tmp","IntFeature12").withColumn("IntFeature13tmp",schemaClicks.IntFeature13.cast('double')).drop("IntFeature13") .withColumnRenamed("IntFeature13tmp","IntFeature13").withColumn("label",schemaClicks.Label.cast('double')).drop("Label")

dfTrain=schemaClicksTrain .withColumn("IntFeature1tmp",schemaClicksTrain.IntFeature1.cast('double')).drop("IntFeature1") .withColumnRenamed("IntFeature1tmp","IntFeature1").withColumn("IntFeature2tmp",schemaClicksTrain.IntFeature2.cast('double')).drop("IntFeature2") .withColumnRenamed("IntFeature2tmp","IntFeature2").withColumn("IntFeature3tmp",schemaClicksTrain.IntFeature3.cast('double')).drop("IntFeature3") .withColumnRenamed("IntFeature3tmp","IntFeature3").withColumn("IntFeature4tmp",schemaClicksTrain.IntFeature4.cast('double')).drop("IntFeature4") .withColumnRenamed("IntFeature4tmp","IntFeature4").withColumn("IntFeature5tmp",schemaClicksTrain.IntFeature5.cast('double')).drop("IntFeature5") .withColumnRenamed("IntFeature5tmp","IntFeature5").withColumn("IntFeature6tmp",schemaClicksTrain.IntFeature6.cast('double')).drop("IntFeature6") .withColumnRenamed("IntFeature6tmp","IntFeature6").withColumn("IntFeature7tmp",schemaClicksTrain.IntFeature7.cast('double')).drop("IntFeature7") .withColumnRenamed("IntFeature7tmp","IntFeature7").withColumn("IntFeature8tmp",schemaClicksTrain.IntFeature8.cast('double')).drop("IntFeature8") .withColumnRenamed("IntFeature8tmp","IntFeature8").withColumn("IntFeature9tmp",schemaClicksTrain.IntFeature9.cast('double')).drop("IntFeature9") .withColumnRenamed("IntFeature9tmp","IntFeature9").withColumn("IntFeature10tmp",schemaClicksTrain.IntFeature10.cast('double')).drop("IntFeature10") .withColumnRenamed("IntFeature10tmp","IntFeature10").withColumn("IntFeature11tmp",schemaClicksTrain.IntFeature11.cast('double')).drop("IntFeature11") .withColumnRenamed("IntFeature11tmp","IntFeature11").withColumn("IntFeature12tmp",schemaClicksTrain.IntFeature12.cast('double')).drop("IntFeature12") .withColumnRenamed("IntFeature12tmp","IntFeature12").withColumn("IntFeature13tmp",schemaClicksTrain.IntFeature13.cast('double')).drop("IntFeature13") .withColumnRenamed("IntFeature13tmp","IntFeature13").withColumn("label",schemaClicksTrain.Label.cast('double')).drop("Label")
dfTest=schemaClicksTest .withColumn("IntFeature1tmp",schemaClicksTest.IntFeature1.cast('double')).drop("IntFeature1") .withColumnRenamed("IntFeature1tmp","IntFeature1").withColumn("IntFeature2tmp",schemaClicksTest.IntFeature2.cast('double')).drop("IntFeature2") .withColumnRenamed("IntFeature2tmp","IntFeature2").withColumn("IntFeature3tmp",schemaClicksTest.IntFeature3.cast('double')).drop("IntFeature3") .withColumnRenamed("IntFeature3tmp","IntFeature3").withColumn("IntFeature4tmp",schemaClicksTest.IntFeature4.cast('double')).drop("IntFeature4") .withColumnRenamed("IntFeature4tmp","IntFeature4").withColumn("IntFeature5tmp",schemaClicksTest.IntFeature5.cast('double')).drop("IntFeature5") .withColumnRenamed("IntFeature5tmp","IntFeature5").withColumn("IntFeature6tmp",schemaClicksTest.IntFeature6.cast('double')).drop("IntFeature6") .withColumnRenamed("IntFeature6tmp","IntFeature6").withColumn("IntFeature7tmp",schemaClicksTest.IntFeature7.cast('double')).drop("IntFeature7") .withColumnRenamed("IntFeature7tmp","IntFeature7").withColumn("IntFeature8tmp",schemaClicksTest.IntFeature8.cast('double')).drop("IntFeature8") .withColumnRenamed("IntFeature8tmp","IntFeature8").withColumn("IntFeature9tmp",schemaClicksTest.IntFeature9.cast('double')).drop("IntFeature9") .withColumnRenamed("IntFeature9tmp","IntFeature9").withColumn("IntFeature10tmp",schemaClicksTest.IntFeature10.cast('double')).drop("IntFeature10") .withColumnRenamed("IntFeature10tmp","IntFeature10").withColumn("IntFeature11tmp",schemaClicksTest.IntFeature11.cast('double')).drop("IntFeature11") .withColumnRenamed("IntFeature11tmp","IntFeature11").withColumn("IntFeature12tmp",schemaClicksTest.IntFeature12.cast('double')).drop("IntFeature12") .withColumnRenamed("IntFeature12tmp","IntFeature12").withColumn("IntFeature13tmp",schemaClicksTest.IntFeature13.cast('double')).drop("IntFeature13") .withColumnRenamed("IntFeature13tmp","IntFeature13").withColumn("label",schemaClicksTest.Label.cast('double')).drop("Label")
dfValidation=schemaClicksValidation .withColumn("IntFeature1tmp",schemaClicksValidation.IntFeature1.cast('double')).drop("IntFeature1") .withColumnRenamed("IntFeature1tmp","IntFeature1").withColumn("IntFeature2tmp",schemaClicksValidation.IntFeature2.cast('double')).drop("IntFeature2") .withColumnRenamed("IntFeature2tmp","IntFeature2").withColumn("IntFeature3tmp",schemaClicksValidation.IntFeature3.cast('double')).drop("IntFeature3") .withColumnRenamed("IntFeature3tmp","IntFeature3").withColumn("IntFeature4tmp",schemaClicksValidation.IntFeature4.cast('double')).drop("IntFeature4") .withColumnRenamed("IntFeature4tmp","IntFeature4").withColumn("IntFeature5tmp",schemaClicksValidation.IntFeature5.cast('double')).drop("IntFeature5") .withColumnRenamed("IntFeature5tmp","IntFeature5").withColumn("IntFeature6tmp",schemaClicksValidation.IntFeature6.cast('double')).drop("IntFeature6") .withColumnRenamed("IntFeature6tmp","IntFeature6").withColumn("IntFeature7tmp",schemaClicksValidation.IntFeature7.cast('double')).drop("IntFeature7") .withColumnRenamed("IntFeature7tmp","IntFeature7").withColumn("IntFeature8tmp",schemaClicksValidation.IntFeature8.cast('double')).drop("IntFeature8") .withColumnRenamed("IntFeature8tmp","IntFeature8").withColumn("IntFeature9tmp",schemaClicksValidation.IntFeature9.cast('double')).drop("IntFeature9") .withColumnRenamed("IntFeature9tmp","IntFeature9").withColumn("IntFeature10tmp",schemaClicksValidation.IntFeature10.cast('double')).drop("IntFeature10") .withColumnRenamed("IntFeature10tmp","IntFeature10").withColumn("IntFeature11tmp",schemaClicksValidation.IntFeature11.cast('double')).drop("IntFeature11") .withColumnRenamed("IntFeature11tmp","IntFeature11").withColumn("IntFeature12tmp",schemaClicksValidation.IntFeature12.cast('double')).drop("IntFeature12") .withColumnRenamed("IntFeature12tmp","IntFeature12").withColumn("IntFeature13tmp",schemaClicksValidation.IntFeature13.cast('double')).drop("IntFeature13") .withColumnRenamed("IntFeature13tmp","IntFeature13").withColumn("label",schemaClicksValidation.Label.cast('double')).drop("Label")

df.printSchema()

root
 |-- CatFeature1: string (nullable = true)
 |-- CatFeature10: string (nullable = true)
 |-- CatFeature11: string (nullable = true)
 |-- CatFeature12: string (nullable = true)
 |-- CatFeature13: string (nullable = true)
 |-- CatFeature2: string (nullable = true)
 |-- CatFeature3: string (nullable = true)
 |-- CatFeature4: string (nullable = true)
 |-- CatFeature5: string (nullable = true)
 |-- CatFeature6: string (nullable = true)
 |-- CatFeature7: string (nullable = true)
 |-- CatFeature8: string (nullable = true)
 |-- CatFeature9: string (nullable = true)
 |-- IntFeature1: double (nullable = true)
 |-- IntFeature2: double (nullable = true)
 |-- IntFeature3: double (nullable = true)
 |-- IntFeature4: double (nullable = true)
 |-- IntFeature5: double (nullable = true)
 |-- IntFeature6: double (nullable = true)
 |-- IntFeature7: double (nullable = true)
 |-- IntFeature8: double (nullable = true)
 |-- IntFeature9: double (nullable = true)
 |-- IntFeature10: double (nullable = true)
 

In [7]:
from pyspark.ml.feature import OneHotEncoder, StringIndexer

cat1Indexer = StringIndexer(inputCol="CatFeature1", outputCol="indexedCat1")
cat1Encoder = OneHotEncoder(inputCol="indexedCat1", outputCol="CatVector1")

cat2Indexer = StringIndexer(inputCol="CatFeature2", outputCol="indexedCat2")
cat2Encoder = OneHotEncoder(inputCol="indexedCat2", outputCol="CatVector2")

cat3Indexer = StringIndexer(inputCol="CatFeature3", outputCol="indexedCat3")
cat3Encoder = OneHotEncoder(inputCol="indexedCat3", outputCol="CatVector3")

cat4Indexer = StringIndexer(inputCol="CatFeature4", outputCol="indexedCat4")
cat4Encoder = OneHotEncoder(inputCol="indexedCat4", outputCol="CatVector4")

cat5Indexer = StringIndexer(inputCol="CatFeature5", outputCol="indexedCat5")
cat5Encoder = OneHotEncoder(inputCol="indexedCat5", outputCol="CatVector5")

cat6Indexer = StringIndexer(inputCol="CatFeature6", outputCol="indexedCat6")
cat6Encoder = OneHotEncoder(inputCol="indexedCat6", outputCol="CatVector6")

cat7Indexer = StringIndexer(inputCol="CatFeature7", outputCol="indexedCat7")
cat7Encoder = OneHotEncoder(inputCol="indexedCat7", outputCol="CatVector7")

cat8Indexer = StringIndexer(inputCol="CatFeature8", outputCol="indexedCat8")
cat8Encoder = OneHotEncoder(inputCol="indexedCat8", outputCol="CatVector8")

cat9Indexer = StringIndexer(inputCol="CatFeature9", outputCol="indexedCat9")
cat9Encoder = OneHotEncoder(inputCol="indexedCat9", outputCol="CatVector9")

cat10Indexer = StringIndexer(inputCol="CatFeature10", outputCol="indexedCat10")
cat10Encoder = OneHotEncoder(inputCol="indexedCat10", outputCol="CatVector10")

cat11Indexer = StringIndexer(inputCol="CatFeature11", outputCol="indexedCat11")
cat11Encoder = OneHotEncoder(inputCol="indexedCat11", outputCol="CatVector11")

cat12Indexer = StringIndexer(inputCol="CatFeature12", outputCol="indexedCat12")
cat12Encoder = OneHotEncoder(inputCol="indexedCat12", outputCol="CatVector12")

cat13Indexer = StringIndexer(inputCol="CatFeature13", outputCol="indexedCat13")
cat13Encoder = OneHotEncoder(inputCol="indexedCat13", outputCol="CatVector13")



In [8]:
from pyspark.mllib.linalg import Vectors
from pyspark.ml.feature import VectorAssembler

featureAssembler = VectorAssembler(
    inputCols=[
            "IntFeature1", "IntFeature2", "IntFeature3", "IntFeature4", "IntFeature5", "IntFeature6", "IntFeature7", "IntFeature8",
            "IntFeature9", "IntFeature10", "IntFeature11", "IntFeature12", "IntFeature13", 
            "CatVector1", "CatVector2","CatVector3","CatVector4","CatVector5","CatVector6",
            "CatVector7","CatVector8","CatVector9","CatVector10","CatVector11","CatVector12","CatVector13"],
    outputCol="features")

In [9]:
# Playing with Feature Selection
from pyspark.ml import Pipeline
fAssembler = VectorAssembler(
    inputCols=["CatVector1"],
    outputCol="features")
pipelineTmp = Pipeline(stages=[cat1Indexer, cat1Encoder,fAssembler])

modelTmp = pipelineTmp.fit(dfTrain)
tmp = modelTmp.transform(dfTest).select("features")

tmp.show()

+----------------+
|        features|
+----------------+
| (470,[2],[1.0])|
| (470,[0],[1.0])|
| (470,[2],[1.0])|
| (470,[0],[1.0])|
| (470,[1],[1.0])|
| (470,[1],[1.0])|
| (470,[5],[1.0])|
| (470,[0],[1.0])|
| (470,[4],[1.0])|
| (470,[0],[1.0])|
| (470,[2],[1.0])|
| (470,[0],[1.0])|
|(470,[72],[1.0])|
| (470,[0],[1.0])|
| (470,[0],[1.0])|
| (470,[1],[1.0])|
| (470,[1],[1.0])|
| (470,[0],[1.0])|
| (470,[6],[1.0])|
| (470,[0],[1.0])|
+----------------+



In [10]:
from pyspark.ml.classification import LogisticRegression

lr = LogisticRegression(maxIter=10, regParam=0.01)

In [16]:
from pyspark.ml import Pipeline
    
pipeline = Pipeline(stages=[cat1Indexer, cat2Indexer, cat3Indexer, cat4Indexer, cat5Indexer,cat6Indexer, cat7Indexer,
                            cat8Indexer, cat9Indexer, cat10Indexer, cat11Indexer, cat12Indexer, cat13Indexer,
                            cat1Encoder, cat2Encoder, cat3Encoder, cat4Encoder, cat5Encoder, cat6Encoder, cat7Encoder,
                            cat8Encoder, cat9Encoder, cat10Encoder, cat11Encoder, cat12Encoder, cat13Encoder,
                            fAssembler, lr])

In [17]:
# Train model.  This also runs the indexers, encoders and assembler
model = pipeline.fit(dfTrain)

treeModel = model.stages[27]
print treeModel # summary only

LogisticRegression_4d6ba71b48e1665f203e


In [18]:
prediction = model.transform(dfTest).select("features", "label", "prediction", "rawPrediction", "probability")
prediction.show()

+----------------+-----+----------+--------------------+--------------------+
|        features|label|prediction|       rawPrediction|         probability|
+----------------+-----+----------+--------------------+--------------------+
| (470,[2],[1.0])|  0.0|       0.0|[1.21013445009279...|[0.77032273757685...|
| (470,[0],[1.0])|  0.0|       0.0|[1.22247147156137...|[0.77249819004175...|
| (470,[2],[1.0])|  0.0|       0.0|[1.21013445009279...|[0.77032273757685...|
| (470,[0],[1.0])|  0.0|       0.0|[1.22247147156137...|[0.77249819004175...|
| (470,[1],[1.0])|  0.0|       0.0|[1.21806470675759...|[0.77172279445516...|
| (470,[1],[1.0])|  0.0|       0.0|[1.21806470675759...|[0.77172279445516...|
| (470,[5],[1.0])|  0.0|       0.0|[1.28438632487356...|[0.78319550087281...|
| (470,[0],[1.0])|  1.0|       0.0|[1.22247147156137...|[0.77249819004175...|
| (470,[4],[1.0])|  0.0|       0.0|[1.17124174410751...|[0.76336939346512...|
| (470,[0],[1.0])|  0.0|       0.0|[1.22247147156137...|[0.77249

In [38]:
# Try Cross Validator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from  pyspark.ml.evaluation import BinaryClassificationEvaluator


evaluator = BinaryClassificationEvaluator()
paramGrid = ParamGridBuilder().addGrid(lr.maxIter, [0, 1]).build()
numFolds=2

crossval = CrossValidator(
    estimator=pipeline,
    estimatorParamMaps=paramGrid,
    evaluator=evaluator,
    numFolds=numFolds)

cvModel = crossval.fit(dfTrain)
evaluator.evaluate(cvModel.transform(dfTest))



Py4JJavaError: An error occurred while calling o3144.evaluate.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 117.0 failed 1 times, most recent failure: Lost task 0.0 in stage 117.0 (TID 213, localhost): org.apache.spark.SparkException: Unseen label: 6e4a368f.
	at org.apache.spark.ml.feature.StringIndexerModel$$anonfun$4.apply(StringIndexer.scala:131)
	at org.apache.spark.ml.feature.StringIndexerModel$$anonfun$4.apply(StringIndexer.scala:126)
	at org.apache.spark.sql.catalyst.expressions.ScalaUdf$$anonfun$2.apply(ScalaUdf.scala:71)
	at org.apache.spark.sql.catalyst.expressions.ScalaUdf$$anonfun$2.apply(ScalaUdf.scala:70)
	at org.apache.spark.sql.catalyst.expressions.ScalaUdf.eval(ScalaUdf.scala:960)
	at org.apache.spark.sql.catalyst.expressions.Alias.eval(namedExpressions.scala:118)
	at org.apache.spark.sql.catalyst.expressions.ScalaUdf$$anonfun$2.apply(ScalaUdf.scala:72)
	at org.apache.spark.sql.catalyst.expressions.ScalaUdf$$anonfun$2.apply(ScalaUdf.scala:70)
	at org.apache.spark.sql.catalyst.expressions.ScalaUdf.eval(ScalaUdf.scala:960)
	at org.apache.spark.sql.catalyst.expressions.Alias.eval(namedExpressions.scala:118)
	at org.apache.spark.sql.catalyst.expressions.CreateStruct$$anonfun$eval$2.apply(complexTypes.scala:75)
	at org.apache.spark.sql.catalyst.expressions.CreateStruct$$anonfun$eval$2.apply(complexTypes.scala:75)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
	at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
	at scala.collection.AbstractTraversable.map(Traversable.scala:105)
	at org.apache.spark.sql.catalyst.expressions.CreateStruct.eval(complexTypes.scala:75)
	at org.apache.spark.sql.catalyst.expressions.CreateStruct.eval(complexTypes.scala:56)
	at org.apache.spark.sql.catalyst.expressions.ScalaUdf$$anonfun$2.apply(ScalaUdf.scala:72)
	at org.apache.spark.sql.catalyst.expressions.ScalaUdf$$anonfun$2.apply(ScalaUdf.scala:70)
	at org.apache.spark.sql.catalyst.expressions.ScalaUdf.eval(ScalaUdf.scala:960)
	at org.apache.spark.sql.catalyst.expressions.Alias.eval(namedExpressions.scala:118)
	at org.apache.spark.sql.catalyst.expressions.ScalaUdf$$anonfun$2.apply(ScalaUdf.scala:72)
	at org.apache.spark.sql.catalyst.expressions.ScalaUdf$$anonfun$2.apply(ScalaUdf.scala:70)
	at org.apache.spark.sql.catalyst.expressions.ScalaUdf.eval(ScalaUdf.scala:960)
	at org.apache.spark.sql.catalyst.expressions.Alias.eval(namedExpressions.scala:118)
	at org.apache.spark.sql.catalyst.expressions.InterpretedMutableProjection.apply(Projection.scala:68)
	at org.apache.spark.sql.catalyst.expressions.InterpretedMutableProjection.apply(Projection.scala:52)
	at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
	at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
	at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
	at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:201)
	at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:56)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:70)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
	at org.apache.spark.scheduler.Task.run(Task.scala:70)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1273)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1264)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1263)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1263)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
	at scala.Option.foreach(Option.scala:236)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:730)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1457)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1418)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)


In [39]:
# Visualize ROC plot

labelsAndScores = prediction.select("label", "probability")
labelsAndWeights = labelsAndScores.collect()
labelsAndWeights.sort(key=lambda (k, v): v, reverse=True)
labelsByWeight = np.array([k for (k, v) in labelsAndWeights])

length = labelsByWeight.size
truePositives = labelsByWeight.cumsum()
numPositive = truePositives[-1]
falsePositives = np.arange(1.0, length + 1, 1.) - truePositives

truePositiveRate = truePositives / numPositive
falsePositiveRate = falsePositives / (length - numPositive)

# Generate layout and plot data
fig, ax = preparePlot(np.arange(0., 1.1, 0.1), np.arange(0., 1.1, 0.1))
ax.set_xlim(-.05, 1.05), ax.set_ylim(-.05, 1.05)
ax.set_ylabel('True Positive Rate (Sensitivity)')
ax.set_xlabel('False Positive Rate (1 - Specificity)')
plt.plot(falsePositiveRate, truePositiveRate, color='#8cbfd0', linestyle='-', linewidth=3.)
plt.plot((0., 1.), (0., 1.), linestyle='--', color='#d6ebf2', linewidth=2.)  # Baseline model
pass

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 119.0 failed 1 times, most recent failure: Lost task 1.0 in stage 119.0 (TID 216, localhost): org.apache.spark.SparkException: Unseen label: c77d22cf.
	at org.apache.spark.ml.feature.StringIndexerModel$$anonfun$4.apply(StringIndexer.scala:131)
	at org.apache.spark.ml.feature.StringIndexerModel$$anonfun$4.apply(StringIndexer.scala:126)
	at org.apache.spark.sql.catalyst.expressions.ScalaUdf$$anonfun$2.apply(ScalaUdf.scala:71)
	at org.apache.spark.sql.catalyst.expressions.ScalaUdf$$anonfun$2.apply(ScalaUdf.scala:70)
	at org.apache.spark.sql.catalyst.expressions.ScalaUdf.eval(ScalaUdf.scala:960)
	at org.apache.spark.sql.catalyst.expressions.Alias.eval(namedExpressions.scala:118)
	at org.apache.spark.sql.catalyst.expressions.ScalaUdf$$anonfun$2.apply(ScalaUdf.scala:72)
	at org.apache.spark.sql.catalyst.expressions.ScalaUdf$$anonfun$2.apply(ScalaUdf.scala:70)
	at org.apache.spark.sql.catalyst.expressions.ScalaUdf.eval(ScalaUdf.scala:960)
	at org.apache.spark.sql.catalyst.expressions.Alias.eval(namedExpressions.scala:118)
	at org.apache.spark.sql.catalyst.expressions.CreateStruct$$anonfun$eval$2.apply(complexTypes.scala:75)
	at org.apache.spark.sql.catalyst.expressions.CreateStruct$$anonfun$eval$2.apply(complexTypes.scala:75)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
	at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
	at scala.collection.AbstractTraversable.map(Traversable.scala:105)
	at org.apache.spark.sql.catalyst.expressions.CreateStruct.eval(complexTypes.scala:75)
	at org.apache.spark.sql.catalyst.expressions.CreateStruct.eval(complexTypes.scala:56)
	at org.apache.spark.sql.catalyst.expressions.ScalaUdf$$anonfun$2.apply(ScalaUdf.scala:72)
	at org.apache.spark.sql.catalyst.expressions.ScalaUdf$$anonfun$2.apply(ScalaUdf.scala:70)
	at org.apache.spark.sql.catalyst.expressions.ScalaUdf.eval(ScalaUdf.scala:960)
	at org.apache.spark.sql.catalyst.expressions.Alias.eval(namedExpressions.scala:118)
	at org.apache.spark.sql.catalyst.expressions.ScalaUdf$$anonfun$2.apply(ScalaUdf.scala:72)
	at org.apache.spark.sql.catalyst.expressions.ScalaUdf$$anonfun$2.apply(ScalaUdf.scala:70)
	at org.apache.spark.sql.catalyst.expressions.ScalaUdf.eval(ScalaUdf.scala:960)
	at org.apache.spark.sql.catalyst.expressions.Alias.eval(namedExpressions.scala:118)
	at org.apache.spark.sql.catalyst.expressions.ScalaUdf$$anonfun$2.apply(ScalaUdf.scala:72)
	at org.apache.spark.sql.catalyst.expressions.ScalaUdf$$anonfun$2.apply(ScalaUdf.scala:70)
	at org.apache.spark.sql.catalyst.expressions.ScalaUdf.eval(ScalaUdf.scala:960)
	at org.apache.spark.sql.catalyst.expressions.Alias.eval(namedExpressions.scala:118)
	at org.apache.spark.sql.catalyst.expressions.InterpretedMutableProjection.apply(Projection.scala:68)
	at org.apache.spark.sql.catalyst.expressions.InterpretedMutableProjection.apply(Projection.scala:52)
	at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
	at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
	at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
	at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.next(SerDeUtil.scala:120)
	at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.next(SerDeUtil.scala:111)
	at scala.collection.Iterator$class.foreach(Iterator.scala:727)
	at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.foreach(SerDeUtil.scala:111)
	at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
	at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
	at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.to(SerDeUtil.scala:111)
	at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
	at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.toBuffer(SerDeUtil.scala:111)
	at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
	at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.toArray(SerDeUtil.scala:111)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:885)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:885)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1767)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1767)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
	at org.apache.spark.scheduler.Task.run(Task.scala:70)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1273)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1264)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1263)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1263)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
	at scala.Option.foreach(Option.scala:236)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:730)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1457)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1418)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)


In [None]:
from math import log

def computeLogLoss(p, y):
    """Calculates the value of log loss for a given probabilty and label.
    Note:
        log(0) is undefined, so when p is 0 we need to add a small value (epsilon) to it
        and when p is 1 we need to subtract a small value (epsilon) from it.
    Args:
        p (float): A probabilty between 0 and 1.
        y (int): A label.  Takes on the values 0 and 1.
    Returns:
        float: The log loss value.
    """
    epsilon = 10e-12
    if y == 1:
        return -log(epsilon + p) if p == 0 else -log(p)
    elif y == 0:
        return -log(1 - p + epsilon) if p == 1 else -log(1 - p)

In [34]:
# ROC for BinaryClassification
from pyspark.mllib.evaluation import BinaryClassificationMetrics

#>>> scoreAndLabels = sc.parallelize([ import 
#...     (0.1, 0.0), (0.1, 1.0), (0.4, 0.0), (0.6, 0.0), (0.6, 1.0), (0.6, 1.0), (0.8, 1.0)], 2)
#>>> metrics = BinaryClassificationMetrics(scoreAndLabels)
#>>> metrics.areaUnderROC
#0.70...
#>>> metrics.areaUnderPR
#0.83...
#>>> metrics.unpersist()

In [None]:
# To optimize piepline creation using concise code
import org.apache.spark.ml.feature.{StringIndexer, VectorAssembler, 

OneHotEncoder}
import org.apache.spark.ml.Pipeline

val data = sqlContext.read.parquet("s3n://map2-test/forecaster/intermediate_data")

val df = data.select("win","bid_price","domain","size", "form_factor").na.drop()


//indexing columns
val stringColumns = Array("domain","size", "form_factor")
val index_transformers: Array[org.apache.spark.ml.PipelineStage] = stringColumns.map(
  cname => new StringIndexer()
    .setInputCol(cname)
    .setOutputCol(s"${cname}_index")
)

// Add the rest of your pipeline like VectorAssembler and algorithm
val index_pipeline = new Pipeline().setStages(index_transformers)
val index_model = index_pipeline.fit(df)
val df_indexed = index_model.transform(df)


//encoding columns
val indexColumns  = df_indexed.columns.filter(x => x contains "index")
val one_hot_encoders: Array[org.apache.spark.ml.PipelineStage] = indexColumns.map(
    cname => new OneHotEncoder()
     .setInputCol(cname)
     .setOutputCol(s"${cname}_vec")
)



val one_hot_pipeline = new Pipeline().setStages(one_hot_encoders)