##### Bibliotecas necessárias e configurações de ambiente

In [1]:
import os
import sys
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, avg, desc, udf, broadcast
from pyspark.sql.types import StringType
from pyspark.sql.window import Window

# Set variáveis de ambiente
os.environ['PYSPARK_PYTHON']=sys.executable
os.environ['PYSPARK_DRIVER_PYTHON']=sys.executable

# Cria uma sessão Spark
spark=SparkSession.builder \
    .appName("actdigital") \
    .getOrCreate()

In [2]:
# Dados disponibilizados
data=[
    ("Alice", 34, "Data Scientist"),
    ("Bob", 45, "Data Engineer"),
    ("Cathy", 29, "Data Analyst"),
    ("David", 35, "Data Scientist")
]
columns=["Name", "Age", "Occupation"]

##### Parte 1: Manipulação de Dados

In [3]:
# Crie um DataFrame a partir dos dados fornecidos
df=spark.createDataFrame(data, schema=columns)
df.show()

+-----+---+--------------+
| Name|Age|    Occupation|
+-----+---+--------------+
|Alice| 34|Data Scientist|
|  Bob| 45| Data Engineer|
|Cathy| 29|  Data Analyst|
|David| 35|Data Scientist|
+-----+---+--------------+



In [4]:
# Selecione apenas as colunas "Name" e "Age" 
df.select("name", "age").show()

+-----+---+
| name|age|
+-----+---+
|Alice| 34|
|  Bob| 45|
|Cathy| 29|
|David| 35|
+-----+---+



In [5]:
# Filtre as linhas onde a "Age" é maior que 30
df.filter("Age > 30").show()

+-----+---+--------------+
| Name|Age|    Occupation|
+-----+---+--------------+
|Alice| 34|Data Scientist|
|  Bob| 45| Data Engineer|
|David| 35|Data Scientist|
+-----+---+--------------+



In [6]:
# Agrupe os dados pelo campo "Occupation" e calcule a média de "Age" para cada grupo
df.groupBy("Occupation").agg(
    avg("Age")
    ).show()

+--------------+--------+
|    Occupation|avg(Age)|
+--------------+--------+
|Data Scientist|    34.5|
| Data Engineer|    45.0|
|  Data Analyst|    29.0|
+--------------+--------+



In [7]:
# Ordene o DataFrame resultante da questão anterior pela média de "Age" em ordem decrescente
df.groupBy("Occupation").agg(
    avg("Age")).orderBy(desc(
    "avg(Age)")).show()

+--------------+--------+
|    Occupation|avg(Age)|
+--------------+--------+
| Data Engineer|    45.0|
|Data Scientist|    34.5|
|  Data Analyst|    29.0|
+--------------+--------+



##### Parte 2: Funções Avançadas

In [8]:
# Crie uma função em Python que converte idades para categorias
# e aplique esta função ao DataFrame usando uma UDF

# Função python
def age_category(age):
    if age < 30:
        return "Jovem"
    elif 30 < age < 40:
        return "Adulto"
    else:
        return "Senior"

# Converter para UDF
age_category_udf=udf(age_category, StringType())

# Criar coluna com categorias
df=df.withColumn("AgeCategory", age_category_udf(df["Age"]))
df.show()

+-----+---+--------------+-----------+
| Name|Age|    Occupation|AgeCategory|
+-----+---+--------------+-----------+
|Alice| 34|Data Scientist|     Adulto|
|  Bob| 45| Data Engineer|     Senior|
|Cathy| 29|  Data Analyst|      Jovem|
|David| 35|Data Scientist|     Adulto|
+-----+---+--------------+-----------+



In [9]:
# Adicione uma coluna que mostre a diferença de idade entre
# cada indivíduo e a média de idade do seu "Occupation"

# Especificar condicional da janela
windowSpec=Window.partitionBy("Occupation")

# Calcular a idade média da ocupação
df=df.withColumn("AvgOccupationAge", avg(
    "Age").over(windowSpec))

# Diferença em relação à média da ocupação
df=df.withColumn("AgeDifference", col(
    "Age") - col("AvgOccupationAge"))
df.show()

+-----+---+--------------+-----------+----------------+-------------+
| Name|Age|    Occupation|AgeCategory|AvgOccupationAge|AgeDifference|
+-----+---+--------------+-----------+----------------+-------------+
|Cathy| 29|  Data Analyst|      Jovem|            29.0|          0.0|
|  Bob| 45| Data Engineer|     Senior|            45.0|          0.0|
|Alice| 34|Data Scientist|     Adulto|            34.5|         -0.5|
|David| 35|Data Scientist|     Adulto|            34.5|          0.5|
+-----+---+--------------+-----------+----------------+-------------+



##### Parte 3: Performance e Otimização

Explique como o particionamento pode ser usado para melhorar a performance em operações de leitura e escrita de dados em PySpark.
- No PySpark, o particionamento de dados refere-se ao processo de divisão de um grande conjunto de dados em pedaços ou partições menores, que podem ser processados ​​simultaneamente. Este é um aspecto importante da computação distribuída, pois permite que grandes conjuntos de dados sejam processados ​​de forma mais eficiente, dividindo a carga de trabalho entre várias máquinas ou processadores. Cada partição pode ser processada em paralelo por diferentes executores no cluster, permitindo que grandes conjuntos de dados sejam manipulados de forma mais eficiente.

Dê um exemplo de código que particiona um DataFrame por uma coluna específica.

In [None]:
# Exemplo de particionamento na escrita de dados
# utilizando como critério de partição a ocupação
df.write.partitionBy("Occupation").format("parquet").save("data/")

Descreva o conceito de Broadcast Join em PySpark e como ele pode ser usado para otimizar operações de join.
- O Broadcast Join é uma técnica de otimização em PySpark que melhora o desempenho das operações de junção (joins) quando uma das tabelas envolvidas é pequena o suficiente para caber na memória do executador. Em vez de realizar a junção de forma distribuída, que pode ser ineficiente e lenta, o Broadcast Join permite que a tabela pequena seja replicada e distribuída para todos os nós do cluster. Isso reduz a necessidade de shuffling e comunicação entre os nós durante o processo de junção.

Implemente um exemplo de Broadcast Join entre dois DataFrames.

In [10]:
# Criar uma sessão Spark
spark=SparkSession.builder.appName("BroadcastJoinExample").getOrCreate()

# Criar DataFrames de exemplo
data1=[
    (1, "John", 30),
    (2, "Jane", 25),
    (3, "Mike", 35),
    (4, "Lisa", 28)
]
columns1=["id", "name", "age"]
df1=spark.createDataFrame(data1, columns1)

data2=[
    (1, "IT"),
    (2, "HR"),
    (3, "Finance")
]
columns2=["id", "department"]
df2=spark.createDataFrame(data2, columns2)

# Realizar o Broadcast Join
result = df1.join(broadcast(df2), on="id", how="inner")
result.show()

+---+----+---+----------+
| id|name|age|department|
+---+----+---+----------+
|  1|John| 30|        IT|
|  2|Jane| 25|        HR|
|  3|Mike| 35|   Finance|
+---+----+---+----------+



##### Parte 4: Integração com Outras Tecnologias

Demonstre como ler dados de um arquivo CSV e escrever o resultado em um formato Parquet.

In [None]:
# Criar uma sessão Spark
spark=SparkSession.builder \
    .appName("CSVparaParquet") \
    .getOrCreate()

# Caminho para o arquivo CSV
csv_file_path='data/professions.csv'

# Ler arquivo CSV em um DataFrame
df=spark.read.csv(csv_file_path, header=True, inferSchema=True)
df.show()

In [None]:
# Escrever dados em parquet
df.write.parquet('data/professions.parquet', mode='overwrite')
spark.stop()

Explique como PySpark se integra com o Hadoop HDFS para leitura e escrita de dados.
- O PySpark se integra com o Hadoop HDFS (Hadoop Distributed File System) para leitura e escrita de dados, permitindo que o Spark aproveite a escalabilidade e a resiliência do HDFS. A integração entre PySpark e HDFS é uma característica fundamental para trabalhar com grandes volumes de dados em ambientes distribuídos.

Dê um exemplo de código que leia um arquivo do HDFS e salve o resultado de volta no HDFS.

In [None]:
# Criar uma sessão Spark
spark=SparkSession.builder \
    .appName("ReadAndWriteToHDFS") \
    .config("spark.hadoop.fs.defaultFS", "hdfs://namenode:9000") \
    .getOrCreate()

# Caminho do arquivo CSV no HDFS
arquivo="hdfs://namenode:9000/path/to/input.csv"

# Caminho para salvar o arquivo Parquet no HDFS
pasta_salvar="hdfs://namenode:9000/path/to/output.parquet"

# Ler dados do arquivo CSV no HDFS
df=spark.read.csv(arquivo, header=True, inferSchema=True)

# Realizar alguma transformação, como por exemplo,
# adicionar uma nova coluna com valor constante
df=df.withColumn("nova_coluna", df["coluna_antiga"] + 10)

# Escrever o DataFrame transformado de volta para o HDFS no formato Parquet
df.write.parquet(pasta_salvar, mode='overwrite')
spark.stop()

##### Parte 5: Problema de Caso

Considere que você tem um grande arquivo de log com as seguintes colunas: "timestamp", "user_id", "action". Cada linha representa uma ação realizada por um usuário em um determinado momento.

Carregue o arquivo de log em um DataFrame.

In [12]:
# Inicializa a SparkSession
spark=SparkSession.builder \
    .appName("LogAnalysis") \
    .getOrCreate()

# Carrega o arquivo de log em um DataFrame
file_path="data/log_data.csv"
log_df=spark.read.csv(file_path, header=True, inferSchema=True)
log_df.show()

+-------------------+-------+--------+
|          timestamp|user_id|  action|
+-------------------+-------+--------+
|2023-04-13 18:22:23|     41|purchase|
|2023-05-12 06:46:37|     27|purchase|
|2023-01-07 11:30:50|     37|purchase|
|2023-04-27 00:31:34|     12|    view|
|2023-12-01 05:14:59|     75|purchase|
|2023-12-28 11:11:23|     18|  logout|
|2023-01-27 13:19:35|     29|   login|
|2023-02-22 19:01:09|     35|  logout|
|2023-07-21 04:08:05|     75|purchase|
|2023-08-20 23:59:29|     61|   click|
|2023-07-05 19:23:02|     88|   login|
|2023-09-11 02:49:11|     46|  logout|
|2023-02-12 11:02:10|     48|  logout|
|2023-06-24 22:14:24|     64|   click|
|2023-02-08 16:34:45|      4|purchase|
|2023-09-21 15:31:24|     16|   login|
|2023-03-05 02:07:22|     44|    view|
|2023-01-05 06:41:35|     79|purchase|
|2023-11-02 19:44:41|      2|    view|
|2023-02-08 09:20:24|     94|   login|
+-------------------+-------+--------+
only showing top 20 rows



Conte o número de ações realizadas por cada usuário.

In [14]:
# Conta o número de ações realizadas por cada usuário
user_actions_df=log_df.groupBy("user_id").count()
user_actions_df.show()

+-------+-----+
|user_id|count|
+-------+-----+
|     31|   22|
|     85|   28|
|     65|   16|
|     53|   32|
|     78|   30|
|     34|   19|
|     81|   24|
|     28|   16|
|     76|   20|
|     27|   20|
|     26|   15|
|     44|   20|
|     12|   19|
|     91|   25|
|     22|   15|
|     93|   21|
|     47|   15|
|      1|   21|
|     52|   20|
|     13|   31|
+-------+-----+
only showing top 20 rows



Encontre os 10 usuários mais ativos.

In [15]:
# Encontra os 10 usuários mais ativos
top_users_df=user_actions_df.orderBy(col("count").desc()).limit(10)
top_users_df.show()

+-------+-----+
|user_id|count|
+-------+-----+
|     53|   32|
|     13|   31|
|     78|   30|
|     75|   30|
|     61|   29|
|      8|   29|
|     50|   29|
|     85|   28|
|     68|   28|
|     15|   27|
+-------+-----+



Salve o resultado em um arquivo CSV.

In [None]:
# Salva o resultado em um arquivo CSV
output_path = "data/logs.csv"
top_users_df.write.csv(output_path, header=True, mode='overwrite')
# Fecha a SparkSession
spark.stop()