#### This chapter covers
- Joining two data frames together
- Selecting the right type of join for your use case
- Grouping data and understanding the GroupedData transitional object
- Breaking the GroupedData with an aggregation method
- Filling null values in your data frame

In [0]:
import pyspark.sql.functions as F

# Base folders (Unity Catalog volumes) that hold the raw broadcast-log files.
path_logs = "/Volumes/workspace/dataanalysispysparkbook/bronze_files/eda/"
path_reference_tables = path_logs + "ReferenceTables/"


def _read_pipe_csv(file_path):
    """Read a pipe-delimited CSV with a header row, letting Spark infer the column types."""
    return (
        spark.read.format("csv")
        .option("sep", "|")
        .option("inferSchema", "true")
        .option("header", "true")
        .load(file_path)
    )


# Fact table: one row per broadcast log entry (sample for 2018 Q3, month 8).
logs = _read_pipe_csv(path_logs + "BroadcastLogs_2018_Q3_M8_sample.CSV")
display(logs.limit(5))

# Reference table keyed by LogServiceID.
log_identifier = _read_pipe_csv(path_reference_tables + "LogIdentifier.csv")
display(log_identifier.limit(5))



#### Estructura de los JOIN
```python
[LEFT].join(
  [RIGHT],
  on=[PREDICATES],
  how=[METHOD],
)
```

#### Inner Join

In [0]:
# Columns shown by the two inner-join variants below.
selected_columns = [
    logs.LogServiceID,
    log_identifier.LogServiceID,
    "BroadcastLogID",
    "LogIdentifierID",
]

# Short form: positional arguments (right frame, join column, join type).
result = logs.join(log_identifier, "LogServiceID", "inner").select(*selected_columns)

# Keyword form: the same join with `on` and `how` spelled out.
result_2 = logs.join(
    log_identifier,
    on="LogServiceID",
    how="inner",
).select(*selected_columns)

# Most explicit form: an equality predicate between the two frames' columns.
# No `how` is passed, so Spark applies its default join type (inner).
logs_and_channels_verbose = logs.join(
    log_identifier,
    logs["LogServiceID"] == log_identifier["LogServiceID"],
)

result.limit(2).display()
result_2.limit(2).display()
logs_and_channels_verbose.printSchema()

In [0]:
# Number of joined rows per LogServiceID, in ascending key order.
# GroupedData.count() names its output column "count"; to choose a custom name,
# use .agg(F.count("LogServiceID").alias("Count_by_LogServiceID")) instead.
grouped = logs.join(log_identifier, "LogServiceID", "inner").groupBy("LogServiceID")
result = grouped.count().orderBy(F.asc("LogServiceID"))
result.limit(5).display()

#### Left Join

In [0]:
# Left join: every row of `logs` is kept; log_identifier columns are null
# for logs whose LogServiceID has no match.
left_joined = logs.join(log_identifier, on="LogServiceID", how="left")
result = left_joined.select(
    logs.LogServiceID,
    log_identifier.LogServiceID,
    "BroadcastLogID",
    "LogIdentifierID",
)
result.limit(5).display()

#### Varios condicionales

In [0]:
# Two small sample frames that share the column names "id" and "anio".
data1 = [(1, "Ana", 2022), (2, "Luis", 2023), (3, "Carlos", 2022), (4, "Julia", 2024)]
df1 = spark.createDataFrame(data1, ["id", "nombre", "anio"])

data2 = [(1, "Lima", 2022), (2, "Arequipa", 2022), (3, "Trujillo", 2023), (4, "Cusco", 2024)]
df2 = spark.createDataFrame(data2, ["id", "ciudad", "anio"])

# Variant 1: columns referenced as attributes (df.column); both conditions must hold.
df_join = df1.join(df2, (df1.id == df2.id) & (df1.anio == df2.anio), "inner")
df_join.show(2)

# Variant 2: the same predicate, with columns referenced by indexing (df["column"]).
match_id_and_year = (df1["id"] == df2["id"]) & (df1["anio"] == df2["anio"])
df_join_1 = df1.join(df2, match_id_and_year, how="inner")
df_join_1.show(2)

# Variant 3: alias each frame, then qualify columns by alias with F.col("alias.column").
df1_alias, df2_alias = df1.alias("a"), df2.alias("b")
df_join_alias = df1_alias.join(
    df2_alias,
    (F.col("a.id") == F.col("b.id")) & (F.col("a.anio") == F.col("b.anio")),
    "inner",
)
df_join_alias.show(2)

# Variant 4: OR predicate - a row pair matches when either the id or the year agrees.
match_id_or_year = (df1["id"] == df2["id"]) | (df1["anio"] == df2["anio"])
df_join_2 = df1.join(df2, match_id_or_year, how="inner")
df_join_2.show(2)



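#### Columns with the same name
Para evitar la ambigüedad, se debe anteponer a la columna el nombre de la tabla a la que pertenece: `Tabla.Columna`. Otra opción es hacer el join pasando el nombre de la columna (como en `logs.join(log_identifier, "LogServiceID")`), de modo que el resultado conserve una sola columna con ese nombre.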

In [0]:
from pyspark.errors import AnalysisException

# The predicate-based join keeps a LogServiceID column from each input, so an
# unqualified select on that name is ambiguous and Spark rejects it.
try:
    logs_and_channels_verbose.select("LogServiceID")
except AnalysisException as ambiguity_error:
    print(ambiguity_error)

# Joining on the column name instead yields a single LogServiceID column.
logs_and_channels = logs.join(log_identifier, "LogServiceID")
logs_and_channels.limit(2).display()

#### Hacer JOIN con más de una tabla

In [0]:
def _read_reference_table(file_name, id_column, code_column, description_alias):
    """Load a pipe-delimited reference table, keeping its ID, its code and its
    EnglishDescription column renamed to *description_alias*."""
    table = spark.read.csv(
        path_reference_tables + file_name,
        sep="|",
        header=True,
        inferSchema=True,
    )
    return table.select(
        id_column,
        code_column,
        F.col("EnglishDescription").alias(description_alias),
    )


cd_category = _read_reference_table(
    "CD_Category.csv", "CategoryID", "CategoryCD", "Category_Description"
)
cd_program_class = _read_reference_table(
    "CD_ProgramClass.csv", "ProgramClassID", "ProgramClassCD", "ProgramClass_Description"
)

# Left joins keep every log row even when its category or program class is missing.
full_log = (
    logs_and_channels
    .join(cd_category, "CategoryID", how="left")
    .join(cd_program_class, "ProgramClassID", how="left")
)
full_log.limit(2).display()

#### Exercise 5.1
Assume two tables, `left` and `right`, each containing a column named `my_column`.  
What is the result of this code?
```python
one = left.join(right, how="left_semi", on="my_column")
two = left.join(right, how="left_anti", on="my_column")
one.union(two)
```

#### Exercise 5.2
Assume two data frames, red and blue. Which is the appropriate join to use in red.join(blue, …) if you want to join red and blue and keep all the records satisfying the predicate?
1. Left
2. Right
3. Inner "Correct"
4. Theta
5. Cross

#### Exercise 5.3
Assume two data frames, `red` and `blue`. Which is the appropriate join to use in `red.join(blue, …)` if you want to join red and blue and keep all the records satisfying the predicate, plus every remaining record of the blue table?
1. Left
2. Right "Correct"
3. Inner
4. Theta
5. Cross

#### Agrupando por varios campos

In [0]:
from pyspark.sql import functions as F

# Convert Duration (elapsed air time) into whole seconds.
# NOTE(review): assumes Duration holds text like "H:MM:SS[.fraction]"
# (e.g. "02:00:00.0000000"). That is not an ISO 8601 value - confirm against the file.
#
# Why not F.to_timestamp + F.hour/minute/second: a time-only string is parsed as a
# clock time on the current date in the session time zone. Durations of 24h or more
# then become null, and on a DST-transition date some values shift or vanish.
# Extracting the H/M/S fields directly does not depend on date or time zone.
# Casting to string first still works if inferSchema typed the column as a
# timestamp, because its text form contains "HH:MM:SS".
_duration_text = F.col("Duration").cast("string")
_duration_pattern = r"(\d+):(\d{1,2}):(\d{1,2})"
_has_duration = _duration_text.rlike(_duration_pattern)


def _duration_part(group_index):
    """Return regex group *group_index* (1=h, 2=m, 3=s) of Duration as an int.

    The value is null when Duration is null or has no H:MM:SS part. The `when`
    guard avoids casting an empty match, which would fail under ANSI mode.
    """
    return F.when(
        _has_duration,
        F.regexp_extract(_duration_text, _duration_pattern, group_index).cast("int"),
    )


full_log = (
    full_log
    # Kept so existing code that reads Duration_ts still works; not used below.
    .withColumn("Duration_ts", F.to_timestamp("Duration"))
    .withColumn("hours", _duration_part(1))
    .withColumn("minutes", _duration_part(2))
    # Fractional seconds are dropped, matching the integer result of F.second.
    .withColumn("seconds", _duration_part(3))
    .withColumn(
        "Duration_seconds",
        F.col("hours") * 3600 + F.col("minutes") * 60 + F.col("seconds"),
    )
)

# Total air time per program class, largest first.
agg_df = (
    full_log.groupBy("ProgramClassCD", "ProgramClass_Description")
    .agg(F.sum("Duration_seconds").alias("duration_total"))
    .orderBy(F.desc("duration_total"))
)

agg_df.show(5, truncate=False)


In [0]:
from pyspark.sql import functions as F

# ProgramClassCD values whose air time counts as commercial.
commercial_codes = ["COM", "PRC", "PGI", "PRO", "LOC", "SPO", "MER", "SOL"]
is_commercial = F.trim(F.col("ProgramClassCD")).isin(commercial_codes)

# Per LogIdentifierID: commercial seconds and total seconds.
answer = full_log.groupBy("LogIdentifierID").agg(
    F.sum(F.when(is_commercial, F.col("Duration_seconds")).otherwise(0)).alias(
        "duration_commercial"
    ),
    F.sum("Duration_seconds").alias("duration_total"),
)

# Share of commercial time. A `when` without `otherwise` yields null, so the ratio
# is null unless the total is non-zero (this also avoids dividing by zero).
total_is_nonzero = F.col("duration_total") != 0
answer = answer.withColumn(
    "commercial_ratio",
    F.when(total_is_nonzero, F.col("duration_commercial") / F.col("duration_total")),
)

answer.orderBy("commercial_ratio", ascending=False).show(5, False)