
<div style="text-align: center; line-height: 0; padding-top: 9px;">
  <img src="https://databricks.com/wp-content/uploads/2018/03/db-academy-rgb-1200px.png" alt="Databricks Learning" style="width: 600px">
</div>

<i18n value="358d2c22-9d78-4888-a7ec-54b7d5f3db64"/>


# Just Enough Python for Databricks SQL Lab

## Learning Objectives
By the end of this lab, you should be able to:
* Review basic Python code and describe expected outcomes of code execution
* Reason through control flow statements in Python functions
* Add parameters to a SQL query by wrapping it in a Python function

In [0]:
%run ../Includes/Classroom-Setup-05.3L

Python interpreter will be restarted.


Resetting the learning environment:
| removing the working directory "dbfs:/mnt/dbacademy-users/munirsheikhcloudseekho@gmail.com/data-engineering-with-databricks"...(1 seconds)

Skipping install of existing datasets to "dbfs:/mnt/dbacademy-datasets/data-engineering-with-databricks/v02"

Validating the locally installed datasets:
| listing local files...(7 seconds)
| completed (7 seconds total)

Using the "default" schema.

Predefined paths variables:
| DA.paths.working_dir: dbfs:/mnt/dbacademy-users/munirsheikhcloudseekho@gmail.com/data-engineering-with-databricks
| DA.paths.user_db:     dbfs:/mnt/dbacademy-users/munirsheikhcloudseekho@gmail.com/data-engineering-with-databricks/database.db
| DA.paths.datasets:    dbfs:/mnt/dbacademy-datasets/data-engineering-with-databricks/v02
| DA.paths.checkpoints: dbfs:/mnt/dbacademy-users/munirsheikhcloudseekho@gmail.com/data-engineering-with-databricks/_checkpoints

Setup completed (9 seconds)


<i18n value="97cba873-1459-478f-831b-b52fc54265eb"/>


# Reviewing Python Basics

In the previous notebook, we briefly explored using **`spark.sql()`** to execute arbitrary SQL commands from Python.

Look at the following 3 cells. Before executing each cell, identify:
1. The expected output of cell execution
1. What logic is being executed
1. Changes to the resultant state of the environment

Then execute the cells, compare the results to your expectations, and see the explanations below.

In [0]:
course = "dewd"

In [0]:
spark.sql(f"SELECT '{course}' AS course_name")

Out[8]: DataFrame[course_name: string]

In [0]:
df = spark.sql(f"SELECT '{course}' AS course_name")
display(df)

course_name
dewd


<i18n value="bc8fda28-92ad-4cd5-aa24-34022269698a"/>


1. **Cmd 5** assigns a string to a variable. When a variable assignment is successful, no output is displayed to the notebook. A new variable is added to the current execution environment.
1. **Cmd 6** executes a SQL query and displays the schema for the DataFrame alongside the word **`DataFrame`**. In this case, the SQL query just selects a string, so no changes to our environment occur.
1. **Cmd 7** executes the same SQL query and displays the output of the DataFrame. This combination of **`display()`** and **`spark.sql()`** most closely mirrors executing logic in a **`%sql`** cell: when the query returns results, they are printed in a formatted table; queries that instead manipulate tables or databases print the word **`OK`** to show successful execution. In this case, no changes to our environment occur from running this code.
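
The f-string used in Cmd 6 and Cmd 7 is ordinary Python string formatting: the query text is fully built before it is handed to **`spark.sql()`**. A minimal sketch that runs without a Spark session (the variable name `query` is introduced here only for illustration):

```python
course = "dewd"

# Python evaluates the f-string first; spark.sql() would only ever see the finished text.
query = f"SELECT '{course}' AS course_name"

print(query)  # SELECT 'dewd' AS course_name
```

Printing the query string like this is a quick way to check what will be executed before passing it to **`spark.sql()`**.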

<i18n value="ef0b350e-c470-4e89-9617-948e49dd1710"/>


## Setting Up a Development Environment

Throughout this course, we use logic similar to the following cell to capture information about the user currently executing the notebook and create an isolated development database.

The **`re`** library is the <a href="https://docs.python.org/3/library/re.html" target="_blank">standard Python library for regex</a>.

Databricks SQL provides the **`current_user()`** function to capture the username of the current user. The **`.first()[0]`** code is a quick hack to capture the first column of the first row of a query executed with **`spark.sql()`** (in this case, we do this safely knowing that there will only be 1 row and 1 column).

All other logic below is just string formatting.

In [0]:
import re

username = spark.sql("SELECT current_user()").first()[0]
clean_username = re.sub("[^a-zA-Z0-9]", "_", username)
schema_name = f"dbacademy_{clean_username}_{course}_5_3l"
working_dir = f"dbfs:/user/{username}/dbacademy/{course}/5.3l"

print(f"username:    {username}")
print(f"schema_name:     {schema_name}")
print(f"working_dir: {working_dir}")

username:    munirsheikhcloudseekho@gmail.com
schema_name:     dbacademy_munirsheikhcloudseekho_gmail_com_dewd_5_3l
working_dir: dbfs:/user/munirsheikhcloudseekho@gmail.com/dbacademy/dewd/5.3l
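
To see what the **`re.sub`** call does in isolation, here is a minimal sketch that runs in plain Python. The username below is a hypothetical placeholder, not a value returned by **`current_user()`**:

```python
import re

# Hypothetical username, used only for illustration.
username = "jane.doe@example.com"
course = "dewd"

# Replace every character that is not an ASCII letter or digit with an underscore,
# so the result is safe to embed in a schema name.
clean_username = re.sub("[^a-zA-Z0-9]", "_", username)
schema_name = f"dbacademy_{clean_username}_{course}_5_3l"

print(clean_username)  # jane_doe_example_com
print(schema_name)     # dbacademy_jane_doe_example_com_dewd_5_3l
```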


<i18n value="1273f7a3-823a-4b1f-914a-ce6eaaa867b3"/>


Below, we add a simple control flow statement to this logic to create and use this user-specific database. 

Optionally, we reset this database and drop all of its contents on repeat execution. (Note that the default value for the parameter **`reset`** is **`True`**.)

In [0]:
def create_database(course, reset=True):
    import re

    username = spark.sql("SELECT current_user()").first()[0]
    clean_username = re.sub("[^a-zA-Z0-9]", "_", username)
    schema_name = f"dbacademy_{clean_username}_{course}_5_3l"
    working_dir = f"dbfs:/user/{username}/dbacademy/{course}/5.3l"

    print(f"username:    {username}")
    print(f"schema_name: {schema_name}")
    print(f"working_dir: {working_dir}")

    if reset:
        spark.sql(f"DROP DATABASE IF EXISTS {schema_name} CASCADE")
        dbutils.fs.rm(working_dir, True)
        
    spark.sql(f"CREATE DATABASE IF NOT EXISTS {schema_name} LOCATION '{working_dir}/{schema_name}.db'")
    spark.sql(f"USE {schema_name}")
    
create_database(course)

username:    munirsheikhcloudseekho@gmail.com
schema_name: dbacademy_munirsheikhcloudseekho_gmail_com_dewd_5_3l
working_dir: dbfs:/user/munirsheikhcloudseekho@gmail.com/dbacademy/dewd/5.3l


<i18n value="cfa0adf3-cc23-4ba1-8daf-2c70af7fa079"/>


While this logic as defined is geared toward isolating students in shared workspaces for instructional purposes, the same basic design could be leveraged for testing new logic in an isolated environment before pushing to production.
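
The role of the **`reset`** flag can be seen in isolation with a minimal sketch that replaces the metastore with a plain dictionary. Everything here (`catalog`, `create_schema_sketch`, `dev_schema`) is hypothetical and only mirrors the control flow of `create_database`; it does not touch Spark or DBFS.

```python
# Stand-in for the metastore: schema name -> set of table names (assumption for illustration).
catalog = {}

def create_schema_sketch(schema_name, reset=True):
    """Mimic the control flow of create_database using a plain dict."""
    if reset:
        # Analogous to DROP DATABASE IF EXISTS ... CASCADE
        catalog.pop(schema_name, None)
    # Analogous to CREATE DATABASE IF NOT EXISTS
    catalog.setdefault(schema_name, set())
    return catalog[schema_name]

tables = create_schema_sketch("dev_schema")
tables.add("demo_table")

# reset=False keeps the existing contents; the default reset=True starts clean.
kept = create_schema_sketch("dev_schema", reset=False)
fresh = create_schema_sketch("dev_schema")

print(sorted(kept))   # ['demo_table']
print(sorted(fresh))  # []
```

Because the default is `reset=True`, calling the function without arguments always discards earlier work, which suits a classroom setup but would usually be flipped for anything long-lived.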

<i18n value="1c994e19-2b72-45c3-a174-8a7e21701688"/>


## Handling Errors Gracefully

Review the logic in the function below.

Note that we've just declared a new database that currently contains no tables.

In [0]:
def query_or_make_demo_table(table_name):
    try:
        display(spark.sql(f"SELECT * FROM {table_name}"))
        print(f"Displayed results for the table {table_name}")
        
    except Exception:  # the query fails if the table does not exist yet
        spark.sql(f"CREATE TABLE {table_name} (id INT, name STRING, value DOUBLE, state STRING)")
        spark.sql(f"""INSERT INTO {table_name}
                      VALUES (1, "Yve", 1.0, "CA"),
                             (2, "Omar", 2.5, "NY"),
                             (3, "Elia", 3.3, "OH"),
                             (4, "Rebecca", 4.7, "TX"),
                             (5, "Ameena", 5.3, "CA"),
                             (6, "Ling", 6.6, "NY"),
                             (7, "Pedro", 7.1, "KY")""")
        
        display(spark.sql(f"SELECT * FROM {table_name}"))
        print(f"Created the table {table_name}")

<i18n value="5a449d08-9811-4b0d-9004-74b8bb04eef5"/>


Try to identify the following before executing the next cell:
1. The expected output of cell execution
1. What logic is being executed
1. Changes to the resultant state of the environment

In [0]:
query_or_make_demo_table("demo_table")

id,name,value,state
1,Yve,1.0,CA
2,Omar,2.5,NY
3,Elia,3.3,OH
4,Rebecca,4.7,TX
5,Ameena,5.3,CA
6,Ling,6.6,NY
7,Pedro,7.1,KY


Created the table demo_table


<i18n value="8ddb2ea1-9e4e-4ac7-a369-ff984114653f"/>


Now answer the same three questions before running the same query below.

In [0]:
query_or_make_demo_table("demo_table")

id,name,value,state
1,Yve,1.0,CA
2,Omar,2.5,NY
3,Elia,3.3,OH
4,Rebecca,4.7,TX
5,Ameena,5.3,CA
6,Ling,6.6,NY
7,Pedro,7.1,KY


Displayed results for the table demo_table


<i18n value="6efbda51-9c51-440a-aaaf-7276ad175398"/>


- On the first execution, the table **`demo_table`** did not yet exist. As such, the attempt to return the contents of the table raised an error, which resulted in our **`except`** block of logic executing. This block:
  1. Created the table
  1. Inserted values
  1. Displayed the contents of the table and printed a confirmation message
- On the second execution, the table **`demo_table`** already exists, and so the first query in the **`try`** block executes without error. As a result, we just display the results of the query without modifying anything in our environment.
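
The same try-then-create pattern can be sketched in plain Python, with a dictionary standing in for the database and a `KeyError` standing in for the error Spark raises on a missing table. The names `tables` and `query_or_make_sketch` are hypothetical, and only two of the demo rows are included to keep the sketch short.

```python
# Stand-in for the schema: table name -> list of rows (assumption for illustration).
tables = {}

def query_or_make_sketch(table_name):
    """Return (rows, message), creating the table on first use."""
    try:
        rows = tables[table_name]  # raises KeyError if the table is missing
        return rows, f"Displayed results for the table {table_name}"
    except KeyError:
        # First call only: create the table and insert some rows.
        tables[table_name] = [(1, "Yve", 1.0, "CA"), (2, "Omar", 2.5, "NY")]
        return tables[table_name], f"Created the table {table_name}"

first_rows, first_message = query_or_make_sketch("demo_table")
second_rows, second_message = query_or_make_sketch("demo_table")

print(first_message)   # Created the table demo_table
print(second_message)  # Displayed results for the table demo_table
```

Catching a specific exception type, as done here, keeps unrelated failures from silently triggering the table-creation branch.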

<i18n value="a0f957ea-7604-46b9-9b06-d672b73efcec"/>


## Adapting SQL to Python
Let's consider the following SQL query against our demo table created above.

In [0]:
%sql
SELECT id, value 
FROM demo_table
WHERE state = "CA"

id,value
1,1.0
5,5.3


<i18n value="c4abcb35-3733-4565-8f8c-0df4b23f1e71"/>



The same query can also be expressed using the PySpark API and the **`display`** function, as seen here:

In [0]:
results = spark.sql("SELECT id, value FROM demo_table WHERE state = 'CA'")
display(results)

id,value
1,1.0
5,5.3


<i18n value="6a4e7e96-c53a-4b8e-abf5-412fe4170c27"/>


Let's use this simple example to practice creating a Python function that adds optional functionality.

Our target function will:
* Be based upon a query that only includes the **`id`** and **`value`** columns from a table named **`demo_table`**
* Allow filtering of that query by **`state`**, where the default behavior is to include all states
* Optionally render the results of the query using the **`display`** function, where the default behavior is to not render
* Return:
  * The query result object (a PySpark DataFrame) if **`render_results`** is False
  * The **`None`** value if **`render_results`** is True

Stretch Goal:
* Add an assert statement to verify that the value passed for the **`state`** parameter contains two uppercase letters

Some starter logic has been provided below:

In [0]:
# ANSWER
def preview_values(state=None, render_results=False):
    query = "SELECT id, value FROM demo_table"
    
    if state is not None:
        # Require exactly two uppercase letters (a value such as "12" must be rejected too)
        assert len(state) == 2 and state.isalpha() and state.isupper(), "Please use the standard 2-letter, uppercase state abbreviations"
        query += f" WHERE state = '{state}'"
    
    query_results = spark.sql(query)
    
    if render_results:
        display(query_results)
        return None
    else:
        return query_results

<i18n value="060207a1-a34c-4817-abee-f6e0b9c3b48a"/>


The assert statements below can be used to check whether or not your function works as intended.

In [0]:
import pyspark.sql.dataframe

assert type(preview_values()) == pyspark.sql.dataframe.DataFrame, "Function should return the results as a DataFrame"
assert preview_values().columns == ["id", "value"], "Query should only return the id and value columns"

assert preview_values(render_results=True) is None, "Function should return None when rendering"
assert preview_values(render_results=False) is not None, "Function should return DataFrame when not rendering"

assert preview_values(state=None).count() == 7, "Function should allow no state"
assert preview_values(state="NY").count() == 2, "Function should allow filtering by state"
assert preview_values(state="CA").count() == 2, "Function should allow filtering by state"
assert preview_values(state="OH").count() == 1, "Function should allow filtering by state"

id,value
1,1.0
2,2.5
3,3.3
4,4.7
5,5.3
6,6.6
7,7.1
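
The query-building part of `preview_values` can also be checked without a Spark session. The sketch below isolates it in a hypothetical helper, `build_preview_query`, which is not part of the lab; it raises a `ValueError` rather than using `assert`, which is one possible design choice among several.

```python
def build_preview_query(state=None):
    """Build the SQL text that preview_values would run (hypothetical helper)."""
    query = "SELECT id, value FROM demo_table"
    if state is not None:
        # Validate before interpolating: exactly two uppercase letters.
        if not (len(state) == 2 and state.isalpha() and state.isupper()):
            raise ValueError("Please use the standard 2-letter, uppercase state abbreviations")
        query += f" WHERE state = '{state}'"
    return query

print(build_preview_query())      # SELECT id, value FROM demo_table
print(build_preview_query("CA"))  # SELECT id, value FROM demo_table WHERE state = 'CA'
```

Validating `state` before it is interpolated also limits what can end up inside the SQL text, since the value is inserted into the query as a plain string.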


&copy; 2022 Databricks, Inc. All rights reserved.<br/>
Apache, Apache Spark, Spark and the Spark logo are trademarks of the <a href="https://www.apache.org/">Apache Software Foundation</a>.<br/>
<br/>
<a href="https://databricks.com/privacy-policy">Privacy Policy</a> | <a href="https://databricks.com/terms-of-use">Terms of Use</a> | <a href="https://help.databricks.com/">Support</a>