In [1]:
!pip3 install pyspark
!pip3 install findspark



In [5]:
import findspark
import pyspark
from pyspark.sql import SparkSession
from pyspark import SparkContext, SparkConf

In [None]:
spark = SparkSession.builder.appName('app_spark').getOrCreate()

### DataFrames

In [6]:
df1 = spark.createDataFrame([('Pedro', 10), ('Maria', 15), ('Higor', 22)])
df1.show()

NameError: name 'spark' is not defined

### Criando Schema

In [None]:
schema = 'id INT, nome STRING'
dados = [[1, 'Pedro'], [2, 'Maria']]

df2 = spark.createDataFrame(dados, schema)
df2.show()

+---+-----+
| id| nome|
+---+-----+
|  1|Pedro|
|  2|Maria|
+---+-----+



### Utilizando agregações

In [None]:
from pyspark.sql.functions import sum

In [None]:
schema2 = 'produtos STRING, qtd_vendida INT'
vendas = [['Caneta', 10], ['Lápis', 30], ['Caneta', 30]]
df3 = spark.createDataFrame(vendas, schema2)
df3.show()

+--------+-----------+
|produtos|qtd_vendida|
+--------+-----------+
|  Caneta|         10|
|   Lápis|         30|
|  Caneta|         30|
+--------+-----------+



In [None]:
agrupado = df3.groupBy('produtos').agg(sum('qtd_vendida'))
agrupado.show()

+--------+----------------+
|produtos|sum(qtd_vendida)|
+--------+----------------+
|  Caneta|              40|
|   Lápis|              30|
+--------+----------------+



### Selecionar colunas

In [None]:
df3.select('produtos').show()

+--------+
|produtos|
+--------+
|  Caneta|
|   Lápis|
|  Caneta|
+--------+



### Criando cálculos (expressões)

In [None]:
from pyspark.sql.functions import expr

In [None]:
df3.select('produtos', 'qtd_vendida', expr('qtd_vendida * 0.2')).show()

+--------+-----------+-------------------+
|produtos|qtd_vendida|(qtd_vendida * 0.2)|
+--------+-----------+-------------------+
|  Caneta|         10|                2.0|
|   Lápis|         30|                6.0|
|  Caneta|         30|                6.0|
+--------+-----------+-------------------+



### Checar schema e colunas

In [None]:
df3.schema

StructType([StructField('produtos', StringType(), True), StructField('qtd_vendida', IntegerType(), True)])

In [None]:
df3.columns

['produtos', 'qtd_vendida']

### Coletando dados de csv

In [4]:
from pyspark.sql.types import *

In [6]:
csv_schema = 'id INT, nome STRING, status STRING, cidade STRING, vendas INT, data STRING'
despachantes = spark.read.csv('dados_de_exemplo\despachantes.csv', header=False, schema=csv_schema)
despachantes.show()

+---+-------------------+------+-------------+------+----------+
| id|               nome|status|       cidade|vendas|      data|
+---+-------------------+------+-------------+------+----------+
|  1|   Carminda Pestana| Ativo|  Santa Maria|    23|2020-08-11|
|  2|    Deolinda Vilela| Ativo|Novo Hamburgo|    34|2020-03-05|
|  3|   Emídio Dornelles| Ativo| Porto Alegre|    34|2020-02-05|
|  4|Felisbela Dornelles| Ativo| Porto Alegre|    36|2020-02-05|
|  5|     Graça Ornellas| Ativo| Porto Alegre|    12|2020-02-05|
|  6|   Matilde Rebouças| Ativo| Porto Alegre|    22|2019-01-05|
|  7|    Noêmia   Orriça| Ativo|  Santa Maria|    45|2019-10-05|
|  8|      Roque Vásquez| Ativo| Porto Alegre|    65|2020-03-05|
|  9|      Uriel Queiroz| Ativo| Porto Alegre|    54|2018-05-05|
| 10|   Viviana Sequeira| Ativo| Porto Alegre|     0|2020-09-05|
+---+-------------------+------+-------------+------+----------+



In [None]:
despachantes_auto_schema = spark.read.load('dados_de_exemplo\despachantes.csv', header=False, format='csv', sep=',', inferSchema=True)
despachantes_auto_schema.show()

+---+-------------------+-----+-------------+---+-------------------+
|_c0|                _c1|  _c2|          _c3|_c4|                _c5|
+---+-------------------+-----+-------------+---+-------------------+
|  1|   Carminda Pestana|Ativo|  Santa Maria| 23|2020-08-11 00:00:00|
|  2|    Deolinda Vilela|Ativo|Novo Hamburgo| 34|2020-03-05 00:00:00|
|  3|   Emídio Dornelles|Ativo| Porto Alegre| 34|2020-02-05 00:00:00|
|  4|Felisbela Dornelles|Ativo| Porto Alegre| 36|2020-02-05 00:00:00|
|  5|     Graça Ornellas|Ativo| Porto Alegre| 12|2020-02-05 00:00:00|
|  6|   Matilde Rebouças|Ativo| Porto Alegre| 22|2019-01-05 00:00:00|
|  7|    Noêmia   Orriça|Ativo|  Santa Maria| 45|2019-10-05 00:00:00|
|  8|      Roque Vásquez|Ativo| Porto Alegre| 65|2020-03-05 00:00:00|
|  9|      Uriel Queiroz|Ativo| Porto Alegre| 54|2018-05-05 00:00:00|
| 10|   Viviana Sequeira|Ativo| Porto Alegre|  0|2020-09-05 00:00:00|
+---+-------------------+-----+-------------+---+-------------------+



### Comparando os schemas

In [None]:
despachantes.schema

StructType([StructField('id', IntegerType(), True), StructField('nome', StringType(), True), StructField('status', StringType(), True), StructField('cidade', StringType(), True), StructField('vendas', IntegerType(), True), StructField('data', StringType(), True)])

In [None]:
despachantes_auto_schema.schema

StructType([StructField('_c0', IntegerType(), True), StructField('_c1', StringType(), True), StructField('_c2', StringType(), True), StructField('_c3', StringType(), True), StructField('_c4', IntegerType(), True), StructField('_c5', TimestampType(), True)])

### Filtrando com pySpark

In [8]:
from pyspark.sql import functions

In [13]:
despachantes.select('*').where(functions.col('vendas') > 20).show()

+---+-------------------+------+-------------+------+----------+
| id|               nome|status|       cidade|vendas|      data|
+---+-------------------+------+-------------+------+----------+
|  1|   Carminda Pestana| Ativo|  Santa Maria|    23|2020-08-11|
|  2|    Deolinda Vilela| Ativo|Novo Hamburgo|    34|2020-03-05|
|  3|   Emídio Dornelles| Ativo| Porto Alegre|    34|2020-02-05|
|  4|Felisbela Dornelles| Ativo| Porto Alegre|    36|2020-02-05|
|  6|   Matilde Rebouças| Ativo| Porto Alegre|    22|2019-01-05|
|  7|    Noêmia   Orriça| Ativo|  Santa Maria|    45|2019-10-05|
|  8|      Roque Vásquez| Ativo| Porto Alegre|    65|2020-03-05|
|  9|      Uriel Queiroz| Ativo| Porto Alegre|    54|2018-05-05|
+---+-------------------+------+-------------+------+----------+



In [20]:
despachantes.select('*').where((functions.col('vendas') > 20) & (functions.col('vendas') < 40)).show()

+---+-------------------+------+-------------+------+----------+
| id|               nome|status|       cidade|vendas|      data|
+---+-------------------+------+-------------+------+----------+
|  1|   Carminda Pestana| Ativo|  Santa Maria|    23|2020-08-11|
|  2|    Deolinda Vilela| Ativo|Novo Hamburgo|    34|2020-03-05|
|  3|   Emídio Dornelles| Ativo| Porto Alegre|    34|2020-02-05|
|  4|Felisbela Dornelles| Ativo| Porto Alegre|    36|2020-02-05|
|  6|   Matilde Rebouças| Ativo| Porto Alegre|    22|2019-01-05|
+---+-------------------+------+-------------+------+----------+



## Renomear colunas e alterar tipo da coluna

In [25]:
novodf = despachantes.withColumnRenamed('nome', 'nomes')
novodf.columns

['id', 'nomes', 'status', 'cidade', 'vendas', 'data']

In [27]:
from pyspark.sql.functions import *

In [28]:
despachantes2 = despachantes.withColumn('data2', to_timestamp(functions.col('data'), 'yyyy-MM-dd'))
despachantes2.show()

+---+-------------------+------+-------------+------+----------+-------------------+
| id|               nome|status|       cidade|vendas|      data|              data2|
+---+-------------------+------+-------------+------+----------+-------------------+
|  1|   Carminda Pestana| Ativo|  Santa Maria|    23|2020-08-11|2020-08-11 00:00:00|
|  2|    Deolinda Vilela| Ativo|Novo Hamburgo|    34|2020-03-05|2020-03-05 00:00:00|
|  3|   Emídio Dornelles| Ativo| Porto Alegre|    34|2020-02-05|2020-02-05 00:00:00|
|  4|Felisbela Dornelles| Ativo| Porto Alegre|    36|2020-02-05|2020-02-05 00:00:00|
|  5|     Graça Ornellas| Ativo| Porto Alegre|    12|2020-02-05|2020-02-05 00:00:00|
|  6|   Matilde Rebouças| Ativo| Porto Alegre|    22|2019-01-05|2019-01-05 00:00:00|
|  7|    Noêmia   Orriça| Ativo|  Santa Maria|    45|2019-10-05|2019-10-05 00:00:00|
|  8|      Roque Vásquez| Ativo| Porto Alegre|    65|2020-03-05|2020-03-05 00:00:00|
|  9|      Uriel Queiroz| Ativo| Porto Alegre|    54|2018-05-05|2

### Algumas funções com o select

In [30]:
despachantes2.select(year('data')).show()

+----------+
|year(data)|
+----------+
|      2020|
|      2020|
|      2020|
|      2020|
|      2020|
|      2019|
|      2019|
|      2020|
|      2018|
|      2020|
+----------+



In [33]:
despachantes2.select(year('data')).distinct().show()

+----------+
|year(data)|
+----------+
|      2018|
|      2019|
|      2020|
+----------+



In [34]:
despachantes2.select('nome', year('data')).orderBy('nome').show()

+-------------------+----------+
|               nome|year(data)|
+-------------------+----------+
|   Carminda Pestana|      2020|
|    Deolinda Vilela|      2020|
|   Emídio Dornelles|      2020|
|Felisbela Dornelles|      2020|
|     Graça Ornellas|      2020|
|   Matilde Rebouças|      2019|
|    Noêmia   Orriça|      2019|
|      Roque Vásquez|      2020|
|      Uriel Queiroz|      2018|
|   Viviana Sequeira|      2020|
+-------------------+----------+



In [35]:
despachantes2.select('data').groupBy(year('data')).count().show()

+----------+-----+
|year(data)|count|
+----------+-----+
|      2018|    1|
|      2019|    2|
|      2020|    7|
+----------+-----+



In [36]:
despachantes2.select(functions.sum('vendas')).show()

+-----------+
|sum(vendas)|
+-----------+
|        325|
+-----------+



In [48]:
despachantes.groupBy(year('data')).sum('vendas').show()

+----------+-----------+
|year(data)|sum(vendas)|
+----------+-----------+
|      2018|         54|
|      2019|         67|
|      2020|        204|
+----------+-----------+



### Principais ações e transformações

In [49]:
despachantes.show()

+---+-------------------+------+-------------+------+----------+
| id|               nome|status|       cidade|vendas|      data|
+---+-------------------+------+-------------+------+----------+
|  1|   Carminda Pestana| Ativo|  Santa Maria|    23|2020-08-11|
|  2|    Deolinda Vilela| Ativo|Novo Hamburgo|    34|2020-03-05|
|  3|   Emídio Dornelles| Ativo| Porto Alegre|    34|2020-02-05|
|  4|Felisbela Dornelles| Ativo| Porto Alegre|    36|2020-02-05|
|  5|     Graça Ornellas| Ativo| Porto Alegre|    12|2020-02-05|
|  6|   Matilde Rebouças| Ativo| Porto Alegre|    22|2019-01-05|
|  7|    Noêmia   Orriça| Ativo|  Santa Maria|    45|2019-10-05|
|  8|      Roque Vásquez| Ativo| Porto Alegre|    65|2020-03-05|
|  9|      Uriel Queiroz| Ativo| Porto Alegre|    54|2018-05-05|
| 10|   Viviana Sequeira| Ativo| Porto Alegre|     0|2020-09-05|
+---+-------------------+------+-------------+------+----------+



In [51]:
despachantes.take(2)

[Row(id=1, nome='Carminda Pestana', status='Ativo', cidade='Santa Maria', vendas=23, data='2020-08-11'),
 Row(id=2, nome='Deolinda Vilela', status='Ativo', cidade='Novo Hamburgo', vendas=34, data='2020-03-05')]

In [58]:
despachantes.collect()

[Row(id=1, nome='Carminda Pestana', status='Ativo', cidade='Santa Maria', vendas=23, data='2020-08-11'),
 Row(id=2, nome='Deolinda Vilela', status='Ativo', cidade='Novo Hamburgo', vendas=34, data='2020-03-05'),
 Row(id=3, nome='Emídio Dornelles', status='Ativo', cidade='Porto Alegre', vendas=34, data='2020-02-05'),
 Row(id=4, nome='Felisbela Dornelles', status='Ativo', cidade='Porto Alegre', vendas=36, data='2020-02-05'),
 Row(id=5, nome='Graça Ornellas', status='Ativo', cidade='Porto Alegre', vendas=12, data='2020-02-05'),
 Row(id=6, nome='Matilde Rebouças', status='Ativo', cidade='Porto Alegre', vendas=22, data='2019-01-05'),
 Row(id=7, nome='Noêmia   Orriça', status='Ativo', cidade='Santa Maria', vendas=45, data='2019-10-05'),
 Row(id=8, nome='Roque Vásquez', status='Ativo', cidade='Porto Alegre', vendas=65, data='2020-03-05'),
 Row(id=9, nome='Uriel Queiroz', status='Ativo', cidade='Porto Alegre', vendas=54, data='2018-05-05'),
 Row(id=10, nome='Viviana Sequeira', status='Ativo', c

In [59]:
despachantes.count()

10

### Ordenação

In [60]:
despachantes.orderBy('vendas').show()

+---+-------------------+------+-------------+------+----------+
| id|               nome|status|       cidade|vendas|      data|
+---+-------------------+------+-------------+------+----------+
| 10|   Viviana Sequeira| Ativo| Porto Alegre|     0|2020-09-05|
|  5|     Graça Ornellas| Ativo| Porto Alegre|    12|2020-02-05|
|  6|   Matilde Rebouças| Ativo| Porto Alegre|    22|2019-01-05|
|  1|   Carminda Pestana| Ativo|  Santa Maria|    23|2020-08-11|
|  2|    Deolinda Vilela| Ativo|Novo Hamburgo|    34|2020-03-05|
|  3|   Emídio Dornelles| Ativo| Porto Alegre|    34|2020-02-05|
|  4|Felisbela Dornelles| Ativo| Porto Alegre|    36|2020-02-05|
|  7|    Noêmia   Orriça| Ativo|  Santa Maria|    45|2019-10-05|
|  9|      Uriel Queiroz| Ativo| Porto Alegre|    54|2018-05-05|
|  8|      Roque Vásquez| Ativo| Porto Alegre|    65|2020-03-05|
+---+-------------------+------+-------------+------+----------+



In [64]:
despachantes.orderBy('cidade', 'vendas', ascending=False).show()

+---+-------------------+------+-------------+------+----------+
| id|               nome|status|       cidade|vendas|      data|
+---+-------------------+------+-------------+------+----------+
|  7|    Noêmia   Orriça| Ativo|  Santa Maria|    45|2019-10-05|
|  1|   Carminda Pestana| Ativo|  Santa Maria|    23|2020-08-11|
|  8|      Roque Vásquez| Ativo| Porto Alegre|    65|2020-03-05|
|  9|      Uriel Queiroz| Ativo| Porto Alegre|    54|2018-05-05|
|  4|Felisbela Dornelles| Ativo| Porto Alegre|    36|2020-02-05|
|  3|   Emídio Dornelles| Ativo| Porto Alegre|    34|2020-02-05|
|  6|   Matilde Rebouças| Ativo| Porto Alegre|    22|2019-01-05|
|  5|     Graça Ornellas| Ativo| Porto Alegre|    12|2020-02-05|
| 10|   Viviana Sequeira| Ativo| Porto Alegre|     0|2020-09-05|
|  2|    Deolinda Vilela| Ativo|Novo Hamburgo|    34|2020-03-05|
+---+-------------------+------+-------------+------+----------+



### Group by

In [65]:
despachantes.groupBy('cidade').sum('vendas').show()

+-------------+-----------+
|       cidade|sum(vendas)|
+-------------+-----------+
|  Santa Maria|         68|
|Novo Hamburgo|         34|
| Porto Alegre|        223|
+-------------+-----------+



In [70]:
despachantes.groupBy('cidade').sum('vendas').orderBy('sum(vendas)', ascending=False).show()

+-------------+-----------+
|       cidade|sum(vendas)|
+-------------+-----------+
| Porto Alegre|        223|
|  Santa Maria|         68|
|Novo Hamburgo|         34|
+-------------+-----------+



### Filtrando valores

In [71]:
despachantes.filter('vendas > 30').show()

+---+-------------------+------+-------------+------+----------+
| id|               nome|status|       cidade|vendas|      data|
+---+-------------------+------+-------------+------+----------+
|  2|    Deolinda Vilela| Ativo|Novo Hamburgo|    34|2020-03-05|
|  3|   Emídio Dornelles| Ativo| Porto Alegre|    34|2020-02-05|
|  4|Felisbela Dornelles| Ativo| Porto Alegre|    36|2020-02-05|
|  7|    Noêmia   Orriça| Ativo|  Santa Maria|    45|2019-10-05|
|  8|      Roque Vásquez| Ativo| Porto Alegre|    65|2020-03-05|
|  9|      Uriel Queiroz| Ativo| Porto Alegre|    54|2018-05-05|
+---+-------------------+------+-------------+------+----------+



In [76]:
despachantes.filter(functions.col('nome') == 'Felisbela Dornelles').show()

+---+-------------------+------+------------+------+----------+
| id|               nome|status|      cidade|vendas|      data|
+---+-------------------+------+------------+------+----------+
|  4|Felisbela Dornelles| Ativo|Porto Alegre|    36|2020-02-05|
+---+-------------------+------+------------+------+----------+



In [79]:
despachantes.filter(despachantes.nome == 'Felisbela Dornelles').show()

+---+-------------------+------+------------+------+----------+
| id|               nome|status|      cidade|vendas|      data|
+---+-------------------+------+------------+------+----------+
|  4|Felisbela Dornelles| Ativo|Porto Alegre|    36|2020-02-05|
+---+-------------------+------+------------+------+----------+



### Exportando dados

In [None]:
despachantes.write.format('parquet').save('exports\parquet')
despachantes.write.format('csv').save('exports\csv')
despachantes.write.format('orc').save('exports\orc')
despachantes.write.format('json').save('exports\json')

### Importando dados

In [None]:
parquet = spark.read.format('parquet').load('exports\parquet\part-00000-4c0ba10e-6d1f-49a5-81c8-d53b482a33a4-c000.snappy.parquet')
parquet.show()

In [None]:
json = spark.read.format('json').load('exports\json\part-00000-d78431d1-ed00-4726-805a-de30b55fef34-c000.json')
json.show()

In [None]:
orc = spark.read.format('orc').load('exports\orc\part-00000-60a09352-ddfb-476d-8c5d-efd6b763d1af-c000.snappy.orc')
orc.show()

In [None]:
schema = 'id INT, nome STRING, status STRING, cidade STRING, vendas INT, data DATE'
csv = spark.read.format('csv').load('exports\csv\part-00000-74c0faf7-4102-4e76-bc77-3179e7385592-c000.csv', schema=schema)
csv.show()
csv.schema