In [3]:
import os
import findspark
findspark.init(os.environ['SPARK_HOME'])

from pyspark.sql import SparkSession
from pyspark.sql.functions import col

In [4]:
# Inicializando a sessão do Spark
spark = SparkSession.builder.appName("Exemplo").getOrCreate()

## Data Transformation
---

In [5]:
# Criando um DataFrame de exemplo
data = [("Produto1", "CategoriaA", 100, 50),
        ("Produto2", "CategoriaB", 150, None),
        ("Produto3", "CategoriaA", 200, 30)]

columns = ["Produto", "Categoria", "Preco", "QuantidadeVendida"]

df = spark.createDataFrame(data, columns)

# Exibindo o DataFrame original
print("DataFrame Original:")
df.show()

DataFrame Original:
+--------+----------+-----+-----------------+
| Produto| Categoria|Preco|QuantidadeVendida|
+--------+----------+-----+-----------------+
|Produto1|CategoriaA|  100|               50|
|Produto2|CategoriaB|  150|             NULL|
|Produto3|CategoriaA|  200|               30|
+--------+----------+-----+-----------------+



* **Seleção de Colunas:** Escolher quais colunas são relevantes para a análise ou para o modelo.

In [9]:
df_selecionado = df.select("Produto", "Preco")
print("1. Seleção de Colunas:")
df_selecionado.show()

1. Seleção de Colunas:
+--------+-----+
| Produto|Preco|
+--------+-----+
|Produto1|  100|
|Produto2|  150|
|Produto3|  200|
+--------+-----+



* **Filtragem de Linhas:** Excluir ou manter linhas com base em condições específicas.

In [10]:
df_filtrado = df.filter(col("QuantidadeVendida").isNotNull())
print("2. Filtragem de Linhas:")
df_filtrado.show()

2. Filtragem de Linhas:
+--------+----------+-----+-----------------+
| Produto| Categoria|Preco|QuantidadeVendida|
+--------+----------+-----+-----------------+
|Produto1|CategoriaA|  100|               50|
|Produto3|CategoriaA|  200|               30|
+--------+----------+-----+-----------------+



* **Tratamento de Valores Nulos:** Lidar com valores nulos, seja removendo, preenchendo ou substituindo.

In [11]:
df_tratado_nulos = df.fillna(0, subset=["QuantidadeVendida"])
print("3. Tratamento de Valores Nulos:")
df_tratado_nulos.show()

3. Tratamento de Valores Nulos:
+--------+----------+-----+-----------------+
| Produto| Categoria|Preco|QuantidadeVendida|
+--------+----------+-----+-----------------+
|Produto1|CategoriaA|  100|               50|
|Produto2|CategoriaB|  150|                0|
|Produto3|CategoriaA|  200|               30|
+--------+----------+-----+-----------------+



* **Criar Novas Colunas:** Derivar novas colunas com base em cálculos ou transformações de colunas existentes.

In [12]:
df_com_nova_coluna = df.withColumn("TotalVendas", col("Preco") * col("QuantidadeVendida"))
print("4. Criar Novas Colunas:")
df_com_nova_coluna.show()

4. Criar Novas Colunas:
+--------+----------+-----+-----------------+-----------+
| Produto| Categoria|Preco|QuantidadeVendida|TotalVendas|
+--------+----------+-----+-----------------+-----------+
|Produto1|CategoriaA|  100|               50|       5000|
|Produto2|CategoriaB|  150|             NULL|       NULL|
|Produto3|CategoriaA|  200|               30|       6000|
+--------+----------+-----+-----------------+-----------+



* **Agregação e Resumo:** Resumir dados usando funções agregadas como soma, média, mínimo, máximo, etc.

In [13]:
df_resumido = df.groupBy("Categoria").agg({"Preco": "avg", "QuantidadeVendida": "sum"})
print("5. Agregação e Resumo:")
df_resumido.show()

5. Agregação e Resumo:
+----------+----------------------+----------+
| Categoria|sum(QuantidadeVendida)|avg(Preco)|
+----------+----------------------+----------+
|CategoriaA|                    80|     150.0|
|CategoriaB|                  NULL|     150.0|
+----------+----------------------+----------+



* **Join e Concatenação:** Combinar dados de diferentes fontes usando operações de join ou concatenando DataFrames.

In [15]:
df_categorias = spark.createDataFrame([("CategoriaA", "Descrição1"), ("CategoriaB", "Descrição2")], ["Categoria", "Descricao"])
df_categorias.show()

+----------+----------+
| Categoria| Descricao|
+----------+----------+
|CategoriaA|Descrição1|
|CategoriaB|Descrição2|
+----------+----------+



In [16]:
df_junto = df.join(df_categorias, on="Categoria", how="left_outer")
print("6. Join e Concatenação:")
df_junto.show()

6. Join e Concatenação:
+----------+--------+-----+-----------------+----------+
| Categoria| Produto|Preco|QuantidadeVendida| Descricao|
+----------+--------+-----+-----------------+----------+
|CategoriaA|Produto1|  100|               50|Descrição1|
|CategoriaB|Produto2|  150|             NULL|Descrição2|
|CategoriaA|Produto3|  200|               30|Descrição1|
+----------+--------+-----+-----------------+----------+



* **Ordenação e Classificação:** Ordenar os dados com base em uma ou mais colunas.

In [19]:
df_ordenado = df.sort(col("Preco").desc())
print("7. Ordenação e Classificação:")
df_ordenado.show()

AssertionError: 

In [18]:
spark.stop()