## AWS Glue Data Catalog & Glue Jobs


In [11]:
# Initialize Working Variables
import boto3
import os
import sys
import json

os.environ['AWS_DEFAULT_REGION'] = 'us-east-2'

if './python' not in sys.path: sys.path.append('./python')
#import glue_functions as glu
import glue_functions_2212 as glu

os.environ['GlueTableInputTemplateUrl'] = 'file://data/glue_table_input_template_parquet.json'

glue_database_name = 'daab-lab-glue-fiscaldata'

s3_bucket = 'daab-dtl-lab-datawork'
s3_folder = 'rmyers07/FISCALDATA/PARQUET'

#zip_archive_name = f's3://{s3_bucket}/{s3_folder}'
#s3://daab-dtl-lab-datawork/rmyers07/data/arrived/

data_file_names = [     # members extracted from ZIP archive
]

glue_table_names = [    # formulated from data_file_names
]


['ERROR: Unable to get file://glue_table_input_template_tsv.json', FileNotFoundError(2, 'No such file or directory')]


In [2]:
# create a Glue Database Using Python SDK
# (shoulda been created by the CloudFormation stack)
glue_client = boto3.client('glue')

try:
    response = glue_client.create_database(
        DatabaseInput={
            'Name': f'{glue_database_name}',
            'Description': 'Created from boto3 script in glue_workbook.ipynb',
            'LocationUri': f's3://{s3_bucket}/{s3_folder}'
        }
    )
    print(f"Created Glue Database '{glue_database_name}'")
except glue_client.exceptions.AlreadyExistsException:
    print(f"Using Existing Glue Database '{glue_database_name}'")

  

Created Glue Database 'daab-lab-glue-fiscaldata'


In [7]:
# create Glue Tables Using Python SDK
import boto3
glue_client = boto3.client("glue")

import sys
if './python' not in sys.path: sys.path.append('./python')
import glue_functions_2212 as glu

glue_table_names = [
  'avg_interest_rates',
  'top_federal',
  'top_state'
]

glue_table_input = json.loads(glu.get_file('file://python/glue_table_input_template_parquet.json'))
for glue_table_name in glue_table_names:
  
  with open( f"metadata/{glue_table_name}.raml" , "r" ) as f:
    raml = f.read()
  column_metadata = glu.import_raml_to_glue( "", "", raml )
  s3_url_table_location = f's3://{s3_bucket}/{s3_folder}/{glue_table_name}'

  glue_table_input['TableInput']['Name'] = glue_table_name
  glue_table_input['TableInput']['StorageDescriptor']['Columns'] = column_metadata
  glue_table_input['TableInput']['StorageDescriptor']['Location'] = s3_url_table_location

  try:
    response = glue_client.create_table(
      DatabaseName = glue_database_name,
      TableInput = glue_table_input['TableInput']  
    )
    status = 'Created'
  except glue_client.exceptions.AlreadyExistsException:
    response = glue_client.update_table (
        DatabaseName = glue_database_name,
        TableInput = glue_table_input['TableInput']
    )
    status = 'Updated'

  print(f"Glue Table '{glue_table_name}' {status} in Database '{glue_database_name}'" )


Glue Table 'avg_interest_rates' Updated in Database 'daab-lab-glue-fiscaldata'
Glue Table 'top_federal' Updated in Database 'daab-lab-glue-fiscaldata'
Glue Table 'top_state' Updated in Database 'daab-lab-glue-fiscaldata'


In [None]:
# list Glue Tables
tables, table_names = glu.list_glue_tables(glue_database_name)
print( json.dumps(tables[1], indent=2, default=str) )

In [None]:
## export a FiscalData CSV Data Dictionary file to a RAML Data Type
import sys
if './python' not in sys.path: sys.path.append('./python')

from dd_functions import export_fddatadict_to_raml
#help(export_fddatadict_to_raml)

dd_files = [
    "metadata/Average Interest Rates on US Treasury Securities Data Dictionary.csv",
    "metadata/Treasury Offset Program Data Dictionary.csv"  # both Federal and State are in this file
]
for csv_input_path in dd_files:
    raml_output_path = csv_input_path.replace('.csv','.raml').replace(' ','_').lower()

    export_fddatadict_to_raml( csv_input_path, raml_output_path )

In [2]:
# Get a Glue Job
import boto3
import json

glue_client = boto3.client('glue')

glue_jobname = "convert_csv_to_parquet"
glue_job = glue_client.get_job(
    JobName = glue_jobname
)

print(json.dumps(glue_job,indent=2,default=str))


{
  "Job": {
    "Name": "convert_csv_to_parquet",
    "Description": "",
    "Role": "arn:aws:iam::{aws_acct}:role/service-role/AWSGlueServiceRole-daab-lab-glue-tutorial",
    "CreatedOn": "2022-12-24 09:19:06.996000-05:00",
    "LastModifiedOn": "2022-12-24 09:19:06.996000-05:00",
    "ExecutionProperty": {
      "MaxConcurrentRuns": 1
    },
    "Command": {
      "Name": "glueetl",
      "ScriptLocation": "s3://aws-glue-assets-{aws_acct}-us-east-2/scripts/convert_csv_to_parquet.py",
      "PythonVersion": "3"
    },
    "DefaultArguments": {
      "--enable-metrics": "true",
      "--enable-spark-ui": "true",
      "--spark-event-logs-path": "s3://aws-glue-assets-{aws_acct}-us-east-2/sparkHistoryLogs/",
      "--enable-job-insights": "true",
      "--enable-glue-datacatalog": "true",
      "--enable-continuous-cloudwatch-log": "true",
      "--job-bookmark-option": "job-bookmark-enable",
      "--job-language": "python",
      "--TempDir": "s3://aws-glue-assets-{aws_acct}-us-east-2

In [6]:
# Get a Glue ETL Job Run
glue_jobrun_id = 'jr_c96836c96968cce949ebbe9d29860b9847aa02c1e37ed24d6af7149d64e49114'

glue_jobruns = glue_client.get_job_run(
    JobName= glue_jobname,
    RunId= glue_jobrun_id,
    PredecessorsIncluded=True|False
)
print(json.dumps(glue_jobruns,indent=2,default=str))

{
  "JobRun": {
    "Id": "jr_c96836c96968cce949ebbe9d29860b9847aa02c1e37ed24d6af7149d64e49114",
    "Attempt": 0,
    "JobName": "convert_csv_to_parquet.py-copy",
    "StartedOn": "2022-12-19 08:38:37.830000-05:00",
    "LastModifiedOn": "2022-12-19 08:40:49.512000-05:00",
    "CompletedOn": "2022-12-19 08:40:49.512000-05:00",
    "JobRunState": "SUCCEEDED",
    "PredecessorRuns": [],
    "AllocatedCapacity": 10,
    "ExecutionTime": 125,
    "Timeout": 120,
    "MaxCapacity": 10.0,
    "WorkerType": "G.1X",
    "NumberOfWorkers": 10,
    "LogGroupName": "/aws-glue/jobs",
    "GlueVersion": "3.0"
  },
  "ResponseMetadata": {
    "RequestId": "32e7a1c2-a106-4d76-b4f3-ae17823b9056",
    "HTTPStatusCode": 200,
    "HTTPHeaders": {
      "date": "Mon, 19 Dec 2022 13:43:10 GMT",
      "content-type": "application/x-amz-json-1.1",
      "content-length": "911",
      "connection": "keep-alive",
      "x-amzn-requestid": "32e7a1c2-a106-4d76-b4f3-ae17823b9056"
    },
    "RetryAttempts": 0
  

In [1]:
# CReate or UPdate a Glue Partition
import os
os.environ['AWS_DEFAULT_REGION'] = 'us-east-2'

import sys
if './python' not in sys.path: sys.path.append('./python')
from glue_functions_2212 import crup_glue_partition

### Driver
glue_database_name = 'daab-lab-glue-fiscaldata'

glue_table_names = [
    'avg_interest_rates',
    'top_federal',
    'top_state'
]
partition_folders = [ 
    'D221212.full',
    'D221213.full',
    'D221216.full'
]

for glue_table_name in glue_table_names:

    for partition_folder in partition_folders:

        crup_glue_partition( glue_database_name, glue_table_name, [ partition_folder ] )



Partitions Updated for Glue Table 'avg_interest_rates' in Database 'daab-lab-glue-fiscaldata: ['D221212.full']'
Partitions Updated for Glue Table 'avg_interest_rates' in Database 'daab-lab-glue-fiscaldata: ['D221213.full']'
Partitions Updated for Glue Table 'avg_interest_rates' in Database 'daab-lab-glue-fiscaldata: ['D221216.full']'
Partitions Updated for Glue Table 'top_federal' in Database 'daab-lab-glue-fiscaldata: ['D221212.full']'
Partitions Updated for Glue Table 'top_federal' in Database 'daab-lab-glue-fiscaldata: ['D221213.full']'
Partitions Updated for Glue Table 'top_federal' in Database 'daab-lab-glue-fiscaldata: ['D221216.full']'
Partitions Updated for Glue Table 'top_state' in Database 'daab-lab-glue-fiscaldata: ['D221212.full']'
Partitions Updated for Glue Table 'top_state' in Database 'daab-lab-glue-fiscaldata: ['D221213.full']'
Partitions Updated for Glue Table 'top_state' in Database 'daab-lab-glue-fiscaldata: ['D221216.full']'


In [26]:
# run a Glue ETL job
lambda_resp = json.loads("{\"S3BucketName\": \"daab-dtl-lab-datawork\", \"S3Key\": \"rmyers07/FISCALDATA/ARRIVED/FDMD.FISCALDATA.D221212.FULL.ZIP\", \"S3Url\": \"s3://daab-dtl-lab-datawork/rmyers07/FISCALDATA/ARRIVED/FDMD.FISCALDATA.D221212.FULL.ZIP\", \"S3ExtractedUrls\": [\"s3://daab-dtl-lab-datawork/rmyers07/FISCALDATA/ARRIVED/top_federal/D221212.full/top_federal.D221212.full.csv.gz\", \"s3://daab-dtl-lab-datawork/rmyers07/FISCALDATA/ARRIVED/top_state/D221212.full/top_state.D221212.full.csv.gz\", \"s3://daab-dtl-lab-datawork/rmyers07/FISCALDATA/ARRIVED/avg_interest_rates/D221212.full/avg_interest_rates.D221212.full.csv.gz\"]}")

glue_jobname = 'convert_csv_to_parquet.py-copy'

import datetime
execName = datetime.datetime.now().strftime("%y%m%d-%H%M%S-") + lambda_resp['S3Url'].split('/')[-1]

import boto3
glue_client = boto3.client('glue')

table_names = []
partition_folders = []
for ext in lambda_resp['S3ExtractedUrls']:
    filename = ext.split('/')[-1].split('.')
    table_names.append( filename[0] )
    folder = f"{filename[1]}.{filename[2]}"
    if folder not in partition_folders:
        partition_folders.append( f"{filename[1]}.{filename[2]}")

response = glue_client.start_job_run(
    JobName = glue_jobname ,
    #JobRunId = execName ,
    Arguments = {
        '--GlueTableNames' : ','.join(table_names),
        '--PartitionFolders' : ','.join(partition_folders)
    }
)

In [8]:
input = {
  "statusCode": 200,
  "body": {
    "source": "aws.s3",
    "detail": {
      "bucket": {
        "name": "daab-dtl-lab-datawork"
      },
      "object": {
        "key": "rmyers07/FISCALDATA/ARRIVED/FDMD.FISCALDATA.D221212.FULL.ZIP"
      }
    },
    "batch_parms": {
      "GlueDatabaseName": "daab-lab-glue-fiscaldata",
      "S3OutputBucket": "daab-dtl-lab-datawork",
      "S3OutputFolder": "rmyers07/FISCALDATA/PARQUET",
      "StepFnArn": "arn:aws:states:us-east-2:{aws_acct}:stateMachine:daab-lab-Extract_ZIP_to_Parquet",
      "BatchId": "FDMD.FISCALDATA.D221212.FULL.ZIP",
      "ExecName": "221222-205127-FDMD.FISCALDATA.D221212.FULL.ZIP",
      "ZipExtracted": {
        "GlueTableNames": [
          "top_federal",
          "top_state",
          "avg_interest_rates"
        ],
        "PartitionFolders": [
          "D221212.full"
        ],
        "S3ExtractedUrls": [
          "s3://daab-dtl-lab-datawork/rmyers07/FISCALDATA/ARRIVED/top_federal/D221212.full/top_federal.D221212.full.csv.gz",
          "s3://daab-dtl-lab-datawork/rmyers07/FISCALDATA/ARRIVED/top_state/D221212.full/top_state.D221212.full.csv.gz",
          "s3://daab-dtl-lab-datawork/rmyers07/FISCALDATA/ARRIVED/avg_interest_rates/D221212.full/avg_interest_rates.D221212.full.csv.gz"
        ]
      }
    }
  },
  "batch_parms_s": "{\"GlueDatabaseName\": \"daab-lab-glue-fiscaldata\", \"S3OutputBucket\": \"daab-dtl-lab-datawork\", \"S3OutputFolder\": \"rmyers07/FISCALDATA/PARQUET\", \"StepFnArn\": \"arn:aws:states:us-east-2:{aws_acct}:stateMachine:daab-lab-Extract_ZIP_to_Parquet\", \"BatchId\": \"FDMD.FISCALDATA.D221212.FULL.ZIP\", \"ExecName\": \"221222-205127-FDMD.FISCALDATA.D221212.FULL.ZIP\", \"ZipExtracted\": {\"GlueTableNames\": [\"top_federal\", \"top_state\", \"avg_interest_rates\"], \"PartitionFolders\": [\"D221212.full\"], \"S3ExtractedUrls\": [\"s3://daab-dtl-lab-datawork/rmyers07/FISCALDATA/ARRIVED/top_federal/D221212.full/top_federal.D221212.full.csv.gz\", \"s3://daab-dtl-lab-datawork/rmyers07/FISCALDATA/ARRIVED/top_state/D221212.full/top_state.D221212.full.csv.gz\", \"s3://daab-dtl-lab-datawork/rmyers07/FISCALDATA/ARRIVED/avg_interest_rates/D221212.full/avg_interest_rates.D221212.full.csv.gz\"]}}"
}

In [13]:
# run a Glue ETL job
#lambda_resp = json.loads("{\"S3BucketName\": \"daab-dtl-lab-datawork\", \"S3Key\": \"rmyers07/FISCALDATA/ARRIVED/FDMD.FISCALDATA.D221212.FULL.ZIP\", \"S3Url\": \"s3://daab-dtl-lab-datawork/rmyers07/FISCALDATA/ARRIVED/FDMD.FISCALDATA.D221212.FULL.ZIP\", \"S3ExtractedUrls\": [\"s3://daab-dtl-lab-datawork/rmyers07/FISCALDATA/ARRIVED/top_federal/D221212.full/top_federal.D221212.full.csv.gz\", \"s3://daab-dtl-lab-datawork/rmyers07/FISCALDATA/ARRIVED/top_state/D221212.full/top_state.D221212.full.csv.gz\", \"s3://daab-dtl-lab-datawork/rmyers07/FISCALDATA/ARRIVED/avg_interest_rates/D221212.full/avg_interest_rates.D221212.full.csv.gz\"]}")

#glue_jobname = 'convert_csv_to_parquet.py-copy'
glue_jobname = 'just_get_args'

import boto3
glue_client = boto3.client('glue')

response = glue_client.start_job_run(
    JobName = glue_jobname ,
    Arguments = {
        '--Body' : input["batch_parms_s"]
    }
)

In [1]:
first = {   # using string arg
  "Type": "Task",
  "Resource": "arn:aws:states:::glue:startJobRun",
  "InputPath": "$.batch_parms_s",
  "Parameters": {
    "JobName": "just_get_args",
    "Arguments": {
      "--Body.$": "$"
    }
  },
  "End": true
}

next = {   # converting json to string
  "Type": "Task",
  "Resource": "arn:aws:states:::glue:startJobRun",
  "InputPath": "$.body.batch_parms",
  "Parameters": {
    "JobName": "just_get_args",
    "Arguments": {
      "--Body.$": "States.JsonToString($)"
    }
  },
  "End": true
}

#InputPath = "$.body.batch_parms"

{
    "JobName": "just_get_args",
    "Arguments": {
      "--Body.$": "States.JsonToString($)"
    }
}

NameError: name 'true' is not defined

In [7]:
result = {
  "statusCode": 200,
  "body": "{\"source\": \"aws.s3\", \"detail\": {\"bucket\": {\"name\": \"daab-dtl-lab-datawork\"}, \"object\": {\"key\": \"rmyers07/FISCALDATA/ARRIVED/FDMD.FISCALDATA.D221212.FULL.ZIP\"}}, \"batch_parms\": {\"GlueDatabaseName\": \"daab-lab-glue-fiscaldata\", \"S3OutputBucket\": \"daab-dtl-lab-datawork\", \"S3OutputFolder\": \"rmyers07/FISCALDATA/PARQUET\", \"StepFnArn\": \"arn:aws:states:us-east-2:{aws_acct}:stateMachine:daab-lab-Extract_ZIP_to_Parquet\", \"BatchId\": \"FDMD.FISCALDATA.D221212.FULL.ZIP\", \"ExecName\": \"221222-191619-FDMD.FISCALDATA.D221212.FULL.ZIP\", \"ZipExtracted\": {\"GlueTableNames\": [\"top_federal\", \"top_state\", \"avg_interest_rates\"], \"PartitionFolders\": [\"D221212.full\"], \"S3ExtractedUrls\": [\"s3://daab-dtl-lab-datawork/rmyers07/FISCALDATA/ARRIVED/top_federal/D221212.full/top_federal.D221212.full.csv.gz\", \"s3://daab-dtl-lab-datawork/rmyers07/FISCALDATA/ARRIVED/top_state/D221212.full/top_state.D221212.full.csv.gz\", \"s3://daab-dtl-lab-datawork/rmyers07/FISCALDATA/ARRIVED/avg_interest_rates/D221212.full/avg_interest_rates.D221212.full.csv.gz\"]}}}"
}

import json
body = json.loads(result['body'])

print(json.dumps(body, indent=2))

{
  "source": "aws.s3",
  "detail": {
    "bucket": {
      "name": "daab-dtl-lab-datawork"
    },
    "object": {
      "key": "rmyers07/FISCALDATA/ARRIVED/FDMD.FISCALDATA.D221212.FULL.ZIP"
    }
  },
  "batch_parms": {
    "GlueDatabaseName": "daab-lab-glue-fiscaldata",
    "S3OutputBucket": "daab-dtl-lab-datawork",
    "S3OutputFolder": "rmyers07/FISCALDATA/PARQUET",
    "StepFnArn": "arn:aws:states:us-east-2:{aws_acct}:stateMachine:daab-lab-Extract_ZIP_to_Parquet",
    "BatchId": "FDMD.FISCALDATA.D221212.FULL.ZIP",
    "ExecName": "221222-191619-FDMD.FISCALDATA.D221212.FULL.ZIP",
    "ZipExtracted": {
      "GlueTableNames": [
        "top_federal",
        "top_state",
        "avg_interest_rates"
      ],
      "PartitionFolders": [
        "D221212.full"
      ],
      "S3ExtractedUrls": [
        "s3://daab-dtl-lab-datawork/rmyers07/FISCALDATA/ARRIVED/top_federal/D221212.full/top_federal.D221212.full.csv.gz",
        "s3://daab-dtl-lab-datawork/rmyers07/FISCALDATA/ARRIVED/top_s