# Chapter 5 Data Frame Gymnastics: Joining and Grouping
This chapter covers
- Joining two data frames together
- Selecting the right type of join for your use case
- Grouping data and understanding the GroupedData transitional object
- Breaking the GroupedData with an aggregation method
- Filling null values in your data frame


## Start a Spark session and import the logs and log identifier tables

In [None]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
import os

# change the account name to your email account
account='sli'

# define a root path to access the data in the DataAnalysisWithPythonAndPySpark
root_path='/net/clusterhn/home/'+account+'/isa460/data/'

spark = (SparkSession.builder.appName("Analyzing tabular data")
        .config("spark.port.maxRetries", "100")
        .getOrCreate())

# configure the log level (default is WARN)
spark.sparkContext.setLogLevel('ERROR')

# import log file
directory=root_path+'/broadcast_logs/'

logs=spark.read.csv(os.path.join(directory, "BroadcastLogs_2018_Q3_M8_sample.CSV"),
                                 sep="|",
                                 header=True,
                                 inferSchema=True,
                                 timestampFormat="yyyy-MM-dd",)

# add a Duration_seconds column, reading Duration as an "HH:MM:SS..." string
logs=logs.withColumn("Duration_seconds", (
            F.col("Duration").substr(1,2).cast("int")*60*60    # hours
            + F.col("Duration").substr(4,2).cast("int")*60     # minutes
            + F.col("Duration").substr(7,2).cast("int")))      # seconds

In [None]:
# import the log identifier table. We only want primary channels (PrimaryFG is 1)

log_identifier=spark.read.csv(os.path.join(directory, "ReferenceTables/LogIdentifier.csv"),
                                 sep="|",
                                 header=True,
                                 inferSchema=True).filter(F.col('PrimaryFG')==1)
log_identifier.printSchema()

In [None]:
log_identifier.show(5)

## Joining tables
- Types of join: inner, left, right, full/outer, left_semi, left_anti, cross

- A left semi-join (how="left_semi") behaves like an inner join, but keeps only the columns of the left table. It also does not duplicate a left record that matches more than one record in the right table.
- A left anti-join (how="left_anti") is the opposite of an inner join. It keeps only the records from the left table that do not match the predicate with any record in the right table.
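
To make the difference between these join types concrete, here is a minimal plain-Python sketch of which rows each one keeps. It is a model of the row-selection logic only, not of how PySpark executes a join; the sample rows and the helper names (`matching_rows`, `inner_join`, `left_semi_join`, `left_anti_join`) are made up for illustration.

```python
# Plain-Python model of join row selection (illustrative only, no Spark involved).
# The rows below are invented sample data, not taken from the broadcast logs.
sample_logs = [
    {"LogServiceID": 1, "Program": "News"},
    {"LogServiceID": 2, "Program": "Movie"},
    {"LogServiceID": 9, "Program": "Infomercial"},
]
sample_channels = [
    {"LogServiceID": 1, "Channel": "AAA"},
    {"LogServiceID": 2, "Channel": "BBB"},
    {"LogServiceID": 2, "Channel": "BBB-HD"},
]


def matching_rows(left_row, right_rows, key):
    """Return the right-hand rows whose key equals the left row's key."""
    return [rrow for rrow in right_rows if rrow[key] == left_row[key]]


def inner_join(left, right, key):
    # One output row per matching (left, right) pair, with columns from both sides.
    return [
        {**lrow, **rrow}
        for lrow in left
        for rrow in matching_rows(lrow, right, key)
    ]


def left_semi_join(left, right, key):
    # Left columns only; a left row appears once even if it matches several times.
    return [lrow for lrow in left if matching_rows(lrow, right, key)]


def left_anti_join(left, right, key):
    # Only the left rows that have no match at all on the right.
    return [lrow for lrow in left if not matching_rows(lrow, right, key)]


print(inner_join(sample_logs, sample_channels, "LogServiceID"))
print(left_semi_join(sample_logs, sample_channels, "LogServiceID"))
print(left_anti_join(sample_logs, sample_channels, "LogServiceID"))
```

In this sample, the log with LogServiceID 2 matches two channel rows, so the inner join emits it twice while the semi-join keeps it once. The log with LogServiceID 9 has no match and only survives the anti-join.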

In [None]:
# inner join of the logs and channels tables

logs_and_channels=logs.join(log_identifier, on="LogServiceID", how="inner")

## Naming conventions in the joining world

In [None]:
# what happens if we join two tables that share a column name?

logs_and_channels_verbose = logs.join(log_identifier, logs["LogServiceID"] == log_identifier["LogServiceID"])

logs_and_channels_verbose.printSchema()

In [None]:
from pyspark.sql.utils import AnalysisException

try:
    logs_and_channels_verbose.select("LogServiceID")
except AnalysisException as err:
    print(err)   

### Note:
PySpark happily joins the two data frames but fails when we try to work with the ambiguous column. This is a common situation when working with data that follows the same convention for column naming. To solve this problem, in this section I show three methods, from the easiest to the most general.

In [None]:
# method 1. Pass the column name as a string to join().
# PySpark then keeps a single LogServiceID column in the result.

logs_and_channels = logs.join(log_identifier, "LogServiceID")
 
logs_and_channels.printSchema()

In [None]:
# method 2. Refer to each column through its source table, then drop one of the two same-named columns.

logs_and_channels_verbose = logs.join(
    log_identifier, logs["LogServiceID"] == log_identifier["LogServiceID"]
)

# drop the right-hand copy so that "LogServiceID" is no longer ambiguous
logs_and_channels_verbose.drop(log_identifier["LogServiceID"]).select(
    "LogServiceID")

In [None]:
# method 3. alias() our tables when performing the join

logs_and_channels_verbose = logs.alias("left").join(
    log_identifier.alias("right"),                     
    logs["LogServiceID"] == log_identifier["LogServiceID"],
)
 
logs_and_channels_verbose.drop(F.col("right.LogServiceID")).select(
    "LogServiceID"
)             

### Join two more tables

We will link two additional tables to continue our data discovery and processing. The CategoryID table contains information about the types of programs, and the ProgramClassID table contains the data that allows us to pinpoint the commercials.

In [None]:
# import log file
directory=root_path+'/broadcast_logs/'

logs=spark.read.csv(os.path.join(directory, "BroadcastLogs_2018_Q3_M8_sample.CSV"),
                                 sep="|",
                                 header=True,
                                 inferSchema=True,
                                 timestampFormat="yyyy-MM-dd",)

# add a Duration_seconds column, reading Duration as an "HH:MM:SS..." string
logs=logs.withColumn("Duration_seconds", (
            F.col("Duration").substr(1,2).cast("int")*60*60    # hours
            + F.col("Duration").substr(4,2).cast("int")*60     # minutes
            + F.col("Duration").substr(7,2).cast("int")))      # seconds

# import log identifier
log_identifier=spark.read.csv(os.path.join(directory, "ReferenceTables/LogIdentifier.csv"),
                                 sep="|",
                                 header=True,
                                 inferSchema=True).filter(F.col('PrimaryFG')==1)

# join log and channel
logs_and_channels = logs.join(log_identifier, "LogServiceID")

# import category table and select needed columns
cd_category = spark.read.csv(
    os.path.join(directory, "ReferenceTables/CD_Category.csv"),
    sep="|",
    header=True,
    inferSchema=True,
).select(
    "CategoryID",
    "CategoryCD",
    F.col("EnglishDescription").alias("Category_Description"), 
)

# import program class and select needed columns
cd_program_class = spark.read.csv(
    os.path.join(directory, "ReferenceTables/CD_ProgramClass.csv"),
    sep="|",
    header=True,
    inferSchema=True,
).select(
    "ProgramClassID",
    "ProgramClassCD",
    F.col("EnglishDescription").alias("ProgramClass_Description"),
)

# join log and channels tables with cd_category and cd_program_class tables

full_log = logs_and_channels.join(cd_category, "CategoryID", how="left").join(
    cd_program_class, "ProgramClassID", how="left")

## Summarizing the data via groupBy and GroupedData
**What are the channels with the greatest and least proportion of commercials?**

List of commercial codes: "COM", "PRC", "PGI", "PRO", "PSA", "MAG", "LOC", "SPO", "MER", "SOL"
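
Answering this question comes down to two sums per channel: the seconds spent on commercial program classes and the total seconds. The sketch below models that arithmetic in plain Python on made-up rows; it is not the PySpark code, and the channel names and durations are hypothetical. It uses the code list given above, and `None` stands in for a null ratio when a channel has a total of zero.

```python
from collections import defaultdict

# Commercial codes as listed in the text above.
COMMERCIAL_CODES = {
    "COM", "PRC", "PGI", "PRO", "PSA", "MAG", "LOC", "SPO", "MER", "SOL",
}

# (channel, program class code, duration in seconds): invented sample rows.
# "PRC " carries trailing whitespace on purpose, to show why trimming matters.
sample_rows = [
    ("CH1", "COM", 30),
    ("CH1", "PGR", 1770),
    ("CH2", "PRC ", 600),
    ("CH2", "COM", 600),
    ("CH3", "PGR", 0),
]

# channel -> [commercial seconds, total seconds]
totals = defaultdict(lambda: [0, 0])
for channel, code, seconds in sample_rows:
    if code.strip() in COMMERCIAL_CODES:  # conditional sum: count only commercials
        totals[channel][0] += seconds
    totals[channel][1] += seconds         # unconditional sum: count everything

# Ratio per channel; None marks a channel whose total is zero.
ratios = {
    channel: (commercial / total if total else None)
    for channel, (commercial, total) in totals.items()
}

for channel, ratio in ratios.items():
    print(channel, ratio)
```

Without the `strip()` call, the padded "PRC " row would not be counted as a commercial, which would understate the ratio for CH2.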

In [None]:
# display total program duration by program class

(full_log.groupBy("ProgramClassCD", "ProgramClass_Description")
 .agg(F.sum("Duration_seconds").alias("duration_total"))
 .orderBy("duration_total",ascending=False)
).show(5, False)

In [None]:
log1=full_log.filter(F.col("ProgramClassCD").isin(["COM", "PRC", "PGI", "PRO", "PSA", "MAG", "LOC", "SPO", "MER", "SOL"]))

#log1=full_log.filter(F.trim(F.col("ProgramClassCD")).isin(["COM", "PRC", "PGI", "PRO", "PSA", "MAG", "LOC", "SPO", "MER", "SOL"]))

log1.select("ProgramClassCD", "ProgramClass_Description").distinct().show(100,False)

In [None]:
answer = (
    full_log.groupby("LogIdentifierID")
    .agg(
        F.sum(                                                              
            F.when(                                                        
                F.trim(F.col("ProgramClassCD")).isin(                       
                    ["COM", "PRC", "PGI", "PRO", "LOC", "SPO", "MER", "SOL"]
                ),                                                          
                F.col("duration_seconds"),                                  
            ).otherwise(0)                                                  
        ).alias("duration_commercial"),                                     
        F.sum("duration_seconds").alias("duration_total"),
    )
    .withColumn(
        "commercial_ratio", F.col(
            "duration_commercial") / F.col("duration_total")
    )
)

answer.show()

In [None]:
# channels with the most commercials

answer.orderBy(F.desc("commercial_ratio")).show(10)

In [None]:
# channels with the fewest commercials

answer.orderBy("commercial_ratio").show(10)

## Deal with null values
PySpark provides two data frame methods for handling null values: dropna() and fillna().


### dropna()
dropna() is pretty easy to use. This data frame method takes three parameters:

- how, which can take the value any or all. If any is selected, PySpark will drop records where at least one of the fields is null. In the case of all, only the records where all fields are null will be removed. By default, PySpark will take the any mode.

- thresh takes an integer value. If set (its default is None), PySpark will ignore the how parameter and only drop the records with fewer than thresh non-null values.

- subset will take an optional list of columns that dropna() will use to make its decision.
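
The three parameters interact, so a small model may help. The function below is a plain-Python sketch of the rules described above, applied to a list of dicts; it is not PySpark's implementation, and the name `dropna_model` and the sample records are made up for illustration.

```python
def dropna_model(rows, how="any", thresh=None, subset=None):
    """Plain-Python model of the how / thresh / subset rules described above."""
    kept = []
    for row in rows:
        # subset restricts which columns are inspected; default is every column
        columns = subset if subset is not None else list(row)
        non_null = sum(row[column] is not None for column in columns)
        if thresh is not None:
            keep = non_null >= thresh        # thresh overrides how
        elif how == "any":
            keep = non_null == len(columns)  # dropped if any inspected field is null
        elif how == "all":
            keep = non_null > 0              # dropped only if every inspected field is null
        else:
            raise ValueError("how must be 'any' or 'all'")
        if keep:
            kept.append(row)
    return kept


# Invented sample records: one complete, one partly null, one fully null.
sample = [
    {"id": "CH1", "duration_total": 100, "commercial_ratio": 0.5},
    {"id": "CH2", "duration_total": None, "commercial_ratio": None},
    {"id": None, "duration_total": None, "commercial_ratio": None},
]

print(len(dropna_model(sample)))                               # how="any"
print(len(dropna_model(sample, how="all")))
print(len(dropna_model(sample, thresh=1)))
print(len(dropna_model(sample, subset=["commercial_ratio"])))
```

With this sample, the default "any" mode keeps only the complete record, while "all" and thresh=1 both keep the first two records because each has at least one non-null field.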

In [None]:
# drop the records where commercial_ratio is null

answer_no_null=answer.dropna(subset=["commercial_ratio"])

In [None]:
answer_no_null.orderBy("commercial_ratio").show(10)

### fillna()

This data frame method takes two parameters:

- The value, which is a Python int, float, string, or bool. PySpark will only fill the compatible columns; for instance, if we were to fillna("zero"), our commercial_ratio, being a double, would not be filled.
- The same subset parameter we encountered in dropna(). We can limit the scope of our filling to only the columns we want.
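
The "compatible columns" rule is easy to miss, so here is a plain-Python sketch of it. It models a data frame as a list of dicts plus a hypothetical `schema` dict of column types; `fillna_model` and the type groupings are assumptions for illustration, not PySpark's implementation.

```python
def fillna_model(rows, schema, value, subset=None):
    """Fill nulls only in columns whose type is compatible with value."""
    # bool is tested first because Python treats bool as a subclass of int
    if isinstance(value, bool):
        compatible = {"boolean"}
    elif isinstance(value, (int, float)):
        compatible = {"int", "bigint", "float", "double"}
    elif isinstance(value, str):
        compatible = {"string"}
    else:
        raise TypeError("value must be an int, float, string, or bool")

    # Columns that are both type-compatible and inside the optional subset.
    targets = {
        column
        for column, column_type in schema.items()
        if column_type in compatible and (subset is None or column in subset)
    }
    return [
        {
            column: (value if column in targets and cell is None else cell)
            for column, cell in row.items()
        }
        for row in rows
    ]


# Hypothetical schema and a single invented record with two nulls.
schema = {
    "LogIdentifierID": "string",
    "duration_total": "bigint",
    "commercial_ratio": "double",
}
sample = [{"LogIdentifierID": "CH1", "duration_total": None, "commercial_ratio": None}]

print(fillna_model(sample, schema, 0))
print(fillna_model(sample, schema, "zero"))
print(fillna_model(sample, schema, 0, subset=["commercial_ratio"]))
```

In this model, filling with 0 touches both numeric columns, filling with "zero" leaves them null because neither is a string column, and the subset call fills commercial_ratio only.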

In [None]:
answer_no_null = answer.fillna(0)
 
answer_no_null.orderBy(
    "commercial_ratio", ascending=False).show(5, False)

In [None]:
# Alternative method:
# fill our numerical columns with zero by passing a dict to fillna()

answer_no_null = answer.fillna(
    {"duration_commercial": 0, "duration_total": 0, "commercial_ratio": 0}
)

## Putting everything together: develop an end-to-end program

In [None]:
#  commercials.py #############################################################
#
# This program computes the commercial ratio for each channel present in the
# dataset.
#
###############################################################################

import os

import pyspark.sql.functions as F
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName(
    "Getting the Canadian TV channels with the highest/lowest proportion of commercials."
).getOrCreate()

spark.sparkContext.setLogLevel("WARN")

###############################################################################
# Reading all the relevant data sources
###############################################################################

# change the account name to your email account
account='sli'

# define a root path to access the data in the DataAnalysisWithPythonAndPySpark
root_path='/net/clusterhn/home/'+account+'/isa460/Data/'

DIRECTORY = root_path+'/broadcast_logs/'

logs = spark.read.csv(
    os.path.join(DIRECTORY, "BroadcastLogs_2018_Q3_M8_sample.CSV"),
    sep="|",
    header=True,
    inferSchema=True,
    timestampFormat="yyyy-MM-dd"
)

log_identifier = spark.read.csv(
    os.path.join(DIRECTORY, "ReferenceTables/LogIdentifier.csv"),
    sep="|",
    header=True,
    inferSchema=True,
)

cd_category = spark.read.csv(
    os.path.join(DIRECTORY, "ReferenceTables/CD_Category.csv"),
    sep="|",
    header=True,
    inferSchema=True,
).select(
    "CategoryID",
    "CategoryCD",
    F.col("EnglishDescription").alias("Category_Description"),
)

cd_program_class = spark.read.csv(
    os.path.join(DIRECTORY, "ReferenceTables/CD_ProgramClass.csv"),
    sep="|",
    header=True,
    inferSchema=True,
).select(
    "ProgramClassID",
    "ProgramClassCD",
    F.col("EnglishDescription").alias("ProgramClass_Description"),
)

###############################################################################
# Data processing
###############################################################################

logs = logs.drop("BroadcastLogID", "SequenceNO")

logs = logs.withColumn(
    "duration_seconds",
    (
        F.col("Duration").substr(1, 2).cast("int") * 60 * 60
        + F.col("Duration").substr(4, 2).cast("int") * 60
        + F.col("Duration").substr(7, 2).cast("int")
    ),
)

log_identifier = log_identifier.where(F.col("PrimaryFG") == 1)

logs_and_channels = logs.join(log_identifier, "LogServiceID")

full_log = logs_and_channels.join(cd_category, "CategoryID", how="left").join(
    cd_program_class, "ProgramClassID", how="left"
)

full_log.groupby("LogIdentifierID").agg(
    F.sum(
        F.when(
            F.trim(F.col("ProgramClassCD")).isin(
                ["COM", "PRC", "PGI", "PRO", "LOC", "SPO", "MER", "SOL"]
            ),
            F.col("duration_seconds"),
        ).otherwise(0)
    ).alias("duration_commercial"),
    F.sum("duration_seconds").alias("duration_total"),
).withColumn(
    "commercial_ratio", F.col("duration_commercial") / F.col("duration_total")
).orderBy(
    "commercial_ratio", ascending=False
).show(
    10, False
)

## In Class Exercise

### Exercise 5.5

Using the data from the data/broadcast_logs/Call_Signs.csv (careful: the delimiter here is the comma, not the pipe!), add the Undertaking_Name to our final table to display a human-readable description of the channel.

### Exercise 5.6

The government of Canada is asking for your analysis, but they’d like the PRC to be weighted differently. They’d like each PRC second to be considered 0.75 commercial seconds. Modify the program to account for this change.