<a href="https://colab.research.google.com/github/WILLIANSCASACOLA/Meus-Codigos/blob/main/PegarDataMinMaxDF.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [36]:
from pyspark.sql import SparkSession # Importa a classe SparkSession, que é o ponto de entrada para usar PySpark
from pyspark.sql.window import Window # Importa a classe Window, usada para definir janelas de particionamento e ordenação
import pyspark.sql.functions as F # Importa o módulo de funções do PySpark com o apelido F (facilita a escrita de funções como F.min, F.max, etc.)
from datetime import datetime

In [37]:
spark = SparkSession.builder.appName('test').getOrCreate() # Cria ou acessa uma sessão Spark

In [38]:
# Define os dados de exemplo como uma lista de tuplas (protocolo, data)
dados = [
    ("A123", "2023-01-01"),
    ("A123", "2023-01-05"),
    ("B456", "2023-02-10"),
    ("B456", "2023-02-15"),
    ("A123", "2023-01-03"),
    ("A123", "2023-01-03"),
    ("A123", "2023-01-03"),
    ("A1234", "2023-01-03"),
    ("A1234", "2023-01-04")
]


In [39]:
# Cria um DataFrame a partir dos dados
df = spark.createDataFrame(dados, ["protocolo", "data"])


In [40]:
# Mostrar os dados
df.show()

+---------+----------+
|protocolo|      data|
+---------+----------+
|     A123|2023-01-01|
|     A123|2023-01-05|
|     B456|2023-02-10|
|     B456|2023-02-15|
|     A123|2023-01-03|
|     A123|2023-01-03|
|     A123|2023-01-03|
|    A1234|2023-01-03|
|    A1234|2023-01-04|
+---------+----------+



In [41]:
# Converte a coluna 'data' de string para tipo Date
df = df.withColumn("data", F.col("data").cast("date"))

In [42]:
df.show()

+---------+----------+
|protocolo|      data|
+---------+----------+
|     A123|2023-01-01|
|     A123|2023-01-05|
|     B456|2023-02-10|
|     B456|2023-02-15|
|     A123|2023-01-03|
|     A123|2023-01-03|
|     A123|2023-01-03|
|    A1234|2023-01-03|
|    A1234|2023-01-04|
+---------+----------+



In [43]:
# Define uma janela que particiona os dados por 'protocolo'
janela = Window.partitionBy("protocolo").orderBy("data")

In [44]:
# Adiciona duas novas colunas ao DataFrame:
# - 'data_minima': menor data dentro da janela por protocolo
# - 'data_maxima': maior data dentro da janela por protocolo
df = df.withColumn("data_minima", F.min("data").over(janela)) \
       .withColumn("data_maxima", F.max("data").over(janela))


In [45]:
df.show()

+---------+----------+-----------+-----------+
|protocolo|      data|data_minima|data_maxima|
+---------+----------+-----------+-----------+
|     A123|2023-01-01| 2023-01-01| 2023-01-01|
|     A123|2023-01-03| 2023-01-01| 2023-01-03|
|     A123|2023-01-03| 2023-01-01| 2023-01-03|
|     A123|2023-01-03| 2023-01-01| 2023-01-03|
|     A123|2023-01-05| 2023-01-01| 2023-01-05|
|    A1234|2023-01-03| 2023-01-03| 2023-01-03|
|    A1234|2023-01-04| 2023-01-03| 2023-01-04|
|     B456|2023-02-10| 2023-02-10| 2023-02-10|
|     B456|2023-02-15| 2023-02-10| 2023-02-15|
+---------+----------+-----------+-----------+



In [46]:

# Define o nome base do arquivo
nome_base = "arquivo"

# Pega data e hora atual formatada (ex: 2025-09-17_20-23)
data_hora = datetime.now().strftime("%Y-%m-%d_%H-%M")

# Concatena nome + data/hora + extensão
nome_arquivo = f"{nome_base}_{data_hora}.csv"

# Converte o DataFrame Spark para pandas
df_pandas = df.toPandas()

# Exporta para CSV com pandas usando o nome dinâmico
df_pandas.to_csv(f"/content/drive/MyDrive/Colab Notebooks/DataFrames/{nome_arquivo}", index=False)

In [47]:
print("Dados processados com êxito")

Dados processados com êxito
