Example tabular data processing (structured relational data) with pyspark

look at `broadcast_logs/` folder
* CRTC (Canadian Radio-Television and Telecommunications Commission). Every broadcaster is mandated to provide a complete log of the programs and
commercials showcased to the Canadian public.

In [72]:
from pyspark.sql import SparkSession, DataFrame
import pyspark.sql.functions as F

In [73]:
spark = SparkSession.builder.getOrCreate()

In [74]:
my_grocery_list = [
    ["Banana", 2, 1.74],
    ["Apple", 4, 2.04],
    ["Carrot", 1, 1.09],
    ["Cake", 1, 10.99],
]

In [75]:
# Let Spark infer the schema
df_grocery_list: DataFrame = spark.createDataFrame(
    data=my_grocery_list, schema=["Item", "Quantity", "Price"]
)

In [76]:
df_grocery_list.printSchema()

root
 |-- Item: string (nullable = true)
 |-- Quantity: long (nullable = true)
 |-- Price: double (nullable = true)



In [77]:
# or you can be explicit

from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType

# Define the schema
schema = StructType([
    StructField(name="Item", dataType=StringType(), nullable=True),
    StructField("Quantity", IntegerType(), True),
    StructField("Price", FloatType(), True),
])

df_grocery_list = spark.createDataFrame(
    my_grocery_list, schema
)

In [78]:
df_grocery_list.printSchema()

root
 |-- Item: string (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- Price: float (nullable = true)



* PySpark doesn’t provide any charting capabilities and doesn’t play with other charting libraries (like Matplotlib, seaborn, Altair, or plot.ly), and this makes a lot of sense: PySpark distributes your data over many computers. It doesn’t make much sense to
distribute a chart creation. The usual solution will be to transform your data using PySpark, use the toPandas() method to transform your PySpark data frame into a pandas data frame, and then use your favorite charting library.
* You do not want to move your data between a pandas and a PySpark data frame all the time. Reserve toPandas() for either discrete operations or for moving your data into a pandas data frame once and for all. If you need pandas functionality on a Spark data frame, check out pandas UDFs

csv file format
* a row delimiter. The row delimiter splits the file into logical records. The newline character (`\n`) is the de facto record delimiter
* field delimiter. Each record is made up of an identical number of fields, and the field delimiter tells where one field starts and ends. The comma character (`,`) is the most frequent field delimiter

In [79]:
using_GoogleColab = True
if using_GoogleColab:
  # to work with files inside Google Colab,
  from google.colab import drive
  drive.mount('/content/drive')
  # and right-click copy path of the files you want
else:
  # idk man
  pass

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [80]:
from pathlib import Path

In [81]:
# Define the directory and file path using pathlib
dir_path = Path("drive/MyDrive/Colab Notebooks/broadcast_logs")
file_path = dir_path / "BroadcastLogs_2018_Q3_M8_sample.CSV"
file_path

PosixPath('drive/MyDrive/Colab Notebooks/broadcast_logs/BroadcastLogs_2018_Q3_M8_sample.CSV')

In [82]:
logs = spark.read.csv(
    str(file_path),  # Convert the Path object to a string
    sep="|",
    header=True,
    inferSchema=True,
    timestampFormat="yyyy-MM-dd",
)

In [83]:
logs.printSchema()

root
 |-- BroadcastLogID: integer (nullable = true)
 |-- LogServiceID: integer (nullable = true)
 |-- LogDate: date (nullable = true)
 |-- SequenceNO: integer (nullable = true)
 |-- AudienceTargetAgeID: integer (nullable = true)
 |-- AudienceTargetEthnicID: integer (nullable = true)
 |-- CategoryID: integer (nullable = true)
 |-- ClosedCaptionID: integer (nullable = true)
 |-- CountryOfOriginID: integer (nullable = true)
 |-- DubDramaCreditID: integer (nullable = true)
 |-- EthnicProgramID: integer (nullable = true)
 |-- ProductionSourceID: integer (nullable = true)
 |-- ProgramClassID: integer (nullable = true)
 |-- FilmClassificationID: integer (nullable = true)
 |-- ExhibitionID: integer (nullable = true)
 |-- Duration: string (nullable = true)
 |-- EndTime: string (nullable = true)
 |-- LogEntryDate: date (nullable = true)
 |-- ProductionNO: string (nullable = true)
 |-- ProgramTitle: string (nullable = true)
 |-- StartTime: string (nullable = true)
 |-- Subtitle: string (nullable 

Selecting columns with .select()

In [84]:
logs.select("BroadcastLogID", "LogServiceID", "LogDate").show(5, False)

+--------------+------------+----------+
|BroadcastLogID|LogServiceID|LogDate   |
+--------------+------------+----------+
|1196192316    |3157        |2018-08-01|
|1196192317    |3157        |2018-08-01|
|1196192318    |3157        |2018-08-01|
|1196192319    |3157        |2018-08-01|
|1196192320    |3157        |2018-08-01|
+--------------+------------+----------+
only showing top 5 rows



In [85]:
logs.select(*["BroadCastLogID", "LogServiceID", "LogDate"]).show(5) # same thing

+--------------+------------+----------+
|BroadCastLogID|LogServiceID|   LogDate|
+--------------+------------+----------+
|    1196192316|        3157|2018-08-01|
|    1196192317|        3157|2018-08-01|
|    1196192318|        3157|2018-08-01|
|    1196192319|        3157|2018-08-01|
|    1196192320|        3157|2018-08-01|
+--------------+------------+----------+
only showing top 5 rows



In [86]:
logs.select(
F.col("BroadCastLogID"), F.col("LogServiceID"), F.col("LogDate")
).show(5) # same thing

+--------------+------------+----------+
|BroadCastLogID|LogServiceID|   LogDate|
+--------------+------------+----------+
|    1196192316|        3157|2018-08-01|
|    1196192317|        3157|2018-08-01|
|    1196192318|        3157|2018-08-01|
|    1196192319|        3157|2018-08-01|
|    1196192320|        3157|2018-08-01|
+--------------+------------+----------+
only showing top 5 rows



In [87]:
import numpy as np

In [88]:
# see every column in groups of three
column_split = np.array_split(
    np.array(logs.columns), len(logs.columns) // 3
)

print(column_split)

for x in column_split:
    logs.select(*x).show(5)

[array(['BroadcastLogID', 'LogServiceID', 'LogDate'], dtype='<U22'), array(['SequenceNO', 'AudienceTargetAgeID', 'AudienceTargetEthnicID'],
      dtype='<U22'), array(['CategoryID', 'ClosedCaptionID', 'CountryOfOriginID'], dtype='<U22'), array(['DubDramaCreditID', 'EthnicProgramID', 'ProductionSourceID'],
      dtype='<U22'), array(['ProgramClassID', 'FilmClassificationID', 'ExhibitionID'],
      dtype='<U22'), array(['Duration', 'EndTime', 'LogEntryDate'], dtype='<U22'), array(['ProductionNO', 'ProgramTitle', 'StartTime'], dtype='<U22'), array(['Subtitle', 'NetworkAffiliationID', 'SpecialAttentionID'],
      dtype='<U22'), array(['BroadcastOriginPointID', 'CompositionID', 'Producer1'],
      dtype='<U22'), array(['Producer2', 'Language1', 'Language2'], dtype='<U22')]
+--------------+------------+----------+
|BroadcastLogID|LogServiceID|   LogDate|
+--------------+------------+----------+
|    1196192316|        3157|2018-08-01|
|    1196192317|        3157|2018-08-01|
|    1196192318|

In [89]:
logs = logs.select(
    *[x for x in logs.columns if x not in ["BroadcastLogID", "SequenceNO"]]
)

Dropping columns with .drop() and with .select()

In [90]:
logs = logs.drop("BroadcastLogID", "SequenceNO")

In [91]:
print("BroadcastLogID" in logs.columns)

False


In [92]:
logs = logs.select(
    *[x for x in logs.columns if x not in ["BroadcastLogID", "SequenceNO"]]
)

Creating new columns with .withColumn() and with .select()

In [93]:
logs.select(F.col("Duration")).show(5)

+----------------+
|        Duration|
+----------------+
|02:00:00.0000000|
|00:00:30.0000000|
|00:00:15.0000000|
|00:00:15.0000000|
|00:00:15.0000000|
+----------------+
only showing top 5 rows



In [94]:
logs.select(F.col("Duration")).dtypes # string is formatted like HH:MM:SS.mmmmmm

[('Duration', 'string')]

In [95]:
# Extracting the hours, minutes, and seconds from the Duration column
logs.select(
    F.col("Duration"),
    F.col("Duration").substr(1, 2).cast("int").alias("dur_hours"),
    F.col("Duration").substr(4, 2).cast("int").alias("dur_minutes"),
    F.col("Duration").substr(7, 2).cast("int").alias("dur_seconds"),
).distinct().show(5)

# We could rely on the datetime and timedelta Python constructs through a UDF.
# Depending on the type of UDF (simple versus vectorized), the performance can
# be slower or comparable to using this approach

+----------------+---------+-----------+-----------+
|        Duration|dur_hours|dur_minutes|dur_seconds|
+----------------+---------+-----------+-----------+
|00:04:52.0000000|        0|          4|         52|
|00:10:06.0000000|        0|         10|          6|
|00:26:41.0000000|        0|         26|         41|
|00:05:29.0000000|        0|          5|         29|
|00:08:18.0000000|        0|          8|         18|
+----------------+---------+-----------+-----------+
only showing top 5 rows



In [96]:
# Creating a new column - duration of the program in seconds
logs.select(
    F.col("Duration"),
    (
    F.col("Duration").substr(1, 2).cast("int") * 60 * 60
    + F.col("Duration").substr(4, 2).cast("int") * 60
    + F.col("Duration").substr(7, 2).cast("int")
    ).alias("Duration_seconds"),
).distinct().show(5)

+----------------+----------------+
|        Duration|Duration_seconds|
+----------------+----------------+
|01:59:30.0000000|            7170|
|00:31:00.0000000|            1860|
|00:28:08.0000000|            1688|
|00:10:30.0000000|             630|
|00:32:00.0000000|            1920|
+----------------+----------------+
only showing top 5 rows



In [97]:
# the other way, with .withColumn()
logs = logs.withColumn(
    "Duration_seconds",
    (
    F.col("Duration").substr(1, 2).cast("int") * 60 * 60
    + F.col("Duration").substr(4, 2).cast("int") * 60
    + F.col("Duration").substr(7, 2).cast("int")
    ),
)

In [98]:
logs.printSchema()

root
 |-- LogServiceID: integer (nullable = true)
 |-- LogDate: date (nullable = true)
 |-- AudienceTargetAgeID: integer (nullable = true)
 |-- AudienceTargetEthnicID: integer (nullable = true)
 |-- CategoryID: integer (nullable = true)
 |-- ClosedCaptionID: integer (nullable = true)
 |-- CountryOfOriginID: integer (nullable = true)
 |-- DubDramaCreditID: integer (nullable = true)
 |-- EthnicProgramID: integer (nullable = true)
 |-- ProductionSourceID: integer (nullable = true)
 |-- ProgramClassID: integer (nullable = true)
 |-- FilmClassificationID: integer (nullable = true)
 |-- ExhibitionID: integer (nullable = true)
 |-- Duration: string (nullable = true)
 |-- EndTime: string (nullable = true)
 |-- LogEntryDate: date (nullable = true)
 |-- ProductionNO: string (nullable = true)
 |-- ProgramTitle: string (nullable = true)
 |-- StartTime: string (nullable = true)
 |-- Subtitle: string (nullable = true)
 |-- NetworkAffiliationID: integer (nullable = true)
 |-- SpecialAttentionID: inte

Renaming columns with .withColumnRenamed() and .toDF()

In [99]:
logs = logs.withColumnRenamed("Duration_seconds", "duration_seconds")
logs.printSchema()

root
 |-- LogServiceID: integer (nullable = true)
 |-- LogDate: date (nullable = true)
 |-- AudienceTargetAgeID: integer (nullable = true)
 |-- AudienceTargetEthnicID: integer (nullable = true)
 |-- CategoryID: integer (nullable = true)
 |-- ClosedCaptionID: integer (nullable = true)
 |-- CountryOfOriginID: integer (nullable = true)
 |-- DubDramaCreditID: integer (nullable = true)
 |-- EthnicProgramID: integer (nullable = true)
 |-- ProductionSourceID: integer (nullable = true)
 |-- ProgramClassID: integer (nullable = true)
 |-- FilmClassificationID: integer (nullable = true)
 |-- ExhibitionID: integer (nullable = true)
 |-- Duration: string (nullable = true)
 |-- EndTime: string (nullable = true)
 |-- LogEntryDate: date (nullable = true)
 |-- ProductionNO: string (nullable = true)
 |-- ProgramTitle: string (nullable = true)
 |-- StartTime: string (nullable = true)
 |-- Subtitle: string (nullable = true)
 |-- NetworkAffiliationID: integer (nullable = true)
 |-- SpecialAttentionID: inte

In [100]:
logs.toDF(*[x.lower() for x in logs.columns]).printSchema()

root
 |-- logserviceid: integer (nullable = true)
 |-- logdate: date (nullable = true)
 |-- audiencetargetageid: integer (nullable = true)
 |-- audiencetargetethnicid: integer (nullable = true)
 |-- categoryid: integer (nullable = true)
 |-- closedcaptionid: integer (nullable = true)
 |-- countryoforiginid: integer (nullable = true)
 |-- dubdramacreditid: integer (nullable = true)
 |-- ethnicprogramid: integer (nullable = true)
 |-- productionsourceid: integer (nullable = true)
 |-- programclassid: integer (nullable = true)
 |-- filmclassificationid: integer (nullable = true)
 |-- exhibitionid: integer (nullable = true)
 |-- duration: string (nullable = true)
 |-- endtime: string (nullable = true)
 |-- logentrydate: date (nullable = true)
 |-- productionno: string (nullable = true)
 |-- programtitle: string (nullable = true)
 |-- starttime: string (nullable = true)
 |-- subtitle: string (nullable = true)
 |-- networkaffiliationid: integer (nullable = true)
 |-- specialattentionid: inte

summary statistics with .describe() and .summary()

In [101]:
for i in logs.columns:
    logs.describe(i).show()

+-------+------------------+
|summary|      LogServiceID|
+-------+------------------+
|  count|            238945|
|   mean| 3450.890284375065|
| stddev|199.50673962555592|
|    min|              3157|
|    max|              3925|
+-------+------------------+

+-------+
|summary|
+-------+
|  count|
|   mean|
| stddev|
|    min|
|    max|
+-------+

+-------+-------------------+
|summary|AudienceTargetAgeID|
+-------+-------------------+
|  count|              16112|
|   mean| 3.4929245283018866|
| stddev| 1.0415963394745125|
|    min|                  1|
|    max|                  4|
+-------+-------------------+

+-------+----------------------+
|summary|AudienceTargetEthnicID|
+-------+----------------------+
|  count|                  1710|
|   mean|    120.56432748538012|
| stddev|     71.98694059436133|
|    min|                     4|
|    max|                   337|
+-------+----------------------+

+-------+------------------+
|summary|        CategoryID|
+-------+-----------

In [102]:
# .summary() gives more information
for i in logs.columns:
    logs.select(i).summary("min", "10%", "90%", "max").show()

+-------+------------+
|summary|LogServiceID|
+-------+------------+
|    min|        3157|
|    10%|        3236|
|    90%|        3709|
|    max|        3925|
+-------+------------+

+-------+
|summary|
+-------+
|    min|
|    10%|
|    90%|
|    max|
+-------+

+-------+-------------------+
|summary|AudienceTargetAgeID|
+-------+-------------------+
|    min|                  1|
|    10%|                  1|
|    90%|                  4|
|    max|                  4|
+-------+-------------------+

+-------+----------------------+
|summary|AudienceTargetEthnicID|
+-------+----------------------+
|    min|                     4|
|    10%|                    74|
|    90%|                   258|
|    max|                   337|
+-------+----------------------+

+-------+----------+
|summary|CategoryID|
+-------+----------+
|    min|         1|
|    10%|         3|
|    90%|        29|
|    max|        29|
+-------+----------+

+-------+---------------+
|summary|ClosedCaptionID|
+------

joining tables
* syntax
```none
[LEFT].join(
    [RIGHT],
    on=[PREDICATES]
    how=[METHOD]
)
```
* types
    * A cross join (how="cross") returns a record for every record pair, regardless of the value the predicates return. Cross joins are seldom the operation you want, but they are useful when you want a table that contains every possible combination.

    * an inner join (how="inner") returns a record if the predicate is true and drops it if false.
    
    * a left (also called a left outer) join will add the unmatched records from the left table in the joined table, filling the columns coming from the right table with null.
    
    * a right (also called a right outer) join will add the unmatched records from the right in the joined table, filling the columns coming from the left table with null.
    
    * a full outer (how="outer", how="full", or how="full_outer") join is simply the fusion of a left and right join. It will add the unmatched records from the left and the right table, padding with null.

    * A left semi-join (how="left_semi") is the same as an inner join, but keeps the columns in the left table. It also won’t duplicate the records in the left table if they fulfill the predicate with more than one record in the right table. Its main purpose is to filter records from a table based on a predicate that is depending on another table.

    * A left anti-join (how="left_anti") is the opposite of an inner join. It will keep only the records from the left table that do not match the predicate with any record in the right table. If a record from the left table matches a record from the right table, it gets dropped from the join operation.

In [103]:
file_path = dir_path / "ReferenceTables/LogIdentifier.csv"
file_path

PosixPath('drive/MyDrive/Colab Notebooks/broadcast_logs/ReferenceTables/LogIdentifier.csv')

In [104]:
log_identifier = spark.read.csv(
    str(file_path),
    sep="|",
    header=True,
    inferSchema=True,
)

In [105]:
log_identifier.printSchema()

root
 |-- LogIdentifierID: string (nullable = true)
 |-- LogServiceID: integer (nullable = true)
 |-- PrimaryFG: integer (nullable = true)



In [106]:
logs_and_channels = logs.join(
    log_identifier,
    on=logs["LogServiceID"] == log_identifier["LogServiceID"],
    how="inner" # this is the default
)

In [107]:
logs_and_channels.show(5)

+------------+----------+-------------------+----------------------+----------+---------------+-----------------+----------------+---------------+------------------+--------------+--------------------+------------+----------------+----------------+------------+------------+--------------------+----------------+--------+--------------------+------------------+----------------------+-------------+---------+---------+---------+---------+----------------+---------------+------------+---------+
|LogServiceID|   LogDate|AudienceTargetAgeID|AudienceTargetEthnicID|CategoryID|ClosedCaptionID|CountryOfOriginID|DubDramaCreditID|EthnicProgramID|ProductionSourceID|ProgramClassID|FilmClassificationID|ExhibitionID|        Duration|         EndTime|LogEntryDate|ProductionNO|        ProgramTitle|       StartTime|Subtitle|NetworkAffiliationID|SpecialAttentionID|BroadcastOriginPointID|CompositionID|Producer1|Producer2|Language1|Language2|duration_seconds|LogIdentifierID|LogServiceID|PrimaryFG|
+---------

In [108]:
# same thing
logs_and_channels = logs.join(
    log_identifier,
    on="LogServiceID", # if the column name is the same in both tables
    how="inner"
)

# same thing
# logs_and_channels = logs.join(log_identifier, "LogServiceID")

PySpark will not allow two columns to be named the same. If it already exists, it will overwrite (or shadow) the column. If they have different names and you want to keep only one, you could do it like

```python
logs_and_channels_verbose = logs.join(
log_identifier,
logs["LogServiceID"] == log_identifier["LogServiceID2"]
)
logs_and_channels.drop(log_identifier["LogServiceID2"]).select("LogServiceID")
```

In [109]:
logs_and_channels.printSchema()

root
 |-- LogServiceID: integer (nullable = true)
 |-- LogDate: date (nullable = true)
 |-- AudienceTargetAgeID: integer (nullable = true)
 |-- AudienceTargetEthnicID: integer (nullable = true)
 |-- CategoryID: integer (nullable = true)
 |-- ClosedCaptionID: integer (nullable = true)
 |-- CountryOfOriginID: integer (nullable = true)
 |-- DubDramaCreditID: integer (nullable = true)
 |-- EthnicProgramID: integer (nullable = true)
 |-- ProductionSourceID: integer (nullable = true)
 |-- ProgramClassID: integer (nullable = true)
 |-- FilmClassificationID: integer (nullable = true)
 |-- ExhibitionID: integer (nullable = true)
 |-- Duration: string (nullable = true)
 |-- EndTime: string (nullable = true)
 |-- LogEntryDate: date (nullable = true)
 |-- ProductionNO: string (nullable = true)
 |-- ProgramTitle: string (nullable = true)
 |-- StartTime: string (nullable = true)
 |-- Subtitle: string (nullable = true)
 |-- NetworkAffiliationID: integer (nullable = true)
 |-- SpecialAttentionID: inte

In [110]:
# Linking the category and program class tables using two left joins

file_path = dir_path / "ReferenceTables/CD_Category.csv"

cd_category = spark.read.csv(
    str(file_path),
    sep="|",
    header=True,
    inferSchema=True,
)

file_path = dir_path / "ReferenceTables/CD_ProgramClass.csv"
cd_program_class = spark.read.csv(
    str(file_path),
    sep="|",
    header=True,
    inferSchema=True,
)

In [111]:
# aliasing columns
cd_category = cd_category.select(
    "CategoryID",
    "CategoryCD",
    F.col("EnglishDescription").alias("Category_Description"),
)

cd_program_class = cd_program_class.select(
    "ProgramClassID",
    "ProgramClassCD",
    F.col("EnglishDescription").alias("ProgramClass_Description"),
)

In [112]:
full_log = logs_and_channels.join(
    cd_category,
    "CategoryID",
    how="left"
    ).join(
    cd_program_class,
    "ProgramClassID",
    how="left"
)

In [113]:
full_log.show(5)

+--------------+----------+------------+----------+-------------------+----------------------+---------------+-----------------+----------------+---------------+------------------+--------------------+------------+----------------+----------------+------------+------------+--------------------+----------------+--------+--------------------+------------------+----------------------+-------------+---------+---------+---------+---------+----------------+---------------+---------+----------+--------------------+--------------+------------------------+
|ProgramClassID|CategoryID|LogServiceID|   LogDate|AudienceTargetAgeID|AudienceTargetEthnicID|ClosedCaptionID|CountryOfOriginID|DubDramaCreditID|EthnicProgramID|ProductionSourceID|FilmClassificationID|ExhibitionID|        Duration|         EndTime|LogEntryDate|ProductionNO|        ProgramTitle|       StartTime|Subtitle|NetworkAffiliationID|SpecialAttentionID|BroadcastOriginPointID|CompositionID|Producer1|Producer2|Language1|Language2|duration

In [114]:
# most popular types of programs - grouping and aggregating
(full_log
.groupby("ProgramClassCD", "ProgramClass_Description")
.agg(F.sum("duration_seconds").alias("duration_total"))
.orderBy("duration_total", ascending=False).show(100, False)
)

# You can also use groupby(), with the apply() (Spark 2.3+) and applyInPandas()
# (Spark 3.0+)

+--------------+--------------------------------------+--------------+
|ProgramClassCD|ProgramClass_Description              |duration_total|
+--------------+--------------------------------------+--------------+
|PGR           |PROGRAM                               |29440180      |
|COM           |COMMERCIAL MESSAGE                    |4959005       |
|PFS           |PROGRAM FIRST SEGMENT                 |1897637       |
|SEG           |SEGMENT OF A PROGRAM                  |1535873       |
|PRC           |PROMOTION OF UPCOMING CANADIAN PROGRAM|1359433       |
|PGI           |PROGRAM INFOMERCIAL                   |765074        |
|PRO           |PROMOTION OF NON-CANADIAN PROGRAM     |416717        |
|OFF           |SCHEDULED OFF AIR TIME PERIOD         |187304        |
|ID            |NETWORK IDENTIFICATION MESSAGE        |117735        |
|MAG           |MAGAZINE PROGRAM                      |75624         |
|NRN           |No recognized nationality             |72195         |
|PSA  

In [115]:
# commercial time for each program - aggregate on custom columns
F.when(
    F.trim(F.col("ProgramClassCD")).isin(
    ["COM", "PRC", "PGI", "PRO", "PSA", "MAG", "LOC", "SPO", "MER", "SOL"]
    ),
    F.col("duration_seconds"),
).otherwise(0)

Column<'CASE WHEN (trim(ProgramClassCD) IN (COM, PRC, PGI, PRO, PSA, MAG, LOC, SPO, MER, SOL)) THEN duration_seconds ELSE 0 END'>

In [117]:
answer = (
full_log.groupby("LogIdentifierID")
.agg(F.sum(
    F.when(
        F.trim(F.col("ProgramClassCD")).isin(
        ["COM", "PRC", "PGI", "PRO", "LOC", "SPO", "MER", "SOL"]
        ),
        F.col("duration_seconds"),
    ).otherwise(0)
    ).alias("duration_commercial"),
    F.sum("duration_seconds").alias("duration_total"),
)
.withColumn(
"commercial_ratio", F.col("duration_commercial") / F.col("duration_total"))
)
answer.orderBy("commercial_ratio", ascending=False).show(15, False)

+---------------+-------------------+--------------+------------------+
|LogIdentifierID|duration_commercial|duration_total|commercial_ratio  |
+---------------+-------------------+--------------+------------------+
|CIMT           |775                |775           |1.0               |
|MSET           |2700               |2700          |1.0               |
|TLNSP          |15480              |15480         |1.0               |
|TELENO         |17790              |17790         |1.0               |
|TRN            |13                 |13            |1.0               |
|HPITV          |13                 |13            |1.0               |
|TANG           |8125               |8125          |1.0               |
|MUSIMAX        |23333              |23582         |0.9894410991434145|
|MMAX           |23333              |23582         |0.9894410991434145|
|MUSIP          |20587              |20912         |0.9844586840091814|
|MPLU           |20587              |20912         |0.9844586840

In [118]:
# simplified version (without the nested logic)

# Define the commercial program classes
commercial_classes = ["COM", "PRC", "PGI", "PRO", "LOC", "SPO", "MER", "SOL"]

# Add a column for commercial duration
full_log = full_log.withColumn(
    "commercial_duration",
    F.when(F.trim(F.col("ProgramClassCD")).isin(commercial_classes), F.col("duration_seconds")).otherwise(0)
)

# Group by LogIdentifierID and calculate aggregates
answer = (
    full_log.groupBy("LogIdentifierID")
    .agg(
        F.sum("commercial_duration").alias("duration_commercial"),
        F.sum("duration_seconds").alias("duration_total")
    )
    .withColumn(
        "commercial_ratio", F.col("duration_commercial") / F.col("duration_total")
    )
)

# Order by commercial ratio
answer.orderBy("commercial_ratio", ascending=False).show(15, False)

+---------------+-------------------+--------------+------------------+
|LogIdentifierID|duration_commercial|duration_total|commercial_ratio  |
+---------------+-------------------+--------------+------------------+
|CIMT           |775                |775           |1.0               |
|MSET           |2700               |2700          |1.0               |
|TLNSP          |15480              |15480         |1.0               |
|TELENO         |17790              |17790         |1.0               |
|TRN            |13                 |13            |1.0               |
|HPITV          |13                 |13            |1.0               |
|TANG           |8125               |8125          |1.0               |
|MUSIMAX        |23333              |23582         |0.9894410991434145|
|MMAX           |23333              |23582         |0.9894410991434145|
|MUSIP          |20587              |20912         |0.9844586840091814|
|MPLU           |20587              |20912         |0.9844586840

Dealing with null values - dropna() and fillna()

In [122]:
answer.count()

446

In [123]:
answer_no_null = answer.dropna(subset=["commercial_ratio"])
answer_no_null.orderBy("commercial_ratio", ascending=False).show(15, False)

+---------------+-------------------+--------------+------------------+
|LogIdentifierID|duration_commercial|duration_total|commercial_ratio  |
+---------------+-------------------+--------------+------------------+
|CIMT           |775                |775           |1.0               |
|MSET           |2700               |2700          |1.0               |
|TLNSP          |15480              |15480         |1.0               |
|TELENO         |17790              |17790         |1.0               |
|TRN            |13                 |13            |1.0               |
|HPITV          |13                 |13            |1.0               |
|TANG           |8125               |8125          |1.0               |
|MUSIMAX        |23333              |23582         |0.9894410991434145|
|MMAX           |23333              |23582         |0.9894410991434145|
|MUSIP          |20587              |20912         |0.9844586840091814|
|MPLU           |20587              |20912         |0.9844586840

In [124]:
answer_no_null.count()

444

In [125]:
answer_no_null = answer.fillna(0)
answer_no_null.orderBy("commercial_ratio", ascending=False).show(15, False)

+---------------+-------------------+--------------+------------------+
|LogIdentifierID|duration_commercial|duration_total|commercial_ratio  |
+---------------+-------------------+--------------+------------------+
|CIMT           |775                |775           |1.0               |
|MSET           |2700               |2700          |1.0               |
|TLNSP          |15480              |15480         |1.0               |
|TELENO         |17790              |17790         |1.0               |
|TRN            |13                 |13            |1.0               |
|HPITV          |13                 |13            |1.0               |
|TANG           |8125               |8125          |1.0               |
|MUSIMAX        |23333              |23582         |0.9894410991434145|
|MMAX           |23333              |23582         |0.9894410991434145|
|MUSIP          |20587              |20912         |0.9844586840091814|
|MPLU           |20587              |20912         |0.9844586840

In [126]:
answer_no_null.count()

446