In [1]:
#faz import de todas as bibliotecas a serem utilizadas
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType
import cv2
import pandas as pd
import numpy as np
import os
import json
from tqdm import tqdm, tqdm_notebook
from keras.models import Sequential
from keras.layers import Dense, Flatten, Activation
from keras.layers import Dropout
from keras.layers.convolutional import Conv2D, MaxPooling2D
from keras.utils import np_utils
from keras.optimizers import SGD, RMSprop, Adam
from keras.models import model_from_json
from keras.models import load_model
import matplotlib.pyplot as plt
import pickle

Using TensorFlow backend.


In [257]:
# Start consuming the "prediction" Kafka topic as a streaming DataFrame,
# beginning at the latest offsets of the topic.
dados = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "elephant:9092")
    .option("subscribe", "prediction")
    .option("startingOffsets", "latest")
    .load()
)

In [258]:
# Cast the Kafka message payload ("value") to a string column named "csv".
expressao_csv = "CAST(value AS STRING) as csv"
dados_transformados = dados.selectExpr(expressao_csv)

In [259]:
# Split every CSV line on commas and name the resulting fields by position.
# NOTE(review): a plain split misaligns the fields if a free-text value
# (e.g. Name or Description) contains a comma -- confirm the producer never sends one.
colunas_csv = [
    'Type', 'Name', 'Age', 'Breed1', 'Breed2', 'Gender',
    'Color1', 'Color2', 'Color3', 'MaturitySize', 'FurLength',
    'Vaccinated', 'Dewormed', 'Sterilized', 'Health', 'Quantity',
    'Fee', 'State', 'RescuerID', 'VideoAmt', 'Description',
    'PetID', 'PhotoAmt',
]
dados_finais = dados_transformados.selectExpr(
    *["split(csv, ',')[{0}] as {1}".format(posicao, coluna)
      for posicao, coluna in enumerate(colunas_csv)]
)

In [260]:
# Recursively delete the HDFS checkpoint and output directories that the
# streaming query in the next cell writes to.
!hdfs dfs -rm -r /user/labdata/checkpoint
!hdfs dfs -rm -r /user/labdata/prediction_streaming

Deleted /user/labdata/checkpoint
Deleted /user/labdata/prediction_streaming


In [261]:
# Persist the records received from the Kafka topic as CSV files on HDFS so
# the model can be run on them afterwards.
consulta = (
    dados_finais.writeStream
    .format("csv")
    .option("checkpointLocation", "/user/labdata/checkpoint")
    .start("/user/labdata/prediction_streaming")
)

In [262]:
# Batch-read the CSV part files written under the streaming output directory
# on HDFS into a Spark DataFrame (no header row, comma-delimited).
leitor_csv = sqlContext.read.option("delimiter", ",").option("header", "false")
df_streaming_total = leitor_csv.csv("/user/labdata/prediction_streaming/part-*")

In [263]:
# Display the rows read from HDFS without truncating long values.
df_streaming_total.show(truncate=False)

+---+-----------+---+---+---+---+---+---+---+---+----+----+----+----+----+----+----+-----+--------------------------------+----+---------+---------+----+
|_c0|_c1        |_c2|_c3|_c4|_c5|_c6|_c7|_c8|_c9|_c10|_c11|_c12|_c13|_c14|_c15|_c16|_c17 |_c18                            |_c19|_c20     |_c21     |_c22|
+---+-----------+---+---+---+---+---+---+---+---+----+----+----+----+----+----+----+-----+--------------------------------+----+---------+---------+----+
|2  |Tom & Jerry|3  |265|264|1  |6  |0  |0  |3  |2   |1   |1   |1   |1   |1   |30  |41326|6f40a7acfad5cc0bb3e44591ea446c05|0   |descricao|ac9fefd28|5.0 |
+---+-----------+---+---+---+---+---+---+---+---+----+----+----+----+----+----+----+-----+--------------------------------+----+---------+---------+----+



In [264]:
# Capture the current (auto-generated) column names and list, in the same
# positional order, the names that will replace them.
oldColumns = df_streaming_total.schema.names
newColumns = [
    'Type', 'Name', 'Age', 'Breed1', 'Breed2', 'Gender',
    'Color1', 'Color2', 'Color3', 'MaturitySize',
    'FurLength', 'Vaccinated', 'Dewormed', 'Sterilized', 'Health',
    'Quantity', 'Fee', 'State',
    'RescuerID', 'VideoAmt', 'Description',
    'PetID', 'PhotoAmt',
]

In [265]:
# Rename the columns, position by position, to the names defined in newColumns.
# The lengths are checked explicitly instead of relying on a hard-coded index
# bound, so a schema change fails with a clear message.
if len(oldColumns) != len(newColumns):
    raise ValueError(
        'Expected {0} columns but the DataFrame has {1}'.format(len(newColumns), len(oldColumns))
    )
for nome_antigo, nome_novo in zip(oldColumns, newColumns):
    df_streaming_total = df_streaming_total.withColumnRenamed(nome_antigo, nome_novo)

In [266]:
# Display the DataFrame after the columns were renamed.
df_streaming_total.show()

+----+-----------+---+------+------+------+------+------+------+------------+---------+----------+--------+----------+------+--------+---+-----+--------------------+--------+-----------+---------+--------+
|Type|       Name|Age|Breed1|Breed2|Gender|Color1|Color2|Color3|MaturitySize|FurLength|Vaccinated|Dewormed|Sterilized|Health|Quantity|Fee|State|           RescuerID|VideoAmt|Description|    PetID|PhotoAmt|
+----+-----------+---+------+------+------+------+------+------+------------+---------+----------+--------+----------+------+--------+---+-----+--------------------+--------+-----------+---------+--------+
|   2|Tom & Jerry|  3|   265|   264|     1|     6|     0|     0|           3|        2|         1|       1|         1|     1|       1| 30|41326|6f40a7acfad5cc0bb...|       0|  descricao|ac9fefd28|     5.0|
+----+-----------+---+------+------+------+------+------+------+------------+---------+----------+--------+----------+------+--------+---+-----+--------------------+--------+--

In [267]:
# Convert the Spark DataFrame to pandas, because the Keras model is used from
# plain Python. The Spark job is collected only once:
#   df_streaming_totpandas keeps the full, untouched record (the prediction is attached to it later);
#   df_streaming is an independent copy that is transformed into model features.
df_streaming_totpandas = df_streaming_total.toPandas()
df_streaming = df_streaming_totpandas.copy()

In [268]:
# Drop the Description, State and RescuerID columns from the feature frame
# (no sentiment analysis is performed at this stage).
colunas_descartadas = ['Description', 'State', 'RescuerID']
df_streaming = df_streaming.drop(columns=colunas_descartadas)

In [269]:
# Convert some variables to integer type, because the binarization step
# fails when they are not numeric.
# `.astype(int)` converts every row; calling int() on the whole column only
# works when the frame holds exactly one row.
colunas_inteiras = [
    'Type', 'Gender', 'FurLength', 'Vaccinated',
    'Dewormed', 'Sterilized', 'MaturitySize', 'Health',
]
for coluna in colunas_inteiras:
    df_streaming[coluna] = df_streaming[coluna].astype(int)

In [270]:
# Group the values of some variables into coarser classes.
# The string-typed source columns produce string labels ('1', '2', '3').
# MaturitySize and Health were already converted to int, so their grouped
# versions stay int.

# Name: 2 classes ('1' = has a name, '2' = no name).
df_streaming['NameNew'] = ['1' if pd.notnull(x) else '2' for x in df_streaming['Name']]

# Age: 3 classes ('1' = up to 3 months, '2' = up to 12 months, '3' = over 12 months).
df_streaming['AgeNew'] = ['1' if int(x) <= 3 else ('2' if int(x) <= 12 else '3') for x in df_streaming['Age']]

# Breed1: '1' = no defined breed (Breed1 in 307, 266, 265, 299, 264), '2' = any other value.
# NOTE(review): the original notes describe 3 classes (no defined breed / pure / mixed,
# using Breed2), but only these two are produced -- confirm against the training notebook.
df_streaming['Breed1New'] = ['1' if x in ['307', '266', '265', '299', '264'] else '2' for x in df_streaming['Breed1']]

# Color: 2 classes each.
# Number of colors: '1' = a single color (Color2 is '0'), '2' = more than one.
df_streaming['ColorQtde'] = ['1' if x == '0' else '2' for x in df_streaming['Color2']]
# Main color: '1' when Color1 <= 2, otherwise '2' (dark vs. light according to the original note).
df_streaming['ColorMain'] = ['1' if int(x) <= 2 else '2' for x in df_streaming['Color1']]

# MaturitySize: 3 classes (1 = Small, 2 = Medium, 3 = Large), merging 4 (Extra large) into 3.
# The comparison is done on int values: the column is int here, so comparing
# against the string '4' never matched and class 4 was never merged.
df_streaming['MaturitySizeNew'] = [3 if int(x) == 4 else int(x) for x in df_streaming['MaturitySize']]

# Health: 2 classes (healthy / not healthy), merging 3 into 2.
# Same int comparison fix as MaturitySize.
df_streaming['HealthNew'] = [2 if int(x) == 3 else int(x) for x in df_streaming['Health']]

# Quantity: 2 classes ('1' = one animal, '2' = more than one).
df_streaming['QuantityNew'] = ['2' if int(x) >= 2 else '1' for x in df_streaming['Quantity']]

# Fee: 2 classes ('1' = no fee, '2' = with fee).
df_streaming['FeeNew'] = ['2' if int(x) > 0 else '1' for x in df_streaming['Fee']]

# PhotoAmt: 2 classes ('1' = exactly one photo, '2' = any other amount).
df_streaming['PhotoAmtNew'] = ['1' if float(x) == 1 else '2' for x in df_streaming['PhotoAmt']]


In [271]:
# Drop the columns that were only needed to derive the grouped variables.
colunas_auxiliares = [
    'Name', 'VideoAmt', 'Age', 'Breed1', 'Breed2',
    'Color1', 'Color2', 'Color3', 'MaturitySize',
    'Health', 'Quantity', 'Fee', 'PhotoAmt',
]
df_streaming = df_streaming.drop(columns=colunas_auxiliares)

In [272]:
# Categorical feature columns that are binarized before being fed to the
# neural network.
labels_streaming = [
    'Type', 'Gender', 'FurLength', 'Vaccinated', 'Dewormed', 'Sterilized',
    'NameNew', 'AgeNew', 'Breed1New', 'ColorQtde',
    'ColorMain', 'MaturitySizeNew', 'HealthNew', 'QuantityNew', 'FeeNew',
    'PhotoAmtNew',
]

In [273]:
# Load the pickled label binarizer used to binarize the DataFrame variables.
# The file is opened in a `with` block so the handle is always closed.
# SECURITY: pickle.load executes arbitrary code from the file -- only load
# binarizer_model.sav from a trusted source.
with open('binarizer_model.sav', 'rb') as arquivo_binarizer:
    loaded_model_lb = pickle.load(arquivo_binarizer)

In [274]:
# Display the feature frame before binarization.
df_streaming

Unnamed: 0,Type,Gender,FurLength,Vaccinated,Dewormed,Sterilized,PetID,NameNew,AgeNew,Breed1New,ColorQtde,ColorMain,MaturitySizeNew,HealthNew,QuantityNew,FeeNew,PhotoAmtNew
0,2,1,2,1,1,1,ac9fefd28,1,1,1,1,2,3,1,1,2,2


In [275]:
# Run the loaded binarizer over every categorical column, replacing each
# column with its transformed values.
# NOTE(review): the recorded output of this cell shows comparison warnings,
# presumably a dtype mismatch between the int-typed columns and the classes
# the binarizer was fitted on -- verify against the training notebook.
for coluna in labels_streaming:
    df_streaming[coluna] = loaded_model_lb.transform(df_streaming[coluna])

  mask |= (ar1 == a)
  mask |= (ar1 == a)
  mask |= (ar1 == a)
  mask |= (ar1 == a)
  mask |= (ar1 == a)
  mask |= (ar1 == a)
  mask |= (ar1 == a)
  mask |= (ar1 == a)


In [276]:
# Display the feature frame after binarization.
df_streaming

Unnamed: 0,Type,Gender,FurLength,Vaccinated,Dewormed,Sterilized,PetID,NameNew,AgeNew,Breed1New,ColorQtde,ColorMain,MaturitySizeNew,HealthNew,QuantityNew,FeeNew,PhotoAmtNew
0,0,0,0,0,0,0,ac9fefd28,0,0,0,0,1,0,0,0,1,1


In [277]:
# Image preprocessing: side length (in pixels) of the square images fed to the model.
img_size = 128
def resize_to_square(im, size=None):
    """Resize an image to fit a square canvas, preserving its aspect ratio.

    The image is scaled so that its longest side equals ``size`` and is then
    padded with black borders, split as evenly as possible between both sides,
    until it is ``size`` x ``size``.

    Parameters
    ----------
    im : array with at least two dimensions (height, width, ...)
        Image as returned by ``cv2.imread``.
    size : int, optional
        Side of the output square. Defaults to the module-level ``img_size``.

    Returns
    -------
    The padded image produced by ``cv2.copyMakeBorder``.
    """
    if size is None:
        size = img_size
    old_size = im.shape[:2]  # (height, width)
    ratio = float(size) / max(old_size)
    # Clamp to 1 pixel: for very elongated images the short side would truncate
    # to 0, which is not a valid target size for cv2.resize.
    new_size = tuple(max(1, int(x * ratio)) for x in old_size)
    # cv2.resize takes the target size as (width, height).
    im = cv2.resize(im, (new_size[1], new_size[0]))
    delta_w = size - new_size[1]
    delta_h = size - new_size[0]
    # When the padding is odd, the extra pixel goes to the bottom/right side.
    top, bottom = delta_h // 2, delta_h - (delta_h // 2)
    left, right = delta_w // 2, delta_w - (delta_w // 2)
    color = [0, 0, 0]
    new_im = cv2.copyMakeBorder(im, top, bottom, left, right, cv2.BORDER_CONSTANT, value=color)
    return new_im

In [278]:
# Load the first registered photo ("<PetID>-1.jpg") of every animal in the frame.
images_streaming = []
pet_ids_sem_imagem = []  # PetIDs whose photo could not be loaded or resized
path = "train_images/"
for pet_id in df_streaming['PetID']:
    # cv2.imread does not raise for a missing/unreadable file; it returns None.
    im = cv2.imread(f'{path}{pet_id}-1.jpg')
    if im is None:
        pet_ids_sem_imagem.append(pet_id)
        continue
    try:
        im = resize_to_square(im)
    except cv2.error:
        # Keep the best-effort behaviour, but remember which image failed.
        pet_ids_sem_imagem.append(pet_id)
        continue
    print(im.shape)
    images_streaming.append(im)
if pet_ids_sem_imagem:
    # Skipped images leave fewer images than rows in df_streaming, so report
    # them instead of dropping them silently.
    print(f'AVISO: imagem ausente ou inválida para os PetIDs: {pet_ids_sem_imagem}')
images_streaming = np.asarray(images_streaming)

(128, 128, 3)


In [279]:
# Divide the image array by 255.
# NOTE: this cell reassigns its own input, so re-running it divides the
# already-divided array again.
images_streaming = images_streaming/255.0

In [280]:
# Remove the PetID identifier column so that only model features remain.
df_streaming = df_streaming.drop('PetID', axis=1)

In [281]:
# Load the model architecture from JSON and build the model.
# The `with` block closes the file even if reading fails.
with open('model.json', 'r') as json_file:
    loaded_model_json = json_file.read()
loaded_model = model_from_json(loaded_model_json)

# Load the weights into the model.
loaded_model.load_weights("model.h5")
print("Modelo carregado do disco")


Modelo carregado do disco


In [282]:
# Run the adoption prediction model. It receives two inputs, in this order:
# the feature frame and the image array.
preds = loaded_model.predict([df_streaming, images_streaming])

In [283]:
# Display the raw model output.
preds

array([[0.6327927 , 0.36720732]], dtype=float32)

In [284]:
# Add the response variable to the pandas frame that holds the full record.
# The last column of `preds` is thresholded at 0.4: rows at or above it get
# AdoptionSpeed 4, the others get 1.
# np.where labels every row individually; a plain `if` on the array is
# ambiguous for more than one row and would give all rows the same label.
y = preds[:, -1]
df_streaming_totpandas['AdoptionSpeed'] = np.where(y >= 0.4, 4, 1)


In [285]:
# Display the full record together with the predicted AdoptionSpeed.
df_streaming_totpandas

Unnamed: 0,Type,Name,Age,Breed1,Breed2,Gender,Color1,Color2,Color3,MaturitySize,...,Health,Quantity,Fee,State,RescuerID,VideoAmt,Description,PetID,PhotoAmt,AdoptionSpeed
0,2,Tom & Jerry,3,265,264,1,6,0,0,3,...,1,1,30,41326,6f40a7acfad5cc0bb3e44591ea446c05,0,descricao,ac9fefd28,5.0,1


In [121]:
# Convert the pandas frame (including AdoptionSpeed) back to a Spark DataFrame.
df_streaming_tot = spark.createDataFrame(df_streaming_totpandas)

In [122]:
# Append the DataFrame with the response variable to HDFS as CSV, coalesced
# into a single partition.
escritor_resultado = df_streaming_tot.coalesce(1).write.format('csv').mode('append')
escritor_resultado.save('/user/labdata/prediction_results')

In [123]:
# Report the adoption probability: first column of `preds`, as a percentage.
# Each row is formatted as a scalar, because formatting the whole array prints
# its brackets (e.g. "[80.81749] %").
score = preds[:, 0] * 100
resultado = '\n'.join(
    'A probabilidade de adoção é : {0:.2f} %'.format(valor) for valor in score
)
print(resultado)


A probabilidade de adoção é : [80.81749] %
