In [0]:
!java -version

In [0]:
sc.version

In [0]:
#!pip install mlflow

In [0]:
#import math
#import pandas as pd
#import matplotlib.pylab as plt
#%matplotlib inline

In [0]:
# carregar tabela de dados
df = table("cadastro_nomes_sexo")
#df = spark.sql("select * from cadastro_nomes_sexo")

In [0]:
df.printSchema()

In [0]:
df.show(10)

In [0]:
rdd = df.rdd.filter(lambda x: x['SEXO'] in ['M', 'F'])
#rdd = df.rdd
#rdd = df.rdd.map(list)
#rdd = df.rdd.map(tuple)
rdd

In [0]:
type(rdd)

In [0]:
rdd.take(5)

In [0]:
from pyspark.sql import Row

rdd2 = rdd.map(lambda x: Row(
  nome=x['PNOME'],
  sexo={'F': 0, 'M': 1, 'X': 9}[x['SEXO']]
))
rdd2

In [0]:
rdd2.take(5)

In [0]:
MAIOR_QTDE_LETRAS = 16 # fixado para o maior arquivo

def incluir_letras(row):
  row_dict = row.asDict()
  rev = row_dict['nome'][::-1]
  tam = len(rev)
  letras = [0] * MAIOR_QTDE_LETRAS
  
  for i in range(tam):
    letras[i] = ord(rev[i]) - 64
  row_dict['letras'] = letras
  
  new_row = Row(**row_dict)
  return new_row

In [0]:
rdd3 = rdd2.map(incluir_letras)
rdd3

In [0]:
rdd3.take(5)

In [0]:
df3 = rdd3.toDF()
df3

In [0]:
df3.select("nome", "sexo").describe().show()

In [0]:
from pyspark.ml.linalg import Vectors

def converter_para_ponto_rotulado(row):
    obj = (row["nome"], row["sexo"], Vectors.dense(row["letras"]))
    return obj

In [0]:
rdd4 = rdd3.map(converter_para_ponto_rotulado)
rdd4

In [0]:
rdd4.take(5)

In [0]:
df4 = rdd4.toDF(["nome", "real", "features"])
df4

In [0]:
df4.show(5, truncate=False)

In [0]:
from pyspark.ml.feature import PCA

pca = PCA(
  k = 10,
  inputCol = "features",
  outputCol = "pcaFeatures"
)

#pcaModel = pca.fit(df4)
#pcaResult = pcaModel.transform(df4).select("nome", "real", "pcaFeatures")
#pcaResult.show(5)

In [0]:
# usar redução de dimensionalidade PCA
#df5 = pcaResult.withColumnRenamed("pcaFeatures", "features")

# não usar PCA
df5 = df4

df5.show(5, truncate=False)

In [0]:
# dividir dados entre treino e teste
(dados_treino, dados_teste) = df5.randomSplit([0.7, 0.3], seed=42)

dados_treino.cache()
dados_teste.cache()

print("dados de treino:", dados_treino.count())
print("dados de teste: ", dados_teste.count())

In [0]:
# https://spark.apache.org/docs/latest/ml-statistics.html

'''from pyspark.ml.stat import Correlation

r1 = Correlation.corr(dados_treino, "features").head()
print("Pearson correlation matrix:\n" + str(r1[0]))

r2 = Correlation.corr(dados_treino, "features", "spearman").head()
print("Spearman correlation matrix:\n" + str(r2[0]))'''

In [0]:
from pyspark.ml.stat import ChiSquareTest

r = ChiSquareTest.test(dados_treino, "features", "real").head()
print("pValues: " + str(r.pValues))
print("degreesOfFreedom: " + str(r.degreesOfFreedom))
print("statistics: " + str(r.statistics))

In [0]:
'''from pyspark.ml.stat import Summarizer

# create summarizer for multiple metrics "mean" and "count"
summarizer = Summarizer.metrics("mean", "count")

# compute statistics for multiple metrics with weight
dados_treino.select(summarizer.summary(dados_treino.features, dados_treino.real)).show(truncate=False)

# compute statistics for multiple metrics without weight
dados_treino.select(summarizer.summary(dados_treino.features)).show(truncate=False)

# compute statistics for single metric "mean" with weight
dados_treino.select(Summarizer.mean(dados_treino.features, dados_treino.real)).show(truncate=False)

# compute statistics for single metric "mean" without weight
dados_treino.select(Summarizer.mean(dados_treino.features)).show(truncate=False)'''

In [0]:
from datetime import datetime

models = {}

def evaluate_model(name, classifier, evaluator, train_data, test_data):

  start = datetime.now()
  model = classifier.fit(train_data)
  end = datetime.now()
  elapsed = int((end - start).total_seconds() * 1000)

  predictions = model.transform(test_data)
  score = evaluator.evaluate(predictions) * 100

  models[name] = (model, score, elapsed)
  print(model, '\nScore: %.2f [%5s ms]' % (score, elapsed))
  return model

In [0]:
# https://spark.apache.org/docs/latest/ml-tuning.html
import numpy as np
import re

from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

NUMBER_KFOLD_SPLITS = 5 # number of splits in cross-validation

# faz o ajuste fino do modelo, calculando os melhores hiperparâmetros
def fine_tune_model(estimator, params, evaluator, train_data, test_data):

  print('\nFine Tuning Model:')
  print(estimator) #, "\nparams:", params)
  
  crossval = CrossValidator(estimator=estimator,
                            estimatorParamMaps=params,
                            evaluator=evaluator,
                            numFolds=NUMBER_KFOLD_SPLITS,
                            seed=42,
                            parallelism=3)
  search = crossval.fit(train_data)

  predictions = search.transform(test_data)
  score = evaluator.evaluate(predictions) * 100

  hyper_list = []
  hyperparams = search.getEstimatorParamMaps()[np.argmax(search.avgMetrics)]
  for i in range(len(hyperparams.items())):
    hyper_name = re.search("name='(.+?)'", str([x for x in hyperparams.items()][i])).group(1)
    hyper_value = [x for x in hyperparams.items()][i][1]
    hyper_list.append({hyper_name: hyper_value})

  print('\nBest Score: %.2f' % score)
  print('Best Params:', hyper_list)
  return search

In [0]:
'''from pyspark.ml.evaluation import MulticlassClassificationEvaluator

avaliador = MulticlassClassificationEvaluator(
    predictionCol = "previsto",
    labelCol = "real",
    metricName = "accuracy"
)'''

In [0]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

avaliador = BinaryClassificationEvaluator(
    rawPredictionCol = "previsto",
    labelCol = "real",
    metricName = "areaUnderROC"
)

In [0]:
# Logistic Regression
from pyspark.ml.classification import LogisticRegression

estimador = LogisticRegression(
  regParam=0.1,
  maxIter=10,
  labelCol="real",
  featuresCol="features",
  predictionCol="previsto"
)

params = ParamGridBuilder() \
    .addGrid(estimador.regParam, [1.0, 0.1, 0.01]) \
    .addGrid(estimador.maxIter, [10, 100, 1000]) \
    .build()
#fine_tune_model(estimador, params, avaliador, dados_treino, dados_teste)

evaluate_model('LR', estimador, avaliador, dados_treino, dados_teste)

In [0]:
# Decision Tree
from pyspark.ml.classification import DecisionTreeClassifier

estimador = DecisionTreeClassifier(
  maxDepth=9,
  labelCol="real",
  featuresCol="features",
  predictionCol="previsto"
)

params = ParamGridBuilder() \
    .addGrid(estimador.maxDepth, [2, 5, 7, 9, 11, 13]) \
    .build()
#fine_tune_model(estimador, params, avaliador, dados_treino, dados_teste)

evaluate_model('DT', estimador, avaliador, dados_treino, dados_teste)

In [0]:
# Random Forest
from pyspark.ml.classification import RandomForestClassifier

estimador = RandomForestClassifier(
  maxDepth=19,
  labelCol="real",
  featuresCol="features",
  predictionCol="previsto"
)

params = ParamGridBuilder() \
    .addGrid(estimador.maxDepth, [9, 11, 13, 17, 19]) \
    .build()
#fine_tune_model(estimador, params, avaliador, dados_treino, dados_teste)

evaluate_model('RF', estimador, avaliador, dados_treino, dados_teste)

In [0]:
# Gradient-Boosted Trees (GBTs)
from pyspark.ml.classification import GBTClassifier

estimador = GBTClassifier(
  #maxDepth=10,
  labelCol="real",
  featuresCol="features",
  predictionCol="previsto"
)

params = ParamGridBuilder() \
    .addGrid(estimador.maxDepth, [5, 10]) \
    .build()
#fine_tune_model(estimador, params, avaliador, dados_treino, dados_teste)

# só classificação binária!
evaluate_model('GB', estimador, avaliador, dados_treino, dados_teste)

In [0]:
# Multilayer Perceptron (MLP)
from pyspark.ml.classification import MultilayerPerceptronClassifier

estimador = MultilayerPerceptronClassifier(
  #maxIter=100,
  layers=[4, 5, 3],
  #blockSize=128,
  seed=42,
  labelCol="real",
  featuresCol="features",
  predictionCol="previsto"
)

params = ParamGridBuilder() \
    .addGrid(estimador.layers, [[4,5,3], [4,8,3], [4,5,6,3]]) \
    .build()
#fine_tune_model(estimador, params, avaliador, dados_treino, dados_teste)

# está dando erro...
#evaluate_model('MLP', estimador, avaliador, dados_treino, dados_teste)

In [0]:
# Linear Support Vector Machine (SVM)
from pyspark.ml.classification import LinearSVC

estimador = LinearSVC(
  maxIter=100,
  regParam=0.1,
  labelCol="real",
  featuresCol="features",
  predictionCol="previsto"
)

params = ParamGridBuilder() \
    .addGrid(estimador.maxIter, [10, 50, 100, 500]) \
    .addGrid(estimador.regParam, [0.05, 0.1, 0.5]) \
    .build()
#fine_tune_model(estimador, params, avaliador, dados_treino, dados_teste)

# só classificação binária!
evaluate_model('LSVM', estimador, avaliador, dados_treino, dados_teste)

In [0]:
# Naïve Bayes
from pyspark.ml.classification import NaiveBayes

estimador = NaiveBayes(
  smoothing=0.25,
  modelType="multinomial",
  labelCol="real",
  featuresCol="features",
  predictionCol="previsto"
)

params = ParamGridBuilder() \
    .addGrid(estimador.smoothing, [0.0, 0.25, 0.5, 0.75, 1.0]) \
    .addGrid(estimador.modelType, ["multinomial"]) \
    .build()
#fine_tune_model(estimador, params, avaliador, dados_treino, dados_teste)

evaluate_model('NB', estimador, avaliador, dados_treino, dados_teste)

In [0]:
# Factorization Machines (FM)
from pyspark.ml.classification import FMClassifier

estimador = FMClassifier(
  stepSize=0.001,
  labelCol="real",
  featuresCol="features",
  predictionCol="previsto"
)

params = ParamGridBuilder() \
    .addGrid(estimador.stepSize, [0.001, 0.01, 0.1, 1.0]) \
    .build()
#fine_tune_model(estimador, params, avaliador, dados_treino, dados_teste)

# só classificação binária!
evaluate_model('FM', estimador, avaliador, dados_treino, dados_teste)

In [0]:
#results = []
#for key, value in models.items():
#  tup = (key,) + value
#  results.append(tup)
#results

In [0]:
'''from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType
schema = StructType([ \
  StructField("Model", StringType(), True), \
  StructField("Estimator", StringType(), True), \
  StructField("Score", FloatType(), True), \
  StructField("Time (ms)", IntegerType(), True) \
])'''

In [0]:
'''results_df = spark.createDataFrame(data=results, schema=schema)
results_df.printSchema()
results_df.show(truncate=False)'''

In [0]:
names = []
estimators = []
scores = []
times = []

for key, value in models.items():
  (model, score, elapsed) = value
  names.append(key)
  estimators.append(model)
  scores.append(score)
  times.append(elapsed)

In [0]:
import pandas as pd

results_df = pd.DataFrame({
  'Model': names,
  'Score': scores,
  'Time (ms)': times,
  'Estimator': estimators})

results_df.sort_values(by='Score', ascending=False)

Unnamed: 0,Model,Score,Time (ms),Estimator
1,DT,99.615126,2393,DecisionTreeClassificationModel: uid=DecisionT...
2,RF,99.190351,22776,RandomForestClassificationModel: uid=RandomFor...
6,GB,98.934773,18598,GBTClassificationModel: uid = GBTClassifier_b2...
3,LSVM,95.275978,9664,"LinearSVCModel: uid=LinearSVC_6fc815743a5f, nu..."
5,FM,95.027094,33890,FMClassificationModel: uid=FMClassifier_4bb846...
0,LR,94.949332,1669,LogisticRegressionModel: uid=LogisticRegressio...
4,NB,91.981002,612,"NaiveBayesModel: uid=NaiveBayes_0a15e8b730ca, ..."


In [0]:
modelo = models['DT'][0]
modelo

In [0]:
# gerar previsões e mostrar exemplos
previsoes = modelo.transform(dados_teste)
previsoes.select("nome", "real", "previsto").show(10)

In [0]:
display(previsoes.head(5))

nome,real,features,rawPrediction,probability,previsto
AARONSON,1,"Map(vectorType -> dense, length -> 16, values -> List(14.0, 15.0, 19.0, 14.0, 15.0, 18.0, 1.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0))","Map(vectorType -> dense, length -> 2, values -> List(0.0, 2236.0))","Map(vectorType -> dense, length -> 2, values -> List(0.0, 1.0))",1.0
ABADIO,1,"Map(vectorType -> dense, length -> 16, values -> List(15.0, 9.0, 4.0, 1.0, 2.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0))","Map(vectorType -> dense, length -> 2, values -> List(1.0, 5894.0))","Map(vectorType -> dense, length -> 2, values -> List(1.6963528413910093E-4, 0.999830364715861))",1.0
ABAITE,0,"Map(vectorType -> dense, length -> 16, values -> List(5.0, 20.0, 9.0, 1.0, 2.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0))","Map(vectorType -> dense, length -> 2, values -> List(746.0, 8.0))","Map(vectorType -> dense, length -> 2, values -> List(0.9893899204244032, 0.010610079575596816))",0.0
ABDA,0,"Map(vectorType -> dense, length -> 16, values -> List(1.0, 4.0, 2.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0))","Map(vectorType -> dense, length -> 2, values -> List(7789.0, 0.0))","Map(vectorType -> dense, length -> 2, values -> List(1.0, 0.0))",0.0
ABDALLAH,0,"Map(vectorType -> dense, length -> 16, values -> List(8.0, 1.0, 12.0, 12.0, 1.0, 4.0, 2.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0))","Map(vectorType -> dense, length -> 2, values -> List(72.0, 0.0))","Map(vectorType -> dense, length -> 2, values -> List(1.0, 0.0))",0.0


In [0]:
# matriz de confusão
previsoes.groupBy("real", "previsto").count().show()

In [0]:
previsoes.filter("real = 1").filter("real != previsto").select("nome", "real", "previsto").show(10)

In [0]:
previsoes.filter("real = 0").filter("real != previsto").select("nome", "real", "previsto").show(10)

In [0]:
# https://docs.databricks.com/applications/machine-learning/model-export/mleap-model-export.html#export-and-import-models-in-python
#!pip install mleap

In [0]:
%sh
rm -rf /tmp/mleap_python_model_export
mkdir /tmp/mleap_python_model_export

In [0]:
import mleap.pyspark
from mleap.pyspark.spark_support import SimpleSparkSerializer

modelo.serializeToBundle("jar:file:/tmp/mleap_python_model_export/72-modelo-genero-nome-pyspark-json.zip", previsoes)

In [0]:
dados = pd.DataFrame({'REAL': y, 'PREV': y_pred}, index=X.index)
for col in dados.columns:
    dados[col] = dados[col].map({0: 'X', 1: 'F', 2: 'M'})
dados.head()

In [0]:
from sklearn.metrics import accuracy_score

accuracy_score(y, y_pred)

In [0]:
from sklearn.metrics import confusion_matrix

confusion_matrix(y, y_pred)

In [0]:
dados[dados['REAL'] != dados['PREV']].head(20)