## Analyzing Tabular Data with PySpark

In [1]:
from pyspark.sql import SparkSession
# getOrCreate() reuses an existing SparkSession or creates a new one, so the program works in both interactive and batch mode.
spark = (
    SparkSession.builder
    .appName("Tabular Data 2022")
    .getOrCreate()
)

PySpark has multiple ways to import tabular data, but the two most popular are:
- a list of lists
- a pandas data frame

In [2]:
# A list of lists: each inner list is one row (item, quantity, price)
my_grocery_list = [
    ["Banana", 2, 1.74],
    ["Apple", 4, 2.04],
    ["Carrot", 1, 1.09],
    ["Cake", 1, 10.99],
]

In [3]:
# The second argument supplies the column names
df_grocery_list = spark.createDataFrame(
    my_grocery_list, ["Item", "Quantity", "Price"]
)
df_grocery_list.printSchema()

root
 |-- Item: string (nullable = true)
 |-- Quantity: long (nullable = true)
 |-- Price: double (nullable = true)
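printSchema() shows that Spark mapped each Python type to a Spark SQL type: str to string, int to long (a 64-bit integer), and float to double. The sketch below mimics that mapping in plain Python purely for illustration. The dictionary `PY_TO_SPARK` and the helper `infer_column_types` are hypothetical, and the helper only inspects the first row, which is enough here because every row has the same types (Spark's own inference can merge types across rows).

```python
# Hypothetical illustration of how Python values map to the type names
# that printSchema() reports for a data frame built from a list of lists.
PY_TO_SPARK = {
    bool: "boolean",  # exact type(value) lookup keeps bool separate from int
    int: "long",      # Python ints become 64-bit LongType
    float: "double",
    str: "string",
}


def infer_column_types(rows, names):
    """Return (name, spark_type) pairs based on the first row only."""
    first = rows[0]
    return [(name, PY_TO_SPARK[type(value)]) for name, value in zip(names, first)]


my_grocery_list = [
    ["Banana", 2, 1.74],
    ["Apple", 4, 2.04],
    ["Carrot", 1, 1.09],
    ["Cake", 1, 10.99],
]

for name, spark_type in infer_column_types(my_grocery_list, ["Item", "Quantity", "Price"]):
    print(f" |-- {name}: {spark_type}")
```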



## Data Manipulation: Selecting, Dropping, Renaming, Ordering, and Diagnosing

### Dataset Introduction
We use broadcast logs from the CRTC (Canadian Radio-television and Telecommunications Commission). Every broadcaster is required to provide a complete log of the programs and commercials shown to the Canadian public.
This gives us a lot of potential questions to answer, but we’ll select just one: 
What are the channels with the greatest and least proportion of commercials?

In [4]:
import os

DIRECTORY = "./Chap4/broadcast_logs"
logs = spark.read.csv(
    path="./Chap4/BroadcastLogs_2018_Q3_M8_sample.CSV",
    sep="|",
    header=True,
    inferSchema=True,
    timestampFormat="yyyy-MM-dd",
)
logs.printSchema()

root
 |-- BroadcastLogID: integer (nullable = true)
 |-- LogServiceID: integer (nullable = true)
 |-- LogDate: timestamp (nullable = true)
 |-- SequenceNO: integer (nullable = true)
 |-- AudienceTargetAgeID: integer (nullable = true)
 |-- AudienceTargetEthnicID: integer (nullable = true)
 |-- CategoryID: integer (nullable = true)
 |-- ClosedCaptionID: integer (nullable = true)
 |-- CountryOfOriginID: integer (nullable = true)
 |-- DubDramaCreditID: integer (nullable = true)
 |-- EthnicProgramID: integer (nullable = true)
 |-- ProductionSourceID: integer (nullable = true)
 |-- ProgramClassID: integer (nullable = true)
 |-- FilmClassificationID: integer (nullable = true)
 |-- ExhibitionID: integer (nullable = true)
 |-- Duration: string (nullable = true)
 |-- EndTime: string (nullable = true)
 |-- LogEntryDate: timestamp (nullable = true)
 |-- ProductionNO: string (nullable = true)
 |-- ProgramTitle: string (nullable = true)
 |-- StartTime: string (nullable = true)
 |-- Subtitle: string 

### Selecting Required Columns

At its simplest, select() takes one or more column objects (or strings representing column names) and returns a data frame containing only the listed columns.

#### Selecting 10 rows of 3 columns

In [5]:
import pyspark.sql.functions as F

logs.select(
    F.col("BroadcastLogID"), F.col("LogServiceID"), F.col("LogDate")
).show(10, truncate=False)

+--------------+------------+-------------------+
|BroadcastLogID|LogServiceID|LogDate            |
+--------------+------------+-------------------+
|1196192316    |3157        |2018-08-01 00:00:00|
|1196192317    |3157        |2018-08-01 00:00:00|
|1196192318    |3157        |2018-08-01 00:00:00|
|1196192319    |3157        |2018-08-01 00:00:00|
|1196192320    |3157        |2018-08-01 00:00:00|
|1196192321    |3157        |2018-08-01 00:00:00|
|1196192322    |3157        |2018-08-01 00:00:00|
|1196192323    |3157        |2018-08-01 00:00:00|
|1196192324    |3157        |2018-08-01 00:00:00|
|1196192325    |3157        |2018-08-01 00:00:00|
+--------------+------------+-------------------+
only showing top 10 rows



### Seeing every column in groups of three with select()

In [6]:
import numpy as np

# Split the column names into len(logs.columns) // 3 groups of roughly three names each
column_split = np.array_split(
    np.array(logs.columns), len(logs.columns) // 3
)
print(column_split)

[array(['BroadcastLogID', 'LogServiceID', 'LogDate'], dtype='<U22'), array(['SequenceNO', 'AudienceTargetAgeID', 'AudienceTargetEthnicID'],
      dtype='<U22'), array(['CategoryID', 'ClosedCaptionID', 'CountryOfOriginID'], dtype='<U22'), array(['DubDramaCreditID', 'EthnicProgramID', 'ProductionSourceID'],
      dtype='<U22'), array(['ProgramClassID', 'FilmClassificationID', 'ExhibitionID'],
      dtype='<U22'), array(['Duration', 'EndTime', 'LogEntryDate'], dtype='<U22'), array(['ProductionNO', 'ProgramTitle', 'StartTime'], dtype='<U22'), array(['Subtitle', 'NetworkAffiliationID', 'SpecialAttentionID'],
      dtype='<U22'), array(['BroadcastOriginPointID', 'CompositionID', 'Producer1'],
      dtype='<U22'), array(['Producer2', 'Language1', 'Language2'], dtype='<U22')]
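np.array_split() splits the column list into len(logs.columns) // 3 groups of roughly equal size. With 30 columns that is exactly ten groups of three, but if the count were not a multiple of three, some groups would hold four columns. A plain-Python alternative that always yields groups of at most three (only the last group can be shorter) is list slicing; `chunk` below is a hypothetical helper, not part of PySpark or NumPy.

```python
def chunk(items, size=3):
    """Split items into consecutive groups of `size` (the last may be shorter)."""
    return [items[i:i + size] for i in range(0, len(items), size)]


columns = ["BroadcastLogID", "LogServiceID", "LogDate", "SequenceNO",
           "AudienceTargetAgeID", "AudienceTargetEthnicID", "CategoryID"]
for group in chunk(columns):
    print(group)

# In the notebook, each group could be unpacked into select(), e.g.
# for group in chunk(logs.columns): logs.select(*group).show(5, False)
```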


In [7]:
for x in column_split:
    logs.select(*x).show(5, False)

+--------------+------------+-------------------+
|BroadcastLogID|LogServiceID|LogDate            |
+--------------+------------+-------------------+
|1196192316    |3157        |2018-08-01 00:00:00|
|1196192317    |3157        |2018-08-01 00:00:00|
|1196192318    |3157        |2018-08-01 00:00:00|
|1196192319    |3157        |2018-08-01 00:00:00|
|1196192320    |3157        |2018-08-01 00:00:00|
+--------------+------------+-------------------+
only showing top 5 rows

+----------+-------------------+----------------------+
|SequenceNO|AudienceTargetAgeID|AudienceTargetEthnicID|
+----------+-------------------+----------------------+
|1         |4                  |null                  |
|2         |null               |null                  |
|3         |null               |null                  |
|4         |null               |null                  |
|5         |null               |null                  |
+----------+-------------------+----------------------+
only showing top 5 rows

### Dropping Unnecessary Columns

- BroadcastLogID is the primary key of the table and will not help us answer our question.
- SequenceNO is a sequence number and won't be useful either.

In [8]:
logs = logs.drop("BroadcastLogID", "SequenceNO")
# Equivalent, using select() to keep every other column:
# logs = logs.select(
#     *[x for x in logs.columns if x not in ["BroadcastLogID", "SequenceNO"]]
# )

In [9]:
print("BroadcastLogID" in logs.columns)

False


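- Just like select(), drop() takes *cols and returns a data frame, excluding the columns passed as parameters.
- Like every other data frame transformation in PySpark, drop() returns a new data frame instead of modifying the existing one, which is why we assign the result back to logs.
- Dropping a nonexistent column is a no-op: PySpark ignores the name instead of raising an error.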
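The commented-out select() version in cell [8] shows what drop() does conceptually: keep every column whose name is not in the drop list. Computing that list in plain Python makes the no-op case visible, since a name that matches no column filters nothing out. `columns_after_drop` is a hypothetical helper; note that it compares names exactly, whereas Spark's default name resolution is case-insensitive.

```python
def columns_after_drop(columns, *to_drop):
    """Mirror drop(*cols): return a new list without the listed names.

    Names are compared exactly here; Spark's default resolver ignores case.
    """
    return [c for c in columns if c not in to_drop]


columns = ["BroadcastLogID", "LogServiceID", "LogDate", "SequenceNO"]
print(columns_after_drop(columns, "BroadcastLogID", "SequenceNO"))

# A name that matches no column removes nothing (the no-op case),
# and the original list is left untouched.
print(columns_after_drop(columns, "NotAColumn") == columns)
```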

### Creating new Columns Iteratively

In [10]:
logs.select(F.col("Duration")).show(5)

+----------------+
|        Duration|
+----------------+
|02:00:00.0000000|
|00:00:30.0000000|
|00:00:15.0000000|
|00:00:15.0000000|
|00:00:15.0000000|
+----------------+
only showing top 5 rows



- PySpark's CSV schema inference has no type for a time without a date or for a duration, so it kept the column as a string.
- We can verify the type via the dtypes attribute.

In [14]:
logs.select(F.col('Duration')).dtypes

[('Duration', 'string')]

#### Let's create multiple new columns from Duration

In [11]:
logs.select(F.col('Duration'), F.col("Duration").substr(7, 2).cast("int")).show(5)

+----------------+--------------------------------------+
|        Duration|CAST(substring(Duration, 7, 2) AS INT)|
+----------------+--------------------------------------+
|02:00:00.0000000|                                     0|
|00:00:30.0000000|                                    30|
|00:00:15.0000000|                                    15|
|00:00:15.0000000|                                    15|
|00:00:15.0000000|                                    15|
+----------------+--------------------------------------+
only showing top 5 rows



- The substr() method takes two parameters:
   - The first gives the starting position of the substring. Positions start at 1, not 0; a position of 0 is treated like 1, which is why substr(0, 2) and substr(1, 2) both return the hours in the cells below.
   - The second gives the length of the substring to extract, in characters.
- substr() returns a string Column, which we cast to an integer with the cast() method.
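Spark's substring positions are easy to get wrong when coming from Python's 0-based slicing. The function below is a plain-Python model of Spark SQL's substring position rules as we understand them: 1-based positions, a position of 0 acting like 1, and negative positions counting from the end of the string. `spark_substr` is a hypothetical name, not a Spark API, so treat it as an approximation for building intuition.

```python
def spark_substr(s, pos, length):
    """Approximate Spark SQL substring(s, pos, length) in plain Python."""
    if pos > 0:
        start = pos - 1          # 1-based position -> 0-based index
    elif pos < 0:
        start = len(s) + pos     # negative positions count from the end
    else:
        start = 0                # position 0 behaves like position 1
    end = start + length
    start = max(start, 0)
    return s[start:end] if start < end else ""


duration = "02:00:00.0000000"
print(spark_substr(duration, 1, 2))  # hours
print(spark_substr(duration, 0, 2))  # same characters as position 1
print(spark_substr(duration, 4, 2))  # minutes
print(spark_substr(duration, 7, 2))  # seconds
```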

In [13]:
logs.select(
    F.col("Duration"),
    F.col("Duration").substr(0, 2).cast("int").alias("dur_hours"),
    F.col("Duration").substr(4, 2).cast("int").alias("dur_minutes"),
    F.col("Duration").substr(7, 2).cast("int").alias("dur_seconds"),
).distinct().show(5)
# distinct() removes duplicate rows, which would add no information to the display

+----------------+---------+-----------+-----------+
|        Duration|dur_hours|dur_minutes|dur_seconds|
+----------------+---------+-----------+-----------+
|00:04:52.0000000|        0|          4|         52|
|00:10:06.0000000|        0|         10|          6|
|00:09:52.0000000|        0|          9|         52|
|00:04:26.0000000|        0|          4|         26|
|00:14:59.0000000|        0|         14|         59|
+----------------+---------+-----------+-----------+
only showing top 5 rows



#### Creating a duration-in-seconds field

In [14]:
logs.select(
    F.col("Duration"),
    (
        F.col("Duration").substr(0, 2).cast("int") * 60 * 60
        + F.col("Duration").substr(4, 2).cast("int") * 60
        + F.col("Duration").substr(7, 2).cast("int")
    ).alias("Duration_seconds"),
).distinct().show(5)

+----------------+----------------+
|        Duration|Duration_seconds|
+----------------+----------------+
|01:59:30.0000000|            7170|
|00:31:00.0000000|            1860|
|00:28:08.0000000|            1688|
|00:32:00.0000000|            1920|
|00:30:00.0000000|            1800|
+----------------+----------------+
only showing top 5 rows
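The expression above converts hours and minutes to seconds before adding them: `hours * 3600 + minutes * 60 + seconds`. Checking the first output row by hand, 01:59:30 gives 1 * 3600 + 59 * 60 + 30 = 7170. The plain-Python function below applies the same fixed-position slicing to a Duration string. It assumes every value follows the HH:MM:SS.fffffff layout seen above and, like the PySpark expression, ignores the fractional part; `duration_to_seconds` is a hypothetical helper. One difference: int() raises ValueError on malformed input, whereas Spark's cast() may return null instead, depending on its ANSI setting.

```python
def duration_to_seconds(duration):
    """Convert 'HH:MM:SS.fffffff' to whole seconds, ignoring the fraction."""
    hours = int(duration[0:2])    # Spark: substr(1, 2)
    minutes = int(duration[3:5])  # Spark: substr(4, 2)
    seconds = int(duration[6:8])  # Spark: substr(7, 2)
    return hours * 60 * 60 + minutes * 60 + seconds


for value in ["01:59:30.0000000", "00:31:00.0000000", "00:28:08.0000000"]:
    print(value, duration_to_seconds(value))
```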



In [15]:
# Create a new column with withColumn(): total duration in seconds
logs = logs.withColumn(
    "Duration_seconds",
    (
        F.col("Duration").substr(1, 2).cast("int") * 60 * 60
        + F.col("Duration").substr(4, 2).cast("int") * 60
        + F.col("Duration").substr(7, 2).cast("int")
    ),
)
logs.printSchema()

root
 |-- LogServiceID: integer (nullable = true)
 |-- LogDate: timestamp (nullable = true)
 |-- AudienceTargetAgeID: integer (nullable = true)
 |-- AudienceTargetEthnicID: integer (nullable = true)
 |-- CategoryID: integer (nullable = true)
 |-- ClosedCaptionID: integer (nullable = true)
 |-- CountryOfOriginID: integer (nullable = true)
 |-- DubDramaCreditID: integer (nullable = true)
 |-- EthnicProgramID: integer (nullable = true)
 |-- ProductionSourceID: integer (nullable = true)
 |-- ProgramClassID: integer (nullable = true)
 |-- FilmClassificationID: integer (nullable = true)
 |-- ExhibitionID: integer (nullable = true)
 |-- Duration: string (nullable = true)
 |-- EndTime: string (nullable = true)
 |-- LogEntryDate: timestamp (nullable = true)
 |-- ProductionNO: string (nullable = true)
 |-- ProgramTitle: string (nullable = true)
 |-- StartTime: string (nullable = true)
 |-- Subtitle: string (nullable = true)
 |-- NetworkAffiliationID: integer (nullable = true)
 |-- SpecialAttenti

If you create a column with withColumn() and give it a name that already exists in your data frame, PySpark will silently overwrite that column.
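One way to picture this behavior is a Python dict from column names to values: assigning to an existing key silently replaces its value, while a new key is appended at the end. As far as we know, withColumn() also keeps a replaced column in its original position and adds new columns at the end, but the dict below is only an analogy, not how Spark stores data.

```python
# Analogy only: model one row's columns as an insertion-ordered dict.
row = {"Duration": "00:02:30.0000000", "Duration_seconds": 150}

# Like withColumn() with a new name: the column is added at the end.
row["Duration_minutes"] = row["Duration_seconds"] // 60

# Like withColumn() with an existing name: the old value is silently
# replaced, and the column keeps its original position.
row["Duration"] = row["Duration"][:8]

print(list(row))
print(row)
```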

In [16]:
logs.select(F.col('Duration'), F.col('Duration_seconds')).show(10)

+----------------+----------------+
|        Duration|Duration_seconds|
+----------------+----------------+
|02:00:00.0000000|            7200|
|00:00:30.0000000|              30|
|00:00:15.0000000|              15|
|00:00:15.0000000|              15|
|00:00:15.0000000|              15|
|00:00:15.0000000|              15|
|00:00:30.0000000|              30|
|00:00:15.0000000|              15|
|00:00:15.0000000|              15|
|00:00:15.0000000|              15|
+----------------+----------------+
only showing top 10 rows



## Renaming and Reordering Columns

In [17]:
logs = logs.withColumnRenamed("Duration_seconds", "duration_seconds")
logs.printSchema()

root
 |-- LogServiceID: integer (nullable = true)
 |-- LogDate: timestamp (nullable = true)
 |-- AudienceTargetAgeID: integer (nullable = true)
 |-- AudienceTargetEthnicID: integer (nullable = true)
 |-- CategoryID: integer (nullable = true)
 |-- ClosedCaptionID: integer (nullable = true)
 |-- CountryOfOriginID: integer (nullable = true)
 |-- DubDramaCreditID: integer (nullable = true)
 |-- EthnicProgramID: integer (nullable = true)
 |-- ProductionSourceID: integer (nullable = true)
 |-- ProgramClassID: integer (nullable = true)
 |-- FilmClassificationID: integer (nullable = true)
 |-- ExhibitionID: integer (nullable = true)
 |-- Duration: string (nullable = true)
 |-- EndTime: string (nullable = true)
 |-- LogEntryDate: timestamp (nullable = true)
 |-- ProductionNO: string (nullable = true)
 |-- ProgramTitle: string (nullable = true)
 |-- StartTime: string (nullable = true)
 |-- Subtitle: string (nullable = true)
 |-- NetworkAffiliationID: integer (nullable = true)
 |-- SpecialAttenti

### Batch lower-casing with toDF()

In [24]:
#toDF() returns a new data frame with the new columns
logs.toDF(*[x.lower() for x in logs.columns]).printSchema()

root
 |-- logserviceid: integer (nullable = true)
 |-- logdate: timestamp (nullable = true)
 |-- audiencetargetageid: integer (nullable = true)
 |-- audiencetargetethnicid: integer (nullable = true)
 |-- categoryid: integer (nullable = true)
 |-- closedcaptionid: integer (nullable = true)
 |-- countryoforiginid: integer (nullable = true)
 |-- dubdramacreditid: integer (nullable = true)
 |-- ethnicprogramid: integer (nullable = true)
 |-- productionsourceid: integer (nullable = true)
 |-- programclassid: integer (nullable = true)
 |-- filmclassificationid: integer (nullable = true)
 |-- exhibitionid: integer (nullable = true)
 |-- duration: string (nullable = true)
 |-- endtime: string (nullable = true)
 |-- logentrydate: timestamp (nullable = true)
 |-- productionno: string (nullable = true)
 |-- programtitle: string (nullable = true)
 |-- starttime: string (nullable = true)
 |-- subtitle: string (nullable = true)
 |-- networkaffiliationid: integer (nullable = true)
 |-- specialattenti
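toDF() takes the new names positionally, one per existing column, so any renaming rule can be written as a list comprehension over logs.columns. The sketch below builds such a list from a dictionary of selected renames and leaves every other name unchanged. `renamed_columns` and the example mapping are hypothetical; in the notebook the result would be passed as `logs.toDF(*new_names)`.

```python
def renamed_columns(columns, renames):
    """Return one name per column, applying `renames` where a mapping exists."""
    new_names = [renames.get(c, c) for c in columns]
    # Duplicate names would make later column references ambiguous.
    if len(set(new_names)) != len(new_names):
        raise ValueError("renaming would create duplicate column names")
    return new_names


columns = ["LogServiceID", "LogDate", "Duration_seconds"]
new_names = renamed_columns(columns, {"Duration_seconds": "duration_seconds"})
print(new_names)
# In the notebook: logs = logs.toDF(*new_names)
```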

### Reordering Columns
Since reordering columns is equivalent to selecting the same columns in a different order, select() is the perfect method for the job.

In [18]:
# printSchema() returns None, so we don't assign its result back to logs
logs.select(sorted(logs.columns)).printSchema()

root
 |-- AudienceTargetAgeID: integer (nullable = true)
 |-- AudienceTargetEthnicID: integer (nullable = true)
 |-- BroadcastOriginPointID: integer (nullable = true)
 |-- CategoryID: integer (nullable = true)
 |-- ClosedCaptionID: integer (nullable = true)
 |-- CompositionID: integer (nullable = true)
 |-- CountryOfOriginID: integer (nullable = true)
 |-- DubDramaCreditID: integer (nullable = true)
 |-- Duration: string (nullable = true)
 |-- EndTime: string (nullable = true)
 |-- EthnicProgramID: integer (nullable = true)
 |-- ExhibitionID: integer (nullable = true)
 |-- FilmClassificationID: integer (nullable = true)
 |-- Language1: integer (nullable = true)
 |-- Language2: integer (nullable = true)
 |-- LogDate: timestamp (nullable = true)
 |-- LogEntryDate: timestamp (nullable = true)
 |-- LogServiceID: integer (nullable = true)
 |-- NetworkAffiliationID: integer (nullable = true)
 |-- Producer1: string (nullable = true)
 |-- Producer2: string (nullable = true)
 |-- ProductionNO: 
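sorted() is just one possible order. Because select() accepts any list of column names, any ordering we can compute in Python works, for example moving a few key columns to the front and sorting the rest. `front_then_sorted` and the choice of front columns below are hypothetical.

```python
def front_then_sorted(columns, front):
    """Put `front` columns first (in the given order), then the rest alphabetically."""
    rest = sorted(c for c in columns if c not in front)
    return [c for c in front if c in columns] + rest


columns = ["Duration", "LogServiceID", "CategoryID", "LogDate", "AudienceTargetAgeID"]
print(front_then_sorted(columns, ["LogServiceID", "LogDate"]))
# In the notebook: logs.select(front_then_sorted(logs.columns, ["LogServiceID", "LogDate"]))
```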

## Understanding Data with describe() and summary()

- describe() shows summary statistics (count, mean, standard deviation, min, and max) for numerical and string columns; in the output below, the LogDate timestamp column produces an empty table. To avoid screen overflow, I display the descriptions one column at a time by iterating over the list of columns.
- Like any transformation, describe() lazily returns a new data frame instead of displaying anything, so we have to call show() on the result.
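For a numeric column, the five statistics that describe() reports can be reproduced with Python's statistics module, which helps when reading the output. Two details matter: count includes only non-null values (which is why AudienceTargetAgeID has a much smaller count than LogServiceID), and stddev is the sample standard deviation. `describe_values` is a hypothetical helper that works on a plain list, with None standing in for null.

```python
import statistics


def describe_values(values):
    """Mimic describe() for one numeric column held in a Python list."""
    present = [v for v in values if v is not None]  # nulls are not counted
    return {
        "count": len(present),
        "mean": statistics.mean(present),
        "stddev": statistics.stdev(present),  # sample standard deviation
        "min": min(present),
        "max": max(present),
    }


# Quantity column from the grocery list, plus one missing value
print(describe_values([2, 4, 1, 1, None]))
```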

In [19]:
for i in logs.columns:
    logs.describe(i).show()

+-------+------------------+
|summary|      LogServiceID|
+-------+------------------+
|  count|            238945|
|   mean| 3450.890284375065|
| stddev|199.50673962554765|
|    min|              3157|
|    max|              3925|
+-------+------------------+

+-------+
|summary|
+-------+
|  count|
|   mean|
| stddev|
|    min|
|    max|
+-------+

+-------+-------------------+
|summary|AudienceTargetAgeID|
+-------+-------------------+
|  count|              16112|
|   mean| 3.4929245283018866|
| stddev| 1.0415963394745125|
|    min|                  1|
|    max|                  4|
+-------+-------------------+

+-------+----------------------+
|summary|AudienceTargetEthnicID|
+-------+----------------------+
|  count|                  1710|
|   mean|    120.56432748538012|
| stddev|      71.9869405943613|
|    min|                     4|
|    max|                   337|
+-------+----------------------+

+-------+------------------+
|summary|        CategoryID|
+-------+-----------

- Where describe() takes *cols as a parameter (one or more columns, the same way as select() or drop()), summary() takes *statistics. This is why we select() each column before calling summary().
- By default, summary() shows everything describe() shows and adds the approximate 25%, 50%, and 75% percentiles.
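summary() computes its percentiles with an approximate algorithm. For intuition about what a line such as "25%: 4" means, the function below computes an exact nearest-rank percentile: the smallest value such that at least p percent of the non-null values are less than or equal to it. This is a swapped-in exact method, not Spark's algorithm, so Spark's approximate results can differ on large data; `nearest_rank_percentile` is a hypothetical helper and expects an integer p between 0 and 100.

```python
def nearest_rank_percentile(values, p):
    """Smallest value with at least p% of the data at or below it (0 <= p <= 100)."""
    ordered = sorted(v for v in values if v is not None)
    rank = max(1, -(-p * len(ordered) // 100))  # ceil(p * n / 100), at least 1
    return ordered[rank - 1]


data = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
for p in (10, 25, 50, 75, 90):
    print(f"{p}%", nearest_rank_percentile(data, p))
```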

In [20]:
for i in logs.columns:
    logs.select(i).summary().show()

+-------+------------------+
|summary|      LogServiceID|
+-------+------------------+
|  count|            238945|
|   mean| 3450.890284375065|
| stddev|199.50673962554765|
|    min|              3157|
|    25%|              3287|
|    50%|              3379|
|    75%|              3627|
|    max|              3925|
+-------+------------------+

+-------+
|summary|
+-------+
|  count|
|   mean|
| stddev|
|    min|
|    25%|
|    50%|
|    75%|
|    max|
+-------+

+-------+-------------------+
|summary|AudienceTargetAgeID|
+-------+-------------------+
|  count|              16112|
|   mean| 3.4929245283018866|
| stddev| 1.0415963394745125|
|    min|                  1|
|    25%|                  4|
|    50%|                  4|
|    75%|                  4|
|    max|                  4|
+-------+-------------------+

+-------+----------------------+
|summary|AudienceTargetEthnicID|
+-------+----------------------+
|  count|                  1710|
|   mean|    120.56432748538012|
| st

In [21]:
for i in logs.columns:
    logs.select(i).summary("min", "10%", "90%", "max").show()

+-------+------------+
|summary|LogServiceID|
+-------+------------+
|    min|        3157|
|    10%|        3236|
|    90%|        3709|
|    max|        3925|
+-------+------------+

+-------+
|summary|
+-------+
|    min|
|    10%|
|    90%|
|    max|
+-------+

+-------+-------------------+
|summary|AudienceTargetAgeID|
+-------+-------------------+
|    min|                  1|
|    10%|                  1|
|    90%|                  4|
|    max|                  4|
+-------+-------------------+

+-------+----------------------+
|summary|AudienceTargetEthnicID|
+-------+----------------------+
|    min|                     4|
|    10%|                    74|
|    90%|                   258|
|    max|                   337|
+-------+----------------------+

+-------+----------+
|summary|CategoryID|
+-------+----------+
|    min|         1|
|    10%|         3|
|    90%|        29|
|    max|        29|
+-------+----------+

+-------+---------------+
|summary|ClosedCaptionID|
+------

describe() and summary() are very useful, but they are meant only for quickly peeking at data during development. The PySpark developers don't guarantee that their output will look the same from version to version, so if your program needs one of these values, compute it with the corresponding function in pyspark.sql.functions (for example count(), mean(), stddev(), min(), max(), or percentile_approx()).

- PySpark can infer the schema of a CSV file when the optional inferSchema parameter is set to True. It accomplishes this by reading the data twice: once to determine the appropriate type for each column and once to ingest the data in the inferred format.
- Tabular data is represented in a data frame as a series of columns, each having a name and a type. Since the data frame is a column-major data structure, the concept of rows is less relevant.
- You can use Python code to explore the data efficiently, treating the column list like any other Python list to pick out the columns of interest.
- The most common operations on a data frame are the selection, deletion, and creation of columns. In PySpark, the methods used are select(), drop(), and withColumn(), respectively.
- select() can also reorder columns when you pass it a reordered list of columns.
- You can rename columns one by one with the withColumnRenamed() method, or all at once with the toDF() method.
- You can display a summary of the columns with the describe() or summary() methods. describe() has a fixed set of metrics, while summary() takes the names of the statistics to compute (including arbitrary percentiles such as "10%") as parameters and applies them to the data frame's columns.