## Create Feature Store 

In [None]:
import boto3
import sagemaker
from sagemaker.s3 import parse_s3_url
from sagemaker.feature_store.feature_group import FeatureGroup
from sagemaker import get_execution_role
import logging
import time

import pandas as pd

# Key prefix under which Athena query results are written; it is used when
# the query output location is built later in this notebook.
query_results = "sagemaker-workshop-end-to-end"

# IAM execution role (passed to the feature group at creation time) and the
# SageMaker session used for S3 downloads and Feature Store calls.
role = get_execution_role()
sess = sagemaker.Session()

In [None]:
# Notebook-wide logger, named after the running module (``__name__`` must be
# the variable, not a quoted string).
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
# Attach the stream handler only once so that re-running this cell does not
# make every log message appear multiple times.
if not logger.handlers:
    logger.addHandler(logging.StreamHandler())

In [None]:
# Restore previously stored variables from IPython's %store database.
# NOTE(review): presumably these were saved by an earlier notebook of this
# workshop — confirm that notebook has been run first.
%store -r s3uri_raw
%store -r bucket
%store -r prefix

In [None]:
# Split the raw-data S3 URI into its bucket and key-prefix parts, then show
# both as the cell output.
out_bucket, out_prefix = parse_s3_url(s3uri_raw)
out_bucket, out_prefix

### Dataset Load

In [None]:
# Download the raw data from S3 into the current working directory.
sess.download_data(path=".", bucket=out_bucket, key_prefix=out_prefix)

In [None]:
# Load the downloaded churn dataset and preview its first five rows
# (five is the default for DataFrame.head()).
churn_df = pd.read_csv("churn.txt")
churn_df.head()

In [None]:
# Replace the raw column headers with short snake_case names.
# NOTE(review): "cust_serev_calls" looks like a misspelling of
# "cust_serv_calls"; it is kept unchanged because the name ends up as a
# column/feature name that other code may already reference.
churn_df = churn_df.rename(
    columns={
        "State": "state",
        "Account Length": "acc_len",
        "Area Code": "area_code",
        "Phone": "phone",
        "Int'l Plan": "intl_plan",
        "VMail Plan": "vmail_plan",
        "VMail Message": "vmail_msg",
        "Day Mins": "day_mins",
        "Day Calls": "day_calls",
        "Day Charge": "day_charge",
        "Eve Mins": "eve_mins",
        "Eve Calls": "eve_calls",
        "Eve Charge": "eve_charge",
        "Night Mins": "night_mins",
        "Night Calls": "night_calls",
        "Night Charge": "night_charge",
        "Intl Mins": "intl_mins",
        "Intl Calls": "intl_calls",
        "Intl Charge": "intl_charge",
        "CustServ Calls": "cust_serev_calls",
        "Churn?": "churn",
    }
)

In [None]:
# Stamp every row with one shared event timestamp. The format string ends in
# a literal "Z" (UTC designator), so the time is taken in UTC explicitly
# instead of from the host's local clock.
churn_df['event_time'] = pd.Timestamp.now(tz='UTC').strftime("%Y-%m-%dT%H:%M:%SZ")

In [None]:
# Preview the first three rows after the rename and the added event_time column.
churn_df.head(3)

In [None]:
# Inspect the column dtypes before the DataFrame is handed to
# load_feature_definitions() further down.
churn_df.dtypes

### Creating Feature Store

In [None]:
# Prefix prepended to the feature group name built in the next cell.
fs_prefix = 'sagemaker-workshop-e2e-'

In [None]:
# Build the feature group name, persist it with %store, and display it.
# NOTE(review): presumably stored so later notebooks can `%store -r` it — confirm.
churn_feature_group_name = f'{fs_prefix}churn'
%store churn_feature_group_name
churn_feature_group_name

In [None]:
# Handle for the churn feature group, bound to the notebook's SageMaker
# session; the group itself is created in a later cell via .create().
churn_feature_group = FeatureGroup(name=churn_feature_group_name, sagemaker_session=sess)

In [None]:
# Load the group's feature definitions from churn_df.
churn_feature_group.load_feature_definitions(data_frame=churn_df)

In [None]:
def wait_for_feature_group_creation_complete(feature_group, poll_interval=5, timeout=None):
    """Block until ``feature_group`` has left the ``Creating`` state.

    Args:
        feature_group: Object exposing ``describe()`` (returning a mapping
            with a ``FeatureGroupStatus`` entry) and a ``name`` attribute.
        poll_interval: Seconds to sleep between two status checks.
        timeout: Maximum number of seconds to keep polling, or ``None``
            (the default) to wait for as long as the status is ``Creating``.

    Raises:
        SystemExit: If the final status is anything other than ``Created``.
            The message carries the description's ``FailureReason`` when one
            is present.
        TimeoutError: If ``timeout`` is set and the status is still
            ``Creating`` once that many seconds have elapsed.
    """
    description = feature_group.describe()
    status = description.get('FeatureGroupStatus')
    print(f'Initial status: {status}')

    # A monotonic clock keeps the deadline immune to wall-clock adjustments.
    deadline = None if timeout is None else time.monotonic() + timeout
    while status == 'Creating':
        if deadline is not None and time.monotonic() >= deadline:
            raise TimeoutError(
                f'Timed out after {timeout}s waiting for feature group '
                f'{feature_group.name} to be created'
            )
        logger.info(f'Waiting for feature group: {feature_group.name} to be created ...')
        time.sleep(poll_interval)
        description = feature_group.describe()
        status = description.get('FeatureGroupStatus')

    if status != 'Created':
        # Surface the service-reported reason (if any) so the failure is actionable.
        reason = description.get('FailureReason')
        detail = f' ({reason})' if reason else ''
        raise SystemExit(f'Failed to create feature group {feature_group.name}: {status}{detail}')
    logger.info(f'FeatureGroup {feature_group.name} was successfully created.')

In [None]:
# Create the feature group: 'phone' identifies a record, 'event_time' carries
# its timestamp, the S3 location is built from the restored bucket/prefix,
# and the online store is switched on.
churn_feature_group.create(
    role_arn=role,
    s3_uri=f's3://{bucket}/{prefix}',
    record_identifier_name='phone',
    event_time_feature_name='event_time',
    enable_online_store=True,
)

In [None]:
# Poll until the feature group is no longer in the 'Creating' state.
wait_for_feature_group_creation_complete(churn_feature_group)

#### Ingest Data into the Feature Store

In [None]:
# Ingest the full DataFrame into the feature group with two workers,
# waiting for ingestion to finish before the cell returns.
churn_feature_group.ingest(data_frame=churn_df, max_workers=2, wait=True)

### Querying Feature Store

This step validates that the ingested data can be queried through Athena.  
If the query does not return the data yet, re-run it until the data becomes available.

In [None]:
# Athena query object for this feature group, and the table name it exposes.
churn_query = churn_feature_group.athena_query()
churn_table = churn_query.table_name

In [None]:
# Sample query selecting 10 rows from the feature group's Athena table.
# The query text is persisted with %store and displayed as the cell output.
query_string = f'SELECT * FROM "{churn_table}" ' \
               f'limit 10; '
%store query_string
query_string

In [None]:
# S3 location for Athena query results, built from the restored bucket and
# the query_results prefix defined at the top of the notebook.
output_location = f's3://{bucket}/{query_results}/query_results/'
print(f'Athena query output location: \n{output_location}')

#### Run Athena query and load the output as a Pandas dataframe

In [None]:
# Run the query with results written to output_location, wait on it, and
# display the query object as the cell output.
churn_query.run(query_string=query_string, output_location=output_location)
churn_query.wait()
churn_query

In [None]:
# Load the query result into a DataFrame and preview it. A failure here is
# reported as "data not available yet" (re-run the query cells and retry).
try:
    joined_df = churn_query.as_dataframe()
except Exception as err:
    # Catch only ordinary errors so KeyboardInterrupt / SystemExit still
    # propagate, and chain the original error to keep the real cause visible.
    raise Exception('Custom: Data is not available yet') from err
joined_df.head(3)

#### Store Athena table name

In [None]:
# Persist the Athena table name with %store.
# NOTE(review): presumably consumed by later notebooks via `%store -r` — confirm.
athena_table_name = churn_query.table_name
%store athena_table_name