
<div style="text-align: center; line-height: 0; padding-top: 9px;">
  <img src="https://databricks.com/wp-content/uploads/2018/03/db-academy-rgb-1200px.png" alt="Databricks Learning" style="width: 600px">
</div>




# DataFrame & Column
##### Objectives
1. Construct columns
1. Subset columns
1. Add or replace columns
1. Subset rows
1. Sort rows

##### Methods
- <a href="https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/dataframe.html" target="_blank">DataFrame</a>: **`select`**, **`selectExpr`**, **`drop`**, **`withColumn`**, **`withColumnRenamed`**, **`filter`**, **`distinct`**, **`limit`**, **`sort`**
- <a href="https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/column.html" target="_blank">Column</a>: **`alias`**, **`isin`**, **`cast`**, **`isNotNull`**, **`desc`**, operators

In [0]:
%run ../Includes/Classroom-Setup

Python interpreter will be restarted.
Python interpreter will be restarted.


Resetting the learning environment:
| No action taken

Skipping install of existing datasets to "dbfs:/mnt/dbacademy-datasets/apache-spark-programming-with-databricks/v03"

Validating the locally installed datasets:
| listing local files...(7 seconds)
| validation completed...(7 seconds total)

Creating & using the schema "muaazkhurshid_q1yg_da_asp" in the catalog "spark_catalog"...(2 seconds)

Predefined tables in "muaazkhurshid_q1yg_da_asp":
| -none-

Predefined paths variables:
| DA.paths.working_dir: dbfs:/mnt/dbacademy-users/muaazkhurshid@uni-koblenz.de/apache-spark-programming-with-databricks
| DA.paths.user_db:     dbfs:/mnt/dbacademy-users/muaazkhurshid@uni-koblenz.de/apache-spark-programming-with-databricks/database.db
| DA.paths.datasets:    dbfs:/mnt/dbacademy-datasets/apache-spark-programming-with-databricks/v03
| DA.paths.checkpoints: dbfs:/mnt/dbacademy-users/muaazkhurshid@uni-koblenz.de/apache-spark-programming-with-databricks/_checkpoints

Setup completed (30 seconds)






Let's use the BedBricks events dataset.

In [0]:
events_df = spark.read.format("delta").load(DA.paths.events)
display(events_df)

device,ecommerce,event_name,event_previous_timestamp,event_timestamp,geo,items,traffic_source,user_first_touch_timestamp,user_id
macOS,"List(null, null, null)",warranty,1593878899217692.0,1593878946592107,"List(Montrose, MI)",List(),google,1593878899217692,UA000000107379500
Windows,"List(null, null, null)",press,1593876662175340.0,1593877011756535,"List(Northampton, MA)",List(),google,1593876662175340,UA000000107359357
macOS,"List(null, null, null)",add_item,1593878792892652.0,1593878815459100,"List(Salinas, CA)","List(List(null, M_STAN_T, Standard Twin Mattress, 595.0, 595.0, 1))",youtube,1593878455472030,UA000000107375547
iOS,"List(null, null, null)",mattresses,1593878178791663.0,1593878809276923,"List(Everett, MA)",List(),facebook,1593877903116176,UA000000107370581
Windows,"List(null, null, null)",mattresses,,1593878628143633,"List(Cottage Grove, MN)",List(),google,1593878628143633,UA000000107377108
Windows,"List(null, null, null)",main,,1593878634344194,"List(Medina, MN)",List(),youtube,1593878634344194,UA000000107377161
iOS,"List(null, null, null)",main,,1593877936171803,"List(Mount Pleasant, UT)",List(),direct,1593877936171803,UA000000107370851
macOS,"List(null, null, null)",main,,1593876843215329,"List(Piedmont, AL)",List(),instagram,1593876843215329,UA000000107360961
Android,"List(null, null, null)",warranty,1593878529774474.0,1593879213196400,"List(Rancho Santa Margarita, CA)",List(),instagram,1593878529774474,UA000000107376205
Windows,"List(null, null, null)",main,,1593876713246514,"List(Elyria, OH)",List(),facebook,1593876713246514,UA000000107359805





## Column Expressions

A <a href="https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/column.html" target="_blank">Column</a> is a logical construction that will be computed based on the data in a DataFrame using an expression

Construct a new Column based on existing columns in a DataFrame

In [0]:
from pyspark.sql.functions import col

print(events_df.device)
print(events_df["device"])
print(col("device"))

Column<'device'>
Column<'device'>
Column<'device'>




Scala supports an additional syntax for creating a new Column based on existing columns in a DataFrame

In [0]:
%scala
$"device"




### Column Operators and Methods
| Method | Description |
| --- | --- |
| \*, + , <, >= | Math and comparison operators |
| ==, != | Equality and inequality tests (Scala operators are **`===`** and **`=!=`**) |
| alias | Gives the column an alias |
| cast, astype | Casts the column to a different data type |
| isNull, isNotNull, isNan | Is null, is not null, is NaN |
| asc, desc | Returns a sort expression based on ascending/descending order of the column |




Create complex expressions with existing columns, operators, and methods.

In [0]:
col("ecommerce.purchase_revenue_in_usd") + col("ecommerce.total_item_quantity")
col("event_timestamp").desc()
(col("ecommerce.purchase_revenue_in_usd") * 100).cast("int")

Out[6]: Column<'CAST((ecommerce.purchase_revenue_in_usd * 100) AS INT)'>



Here's an example of using these column expressions in the context of a DataFrame

In [0]:
rev_df = (events_df
         .filter(col("ecommerce.purchase_revenue_in_usd").isNotNull())
         .withColumn("purchase_revenue", (col("ecommerce.purchase_revenue_in_usd") * 100).cast("int"))
         .withColumn("avg_purchase_revenue", col("ecommerce.purchase_revenue_in_usd") / col("ecommerce.total_item_quantity"))
         .sort(col("avg_purchase_revenue").desc())
        )

# display(rev_df)



## DataFrame Transformation Methods
| Method | Description |
| --- | --- |
| **`select`** | Returns a new DataFrame by computing given expression for each element |
| **`drop`** | Returns a new DataFrame with a column dropped |
| **`withColumnRenamed`** | Returns a new DataFrame with a column renamed |
| **`withColumn`** | Returns a new DataFrame by adding a column or replacing the existing column that has the same name |
| **`filter`**, **`where`** | Filters rows using the given condition |
| **`sort`**, **`orderBy`** | Returns a new DataFrame sorted by the given expressions |
| **`dropDuplicates`**, **`distinct`** | Returns a new DataFrame with duplicate rows removed |
| **`limit`** | Returns a new DataFrame by taking the first n rows |
| **`groupBy`** | Groups the DataFrame using the specified columns, so we can run aggregation on them |



### Subset columns
Use DataFrame transformations to subset columns



#### **`select()`**
Selects a list of columns or column based expressions

In [0]:
devices_df = events_df.select("user_id", "device")
# display(devices_df)

In [0]:
from pyspark.sql.functions import col

locations_df = events_df.select(
    "user_id", 
    col("geo.city").alias("city"), 
    col("geo.state").alias("state")
)
# display(locations_df)



#### **`selectExpr()`**
Selects a list of SQL expressions

In [0]:
apple_df = events_df.selectExpr("user_id", "device in ('macOS', 'iOS') as apple_user")
# display(apple_df)



#### **`drop()`**
Returns a new DataFrame after dropping the given column, specified as a string or Column object

Use strings to specify multiple columns

In [0]:
anonymous_df = events_df.drop("user_id", "geo", "device")
# display(anonymous_df)

In [0]:
no_sales_df = events_df.drop(col("ecommerce"))
# display(no_sales_df)




### Add or replace columns
Use DataFrame transformations to add or replace columns




#### **`withColumn()`**
Returns a new DataFrame by adding a column or replacing an existing column that has the same name.

In [0]:
mobile_df = events_df.withColumn("mobile", col("device").isin("iOS", "Android"))
# display(mobile_df)

In [0]:
purchase_quantity_df = events_df.withColumn("purchase_quantity", col("ecommerce.total_item_quantity").cast("int"))
purchase_quantity_df.printSchema()

root
 |-- device: string (nullable = true)
 |-- ecommerce: struct (nullable = true)
 |    |-- purchase_revenue_in_usd: double (nullable = true)
 |    |-- total_item_quantity: long (nullable = true)
 |    |-- unique_items: long (nullable = true)
 |-- event_name: string (nullable = true)
 |-- event_previous_timestamp: long (nullable = true)
 |-- event_timestamp: long (nullable = true)
 |-- geo: struct (nullable = true)
 |    |-- city: string (nullable = true)
 |    |-- state: string (nullable = true)
 |-- items: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- coupon: string (nullable = true)
 |    |    |-- item_id: string (nullable = true)
 |    |    |-- item_name: string (nullable = true)
 |    |    |-- item_revenue_in_usd: double (nullable = true)
 |    |    |-- price_in_usd: double (nullable = true)
 |    |    |-- quantity: long (nullable = true)
 |-- traffic_source: string (nullable = true)
 |-- user_first_touch_timestamp: long (nullable = true)




#### **`withColumnRenamed()`**
Returns a new DataFrame with a column renamed.

In [0]:
location_df = events_df.withColumnRenamed("geo", "location")
# display(location_df)




### Subset Rows
Use DataFrame transformations to subset rows




#### **`filter()`**
Filters rows using the given SQL expression or column based condition.

##### Alias: **`where`**

In [0]:
purchases_df = events_df.filter("ecommerce.total_item_quantity > 0")
# display(purchases_df)

In [0]:
revenue_df = events_df.filter(col("ecommerce.purchase_revenue_in_usd").isNotNull())
# display(revenue_df)

In [0]:
android_df = events_df.filter((col("traffic_source") != "direct") & (col("device") == "Android"))
# display(android_df)



#### **`dropDuplicates()`**
Returns a new DataFrame with duplicate rows removed, optionally considering only a subset of columns.

##### Alias: **`distinct`**

In [0]:
# display(events_df.distinct()) --10,000 rows

In [0]:
distinct_users_df = events_df.dropDuplicates(["user_id"])
# display(distinct_users_df)



#### **`limit()`**
Returns a new DataFrame by taking the first n rows.

In [0]:
limit_df = events_df.limit(100)
# display(limit_df)



### Sort rows
Use DataFrame transformations to sort rows



#### **`sort()`**
Returns a new DataFrame sorted by the given columns or expressions.

##### Alias: **`orderBy`**

In [0]:
increase_timestamps_df = events_df.sort("event_timestamp")
# display(increase_timestamps_df)

In [0]:
decrease_timestamp_df = events_df.sort(col("event_timestamp").desc())
# display(decrease_timestamp_df)

In [0]:
increase_sessions_df = events_df.orderBy(["user_first_touch_timestamp", "event_timestamp"])
# display(increase_sessions_df)

In [0]:
decrease_sessions_df = events_df.sort(col("user_first_touch_timestamp").desc(), col("event_timestamp"))
# display(decrease_sessions_df)



### Clean up classroom

In [0]:
DA.cleanup()

Resetting the learning environment:
| No action taken

Validating the locally installed datasets:
| listing local files...(3 seconds)
| validation completed...(3 seconds total)


&copy; 2023 Databricks, Inc. All rights reserved.<br/>
Apache, Apache Spark, Spark and the Spark logo are trademarks of the <a href="https://www.apache.org/">Apache Software Foundation</a>.<br/>
<br/>
<a href="https://databricks.com/privacy-policy">Privacy Policy</a> | <a href="https://databricks.com/terms-of-use">Terms of Use</a> | <a href="https://help.databricks.com/">Support</a>