# Environment Used - SageMaker Notebooks on M5.2XL instance size
# 2, 3 & 9 - Reading data from Redshift into Pandas DF
# Recommended Library - PyAthena, Pandas

In [3]:
import pandas as pd
from pyathena import connect
from pyathena.pandas_cursor import PandasCursor

cur = connect(s3_staging_dir='s3://cmg-oasis-prod-commercial-bucket/Analytics/AthenaQueryResult/athena_temp/',
                region_name='us-west-2',
                cursor_class = PandasCursor).cursor()

> Method 1 to read Athena table into pandas dataframe

In [5]:
%%time
payerDF1 = cur.execute("""SELECT * FROM oasis_summarized.sst_payer_lives_monthly
LIMIT 5000000""").as_pandas()

CPU times: user 27 s, sys: 6.14 s, total: 33.1 s
Wall time: 1min 52s


In [None]:
type(payerDF)

In [None]:
payerDF1.head()

> Method 2 to read Athena table into pandas dataframe

> If you want to use the query results output to S3 directly, you can use PandasCursor method "as_pandas(cursor)".
> This cursor fetches query results faster than the default cursor


In [35]:
%%time
from pyathena.util import as_pandas

cur.execute("""
SELECT * FROM oasis_summarized.sst_payer_lives_monthly
LIMIT 5000000""")

payerDF2 = as_pandas(cursor)

CPU times: user 19.3 s, sys: 380 ms, total: 19.7 s
Wall time: 1min 16s


In [None]:
# To describe the metadata of the table read by cursor
print(cursor.description)

In [None]:
type(payerDF2)

In [None]:
payerDF2.head()

# 4 & 5 Intermediate table creation in Athena

> Note: There is no concept of temporary tables in Athena reason being its server less and temporary tables are supported when we have dedicated memory and session in system

> Method 1: CTAS with no external_location

In [None]:
cursor.execute("""drop table if exists analytics.sst_payer_lives_athena_temp""")

In [None]:
cursor.execute("""
create table analytics.sst_payer_intermediate_tbl2
as
select * FROM oasis_summarized.sst_payer_lives_monthly
LIMIT 50""")

> Method 2: CTAS with external_location

In [None]:
!aws s3 rm \
s3://cmg-oasis-prod-commercial-bucket/Analytics/AthenaQueryResult/sst_payer_intermediate_tbl2 \
    --recursive

In [None]:
cursor.execute("drop table analytics.sst_payer_intermediate_tbl2")

In [None]:
cursor.execute("""
create table analytics.sst_payer_intermediate_tbl2
with (external_location='s3://cmg-oasis-prod-commercial-bucket/Analytics/AthenaQueryResult/sst_payer_intermediate_tbl2/')
as
select * FROM oasis_summarized.sst_payer_lives_monthly
LIMIT 50""")

> Method 3: CTAS with file format mentioned

In [None]:
cursor.execute("drop table analytics.sst_payer_intermediate_tbl3")

In [None]:
!aws s3 rm \
s3://cmg-oasis-prod-commercial-bucket/Analytics/AthenaQueryResult/sst_payer_intermediate_tbl3 \
    --recursive

In [None]:
cursor.execute("""
create table analytics.sst_payer_intermediate_tbl3
with (format='parquet', external_location='s3://cmg-oasis-prod-commercial-bucket/Analytics/AthenaQueryResult/sst_payer_intermediate_tbl3/')
as
select * FROM oasis_summarized.sst_payer_lives_monthly
LIMIT 50""")

# 6 - Persist a pandas DF into Athena Table

In [None]:
import pandas as pd
from pyathena import connect
from pyathena.util import to_sql

conn = connect(aws_access_key_id='AKIAV6ACNNR2J7YS3DNK',
                 aws_secret_access_key='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',
                 s3_staging_dir='s3://cmg-oasis-prod-commercial-bucket/Analytics/AthenaQueryResult/athena_temp/',
                 region_name='us-west-2')

cursor = conn.cursor()

In [None]:
%%time
payerDF3 = pd.read_sql("""
SELECT * FROM oasis_summarized.sst_payer_lives_monthly 
LIMIT 500""", conn)

In [None]:
! aws s3 rm \
s3://cmg-oasis-prod-commercial-bucket/Analytics/AthenaQueryResult/sst_payer_lives_monthly_500records/ \
--recursive

In [None]:
cursor.execute("drop table analytics.sst_payer_lives_monthly_500records")

In [None]:
%%time
to_sql(payerDF3, 
       'sst_payer_lives_monthly_500records', 
       conn, 
       's3://cmg-oasis-prod-commercial-bucket/Analytics/AthenaQueryResult/sst_payer_lives_monthly_500records/',
       schema='analytics', 
       index=False, 
       if_exists='replace')

In [None]:
%%time
from pyathena.util import as_pandas

cursor.execute("""
SELECT * FROM oasis_summarized.sst_payer_lives_monthly
LIMIT 200""")

payerDF4 = as_pandas(cursor)

In [None]:
! aws s3 rm \
s3://cmg-oasis-prod-commercial-bucket/Analytics/AthenaQueryResult/sst_payer_lives_monthly_200records/ \
--recursive

In [None]:
cursor.execute("drop table analytics.sst_payer_lives_monthly_200records")

In [None]:
%%time
to_sql(payerDF4, 
       'sst_payer_lives_monthly_200records', 
       conn, 
       's3://cmg-oasis-prod-commercial-bucket/Analytics/AthenaQueryResult/sst_payer_lives_monthly_200records/',
       schema='analytics', 
       index=False, 
       if_exists='replace')

# 7 - CSV files into S3
# 7.1 - Read CSV File from S3 into pandas DF

In [None]:
import s3fs
import pandas as pd

In [None]:
# reading a csv file from S3 into pandas dataframe
csvDF = pd.read_csv('s3://cmg-oasis-prod-commercial-bucket/Analytics/AthenaQueryResult/csv_test_rs/intermediate_payer_lives_202003041515.csv')

In [None]:
csvDF.shape

In [None]:
csvDF.head()

# 7.2 - Read CSV file and use the data to join with redshift tables

In [None]:
%%time
to_sql(csvDF, 
       'csv_data_payer', 
       conn, 
       's3://cmg-oasis-prod-commercial-bucket/Analytics/AthenaQueryResult/csv_data_payer/',
       schema='analytics', 
       index=False, 
       if_exists='replace')

In [None]:
%%time
# in this scenario table is created without mentioning and the underlying data is still parquet
cursor.execute(
    """
    create table analytics.payer_joined_with_csv_test as
    SELECT *
    FROM 
    (select * from analytics.sst_payer_lives_monthly_200records) inte
    left join
    (select zip_ecosystem_id as id, zip_ecosystem_name as ecosystem_csv from analytics.csv_data_payer limit 2) cs
    on inte.zip_ecosystem_id = cs.id
    """ 
)

# 8 - Read CSV files from local sagemaker env after uploading it

In [None]:
#Reading CSV files from local into pandas dataframe
localcsvDF = pd.read_csv('intermediate_payer_lives_202003041515.csv')

In [None]:
localcsvDF.shape

# 11 - Persist a pandas DF into a CSV 
# 11.1 - Copy Redshift table into CSV file by converting it to pandas DF in Local Sagemaker Environment

In [None]:
%%time
payerLivesDF = pd.read_sql(
    """
    SELECT * FROM 
    oasis_summarized.sst_payer_lives_monthly
    LIMIT 200
    """, conn)

In [None]:
type(payerLivesDF)

In [None]:
# Copy a redshift table data into a CSV file in local
payerLivesDF.to_csv("sst_payer_lives_monthly_csv1", encoding='utf-8', index=False)

# 11.2 - CSV file in to S3
# Recommended Library (boto3 : https://pypi.org/project/boto3/)

In [None]:
# Copy a redshift table data into a CSV file in S3

from io import StringIO
import boto3
s3 = boto3.client("s3")
csv_buf = StringIO()
payerLivesDF.to_csv(csv_buf, header=True, index=False)
csv_buf.seek(0)
s3.put_object(Bucket='cmg-oasis-prod-commercial-bucket', Body=csv_buf.getvalue(), Key='Analytics/AthenaQueryResult/sst_payer_lives_moonthly/data2.csv')

# Athena connections with boto3

In [8]:
import boto3,time
import pandas as pd

client = boto3.client('athena')

In [9]:
region='us-west-2'
defaultdb="analytics"
default_output="s3://cmg-oasis-prod-commercial-bucket/Analytics/AthenaQueryResult/athena_temp/tables/"
default_write_location="s3://cmg-oasis-prod-commercial-bucket/Analytics/AthenaQueryResult/athena_temp/tables"

In [10]:
## execute Athena SQL
def executeQuery(query, database=defaultdb, s3_output=default_output, poll=10):
    athena = boto3.client('athena',region_name=region)
    response = athena.start_query_execution(
        QueryString=query,
        QueryExecutionContext={
            'Database': database
            },
        ResultConfiguration={
            'OutputLocation': s3_output,
            }
        )

    print('Execution ID: ' + response['QueryExecutionId'])
    queryExecutionId=response['QueryExecutionId']
    state='QUEUED'
    while( state=='RUNNING' or state=='QUEUED'):
       response = athena.get_query_execution(QueryExecutionId=queryExecutionId)
       state=response['QueryExecution']['Status']['State']
       print (state)
       if  state=='RUNNING' or state=='QUEUED':
            time.sleep(poll)
       elif (state=='FAILED'):
             print (response['QueryExecution']['Status']['StateChangeReason'])
        
        
    #print (response)    
    return response

In [11]:
## Read from Athena to a Pandas Dataframe
def read_from_athena(sql):
    response=executeQuery(sql)
    return pd.read_csv(response['QueryExecution']['ResultConfiguration']['OutputLocation'])

In [12]:
## Save Pandas Dataframe to Athena table
def save_to_athena(df, database, tablename):
    pandas_to_athena_types_lookup={ "int64":"bigint", "object":"string", "float64":"double"}
    
    ## save the data
    table_location=default_write_location+tablename
    file_location=table_location+'/'+tablename+".pq"
    df.to_parquet(file_location)
    
    ## add the table to Athena
    data_types=df.dtypes
    columns = ''
    for i,k in enumerate(df.columns): 
        key = str(data_types[i])
        #print (i,key)
        if key in pandas_to_athena_types_lookup.keys():
            columns += (k + " " + pandas_to_athena_types_lookup.get(key)+", ")
        else:
            raise ValueError('Type mapping does not exist for type : '+key) 
    columns=columns[:-2]
    
    sql = F"CREATE EXTERNAL TABLE {database}.{tablename} \
    ( {columns} )  \
    ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'  \
    STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'  \
    OUTPUTFORMAT  'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'  \
    LOCATION '{table_location}'  \
    TBLPROPERTIES ( 'classification'='parquet','typeOfData'='file')"
    
    #print (sql)
    
    response=executeQuery(sql)

In [None]:
%%time
sql="""Select *
from oasis_summarized.sst_payer_lives_monthly
LIMIT 200"""

df=read_from_athena(sql)

In [14]:
df.head()

Unnamed: 0,zip_ecosystem_id,zip_ecosystem_name,mdm_plan_id,mdm_payer_id,mdm_payer_parent_id,gne_payer_parent_owner_name,gne_payer_market_type,gne_parent_owner_name,gne_payer_plan_name,mdm_payer_name,mdm_plan_type,mdm_payer_role,gne_book_of_business,drg_benefit_type,drg_zip,drg_county,drg_state,date_year_month,drg_lives_count,flag_current_month
0,3,OHIO,50003880,50001371,50000278,Payer Plan - BCBS - BCBS NE,Payer Plan,BCBS - BCBS NE,BCBS NE,BLUECROSS BLUESHIELD NEBRASKA,OTHER COMMERCIAL,PRIVATE HEALTH INSURER,COMMERCIAL,PHARMACY BENEFIT,47041,DEARBORN COUNTY,IN,201905,1.110973,N
1,3,OHIO,50004836,50002107,50000973,Payer Plan - BCBS - Premera BC,Payer Plan,BCBS - Premera BC,Premera BC,PREMERA BLUE CROSS,PPO,PRIVATE HEALTH INSURER,COMMERCIAL,PHARMACY BENEFIT,47041,DEARBORN COUNTY,IN,201712,0.362815,N
2,3,OHIO,50005735,50001743,50000791,Payer Plan - Humana,Payer Plan,Humana,Humana,HUMANA,POS,PRIVATE HEALTH INSURER,COMMERCIAL,PHARMACY BENEFIT,47041,DEARBORN COUNTY,IN,201804,0.139627,N
3,3,OHIO,50005829,50001455,50000661,Payer Plan - BCBS - Anthem BCBS,Payer Plan,BCBS - Anthem BCBS,Anthem - BCBS OH,ANTHEM BLUECROSS BLUESHIELD OHIO,PPO,PRIVATE HEALTH INSURER,COMMERCIAL,PHARMACY BENEFIT,47041,DEARBORN COUNTY,IN,202001,68.090332,N
4,3,OHIO,50003549,50001654,50000480,Health System Plan - MI - Henry Ford Health,Health System Plan,MI - Henry Ford Health,Henry Ford Health (MI) - Health Alliance Plan ...,HEALTH ALLIANCE PLAN (MI),OTHER COMMERCIAL,PRIVATE HEALTH INSURER,COMMERCIAL,PHARMACY BENEFIT,47041,DEARBORN COUNTY,IN,201804,4.366031,N


In [15]:
%%time
payerDF1 = pd.read_sql("""SELECT * FROM oasis_summarized.sst_payer_lives_monthly
LIMIT 200""", conn)

CPU times: user 145 ms, sys: 4.37 ms, total: 149 ms
Wall time: 3.47 s


In [16]:
payerDF1.head()

Unnamed: 0,zip_ecosystem_id,zip_ecosystem_name,mdm_plan_id,mdm_payer_id,mdm_payer_parent_id,gne_payer_parent_owner_name,gne_payer_market_type,gne_parent_owner_name,gne_payer_plan_name,mdm_payer_name,mdm_plan_type,mdm_payer_role,gne_book_of_business,drg_benefit_type,drg_zip,drg_county,drg_state,date_year_month,drg_lives_count,flag_current_month
0,8,SEATTLE - ALASKA,50003818,50001431,50000716,Payer Plan - BCBS - BS CA,Payer Plan,BCBS - BS CA,BS CA,BLUESHIELD CALIFORNIA,HMO,PRIVATE HEALTH INSURER,COMMERCIAL,PHARMACY BENEFIT,98391,PIERCE COUNTY,WA,201904,7.160412,N
1,8,SEATTLE - ALASKA,50003842,50001499,50000958,Payer Plan - Centene,Payer Plan,Centene,Centene - Health Net,HEALTH NET (OR),POS,PRIVATE HEALTH INSURER,COMMERCIAL,PHARMACY BENEFIT,98391,PIERCE COUNTY,WA,201807,8.937989,N
2,8,SEATTLE - ALASKA,50006490,50001260,50000671,Payer Plan - UnitedHealth Group,Payer Plan,UnitedHealth Group,UnitedHealth Group,UNITED,OTHER MEDICARE,PRIVATE HEALTH INSURER,MEDICARE_ADVANTAGE,PHARMACY BENEFIT,98391,PIERCE COUNTY,WA,201909,2.086866,N
3,8,SEATTLE - ALASKA,50006490,50001260,50000671,Payer Plan - UnitedHealth Group,Payer Plan,UnitedHealth Group,UnitedHealth Group,UNITED,OTHER MEDICARE,PRIVATE HEALTH INSURER,MEDICARE_ADVANTAGE,PHARMACY BENEFIT,98391,PIERCE COUNTY,WA,201907,2.117001,N
4,8,SEATTLE - ALASKA,50174720,50173500,50171765,Employer Plan,Employer Plan,Employer Plan,Employer Plan - Genworth Financial,GENWORTH FINANCIAL,EMPLOYER,EMPLOYER,COMMERCIAL,PHARMACY BENEFIT,98391,PIERCE COUNTY,WA,202002,0.64861,Y


# Additional tests
# AWS Data Wrangler with Athena

In [22]:
!pip install awswrangler

Collecting awswrangler
[?25l  Downloading https://files.pythonhosted.org/packages/e9/99/b3ba9811e1a5f346da484f2dff40924613ec481df5d463e30bc3fd71096e/awswrangler-0.3.2.tar.gz (61kB)
[K    100% |████████████████████████████████| 71kB 5.0MB/s ta 0:00:011
Collecting tenacity~=6.0.0 (from awswrangler)
  Downloading https://files.pythonhosted.org/packages/bf/d4/8cab4b5239ddb62d950243abff9e88046bb61737ce3eee8694b3d748560c/tenacity-6.0.0-py2.py3-none-any.whl
Collecting pg8000~=1.13.2 (from awswrangler)
  Downloading https://files.pythonhosted.org/packages/16/32/ae895597e43bc968e0e3e63860e9932b851115457face0d06d7f451b71fc/pg8000-1.13.2-py3-none-any.whl
Collecting pymysql~=0.9.3 (from awswrangler)
[?25l  Downloading https://files.pythonhosted.org/packages/ed/39/15045ae46f2a123019aa968dfcba0396c161c20f855f11dea6796bcaae95/PyMySQL-0.9.3-py2.py3-none-any.whl (47kB)
[K    100% |████████████████████████████████| 51kB 34.4MB/s ta 0:00:01
Collecting scramp==1.1.0 (from pg8000~=1.13.2->awswrangler)


In [36]:
%%time
import awswrangler as wr

wrPayerDF = wr.pandas.read_sql_athena(
    sql="""select * from 
           sst_payer_lives_monthly 
           LIMIT 200""",
    database="oasis_summarized",
    s3_output="s3://cmg-oasis-prod-commercial-bucket/Analytics/AthenaQueryResult/athena_temp/",
    ctas_approach=True
)

CPU times: user 242 ms, sys: 7.7 ms, total: 250 ms
Wall time: 5.92 s


In [37]:
wrPayerDF.head()

Unnamed: 0,zip_ecosystem_id,zip_ecosystem_name,mdm_plan_id,mdm_payer_id,mdm_payer_parent_id,gne_payer_parent_owner_name,gne_payer_market_type,gne_parent_owner_name,gne_payer_plan_name,mdm_payer_name,mdm_plan_type,mdm_payer_role,gne_book_of_business,drg_benefit_type,drg_zip,drg_county,drg_state,date_year_month,drg_lives_count,flag_current_month
0,3,OHIO,50003880,50001371,50000278,Payer Plan - BCBS - BCBS NE,Payer Plan,BCBS - BCBS NE,BCBS NE,BLUECROSS BLUESHIELD NEBRASKA,OTHER COMMERCIAL,PRIVATE HEALTH INSURER,COMMERCIAL,PHARMACY BENEFIT,47041,DEARBORN COUNTY,IN,201905,1.1109732052032155,N
1,3,OHIO,50004836,50002107,50000973,Payer Plan - BCBS - Premera BC,Payer Plan,BCBS - Premera BC,Premera BC,PREMERA BLUE CROSS,PPO,PRIVATE HEALTH INSURER,COMMERCIAL,PHARMACY BENEFIT,47041,DEARBORN COUNTY,IN,201712,0.3628152961123411,N
2,3,OHIO,50005735,50001743,50000791,Payer Plan - Humana,Payer Plan,Humana,Humana,HUMANA,POS,PRIVATE HEALTH INSURER,COMMERCIAL,PHARMACY BENEFIT,47041,DEARBORN COUNTY,IN,201804,0.1396267238669316,N
3,3,OHIO,50005829,50001455,50000661,Payer Plan - BCBS - Anthem BCBS,Payer Plan,BCBS - Anthem BCBS,Anthem - BCBS OH,ANTHEM BLUECROSS BLUESHIELD OHIO,PPO,PRIVATE HEALTH INSURER,COMMERCIAL,PHARMACY BENEFIT,47041,DEARBORN COUNTY,IN,202001,68.09033153400202,N
4,3,OHIO,50003549,50001654,50000480,Health System Plan - MI - Henry Ford Health,Health System Plan,MI - Henry Ford Health,Henry Ford Health (MI) - Health Alliance Plan ...,HEALTH ALLIANCE PLAN (MI),OTHER COMMERCIAL,PRIVATE HEALTH INSURER,COMMERCIAL,PHARMACY BENEFIT,47041,DEARBORN COUNTY,IN,201804,4.3660308879233005,N
