Parte 1: Manipulação de Dados

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

In [21]:
spark = SparkSession.builder \
    .master("local[8]")\  # Usa 2 núcleos de CPU
    .appName("Inter Test") \
    .config("spark.executor.memory", "8g") \
    .config("spark.driver.memory", "8g") \
    .getOrCreate()



SyntaxError: unexpected character after line continuation character (3993656162.py, line 2)

In [3]:
data = [("Alice", 34, "Data Scientist"),
        ("Bob", 45, "Data Engineer"),
        ("Cathy", 29, "Data Analyst"),
        ("David", 35, "Data Scientist")]

columns = ["Name", "Age", "Occupation"]

In [4]:
df = spark.createDataFrame(data, columns)

In [5]:
df.show()

+-----+---+--------------+
| Name|Age|    Occupation|
+-----+---+--------------+
|Alice| 34|Data Scientist|
|  Bob| 45| Data Engineer|
|Cathy| 29|  Data Analyst|
|David| 35|Data Scientist|
+-----+---+--------------+



In [6]:
df_filtrado = df.select("Name", "Age").filter(col("Age") > 30)

In [7]:
df_filtrado.show()

+-----+---+
| Name|Age|
+-----+---+
|Alice| 34|
|  Bob| 45|
|David| 35|
+-----+---+



In [8]:
df_agrupado = df.groupBy("Occupation").avg("Age")

In [9]:
df_agrupado.show()

+--------------+--------+
|    Occupation|avg(Age)|
+--------------+--------+
|Data Scientist|    34.5|
| Data Engineer|    45.0|
|  Data Analyst|    29.0|
+--------------+--------+



In [10]:
df_agrupado_ordenado = df_agrupado.orderBy(col("avg(Age)").desc())

In [11]:
df_agrupado_ordenado.show()

+--------------+--------+
|    Occupation|avg(Age)|
+--------------+--------+
| Data Engineer|    45.0|
|Data Scientist|    34.5|
|  Data Analyst|    29.0|
+--------------+--------+



Parte 2: Funções Avançadas

In [12]:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

In [13]:
def ano_categorizado(age):
    if age < 30:
        return "Jovem"
    elif 30 <= age <= 40:
        return "Adulto"
    else:
        return "Senior"

In [14]:
ano_categorizado_udf = udf(ano_categorizado, StringType())

In [15]:
df_por_categoria = df.withColumn("AnoCategorizado", ano_categorizado_udf(col("Age")))

In [20]:
df_por_categoria.show()

+-----+---+--------------+---------------+
| Name|Age|    Occupation|AnoCategorizado|
+-----+---+--------------+---------------+
|Alice| 34|Data Scientist|         Adulto|
|  Bob| 45| Data Engineer|         Senior|
|Cathy| 29|  Data Analyst|          Jovem|
|David| 35|Data Scientist|         Adulto|
+-----+---+--------------+---------------+



In [22]:
from pyspark.sql.window import Window
from pyspark.sql.functions import avg

In [23]:
windowSpec = Window.partitionBy("Occupation")

In [25]:
df_with_diff = df.withColumn("AgeDiff", col("Age") - avg("Age").over(windowSpec))

In [26]:
df_with_diff.show()

+-----+---+--------------+-------+
| Name|Age|    Occupation|AgeDiff|
+-----+---+--------------+-------+
|Cathy| 29|  Data Analyst|    0.0|
|  Bob| 45| Data Engineer|    0.0|
|Alice| 34|Data Scientist|   -0.5|
|David| 35|Data Scientist|    0.5|
+-----+---+--------------+-------+



Parte 3: Performance e Otimização

O particionamento segmenta os dados em frações menores, permitindo uma otimização das operações distribuídas em clusters. Por exemplo, ao dividir um DataFrame com base numa coluna, como "Occupation", podemos aceder e processar apenas as partições que são pertinentes em consultas futuras.

In [28]:
df_partiicionado = df.repartition(4, col("Occupation"))



In [30]:
df_partiicionado.write.partitionBy("Occupation").parquet("output_path")

O Broadcast Join envia pequenos DataFrames para todos os nós, facilitando a junção sem a necessidade de redistribuir dados, o que torna o processo mais eficiente.

In [31]:
from pyspark.sql.functions import broadcast

In [32]:
small_df = spark.createDataFrame([("Data Scientist", "Tech")], ["Occupation", "Industry"])

In [33]:
df_joined = df.join(broadcast(small_df), "Occupation")

In [34]:
df_joined.show()

+--------------+-----+---+--------+
|    Occupation| Name|Age|Industry|
+--------------+-----+---+--------+
|Data Scientist|Alice| 34|    Tech|
|Data Scientist|David| 35|    Tech|
+--------------+-----+---+--------+



Parte 4: Integração com Outras Tecnologias

In [36]:

output_path = "c:/Users/lacer/teste_act_inter/df_output.csv"

df.write.csv(output_path, header=True, mode="overwrite")

In [37]:
df_read = spark.read.csv(output_path, header=True, inferSchema=True)

In [38]:
df_read.show()


+-----+---+--------------+
| Name|Age|    Occupation|
+-----+---+--------------+
|Alice| 34|Data Scientist|
|David| 35|Data Scientist|
|Cathy| 29|  Data Analyst|
|  Bob| 45| Data Engineer|
+-----+---+--------------+



O PySpark se une ao Hadoop HDFS para efetuar leitura e gravação eficientes de dados. Pois o Spark é capaz de ler e gravar dados diretamente em sistemas de arquivos distribuídos, como o HDFS, sem precisar mover os dados para o armazenamento local do nó Spark. Isso torna possível executar operações de E/S em uma máquina muito grande.

In [None]:
df_hdfs = spark.read.csv("hdfs://namenode:8020/path_to_csv", header=True)

In [None]:
df_hdfs.write.csv("hdfs://namenode:8020/output_path.csv")

Parte 5: Problema de Caso

In [40]:
# Codigo para criar log de exemplo para problema de caso
import pandas as pd
import numpy as np
from datetime import datetime, timedelta

# Gerar dados fictícios
np.random.seed(0)  # Para reprodutibilidade
n_rows = 5000  # Ajuste o número de linhas conforme necessário
start_date = datetime.now() - timedelta(days=30)

timestamps = [start_date + timedelta(minutes=np.random.randint(0, 43200)) for _ in range(n_rows)]  # 30 dias
user_ids = np.random.randint(1, 101, size=n_rows)  # IDs de usuário de 1 a 100
actions = np.random.choice(['login', 'logout', 'purchase', 'view'], size=n_rows)

# Criar DataFrame
df = pd.DataFrame({
    'timestamp': timestamps,
    'user_id': user_ids,
    'action': actions
})

# Salvar em um arquivo CSV
file_path = 'log_file.csv'
df.to_csv(file_path, index=False)

print(f"Arquivo de log criado: {file_path}")


Arquivo de log criado: log_file.csv


In [41]:
file_path = 'log_file.csv'

In [42]:
df = pd.read_csv(file_path)

In [43]:
user_action_counts = df['user_id'].value_counts()

In [44]:
top_10_users = user_action_counts.head(10)

In [45]:
top_10_users.to_csv('top_10_users.csv', header=['action_count'])

A função pd.read_csv(file_path) é utilizada para carregar o ficheiro de log num DataFrame. Para contabilizar as ações, df['user_id'].value_counts() realiza a contagem do número de ações por cada utilizador. Para identificar os 10 utilizadores mais ativos, usa-se user_action_counts.head(10). Por fim, para gravar os dados num ficheiro CSV, aplica-se top_10_users.to_csv('top_10_users.csv', header=['action_count']). Assegure-se de modificar o caminho do ficheiro de log (file_path) e o nome do ficheiro CSV que será gerado (top_10_users.csv) conforme necessário.