In [0]:
#criar um data frame simples, sem schema
from pyspark.sql import SparkSession

## Dataframe

In [0]:
df1 = spark.createDataFrame([("Pedro",10),("Maria",20),("José",40)])

In [0]:
#show é ação, então tudo o que foi feito anteriormente é executado, lazzy
df1.show()

+-----+---+
|   _1| _2|
+-----+---+
|Pedro| 10|
|Maria| 20|
| José| 40|
+-----+---+



#### Dataframe com Schema

In [0]:
#criar df com schema
schema = "Id INT, Nome STRING"
dados = [[1,"Pedro"],[2,"Maria"]]
df2 = spark.createDataFrame(dados, schema)
df2.show()

+---+-----+
| Id| Nome|
+---+-----+
|  1|Pedro|
|  2|Maria|
+---+-----+



In [0]:
df2.display()

Id,Nome
1,Pedro
2,Maria


In [0]:
df2.show(1) 

+---+-----+
| Id| Nome|
+---+-----+
|  1|Pedro|
+---+-----+
only showing top 1 row



In [0]:
#com transformação
from pyspark.sql.functions import sum
schema2 = "Produtos STRING, Vendas INT"
vendas = [["Caneta",10],["Lápis",20],["Caneta",40]]
df3 = spark.createDataFrame(vendas , schema2 )

##### Agrupando

In [0]:
agrupado = df3.groupBy("Produtos").agg(sum("Vendas"))
agrupado.show()

+--------+-----------+
|Produtos|sum(Vendas)|
+--------+-----------+
|  Caneta|         50|
|   Lápis|         20|
+--------+-----------+



In [0]:
#podemos contatenar as operações, neste caso sem persitir
df3.groupBy("Produtos").agg(sum("Vendas")).show()

+--------+-----------+
|Produtos|sum(Vendas)|
+--------+-----------+
|  Caneta|         50|
|   Lápis|         20|
+--------+-----------+



##### Selecionando colunas específicas

In [0]:
#selecionar colunas específicas
df3.select("Produtos").show()

+--------+
|Produtos|
+--------+
|  Caneta|
|   Lápis|
|  Caneta|
+--------+



In [0]:
df3.select("Produtos","Vendas").show()

+--------+------+
|Produtos|Vendas|
+--------+------+
|  Caneta|    10|
|   Lápis|    20|
|  Caneta|    40|
+--------+------+



##### Coluna Nova com expessão (calculo)

In [0]:
from pyspark.sql.functions import expr


In [0]:
df3.select("Produtos", "Vendas", expr("Vendas * 0.2")).show()

+--------+------+--------------+
|Produtos|Vendas|(Vendas * 0.2)|
+--------+------+--------------+
|  Caneta|    10|           2.0|
|   Lápis|    20|           4.0|
|  Caneta|    40|           8.0|
+--------+------+--------------+



##### Visualização

In [0]:
#para ver o schema
df3.schema

Out[57]: StructType([StructField('Produtos', StringType(), True), StructField('Vendas', IntegerType(), True)])

In [0]:
#ver colunas
df3.columns

Out[58]: ['Produtos', 'Vendas']

In [0]:
#visualização 
df3.show()

+--------+------+
|Produtos|Vendas|
+--------+------+
|  Caneta|    10|
|   Lápis|    20|
|  Caneta|    40|
+--------+------+



In [0]:
#visualização 
df3.display()

Produtos,Vendas
Caneta,10
Lápis,20
Caneta,40


#### Injestão de dados

In [0]:
#importar dados definindo schema
from pyspark.sql.types import *

In [0]:
#vamos deixar a data como string de propósito
arqschema = "id INT, nome STRING, status STRING, cidade STRING, vendas INT, data STRING"

In [0]:
#o caminho pode mudar, download é a pasta que você baixou com dados de exemplo
despachantes = spark.read.csv('/FileStore/tables/despachantes-3.csv', header=False, schema=arqschema)
despachantes.show()

+---+-------------------+------+-------------+------+----------+
| id|               nome|status|       cidade|vendas|      data|
+---+-------------------+------+-------------+------+----------+
|  1|   Carminda Pestana| Ativo|  Santa Maria|    23|2020-08-11|
|  2|    Deolinda Vilela| Ativo|Novo Hamburgo|    34|2020-03-05|
|  3|   Emídio Dornelles| Ativo| Porto Alegre|    34|2020-02-05|
|  4|Felisbela Dornelles| Ativo| Porto Alegre|    36|2020-02-05|
|  5|     Graça Ornellas| Ativo| Porto Alegre|    12|2020-02-05|
|  6|   Matilde Rebouças| Ativo| Porto Alegre|    22|2019-01-05|
|  7|    Noêmia   Orriça| Ativo|  Santa Maria|    45|2019-10-05|
|  8|      Roque Vásquez| Ativo| Porto Alegre|    65|2020-03-05|
|  9|      Uriel Queiroz| Ativo| Porto Alegre|    54|2018-05-05|
| 10|   Viviana Sequeira| Ativo| Porto Alegre|     0|2020-09-05|
+---+-------------------+------+-------------+------+----------+



In [0]:
despachantes2 = spark.read.format('csv').load('/FileStore/tables/despachantes-3.csv', header=False, schema=arqschema)
#display(despachantes2)
despachantes2.display()

id,nome,status,cidade,vendas,data
1,Carminda Pestana,Ativo,Santa Maria,23,2020-08-11
2,Deolinda Vilela,Ativo,Novo Hamburgo,34,2020-03-05
3,Emídio Dornelles,Ativo,Porto Alegre,34,2020-02-05
4,Felisbela Dornelles,Ativo,Porto Alegre,36,2020-02-05
5,Graça Ornellas,Ativo,Porto Alegre,12,2020-02-05
6,Matilde Rebouças,Ativo,Porto Alegre,22,2019-01-05
7,Noêmia Orriça,Ativo,Santa Maria,45,2019-10-05
8,Roque Vásquez,Ativo,Porto Alegre,65,2020-03-05
9,Uriel Queiroz,Ativo,Porto Alegre,54,2018-05-05
10,Viviana Sequeira,Ativo,Porto Alegre,0,2020-09-05


In [0]:
#outro exemplo, inferindo schema, usando load e informado tipo - schema automatico no pyspark
desp_autoschema = spark.read.load("/FileStore/tables/despachantes-3.csv",
                     format="csv", sep=",", inferSchema=True, header=False)
desp_autoschema.show()

+---+-------------------+-----+-------------+---+----------+
|_c0|                _c1|  _c2|          _c3|_c4|       _c5|
+---+-------------------+-----+-------------+---+----------+
|  1|   Carminda Pestana|Ativo|  Santa Maria| 23|2020-08-11|
|  2|    Deolinda Vilela|Ativo|Novo Hamburgo| 34|2020-03-05|
|  3|   Emídio Dornelles|Ativo| Porto Alegre| 34|2020-02-05|
|  4|Felisbela Dornelles|Ativo| Porto Alegre| 36|2020-02-05|
|  5|     Graça Ornellas|Ativo| Porto Alegre| 12|2020-02-05|
|  6|   Matilde Rebouças|Ativo| Porto Alegre| 22|2019-01-05|
|  7|    Noêmia   Orriça|Ativo|  Santa Maria| 45|2019-10-05|
|  8|      Roque Vásquez|Ativo| Porto Alegre| 65|2020-03-05|
|  9|      Uriel Queiroz|Ativo| Porto Alegre| 54|2018-05-05|
| 10|   Viviana Sequeira|Ativo| Porto Alegre|  0|2020-09-05|
+---+-------------------+-----+-------------+---+----------+



#### Comparando os schemas

In [0]:
# SPARK definiu o schema
desp_autoschema.schema

Out[66]: StructType([StructField('_c0', IntegerType(), True), StructField('_c1', StringType(), True), StructField('_c2', StringType(), True), StructField('_c3', StringType(), True), StructField('_c4', IntegerType(), True), StructField('_c5', DateType(), True)])

In [0]:
# Schema definido manualmente
despachantes.schema

Out[67]: StructType([StructField('id', IntegerType(), True), StructField('nome', StringType(), True), StructField('status', StringType(), True), StructField('cidade', StringType(), True), StructField('vendas', IntegerType(), True), StructField('data', StringType(), True)])

#### Funções

In [0]:
# importando
from pyspark.sql import functions as Func

Condições

In [0]:
#condição lógica com where
despachantes.select("id","nome","vendas").where(Func.col("vendas") > 20).show()

+---+-------------------+------+
| id|               nome|vendas|
+---+-------------------+------+
|  1|   Carminda Pestana|    23|
|  2|    Deolinda Vilela|    34|
|  3|   Emídio Dornelles|    34|
|  4|Felisbela Dornelles|    36|
|  6|   Matilde Rebouças|    22|
|  7|    Noêmia   Orriça|    45|
|  8|      Roque Vásquez|    65|
|  9|      Uriel Queiroz|    54|
+---+-------------------+------+



In [0]:
#& para and, | para or, e ~ para not
despachantes.select("id","nome","vendas").where((Func.col("vendas") > 20) & (Func.col("vendas") < 40)).show()

+---+-------------------+------+
| id|               nome|vendas|
+---+-------------------+------+
|  1|   Carminda Pestana|    23|
|  2|    Deolinda Vilela|    34|
|  3|   Emídio Dornelles|    34|
|  4|Felisbela Dornelles|    36|
|  6|   Matilde Rebouças|    22|
+---+-------------------+------+



#### Renomear Colunas

In [0]:
#renomear coluna
novodf = despachantes.withColumnRenamed("nome","nomes")
novodf.columns

Out[71]: ['id', 'nomes', 'status', 'cidade', 'vendas', 'data']

#### Mudando Formato

In [0]:
from pyspark.sql.functions import *

In [0]:
#coluna data está como string, vamos transformar em texto
despachantes2 = despachantes.withColumn("data2", to_timestamp(Func.col("data"),"yyyy-MM-dd"))

In [0]:
despachantes2.schema

Out[74]: StructType([StructField('id', IntegerType(), True), StructField('nome', StringType(), True), StructField('status', StringType(), True), StructField('cidade', StringType(), True), StructField('vendas', IntegerType(), True), StructField('data', StringType(), True), StructField('data2', TimestampType(), True)])

#### Operações com Datas

In [0]:
despachantes2.select(year("data")).show()

+----------+
|year(data)|
+----------+
|      2020|
|      2020|
|      2020|
|      2020|
|      2020|
|      2019|
|      2019|
|      2020|
|      2018|
|      2020|
+----------+



In [0]:
despachantes2.select(year("data")).distinct().show()

+----------+
|year(data)|
+----------+
|      2018|
|      2019|
|      2020|
+----------+



In [0]:
despachantes2.select("nome",year("data")).orderBy("nome").show()

+-------------------+----------+
|               nome|year(data)|
+-------------------+----------+
|   Carminda Pestana|      2020|
|    Deolinda Vilela|      2020|
|   Emídio Dornelles|      2020|
|Felisbela Dornelles|      2020|
|     Graça Ornellas|      2020|
|   Matilde Rebouças|      2019|
|    Noêmia   Orriça|      2019|
|      Roque Vásquez|      2020|
|      Uriel Queiroz|      2018|
|   Viviana Sequeira|      2020|
+-------------------+----------+



In [0]:
despachantes2.select("data").groupBy(year("data")).count().show()

+----------+-----+
|year(data)|count|
+----------+-----+
|      2018|    1|
|      2019|    2|
|      2020|    7|
+----------+-----+



In [0]:
despachantes2.select(Func.sum("vendas")).show()

+-----------+
|sum(vendas)|
+-----------+
|        325|
+-----------+



#### Ações e Transfrmações

In [0]:
despachantes.show()

+---+-------------------+------+-------------+------+----------+
| id|               nome|status|       cidade|vendas|      data|
+---+-------------------+------+-------------+------+----------+
|  1|   Carminda Pestana| Ativo|  Santa Maria|    23|2020-08-11|
|  2|    Deolinda Vilela| Ativo|Novo Hamburgo|    34|2020-03-05|
|  3|   Emídio Dornelles| Ativo| Porto Alegre|    34|2020-02-05|
|  4|Felisbela Dornelles| Ativo| Porto Alegre|    36|2020-02-05|
|  5|     Graça Ornellas| Ativo| Porto Alegre|    12|2020-02-05|
|  6|   Matilde Rebouças| Ativo| Porto Alegre|    22|2019-01-05|
|  7|    Noêmia   Orriça| Ativo|  Santa Maria|    45|2019-10-05|
|  8|      Roque Vásquez| Ativo| Porto Alegre|    65|2020-03-05|
|  9|      Uriel Queiroz| Ativo| Porto Alegre|    54|2018-05-05|
| 10|   Viviana Sequeira| Ativo| Porto Alegre|     0|2020-09-05|
+---+-------------------+------+-------------+------+----------+



In [0]:
despachantes.display()

id,nome,status,cidade,vendas,data
1,Carminda Pestana,Ativo,Santa Maria,23,2020-08-11
2,Deolinda Vilela,Ativo,Novo Hamburgo,34,2020-03-05
3,Emídio Dornelles,Ativo,Porto Alegre,34,2020-02-05
4,Felisbela Dornelles,Ativo,Porto Alegre,36,2020-02-05
5,Graça Ornellas,Ativo,Porto Alegre,12,2020-02-05
6,Matilde Rebouças,Ativo,Porto Alegre,22,2019-01-05
7,Noêmia Orriça,Ativo,Santa Maria,45,2019-10-05
8,Roque Vásquez,Ativo,Porto Alegre,65,2020-03-05
9,Uriel Queiroz,Ativo,Porto Alegre,54,2018-05-05
10,Viviana Sequeira,Ativo,Porto Alegre,0,2020-09-05


In [0]:
# Retorna uma lista
despachantes.take(1)

Out[82]: [Row(id=1, nome='Carminda Pestana', status='Ativo', cidade='Santa Maria', vendas=23, data='2020-08-11')]

In [0]:
# retorna todos os dados em lista
despachantes.collect()

Out[83]: [Row(id=1, nome='Carminda Pestana', status='Ativo', cidade='Santa Maria', vendas=23, data='2020-08-11'),
 Row(id=2, nome='Deolinda Vilela', status='Ativo', cidade='Novo Hamburgo', vendas=34, data='2020-03-05'),
 Row(id=3, nome='Emídio Dornelles', status='Ativo', cidade='Porto Alegre', vendas=34, data='2020-02-05'),
 Row(id=4, nome='Felisbela Dornelles', status='Ativo', cidade='Porto Alegre', vendas=36, data='2020-02-05'),
 Row(id=5, nome='Graça Ornellas', status='Ativo', cidade='Porto Alegre', vendas=12, data='2020-02-05'),
 Row(id=6, nome='Matilde Rebouças', status='Ativo', cidade='Porto Alegre', vendas=22, data='2019-01-05'),
 Row(id=7, nome='Noêmia   Orriça', status='Ativo', cidade='Santa Maria', vendas=45, data='2019-10-05'),
 Row(id=8, nome='Roque Vásquez', status='Ativo', cidade='Porto Alegre', vendas=65, data='2020-03-05'),
 Row(id=9, nome='Uriel Queiroz', status='Ativo', cidade='Porto Alegre', vendas=54, data='2018-05-05'),
 Row(id=10, nome='Viviana Sequeira', status='

#### Contagem

In [0]:
# Números de registros (linhas)
despachantes.count()

Out[84]: 10

#### Ordenação

In [0]:
#ordenar por uma coluna em ordem crescente
despachantes.orderBy('vendas').show()

+---+-------------------+------+-------------+------+----------+
| id|               nome|status|       cidade|vendas|      data|
+---+-------------------+------+-------------+------+----------+
| 10|   Viviana Sequeira| Ativo| Porto Alegre|     0|2020-09-05|
|  5|     Graça Ornellas| Ativo| Porto Alegre|    12|2020-02-05|
|  6|   Matilde Rebouças| Ativo| Porto Alegre|    22|2019-01-05|
|  1|   Carminda Pestana| Ativo|  Santa Maria|    23|2020-08-11|
|  2|    Deolinda Vilela| Ativo|Novo Hamburgo|    34|2020-03-05|
|  3|   Emídio Dornelles| Ativo| Porto Alegre|    34|2020-02-05|
|  4|Felisbela Dornelles| Ativo| Porto Alegre|    36|2020-02-05|
|  7|    Noêmia   Orriça| Ativo|  Santa Maria|    45|2019-10-05|
|  9|      Uriel Queiroz| Ativo| Porto Alegre|    54|2018-05-05|
|  8|      Roque Vásquez| Ativo| Porto Alegre|    65|2020-03-05|
+---+-------------------+------+-------------+------+----------+



In [0]:
#ordenar por uma coluna em ordem decrescente, utiliza o (func.col<coluna>)
despachantes.orderBy(Func.col('vendas').desc()).show()

+---+-------------------+------+-------------+------+----------+
| id|               nome|status|       cidade|vendas|      data|
+---+-------------------+------+-------------+------+----------+
|  8|      Roque Vásquez| Ativo| Porto Alegre|    65|2020-03-05|
|  9|      Uriel Queiroz| Ativo| Porto Alegre|    54|2018-05-05|
|  7|    Noêmia   Orriça| Ativo|  Santa Maria|    45|2019-10-05|
|  4|Felisbela Dornelles| Ativo| Porto Alegre|    36|2020-02-05|
|  2|    Deolinda Vilela| Ativo|Novo Hamburgo|    34|2020-03-05|
|  3|   Emídio Dornelles| Ativo| Porto Alegre|    34|2020-02-05|
|  1|   Carminda Pestana| Ativo|  Santa Maria|    23|2020-08-11|
|  6|   Matilde Rebouças| Ativo| Porto Alegre|    22|2019-01-05|
|  5|     Graça Ornellas| Ativo| Porto Alegre|    12|2020-02-05|
| 10|   Viviana Sequeira| Ativo| Porto Alegre|     0|2020-09-05|
+---+-------------------+------+-------------+------+----------+



In [0]:
#ordenar dois campos e agregação
despachantes.orderBy(Func.col('cidade').desc(),Func.col('vendas').desc()).show()

+---+-------------------+------+-------------+------+----------+
| id|               nome|status|       cidade|vendas|      data|
+---+-------------------+------+-------------+------+----------+
|  7|    Noêmia   Orriça| Ativo|  Santa Maria|    45|2019-10-05|
|  1|   Carminda Pestana| Ativo|  Santa Maria|    23|2020-08-11|
|  8|      Roque Vásquez| Ativo| Porto Alegre|    65|2020-03-05|
|  9|      Uriel Queiroz| Ativo| Porto Alegre|    54|2018-05-05|
|  4|Felisbela Dornelles| Ativo| Porto Alegre|    36|2020-02-05|
|  3|   Emídio Dornelles| Ativo| Porto Alegre|    34|2020-02-05|
|  6|   Matilde Rebouças| Ativo| Porto Alegre|    22|2019-01-05|
|  5|     Graça Ornellas| Ativo| Porto Alegre|    12|2020-02-05|
| 10|   Viviana Sequeira| Ativo| Porto Alegre|     0|2020-09-05|
|  2|    Deolinda Vilela| Ativo|Novo Hamburgo|    34|2020-03-05|
+---+-------------------+------+-------------+------+----------+



#### Agrupar

In [0]:
#Total de Vendas por cidade
despachantes.groupBy('cidade').agg(sum('vendas')).show()

+-------------+-----------+
|       cidade|sum(vendas)|
+-------------+-----------+
|  Santa Maria|         68|
|Novo Hamburgo|         34|
| Porto Alegre|        223|
+-------------+-----------+



In [0]:
#agrupar e ordenar de forma descecente
despachantes.groupBy('cidade').agg(sum('vendas')).orderBy(Func.col('sum(vendas)').desc()).show()

+-------------+-----------+
|       cidade|sum(vendas)|
+-------------+-----------+
| Porto Alegre|        223|
|  Santa Maria|         68|
|Novo Hamburgo|         34|
+-------------+-----------+



#### Filtro

In [0]:
despachantes.filter(Func.col('nome') == 'Deolinda Vilela').show()

+---+---------------+------+-------------+------+----------+
| id|           nome|status|       cidade|vendas|      data|
+---+---------------+------+-------------+------+----------+
|  2|Deolinda Vilela| Ativo|Novo Hamburgo|    34|2020-03-05|
+---+---------------+------+-------------+------+----------+



#### Exportando - Salvando com outros formatos, são diretórios

In [0]:
despachantes.write.format("parquet").save("/home/fernando/dfimportparquet")



In [0]:
despachantes.write.format("csv").save("/home/fernando/dfimportcsv")



In [0]:
despachantes.write.format("json").save("/home/fernando/dfimportjson")



In [0]:
despachantes.write.format("orc").save("/home/fernando/dfimportorc")



#### Leitura

In [0]:
par = spark.read.format("parquet").load("/home/fernando/dfimportparquet/despachantes.parquet")
par.show()
par.schema()



In [0]:
js = spark.read.format("json").load("/home/fernando/dfimportjson/despachantes.json")
js.show()
js.schema()



In [0]:
or = spark.read.format("orc").load("/home/fernando/dfimportorc/despachantes.orc")
or.show()
or.schema()



In [0]:
cs = spark.read.format("csv").load("/home/fernando/dfimportcsv/despachantes.csv")
cs.show()
cs.schema()



#### Puxando Arquivo

In [0]:
# defininco schema
arqschema = "id INT, nome STRING, status STRING, cidade STRING, vendas INT, data STRING"



In [0]:
# definindo schema
cs2 = spark.read.format("csv").load("/home/fernando/dfimportcsv/despachantes.csv", schema=arqschema)

