In [4]:
# Carregando libs e iniciando Session
from pyspark.sql import SparkSession, functions as func
from pyspark.sql.types import *
from pyspark.sql.functions import *

spark = SparkSession.builder.master("local[1]").appName("Testing").getOrCreate()

In [5]:
# Importando dados com definicao de schema
schema = " Id INT, nome STRING, status STRING, cidade STRING, vendas INT, data STRING"
vend = spark.read.csv('arquivos/despachantes.csv', header=False,schema=schema)
vend.show()

+---+-------------------+------+-------------+------+----------+
| Id|               nome|status|       cidade|vendas|      data|
+---+-------------------+------+-------------+------+----------+
|  1|   Carminda Pestana| Ativo|  Santa Maria|    23|2020-08-11|
|  2|    Deolinda Vilela| Ativo|Novo Hamburgo|    34|2020-03-05|
|  3|   Emídio Dornelles| Ativo| Porto Alegre|    34|2020-02-05|
|  4|Felisbela Dornelles| Ativo| Porto Alegre|    36|2020-02-05|
|  5|     Graça Ornellas| Ativo| Porto Alegre|    12|2020-02-05|
|  6|   Matilde Rebouças| Ativo| Porto Alegre|    22|2019-01-05|
|  7|    Noêmia   Orriça| Ativo|  Santa Maria|    45|2019-10-05|
|  8|      Roque Vásquez| Ativo| Porto Alegre|    65|2020-03-05|
|  9|      Uriel Queiroz| Ativo| Porto Alegre|    54|2018-05-05|
| 10|   Viviana Sequeira| Ativo| Porto Alegre|     0|2020-09-05|
+---+-------------------+------+-------------+------+----------+



In [123]:
# Importando dados com load e sem definicao do schema
vend2 = spark.read.load('arquivos/despachantes.csv', header=False, format='csv', sep=',', inferSchema=True)
vend2.show()

+---+-------------------+-----+-------------+---+-------------------+
|_c0|                _c1|  _c2|          _c3|_c4|                _c5|
+---+-------------------+-----+-------------+---+-------------------+
|  1|   Carminda Pestana|Ativo|  Santa Maria| 23|2020-08-11 00:00:00|
|  2|    Deolinda Vilela|Ativo|Novo Hamburgo| 34|2020-03-05 00:00:00|
|  3|   Emídio Dornelles|Ativo| Porto Alegre| 34|2020-02-05 00:00:00|
|  4|Felisbela Dornelles|Ativo| Porto Alegre| 36|2020-02-05 00:00:00|
|  5|     Graça Ornellas|Ativo| Porto Alegre| 12|2020-02-05 00:00:00|
|  6|   Matilde Rebouças|Ativo| Porto Alegre| 22|2019-01-05 00:00:00|
|  7|    Noêmia   Orriça|Ativo|  Santa Maria| 45|2019-10-05 00:00:00|
|  8|      Roque Vásquez|Ativo| Porto Alegre| 65|2020-03-05 00:00:00|
|  9|      Uriel Queiroz|Ativo| Porto Alegre| 54|2018-05-05 00:00:00|
| 10|   Viviana Sequeira|Ativo| Porto Alegre|  0|2020-09-05 00:00:00|
+---+-------------------+-----+-------------+---+-------------------+



In [124]:
# Conferindo Schema inferido
for i in range(0,len(vend2.schema)):
    if vend2.schema.fields[i].dataType == vend.schema.fields[i].dataType:
        print(f'Para a coluna {i}, o type inferido foi igual ao definido.\n')
    else:
        print(f'O type da coluna {i} definido foi diferente do inferido:')
        print(f'Definido {vend.schema.fields[i].dataType}')
        print(f'Inferido {vend2.schema.fields[i].dataType}')

Para a coluna 0, o type inferido foi igual ao definido.

Para a coluna 1, o type inferido foi igual ao definido.

Para a coluna 2, o type inferido foi igual ao definido.

Para a coluna 3, o type inferido foi igual ao definido.

Para a coluna 4, o type inferido foi igual ao definido.

O type da coluna 5 definido foi diferente do inferido:
Definido StringType()
Inferido TimestampType()


In [6]:
# Dados em forma de lista
vend.take(1)

[Row(Id=1, nome='Carminda Pestana', status='Ativo', cidade='Santa Maria', vendas=23, data='2020-08-11')]

In [7]:
# Todos os dados em forma de lista
vend.collect()

[Row(Id=1, nome='Carminda Pestana', status='Ativo', cidade='Santa Maria', vendas=23, data='2020-08-11'),
 Row(Id=2, nome='Deolinda Vilela', status='Ativo', cidade='Novo Hamburgo', vendas=34, data='2020-03-05'),
 Row(Id=3, nome='Emídio Dornelles', status='Ativo', cidade='Porto Alegre', vendas=34, data='2020-02-05'),
 Row(Id=4, nome='Felisbela Dornelles', status='Ativo', cidade='Porto Alegre', vendas=36, data='2020-02-05'),
 Row(Id=5, nome='Graça Ornellas', status='Ativo', cidade='Porto Alegre', vendas=12, data='2020-02-05'),
 Row(Id=6, nome='Matilde Rebouças', status='Ativo', cidade='Porto Alegre', vendas=22, data='2019-01-05'),
 Row(Id=7, nome='Noêmia   Orriça', status='Ativo', cidade='Santa Maria', vendas=45, data='2019-10-05'),
 Row(Id=8, nome='Roque Vásquez', status='Ativo', cidade='Porto Alegre', vendas=65, data='2020-03-05'),
 Row(Id=9, nome='Uriel Queiroz', status='Ativo', cidade='Porto Alegre', vendas=54, data='2018-05-05'),
 Row(Id=10, nome='Viviana Sequeira', status='Ativo', c

In [11]:
# Contagem de registros
vend.count()

10

In [12]:
# Ordenando
vend.orderBy('Vendas').show()

+---+-------------------+------+-------------+------+----------+
| Id|               nome|status|       cidade|vendas|      data|
+---+-------------------+------+-------------+------+----------+
| 10|   Viviana Sequeira| Ativo| Porto Alegre|     0|2020-09-05|
|  5|     Graça Ornellas| Ativo| Porto Alegre|    12|2020-02-05|
|  6|   Matilde Rebouças| Ativo| Porto Alegre|    22|2019-01-05|
|  1|   Carminda Pestana| Ativo|  Santa Maria|    23|2020-08-11|
|  2|    Deolinda Vilela| Ativo|Novo Hamburgo|    34|2020-03-05|
|  3|   Emídio Dornelles| Ativo| Porto Alegre|    34|2020-02-05|
|  4|Felisbela Dornelles| Ativo| Porto Alegre|    36|2020-02-05|
|  7|    Noêmia   Orriça| Ativo|  Santa Maria|    45|2019-10-05|
|  9|      Uriel Queiroz| Ativo| Porto Alegre|    54|2018-05-05|
|  8|      Roque Vásquez| Ativo| Porto Alegre|    65|2020-03-05|
+---+-------------------+------+-------------+------+----------+



In [13]:
# Ordenando em ordem decrescente
vend.orderBy(func.col('vendas').desc()).show()

+---+-------------------+------+-------------+------+----------+
| Id|               nome|status|       cidade|vendas|      data|
+---+-------------------+------+-------------+------+----------+
|  8|      Roque Vásquez| Ativo| Porto Alegre|    65|2020-03-05|
|  9|      Uriel Queiroz| Ativo| Porto Alegre|    54|2018-05-05|
|  7|    Noêmia   Orriça| Ativo|  Santa Maria|    45|2019-10-05|
|  4|Felisbela Dornelles| Ativo| Porto Alegre|    36|2020-02-05|
|  2|    Deolinda Vilela| Ativo|Novo Hamburgo|    34|2020-03-05|
|  3|   Emídio Dornelles| Ativo| Porto Alegre|    34|2020-02-05|
|  1|   Carminda Pestana| Ativo|  Santa Maria|    23|2020-08-11|
|  6|   Matilde Rebouças| Ativo| Porto Alegre|    22|2019-01-05|
|  5|     Graça Ornellas| Ativo| Porto Alegre|    12|2020-02-05|
| 10|   Viviana Sequeira| Ativo| Porto Alegre|     0|2020-09-05|
+---+-------------------+------+-------------+------+----------+



In [14]:
# Ordenando por dois parametros
vend.orderBy(('cidade'), func.col('vendas').desc()).show()

+---+-------------------+------+-------------+------+----------+
| Id|               nome|status|       cidade|vendas|      data|
+---+-------------------+------+-------------+------+----------+
|  2|    Deolinda Vilela| Ativo|Novo Hamburgo|    34|2020-03-05|
|  8|      Roque Vásquez| Ativo| Porto Alegre|    65|2020-03-05|
|  9|      Uriel Queiroz| Ativo| Porto Alegre|    54|2018-05-05|
|  4|Felisbela Dornelles| Ativo| Porto Alegre|    36|2020-02-05|
|  3|   Emídio Dornelles| Ativo| Porto Alegre|    34|2020-02-05|
|  6|   Matilde Rebouças| Ativo| Porto Alegre|    22|2019-01-05|
|  5|     Graça Ornellas| Ativo| Porto Alegre|    12|2020-02-05|
| 10|   Viviana Sequeira| Ativo| Porto Alegre|     0|2020-09-05|
|  7|    Noêmia   Orriça| Ativo|  Santa Maria|    45|2019-10-05|
|  1|   Carminda Pestana| Ativo|  Santa Maria|    23|2020-08-11|
+---+-------------------+------+-------------+------+----------+



In [125]:
# Importando apenas vendas > 20
vend.select('Id','nome','vendas').where(func.col('vendas')>20).show()

+---+-------------------+------+
| Id|               nome|vendas|
+---+-------------------+------+
|  1|   Carminda Pestana|    23|
|  2|    Deolinda Vilela|    34|
|  3|   Emídio Dornelles|    34|
|  4|Felisbela Dornelles|    36|
|  6|   Matilde Rebouças|    22|
|  7|    Noêmia   Orriça|    45|
|  8|      Roque Vásquez|    65|
|  9|      Uriel Queiroz|    54|
+---+-------------------+------+



In [126]:
# Importando apenas vendas > 20 e < 40
vend.select('Id','nome','vendas').where((func.col('vendas')>20)&(func.col('vendas')<40)).show()

+---+-------------------+------+
| Id|               nome|vendas|
+---+-------------------+------+
|  1|   Carminda Pestana|    23|
|  2|    Deolinda Vilela|    34|
|  3|   Emídio Dornelles|    34|
|  4|Felisbela Dornelles|    36|
|  6|   Matilde Rebouças|    22|
+---+-------------------+------+



In [127]:
# Importando apenas vendas > 40 OU de Santa Maria
vend.select('Id','nome','vendas','cidade').where((func.col('vendas')>40)|(func.col('cidade')=='Santa Maria')).show()

+---+----------------+------+------------+
| Id|            nome|vendas|      cidade|
+---+----------------+------+------------+
|  1|Carminda Pestana|    23| Santa Maria|
|  7| Noêmia   Orriça|    45| Santa Maria|
|  8|   Roque Vásquez|    65|Porto Alegre|
|  9|   Uriel Queiroz|    54|Porto Alegre|
+---+----------------+------+------------+



In [128]:
# Renomeando coluna
# Bom lembrar que NAO E POSSIVEL mudar o tipo no DF original, portanto um novo deve ser criado
novodf = vend.withColumnRenamed('nome','Nome')
novodf.show()

+---+-------------------+------+-------------+------+----------+
| Id|               Nome|status|       cidade|vendas|      data|
+---+-------------------+------+-------------+------+----------+
|  1|   Carminda Pestana| Ativo|  Santa Maria|    23|2020-08-11|
|  2|    Deolinda Vilela| Ativo|Novo Hamburgo|    34|2020-03-05|
|  3|   Emídio Dornelles| Ativo| Porto Alegre|    34|2020-02-05|
|  4|Felisbela Dornelles| Ativo| Porto Alegre|    36|2020-02-05|
|  5|     Graça Ornellas| Ativo| Porto Alegre|    12|2020-02-05|
|  6|   Matilde Rebouças| Ativo| Porto Alegre|    22|2019-01-05|
|  7|    Noêmia   Orriça| Ativo|  Santa Maria|    45|2019-10-05|
|  8|      Roque Vásquez| Ativo| Porto Alegre|    65|2020-03-05|
|  9|      Uriel Queiroz| Ativo| Porto Alegre|    54|2018-05-05|
| 10|   Viviana Sequeira| Ativo| Porto Alegre|     0|2020-09-05|
+---+-------------------+------+-------------+------+----------+



In [129]:
# Alterando o tipo da coluna
print(vend.schema)
novodf2 = vend.withColumn('data2', to_timestamp(func.col('data'), 'yyyy-MM-dd'))
novodf2.show()
print(novodf2.schema)

StructType([StructField('Id', IntegerType(), True), StructField('nome', StringType(), True), StructField('status', StringType(), True), StructField('cidade', StringType(), True), StructField('vendas', IntegerType(), True), StructField('data', StringType(), True)])
+---+-------------------+------+-------------+------+----------+-------------------+
| Id|               nome|status|       cidade|vendas|      data|              data2|
+---+-------------------+------+-------------+------+----------+-------------------+
|  1|   Carminda Pestana| Ativo|  Santa Maria|    23|2020-08-11|2020-08-11 00:00:00|
|  2|    Deolinda Vilela| Ativo|Novo Hamburgo|    34|2020-03-05|2020-03-05 00:00:00|
|  3|   Emídio Dornelles| Ativo| Porto Alegre|    34|2020-02-05|2020-02-05 00:00:00|
|  4|Felisbela Dornelles| Ativo| Porto Alegre|    36|2020-02-05|2020-02-05 00:00:00|
|  5|     Graça Ornellas| Ativo| Porto Alegre|    12|2020-02-05|2020-02-05 00:00:00|
|  6|   Matilde Rebouças| Ativo| Porto Alegre|    22|20

In [130]:
# Select em transformacoes de datas
vend.select(year('data'), month('data')).show()

+----------+-----------+
|year(data)|month(data)|
+----------+-----------+
|      2020|          8|
|      2020|          3|
|      2020|          2|
|      2020|          2|
|      2020|          2|
|      2019|          1|
|      2019|         10|
|      2020|          3|
|      2018|          5|
|      2020|          9|
+----------+-----------+



In [131]:
# Select em transformacoes de datas
vend.select(year('data')).distinct().show()

+----------+
|year(data)|
+----------+
|      2018|
|      2019|
|      2020|
+----------+



In [132]:
# Total de Vendas por Mes ordenadas pelo total vendido
vend.groupBy(month('data')).sum('vendas').orderBy(sum('vendas')).show()

+-----------+-----------+
|month(data)|sum(vendas)|
+-----------+-----------+
|          9|          0|
|          1|         22|
|          8|         23|
|         10|         45|
|          5|         54|
|          2|         82|
|          3|         99|
+-----------+-----------+



In [133]:
# Contagem de Ano em ordem decrescente
vend.groupBy(year('data')).count().orderBy('count', ascending=False).show()

+----------+-----+
|year(data)|count|
+----------+-----+
|      2020|    7|
|      2019|    2|
|      2018|    1|
+----------+-----+



In [134]:
# Total de Vendas por cidade ordenadas pelo total vendido considerando apenas cidades que venderam mais de 50
vend.groupBy('cidade').sum('vendas').orderBy(sum('vendas')).where(func.col('sum(vendas)')>50).show()

+------------+-----------+
|      cidade|sum(vendas)|
+------------+-----------+
| Santa Maria|         68|
|Porto Alegre|        223|
+------------+-----------+



In [18]:
# Total de Vendas por cidade ordenadas pelo total vendido considerando apenas as vendas de Id inferior a 8
vend.filter(func.col('Id')<8).groupBy('cidade').sum('vendas').orderBy(sum('vendas')).show()

+-------------+-----------+
|       cidade|sum(vendas)|
+-------------+-----------+
|Novo Hamburgo|         34|
|  Santa Maria|         68|
| Porto Alegre|        104|
+-------------+-----------+

