## Recursively delete all files and directories in the Databricks File System (DBFS)


In [0]:
# %python
# # The line below deletes the directory xyz together with everything inside it, if that directory exists
# # (the second argument, True, requests the recursive delete). It fails for system folders you are not allowed to delete.
# dbutils.fs.rm('dbfs:/FileStore/xyz', True) 

## Creating a Spark DF from a list of lists

In [0]:
# One inner list per row; the column names are supplied separately.
# Spark infers the column types from the Python values (str -> string, int -> long, float -> double).
my_grocery_list = [
    ["Banana", 2, 1.74],
    ["Apple", 4, 2.04],
    ["Carrot", 1, 1.09],
]
grocery_column_names = ["Item", "Quantity", "Price"]

df_grocery_list = spark.createDataFrame(my_grocery_list, grocery_column_names)
df_grocery_list.printSchema()

root
 |-- Item: string (nullable = true)
 |-- Quantity: long (nullable = true)
 |-- Price: double (nullable = true)



## Read a CSV file

In [0]:
import os  # os.path.join builds the file paths used by the cells below

# Root folder of the uploaded data files, defined once so later cells
# don't have to repeat the full path.
DIRECTORY = "dbfs:/FileStore/tables/data"

In [0]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

# getOrCreate() returns the already-active session if there is one.
spark = SparkSession.builder.getOrCreate()

# The CSV reader accepts many more options; these few are enough here.
broadcast_logs_path = os.path.join(DIRECTORY, "BroadcastLogs_2018_Q3_M8.CSV")
broadcast_logs_options = dict(
    sep="|",                       # the file is pipe-delimited
    header=True,                   # first line holds the column names
    inferSchema=True,              # let Spark guess the column types
    timestampFormat="yyyy-MM-dd",
)
logs = spark.read.csv(path=broadcast_logs_path, **broadcast_logs_options)

In [0]:
# Inspect the column names and the types chosen by inferSchema=True.
logs.printSchema()

root
 |-- BroadcastLogID: integer (nullable = true)
 |-- LogServiceID: integer (nullable = true)
 |-- LogDate: timestamp (nullable = true)
 |-- SequenceNO: integer (nullable = true)
 |-- AudienceTargetAgeID: integer (nullable = true)
 |-- AudienceTargetEthnicID: integer (nullable = true)
 |-- CategoryID: integer (nullable = true)
 |-- ClosedCaptionID: integer (nullable = true)
 |-- CountryOfOriginID: integer (nullable = true)
 |-- DubDramaCreditID: integer (nullable = true)
 |-- EthnicProgramID: integer (nullable = true)
 |-- ProductionSourceID: integer (nullable = true)
 |-- ProgramClassID: integer (nullable = true)
 |-- FilmClassificationID: integer (nullable = true)
 |-- ExhibitionID: integer (nullable = true)
 |-- Duration: string (nullable = true)
 |-- EndTime: string (nullable = true)
 |-- LogEntryDate: timestamp (nullable = true)
 |-- ProductionNO: string (nullable = true)
 |-- ProgramTitle: string (nullable = true)
 |-- StartTime: string (nullable = true)
 |-- Subtitle: string 

## select()

In [0]:
# Several equivalent ways to use select() to achieve the same result.
# Only the last expression of the cell is displayed; the others just build
# (lazy) DataFrames that are discarded.
# Column names use the schema's exact spelling ("BroadcastLogID"). Spark's default
# case-insensitive resolution would accept "BroadCastLogID" too, but it then echoes
# that misspelling back as the output column name.
selected_column_names = ["BroadcastLogID", "LogServiceID", "LogDate"]

# Using the string to column conversion
logs.select("BroadcastLogID", "LogServiceID", "LogDate")
logs.select(*selected_column_names)

# Passing the column object explicitly
logs.select(F.col("BroadcastLogID"), F.col("LogServiceID"), F.col("LogDate"))
logs.select(*[F.col(name) for name in selected_column_names])

Out[6]: DataFrame[BroadCastLogID: int, LogServiceID: int, LogDate: timestamp]

## drop()

In [0]:
logs = logs.drop("BroadcastLogID", "SequenceNO")

# Testing if we effectively got rid of the columns.
# `name in logs.columns` is a case-sensitive Python membership test, so the names
# must match the schema exactly ("SequenceNO", not "SequenceNo"); a misspelled name
# would print False even if the drop had not happened.
print("BroadcastLogID" in logs.columns)  # => False
print("SequenceNO" in logs.columns)  # => False

False
False


In [0]:
# The drop() above expressed as a select() that keeps every other column.
# Those columns were already dropped by the previous cell, so here this changes nothing.
columns_to_remove = {"BroadcastLogID", "SequenceNO"}
columns_to_keep = [name for name in logs.columns if name not in columns_to_remove]
logs = logs.select(*columns_to_keep)

## withColumn()

In [0]:
# "Duration" was inferred as a string (values look like HH:MM:SS.fffffff).
duration_only = logs.select(F.col("Duration"))
duration_only.show(5)

print(duration_only.dtypes)  # [('Duration', 'string')]

+----------------+
|        Duration|
+----------------+
|02:00:00.0000000|
|00:00:30.0000000|
|00:00:15.0000000|
|00:00:15.0000000|
|00:00:15.0000000|
+----------------+
only showing top 5 rows

[('Duration', 'string')]


In [0]:
# Slice the fixed-width "HH:MM:SS..." string with substr() (1-based start, length 2)
# and cast each two-character piece to an integer.
duration_part_starts = {"dur_hours": 1, "dur_minutes": 4, "dur_seconds": 7}

logs.select(
    F.col("Duration"),
    *[
        F.col("Duration").substr(start, 2).cast("int").alias(part_name)
        for part_name, start in duration_part_starts.items()
    ],
).distinct().show(5)

+----------------+---------+-----------+-----------+
|        Duration|dur_hours|dur_minutes|dur_seconds|
+----------------+---------+-----------+-----------+
|00:00:30.0000000|        0|          0|         30|
|00:01:00.0000000|        0|          1|          0|
|00:00:15.0000000|        0|          0|         15|
|02:00:00.0000000|        2|          0|          0|
|00:00:10.0000000|        0|          0|         10|
+----------------+---------+-----------+-----------+
only showing top 5 rows



In [0]:
# Instead of creating separate HH, MM and SS columns, combine them arithmetically
# into a single "Duration_seconds" column: HH * 60 * 60 + MM * 60 + SS.
duration_hours_col = F.col("Duration").substr(1, 2).cast("int")
duration_minutes_col = F.col("Duration").substr(4, 2).cast("int")
duration_seconds_col = F.col("Duration").substr(7, 2).cast("int")

total_seconds = duration_hours_col * 60 * 60 + duration_minutes_col * 60 + duration_seconds_col

logs.select(
    F.col("Duration"),
    total_seconds.alias("Duration_seconds"),
).distinct().show(5)

+----------------+----------------+
|        Duration|Duration_seconds|
+----------------+----------------+
|00:00:30.0000000|              30|
|00:00:05.0000000|               5|
|00:01:00.0000000|              60|
|00:00:15.0000000|              15|
|02:00:00.0000000|            7200|
+----------------+----------------+
only showing top 5 rows



In [0]:
# Bringing it home with withColumn(). The earlier cells in this section only displayed
# results; this one reassigns `logs`, so "Duration_seconds" becomes part of the DataFrame.
duration_text = F.col("Duration")
duration_in_seconds = (
    duration_text.substr(1, 2).cast("int") * 60 * 60
    + duration_text.substr(4, 2).cast("int") * 60
    + duration_text.substr(7, 2).cast("int")
)

logs = logs.withColumn("Duration_seconds", duration_in_seconds)
logs.printSchema()

root
 |-- LogServiceID: integer (nullable = true)
 |-- LogDate: timestamp (nullable = true)
 |-- AudienceTargetAgeID: integer (nullable = true)
 |-- AudienceTargetEthnicID: integer (nullable = true)
 |-- CategoryID: integer (nullable = true)
 |-- ClosedCaptionID: integer (nullable = true)
 |-- CountryOfOriginID: integer (nullable = true)
 |-- DubDramaCreditID: integer (nullable = true)
 |-- EthnicProgramID: integer (nullable = true)
 |-- ProductionSourceID: integer (nullable = true)
 |-- ProgramClassID: integer (nullable = true)
 |-- FilmClassificationID: integer (nullable = true)
 |-- ExhibitionID: integer (nullable = true)
 |-- Duration: string (nullable = true)
 |-- EndTime: string (nullable = true)
 |-- LogEntryDate: timestamp (nullable = true)
 |-- ProductionNO: string (nullable = true)
 |-- ProgramTitle: string (nullable = true)
 |-- StartTime: string (nullable = true)
 |-- Subtitle: string (nullable = true)
 |-- NetworkAffiliationID: integer (nullable = true)
 |-- SpecialAttenti

## withColumnRenamed()

In [0]:
# Rename "Duration_seconds" to "duration_seconds"; the result is reassigned to `logs`.
old_column_name, new_column_name = "Duration_seconds", "duration_seconds"
logs = logs.withColumnRenamed(old_column_name, new_column_name)
logs.printSchema()

root
 |-- LogServiceID: integer (nullable = true)
 |-- LogDate: timestamp (nullable = true)
 |-- AudienceTargetAgeID: integer (nullable = true)
 |-- AudienceTargetEthnicID: integer (nullable = true)
 |-- CategoryID: integer (nullable = true)
 |-- ClosedCaptionID: integer (nullable = true)
 |-- CountryOfOriginID: integer (nullable = true)
 |-- DubDramaCreditID: integer (nullable = true)
 |-- EthnicProgramID: integer (nullable = true)
 |-- ProductionSourceID: integer (nullable = true)
 |-- ProgramClassID: integer (nullable = true)
 |-- FilmClassificationID: integer (nullable = true)
 |-- ExhibitionID: integer (nullable = true)
 |-- Duration: string (nullable = true)
 |-- EndTime: string (nullable = true)
 |-- LogEntryDate: timestamp (nullable = true)
 |-- ProductionNO: string (nullable = true)
 |-- ProgramTitle: string (nullable = true)
 |-- StartTime: string (nullable = true)
 |-- Subtitle: string (nullable = true)
 |-- NetworkAffiliationID: integer (nullable = true)
 |-- SpecialAttenti

## describe()

In [0]:
# logs.describe() on its own only builds a DataFrame; nothing is displayed until .show().
# logs.describe().show() would be messy with so many columns, so describe the first
# N_DESCRIBE_COLUMNS columns one at a time instead.
# Note: the timestamp column LogDate comes back with no statistics.
N_DESCRIBE_COLUMNS = 4
for counter, column_name in enumerate(logs.columns[:N_DESCRIBE_COLUMNS]):
    print(counter, column_name, logs.select(column_name).dtypes)
    logs.describe(column_name).show()

0 LogServiceID [('LogServiceID', 'int')]
+-------+------------------+
|summary|      LogServiceID|
+-------+------------------+
|  count|            238945|
|   mean| 3450.890284375065|
| stddev|199.50673962554782|
|    min|              3157|
|    max|              3925|
+-------+------------------+

1 LogDate [('LogDate', 'timestamp')]
+-------+
|summary|
+-------+
|  count|
|   mean|
| stddev|
|    min|
|    max|
+-------+

2 AudienceTargetAgeID [('AudienceTargetAgeID', 'int')]
+-------+-------------------+
|summary|AudienceTargetAgeID|
+-------+-------------------+
|  count|              16112|
|   mean| 3.4929245283018866|
| stddev| 1.0415963394745122|
|    min|                  1|
|    max|                  4|
+-------+-------------------+

3 AudienceTargetEthnicID [('AudienceTargetEthnicID', 'int')]
+-------+----------------------+
|summary|AudienceTargetEthnicID|
+-------+----------------------+
|  count|                  1710|
|   mean|    120.56432748538012|
| stddev|     71.

## summary()

In [0]:
# summary() reports the same statistics as describe() plus the 25%, 50% and 75% percentiles.
for position, column_name in enumerate(logs.columns[:4]):
    print(position, column_name, logs.select(column_name).dtypes)
    logs.select(column_name).summary().show()

0 LogServiceID [('LogServiceID', 'int')]
+-------+------------------+
|summary|      LogServiceID|
+-------+------------------+
|  count|            238945|
|   mean| 3450.890284375065|
| stddev|199.50673962554782|
|    min|              3157|
|    25%|              3287|
|    50%|              3379|
|    75%|              3627|
|    max|              3925|
+-------+------------------+

1 LogDate [('LogDate', 'timestamp')]
+-------+
|summary|
+-------+
|  count|
|   mean|
| stddev|
|    min|
|    25%|
|    50%|
|    75%|
|    max|
+-------+

2 AudienceTargetAgeID [('AudienceTargetAgeID', 'int')]
+-------+-------------------+
|summary|AudienceTargetAgeID|
+-------+-------------------+
|  count|              16112|
|   mean| 3.4929245283018866|
| stddev| 1.0415963394745122|
|    min|                  1|
|    25%|                  4|
|    50%|                  4|
|    75%|                  4|
|    max|                  4|
+-------+-------------------+

3 AudienceTargetEthnicID [('Audience

In [0]:
# summary() also accepts the exact statistics to report, including arbitrary percentiles.
for position, column_name in enumerate(logs.columns[:4]):
    single_column = logs.select(column_name)
    print(position, column_name, single_column.dtypes)
    single_column.summary("min", "10%", "90%", "max").show()

0 LogServiceID [('LogServiceID', 'int')]
+-------+------------+
|summary|LogServiceID|
+-------+------------+
|    min|        3157|
|    10%|        3236|
|    90%|        3709|
|    max|        3925|
+-------+------------+

1 LogDate [('LogDate', 'timestamp')]
+-------+
|summary|
+-------+
|    min|
|    10%|
|    90%|
|    max|
+-------+

2 AudienceTargetAgeID [('AudienceTargetAgeID', 'int')]
+-------+-------------------+
|summary|AudienceTargetAgeID|
+-------+-------------------+
|    min|                  1|
|    10%|                  1|
|    90%|                  4|
|    max|                  4|
+-------+-------------------+

3 AudienceTargetEthnicID [('AudienceTargetEthnicID', 'int')]
+-------+----------------------+
|summary|AudienceTargetEthnicID|
+-------+----------------------+
|    min|                     4|
|    10%|                    74|
|    90%|                   258|
|    max|                   337|
+-------+----------------------+



## JOINS

In [0]:
# DIRECTORY ("dbfs:/FileStore/tables/data") is defined in an earlier cell.
log_identifier_path = os.path.join(DIRECTORY, "ReferenceTables/LogIdentifier.csv")
log_identifier = spark.read.csv(log_identifier_path, sep="|", header=True, inferSchema=True)

log_identifier.printSchema()

root
 |-- LogIdentifierID: string (nullable = true)
 |-- LogServiceID: integer (nullable = true)
 |-- PrimaryFG: integer (nullable = true)



In [0]:
# Conditional filtering on a column: keep only rows flagged PrimaryFG == 1
# (filter() is an alias of where()).
log_identifier = log_identifier.filter(F.col("PrimaryFG") == 1)
print(log_identifier.count())
log_identifier.show(5)

758
+---------------+------------+---------+
|LogIdentifierID|LogServiceID|PrimaryFG|
+---------------+------------+---------+
|           13ST|        3157|        1|
|         2000SM|        3466|        1|
|           70SM|        3883|        1|
|           80SM|        3590|        1|
|           90SM|        3470|        1|
+---------------+------------+---------+
only showing top 5 rows



In [0]:
# Joining on an explicit equality condition keeps BOTH LogServiceID columns
# (one from each side), so the result has two columns with the same name.
same_log_service = logs["LogServiceID"] == log_identifier["LogServiceID"]
logs_and_channels_verbose = logs.join(log_identifier, on=same_log_service)
logs_and_channels_verbose.printSchema()

root
 |-- LogServiceID: integer (nullable = true)
 |-- LogDate: timestamp (nullable = true)
 |-- AudienceTargetAgeID: integer (nullable = true)
 |-- AudienceTargetEthnicID: integer (nullable = true)
 |-- CategoryID: integer (nullable = true)
 |-- ClosedCaptionID: integer (nullable = true)
 |-- CountryOfOriginID: integer (nullable = true)
 |-- DubDramaCreditID: integer (nullable = true)
 |-- EthnicProgramID: integer (nullable = true)
 |-- ProductionSourceID: integer (nullable = true)
 |-- ProgramClassID: integer (nullable = true)
 |-- FilmClassificationID: integer (nullable = true)
 |-- ExhibitionID: integer (nullable = true)
 |-- Duration: string (nullable = true)
 |-- EndTime: string (nullable = true)
 |-- LogEntryDate: timestamp (nullable = true)
 |-- ProductionNO: string (nullable = true)
 |-- ProgramTitle: string (nullable = true)
 |-- StartTime: string (nullable = true)
 |-- Subtitle: string (nullable = true)
 |-- NetworkAffiliationID: integer (nullable = true)
 |-- SpecialAttenti

In [0]:
from pyspark.sql.utils import AnalysisException

# Selecting the duplicated column by its bare name is ambiguous; Spark raises an
# AnalysisException, which is caught and printed to demonstrate the problem.
try:
    logs_and_channels_verbose.select("LogServiceID")
except AnalysisException as ambiguity_error:
    print(ambiguity_error)

Reference 'LogServiceID' is ambiguous, could be: LogServiceID, LogServiceID.


## Naming Convention - Option 1: join on the column name

In [0]:
# Passing the column name (instead of an equality condition) joins on that column
# and keeps a single LogServiceID column in the result, so it is no longer ambiguous.
logs_and_channels = logs.join(log_identifier, "LogServiceID")
logs_and_channels.printSchema()

root
 |-- LogServiceID: integer (nullable = true)
 |-- LogDate: timestamp (nullable = true)
 |-- AudienceTargetAgeID: integer (nullable = true)
 |-- AudienceTargetEthnicID: integer (nullable = true)
 |-- CategoryID: integer (nullable = true)
 |-- ClosedCaptionID: integer (nullable = true)
 |-- CountryOfOriginID: integer (nullable = true)
 |-- DubDramaCreditID: integer (nullable = true)
 |-- EthnicProgramID: integer (nullable = true)
 |-- ProductionSourceID: integer (nullable = true)
 |-- ProgramClassID: integer (nullable = true)
 |-- FilmClassificationID: integer (nullable = true)
 |-- ExhibitionID: integer (nullable = true)
 |-- Duration: string (nullable = true)
 |-- EndTime: string (nullable = true)
 |-- LogEntryDate: timestamp (nullable = true)
 |-- ProductionNO: string (nullable = true)
 |-- ProgramTitle: string (nullable = true)
 |-- StartTime: string (nullable = true)
 |-- Subtitle: string (nullable = true)
 |-- NetworkAffiliationID: integer (nullable = true)
 |-- SpecialAttenti

## Naming Convention - Option 2: explicit join condition, then drop the duplicate column

In [0]:
# Build the join condition from each DataFrame's own LogServiceID column, so each side
# is referenced unambiguously. The result still contains both columns.
log_service_match = logs["LogServiceID"] == log_identifier["LogServiceID"]
logs_and_channels = logs.join(log_identifier, log_service_match)
logs_and_channels.printSchema()

root
 |-- LogServiceID: integer (nullable = true)
 |-- LogDate: timestamp (nullable = true)
 |-- AudienceTargetAgeID: integer (nullable = true)
 |-- AudienceTargetEthnicID: integer (nullable = true)
 |-- CategoryID: integer (nullable = true)
 |-- ClosedCaptionID: integer (nullable = true)
 |-- CountryOfOriginID: integer (nullable = true)
 |-- DubDramaCreditID: integer (nullable = true)
 |-- EthnicProgramID: integer (nullable = true)
 |-- ProductionSourceID: integer (nullable = true)
 |-- ProgramClassID: integer (nullable = true)
 |-- FilmClassificationID: integer (nullable = true)
 |-- ExhibitionID: integer (nullable = true)
 |-- Duration: string (nullable = true)
 |-- EndTime: string (nullable = true)
 |-- LogEntryDate: timestamp (nullable = true)
 |-- ProductionNO: string (nullable = true)
 |-- ProgramTitle: string (nullable = true)
 |-- StartTime: string (nullable = true)
 |-- Subtitle: string (nullable = true)
 |-- NetworkAffiliationID: integer (nullable = true)
 |-- SpecialAttenti

In [0]:
# Drop log_identifier's copy of LogServiceID. It is addressed through its parent
# DataFrame, so the reference is unambiguous; the copy coming from logs is kept.
duplicate_join_key = log_identifier["LogServiceID"]
logs_and_channels = logs_and_channels.drop(duplicate_join_key)
logs_and_channels.printSchema()

root
 |-- LogServiceID: integer (nullable = true)
 |-- LogDate: timestamp (nullable = true)
 |-- AudienceTargetAgeID: integer (nullable = true)
 |-- AudienceTargetEthnicID: integer (nullable = true)
 |-- CategoryID: integer (nullable = true)
 |-- ClosedCaptionID: integer (nullable = true)
 |-- CountryOfOriginID: integer (nullable = true)
 |-- DubDramaCreditID: integer (nullable = true)
 |-- EthnicProgramID: integer (nullable = true)
 |-- ProductionSourceID: integer (nullable = true)
 |-- ProgramClassID: integer (nullable = true)
 |-- FilmClassificationID: integer (nullable = true)
 |-- ExhibitionID: integer (nullable = true)
 |-- Duration: string (nullable = true)
 |-- EndTime: string (nullable = true)
 |-- LogEntryDate: timestamp (nullable = true)
 |-- ProductionNO: string (nullable = true)
 |-- ProgramTitle: string (nullable = true)
 |-- StartTime: string (nullable = true)
 |-- Subtitle: string (nullable = true)
 |-- NetworkAffiliationID: integer (nullable = true)
 |-- SpecialAttenti

## Naming Convention - Option 3: alias the DataFrames and qualify column names

In [0]:
# The logs DF gets aliased as "left" and the log_identifier DF as "right".
# With these aliases, F.col("left.<name>") / F.col("right.<name>") say which side a
# column comes from. That applies both in the join condition below and in later
# operations on the joined DataFrame.
logs_and_channels_verbose = logs.alias("left").join(
    log_identifier.alias("right"),
    F.col("left.LogServiceID") == F.col("right.LogServiceID"),
)
logs_and_channels_verbose.printSchema()

root
 |-- LogServiceID: integer (nullable = true)
 |-- LogDate: timestamp (nullable = true)
 |-- AudienceTargetAgeID: integer (nullable = true)
 |-- AudienceTargetEthnicID: integer (nullable = true)
 |-- CategoryID: integer (nullable = true)
 |-- ClosedCaptionID: integer (nullable = true)
 |-- CountryOfOriginID: integer (nullable = true)
 |-- DubDramaCreditID: integer (nullable = true)
 |-- EthnicProgramID: integer (nullable = true)
 |-- ProductionSourceID: integer (nullable = true)
 |-- ProgramClassID: integer (nullable = true)
 |-- FilmClassificationID: integer (nullable = true)
 |-- ExhibitionID: integer (nullable = true)
 |-- Duration: string (nullable = true)
 |-- EndTime: string (nullable = true)
 |-- LogEntryDate: timestamp (nullable = true)
 |-- ProductionNO: string (nullable = true)
 |-- ProgramTitle: string (nullable = true)
 |-- StartTime: string (nullable = true)
 |-- Subtitle: string (nullable = true)
 |-- NetworkAffiliationID: integer (nullable = true)
 |-- SpecialAttenti

In [0]:
# Remove the "right" (log_identifier) copy of the join key, addressed via its alias.
right_side_key = F.col("right.LogServiceID")
logs_and_channels_verbose = logs_and_channels_verbose.drop(right_side_key)
logs_and_channels_verbose.printSchema()

root
 |-- LogServiceID: integer (nullable = true)
 |-- LogDate: timestamp (nullable = true)
 |-- AudienceTargetAgeID: integer (nullable = true)
 |-- AudienceTargetEthnicID: integer (nullable = true)
 |-- CategoryID: integer (nullable = true)
 |-- ClosedCaptionID: integer (nullable = true)
 |-- CountryOfOriginID: integer (nullable = true)
 |-- DubDramaCreditID: integer (nullable = true)
 |-- EthnicProgramID: integer (nullable = true)
 |-- ProductionSourceID: integer (nullable = true)
 |-- ProgramClassID: integer (nullable = true)
 |-- FilmClassificationID: integer (nullable = true)
 |-- ExhibitionID: integer (nullable = true)
 |-- Duration: string (nullable = true)
 |-- EndTime: string (nullable = true)
 |-- LogEntryDate: timestamp (nullable = true)
 |-- ProductionNO: string (nullable = true)
 |-- ProgramTitle: string (nullable = true)
 |-- StartTime: string (nullable = true)
 |-- Subtitle: string (nullable = true)
 |-- NetworkAffiliationID: integer (nullable = true)
 |-- SpecialAttenti

## End-to-end JOINS example with other data files

In [0]:
# DIRECTORY ("dbfs:/FileStore/tables/data") is defined in an earlier cell.


def read_code_table(name):
    """Read ReferenceTables/CD_<name>.csv and keep its ID, code and description columns.

    The reference files are pipe-delimited with a header row. Both tables read here
    name their columns <name>ID, <name>CD and EnglishDescription. The description is
    aliased to <name>_Description so it is clear which table it maps to after the join.
    """
    return spark.read.csv(
        os.path.join(DIRECTORY, f"ReferenceTables/CD_{name}.csv"),
        sep="|",
        header=True,
        inferSchema=True,
    ).select(
        f"{name}ID",
        f"{name}CD",
        F.col("EnglishDescription").alias(f"{name}_Description"),
    )


cd_category = read_code_table("Category")
cd_program_class = read_code_table("ProgramClass")

# Left joins keep every log row, even when its CategoryID / ProgramClassID has no
# match in the reference table (the looked-up columns are then null).
full_log = logs_and_channels.join(cd_category, "CategoryID", how="left").join(
    cd_program_class, "ProgramClassID", how="left"
)

In [0]:
# Joining on column names moves the join keys (ProgramClassID, CategoryID) to the front of the schema.
full_log.printSchema()

root
 |-- ProgramClassID: integer (nullable = true)
 |-- CategoryID: integer (nullable = true)
 |-- LogServiceID: integer (nullable = true)
 |-- LogDate: timestamp (nullable = true)
 |-- AudienceTargetAgeID: integer (nullable = true)
 |-- AudienceTargetEthnicID: integer (nullable = true)
 |-- ClosedCaptionID: integer (nullable = true)
 |-- CountryOfOriginID: integer (nullable = true)
 |-- DubDramaCreditID: integer (nullable = true)
 |-- EthnicProgramID: integer (nullable = true)
 |-- ProductionSourceID: integer (nullable = true)
 |-- FilmClassificationID: integer (nullable = true)
 |-- ExhibitionID: integer (nullable = true)
 |-- Duration: string (nullable = true)
 |-- EndTime: string (nullable = true)
 |-- LogEntryDate: timestamp (nullable = true)
 |-- ProductionNO: string (nullable = true)
 |-- ProgramTitle: string (nullable = true)
 |-- StartTime: string (nullable = true)
 |-- Subtitle: string (nullable = true)
 |-- NetworkAffiliationID: integer (nullable = true)
 |-- SpecialAttenti