## Week 07

This week, we analyze `BroadcastLogs_2018_Q3_M8.CSV` to find, for each channel,  
the ratio of commercial duration to total broadcast duration.   
Before we do that, we need to understand:
1. Tabular data
2. Joining and grouping in pyspark

In [4]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
import os

In [3]:
spark = (SparkSession
  .builder
  .master("local[*]")   # optional
  .appName("Processing Tabular Data")
  .getOrCreate())


Read broadcast data

In [6]:
# DIRECTORY = "./data/broadcast_logs"     # use this in the class
DIRECTORY = "../rioux-2022/data/broadcast_logs"   

broadcast_logs_filename = "BroadcastLogs_2018_Q3_M8_sample.CSV"
# broadcast_logs_filename = "BroadcastLogs_2018_Q3_M8.CSV"

logs = spark.read.csv(
  os.path.join(DIRECTORY, broadcast_logs_filename),
  sep="|",
  header=True,
  inferSchema=True,
  timestampFormat="yyyy-MM-dd")

Print the schema (column names and types) of the `logs` DataFrame

In [7]:
logs.printSchema()

root
 |-- BroadcastLogID: integer (nullable = true)
 |-- LogServiceID: integer (nullable = true)
 |-- LogDate: date (nullable = true)
 |-- SequenceNO: integer (nullable = true)
 |-- AudienceTargetAgeID: integer (nullable = true)
 |-- AudienceTargetEthnicID: integer (nullable = true)
 |-- CategoryID: integer (nullable = true)
 |-- ClosedCaptionID: integer (nullable = true)
 |-- CountryOfOriginID: integer (nullable = true)
 |-- DubDramaCreditID: integer (nullable = true)
 |-- EthnicProgramID: integer (nullable = true)
 |-- ProductionSourceID: integer (nullable = true)
 |-- ProgramClassID: integer (nullable = true)
 |-- FilmClassificationID: integer (nullable = true)
 |-- ExhibitionID: integer (nullable = true)
 |-- Duration: string (nullable = true)
 |-- EndTime: string (nullable = true)
 |-- LogEntryDate: date (nullable = true)
 |-- ProductionNO: string (nullable = true)
 |-- ProgramTitle: string (nullable = true)
 |-- StartTime: string (nullable = true)
 |-- Subtitle: string (nullable 

Show some rows

In [8]:
logs.show(10)

+--------------+------------+----------+----------+-------------------+----------------------+----------+---------------+-----------------+----------------+---------------+------------------+--------------+--------------------+------------+----------------+----------------+------------+------------+--------------------+----------------+--------+--------------------+------------------+----------------------+-------------+---------+---------+---------+---------+
|BroadcastLogID|LogServiceID|   LogDate|SequenceNO|AudienceTargetAgeID|AudienceTargetEthnicID|CategoryID|ClosedCaptionID|CountryOfOriginID|DubDramaCreditID|EthnicProgramID|ProductionSourceID|ProgramClassID|FilmClassificationID|ExhibitionID|        Duration|         EndTime|LogEntryDate|ProductionNO|        ProgramTitle|       StartTime|Subtitle|NetworkAffiliationID|SpecialAttentionID|BroadcastOriginPointID|CompositionID|Producer1|Producer2|Language1|Language2|
+--------------+------------+----------+----------+-------------------

Print only column names

In [9]:
logs.columns

['BroadcastLogID',
 'LogServiceID',
 'LogDate',
 'SequenceNO',
 'AudienceTargetAgeID',
 'AudienceTargetEthnicID',
 'CategoryID',
 'ClosedCaptionID',
 'CountryOfOriginID',
 'DubDramaCreditID',
 'EthnicProgramID',
 'ProductionSourceID',
 'ProgramClassID',
 'FilmClassificationID',
 'ExhibitionID',
 'Duration',
 'EndTime',
 'LogEntryDate',
 'ProductionNO',
 'ProgramTitle',
 'StartTime',
 'Subtitle',
 'NetworkAffiliationID',
 'SpecialAttentionID',
 'BroadcastOriginPointID',
 'CompositionID',
 'Producer1',
 'Producer2',
 'Language1',
 'Language2']

Remove columns using `.drop()`.   
We remove `BroadcastLogID` and `SequenceNO` because they are not needed
for our problem.

In [10]:
logs_clean = logs.drop("BroadcastLogID", "SequenceNO")

# Check that both columns are gone. Python's `in` is case-sensitive,
# so the names must match the original column names exactly.
print("BroadcastLogID" in logs_clean.columns)
print("SequenceNO" in logs_clean.columns)

False
False


View `Duration` column

In [11]:
logs_clean.select(F.col("Duration")).show(5)

print(logs_clean.select(F.col("Duration")).dtypes)

+----------------+
|        Duration|
+----------------+
|02:00:00.0000000|
|00:00:30.0000000|
|00:00:15.0000000|
|00:00:15.0000000|
|00:00:15.0000000|
+----------------+
only showing top 5 rows

[('Duration', 'string')]


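Parse the hours, minutes, and seconds from `Duration` for each row in `logs_clean`.   
`substr(start, length)` counts positions from 1, so `substr(1, 2)` takes the first two characters.   
We call `.distinct()` only to view a variety of values; it returns a new
DataFrame and does not modify `logs_clean`.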

In [13]:
logs_clean.select(
  F.col("Duration"),
  F.col("Duration").substr(1, 2).cast("int").alias("dur_hours"),
  F.col("Duration").substr(4, 2).cast("int").alias("dur_minutes"),
  F.col("Duration").substr(7, 2).cast("int").alias("dur_seconds"),
).distinct().show(5)

+----------------+---------+-----------+-----------+
|        Duration|dur_hours|dur_minutes|dur_seconds|
+----------------+---------+-----------+-----------+
|00:04:52.0000000|        0|          4|         52|
|00:10:06.0000000|        0|         10|          6|
|00:09:52.0000000|        0|          9|         52|
|00:04:26.0000000|        0|          4|         26|
|00:14:59.0000000|        0|         14|         59|
+----------------+---------+-----------+-----------+
only showing top 5 rows



Create a column `Duration_seconds`

In [14]:
logs_clean_v2 = logs_clean.withColumn(
  "Duration_seconds", 
  (F.col("Duration").substr(1, 2).cast("int") * 60 * 60
   + F.col("Duration").substr(4, 2).cast("int") * 60
   + F.col("Duration").substr(7, 2).cast("int"))
)

logs_clean_v2.select(F.col("Duration"), F.col("duration_seconds")).distinct().show(10)

+----------------+----------------+
|        Duration|duration_seconds|
+----------------+----------------+
|01:59:30.0000000|            7170|
|00:31:00.0000000|            1860|
|00:28:08.0000000|            1688|
|00:32:00.0000000|            1920|
|00:30:00.0000000|            1800|
|00:00:35.0000000|              35|
|00:01:39.0000000|              99|
|00:55:03.0000000|            3303|
|00:10:47.0000000|             647|
|00:04:00.0000000|             240|
+----------------+----------------+
only showing top 10 rows
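
As a cross-check of the arithmetic above, here is a minimal plain-Python sketch (not PySpark) of the same conversion. The function name `duration_to_seconds` is made up for illustration; the sample strings are taken from the output above. Note that Python slices count from 0 while Spark's `substr` counts from 1.

```python
def duration_to_seconds(duration: str) -> int:
    """Convert 'HH:MM:SS.fffffff' to whole seconds, ignoring the fraction."""
    hours = int(duration[0:2])    # same characters as Spark's substr(1, 2)
    minutes = int(duration[3:5])  # same characters as Spark's substr(4, 2)
    seconds = int(duration[6:8])  # same characters as Spark's substr(7, 2)
    return hours * 60 * 60 + minutes * 60 + seconds


samples = ["01:59:30.0000000", "00:55:03.0000000", "00:00:35.0000000"]
converted = {d: duration_to_seconds(d) for d in samples}
print(converted)
```

The results should agree with the `duration_seconds` values shown in the table for the same `Duration` strings.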



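Rename the column `Duration_seconds` to `DurationSeconds`.   
The code refers to it as `duration_seconds`; this works because Spark resolves column names case-insensitively by default.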

In [15]:
logs_clean_v3 = logs_clean_v2.withColumnRenamed("duration_seconds", "DurationSeconds")

logs_clean_v3.select(F.col("Duration"), F.col("DurationSeconds")).distinct().show(10)

+----------------+---------------+
|        Duration|DurationSeconds|
+----------------+---------------+
|01:59:30.0000000|           7170|
|00:31:00.0000000|           1860|
|00:28:08.0000000|           1688|
|00:32:00.0000000|           1920|
|00:30:00.0000000|           1800|
|00:00:35.0000000|             35|
|00:01:39.0000000|             99|
|00:55:03.0000000|           3303|
|00:10:47.0000000|            647|
|00:04:00.0000000|            240|
+----------------+---------------+
only showing top 10 rows



Summary statistics (count, mean, stddev, min, max) for the first `num_of_cols` columns, using `.describe()`

In [16]:
num_of_cols = 3

for col_name in logs_clean_v3.columns[:num_of_cols]:
  logs_clean_v3.describe(col_name).show()

+-------+------------------+
|summary|      LogServiceID|
+-------+------------------+
|  count|            238945|
|   mean| 3450.890284375065|
| stddev|199.50673962554765|
|    min|              3157|
|    max|              3925|
+-------+------------------+

+-------+
|summary|
+-------+
|  count|
|   mean|
| stddev|
|    min|
|    max|
+-------+

+-------+-------------------+
|summary|AudienceTargetAgeID|
+-------+-------------------+
|  count|              16112|
|   mean| 3.4929245283018866|
| stddev| 1.0415963394745125|
|    min|                  1|
|    max|                  4|
+-------+-------------------+



Summary statistics (count, mean, stddev, min, 25%, 50%, 75%, max) for the first `num_of_cols` columns, using `.summary()`

In [20]:
num_of_cols = 3

for col_name in logs_clean_v3.columns[:num_of_cols]:
  logs_clean_v3.select(col_name).summary().show()

+-------+------------------+
|summary|      LogServiceID|
+-------+------------------+
|  count|            238945|
|   mean| 3450.890284375065|
| stddev|199.50673962554765|
|    min|              3157|
|    25%|              3287|
|    50%|              3379|
|    75%|              3627|
|    max|              3925|
+-------+------------------+

+-------+
|summary|
+-------+
|  count|
|   mean|
| stddev|
|    min|
|    25%|
|    50%|
|    75%|
|    max|
+-------+

+-------+-------------------+
|summary|AudienceTargetAgeID|
+-------+-------------------+
|  count|              16112|
|   mean| 3.4929245283018866|
| stddev| 1.0415963394745125|
|    min|                  1|
|    25%|                  4|
|    50%|                  4|
|    75%|                  4|
|    max|                  4|
+-------+-------------------+



**Exercise:**   
How can we show `describe` or `summary` statistics for a column of date type?

### Joining and grouping

Now we want to learn how to join two (or more) tables into one table.   
In pyspark, we are encouraged to denormalize (join all tables into a single big table).

First, let us read the other three tables from the `ReferenceTables` directory

In [23]:
log_identifier = spark.read.csv(
  os.path.join(DIRECTORY, "ReferenceTables/LogIdentifier.csv"),
  sep="|", header=True, inferSchema=True
  ).where(F.col("PrimaryFG") == 1)

cd_category = spark.read.csv(
  os.path.join(DIRECTORY, "ReferenceTables/CD_Category.csv"),
  sep="|", header=True, inferSchema=True
  ).select("CategoryID", "CategoryCD", 
           F.col("EnglishDescription").alias("CategoryDescription"))

cd_program_class = spark.read.csv(
  os.path.join(DIRECTORY, "ReferenceTables/CD_ProgramClass.csv"),
  sep="|", header=True, inferSchema=True
  ).select("ProgramClassID", "ProgramClassCD", 
           F.col("EnglishDescription").alias("ProgramClass_Description"))

In [29]:
# `LogIdentifierID` is the short channel name
# (comparable to the short names of Indonesian TV channels
#  such as SCTV, RCTI, and ANTV)
log_identifier.show(5)
cd_category.show(5)
cd_program_class.show(5)

+---------------+------------+---------+
|LogIdentifierID|LogServiceID|PrimaryFG|
+---------------+------------+---------+
|           13ST|        3157|        1|
|         2000SM|        3466|        1|
|           70SM|        3883|        1|
|           80SM|        3590|        1|
|           90SM|        3470|        1|
+---------------+------------+---------+
only showing top 5 rows

+----------+----------+--------------------+
|CategoryID|CategoryCD| CategoryDescription|
+----------+----------+--------------------+
|         1|       010|                NEWS|
|         2|       02 |CANREC  ANALYSIS ...|
|         3|       02A|ANALYSIS AND INTE...|
|         4|       02B|LONG-FORM DOCUMEN...|
|         5|       030|REPORTING & ACTUA...|
+----------+----------+--------------------+
only showing top 5 rows

+--------------+--------------+------------------------+
|ProgramClassID|ProgramClassCD|ProgramClass_Description|
+--------------+--------------+------------------------+
|    

The central table `logs` also has the columns 
`LogServiceID`, `CategoryID`, and `ProgramClassID`, which we use as join keys

In [32]:
(logs_clean_v3.select("LogServiceID", "CategoryID", "ProgramClassID", 
                     "DurationSeconds")
              .show(5))

+------------+----------+--------------+---------------+
|LogServiceID|CategoryID|ProgramClassID|DurationSeconds|
+------------+----------+--------------+---------------+
|        3157|        13|            19|           7200|
|        3157|      NULL|            20|             30|
|        3157|      NULL|             3|             15|
|        3157|      NULL|             3|             15|
|        3157|      NULL|             3|             15|
+------------+----------+--------------+---------------+
only showing top 5 rows



First, we perform an **inner join** between `logs_clean_v3`
and `log_identifier`

In [42]:
logs_clean_v3.count(), log_identifier.count()

(238945, 758)

In [48]:
# No NULL values in `LogServiceID`
logs_clean_v3.filter(F.col("LogServiceID").isNull()).select("LogServiceID").show(5)

+------------+
|LogServiceID|
+------------+
+------------+



Because there are no NULL values in the `LogServiceID` column, and provided every
`LogServiceID` has a matching row in `log_identifier`, an **inner join** and a
**left join** give the same result. For the other two tables, 
we use a **left join** so that rows with NULL keys are kept.

We do not need to set `how="inner"` because it is the default.

In [39]:
logs_and_channels = logs_clean_v3.join(
    log_identifier, on="LogServiceID")

# show only distinct values in `LogServiceID`
logs_and_channels.dropDuplicates(["LogServiceID"]).show(5)

+------------+----------+-------------------+----------------------+----------+---------------+-----------------+----------------+---------------+------------------+--------------+--------------------+------------+----------------+----------------+------------+------------+--------------------+----------------+--------+--------------------+------------------+----------------------+-------------+---------+---------+---------+---------+---------------+---------------+---------+
|LogServiceID|   LogDate|AudienceTargetAgeID|AudienceTargetEthnicID|CategoryID|ClosedCaptionID|CountryOfOriginID|DubDramaCreditID|EthnicProgramID|ProductionSourceID|ProgramClassID|FilmClassificationID|ExhibitionID|        Duration|         EndTime|LogEntryDate|ProductionNO|        ProgramTitle|       StartTime|Subtitle|NetworkAffiliationID|SpecialAttentionID|BroadcastOriginPointID|CompositionID|Producer1|Producer2|Language1|Language2|DurationSeconds|LogIdentifierID|PrimaryFG|
+------------+----------+-------------

In [43]:
logs_and_channels.count()

238945
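
To illustrate why the inner join keeps every row here but would not for a key with NULLs, below is a minimal plain-Python sketch (not PySpark) of inner and left join semantics on lists of dicts. The helper `join_rows` and the three sample log rows are made up for illustration, and the toy `categories` table is deliberately incomplete.

```python
def join_rows(left_rows, right_rows, key, how="inner"):
    """Join two lists of dicts on `key`; `how` is 'inner' or 'left'."""
    # Index the right table by key; None keys never match, as with SQL NULL.
    index = {}
    for row in right_rows:
        if row[key] is not None:
            index.setdefault(row[key], []).append(row)
    right_cols = [c for c in right_rows[0] if c != key] if right_rows else []

    joined = []
    for left in left_rows:
        matches = index.get(left[key], []) if left[key] is not None else []
        for right in matches:
            joined.append({**left, **{c: right[c] for c in right_cols}})
        if not matches and how == "left":
            # A left join keeps the unmatched row and pads with None.
            joined.append({**left, **{c: None for c in right_cols}})
    return joined


logs = [
    {"LogServiceID": 3157, "CategoryID": 13},
    {"LogServiceID": 3157, "CategoryID": None},
    {"LogServiceID": 3466, "CategoryID": 1},
]
channels = [
    {"LogServiceID": 3157, "LogIdentifierID": "13ST"},
    {"LogServiceID": 3466, "LogIdentifierID": "2000SM"},
]
categories = [{"CategoryID": 1, "CategoryCD": "010"}]

# Every LogServiceID is present and matched: both joins agree.
inner_channels = join_rows(logs, channels, "LogServiceID", "inner")
left_channels = join_rows(logs, channels, "LogServiceID", "left")

# CategoryID has a None and an unmatched value: the joins differ.
inner_categories = join_rows(logs, categories, "CategoryID", "inner")
left_categories = join_rows(logs, categories, "CategoryID", "left")
print(len(inner_channels), len(left_channels))
print(len(inner_categories), len(left_categories))
```

In this toy data the inner join on `CategoryID` drops the rows whose key is missing or unmatched, while the left join keeps them with `None` in the added column.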

Now perform a **left join** with the remaining link tables, `cd_category` and `cd_program_class`.  
Because `CategoryID` and `ProgramClassID` contain some NULL values in
`logs_and_channels`, those rows have no match, so the columns brought in from
the link tables are NULL for them in `full_logs`

In [53]:
full_logs = (logs_and_channels
  .join(cd_category, on="CategoryID", how="left")
  .join(cd_program_class, on="ProgramClassID", how="left"))

full_logs.select(
    "ProgramClassID", "CategoryID", "LogServiceID", "DurationSeconds",
    "LogIdentifierID", "PrimaryFG",
    "CategoryCD", "CategoryDescription",
    "ProgramClassCD", "ProgramClass_Description"
).dropDuplicates(["ProgramClassID", "CategoryID"]).show(10)

+--------------+----------+------------+---------------+---------------+---------+----------+--------------------+--------------+------------------------+
|ProgramClassID|CategoryID|LogServiceID|DurationSeconds|LogIdentifierID|PrimaryFG|CategoryCD| CategoryDescription|ProgramClassCD|ProgramClass_Description|
+--------------+----------+------------+---------------+---------------+---------+----------+--------------------+--------------+------------------------+
|             3|      NULL|        3157|             15|           13ST|        1|      NULL|                NULL|          COM |      COMMERCIAL MESSAGE|
|             4|         6|        3781|           NULL|         VISION|        1|       040|            RELIGION|          COR |             CORNERSTONE|
|             4|        11|        3781|           NULL|         VISION|        1|       07A|ONGOING DRAMATIC ...|          COR |             CORNERSTONE|
|             4|        12|        3781|           NULL|         VISIO

Now we need to find out which `ProgramClassCD` values are commercials   
by using grouping and aggregation

In [54]:
(full_logs
 .groupby("ProgramClassCD", "ProgramClass_Description")
 .agg(F.sum("DurationSeconds").alias("DurationTotal"))
 .orderBy("DurationTotal", ascending=False).show(100, False))

+--------------+--------------------------------------+-------------+
|ProgramClassCD|ProgramClass_Description              |DurationTotal|
+--------------+--------------------------------------+-------------+
|PGR           |PROGRAM                               |20992510     |
|COM           |COMMERCIAL MESSAGE                    |3519163      |
|PFS           |PROGRAM FIRST SEGMENT                 |1344762      |
|SEG           |SEGMENT OF A PROGRAM                  |1205998      |
|PRC           |PROMOTION OF UPCOMING CANADIAN PROGRAM|880600       |
|PGI           |PROGRAM INFOMERCIAL                   |679182       |
|PRO           |PROMOTION OF NON-CANADIAN PROGRAM     |335701       |
|OFF           |SCHEDULED OFF AIR TIME PERIOD         |142279       |
|ID            |NETWORK IDENTIFICATION MESSAGE        |74926        |
|NRN           |No recognized nationality             |59686        |
|MAG           |MAGAZINE PROGRAM                      |57622        |
|PSA           |PUBL

From the table above, we can manually identify from `ProgramClass_Description`  
which `ProgramClassCD` values are commercials. We get the following list:
```md
["COM", "PRC", "PGI", "PRO", "LOC", "SPO", "MER", "SOL"]
```

Now we can calculate, for each channel, the ratio of commercial duration to total duration

In [58]:
answer = (full_logs
  .groupby("LogIdentifierID")     # The short TV channel name
  .agg(
    F.sum(
      F.when(
        F.trim(F.col("ProgramClassCD"))
          .isin(["COM", "PRC", "PGI", "PRO", "LOC", "SPO", "MER", "SOL"]),
        F.col("DurationSeconds")
      ).otherwise(0)
    ).alias("DurationCommercial"),
    F.sum("DurationSeconds").alias("DurationTotal"))
  .withColumn(
    "CommercialRatio", F.col("DurationCommercial") / F.col("DurationTotal"))
)

print(answer.count())
answer.orderBy("CommercialRatio", ascending=False).show(1000, False)

324
+---------------+------------------+-------------+---------------------+
|LogIdentifierID|DurationCommercial|DurationTotal|CommercialRatio      |
+---------------+------------------+-------------+---------------------+
|CIMT           |775               |775          |1.0                  |
|MSET           |2700              |2700         |1.0                  |
|TLNSP          |15480             |15480        |1.0                  |
|TELENO         |17790             |17790        |1.0                  |
|HPITV          |13                |13           |1.0                  |
|TANG           |8125              |8125         |1.0                  |
|MMAX           |23333             |23582        |0.9894410991434145   |
|MPLU           |20587             |20912        |0.9844586840091814   |
|INVST          |20094             |20470        |0.9816316560820714   |
|ZT�L�          |21542             |21965        |0.9807420896881403   |
|RAPT           |17916             |18279      
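
The conditional sum in the cell above can be sketched in plain Python (not PySpark) as follows. The channel names and rows are made up for illustration; the codes carry a trailing space to mimic the padded values such as `COM ` in the data, which is why the comparison strips whitespace first (the role played by `F.trim`).

```python
from collections import defaultdict

COMMERCIAL_CODES = {"COM", "PRC", "PGI", "PRO", "LOC", "SPO", "MER", "SOL"}

# Made-up rows: (channel, ProgramClassCD as stored, DurationSeconds)
rows = [
    ("CHAN_A", "COM ", 30),
    ("CHAN_A", "PGR ", 90),
    ("CHAN_A", "PRC ", 30),
    ("CHAN_B", "PGR ", 600),
    ("CHAN_B", None, 60),
]

totals = defaultdict(lambda: {"commercial": 0, "total": 0})
for channel, class_cd, seconds in rows:
    # A missing code counts as "not a commercial", like .otherwise(0).
    is_commercial = class_cd is not None and class_cd.strip() in COMMERCIAL_CODES
    totals[channel]["commercial"] += seconds if is_commercial else 0
    totals[channel]["total"] += seconds

ratios = {ch: t["commercial"] / t["total"] for ch, t in totals.items()}
print(ratios)
```

Without the `strip()` call, `"COM "` would not match `"COM"` and every ratio in this sketch would be zero.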

**Exercise**:   
Can you fix the unrecognized characters in the row `ZT�L�`?

### Handling NULL

Drop rows with NULL in `CommercialRatio`

In [57]:
answer_no_null = answer.dropna(subset=["CommercialRatio"])

print(answer_no_null.count())
answer_no_null.orderBy(
  "CommercialRatio", ascending=False).show(1000, False)

322
+---------------+------------------+-------------+---------------------+
|LogIdentifierID|DurationCommercial|DurationTotal|CommercialRatio      |
+---------------+------------------+-------------+---------------------+
|CIMT           |775               |775          |1.0                  |
|MSET           |2700              |2700         |1.0                  |
|TLNSP          |15480             |15480        |1.0                  |
|TELENO         |17790             |17790        |1.0                  |
|HPITV          |13                |13           |1.0                  |
|TANG           |8125              |8125         |1.0                  |
|MMAX           |23333             |23582        |0.9894410991434145   |
|MPLU           |20587             |20912        |0.9844586840091814   |
|INVST          |20094             |20470        |0.9816316560820714   |
|ZT�L�          |21542             |21965        |0.9807420896881403   |
|RAPT           |17916             |18279      

Replace NULL with a specific value (here, 0)

In [59]:
answer_no_null = answer.fillna(0)

print(answer_no_null.count())
answer_no_null.orderBy(
  "CommercialRatio", ascending=False).show(1000, False)

324
+---------------+------------------+-------------+---------------------+
|LogIdentifierID|DurationCommercial|DurationTotal|CommercialRatio      |
+---------------+------------------+-------------+---------------------+
|CIMT           |775               |775          |1.0                  |
|MSET           |2700              |2700         |1.0                  |
|TLNSP          |15480             |15480        |1.0                  |
|TELENO         |17790             |17790        |1.0                  |
|HPITV          |13                |13           |1.0                  |
|TANG           |8125              |8125         |1.0                  |
|MMAX           |23333             |23582        |0.9894410991434145   |
|MPLU           |20587             |20912        |0.9844586840091814   |
|INVST          |20094             |20470        |0.9816316560820714   |
|ZT�L�          |21542             |21965        |0.9807420896881403   |
|RAPT           |17916             |18279      
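
The difference between the two strategies can be sketched in plain Python (not PySpark). This assumes one way a NULL ratio can arise: SQL-style `SUM` skips NULLs and returns NULL when every value is NULL, so a channel with no known durations has no total to divide by. The helper names and data are made up for illustration.

```python
def sum_ignoring_none(values):
    """Mimic SQL SUM: skip None, and return None if nothing is left."""
    present = [v for v in values if v is not None]
    return sum(present) if present else None


def ratio(numerator, denominator):
    """Return None when the denominator is missing or zero."""
    if numerator is None or not denominator:
        return None
    return numerator / denominator


# Made-up per-channel data: (commercial seconds, all durations)
channels = {
    "CHAN_A": (60, [30, 90, 30]),
    "CHAN_B": (0, [None, None]),  # every duration is missing
}
answer = {
    name: ratio(commercial, sum_ignoring_none(durations))
    for name, (commercial, durations) in channels.items()
}

dropped = {k: v for k, v in answer.items() if v is not None}      # like dropna
filled = {k: (0 if v is None else v) for k, v in answer.items()}  # like fillna(0)
print(answer, dropped, filled)
```

Dropping removes the channel from the result, while filling keeps it with a ratio of 0, which matches the change in row counts between the two cells above.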