
<div style="text-align: center; line-height: 0; padding-top: 9px;">
  <img src="https://databricks.com/wp-content/uploads/2018/03/db-academy-rgb-1200px.png" alt="Databricks Learning">
</div>



 
# Cleaning Data

As we inspect and clean our data, we'll need to construct various column expressions and queries to express transformations to apply on our dataset.

Column expressions are constructed from existing columns, operators, and built-in functions. They can be used in **`SELECT`** statements to express transformations that create new columns.

Many standard SQL query commands (e.g. **`DISTINCT`**, **`WHERE`**, **`GROUP BY`**, etc.) are available in Spark SQL to express transformations.

In this notebook, we'll review a few concepts that might differ from other systems you're used to, as well as calling out a few useful functions for common operations.

We'll pay special attention to behaviors around **`NULL`** values, as well as formatting strings and datetime fields.

## Learning Objectives
By the end of this lesson, you should be able to:
- Summarize datasets and describe null behaviors
- Retrieve and remove duplicates
- Validate datasets for expected counts, missing values, and duplicate records
- Apply common transformations to clean and transform data



## Run Setup

The setup script will create the data and declare necessary values for the rest of this notebook to execute.

In [0]:
%run ./Includes/Classroom-Setup-02.4

## Data Overview

We'll work with new users records from the **`users_dirty`** table, which has the following schema:

| field | type | description |
|---|---|---|
| user_id | string | unique identifier |
| user_first_touch_timestamp | long | time at which the user record was created in microseconds since epoch |
| email | string | most recent email address provided by the user to complete an action |
| updated | timestamp | time at which this record was last updated |

Let's start by counting values in each field of our data.

In [0]:
SELECT count(*), count(user_id), count(user_first_touch_timestamp), count(email), count(updated)
FROM users_dirty


## Inspect Missing Data

Based on the counts above, it looks like there are at least a handful of null values in all of our fields.

**NOTE:** Null values behave incorrectly in some math functions, including **`count()`**.

- **`count(col)`** skips **`NULL`** values when counting specific columns or expressions.
- **`count(*)`** is a special case that counts the total number of rows (including rows that are only **`NULL`** values).

We can count null values in a field by filtering for records where that field is null, using either:  
**`count_if(col IS NULL)`** or **`count(*)`** with a filter for where **`col IS NULL`**. 

Both statements below correctly count records with missing emails.

In [0]:
SELECT count_if(email IS NULL) FROM users_dirty;
SELECT count(*) FROM users_dirty WHERE email IS NULL;

In [0]:
%python 
from pyspark.sql.functions import col
usersDF = spark.read.table("users_dirty")

usersDF.selectExpr("count_if(email IS NULL)")
usersDF.where(col("email").isNull()).count()

 
## Deduplicate Rows
We can use **`DISTINCT *`** to remove true duplicate records where entire rows contain the same values.

In [0]:
SELECT DISTINCT(*) FROM users_dirty

In [0]:
%python
usersDF.distinct().display()


  
## Deduplicate Rows Based on Specific Columns

The code below uses **`GROUP BY`** to remove duplicate records based on **`user_id`** and **`user_first_touch_timestamp`** column values. (Recall that these fields are both generated when a given user is first encountered, thus forming unique tuples.)

Here, we are using the aggregate function **`max`** as a hack to:
- Keep values from the **`email`** and **`updated`** columns in the result of our group by
- Capture non-null emails when multiple records are present

In [0]:
CREATE OR REPLACE TEMP VIEW deduped_users AS 
SELECT user_id, user_first_touch_timestamp, max(email) AS email, max(updated) AS updated
FROM users_dirty
WHERE user_id IS NOT NULL
GROUP BY user_id, user_first_touch_timestamp;

SELECT count(*) FROM deduped_users

In [0]:
%python
from pyspark.sql.functions import max
dedupedDF = (usersDF
    .where(col("user_id").isNotNull())
    .groupBy("user_id", "user_first_touch_timestamp")
    .agg(max("email").alias("email"), 
         max("updated").alias("updated"))
    )

dedupedDF.count()


Let's confirm that we have the expected count of remaining records after deduplicating based on distinct **`user_id`** and **`user_first_touch_timestamp`** values.

In [0]:
SELECT COUNT(DISTINCT(user_id, user_first_touch_timestamp))
FROM users_dirty
WHERE user_id IS NOT NULL

In [0]:
%python
(usersDF
    .dropDuplicates(["user_id", "user_first_touch_timestamp"])
    .filter(col("user_id").isNotNull())
    .count())


## Validate Datasets
Based on our manual review above, we've visually confirmed that our counts are as expected.
 
We can also programmatically perform validation using simple filters and **`WHERE`** clauses.

Validate that the **`user_id`** for each row is unique.

In [0]:
SELECT max(row_count) <= 1 no_duplicate_ids FROM (
  SELECT user_id, count(*) AS row_count
  FROM deduped_users
  GROUP BY user_id)

In [0]:
%python
from pyspark.sql.functions import count

display(dedupedDF
    .groupBy("user_id")
    .agg(count("*").alias("row_count"))
    .select((max("row_count") <= 1).alias("no_duplicate_ids")))




Confirm that each email is associated with at most one **`user_id`**.

In [0]:
SELECT max(user_id_count) <= 1 at_most_one_id FROM (
  SELECT email, count(user_id) AS user_id_count
  FROM deduped_users
  WHERE email IS NOT NULL
  GROUP BY email)

In [0]:
%python

display(dedupedDF
    .where(col("email").isNotNull())
    .groupby("email")
    .agg(count("user_id").alias("user_id_count"))
    .select((max("user_id_count") <= 1).alias("at_most_one_id")))


 
## Date Format and Regex
Now that we've removed null fields and eliminated duplicates, we may wish to extract further value out of the data.

The code below:
- Correctly scales and casts the **`user_first_touch_timestamp`** to a valid timestamp
- Extracts the calendar date and clock time for this timestamp in human readable format
- Uses **`regexp_extract`** to extract the domains from the email column using regex

In [0]:
SELECT *, 
  date_format(first_touch, "MMM d, yyyy") AS first_touch_date,
  date_format(first_touch, "HH:mm:ss") AS first_touch_time,
  regexp_extract(email, "(?<=@).+", 0) AS email_domain
FROM (
  SELECT *,
    CAST(user_first_touch_timestamp / 1e6 AS timestamp) AS first_touch 
  FROM deduped_users
)

In [0]:
%python
from pyspark.sql.functions import date_format, regexp_extract

display(dedupedDF
    .withColumn("first_touch", (col("user_first_touch_timestamp") / 1e6).cast("timestamp"))
    .withColumn("first_touch_date", date_format("first_touch", "MMM d, yyyy"))
    .withColumn("first_touch_time", date_format("first_touch", "HH:mm:ss"))
    .withColumn("email_domain", regexp_extract("email", "(?<=@).+", 0))
)

Databricks data profile. Run in Databricks to view.


## Data Profile 

Databricks version 9.1 and newer offer two convenient methods for data profiling within Notebooks: through the cell output UI and via the dbutils library.

When working with data frames or the results of SQL queries in a Databricks Notebook, users have the option to access a dedicated **Data Profile** tab. Clicking on this tab initiates the creation of an extensive data profile, providing not only summary statistics but also histograms that cover the entire dataset, ensuring a comprehensive view of the data, rather than just what is visible.

This data profile encompasses a range of insights, including information about numeric, string, and date columns, making it a powerful tool for data exploration and understanding.

**Using cell output UI:**

1. In the cell output, you will see a `Table` tab on the right.

1. Click on the `Table` tab to access the cell output options.

1. Next to the `Table` tab, you'll find a "Data Profile" tab. Click on it.

1. Databricks will automatically execute a new command to generate a data profile.

1. The generated data profile will provide summary statistics for numeric, string, and date columns, along with histograms of value distributions for each column.


 
Run the following cell to delete the tables and files associated with this lesson.

In [0]:
%python
DA.cleanup()


&copy; 2024 Databricks, Inc. All rights reserved.<br/>
Apache, Apache Spark, Spark and the Spark logo are trademarks of the 
<a href="https://www.apache.org/">Apache Software Foundation</a>.<br/>
<br/><a href="https://databricks.com/privacy-policy">Privacy Policy</a> | 
<a href="https://databricks.com/terms-of-use">Terms of Use</a> | 
<a href="https://help.databricks.com/">Support</a>