

# DataFrame & Column
##### Objectives
1. Construct columns
1. Subset columns
1. Add or replace columns
1. Subset rows
1. Sort rows

##### Methods
- <a href="https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.DataFrame.html" target="_blank">DataFrame</a>: `select`, `selectExpr`, `drop`, `withColumn`, `withColumnRenamed`, `filter`, `distinct`, `limit`, `sort`
- <a href="https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.Column.html" target="_blank">Column</a>: `alias`, `isin`, `cast`, `isNotNull`, `desc`, operators

In [0]:
%run ./Includes/Classroom-Setup

Let's use the BedBricks events dataset.

In [0]:
eventsDF = spark.read.parquet(eventsPath)
display(eventsDF)

## Column Expressions

A <a href="https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.Column.html" target="_blank">Column</a> is a logical construction that is computed from the data in a DataFrame using an expression.

Construct a new Column based on existing columns in a DataFrame. The three forms below all refer to the same `device` column.

In [0]:
from pyspark.sql.functions import col

eventsDF.device
eventsDF["device"]
col("device")

Scala supports an additional syntax for creating a new Column based on existing columns in a DataFrame.

In [0]:
%scala
$"device"

### Column Operators and Methods
| Method | Description |
| --- | --- |
| \*, +, <, >= | Math and comparison operators |
| ==, != | Equality and inequality tests (Scala operators are `===` and `=!=`) |
| alias | Gives the column an alias |
| cast, astype | Casts the column to a different data type |
| isNull, isNotNull, isNaN | Is null, is not null, is NaN (`isNaN` is the Scala method; in Python, test for NaN with `isnan` from `pyspark.sql.functions`) |
| asc, desc | Returns a sort expression based on ascending/descending order of the column |

Create complex expressions with existing columns, operators, and methods.

In [0]:
col("ecommerce.purchase_revenue_in_usd") + col("ecommerce.total_item_quantity")
col("event_timestamp").desc()
(col("ecommerce.purchase_revenue_in_usd") * 100).cast("int")

Here's an example of using these column expressions in the context of a DataFrame.

In [0]:
revDF = (eventsDF
         .filter(col("ecommerce.purchase_revenue_in_usd").isNotNull())
         .withColumn("purchase_revenue", (col("ecommerce.purchase_revenue_in_usd") * 100).cast("int"))
         .withColumn("avg_purchase_revenue", col("ecommerce.purchase_revenue_in_usd") / col("ecommerce.total_item_quantity"))
         .sort(col("avg_purchase_revenue").desc())
        )

display(revDF)

## DataFrame Transformation Methods
| Method | Description |
| --- | --- |
| select | Returns a new DataFrame by computing the given expressions for each row |
| drop | Returns a new DataFrame with a column dropped |
| withColumnRenamed | Returns a new DataFrame with a column renamed |
| withColumn | Returns a new DataFrame by adding a column or replacing the existing column that has the same name |
| filter, where | Filters rows using the given condition |
| sort, orderBy | Returns a new DataFrame sorted by the given expressions |
| dropDuplicates, distinct | Returns a new DataFrame with duplicate rows removed |
| limit | Returns a new DataFrame by taking the first n rows |
| groupBy | Groups the DataFrame using the specified columns, so we can run aggregation on them |

### Subset columns
Use DataFrame transformations to subset columns.

#### `select()`
Selects a list of columns or column-based expressions.

In [0]:
devicesDF = eventsDF.select("user_id", "device")
display(devicesDF)

In [0]:
from pyspark.sql.functions import col

locationsDF = eventsDF.select(
    "user_id", 
    col("geo.city").alias("city"), 
    col("geo.state").alias("state")
)
display(locationsDF)

#### `selectExpr()`
Selects a list of SQL expressions.

In [0]:
appleDF = eventsDF.selectExpr("user_id", "device in ('macOS', 'iOS') as apple_user")
display(appleDF)

#### `drop()`
Returns a new DataFrame after dropping the given column, specified as a string or Column object.

Use strings to specify multiple columns.

In [0]:
anonymousDF = eventsDF.drop("user_id", "geo", "device")
display(anonymousDF)

In [0]:
noSalesDF = eventsDF.drop(col("ecommerce"))
display(noSalesDF)

### Add or replace columns
Use DataFrame transformations to add or replace columns.

#### `withColumn()`
Returns a new DataFrame by adding a column or replacing an existing column that has the same name.

In [0]:
mobileDF = eventsDF.withColumn("mobile", col("device").isin("iOS", "Android"))
display(mobileDF)

In [0]:
purchaseQuantityDF = eventsDF.withColumn("purchase_quantity", col("ecommerce.total_item_quantity").cast("int"))
purchaseQuantityDF.printSchema()

#### `withColumnRenamed()`
Returns a new DataFrame with a column renamed.

In [0]:
locationDF = eventsDF.withColumnRenamed("geo", "location")
display(locationDF)

### Subset rows
Use DataFrame transformations to subset rows.

#### `filter()`
Filters rows using the given SQL expression or column-based condition.

In [0]:
purchasesDF = eventsDF.filter("ecommerce.total_item_quantity > 0")
display(purchasesDF)

In [0]:
revenueDF = eventsDF.filter(col("ecommerce.purchase_revenue_in_usd").isNotNull())
display(revenueDF)

In [0]:
androidDF = eventsDF.filter((col("traffic_source") != "direct") & (col("device") == "Android"))
display(androidDF)

#### `dropDuplicates()`
Returns a new DataFrame with duplicate rows removed, optionally considering only a subset of columns.

##### Alias: `distinct` (equivalent to `dropDuplicates()` with no column subset)

In [0]:
eventsDF.distinct()

In [0]:
distinctUsersDF = eventsDF.dropDuplicates(["user_id"])
display(distinctUsersDF)

#### `limit()`
Returns a new DataFrame by taking the first n rows.

In [0]:
limitDF = eventsDF.limit(100)
display(limitDF)

### Sort rows
Use DataFrame transformations to sort rows.

#### `sort()`
Returns a new DataFrame sorted by the given columns or expressions.

##### Alias: `orderBy`

In [0]:
increaseTimestampsDF = eventsDF.sort("event_timestamp")
display(increaseTimestampsDF)

In [0]:
decreaseTimestampsDF = eventsDF.sort(col("event_timestamp").desc())
display(decreaseTimestampsDF)

In [0]:
increaseSessionsDF = eventsDF.orderBy(["user_first_touch_timestamp", "event_timestamp"])
display(increaseSessionsDF)

In [0]:
decreaseSessionsDF = eventsDF.sort(col("user_first_touch_timestamp").desc(), col("event_timestamp"))
display(decreaseSessionsDF)

# Purchase Revenues Lab

Prepare a dataset of events with purchase revenue.

##### Tasks
1. Extract purchase revenue for each event
2. Filter events where revenue is not null
3. Check what types of events have revenue
4. Drop unneeded column
5. Chain all the steps above excluding step 3

##### Methods
- DataFrame: `select`, `drop`, `withColumn`, `filter`, `dropDuplicates`
- Column: `isNotNull`

In [0]:
eventsDF = spark.read.parquet(eventsPath)
display(eventsDF)

### 1. Extract purchase revenue for each event
Add a new column **`revenue`** by extracting **`ecommerce.purchase_revenue_in_usd`**.

In [0]:
# TODO
revenueDF = eventsDF.FILL_IN
display(revenueDF)

**CHECK YOUR WORK**

In [0]:
expected1 = [5830.0, 5485.0, 5289.0, 5219.1, 5180.0, 5175.0, 5125.0, 5030.0, 4985.0, 4985.0]
result1 = [row.revenue for row in revenueDF.sort(col("revenue").desc_nulls_last()).limit(10).collect()]

assert(expected1 == result1)

### 2. Filter events where revenue is not null
Filter for records where **`revenue`** is not **`null`**

In [0]:
# TODO
purchasesDF = revenueDF.FILL_IN
display(purchasesDF)

**CHECK YOUR WORK**

In [0]:
assert purchasesDF.filter(col("revenue").isNull()).count() == 0, "Nulls in 'revenue' column"

### 3. Check what types of events have revenue
Find unique **`event_name`** values in **`purchasesDF`** in one of two ways:
- Select "event_name" and get distinct records
- Drop duplicate records based on the "event_name" only

<img src="https://files.training.databricks.com/images/icon_hint_32.png" alt="Hint"> There's only one event type associated with revenue.

In [0]:
# TODO
distinctDF = purchasesDF.FILL_IN
display(distinctDF)

### 4. Drop unneeded column
Since there's only one event type, drop **`event_name`** from **`purchasesDF`**.

In [0]:
# TODO
finalDF = purchasesDF.FILL_IN
display(finalDF)

**CHECK YOUR WORK**

In [0]:
expected_columns = {"device", "ecommerce", "event_previous_timestamp", "event_timestamp",
                    "geo", "items", "revenue", "traffic_source",
                    "user_first_touch_timestamp", "user_id"}
assert(set(finalDF.columns) == expected_columns)

### 5. Chain all the steps above excluding step 3

In [0]:
# TODO
finalDF = (eventsDF
  .FILL_IN
)

display(finalDF)

**CHECK YOUR WORK**

In [0]:
assert(finalDF.count() == 180678)

In [0]:
expected_columns = {"device", "ecommerce", "event_previous_timestamp", "event_timestamp",
                    "geo", "items", "revenue", "traffic_source",
                    "user_first_touch_timestamp", "user_id"}
assert(set(finalDF.columns) == expected_columns)

### Clean up classroom

In [0]:
%run ./Includes/Classroom-Cleanup

&copy; 2022 Databricks, Inc. All rights reserved.<br/>
Apache, Apache Spark, Spark and the Spark logo are trademarks of the <a href="https://www.apache.org/">Apache Software Foundation</a>.<br/>
<br/>
<a href="https://databricks.com/privacy-policy">Privacy Policy</a> | <a href="https://databricks.com/terms-of-use">Terms of Use</a> | <a href="https://help.databricks.com/">Support</a>