# Reader & Writer
##### Objectives
1. Read from CSV files
1. Read from JSON files
1. Write DataFrame to files
1. Write DataFrame to tables
1. Write DataFrame to a Delta table

##### Methods
- <a href="https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrameReader.html" target="_blank">DataFrameReader</a>: **`csv`**, **`json`**, **`option`**, **`schema`**
- <a href="https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrameWriter.html" target="_blank">DataFrameWriter</a>: **`mode`**, **`option`**, **`parquet`**, **`format`**, **`saveAsTable`**, **`save`**
- <a href="https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.types.StructType.html?highlight=structtype#pyspark.sql.types.StructType" target="_blank">StructType</a>: **`toDDL`**

##### Spark Types
- <a href="https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/data_types.html" target="_blank">Types</a>: **`ArrayType`**, **`DoubleType`**, **`IntegerType`**, **`LongType`**, **`StringType`**, **`StructType`**, **`StructField`**

In [0]:
%run ../Includes/Classroom-Setup

## DataFrameReader
Interface used to load a DataFrame from external storage systems

**`spark.read.parquet("path/to/files")`**

DataFrameReader is accessible through the SparkSession attribute **`read`**. This class includes methods to load DataFrames from different external storage systems.

### Read from CSV files
Read from CSV with the DataFrameReader's **`csv`** method and the following options:

Tab separator, use first line as header, infer schema

In [0]:
users_csv_path = f"{DA.paths.datasets}/ecommerce/users/users-500k.csv"

users_df = (spark
           .read
           .option("sep", "\t")
           .option("header", True)
           .option("inferSchema", True)
           .csv(users_csv_path)
          )

users_df.printSchema()

Spark's Python API also allows you to specify the DataFrameReader options as parameters to the **`csv`** method

In [0]:
users_df = (spark
           .read
           .csv(users_csv_path, sep="\t", header=True, inferSchema=True)
          )

users_df.printSchema()

Manually define the schema by creating a **`StructType`** with column names and data types

In [0]:
from pyspark.sql.types import LongType, StringType, StructType, StructField

user_defined_schema = StructType([
    StructField("user_id", StringType(), True),
    StructField("user_first_touch_timestamp", LongType(), True),
    StructField("email", StringType(), True)
])

Read from CSV using this user-defined schema instead of inferring the schema

In [0]:
users_df = (spark
           .read
           .option("sep", "\t")
           .option("header", True)
           .schema(user_defined_schema)
           .csv(users_csv_path)
          )

Alternatively, define the schema using <a href="https://en.wikipedia.org/wiki/Data_definition_language" target="_blank">data definition language (DDL)</a> syntax.

In [0]:
ddl_schema = "user_id string, user_first_touch_timestamp long, email string"

users_df = (spark
           .read
           .option("sep", "\t")
           .option("header", True)
           .schema(ddl_schema)
           .csv(users_csv_path)
          )

### Read from JSON files

Read from JSON with DataFrameReader's **`json`** method and let Spark infer the schema, which the JSON reader does whenever no schema is supplied

In [0]:
events_json_path = f"{DA.paths.datasets}/ecommerce/events/events-500k.json"

events_df = (spark
            .read
            .option("inferSchema", True)
            .json(events_json_path)
           )

events_df.printSchema()

Read data faster by supplying a **`StructType`** with the column names and data types, which avoids the extra pass over the data needed to infer the schema

In [0]:
from pyspark.sql.types import ArrayType, DoubleType, IntegerType, LongType, StringType, StructType, StructField

user_defined_schema = StructType([
    StructField("device", StringType(), True),
    StructField("ecommerce", StructType([
        StructField("purchase_revenue_in_usd", DoubleType(), True),
        StructField("total_item_quantity", LongType(), True),
        StructField("unique_items", LongType(), True)
    ]), True),
    StructField("event_name", StringType(), True),
    StructField("event_previous_timestamp", LongType(), True),
    StructField("event_timestamp", LongType(), True),
    StructField("geo", StructType([
        StructField("city", StringType(), True),
        StructField("state", StringType(), True)
    ]), True),
    StructField("items", ArrayType(
        StructType([
            StructField("coupon", StringType(), True),
            StructField("item_id", StringType(), True),
            StructField("item_name", StringType(), True),
            StructField("item_revenue_in_usd", DoubleType(), True),
            StructField("price_in_usd", DoubleType(), True),
            StructField("quantity", LongType(), True)
        ])
    ), True),
    StructField("traffic_source", StringType(), True),
    StructField("user_first_touch_timestamp", LongType(), True),
    StructField("user_id", StringType(), True)
])

events_df = (spark
            .read
            .schema(user_defined_schema)
            .json(events_json_path)
           )

You can use the **`StructType`** Scala method **`toDDL`** to have a DDL-formatted string created for you.

This is convenient when you need the DDL-formatted string for ingesting CSV and JSON but don't want to handcraft it or the **`StructType`** variant of the schema.

This functionality is not available in Python, but notebooks allow us to use both languages.

In [0]:
# Step 1 - use this trick to transfer a value (the dataset path) between Python and Scala using the shared spark-config
spark.conf.set("whatever_your_scope.events", events_json_path)

In a Python notebook like this one, create a Scala cell to ingest the data and produce the DDL-formatted schema

In [0]:
%scala
// Step 2 - pull the value from the config (or copy & paste it)
val eventsJsonPath = spark.conf.get("whatever_your_scope.events")

// Step 3 - Read in the JSON, but let it infer the schema
val eventsSchema = spark.read
                        .option("inferSchema", true)
                        .json(eventsJsonPath)
                        .schema.toDDL

// Step 4 - print the schema, select it, and copy it.
println("="*80)
println(eventsSchema)
println("="*80)

In [0]:
# Step 5 - paste the schema from above and assign it to a variable as seen here
events_schema = "`device` STRING,`ecommerce` STRUCT<`purchase_revenue_in_usd`: DOUBLE, `total_item_quantity`: BIGINT, `unique_items`: BIGINT>,`event_name` STRING,`event_previous_timestamp` BIGINT,`event_timestamp` BIGINT,`geo` STRUCT<`city`: STRING, `state`: STRING>,`items` ARRAY<STRUCT<`coupon`: STRING, `item_id`: STRING, `item_name`: STRING, `item_revenue_in_usd`: DOUBLE, `price_in_usd`: DOUBLE, `quantity`: BIGINT>>,`traffic_source` STRING,`user_first_touch_timestamp` BIGINT,`user_id` STRING"

# Step 6 - Read in the JSON data using our new DDL formatted string
events_df = (spark.read
                 .schema(events_schema)
                 .json(events_json_path))

display(events_df)

This is a great "trick" for producing a schema for a net-new dataset and for accelerating development.

When you are done (consider this Step 7), make sure to delete your temporary code.

<img src="https://files.training.databricks.com/images/icon_warn_32.png" alt="Warning"> WARNING: **Do not use this trick in production**<br/>
Schema inference can be very slow because it<br/>
forces a full read of the source dataset.
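
To see why inference is costly, here is a minimal plain-Python sketch of inferring one column's type from raw text values. It is not Spark's actual implementation, and the helper name **`infer_column_type`** and the sample values are hypothetical. The point is that a single late value can change the answer, so in the worst case every value has to be inspected.

```python
def infer_column_type(values):
    """Return "long", "double", or "string" for a column of raw text values.

    Hypothetical helper for illustration only; Spark's own inference
    rules are more elaborate.
    """
    inferred = "long"
    for raw in values:
        if raw is None or raw == "":
            continue                 # treat missing values as nulls
        try:
            int(raw)
            continue                 # compatible with the current guess
        except ValueError:
            pass
        try:
            float(raw)
            inferred = "double"      # widen from long to double
        except ValueError:
            return "string"          # nothing is wider than string
    return inferred


print(infer_column_type(["1", "2", "3"]))      # long
print(infer_column_type(["1", "2.5", "3"]))    # double
print(infer_column_type(["1", "2", "oops"]))   # string
```

Supplying the schema up front, as in Step 6, skips this scan entirely.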

## DataFrameWriter
Interface used to write a DataFrame to external storage systems

<strong><code>
(df  
&nbsp;  .write                         
&nbsp;  .option("compression", "snappy")  
&nbsp;  .mode("overwrite")      
&nbsp;  .parquet(output_dir)       
)
</code></strong>

DataFrameWriter is accessible through the DataFrame attribute **`write`**. This class includes methods to write DataFrames to different external storage systems.

### Write DataFrames to files

Write **`users_df`** to parquet with DataFrameWriter's **`parquet`** method and the following configurations:

Snappy compression, overwrite mode

In [0]:
users_output_dir = f"{DA.paths.working_dir}/users.parquet"

(users_df
 .write
 .option("compression", "snappy")
 .mode("overwrite")
 .parquet(users_output_dir)
)

In [0]:
display(
    dbutils.fs.ls(users_output_dir)
)

As with DataFrameReader, Spark's Python API also allows you to specify the DataFrameWriter options as parameters to the **`parquet`** method

In [0]:
(users_df
 .write
 .parquet(users_output_dir, compression="snappy", mode="overwrite")
)

### Write DataFrames to tables

Write **`events_df`** to a table using the DataFrameWriter method **`saveAsTable`**

<img src="https://files.training.databricks.com/images/icon_note_32.png" alt="Note"> This creates a global table, unlike the local view created by the DataFrame method **`createOrReplaceTempView`**

In [0]:
events_df.write.mode("overwrite").saveAsTable("events")

This table was saved in the database created for you in classroom setup.

See database name printed below.

In [0]:
print(f"Database Name: {DA.db_name}")

... or even the tables in that database:

In [0]:
%sql
SHOW TABLES IN ${DA.db_name}

## Delta Lake

In almost all cases, the best practice is to use the Delta Lake format, especially whenever the data will be referenced from a Databricks workspace.

<a href="https://delta.io/" target="_blank">Delta Lake</a> is an open source technology designed to work with Spark to bring reliability to data lakes.

![delta](https://files.training.databricks.com/images/aspwd/delta_storage_layer.png)

#### Delta Lake's Key Features
- ACID transactions
- Scalable metadata handling
- Unified streaming and batch processing
- Time travel (data versioning)
- Schema enforcement and evolution
- Audit history
- Parquet format
- Compatible with Apache Spark API

### Write Results to a Delta Table

Write **`events_df`** with the DataFrameWriter's **`save`** method and the following configurations: Delta format & overwrite mode.

In [0]:
events_output_path = f"{DA.paths.working_dir}/delta/events"

(events_df
 .write
 .format("delta")
 .mode("overwrite")
 .save(events_output_path)
)

### Clean up classroom

In [0]:
DA.cleanup()

# Datetime Functions

##### Objectives
1. Cast to timestamp
2. Format datetimes
3. Extract from timestamp
4. Convert to date
5. Manipulate datetimes

##### Methods
- <a href="https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/column.html" target="_blank">Column</a>: **`cast`**
- <a href="https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/functions.html#datetime-functions" target="_blank">Built-In Functions</a>: **`date_format`**, **`to_date`**, **`date_add`**, **`year`**, **`month`**, **`dayofweek`**, **`minute`**, **`second`**

In [0]:
%run ../Includes/Classroom-Setup

Let's use a subset of the BedBricks events dataset to practice working with datetimes.

In [0]:
from pyspark.sql.functions import col

df = spark.read.format("delta").load(DA.paths.events).select("user_id", col("event_timestamp").alias("timestamp"))
display(df)

### Built-In Functions: Date Time Functions
Here are a few built-in functions to manipulate dates and times in Spark.

| Method | Description |
| --- | --- |
| **`add_months`** | Returns the date that is numMonths after startDate |
| **`current_timestamp`** | Returns the current timestamp at the start of query evaluation as a timestamp column |
| **`date_format`** | Converts a date/timestamp/string to a string in the datetime pattern given by the second argument |
| **`dayofweek`** | Extracts the day of the week as an integer (1 = Sunday through 7 = Saturday) from a given date/timestamp/string |
| **`from_unixtime`** | Converts the number of seconds from unix epoch (1970-01-01 00:00:00 UTC) to a string representing the timestamp of that moment in the current system time zone in the yyyy-MM-dd HH:mm:ss format |
| **`minute`** | Extracts the minutes as an integer from a given date/timestamp/string. |
| **`unix_timestamp`** | Converts time string with given pattern to Unix timestamp (in seconds) |

### Cast to Timestamp

#### **`cast()`**
Casts column to a different data type, specified using string representation or DataType.

In [0]:
timestamp_df = df.withColumn("timestamp", (col("timestamp") / 1e6).cast("timestamp"))
display(timestamp_df)

In [0]:
from pyspark.sql.types import TimestampType

timestamp_df = df.withColumn("timestamp", (col("timestamp") / 1e6).cast(TimestampType()))
display(timestamp_df)
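
The division by **`1e6`** matters because the raw values are microseconds since the Unix epoch, while a numeric cast to timestamp interprets the number as seconds. Below is a plain-Python sketch of the same conversion. The helper name **`micros_to_datetime`** and the sample value are hypothetical (not a row from the dataset), and the result is shown in UTC, whereas Spark displays timestamps in the session time zone.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def micros_to_datetime(ts_us: int) -> datetime:
    """Convert microseconds since the Unix epoch to an aware UTC datetime."""
    return EPOCH + timedelta(microseconds=ts_us)


sample_us = 1_592_524_800_000_000      # hypothetical value
print(micros_to_datetime(sample_us))   # 2020-06-19 00:00:00+00:00
```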

### Datetime Patterns for Formatting and Parsing
There are several common scenarios for datetime usage in Spark:

- CSV/JSON datasources use the pattern string for parsing and formatting datetime content.
- Datetime functions that convert StringType to/from DateType or TimestampType, e.g. **`unix_timestamp`**, **`date_format`**, **`from_unixtime`**, **`to_date`**, **`to_timestamp`**, etc.

Spark uses <a href="https://spark.apache.org/docs/latest/sql-ref-datetime-pattern.html" target="_blank">pattern letters for date and timestamp parsing and formatting</a>. A subset of these patterns is shown below.

| Symbol | Meaning         | Presentation | Examples               |
| ------ | --------------- | ------------ | ---------------------- |
| G      | era             | text         | AD; Anno Domini        |
| y      | year            | year         | 2020; 20               |
| D      | day-of-year     | number(3)    | 189                    |
| M/L    | month-of-year   | month        | 7; 07; Jul; July       |
| d      | day-of-month    | number(3)    | 28                     |
| Q/q    | quarter-of-year | number/text  | 3; 03; Q3; 3rd quarter |
| E      | day-of-week     | text         | Tue; Tuesday           |

<img src="https://files.training.databricks.com/images/icon_warn_32.png" alt="Warning"> Spark's handling of dates and timestamps changed in version 3.0, and the patterns used for parsing and formatting these values changed as well. For a discussion of these changes, please reference <a href="https://databricks.com/blog/2020/07/22/a-comprehensive-look-at-dates-and-timestamps-in-apache-spark-3-0.html" target="_blank">this Databricks blog post</a>.

### Format date

#### **`date_format()`**
Converts a date/timestamp/string to a string formatted with the given date time pattern.

In [0]:
from pyspark.sql.functions import date_format

formatted_df = (timestamp_df
                .withColumn("date string", date_format("timestamp", "MMMM dd, yyyy"))
                .withColumn("time string", date_format("timestamp", "HH:mm:ss.SSSSSS"))
               )
display(formatted_df)

### Extract datetime attribute from timestamp

#### **`year`**
Extracts the year as an integer from a given date/timestamp/string.

##### Similar methods: **`month`**, **`dayofweek`**, **`minute`**, **`second`**, etc.

In [0]:
from pyspark.sql.functions import year, month, dayofweek, minute, second

datetime_df = (timestamp_df
               .withColumn("year", year(col("timestamp")))
               .withColumn("month", month(col("timestamp")))
               .withColumn("dayofweek", dayofweek(col("timestamp")))
               .withColumn("minute", minute(col("timestamp")))
               .withColumn("second", second(col("timestamp")))
              )
display(datetime_df)
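
Note that **`dayofweek`** numbers days from 1 (Sunday) through 7 (Saturday), which differs from Python's **`isoweekday`** (1 = Monday). A small plain-Python sketch of the mapping follows; **`spark_dayofweek`** is a hypothetical helper name, not a Spark API.

```python
import datetime


def spark_dayofweek(d: datetime.date) -> int:
    """Mimic the Sunday=1 ... Saturday=7 numbering in plain Python."""
    return d.isoweekday() % 7 + 1


print(spark_dayofweek(datetime.date(2020, 6, 19)))  # 6 (a Friday)
print(spark_dayofweek(datetime.date(2020, 6, 21)))  # 1 (a Sunday)
```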

### Convert to Date

#### **`to_date`**
Converts the column to DateType using Spark's casting rules.

In [0]:
from pyspark.sql.functions import to_date

date_df = timestamp_df.withColumn("date", to_date(col("timestamp")))
display(date_df)

### Manipulate Datetimes

#### **`date_add`**
Returns the date that is the given number of days after the start date.

In [0]:
from pyspark.sql.functions import date_add

plus_2_df = timestamp_df.withColumn("plus_two_days", date_add(col("timestamp"), 2))
display(plus_2_df)
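
Because **`date_add`** returns a date, the time of day in the input timestamp does not carry over to **`plus_two_days`**. A plain-Python analogue, with the hypothetical helper name **`date_add_py`** and made-up inputs:

```python
from datetime import date, datetime, timedelta


def date_add_py(ts: datetime, days: int) -> date:
    """Drop the time of day, then add a whole number of days."""
    return ts.date() + timedelta(days=days)


print(date_add_py(datetime(2020, 6, 19, 23, 59, 59), 2))  # 2020-06-21
print(date_add_py(datetime(2020, 6, 30, 8, 0, 0), 2))     # 2020-07-02
```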

### Clean up classroom

In [0]:
DA.cleanup()

# DataFrame & Column
##### Objectives
1. Construct columns
1. Subset columns
1. Add or replace columns
1. Subset rows
1. Sort rows

##### Methods
- <a href="https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/dataframe.html" target="_blank">DataFrame</a>: **`select`**, **`selectExpr`**, **`drop`**, **`withColumn`**, **`withColumnRenamed`**, **`filter`**, **`distinct`**, **`limit`**, **`sort`**
- <a href="https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/column.html" target="_blank">Column</a>: **`alias`**, **`isin`**, **`cast`**, **`isNotNull`**, **`desc`**, operators

In [0]:
%run ../Includes/Classroom-Setup

Let's use the BedBricks events dataset.

In [0]:
events_df = spark.read.format("delta").load(DA.paths.events)
display(events_df)

## Column Expressions

A <a href="https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/column.html" target="_blank">Column</a> is a logical construction that will be computed based on the data in a DataFrame using an expression

Construct a new Column based on existing columns in a DataFrame

In [0]:
from pyspark.sql.functions import col

print(events_df.device)
print(events_df["device"])
print(col("device"))

Scala supports an additional syntax for creating a new Column based on existing columns in a DataFrame

In [0]:
%scala
$"device"

### Column Operators and Methods
| Method | Description |
| --- | --- |
| \*, + , <, >= | Math and comparison operators |
| ==, != | Equality and inequality tests (Scala operators are **`===`** and **`=!=`**) |
| alias | Gives the column an alias |
| cast, astype | Casts the column to a different data type |
| isNull, isNotNull, isNaN | Is null, is not null, is NaN (in Python, use the **`isnan`** function from **`pyspark.sql.functions`** for the NaN test) |
| asc, desc | Returns a sort expression based on ascending/descending order of the column |

Create complex expressions with existing columns, operators, and methods.

In [0]:
col("ecommerce.purchase_revenue_in_usd") + col("ecommerce.total_item_quantity")
col("event_timestamp").desc()
(col("ecommerce.purchase_revenue_in_usd") * 100).cast("int")

Here's an example of using these column expressions in the context of a DataFrame

In [0]:
rev_df = (events_df
         .filter(col("ecommerce.purchase_revenue_in_usd").isNotNull())
         .withColumn("purchase_revenue", (col("ecommerce.purchase_revenue_in_usd") * 100).cast("int"))
         .withColumn("avg_purchase_revenue", col("ecommerce.purchase_revenue_in_usd") / col("ecommerce.total_item_quantity"))
         .sort(col("avg_purchase_revenue").desc())
        )

display(rev_df)

## DataFrame Transformation Methods
| Method | Description |
| --- | --- |
| **`select`** | Returns a new DataFrame by computing given expression for each element |
| **`drop`** | Returns a new DataFrame with a column dropped |
| **`withColumnRenamed`** | Returns a new DataFrame with a column renamed |
| **`withColumn`** | Returns a new DataFrame by adding a column or replacing the existing column that has the same name |
| **`filter`**, **`where`** | Filters rows using the given condition |
| **`sort`**, **`orderBy`** | Returns a new DataFrame sorted by the given expressions |
| **`dropDuplicates`**, **`distinct`** | Returns a new DataFrame with duplicate rows removed |
| **`limit`** | Returns a new DataFrame by taking the first n rows |
| **`groupBy`** | Groups the DataFrame using the specified columns, so we can run aggregation on them |

### Subset columns
Use DataFrame transformations to subset columns

#### **`select()`**
Selects a list of columns or column based expressions

In [0]:
devices_df = events_df.select("user_id", "device")
display(devices_df)

In [0]:
from pyspark.sql.functions import col

locations_df = events_df.select(
    "user_id", 
    col("geo.city").alias("city"), 
    col("geo.state").alias("state")
)
display(locations_df)

#### **`selectExpr()`**
Selects a list of SQL expressions

In [0]:
apple_df = events_df.selectExpr("user_id", "device in ('macOS', 'iOS') as apple_user")
display(apple_df)

#### **`drop()`**
Returns a new DataFrame after dropping the given column, specified as a string or Column object

Use strings to specify multiple columns

In [0]:
anonymous_df = events_df.drop("user_id", "geo", "device")
display(anonymous_df)

In [0]:
no_sales_df = events_df.drop(col("ecommerce"))
display(no_sales_df)


### Add or replace columns
Use DataFrame transformations to add or replace columns

#### **`withColumn()`**
Returns a new DataFrame by adding a column or replacing an existing column that has the same name.

In [0]:
mobile_df = events_df.withColumn("mobile", col("device").isin("iOS", "Android"))
display(mobile_df)

In [0]:
purchase_quantity_df = events_df.withColumn("purchase_quantity", col("ecommerce.total_item_quantity").cast("int"))
purchase_quantity_df.printSchema()

#### **`withColumnRenamed()`**
Returns a new DataFrame with a column renamed.

In [0]:
location_df = events_df.withColumnRenamed("geo", "location")
display(location_df)

### Subset Rows
Use DataFrame transformations to subset rows

#### **`filter()`**
Filters rows using the given SQL expression or column based condition.

##### Alias: **`where`**

In [0]:
purchases_df = events_df.filter("ecommerce.total_item_quantity > 0")
display(purchases_df)

In [0]:
revenue_df = events_df.filter(col("ecommerce.purchase_revenue_in_usd").isNotNull())
display(revenue_df)

In [0]:
android_df = events_df.filter((col("traffic_source") != "direct") & (col("device") == "Android"))
display(android_df)

#### **`dropDuplicates()`**
Returns a new DataFrame with duplicate rows removed, optionally considering only a subset of columns.

##### Related: **`distinct`** (same result as **`dropDuplicates()`** called without a column subset)

In [0]:
display(events_df.distinct())

In [0]:
distinct_users_df = events_df.dropDuplicates(["user_id"])
display(distinct_users_df)

#### **`limit()`**
Returns a new DataFrame by taking the first n rows.

In [0]:
limit_df = events_df.limit(100)
display(limit_df)

### Sort rows
Use DataFrame transformations to sort rows

#### **`sort()`**
Returns a new DataFrame sorted by the given columns or expressions.

##### Alias: **`orderBy`**

In [0]:
increase_timestamps_df = events_df.sort("event_timestamp")
display(increase_timestamps_df)

In [0]:
decrease_timestamp_df = events_df.sort(col("event_timestamp").desc())
display(decrease_timestamp_df)

In [0]:
increase_sessions_df = events_df.orderBy(["user_first_touch_timestamp", "event_timestamp"])
display(increase_sessions_df)

In [0]:
decrease_sessions_df = events_df.sort(col("user_first_touch_timestamp").desc(), col("event_timestamp"))
display(decrease_sessions_df)
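
Sorting on several keys with mixed directions follows the same idea as a tuple sort key in plain Python. A sketch on hypothetical `(user_first_touch_timestamp, event_timestamp)` pairs:

```python
# Hypothetical (user_first_touch_timestamp, event_timestamp) pairs
rows = [(100, 5), (200, 9), (200, 3), (100, 1)]

# First key descending (negated), second key ascending
ordered = sorted(rows, key=lambda r: (-r[0], r[1]))
print(ordered)  # [(200, 3), (200, 9), (100, 1), (100, 5)]
```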

### Clean up classroom

In [0]:
DA.cleanup()

# Active Users Lab
Plot daily active users and average active users by day of week.
1. Extract timestamp and date of events
2. Get daily active users
3. Get average number of active users by day of week
4. Sort day of week in correct order

In [0]:
%run ../Includes/Classroom-Setup

### Setup
Run the cell below to create the starting DataFrame of user IDs and timestamps of events logged on the BedBricks website.

In [0]:
from pyspark.sql.functions import col

df = (spark
      .read
      .format("delta")
      .load(DA.paths.events)
      .select("user_id", col("event_timestamp").alias("ts"))
     )

display(df)

### 1. Extract timestamp and date of events
- Convert **`ts`** from microseconds to seconds by dividing by 1 million and cast to timestamp
- Add **`date`** column by converting **`ts`** to date

In [0]:
# ANSWER
from pyspark.sql.functions import to_date

datetime_df = (df
               .withColumn("ts", (col("ts") / 1e6).cast("timestamp"))
               .withColumn("date", to_date("ts"))
              )
display(datetime_df)

**1.1: CHECK YOUR WORK**

In [0]:
from pyspark.sql.types import DateType, StringType, StructField, StructType, TimestampType

expected1a = StructType([StructField("user_id", StringType(), True),
                         StructField("ts", TimestampType(), True),
                         StructField("date", DateType(), True)])

result1a = datetime_df.schema

assert expected1a == result1a, "datetime_df does not have the expected schema"
print("All tests pass")

In [0]:
import datetime

expected1b = datetime.date(2020, 6, 19)
result1b = datetime_df.sort("date").first().date

assert expected1b == result1b, "datetime_df does not have the expected date values"
print("All tests pass")

### 2. Get daily active users
- Group by date
- Aggregate approximate count of distinct **`user_id`** and alias to "active_users"
  - Recall built-in function to get **approximate count distinct** (also recall:  approximate count distinct is different than count distinct!)
- Sort by date
- Plot as line graph

In [0]:
# ANSWER
from pyspark.sql.functions import approx_count_distinct

active_users_df = (datetime_df
                   .groupBy("date")
                   .agg(approx_count_distinct("user_id").alias("active_users"))
                   .sort("date")
                  )
display(active_users_df)
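
For intuition, the grouping step can be mimicked in plain Python with one set of user IDs per date. This sketch computes an exact distinct count on hypothetical sample rows; **`approx_count_distinct`** instead returns an estimate, which is why its results can differ slightly from an exact count.

```python
from collections import defaultdict
import datetime

# Hypothetical (user_id, date) rows
rows = [
    ("u1", datetime.date(2020, 6, 19)),
    ("u2", datetime.date(2020, 6, 19)),
    ("u1", datetime.date(2020, 6, 19)),   # repeat visit, counted once
    ("u1", datetime.date(2020, 6, 20)),
]


def daily_active_users(rows):
    """Return (date, distinct user count) pairs sorted by date."""
    users_by_date = defaultdict(set)
    for user_id, date in rows:
        users_by_date[date].add(user_id)
    return [(date, len(users_by_date[date])) for date in sorted(users_by_date)]


print(daily_active_users(rows))
```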

**2.1: CHECK YOUR WORK**

In [0]:
from pyspark.sql.types import LongType

expected2a = StructType([StructField("date", DateType(), True),
                         StructField("active_users", LongType(), False)])

result2a = active_users_df.schema

assert expected2a == result2a, "active_users_df does not have the expected schema"
print("All tests pass")

In [0]:
expected2b = [(datetime.date(2020, 6, 19), 251573), (datetime.date(2020, 6, 20), 357215), (datetime.date(2020, 6, 21), 305055), (datetime.date(2020, 6, 22), 239094), (datetime.date(2020, 6, 23), 243117)]

result2b = [(row.date, row.active_users) for row in active_users_df.orderBy("date").take(5)]

assert expected2b == result2b, "active_users_df does not have the expected values"
print("All tests pass")

### 3. Get average number of active users by day of week
- Add **`day`** column by extracting day of week from **`date`** using a datetime pattern string - the expected output here will be a day name, not a number (e.g. **`Mon`**, not **`1`**)
- Group by **`day`**
- Aggregate average of **`active_users`** and alias to "avg_users"

In [0]:
# ANSWER
from pyspark.sql.functions import date_format, avg

active_dow_df = (active_users_df
                 .withColumn("day", date_format(col("date"), "E"))
                 .groupBy("day")
                 .agg(avg(col("active_users")).alias("avg_users"))
                )
display(active_dow_df)

**3.1: CHECK YOUR WORK**

In [0]:
from pyspark.sql.types import DoubleType

expected3a = StructType([StructField("day", StringType(), True),
                         StructField("avg_users", DoubleType(), True)])

result3a = active_dow_df.schema

assert expected3a == result3a, "active_dow_df does not have the expected schema"
print("All tests pass")

In [0]:
expected3b = [("Fri", 247180.66666666666), ("Mon", 238195.5), ("Sat", 278482.0), ("Sun", 282905.5), ("Thu", 264620.0), ("Tue", 260942.5), ("Wed", 227214.0)]

result3b = [(row.day, row.avg_users) for row in active_dow_df.sort("day").collect()]

assert expected3b == result3b, "active_dow_df does not have the expected values"
print("All tests pass")

# Sort Day Lab

##### Tasks
1. Define a UDF to label the day of week
1. Apply the UDF to label and sort by day of week
1. Plot active users by day of week as a bar graph

In [0]:
%run ../Includes/Classroom-Setup

Start with a DataFrame of the average number of active users by day of week.

This is the same result as **`active_dow_df`** from the previous lab.

In [0]:
from pyspark.sql.functions import approx_count_distinct, avg, col, date_format, to_date

df = (spark
      .read
      .format("delta")
      .load(DA.paths.events)
      .withColumn("ts", (col("event_timestamp") / 1e6).cast("timestamp"))
      .withColumn("date", to_date("ts"))
      .groupBy("date").agg(approx_count_distinct("user_id").alias("active_users"))
      .withColumn("day", date_format(col("date"), "E"))
      .groupBy("day").agg(avg(col("active_users")).alias("avg_users"))
     )

display(df)

### 1. Define UDF to label day of week

Use the **`label_day_of_week`** function provided below to create the UDF **`label_dow_udf`**

In [0]:
def label_day_of_week(day: str) -> str:
    dow = {"Mon": "1", "Tue": "2", "Wed": "3", "Thu": "4",
           "Fri": "5", "Sat": "6", "Sun": "7"}
    return dow.get(day) + "-" + day

In [0]:
# ANSWER
label_dow_udf = spark.udf.register("label_dow", label_day_of_week)

### 2. Apply UDF to label and sort by day of week
- Update the **`day`** column by applying the UDF and replacing this column
- Sort by **`day`**
- Plot as a bar graph

In [0]:
# ANSWER
final_df = (df
            .withColumn("day", label_dow_udf(col("day")))
            .sort("day")
           )
display(final_df)
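
The numeric prefix is what makes a plain string sort produce calendar order. The effect can be checked without Spark by applying the same function to an alphabetically ordered list of day names:

```python
def label_day_of_week(day: str) -> str:
    dow = {"Mon": "1", "Tue": "2", "Wed": "3", "Thu": "4",
           "Fri": "5", "Sat": "6", "Sun": "7"}
    return dow.get(day) + "-" + day


days = ["Fri", "Mon", "Sat", "Sun", "Thu", "Tue", "Wed"]  # alphabetical order
labeled = sorted(label_day_of_week(d) for d in days)
print(labeled)
# ['1-Mon', '2-Tue', '3-Wed', '4-Thu', '5-Fri', '6-Sat', '7-Sun']
```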

# Revenue by Traffic Lab
Get the 3 traffic sources generating the highest total revenue.
1. Aggregate revenue by traffic source
2. Get top 3 traffic sources by total revenue
3. Clean revenue columns to have two decimal places

##### Methods
- <a href="https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/dataframe.html" target="_blank">DataFrame</a>: **`groupBy`**, **`sort`**, **`limit`**
- <a href="https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/column.html" target="_blank">Column</a>: **`alias`**, **`desc`**, **`cast`**, **`operators`**
- <a href="https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/functions.html" target="_blank">Built-in Functions</a>: **`avg`**, **`sum`**

In [0]:
%run ../Includes/Classroom-Setup

### Setup
Run the cell below to create the starting DataFrame **`df`**.

In [0]:
from pyspark.sql.functions import col

# Purchase events logged on the BedBricks website
df = (spark.read.format("delta").load(DA.paths.events)
      .withColumn("revenue", col("ecommerce.purchase_revenue_in_usd"))
      .filter(col("revenue").isNotNull())
      .drop("event_name")
     )

display(df)

### 1. Aggregate revenue by traffic source
- Group by **`traffic_source`**
- Get sum of **`revenue`** as **`total_rev`**. Round this to the tenths decimal place (e.g. `nnnnn.n`).
- Get average of **`revenue`** as **`avg_rev`**

Remember to import any necessary built-in functions.

In [0]:
# ANSWER
from pyspark.sql.functions import avg, col, sum

traffic_df = (df
              .groupBy("traffic_source")
              .agg(sum(col("revenue")).alias("total_rev"),
                   avg(col("revenue")).alias("avg_rev"))
             )

display(traffic_df)

**1.1: CHECK YOUR WORK**

In [0]:
from pyspark.sql.functions import round

expected1 = [(12704560.0, 1083.175), (78800000.3, 983.2915), (24797837.0, 1076.6221), (47218429.0, 1086.8303), (16177893.0, 1083.4378), (8044326.0, 1087.218)]
test_df = traffic_df.sort("traffic_source").select(round("total_rev", 4).alias("total_rev"), round("avg_rev", 4).alias("avg_rev"))
result1 = [(row.total_rev, row.avg_rev) for row in test_df.collect()]

assert(expected1 == result1)
print("All tests pass")

### 2. Get top three traffic sources by total revenue
- Sort by **`total_rev`** in descending order
- Limit to first three rows

In [0]:
# ANSWER
top_traffic_df = traffic_df.sort(col("total_rev").desc()).limit(3)
display(top_traffic_df)

**2.1: CHECK YOUR WORK**

In [0]:
expected2 = [(78800000.3, 983.2915), (47218429.0, 1086.8303), (24797837.0, 1076.6221)]
test_df = top_traffic_df.select(round("total_rev", 4).alias("total_rev"), round("avg_rev", 4).alias("avg_rev"))
result2 = [(row.total_rev, row.avg_rev) for row in test_df.collect()]

assert(expected2 == result2)
print("All tests pass")

### 3. Limit revenue columns to two decimal places
- Modify columns **`avg_rev`** and **`total_rev`** to contain numbers with two decimal places
  - Use **`withColumn()`** with the same names to replace these columns
  - To limit to two decimal places, multiply each column by 100, cast to long, and then divide by 100

In [0]:
# ANSWER
final_df = (top_traffic_df
            .withColumn("avg_rev", (col("avg_rev") * 100).cast("long") / 100)
            .withColumn("total_rev", (col("total_rev") * 100).cast("long") / 100)
           )

display(final_df)

**3.1: CHECK YOUR WORK**

In [0]:
expected3 = [(78800000.29, 983.29), (47218429.0, 1086.83), (24797837.0, 1076.62)]
result3 = [(row.total_rev, row.avg_rev) for row in final_df.collect()]

assert(expected3 == result3)
print("All tests pass")

### 4. Bonus: Rewrite using a built-in math function
Find a built-in math function that rounds to a specified number of decimal places

In [0]:
# ANSWER
from pyspark.sql.functions import round

bonus_df = (top_traffic_df
            .withColumn("avg_rev", round("avg_rev", 2))
            .withColumn("total_rev", round("total_rev", 2))
           )

display(bonus_df)

**4.1: CHECK YOUR WORK**

In [0]:
expected4 = [(78800000.3, 983.29), (47218429.0, 1086.83), (24797837.0, 1076.62)]
result4 = [(row.total_rev, row.avg_rev) for row in bonus_df.collect()]

assert(expected4 == result4)
print("All tests pass")

### 5. Chain all the steps above

In [0]:
# ANSWER
# Solution #1 using round

chain_df = (df
            .groupBy("traffic_source")
            .agg(sum(col("revenue")).alias("total_rev"),
                 avg(col("revenue")).alias("avg_rev"))
            .sort(col("total_rev").desc())
            .limit(3)
            .withColumn("avg_rev", round("avg_rev", 2))
            .withColumn("total_rev", round("total_rev", 2))
           )

display(chain_df)

In [0]:
# ANSWER
# Solution #2 using *100, cast, /100
# chain_df = (df
#             .groupBy("traffic_source")
#             .agg(sum(col("revenue")).alias("total_rev"),
#                  avg(col("revenue")).alias("avg_rev"))
#             .sort(col("total_rev").desc())
#             .limit(3)
#             .withColumn("avg_rev", (col("avg_rev") * 100).cast("long") / 100)
#             .withColumn("total_rev", (col("total_rev") * 100).cast("long") / 100)
#            )

# display(chain_df)

**5.1: CHECK YOUR WORK**

In [0]:
method_a = [(78800000.3,  983.29), (47218429.0, 1086.83), (24797837.0, 1076.62)]
method_b = [(78800000.29, 983.29), (47218429.0, 1086.83), (24797837.0, 1076.62)]
result5 = [(row.total_rev, row.avg_rev) for row in chain_df.collect()]

assert result5 == method_a or result5 == method_b
print("All tests pass")
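
The check above accepts two different totals for the top traffic source because truncating and rounding are not the same operation. The following plain-Python sketch illustrates the difference. It is not Spark code: `int()` stands in for **`cast("long")`**, Python's built-in `round` stands in for Spark's **`round`** (the two can differ on exact ties), and `sample_total` is a hypothetical value chosen only to make the two methods diverge.

```python
def truncate_2dp(value: float) -> float:
    """Mimic (col * 100).cast("long") / 100: drop everything past two decimals."""
    return int(value * 100) / 100


def round_2dp(value: float) -> float:
    """Round to two decimal places with Python's built-in round()."""
    return round(value, 2)


# Hypothetical revenue total (an assumption, not taken from the dataset).
sample_total = 78800000.296

print(truncate_2dp(sample_total))  # 78800000.29
print(round_2dp(sample_total))     # 78800000.3
```

Truncation always moves toward zero, so it can understate a value by almost a full cent, while rounding picks the nearest cent.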

# Query Optimization

We'll explore query plans and optimizations through several examples, including logical optimizations and queries with and without predicate pushdown.

##### Objectives
1. Logical optimizations
1. Predicate pushdown
1. No predicate pushdown

##### Methods 
- <a href="https://spark.apache.org/docs/3.1.3/api/python/reference/api/pyspark.sql.DataFrame.explain.html#pyspark.sql.DataFrame.explain" target="_blank">DataFrame</a>: **`explain`**

Let’s run our setup cell and load our initial DataFrame into the variable **`df`**. Displaying this DataFrame shows us events data.

In [0]:
%run ../Includes/Classroom-Setup

In [0]:
df = spark.read.format("delta").load(DA.paths.events)
display(df)

### Logical Optimization

**`explain(..)`** prints the query plans, optionally formatted by a given explain mode. Compare the following logical plan & physical plan, noting how Catalyst handled the multiple **`filter`** transformations.

In [0]:
from pyspark.sql.functions import col

limit_events_df = (df
                   .filter(col("event_name") != "reviews")
                   .filter(col("event_name") != "checkout")
                   .filter(col("event_name") != "register")
                   .filter(col("event_name") != "email_coupon")
                   .filter(col("event_name") != "cc_info")
                   .filter(col("event_name") != "delivery")
                   .filter(col("event_name") != "shipping_info")
                   .filter(col("event_name") != "press")
                  )

limit_events_df.explain(True)

Of course, we could have written the query originally using a single **`filter`** condition ourselves. Compare the previous and following query plans.

In [0]:
better_df = (df
             .filter((col("event_name").isNotNull()) &
                     (col("event_name") != "reviews") &
                     (col("event_name") != "checkout") &
                     (col("event_name") != "register") &
                     (col("event_name") != "email_coupon") &
                     (col("event_name") != "cc_info") &
                     (col("event_name") != "delivery") &
                     (col("event_name") != "shipping_info") &
                     (col("event_name") != "press"))
            )

better_df.explain(True)

Of course, we wouldn't write the following code intentionally, but in a long, complex query you might not notice the duplicate filter conditions. Let's see what Catalyst does with this query.

In [0]:
stupid_df = (df
             .filter(col("event_name") != "finalize")
             .filter(col("event_name") != "finalize")
             .filter(col("event_name") != "finalize")
             .filter(col("event_name") != "finalize")
             .filter(col("event_name") != "finalize")
            )

stupid_df.explain(True)

### Caching

By default, the data of a DataFrame is present on a Spark cluster only while it is being processed during a query; it is not automatically persisted on the cluster afterwards. (Spark is a data processing engine, not a data storage system.) You can explicitly request that Spark persist a DataFrame on the cluster by invoking its **`cache`** method.

If you do cache a DataFrame, you should always explicitly evict it from the cache by invoking **`unpersist`** when you no longer need it.

<img src="https://files.training.databricks.com/images/icon_best_32.png" alt="Best Practice"> Caching a DataFrame can be appropriate if you are certain that you will use the same DataFrame multiple times, as in:

- Exploratory data analysis
- Machine learning model training

<img src="https://files.training.databricks.com/images/icon_warn_32.png" alt="Warning"> Aside from those use cases, you should **not** cache DataFrames because it is likely that you'll *degrade* the performance of your application.

- Caching consumes cluster resources that could otherwise be used for task execution
- Caching can prevent Spark from performing query optimizations, as shown in the next example

### Predicate Pushdown

Here is an example of reading from a JDBC source, where Catalyst determines that *predicate pushdown* can take place.

In [0]:
%scala
// Ensure that the driver class is loaded
Class.forName("org.postgresql.Driver")

In [0]:
jdbc_url = "jdbc:postgresql://54.213.33.240/training"

# Username and Password w/read-only rights
conn_properties = {
    "user" : "training",
    "password" : "training"
}

pp_df = (spark
         .read
         .jdbc(url=jdbc_url,                 # the JDBC URL
               table="training.people_1m",   # the name of the table
               column="id",                  # the name of an integral-type column used for partitioning
               lowerBound=1,                 # the minimum value of `column`, used to decide partition stride
               upperBound=1000000,           # the maximum value of `column`, used to decide partition stride
               numPartitions=8,              # the number of partitions/connections
               properties=conn_properties    # the connection properties
              )
         .filter(col("gender") == "M")   # Filter the data by gender
        )

pp_df.explain(True)

Note the lack of a **Filter** and the presence of **PushedFilters** in the **Scan**. The filter operation is pushed to the database, and only the matching records are sent to Spark. This can greatly reduce the amount of data that Spark needs to ingest.

### No Predicate Pushdown

In comparison, caching the data before filtering eliminates the possibility of predicate pushdown.

In [0]:
cached_df = (spark
            .read
            .jdbc(url=jdbc_url,
                  table="training.people_1m",
                  column="id",
                  lowerBound=1,
                  upperBound=1000000,
                  numPartitions=8,
                  properties=conn_properties
                 )
            )

cached_df.cache()
filtered_df = cached_df.filter(col("gender") == "M")

filtered_df.explain(True)

In addition to the **Scan** (the JDBC read) we saw in the previous example, here we also see the **InMemoryTableScan** followed by a **Filter** in the explain plan.

This means Spark had to read ALL the data from the database and cache it, and then scan it in cache to find the records matching the filter condition.

Remember to clean up after ourselves!

In [0]:
cached_df.unpersist()

# Partitioning
##### Objectives
1. Get partitions and cores
1. Repartition DataFrames
1. Configure default shuffle partitions

##### Methods
- <a href="https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/dataframe.html" target="_blank">DataFrame</a>: **`repartition`**, **`coalesce`**, **`rdd.getNumPartitions`**
- <a href="https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.SparkConf.html" target="_blank">SparkConf</a>: **`get`**, **`set`**
- <a href="https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.SparkSession.html" target="_blank">SparkSession</a>: **`spark.sparkContext.defaultParallelism`**

##### SparkConf Parameters
- **`spark.sql.shuffle.partitions`**, **`spark.sql.adaptive.enabled`**

In [0]:
%run ../Includes/Classroom-Setup

### Get partitions and cores

Use the **`getNumPartitions`** method of the DataFrame's underlying **`rdd`** to get the number of DataFrame partitions.

In [0]:
df = spark.read.format("delta").load(DA.paths.events)
df.rdd.getNumPartitions()

Access **`SparkContext`** through **`SparkSession`** to get the number of cores or slots.

Use the **`defaultParallelism`** attribute to get the number of cores in a cluster.

In [0]:
print(spark.sparkContext.defaultParallelism)

**`SparkContext`** is also provided in Databricks notebooks as the variable **`sc`**.

In [0]:
print(sc.defaultParallelism)

### Repartition DataFrame

There are two methods available to repartition a DataFrame: **`repartition`** and **`coalesce`**.

#### **`repartition`**
Returns a new DataFrame that has exactly **`n`** partitions.

- Wide transformation
- Pro: Evenly balances partition sizes  
- Con: Requires shuffling all data

In [0]:
repartitioned_df = df.repartition(8)

In [0]:
repartitioned_df.rdd.getNumPartitions()

#### **`coalesce`**
Returns a new DataFrame that has exactly **`n`** partitions when **`n`** is smaller than the current number of partitions.

If a larger number of partitions is requested, the DataFrame keeps its current number of partitions.

- Narrow transformation, some partitions are effectively concatenated
- Pro: Requires no shuffling
- Cons:
  - Cannot increase the number of partitions
  - Can result in uneven partition sizes
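
The trade-off between the two methods can be sketched with a toy model in plain Python. This is only an illustration under simplifying assumptions, not Spark's actual implementation: partitions are modeled as lists of rows, and the helpers `toy_coalesce` and `toy_repartition` are hypothetical names invented for this sketch.

```python
from typing import List


def toy_coalesce(partitions: List[List[int]], n: int) -> List[List[int]]:
    """Merge neighboring partitions whole, without moving individual rows."""
    if n >= len(partitions):
        return partitions  # the partition count cannot be increased
    merged: List[List[int]] = [[] for _ in range(n)]
    for i, part in enumerate(partitions):
        # Map each source partition to one target partition as a whole.
        merged[i * n // len(partitions)].extend(part)
    return merged


def toy_repartition(partitions: List[List[int]], n: int) -> List[List[int]]:
    """Redistribute every row round-robin, which touches all of the data."""
    rows = [row for part in partitions for row in part]
    return [rows[i::n] for i in range(n)]


# Four partitions with skewed sizes: 6, 1, 1, and 2 rows.
skewed = [list(range(0, 6)), [6], [7], [8, 9]]

print([len(p) for p in toy_coalesce(skewed, 2)])     # [7, 3]
print([len(p) for p in toy_repartition(skewed, 2)])  # [5, 5]
print(len(toy_coalesce(skewed, 8)))                  # 4
```

In this model, coalescing keeps the skew because whole partitions are concatenated, while repartitioning evens out the sizes at the cost of moving every row.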

In [0]:
coalesce_df = df.coalesce(8)

In [0]:
coalesce_df.rdd.getNumPartitions()

### Configure default shuffle partitions

Use the SparkSession's **`conf`** attribute to get and set dynamic Spark configuration properties. The **`spark.sql.shuffle.partitions`** property determines the number of partitions that result from a shuffle. Let's check its default value:

In [0]:
spark.conf.get("spark.sql.shuffle.partitions")

Assuming that the data set isn't too large, you could configure the default number of shuffle partitions to match the number of cores:

In [0]:
spark.conf.set("spark.sql.shuffle.partitions", spark.sparkContext.defaultParallelism)
print(spark.conf.get("spark.sql.shuffle.partitions"))

### Partitioning Guidelines
- Make the number of partitions a multiple of the number of cores
- Target a partition size of ~200MB
- Size the default shuffle partitions by dividing the largest shuffle stage input by the target partition size (e.g., 4TB / 200MB = 20,000 shuffle partitions)
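
The guidelines above can be combined into a small calculation. The following is a minimal sketch, assuming decimal units (1 TB = 10^12 bytes) so that it matches the 4TB / 200MB arithmetic; the helper name `shuffle_partition_count` and the core counts are hypothetical and used only for illustration.

```python
def shuffle_partition_count(shuffle_input_bytes: int,
                            target_partition_bytes: int,
                            num_cores: int) -> int:
    """Divide the input by the target size, then round up to a multiple of the core count."""
    # Integer ceiling division avoids floating-point surprises on large inputs.
    raw_count = -(-shuffle_input_bytes // target_partition_bytes)
    return -(-raw_count // num_cores) * num_cores


MB = 10**6   # decimal megabyte (assumption)
TB = 10**12  # decimal terabyte (assumption)

print(shuffle_partition_count(4 * TB, 200 * MB, num_cores=8))   # 20000
print(shuffle_partition_count(4 * TB, 200 * MB, num_cores=96))  # 20064
```

The result could then be passed to **`spark.conf.set("spark.sql.shuffle.partitions", ...)`** as shown earlier.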

<img src="https://files.training.databricks.com/images/icon_note_32.png" alt="Note"> When writing a DataFrame to storage, the number of DataFrame partitions determines the number of data files written. (This assumes that <a href="https://sparkbyexamples.com/apache-hive/hive-partitions-explained-with-examples/" target="_blank">Hive partitioning</a> is not used for the data in storage. A discussion of DataFrame partitioning vs Hive partitioning is beyond the scope of this class.)

### Adaptive Query Execution

<img src="https://files.training.databricks.com/images/aspwd/partitioning_aqe.png" width="60%" />

In Spark 3, <a href="https://spark.apache.org/docs/latest/sql-performance-tuning.html#adaptive-query-execution" target="_blank">AQE</a> can <a href="https://databricks.com/blog/2020/05/29/adaptive-query-execution-speeding-up-spark-sql-at-runtime.html" target="_blank">dynamically coalesce shuffle partitions</a> at runtime. This means that you can set **`spark.sql.shuffle.partitions`** based on the largest data set your application processes and allow AQE to reduce the number of partitions automatically when there is less data to process.

The **`spark.sql.adaptive.enabled`** configuration option controls whether AQE is turned on or off.

In [0]:
spark.conf.get("spark.sql.adaptive.enabled")

# De-Duping Data Lab

In this exercise, we're doing ETL on a file we've received from a customer. That file contains data about people, including:

* first, middle and last names
* gender
* birth date
* Social Security number
* salary

But, as is unfortunately common in data we get from this customer, the file contains some duplicate records. Worse:

* In some of the records, the names are mixed case (e.g., "Carol"), while in others, they are uppercase (e.g., "CAROL").
* The Social Security numbers aren't consistent either. Some of them are hyphenated (e.g., "992-83-4829"), while others are missing hyphens ("992834829").

If all of the name fields match when you disregard character case, then the birth dates and salaries are guaranteed to match as well,
and the Social Security numbers *would* match if they were put in the same format.

Your job is to remove the duplicate records. The specific requirements of your job are:

* Remove duplicates. It doesn't matter which record you keep; it only matters that you keep one of them.
* Preserve the data format of the columns. For example, if you write the first name column in all lowercase, you haven't met this requirement.
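
The matching rule described above can be sketched in plain Python before you translate it to DataFrame operations. This is only an illustration of the idea, not the lab solution: the field names (`firstName`, `middleName`, `lastName`, `ssn`) and the sample records are hypothetical, so check the real column names in the file.

```python
from typing import Dict, List, Tuple


def dedup_key(record: Dict[str, str]) -> Tuple[str, str, str, str]:
    """Build a comparison key: lowercased names plus the SSN with hyphens removed."""
    return (
        record["firstName"].lower(),
        record["middleName"].lower(),
        record["lastName"].lower(),
        record["ssn"].replace("-", ""),
    )


def drop_duplicates(records: List[Dict[str, str]]) -> List[Dict[str, str]]:
    """Keep the first record seen for each key, leaving its original fields untouched."""
    seen: Dict[Tuple[str, str, str, str], Dict[str, str]] = {}
    for record in records:
        seen.setdefault(dedup_key(record), record)
    return list(seen.values())


# Hypothetical sample records (assumptions for illustration only).
people = [
    {"firstName": "Carol", "middleName": "Ann", "lastName": "Smith", "ssn": "992-83-4829"},
    {"firstName": "CAROL", "middleName": "ANN", "lastName": "SMITH", "ssn": "992834829"},
    {"firstName": "Dave", "middleName": "Lee", "lastName": "Jones", "ssn": "991-12-3456"},
]

unique_people = drop_duplicates(people)
print(len(unique_people))             # 2
print(unique_people[0]["firstName"])  # Carol
```

The key is used only for comparison; the surviving record keeps its original casing and SSN format, which is what the second requirement asks for.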

<img src="https://files.training.databricks.com/images/icon_hint_32.png" alt="Hint"> The initial dataset contains 103,000 records.
The de-duplicated result has 100,000 records.

Next, write the results in **Delta** format as a **single data file** to the directory given by the variable **`delta_dest_dir`**.

<img src="https://files.training.databricks.com/images/icon_hint_32.png" alt="Hint"> Remember the relationship between the number of partitions in a DataFrame and the number of files written.

##### Methods
- <a href="https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/io.html" target="_blank">DataFrameReader</a>
- <a href="https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/dataframe.html" target="_blank">DataFrame</a>
- <a href="https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/functions.html" target="_blank">Built-In Functions</a>
- <a href="https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/io.html" target="_blank">DataFrameWriter</a>

In [0]:
%run ../Includes/Classroom-Setup

It's helpful to look at the file first, so you can check the format with **`dbutils.fs.head()`**.

In [0]:
dbutils.fs.head(f"{DA.paths.datasets}/people/people-with-dups.txt")

In [0]:
# TODO

source_file = f"{DA.paths.datasets}/people/people-with-dups.txt"
delta_dest_dir = f"{DA.paths.working_dir}/people"

# In case it already exists
dbutils.fs.rm(delta_dest_dir, True)

# Complete your work here...


**CHECK YOUR WORK**

In [0]:
verify_files = dbutils.fs.ls(delta_dest_dir)
verify_delta_format = False
verify_num_data_files = 0
for f in verify_files:
    if f.name == "_delta_log/":
        verify_delta_format = True
    elif f.name.endswith(".parquet"):
        verify_num_data_files += 1

assert verify_delta_format, "Data not written in Delta format"
assert verify_num_data_files == 1, "Expected 1 data file written"

verify_record_count = spark.read.format("delta").load(delta_dest_dir).count()
assert verify_record_count == 100000, "Expected 100000 records in final result"

del verify_files, verify_delta_format, verify_num_data_files, verify_record_count
print("All tests pass")