In [4]:
from pathlib import Path
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

In [20]:
!pip install numpy

Collecting numpy
  Downloading numpy-2.1.1-cp312-cp312-win_amd64.whl.metadata (59 kB)
     ---------------------------------------- 0.0/59.7 kB ? eta -:--:--
     ---------------------------------- ----- 51.2/59.7 kB 1.3 MB/s eta 0:00:01
     ---------------------------------------- 59.7/59.7 kB 1.1 MB/s eta 0:00:00
Downloading numpy-2.1.1-cp312-cp312-win_amd64.whl (12.6 MB)
   ---------------------------------------- 0.0/12.6 MB ? eta -:--:--
   - -------------------------------------- 0.4/12.6 MB 8.9 MB/s eta 0:00:02
   -- ------------------------------------- 0.9/12.6 MB 9.7 MB/s eta 0:00:02
   ---- ----------------------------------- 1.5/12.6 MB 11.6 MB/s eta 0:00:01
   ------ --------------------------------- 2.0/12.6 MB 10.5 MB/s eta 0:00:02
   -------- ------------------------------- 2.5/12.6 MB 10.8 MB/s eta 0:00:01
   --------- ------------------------------ 3.0/12.6 MB 11.3 MB/s eta 0:00:01
   ----------- ---------------------------- 3.5/12.6 MB 11.3 MB/s eta 0:00:01
   -----


[notice] A new release of pip is available: 24.0 -> 24.2
[notice] To update, run: python.exe -m pip install --upgrade pip


In [12]:
# The data folder sits two levels above the kernel's working directory.
# The path is anchored explicitly on the working directory instead of on a
# "__file__" string literal: a notebook kernel does not define `__file__`,
# and the literal only worked because a relative path resolves against cwd.
data_directory = Path.cwd().resolve().parent.parent / 'data'

In [3]:
# Entry point for all Spark work in this notebook: getOrCreate() hands back
# the already-running session if there is one, otherwise it starts a new one.
spark = (
    SparkSession
    .builder
    .getOrCreate()
)

In [13]:
# Plain Python rows, one list per grocery entry, in the same order as the
# column names given when the DataFrame is created: item, quantity, price.
my_grocery_list = [
    [item, quantity, price]
    for item, quantity, price in (
        ('Banana', 2, 1.74),
        ('Apple', 4, 2.04),
        ('Carrot', 1, 1.09),
        ('Cake', 1, 10.99),
    )
]

# Turn the row lists into a Spark DataFrame. Only column names are supplied,
# so the column types are inferred from the Python values.
df_grocery_list = spark.createDataFrame(
    data=my_grocery_list,
    schema=['Item', 'Quantity', 'Price'],
)

# Print the resulting column names, types and nullability.
df_grocery_list.printSchema()

root
 |-- Item: string (nullable = true)
 |-- Quantity: long (nullable = true)
 |-- Price: double (nullable = true)



In [16]:
# Location of the broadcast-log sample CSV under the data directory,
# converted to a plain string before it is handed to the Spark reader.
data_file = str(
    data_directory.joinpath(
        'broadcast_logs',
        'BroadcastLogs_2018_Q3_M8_sample.CSV',
    )
)

In [17]:
# Load the broadcast-log sample into a DataFrame.
logs = spark.read.csv(
    path=data_file,
    header=True,                  # first line holds the column names
    sep='|',                      # fields are pipe-delimited
    inferSchema=True,             # let Spark work out each column's type
    timestampFormat='yyyy-MM-dd', # pattern used when parsing timestamps
)

In [18]:
# Peek at the first five rows of three columns, without truncating values.
logs.select(
    'BroadcastLogID',
    'LogServiceID',
    'LogDate',
).show(n=5, truncate=False)

+--------------+------------+----------+
|BroadcastLogID|LogServiceID|LogDate   |
+--------------+------------+----------+
|1196192316    |3157        |2018-08-01|
|1196192317    |3157        |2018-08-01|
|1196192318    |3157        |2018-08-01|
|1196192319    |3157        |2018-08-01|
|1196192320    |3157        |2018-08-01|
+--------------+------------+----------+
only showing top 5 rows



In [21]:
import numpy as np

# Partition the column names into groups of roughly three so the frame can
# be displayed one narrow slice at a time.
# np.array_split raises ValueError when asked for 0 sections, which is what
# `len(...) // 3` yields for a frame with fewer than three columns, so the
# section count is clamped to at least one.
column_split = np.array_split(
    np.array(logs.columns), max(1, len(logs.columns) // 3)
)

print(column_split)

# Show the first five rows of each column group, untruncated.
for column_group in column_split:
    logs.select(*column_group).show(5, False)

[array(['BroadcastLogID', 'LogServiceID', 'LogDate'], dtype='<U22'), array(['SequenceNO', 'AudienceTargetAgeID', 'AudienceTargetEthnicID'],
      dtype='<U22'), array(['CategoryID', 'ClosedCaptionID', 'CountryOfOriginID'], dtype='<U22'), array(['DubDramaCreditID', 'EthnicProgramID', 'ProductionSourceID'],
      dtype='<U22'), array(['ProgramClassID', 'FilmClassificationID', 'ExhibitionID'],
      dtype='<U22'), array(['Duration', 'EndTime', 'LogEntryDate'], dtype='<U22'), array(['ProductionNO', 'ProgramTitle', 'StartTime'], dtype='<U22'), array(['Subtitle', 'NetworkAffiliationID', 'SpecialAttentionID'],
      dtype='<U22'), array(['BroadcastOriginPointID', 'CompositionID', 'Producer1'],
      dtype='<U22'), array(['Producer2', 'Language1', 'Language2'], dtype='<U22')]
+--------------+------------+----------+
|BroadcastLogID|LogServiceID|LogDate   |
+--------------+------------+----------+
|1196192316    |3157        |2018-08-01|
|1196192317    |3157        |2018-08-01|
|1196192318    |

In [22]:
# Rebind `logs` to a frame without the BroadcastLogID and SequenceNO columns.
logs = (
    logs
    .drop('BroadcastLogID')
    .drop('SequenceNO')
)

# Confirm the drop took effect: the column name should no longer be listed.
print('BroadcastLogID' in logs.columns)

False


In [23]:
# Look at the first five raw Duration values.
logs.select('Duration').show(n=5)

# dtypes lists (column name, type) pairs; used here to see how Duration
# was typed when the file was read.
print(logs.select('Duration').dtypes)

+----------------+
|        Duration|
+----------------+
|02:00:00.0000000|
|00:00:30.0000000|
|00:00:15.0000000|
|00:00:15.0000000|
|00:00:15.0000000|
+----------------+
only showing top 5 rows

[('Duration', 'string')]


In [24]:
# Duration is an "HH:MM:SS.fffffff" string. Slice out each two-character
# field by its 1-based position and cast it to an integer, then show five
# of the distinct combinations.
logs.select(
    F.col('Duration'),
    F.substring('Duration', 1, 2).cast('int').alias('dur_hours'),    # HH
    F.substring('Duration', 4, 2).cast('int').alias('dur_minutes'),  # MM
    F.substring('Duration', 7, 2).cast('int').alias('dur_seconds'),  # SS
).distinct().show(5)

+----------------+---------+-----------+-----------+
|        Duration|dur_hours|dur_minutes|dur_seconds|
+----------------+---------+-----------+-----------+
|00:04:52.0000000|        0|          4|         52|
|00:10:06.0000000|        0|         10|          6|
|00:09:52.0000000|        0|          9|         52|
|00:04:26.0000000|        0|          4|         26|
|00:14:59.0000000|        0|         14|         59|
+----------------+---------+-----------+-----------+
only showing top 5 rows



In [26]:
# Collapse the "HH:MM:SS.fffffff" Duration string into a single number of
# seconds: hours * 60 * 60 + minutes * 60 + seconds. Each field is taken
# by its 1-based position in the string and cast to an integer first.
logs.select(
    F.col('Duration'),
    (
        F.substring('Duration', 1, 2).cast('int') * 60 * 60   # hours
        + F.substring('Duration', 4, 2).cast('int') * 60      # minutes
        + F.substring('Duration', 7, 2).cast('int')           # seconds
    ).alias('Duration_seconds'),
).distinct().show(5)

+----------------+----------------+
|        Duration|Duration_seconds|
+----------------+----------------+
|01:59:30.0000000|            7170|
|00:31:00.0000000|            1860|
|00:28:08.0000000|            1688|
|00:32:00.0000000|            1920|
|00:30:00.0000000|            1800|
+----------------+----------------+
only showing top 5 rows

