### This notebook shows examples of using SQL to explore and handle data
To follow this example notebook, execute the cells in order.  
The keyboard shortcut to execute the current cell and move to the next one is Shift+Enter.

To delete cells you no longer need (including this one), use the context menu, or press Escape (to exit any cell you might be editing) and then press the d key twice. To delete several cells quickly, select them first with Shift + Up/Down.  

This example assumes the configured role has read/write permission on the default catalog database and on the S3 Glue temporary folder. Otherwise, update the code or the permissions accordingly.

#### Running the following cell will set up and start your interactive session.

In [None]:
%idle_timeout 120
%glue_version 3.0
%worker_type G.1X
%number_of_workers 3

import boto3
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from pyspark.sql.functions import *
from awsglue.context import GlueContext
from awsglue.job import Job
  
sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)

In [None]:
%%sql
-- This is a SQL cell running against the account Glue catalog in the same region
-- One query per cell
show databases

In [None]:
%%sql
-- You can use ANSI SQL syntax to explore the catalog and run queries that print the results
-- You can even run DDL to make changes, such as ALTER TABLE
-- If your database or table has special characters, you can escape the name with a backtick `
-- for instance: SELECT * FROM `mydb`.`mytable` LIMIT 10
show tables in default
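
The backtick escaping mentioned in the cell above can be wrapped in a small helper when database or table names come from variables. This is only a minimal sketch: `quote_identifier` and `qualified_table` are hypothetical helpers (not part of Spark or AWS Glue), the names `my-db` and `my table` are made up, and the code assumes the Spark SQL convention of doubling a backtick that appears inside a quoted name.

```python
def quote_identifier(name: str) -> str:
    """Wrap a database, table or column name in backticks for Spark SQL."""
    # A backtick inside the name is escaped by doubling it
    return "`" + name.replace("`", "``") + "`"


def qualified_table(database: str, table: str) -> str:
    """Build an escaped `database`.`table` reference."""
    return f"{quote_identifier(database)}.{quote_identifier(table)}"


# Hypothetical names containing characters that need escaping
query = f"SELECT * FROM {qualified_table('my-db', 'my table')} LIMIT 10"
print(query)
```

The resulting string could then be passed to `spark.sql(query)` in a PySpark cell.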

In [None]:
# This is a PySpark cell (the default cell type)
# In the previous cell, table names longer than 20 characters are truncated,
# and if there are more than 20 tables, only the first 20 are displayed
# Here we run the same query directly through the API to have more control over the output:
spark.sql("show tables in default").show(n=30, truncate=False)

In [None]:
# If your data doesn't have a table in the catalog, you can use a temporary view to query it with SQL
# Here we read all the CSV files under the indicated s3 path
medicareDF = spark.read.csv("s3://awsglue-datasets/examples/medicare/", header=True)
# If the data has a reasonable size (as in this case), we can cache it in memory/disk (depending on cluster size)
# so that after the first query, later queries no longer have to read and parse the files from s3
medicareDF.cache()
# Instead of using the DataFrame API, you can register it as a temporary view for SQL usage like this
# (createOrReplaceTempView replaces the deprecated registerTempTable):
medicareDF.createOrReplaceTempView("medicare")

In [None]:
# Explore the data. Since it has many long columns, display each row vertically for easier reading
spark.sql("SELECT * from medicare").show(n=10, truncate=False, vertical=True)

In [None]:
# You can also register the result of another query as a view, to avoid repetition
spark.sql("SELECT * FROM medicare WHERE `Provider State` = 'NY'").createOrReplaceTempView("ny_medicare")

In [None]:
# You can retrieve the result of a query into a Python variable if it's small enough
# For example, retrieve the Diagnosis Related Group (DRG) with the highest average payments in New York
diagnostic_group = spark.sql("""
SELECT DISTINCT FIRST_VALUE(`DRG Definition`)  OVER (ORDER BY `Average Medicare Payments` DESC) AS drg
FROM ny_medicare
""").collect()[0]['drg']
print(diagnostic_group)
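
One thing to keep in mind about the query above: `spark.read.csv` was called without `inferSchema`, so every column, including `Average Medicare Payments`, is loaded as a string, and `ORDER BY` then compares text rather than numbers. The plain Python sketch below illustrates the difference with made-up values. Whether this affects the actual result depends on how the amounts are formatted in the files, which is not verified here.

```python
# Made-up payment amounts kept as text, the way a CSV reader
# without schema inference would load them
payments = ["900.50", "10000.00", "7500.25"]

# Text comparison goes character by character,
# so "900.50" ranks above "10000.00"
highest_as_text = max(payments)

# Converting to numbers first gives the intended ordering
highest_as_number = max(payments, key=float)

print(highest_as_text, highest_as_number)
```

In Spark SQL, a comparable fix would be to cast the column to a numeric type (after stripping any currency symbol, if present) before ordering.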

In [None]:
# By default this example saves to the Glue temporary bucket; you can replace it with another bucket of your choice
output_bucket = f"aws-glue-temporary-{boto3.client('sts').get_caller_identity()['Account']}-{boto3.session.Session().region_name}"

provider_summary_df = spark.sql("""
SELECT `Provider Id`, `Provider Name`, sum(` Total Discharges `) as `Total Discharges` 
FROM medicare GROUP BY `Provider Id`, `Provider Name`
""")
# Save as a single CSV file with headers under the indicated s3 path
# Be careful: mode overwrite will wipe the output directory before writing
# The parentheses are an alternative to \ for breaking a Python statement into multiple lines
(provider_summary_df.coalesce(1).write
    .mode("overwrite")
    .csv(f"s3://{output_bucket}/example/medicare_by_provider_summary/", header=True)
)
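
The bucket name used in the cell above follows the pattern `aws-glue-temporary-<account id>-<region>`. As a rough sketch, the same name can be built from explicit values, which makes it easier to notice a missing region (`boto3.session.Session().region_name` returns `None` when no region is configured). `glue_temp_bucket` is a hypothetical helper, and the account ID and region below are placeholders.

```python
from typing import Optional


def glue_temp_bucket(account_id: str, region: Optional[str]) -> str:
    """Build the Glue temporary bucket name for an account and region."""
    if not region:
        # Fail early instead of producing a name ending in "-None"
        raise ValueError("No AWS region configured; pass one explicitly")
    return f"aws-glue-temporary-{account_id}-{region}"


# Placeholder account ID and region, for illustration only
print(glue_temp_bucket("123456789012", "us-east-1"))
```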

In [None]:
# Or save it as a Parquet table in the catalog; the columns are also renamed to names without spaces
(provider_summary_df
    .withColumnRenamed("Provider Id", "provider_id")
    .withColumnRenamed("Provider Name", "provider_name")
    .withColumnRenamed("Total Discharges", "total_discharges")
    .write
    .mode("overwrite")
    .option("path", f"s3://{output_bucket}/example/medicare_by_provider_summary_table/")
    .format("parquet")
    .saveAsTable("default.example_medicare_by_provider_summary")
)
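
The cell above renames each column by hand. With many columns, the new names could instead be derived from the old ones. The following is a minimal sketch in plain Python: `to_snake_case` is a hypothetical helper, and the rule it applies (trim, lowercase, replace runs of non-alphanumeric characters with an underscore) is an assumption about the naming style wanted.

```python
import re


def to_snake_case(column_name: str) -> str:
    """Turn a header such as ' Total Discharges ' into 'total_discharges'."""
    # Replace every run of non-alphanumeric characters with one underscore
    cleaned = re.sub(r"[^0-9a-zA-Z]+", "_", column_name.strip())
    # Drop underscores left at the edges and lowercase the result
    return cleaned.strip("_").lower()


# Column names as they appear in the queries of this notebook
columns = ["Provider Id", "Provider Name", " Total Discharges "]
renamed = [to_snake_case(c) for c in columns]
print(renamed)
```

With a DataFrame, a list built this way from `df.columns` could be applied in a single call such as `df.toDF(*renamed)`.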

In [None]:
%%sql
-- Check the new catalog table
DESCRIBE TABLE default.example_medicare_by_provider_summary

In [None]:
# Free the cache when no longer needed 
medicareDF.unpersist()

# Remove the table (but not the s3 files)
spark.sql("DROP TABLE default.example_medicare_by_provider_summary")