# <font color='blue'>Big Data Real-Time Analytics com Python e Spark</font>

# <font color='blue'>Capítulo 8</font>

<img width='400px' src='spark.png'>

### *********** Atenção: *********** 
Utilize Java JDK 1.8 e Apache Spark 2.4.2

Java JDK 1.8:

https://www.oracle.com/technetwork/java/javase/downloads/jdk8-downloads-2133151.html

*Caso receba mensagem de erro "name 'sc' is not defined", interrompa o pyspark e apague o diretório metastore_db no mesmo diretório onde está este Jupyter notebook*

---

# Spark SQL - Manipulação de GRANDES conjuntos de dados

O Spark SQL é usado para acessar, consultar e manipular dados estruturados com Spark.

<img width='700px' src='dataframe.png'>

#### Acessar http://localhost:4040 para execução dos jobs - User Interface
#### Pacotes adicionais podem ser encontrados aqui: https://spark-packages.org/ (usaremos um destes pacotes para conexão com o MongoDB).

---

## Spark SQL - Spark Session e SQL Context

# val sqlContext = new org.apache.spark.sql.SQLContext(sc)

In [4]:
# Pacote SQL do PySpark
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
from pyspark.sql import Row

Ao executar a linha de comando PySpark, automaticamente é gerado um "SparkContext", estabelecendo a conexão com a infraestura do Apache Spark. 

Neste caso é uma conexão local, a minha própria máquina. 

In [7]:
print(sc)

<SparkContext master=local[*] appName=PySparkShell>


## Para trabalhar com Spark SQL, são necessários 3 componentes:

### 1. Spark Session.builder.master("local")
### 2. SQL Context
### 3. DataFrame Estrutura de Dados

### Método .getOrCreate() - Se essa sessão já existir, conectamos. Caso contráro, criamos. 

In [9]:
# Spark Session - usado para trabalhar com o Spark
spSession = SparkSession.builder.master("local").appName("Leo-SparkSQL").getOrCreate()

### Com o SparkSession criado, precisamos de um SQLContext, apenas chamando a função e passando (sc)

In [None]:
# Criando o SQL Context para trabalhar com Spark SQL
sqlContext = SQLContext(sc)

### Criar RDD passando o arquivo .csv em sc.textFile()

In [10]:
# Importando o arquivo e criando um RDD
linhasRDD1 = sc.textFile("data/carros.csv")

In [11]:
linhasRDD1.count() # Chamar .count() AÇÃO

198

### Executar uma TRANSFORMAÇÃO - lazy evaluation, fica na fila

- Aplicando filtro ao RDD
- Passando função anônima como parâmetro
- Neste filtro lambda retorna x: sempre que a palavra "FUELTYPE" não aparecer em x. 
- "FUELTYPE" é uma das palavras que aparece no dataset

## Transformação 1- Filtro not in

In [13]:
# Removendo a primeira linha - Transformação 1
linhasRDD2 = linhasRDD1.filter(lambda x: "FUELTYPE" not in x)

### Processada ao chamar a AÇÃO .count()

In [14]:
linhasRDD2.count()

197

## Transformação 2 - Separar por Colunas

- Aplicando a função map() ao RDD
- Passando como parâmetro a função anônima
- Lambda retorna cada linha, para cada linha realizar .split() com caracter ","

In [16]:
# Dividindo o conjunto de dados em colunas - Transformação 2
linhasRDD3 = linhasRDD2.map(lambda line: line.split(","))

## Transformação 3 - 



In [20]:
# Dividindo o conjunto de dados em colunas - Transformação 3
linhasRDD4 = linhasRDD3.map(lambda p: Row(make = p[0], body = p[4], hp = int(p[7])))

In [21]:
print(linhasRDD4)

PythonRDD[4] at RDD at PythonRDD.scala:53


In [18]:
?Row

In [22]:
linhasRDD4.collect()

[Row(body='hatchback', hp=69, make='subaru'),
 Row(body='hatchback', hp=48, make='chevrolet'),
 Row(body='hatchback', hp=68, make='mazda'),
 Row(body='hatchback', hp=62, make='toyota'),
 Row(body='hatchback', hp=68, make='mitsubishi'),
 Row(body='hatchback', hp=60, make='honda'),
 Row(body='sedan', hp=69, make='nissan'),
 Row(body='hatchback', hp=68, make='dodge'),
 Row(body='hatchback', hp=68, make='plymouth'),
 Row(body='hatchback', hp=68, make='mazda'),
 Row(body='hatchback', hp=68, make='mitsubishi'),
 Row(body='hatchback', hp=68, make='dodge'),
 Row(body='hatchback', hp=68, make='plymouth'),
 Row(body='hatchback', hp=70, make='chevrolet'),
 Row(body='hatchback', hp=62, make='toyota'),
 Row(body='hatchback', hp=68, make='dodge'),
 Row(body='hatchback', hp=58, make='honda'),
 Row(body='hatchback', hp=62, make='toyota'),
 Row(body='hatchback', hp=76, make='honda'),
 Row(body='sedan', hp=70, make='chevrolet'),
 Row(body='sedan', hp=69, make='nissan'),
 Row(body='hatchback', hp=68, mak

## Criar Tabela|DataFrame a partir do spSession passando como parâmetro um RDD

In [24]:
# Criando um dataframe a partir do RDD
linhasDF = spSession.createDataFrame(linhasRDD4)

In [25]:
linhasDF.show()

+---------+---+----------+
|     body| hp|      make|
+---------+---+----------+
|hatchback| 69|    subaru|
|hatchback| 48| chevrolet|
|hatchback| 68|     mazda|
|hatchback| 62|    toyota|
|hatchback| 68|mitsubishi|
|hatchback| 60|     honda|
|    sedan| 69|    nissan|
|hatchback| 68|     dodge|
|hatchback| 68|  plymouth|
|hatchback| 68|     mazda|
|hatchback| 68|mitsubishi|
|hatchback| 68|     dodge|
|hatchback| 68|  plymouth|
|hatchback| 70| chevrolet|
|hatchback| 62|    toyota|
|hatchback| 68|     dodge|
|hatchback| 58|     honda|
|hatchback| 62|    toyota|
|hatchback| 76|     honda|
|    sedan| 70| chevrolet|
+---------+---+----------+
only showing top 20 rows



In [27]:
type(linhasDF)

pyspark.sql.dataframe.DataFrame

In [28]:
# Mesma coisa que: SELECT * FROM linhasDF
linhasDF.select("*").show()

+---------+---+----------+
|     body| hp|      make|
+---------+---+----------+
|hatchback| 69|    subaru|
|hatchback| 48| chevrolet|
|hatchback| 68|     mazda|
|hatchback| 62|    toyota|
|hatchback| 68|mitsubishi|
|hatchback| 60|     honda|
|    sedan| 69|    nissan|
|hatchback| 68|     dodge|
|hatchback| 68|  plymouth|
|hatchback| 68|     mazda|
|hatchback| 68|mitsubishi|
|hatchback| 68|     dodge|
|hatchback| 68|  plymouth|
|hatchback| 70| chevrolet|
|hatchback| 62|    toyota|
|hatchback| 68|     dodge|
|hatchback| 58|     honda|
|hatchback| 62|    toyota|
|hatchback| 76|     honda|
|    sedan| 70| chevrolet|
+---------+---+----------+
only showing top 20 rows



In [29]:
# Mesma coisa que: SELECT * FROM linhasDF ORDER BY make
linhasDF.orderBy("make").show()

+-----------+---+-----------+
|       body| hp|       make|
+-----------+---+-----------+
|  hatchback|154|alfa-romero|
|convertible|111|alfa-romero|
|convertible|111|alfa-romero|
|      sedan|110|       audi|
|      sedan|115|       audi|
|      sedan|110|       audi|
|      wagon|110|       audi|
|      sedan|140|       audi|
|      sedan|102|       audi|
|      sedan|101|        bmw|
|      sedan|101|        bmw|
|      sedan|121|        bmw|
|      sedan|121|        bmw|
|      sedan|182|        bmw|
|      sedan|182|        bmw|
|      sedan|121|        bmw|
|      sedan|182|        bmw|
|      sedan| 70|  chevrolet|
|  hatchback| 70|  chevrolet|
|  hatchback| 48|  chevrolet|
+-----------+---+-----------+
only showing top 20 rows



## Criando tabela temporária

In [31]:
# Registrando o dataframe como uma Temp Table
linhasDF.createOrReplaceTempView("linhasTB")

In [49]:
!java -version

java version "11.0.6" 2020-01-14 LTS
Java(TM) SE Runtime Environment 18.9 (build 11.0.6+8-LTS)
Java HotSpot(TM) 64-Bit Server VM 18.9 (build 11.0.6+8-LTS, mixed mode)


In [None]:
# Executando queries SQL ANSI --- JAVA 8
spSession.sql("select * from linhasTB where make = 'nissan'").show()

In [None]:
# Executando queries SQL ANSI ---- JAVA 8
spSession.sql("select make, body, avg(hp) from linhasTB group by make, body").show()

---

## Spark SQL e Arquivos CSV

In [35]:
carrosDF = spSession.read.csv("data/carros.csv", header = True)

In [36]:
type(carrosDF)

pyspark.sql.dataframe.DataFrame

In [37]:
carrosDF.show()

+----------+--------+------+-----+---------+-----+---------+---+----+--------+-------+-----+
|      MAKE|FUELTYPE|ASPIRE|DOORS|     BODY|DRIVE|CYLINDERS| HP| RPM|MPG-CITY|MPG-HWY|PRICE|
+----------+--------+------+-----+---------+-----+---------+---+----+--------+-------+-----+
|    subaru|     gas|   std|  two|hatchback|  fwd|     four| 69|4900|      31|     36| 5118|
| chevrolet|     gas|   std|  two|hatchback|  fwd|    three| 48|5100|      47|     53| 5151|
|     mazda|     gas|   std|  two|hatchback|  fwd|     four| 68|5000|      30|     31| 5195|
|    toyota|     gas|   std|  two|hatchback|  fwd|     four| 62|4800|      35|     39| 5348|
|mitsubishi|     gas|   std|  two|hatchback|  fwd|     four| 68|5500|      37|     41| 5389|
|     honda|     gas|   std|  two|hatchback|  fwd|     four| 60|5500|      38|     42| 5399|
|    nissan|     gas|   std|  two|    sedan|  fwd|     four| 69|5200|      31|     37| 5499|
|     dodge|     gas|   std|  two|hatchback|  fwd|     four| 68|5500| 

In [41]:
# Registrando o dataframe como uma Temp Table
carrosDF.createOrReplaceTempView("carrosTB")

In [None]:
# Executando queries SQL ANSI
spSession.sql("select make, hp, price from carrosTB where CYLINDERS = 'three'").show()

In [None]:
carrosTT = spSession.sql("select make, hp, price from carrosTB where CYLINDERS = 'three'")

In [None]:
carrosTT.show()

## Aplicando Machine Learning

In [43]:
# Carregando o arquivo CSV e mantendo o objeto em cache
carros = sc.textFile("data/carros.csv")
carros.cache()

data/carros.csv MapPartitionsRDD[40] at textFile at NativeMethodAccessorImpl.java:0

In [44]:
# Remove a primeira linha (header)
primeiraLinha = carros.first()
linhas = carros.filter(lambda x: x != primeiraLinha)
linhas.count()

197

In [None]:
# Importando função row
from pyspark.sql import Row

In [None]:
# Convertendo para um vetor de linhas
def transformToNumeric(inputStr) :
    
    attList = inputStr.split(",")
    
    doors = 1.0 if attList[3] == "two" else 2.0
    
    body = 1.0 if attList[4] == "sedan" else 2.0 
       
    # Filtrando colunas não necessárias nesta etapa
    valores = Row(DOORS = doors, BODY = float(body), HP = float(attList[7]), RPM = float(attList[8]), MPG = float(attList[9]))
    return valores

In [None]:
# Aplicando a função aos dados e persistindo o resultado em memória
autoMap = linhas.map(transformToNumeric)
autoMap.persist()
autoMap.collect()

In [None]:
# Criando o Dataframe
carrosDf = spSession.createDataFrame(autoMap)
carrosDf.show()

## .decribe() Descrever estatísticas do DF, estruturar em Pandas

In [None]:
# Sumarizando as estatísticas do conjunto de dados
summStats = carrosDf.describe().toPandas()
summStats

In [None]:
# Extraindo as médias
medias = summStats.iloc[1,1:5].values.tolist()
medias

In [None]:
# Extraindo o desvio padrão
desvios_padroes = summStats.iloc[2,1:5].values.tolist()
desvios_padroes

In [None]:
# Inserindo a média e o desvio padrão em uma variável do tipo broadcast 
bcMedias = sc.broadcast(medias)
bcDesviosP = sc.broadcast(desvios_padroes)

In [None]:
# Importando a Função Vectors
from pyspark.ml.linalg import Vectors

In [None]:
# Função para normalizar os dados e criar um vetor denso
def centerAndScale(inRow) :
    global bcMedias
    global bcDesviosP
    
    meanArray = bcMedias.value
    stdArray = bcDesviosP.value

    retArray = [] # array vazio 
    
# Para cada valor de i no range de valores, no comprimento de array de médias    
    for i in range(len(meanArray)):
# Pegar o Array vazio e adicionar a narmalização para deixar os dados na mesma escala
        retArray.append( (float(inRow[i]) - float(meanArray[i])) / float(stdArray[i]) )
# 
    return Vectors.dense(retArray)

In [None]:
# Aplicando a normalização aos dados
csAuto = carrosDf.rdd.map(centerAndScale)
csAuto.collect()

In [None]:
# Criando um Spark Dataframe com as features (atributos)
autoRows = csAuto.map(lambda f: Row(features = f))
autoDf = spSession.createDataFrame(autoRows)
autoDf.select("features").show(10)

In [None]:
# Importando o algoritmo K-Means para clusterização
from pyspark.ml.clustering import KMeans
kmeans = KMeans(k = 3, seed = 1)
modelo = kmeans.fit(autoDf)
previsoes = modelo.transform(autoDf)
previsoes.show()

In [None]:
# Plot dos resultados
import pandas as pd
import matplotlib.pylab as plt
%matplotlib inline

In [None]:
# Função para leitura dos dados e plotagem
def unstripData(instr) :
    return ( instr["prediction"], instr["features"][0], instr["features"][1],instr["features"][2],instr["features"][3])


In [None]:
# Organizando os dados para o Plot
unstripped = previsoes.rdd.map(unstripData)
predList = unstripped.collect()
predPd = pd.DataFrame(predList)

In [None]:
plt.cla()
plt.scatter(predPd[3], predPd[4], c = predPd[0])

## Spark SQL e Arquivos JSON

Neste site você pode validar a estrutura de um arquivo JSON: http://jsonlint.com/

In [None]:
# Importando o arquivo JSON
funcDF = spSession.read.json("data/funcionarios.json")

In [None]:
funcDF.show()

In [None]:
funcDF.printSchema()

In [None]:
type(funcDF)

In [None]:
# Operações com Dataframe Spark SQL - select()
funcDF.select("nome").show()

In [None]:
# Operações com Dataframe Spark SQL - filter()
funcDF.filter(funcDF["idade"] == 50).show()

In [None]:
# Operações com Dataframe Spark SQL - groupBy()
funcDF.groupBy("sexo").count().show()

In [None]:
# Operações com Dataframe Spark SQL - groupBy()
funcDF.groupBy("deptid").agg({"salario": "avg", "idade": "max"}).show()

In [None]:
# Registrando o dataframe como uma Temp Table
funcDF.registerTempTable("funcTB")

In [None]:
# Executando queries SQL ANSI
spSession.sql("select deptid, max(idade), avg(salario) from funcTB group by deptid").show()

## Temp Tables

In [None]:
# Registrando o dataframe como temp Table
funcDF.createOrReplaceTempView("funcTB")

In [None]:
spSession.sql("select * from funcTB where salario = 9700").show()

In [None]:
# Criando Temp Table
sqlContext.registerDataFrameAsTable(funcDF, "funcTB2")

In [None]:
type(funcTB2)

In [None]:
# Persistindo a Temp Table 
funcTB3 = spSession.table("funcTB2")

In [None]:
type(funcTB3)

In [None]:
# Comparando o Dataframe com a tabela temporária criada
sorted(funcDF.collect()) == sorted(funcTB3.collect())

In [None]:
# Aplicando o filtro
sqlContext.registerDataFrameAsTable(funcDF, "funcTB2")
funcTB3 = spSession.table("funcTB2")
funcTB3.filter("idade = '42'").first()

In [None]:
# Drop Temp Table
sqlContext.dropTempTable("funcTB2")

## Banco de Dados Relacional

Extraindo Dados do MySQL. Primeiro precisamos baixar o driver JDBC. Haverá um driver JDBC para cada banco de dados que você conectar (Oracle, SQL Server, etc...)

1- Download do Driver JDBC para o MySQL: http://dev.mysql.com/downloads/connector/j/

2- Baixar o arquivo .zip

3- Descompactar o arquivo e copiar o arquivo mysql-connector-java-8.0.16.jar para a pasta /opt/Spark/jars ou para SO Windows em C:\Spark\jars

In [51]:
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
spSession = SparkSession.builder.master("local").appName("DSA-SparkSQL").getOrCreate()
sqlContext = SQLContext(sc)

In [53]:
mysql_df = spSession.read.format("jdbc").options(
    url = "jdbc:mysql://localhost/carros",
    serverTimezone = "UTC",
    driver = "com.mysql.jdbc.Driver",
    dbtable = "carrosTB",
    user = "root",
    password = "dsa1234").load()

Py4JJavaError: An error occurred while calling o535.load.
: java.lang.ClassNotFoundException: com.mysql.jdbc.Driver
	at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:471)
	at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:588)
	at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:521)
	at org.apache.spark.sql.execution.datasources.jdbc.DriverRegistry$.register(DriverRegistry.scala:45)
	at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions.$anonfun$driverClass$1(JDBCOptions.scala:99)
	at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions.$anonfun$driverClass$1$adapted(JDBCOptions.scala:99)
	at scala.Option.foreach(Option.scala:274)
	at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions.<init>(JDBCOptions.scala:99)
	at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions.<init>(JDBCOptions.scala:35)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.createRelation(JdbcRelationProvider.scala:32)
	at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:318)
	at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:223)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:211)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:167)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.base/java.lang.Thread.run(Thread.java:834)


In [None]:
mysql_df.show()

In [None]:
mysql_df.registerTempTable("carrostb")

In [None]:
spSession.sql("select * from carrostb where hp = '68'").show()

## Banco de Dados Não-Relacional

Spark Connector: https://docs.mongodb.com/spark-connector/current/

Mongo Spark: https://spark-packages.org/package/mongodb/mongo-spark

$SPARK_HOME/bin/pyspark --packages org.mongodb.spark:mongo-spark-connector_2.11:2.4.0

In [None]:
# Imports
from pyspark.sql import SparkSession

### Leitura

In [None]:
# Cria a sessão
my_spark = SparkSession \
    .builder \
    .appName("myApp") \
    .config("spark.mongodb.input.uri", "mongodb://localhost/test_db.test_collection") \
    .config("spark.mongodb.output.uri", "mongodb://localhost/test_db.test_collection") \
    .getOrCreate()

In [None]:
# Carrega os dados do MongoDB no Spark
dados = spark.read.format("com.mongodb.spark.sql.DefaultSource").load()

In [None]:
dados.printSchema()

In [None]:
dados.count()

In [None]:
dados.head()

In [None]:
dados.show()

### Gravação

In [None]:
registro = spark.createDataFrame([("Camisa T-Shirt",  50)], ["item", "qty"])

In [None]:
registro.write.format("com.mongodb.spark.sql.DefaultSource").mode("append").save()

In [None]:
dados.show()

<img width='621' src='joins.png'>