In [143]:
import findspark
from pyspark.sql import DataFrame, SparkSession
import pyspark.sql.functions as F
from typing import Any
from pyspark.sql.types import StringType
from pyspark.sql.window import Window
import pandas as pd

import os

os.makedirs("data", exist_ok=True)

In [156]:

findspark.init()

spark = SparkSession.builder.appName('teste_pratico')\
.config('spark.master', 'local[4]')\
.config('spark.shuffle.sql.partitions', 2)\
.getOrCreate()

spark

## Parte 1: Manipulação de Dados
### Criação de DataFrame

In [145]:
# Dados
data = [
    ("Alice", 34, "Data Scientist"),
    ("Bob", 45, "Data Engineer"),
    ("Cathy", 29, "Data Analyst"),
    ("David", 35, "Data Scientist")
]
columns = ["Name", "Age", "Occupation"]

# Criação do DataFrame em Spark
df = spark.createDataFrame(data, columns)
df.show()

+-----+---+--------------+
| Name|Age|    Occupation|
+-----+---+--------------+
|Alice| 34|Data Scientist|
|  Bob| 45| Data Engineer|
|Cathy| 29|  Data Analyst|
|David| 35|Data Scientist|
+-----+---+--------------+



### Filtro de DataFrame (Age>30)

In [146]:
df_age30Plus = df.select("Name", "Age").filter(df["Age"] > 30)
df_age30Plus.show()


+-----+---+
| Name|Age|
+-----+---+
|Alice| 34|
|  Bob| 45|
|David| 35|
+-----+---+



### Agrupamento e Ordenação de DataFrame (Age: Médias)

In [147]:
df_occMean = df.groupBy("Occupation").agg(F.avg("Age").alias("Average Age")).orderBy("Average Age", ascending=False)
df_occMean.show()

+--------------+-----------+
|    Occupation|Average Age|
+--------------+-----------+
| Data Engineer|       45.0|
|Data Scientist|       34.5|
|  Data Analyst|       29.0|
+--------------+-----------+



## Parte 2: Funções Avançadas
### Uso de UDFs (User Defined Functions)
#### Categorização das idades usando udfs (Jovem; Adulto; Senior)

In [148]:
def catAge(age):
    if age < 30:
        return "Jovem"
    elif 30 <= age <= 40:
        return "Adulto"
    else:
        return "Senior"


catAge_udf = F.udf(catAge, StringType())

df_catAge = df.withColumn("Age Category", catAge_udf(df["Age"]))
df_catAge.show()

+-----+---+--------------+------------+
| Name|Age|    Occupation|Age Category|
+-----+---+--------------+------------+
|Alice| 34|Data Scientist|      Adulto|
|  Bob| 45| Data Engineer|      Senior|
|Cathy| 29|  Data Analyst|       Jovem|
|David| 35|Data Scientist|      Adulto|
+-----+---+--------------+------------+



#### Adicionando a diferença de idades por individuos, usando a function Window 

In [149]:


winOcc = Window.partitionBy("Occupation")

df_winOcc_avg = df.withColumn("Average Age", F.avg("Age").over(winOcc))

# Cálculo da diferença entre a idade individual e a média
df_ageDiff = df_winOcc_avg.withColumn("Age Diff", F.col("Age") - F.col("Average Age"))
df_ageDiff.show()

+-----+---+--------------+-----------+--------+
| Name|Age|    Occupation|Average Age|Age Diff|
+-----+---+--------------+-----------+--------+
|Cathy| 29|  Data Analyst|       29.0|     0.0|
|  Bob| 45| Data Engineer|       45.0|     0.0|
|Alice| 34|Data Scientist|       34.5|    -0.5|
|David| 35|Data Scientist|       34.5|     0.5|
+-----+---+--------------+-----------+--------+



## Parte 3: Performance e Otimização
### Particionamento

#### O particionamento melhora a performance ao distribuir os dados em diferentes partições, o que permite que operações de leitura e escrita sejam executadas em paralelo essa performance é escalavel quando ultrapassamos esse exemplo e deparamos com um cenário de big data, onde o custo de fazer um filtro simples aumenta o recurso computacional disposto. Segue um exemplo de partição para os dados:

In [150]:
df_part = df.repartition(4, "Occupation")
df_part.show()

+-----+---+--------------+
| Name|Age|    Occupation|
+-----+---+--------------+
|Cathy| 29|  Data Analyst|
|Alice| 34|Data Scientist|
|  Bob| 45| Data Engineer|
|David| 35|Data Scientist|
+-----+---+--------------+



#### O broadcast se beneficia da performance, ao atuarmos com sistemas distribuidos onde é uma técnica de otimização onde um DataFrame pequeno é enviado para todos os nós de um cluster, permitindo que o join seja realizado localmente em cada nó, diminuindo o fluxo de dados por nós, assim reduzindo a movimentação de dados.

In [151]:

df_broadCast = spark.createDataFrame([("Data Scientist", 100000), ("Data Engineer", 95000)], ["Occupation", "Salary"])


# broadCast Join usando o `Occupation` como indexador 
result_df = df.join(F.broadcast(df_broadCast), "Occupation")
result_df.show()

+--------------+-----+---+------+
|    Occupation| Name|Age|Salary|
+--------------+-----+---+------+
|Data Scientist|Alice| 34|100000|
| Data Engineer|  Bob| 45| 95000|
|Data Scientist|David| 35|100000|
+--------------+-----+---+------+



## Parte 4: Integração com Outras Tecnologias
### Leitura e Escrita de Dados

In [152]:
df.show()

+-----+---+--------------+
| Name|Age|    Occupation|
+-----+---+--------------+
|Alice| 34|Data Scientist|
|  Bob| 45| Data Engineer|
|Cathy| 29|  Data Analyst|
|David| 35|Data Scientist|
+-----+---+--------------+



In [157]:
# Para o exemplo vou utilizar o própio df que criei no começo do teste 
df2= pd.DataFrame(data, columns=columns)
df2.to_csv("data/dim_colaborador.csv", index=False)

# Leitura de CSV
df_csv = spark.read.csv("data/dim_colaborador.csv", header=True, inferSchema=True)

# Escrita em Parquet
# df_csv.write.mode("overwrite").save("dim_colaborador.parquet")
# df_csv.show()

## Para exemplo de interação com o HDFS seguem o script em questão onde retiro do Escrevo uma tabela de exemplo no hadoop

```{python3}
spark.conf.set("fs.defaultFS", "hdfs://namenode:8020")
# Leitura 
df_hdfs = spark.read.csv("hdfs://namenode:8020/input/path/file.csv", header=True, inferSchema=True)

# Filtro
df_filtered = df_hdfs.filter(df_hdfs["column"] > 100)

# Escrita 
df_filtered.write.csv("hdfs://namenode:8020/output/path/file.csv")
```

In [155]:
'''
spark.conf.set("fs.defaultFS", "hdfs://namenode:8020")
# Leitura 
df_hdfs = spark.read.csv("hdfs://namenode:8020/input/path/file.csv", header=True, inferSchema=True)

# Filtro
df_filtered = df_hdfs.filter(df_hdfs["column"] > 100)

# Escrita 
df_filtered.write.csv("hdfs://namenode:8020/output/path/file.csv")
'''



## Parte 5: Problema de Caso
### Processamento de Logs

## Demonstrando a leitura do Log solicitado com exemplos 

In [None]:
'''
# Demonstrando Leitura de log 
df_logs = spark.read.csv("hdfs://namenode:8020/data/log.csv", header=True, inferSchema=True)

# Contagem por usuário
df_userId = df_logs.groupBy("user_id").count()

# Identificação dos 10 usuários mais ativos
df_userId10 = df_userId.orderBy("count", ascending=False).limit(10)

# Salvamento do resultado em um arquivo CSV
df_userId10.write.csv("hdfs://namenode:8020/data/df_userId10.csv")
'''

# Obs: 
## A partir da interação com o hadoop ultilizei o cluster no ambiente databricks com as minhas credenciais  (não serão disponibilizadas), mas as configurações foram essas: 
```{python}
spark.conf.set("fs.azure.account.key.<your-storage-account-name>.blob.core.windows.net", "<your-access-key>")

df_hdfs = spark.read.csv("wasbs://<your-container-name>@<your-storage-account-name>.blob.core.windows.net/teste.csv", header=True, inferSchema=True)

```