# Grouping Data & Handling Missing Data

Grouping records and handling missing values are crucial steps in data analysis, and PySpark offers many methods for both. In this notebook, I'll cover data grouping and handling missing data. Let's create a Spark session first.

In [1]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
spark = SparkSession.builder.getOrCreate()

## Reading the Tables

Let's read the `logs` and `log_identifier` tables first; we'll join them later.

In [4]:
import os
DIRECTORY = "./data/broadcast_logs"

In [5]:
logs = spark.read.csv(
    os.path.join(DIRECTORY, "BroadcastLogs_2018_Q3_M8_sample.CSV"),
    sep="|",
    header=True,
    inferSchema=True,
)
logs.printSchema()

root
 |-- BroadcastLogID: integer (nullable = true)
 |-- LogServiceID: integer (nullable = true)
 |-- LogDate: string (nullable = true)
 |-- SequenceNO: integer (nullable = true)
 |-- AudienceTargetAgeID: integer (nullable = true)
 |-- AudienceTargetEthnicID: integer (nullable = true)
 |-- CategoryID: integer (nullable = true)
 |-- ClosedCaptionID: integer (nullable = true)
 |-- CountryOfOriginID: integer (nullable = true)
 |-- DubDramaCreditID: integer (nullable = true)
 |-- EthnicProgramID: integer (nullable = true)
 |-- ProductionSourceID: integer (nullable = true)
 |-- ProgramClassID: integer (nullable = true)
 |-- FilmClassificationID: integer (nullable = true)
 |-- ExhibitionID: integer (nullable = true)
 |-- Duration: string (nullable = true)
 |-- EndTime: string (nullable = true)
 |-- LogEntryDate: string (nullable = true)
 |-- ProductionNO: string (nullable = true)
 |-- ProgramTitle: string (nullable = true)
 |-- StartTime: string (nullable = true)
 |-- Subtitle: string (nulla

In [7]:
log_identifier = spark.read.csv(
    os.path.join(DIRECTORY, "ReferenceTables/LogIdentifier.csv"),
    sep="|",
    header=True,
    inferSchema=True,
)
log_identifier.printSchema()

root
 |-- LogIdentifierID: string (nullable = true)
 |-- LogServiceID: integer (nullable = true)
 |-- PrimaryFG: integer (nullable = true)



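## Computing the Duration in Seconds

The `Duration` column is a string, and the substring positions used below imply an `HH:MM:SS` layout. Let's use the `withColumn` method to create a new column, `Duration_seconds`, that holds the duration as a number of seconds.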

In [8]:
logs = logs.withColumn(
    "Duration_seconds",
    (
        F.col("Duration").substr(1, 2).cast("int") * 60 * 60  # hours
        + F.col("Duration").substr(4, 2).cast("int") * 60  # minutes
        + F.col("Duration").substr(7, 2).cast("int")  # seconds
    ),
)
logs.select(F.col("Duration_seconds")).show(5)

+----------------+
|Duration_seconds|
+----------------+
|            7200|
|              30|
|              15|
|              15|
|              15|
+----------------+
only showing top 5 rows
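
To make the substring arithmetic concrete, here is a minimal plain-Python sketch of the same calculation. It is not PySpark code: the helper name `duration_to_seconds` and the sample strings are hypothetical, and the `HH:MM:SS` layout is an assumption taken from the positions used above. Note that Spark's `substr` counts from 1, while Python slices count from 0.

```python
def duration_to_seconds(duration: str) -> int:
    """Convert an 'HH:MM:SS...' string to a number of seconds."""
    hours = int(duration[0:2])    # same characters as Spark's substr(1, 2)
    minutes = int(duration[3:5])  # same characters as Spark's substr(4, 2)
    seconds = int(duration[6:8])  # same characters as Spark's substr(7, 2)
    return hours * 60 * 60 + minutes * 60 + seconds


# Hypothetical sample values in the assumed layout
print(duration_to_seconds("02:00:00.0000000"))  # 7200
print(duration_to_seconds("00:00:30.0000000"))  # 30
```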



## Joining Tables

Let's join the `logs` table with the `log_identifier` table on the `LogServiceID` column to create the `logs_and_channels` table.

In [12]:
logs_and_channels = logs.join(
    log_identifier,
    on="LogServiceID",
    how="inner" 
)
logs_and_channels.printSchema()

root
 |-- LogServiceID: integer (nullable = true)
 |-- BroadcastLogID: integer (nullable = true)
 |-- LogDate: string (nullable = true)
 |-- SequenceNO: integer (nullable = true)
 |-- AudienceTargetAgeID: integer (nullable = true)
 |-- AudienceTargetEthnicID: integer (nullable = true)
 |-- CategoryID: integer (nullable = true)
 |-- ClosedCaptionID: integer (nullable = true)
 |-- CountryOfOriginID: integer (nullable = true)
 |-- DubDramaCreditID: integer (nullable = true)
 |-- EthnicProgramID: integer (nullable = true)
 |-- ProductionSourceID: integer (nullable = true)
 |-- ProgramClassID: integer (nullable = true)
 |-- FilmClassificationID: integer (nullable = true)
 |-- ExhibitionID: integer (nullable = true)
 |-- Duration: string (nullable = true)
 |-- EndTime: string (nullable = true)
 |-- LogEntryDate: string (nullable = true)
 |-- ProductionNO: string (nullable = true)
 |-- ProgramTitle: string (nullable = true)
 |-- StartTime: string (nullable = true)
 |-- Subtitle: string (nulla

Next, let's use two more reference tables: `CD_Category` and `CD_ProgramClass`. The `CD_Category` table describes the types of programs, and the `CD_ProgramClass` table contains the data that lets us pinpoint the commercials. Let's read these tables first.

In [9]:
cd_category = spark.read.csv(
    os.path.join(DIRECTORY, "ReferenceTables/CD_Category.csv"),
    sep="|",
    header=True,
    inferSchema=True,
).select(
    "CategoryID",
    "CategoryCD",
    F.col("EnglishDescription").alias("Category_Description"), 
)
cd_category.show(5)

+----------+----------+--------------------+
|CategoryID|CategoryCD|Category_Description|
+----------+----------+--------------------+
|         1|       010|                NEWS|
|         2|       02 |CANREC  ANALYSIS ...|
|         3|       02A|ANALYSIS AND INTE...|
|         4|       02B|LONG-FORM DOCUMEN...|
|         5|       030|REPORTING & ACTUA...|
+----------+----------+--------------------+
only showing top 5 rows



In [10]:
cd_program_class = spark.read.csv(
    os.path.join(DIRECTORY, "ReferenceTables/CD_ProgramClass.csv"),
    sep="|",
    header=True,
    inferSchema=True,
).select(
    "ProgramClassID",
    "ProgramClassCD",
    F.col("EnglishDescription").alias("ProgramClass_Description"), 
)
cd_program_class.show(5)

+--------------+--------------+------------------------+
|ProgramClassID|ProgramClassCD|ProgramClass_Description|
+--------------+--------------+------------------------+
|             1|          AUT |           AUTOPROMOTION|
|             2|          BAL |     BALANCE PROGRAMMING|
|             3|          COM |      COMMERCIAL MESSAGE|
|             4|          COR |             CORNERSTONE|
|             5|          DOC |             DOCUMENTARY|
+--------------+--------------+------------------------+
only showing top 5 rows



Let's left-join the `logs_and_channels` table with the `cd_category` and `cd_program_class` tables.

In [13]:
full_log = (
    logs_and_channels
    .join(cd_category, "CategoryID", how="left")
    .join(cd_program_class, "ProgramClassID", how="left")
)

print("The number of full_log columns:", len(full_log.columns))
full_log.printSchema()

The number of full_log columns: 37
root
 |-- ProgramClassID: integer (nullable = true)
 |-- CategoryID: integer (nullable = true)
 |-- LogServiceID: integer (nullable = true)
 |-- BroadcastLogID: integer (nullable = true)
 |-- LogDate: string (nullable = true)
 |-- SequenceNO: integer (nullable = true)
 |-- AudienceTargetAgeID: integer (nullable = true)
 |-- AudienceTargetEthnicID: integer (nullable = true)
 |-- ClosedCaptionID: integer (nullable = true)
 |-- CountryOfOriginID: integer (nullable = true)
 |-- DubDramaCreditID: integer (nullable = true)
 |-- EthnicProgramID: integer (nullable = true)
 |-- ProductionSourceID: integer (nullable = true)
 |-- FilmClassificationID: integer (nullable = true)
 |-- ExhibitionID: integer (nullable = true)
 |-- Duration: string (nullable = true)
 |-- EndTime: string (nullable = true)
 |-- LogEntryDate: string (nullable = true)
 |-- ProductionNO: string (nullable = true)
 |-- ProgramTitle: string (nullable = true)
 |-- StartTime: string (nullable =
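
The notebook uses two join types so far: `inner` for the channel table and `left` for the two reference tables. As a rough illustration of the difference, here is a plain-Python sketch (not PySpark) of a join over lists of dictionaries. The `join` helper and the sample rows are hypothetical; the point is only that a left join keeps unmatched rows and fills the missing columns with `None`, which is one way null values can end up in a table.

```python
def join(left, right, key, how="inner"):
    """Join two lists of dicts on `key` (illustrative sketch, not Spark)."""
    # Columns contributed by the right-hand table, in first-seen order
    right_columns = list(
        dict.fromkeys(c for row in right for c in row if c != key)
    )
    result = []
    for l_row in left:
        matches = [r_row for r_row in right if r_row[key] == l_row[key]]
        if matches:
            for r_row in matches:
                result.append({**l_row, **r_row})
        elif how == "left":
            # Keep the unmatched left row; right-hand columns become None
            result.append({**l_row, **{c: None for c in right_columns}})
    return result


# Hypothetical sample rows
log_rows = [
    {"BroadcastLogID": 1, "ProgramClassID": 3},
    {"BroadcastLogID": 2, "ProgramClassID": 99},  # no matching class
]
class_rows = [{"ProgramClassID": 3, "ProgramClassCD": "COM"}]

inner = join(log_rows, class_rows, "ProgramClassID", how="inner")
left = join(log_rows, class_rows, "ProgramClassID", how="left")
print(len(inner), len(left))  # 1 2
```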

## A Simple Groupby Blueprint

You can use the `groupby` method to group records. Calling `groupby` returns a `GroupedData` object, on which you can call the `agg` method with one or more aggregate functions from the `pyspark.sql.functions` module. After aggregating, you can call the `orderBy` method to sort the result.

In [14]:
(
    full_log
    .groupby("ProgramClassCD", "ProgramClass_Description")
    .agg(F.sum("Duration_seconds").alias("duration_total"))
    .orderBy("duration_total", ascending=False)
    .show(50, False)
)

+--------------+--------------------------------------+--------------+
|ProgramClassCD|ProgramClass_Description              |duration_total|
+--------------+--------------------------------------+--------------+
|PGR           |PROGRAM                               |29440180      |
|COM           |COMMERCIAL MESSAGE                    |4959005       |
|PFS           |PROGRAM FIRST SEGMENT                 |1897637       |
|SEG           |SEGMENT OF A PROGRAM                  |1535873       |
|PRC           |PROMOTION OF UPCOMING CANADIAN PROGRAM|1359433       |
|PGI           |PROGRAM INFOMERCIAL                   |765074        |
|PRO           |PROMOTION OF NON-CANADIAN PROGRAM     |416717        |
|OFF           |SCHEDULED OFF AIR TIME PERIOD         |187304        |
|ID            |NETWORK IDENTIFICATION MESSAGE        |117735        |
|MAG           |MAGAZINE PROGRAM                      |75624         |
|NRN           |No recognized nationality             |72195         |
|PSA  
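
If the group, aggregate, and sort pattern is new to you, the following plain-Python sketch shows what it computes on a handful of rows. It is not PySpark code; the `group_sum` helper and the sample rows are hypothetical and only mirror the idea of `groupby(...).agg(F.sum(...)).orderBy(..., ascending=False)`.

```python
from collections import defaultdict

# Hypothetical sample rows
rows = [
    {"ProgramClassCD": "PGR", "Duration_seconds": 3600},
    {"ProgramClassCD": "COM", "Duration_seconds": 30},
    {"ProgramClassCD": "PGR", "Duration_seconds": 1800},
    {"ProgramClassCD": "COM", "Duration_seconds": 15},
]


def group_sum(rows, key, value):
    """Sum `value` per distinct `key`, largest total first."""
    totals = defaultdict(int)
    for row in rows:
        totals[row[key]] += row[value]
    return sorted(totals.items(), key=lambda item: item[1], reverse=True)


duration_total = group_sum(rows, "ProgramClassCD", "Duration_seconds")
print(duration_total)  # [('PGR', 5400), ('COM', 45)]
```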

## Using agg() with Custom Column Definitions

The `agg` method also accepts custom column expressions. Let's compute only the commercial time for each channel (`LogIdentifierID`) in our table with the `when` function. First, let's take a look at how `when` works. The following expression returns `Duration_seconds` for rows whose trimmed `ProgramClassCD` is in the given list, and 0 for all other rows.

In [19]:
F.when(
    F.trim(F.col("ProgramClassCD")).isin(
        ["COM", "PRC", "PGI", "PRO", "PSA", "MAG", "LOC", "SPO", "MER", "SOL"]
    ),
    F.col("Duration_seconds"),
).otherwise(0)

Column<b'CASE WHEN (trim(ProgramClassCD) IN (COM, PRC, PGI, PRO, PSA, MAG, LOC, SPO, MER, SOL)) THEN duration_seconds ELSE 0 END'>
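
Here is a plain-Python sketch of what that `CASE WHEN` expression evaluates to for a single row. It is not PySpark code, and the function name `commercial_seconds` is hypothetical. The `cd_program_class` output shown earlier suggests the codes carry trailing spaces (for example `COM `), which is presumably why the expression trims the code before comparing.

```python
COMMERCIAL_CODES = {
    "COM", "PRC", "PGI", "PRO", "PSA", "MAG", "LOC", "SPO", "MER", "SOL"
}


def commercial_seconds(program_class_cd, duration_seconds):
    """Return the duration for commercial rows and 0 for everything else."""
    # str.strip() removes all surrounding whitespace, which is slightly
    # broader than trimming spaces only, but close enough for this sketch.
    if program_class_cd is not None and program_class_cd.strip() in COMMERCIAL_CODES:
        return duration_seconds
    return 0  # plays the role of .otherwise(0)


print(commercial_seconds("COM ", 30))    # 30
print(commercial_seconds("PGR ", 3600))  # 0
print(commercial_seconds(None, 15))      # 0
```

Without the `otherwise(0)` part, `when` returns null for rows that do not match the condition.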

Let's sum these commercial durations for each `LogIdentifierID`. Note that the list used from here on omits `PSA` and `MAG`.

In [22]:
(
    full_log.groupby("LogIdentifierID")
    .agg(
        F.sum(
            F.when(
                F.trim(F.col("ProgramClassCD")).isin(
                    ["COM", "PRC", "PGI", "PRO", "LOC", "SPO", "MER", "SOL"]
                ),
                F.col("Duration_seconds"),
            ).otherwise(0)
        ).alias("duration_commercial")
    )
    .show(5)
)

+---------------+-------------------+
|LogIdentifierID|duration_commercial|
+---------------+-------------------+
|           CJCO|              16672|
|          BRAVO|              22370|
|         HSTORM|               1607|
|            MMM|              21445|
|             CI|              22567|
+---------------+-------------------+
only showing top 5 rows



Nice! Now let's calculate the commercial ratio by dividing `duration_commercial` by `duration_total`.

In [23]:
answer = (
    full_log.groupby("LogIdentifierID")
    .agg(
        F.sum(
            F.when(
                F.trim(F.col("ProgramClassCD")).isin(
                    ["COM", "PRC", "PGI", "PRO", "LOC", "SPO", "MER", "SOL"]
                ),
                F.col("Duration_seconds"),
            ).otherwise(0)
        ).alias("duration_commercial"),
        F.sum("Duration_seconds").alias("duration_total"),
    )
    .withColumn(
        "commercial_ratio",
        F.col("duration_commercial") / F.col("duration_total"),
    )
)

# Ordering the commercial ratio
answer.orderBy("commercial_ratio", ascending=False).show(50, False)

+---------------+-------------------+--------------+-------------------+
|LogIdentifierID|duration_commercial|duration_total|commercial_ratio   |
+---------------+-------------------+--------------+-------------------+
|TELENO         |17790              |17790         |1.0                |
|TLNSP          |15480              |15480         |1.0                |
|HPITV          |13                 |13            |1.0                |
|CIMT           |775                |775           |1.0                |
|TRN            |13                 |13            |1.0                |
|TANG           |8125               |8125          |1.0                |
|MSET           |2700               |2700          |1.0                |
|MUSIMAX        |23333              |23582         |0.9894410991434145 |
|MMAX           |23333              |23582         |0.9894410991434145 |
|MUSIP          |20587              |20912         |0.9844586840091814 |
|MPLU           |20587              |20912         
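
To see the whole calculation on a small scale, here is a plain-Python sketch (not PySpark) that computes both sums and the ratio per channel. The channel names and rows are hypothetical. The sketch returns `None` when a channel's total is zero; as far as I know, Spark with its default non-ANSI settings likewise yields null for a division by zero, though this depends on the Spark version and the `spark.sql.ansi.enabled` setting.

```python
from collections import defaultdict

COMMERCIAL_CODES = {"COM", "PRC", "PGI", "PRO", "LOC", "SPO", "MER", "SOL"}

# Hypothetical rows: (LogIdentifierID, ProgramClassCD, Duration_seconds)
rows = [
    ("CHAN_A", "COM ", 30),
    ("CHAN_A", "PGR ", 90),
    ("CHAN_B", "PRC ", 15),
    ("CHAN_C", "PGR ", 0),
]


def commercial_ratio(rows):
    """Return {channel: commercial seconds / total seconds, or None}."""
    commercial = defaultdict(int)
    total = defaultdict(int)
    for channel, code, seconds in rows:
        total[channel] += seconds
        if code.strip() in COMMERCIAL_CODES:
            commercial[channel] += seconds
    return {
        channel: (commercial[channel] / total[channel] if total[channel] else None)
        for channel in total
    }


ratios = commercial_ratio(rows)
print(ratios)  # {'CHAN_A': 0.25, 'CHAN_B': 1.0, 'CHAN_C': None}
```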

## Handling Missing Data

A null value represents the absence of a value. Here, `commercial_ratio` can be null when, for example, a channel's `duration_total` is null or zero. To deal with missing data, you can use the `dropna` or `fillna` methods. Let's keep only the records whose `commercial_ratio` is non-null by calling the `dropna` method with its `subset` parameter.

In [35]:
answer_no_null = answer.dropna(subset=["commercial_ratio"])
answer_no_null.orderBy("commercial_ratio", ascending=False).show()

+---------------+-------------------+--------------+------------------+
|LogIdentifierID|duration_commercial|duration_total|  commercial_ratio|
+---------------+-------------------+--------------+------------------+
|         TELENO|              17790|         17790|               1.0|
|            TRN|                 13|            13|               1.0|
|           MSET|               2700|          2700|               1.0|
|          HPITV|                 13|            13|               1.0|
|           CIMT|                775|           775|               1.0|
|          TLNSP|              15480|         15480|               1.0|
|           TANG|               8125|          8125|               1.0|
|        MUSIMAX|              23333|         23582|0.9894410991434145|
|           MMAX|              23333|         23582|0.9894410991434145|
|     MUSIQUE PL|              20587|         20912|0.9844586840091814|
|          MUSIP|              20587|         20912|0.9844586840
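
As a rough mental model of `dropna(subset=[...])`, the plain-Python sketch below keeps a row only if every column named in `subset` is non-null. It is not PySpark code; the `dropna` helper here is a hypothetical stand-in and the rows are made up. Nulls in columns outside `subset` do not cause a row to be dropped.

```python
def dropna(rows, subset):
    """Keep rows whose `subset` columns are all non-None (sketch only)."""
    return [
        row for row in rows
        if all(row[column] is not None for column in subset)
    ]


# Hypothetical sample rows
rows = [
    {"LogIdentifierID": "CHAN_A", "commercial_ratio": 0.25},
    {"LogIdentifierID": "CHAN_C", "commercial_ratio": None},
    {"LogIdentifierID": None, "commercial_ratio": 1.0},
]

kept = dropna(rows, subset=["commercial_ratio"])
print([row["LogIdentifierID"] for row in kept])  # ['CHAN_A', None]
```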

### Filling Values

You can use the `fillna` method to fill missing data. Let's set the missing values to zero in the `answer` table.

In [38]:
answer_no_null = answer.fillna(0)
answer_no_null.orderBy("commercial_ratio", ascending=False).show()

+---------------+-------------------+--------------+------------------+
|LogIdentifierID|duration_commercial|duration_total|  commercial_ratio|
+---------------+-------------------+--------------+------------------+
|          HPITV|                 13|            13|               1.0|
|           TANG|               8125|          8125|               1.0|
|           CIMT|                775|           775|               1.0|
|          TLNSP|              15480|         15480|               1.0|
|           MSET|               2700|          2700|               1.0|
|            TRN|                 13|            13|               1.0|
|         TELENO|              17790|         17790|               1.0|
|        MUSIMAX|              23333|         23582|0.9894410991434145|
|           MMAX|              23333|         23582|0.9894410991434145|
|           MPLU|              20587|         20912|0.9844586840091814|
|          MUSIP|              20587|         20912|0.9844586840
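
PySpark's `fillna` also accepts a dictionary that maps column names to replacement values, which makes it explicit which columns get filled. The plain-Python sketch below (not PySpark; the `fillna` helper and the rows are hypothetical) mimics that per-column behaviour: only the listed columns are touched, and nulls elsewhere stay as they are.

```python
def fillna(rows, replacements):
    """Replace None with a per-column default (sketch only)."""
    return [
        {
            column: (replacements[column]
                     if value is None and column in replacements
                     else value)
            for column, value in row.items()
        }
        for row in rows
    ]


# Hypothetical sample rows
rows = [
    {"LogIdentifierID": "CHAN_A", "commercial_ratio": 0.25},
    {"LogIdentifierID": "CHAN_C", "commercial_ratio": None},
    {"LogIdentifierID": None, "commercial_ratio": 1.0},
]

filled = fillna(rows, {"commercial_ratio": 0})
print([row["commercial_ratio"] for row in filled])  # [0.25, 0, 1.0]
```

Whether zero is a sensible replacement depends on the analysis: a ratio filled with 0 is indistinguishable from a channel that genuinely aired no commercials.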

Data manipulation is a crucial step in data analysis, and PySpark has powerful methods for it. In this notebook, I talked about data grouping and how to handle missing data. I showed you how to use the `groupby`, `agg`, `dropna`, and `fillna` methods.

## Resource

- Data Analysis with Python and Spark
