#Read a CSV file

In [0]:
import os
DIRECTORY = "dbfs:/FileStore/tables/data"

In [0]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
spark = SparkSession.builder.getOrCreate()
# We are explicitly calling out a few parameters in the SparkReader object. There are I think over 20 different parameters, but these should suffice for now 
logs = spark.read.csv(
    path=os.path.join(DIRECTORY, "BroadcastLogs_2018_Q3_M8.CSV"),
    sep="|",
    header=True,
    inferSchema=True,
    timestampFormat="yyyy-MM-dd",
)

In [0]:
logs.printSchema()

root
 |-- BroadcastLogID: integer (nullable = true)
 |-- LogServiceID: integer (nullable = true)
 |-- LogDate: date (nullable = true)
 |-- SequenceNO: integer (nullable = true)
 |-- AudienceTargetAgeID: integer (nullable = true)
 |-- AudienceTargetEthnicID: integer (nullable = true)
 |-- CategoryID: integer (nullable = true)
 |-- ClosedCaptionID: integer (nullable = true)
 |-- CountryOfOriginID: integer (nullable = true)
 |-- DubDramaCreditID: integer (nullable = true)
 |-- EthnicProgramID: integer (nullable = true)
 |-- ProductionSourceID: integer (nullable = true)
 |-- ProgramClassID: integer (nullable = true)
 |-- FilmClassificationID: integer (nullable = true)
 |-- ExhibitionID: integer (nullable = true)
 |-- Duration: string (nullable = true)
 |-- EndTime: string (nullable = true)
 |-- LogEntryDate: date (nullable = true)
 |-- ProductionNO: string (nullable = true)
 |-- ProgramTitle: string (nullable = true)
 |-- StartTime: string (nullable = true)
 |-- Subtitle: string (nullable 

####Q1) Create a DataFrame named "Program_DF" which consists of three columns: 
####"ProgramClassID", "ProgramTitle", "Duration".  Demonstrate creation of "Program_DF" in two ways - using a) select() and b) drop()

####Using a) select()

In [0]:
# Multiple ways to using select() to acheive the same result
 
# Using the string to column conversion
Program_DF = logs.select("ProgramClassID", "ProgramTitle", "Duration")
Program_DF = logs.select(*["ProgramClassID", "ProgramTitle", "Duration"])
 
# Passing the column object explicitly
Program_DF = logs.select(F.col("ProgramClassID"), F.col("ProgramTitle"), F.col("Duration"))
Program_DF = logs.select(*[F.col("ProgramClassID"), F.col("ProgramTitle"), F.col("Duration")])

In [0]:
Program_DF.show(10)

+--------------+--------------------+----------------+
|ProgramClassID|        ProgramTitle|        Duration|
+--------------+--------------------+----------------+
|            19|   Newlywed and Dead|02:00:00.0000000|
|            20|15-SPECIALTY CHAN...|00:00:30.0000000|
|             3|3-PROCTER & GAMBL...|00:00:15.0000000|
|             3|12-CREDIT KARMA-B...|00:00:15.0000000|
|             3|3-L'OREAL CANADA-...|00:00:15.0000000|
|             3|11-YUM! BRANDS-Ch...|00:00:15.0000000|
|             3|2-PIER 1 IMPORTS ...|00:00:30.0000000|
|             3|3-HAVAS EDGE-Trav...|00:00:15.0000000|
|             3|2-AUTOTRADER-Inte...|00:00:15.0000000|
|             3|11-SLEEP COUNTRY ...|00:00:15.0000000|
+--------------+--------------------+----------------+
only showing top 10 rows



####Using b) drop()

In [0]:
Program_DF = logs.drop("BroadcastLogID", "LogServiceID", "LogDate", "SequenceNO", "AudienceTargetAgeID", "AudienceTargetEthnicID", "CategoryID", "ClosedCaptionID", "CountryOfOriginID", "DubDramaCreditID", "EthnicProgramID", "ProductionSourceID", "FilmClassificationID", "ExhibitionID", "EndTime", "LogEntryDate", "ProductionNO", "StartTime", "Subtitle", "NetworkAffiliationID", "SpecialAttentionID", "BroadcastOriginPointID", "CompositionID", "Producer1", "Producer2", "Language1", "Language2")

In [0]:
Program_DF.show(10)

+--------------+----------------+--------------------+
|ProgramClassID|        Duration|        ProgramTitle|
+--------------+----------------+--------------------+
|            19|02:00:00.0000000|   Newlywed and Dead|
|            20|00:00:30.0000000|15-SPECIALTY CHAN...|
|             3|00:00:15.0000000|3-PROCTER & GAMBL...|
|             3|00:00:15.0000000|12-CREDIT KARMA-B...|
|             3|00:00:15.0000000|3-L'OREAL CANADA-...|
|             3|00:00:15.0000000|11-YUM! BRANDS-Ch...|
|             3|00:00:30.0000000|2-PIER 1 IMPORTS ...|
|             3|00:00:15.0000000|3-HAVAS EDGE-Trav...|
|             3|00:00:15.0000000|2-AUTOTRADER-Inte...|
|             3|00:00:15.0000000|11-SLEEP COUNTRY ...|
+--------------+----------------+--------------------+
only showing top 10 rows



####Q2) Using withColumn(), create a new column "Duration_Seconds" in "Program_DF". Using the orignal column "Duration", convert it into seconds as demonstrated in the notebook we covered in class.

#### 1st Approach

In [0]:
Program_DF.select("Duration").show(10)
print(Program_DF.select("Duration").dtypes) #[('Duration', 'string')]

+----------------+
|        Duration|
+----------------+
|02:00:00.0000000|
|00:00:30.0000000|
|00:00:15.0000000|
|00:00:15.0000000|
|00:00:15.0000000|
|00:00:15.0000000|
|00:00:30.0000000|
|00:00:15.0000000|
|00:00:15.0000000|
|00:00:15.0000000|
+----------------+
only showing top 10 rows

[('Duration', 'string')]


In [0]:
Program_DF = Program_DF.withColumn('Duration_Seconds',
                                   (F.split(F.col('Duration'), ':')[0].cast('int') * 3600) +
                                   (F.split(F.col('Duration'), ':')[1].cast('int') * 60) +
                                   (F.split(F.col('Duration'), ':')[2].cast('int')))

In [0]:
Program_DF.show(10)

+--------------+----------------+--------------------+----------------+
|ProgramClassID|        Duration|        ProgramTitle|Duration_Seconds|
+--------------+----------------+--------------------+----------------+
|            19|02:00:00.0000000|   Newlywed and Dead|            7200|
|            20|00:00:30.0000000|15-SPECIALTY CHAN...|              30|
|             3|00:00:15.0000000|3-PROCTER & GAMBL...|              15|
|             3|00:00:15.0000000|12-CREDIT KARMA-B...|              15|
|             3|00:00:15.0000000|3-L'OREAL CANADA-...|              15|
|             3|00:00:15.0000000|11-YUM! BRANDS-Ch...|              15|
|             3|00:00:30.0000000|2-PIER 1 IMPORTS ...|              30|
|             3|00:00:15.0000000|3-HAVAS EDGE-Trav...|              15|
|             3|00:00:15.0000000|2-AUTOTRADER-Inte...|              15|
|             3|00:00:15.0000000|11-SLEEP COUNTRY ...|              15|
+--------------+----------------+--------------------+----------

In [0]:
Program_DF.printSchema()

root
 |-- ProgramClassID: integer (nullable = true)
 |-- Duration: string (nullable = true)
 |-- ProgramTitle: string (nullable = true)
 |-- Duration_Seconds: integer (nullable = true)



#### 2nd Approach

In [0]:
Program_DF = Program_DF.withColumn(
    "Duration_seconds",
    (
        F.col("Duration").substr(1, 2).cast("int") * 60 * 60
        + F.col("Duration").substr(4, 2).cast("int") * 60
        + F.col("Duration").substr(7, 2).cast("int")
    ),
)

In [0]:
Program_DF.show(10)

+--------------+----------------+--------------------+----------------+
|ProgramClassID|        Duration|        ProgramTitle|Duration_seconds|
+--------------+----------------+--------------------+----------------+
|            19|02:00:00.0000000|   Newlywed and Dead|            7200|
|            20|00:00:30.0000000|15-SPECIALTY CHAN...|              30|
|             3|00:00:15.0000000|3-PROCTER & GAMBL...|              15|
|             3|00:00:15.0000000|12-CREDIT KARMA-B...|              15|
|             3|00:00:15.0000000|3-L'OREAL CANADA-...|              15|
|             3|00:00:15.0000000|11-YUM! BRANDS-Ch...|              15|
|             3|00:00:30.0000000|2-PIER 1 IMPORTS ...|              30|
|             3|00:00:15.0000000|3-HAVAS EDGE-Trav...|              15|
|             3|00:00:15.0000000|2-AUTOTRADER-Inte...|              15|
|             3|00:00:15.0000000|11-SLEEP COUNTRY ...|              15|
+--------------+----------------+--------------------+----------

In [0]:
Program_DF.printSchema()

root
 |-- ProgramClassID: integer (nullable = true)
 |-- Duration: string (nullable = true)
 |-- ProgramTitle: string (nullable = true)
 |-- Duration_seconds: integer (nullable = true)

