<a href="https://colab.research.google.com/github/edsonlourenco/python_tutorials/blob/main/PySpark_Guia_Essencial_Basico.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# PySpark | Guia essencial e básico | V1 | Edson Lourenço
O objetivo desse material é introduzir o desenvolvedor no universo do **Apache Spark** com o **Python**.

## Instalando pacotes necessários

In [23]:
from IPython.display import clear_output

!pip install --upgrade pip
!pip install findspark
!pip install pyspark

clear_output(wait=False) #limpa o output (saída)

## Importando pacotes necessários

In [24]:
import findspark, pyspark
from pyspark.sql import SparkSession
from pyspark import SparkFiles

## Obtendo instância do Spark

In [25]:
findspark.init()
spark = SparkSession.builder.getOrCreate()

## Lendo fonte de dados do GitHub

In [26]:
url = 'https://raw.githubusercontent.com/edsonlourenco/public_datasets/main/Carros.csv'
spark.sparkContext.addFile(url)
csv_cars = SparkFiles.get("Carros.csv")
df_cars = spark.read.csv(csv_cars, header=True, inferSchema=True, sep=';')

## Como exibo os dados?

In [27]:
df_cars.show(truncate=False)

+-------+---------+-----------+---------------+----+-----+---------+-----------+-------+-----------+---+
|Consumo|Cilindros|Cilindradas|RelEixoTraseiro|Peso|Tempo|TipoMotor|Transmissao|Marchas|Carburadors|HP |
+-------+---------+-----------+---------------+----+-----+---------+-----------+-------+-----------+---+
|21     |6        |160        |39             |262 |1646 |0        |1          |4      |4          |110|
|21     |6        |160        |39             |2875|1702 |0        |1          |4      |4          |110|
|228    |4        |108        |385            |232 |1861 |1        |1          |4      |1          |93 |
|214    |6        |258        |308            |3215|1944 |1        |0          |3      |1          |110|
|187    |8        |360        |315            |344 |1702 |0        |0          |3      |2          |175|
|181    |6        |225        |276            |346 |2022 |1        |0          |3      |1          |105|
|143    |8        |360        |321            |357 |158

## Como contar total de linhas (rows)?

In [28]:
df_cars.count()

32

## Como seleciono os dados de uma coluna?

In [29]:
df_cars.select("tempo").show()

+-----+
|tempo|
+-----+
| 1646|
| 1702|
| 1861|
| 1944|
| 1702|
| 2022|
| 1584|
|   20|
|  229|
|  183|
|  189|
|  174|
|  176|
|   18|
| 1798|
| 1782|
| 1742|
| 1947|
| 1852|
|  199|
+-----+
only showing top 20 rows



## Como seleciono os dados de certas colunas?

In [30]:
cols = ['Cilindros', 'Cilindradas', 'HP', 'Peso']
df_cars.select(cols).show()

+---------+-----------+---+----+
|Cilindros|Cilindradas| HP|Peso|
+---------+-----------+---+----+
|        6|        160|110| 262|
|        6|        160|110|2875|
|        4|        108| 93| 232|
|        6|        258|110|3215|
|        8|        360|175| 344|
|        6|        225|105| 346|
|        8|        360|245| 357|
|        4|       1467| 62| 319|
|        4|       1408| 95| 315|
|        6|       1676|123| 344|
|        6|       1676|123| 344|
|        8|       2758|180| 407|
|        8|       2758|180| 373|
|        8|       2758|180| 378|
|        8|        472|205| 525|
|        8|        460|215|5424|
|        8|        440|230|5345|
|        4|        787| 66|  22|
|        4|        757| 52|1615|
|        4|        711| 65|1835|
+---------+-----------+---+----+
only showing top 20 rows



## Como ordeno por uma coluna?

In [31]:
cols = ['Cilindros', 'Cilindradas', 'HP', 'Peso']
df_cars.orderBy("peso").select(cols).show()

+---------+-----------+---+----+
|Cilindros|Cilindradas| HP|Peso|
+---------+-----------+---+----+
|        4|        787| 66|  22|
|        4|       1203| 91| 214|
|        4|        108| 93| 232|
|        6|        160|110| 262|
|        6|        145|175| 277|
|        4|        121|109| 278|
|        4|       1408| 95| 315|
|        8|        351|264| 317|
|        4|       1467| 62| 319|
|        6|       1676|123| 344|
|        6|       1676|123| 344|
|        8|        360|175| 344|
|        6|        225|105| 346|
|        8|        318|150| 352|
|        8|        360|245| 357|
|        8|        301|335| 357|
|        8|       2758|180| 373|
|        8|       2758|180| 378|
|        8|        350|245| 384|
|        8|       2758|180| 407|
+---------+-----------+---+----+
only showing top 20 rows



## Como agrupo?

In [32]:
# do pacote pyspark e seu subpacote sql, importamos o módulo functions
# demos a ele um alias chamado de F
from pyspark.sql import functions as F

df_cars.groupBy("Cilindros").agg(F.count("*")).show()

+---------+--------+
|Cilindros|count(1)|
+---------+--------+
|        6|       7|
|        4|      11|
|        8|      14|
+---------+--------+



## Como  faço alias de coluna?

In [33]:
from pyspark.sql import functions as F

df_cars.groupBy("Cilindros").agg(F.count("*").alias("Quantidade")).show()

+---------+----------+
|Cilindros|Quantidade|
+---------+----------+
|        6|         7|
|        4|        11|
|        8|        14|
+---------+----------+



## O código ficou grande!! Como deixo mais elegante?

In [34]:
from pyspark.sql import functions as F

(
           df_cars
                            .groupBy("Cilindros")
                            .agg(F.count("*")
                            .alias("Quantidade"))
                            .show()
)



+---------+----------+
|Cilindros|Quantidade|
+---------+----------+
|        6|         7|
|        4|        11|
|        8|        14|
+---------+----------+



## Como crio uma coluna nova?

In [35]:
from pyspark.sql import functions as F

(
           df_cars
                            .groupBy("Cilindros")
                            .agg(F.count("*")
                            .alias("Quantidade"))
                            .select(
                                              F.lit("Carros").alias("Categoria"),
                                              "*"
                                           )
                            .show()
)



+---------+---------+----------+
|Categoria|Cilindros|Quantidade|
+---------+---------+----------+
|   Carros|        6|         7|
|   Carros|        4|        11|
|   Carros|        8|        14|
+---------+---------+----------+



## Como crio uma coluna nova e especifico algumas outras?

In [36]:
from pyspark.sql import functions as F

(
           df_cars
                            .groupBy("Cilindros")
                            .agg(F.count("*")
                            .alias("Quantidade"))
                            .select(
                                              F.lit("Carros").alias("Categoria"),
                                              F.col("Cilindros"),
                                              F.col("Quantidade")
                                           )
                            .show()
)



+---------+---------+----------+
|Categoria|Cilindros|Quantidade|
+---------+---------+----------+
|   Carros|        6|         7|
|   Carros|        4|        11|
|   Carros|        8|        14|
+---------+---------+----------+



## É possível efetuar somas?

In [37]:
from pyspark.sql import functions as F

(
 df_cars
                  .groupBy("TipoMotor")
                  .agg(F.sum("Peso").alias("Peso_Total"))
                  .select(
                                    F.lit("Carros").alias("Categoria"),
                                    F.col("TipoMotor"),
                                    F.col("Peso_Total")
                                 )
                   .show()
)




+---------+---------+----------+
|Categoria|TipoMotor|Peso_Total|
+---------+---------+----------+
|   Carros|        1|     14778|
|   Carros|        0|     25471|
+---------+---------+----------+



## É possível tirar a média?

In [38]:
from pyspark.sql import functions as F

(
 df_cars
                  .groupBy("TipoMotor")
                  .agg(F.avg("Cilindradas").alias("Cilindradas_Avg"))
                  .select(
                                    F.lit("Carros").alias("Categoria"),
                                    F.col("TipoMotor"),
                                    F.col("Cilindradas_Avg")
                                 )
                   .show()
)




+---------+---------+-----------------+
|Categoria|TipoMotor|  Cilindradas_Avg|
+---------+---------+-----------------+
|   Carros|        1|816.0714285714286|
|   Carros|        0|            781.0|
+---------+---------+-----------------+



## Cilindradas_Avg ficou float, como resolvo?

In [39]:
from pyspark.sql import functions as F
from pyspark.sql.types import IntegerType

(
 df_cars
                  .groupBy("TipoMotor")
                  .agg(
                                F.avg("Cilindradas").cast(IntegerType()
                            ).alias("Cilindradas_Avg"))
                  .select(
                                    F.lit("Carros").alias("Categoria"),
                                    F.col("TipoMotor"),
                                    F.col("Cilindradas_Avg")
                                 )
                   .show()
)




+---------+---------+---------------+
|Categoria|TipoMotor|Cilindradas_Avg|
+---------+---------+---------------+
|   Carros|        1|            816|
|   Carros|        0|            781|
+---------+---------+---------------+



## Como posso filtrar com where?

In [40]:
from pyspark.sql import functions as F

# modo 1
df_cars.where(F.col("TipoMotor") == 1).select("*").show()
# modo 2
df_cars.where(df_cars["TipoMotor"] == 1).select("*").show()
# modo 3
df_cars.where(df_cars.TipoMotor == 1).select("*").show()

+-------+---------+-----------+---------------+----+-----+---------+-----------+-------+-----------+---+
|Consumo|Cilindros|Cilindradas|RelEixoTraseiro|Peso|Tempo|TipoMotor|Transmissao|Marchas|Carburadors| HP|
+-------+---------+-----------+---------------+----+-----+---------+-----------+-------+-----------+---+
|    228|        4|        108|            385| 232| 1861|        1|          1|      4|          1| 93|
|    214|        6|        258|            308|3215| 1944|        1|          0|      3|          1|110|
|    181|        6|        225|            276| 346| 2022|        1|          0|      3|          1|105|
|    244|        4|       1467|            369| 319|   20|        1|          0|      4|          2| 62|
|    228|        4|       1408|            392| 315|  229|        1|          0|      4|          2| 95|
|    192|        6|       1676|            392| 344|  183|        1|          0|      4|          4|123|
|    178|        6|       1676|            392| 344|  1

## Como posso filtrar com filter?

In [41]:
from pyspark.sql import functions as F

# modo 1
df_cars.filter(F.col("TipoMotor") == 1).select("*").show()
# modo 2
df_cars.filter(df_cars["TipoMotor"] == 1).select("*").show()
# modo 3
df_cars.filter(df_cars.TipoMotor == 1).select("*").show()

+-------+---------+-----------+---------------+----+-----+---------+-----------+-------+-----------+---+
|Consumo|Cilindros|Cilindradas|RelEixoTraseiro|Peso|Tempo|TipoMotor|Transmissao|Marchas|Carburadors| HP|
+-------+---------+-----------+---------------+----+-----+---------+-----------+-------+-----------+---+
|    228|        4|        108|            385| 232| 1861|        1|          1|      4|          1| 93|
|    214|        6|        258|            308|3215| 1944|        1|          0|      3|          1|110|
|    181|        6|        225|            276| 346| 2022|        1|          0|      3|          1|105|
|    244|        4|       1467|            369| 319|   20|        1|          0|      4|          2| 62|
|    228|        4|       1408|            392| 315|  229|        1|          0|      4|          2| 95|
|    192|        6|       1676|            392| 344|  183|        1|          0|      4|          4|123|
|    178|        6|       1676|            392| 344|  1