In [1]:
from pyspark.sql import SparkSession
from pyspark.ml.linalg import Vectors
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import RFormula

In [2]:
spark = SparkSession.\
        builder.\
        appName("tcc_pucminas_ml_fretes").\
        master("spark://spark-master:7077").\
        config("spark.executor.memory", "7G").\
        getOrCreate()

In [3]:
fretes = spark.read.csv(path="data/fretes.csv", sep=";", header=True)

In [4]:
dataset_prep = fretes.selectExpr("upper(UF_origem) as uf_ori",
                            "upper(UF_Destino) as uf_dst",
                            "KM as km",
                            "regexp_replace(Preco,'([ˆ.]|[^,]*$)|[ˆ,]','') as preco").\
                selectExpr("*","format_number(preco/km,4) as preco_km").\
                where("not isnull(preco_km) and Pton = 0 and PED = 0")

In [5]:
dataset_prep.show()

+------+------+----+------+--------+
|uf_ori|uf_dst|  km| preco|preco_km|
+------+------+----+------+--------+
|    CE|    SC|3520|   150|  0.0426|
|    SP|    MG| 537|   300|  0.5587|
|    PA|    PA| 265|     3|  0.0113|
|    SP|    SP| 162|    25|  0.1543|
|    RS|    SP|1073|  2000|  1.8639|
|    TO|    GO| 625|  1000|  1.6000|
|    SP|    SP| 327|   200|  0.6116|
|    SP|    SP|  21|   318| 15.1429|
|    SP|    SP| 282|   500|  1.7730|
|    RJ|    MG| 341|   750|  2.1994|
|    RJ|    MG| 341|   750|  2.1994|
|    PA|    MG|2035|   200|  0.0983|
|    PA|    SP|2182|   250|  0.1146|
|    SP|    ES|1152|  5800|  5.0347|
|    PB|    PE| 115|  2500| 21.7391|
|    MT|    MG|1988|   700|  0.3521|
|    MG|    MT| 934|  3800|  4.0685|
|    SP|    PE|2566| 12000|  4.6765|
|    SP|    PA|2438|  9000|  3.6916|
|    SP|    BA|1910|  9000|  4.7120|
+------+------+----+------+--------+
only showing top 20 rows



In [6]:
dataset_prep.count()

542

In [7]:
# Preparando os dados para aplicão fórmula R.
formula = RFormula(
    formula="uf_dst ~ uf_ori + preco_km",
    featuresCol="features",
    labelCol="label")

In [8]:
dataset = formula.fit(dataset_prep).transform(dataset_prep)

In [9]:
(training, test) = dataset.randomSplit([0.95, 0.05])

In [10]:
training.show(10, False)

+------+------+----+-----+--------+-----------------------+-----+
|uf_ori|uf_dst|km  |preco|preco_km|features               |label|
+------+------+----+-----+--------+-----------------------+-----+
|AC    |BA    |4567| 10  |0.0022  |(283,[16,42],[1.0,1.0])|3.0  |
|BA    |BA    |272 | 700 |2.5735  |(283,[0,172],[1.0,1.0])|3.0  |
|BA    |BA    |517 | 2500|4.8356  |(283,[0,26],[1.0,1.0]) |3.0  |
|BA    |BA    |517 | 2500|4.8356  |(283,[0,26],[1.0,1.0]) |3.0  |
|BA    |BA    |517 | 2500|4.8356  |(283,[0,26],[1.0,1.0]) |3.0  |
|BA    |BA    |517 | 2500|4.8356  |(283,[0,26],[1.0,1.0]) |3.0  |
|BA    |BA    |517 | 2500|4.8356  |(283,[0,26],[1.0,1.0]) |3.0  |
|BA    |BA    |517 | 2500|4.8356  |(283,[0,26],[1.0,1.0]) |3.0  |
|BA    |BA    |517 | 2500|4.8356  |(283,[0,26],[1.0,1.0]) |3.0  |
|BA    |BA    |517 | 2500|4.8356  |(283,[0,26],[1.0,1.0]) |3.0  |
+------+------+----+-----+--------+-----------------------+-----+
only showing top 10 rows



In [11]:
training.printSchema()

root
 |-- uf_ori: string (nullable = true)
 |-- uf_dst: string (nullable = true)
 |-- km: string (nullable = true)
 |-- preco: string (nullable = true)
 |-- preco_km: string (nullable = true)
 |-- features: vector (nullable = true)
 |-- label: double (nullable = false)



In [12]:
# Criando a instância da Regressão Logítica . Esssa instância é o Estimador.
lr = LogisticRegression(maxIter=100, regParam=0.01)
# Para imprimir os parametoros, instrução, e valores padrões.
#print("LogisticRegression parâmetros:\n" + lr.explainParams() + "\n")

In [13]:
# Learn a LogisticRegression model. This uses the parameters stored in lr.
model1 = lr.fit(training)

In [14]:
# A transformação é dada pela o estimador,
# Podemos ver os parâmetros do fit().
# Instancia do modelo mode1 da regressão logística .
#print(model1.extractParamMap())

In [15]:
# Podemos, alternativamente, especificar parâmetros usando um dicionário Python com um paramMap
paramMap = {lr.maxIter: 100}
paramMap[lr.maxIter] = 30  # Especificando parâmero, sobrescrevendo o maxIter.
paramMap.update({lr.regParam: 0.1, lr.threshold: 0.55})  # Especificando múltiplos parâmetros.

In [16]:
# Pode combinar o paramMaps, são dicionários no python.
paramMap2 = {lr.probabilityCol: "myProbability"}  # Alterando name  da coluna
paramMapCombined = paramMap.copy()
paramMapCombined.update(paramMap2)

In [17]:
# Agora aprendizado comm o novo modelo model2 usando os parâmetros do paramMapCombined.
# paramMapCombined sobrescreve todos os parâmetros configurados anteriormente via lr.set* methods.
model2 = lr.fit(training, paramMapCombined)
#print(model2.extractParamMap())

In [18]:
test.show(10)

+------+------+----+-----+--------+--------------------+-----+
|uf_ori|uf_dst|  km|preco|preco_km|            features|label|
+------+------+----+-----+--------+--------------------+-----+
|    BA|    BA| 517| 2500|  4.8356|(283,[0,26],[1.0,...|  3.0|
|    BA|    BA| 517| 2500|  4.8356|(283,[0,26],[1.0,...|  3.0|
|    BA|    PA|2033| 7850|  3.8613|(283,[0,22],[1.0,...|  7.0|
|    BA|    PB| 921| 3900|  4.2345|(283,[0,23],[1.0,...|  9.0|
|    BA|    PE| 706| 3750|  5.3116|(283,[0,27],[1.0,...|  4.0|
|    BA|    PE| 805| 3750|  4.6584|(283,[0,24],[1.0,...|  4.0|
|    BA|    RJ|1629| 6200|  3.8060|(283,[0,20],[1.0,...|  0.0|
|    BA|    RJ|1629| 6200|  3.8060|(283,[0,20],[1.0,...|  0.0|
|    PA|    CE|1660| 6500|  3.9157|(283,[4,214],[1.0...|  5.0|
|    PB|    SP|2703| 6000|  2.2198|(283,[15,163],[1....|  2.0|
+------+------+----+-----+--------+--------------------+-----+
only showing top 10 rows



In [19]:
# Gerandoa as previsões na partição test de dados utilizando a transformação do método transform() .
# LogisticRegression.transform só utilizará a coluna 'features'.
# Nota que no model2.transform() a nova coluna é "myProbability" diferente do padrão do método lr.probabilityCol.
prediction = model2.transform(test)
result = prediction.select("features", "label", "myProbability", "prediction") \
    .collect()


In [20]:
prediction = model2.transform(test)
result = prediction.select("features", "label", "myProbability", "prediction") \
    .collect()

In [21]:
i = 0
print (test.columns[0:5])
for row in result:
    print (test.collect()[i][:5])
    i += 1
    print("features (Características do dado)= %s\nlabel = %s <-> prob = %s\nprediction = %s\n"
          % (row.features, row.label,  row.prediction, row.myProbability))

  

['uf_ori', 'uf_dst', 'km', 'preco', 'preco_km']
('BA', 'BA', '517', ' 2500', '4.8356')
features (Características do dado)= (283,[0,26],[1.0,1.0])
label = 3.0 <-> prob = 3.0
prediction = [0.05174507220777835,0.02313623662989299,0.023164175975093227,0.7190582627279983,0.03577502206168011,0.02323584023193223,0.014733057604374118,0.01816496575646411,0.01784666951358778,0.016549833286149774,0.008378294715170778,0.007556691089299961,0.005787851985004157,0.0058435652330021326,0.00546766924561357,0.004360614348286188,0.0037387880295079046,0.003637964531501942,0.001920320557430431,0.001600169155965197,0.0017076514779302604,0.00170675254881067,0.000981694787090989,0.0009816947870909898,0.0009816947870909898,0.0009816947870909898,0.0009577519391618146]

('BA', 'BA', '517', ' 2500', '4.8356')
features (Características do dado)= (283,[0,26],[1.0,1.0])
label = 3.0 <-> prob = 3.0
prediction = [0.05174507220777835,0.02313623662989299,0.023164175975093227,0.7190582627279983,0.03577502206168011,0.023235

In [22]:
i = 0
print (test.columns[0:5])
for row in result:
    print (test.collect()[i][:5])
    i += 1
    print("label = %s <-> prob = %s\n"
          % (row.label,  row.prediction))



['uf_ori', 'uf_dst', 'km', 'preco', 'preco_km']
('BA', 'BA', '517', ' 2500', '4.8356')
label = 3.0 <-> prob = 3.0

('BA', 'BA', '517', ' 2500', '4.8356')
label = 3.0 <-> prob = 3.0

('BA', 'PA', '2033', ' 7850', '3.8613')
label = 7.0 <-> prob = 7.0

('BA', 'PB', '921', ' 3900', '4.2345')
label = 9.0 <-> prob = 9.0

('BA', 'PE', '706', ' 3750', '5.3116')
label = 4.0 <-> prob = 4.0

('BA', 'PE', '805', ' 3750', '4.6584')
label = 4.0 <-> prob = 4.0

('BA', 'RJ', '1629', ' 6200', '3.8060')
label = 0.0 <-> prob = 0.0

('BA', 'RJ', '1629', ' 6200', '3.8060')
label = 0.0 <-> prob = 0.0

('PA', 'CE', '1660', ' 6500', '3.9157')
label = 5.0 <-> prob = 2.0

('PB', 'SP', '2703', ' 6000', '2.2198')
label = 2.0 <-> prob = 4.0

('RS', 'RS', '123', ' 24', '0.1951')
label = 1.0 <-> prob = 1.0

('RS', 'RS', '123', ' 24', '0.1951')
label = 1.0 <-> prob = 1.0

('RS', 'RS', '211', ' 31', '0.1469')
label = 1.0 <-> prob = 1.0

('RS', 'RS', '284', ' 44', '0.1549')
label = 1.0 <-> prob = 1.0

('SE', 'RS', '325