NOTE: The SageMaker notebook's execution role must have permissions for the AWS Glue Data Catalog and Amazon Athena, plus access to the S3 buckets used below, to run these queries.

In [None]:
print('installing the PyAthena Drivers to perform SQL queries natively')
print('alternatives include using the boto3 libraries or other Athena Data APIs')
!pip install --upgrade pip > /dev/null
!pip install PyAthena > /dev/null

In [None]:
import sagemaker

# The SageMaker session supplies the AWS region and a default S3 bucket;
# later cells reuse that bucket for Athena and read the region from this session.
sagemaker_session = sagemaker.Session()
athena_data_bucket = sagemaker_session.default_bucket()  # may create the bucket if it does not exist yet

print('using the athena data bucket:', athena_data_bucket)
print('running in region: ', sagemaker_session.boto_region_name)

In [None]:
from pyathena import connect
import pandas as pd

# Reuse the session from the previous cell, which also produced athena_data_bucket,
# instead of constructing a second, redundant sagemaker.Session().
# Athena writes each query's result files under s3_staging_dir; a dedicated prefix
# keeps them out of the root of the shared SageMaker default bucket.
conn = connect(s3_staging_dir=f"s3://{athena_data_bucket}/athena/staging/",
               region_name=sagemaker_session.boto_region_name)


NOTE: A better way to create these tables is through the AWS Glue / Lake Formation APIs or with a Glue crawler. Here we create the table with SQL DDL to keep the notebook self-contained and to demonstrate this option.

In [None]:
# Register the flight-delay Parquet dataset as an external Athena table in the
# `default` database. IF NOT EXISTS makes re-running this cell a no-op once the
# table exists. Because the table is EXTERNAL, its data stays where it is: it is read
# in place from the region-specific athena-examples-<region> bucket at LOCATION.
# The table is partitioned by `year`. No partitions are visible to queries until
# they are registered (the MSCK REPAIR TABLE cell below does that).
cursor = conn.cursor()
cursor.execute("CREATE EXTERNAL TABLE IF NOT EXISTS default.flight_delays_pq ( " +
"    yr INT, " +
"    quarter INT, " +
"    month INT, " +
"    dayofmonth INT, " +
"    dayofweek INT, " +
"    flightdate STRING, " +
"    uniquecarrier STRING, " +
"    airlineid INT, " +
"    carrier STRING, " +
"    tailnum STRING, " +
"    flightnum STRING, " +
"    originairportid INT, " +
"    originairportseqid INT, " +
"    origincitymarketid INT, " +
"    origin STRING, " +
"    origincityname STRING, " +
"    originstate STRING, " +
"    originstatefips STRING, " +
"    originstatename STRING, " +
"    originwac INT, " +
"    destairportid INT, " +
"    destairportseqid INT, " +
"    destcitymarketid INT, " +
"    dest STRING, " +
"    destcityname STRING, " +
"    deststate STRING, " +
"    deststatefips STRING, " +
"    deststatename STRING, " +
"    destwac INT, " +
"    crsdeptime STRING, " +
"    deptime STRING, " +
"    depdelay INT, " +
"    depdelayminutes INT, " +
"    depdel15 INT, " +
"    departuredelaygroups INT, " +
"    deptimeblk STRING, " +
"    taxiout INT, " +
"    wheelsoff STRING, " +
"    wheelson STRING, " +
"    taxiin INT, " +
"    crsarrtime INT, " +
"    arrtime STRING, " +
"    arrdelay INT, " +
"    arrdelayminutes INT, " +
"    arrdel15 INT, " +
"    arrivaldelaygroups INT, " +
"    arrtimeblk STRING, " +
"    cancelled INT, " +
"    cancellationcode STRING, " +
"    diverted INT, " +
"    crselapsedtime INT, " +
"    actualelapsedtime INT, " +
"    airtime INT, " +
"    flights INT, " +
"    distance INT, " +
"    distancegroup INT, " +
"    carrierdelay INT, " +
"    weatherdelay INT, " +
"    nasdelay INT, " +
"    securitydelay INT, " +
"    lateaircraftdelay INT, " +
"    firstdeptime STRING, " +
"    totaladdgtime INT, " +
"    longestaddgtime INT, " +
"    divairportlandings INT, " +
"    divreacheddest INT, " +
"    divactualelapsedtime INT, " +
"    divarrdelay INT, " +
"    divdistance INT, " +
"    div1airport STRING, " +
"    div1airportid INT, " +
"    div1airportseqid INT, " +
"    div1wheelson STRING, " +
"    div1totalgtime INT, " +
"    div1longestgtime INT, " +
"    div1wheelsoff STRING, " +
"    div1tailnum STRING, " +
"    div2airport STRING, " +
"    div2airportid INT, " +
"    div2airportseqid INT, " +
"    div2wheelson STRING, " +
"    div2totalgtime INT, " +
"    div2longestgtime INT, " +
"    div2wheelsoff STRING, " +
"    div2tailnum STRING, " +
"    div3airport STRING, " +
"    div3airportid INT, " +
"    div3airportseqid INT, " +
"    div3wheelson STRING, " +
"    div3totalgtime INT, " +
"    div3longestgtime INT, " +
"    div3wheelsoff STRING, " +
"    div3tailnum STRING, " +
"    div4airport STRING, " +
"    div4airportid INT, " +
"    div4airportseqid INT, " +
"    div4wheelson STRING, " +
"    div4totalgtime INT, " +
"    div4longestgtime INT, " +
"    div4wheelsoff STRING, " +
"    div4tailnum STRING, " +
"    div5airport STRING, " +
"    div5airportid INT, " +
"    div5airportseqid INT, " +
"    div5wheelson STRING, " +
"    div5totalgtime INT, " +
"    div5longestgtime INT, " +
"    div5wheelsoff STRING, " +
"    div5tailnum STRING " +
") " +
# `year` is the partition column; it is not one of the data columns listed above.
"PARTITIONED BY (year STRING) " +
"STORED AS PARQUET " +
# The source bucket name embeds the session's AWS region.
"LOCATION 's3://athena-examples-" + sagemaker_session.boto_region_name + "/flight/parquet/' " +
"tblproperties ('parquet.compress'='SNAPPY')")
# A DDL statement returns no result rows, so this presumably prints an empty list.
print(cursor.fetchall())

In [None]:
# Scan the table's S3 location and register the Hive-style (year=...) partition
# folders it finds, so queries can see the data. PyAthena's execute() returns the
# cursor itself, so the rows can be fetched in the same expression.
cursor = conn.cursor()
print(cursor.execute("MSCK REPAIR TABLE default.flight_delays_pq").fetchall())

In [None]:
# Per-route count of delayed departures. The original query counted every flight
# (count(*) with no filter) under the alias `delays`. This version keeps only rows
# where depdel15 = 1, the departure-delayed-15-minutes-or-more indicator column,
# so the column means what its name says.
# Result: one row per (origin, dest) route with at least one delayed departure;
# columns are origin, dest and delays.
# The table is schema-qualified to match the CREATE / MSCK REPAIR cells.
df = pd.read_sql(
    "SELECT origin, dest, count(*) AS delays "
    "FROM default.flight_delays_pq "
    "WHERE depdel15 = 1 "
    "GROUP BY origin, dest",
    conn,
)
df

In [None]:
# Heatmap of the per-route `delays` counts: origins as rows, destinations as columns.
import matplotlib.pyplot as plt
import numpy as np
import seaborn as sns

np.random.seed(0)  # seeds NumPy's global RNG; nothing in this cell draws random numbers

# One theme call is enough: seaborn's defaults plus a larger default figure size.
# Calling sns.set() first and then sns.set(rc=...) ends in the same state.
sns.set(rc={'figure.figsize': (11.7, 8.27)})

# Reshape the long (origin, dest, delays) rows into an origin x dest matrix.
# Route pairs with no row become NaN in the pivot.
result = df.pivot(index='origin', columns='dest', values='delays')

with sns.axes_style("white"):
    sns.heatmap(result, cmap="Reds")
plt.show()