In [32]:
from datetime import datetime

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql import types as T
from pyspark.sql.window import Window

from IPython.core.display import HTML                                   #Função renderização html no Jupyter
display(HTML("<style>pre { white-space: pre !important; }</style>"))    #Função renderização html no Jupyter

In [33]:
spark = SparkSession.builder.appName("Examples_Spark").getOrCreate()

In [34]:
schema_other = T.StructType()\
.add("ID", T.IntegerType(), True)\
.add("INFO", T.StringType(), True)\
.add("COD_A", T.IntegerType(), True)\
.add("COD_B", T.IntegerType(), True)\
.add("COD_C", T.IntegerType(), True)\
.add("DT_REF", T.DateType(), True)\
 
data_other = [
(1, "ALFA", 10, 36, 49, datetime.strptime("2024-01-01", "%Y-%m-%d").date()),
(1, "ALFA", 10, 28, 49, datetime.strptime("2024-01-01", "%Y-%m-%d").date()),
(1, "ALFA", 10, 49, 36, datetime.strptime("2024-01-01", "%Y-%m-%d").date()),
(1, "ALFA", 20, 73, 49, datetime.strptime("2024-08-01", "%Y-%m-%d").date()),
(1, "ALFA", 20, 39, 50, datetime.strptime("2024-08-01", "%Y-%m-%d").date()),
(2, "BETA", 70, 39, 50, datetime.strptime("2024-03-01", "%Y-%m-%d").date()),
(2, "BETA", 70, 39, 50, datetime.strptime("2024-03-01", "%Y-%m-%d").date()),
(2, "BETA", 80, 39, 50, datetime.strptime("2024-10-01", "%Y-%m-%d").date()),
(2, "BETA", 80, 39, 50, datetime.strptime("2024-10-01", "%Y-%m-%d").date()),
(2, "BETA", 80, 39, 50, datetime.strptime("2024-10-01", "%Y-%m-%d").date()),
]
 
df_other = spark.createDataFrame(data=data_other, schema=schema_other)
df_other.show(truncate=False)

+---+----+-----+-----+-----+----------+
|ID |INFO|COD_A|COD_B|COD_C|DT_REF    |
+---+----+-----+-----+-----+----------+
|1  |ALFA|10   |36   |49   |2024-01-01|
|1  |ALFA|10   |28   |49   |2024-01-01|
|1  |ALFA|10   |49   |36   |2024-01-01|
|1  |ALFA|20   |73   |49   |2024-08-01|
|1  |ALFA|20   |39   |50   |2024-08-01|
|2  |BETA|70   |39   |50   |2024-03-01|
|2  |BETA|70   |39   |50   |2024-03-01|
|2  |BETA|80   |39   |50   |2024-10-01|
|2  |BETA|80   |39   |50   |2024-10-01|
|2  |BETA|80   |39   |50   |2024-10-01|
+---+----+-----+-----+-----+----------+



## Jeito Simples

In [35]:
max_dates = df_other.groupBy("ID").agg(F.max("DT_REF").alias("MAX_DT_REF"))

df_simple_result = df_other.join(max_dates, (df_other.ID == max_dates.ID) & (df_other.DT_REF == max_dates.MAX_DT_REF)).select(df_other["*"])
df_simple_result.show(truncate=False)

+---+----+-----+-----+-----+----------+
|ID |INFO|COD_A|COD_B|COD_C|DT_REF    |
+---+----+-----+-----+-----+----------+
|1  |ALFA|20   |73   |49   |2024-08-01|
|1  |ALFA|20   |39   |50   |2024-08-01|
|2  |BETA|80   |39   |50   |2024-10-01|
|2  |BETA|80   |39   |50   |2024-10-01|
|2  |BETA|80   |39   |50   |2024-10-01|
+---+----+-----+-----+-----+----------+



## Jeito Pouco Complexo

In [36]:
df_other = df_other.withColumn("ID_INFO", F.concat_ws("_", F.col("ID"), F.col("INFO")))
window_spec = Window.partitionBy("ID_INFO").orderBy(F.col("DT_REF").desc())
df_other = df_other.withColumn("MAX_DT_REF", F.max("DT_REF").over(window_spec))

df_complex_result = df_other.filter(F.col("DT_REF") == F.col("MAX_DT_REF")).drop("MAX_DT_REF", "ID_INFO")
df_complex_result.show(truncate=False)

+---+----+-----+-----+-----+----------+
|ID |INFO|COD_A|COD_B|COD_C|DT_REF    |
+---+----+-----+-----+-----+----------+
|1  |ALFA|20   |73   |49   |2024-08-01|
|1  |ALFA|20   |39   |50   |2024-08-01|
|2  |BETA|80   |39   |50   |2024-10-01|
|2  |BETA|80   |39   |50   |2024-10-01|
|2  |BETA|80   |39   |50   |2024-10-01|
+---+----+-----+-----+-----+----------+

