# ADS-508-01-SP23 Team 8: Final Project

# Setup Database and Athena Tables

Much of the code is modified from `Fregly, C., & Barth, A. (2021). Data science on AWS: Implementing end-to-end, continuous AI and machine learning pipelines. O’Reilly.`

## Install missing dependencies

[PyAthena](https://pypi.org/project/PyAthena/) is a Python DB API 2.0 (PEP 249) compliant client for Amazon Athena.
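PEP 249 defines the `connect()` → `cursor()` → `execute()` → `fetch*()` pattern that PyAthena follows. The sketch below uses Python's built-in `sqlite3` module, which is also a PEP 249 driver, as a local stand-in so it runs without AWS. The table and rows are hypothetical. With PyAthena, `connect(...)` would instead take the `region_name` and `s3_staging_dir` arguments used later in this notebook, and the placeholder style differs.

```python
import sqlite3

# sqlite3 stands in for PyAthena here; both follow the PEP 249 DB API shape.
# Names are prefixed with demo_ so they don't clobber the notebook's `conn`.
demo_conn = sqlite3.connect(":memory:")
demo_cur = demo_conn.cursor()

# Hypothetical table and rows, for illustration only
demo_cur.execute("CREATE TABLE demo (borough TEXT, totalpop INTEGER)")
# sqlite3 uses qmark placeholders (?); PyAthena uses a different paramstyle
demo_cur.executemany("INSERT INTO demo VALUES (?, ?)",
                     [("Bronx", 100), ("Queens", 250)])

demo_cur.execute("SELECT borough, totalpop FROM demo ORDER BY totalpop DESC")
rows = demo_cur.fetchall()
print(rows)  # [('Queens', 250), ('Bronx', 100)]

demo_cur.close()
demo_conn.close()
```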

In [3]:
!pip install --disable-pip-version-check -q PyAthena==2.1.0


## Globally import libraries

In [4]:
import boto3
from botocore.client import ClientError
import sagemaker
import pandas as pd
from pyathena import connect
from IPython.display import display, HTML

## Instantiate AWS SageMaker session

In [5]:
session = boto3.session.Session()
region = session.region_name
sagemaker_session = sagemaker.Session()
def_bucket = sagemaker_session.default_bucket()
bucket = 'sagemaker-us-east-ads508-sp23-t8'

s3 = boto3.Session().client(service_name="s3",
                            region_name=region)

In [6]:
setup_s3_bucket_passed = False
ingest_create_athena_db_passed = False
ingest_create_athena_table_tsv_passed = False

In [7]:
print(f"Default bucket: {def_bucket}")
print(f"Public T8 bucket: {bucket}")

Default bucket: sagemaker-us-east-1-657724983756
Public T8 bucket: sagemaker-us-east-ads508-sp23-t8


## Verify S3 Bucket Creation

In [8]:
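%%bash

# `bucket` is a Python variable and is not visible to bash, so list the
# account's buckets and confirm that the team bucket appears
aws s3 ls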

2023-03-16 17:05:02 aws-athena-query-results-657724983756-us-east-1
2023-03-02 16:56:48 sagemaker-studio-657724983756-5nh7ydsouq7
2023-03-02 17:25:41 sagemaker-studio-657724983756-7yc8bp8xk0b
2023-03-02 17:01:51 sagemaker-us-east-1-657724983756
2023-03-17 05:19:31 sagemaker-us-east-ads508-sp23-t8


In [9]:
response = None

try:
    response = s3.head_bucket(Bucket=bucket)
    print(response)
    setup_s3_bucket_passed = True
except ClientError as e:
    print(f"[ERROR] Cannot find bucket {bucket} due to {e}.")

{'ResponseMetadata': {'RequestId': '3M7QCX621FG6HM0T', 'HostId': 'IYJClEFWEtK3JIGUVIFtrvq/aYESk0eFsF8HyXWqK8mqfid4vnK7iw8c+Dv1dWBkc98cfYq43FE=', 'HTTPStatusCode': 200, 'HTTPHeaders': {'x-amz-id-2': 'IYJClEFWEtK3JIGUVIFtrvq/aYESk0eFsF8HyXWqK8mqfid4vnK7iw8c+Dv1dWBkc98cfYq43FE=', 'x-amz-request-id': '3M7QCX621FG6HM0T', 'date': 'Tue, 21 Mar 2023 17:06:54 GMT', 'x-amz-bucket-region': 'us-east-1', 'x-amz-access-point-alias': 'false', 'content-type': 'application/xml', 'server': 'AmazonS3'}, 'RetryAttempts': 0}}


In [10]:
%store setup_s3_bucket_passed

Stored 'setup_s3_bucket_passed' (bool)


## Create Athena Database

In [11]:
database_name = "ads508_t8"

In [12]:
# Set S3 staging directory -- this is a temporary directory used for Athena queries
s3_staging_dir = f"s3://{bucket}/athena/staging"
print(s3_staging_dir)

s3://sagemaker-us-east-ads508-sp23-t8/athena/staging


In [13]:
conn = connect(region_name=region,
               s3_staging_dir=s3_staging_dir)
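This section sets `database_name` and then only verifies that the database exists; the `CREATE DATABASE` step itself is not shown here. A minimal sketch of that step, assuming the database may not exist yet: the hypothetical helper `build_create_database_stmt` returns an idempotent statement that could be run through the `conn` created above. The name check assumes Athena's recommended character set for names (lowercase letters, digits, underscores).

```python
import re

# Assumption: allow only lowercase letters, digits, and underscores,
# the character set Athena recommends for database and table names
_ATHENA_NAME_RE = re.compile(r"[a-z0-9_]+")


def build_create_database_stmt(db_name: str) -> str:
    """Return an idempotent CREATE DATABASE statement (hypothetical helper)."""
    if not _ATHENA_NAME_RE.fullmatch(db_name):
        raise ValueError(f"Unsupported Athena database name: {db_name!r}")
    return f"CREATE DATABASE IF NOT EXISTS {db_name}"


stmt = build_create_database_stmt("ads508_t8")
print(stmt)  # CREATE DATABASE IF NOT EXISTS ads508_t8

# In the notebook, this would run through the PyAthena connection, e.g.:
# pd.read_sql(stmt, conn)
```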

### Verify The Database Has Been Created Successfully

In [14]:
show_db_stmnt = "SHOW DATABASES"

df_show = pd.read_sql(show_db_stmnt,
                      conn)
df_show.head(17)

| | database_name |
|---|---|
| 0 | ads508_t8 |
| 1 | default |
| 2 | dsoaws |


In [15]:
if database_name in df_show.values:
    ingest_create_athena_db_passed = True

In [16]:
%store ingest_create_athena_db_passed

Stored 'ingest_create_athena_db_passed' (bool)


# Create Parquet Files from TSV Table

The CTAS (`CREATE TABLE AS SELECT`) query below builds an analytical base table (`abt`) by joining the `census` table to per-block aggregates from the `census_block` table, and writes the result to `s3://{bucket}/ABT`. The statement does not set a `format` property, so Athena writes the output in its default CTAS format, Parquet.
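Setting the CTAS storage properties explicitly makes the Parquet output visible in the query instead of relying on Athena's default. A minimal sketch with the hypothetical helper `build_ctas_stmt`: `format`, `external_location`, and `partitioned_by` are Athena CTAS table properties, while the `SELECT` text is a shortened placeholder, not the full ABT query.

```python
def build_ctas_stmt(db, tbl, select_sql, external_location,
                    fmt="PARQUET", partitioned_by=None):
    """Assemble an Athena CTAS statement (hypothetical helper).

    If partitioned_by is given, Athena expects those columns to be the
    last columns in the SELECT list.
    """
    props = [f"format = '{fmt}'",
             f"external_location = '{external_location}'"]
    if partitioned_by:
        cols = ", ".join(f"'{c}'" for c in partitioned_by)
        props.append(f"partitioned_by = ARRAY[{cols}]")
    with_clause = ",\n    ".join(props)
    return (f"CREATE TABLE IF NOT EXISTS {db}.{tbl}\n"
            f"WITH (\n    {with_clause}\n)\nAS\n{select_sql}")


# Placeholder SELECT; the real ABT query joins census and census_block
stmt = build_ctas_stmt("ads508_t8", "abt",
                       "SELECT censustract, borough FROM ads508_t8.census",
                       "s3://sagemaker-us-east-ads508-sp23-t8/ABT")
print(stmt)
```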

In [17]:
ingest_create_athena_table_parquet_passed = False

In [18]:
%store -r ingest_create_athena_table_tsv_passed

In [19]:
try:
    ingest_create_athena_table_tsv_passed
except NameError:
    print("++++++++++++++++++++++++++++++++++++++++++++++")
    print("[ERROR] YOU HAVE TO RUN ALL PREVIOUS NOTEBOOKS.  You did not register the TSV Data.")
    print("++++++++++++++++++++++++++++++++++++++++++++++")

In [20]:
print(ingest_create_athena_table_tsv_passed)

True


In [21]:
if not ingest_create_athena_table_tsv_passed:
    print("++++++++++++++++++++++++++++++++++++++++++++++")
    print("[ERROR] YOU HAVE TO RUN ALL PREVIOUS NOTEBOOKS.  You did not register the TSV Data.")
    print("++++++++++++++++++++++++++++++++++++++++++++++")
else:
    print("[OK]")

[OK]
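The two cells above test the same flag twice: once for a `NameError` and once for a false value. A sketch that folds both checks into one, assuming the flags live in the notebook's global namespace after `%store -r`: the hypothetical `missing_prereqs` treats an undefined flag the same as `False`.

```python
def missing_prereqs(namespace, flag_names):
    """Return prerequisite flags that are undefined or falsy (hypothetical helper)."""
    return [name for name in flag_names if not namespace.get(name, False)]


# In the notebook, pass globals() instead of this literal dict
missing = missing_prereqs({"ingest_create_athena_table_tsv_passed": True},
                          ["ingest_create_athena_table_tsv_passed"])
if missing:
    print(f"[ERROR] Run the previous notebooks first; missing: {missing}")
else:
    print("[OK]")
```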


## Define custom function to create tables in existing database

In [22]:
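def create_athena_tbl_tsv(conn=None,
                          db=None,
                          tbl_name=None,
                          fields='',
                          s3_path=None,
                          delim=',',
                          ret='',
                          comp='',
                          skip=''):
    """Drop and re-create an external Athena table over delimited text files in S3.

    Returns True if the table is listed by SHOW TABLES afterwards.
    """
    # Collect table properties; emit TBLPROPERTIES only when at least one is set,
    # since an empty TBLPROPERTIES () clause is not valid DDL, and join them with
    # commas so that passing both `comp` and `skip` still produces valid SQL
    tbl_props = [prop for prop in (comp, skip) if prop]
    tbl_props_clause = f"TBLPROPERTIES ({', '.join(tbl_props)})" if tbl_props else ""

    # SQL statements to execute
    drop_tsv_tbl_stmnt = f"""DROP TABLE IF EXISTS {db}.{tbl_name}"""

    create_tsv_tbl_stmnt = f"""
        CREATE EXTERNAL TABLE IF NOT EXISTS {db}.{tbl_name}({fields})
        ROW FORMAT DELIMITED
            FIELDS
                TERMINATED BY '{delim}'
            LINES
                TERMINATED BY '{ret}\\n'
        LOCATION '{s3_path}'
        {tbl_props_clause}
        """

    print(f'Create table statement:\n{create_tsv_tbl_stmnt}')

    pd.read_sql(drop_tsv_tbl_stmnt,
                conn)

    pd.read_sql(create_tsv_tbl_stmnt,
                conn)

    # Verify the table has been created successfully
    show_tsv_tbl_stmnt = f"SHOW TABLES IN {db}"

    df_show = pd.read_sql(show_tsv_tbl_stmnt,
                          conn)
    display(df_show.head(17))

    # Use a local flag so the check never reads an unassigned variable
    # (the original raised UnboundLocalError when the table was missing)
    tbl_created = tbl_name in df_show.values

    print(f'\nDataframe contains records: {tbl_created}')
    return tbl_created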

## Create Athena Table from `census_block_loc.csv` in S3

### Dataset columns

- `latitude` (double)
- `longitude` (double)
- `blockCode` (string)
- `county` (string)
In [23]:
ceb_tsv_tbl_name = 'census_block'
ceb_tsv_field_list = """
latitude double,
longitude double,
blockCode string,
county string
"""
ceb_tsv_s3_raw_data_path = f"s3://{bucket}/raw_data/census_block"
print(ceb_tsv_s3_raw_data_path)

create_athena_tbl_tsv(conn=conn,
                      db=database_name,
                      tbl_name=ceb_tsv_tbl_name,
                      fields=ceb_tsv_field_list,
                      s3_path=ceb_tsv_s3_raw_data_path,
                      comp='',
                      skip="'skip.header.line.count'='1'")

s3://sagemaker-us-east-ads508-sp23-t8/raw_data/census_block
Create table statement:

        CREATE EXTERNAL TABLE IF NOT EXISTS ads508_t8.census_block(
latitude double,
longitude double,
blockCode string,
county string
)
        ROW FORMAT DELIMITED
            FIELDS
                TERMINATED BY ','
            LINES
                TERMINATED BY '\n'
        LOCATION 's3://sagemaker-us-east-ads508-sp23-t8/raw_data/census_block'
        TBLPROPERTIES ('skip.header.line.count'='1')
        


| | tab_name |
|---|---|
| 0 | census |
| 1 | census_block |
| 2 | crime |
| 3 | crime_pqt |
| 4 | evictions |
| 5 | grad_outcomes |
| 6 | hs_info |
| 7 | jobs |



Dataframe contains records: True
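The DDL above reads each file line by line, splits each line on the delimiter, and skips one header line via `skip.header.line.count`. The sketch below approximates that behaviour in plain Python under stated assumptions: it models the simple delimited format (no handling of quoted fields), fills missing trailing fields with NULL (`None`), ignores extra fields, and returns NULL for values that cannot be cast. The helper name and sample text are hypothetical, and only the `double` and `string` types used here are supported.

```python
def parse_delimited(text, field_types, delim=",", skip_header_lines=1):
    """Approximate how a ROW FORMAT DELIMITED table reads rows (hypothetical sketch)."""
    casts = {"double": float, "string": str}
    rows = []
    for line in text.splitlines()[skip_header_lines:]:
        values = line.split(delim)  # plain split: quoted commas are NOT handled
        row = []
        for i, (_name, ftype) in enumerate(field_types):
            raw = values[i] if i < len(values) else None  # missing field -> NULL
            try:
                row.append(None if raw is None else casts[ftype](raw))
            except ValueError:
                row.append(None)  # uncastable value -> NULL
        rows.append(row)  # fields beyond len(field_types) are ignored
    return rows


fields = [("latitude", "double"), ("longitude", "double"),
          ("blockCode", "string"), ("county", "string")]
# Hypothetical sample: one header line plus one data line with an extra field
sample = "Latitude,Longitude,BlockCode,County,State\n40.5,-73.9,360819901000008,Queens,NY\n"
rows = parse_delimited(sample, fields)
print(rows)
```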


### Run A Sample Query

In [24]:
ceb_select_dbn_stmnt01 = f"""
    SELECT
        blockCode,
        count(*),
        min(latitude) AS min_lat,
        max(latitude) AS max_lat,
        min(longitude) AS min_long,
        max(longitude) AS max_long     
    FROM {database_name}.{ceb_tsv_tbl_name}
    GROUP BY blockCode
    ORDER BY count(*) DESC
    LIMIT 50000
    """

print(ceb_select_dbn_stmnt01)

ceb_df01_s01 = pd.read_sql(ceb_select_dbn_stmnt01,
                           conn)
print(ceb_df01_s01.shape)
display(ceb_df01_s01.head(15))


    SELECT
        blockCode,
        count(*),
        min(latitude) AS min_lat,
        max(latitude) AS max_lat,
        min(longitude) AS min_long,
        max(longitude) AS max_long     
    FROM ads508_t8.census_block
    GROUP BY blockCode
    ORDER BY count(*) DESC
    LIMIT 50000
    
(20406, 6)


| | blockCode | _col1 | min_lat | max_lat | min_long | max_long |
|---|---|---|---|---|---|---|
| 0 | 360819901000008 | 550 | 40.49809 | 40.570452 | -73.896935 | -73.808291 |
| 1 | 360819901000012 | 537 | 40.511658 | 40.56593 | -74.039397 | -73.93809 |
| 2 | 360599904000008 | 354 | 40.534271 | 40.579497 | -73.767136 | -73.706985 |
| 3 | 340259900000002 | 328 | 40.486784 | 40.525226 | -74.064724 | -73.950754 |
| 4 | 360599904000006 | 317 | 40.536533 | 40.577236 | -73.700653 | -73.65 |
| 5 | 340259900000001 | 308 | 40.484523 | 40.522965 | -73.985578 | -73.887437 |
| 6 | 360859901000005 | 283 | 40.507136 | 40.559146 | -74.105879 | -74.039397 |
| 7 | 360819901000009 | 273 | 40.491307 | 40.550101 | -73.947588 | -73.884271 |
| 8 | 360470702030001 | 237 | 40.579497 | 40.642814 | -73.890603 | -73.836784 |
| 9 | 360599902000001 | 233 | 40.868945 | 40.927739 | -73.694322 | -73.65 |
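Conceptually, the query groups every latitude/longitude point by `blockCode` and reports each block's point count and bounding box, largest count first. A plain-Python sketch of the same aggregation, assuming no NULL coordinates (SQL `min`/`max` skip NULLs, which this sketch does not model); the helper name and points are hypothetical.

```python
from collections import defaultdict


def summarize_blocks(points):
    """Group (lat, lon, block_code) tuples by block code and compute
    count, min/max latitude, and min/max longitude (hypothetical helper)."""
    groups = defaultdict(list)
    for lat, lon, block in points:  # GROUP BY blockCode
        groups[block].append((lat, lon))

    summary = []
    for block, coords in groups.items():
        lats = [c[0] for c in coords]
        lons = [c[1] for c in coords]
        summary.append((block, len(coords), min(lats), max(lats),
                        min(lons), max(lons)))

    summary.sort(key=lambda row: row[1], reverse=True)  # ORDER BY count(*) DESC
    return summary


points = [(40.50, -73.89, "A"), (40.57, -73.81, "A"), (40.60, -73.70, "B")]
result = summarize_blocks(points)
print(result)
```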


In [25]:
# Set S3 path to Parquet data
abt_s3_data_path = f"s3://{bucket}/ABT"

# Execute Statement
_This can take a few minutes.  Please be patient._
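Dropping an Athena table removes only its metadata; the files under `external_location` stay in S3, and a CTAS statement fails if that location already holds data. A sketch for clearing the prefix before a rerun: the hypothetical `split_s3_uri` helper parses the URI, and the commented boto3 calls (a `list_objects_v2` paginator and `delete_objects`) show one possible cleanup that is not part of the original notebook. Deleting objects is irreversible, so check the prefix first.

```python
from urllib.parse import urlparse


def split_s3_uri(uri):
    """Split 's3://bucket/prefix' into (bucket, 'prefix/') (hypothetical helper)."""
    parsed = urlparse(uri)
    if parsed.scheme != "s3" or not parsed.netloc:
        raise ValueError(f"Not an S3 URI: {uri!r}")
    prefix = parsed.path.lstrip("/")
    # A trailing slash keeps 'ABT/' from also matching keys such as 'ABT_old/...'
    if prefix and not prefix.endswith("/"):
        prefix += "/"
    return parsed.netloc, prefix


bucket_name, prefix = split_s3_uri("s3://sagemaker-us-east-ads508-sp23-t8/ABT")
print(bucket_name, prefix)  # sagemaker-us-east-ads508-sp23-t8 ABT/

# Possible cleanup with the notebook's `s3` client (irreversible; verify the prefix first):
# paginator = s3.get_paginator("list_objects_v2")
# for page in paginator.paginate(Bucket=bucket_name, Prefix=prefix):
#     keys = [{"Key": obj["Key"]} for obj in page.get("Contents", [])]
#     if keys:
#         s3.delete_objects(Bucket=bucket_name, Delete={"Objects": keys})
```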

In [26]:
cen_tsv_tbl_name = 'census'
abt_tbl_name = 'abt'
drop_abt_tbl_stmnt = f"""DROP TABLE IF EXISTS {database_name}.{abt_tbl_name}"""

# SQL statement to execute
create_abt_tble_stmnt = f"""
CREATE TABLE IF NOT EXISTS {database_name}.{abt_tbl_name}
WITH (
    external_location = '{abt_s3_data_path}'
    )
AS
SELECT
        cen.censustract,
        cen.borough,
        cen.totalpop,
        cen.men,
        cen.women,
        cen.hispanic,
        cen.white,
        cen.black,
        cen.native,
        cen.asian,
        cen.citizen,
        cen.income,
        cen.poverty,
        cen.childpoverty,
        cen.professional,
        cen.service,
        cen.office,
        cen.construction,
        cen.production,
        cen.drive,
        cen.carpool,
        cen.transit,
        cen.walk,
        cen.othertransp,
        cen.workathome,
        cen.meancommute,
        cen.employed,
        cen.privatework,
        cen.publicwork,
        cen.selfemployed,
        cen.familywork,
        cen.unemployment,
        ceb.blockCode
FROM {database_name}.{cen_tsv_tbl_name} AS cen
LEFT JOIN (
    SELECT
        blockCode,
        count(*),
        min(latitude) AS min_lat,
        max(latitude) AS max_lat,
        min(longitude) AS min_long,
        max(longitude) AS max_long     
    FROM {database_name}.{ceb_tsv_tbl_name}
    GROUP BY blockCode
    ORDER BY count(*) DESC
    LIMIT 50000
    ) AS ceb
    ON cen.censustract = ceb.blockCode
"""

print(f'Create table statement:\n{create_abt_tble_stmnt}')

pd.read_sql(drop_abt_tbl_stmnt,
            conn)

pd.read_sql(create_abt_tble_stmnt,
            conn)

Create table statement:

CREATE TABLE IF NOT EXISTS ads508_t8.abt
WITH (
    external_location = 's3://sagemaker-us-east-ads508-sp23-t8/ABT'
    )
AS
SELECT
        cen.censustract,
        cen.borough,
        cen.totalpop,
        cen.men,
        cen.women,
        cen.hispanic,
        cen.white,
        cen.black,
        cen.native,
        cen.asian,
        cen.citizen,
        cen.income,
        cen.poverty,
        cen.childpoverty,
        cen.professional,
        cen.service,
        cen.office,
        cen.construction,
        cen.production,
        cen.drive,
        cen.carpool,
        cen.transit,
        cen.walk,
        cen.othertransp,
        cen.workathome,
        cen.meancommute,
        cen.employed,
        cen.privatework,
        cen.publicwork,
        cen.selfemployed,
        cen.familywork,
        cen.unemployment,
        ceb.blockCode
FROM ads508_t8.census AS cen
LEFT JOIN (
    SELECT
        blockCode,
        count(*),
        min(latitude) AS

Unnamed: 0,rows
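The join compares `cen.censustract` with `ceb.blockCode`. In Census GEOIDs, a 15-digit block code is the 11-digit tract code (2-digit state, 3-digit county, 6-digit tract) followed by a 4-digit block number, so an equality join between the two matches only if `censustract` holds block-level codes. A sketch of deriving the tract key from a block code, assuming `censustract` stores the standard 11-digit tract GEOID as text; in Athena the corresponding expression would be `substr(blockCode, 1, 11)`. The helper name is hypothetical.

```python
def block_to_tract(block_code: str) -> str:
    """Return the 11-digit tract GEOID prefix of a 15-digit block GEOID."""
    if len(block_code) != 15 or not block_code.isdigit():
        raise ValueError(f"Expected a 15-digit block GEOID, got {block_code!r}")
    return block_code[:11]  # state (2) + county (3) + tract (6)


# Block code taken from the sample query output above
tract = block_to_tract("360819901000008")
print(tract)  # 36081990100
```

Because one tract contains many blocks, joining on this prefix would repeat each census row once per block unless the block data is aggregated to the tract level first.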


# Show the Tables

In [27]:
show_tbl_stmnt = f"SHOW TABLES in {database_name}"

In [28]:
df_tables = pd.read_sql(show_tbl_stmnt,
                        conn)
df_tables.head(17)

| | tab_name |
|---|---|
| 0 | abt |
| 1 | census |
| 2 | census_block |
| 3 | crime |
| 4 | crime_pqt |
| 5 | evictions |
| 6 | grad_outcomes |
| 7 | hs_info |
| 8 | jobs |


In [29]:
if abt_tbl_name in df_tables.values:
    ingest_create_athena_table_parquet_passed = True

In [30]:
%store ingest_create_athena_table_parquet_passed

Stored 'ingest_create_athena_table_parquet_passed' (bool)


# Run Sample Query

In [31]:
cri_borough01 = 'bronx'

cri_select_borough_stmnt02 = f"""
    SELECT * FROM {database_name}.{abt_tbl_name}
    WHERE lower(borough) = '{cri_borough01}'
    LIMIT 17
    """

print(cri_select_borough_stmnt02)

cri_df02_s01 = pd.read_sql(cri_select_borough_stmnt02,
                           conn)
cri_df02_s01.head(17)

In [33]:
if not cri_df02_s01.empty:
    print("[OK]")
else:
    print("++++++++++++++++++++++++++++++++++++++++++++++++++++++")
    print("[ERROR] YOUR DATA HAS NOT BEEN CONVERTED TO PARQUET. LOOK IN PREVIOUS CELLS TO FIND THE ISSUE.")
    print("++++++++++++++++++++++++++++++++++++++++++++++++++++++")

[OK]
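The borough value is inserted into the SQL with an f-string, which breaks if the value ever contains a single quote. A minimal sketch of standard SQL string-literal quoting (doubling embedded single quotes) with a hypothetical helper. PyAthena's DB API layer also accepts bound parameters in `pyformat` style (for example `%(borough)s` together with `params={"borough": ...}` in `pd.read_sql`); check this against the installed PyAthena version before relying on it.

```python
def sql_string_literal(value: str) -> str:
    """Quote a value as a SQL string literal by doubling embedded single quotes."""
    return "'" + value.replace("'", "''") + "'"


borough = "bronx"
stmt = (f"SELECT * FROM ads508_t8.abt "
        f"WHERE lower(borough) = {sql_string_literal(borough.lower())} LIMIT 17")
print(stmt)
```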


## Review the New Athena Table in the Glue Catalog

In [34]:
display(
    HTML(
        f'<b>Review <a target="top" href="https://console.aws.amazon.com/glue/home?region={region}#">AWS Glue Catalog</a></b>'
    )
)

## Store Variables for the Next Notebooks

In [35]:
%store

Stored variables and their in-db values:
balanced_bias_data_jsonlines_s3_uri                   -> 's3://sagemaker-us-east-1-657724983756/bias-detect
balanced_bias_data_s3_uri                             -> 's3://sagemaker-us-east-1-657724983756/bias-detect
bias_data_s3_uri                                      -> 's3://sagemaker-us-east-1-657724983756/bias-detect
ingest_create_athena_db_passed                        -> True
ingest_create_athena_table_parquet_passed             -> True
ingest_create_athena_table_tsv_passed                 -> True
s3_private_path_tsv                                   -> 's3://sagemaker-us-east-1-657724983756/amazon-revi
s3_public_path_tsv                                    -> 's3://amazon-reviews-pds/tsv'
setup_dependencies_passed                             -> True
setup_iam_roles_passed                                -> True
setup_instance_check_passed                           -> True
setup_s3_bucket_passed                                -> True


## Release Resources

In [36]:
%%html

<p><b>Shutting down your kernel for this notebook to release resources.</b></p>
<button class="sm-command-button" data-commandlinker-command="kernelmenu:shutdown" style="display:none;">Shutdown Kernel</button>
        
<script>
try {
    els = document.getElementsByClassName("sm-command-button");
    els[0].click();
}
catch(err) {
    // NoOp
}    
</script>

In [37]:
%%javascript

try {
    Jupyter.notebook.save_checkpoint();
    Jupyter.notebook.session.delete();
}
catch(err) {
    // NoOp
}
