#### What is tabular data?
* Tabular data is a two-dimensional table in which data is organized into rows and columns.
* PySpark's data frame structure maps naturally to tabular data.
* The data frame is *column-major*, so its API focuses on manipulating columns to transform data.
* PySpark operates either on a whole data frame, via methods such as `select()` and `groupby()`, or on Column objects, using functions such as `split()`.
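
To make the row versus column distinction concrete, here is a minimal pure-Python sketch (no Spark involved). The sample records mirror the grocery list used in this notebook. The names `rows`, `column_names`, and `columns` are illustrative assumptions, and the dict of lists is only an analogy for a column-oriented layout, not a description of how PySpark stores data internally.

```python
# Row-oriented view: each inner list is one record.
rows = [
    ["Banana", 2, 1.74],
    ["Apple", 4, 2.04],
    ["Carrot", 1, 1.09],
    ["Cake", 1, 10.99],
]
column_names = ["Item", "Quantity", "Price"]

# Column-oriented view: zip(*rows) transposes the records so that
# each column name maps to the list of all its values.
columns = {name: list(values) for name, values in zip(column_names, zip(*rows))}

# A column-wise transformation reads a single column at a time.
total_quantity = sum(columns["Quantity"])

print(columns["Item"])
print(total_quantity)
```

Thinking in columns is what makes calls such as `select()` feel natural: you name the columns you want instead of looping over records.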


In [1]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
spark = SparkSession.builder.getOrCreate()



In [2]:
# Creates a list for grocery (to be used to create a PySpark Data Frame)
my_grocery_list = [
    ["Banana", 2, 1.74],
    ["Apple", 4, 2.04],
    ["Carrot", 1, 1.09],
    ["Cake", 1, 10.99],
]

In [3]:
# Creates PySpark Data Frame 
df_grocery_list = spark.createDataFrame(
    my_grocery_list, ["Item", "Quantity", "Price"]
)

In [4]:
df_grocery_list #.printSchema()

DataFrame[Item: string, Quantity: bigint, Price: double]


* A data frame can be created with the `createDataFrame()` function, which is called here with two parameters.
* The first parameter is the data itself. It can be a list of items, a pandas data frame, or an RDD.
* The second parameter is the *schema* of the data frame. Here it is a list of column names, and PySpark infers the column types.
* The master node knows the structure of the data frame, but the actual data lives on the worker nodes.
* We express logic against this abstract structure, and the master delegates the work efficiently across the cluster.



In [2]:
import os
directory = "/Users/u354769/Desktop/Ameya_Learning/DataAnalysisWithPythonAndPySpark/Book_Materials/Chapters/Chapter_4/Data"
logs = spark.read.csv(
    os.path.join(directory, "BroadcastLogs_2018_Q3_M8.CSV"),
    sep="|", header=True, inferSchema=True, timestampFormat="yyyy-MM-dd",
)

                                                                                

#### Parameters for the `spark.read.csv()` function:
* `path` --> The only mandatory parameter. It contains the path, or paths, of the file(s) to ingest.
* `sep` --> Optional parameter that sets the field (column) delimiter. The default is a comma (,).

In [3]:
logs.printSchema()

root
 |-- BroadcastLogID: integer (nullable = true)
 |-- LogServiceID: integer (nullable = true)
 |-- LogDate: timestamp (nullable = true)
 |-- SequenceNO: integer (nullable = true)
 |-- AudienceTargetAgeID: integer (nullable = true)
 |-- AudienceTargetEthnicID: integer (nullable = true)
 |-- CategoryID: integer (nullable = true)
 |-- ClosedCaptionID: integer (nullable = true)
 |-- CountryOfOriginID: integer (nullable = true)
 |-- DubDramaCreditID: integer (nullable = true)
 |-- EthnicProgramID: integer (nullable = true)
 |-- ProductionSourceID: integer (nullable = true)
 |-- ProgramClassID: integer (nullable = true)
 |-- FilmClassificationID: integer (nullable = true)
 |-- ExhibitionID: integer (nullable = true)
 |-- Duration: string (nullable = true)
 |-- EndTime: string (nullable = true)
 |-- LogEntryDate: timestamp (nullable = true)
 |-- ProductionNO: string (nullable = true)
 |-- ProgramTitle: string (nullable = true)
 |-- StartTime: string (nullable = true)
 |-- Subtitle: string 

#### Exercise 4.1
Let’s take the following file, called sample.csv, which contains three columns:

`Item,Quantity,Price`

`$Banana, organic$,1,0.99`

`Pear,7,1.24`

`$Cake, chocolate$,1,14.50`

Complete the following code to ingest the file successfully.
 
```python
sample = spark.read.csv([...],
                        sep=[...],
                        header=[...],
                        quote=[...],
                        inferSchema=[...]
)
```
(Note: If you want to test your code, sample.csv is available in the book’s repository under data/sample.csv/sample.csv).

In [63]:
import os
csv_file_path = "/Users/u354769/Desktop/Ameya_Learning/DataAnalysisWithPythonAndPySpark/Book_Materials/Chapters/Chapter_4/Data/sample1.csv"
sample = spark.read.csv(csv_file_path,
                        sep=",",
                        header=True,
                        quote="$",
                        inferSchema=True)

In [64]:
sample.show()

+---------------+--------+-----+
|           Item|Quantity|Price|
+---------------+--------+-----+
|Banana, organic|       1| 0.99|
|           Pear|       7| 1.24|
|Cake, chocolate|       1| 14.5|
+---------------+--------+-----+



In [65]:
sample.printSchema()

root
 |-- Item: string (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- Price: double (nullable = true)
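
The role of the `quote` parameter can be illustrated without Spark. The sketch below uses Python's standard `csv` module, whose `quotechar` argument plays a similar role. It assumes the fields containing commas are wrapped in `$` in the file, which is what `quote="$"` and the output above imply. The `raw` string is a hypothetical stand-in for the file content.

```python
import csv
import io

# Hypothetical file content, assuming "$" wraps the fields that contain commas.
raw = (
    "Item,Quantity,Price\n"
    "$Banana, organic$,1,0.99\n"
    "Pear,7,1.24\n"
    "$Cake, chocolate$,1,14.50\n"
)

# Default quote character ('"'): the embedded comma splits the item into two fields.
default_rows = list(csv.reader(io.StringIO(raw)))

# quotechar="$": the dollar signs act as quotes and are stripped from the value.
quoted_rows = list(csv.reader(io.StringIO(raw), quotechar="$"))

print(default_rows[1])
print(quoted_rows[1])
```

With the default quote character the first record has four fields instead of three, which is why the exercise needs a custom quote character.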




* Star schemas are common in the relational database world because of normalization, a process used to avoid duplicating pieces of data and improve data integrity.
* In Spark’s universe, we often prefer working with a single table instead of linking a multitude of tables to get the data. We call these denormalized tables, or, colloquially, fat tables.
* Normalized data has many advantages when you’re working with relational information.
* In addition to being easier to maintain, data normalization reduces the probability of getting anomalies or illogical records in the data. On the flip side, large-scale data systems sometimes embrace denormalized tables to avoid costly join operations.
* When dealing with analytics, a single table containing all the data is best.


#### Questions for Sean

* Creating many (100+) new columns with repeated `withColumn()` calls will slow Spark down to a grind. If you need to create a lot of columns at once, use the `select()` approach. While it will generate the same work, it is less taxing on the query planner.
    Why?
    

In [5]:
# Using SELECT() in four different ways

# Using the string to column conversion (example 1)
logs.select("BroadCastLogID", "LogServiceID", "LogDate").show(5, False)

+--------------+------------+-------------------+
|BroadCastLogID|LogServiceID|LogDate            |
+--------------+------------+-------------------+
|1196192316    |3157        |2018-08-01 00:00:00|
|1196192317    |3157        |2018-08-01 00:00:00|
|1196192318    |3157        |2018-08-01 00:00:00|
|1196192319    |3157        |2018-08-01 00:00:00|
|1196192320    |3157        |2018-08-01 00:00:00|
+--------------+------------+-------------------+
only showing top 5 rows



In [7]:
# Using the string to column conversion (example 2)
logs.select(*["BroadCastLogID", "LogServiceID", "LogDate"]).show(5, False)

# prefixed * unpacks the container so that each element becomes a parameter of the function.

+--------------+------------+-------------------+
|BroadCastLogID|LogServiceID|LogDate            |
+--------------+------------+-------------------+
|1196192316    |3157        |2018-08-01 00:00:00|
|1196192317    |3157        |2018-08-01 00:00:00|
|1196192318    |3157        |2018-08-01 00:00:00|
|1196192319    |3157        |2018-08-01 00:00:00|
|1196192320    |3157        |2018-08-01 00:00:00|
+--------------+------------+-------------------+
only showing top 5 rows
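
The effect of the `*` prefix is plain Python behavior, so it can be checked without Spark. In this small sketch, `describe` is a hypothetical helper that simply returns the arguments it received.

```python
def describe(*names):
    """Return the positional arguments exactly as the function received them."""
    return names


cols = ["BroadCastLogID", "LogServiceID", "LogDate"]

without_star = describe(cols)  # one argument: the list itself
with_star = describe(*cols)    # three arguments: one per element

print(len(without_star))
print(len(with_star))
```

This is why `select(*my_list)` behaves like typing each column name as a separate argument.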



In [10]:
# Using column objects
import pyspark.sql.functions as F
logs.select(F.col("BroadCastLogId"), F.col("LogServiceID"), F.col("LogDate")).show(5,False)

+--------------+------------+-------------------+
|BroadCastLogId|LogServiceID|LogDate            |
+--------------+------------+-------------------+
|1196192316    |3157        |2018-08-01 00:00:00|
|1196192317    |3157        |2018-08-01 00:00:00|
|1196192318    |3157        |2018-08-01 00:00:00|
|1196192319    |3157        |2018-08-01 00:00:00|
|1196192320    |3157        |2018-08-01 00:00:00|
+--------------+------------+-------------------+
only showing top 5 rows



In [None]:
#Using column object with * prefix
logs.select(*[F.col("BroadCastLogID"), F.col("LogServiceID"), F.col("LogDate")])


In [15]:
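# Performing EDA: peeking at the columns in groups of three.
import numpy as np

# array_split() divides the column names into len(logs.columns) // 3 sub-arrays of near-equal size.
# Each sub-array holds exactly three names here because the 30 columns divide evenly by three.
column_split = np.array_split(np.array(logs.columns), len(logs.columns) // 3)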
print(column_split)

[array(['BroadcastLogID', 'LogServiceID', 'LogDate'], dtype='<U22'), array(['SequenceNO', 'AudienceTargetAgeID', 'AudienceTargetEthnicID'],
      dtype='<U22'), array(['CategoryID', 'ClosedCaptionID', 'CountryOfOriginID'], dtype='<U22'), array(['DubDramaCreditID', 'EthnicProgramID', 'ProductionSourceID'],
      dtype='<U22'), array(['ProgramClassID', 'FilmClassificationID', 'ExhibitionID'],
      dtype='<U22'), array(['Duration', 'EndTime', 'LogEntryDate'], dtype='<U22'), array(['ProductionNO', 'ProgramTitle', 'StartTime'], dtype='<U22'), array(['Subtitle', 'NetworkAffiliationID', 'SpecialAttentionID'],
      dtype='<U22'), array(['BroadcastOriginPointID', 'CompositionID', 'Producer1'],
      dtype='<U22'), array(['Producer2', 'Language1', 'Language2'], dtype='<U22')]
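
Note that `np.array_split(arr, n)` produces `n` sub-arrays, so the groups contain exactly three names only when the number of columns is a multiple of three. With seven columns, for example, `7 // 3` gives two sub-arrays of sizes four and three. If fixed-size groups are wanted regardless of the column count, one alternative is plain slicing. The sketch below is an assumption about what might be preferred, not the notebook's method. `chunk` and `sample_columns` are hypothetical names.

```python
def chunk(items, size):
    """Split items into consecutive groups of `size`; the last group may be shorter."""
    return [items[i:i + size] for i in range(0, len(items), size)]


# Illustrative subset of the data frame's column names.
sample_columns = [
    "BroadcastLogID", "LogServiceID", "LogDate", "SequenceNO",
    "AudienceTargetAgeID", "AudienceTargetEthnicID", "CategoryID",
]

groups = chunk(sample_columns, 3)
print(groups)
```

Each group is a regular list, so it can be unpacked into `select()` with the `*` prefix in the same way as the NumPy sub-arrays.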


In [22]:
for x in column_split:
    logs.select(*x).show(5,False)

+--------------+------------+-------------------+
|BroadcastLogID|LogServiceID|LogDate            |
+--------------+------------+-------------------+
|1196192316    |3157        |2018-08-01 00:00:00|
|1196192317    |3157        |2018-08-01 00:00:00|
|1196192318    |3157        |2018-08-01 00:00:00|
|1196192319    |3157        |2018-08-01 00:00:00|
|1196192320    |3157        |2018-08-01 00:00:00|
+--------------+------------+-------------------+
only showing top 5 rows

+----------+-------------------+----------------------+
|SequenceNO|AudienceTargetAgeID|AudienceTargetEthnicID|
+----------+-------------------+----------------------+
|1         |4                  |null                  |
|2         |null               |null                  |
|3         |null               |null                  |
|4         |null               |null                  |
|5         |null               |null                  |
+----------+-------------------+----------------------+
only showing top 5 ro

In [20]:
column_split

[array(['BroadcastLogID', 'LogServiceID', 'LogDate'], dtype='<U22'),
 array(['SequenceNO', 'AudienceTargetAgeID', 'AudienceTargetEthnicID'],
       dtype='<U22'),
 array(['CategoryID', 'ClosedCaptionID', 'CountryOfOriginID'], dtype='<U22'),
 array(['DubDramaCreditID', 'EthnicProgramID', 'ProductionSourceID'],
       dtype='<U22'),
 array(['ProgramClassID', 'FilmClassificationID', 'ExhibitionID'],
       dtype='<U22'),
 array(['Duration', 'EndTime', 'LogEntryDate'], dtype='<U22'),
 array(['ProductionNO', 'ProgramTitle', 'StartTime'], dtype='<U22'),
 array(['Subtitle', 'NetworkAffiliationID', 'SpecialAttentionID'],
       dtype='<U22'),
 array(['BroadcastOriginPointID', 'CompositionID', 'Producer1'],
       dtype='<U22'),
 array(['Producer2', 'Language1', 'Language2'], dtype='<U22')]

In [23]:
# Deleting columns
# Using drop() to get rid of columns that are not needed.
# drop() returns a new data frame (like every data frame transformation in PySpark).

logs = logs.drop("BroadCastLogID","SequenceNO")

print("BroadCastLogID" in logs.columns) # testing if column was dropped

False


#### Exercise 4.2

What is the printed result of this code?

`sample_frame.columns # => ['item', 'price', 'quantity', 'UPC']`

`print(sample_frame.drop('item', 'UPC', 'prices').columns)`

a ['item' 'UPC']

b ['item', 'upc']

c ['price', 'quantity'] <--- Answer

d ['price', 'quantity', 'UPC']

e Raises an error
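
The reasoning behind answer c is that `drop()` with column names is a no-op for names that are not in the schema, so the misspelled `'prices'` is ignored. The pure-Python analogy below models only that "ignore missing names" behavior. `drop_columns` is a hypothetical helper, not part of PySpark.

```python
def drop_columns(columns, *to_drop):
    """Return the remaining column names, silently ignoring names that are absent."""
    unwanted = set(to_drop)
    return [c for c in columns if c not in unwanted]


sample_columns = ["item", "price", "quantity", "UPC"]
remaining = drop_columns(sample_columns, "item", "UPC", "prices")
print(remaining)
```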

In [24]:
# Creating new columns using withColumn() method.

logs.select("Duration").show(5, False)


+----------------+
|Duration        |
+----------------+
|02:00:00.0000000|
|00:00:30.0000000|
|00:00:15.0000000|
|00:00:15.0000000|
|00:00:15.0000000|
+----------------+
only showing top 5 rows



In [25]:
print(logs.select("Duration").dtypes)

# PySpark doesn’t have a default type for time without dates or duration, so it kept the column as a string.

[('Duration', 'string')]


In [26]:
# Creating new columns using the substr() method
logs.select(
    F.col("Duration"),
    F.col("Duration").substr(1, 2).cast("int").alias("dur_hours"),
    F.col("Duration").substr(4, 2).cast("int").alias("dur_minutes"),
    F.col("Duration").substr(7, 2).cast("int").alias("dur_seconds"),
).distinct().show(
    5
)


+----------------+---------+-----------+-----------+
|        Duration|dur_hours|dur_minutes|dur_seconds|
+----------------+---------+-----------+-----------+
|00:10:06.0000000|        0|         10|          6|
|00:10:37.0000000|        0|         10|         37|
|00:04:52.0000000|        0|          4|         52|
|00:26:41.0000000|        0|         26|         41|
|00:08:18.0000000|        0|          8|         18|
+----------------+---------+-----------+-----------+
only showing top 5 rows



                                                                                

In [28]:
# Creating a duration-in-seconds field from the Duration column using substr() and cast()

logs.select(
    F.col("Duration"),
    (
        # Cast each substring to int before doing arithmetic on it.
        F.col("Duration").substr(1, 2).cast("int") * 60 * 60
        + F.col("Duration").substr(4, 2).cast("int") * 60
        + F.col("Duration").substr(7, 2).cast("int")
    ).alias("Duration_seconds"),
).distinct().show(5)  # distinct() removes duplicate records.



+----------------+----------------+
|        Duration|Duration_seconds|
+----------------+----------------+
|00:10:30.0000000|             630|
|00:25:52.0000000|            1552|
|00:28:08.0000000|            1688|
|06:00:00.0000000|           21600|
|00:32:08.0000000|            1928|
+----------------+----------------+
only showing top 5 rows
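
The arithmetic can be checked by hand with a small pure-Python equivalent. This is a sketch for verification only, and `duration_to_seconds` is a hypothetical helper. Note that `substr()` positions start at 1, while Python slices start at 0.

```python
def duration_to_seconds(duration: str) -> int:
    """Convert an 'HH:MM:SS.fffffff' string to whole seconds, ignoring the fraction."""
    hours = int(duration[0:2])    # same characters as substr(1, 2)
    minutes = int(duration[3:5])  # same characters as substr(4, 2)
    seconds = int(duration[6:8])  # same characters as substr(7, 2)
    return hours * 60 * 60 + minutes * 60 + seconds


for value in ["00:10:30.0000000", "00:25:52.0000000", "06:00:00.0000000"]:
    print(value, duration_to_seconds(value))
```

For `00:25:52.0000000` this gives 25 * 60 + 52 = 1552 seconds, which agrees with the corresponding row in the output above.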



                                                                                

In [29]:
# Creating a duration in seconds field from the Duration column using withColumn
logs = logs.withColumn(
    "Duration_seconds",
    (
        F.col("Duration").substr(1, 2).cast("int") * 60 * 60
        + F.col("Duration").substr(4, 2).cast("int") * 60
        + F.col("Duration").substr(7, 2).cast("int")
    ),
)
logs.printSchema()

root
 |-- LogServiceID: integer (nullable = true)
 |-- LogDate: timestamp (nullable = true)
 |-- AudienceTargetAgeID: integer (nullable = true)
 |-- AudienceTargetEthnicID: integer (nullable = true)
 |-- CategoryID: integer (nullable = true)
 |-- ClosedCaptionID: integer (nullable = true)
 |-- CountryOfOriginID: integer (nullable = true)
 |-- DubDramaCreditID: integer (nullable = true)
 |-- EthnicProgramID: integer (nullable = true)
 |-- ProductionSourceID: integer (nullable = true)
 |-- ProgramClassID: integer (nullable = true)
 |-- FilmClassificationID: integer (nullable = true)
 |-- ExhibitionID: integer (nullable = true)
 |-- Duration: string (nullable = true)
 |-- EndTime: string (nullable = true)
 |-- LogEntryDate: timestamp (nullable = true)
 |-- ProductionNO: string (nullable = true)
 |-- ProgramTitle: string (nullable = true)
 |-- StartTime: string (nullable = true)
 |-- Subtitle: string (nullable = true)
 |-- NetworkAffiliationID: integer (nullable = true)
 |-- SpecialAttenti

#### About `withColumn()`:
* Use `withColumn()` when you need all the existing columns of the data frame along with the new column.
* Unlike `select()`, `withColumn()` automatically includes all the 'old' columns in the new data frame.
* If you create a column with `withColumn()` and give it a name that already exists in your data frame, PySpark will happily overwrite the column.
* `select()` is useful when you’re explicitly working with a few columns.
* When you need to create a few new columns without changing the rest of the data frame, `withColumn()` is preferred.
* Creating many (100+) new columns with repeated `withColumn()` calls will slow Spark down to a grind. Use the `select()` approach in this case.


In [30]:
# Renaming columns
logs = logs.withColumnRenamed("Duration_seconds", "duration_seconds")
logs.printSchema()



root
 |-- LogServiceID: integer (nullable = true)
 |-- LogDate: timestamp (nullable = true)
 |-- AudienceTargetAgeID: integer (nullable = true)
 |-- AudienceTargetEthnicID: integer (nullable = true)
 |-- CategoryID: integer (nullable = true)
 |-- ClosedCaptionID: integer (nullable = true)
 |-- CountryOfOriginID: integer (nullable = true)
 |-- DubDramaCreditID: integer (nullable = true)
 |-- EthnicProgramID: integer (nullable = true)
 |-- ProductionSourceID: integer (nullable = true)
 |-- ProgramClassID: integer (nullable = true)
 |-- FilmClassificationID: integer (nullable = true)
 |-- ExhibitionID: integer (nullable = true)
 |-- Duration: string (nullable = true)
 |-- EndTime: string (nullable = true)
 |-- LogEntryDate: timestamp (nullable = true)
 |-- ProductionNO: string (nullable = true)
 |-- ProgramTitle: string (nullable = true)
 |-- StartTime: string (nullable = true)
 |-- Subtitle: string (nullable = true)
 |-- NetworkAffiliationID: integer (nullable = true)
 |-- SpecialAttenti

In [3]:
#Exercise 4.3
# Reread the data in a logs_raw data frame (the data file is ./data/broadcast_logs- BroadcastLogs_2018_Q3_M8.CSV), 
# this time without passing any optional parameters. Print the first five rows of data, as well as the schema. 
# What are the differences in terms of data and schema between logs and logs_raw?

from pyspark.sql import SparkSession
import pyspark.sql.functions as F
import os

spark = SparkSession.builder.getOrCreate()


csv_file_path = "/Users/u354769/Desktop/Ameya_Learning/DataAnalysisWithPythonAndPySpark/Book_Materials/Chapters/Chapter_4/Data/BroadcastLogs_2018_Q3_M8.csv"
logs_raw = spark.read.csv(csv_file_path,
                        # sep=",",
                        # header=True,
                        # quote="$",
                        # inferSchema=True
                        )
logs_raw.show(5, False)

+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|_c0                                                                                                                                                                                                                                                                                                                                                                                                                                        |
+-------------------------------------------------------------------------------------------------------------------------------------------

In [None]:
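# The data for all columns is crammed into a single column, because the default separator (",") does not match the file's "|".
# Without header=True, the column name defaults to _c0 and the header row is read as data.
# timestamp format includes microseconds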
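
The single-column result can be reproduced in miniature with Python's standard `csv` module (not Spark). The column names and values in the two-line extract below come from the outputs shown earlier, but the exact raw text of the file is an assumption.

```python
import csv
import io

# Illustrative two-line extract of a pipe-delimited file (assumed layout).
raw = "BroadcastLogID|LogServiceID|LogDate\n1196192316|3157|2018-08-01\n"

# Default delimiter (","): no comma is found, so each line stays a single field.
comma_rows = list(csv.reader(io.StringIO(raw)))

# delimiter="|": each line is split into its three fields.
pipe_rows = list(csv.reader(io.StringIO(raw), delimiter="|"))

print(comma_rows)
print(pipe_rows)
```

Here too, the header line is returned as an ordinary row unless the caller treats it specially, which mirrors reading the file without `header=True`.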



In [4]:

#Exercise 4.4
# Create a new data frame, logs_clean, that contains only the columns that do not end with ID.

logs = spark.read.csv(csv_file_path
                        , sep = "|"
                        , header = True
                        , inferSchema = True
                        , timestampFormat="yyyy-MM-dd",
                        )


                                                                                

In [5]:
# Keep only the columns whose names do not end with "ID".
# show() returns None, so it is called separately instead of being assigned to logs_clean.
logs_clean = logs.select(
    *[x for x in logs.columns if not x.endswith("ID")]
)
logs_clean.show(5, False)
