In [1]:
# Model training with Spark (PySpark)

#----------------------------------------
# Example 1 - Using the MLlib library (RDD-based API)
#----------------------------------------
from pyspark.mllib.regression import LabeledPoint
from pyspark import SparkContext

# Reuse the active SparkContext if one exists, otherwise start a new one.
sc = SparkContext.getOrCreate()

# LabeledPoint is the input record for MLlib models:
#   LabeledPoint(class_or_target_value, [feature1, feature2, feature3, ...])
# Two points labelled 0 and two labelled 1, each with two features.
data = [
    LabeledPoint(0, [0.0, 1.0]),
    LabeledPoint(0, [0.1, 0.9]),
    LabeledPoint(1, [1.0, 0.0]),
    LabeledPoint(1, [0.8, 0.3]),
]
print(data)

[LabeledPoint(0.0, [0.0,1.0]), LabeledPoint(0.0, [0.1,0.9]), LabeledPoint(1.0, [1.0,0.0]), LabeledPoint(1.0, [0.8,0.3])]


In [2]:
# Distribute the labelled points as an RDD (default number of partitions) and show
# the contents of each partition: glom() turns every partition into one list.
rddData = sc.parallelize(data, numSlices=None)
print(rddData.glom().collect())

[[LabeledPoint(0.0, [0.0,1.0]), LabeledPoint(0.0, [0.1,0.9]), LabeledPoint(1.0, [1.0,0.0]), LabeledPoint(1.0, [0.8,0.3])]]


In [14]:
#Como funciona o treinamento em paralelo?

# >> Supondo que dividi a rddData em 2 particoes: rddData=sc.parallelize(data,2) 

# MASTER: w -> parametro de otimizacao da regressao logistica

#iteracao 1
#-------------
#[
# WORKER1: [LabeledPoint(0.0, [0.0,1.0]), LabeledPoint(0.0, [0.1,0.9])] -> soma 1 a w
# WORKER2: [LabeledPoint(1.0, [1.0,0.0]), LabeledPoint(1.0, [0.8,0.3])] -> subtrai 0,2 de w
#]

# MASTER: w_ite1=w+1-0.2


#iteracao 2
#-------------
#[
# WORKER1: [LabeledPoint(0.0, [0.0,1.0]), LabeledPoint(0.0, [0.1,0.9])] -> soma 2 a w_ite1
# WORKER2: [LabeledPoint(1.0, [1.0,0.0]), LabeledPoint(1.0, [0.8,0.3])] -> subtrai 0,5 de w_ite1
#]

# MASTER: w_ite2=w_ite1+2-0.5

In [3]:
from pyspark.mllib.classification import LogisticRegressionWithLBFGS

# Train a binary logistic-regression model on the RDD of LabeledPoints.
# LogisticRegressionWithSGD is deprecated (since Spark 2.0); the MLlib documentation
# recommends LogisticRegressionWithLBFGS instead. It returns the same
# LogisticRegressionModel type, so the predict() calls below are unchanged.
lrm = LogisticRegressionWithLBFGS.train(rddData, iterations=10)

In [5]:
print(lrm.predict([0.95,0.12])) # nao paraleliza o comando predict pois nao e rdd 

1


In [6]:
# The input is a plain Python list, not an RDD, so this prediction is not parallelized.
print(lrm.predict(x=[0.01, 0.7]))

0


In [8]:
# Unlabelled test points, one feature list per RDD element.
rddTeste = sc.parallelize(
    [
        [0.1, 0.75],
        [0.2, 0.5],
        [0.9, 0.2],
    ]
)
# Show how the test points are laid out across partitions.
print(rddTeste.glom().collect())

[[[0.1, 0.75], [0.2, 0.5], [0.9, 0.2]]]


In [10]:
# Given an RDD, predict() runs in parallel on the workers over the RDD's partitions;
# collect() brings the resulting predictions back to the driver.
print(lrm.predict(x=rddTeste).collect())

[0, 0, 1]


In [11]:
#----------------------------------------
# Example 2 - Using the ML library (DataFrame-based API)
#----------------------------------------

from pyspark.ml.classification import LogisticRegression
from pyspark.sql import Row, SparkSession
from pyspark.ml.linalg import Vectors

# `spark` is predefined only in the pyspark shell and some managed notebooks.
# Create it explicitly so the cells below also run on a fresh kernel;
# getOrCreate() returns the already-active session when there is one.
spark = SparkSession.builder.getOrCreate()

In [15]:
# Toy training set: v1..v3 are the features and v4 is the class label
# (it becomes the "label" column when the training data is built).
temp_df = spark.createDataFrame([
    Row(v1=0.65, v2=1.0, v3=0.7, v4=1),
    Row(v1=0.5, v2=0.7, v3=0.4, v4=1),
    Row(v1=-0.1, v2=0.0, v3=0.1, v4=0),
    Row(v1=-0.5, v2=0.2, v3=-0.1, v4=0),
])

# show() prints the table itself and returns None; wrapping it in print()
# only added a spurious "None" line to the output.
temp_df.show()

+----+---+----+---+
|  v1| v2|  v3| v4|
+----+---+----+---+
|0.65|1.0| 0.7|  1|
| 0.5|0.7| 0.4|  1|
|-0.1|0.0| 0.1|  0|
|-0.5|0.2|-0.1|  0|
+----+---+----+---+

None


In [34]:
from pyspark.ml.feature import VectorAssembler

# Build the (features, label) DataFrame expected by pyspark.ml estimators.
# VectorAssembler stays in the DataFrame API instead of round-tripping every Row
# through Python on an RDD, and it picks columns by name rather than by their
# position inside the Row.
assembler = VectorAssembler(inputCols=["v1", "v2", "v3"], outputCol="features")
trainingData = (
    assembler.transform(temp_df)
    .withColumnRenamed("v4", "label")
    .select("features", "label")
)
trainingData.show()

+---------------+-----+
|       features|label|
+---------------+-----+
| [0.65,1.0,0.7]|    1|
|  [0.5,0.7,0.4]|    1|
| [-0.1,0.0,0.1]|    0|
|[-0.5,0.2,-0.1]|    0|
+---------------+-----+



In [44]:
# Alternative: build the same (features, label) training DataFrame directly.
# Features must be ML vectors (Vectors.dense): plain Python lists would produce an
# array<double> column, which pyspark.ml estimators do not accept as features.
trainingDireto = spark.createDataFrame(
    [
        (Vectors.dense([0.65, 1.0, 0.7]), 1),
        (Vectors.dense([0.5, 0.7, 0.4]), 1),
        (Vectors.dense([-0.1, 0.0, 0.1]), 0),
        (Vectors.dense([-0.5, 0.2, -0.1]), 0),
    ],
    ["features", "label"],
)
trainingDireto.show()

In [45]:
# Configure and fit the DataFrame-based logistic regression; by default it reads
# the "features" and "label" columns of trainingData.
# A distinct name (`lr`) avoids overwriting `lrm`, the MLlib model from Example 1,
# with an unrelated estimator object.
lr = LogisticRegression(maxIter=10)
model = lr.fit(trainingData)

In [42]:
from pyspark.ml.feature import VectorAssembler

# Two test points in the same v1..v4 layout as the training data.
# NOTE(review): both labels are 0, yet the first point resembles the class-1 training
# rows - presumably placeholder labels; confirm before using them to evaluate the model.
test_df = spark.createDataFrame([
    Row(v1=0.6, v2=0.9, v3=0.5, v4=0),
    Row(v1=-0.4, v2=0.1, v3=-0.7, v4=0),
])

# Same (features, label) conversion as for the training data, selecting columns by name.
assembler = VectorAssembler(inputCols=["v1", "v2", "v3"], outputCol="features")
testData = (
    assembler.transform(test_df)
    .withColumnRenamed("v4", "label")
    .select("features", "label")
)
testData.show()

+---------------+-----+
|       features|label|
+---------------+-----+
|  [0.6,0.9,0.5]|    0|
|[-0.4,0.1,-0.7]|    0|
+---------------+-----+



In [46]:
# Score the test set: transform() returns testData with rawPrediction,
# probability and prediction columns appended.
predictionsTestData = model.transform(dataset=testData)
predictionsTestData.show(20, truncate=True)

+---------------+-----+--------------------+--------------------+----------+
|       features|label|       rawPrediction|         probability|prediction|
+---------------+-----+--------------------+--------------------+----------+
|  [0.6,0.9,0.5]|    0|[-10.588123992255...|[2.52130407904822...|       1.0|
|[-0.4,0.1,-0.7]|    0|[10.9422417586944...|[0.99998230554869...|       0.0|
+---------------+-----+--------------------+--------------------+----------+

