# CONFIGURAÇÃO DO AMBIENTE

In [None]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.3.tar.gz (317.3 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.3/317.3 MB[0m [31m4.7 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.3-py2.py3-none-any.whl size=317840625 sha256=c3cc8e384b2fcef43e5640da2f58200c3e21db8111ee954a31dd3928d4a05f1a
  Stored in directory: /root/.cache/pip/wheels/1b/3a/92/28b93e2fbfdbb07509ca4d6f50c5e407f48dce4ddbda69a4ab
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.3


In [None]:
!pip list

Package                            Version
---------------------------------- -------------------
absl-py                            1.4.0
accelerate                         0.34.2
aiohappyeyeballs                   2.4.3
aiohttp                            3.10.9
aiosignal                          1.3.1
alabaster                          0.7.16
albucore                           0.0.16
albumentations                     1.4.15
altair                             4.2.2
annotated-types                    0.7.0
anyio                              3.7.1
argon2-cffi                        23.1.0
argon2-cffi-bindings               21.2.0
array_record                       0.5.1
arviz                              0.19.0
astropy                            6.1.4
astropy-iers-data                  0.2024.10.7.0.32.46
astunparse                         1.6.3
async-timeout                      4.0.3
atpublic                           4.1.0
attrs                              24.2.0
audioread         

In [None]:
import pyspark
print(pyspark.__version__)

3.5.3


# DATAFRAME PART 1

In [None]:
# importando o spark, sempre precisamos do "SparkSession"
from pyspark.sql import SparkSession

In [None]:
# ativando o auto-completar
%config Completer.use_jedi = True

In [None]:
from pyspark.sql import SparkSession

# Criando uma sessão Spark
spark = SparkSession.builder.appName("MeuApp").getOrCreate()

# Criando um DataFrame
df1 = spark.createDataFrame([("Pedro", 10), ("Maria", 20), ("José", 40)])


In [None]:
### INICIO DATAFRAME PART 1 ###
# criando um dataframe
# '[]' pq eh uma lista
df1 = spark.createDataFrame([("Pedro", 10), ("Maria", 20), ("José", 40)])

In [None]:
# exibindo
df1.show()

+-----+---+
|   _1| _2|
+-----+---+
|Pedro| 10|
|Maria| 20|
| José| 40|
+-----+---+



In [None]:
# criando uma var para o schema: "nome tipo"
schema = "Id INT, Nome STRING"
# criando uma var para os dados
dados = [[1, "Pedro"], [2, "Maria"]]
# usando os dados acima para criar um novo dataFrame
df2 = spark.createDataFrame(dados, schema)

In [None]:
df2.show()

+---+-----+
| Id| Nome|
+---+-----+
|  1|Pedro|
|  2|Maria|
+---+-----+



In [None]:
# especificar as linhas a serem exibidas
df2.show(1)

+---+-----+
| Id| Nome|
+---+-----+
|  1|Pedro|
+---+-----+
only showing top 1 row



In [None]:
# importando a função 'sum'
from pyspark.sql.functions import sum

In [None]:
# segundo dataFrame
schema2 = "Produtos STRING, Vendas INT"
dados2 = [["Caneta", 10], ["Lápis", 20], ["Caneta", 40]]

df3 = spark.createDataFrame(dados2, schema2)

In [None]:
# exibindo so a primeira linha
df3.show(1)

+--------+------+
|Produtos|Vendas|
+--------+------+
|  Caneta|    10|
+--------+------+
only showing top 1 row



In [None]:
# agrupando dados semelhantes, nesse caso 'caneta'
# soma as 'vendas' e agrupando por 'produtos'
agrupado = df3.groupBy("Produtos").agg(sum("Vendas"))

#'agrupado' agora é um novo dataFrame

In [None]:
agrupado.show()

+--------+-----------+
|Produtos|sum(Vendas)|
+--------+-----------+
|  Caneta|         50|
|   Lápis|         20|
+--------+-----------+



In [None]:
# poderia concatenar
df3.groupBy("Produtos").agg(sum("Vendas")).show()

+--------+-----------+
|Produtos|sum(Vendas)|
+--------+-----------+
|  Caneta|         50|
|   Lápis|         20|
+--------+-----------+



In [None]:
# supondo um banco grande(date warehouse)
# podemos ver poucas colunas ou uma unica
# dataFrame.select(Nome da coluna)
df3.select("Produtos").show()

+--------+
|Produtos|
+--------+
|  Caneta|
|   Lápis|
|  Caneta|
+--------+



In [None]:
# na ordem q quiser
df3.select("Vendas", "Produtos").show()

+------+--------+
|Vendas|Produtos|
+------+--------+
|    10|  Caneta|
|    20|   Lápis|
|    40|  Caneta|
+------+--------+



In [None]:
# criando expressões
# 'expr' de expressão
from pyspark.sql.functions import expr

In [None]:
# calcular o percentual nesse caso
# crio um campo novo com pre fixo de 'expr' + a expressão em si entre aspas("")
df3.select("Produtos", "Vendas", expr("Vendas * 0.2")).show()

+--------+------+--------------+
|Produtos|Vendas|(Vendas * 0.2)|
+--------+------+--------------+
|  Caneta|    10|           2.0|
|   Lápis|    20|           4.0|
|  Caneta|    40|           8.0|
+--------+------+--------------+



# DATAFRAME PART 2

In [None]:
### DATAFRAME PART 2 ###

# ver estrutura, lendo uma propriedade
df3.schema

StructType([StructField('Produtos', StringType(), True), StructField('Vendas', IntegerType(), True)])

In [None]:
# vendo colunas do dataframe
df3.columns

['Produtos', 'Vendas']

In [None]:
# iniciou da importação
from pyspark.sql.types import *

# definido o schema do o arquivo q vai ser importado
arqschema = "id INT, nome STRING, status STRING, cidade STRING, vendas INT, data STRING"

In [None]:
# Ler o arquivo CSV com o delimitador correto
despachantes = spark.read.csv("/content/despachantes.csv", header=False, schema= arqschema)

# Converter a coluna 'data' que está em formato serial para uma data legível (considerando que a data é baseada no formato Excel)
despachantes = despachantes.withColumn("data", expr("date_add('1899-12-30', data)"))

# Exibir o resultado
despachantes.show()

+---+-------------------+------+-------------+------+----------+
| id|               nome|status|       cidade|vendas|      data|
+---+-------------------+------+-------------+------+----------+
|  1|   Carminda Pestana| Ativo|  Santa Maria|    23|2020-08-11|
|  2|    Deolinda Vilela| Ativo|Novo Hamburgo|    34|2020-03-05|
|  3|   Emídio Dornelles| Ativo| Porto Alegre|    34|2020-02-05|
|  4|Felisbela Dornelles| Ativo| Porto Alegre|    36|2020-02-05|
|  5|     Graça Ornellas| Ativo| Porto Alegre|    12|2020-02-05|
|  6|   Matilde Rebouças| Ativo| Porto Alegre|    22|2019-01-05|
|  7|      Noêmia Orriça| Ativo|  Santa Maria|    45|2019-10-05|
|  8|      Roque Vásquez| Ativo| Porto Alegre|    65|2020-03-05|
|  9|      Uriel Queiroz| Ativo| Porto Alegre|    54|2018-05-05|
| 10|   Viviana Sequeira| Ativo| Porto Alegre|     0|2020-09-05|
+---+-------------------+------+-------------+------+----------+



In [None]:
# caminho, cabeçario, formato, separador, inferir o schema (identificar automatico), padrao do endcoding (escrita do csv)
desp_autoschema = spark.read.load("/content/despachantes.csv", header=True, format="csv", sep=";", inferSchema=True, encoding='ISO-8859-1')

In [None]:
desp_autoschema.show()

+---+-------------------+------+-------------+------+-----+
| id|               nome|status|       cidade|vendas| data|
+---+-------------------+------+-------------+------+-----+
|  1|   Carminda Pestana| Ativo|  Santa Maria|    23|44054|
|  2|    Deolinda Vilela| Ativo|Novo Hamburgo|    34|43895|
|  3|   Emídio Dornelles| Ativo| Porto Alegre|    34|43866|
|  4|Felisbela Dornelles| Ativo| Porto Alegre|    36|43866|
|  5|     Graça Ornellas| Ativo| Porto Alegre|    12|43866|
|  6|   Matilde Rebouças| Ativo| Porto Alegre|    22|43470|
|  7|      Noêmia Orriça| Ativo|  Santa Maria|    45|43743|
|  8|      Roque Vásquez| Ativo| Porto Alegre|    65|43895|
|  9|      Uriel Queiroz| Ativo| Porto Alegre|    54|43225|
| 10|   Viviana Sequeira| Ativo| Porto Alegre|     0|44079|
+---+-------------------+------+-------------+------+-----+



In [None]:
despachantes.schema

StructType([StructField('id', IntegerType(), True), StructField('nome', StringType(), True), StructField('status', StringType(), True), StructField('cidade', StringType(), True), StructField('vendas', IntegerType(), True), StructField('data', DateType(), True)])

In [None]:
desp_autoschema.schema

StructType([StructField('id', IntegerType(), True), StructField('nome', StringType(), True), StructField('status', StringType(), True), StructField('cidade', StringType(), True), StructField('vendas', IntegerType(), True), StructField('data', IntegerType(), True)])

# DATAFRAME PART 3 (IMPORTANDO DADOS)

In [None]:
from pyspark.sql import functions as Func

# 'select' para colunas q vamos ver, 'where' condição 'Func'.col (colunas) > 20
despachantes.select("id", "nome", "vendas").where(Func.col("vendas") > 20).show()

+---+-------------------+------+
| id|               nome|vendas|
+---+-------------------+------+
|  1|   Carminda Pestana|    23|
|  2|    Deolinda Vilela|    34|
|  3|   Emídio Dornelles|    34|
|  4|Felisbela Dornelles|    36|
|  6|   Matilde Rebouças|    22|
|  7|      Noêmia Orriça|    45|
|  8|      Roque Vásquez|    65|
|  9|      Uriel Queiroz|    54|
+---+-------------------+------+



In [None]:
# operadores lógicos do spark:
# & - and | e
# | - our | ou
# ~ - not | não

# 'select' para colunas q vamos ver, 'where' condição 'Func'.col (colunas) > 20 '&'(e) outra condição (atentar aos parenteses)
despachantes.select("id", "nome", "vendas").where( (Func.col("vendas") > 20) & (Func.col("vendas") < 40) ).show()

+---+-------------------+------+
| id|               nome|vendas|
+---+-------------------+------+
|  1|   Carminda Pestana|    23|
|  2|    Deolinda Vilela|    34|
|  3|   Emídio Dornelles|    34|
|  4|Felisbela Dornelles|    36|
|  6|   Matilde Rebouças|    22|
+---+-------------------+------+



In [None]:
# renomeando a coluna "nome"
# ATENÇÃO: atribuindo ao um novo dataframe, pois os dataframe no spark são IMUTÁVEIS!
# banco de dados conseguiria, no spark não
novodf = despachantes.withColumnRenamed("nome", "nomes")
novodf.columns

['id', 'nomes', 'status', 'cidade', 'vendas', 'data']

In [None]:
from pyspark.sql.functions import *

# criando um 'data2' paara o novo dataframe e transformando seu tipo do 'data' para timestamp + formato
despachantes2 = despachantes.withColumn("data2", to_timestamp(Func.col("data"), "yyyy-MM-dd"))
despachantes2.schema

StructType([StructField('id', IntegerType(), True), StructField('nome', StringType(), True), StructField('status', StringType(), True), StructField('cidade', StringType(), True), StructField('vendas', IntegerType(), True), StructField('data', DateType(), True), StructField('data2', TimestampType(), True)])

In [None]:
# mesmo q seja uma coluna de data e não esteja no formato 'date', podemos utilizar funções para data
# mostra o ano
despachantes2.select(year("data")).show()

+----------+
|year(data)|
+----------+
|      2020|
|      2020|
|      2020|
|      2020|
|      2020|
|      2019|
|      2019|
|      2020|
|      2018|
|      2020|
+----------+



In [None]:
# mostra os anos unicos
despachantes2.select(year("data")).distinct().show()

+----------+
|year(data)|
+----------+
|      2018|
|      2019|
|      2020|
+----------+



In [None]:
# mostrando tb o nome e ordernando pelo nome
despachantes2.select("nome", year("data")).orderBy("nome").distinct().show()

+-------------------+----------+
|               nome|year(data)|
+-------------------+----------+
|   Emídio Dornelles|      2020|
|Felisbela Dornelles|      2020|
|    Deolinda Vilela|      2020|
|      Uriel Queiroz|      2018|
|      Roque Vásquez|      2020|
|   Viviana Sequeira|      2020|
|     Graça Ornellas|      2020|
|      Noêmia Orriça|      2019|
|   Matilde Rebouças|      2019|
|   Carminda Pestana|      2020|
+-------------------+----------+



In [None]:
# agrupando pelo o ano da data e contando
despachantes2.select("data").groupBy(year("data")).count().show()

+----------+-----+
|year(data)|count|
+----------+-----+
|      2018|    1|
|      2019|    2|
|      2020|    7|
+----------+-----+



In [None]:
# soma de tds as vendas
despachantes2.select(Func.sum("vendas")).show()

+-----------+
|sum(vendas)|
+-----------+
|        325|
+-----------+



# PRINCIPAIS AÇÕES E TRANSFORMAÇÕES

In [None]:
# dados de forma tabular
despachantes.show()

+---+-------------------+------+-------------+------+----------+
| id|               nome|status|       cidade|vendas|      data|
+---+-------------------+------+-------------+------+----------+
|  1|   Carminda Pestana| Ativo|  Santa Maria|    23|2020-08-11|
|  2|    Deolinda Vilela| Ativo|Novo Hamburgo|    34|2020-03-05|
|  3|   Emídio Dornelles| Ativo| Porto Alegre|    34|2020-02-05|
|  4|Felisbela Dornelles| Ativo| Porto Alegre|    36|2020-02-05|
|  5|     Graça Ornellas| Ativo| Porto Alegre|    12|2020-02-05|
|  6|   Matilde Rebouças| Ativo| Porto Alegre|    22|2019-01-05|
|  7|      Noêmia Orriça| Ativo|  Santa Maria|    45|2019-10-05|
|  8|      Roque Vásquez| Ativo| Porto Alegre|    65|2020-03-05|
|  9|      Uriel Queiroz| Ativo| Porto Alegre|    54|2018-05-05|
| 10|   Viviana Sequeira| Ativo| Porto Alegre|     0|2020-09-05|
+---+-------------------+------+-------------+------+----------+



In [None]:
# dados na forma de lista
despachantes.take(1)

[Row(id=1, nome='Carminda Pestana', status='Ativo', cidade='Santa Maria', vendas=23, data=datetime.date(2020, 8, 11))]

In [None]:
# retorna tds os dados na forma de uma lista
despachantes.collect()

[Row(id=1, nome='Carminda Pestana', status='Ativo', cidade='Santa Maria', vendas=23, data=datetime.date(2020, 8, 11)),
 Row(id=2, nome='Deolinda Vilela', status='Ativo', cidade='Novo Hamburgo', vendas=34, data=datetime.date(2020, 3, 5)),
 Row(id=3, nome='Emídio Dornelles', status='Ativo', cidade='Porto Alegre', vendas=34, data=datetime.date(2020, 2, 5)),
 Row(id=4, nome='Felisbela Dornelles', status='Ativo', cidade='Porto Alegre', vendas=36, data=datetime.date(2020, 2, 5)),
 Row(id=5, nome='Graça Ornellas', status='Ativo', cidade='Porto Alegre', vendas=12, data=datetime.date(2020, 2, 5)),
 Row(id=6, nome='Matilde Rebouças', status='Ativo', cidade='Porto Alegre', vendas=22, data=datetime.date(2019, 1, 5)),
 Row(id=7, nome='Noêmia Orriça', status='Ativo', cidade='Santa Maria', vendas=45, data=datetime.date(2019, 10, 5)),
 Row(id=8, nome='Roque Vásquez', status='Ativo', cidade='Porto Alegre', vendas=65, data=datetime.date(2020, 3, 5)),
 Row(id=9, nome='Uriel Queiroz', status='Ativo', cida

In [None]:
# retorna o numero de registro (numero  de linhas do dataframe)
despachantes.count()

10

In [None]:
despachantes.show()

+---+-------------------+------+-------------+------+----------+
| id|               nome|status|       cidade|vendas|      data|
+---+-------------------+------+-------------+------+----------+
|  1|   Carminda Pestana| Ativo|  Santa Maria|    23|2020-08-11|
|  2|    Deolinda Vilela| Ativo|Novo Hamburgo|    34|2020-03-05|
|  3|   Emídio Dornelles| Ativo| Porto Alegre|    34|2020-02-05|
|  4|Felisbela Dornelles| Ativo| Porto Alegre|    36|2020-02-05|
|  5|     Graça Ornellas| Ativo| Porto Alegre|    12|2020-02-05|
|  6|   Matilde Rebouças| Ativo| Porto Alegre|    22|2019-01-05|
|  7|      Noêmia Orriça| Ativo|  Santa Maria|    45|2019-10-05|
|  8|      Roque Vásquez| Ativo| Porto Alegre|    65|2020-03-05|
|  9|      Uriel Queiroz| Ativo| Porto Alegre|    54|2018-05-05|
| 10|   Viviana Sequeira| Ativo| Porto Alegre|     0|2020-09-05|
+---+-------------------+------+-------------+------+----------+



In [None]:
# ordenando

# ordem crescente é o padrão
despachantes.orderBy("vendas").show()

+---+-------------------+------+-------------+------+----------+
| id|               nome|status|       cidade|vendas|      data|
+---+-------------------+------+-------------+------+----------+
| 10|   Viviana Sequeira| Ativo| Porto Alegre|     0|2020-09-05|
|  5|     Graça Ornellas| Ativo| Porto Alegre|    12|2020-02-05|
|  6|   Matilde Rebouças| Ativo| Porto Alegre|    22|2019-01-05|
|  1|   Carminda Pestana| Ativo|  Santa Maria|    23|2020-08-11|
|  2|    Deolinda Vilela| Ativo|Novo Hamburgo|    34|2020-03-05|
|  3|   Emídio Dornelles| Ativo| Porto Alegre|    34|2020-02-05|
|  4|Felisbela Dornelles| Ativo| Porto Alegre|    36|2020-02-05|
|  7|      Noêmia Orriça| Ativo|  Santa Maria|    45|2019-10-05|
|  9|      Uriel Queiroz| Ativo| Porto Alegre|    54|2018-05-05|
|  8|      Roque Vásquez| Ativo| Porto Alegre|    65|2020-03-05|
+---+-------------------+------+-------------+------+----------+



In [None]:
# ordem decrescente
despachantes.orderBy(Func.col("vendas").desc()).show()

+---+-------------------+------+-------------+------+----------+
| id|               nome|status|       cidade|vendas|      data|
+---+-------------------+------+-------------+------+----------+
|  8|      Roque Vásquez| Ativo| Porto Alegre|    65|2020-03-05|
|  9|      Uriel Queiroz| Ativo| Porto Alegre|    54|2018-05-05|
|  7|      Noêmia Orriça| Ativo|  Santa Maria|    45|2019-10-05|
|  4|Felisbela Dornelles| Ativo| Porto Alegre|    36|2020-02-05|
|  2|    Deolinda Vilela| Ativo|Novo Hamburgo|    34|2020-03-05|
|  3|   Emídio Dornelles| Ativo| Porto Alegre|    34|2020-02-05|
|  1|   Carminda Pestana| Ativo|  Santa Maria|    23|2020-08-11|
|  6|   Matilde Rebouças| Ativo| Porto Alegre|    22|2019-01-05|
|  5|     Graça Ornellas| Ativo| Porto Alegre|    12|2020-02-05|
| 10|   Viviana Sequeira| Ativo| Porto Alegre|     0|2020-09-05|
+---+-------------------+------+-------------+------+----------+



In [None]:
# mais de uma coluna decrescente
despachantes.orderBy(Func.col("cidade").desc(), Func.col("vendas").desc()).show()

+---+-------------------+------+-------------+------+----------+
| id|               nome|status|       cidade|vendas|      data|
+---+-------------------+------+-------------+------+----------+
|  7|      Noêmia Orriça| Ativo|  Santa Maria|    45|2019-10-05|
|  1|   Carminda Pestana| Ativo|  Santa Maria|    23|2020-08-11|
|  8|      Roque Vásquez| Ativo| Porto Alegre|    65|2020-03-05|
|  9|      Uriel Queiroz| Ativo| Porto Alegre|    54|2018-05-05|
|  4|Felisbela Dornelles| Ativo| Porto Alegre|    36|2020-02-05|
|  3|   Emídio Dornelles| Ativo| Porto Alegre|    34|2020-02-05|
|  6|   Matilde Rebouças| Ativo| Porto Alegre|    22|2019-01-05|
|  5|     Graça Ornellas| Ativo| Porto Alegre|    12|2020-02-05|
| 10|   Viviana Sequeira| Ativo| Porto Alegre|     0|2020-09-05|
|  2|    Deolinda Vilela| Ativo|Novo Hamburgo|    34|2020-03-05|
+---+-------------------+------+-------------+------+----------+



In [None]:
# agrupando

# quanto vendi por cidade?
# 'agg' agregar, 'sum' soma
despachantes.groupBy("cidade").agg(Func.sum("vendas")).show()

+-------------+-----------+
|       cidade|sum(vendas)|
+-------------+-----------+
|  Santa Maria|         68|
|Novo Hamburgo|         34|
| Porto Alegre|        223|
+-------------+-----------+



In [None]:
# agregando as vendas por cidade e ordenando em ordem decrescente
despachantes.groupBy("cidade").agg(Func.sum("vendas")).orderBy(Func.col("sum(vendas)").desc()  ).show()

+-------------+-----------+
|       cidade|sum(vendas)|
+-------------+-----------+
| Porto Alegre|        223|
|  Santa Maria|         68|
|Novo Hamburgo|         34|
+-------------+-----------+



In [None]:
despachantes.show()

+---+-------------------+------+-------------+------+----------+
| id|               nome|status|       cidade|vendas|      data|
+---+-------------------+------+-------------+------+----------+
|  1|   Carminda Pestana| Ativo|  Santa Maria|    23|2020-08-11|
|  2|    Deolinda Vilela| Ativo|Novo Hamburgo|    34|2020-03-05|
|  3|   Emídio Dornelles| Ativo| Porto Alegre|    34|2020-02-05|
|  4|Felisbela Dornelles| Ativo| Porto Alegre|    36|2020-02-05|
|  5|     Graça Ornellas| Ativo| Porto Alegre|    12|2020-02-05|
|  6|   Matilde Rebouças| Ativo| Porto Alegre|    22|2019-01-05|
|  7|      Noêmia Orriça| Ativo|  Santa Maria|    45|2019-10-05|
|  8|      Roque Vásquez| Ativo| Porto Alegre|    65|2020-03-05|
|  9|      Uriel Queiroz| Ativo| Porto Alegre|    54|2018-05-05|
| 10|   Viviana Sequeira| Ativo| Porto Alegre|     0|2020-09-05|
+---+-------------------+------+-------------+------+----------+



In [None]:
# filtro
despachantes.filter(Func.col("nome") == "Deolinda Vilela").show()

+---+---------------+------+-------------+------+----------+
| id|           nome|status|       cidade|vendas|      data|
+---+---------------+------+-------------+------+----------+
|  2|Deolinda Vilela| Ativo|Novo Hamburgo|    34|2020-03-05|
+---+---------------+------+-------------+------+----------+



# EXPORTANDO DADOS

In [None]:
# tradicionalenmte usado em dados
despachantes.write.format("parquet").save("dfimportparquet")

In [None]:
despachantes.write.format("csv").save("dfimportcsv")

In [None]:
despachantes.write.format("json").save("dfimportjson")

In [None]:
# tradicionalenmte usado em dados
despachantes.write.format("orc").save("dfimportorc")

# IMPORTANDO DADOS

In [None]:
# não executar pois o colab buga
# !mv <nome do arquivo> <nome q vai recebr>

In [None]:
# importando o 'parquet'
# o 'erro' é pq o colab não consegue renomear!
par = spark.read.format("parquet").load("/content/dfimportparquet/despachantes.parquet")

In [None]:
par.show()

+---+-------------------+------+-------------+------+----------+
| id|               nome|status|       cidade|vendas|      data|
+---+-------------------+------+-------------+------+----------+
|  1|   Carminda Pestana| Ativo|  Santa Maria|    23|2020-08-11|
|  2|    Deolinda Vilela| Ativo|Novo Hamburgo|    34|2020-03-05|
|  3|   Emídio Dornelles| Ativo| Porto Alegre|    34|2020-02-05|
|  4|Felisbela Dornelles| Ativo| Porto Alegre|    36|2020-02-05|
|  5|     Graça Ornellas| Ativo| Porto Alegre|    12|2020-02-05|
|  6|   Matilde Rebouças| Ativo| Porto Alegre|    22|2019-01-05|
|  7|      Noêmia Orriça| Ativo|  Santa Maria|    45|2019-10-05|
|  8|      Roque Vásquez| Ativo| Porto Alegre|    65|2020-03-05|
|  9|      Uriel Queiroz| Ativo| Porto Alegre|    54|2018-05-05|
| 10|   Viviana Sequeira| Ativo| Porto Alegre|     0|2020-09-05|
+---+-------------------+------+-------------+------+----------+



In [None]:
par.schema

StructType([StructField('id', IntegerType(), True), StructField('nome', StringType(), True), StructField('status', StringType(), True), StructField('cidade', StringType(), True), StructField('vendas', IntegerType(), True), StructField('data', DateType(), True)])

In [None]:
# importando o 'json'
js = spark.read.format("json").load("/content/dfimportjson/despachantes.json")

In [None]:
js.show()

+-------------+----------+---+-------------------+------+------+
|       cidade|      data| id|               nome|status|vendas|
+-------------+----------+---+-------------------+------+------+
|  Santa Maria|2020-08-11|  1|   Carminda Pestana| Ativo|    23|
|Novo Hamburgo|2020-03-05|  2|    Deolinda Vilela| Ativo|    34|
| Porto Alegre|2020-02-05|  3|   Emídio Dornelles| Ativo|    34|
| Porto Alegre|2020-02-05|  4|Felisbela Dornelles| Ativo|    36|
| Porto Alegre|2020-02-05|  5|     Graça Ornellas| Ativo|    12|
| Porto Alegre|2019-01-05|  6|   Matilde Rebouças| Ativo|    22|
|  Santa Maria|2019-10-05|  7|      Noêmia Orriça| Ativo|    45|
| Porto Alegre|2020-03-05|  8|      Roque Vásquez| Ativo|    65|
| Porto Alegre|2018-05-05|  9|      Uriel Queiroz| Ativo|    54|
| Porto Alegre|2020-09-05| 10|   Viviana Sequeira| Ativo|     0|
+-------------+----------+---+-------------------+------+------+



In [None]:
js.schema

StructType([StructField('cidade', StringType(), True), StructField('data', StringType(), True), StructField('id', LongType(), True), StructField('nome', StringType(), True), StructField('status', StringType(), True), StructField('vendas', LongType(), True)])

In [None]:
# importando o 'orc'
orc = spark.read.format("orc").load("/content/dfimportorc/despachantes.orc")

In [None]:
orc.show()

+---+-------------------+------+-------------+------+----------+
| id|               nome|status|       cidade|vendas|      data|
+---+-------------------+------+-------------+------+----------+
|  1|   Carminda Pestana| Ativo|  Santa Maria|    23|2020-08-11|
|  2|    Deolinda Vilela| Ativo|Novo Hamburgo|    34|2020-03-05|
|  3|   Emídio Dornelles| Ativo| Porto Alegre|    34|2020-02-05|
|  4|Felisbela Dornelles| Ativo| Porto Alegre|    36|2020-02-05|
|  5|     Graça Ornellas| Ativo| Porto Alegre|    12|2020-02-05|
|  6|   Matilde Rebouças| Ativo| Porto Alegre|    22|2019-01-05|
|  7|      Noêmia Orriça| Ativo|  Santa Maria|    45|2019-10-05|
|  8|      Roque Vásquez| Ativo| Porto Alegre|    65|2020-03-05|
|  9|      Uriel Queiroz| Ativo| Porto Alegre|    54|2018-05-05|
| 10|   Viviana Sequeira| Ativo| Porto Alegre|     0|2020-09-05|
+---+-------------------+------+-------------+------+----------+



In [None]:
orc.schema

StructType([StructField('id', IntegerType(), True), StructField('nome', StringType(), True), StructField('status', StringType(), True), StructField('cidade', StringType(), True), StructField('vendas', IntegerType(), True), StructField('data', DateType(), True)])

In [None]:
# importando o 'csv'
cs = spark.read.format("csv").load("/content/dfimportcsv/despachantes.csv")

AnalysisException: [PATH_NOT_FOUND] Path does not exist: file:/content/dfimportcsv/despachantes.csv.

In [None]:
cs.show()

In [None]:
cs.schema

In [None]:
arqschema

In [None]:
# passando o schema persnalizado
cs2 = spark.read.format("csv").load("/content/dfimportcsv/despachantes.csv", schema=arqschema)

In [None]:
cs2.show()

In [None]:
cs2.schema

# FAÇA VOCÊ MESMO #1

In [None]:
# importando dados
# precisa importar os arquivos da atividade!
clientes = spark.read.load("/content/Clientes.parquet")

In [None]:
clientes.show(2)

+---------+--------------------+------+------+------+
|ClienteID|             Cliente|Estado|Genero|Status|
+---------+--------------------+------+------+------+
|        1|Adelina Buenaventura|    RJ|     M|Silver|
|        2|        Adelino Gago|    RJ|     M|Silver|
+---------+--------------------+------+------+------+
only showing top 2 rows



In [None]:
# importando
vendas = spark.read.load("/content/Vendas.parquet")

In [None]:
vendas.show(2)

+--------+----------+---------+--------+------+
|VendasID|VendedorID|ClienteID|    Data| Total|
+--------+----------+---------+--------+------+
|       1|         1|       91|1/1/2019|8053.6|
|       2|         6|      185|1/1/2020| 150.4|
+--------+----------+---------+--------+------+
only showing top 2 rows



In [None]:
# 1 - mostrar clientes, estado e status
clientes.select("Cliente", "Estado", "Status").show()

+--------------------+------+--------+
|             Cliente|Estado|  Status|
+--------------------+------+--------+
|Adelina Buenaventura|    RJ|  Silver|
|        Adelino Gago|    RJ|  Silver|
|     Adolfo Patrício|    PE|  Silver|
|    Adriana Guedelha|    RO|Platinum|
|       Adélio Lisboa|    SE|  Silver|
|       Adérito Bahía|    MA|  Silver|
|       Aida Dorneles|    RN|  Silver|
|   Alarico Quinterno|    AC|  Silver|
|    Alberto Cezimbra|    AM|  Silver|
|    Alberto Monsanto|    RN|    Gold|
|       Albino Canela|    AC|  Silver|
|     Alceste Varanda|    RR|  Silver|
|  Alcides Carvalhais|    RO|  Silver|
|        Aldo Martins|    GO|  Silver|
|   Alexandra Tabares|    MG|  Silver|
|      Alfredo Cotrim|    SC|  Silver|
|     Almeno Figueira|    SC|  Silver|
|      Alvito Peralta|    AM|  Silver|
|     Amadeu Martinho|    RN|  Silver|
|      Amélia Estévez|    PE|  Silver|
+--------------------+------+--------+
only showing top 20 rows



In [None]:
# criando dataframe para armazenar
dfAtv1 = clientes.select("Cliente", "Estado", "Status")

In [None]:
dfAtv1.show()

+--------------------+------+--------+
|             Cliente|Estado|  Status|
+--------------------+------+--------+
|Adelina Buenaventura|    RJ|  Silver|
|        Adelino Gago|    RJ|  Silver|
|     Adolfo Patrício|    PE|  Silver|
|    Adriana Guedelha|    RO|Platinum|
|       Adélio Lisboa|    SE|  Silver|
|       Adérito Bahía|    MA|  Silver|
|       Aida Dorneles|    RN|  Silver|
|   Alarico Quinterno|    AC|  Silver|
|    Alberto Cezimbra|    AM|  Silver|
|    Alberto Monsanto|    RN|    Gold|
|       Albino Canela|    AC|  Silver|
|     Alceste Varanda|    RR|  Silver|
|  Alcides Carvalhais|    RO|  Silver|
|        Aldo Martins|    GO|  Silver|
|   Alexandra Tabares|    MG|  Silver|
|      Alfredo Cotrim|    SC|  Silver|
|     Almeno Figueira|    SC|  Silver|
|      Alvito Peralta|    AM|  Silver|
|     Amadeu Martinho|    RN|  Silver|
|      Amélia Estévez|    PE|  Silver|
+--------------------+------+--------+
only showing top 20 rows



In [None]:
# 2 - consulta q mostre os clientes platinum e gold
from pyspark.sql import functions as Func

# mostra tds as colunas
clientes.select("*").where( (Func.col("Status") == "Platinum") | (Func.col("Status") == "Gold") ).show()

+---------+-------------------+------+------+--------+
|ClienteID|            Cliente|Estado|Genero|  Status|
+---------+-------------------+------+------+--------+
|        4|   Adriana Guedelha|    RO|     F|Platinum|
|       10|   Alberto Monsanto|    RN|     M|    Gold|
|       28|      Anna Carvajal|    RS|     F|    Gold|
|       49|      Bento Quintão|    SP|     M|    Gold|
|       68|      Carminda Dias|    AM|     F|    Gold|
|       83|      Cláudio Jorge|    TO|     M|    Gold|
|      121|    Dionísio Saltão|    PR|     M|    Gold|
|      166|   Firmino Meireles|    AM|     M|    Gold|
|      170|      Flor Vilanova|    CE|     M|Platinum|
|      220|Honorina Villaverde|    PE|     F|    Gold|
|      230|    Ibijara Botelho|    RR|     F|Platinum|
|      237|  Iracema Rodríguez|    BA|     F|    Gold|
|      247|         Joana Ataí|    GO|     F|Platinum|
+---------+-------------------+------+------+--------+



In [None]:
# 2 - consulta q mostre os clientes platinum e gold
from pyspark.sql import functions as Func

# mostra só as colunas passadas
clientes.select("Cliente", "Status").where( (Func.col("Status") == "Platinum") | (Func.col("Status") == "Gold") ).show()

+-------------------+--------+
|            Cliente|  Status|
+-------------------+--------+
|   Adriana Guedelha|Platinum|
|   Alberto Monsanto|    Gold|
|      Anna Carvajal|    Gold|
|      Bento Quintão|    Gold|
|      Carminda Dias|    Gold|
|      Cláudio Jorge|    Gold|
|    Dionísio Saltão|    Gold|
|   Firmino Meireles|    Gold|
|      Flor Vilanova|Platinum|
|Honorina Villaverde|    Gold|
|    Ibijara Botelho|Platinum|
|  Iracema Rodríguez|    Gold|
|         Joana Ataí|Platinum|
+-------------------+--------+



In [None]:
# atribuindo o resultado a um dataframe
dfAtv2 = clientes.select("*").where( (Func.col("Status") == "Platinum") | (Func.col("Status") == "Gold") )
dfAtv2.show()

+---------+-------------------+------+------+--------+
|ClienteID|            Cliente|Estado|Genero|  Status|
+---------+-------------------+------+------+--------+
|        4|   Adriana Guedelha|    RO|     F|Platinum|
|       10|   Alberto Monsanto|    RN|     M|    Gold|
|       28|      Anna Carvajal|    RS|     F|    Gold|
|       49|      Bento Quintão|    SP|     M|    Gold|
|       68|      Carminda Dias|    AM|     F|    Gold|
|       83|      Cláudio Jorge|    TO|     M|    Gold|
|      121|    Dionísio Saltão|    PR|     M|    Gold|
|      166|   Firmino Meireles|    AM|     M|    Gold|
|      170|      Flor Vilanova|    CE|     M|Platinum|
|      220|Honorina Villaverde|    PE|     F|    Gold|
|      230|    Ibijara Botelho|    RR|     F|Platinum|
|      237|  Iracema Rodríguez|    BA|     F|    Gold|
|      247|         Joana Ataí|    GO|     F|Platinum|
+---------+-------------------+------+------+--------+



In [None]:
vendas.show()

+--------+----------+---------+---------+--------+
|VendasID|VendedorID|ClienteID|     Data|   Total|
+--------+----------+---------+---------+--------+
|       1|         1|       91| 1/1/2019|  8053.6|
|       2|         6|      185| 1/1/2020|   150.4|
|       3|         7|       31| 2/1/2020|  6087.0|
|       4|         5|       31| 2/1/2019| 13828.6|
|       5|         5|       31| 3/1/2018|26096.66|
|       6|         5|       31| 4/1/2020| 18402.0|
|       7|         5|       31| 6/1/2019|  7524.2|
|       8|         5|      186| 6/1/2019| 12036.6|
|       9|         7|       91| 6/1/2020| 2804.75|
|      10|         2|      202| 6/1/2020|  8852.0|
|      11|         7|       58| 8/1/2019|16545.25|
|      12|         7|       58| 9/1/2018|11411.88|
|      13|         7|       58|10/1/2019| 15829.7|
|      14|         3|      249|12/1/2020| 6154.36|
|      15|         4|      249|12/1/2018| 3255.08|
|      16|         7|      192|13/1/2020| 2901.25|
|      17|         2|       79|

In [None]:
# 3 - demonstre quanto cada status de cliente representa em vendas
# tabela da qual pegamos (tabela q vai fazer a relação, atributo da tabela == atributo da tabela q vai fazer a relação, "tipo de join")
vendas.join(clientes, vendas.ClienteID == clientes.ClienteID, "inner").groupBy(clientes.Status).agg(sum("Total")).orderBy(Func.col("sum(Total)").desc()).show()

+--------+------------------+
|  Status|        sum(Total)|
+--------+------------------+
|  Silver|        3014291.36|
|    Gold|27286.690000000002|
|Platinum|          12584.68|
+--------+------------------+



In [None]:
# importando a função 'soma' do pyspark para evitar erros
from pyspark.sql.functions import sum as SUM

# criando um novo dataframe
dfAtv3 = vendas.join(clientes, vendas.ClienteID == clientes.ClienteID, "inner").groupBy(clientes.Status).agg(SUM("Total")).orderBy(Func.col("sum(Total)").desc())
dfAtv3.show()

+--------+------------------+
|  Status|        sum(Total)|
+--------+------------------+
|  Silver|        3014291.36|
|    Gold|27286.690000000002|
|Platinum|          12584.68|
+--------+------------------+



# Spark SQL

In [None]:
# importações desta etapa
from pyspark.sql import SparkSession
from pyspark.sql.types import *

In [None]:
# comando SQL: spark.sql("comando vem aqui")
# o comando funciona como uma consulta, logo vai retorna um dataframe
# logo temos q usar o 'show' ou atribuir a uma variavel
spark.sql("show databases").show()

+---------+
|namespace|
+---------+
|  default|
+---------+



In [None]:
# criando tabela no SQL
spark.sql("create database desp")

DataFrame[]

In [None]:
# verifiando os bancos existentes
spark.sql("show databases").show()

+---------+
|namespace|
+---------+
|  default|
|     desp|
+---------+



In [None]:
# conectando ao banco
spark.sql("use desp").show()

++
||
++
++



In [None]:
# verificando dataframe
despachantes.show()

+---+-------------------+------+-------------+------+----------+
| id|               nome|status|       cidade|vendas|      data|
+---+-------------------+------+-------------+------+----------+
|  1|   Carminda Pestana| Ativo|  Santa Maria|    23|2020-08-11|
|  2|    Deolinda Vilela| Ativo|Novo Hamburgo|    34|2020-03-05|
|  3|   Emídio Dornelles| Ativo| Porto Alegre|    34|2020-02-05|
|  4|Felisbela Dornelles| Ativo| Porto Alegre|    36|2020-02-05|
|  5|     Graça Ornellas| Ativo| Porto Alegre|    12|2020-02-05|
|  6|   Matilde Rebouças| Ativo| Porto Alegre|    22|2019-01-05|
|  7|      Noêmia Orriça| Ativo|  Santa Maria|    45|2019-10-05|
|  8|      Roque Vásquez| Ativo| Porto Alegre|    65|2020-03-05|
|  9|      Uriel Queiroz| Ativo| Porto Alegre|    54|2018-05-05|
| 10|   Viviana Sequeira| Ativo| Porto Alegre|     0|2020-09-05|
+---+-------------------+------+-------------+------+----------+



In [None]:
# transformando o dataframe para transformar em uma tabela
despachantes.write.saveAsTable("Despachantes")
# criamos uma tabela gerenciada! Equivalente a um banco relacional com dados e metadados

In [None]:
# apagar tabela se necessário
# spark.sql("drop table despachantes")

In [None]:
spark.sql("show tables").show()

+---------+------------+-----------+
|namespace|   tableName|isTemporary|
+---------+------------+-----------+
|     desp|despachantes|      false|
+---------+------------+-----------+



In [None]:
# 'mode('overwrite')' para recriar a tabela (sobrescrever)
despachantes.write.mode("overwrite").saveAsTable("Despachantes")

# se houver novos registros usar o:
# 'mode('append')' (pesquisar para ver se essa é a escrtia)

In [None]:
# OBS: ao encerrar a sessão spark os dataframes se perdem, mas os objetos
# como tabelas de banco criadas usando SQL, permanecem!

In [None]:
# caso a gente perca o dataframe por encerra a sessão spark, podemos recuperá-lo
# fazendo uma consulta, ja q o mesmo esta como objeto SQL

# exemplo:
### despachantes = spark.sql("select * from Despachantes").show()
# não executado, pois o ambiente do colab sempre é executado td após reiniciado

In [None]:
despachantes.show()

+---+-------------------+------+-------------+------+----------+
| id|               nome|status|       cidade|vendas|      data|
+---+-------------------+------+-------------+------+----------+
|  1|   Carminda Pestana| Ativo|  Santa Maria|    23|2020-08-11|
|  2|    Deolinda Vilela| Ativo|Novo Hamburgo|    34|2020-03-05|
|  3|   Emídio Dornelles| Ativo| Porto Alegre|    34|2020-02-05|
|  4|Felisbela Dornelles| Ativo| Porto Alegre|    36|2020-02-05|
|  5|     Graça Ornellas| Ativo| Porto Alegre|    12|2020-02-05|
|  6|   Matilde Rebouças| Ativo| Porto Alegre|    22|2019-01-05|
|  7|      Noêmia Orriça| Ativo|  Santa Maria|    45|2019-10-05|
|  8|      Roque Vásquez| Ativo| Porto Alegre|    65|2020-03-05|
|  9|      Uriel Queiroz| Ativo| Porto Alegre|    54|2018-05-05|
| 10|   Viviana Sequeira| Ativo| Porto Alegre|     0|2020-09-05|
+---+-------------------+------+-------------+------+----------+



In [None]:
# ao informar o caminho, o spark ja entende q se trata de uma tabela externa (não gerenciada)
# 'option' ('path' 'caminho do arquivo')
despachantes.write.option("path", "/content/dfimportparquet/despachantes.parquet").saveAsTable("Despachantes_ng")

In [None]:
# conferindo as tabelas
spark.sql("show tables").show()

+---------+---------------+-----------+
|namespace|      tableName|isTemporary|
+---------+---------------+-----------+
|     desp|   despachantes|      false|
|     desp|despachantes_ng|      false|
+---------+---------------+-----------+



In [None]:
# como saber se uma tabela é gerenciada ou externa

# PRIMEIRA FORMA
# tabela gerenciada
spark.sql("show create table Despachantes").show(truncate=False)

+-----------------------------------------------------------------------------------------------------------------------------------------------------------+
|createtab_stmt                                                                                                                                             |
+-----------------------------------------------------------------------------------------------------------------------------------------------------------+
|CREATE TABLE spark_catalog.desp.Despachantes (\n  id INT,\n  nome STRING,\n  status STRING,\n  cidade STRING,\n  vendas INT,\n  data DATE)\nUSING parquet\n|
+-----------------------------------------------------------------------------------------------------------------------------------------------------------+



In [None]:
# tabela externa
spark.sql("show create table Despachantes_ng").show(truncate=False)
# se aparece um 'location' é uma tabela externa

+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|createtab_stmt                                                                                                                                                                                                               |
+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|CREATE TABLE spark_catalog.desp.Despachantes_ng (\n  id INT,\n  nome STRING,\n  status STRING,\n  cidade STRING,\n  vendas INT,\n  data DATE)\nUSING parquet\nLOCATION 'file:/content/dfimportparquet/despachantes.parquet'\n|
+-------------------------------------------------------------------------------------------------------

In [None]:
# SEGUNDA FORMA
spark.catalog.listTables()

[Table(name='despachantes', catalog='spark_catalog', namespace=['desp'], description=None, tableType='MANAGED', isTemporary=False),
 Table(name='despachantes_ng', catalog='spark_catalog', namespace=['desp'], description=None, tableType='EXTERNAL', isTemporary=False)]

In [None]:
# sabemos como ver onde estão as tabelas externas, pois passamos os caminhos
# na sua criação.

### mas onde estão as gerenciadas?

In [None]:
# VIEWS - APELIDO PARA UMA OU MAIS TABELAS, PARA O USUARIO VAI PARECER SO UMA TABELA

# view temporaria
despachantes.createOrReplaceTempView("Despachantes_view1")

In [None]:
spark.sql("select * from despachantes_view1").show()

+---+-------------------+------+-------------+------+----------+
| id|               nome|status|       cidade|vendas|      data|
+---+-------------------+------+-------------+------+----------+
|  1|   Carminda Pestana| Ativo|  Santa Maria|    23|2020-08-11|
|  2|    Deolinda Vilela| Ativo|Novo Hamburgo|    34|2020-03-05|
|  3|   Emídio Dornelles| Ativo| Porto Alegre|    34|2020-02-05|
|  4|Felisbela Dornelles| Ativo| Porto Alegre|    36|2020-02-05|
|  5|     Graça Ornellas| Ativo| Porto Alegre|    12|2020-02-05|
|  6|   Matilde Rebouças| Ativo| Porto Alegre|    22|2019-01-05|
|  7|      Noêmia Orriça| Ativo|  Santa Maria|    45|2019-10-05|
|  8|      Roque Vásquez| Ativo| Porto Alegre|    65|2020-03-05|
|  9|      Uriel Queiroz| Ativo| Porto Alegre|    54|2018-05-05|
| 10|   Viviana Sequeira| Ativo| Porto Alegre|     0|2020-09-05|
+---+-------------------+------+-------------+------+----------+



In [None]:
# View global
despachantes.createOrReplaceGlobalTempView("Despachantes_view2")

In [None]:
# 'view' global precisa de um pré fixo
# 'global_temp.' + nome da view
spark.sql("select * from global_temp.Despachantes_view2").show()

+---+-------------------+------+-------------+------+----------+
| id|               nome|status|       cidade|vendas|      data|
+---+-------------------+------+-------------+------+----------+
|  1|   Carminda Pestana| Ativo|  Santa Maria|    23|2020-08-11|
|  2|    Deolinda Vilela| Ativo|Novo Hamburgo|    34|2020-03-05|
|  3|   Emídio Dornelles| Ativo| Porto Alegre|    34|2020-02-05|
|  4|Felisbela Dornelles| Ativo| Porto Alegre|    36|2020-02-05|
|  5|     Graça Ornellas| Ativo| Porto Alegre|    12|2020-02-05|
|  6|   Matilde Rebouças| Ativo| Porto Alegre|    22|2019-01-05|
|  7|      Noêmia Orriça| Ativo|  Santa Maria|    45|2019-10-05|
|  8|      Roque Vásquez| Ativo| Porto Alegre|    65|2020-03-05|
|  9|      Uriel Queiroz| Ativo| Porto Alegre|    54|2018-05-05|
| 10|   Viviana Sequeira| Ativo| Porto Alegre|     0|2020-09-05|
+---+-------------------+------+-------------+------+----------+



In [None]:
# criando views com SQL
spark.sql("create or replace temp view Desp_view as select * from despachantes")

DataFrame[]

In [None]:
spark.sql("select * from desp_view").show()

+---+-------------------+------+-------------+------+----------+
| id|               nome|status|       cidade|vendas|      data|
+---+-------------------+------+-------------+------+----------+
|  1|   Carminda Pestana| Ativo|  Santa Maria|    23|2020-08-11|
|  2|    Deolinda Vilela| Ativo|Novo Hamburgo|    34|2020-03-05|
|  3|   Emídio Dornelles| Ativo| Porto Alegre|    34|2020-02-05|
|  4|Felisbela Dornelles| Ativo| Porto Alegre|    36|2020-02-05|
|  5|     Graça Ornellas| Ativo| Porto Alegre|    12|2020-02-05|
|  6|   Matilde Rebouças| Ativo| Porto Alegre|    22|2019-01-05|
|  7|      Noêmia Orriça| Ativo|  Santa Maria|    45|2019-10-05|
|  8|      Roque Vásquez| Ativo| Porto Alegre|    65|2020-03-05|
|  9|      Uriel Queiroz| Ativo| Porto Alegre|    54|2018-05-05|
| 10|   Viviana Sequeira| Ativo| Porto Alegre|     0|2020-09-05|
+---+-------------------+------+-------------+------+----------+



In [None]:
# criando view global com SQL
spark.sql("create or replace global temp view Desp_view2 as select * from despachantes")

DataFrame[]

In [None]:
spark.sql("select * from global_temp.Desp_view2").show()

+---+-------------------+------+-------------+------+----------+
| id|               nome|status|       cidade|vendas|      data|
+---+-------------------+------+-------------+------+----------+
|  1|   Carminda Pestana| Ativo|  Santa Maria|    23|2020-08-11|
|  2|    Deolinda Vilela| Ativo|Novo Hamburgo|    34|2020-03-05|
|  3|   Emídio Dornelles| Ativo| Porto Alegre|    34|2020-02-05|
|  4|Felisbela Dornelles| Ativo| Porto Alegre|    36|2020-02-05|
|  5|     Graça Ornellas| Ativo| Porto Alegre|    12|2020-02-05|
|  6|   Matilde Rebouças| Ativo| Porto Alegre|    22|2019-01-05|
|  7|      Noêmia Orriça| Ativo|  Santa Maria|    45|2019-10-05|
|  8|      Roque Vásquez| Ativo| Porto Alegre|    65|2020-03-05|
|  9|      Uriel Queiroz| Ativo| Porto Alegre|    54|2018-05-05|
| 10|   Viviana Sequeira| Ativo| Porto Alegre|     0|2020-09-05|
+---+-------------------+------+-------------+------+----------+



In [None]:
# tudo q se consegue fazer com 'select', se consegue tb usando a API do dataframe
from pyspark.sql import functions as Func
from pyspark.sql.functions import *

# SQL
spark.sql("select * from despachantes").show()

+---+-------------------+------+-------------+------+----------+
| id|               nome|status|       cidade|vendas|      data|
+---+-------------------+------+-------------+------+----------+
|  1|   Carminda Pestana| Ativo|  Santa Maria|    23|2020-08-11|
|  2|    Deolinda Vilela| Ativo|Novo Hamburgo|    34|2020-03-05|
|  3|   Emídio Dornelles| Ativo| Porto Alegre|    34|2020-02-05|
|  4|Felisbela Dornelles| Ativo| Porto Alegre|    36|2020-02-05|
|  5|     Graça Ornellas| Ativo| Porto Alegre|    12|2020-02-05|
|  6|   Matilde Rebouças| Ativo| Porto Alegre|    22|2019-01-05|
|  7|      Noêmia Orriça| Ativo|  Santa Maria|    45|2019-10-05|
|  8|      Roque Vásquez| Ativo| Porto Alegre|    65|2020-03-05|
|  9|      Uriel Queiroz| Ativo| Porto Alegre|    54|2018-05-05|
| 10|   Viviana Sequeira| Ativo| Porto Alegre|     0|2020-09-05|
+---+-------------------+------+-------------+------+----------+



In [None]:
# API
despachantes.show()

+---+-------------------+------+-------------+------+----------+
| id|               nome|status|       cidade|vendas|      data|
+---+-------------------+------+-------------+------+----------+
|  1|   Carminda Pestana| Ativo|  Santa Maria|    23|2020-08-11|
|  2|    Deolinda Vilela| Ativo|Novo Hamburgo|    34|2020-03-05|
|  3|   Emídio Dornelles| Ativo| Porto Alegre|    34|2020-02-05|
|  4|Felisbela Dornelles| Ativo| Porto Alegre|    36|2020-02-05|
|  5|     Graça Ornellas| Ativo| Porto Alegre|    12|2020-02-05|
|  6|   Matilde Rebouças| Ativo| Porto Alegre|    22|2019-01-05|
|  7|      Noêmia Orriça| Ativo|  Santa Maria|    45|2019-10-05|
|  8|      Roque Vásquez| Ativo| Porto Alegre|    65|2020-03-05|
|  9|      Uriel Queiroz| Ativo| Porto Alegre|    54|2018-05-05|
| 10|   Viviana Sequeira| Ativo| Porto Alegre|     0|2020-09-05|
+---+-------------------+------+-------------+------+----------+



In [None]:
spark.sql("select nome, vendas from despachantes").show()

+-------------------+------+
|               nome|vendas|
+-------------------+------+
|   Carminda Pestana|    23|
|    Deolinda Vilela|    34|
|   Emídio Dornelles|    34|
|Felisbela Dornelles|    36|
|     Graça Ornellas|    12|
|   Matilde Rebouças|    22|
|      Noêmia Orriça|    45|
|      Roque Vásquez|    65|
|      Uriel Queiroz|    54|
|   Viviana Sequeira|     0|
+-------------------+------+



In [None]:
despachantes.select("nome", "vendas").show()

+-------------------+------+
|               nome|vendas|
+-------------------+------+
|   Carminda Pestana|    23|
|    Deolinda Vilela|    34|
|   Emídio Dornelles|    34|
|Felisbela Dornelles|    36|
|     Graça Ornellas|    12|
|   Matilde Rebouças|    22|
|      Noêmia Orriça|    45|
|      Roque Vásquez|    65|
|      Uriel Queiroz|    54|
|   Viviana Sequeira|     0|
+-------------------+------+



In [None]:
spark.sql("select nome, vendas from despachantes where vendas > 20").show()

+-------------------+------+
|               nome|vendas|
+-------------------+------+
|   Carminda Pestana|    23|
|    Deolinda Vilela|    34|
|   Emídio Dornelles|    34|
|Felisbela Dornelles|    36|
|   Matilde Rebouças|    22|
|      Noêmia Orriça|    45|
|      Roque Vásquez|    65|
|      Uriel Queiroz|    54|
+-------------------+------+



In [None]:
despachantes.select("nome", "vendas").where(Func.col("vendas") > 20).show()

+-------------------+------+
|               nome|vendas|
+-------------------+------+
|   Carminda Pestana|    23|
|    Deolinda Vilela|    34|
|   Emídio Dornelles|    34|
|Felisbela Dornelles|    36|
|   Matilde Rebouças|    22|
|      Noêmia Orriça|    45|
|      Roque Vásquez|    65|
|      Uriel Queiroz|    54|
+-------------------+------+



In [None]:
# 'order by 2' é para ordenar pela coluna de indice '2'
spark.sql("select cidade, sum(vendas) from despachantes group by cidade order by 2 desc").show()

+-------------+-----------+
|       cidade|sum(vendas)|
+-------------+-----------+
| Porto Alegre|        223|
|  Santa Maria|         68|
|Novo Hamburgo|         34|
+-------------+-----------+



In [None]:
despachantes.groupBy("cidade").agg(sum("vendas")).orderBy(Func.col("sum(vendas)").desc()).show()


+-------------+-----------+
|       cidade|sum(vendas)|
+-------------+-----------+
| Porto Alegre|        223|
|  Santa Maria|         68|
|Novo Hamburgo|         34|
+-------------+-----------+

