## Dependencies

In [1]:
# Import libraries.
import boto3
import json
import time
import sys
import numpy

# Import glue dependencies.
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from pyspark.sql import SparkSession, DataFrame

# Add paths so that this notebook can be executed anywhere in the 
# workspace's folder structure.
sys.path.append("/home/glue_user/project_lf/ETL-TDD")
sys.path.append("/home/glue_user/project_lf/ETL-TDD/automation")

# Import batch job's custom dependencies.
from automation.databrew import DataBrew
from automation.glue import Glue
from automation.redshift import Redshift
from etl.paths.components import Bucket
from automation.batch import Batch

# Import jobs.
import stage_claim_into_raw
import stage_policyholder_into_raw
import stage_provider_into_raw
import stage_claim_into_access
import stage_policyholder_into_access
import stage_provider_into_access
import stage_location_into_optimised
import stage_procedure_into_optimised
import stage_policyholder_into_optimised
import stage_provider_into_optimised
import stage_date_into_optimised
import stage_claim_into_optimised

## Initialise

In [2]:
# Initialise spark session with minimal logging.
sc = SparkContext()
sc.setLogLevel("ERROR")
glueContext = GlueContext(sc)
spark = glueContext.spark_session

# Wrap low-level AWS clients in a high-level object oriented API that uses
# S3 paths to create and coordinate AWS services.
glue = Glue('ap-southeast-2')
databrew = DataBrew('ap-southeast-2')

# Initialise Redshift helper.
redshift = Redshift()

# Define the bucket to use (TEST or PROD).
env = 'test-project-wm'

# Define the sequential job batches.
# Earlier batches must be completed before later batches.

config = { 
            "spark": spark,
            "env": env
         }

batch1 = Batch([
                  stage_claim_into_raw, 
                  stage_policyholder_into_raw, 
                  stage_provider_into_raw
               ], 
               **config)

batch2 = Batch([
                  stage_claim_into_access, 
                  stage_policyholder_into_access, 
                  stage_provider_into_access
               ], 
               **config)

batch3 = Batch([
                  stage_location_into_optimised, 
                  stage_procedure_into_optimised
               ], 
               **config)

batch4 = Batch([
                  stage_policyholder_into_optimised, 
                  stage_provider_into_optimised, 
                  stage_date_into_optimised
               ], 
               **config)

batch5 = Batch([stage_claim_into_optimised], 
               **config)

SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/home/glue_user/spark/jars/slf4j-reload4j-1.7.36.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/home/glue_user/spark/jars/log4j-slf4j-impl-2.17.2.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/home/glue_user/aws-glue-libs/jars/slf4j-reload4j-1.7.36.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/home/glue_user/aws-glue-libs/jars/log4j-slf4j-impl-2.17.2.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Reload4jLoggerFactory]
log4j:WARN No appenders could be found for logger (org.apache.hadoop.metrics2.lib.MutableMetricsFactory).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.


## Delete objects in S3

In [27]:
# delete everything in the raw, access and optimised tiers
import boto3
s3 = boto3.resource('s3')
bucket = s3.Bucket(str(env))

# ********************* RESET RAW *********************
for obj in bucket.objects.filter(Prefix='etl/raw'):
    s3.Object(bucket.name,obj.key).delete()

# **********************RESET ACCESS ******************
for obj in bucket.objects.filter(Prefix='etl/access'):
    s3.Object(bucket.name,obj.key).delete()

# ******************** RESET OPTIMISED ****************
for obj in bucket.objects.filter(Prefix='etl/optimised'):
    s3.Object(bucket.name,obj.key).delete()
    


## Batch 1

In [4]:
outputs = []

batch1.run()
redshift.get_all_redshift_loads(batch1.output)

s3.Bucket(name='test-lf-ap') etl/landing/claim_db/claim/full [s3.ObjectSummary(bucket_name='test-lf-ap', key='etl/landing/claim_db/claim/full/202305211851-claim-full.csv')]


                                                                                

s3.Bucket(name='test-lf-ap') etl/landing/claim_db/policyholder/full [s3.ObjectSummary(bucket_name='test-lf-ap', key='etl/landing/claim_db/policyholder/full/202305221132-policyholder-full.csv')]


                                                                                

s3.Bucket(name='test-lf-ap') etl/landing/claim_db/provider/full [s3.ObjectSummary(bucket_name='test-lf-ap', key='etl/landing/claim_db/provider/full/202305221136-provider-full.csv')]


                                                                                

In [5]:
batch1.paths

array(['s3://test-lf-ap/etl/raw/claim_db/claim/full/202306062111/',
       's3://test-lf-ap/etl/raw/claim_db/policyholder/full/202306062111/',
       's3://test-lf-ap/etl/raw/claim_db/provider/full/202306062111/'],
      dtype=object)

In [6]:
representative_path = batch1.paths[0]

################# Delete outdated crawler #################
glue.delete_crawler(representative_path)

################# Crawl all batch paths. ##################
glue.create_crawler(batch1.paths)
glue.start_crawler(representative_path)

glue.wait_for_crawler(representative_path)

################### Profile each job. #####################
for path in batch1.paths:
     
    databrew.create_dataset(path)
    databrew.create_profile_job(path)        
    databrew.start_job_run(path)

databrew.wait_for_job(representative_path)

############## Print data profile links ###################
for path in batch1.paths:
    databrew.show_data_profile_link(path)
    

Created crawler 'test-raw'.
Waiting for crawler to finish..........


Couldn't create dataset:
                Dataset test-raw-claim already exists.
Couldn't create data profile job:
                The job test-raw-claim already exists.
Couldn't create dataset:
                Dataset test-raw-policyholder already exists.
Couldn't create data profile job:
                The job test-raw-policyholder already exists.
Couldn't create dataset:
                Dataset test-raw-provider already exists.
Couldn't create data profile job:
                The job test-raw-provider already exists.


Waiting for job to finish.........................
You can view the data profile for s3://test-lf-ap/etl/raw/claim_db/claim/full/202306062111/ here:
	https://us-east-1.console.aws.amazon.com/databrew/home?region=us-east-1#dataset-details?dataset=test-raw-claim&tab=profile-overview
You can view the data profile for s3://test-lf-ap/etl/raw/claim_db/policyholder/full/202306062111/ here:
	https://us-east-1.console.aws.amazon.com/databrew/home?region=us-east-1#dataset-details?dataset=test-raw-policyholder&tab=profile-overview
You can view the data profile for s3://test-lf-ap/etl/raw/claim_db/provider/full/202306062111/ here:
	https://us-east-1.console.aws.amazon.com/databrew/home?region=us-east-1#dataset-details?dataset=test-raw-provider&tab=profile-overview


## Batch 2

In [7]:

batch2.run()
redshift.get_all_redshift_loads(batch2.output)

                                                                                

In [8]:
representatitve_path = batch2.paths[-1]

################# Delete outdated crawler #################
glue.delete_crawler(representatitve_path)

################# Crawl all batch paths. ##################
glue.create_crawler(batch2.paths)
glue.start_crawler(representatitve_path)

glue.wait_for_crawler(representatitve_path)

################### Profile each job. #####################
for path in batch2.paths:
     
    response = databrew.create_dataset(path)
    response = databrew.create_profile_job(path)        
    response = databrew.start_job_run(path)

databrew.wait_for_job(representatitve_path)

############## Print data profile links ###################
for path in batch2.paths:
    databrew.show_data_profile_link(path)

Created crawler 'test-access'.
Waiting for crawler to finish............


Couldn't create dataset:
                Dataset test-access-claim already exists.
Couldn't create data profile job:
                The job test-access-claim already exists.
Couldn't create dataset:
                Dataset test-access-policyholder already exists.
Couldn't create data profile job:
                The job test-access-policyholder already exists.
Couldn't create dataset:
                Dataset test-access-provider already exists.
Couldn't create data profile job:
                The job test-access-provider already exists.


Waiting for job to finish....................
You can view the data profile for s3://test-lf-ap/etl/access/claim_db/claim/full/202306062115/ here:
	https://us-east-1.console.aws.amazon.com/databrew/home?region=us-east-1#dataset-details?dataset=test-access-claim&tab=profile-overview
You can view the data profile for s3://test-lf-ap/etl/access/claim_db/policyholder/full/202306062115/ here:
	https://us-east-1.console.aws.amazon.com/databrew/home?region=us-east-1#dataset-details?dataset=test-access-policyholder&tab=profile-overview
You can view the data profile for s3://test-lf-ap/etl/access/claim_db/provider/full/202306062115/ here:
	https://us-east-1.console.aws.amazon.com/databrew/home?region=us-east-1#dataset-details?dataset=test-access-provider&tab=profile-overview


In [9]:
############## Use data profile. #####################

# databrew.get_dq_results(paths[0])

## Batch 3

In [10]:
batch3.run()
redshift.get_all_redshift_loads(batch3.output)


                                                                                

In [11]:
representative_path = batch3.paths[-1]

################# Delete outdated crawler #################
glue.delete_crawler(representative_path)

################# Crawl all batch paths. ##################
glue.create_crawler(batch3.paths)
glue.start_crawler(representative_path)

glue.wait_for_crawler(representative_path)

################### Profile each job. #####################
for path in batch3.paths:
     
    response = databrew.create_dataset(path)
    response = databrew.create_profile_job(path)        
    response = databrew.start_job_run(path)

databrew.wait_for_job(representative_path)

############## Print data profile links ###################
for path in batch3.paths:
    databrew.show_data_profile_link(path)

Created crawler 'test-optimised'.
Waiting for crawler to finish...........


Couldn't create dataset:
                Dataset test-optimised-location-dim already exists.
Couldn't create data profile job:
                The job test-optimised-location-dim already exists.
Couldn't create dataset:
                Dataset test-optimised-procedure-dim already exists.
Couldn't create data profile job:
                The job test-optimised-procedure-dim already exists.


Waiting for job to finish....................
You can view the data profile for s3://test-lf-ap/etl/optimised/location_dim/full/202306062118/ here:
	https://us-east-1.console.aws.amazon.com/databrew/home?region=us-east-1#dataset-details?dataset=test-optimised-location-dim&tab=profile-overview
You can view the data profile for s3://test-lf-ap/etl/optimised/procedure_dim/full/202306062118/ here:
	https://us-east-1.console.aws.amazon.com/databrew/home?region=us-east-1#dataset-details?dataset=test-optimised-procedure-dim&tab=profile-overview


## Batch 4

In [12]:
batch4.run()
redshift.get_all_redshift_loads(batch4.output)


  for column, series in pdf.iteritems():
                                                                                

In [13]:
representative_path = batch4.paths[-1]

################# Delete outdated crawler #################
glue.delete_crawler(representative_path)

################# Crawl all batch paths. ##################
glue.create_crawler(batch4.paths)
glue.start_crawler(representative_path)

glue.wait_for_crawler(representative_path)

################### Profile each job. #####################
for path in batch4.paths:
     
    response = databrew.create_dataset(path)
    response = databrew.create_profile_job(path)        
    response = databrew.start_job_run(path)

databrew.wait_for_job(representative_path)

for path in batch4.paths:
    databrew.show_data_profile_link(path)

Created crawler 'test-optimised'.
Waiting for crawler to finish............


Couldn't create dataset:
                Dataset test-optimised-policyholder-dim already exists.
Couldn't create data profile job:
                The job test-optimised-policyholder-dim already exists.
Couldn't create dataset:
                Dataset test-optimised-provider-dim already exists.
Couldn't create data profile job:
                The job test-optimised-provider-dim already exists.
Couldn't create dataset:
                Dataset test-optimised-date-dim already exists.
Couldn't create data profile job:
                The job test-optimised-date-dim already exists.


Waiting for job to finish...............................
You can view the data profile for s3://test-lf-ap/etl/optimised/policyholder_dim/full/202306062121/ here:
	https://us-east-1.console.aws.amazon.com/databrew/home?region=us-east-1#dataset-details?dataset=test-optimised-policyholder-dim&tab=profile-overview
You can view the data profile for s3://test-lf-ap/etl/optimised/provider_dim/full/202306062121/ here:
	https://us-east-1.console.aws.amazon.com/databrew/home?region=us-east-1#dataset-details?dataset=test-optimised-provider-dim&tab=profile-overview
You can view the data profile for s3://test-lf-ap/etl/optimised/date_dim/full/202306062121/ here:
	https://us-east-1.console.aws.amazon.com/databrew/home?region=us-east-1#dataset-details?dataset=test-optimised-date-dim&tab=profile-overview


## Batch 5

In [14]:
batch5.run()
redshift.get_all_redshift_loads(batch5.output)


                                                                                

In [15]:
representative_path = batch5.paths[-1]

# Crawl each job path as an S3 target.
glue.delete_crawler(representative_path)
glue.create_crawler(batch5.paths)
glue.start_crawler(representative_path)

############## Wait for crawler to finish #####################

glue.wait_for_crawler(representative_path)

############## After crawlers finish. #####################

for path in batch5.paths:
    databrew.create_dataset(path)
    databrew.create_profile_job(path)
    databrew.start_job_run(path)

############## Wait for job to finish #####################

databrew.wait_for_job(representative_path)

for path in batch5.paths:
    databrew.show_data_profile_link(path)

Created crawler 'test-optimised'.
Waiting for crawler to finish..........

Couldn't create dataset:
                Dataset test-optimised-claim-fact already exists.


.


Couldn't create data profile job:
                The job test-optimised-claim-fact already exists.


Waiting for job to finish...........................
You can view the data profile for s3://test-lf-ap/etl/optimised/claim_fact/full/202306062125/ here:
	https://us-east-1.console.aws.amazon.com/databrew/home?region=us-east-1#dataset-details?dataset=test-optimised-claim-fact&tab=profile-overview


## Redshift - Initial Load

In [None]:
!pip install redshift_connector

In [3]:
import redshift_connector
import os
from dotenv import load_dotenv

In [None]:

load_dotenv("/home/glue_user/project_lf/ETL-TDD/.env")

for key in os.environ.keys():
    print(key)

### Get `copy` statements

In [16]:
for statement in redshift.get_copy_commands():
    print(statement, '\n')

COPY location_dim
    FROM 's3://test-lf-ap/etl/optimised/location_dim/full/202306062118/'
    IAM_ROLE 'arn:aws:iam::618572314333:role/service-role/AmazonRedshift-CommandsAccessRole-20230513T114656'
    FORMAT AS PARQUET; 

COPY procedure_dim
    FROM 's3://test-lf-ap/etl/optimised/procedure_dim/full/202306062118/'
    IAM_ROLE 'arn:aws:iam::618572314333:role/service-role/AmazonRedshift-CommandsAccessRole-20230513T114656'
    FORMAT AS PARQUET; 

COPY policyholder_dim
    FROM 's3://test-lf-ap/etl/optimised/policyholder_dim/full/202306062121/'
    IAM_ROLE 'arn:aws:iam::618572314333:role/service-role/AmazonRedshift-CommandsAccessRole-20230513T114656'
    FORMAT AS PARQUET; 

COPY provider_dim
    FROM 's3://test-lf-ap/etl/optimised/provider_dim/full/202306062121/'
    IAM_ROLE 'arn:aws:iam::618572314333:role/service-role/AmazonRedshift-CommandsAccessRole-20230513T114656'
    FORMAT AS PARQUET; 

COPY date_dim
    FROM 's3://test-lf-ap/etl/optimised/date_dim/full/202306062121/'
    IAM

In [25]:
for df in [df for df,_ in redshift.loads]:
    df.show(1, truncate=True)
    df.printSchema()
    print(df.schema)

                                                                                

+------------+--------------------+--------------------+--------+----------+----------+-------------------+-------------------+------------------+--------------------+--------------------+
|location_key|             address|              street|postcode|    suburb|track_hash|    record_start_ts|      record_end_ts|record_active_flag|       record_upd_ts|    record_insert_ts|
+------------+--------------------+--------------------+--------+----------+----------+-------------------+-------------------+------------------+--------------------+--------------------+
|           0|Level 2 85 Rhonda...|Level 2 85 Rhonda...|    2987| Jimmyfurt|      null|1970-01-01 00:00:00|2999-12-31 00:00:00|                 1|2023-06-06 11:36:...|2023-06-06 11:36:...|
+------------+--------------------+--------------------+--------+----------+----------+-------------------+-------------------+------------------+--------------------+--------------------+
only showing top 1 row

root
 |-- location_key: long (n

                                                                                

+--------+------------+----------------+-------------+--------+--------------------+-------------+-------------+------+-------------+
|claim_id|provider_key|policyholder_key|procedure_key|date_key|total_procedure_cost|medibank_pays|medicare_pays|excess|out_of_pocket|
+--------+------------+----------------+-------------+--------+--------------------+-------------+-------------+------+-------------+
|  154100|           1|               7|            4|20220717|             2295.05|      1377.03|       459.01|229.51|        229.5|
+--------+------------+----------------+-------------+--------+--------------------+-------------+-------------+------+-------------+
only showing top 1 row

root
 |-- claim_id: integer (nullable = true)
 |-- provider_key: long (nullable = true)
 |-- policyholder_key: long (nullable = true)
 |-- procedure_key: long (nullable = true)
 |-- date_key: integer (nullable = true)
 |-- total_procedure_cost: float (nullable = true)
 |-- medibank_pays: float (nullable =