In [80]:
from pyspark.sql import SparkSession, functions as f
from pyspark.sql.types import StructType,StructField, StringType, IntegerType,FloatType

In [81]:
spark = SparkSession.builder.appName("Spark_builder").getOrCreate()

In [82]:
dados_list = [0,"Isabela"],[1, "Enrico"]
dataFrame1 = spark.createDataFrame(dados_list)
dataFrame1.show()

+---+-------+
| _1|     _2|
+---+-------+
|  0|Isabela|
|  1| Enrico|
+---+-------+



Criando Dataframe com Schema

In [83]:
# schema = StructType([
#     StructField("Index",IntegerType()),
#     StructField("Nome",StringType())
# ])
schema = "Index INT, NOME STRING"
dados = [0,"Enrico"],[1,"Cassio"],[2,"Yuri ALberto"],[3,"Isabela"]

In [84]:
dataFrame2 = spark.createDataFrame(dados,schema=schema)

In [85]:
dataFrame2.show()

+-----+------------+
|Index|        NOME|
+-----+------------+
|    0|      Enrico|
|    1|      Cassio|
|    2|Yuri ALberto|
|    3|     Isabela|
+-----+------------+



Usando Sum para uma funcao de agg

In [86]:
vendas = [
    {"Loja": "Loja A", "Produto": "Produto 1", "Quantidade": 20, "Valor": 30.50},
    {"Loja": "Loja B", "Produto": "Produto 2", "Quantidade": 15, "Valor": 25.75},
    {"Loja": "Loja C", "Produto": "Produto 3", "Quantidade": 10, "Valor": 15.20},
    {"Loja": "Loja D", "Produto": "Produto 4", "Quantidade": 25, "Valor": 50.00},
    {"Loja": "Loja E", "Produto": "Produto 5", "Quantidade": 12, "Valor": 40.80},
    {"Loja": "Loja F", "Produto": "Produto 1", "Quantidade": 18, "Valor": 35.25},
    {"Loja": "Loja G", "Produto": "Produto 2", "Quantidade": 22, "Valor": 28.90},
    {"Loja": "Loja H", "Produto": "Produto 3", "Quantidade": 30, "Valor": 20.15},
    {"Loja": "Loja I", "Produto": "Produto 4", "Quantidade": 17, "Valor": 45.60},
    {"Loja": "Loja J", "Produto": "Produto 5", "Quantidade": 14, "Valor": 55.30}
]
vendas.extend([
    {"Loja": "Loja A", "Produto": "Produto 6", "Quantidade": 8, "Valor": 18.75},
    {"Loja": "Loja B", "Produto": "Produto 7", "Quantidade": 19, "Valor": 22.40},
    {"Loja": "Loja C", "Produto": "Produto 8", "Quantidade": 14, "Valor": 33.90},
    {"Loja": "Loja D", "Produto": "Produto 9", "Quantidade": 27, "Valor": 48.50},
    {"Loja": "Loja E", "Produto": "Produto 10", "Quantidade": 11, "Valor": 38.60},
    {"Loja": "Loja F", "Produto": "Produto 6", "Quantidade": 23, "Valor": 29.75},
    {"Loja": "Loja G", "Produto": "Produto 7", "Quantidade": 16, "Valor": 16.20},
    {"Loja": "Loja H", "Produto": "Produto 8", "Quantidade": 32, "Valor": 42.80},
    {"Loja": "Loja I", "Produto": "Produto 9", "Quantidade": 21, "Valor": 55.00},
    {"Loja": "Loja J", "Produto": "Produto 10", "Quantidade": 13, "Valor": 27.45},
])

In [87]:
schema = StructType([
    StructField("Loja",StringType()),
    StructField("Produto",StringType()),
    StructField("Quantidade",IntegerType()),
    StructField("Valor",FloatType()),
])
dados_vendas = spark.createDataFrame(vendas, schema=schema)

In [88]:
dados_vendas.show(5)

+------+---------+----------+-----+
|  Loja|  Produto|Quantidade|Valor|
+------+---------+----------+-----+
|Loja A|Produto 1|        20| 30.5|
|Loja B|Produto 2|        15|25.75|
|Loja C|Produto 3|        10| 15.2|
|Loja D|Produto 4|        25| 50.0|
|Loja E|Produto 5|        12| 40.8|
+------+---------+----------+-----+
only showing top 5 rows



In [89]:
df_vendas_lojas = dados_vendas.select("Loja","Produto",f.expr("Quantidade * Valor").alias("Valor_Total_Produto"))

In [90]:
df_vendas_lojas.show(5)

+------+---------+-------------------+
|  Loja|  Produto|Valor_Total_Produto|
+------+---------+-------------------+
|Loja A|Produto 1|              610.0|
|Loja B|Produto 2|             386.25|
|Loja C|Produto 3|              152.0|
|Loja D|Produto 4|             1250.0|
|Loja E|Produto 5|          489.59998|
+------+---------+-------------------+
only showing top 5 rows



In [91]:
df_vendas_totais_por_loja = df_vendas_lojas.groupBy("Loja").agg(f.sum("Valor_Total_Produto").alias("Rendimento_da_Loja"))
df_vendas_totais_por_loja = df_vendas_totais_por_loja.withColumn("Rendimento_da_Loja",f.round("Rendimento_da_Loja",2))
df_vendas_totais_por_loja = df_vendas_totais_por_loja.sort("Loja")

In [92]:
df_vendas_totais_por_loja.show(5)

+------+------------------+
|  Loja|Rendimento_da_Loja|
+------+------------------+
|Loja A|             760.0|
|Loja B|            811.85|
|Loja C|             626.6|
|Loja D|            2559.5|
|Loja E|             914.2|
+------+------------------+
only showing top 5 rows



In [93]:
df_vendas_totais_por_loja.printSchema()

root
 |-- Loja: string (nullable = true)
 |-- Rendimento_da_Loja: double (nullable = true)



Lendo os arquivos e criando um Schema

In [94]:
schema = StructType([
    StructField("ID",IntegerType()),
    StructField("Nome",StringType()),
    StructField("Status",StringType()),
    StructField("Localizacao",StringType()),
    StructField("Vendas",IntegerType()),
    StructField("Data",StringType()),
])

In [95]:
dados = spark.read.csv("/home/enricolm/Documents/curso_spark_udemy/data/despachantes.csv",sep=",",schema=schema)

In [96]:
dados.show(5)

+---+-------------------+------+-------------+------+----------+
| ID|               Nome|Status|  Localizacao|Vendas|      Data|
+---+-------------------+------+-------------+------+----------+
|  1|   Carminda Pestana| Ativo|  Santa Maria|    23|2020-08-11|
|  2|    Deolinda Vilela| Ativo|Novo Hamburgo|    34|2020-03-05|
|  3|   Emídio Dornelles| Ativo| Porto Alegre|    34|2020-02-05|
|  4|Felisbela Dornelles| Ativo| Porto Alegre|    36|2020-02-05|
|  5|     Graça Ornellas| Ativo| Porto Alegre|    12|2020-02-05|
+---+-------------------+------+-------------+------+----------+
only showing top 5 rows



In [97]:
dados.printSchema()

root
 |-- ID: integer (nullable = true)
 |-- Nome: string (nullable = true)
 |-- Status: string (nullable = true)
 |-- Localizacao: string (nullable = true)
 |-- Vendas: integer (nullable = true)
 |-- Data: string (nullable = true)



In [98]:
dados.select("Data","Nome","Vendas").orderBy(f.desc("Vendas")).show()

+----------+-------------------+------+
|      Data|               Nome|Vendas|
+----------+-------------------+------+
|2020-03-05|      Roque Vásquez|    65|
|2018-05-05|      Uriel Queiroz|    54|
|2019-10-05|    Noêmia   Orriça|    45|
|2020-02-05|Felisbela Dornelles|    36|
|2020-03-05|    Deolinda Vilela|    34|
|2020-02-05|   Emídio Dornelles|    34|
|2020-08-11|   Carminda Pestana|    23|
|2019-01-05|   Matilde Rebouças|    22|
|2020-02-05|     Graça Ornellas|    12|
|2020-09-05|   Viviana Sequeira|     0|
+----------+-------------------+------+



In [99]:
dados.select("*").where(f.col("Vendas") > 20).orderBy(f.desc("Vendas")).show()

+---+-------------------+------+-------------+------+----------+
| ID|               Nome|Status|  Localizacao|Vendas|      Data|
+---+-------------------+------+-------------+------+----------+
|  8|      Roque Vásquez| Ativo| Porto Alegre|    65|2020-03-05|
|  9|      Uriel Queiroz| Ativo| Porto Alegre|    54|2018-05-05|
|  7|    Noêmia   Orriça| Ativo|  Santa Maria|    45|2019-10-05|
|  4|Felisbela Dornelles| Ativo| Porto Alegre|    36|2020-02-05|
|  2|    Deolinda Vilela| Ativo|Novo Hamburgo|    34|2020-03-05|
|  3|   Emídio Dornelles| Ativo| Porto Alegre|    34|2020-02-05|
|  1|   Carminda Pestana| Ativo|  Santa Maria|    23|2020-08-11|
|  6|   Matilde Rebouças| Ativo| Porto Alegre|    22|2019-01-05|
+---+-------------------+------+-------------+------+----------+



In [100]:
dados.select("*").where((f.col("Status") == "Ativo") & (f.col("Vendas") < 40)).show()

+---+-------------------+------+-------------+------+----------+
| ID|               Nome|Status|  Localizacao|Vendas|      Data|
+---+-------------------+------+-------------+------+----------+
|  1|   Carminda Pestana| Ativo|  Santa Maria|    23|2020-08-11|
|  2|    Deolinda Vilela| Ativo|Novo Hamburgo|    34|2020-03-05|
|  3|   Emídio Dornelles| Ativo| Porto Alegre|    34|2020-02-05|
|  4|Felisbela Dornelles| Ativo| Porto Alegre|    36|2020-02-05|
|  5|     Graça Ornellas| Ativo| Porto Alegre|    12|2020-02-05|
|  6|   Matilde Rebouças| Ativo| Porto Alegre|    22|2019-01-05|
| 10|   Viviana Sequeira| Ativo| Porto Alegre|     0|2020-09-05|
+---+-------------------+------+-------------+------+----------+



In [101]:
dados.count()

10

In [102]:
dados_anos = dados.withColumn("Data_Year", f.year(f.to_timestamp("Data","yyyy-MM-dd")))

In [103]:
dados_anos.show()

+---+-------------------+------+-------------+------+----------+---------+
| ID|               Nome|Status|  Localizacao|Vendas|      Data|Data_Year|
+---+-------------------+------+-------------+------+----------+---------+
|  1|   Carminda Pestana| Ativo|  Santa Maria|    23|2020-08-11|     2020|
|  2|    Deolinda Vilela| Ativo|Novo Hamburgo|    34|2020-03-05|     2020|
|  3|   Emídio Dornelles| Ativo| Porto Alegre|    34|2020-02-05|     2020|
|  4|Felisbela Dornelles| Ativo| Porto Alegre|    36|2020-02-05|     2020|
|  5|     Graça Ornellas| Ativo| Porto Alegre|    12|2020-02-05|     2020|
|  6|   Matilde Rebouças| Ativo| Porto Alegre|    22|2019-01-05|     2019|
|  7|    Noêmia   Orriça| Ativo|  Santa Maria|    45|2019-10-05|     2019|
|  8|      Roque Vásquez| Ativo| Porto Alegre|    65|2020-03-05|     2020|
|  9|      Uriel Queiroz| Ativo| Porto Alegre|    54|2018-05-05|     2018|
| 10|   Viviana Sequeira| Ativo| Porto Alegre|     0|2020-09-05|     2020|
+---+-------------------+

In [104]:
dados_anos.select(f.col("Data_Year")).distinct().show()

+---------+
|Data_Year|
+---------+
|     2018|
|     2019|
|     2020|
+---------+



In [105]:
dados_anos.groupBy(f.col("Data_Year"))\
    .agg(f.sum("Vendas")\
         .alias("Soma_Vendas"))\
            .orderBy(f.desc("Data_Year"))\
                .show()

+---------+-----------+
|Data_Year|Soma_Vendas|
+---------+-----------+
|     2020|        204|
|     2019|         67|
|     2018|         54|
+---------+-----------+



In [106]:
dados_anos = dados_anos.drop("Data")

In [107]:
dados_anos.select

<bound method DataFrame.select of DataFrame[ID: int, Nome: string, Status: string, Localizacao: string, Vendas: int, Data_Year: int]>

In [108]:
dados_anos.groupBy("Localizacao").agg(f.sum("Vendas").alias("Vendas")).orderBy(f.col("Vendas").desc(),f.col("Localizacao").desc()).show()

+-------------+------+
|  Localizacao|Vendas|
+-------------+------+
| Porto Alegre|   223|
|  Santa Maria|    68|
|Novo Hamburgo|    34|
+-------------+------+



In [109]:
dados_anos.select("*").groupBy("Data_Year").agg(f.sum("Vendas").alias("Vendas")).show()

+---------+------+
|Data_Year|Vendas|
+---------+------+
|     2018|    54|
|     2019|    67|
|     2020|   204|
+---------+------+



In [110]:
dados_anos.select("*").orderBy(f.col("Localizacao").desc(),f.col("Vendas").desc()).show()

+---+-------------------+------+-------------+------+---------+
| ID|               Nome|Status|  Localizacao|Vendas|Data_Year|
+---+-------------------+------+-------------+------+---------+
|  7|    Noêmia   Orriça| Ativo|  Santa Maria|    45|     2019|
|  1|   Carminda Pestana| Ativo|  Santa Maria|    23|     2020|
|  8|      Roque Vásquez| Ativo| Porto Alegre|    65|     2020|
|  9|      Uriel Queiroz| Ativo| Porto Alegre|    54|     2018|
|  4|Felisbela Dornelles| Ativo| Porto Alegre|    36|     2020|
|  3|   Emídio Dornelles| Ativo| Porto Alegre|    34|     2020|
|  6|   Matilde Rebouças| Ativo| Porto Alegre|    22|     2019|
|  5|     Graça Ornellas| Ativo| Porto Alegre|    12|     2020|
| 10|   Viviana Sequeira| Ativo| Porto Alegre|     0|     2020|
|  2|    Deolinda Vilela| Ativo|Novo Hamburgo|    34|     2020|
+---+-------------------+------+-------------+------+---------+



In [121]:
list_dados_novos = [
    (8, "Carlos", "Ativo", "Porto Alegre", 30, 2019),
    (9, "Ana", "Inativo", "São Paulo", 20, 2019),
    (18, "Mariana", "Ativo", "Rio de Janeiro", 50, 2019),
    (11, "Pedro", "Inativo", "Belo Horizonte", 40, 2019),
    (12, "Larissa", "Ativo", "Recife", 35, 2019),
    (13, "Fernando", "Inativo", "Salvador", 25, 2019),
    (14, "Isabela", "Ativo", "Brasília", 60, 2019),
    (15, "Lucas", "Inativo", "Fortaleza", 15, 2019),
    (16, "Camila", "Ativo", "Curitiba", 55, 2019),
    (17, "Rafael", "Inativo", "Manaus", 22, 2019)
]
dados_rdd_despachantes = spark.sparkContext.parallelize(list_dados_novos)

In [122]:
dados_add = spark.createDataFrame(dados_rdd_despachantes, schema=schema)

In [123]:
dados_year = dados_add.union(dados_anos)

In [126]:
dados_year = dados_year.orderBy("ID") 


In [127]:
dados_year.show()

+---+-------------------+-------+--------------+------+----+
| ID|               Nome| Status|   Localizacao|Vendas|Data|
+---+-------------------+-------+--------------+------+----+
|  1|   Carminda Pestana|  Ativo|   Santa Maria|    23|2020|
|  2|    Deolinda Vilela|  Ativo| Novo Hamburgo|    34|2020|
|  3|   Emídio Dornelles|  Ativo|  Porto Alegre|    34|2020|
|  4|Felisbela Dornelles|  Ativo|  Porto Alegre|    36|2020|
|  5|     Graça Ornellas|  Ativo|  Porto Alegre|    12|2020|
|  6|   Matilde Rebouças|  Ativo|  Porto Alegre|    22|2019|
|  7|    Noêmia   Orriça|  Ativo|   Santa Maria|    45|2019|
|  8|             Carlos|  Ativo|  Porto Alegre|    30|2019|
|  8|      Roque Vásquez|  Ativo|  Porto Alegre|    65|2020|
|  9|                Ana|Inativo|     São Paulo|    20|2019|
|  9|      Uriel Queiroz|  Ativo|  Porto Alegre|    54|2018|
| 10|   Viviana Sequeira|  Ativo|  Porto Alegre|     0|2020|
| 11|              Pedro|Inativo|Belo Horizonte|    40|2019|
| 12|            Larissa

In [131]:
dados_year.filter((f.col("Status") == "Inativo" )& (f.col("Vendas") > 24)).show()

+---+--------+-------+--------------+------+----+
| ID|    Nome| Status|   Localizacao|Vendas|Data|
+---+--------+-------+--------------+------+----+
| 11|   Pedro|Inativo|Belo Horizonte|    40|2019|
| 13|Fernando|Inativo|      Salvador|    25|2019|
+---+--------+-------+--------------+------+----+



In [132]:
dados_year.groupBy("Localizacao").agg(f.sum("Vendas").alias("Vendas")).orderBy(f.col("Localizacao").desc()).show()

+--------------+------+
|   Localizacao|Vendas|
+--------------+------+
|     São Paulo|    20|
|   Santa Maria|    68|
|      Salvador|    25|
|Rio de Janeiro|    50|
|        Recife|    35|
|  Porto Alegre|   253|
| Novo Hamburgo|    34|
|        Manaus|    22|
|     Fortaleza|    15|
|      Curitiba|    55|
|      Brasília|    60|
|Belo Horizonte|    40|
+--------------+------+

