## Seção 4: Spark SQL

In [2]:
import os
import pyspark
import findspark
from pyspark.sql import SparkSession

os.environ["SPARK_HOME"] = "C:\\ApacheSpark\\spark-3.5.3-bin-hadoop3"

findspark.init()
spark = SparkSession.builder.master('local[*]').getOrCreate()

In [64]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DateType

# DF despachantes
schema_of_csv_desp = StructType([
    StructField('id', IntegerType(), True),
    StructField('nome', StringType(), True),
    StructField('status', StringType(), True),
    StructField('cidade', StringType(), True),
    StructField('vendas', StringType(), True),
    StructField('data', DateType(), True)
])
despachantes = spark.read.csv(
    r'C:\Users\viser\OneDrive\Documentos\Cursos\notebooks\students_pyspark\Material disponibilizado\download\despachantes.csv', 
    header=False,
    schema=schema_of_csv_desp
)
despachantes.createOrReplaceTempView('despachantes')

# DF reclamacoes
schema_of_csv_rec = StructType([
    StructField('idrec', IntegerType(), True),
    StructField('datarec', StringType(), True ),
    StructField('iddesp', IntegerType(), True)
])
reclamacoes = spark.read.csv(
    r'C:\Users\viser\OneDrive\Documentos\Cursos\notebooks\students_pyspark\Material disponibilizado\download\reclamacoes.csv',
    header=False,
    schema=schema_of_csv_rec
)
despachantes.createOrReplaceTempView('reclamacoes')


### 33. Views

View temporaria

In [10]:
despachantes.createOrReplaceTempView('despachantes_v1')

In [11]:
spark.sql('select * from despachantes_v1').show()

+---+-------------------+-----+-------------+---+----------+
|_c0|                _c1|  _c2|          _c3|_c4|       _c5|
+---+-------------------+-----+-------------+---+----------+
|  1|   Carminda Pestana|Ativo|  Santa Maria| 23|2020-08-11|
|  2|    Deolinda Vilela|Ativo|Novo Hamburgo| 34|2020-03-05|
|  3|   Emídio Dornelles|Ativo| Porto Alegre| 34|2020-02-05|
|  4|Felisbela Dornelles|Ativo| Porto Alegre| 36|2020-02-05|
|  5|     Graça Ornellas|Ativo| Porto Alegre| 12|2020-02-05|
|  6|   Matilde Rebouças|Ativo| Porto Alegre| 22|2019-01-05|
|  7|    Noêmia   Orriça|Ativo|  Santa Maria| 45|2019-10-05|
|  8|      Roque Vásquez|Ativo| Porto Alegre| 65|2020-03-05|
|  9|      Uriel Queiroz|Ativo| Porto Alegre| 54|2018-05-05|
| 10|   Viviana Sequeira|Ativo| Porto Alegre|  0|2020-09-05|
+---+-------------------+-----+-------------+---+----------+



View Global

In [14]:
despachantes.createOrReplaceGlobalTempView('despachantes_v2')

In [16]:
# em toda consulta de view global temporario será necessario indicar que se trata de um view global,
# dessa forma: global_temp.<view_name>
spark.sql('select * from global_temp.despachantes_v2').show()

+---+-------------------+-----+-------------+---+----------+
|_c0|                _c1|  _c2|          _c3|_c4|       _c5|
+---+-------------------+-----+-------------+---+----------+
|  1|   Carminda Pestana|Ativo|  Santa Maria| 23|2020-08-11|
|  2|    Deolinda Vilela|Ativo|Novo Hamburgo| 34|2020-03-05|
|  3|   Emídio Dornelles|Ativo| Porto Alegre| 34|2020-02-05|
|  4|Felisbela Dornelles|Ativo| Porto Alegre| 36|2020-02-05|
|  5|     Graça Ornellas|Ativo| Porto Alegre| 12|2020-02-05|
|  6|   Matilde Rebouças|Ativo| Porto Alegre| 22|2019-01-05|
|  7|    Noêmia   Orriça|Ativo|  Santa Maria| 45|2019-10-05|
|  8|      Roque Vásquez|Ativo| Porto Alegre| 65|2020-03-05|
|  9|      Uriel Queiroz|Ativo| Porto Alegre| 54|2018-05-05|
| 10|   Viviana Sequeira|Ativo| Porto Alegre|  0|2020-09-05|
+---+-------------------+-----+-------------+---+----------+



Também é possivel criar as view com uma clausula SQL

In [21]:
spark.sql("CREATE OR REPLACE TEMP VIEW desp_view AS select * from despachantes")

ParseException: 
[PARSE_SYNTAX_ERROR] Syntax error at or near '['.(line 1, pos 64)

== SQL ==
CREATE OR REPLACE TEMP VIEW desp_view AS select * from DataFrame[_c0: string, _c1: string, _c2: string, _c3: string, _c4: string, _c5: string]
----------------------------------------------------------------^^^


Também é possivel criar as view global com uma clausula SQL

In [None]:
spark.sql("CREATE OR REPLACE GLOBAL TEMP VIEW desp_view_v2 AS select * from despachantes")

In [None]:
spark.sql("select * from global_temp.desp_view_v2").show()



### 34. Comparando DataFrames com Tabelas SQL

In [52]:
from pyspark.sql import functions as F
from pyspark.sql.functions import *

VIEW

In [44]:
# despachantes.createOrReplaceTempView('despachantes')

# spark.sql("select * from despachantes").show()
# spark.sql("select nome, vendas from despachantes").show()
# spark.sql('select nome, vendas from despachantes where vendas > 20').show()
spark.sql('select cidade, sum(vendas) from despachantes group by cidade order by 2 desc').show()


+-------------+-----------+
|       cidade|sum(vendas)|
+-------------+-----------+
| Porto Alegre|      223.0|
|  Santa Maria|       45.0|
|Novo Hamburgo|       34.0|
+-------------+-----------+



API DATAFRAME

In [55]:
# despachantes.show()
# despachantes.select('nome', 'vendas').show()
# despachantes.select('nome', 'vendas').where(F.col("vendas")>20).show() 
despachantes.groupBy('cidade').agg(sum('vendas')).orderBy(F.col("sum(vendas)").desc()).show()

+-------------+-----------+
|       cidade|sum(vendas)|
+-------------+-----------+
| Porto Alegre|      223.0|
|  Santa Maria|       45.0|
|Novo Hamburgo|       34.0|
+-------------+-----------+



### 35. Joins

In [None]:
# Aula explicativa

### 36. Joins com Dataframes e SQL

JOINS SPARK SQL

In [74]:
# INNER JOIN
spark.sql('''
        SELECT 
            b.*
            , a.nome 
        FROM despachantes  A
        INNER JOIN reclamacoes b
            ON A.id = B.iddesp
''').show()

# RIGHT JOIN
spark.sql('''
        SELECT 
            b.*
            , a.nome 
        FROM despachantes  A
        RIGHT JOIN reclamacoes b
            ON A.id = B.iddesp
''').show()

# LEFT JOIN
spark.sql('''
        SELECT 
            b.*
            , a.nome 
        FROM despachantes  A
        LEFT JOIN reclamacoes b
            ON A.id = B.iddesp
''').show()

+-----+----------+------+-------------------+
|idrec|   datarec|iddesp|               nome|
+-----+----------+------+-------------------+
| NULL|      NULL|  NULL|   Carminda Pestana|
|    2|2020-09-11|     2|    Deolinda Vilela|
|    1|2020-09-12|     2|    Deolinda Vilela|
| NULL|      NULL|  NULL|   Emídio Dornelles|
|    3|2020-10-05|     4|Felisbela Dornelles|
|    6|2020-01-09|     5|     Graça Ornellas|
|    5|2020-12-06|     5|     Graça Ornellas|
|    4|2020-10-02|     5|     Graça Ornellas|
| NULL|      NULL|  NULL|   Matilde Rebouças|
| NULL|      NULL|  NULL|    Noêmia   Orriça|
| NULL|      NULL|  NULL|      Roque Vásquez|
|    7|2020-01-05|     9|      Uriel Queiroz|
| NULL|      NULL|  NULL|   Viviana Sequeira|
+-----+----------+------+-------------------+



API DATAFRAME

In [79]:
# INNER JOIN
despachantes.join(
    reclamacoes, despachantes.id == reclamacoes.iddesp, 'inner'
        ).select('idrec','datarec','iddesp','nome'
            ).show()

# RIGHT JOIN
despachantes.join(
    reclamacoes, despachantes.id == reclamacoes.iddesp, 'right'
        ).select('idrec','datarec','iddesp','nome'
            ).show()

# LEFT JOIN
despachantes.join(
    reclamacoes, despachantes.id == reclamacoes.iddesp, 'left'
        ).select('idrec','datarec','iddesp','nome'
            ).show()

+-----+----------+------+-------------------+
|idrec|   datarec|iddesp|               nome|
+-----+----------+------+-------------------+
|    2|2020-09-11|     2|    Deolinda Vilela|
|    1|2020-09-12|     2|    Deolinda Vilela|
|    3|2020-10-05|     4|Felisbela Dornelles|
|    6|2020-01-09|     5|     Graça Ornellas|
|    5|2020-12-06|     5|     Graça Ornellas|
|    4|2020-10-02|     5|     Graça Ornellas|
|    7|2020-01-05|     9|      Uriel Queiroz|
+-----+----------+------+-------------------+

+-----+----------+------+-------------------+
|idrec|   datarec|iddesp|               nome|
+-----+----------+------+-------------------+
|    1|2020-09-12|     2|    Deolinda Vilela|
|    2|2020-09-11|     2|    Deolinda Vilela|
|    3|2020-10-05|     4|Felisbela Dornelles|
|    4|2020-10-02|     5|     Graça Ornellas|
|    5|2020-12-06|     5|     Graça Ornellas|
|    6|2020-01-09|     5|     Graça Ornellas|
|    7|2020-01-05|     9|      Uriel Queiroz|
+-----+----------+------+--------

### 37. Utilizando Spark-sql

In [4]:
# INICIAR SHELL: spark-sql

# Ao utilizar o shell do spark-sql, você irá utilizar o SQL puro, ou seja, todos os comando vão rodar 
# 100% em SQL, isso permite rodar qualquer comando valido da lingaugem. Exemplo:

show databases;
use <database_name>;
show tables;

select * from <table>;

select * from <table> where <filtro>;


### 38. Atividades: Faça você mesmo

1. Crie um banco de dados no DW do Spark chamado VendasVarejo, e persista todas as tabelas neste banco de dados

In [12]:
clientes = spark.read.parquet(r'C:\Users\viser\OneDrive\Documentos\Cursos\notebooks\students_pyspark\Material disponibilizado\download\Atividades\Clientes.parquet')
ItensVendas = spark.read.parquet(r'C:\Users\viser\OneDrive\Documentos\Cursos\notebooks\students_pyspark\Material disponibilizado\download\Atividades\ItensVendas.parquet')
Produtos = spark.read.parquet(r'C:\Users\viser\OneDrive\Documentos\Cursos\notebooks\students_pyspark\Material disponibilizado\download\Atividades\Produtos.parquet')
Vendas = spark.read.parquet(r'C:\Users\viser\OneDrive\Documentos\Cursos\notebooks\students_pyspark\Material disponibilizado\download\Atividades\Vendas.parquet')
Vendedores = spark.read.parquet(r'C:\Users\viser\OneDrive\Documentos\Cursos\notebooks\students_pyspark\Material disponibilizado\download\Atividades\Vendedores.parquet')

# spark.sql('CREATE DATABASE IF NOT EXISTS VendasVarejo')
# spark.sql('USE VendasVarejo')
# clientes.write.saveAsTable('clientes')
# clientes.write.saveAsTable('ItensVendas')
# clientes.write.saveAsTable('Produtos')
# clientes.write.saveAsTable('Vendas')
# clientes.write.saveAsTable('Vendedores')

clientes.createOrReplaceTempView('clientes')
ItensVendas.createOrReplaceTempView('itensVendas')
Produtos.createOrReplaceTempView('produtos')
Vendas.createOrReplaceTempView('vendas')
Vendedores.createOrReplaceTempView('vendedores')

2. Crie uma consulta que mostre cada item vendido: Nome do Cliente, Data da Venda, Produto, Vendedor
e Valor total do item.

In [31]:
spark.sql('''
    SELECT
        clientes.Cliente
        ,vendas.Data
        ,produtos.Produto
        ,vendedores.Vendedor
        ,itensVendas.ValorTotal
    FROM vendas 
        INNER JOIN vendedores ON vendas.VendedorId = vendedores.VendedorId
            INNER JOIN clientes ON vendas.ClienteID = clientes.ClienteID
                INNER JOIN itensVendas ON vendas.VendasID = itensVendas.VendasID
                    INNER JOIN produtos ON itensVendas.ProdutoID = produtos.ProdutoID
''').show()

+-----------------+--------+--------------------+----------------+----------+
|          Cliente|    Data|             Produto|        Vendedor|ValorTotal|
+-----------------+--------+--------------------+----------------+----------+
|   Cosme Zambujal|1/1/2019|Bicicleta Altools...|    Armando Lago|   7820.85|
|   Cosme Zambujal|1/1/2019|Bermuda Predactor...|    Armando Lago|     97.75|
|   Cosme Zambujal|1/1/2019|Camiseta Predacto...|    Armando Lago|     135.0|
|Gertrudes Hidalgo|1/1/2020|Luva De Ciclismo ...|   Iberê Lacerda|     150.4|
| Antão Corte-Real|2/1/2020|Capacete Gometws ...|Jéssica Castelão|     155.0|
| Antão Corte-Real|2/1/2020|Bicicleta Gometws...|Jéssica Castelão|    5932.0|
| Antão Corte-Real|2/1/2019|Bicicleta Altools...|  Hélio Liberato|   7820.85|
| Antão Corte-Real|2/1/2019|Bermuda Predactor...|  Hélio Liberato|     97.75|
| Antão Corte-Real|2/1/2019|Bicicleta Gometws...|  Hélio Liberato|    5910.0|
| Antão Corte-Real|3/1/2018|Bicicleta Gometws...|  Hélio Liberat

### 39. Resolução atividade 38.1

### 40. Resolução atividade 38.2

### 41. Conectando a Outras Fontes de Dados

Até agora vimos como exportar dados de diversos tipos de arquivos, que são bastante comum no DataLake.
Porém, as vezes é necessário extrair de outras fontes, fontes no qual os dados não estão em formato de
arquivos. Nesses casos é necessário se conectar ao "Gerenciador de dados" e realizar extração.

Com spark você pode se conectar com outras fontes de dados. Você irá receber esses dados no formato de
um dataframe, e nesse momento, você pode exportar esses dados para arquivos como: **parquet, csv, json, etc**. Poderá persistir eles como uma tabela, mas também é possivel gravar de volta os dados na mesma fonte ou até em outra fonte.

**É importante ressaltar que os principios para se conectar e extrair dados são os mesmos, o único ponto importante é que você pode fazer isso atraves de um Drive JBDC ou com o pacote nativo do python pro spark.**

### 42. PostgreSQL

Etapas:

- Instalar
- Criar banco de dados Vendas
- Configurar acesso
- Baixar Driver JDBC
- Usar o Pyspark para ler, transformar e gravar os dados no Postgre

### 43. Instalando PostGreSQL

**Dentro do SHELL DA MÁQUINA VIRTUAL UBUNTU LTS:**

1 - atualizar o local
    sudo apt-get update

2 -instalar postgreSQL
    sudo apt-get update && sudo apt-get install postgre-12

3 - realizando login na ferramenta do postgre
    sudo -u postgres psql

4 - criando banco de dados vendas
    create database vendas;

5 - mudar para o diretorio do banco
    c\ vendas;

6 - criar as tabelas com os scripts disponibilizados na aula
    i\ <caminho_do_arquivo>

7 - verificar se as tabelas foram criadas
    \dt

8 - definir senha para o usuario utilizado
    \password

A instalação realizada já vem com o modo de autenticação que permite fazer uma autenticação com usuario e senha. Porém, existem versões do postgre que não permitem isso por padrão, então vamos ver como deveriamos fazer nesse caso:

**Editar arquivo de configuração diretório inicial do sheel**

1- Abrir o arquivo de configuração
**sudo <editor_de_texto> /etc/postgresql/<versão_do_postgre>/main/pg_hba.conf**
sudo gedit /etc/postgresql/12/main/pg_hba.conf

2 - Procurar linha com autenticação md5
Caso essas linhas estejam comentadas, deve-se descomentar:
    host all all 127.0.0.1/32 md5
    host all all ::1/128 md5

Caso elas estejam da forma abaixo, basta mudar o "identy" para "md5" e reiniciar o postgre.
    host all all 127.0.0.1/32 identy
    host all all 127.0.0.1/32 identy


**Maquina pessoal**

Realizar o download do instalador no navegador web e rodar as queries.




### 44. Drive JDBC para PostGre

Abrir o navegador web e pesquisar pelo drive "Driver JDBC postgre" e realizar o downloadas do arquivo jar.

O próximo passo é inicializar o pypsark apontando para o driver baixado para postgreSQL.

**Dentro da maquina ubuntu:**

pyspark --jars <caminho_do_arquivo>

**No caso do visual code o arquivo devera ser passado na criação da spark session:**

.config("spark.jars", '<caminho_arquivo>')


### 45. Lendo e Gravando Dados no PostgreSQL

Criando dataframe com conexão que irá ler a tabela de vendas no banco de dados vendas

In [7]:
resumo = spark.read.format("jdbc")\
    .option("url","jdbc:postgresql://localhost:5432/vendas")\
    .option("dbtable", "vendas")\
    .option("user", "postgres")\
    .option("password", "123456")\
    .option("driver", "org.postgresql.Driver").load()

resumo.show()

Salvando dados do dataframe no banco de dados postgreSQL

In [None]:
# Conexão permanece a mesma, alterando apenas alguns options: 
# Nome da tabela onde será salvo: .option("dbtable", "<nome_tabela>")\
# Modo do dataframe, no caso save() .option("driver", "org.postgresql.Driver").<modo>()

# Criando dataframe selecionando dados necessários.
venda_data =  resumo.select('data', 'total')

venda_data.write\
    .format("jdbc")\
    .option("url","jdbc:postgresql://localhost:5432/vendas")\
    .option("dbtable", "venda_data")\
    .option("user", "postgres")\
    .option("password", "123456")\
    .option("driver", "org.postgresql.Driver").save()

### 46. MongoBD

Explicação

### 47. Instalando MongoBD

Realizando instação do mongodb.
Utilizei o mongo cloud free.

### 48. Lendo e gravando dados no MongoDB

In [32]:
pip install pymongo

Note: you may need to restart the kernel to use updated packages.



[notice] A new release of pip is available: 24.1 -> 24.2
[notice] To update, run: python.exe -m pip install --upgrade pip


In [31]:
pip install pymongo[srv]

Note: you may need to restart the kernel to use updated packages.



[notice] A new release of pip is available: 24.1 -> 24.2
[notice] To update, run: python.exe -m pip install --upgrade pip


In [24]:
pip install pymongo[srv]


Note: you may need to restart the kernel to use updated packages.



[notice] A new release of pip is available: 24.1 -> 24.2
[notice] To update, run: python.exe -m pip install --upgrade pip


In [46]:
import pyspark
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext

In [47]:
print(pyspark.__version__)

3.5.1


In [None]:
conf = pyspark.SparkConf().set("spark.jars.packages",
                               "org.mongodb.spark:mongo-spark-connector_2.12:10.1.").setMaster("local").setAppName("My App").setAll([("spark.driver.memory", "40g"), ("spark.executor.memory", "50g")])

In [45]:
import os
import findspark
from pyspark.sql import SparkSession

os.environ["SPARK_HOME"] = r"C:\ApacheSpark\spark-3.5.1-bin-hadoop3"
findspark.init()
spark = SparkSession.builder.master('local[*]')\
    .config("spark.mongodb.read.connection.uri", "mongodb+srv://vinisrfp:1XTznESq5xiVYLL2@cluster0.rpumd.mongodb.net/sample_airbnb.listingsAndReviews")\
    .config("spark.mongodb.write.connection.uri", "mongodb+srv://vinisrfp:1XTznESq5xiVYLL2@cluster0.rpumd.mongodb.net/sample_airbnb.listingsAndReviews")\
    .config("spark.jars.packages", "org.mongodb.spark:mongo-spark-connector_2.12:10.1.1") \
    .getOrCreate()

print(spark.version)

# Instalar driver odbc para conectar ao postgre
# .config("spark.jars", r'C:\Users\viser\OneDrive\Documentos\Cursos\notebooks\students_pyspark\postgresql-42.7.3.jar')\



    

3.5.1


In [43]:
# Carregar os dados da collection MongoDB em um DataFrame


### 49. Aplicação 1: Escrevendo no console

A criação do aplicativo será criada em um arquivo .py, ele foi executado no console da máquina virtual, utilizando o comando:

spark-submit <caminho_arquivo>
spark-submit aplicativo.py

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *


if __name__ == '__main__':
    spark = SparkSession.builder.appName('exemplo').getOrCreate()

    arqschema = 'id INT, nome STRING, status STRING, cidade STRING, vendas INT, data STRING'
    despachantes = spark.read.csv(r'C:\Users\viser\OneDrive\Documentos\Cursos\notebooks\students_pyspark\Material disponibilizado\download\despachantes.csv', header = False, schema = arqschema)

    calculo = despachantes.select('data').groupBy(year('data')).count()

    calculo.write.format('console').save()

    spark.stop

### 50. Aplicação 2: Escrevendo no console com Parâmetros

A criação do aplicativo será criada em um arquivo .py, ele foi executado no console da máquina virtual, utilizando o comando:

spark-submit <caminho_aplicativo> <caminho_arquivo>
spark-submit aplicativo.py C:\Users\viser\OneDrive\Documentos\Cursos\notebooks\students_pyspark\Material disponibilizado\download\despachantes.csv

In [None]:
import sys
from pyspark.sql import SparkSession
from pyspark.sql.functions import *


if __name__ == '__main__':
    spark = SparkSession.builder.appName('exemplo').getOrCreate()

    arqschema = 'id INT, nome STRING, status STRING, cidade STRING, vendas INT, data STRING'
    despachantes = spark.read.csv(sys.argv[1], header = False, schema = arqschema)

    calculo = despachantes.select('data').groupBy(year('data')).count()

    calculo.write.format('console').save()

    spark.stop

### 51. Opção e argumentos em Linha de Comando

**Argumentos**

- O primeiro é sempre o nome do aplicativo
- Podemos definir opções e valores
- Podemos ler as opções e respectivos valores, por exemplo:
- **programa -t [formato] -i [csv de entrada] -o [diretório de saída]**

### 52. Aplicação 3: Conversor de Formatos e Arquivos em Spark