<a href="https://colab.research.google.com/github/eotorres/teste_spark/blob/main/teste_spark.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [12]:
# Mount Google Drive inside the Colab runtime.
from google.colab import drive

DRIVE_MOUNT_POINT = '/content/drive'
drive.mount(DRIVE_MOUNT_POINT)


Mounted at /content/drive


Criação da sessão e geração do DataFrame

In [4]:
from pyspark.sql import SparkSession

# Start the Spark session, or reuse one that is already running.
spark = (
    SparkSession.builder
    .appName("TestApp")
    .getOrCreate()
)

# Column names, followed by the sample rows (one tuple per row, same order).
columns = ["Name", "Age", "Occupation"]
data = [
    ("Alice", 34, "Data Scientist"),
    ("Bob", 45, "Data Engineer"),
    ("Cathy", 29, "Data Analyst"),
    ("David", 35, "Data Scientist"),
]

# Build the DataFrame from the rows, naming the columns explicitly.
df = spark.createDataFrame(data, schema=columns)

# Display the DataFrame.
df.show()


+-----+---+--------------+
| Name|Age|    Occupation|
+-----+---+--------------+
|Alice| 34|Data Scientist|
|  Bob| 45| Data Engineer|
|Cathy| 29|  Data Analyst|
|David| 35|Data Scientist|
+-----+---+--------------+



Fazendo a filtragem

In [5]:
# Keep only the "Name" and "Age" columns.
df_selected = df.select("Name", "Age")

# Retain the rows whose "Age" is greater than 30.
is_over_30 = df_selected["Age"] > 30
df_filtered = df_selected.where(is_over_30)

# Display the result.
df_filtered.show()

+-----+---+
| Name|Age|
+-----+---+
|Alice| 34|
|  Bob| 45|
|David| 35|
+-----+---+



Calculando a média

In [6]:
from pyspark.sql.functions import avg

# Group the rows by "Occupation" and compute the mean "Age" of each group.
average_age = avg("Age").alias("AverageAge")
df_grouped = df.groupBy("Occupation").agg(average_age)

# Display the result.
df_grouped.show()


+--------------+----------+
|    Occupation|AverageAge|
+--------------+----------+
|Data Scientist|      34.5|
| Data Engineer|      45.0|
|  Data Analyst|      29.0|
+--------------+----------+



Fazendo a ordenação

In [7]:
# Sort the occupations from the highest to the lowest average age.
df_sorted = df_grouped.sort(df_grouped["AverageAge"].desc())

# Display the result.
df_sorted.show()


+--------------+----------+
|    Occupation|AverageAge|
+--------------+----------+
| Data Engineer|      45.0|
|Data Scientist|      34.5|
|  Data Analyst|      29.0|
+--------------+----------+



Usando UDFs

In [8]:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

# Categorization function; it is wrapped as a Spark UDF after its definition.
def age_category(age):
    """Map an age to a coarse age-group label.

    Args:
        age: Age in years, or None. Spark passes SQL NULL values to Python
            UDFs as None, so a missing age must not be compared to a number.

    Returns:
        "Jovem" for ages below 30, "Adulto" for ages from 30 to 40
        inclusive, "Senior" for ages above 40, and None when age is None.
    """
    # Comparing None with an int raises TypeError, which would fail the whole
    # Spark job on the first NULL age; propagate the missing value instead.
    if age is None:
        return None
    if age < 30:
        return "Jovem"
    if age <= 40:
        return "Adulto"
    return "Senior"


# Wrap the Python function as a Spark UDF that returns a string column.
age_category_udf = udf(age_category, returnType=StringType())

# Add the "AgeCategory" column computed from each row's "Age".
age_label = age_category_udf(df["Age"])
df_with_category = df.withColumn("AgeCategory", age_label)
df_with_category.show()


+-----+---+--------------+-----------+
| Name|Age|    Occupation|AgeCategory|
+-----+---+--------------+-----------+
|Alice| 34|Data Scientist|     Adulto|
|  Bob| 45| Data Engineer|     Senior|
|Cathy| 29|  Data Analyst|      Jovem|
|David| 35|Data Scientist|     Adulto|
+-----+---+--------------+-----------+



Diferença entre a idade de cada pessoa e a média de idade da sua ocupação

In [9]:
from pyspark.sql.window import Window
from pyspark.sql.functions import col, mean

# Window grouping together all rows that share the same occupation.
window_spec = Window.partitionBy("Occupation")

# Mean age of each row's occupation, evaluated over that window.
occupation_mean_age = mean("Age").over(window_spec)

# Attach the per-occupation mean, then each person's distance from it.
df_with_age_diff = (
    df.withColumn("AverageAgeByOccupation", occupation_mean_age)
    .withColumn("AgeDifference", col("Age") - col("AverageAgeByOccupation"))
)

# Display the result.
df_with_age_diff.show()


+-----+---+--------------+----------------------+-------------+
| Name|Age|    Occupation|AverageAgeByOccupation|AgeDifference|
+-----+---+--------------+----------------------+-------------+
|Cathy| 29|  Data Analyst|                  29.0|          0.0|
|  Bob| 45| Data Engineer|                  45.0|          0.0|
|Alice| 34|Data Scientist|                  34.5|         -0.5|
|David| 35|Data Scientist|                  34.5|          0.5|
+-----+---+--------------+----------------------+-------------+



Particionamento

In [13]:
# Destination on Google Drive for the Parquet output partitioned by "Occupation".
output_path = '/content/drive/My Drive/particionado_por_occupation'

# The default save mode raises an error when the path already exists, so a
# second run of this cell would fail; overwrite mode keeps it re-runnable.
(
    df.write
    .mode("overwrite")
    .partitionBy("Occupation")
    .format("parquet")
    .save(output_path)
)



Broadcast join

In [14]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import broadcast

# Get the Spark session; getOrCreate() returns the session that is already
# running when there is one.
spark = SparkSession.builder.appName("BroadcastJoinExample").getOrCreate()

# Two small example DataFrames that share the "Name" column.
data1 = [("Alice", "Data Scientist"),
         ("Bob", "Data Engineer"),
         ("Cathy", "Data Analyst")]

data2 = [("Alice", 34),
         ("Bob", 45),
         ("Cathy", 29)]

columns1 = ["Name", "Occupation"]
columns2 = ["Name", "Age"]

df1 = spark.createDataFrame(data1, columns1)
df2 = spark.createDataFrame(data2, columns2)

# Join on "Name", marking df2 as the side to be broadcast.
df_joined = df1.join(broadcast(df2), "Name")
df_joined.show()

# Save the join result to Google Drive as CSV with a header row.
# mode="overwrite" is required for re-runs: the default save mode raises an
# error when the target path already exists.
df_joined.write.csv(
    '/content/drive/My Drive/broadcast_join_result.csv',
    header=True,
    mode="overwrite",
)


+-----+--------------+---+
| Name|    Occupation|Age|
+-----+--------------+---+
|Alice|Data Scientist| 34|
|  Bob| Data Engineer| 45|
|Cathy|  Data Analyst| 29|
+-----+--------------+---+



In [15]:
# Read the CSV output back, using the header row for column names and
# letting Spark infer the column types.
# NOTE: this rebinds `df`, which earlier cells use for the sample DataFrame.
df = spark.read.csv('/content/drive/My Drive/broadcast_join_result.csv', header=True, inferSchema=True)

# Write the DataFrame in Parquet format. mode="overwrite" keeps the cell
# re-runnable: the default save mode raises an error when the target path
# already exists.
df.write.parquet('/content/drive/My Drive/broadcast_join_result', mode="overwrite")


In [22]:
## criando log
import pandas as pd
import random
from datetime import datetime, timedelta

# Helper that builds the list of log timestamps.
def generate_timestamps(start_time, num_entries, interval_minutes=1):
    """Return evenly spaced timestamps formatted as strings.

    Args:
        start_time: datetime of the first entry.
        num_entries: number of timestamps to produce.
        interval_minutes: gap, in minutes, between consecutive entries.

    Returns:
        A list of strings in '%Y-%m-%d %H:%M:%S' format, starting at
        start_time and advancing by interval_minutes per entry.
    """
    timestamps = []
    for index in range(num_entries):
        moment = start_time + timedelta(minutes=index * interval_minutes)
        timestamps.append(moment.strftime('%Y-%m-%d %H:%M:%S'))
    return timestamps

# Parameters for the synthetic log.
num_entries = 5000  # number of log entries
num_users = 100  # user ids are drawn from 1..num_users
actions = ["login", "logout", "purchase", "view", "click"]

# Dedicated, seeded generator: the same log is produced on every run, so the
# results computed from it are reproducible.
rng = random.Random(42)

# Column-oriented data: one list per column, each with num_entries items.
data = {
    "timestamp": generate_timestamps(datetime(2024, 9, 1, 12, 0, 0), num_entries),
    "user_id": [rng.randint(1, num_users) for _ in range(num_entries)],
    "action": [rng.choice(actions) for _ in range(num_entries)]
}

# Build the pandas DataFrame.
# NOTE: `data` and `df` are rebound here; earlier cells use these names for
# the sample rows and for Spark DataFrames.
df = pd.DataFrame(data)

# Save the DataFrame as CSV on Google Drive, without the index column.
df.to_csv('/content/drive/My Drive/log_file_t.csv', index=False)

print("Arquivo de log criado com sucesso.")



Arquivo de log criado com sucesso.


In [23]:
from pyspark.sql import SparkSession

# Get the Spark session; getOrCreate() returns the session that is already
# running when there is one.
spark = SparkSession.builder.appName("LogProcessing").getOrCreate()

# Mount Google Drive.
from google.colab import drive
drive.mount('/content/drive')

# Path of the log file on Google Drive.
log_file_path = '/content/drive/My Drive/log_file_t.csv'

# Read the log, using the header row for column names and inferring types.
df_log = spark.read.csv(log_file_path, header=True, inferSchema=True)

# Number of actions per user.
df_actions_count = df_log.groupBy("user_id").count()

# Ten most active users. Sorting by "count" alone leaves the order of users
# with equal counts undefined, so which of them make the top 10 could change
# between runs; "user_id" (ascending) is used as a tie-breaker.
df_top_users = df_actions_count.orderBy(
    ["count", "user_id"], ascending=[False, True]
).limit(10)

# Destination of the result on Google Drive.
# NOTE: this rebinds `output_path`, also used by the partitioning cell.
output_path = '/content/drive/My Drive/top_users_t.csv'

# Save the result as CSV with a header row. mode="overwrite" keeps the cell
# re-runnable: the default save mode raises an error when the path exists.
df_top_users.write.csv(output_path, header=True, mode="overwrite")

print("Resultados salvos com sucesso.")

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
Resultados salvos com sucesso.
