# <font color='blue'>Data Science Academy</font>
# <font color='blue'>Big Data Real-Time Analytics com Python e Spark</font>

# <font color='blue'>Chapter 8</font>

### *********** Attention: *********** 
Use Java JDK 1.8 and Apache Spark 2.4.2.

Java JDK 1.8:

https://www.oracle.com/technetwork/java/javase/downloads/jdk8-downloads-2133151.html

*If you get the error message "name 'sc' is not defined", stop pyspark and delete the metastore_db directory located in the same directory as this Jupyter notebook.*

# Spark SQL

Spark SQL is used to access structured data with Spark.

#### Go to http://localhost:4040 whenever you want to follow the execution of the jobs.
#### Additional packages can be found here: https://spark-packages.org/ (we will use one of these packages to connect to MongoDB).

## Spark SQL - Spark Session and SQL Context

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
from pyspark.sql import Row

In [2]:
print(sc)

<SparkContext master=local[*] appName=PySparkShell>


In [3]:
# Spark Session - the entry point used to work with Spark
spSession = SparkSession.builder.master("local").appName("DSA-SparkSQL").getOrCreate()

In [4]:
# Creating the SQL Context to work with Spark SQL
sqlContext = SQLContext(sc)

In [5]:
# Loading the file and creating an RDD
linhasRDD1 = sc.textFile("data/carros.csv")

In [6]:
linhasRDD1.count()

198

In [7]:
# Removing the first line (the header) - Transformation 1
linhasRDD2 = linhasRDD1.filter(lambda x: "FUELTYPE" not in x)

In [8]:
linhasRDD2.count()

197

In [9]:
# Splitting the dataset into columns - Transformation 2
linhasRDD3 = linhasRDD2.map(lambda line: line.split(","))

In [10]:
# Building Row objects from the make, body and hp columns - Transformation 3
linhasRDD4 = linhasRDD3.map(lambda p: Row(make = p[0], body = p[4], hp = int(p[7])))

In [11]:
print(linhasRDD4)

PythonRDD[4] at RDD at PythonRDD.scala:53
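
The three transformations above are lazy, so nothing is computed until an action such as `count()` or `collect()` runs. As a rough illustration of what each step does to the data, here is a plain-Python sketch that does not use Spark. The sample lines are copied from the carros.csv preview shown in this notebook, and `Car` is a hypothetical stand-in for `pyspark.sql.Row`.

```python
from collections import namedtuple

# Header and three data lines copied from the carros.csv preview in this notebook.
raw_lines = [
    "MAKE,FUELTYPE,ASPIRE,DOORS,BODY,DRIVE,CYLINDERS,HP,RPM,MPG-CITY,MPG-HWY,PRICE",
    "subaru,gas,std,two,hatchback,fwd,four,69,4900,31,36,5118",
    "chevrolet,gas,std,two,hatchback,fwd,three,48,5100,47,53,5151",
    "nissan,gas,std,two,sedan,fwd,four,69,5200,31,37,5499",
]

# Car is a hypothetical stand-in for pyspark.sql.Row.
Car = namedtuple("Car", ["make", "body", "hp"])

# Transformation 1: drop the header line.
data_lines = [line for line in raw_lines if "FUELTYPE" not in line]

# Transformation 2: split each line into columns.
columns = [line.split(",") for line in data_lines]

# Transformation 3: keep make (index 0), body (index 4) and hp (index 7, cast to int).
cars = [Car(make=p[0], body=p[4], hp=int(p[7])) for p in columns]

for car in cars:
    print(car)
```

Unlike the RDD version, these list comprehensions run eagerly and on a single machine; only the per-record logic is the same.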


In [12]:
?Row

In [13]:
linhasRDD4.collect()

[Row(body='hatchback', hp=69, make='subaru'),
 Row(body='hatchback', hp=48, make='chevrolet'),
 Row(body='hatchback', hp=68, make='mazda'),
 Row(body='hatchback', hp=62, make='toyota'),
 Row(body='hatchback', hp=68, make='mitsubishi'),
 Row(body='hatchback', hp=60, make='honda'),
 Row(body='sedan', hp=69, make='nissan'),
 Row(body='hatchback', hp=68, make='dodge'),
 Row(body='hatchback', hp=68, make='plymouth'),
 Row(body='hatchback', hp=68, make='mazda'),
 Row(body='hatchback', hp=68, make='mitsubishi'),
 Row(body='hatchback', hp=68, make='dodge'),
 Row(body='hatchback', hp=68, make='plymouth'),
 Row(body='hatchback', hp=70, make='chevrolet'),
 Row(body='hatchback', hp=62, make='toyota'),
 Row(body='hatchback', hp=68, make='dodge'),
 Row(body='hatchback', hp=58, make='honda'),
 Row(body='hatchback', hp=62, make='toyota'),
 Row(body='hatchback', hp=76, make='honda'),
 Row(body='sedan', hp=70, make='chevrolet'),
 Row(body='sedan', hp=69, make='nissan'),
 Row(body='hatchback', hp=68, mak

In [14]:
# Creating a dataframe from the RDD
linhasDF = spSession.createDataFrame(linhasRDD4)

In [15]:
linhasDF.show()

+---------+---+----------+
|     body| hp|      make|
+---------+---+----------+
|hatchback| 69|    subaru|
|hatchback| 48| chevrolet|
|hatchback| 68|     mazda|
|hatchback| 62|    toyota|
|hatchback| 68|mitsubishi|
|hatchback| 60|     honda|
|    sedan| 69|    nissan|
|hatchback| 68|     dodge|
|hatchback| 68|  plymouth|
|hatchback| 68|     mazda|
|hatchback| 68|mitsubishi|
|hatchback| 68|     dodge|
|hatchback| 68|  plymouth|
|hatchback| 70| chevrolet|
|hatchback| 62|    toyota|
|hatchback| 68|     dodge|
|hatchback| 58|     honda|
|hatchback| 62|    toyota|
|hatchback| 76|     honda|
|    sedan| 70| chevrolet|
+---------+---+----------+
only showing top 20 rows



In [16]:
type(linhasDF)

pyspark.sql.dataframe.DataFrame

In [17]:
# Same as: SELECT * FROM linhasDF
linhasDF.select("*").show()

+---------+---+----------+
|     body| hp|      make|
+---------+---+----------+
|hatchback| 69|    subaru|
|hatchback| 48| chevrolet|
|hatchback| 68|     mazda|
|hatchback| 62|    toyota|
|hatchback| 68|mitsubishi|
|hatchback| 60|     honda|
|    sedan| 69|    nissan|
|hatchback| 68|     dodge|
|hatchback| 68|  plymouth|
|hatchback| 68|     mazda|
|hatchback| 68|mitsubishi|
|hatchback| 68|     dodge|
|hatchback| 68|  plymouth|
|hatchback| 70| chevrolet|
|hatchback| 62|    toyota|
|hatchback| 68|     dodge|
|hatchback| 58|     honda|
|hatchback| 62|    toyota|
|hatchback| 76|     honda|
|    sedan| 70| chevrolet|
+---------+---+----------+
only showing top 20 rows



In [18]:
# Same as: SELECT * FROM linhasDF ORDER BY make
linhasDF.orderBy("make").show()

+-----------+---+-----------+
|       body| hp|       make|
+-----------+---+-----------+
|  hatchback|154|alfa-romero|
|convertible|111|alfa-romero|
|convertible|111|alfa-romero|
|      sedan|110|       audi|
|      sedan|115|       audi|
|      sedan|110|       audi|
|      wagon|110|       audi|
|      sedan|140|       audi|
|      sedan|102|       audi|
|      sedan|101|        bmw|
|      sedan|101|        bmw|
|      sedan|121|        bmw|
|      sedan|121|        bmw|
|      sedan|182|        bmw|
|      sedan|182|        bmw|
|      sedan|121|        bmw|
|      sedan|182|        bmw|
|      sedan| 70|  chevrolet|
|  hatchback| 70|  chevrolet|
|  hatchback| 48|  chevrolet|
+-----------+---+-----------+
only showing top 20 rows



In [19]:
# Registering the dataframe as a temp table
linhasDF.createOrReplaceTempView("linhasTB")

In [20]:
!java -version

openjdk version "1.8.0_222"
OpenJDK Runtime Environment (AdoptOpenJDK)(build 1.8.0_222-b10)
OpenJDK 64-Bit Server VM (AdoptOpenJDK)(build 25.222-b10, mixed mode)


In [21]:
# Running ANSI SQL queries
spSession.sql("select * from linhasTB where make = 'nissan'").show()

+---------+---+------+
|     body| hp|  make|
+---------+---+------+
|    sedan| 69|nissan|
|    sedan| 69|nissan|
|    sedan| 69|nissan|
|    sedan| 55|nissan|
|    sedan| 69|nissan|
|    wagon| 69|nissan|
|    sedan| 69|nissan|
|hatchback| 69|nissan|
|    wagon| 69|nissan|
|  hardtop| 69|nissan|
|hatchback| 97|nissan|
|    sedan| 97|nissan|
|    sedan|152|nissan|
|    sedan|152|nissan|
|    wagon|152|nissan|
|hatchback|160|nissan|
|hatchback|160|nissan|
|hatchback|200|nissan|
+---------+---+------+



In [22]:
# Running ANSI SQL queries (aggregation with GROUP BY)
spSession.sql("select make, body, avg(hp) from linhasTB group by make, body").show()

+-------------+-----------+-----------------+
|         make|       body|          avg(hp)|
+-------------+-----------+-----------------+
|       nissan|      wagon|96.66666666666667|
|       subaru|      sedan|             90.2|
|     plymouth|      sedan|             68.0|
|        dodge|  hatchback|             90.2|
|       nissan|      sedan|             89.0|
|        honda|      sedan|             89.8|
|   mitsubishi|  hatchback|            105.0|
|        mazda|      sedan|82.66666666666667|
|  alfa-romero|convertible|            111.0|
|mercedes-benz|convertible|            155.0|
|     plymouth|      wagon|             88.0|
|mercedes-benz|      wagon|            123.0|
|        isuzu|  hatchback|             90.0|
|       toyota|convertible|            116.0|
|        mazda|  hatchback|             89.4|
|    chevrolet|      sedan|             70.0|
|      mercury|  hatchback|            175.0|
|      porsche|  hatchback|            143.0|
|        honda|      wagon|       
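
The strings passed to `spSession.sql` are ordinary SQL, so the same GROUP BY can be checked outside Spark. The sketch below is only an illustration: it uses `sqlite3` from the Python standard library as a stand-in engine (an assumption, not part of the notebook) and loads just the `nissan` rows shown in the earlier query output.

```python
import sqlite3

# (body, hp) pairs for make = 'nissan', copied from the query output shown earlier.
nissan_rows = [
    ("sedan", 69), ("sedan", 69), ("sedan", 69), ("sedan", 55), ("sedan", 69),
    ("wagon", 69), ("sedan", 69), ("hatchback", 69), ("wagon", 69),
    ("hardtop", 69), ("hatchback", 97), ("sedan", 97), ("sedan", 152),
    ("sedan", 152), ("wagon", 152), ("hatchback", 160), ("hatchback", 160),
    ("hatchback", 200),
]

# In-memory database used purely as a stand-in for the Spark temp view.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE linhasTB (make TEXT, body TEXT, hp INTEGER)")
conn.executemany("INSERT INTO linhasTB VALUES ('nissan', ?, ?)", nissan_rows)

# Same aggregation as the Spark SQL query above.
query = "SELECT make, body, avg(hp) FROM linhasTB GROUP BY make, body"
averages = {(make, body): avg_hp for make, body, avg_hp in conn.execute(query)}
conn.close()

for key, value in sorted(averages.items()):
    print(key, value)
```

The averages for `nissan`/`sedan` and `nissan`/`wagon` agree with the corresponding rows of the Spark output above.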

## Spark SQL and CSV Files

In [23]:
carrosDF = spSession.read.csv("data/carros.csv", header = True)

In [24]:
type(carrosDF)

pyspark.sql.dataframe.DataFrame

In [25]:
carrosDF.show()

+----------+--------+------+-----+---------+-----+---------+---+----+--------+-------+-----+
|      MAKE|FUELTYPE|ASPIRE|DOORS|     BODY|DRIVE|CYLINDERS| HP| RPM|MPG-CITY|MPG-HWY|PRICE|
+----------+--------+------+-----+---------+-----+---------+---+----+--------+-------+-----+
|    subaru|     gas|   std|  two|hatchback|  fwd|     four| 69|4900|      31|     36| 5118|
| chevrolet|     gas|   std|  two|hatchback|  fwd|    three| 48|5100|      47|     53| 5151|
|     mazda|     gas|   std|  two|hatchback|  fwd|     four| 68|5000|      30|     31| 5195|
|    toyota|     gas|   std|  two|hatchback|  fwd|     four| 62|4800|      35|     39| 5348|
|mitsubishi|     gas|   std|  two|hatchback|  fwd|     four| 68|5500|      37|     41| 5389|
|     honda|     gas|   std|  two|hatchback|  fwd|     four| 60|5500|      38|     42| 5399|
|    nissan|     gas|   std|  two|    sedan|  fwd|     four| 69|5200|      31|     37| 5499|
|     dodge|     gas|   std|  two|hatchback|  fwd|     four| 68|5500| 

In [26]:
# Registering the dataframe as a temp table
carrosDF.createOrReplaceTempView("carrosTB")

In [27]:
# Running ANSI SQL queries
spSession.sql("select make, hp, price from carrosTB where CYLINDERS = 'three'").show()

+---------+---+-----+
|     make| hp|price|
+---------+---+-----+
|chevrolet| 48| 5151|
+---------+---+-----+



In [28]:
carrosTT = spSession.sql("select make, hp, price from carrosTB where CYLINDERS = 'three'")

In [29]:
carrosTT.show()

+---------+---+-----+
|     make| hp|price|
+---------+---+-----+
|chevrolet| 48| 5151|
+---------+---+-----+
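
With `header = True` and no schema, `spSession.read.csv` takes the column names from the first line and loads every column as a string, which is why the filter above compares `CYLINDERS` with the string `'three'`. Passing `inferSchema = True` asks Spark to guess numeric types instead. The plain-Python sketch below shows the same header handling with `csv.DictReader`; it does not use Spark and the rows are copied from the `carrosDF.show()` output above.

```python
import csv
import io

# Header and rows copied from the carrosDF.show() output above.
csv_text = """MAKE,FUELTYPE,ASPIRE,DOORS,BODY,DRIVE,CYLINDERS,HP,RPM,MPG-CITY,MPG-HWY,PRICE
subaru,gas,std,two,hatchback,fwd,four,69,4900,31,36,5118
chevrolet,gas,std,two,hatchback,fwd,three,48,5100,47,53,5151
mazda,gas,std,two,hatchback,fwd,four,68,5000,30,31,5195
"""

# DictReader uses the first line as field names, much like header = True.
rows = list(csv.DictReader(io.StringIO(csv_text)))

# Equivalent of: select make, hp, price from carrosTB where CYLINDERS = 'three'
three_cyl = [(r["MAKE"], r["HP"], r["PRICE"]) for r in rows if r["CYLINDERS"] == "three"]
print(three_cyl)

# Values stay strings until they are converted explicitly.
hp_values = [int(r["HP"]) for r in rows]
print(hp_values)
```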



## Applying Machine Learning

In [30]:
# Loading the CSV file and keeping the object in cache
carros = sc.textFile("data/carros.csv")
carros.cache()

data/carros.csv MapPartitionsRDD[55] at textFile at NativeMethodAccessorImpl.java:0

In [31]:
# Removing the first line (header)
primeiraLinha = carros.first()
linhas = carros.filter(lambda x: x != primeiraLinha)
linhas.count()

197

In [32]:
# Importing the Row class
from pyspark.sql import Row

In [33]:
# Converting each line into a Row of numeric values
def transformToNumeric(inputStr) :
    
    attList = inputStr.split(",")
    
    doors = 1.0 if attList[3] == "two" else 2.0
    
    body = 1.0 if attList[4] == "sedan" else 2.0 
       
    # Dropping the columns that are not needed at this stage
    valores = Row(DOORS = doors, BODY = float(body), HP = float(attList[7]), RPM = float(attList[8]), MPG = float(attList[9]))
    return valores
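
The encoding above is deliberately coarse: any door count other than `"two"` becomes 2.0 and any body type other than `"sedan"` becomes 2.0. Note also that in Spark 2.4 a `Row` built from keyword arguments sorts its fields alphabetically, so the resulting order is BODY, DOORS, HP, MPG, RPM, which matters later when rows are indexed by position. The sketch below mirrors the function in plain Python, returning a dict instead of a `Row`; `encode_line` is a hypothetical name and the input lines are copied from the CSV preview in this notebook.

```python
def encode_line(line):
    """Plain-Python mirror of transformToNumeric, returning a dict instead of a Row."""
    att = line.split(",")
    return {
        "DOORS": 1.0 if att[3] == "two" else 2.0,   # "two" -> 1.0, anything else -> 2.0
        "BODY": 1.0 if att[4] == "sedan" else 2.0,  # "sedan" -> 1.0, anything else -> 2.0
        "HP": float(att[7]),
        "RPM": float(att[8]),
        "MPG": float(att[9]),                       # taken from the MPG-CITY column
    }

# Lines copied from the carros.csv preview shown in this notebook.
subaru = encode_line("subaru,gas,std,two,hatchback,fwd,four,69,4900,31,36,5118")
nissan = encode_line("nissan,gas,std,two,sedan,fwd,four,69,5200,31,37,5499")
print(subaru)
print(nissan)
```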

In [34]:
# Applying the function to the data and persisting the result in memory
autoMap = linhas.map(transformToNumeric)
autoMap.persist()
autoMap.collect()

[Row(BODY=2.0, DOORS=1.0, HP=69.0, MPG=31.0, RPM=4900.0),
 Row(BODY=2.0, DOORS=1.0, HP=48.0, MPG=47.0, RPM=5100.0),
 Row(BODY=2.0, DOORS=1.0, HP=68.0, MPG=30.0, RPM=5000.0),
 Row(BODY=2.0, DOORS=1.0, HP=62.0, MPG=35.0, RPM=4800.0),
 Row(BODY=2.0, DOORS=1.0, HP=68.0, MPG=37.0, RPM=5500.0),
 Row(BODY=2.0, DOORS=1.0, HP=60.0, MPG=38.0, RPM=5500.0),
 Row(BODY=1.0, DOORS=1.0, HP=69.0, MPG=31.0, RPM=5200.0),
 Row(BODY=2.0, DOORS=1.0, HP=68.0, MPG=37.0, RPM=5500.0),
 Row(BODY=2.0, DOORS=1.0, HP=68.0, MPG=37.0, RPM=5500.0),
 Row(BODY=2.0, DOORS=1.0, HP=68.0, MPG=31.0, RPM=5000.0),
 Row(BODY=2.0, DOORS=1.0, HP=68.0, MPG=31.0, RPM=5500.0),
 Row(BODY=2.0, DOORS=2.0, HP=68.0, MPG=31.0, RPM=5500.0),
 Row(BODY=2.0, DOORS=2.0, HP=68.0, MPG=31.0, RPM=5500.0),
 Row(BODY=2.0, DOORS=1.0, HP=70.0, MPG=38.0, RPM=5400.0),
 Row(BODY=2.0, DOORS=1.0, HP=62.0, MPG=31.0, RPM=4800.0),
 Row(BODY=2.0, DOORS=1.0, HP=68.0, MPG=31.0, RPM=5500.0),
 Row(BODY=2.0, DOORS=1.0, HP=58.0, MPG=49.0, RPM=4800.0),
 Row(BODY=2.0,

In [35]:
# Creating the dataframe
carrosDf = spSession.createDataFrame(autoMap)
carrosDf.show()

+----+-----+----+----+------+
|BODY|DOORS|  HP| MPG|   RPM|
+----+-----+----+----+------+
| 2.0|  1.0|69.0|31.0|4900.0|
| 2.0|  1.0|48.0|47.0|5100.0|
| 2.0|  1.0|68.0|30.0|5000.0|
| 2.0|  1.0|62.0|35.0|4800.0|
| 2.0|  1.0|68.0|37.0|5500.0|
| 2.0|  1.0|60.0|38.0|5500.0|
| 1.0|  1.0|69.0|31.0|5200.0|
| 2.0|  1.0|68.0|37.0|5500.0|
| 2.0|  1.0|68.0|37.0|5500.0|
| 2.0|  1.0|68.0|31.0|5000.0|
| 2.0|  1.0|68.0|31.0|5500.0|
| 2.0|  2.0|68.0|31.0|5500.0|
| 2.0|  2.0|68.0|31.0|5500.0|
| 2.0|  1.0|70.0|38.0|5400.0|
| 2.0|  1.0|62.0|31.0|4800.0|
| 2.0|  1.0|68.0|31.0|5500.0|
| 2.0|  1.0|58.0|49.0|4800.0|
| 2.0|  2.0|62.0|31.0|4800.0|
| 2.0|  1.0|76.0|30.0|6000.0|
| 1.0|  2.0|70.0|38.0|5400.0|
+----+-----+----+----+------+
only showing top 20 rows



In [36]:
# Summarizing the statistics of the dataset
summStats = carrosDf.describe().toPandas()
summStats

   summary                BODY               DOORS                  HP                MPG                 RPM
0    count               197.0               197.0               197.0              197.0               197.0
1     mean   1.532994923857868  1.5685279187817258  103.60406091370558  25.15228426395939   5118.020304568528
2   stddev  0.5001812579359883  0.4965435277816749   37.63920534951836  6.437862917085915  481.03591405011446
3      min                 1.0                 1.0                48.0               13.0              4150.0
4      max                 2.0                 2.0               262.0               49.0              6600.0


In [None]:
# Extracting the means (columns 1 to 4: BODY, DOORS, HP and MPG; the slice leaves RPM out)
medias = summStats.iloc[1,1:5].values.tolist()
medias

In [None]:
# Extracting the standard deviations for the same four columns
desvios_padroes = summStats.iloc[2,1:5].values.tolist()
desvios_padroes
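
The `stddev` row returned by `describe()` is the sample standard deviation (it divides by n - 1). This can be checked against the summary above with the standard library alone. The sketch relies on one assumption inferred from that summary: BODY only takes the values 1.0 and 2.0, so a mean of 302/197 over 197 rows implies 92 ones and 105 twos.

```python
import statistics

# Assumption inferred from the summary above: BODY takes only the values 1.0
# and 2.0, and a mean of 302/197 over 197 rows implies 92 ones and 105 twos.
body_values = [1.0] * 92 + [2.0] * 105

mean_body = statistics.mean(body_values)
sample_std = statistics.stdev(body_values)       # divides by n - 1
population_std = statistics.pstdev(body_values)  # divides by n

print(mean_body, sample_std, population_std)
```

The sample value matches the `stddev` reported for BODY, while the population value is slightly smaller.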

In [None]:
# Storing the means and the standard deviations in broadcast variables
bcMedias = sc.broadcast(medias)
bcDesviosP = sc.broadcast(desvios_padroes)

In [None]:
# Importing the Vectors factory
from pyspark.ml.linalg import Vectors

In [None]:
# Function to standardize the data (center and scale) and create a dense vector
def centerAndScale(inRow) :
    global bcMedias
    global bcDesviosP
    
    meanArray = bcMedias.value
    stdArray = bcDesviosP.value

    retArray = []
    
    for i in range(len(meanArray)):
        retArray.append( (float(inRow[i]) - float(meanArray[i])) / float(stdArray[i]) )
    return Vectors.dense(retArray)
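
Each feature is turned into a z-score, (value - mean) / stddev, so that columns with large ranges such as HP do not dominate the distance computations in K-Means. The sketch below repeats the arithmetic in plain Python for the first row of `carrosDf`; the means and standard deviations are copied from the `describe()` output above, and `center_and_scale` is a hypothetical name, not the notebook's function.

```python
# Means and standard deviations for BODY, DOORS, HP and MPG, copied from the
# describe() output above (RPM is not included, matching the 1:5 slice).
means = [1.532994923857868, 1.5685279187817258, 103.60406091370558, 25.15228426395939]
stds = [0.5001812579359883, 0.4965435277816749, 37.63920534951836, 6.437862917085915]

def center_and_scale(row, means, stds):
    """Return the z-score (value - mean) / stddev for each leading field of row."""
    # zip stops at the shortest input, so only the first four fields are used.
    return [(float(x) - m) / s for x, m, s in zip(row, means, stds)]

# First row of carrosDf in column order: BODY, DOORS, HP, MPG, RPM.
first_row = [2.0, 1.0, 69.0, 31.0, 4900.0]
z = center_and_scale(first_row, means, stds)
print(z)
```

A positive score means the value is above the column mean and a negative score means it is below, so this car has below-average HP and above-average MPG.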

In [None]:
# Applying the standardization to the data
csAuto = carrosDf.rdd.map(centerAndScale)
csAuto.collect()

In [None]:
# Creating a Spark dataframe with the features (attributes)
autoRows = csAuto.map(lambda f: Row(features = f))
autoDf = spSession.createDataFrame(autoRows)
autoDf.select("features").show(10)

In [None]:
# Importing the K-Means algorithm for clustering
from pyspark.ml.clustering import KMeans
kmeans = KMeans(k = 3, seed = 1)
modelo = kmeans.fit(autoDf)
previsoes = modelo.transform(autoDf)
previsoes.show()
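
To make the clustering step less of a black box, here is a minimal sketch of Lloyd's algorithm, the assign-then-update loop at the core of K-Means. It is not the `pyspark.ml` implementation: Spark initializes centroids with k-means|| by default, while this sketch simply seeds them with the first k points. The six (HP, MPG) pairs are the raw, unstandardized values from the first rows of `carrosDf`, and all function names are hypothetical.

```python
def squared_distance(p, q):
    """Squared Euclidean distance between two points of equal dimension."""
    return sum((a - b) ** 2 for a, b in zip(p, q))

def kmeans(points, k, iterations=20):
    """Minimal Lloyd's algorithm; the first k points seed the centroids."""
    centroids = [list(map(float, p)) for p in points[:k]]
    labels = [0] * len(points)
    for _ in range(iterations):
        # Assignment step: each point goes to its nearest centroid.
        labels = [
            min(range(k), key=lambda c: squared_distance(p, centroids[c]))
            for p in points
        ]
        # Update step: each centroid moves to the mean of its members.
        for c in range(k):
            members = [p for p, lab in zip(points, labels) if lab == c]
            if members:
                centroids[c] = [sum(dim) / len(members) for dim in zip(*members)]
    return centroids, labels

# Raw (HP, MPG) pairs from the first six rows of carrosDf shown above.
hp_mpg = [(69.0, 31.0), (48.0, 47.0), (68.0, 30.0), (62.0, 35.0), (68.0, 37.0), (60.0, 38.0)]
centroids, labels = kmeans(hp_mpg, k=2)
print(centroids)
print(labels)
```

With this seeding, the low-HP, high-MPG car ends up alone in its own cluster; a different initialization or standardized inputs could split the points differently.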

In [None]:
# Plotting the results
import pandas as pd
import matplotlib.pyplot as plt
%matplotlib inline

In [None]:
# Function that unpacks the prediction and the four features of each row for plotting
def unstripData(instr) :
    return ( instr["prediction"], instr["features"][0], instr["features"][1],instr["features"][2],instr["features"][3])


In [None]:
# Organizing the data for the plot
unstripped = previsoes.rdd.map(unstripData)
predList = unstripped.collect()
predPd = pd.DataFrame(predList)

In [None]:
# Scatter plot of standardized HP (column 3) against MPG (column 4), colored by cluster (column 0)
plt.cla()
plt.scatter(predPd[3], predPd[4], c = predPd[0])

## Spark SQL and JSON Files

You can validate the structure of a JSON file on this site: http://jsonlint.com/

In [None]:
# Loading the JSON file
funcDF = spSession.read.json("data/funcionarios.json")

In [None]:
funcDF.show()

In [None]:
funcDF.printSchema()

In [None]:
type(funcDF)

In [None]:
# Spark SQL dataframe operations - select()
funcDF.select("nome").show()

In [None]:
# Spark SQL dataframe operations - filter()
funcDF.filter(funcDF["idade"] == 50).show()

In [None]:
# Spark SQL dataframe operations - groupBy()
funcDF.groupBy("sexo").count().show()

In [None]:
# Spark SQL dataframe operations - groupBy() with agg()
funcDF.groupBy("deptid").agg({"salario": "avg", "idade": "max"}).show()

In [None]:
# Registering the dataframe as a temp table (registerTempTable is deprecated since Spark 2.0)
funcDF.createOrReplaceTempView("funcTB")

In [None]:
# Running ANSI SQL queries
spSession.sql("select deptid, max(idade), avg(salario) from funcTB group by deptid").show()
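
By default `spSession.read.json` expects one JSON object per line (the JSON Lines layout); a pretty-printed file that spans several lines needs the `multiLine = True` option. The sketch below parses that layout with the standard library and repeats the `deptid` aggregation in plain Python. The contents of funcionarios.json are not shown in this notebook, so the records are hypothetical: only the field names come from the queries above.

```python
import json
from collections import defaultdict

# Hypothetical records: only the field names come from the notebook,
# the values are made up for illustration.
json_lines = """\
{"nome": "Ana", "idade": 42, "deptid": 100, "sexo": "f", "salario": 9700}
{"nome": "Bruno", "idade": 50, "deptid": 100, "sexo": "m", "salario": 5000}
{"nome": "Carla", "idade": 35, "deptid": 200, "sexo": "f", "salario": 8000}
"""

# One JSON object per line (JSON Lines).
records = [json.loads(line) for line in json_lines.splitlines() if line.strip()]

# Group the records by department.
by_dept = defaultdict(list)
for rec in records:
    by_dept[rec["deptid"]].append(rec)

# Equivalent of: select deptid, max(idade), avg(salario) from funcTB group by deptid
summary = {
    dept: (max(r["idade"] for r in recs), sum(r["salario"] for r in recs) / len(recs))
    for dept, recs in by_dept.items()
}
print(summary)
```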

## Temp Tables

In [None]:
# Registering the dataframe as a temp table
funcDF.createOrReplaceTempView("funcTB")

In [None]:
spSession.sql("select * from funcTB where salario = 9700").show()

In [None]:
# Creating a temp table through the SQL Context
sqlContext.registerDataFrameAsTable(funcDF, "funcTB2")

In [None]:
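# funcTB2 is a table name in the Spark catalog, not a Python variable, so list the registered tables instead
spSession.catalog.listTables()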

In [None]:
# Getting the temp table back as a dataframe
funcTB3 = spSession.table("funcTB2")

In [None]:
type(funcTB3)

In [None]:
# Comparing the dataframe with the temp table that was created
sorted(funcDF.collect()) == sorted(funcTB3.collect())

In [None]:
# Applying a filter
sqlContext.registerDataFrameAsTable(funcDF, "funcTB2")
funcTB3 = spSession.table("funcTB2")
funcTB3.filter("idade = '42'").first()

In [None]:
# Dropping the temp table
sqlContext.dropTempTable("funcTB2")

## Relational Database

Extracting data from MySQL. First we need to download the JDBC driver. There is a JDBC driver for each database you connect to (Oracle, SQL Server, etc.).

1- Go to the JDBC driver download page for MySQL: http://dev.mysql.com/downloads/connector/j/

2- Download the .zip file

3- Unzip the file and copy mysql-connector-java-8.0.16.jar to the /opt/Spark/jars folder or, on Windows, to C:\Spark\jars

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
spSession = SparkSession.builder.master("local").appName("DSA-SparkSQL").getOrCreate()
sqlContext = SQLContext(sc)

In [None]:
mysql_df = spSession.read.format("jdbc").options(
    url = "jdbc:mysql://localhost/carros",
    serverTimezone = "UTC",
    driver = "com.mysql.cj.jdbc.Driver",
    dbtable = "carrosTB",
    user = "root",
    password = "dsa1234").load()
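
Hard-coding the user and password in a notebook is convenient for a local exercise, but it is easy to leak them when the notebook is shared. One possible alternative, sketched below, builds the JDBC options from environment variables. `mysql_jdbc_options`, `MYSQL_USER` and `MYSQL_PASSWORD` are hypothetical names chosen for this example. The driver class `com.mysql.cj.jdbc.Driver` is the name used by Connector/J 8.x, and the time zone is passed as a URL parameter here instead of as a separate option.

```python
import os
from urllib.parse import urlencode

def mysql_jdbc_options(database, table, host="localhost", env=os.environ):
    """Build a dict of JDBC options; credentials come from environment variables.

    MYSQL_USER and MYSQL_PASSWORD are hypothetical variable names.
    """
    params = urlencode({"serverTimezone": "UTC"})
    return {
        "url": f"jdbc:mysql://{host}/{database}?{params}",
        "driver": "com.mysql.cj.jdbc.Driver",  # driver class name in Connector/J 8.x
        "dbtable": table,
        "user": env["MYSQL_USER"],
        "password": env["MYSQL_PASSWORD"],
    }

# Demo with an explicit mapping instead of the real environment.
demo_env = {"MYSQL_USER": "root", "MYSQL_PASSWORD": "example"}
options = mysql_jdbc_options("carros", "carrosTB", env=demo_env)
print(options["url"])
```

The resulting dict could then be passed to the reader as `spSession.read.format("jdbc").options(**options).load()`.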

In [None]:
mysql_df.show()

In [None]:
mysql_df.createOrReplaceTempView("carrostb")

In [None]:
spSession.sql("select * from carrostb where hp = '68'").show()

## Non-Relational Database

Spark Connector: https://docs.mongodb.com/spark-connector/current/

Mongo Spark: https://spark-packages.org/package/mongodb/mongo-spark

$SPARK_HOME/bin/pyspark --packages org.mongodb.spark:mongo-spark-connector_2.11:2.4.0

In [None]:
# Imports
from pyspark.sql import SparkSession

### Reading

In [None]:
# Creating the session
my_spark = SparkSession \
    .builder \
    .appName("myApp") \
    .config("spark.mongodb.input.uri", "mongodb://localhost/test_db.test_collection") \
    .config("spark.mongodb.output.uri", "mongodb://localhost/test_db.test_collection") \
    .getOrCreate()
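
The two URIs in the configuration above pack three pieces of information into one string: the host, the database and the collection, in the form `mongodb://host/database.collection`. The helper below, with the hypothetical name `split_mongo_uri`, is a small sketch that pulls those pieces apart with the standard library, which can be handy for checking a URI before starting the session. It assumes the simple form used here, without credentials or query options.

```python
from urllib.parse import urlparse

def split_mongo_uri(uri):
    """Return (host, database, collection) from mongodb://host/database.collection."""
    parsed = urlparse(uri)
    # The path looks like "/database.collection"; split it on the first dot.
    database, _, collection = parsed.path.lstrip("/").partition(".")
    return parsed.hostname, database, collection

host, database, collection = split_mongo_uri("mongodb://localhost/test_db.test_collection")
print(host, database, collection)
```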

In [None]:
# Loading the data from MongoDB into Spark (using the session created above)
dados = my_spark.read.format("com.mongodb.spark.sql.DefaultSource").load()

In [None]:
dados.printSchema()

In [None]:
dados.count()

In [None]:
dados.head()

In [None]:
dados.show()

### Writing

In [None]:
registro = my_spark.createDataFrame([("Camisa T-Shirt",  50)], ["item", "qty"])

In [None]:
registro.write.format("com.mongodb.spark.sql.DefaultSource").mode("append").save()

In [None]:
dados.show()

# End

### Thank you - Data Science Academy - <a href="http://facebook.com/dsacademybr">facebook.com/dsacademybr</a>