# Chapter 5 Joining and Grouping

In [1]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

spark = SparkSession.builder.getOrCreate()

your 131072x1 screen size is bogus. expect trouble
24/07/26 21:27:54 WARN Utils: Your hostname, LAPTOP-CDHH1LA0 resolves to a loopback address: 127.0.1.1; using 10.255.255.254 instead (on interface lo)
24/07/26 21:27:54 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/07/26 21:27:55 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [2]:
# pull in the original logs dataframe
import os 
DIRECTORY = "./data/broadcast_logs"
logs = spark.read.csv(
    os.path.join(DIRECTORY, "BroadcastLogs_2018_Q3_M8_sample.CSV"), # the path to the target file is the only mandatory param
    sep="|", # field delimiter "," by default but this can have issues if dealing with text that contains commas
    header = True, # indicates if the file has a header row, can manually set column names with "schema" param
    inferSchema=True, # tells python to guess at schema, can manually specify schema as well
    timestampFormat = "yyyy-MM-dd", # tells python how to parse timestamps
).drop(
    "BroadcastLogID", "SequenceNO"
).withColumn("duration_seconds", (
        F.col("Duration").substr(1,2).cast("int") * 60 * 60
        + F.col("Duration").substr(4,2).cast("int") * 60
        + F.col("Duration").substr(7,2).cast("int")
    )
)
logs.printSchema()



root
 |-- LogServiceID: integer (nullable = true)
 |-- LogDate: date (nullable = true)
 |-- AudienceTargetAgeID: integer (nullable = true)
 |-- AudienceTargetEthnicID: integer (nullable = true)
 |-- CategoryID: integer (nullable = true)
 |-- ClosedCaptionID: integer (nullable = true)
 |-- CountryOfOriginID: integer (nullable = true)
 |-- DubDramaCreditID: integer (nullable = true)
 |-- EthnicProgramID: integer (nullable = true)
 |-- ProductionSourceID: integer (nullable = true)
 |-- ProgramClassID: integer (nullable = true)
 |-- FilmClassificationID: integer (nullable = true)
 |-- ExhibitionID: integer (nullable = true)
 |-- Duration: string (nullable = true)
 |-- EndTime: string (nullable = true)
 |-- LogEntryDate: date (nullable = true)
 |-- ProductionNO: string (nullable = true)
 |-- ProgramTitle: string (nullable = true)
 |-- StartTime: string (nullable = true)
 |-- Subtitle: string (nullable = true)
 |-- NetworkAffiliationID: integer (nullable = true)
 |-- SpecialAttentionID: inte

                                                                                

In [3]:
# pull in the log_identifier link table and only keep the rows where PrimaryFG == 1
log_identifier = spark.read.csv(
    os.path.join(DIRECTORY, "ReferenceTables", "LogIdentifier.csv"),
    sep="|",
    header=True,
    inferSchema=True
).where(F.col("PrimaryFG") == 1)
log_identifier.printSchema()
log_identifier.show(5)

# LogIdentifierID is the channel identifier
# LogServiceID is the channel key which serves as the Foreign key used to map to the primary table
# PrimaryFG is a boolean flag indicating if the channel is the primary one (1) or not (0)

root
 |-- LogIdentifierID: string (nullable = true)
 |-- LogServiceID: integer (nullable = true)
 |-- PrimaryFG: integer (nullable = true)

+---------------+------------+---------+
|LogIdentifierID|LogServiceID|PrimaryFG|
+---------------+------------+---------+
|           13ST|        3157|        1|
|         2000SM|        3466|        1|
|           70SM|        3883|        1|
|           80SM|        3590|        1|
|           90SM|        3470|        1|
+---------------+------------+---------+
only showing top 5 rows



## Joining tables in PySpark

Bare bones join recipe:

```
[LEFT_DF].join(
    [RIGHT_DF],
    on = [PREDICATES],
    how = [METHOD]
)
```

- LEFT_DF and RIGHT_DF are the dataframes (tables) to join
- PREDICATES indicate how we determine which LEFT_DF records match with which RIGHT_DF records
    - records with multiple matches in the other dataframe/table are often duplicated depending on the Method
    - PREDICATES are any expression that evaluate to TRUE or FALSE indicating a match or no-match
        - "equi-joins" where you are testing for equality btw identically named columns can be specified by just listing the target columns
        - compound expressions are valid too: `(logs["LogServiceID"] == log_identifier["LogServiceID"]) & (logs["left_ col"] < log_identifier["right_col"])`
        - multiple and'd together statements can just be listed in a list: ` [left["col1"] == right["colA"], left["col2"] > right["colB"], left["col3"] != right["colC"]]`
- METHOD indicates how the join happens, basically the same as SQL INNER, OUTER, LEFT INNER ... joins

In [4]:
logs_and_channels = logs.join(
    log_identifier,
    on = "LogServiceID",
    how="inner" # inner is the default method, and thus this line could have been omitted
)

### 5.1.5 Naming conventions in the joining world

Normally PySpark won't let you create a data frame with two columns that share the same name.
If you attempt to use `withColumn` with an existing dataframe name, you will simply overwrite/shadow the existing one.
However this can break down with joining when you don't use the standard "equi-join" route.

e.g. it will happily join two dataframes with conflicting column names, but when you try to use the columns with conflicting names it is ambiguous which one you want to use leading to errors.

In [5]:
logs_and_channels_verbose_with_dup = logs.join(
    log_identifier, logs["LogServiceId"] == log_identifier["LogServiceId"]
)

# generates a dataframe with two "LogServiceId" columns
logs_and_channels_verbose_with_dup.printSchema()

root
 |-- LogServiceID: integer (nullable = true)
 |-- LogDate: date (nullable = true)
 |-- AudienceTargetAgeID: integer (nullable = true)
 |-- AudienceTargetEthnicID: integer (nullable = true)
 |-- CategoryID: integer (nullable = true)
 |-- ClosedCaptionID: integer (nullable = true)
 |-- CountryOfOriginID: integer (nullable = true)
 |-- DubDramaCreditID: integer (nullable = true)
 |-- EthnicProgramID: integer (nullable = true)
 |-- ProductionSourceID: integer (nullable = true)
 |-- ProgramClassID: integer (nullable = true)
 |-- FilmClassificationID: integer (nullable = true)
 |-- ExhibitionID: integer (nullable = true)
 |-- Duration: string (nullable = true)
 |-- EndTime: string (nullable = true)
 |-- LogEntryDate: date (nullable = true)
 |-- ProductionNO: string (nullable = true)
 |-- ProgramTitle: string (nullable = true)
 |-- StartTime: string (nullable = true)
 |-- Subtitle: string (nullable = true)
 |-- NetworkAffiliationID: integer (nullable = true)
 |-- SpecialAttentionID: inte

In [6]:
from pyspark.sql.utils import AnalysisException

try:
    logs_and_channels_verbose_with_dup.select("LogServiceId")
except AnalysisException as err:
    print(err)

[AMBIGUOUS_REFERENCE] Reference `LogServiceId` is ambiguous, could be: [`LogServiceId`, `LogServiceId`].


In [7]:
# you can remove one of the duplicate columns by specifying the one to drop as origin information is retained
logs_and_channels_verbose_with_dup_dropped = logs_and_channels_verbose_with_dup.drop(log_identifier["LogServiceId"])
# with the duplicate column dropped, we can successfully select the LogServiceId column
logs_and_channels_verbose_with_dup_dropped.select("LogServiceId")
logs_and_channels_verbose_with_dup_dropped.printSchema()

root
 |-- LogServiceID: integer (nullable = true)
 |-- LogDate: date (nullable = true)
 |-- AudienceTargetAgeID: integer (nullable = true)
 |-- AudienceTargetEthnicID: integer (nullable = true)
 |-- CategoryID: integer (nullable = true)
 |-- ClosedCaptionID: integer (nullable = true)
 |-- CountryOfOriginID: integer (nullable = true)
 |-- DubDramaCreditID: integer (nullable = true)
 |-- EthnicProgramID: integer (nullable = true)
 |-- ProductionSourceID: integer (nullable = true)
 |-- ProgramClassID: integer (nullable = true)
 |-- FilmClassificationID: integer (nullable = true)
 |-- ExhibitionID: integer (nullable = true)
 |-- Duration: string (nullable = true)
 |-- EndTime: string (nullable = true)
 |-- LogEntryDate: date (nullable = true)
 |-- ProductionNO: string (nullable = true)
 |-- ProgramTitle: string (nullable = true)
 |-- StartTime: string (nullable = true)
 |-- Subtitle: string (nullable = true)
 |-- NetworkAffiliationID: integer (nullable = true)
 |-- SpecialAttentionID: inte

In [8]:
# alternatively you can avoid all of this by using the simplified syntax for joining
logs_and_channels = logs.join(log_identifier, "LogServiceId")
logs_and_channels.printSchema()

root
 |-- LogServiceID: integer (nullable = true)
 |-- LogDate: date (nullable = true)
 |-- AudienceTargetAgeID: integer (nullable = true)
 |-- AudienceTargetEthnicID: integer (nullable = true)
 |-- CategoryID: integer (nullable = true)
 |-- ClosedCaptionID: integer (nullable = true)
 |-- CountryOfOriginID: integer (nullable = true)
 |-- DubDramaCreditID: integer (nullable = true)
 |-- EthnicProgramID: integer (nullable = true)
 |-- ProductionSourceID: integer (nullable = true)
 |-- ProgramClassID: integer (nullable = true)
 |-- FilmClassificationID: integer (nullable = true)
 |-- ExhibitionID: integer (nullable = true)
 |-- Duration: string (nullable = true)
 |-- EndTime: string (nullable = true)
 |-- LogEntryDate: date (nullable = true)
 |-- ProductionNO: string (nullable = true)
 |-- ProgramTitle: string (nullable = true)
 |-- StartTime: string (nullable = true)
 |-- Subtitle: string (nullable = true)
 |-- NetworkAffiliationID: integer (nullable = true)
 |-- SpecialAttentionID: inte

In [9]:
# if you need to keep the duplicate columns and want to use F.col function, you can include embed src table names as aliases in the joined table.
logs_and_channels_with_aliases = logs.alias("left").join(
    log_identifier.alias("right"),
    logs["LogServiceId"] == log_identifier["LogServiceId"]
    )

# now you can use the source aliases with F.col to drop the duplicate columns
logs_and_channels_with_aliases.drop(F.col("right.LogServiceId")).select("LogServiceId")


DataFrame[LogServiceId: int]

In [10]:
#next we can join the category and program class information

DIRECTORY = "./data/broadcast_logs"

cd_category = spark.read.csv(
    os.path.join(DIRECTORY, "ReferenceTables/CD_Category.csv"),
    sep = "|",
    header = True,
    inferSchema = True,
).select(
    "CategoryId",
    "CategoryCD",
    F.col("EnglishDescription").alias("Category_Description")
)

cd_program_class = spark.read.csv(
    os.path.join(DIRECTORY, "ReferenceTables/CD_ProgramClass.csv"),
    sep="|",
    header=True,
    inferSchema=True,
).select(
    "ProgramClassId",
    "ProgramClassCD",
    F.col("EnglishDescription").alias("ProgramClass_Description")
)

In [11]:
full_log = (logs_and_channels
            .join(cd_category, "CategoryId", how="left")
            .join(cd_program_class, "ProgramClassID", how = "left")
            )

##### Excercises

###### 5.1
Assume two tables, left and right, each containing a column named my_column. What is the result of this code?
```python
one = left.join(right, how="left_semi", on="my_column") # acts as a filter
two = left.join(right, how="left_anti", on="my_column")
 
one.union(two) # regenerates left?
```

###### 5.2
Assume two data frames, red and blue. Which is the appropriate join to use in red.join(blue, ...) if you want to join red and blue and keep all the records satisfying the predicate?
Inner

###### 5.3
Assume two data frames, red and blue. Which is the appropriate join to use in red.join(blue, ...) if you want to join red and blue and keep all the records satisfying the predicate and the records in the blue table?
Right



## 5.2 Summarizing the data via groupby and GroupedData

Basically the summary stats from earlier but on data that has been collapsed into a smaller set of numbers with group by

In [12]:
(
    full_log.groupby("ProgramClassCD", "ProgramClass_Description")# creating a "transitional" GroupedData object that can't be shown/inspected. 
    # Basically a dict where the groupby columns are keys, and the rest of the columns (and their grouped rows) are in a "cell" awaiting a aggregation function that will promote
    # them to a bonafide column again.
    .agg( # section where we apply "aggregation" function(s) to promote columns from the holding cell
        F.sum("duration_seconds").alias("duration_total")# promotes the "duration_seconds" column to a summary column called "duration_total" 
        ) 
    .orderBy("duration_total", ascending=False) # applying sort
    .show(100, False) # inspecting the first 100 rows
)



+--------------+--------------------------------------+--------------+
|ProgramClassCD|ProgramClass_Description              |duration_total|
+--------------+--------------------------------------+--------------+
|PGR           |PROGRAM                               |20992510      |
|COM           |COMMERCIAL MESSAGE                    |3519163       |
|PFS           |PROGRAM FIRST SEGMENT                 |1344762       |
|SEG           |SEGMENT OF A PROGRAM                  |1205998       |
|PRC           |PROMOTION OF UPCOMING CANADIAN PROGRAM|880600        |
|PGI           |PROGRAM INFOMERCIAL                   |679182        |
|PRO           |PROMOTION OF NON-CANADIAN PROGRAM     |335701        |
|OFF           |SCHEDULED OFF AIR TIME PERIOD         |142279        |
|ID            |NETWORK IDENTIFICATION MESSAGE        |74926         |
|NRN           |No recognized nationality             |59686         |
|MAG           |MAGAZINE PROGRAM                      |57622         |
|PSA  

                                                                                

In [13]:
# alternatively the "aggs" method can acept a dictionary of columnName: String -> aggregationFunctionName: String
# so the prior code can be rewritten as:

(
    full_log.groupby("ProgramClassCD", "ProgramClass_Description")# creating a "transitional" GroupedData object that can't be shown/inspected. 
    # Basically a dict where the groupby columns are keys, and the rest of the columns (and their grouped rows) are in a "cell" awaiting a aggregation function that will promote
    # them to a bonafide column again.
    .agg( # section where we apply "aggregation" function(s) to promote columns from the holding cell
        {"duration_seconds": "sum"}
    )
    .withColumnRenamed("sum(duration_seconds)", "duration_total")
    .orderBy("duration_total", ascending=False) # applying sort
    .show(100, False) # inspecting the first 100 rows
)

# this style allows for more flexible prototyping as you can use "*" to refer to all columns. Author is not a fan b/c column renaming has to happen in a separate step.

+--------------+--------------------------------------+--------------+
|ProgramClassCD|ProgramClass_Description              |duration_total|
+--------------+--------------------------------------+--------------+
|PGR           |PROGRAM                               |20992510      |
|COM           |COMMERCIAL MESSAGE                    |3519163       |
|PFS           |PROGRAM FIRST SEGMENT                 |1344762       |
|SEG           |SEGMENT OF A PROGRAM                  |1205998       |
|PRC           |PROMOTION OF UPCOMING CANADIAN PROGRAM|880600        |
|PGI           |PROGRAM INFOMERCIAL                   |679182        |
|PRO           |PROMOTION OF NON-CANADIAN PROGRAM     |335701        |
|OFF           |SCHEDULED OFF AIR TIME PERIOD         |142279        |
|ID            |NETWORK IDENTIFICATION MESSAGE        |74926         |
|NRN           |No recognized nationality             |59686         |
|MAG           |MAGAZINE PROGRAM                      |57622         |
|PSA  

In [14]:
#Aggregations with custom column definitions (virtual columns)

commercial_time_virtual_column = ( # define a virtual column such that
    F.when(
        F.trim(F.col("ProgramClassCD")).isin( # when the trimmed ProgramClassCD value is in the following set
            ["COM", "PRC", "PGI", "PRO", "PSA", "MAG", "LOC", "SPO", "MER", "SOL"]
        ),
        F.col("duration_seconds"), # return the row associated "duration_seconds" value
    ).otherwise(0) # return zero
)

# note that multiple "when"'s can be chained together.

In [15]:
# next we can use the virtual column to determine the commercial ration

answer = (
    full_log.groupby("LogIdentifierID")
    .agg(
        F.sum(commercial_time_virtual_column).alias("duration_commericial"),
        F.sum("duration_seconds").alias("duration_total"),
    ).withColumn(
        "commercial_ratio", F.col("duration_commericial")/F.col("duration_total")
    )
)

answer.orderBy("commercial_ratio", ascending=False).show(1000, False)

+---------------+--------------------+--------------+---------------------+
|LogIdentifierID|duration_commericial|duration_total|commercial_ratio     |
+---------------+--------------------+--------------+---------------------+
|CIMT           |775                 |775           |1.0                  |
|MSET           |2700                |2700          |1.0                  |
|TLNSP          |15480               |15480         |1.0                  |
|TELENO         |17790               |17790         |1.0                  |
|HPITV          |13                  |13            |1.0                  |
|TANG           |8125                |8125          |1.0                  |
|MMAX           |23333               |23582         |0.9894410991434145   |
|MPLU           |20587               |20912         |0.9844586840091814   |
|INVST          |20094               |20470         |0.9816316560820714   |
|ZT�L�          |21542               |21965         |0.9807420896881403   |
|RAPT       

## 5.3 Dealing with null values.

`null` values indicate missing data. Typically we can either drop the associated record, or fill (replace) the null with some value.

In [16]:
# generating test data frame with null values

has_nulls = spark.createDataFrame(
    [
        [2, "product2", 3.00, 5],
        [3, "product3", 8.99, 8],
        [None, None, None, None],
        [4, None, None, None],
        [5, "product5", None, None],
        [6, "product6", 2.99, None],

    ],
    ["id", "name", "price", "quantity"]
)

has_nulls.show()

                                                                                

+----+--------+-----+--------+
|  id|    name|price|quantity|
+----+--------+-----+--------+
|   2|product2|  3.0|       5|
|   3|product3| 8.99|       8|
|NULL|    NULL| NULL|    NULL|
|   4|    NULL| NULL|    NULL|
|   5|product5| NULL|    NULL|
|   6|product6| 2.99|    NULL|
+----+--------+-----+--------+



In [17]:
# dropping with dropna()
# drop na takes three optional arguments
# - how -> {"any", "all"}: Defaults to "any". If "any" a record is dropped if it contains any null properties. If "all" is selected, all properties must be null for the record to be dropped
# - thresh "threshold" -> {null, int}: Ignored if null, else overrides the "how" param so that only records with less than "thresh" null properties are dropped
# - subset -> {list<cols>}: optional list of columns to focus on when making the drop decision

has_nulls.dropna().show()
has_nulls.dropna(thresh=2, subset=["id", "name"]).show()

+---+--------+-----+--------+
| id|    name|price|quantity|
+---+--------+-----+--------+
|  2|product2|  3.0|       5|
|  3|product3| 8.99|       8|
+---+--------+-----+--------+

+---+--------+-----+--------+
| id|    name|price|quantity|
+---+--------+-----+--------+
|  2|product2|  3.0|       5|
|  3|product3| 8.99|       8|
|  5|product5| NULL|    NULL|
|  6|product6| 2.99|    NULL|
+---+--------+-----+--------+



In [18]:
# filling with fillna 
# fillna takes two optional arguments
# - value : the value to fill na's with, will only fill na values with compatible types
# - subset -> {list<cols>}: the list of columns to consider for filling

has_nulls.fillna(0).show() # will only fill the numeric values it can target
has_nulls.fillna(0, subset=['price', 'quantity']).show() # only fill the price and quantity columns

# fillna's can be chained
has_nulls.fillna(0, subset=['price', 'quantity']).fillna("unknown", subset=["name"]).show()

+---+--------+-----+--------+
| id|    name|price|quantity|
+---+--------+-----+--------+
|  2|product2|  3.0|       5|
|  3|product3| 8.99|       8|
|  0|    NULL|  0.0|       0|
|  4|    NULL|  0.0|       0|
|  5|product5|  0.0|       0|
|  6|product6| 2.99|       0|
+---+--------+-----+--------+

+----+--------+-----+--------+
|  id|    name|price|quantity|
+----+--------+-----+--------+
|   2|product2|  3.0|       5|
|   3|product3| 8.99|       8|
|NULL|    NULL|  0.0|       0|
|   4|    NULL|  0.0|       0|
|   5|product5|  0.0|       0|
|   6|product6| 2.99|       0|
+----+--------+-----+--------+

+----+--------+-----+--------+
|  id|    name|price|quantity|
+----+--------+-----+--------+
|   2|product2|  3.0|       5|
|   3|product3| 8.99|       8|
|NULL| unknown|  0.0|       0|
|   4| unknown|  0.0|       0|
|   5|product5|  0.0|       0|
|   6|product6| 2.99|       0|
+----+--------+-----+--------+



In [40]:
# all in one (untested)

# imports
import os 
import pyspark.sql.functions as F 
from pyspark.sql import SparkSession

# setup
spark = SparkSession.builder.appName(
    "Getting the Canadian TV channels with the highest/lowest proportion of ads"
).getOrCreate()

spark.sparkContext.setLogLevel("WARN")

# pulling data
DIRECTORY="./data/broadcast_logs"

logs = spark.read.csv(
    os.path.join(DIRECTORY, "BroadcastLogs_2018_Q3_M8_sample.CSV"),
    sep = "|",
    header = True,
    inferSchema=True,
)

log_identifier = spark.read.csv(
    os.path.join(DIRECTORY, "ReferenceTables/LogIdentifier.csv"),
    sep="|",
    header=True,
    inferSchema=True,
)
cd_category = spark.read.csv(
    os.path.join(DIRECTORY, "ReferenceTables/CD_Category.csv"),
    sep="|",
    header=True,
    inferSchema=True,
).select(
    "CategoryID",
    "CategoryCD",
    F.col("EnglishDescription").alias("Category_Description"),
)

cd_program_class = spark.read.csv(
    "./data/broadcast_logs/ReferenceTables/CD_ProgramClass.csv",
    sep="|",
    header=True,
    inferSchema=True,
).select(
    "ProgramClassID",
    "ProgramClassCD",
    F.col("EnglishDescription").alias("ProgramClass_Description"),
)

# Data processing
 
logs = logs.drop("BroadcastLogID", "SequenceNO")
 
logs = logs.withColumn(
    "duration_seconds",
    (
        F.col("Duration").substr(1, 2).cast("int") * 60 * 60
        + F.col("Duration").substr(4, 2).cast("int") * 60
        + F.col("Duration").substr(7, 2).cast("int")
    ),
)
 
log_identifier = log_identifier.where(F.col("PrimaryFG") == 1)
 
logs_and_channels = logs.join(log_identifier, "LogServiceID")
 
full_log = logs_and_channels.join(cd_category, "CategoryID", how="left").join(
    cd_program_class, "ProgramClassID", how="left"
)
 
answerS = (
    full_log.groupby("LogIdentifierID")
    .agg(
        F.sum(
            F.when(
                F.trim(F.col("ProgramClassCD")).isin(
                    ["COM", "PRC", "PGI", "PRO", "LOC", "SPO", "MER", "SOL"]
                ),
                F.col("duration_seconds"),
            ).otherwise(0)
        ).alias("duration_commercial"),
        F.sum("duration_seconds").alias("duration_total"),
    )
    .withColumn(
        "commercial_ratio", F.col("duration_commercial") / F.col("duration_total")
    )
    .fillna(0)
)
 
answerS.orderBy("commercial_ratio", ascending=False).show(10, False)

+---------------+-------------------+--------------+----------------+
|LogIdentifierID|duration_commercial|duration_total|commercial_ratio|
+---------------+-------------------+--------------+----------------+
|BRAVO          |0                  |0             |0.0             |
|BBCKID         |0                  |0             |0.0             |
|BOOK           |0                  |0             |0.0             |
|CBKT           |0                  |0             |0.0             |
|CBHT           |0                  |0             |0.0             |
|CBAFT          |0                  |0             |0.0             |
|ATN9           |0                  |0             |0.0             |
|MAKE           |0                  |0             |0.0             |
|13ST           |0                  |0             |0.0             |
|BBCCND         |0                  |0             |0.0             |
+---------------+-------------------+--------------+----------------+
only showing top 10 

In [41]:
logs.show()

+------------+----------+-------------------+----------------------+----------+---------------+-----------------+----------------+---------------+------------------+--------------+--------------------+------------+-------------------+-------------------+------------+------------+--------------------+-------------------+--------+--------------------+------------------+----------------------+-------------+---------+---------+---------+---------+----------------+
|LogServiceID|   LogDate|AudienceTargetAgeID|AudienceTargetEthnicID|CategoryID|ClosedCaptionID|CountryOfOriginID|DubDramaCreditID|EthnicProgramID|ProductionSourceID|ProgramClassID|FilmClassificationID|ExhibitionID|           Duration|            EndTime|LogEntryDate|ProductionNO|        ProgramTitle|          StartTime|Subtitle|NetworkAffiliationID|SpecialAttentionID|BroadcastOriginPointID|CompositionID|Producer1|Producer2|Language1|Language2|duration_seconds|
+------------+----------+-------------------+----------------------+--

## Practice Problems

In [28]:
# Exercise 5.4
left = spark.createDataFrame(
    [
        [1, "product2", 3.00, 5],
        [2, "product3", 8.99, 8],
        [3, None, None, None],
        [4, None, None, None],
        [5, "product5", None, None],
        [6, "product6", 2.99, None],

    ],
    ["id", "name", "price", "quantity"]
)

right = spark.createDataFrame(
    [
        [1, "product2 desc"],
        [5, "product 4 desc"],
        [8, "product 8 desc"],
    ],
    ["id", "description"]
)

left.join(right, how="left_anti", on="id").select("id").distinct().show()
# rewrite left anti join without left_anti
left.join(right, how="left", 
          on=left["id"] == right["id"]).where(
              right["id"].isNull()
              ).select(left["id"]).show()

+---+
| id|
+---+
|  2|
|  3|
|  4|
|  6|
+---+

+---+
| id|
+---+
|  2|
|  3|
|  4|
|  6|
+---+



In [33]:
# Exercise 5.5
"""
Using the data from the data/broadcast_logs/Call_Signs.csv (careful: the delimiter here is the comma, not the pipe!), 
add the Undertaking_Name to our final table to display a human-readable description of the channel.
"""

call_signs = spark.read.csv(
    os.path.join(DIRECTORY, "Call_Signs.csv"),
    sep = ",",
    header = True 
).drop("UndertakingNO")

call_signs.show()

answer.join(call_signs, on="LogIdentifierID").show()

+---------------+--------------------+
|LogIdentifierID|    Undertaking_Name|
+---------------+--------------------+
|          BRAVO|              Bravo!|
|           CBET|Canadian Broadcas...|
|          CFTV3|Southshore Broadc...|
|           CHEX|591987 B.C. Ltd.,...|
|           CICT|Corus Television ...|
|           CIMT|Télé Inter-Rives ...|
|           CKMI|Corus Television ...|
|          FIGHT|      Fight Network |
|         SCSD02|Super Channel (fo...|
|          SMITH|Smithsonian Chann...|
|        STJUICE|   Stingray Juicebox|
|            TCN| The Comedy Network |
|           TMN3|Crave (The Movie ...|
|            TRN|HPItv (formerly T...|
|         WARNER|Hollywood Suite 7...|
|           13ST|Crime + Investiga...|
|          ASIDE|A.Side (formerly ...|
|            AUX|A.Side (formerly ...|
|          CFCNL|Bell Media Inc., ...|
|           CKAL|Rogers Media Inc....|
+---------------+--------------------+
only showing top 20 rows

+---------------+-------------------+-

In [39]:
# exercise 5.6
answer2 = (
    full_log.groupby("LogIdentifierID")
    .agg(
        F.sum(
            F.when(
                F.trim(F.col("ProgramClassCD")).isin(
                    ["COM", "PGI", "PRO", "LOC", "SPO", "MER", "SOL"]
                ),
                F.col("duration_seconds"),
            ).when(
                F.trim(F.col("ProgramClassCD")).isin(
                    ["PRC"]
                ),
                (0.75 * F.col("duration_seconds"))
            ).otherwise(0)
        ).alias("duration_commercial"),
        F.sum("duration_seconds").alias("duration_total"),
    )
    .withColumn(
        "commercial_ratio", F.col("duration_commercial") / F.col("duration_total")
    )
    # .fillna(0)
)
answer2.orderBy("commercial_ratio", ascending=False).show(10, False)
full_log.show(10)

+---------------+-------------------+--------------+----------------+
|LogIdentifierID|duration_commercial|duration_total|commercial_ratio|
+---------------+-------------------+--------------+----------------+
|BRAVO          |0.0                |NULL          |NULL            |
|BBCKID         |0.0                |NULL          |NULL            |
|BOOK           |0.0                |NULL          |NULL            |
|CBKT           |0.0                |NULL          |NULL            |
|CBHT           |0.0                |NULL          |NULL            |
|CBAFT          |0.0                |NULL          |NULL            |
|ATN9           |0.0                |NULL          |NULL            |
|MAKE           |0.0                |NULL          |NULL            |
|13ST           |0.0                |NULL          |NULL            |
|BBCCND         |0.0                |NULL          |NULL            |
+---------------+-------------------+--------------+----------------+
only showing top 10 

24/07/26 22:04:31 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


In [42]:
from utils.bob import test_fun

test_fun(3)

6

In [48]:
rnd = left.select(F.round(left.price).alias("roundedPrice"))
rnd.groupby("roundedPrice").count().show()

+------------+-----+
|roundedPrice|count|
+------------+-----+
|         3.0|    2|
|         9.0|    1|
|        NULL|    3|
+------------+-----+



24/07/29 07:48:58 WARN HeartbeatReceiver: Removing executor driver with no recent heartbeats: 43685417 ms exceeds timeout 120000 ms
24/07/29 07:48:58 WARN SparkContext: Killing executors is not supported by current scheduler.
24/07/29 07:48:59 ERROR Inbox: Ignoring error
org.apache.spark.SparkException: Exception thrown in awaitResult: 
	at org.apache.spark.util.SparkThreadUtils$.awaitResult(SparkThreadUtils.scala:56)
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:310)
	at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRefByURI(RpcEnv.scala:102)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRef(RpcEnv.scala:110)
	at org.apache.spark.util.RpcUtils$.makeDriverRef(RpcUtils.scala:36)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.driverEndpoint$lzycompute(BlockManagerMasterEndpoint.scala:124)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.org$apache$spark$storage$BlockManagerMasterEndpoint