d-sandbox
<div style="text-align: center; line-height: 0; padding-top: 9px;">
  <img src="https://databricks.com/wp-content/uploads/2018/03/db-academy-rgb-1200px.png" alt="Databricks Learning" style="width: 400px">
</div>

# DataFrame & Column
1. Construct columns
1. Subset columns
1. Add or replace columns
1. Subset rows
1. Sort rows

##### Methods
- DataFrame (<a href="https://spark.apache.org/docs/latest/api/python/pyspark.sql.html?highlight=dataframe#pyspark.sql.DataFrame" target="_blank">Python</a>/<a href="http://spark.apache.org/docs/latest/api/scala/org/apache/spark/sql/Dataset.html" target="_blank">Scala</a>): `select`, `selectExpr`, `drop`, `withColumn`, `withColumnRenamed`, `filter`, `distinct`, `limit`, `sort`
- Column (<a href="https://spark.apache.org/docs/latest/api/python/pyspark.sql.html?highlight=column#pyspark.sql.Column" target="_blank">Python</a>/<a href="http://spark.apache.org/docs/latest/api/scala/org/apache/spark/sql/Column.html" target="_blank">Scala</a>): `alias`, `isin`, `cast`, `isNotNull`, `desc`, operators

In [0]:
%run ./Includes/Classroom-Setup

Let's use the BedBricks events dataset.

In [0]:
eventsDF = spark.read.parquet(eventsPath)
display(eventsDF)

device,ecommerce,event_name,event_previous_timestamp,event_timestamp,geo,items,traffic_source,user_first_touch_timestamp,user_id
macOS,"List(null, null, null)",warranty,1593878899217692.0,1593878946592107,"List(Montrose, MI)",List(),google,1593878899217692,UA000000107379500
Windows,"List(null, null, null)",press,1593876662175340.0,1593877011756535,"List(Northampton, MA)",List(),google,1593876662175340,UA000000107359357
macOS,"List(null, null, null)",add_item,1593878792892652.0,1593878815459100,"List(Salinas, CA)","List(List(null, M_STAN_T, Standard Twin Mattress, 595.0, 595.0, 1))",youtube,1593878455472030,UA000000107375547
iOS,"List(null, null, null)",mattresses,1593878178791663.0,1593878809276923,"List(Everett, MA)",List(),facebook,1593877903116176,UA000000107370581
Windows,"List(null, null, null)",mattresses,,1593878628143633,"List(Cottage Grove, MN)",List(),google,1593878628143633,UA000000107377108
Windows,"List(null, null, null)",main,,1593878634344194,"List(Medina, MN)",List(),youtube,1593878634344194,UA000000107377161
iOS,"List(null, null, null)",main,,1593877936171803,"List(Mount Pleasant, UT)",List(),direct,1593877936171803,UA000000107370851
macOS,"List(null, null, null)",main,,1593876843215329,"List(Piedmont, AL)",List(),instagram,1593876843215329,UA000000107360961
Android,"List(null, null, null)",warranty,1593878529774474.0,1593879213196400,"List(Rancho Santa Margarita, CA)",List(),instagram,1593878529774474,UA000000107376205
Windows,"List(null, null, null)",main,,1593876713246514,"List(Elyria, OH)",List(),facebook,1593876713246514,UA000000107359805


In [0]:
eventsDF.createOrReplaceTempView("events_tbl")

In [0]:
display(spark.sql("SELECT distinct device FROM events_tbl"))

device
iOS
Linux
macOS
Chrome OS
Android
Windows


### ![Spark Logo Tiny](https://files.training.databricks.com/images/105/logo_spark_tiny.png) Construct columns

A **column** is a logical construction that will be computed based on the data in a DataFrame using an expression

Construct a new column based on the input columns existing in a DataFrame

In [0]:
from pyspark.sql.functions import col,lit

col("device")
display(eventsDF.device)
eventsDF["device"]

Column<'device'>Out[8]: Column<'device'>

In [0]:
eventsDF.printSchema()

root
 |-- device: string (nullable = true)
 |-- ecommerce: struct (nullable = true)
 |    |-- purchase_revenue_in_usd: double (nullable = true)
 |    |-- total_item_quantity: long (nullable = true)
 |    |-- unique_items: long (nullable = true)
 |-- event_name: string (nullable = true)
 |-- event_previous_timestamp: long (nullable = true)
 |-- event_timestamp: long (nullable = true)
 |-- geo: struct (nullable = true)
 |    |-- city: string (nullable = true)
 |    |-- state: string (nullable = true)
 |-- items: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- coupon: string (nullable = true)
 |    |    |-- item_id: string (nullable = true)
 |    |    |-- item_name: string (nullable = true)
 |    |    |-- item_revenue_in_usd: double (nullable = true)
 |    |    |-- price_in_usd: double (nullable = true)
 |    |    |-- quantity: long (nullable = true)
 |-- traffic_source: string (nullable = true)
 |-- user_first_touch_timestamp: long (nullable = true)

Use column objects to form complex expressions

In [0]:
col("ecommerce.purchase_revenue_in_usd") + col("ecommerce.total_item_quantity")
col("event_timestamp").desc()
(col("ecommerce.purchase_revenue_in_usd") * 100).cast("int")

In [0]:
display(spark.sql("SELECT ecommerce{purchase_revenue_in_usd},ecommerce{total_item_quantity} FROM events_tbl LIMIT 2"))

[0;31m---------------------------------------------------------------------------[0m
[0;31mParseException[0m                            Traceback (most recent call last)
[0;32m<command-344034784584742>[0m in [0;36m<module>[0;34m[0m
[0;32m----> 1[0;31m [0mdisplay[0m[0;34m([0m[0mspark[0m[0;34m.[0m[0msql[0m[0;34m([0m[0;34m"SELECT ecommerce{purchase_revenue_in_usd},ecommerce{total_item_quantity} FROM events_tbl LIMIT 2"[0m[0;34m)[0m[0;34m)[0m[0;34m[0m[0;34m[0m[0m
[0m
[0;32m/databricks/spark/python/pyspark/sql/session.py[0m in [0;36msql[0;34m(self, sqlQuery)[0m
[1;32m    775[0m         [0;34m[[0m[0mRow[0m[0;34m([0m[0mf1[0m[0;34m=[0m[0;36m1[0m[0;34m,[0m [0mf2[0m[0;34m=[0m[0;34m'row1'[0m[0;34m)[0m[0;34m,[0m [0mRow[0m[0;34m([0m[0mf1[0m[0;34m=[0m[0;36m2[0m[0;34m,[0m [0mf2[0m[0;34m=[0m[0;34m'row2'[0m[0;34m)[0m[0;34m,[0m [0mRow[0m[0;34m([0m[0mf1[0m[0;34m=[0m[0;36m3[0m[0;34m,[0m [0mf2[0m[0;34m=[

### ![Spark Logo Tiny](https://files.training.databricks.com/images/105/logo_spark_tiny.png) Subset columns
Use DataFrame transformations to subset columns

#### **`select()`**
Selects a set of columns or column based expressions

In [0]:
# devicesDF = eventsDF.select("user_id", "device")
devicesDF=spark.sql("SELECT user_id, device from events_tbl")
display(devicesDF)

user_id,device
UA000000107379500,macOS
UA000000107359357,Windows
UA000000107375547,macOS
UA000000107370581,iOS
UA000000107377108,Windows
UA000000107377161,Windows
UA000000107370851,iOS
UA000000107360961,macOS
UA000000107376205,Android
UA000000107359805,Windows


In [0]:
from pyspark.sql.functions import col

# locationsDF = eventsDF.select("user_id", 
#   col("geo.city").alias("city"),
#   col("geo.state").alias("state"))

locationsDF=spark.sql("SELECT user_id, geo.city, geo.state FROM events_tbl")

display(locationsDF)

user_id,city,state
UA000000107379500,Montrose,MI
UA000000107359357,Northampton,MA
UA000000107375547,Salinas,CA
UA000000107370581,Everett,MA
UA000000107377108,Cottage Grove,MN
UA000000107377161,Medina,MN
UA000000107370851,Mount Pleasant,UT
UA000000107360961,Piedmont,AL
UA000000107376205,Rancho Santa Margarita,CA
UA000000107359805,Elyria,OH


#### **`selectExpr()`**
Selects a set of SQL expressions

In [0]:
# appleDF = eventsDF.selectExpr("user_id", "device in ('macOS', 'iOS') as apple_user")
appleDF = spark.sql("SELECT user_id, device FROM events_tbl WHERE device in ('macOs','iOS')")
display(appleDF)

user_id,device
UA000000107370581,iOS
UA000000107370851,iOS
UA000000107369909,iOS
UA000000107359573,iOS
UA000000107369512,iOS
UA000000107365065,iOS
UA000000107374663,iOS
UA000000107359571,iOS
UA000000107360852,iOS
UA000000107373270,iOS


#### `drop()`
Returns a new DataFrame after dropping the given column, specified as a string or column object

Use strings to specify multiple columns

In [0]:
anonymousDF = eventsDF.drop("user_id", "geo", "device")
display(anonymousDF)

In [0]:
noSalesDF = eventsDF.drop(col("ecommerce"))
display(noSalesDF)

### ![Spark Logo Tiny](https://files.training.databricks.com/images/105/logo_spark_tiny.png) Add or replace columns
Use DataFrame transformations to add or replace columns

#### `withColumn`
Returns a new DataFrame by adding a column or replacing the existing column that has the same name.

In [0]:
mobileDF = eventsDF.withColumn("mobile", col("device").isin("iOS", "Android"))
display(mobileDF)

In [0]:
purchaseQuantityDF = eventsDF.withColumn("purchase_quantity", col("ecommerce.total_item_quantity").cast("int"))
purchaseQuantityDF.printSchema()

#### `withColumnRenamed()`
Returns a new DataFrame with a column renamed.

In [0]:
locationDF = eventsDF.withColumnRenamed("geo", "location")
display(locationDF)

### ![Spark Logo Tiny](https://files.training.databricks.com/images/105/logo_spark_tiny.png) Subset Rows
Use DataFrame transformations to subset rows

#### `filter()`
Filters rows using the given SQL expression or column based condition.

In [0]:
purchasesDF = eventsDF.filter("ecommerce.total_item_quantity > 0")
display(purchasesDF)

In [0]:
revenueDF = eventsDF.filter(col("ecommerce.purchase_revenue_in_usd").isNotNull())
display(revenueDF)

In [0]:
androidDF = eventsDF.filter((col("traffic_source") != "direct") & (col("device") == "Android"))
display(androidDF)

#### `dropDuplicates()`
Returns a new DataFrame with duplicate rows removed, optionally considering only a subset of columns.

##### Alias: `distinct`

In [0]:
eventsDF.distinct()

In [0]:
distinctUsersDF = eventsDF.dropDuplicates(["user_id"])
display(distinctUsersDF)

#### `limit()`
Returns a new DataFrame by taking the first n rows.

In [0]:
limitDF = eventsDF.limit(100)
display(limitDF)

### ![Spark Logo Tiny](https://files.training.databricks.com/images/105/logo_spark_tiny.png) Sort rows
Use DataFrame transformations to sort rows

#### `sort()`
Returns a new DataFrame sorted by the given columns or expressions.

##### Alias: `orderBy`

In [0]:
increaseTimestampsDF = eventsDF.sort("event_timestamp")
display(increaseTimestampsDF)

In [0]:
decreaseTimestampsDF = eventsDF.sort(col("event_timestamp").desc())
display(decreaseTimestampsDF)

In [0]:
increaseSessionsDF = eventsDF.orderBy(["user_first_touch_timestamp", "event_timestamp"])
display(increaseSessionsDF)

In [0]:
decreaseSessionsDF = eventsDF.sort(col("user_first_touch_timestamp").desc(), col("event_timestamp"))
display(decreaseSessionsDF)

## ![Spark Logo Tiny](https://files.training.databricks.com/images/105/logo_spark_tiny.png) Purchase Revenues Lab

Prepare dataset of events with purchase revenue.
1. Extract purchase revenue for each event
2. Filter events where revenue is not null
3. Check what types of events have revenue
4. Drop unneeded column

##### Methods
- DataFrame: `select`, `drop`, `withColumn`, `filter`, `dropDuplicates`
- Column: `isNotNull`

In [0]:
eventsDF = spark.read.parquet(eventsPath)
display(eventsDF)

### 1. Extract purchase revenue for each event
Add new column **`revenue`** by extracting **`ecommerce.purchase_revenue_in_usd`**

In [0]:
# TODO
revenueDF = eventsDF.FILL_IN
display(revenueDF)

-sandbox
##### <img alt="Best Practice" title="Best Practice" style="vertical-align: text-bottom; position: relative; height:1.75em; top:0.3em" src="https://files.training.databricks.com/static/images/icon-blue-ribbon.svg"/> Check your work

In [0]:
expected1 = [5830.0, 5485.0, 5289.0, 5219.1, 5180.0, 5175.0, 5125.0, 5030.0, 4985.0, 4985.0]
result1 = [row.revenue for row in revenueDF.sort(col("revenue").desc_nulls_last()).limit(10).collect()]

assert(expected1 == result1)

### 2. Filter events where revenue is not null
Filter for records where **`revenue`** is not **`null`**

In [0]:
# TODO
purchasesDF = revenueDF.FILL_IN
display(purchasesDF)

### 3. Check what types of events have revenue
Find unique **`event_name`** values in **`purchasesDF`** in one of two ways:
- Select "event_name" and get distinct records
- Drop duplicate records based on the "event_name" only

Hint: There's only one event associated with revenues

In [0]:
# TODO
distinctDF = purchasesDF.FILL_IN
display(distinctDF)

### 4. Drop unneeded column
Since there's only one event type, drop **`event_name`** from **`purchasesDF`**.

In [0]:
# TODO
finalDF = purchasesDF.FILL_IN
display(finalDF)

### 5. Chain all the steps above excluding step 3

In [0]:
# TODO
finalDF = (eventsDF
  .FILL_IN
)

display(finalDF)

-sandbox
##### <img alt="Best Practice" title="Best Practice" style="vertical-align: text-bottom; position: relative; height:1.75em; top:0.3em" src="https://files.training.databricks.com/static/images/icon-blue-ribbon.svg"/> Check your work

In [0]:
assert(finalDF.count() == 180678)

In [0]:
expected_columns = {'device', 'ecommerce', 'event_previous_timestamp', 'event_timestamp', 
                    'geo', 'items', 'revenue', 'traffic_source', 
                    'user_first_touch_timestamp', 'user_id'}
assert(set(finalDF.columns) == expected_columns)

### Clean up classroom

In [0]:
%run ./Includes/Classroom-Cleanup
