
The steps in this AWS Glue hands-on tutorial are the following:

Step 1. Enter credentials

Step 2. Create an S3 bucket and load the dataset into the bucket

Step 3. Create a database for the crawled data

Step 4. Create a service role that will be used to access S3 and use Glue features

Step 5. Create the crawler and run the crawl job

Step 6. Write a PySpark file with the operations to be performed on the dataset

Step 7. Generate a file with the ETL job and upload it to the S3 bucket

Step 8. Configure the ETL Glue Job and run it manually

#### Enter credentials

In [98]:
import getpass

In [99]:
accessKeyID = getpass.getpass()

 ····················


In [100]:
secretAccessKeyID = getpass.getpass()

 ········································


#### Create an S3 bucket

In [101]:
import boto3

bucket='glue-test-az'

session = boto3.Session(aws_access_key_id=accessKeyID,
                        aws_secret_access_key=secretAccessKeyID)

dev_s3_client = session.client('s3')

response = dev_s3_client.create_bucket(Bucket=bucket)
print(response)

{'ResponseMetadata': {'RequestId': 'J2X72G163065Y2EF', 'HostId': 'tWBRZHoQRdfXvESq5uhkeqtpXinAWG4wXwYXzxthCcHGOdTNSw+JndDrX1bfo6DoURD84spjiog=', 'HTTPStatusCode': 200, 'HTTPHeaders': {'x-amz-id-2': 'tWBRZHoQRdfXvESq5uhkeqtpXinAWG4wXwYXzxthCcHGOdTNSw+JndDrX1bfo6DoURD84spjiog=', 'x-amz-request-id': 'J2X72G163065Y2EF', 'date': 'Sun, 22 May 2022 08:11:50 GMT', 'location': '/glue-test-az', 'server': 'AmazonS3', 'content-length': '0'}, 'RetryAttempts': 0}, 'Location': '/glue-test-az'}
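The call above passes only `Bucket`, which works when the bucket is created in `us-east-1`. For any other region S3 expects a `CreateBucketConfiguration` with a `LocationConstraint`. The helper below is a minimal sketch of how the arguments could be built; the name `create_bucket_kwargs` is hypothetical and not part of boto3.

```python
def create_bucket_kwargs(bucket_name, region="us-east-1"):
    """Build keyword arguments for S3 create_bucket for the given region."""
    kwargs = {"Bucket": bucket_name}
    # us-east-1 is the default location and is not passed as a constraint;
    # every other region has to be named explicitly.
    if region != "us-east-1":
        kwargs["CreateBucketConfiguration"] = {"LocationConstraint": region}
    return kwargs


# Usage with an S3 client like the one above (needs valid credentials):
# dev_s3_client.create_bucket(**create_bucket_kwargs("glue-test-az", "eu-west-1"))
```

This assumes the S3 client itself is configured for a region that is compatible with the requested constraint.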


#### Upload local file to S3

In [102]:
fileToUpload = 'TSLA.csv'
dev_s3_client.upload_file(f'{fileToUpload}', 
                          bucket,
                          f'raw_files/{fileToUpload}')

Verify the file was uploaded.

In [103]:
response = dev_s3_client.list_objects(Bucket=f'{bucket}')

In [104]:
for key in response["Contents"]:
    print(key['Key'])

raw_files/TSLA.csv
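Note that the loop above indexes `response["Contents"]` directly. S3 omits the `Contents` key when the listing is empty, so on an empty bucket that lookup raises `KeyError`. A small sketch of a more defensive variant (the helper name `object_keys` is an assumption for illustration):

```python
def object_keys(list_response):
    """Return the object keys from a list_objects / list_objects_v2 response."""
    # "Contents" is missing from the response when nothing matches,
    # so fall back to an empty list instead of raising KeyError.
    return [obj["Key"] for obj in list_response.get("Contents", [])]


# Usage with the client from this notebook (needs valid credentials):
# for key in object_keys(dev_s3_client.list_objects(Bucket=bucket)):
#     print(key)
```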


#### Create a database where the crawler can store the results

In [105]:
session = boto3.session.Session(aws_access_key_id=accessKeyID, aws_secret_access_key=secretAccessKeyID)
glue_client = session.client('glue', region_name='us-east-1')

In [106]:
dbName = 'mydbaz1'
glue_client.create_database(DatabaseInput={'Name': dbName})

{'ResponseMetadata': {'RequestId': 'b863a171-8446-467b-84f7-bb4e2d469e78',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'date': 'Sun, 22 May 2022 08:11:54 GMT',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '2',
   'connection': 'keep-alive',
   'x-amzn-requestid': 'b863a171-8446-467b-84f7-bb4e2d469e78'},
  'RetryAttempts': 0}}

#### Create a service role

In [107]:
import json

In [108]:
trust_policy={
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "",
      "Effect": "Allow",
      "Principal": {
        "Service": "glue.amazonaws.com"
      },
      "Action": "sts:AssumeRole"
    }
  ]
}

In [109]:
iam_client = session.client('iam')

In [110]:
iam_response = iam_client.create_role(RoleName = 'AWSGlueServiceRole-AZ1', 
                       AssumeRolePolicyDocument = json.dumps(trust_policy))

print(iam_response)

{'Role': {'Path': '/', 'RoleName': 'AWSGlueServiceRole-AZ1', 'RoleId': 'AROASLARW5M7MZJWRKTMZ', 'Arn': 'arn:aws:iam::161098427198:role/AWSGlueServiceRole-AZ1', 'CreateDate': datetime.datetime(2022, 5, 22, 8, 11, 57, tzinfo=tzutc()), 'AssumeRolePolicyDocument': {'Version': '2012-10-17', 'Statement': [{'Sid': '', 'Effect': 'Allow', 'Principal': {'Service': 'glue.amazonaws.com'}, 'Action': 'sts:AssumeRole'}]}}, 'ResponseMetadata': {'RequestId': '237aaeb9-2917-4b75-9aae-9ddbd0ecea2e', 'HTTPStatusCode': 200, 'HTTPHeaders': {'x-amzn-requestid': '237aaeb9-2917-4b75-9aae-9ddbd0ecea2e', 'content-type': 'text/xml', 'content-length': '823', 'date': 'Sun, 22 May 2022 08:11:57 GMT'}, 'RetryAttempts': 0}}


#### Attach the policy AWSGlueServiceRole to the new role

In [111]:
iam_client.attach_role_policy(RoleName="AWSGlueServiceRole-AZ1",
                              PolicyArn="arn:aws:iam::aws:policy/service-role/AWSGlueServiceRole")

{'ResponseMetadata': {'RequestId': 'c528e3f9-20ae-4e84-9cc1-c3949a718457',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': 'c528e3f9-20ae-4e84-9cc1-c3949a718457',
   'content-type': 'text/xml',
   'content-length': '212',
   'date': 'Sun, 22 May 2022 08:11:57 GMT'},
  'RetryAttempts': 0}}

In [112]:
iam_client.attach_role_policy(RoleName="AWSGlueServiceRole-AZ1",
                              PolicyArn="arn:aws:iam::aws:policy/AmazonS3FullAccess")

{'ResponseMetadata': {'RequestId': '03048f9f-ac90-4afd-8b91-f8abb1715201',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': '03048f9f-ac90-4afd-8b91-f8abb1715201',
   'content-type': 'text/xml',
   'content-length': '212',
   'date': 'Sun, 22 May 2022 08:11:57 GMT'},
  'RetryAttempts': 0}}

#### Create the crawler

The crawler needs to use the role *AWSGlueServiceRole-AZ1* that was created before.

In [115]:
import json

response = glue_client.create_crawler(Name='CrawlerAZ1',
                                      Role='AWSGlueServiceRole-AZ1',
                                      DatabaseName = dbName,
                                      Targets={ 
                                          'S3Targets': [
                                              {
                                                  'Path': f's3://{bucket}',
                                              },
                                          ],
                                      },
                                      SchemaChangePolicy={
                                          'UpdateBehavior': 'UPDATE_IN_DATABASE',
                                          'DeleteBehavior': 'DEPRECATE_IN_DATABASE'
                                      },
                                      RecrawlPolicy={
                                          'RecrawlBehavior': 'CRAWL_EVERYTHING'
                                      },
                                      LineageConfiguration={
                                          'CrawlerLineageSettings': 'DISABLE'
                                      })

print(response)

{'ResponseMetadata': {'RequestId': '372bc07b-62fd-4d4b-8533-7ce90036b168', 'HTTPStatusCode': 200, 'HTTPHeaders': {'date': 'Sun, 22 May 2022 08:12:11 GMT', 'content-type': 'application/x-amz-json-1.1', 'content-length': '2', 'connection': 'keep-alive', 'x-amzn-requestid': '372bc07b-62fd-4d4b-8533-7ce90036b168'}, 'RetryAttempts': 0}}


#### List crawlers

In [116]:
glue_client.list_crawlers()

{'CrawlerNames': ['CrawlerAZ1'],
 'ResponseMetadata': {'RequestId': 'a6830343-1748-4aa4-86a0-8eeeea95f742',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'date': 'Sun, 22 May 2022 08:12:15 GMT',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '31',
   'connection': 'keep-alive',
   'x-amzn-requestid': 'a6830343-1748-4aa4-86a0-8eeeea95f742'},
  'RetryAttempts': 0}}

The crawler I just created appears in the list.

If the database doesn't exist it will be created.

#### Run the crawler manually

In [117]:
glue_client.start_crawler(Name = 'CrawlerAZ1')

{'ResponseMetadata': {'RequestId': 'd276b65a-49c8-4404-bdc3-c25f84592569',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'date': 'Sun, 22 May 2022 08:12:40 GMT',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '2',
   'connection': 'keep-alive',
   'x-amzn-requestid': 'd276b65a-49c8-4404-bdc3-c25f84592569'},
  'RetryAttempts': 0}}

#### Check crawler status

In [119]:
response = glue_client.get_crawler(Name = 'CrawlerAZ1')
response['Crawler']['State']

'RUNNING'

Wait until the state changes to **STOPPING** and then returns to **READY**, which means the crawl has finished.
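Instead of re-running the status cell by hand, the check can be put in a polling loop. The sketch below assumes the crawl is complete once `get_crawler` reports `READY` again (`RUNNING` and `STOPPING` are the intermediate states). The function name, poll interval, and timeout are illustrative choices, not Glue defaults.

```python
import time


def wait_for_crawler(glue_client, name, poll_seconds=15, timeout_seconds=900):
    """Poll get_crawler until the crawler is back in the READY state."""
    deadline = time.monotonic() + timeout_seconds
    while True:
        state = glue_client.get_crawler(Name=name)["Crawler"]["State"]
        if state == "READY":
            return state
        # Give up rather than loop forever if the crawler never finishes.
        if time.monotonic() >= deadline:
            raise TimeoutError(f"crawler {name} still {state} after {timeout_seconds}s")
        time.sleep(poll_seconds)


# Usage (needs valid credentials and a started crawler):
# wait_for_crawler(glue_client, "CrawlerAZ1")
```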

#### See the generated table

The table gets its name from the bucket it crawled, with hyphens replaced by underscores (here `glue_test_az`).

In [49]:
response = glue_client.get_tables(DatabaseName=dbName)

In [50]:
response['TableList'][0]

The returned table definition contains columns corresponding to the columns of the CSV file.
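The full table definition is verbose. To look only at the schema, the column list can be pulled out of `StorageDescriptor`. This is a minimal sketch; `table_columns` is a hypothetical helper name.

```python
def table_columns(table):
    """Return (name, type) pairs for the columns of a Glue table definition."""
    # Column metadata lives under StorageDescriptor -> Columns.
    columns = table.get("StorageDescriptor", {}).get("Columns", [])
    return [(col["Name"], col.get("Type", "")) for col in columns]


# Usage with the response from get_tables above:
# for name, col_type in table_columns(response['TableList'][0]):
#     print(name, col_type)
```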

#### Querying the database with AWS Athena

In [51]:
athena_client = session.client('athena')

In [52]:
queryStart = athena_client.start_query_execution(
    QueryString = 'SELECT count(*) FROM glue_test_az',
    QueryExecutionContext = {
        'Database': f'{dbName}'
    }, 
    ResultConfiguration = { 'OutputLocation': 's3://glue-test-az'}
)

queryStart

{'QueryExecutionId': '33d67d26-506f-4740-8e66-d7990516bd12',
 'ResponseMetadata': {'RequestId': '214abe97-732a-4281-aede-2e73b18180c4',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'content-type': 'application/x-amz-json-1.1',
   'date': 'Sat, 21 May 2022 14:58:52 GMT',
   'x-amzn-requestid': '214abe97-732a-4281-aede-2e73b18180c4',
   'content-length': '59',
   'connection': 'keep-alive'},
  'RetryAttempts': 0}}

In [53]:
queryExecution = athena_client.get_query_execution(QueryExecutionId=queryStart['QueryExecutionId'])

queryExecution

{'QueryExecution': {'QueryExecutionId': '33d67d26-506f-4740-8e66-d7990516bd12',
  'Query': 'SELECT count(*) FROM glue_test_az',
  'StatementType': 'DML',
  'ResultConfiguration': {'OutputLocation': 's3://glue-test-az/33d67d26-506f-4740-8e66-d7990516bd12.csv'},
  'QueryExecutionContext': {'Database': 'mydbaz1'},
  'Status': {'State': 'QUEUED',
   'SubmissionDateTime': datetime.datetime(2022, 5, 21, 16, 58, 53, 54000, tzinfo=tzlocal())},
  'Statistics': {'TotalExecutionTimeInMillis': 203,
   'QueryQueueTimeInMillis': 203},
  'WorkGroup': 'primary',
  'EngineVersion': {'SelectedEngineVersion': 'AUTO',
   'EffectiveEngineVersion': 'Athena engine version 2'}},
 'ResponseMetadata': {'RequestId': '8b3e78cb-2cdc-42c4-be20-9a929ae10c60',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'content-type': 'application/x-amz-json-1.1',
   'date': 'Sat, 21 May 2022 14:58:52 GMT',
   'x-amzn-requestid': '8b3e78cb-2cdc-42c4-be20-9a929ae10c60',
   'content-length': '1012',
   'connection': 'keep-alive'},
  'R

In [96]:
results = athena_client.get_query_results(QueryExecutionId=queryStart['QueryExecutionId'])
print(json.dumps(results, indent=4, sort_keys=True))

ClientError: An error occurred (UnrecognizedClientException) when calling the GetQueryResults operation: The security token included in the request is invalid.
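Independently of the credential error shown above, `get_query_results` is only useful once the query has left the `QUEUED` and `RUNNING` states (the execution shown earlier was still `QUEUED`). The following sketch polls `get_query_execution` until a terminal state is reached and flattens the result rows. The helper names and the timing values are assumptions for illustration.

```python
import time


def wait_for_query(athena_client, query_id, poll_seconds=2, timeout_seconds=300):
    """Poll Athena until the query reaches SUCCEEDED, FAILED or CANCELLED."""
    deadline = time.monotonic() + timeout_seconds
    while True:
        execution = athena_client.get_query_execution(QueryExecutionId=query_id)
        state = execution["QueryExecution"]["Status"]["State"]
        if state in ("SUCCEEDED", "FAILED", "CANCELLED"):
            return state
        if time.monotonic() >= deadline:
            raise TimeoutError(f"query {query_id} still {state}")
        time.sleep(poll_seconds)


def rows_as_lists(results):
    """Flatten a get_query_results response into lists of cell strings."""
    # Each row is {"Data": [{"VarCharValue": "..."}]}; NULL cells have no value key.
    return [
        [cell.get("VarCharValue") for cell in row["Data"]]
        for row in results["ResultSet"]["Rows"]
    ]


# Usage (needs valid credentials):
# if wait_for_query(athena_client, queryStart['QueryExecutionId']) == "SUCCEEDED":
#     results = athena_client.get_query_results(
#         QueryExecutionId=queryStart['QueryExecutionId'])
#     print(rows_as_lists(results))
```

For a `SELECT` query the first returned row holds the column headers, so the data starts at the second row.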

#### The Python file for the ETL job

The file contains PySpark code that loads the CSV file into a DataFrame, adds a new column with a moving average of the `Adj Close` column, and finally saves the new DataFrame to a new CSV file.

In [56]:
%%writefile glueETL.py
from pyspark.sql.window import Window
from pyspark.sql import functions as f
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('ETLjob1').getOrCreate()

df = spark.read.option("header", "true").csv("s3://glue-test-az/raw_files/TSLA.csv")

df_new = df.withColumn("Part", f.lit(1)).withColumn("movingAverage", f.avg(df["Adj Close"]).over(Window.partitionBy("Part").orderBy("Date").rowsBetween(-5,0)))

df_new.coalesce(1).write.option("header", "true").csv("s3://glue-test-az/processed/TSLA.csv", mode='overwrite')

Overwriting glueETL.py
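To make the window in the script concrete: `rowsBetween(-5, 0)` covers the current row plus the five rows before it, so each average uses up to six values, and fewer at the start of the series. The plain-Python sketch below mirrors that behavior on a list. It is only an illustration of the window semantics, not part of the Glue job, and `trailing_average` is a hypothetical name.

```python
def trailing_average(values, preceding=5):
    """Average each value with up to `preceding` values before it."""
    averages = []
    for i in range(len(values)):
        # The window shrinks near the start, just like a Spark row frame.
        window = values[max(0, i - preceding): i + 1]
        averages.append(sum(window) / len(window))
    return averages


print(trailing_average([1, 2, 3], preceding=1))  # [1.0, 1.5, 2.5]
```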


#### Load the file into the bucket

In [57]:
fileToUpload = 'glueETL.py'
dev_s3_client.upload_file(f'{fileToUpload}', 
                          bucket,
                          f'scripts/{fileToUpload}')

Verify the file was uploaded.

In [58]:
response = dev_s3_client.list_objects(Bucket=f'{bucket}')

In [59]:
for key in response["Contents"]:
    print(key['Key'])

33d67d26-506f-4740-8e66-d7990516bd12.csv
33d67d26-506f-4740-8e66-d7990516bd12.csv.metadata
raw_files/TSLA.csv
scripts/glueETL.py


#### Creating a Glue ETL job

In [60]:
glue_client = session.client('glue')

In [65]:
import boto3
import json


response = glue_client.create_job(
    Name='AZJob',
    Description='Test',
    Role='arn:aws:iam::471807755212:role/AWSGlueServiceRole-AZ1',
    ExecutionProperty={
        'MaxConcurrentRuns': 2
    },
    Command={
        'Name': 'glueetl',
        'ScriptLocation': f's3://{bucket}/scripts/glueETL.py',
        'PythonVersion': '3'
    },
    DefaultArguments={
      '--TempDir': f's3://{bucket}/temp_dir',
      '--job-bookmark-option': 'job-bookmark-disable'
    },
    MaxRetries=1,
    GlueVersion='3.0',
    NumberOfWorkers=2,
    WorkerType='G.1X'
)

print(json.dumps(response, indent=4, sort_keys=True, default=str))

{
    "Name": "AZJob",
    "ResponseMetadata": {
        "HTTPHeaders": {
            "connection": "keep-alive",
            "content-length": "16",
            "content-type": "application/x-amz-json-1.1",
            "date": "Sat, 21 May 2022 15:07:17 GMT",
            "x-amzn-requestid": "19da2578-bc8d-4111-8166-db431ec40d00"
        },
        "HTTPStatusCode": 200,
        "RequestId": "19da2578-bc8d-4111-8166-db431ec40d00",
        "RetryAttempts": 0
    }
}


As a side note, the complete role ARN must be used for the `Role` parameter. Otherwise the parameter is not accepted, there is no error message, and the ETL job won't run.

I spent a few hours finding that out 🤦‍♂️

To find the role ARN, look in the response returned when the role was created, or use the following query:

In [72]:
response = iam_client.get_role(RoleName = 'AWSGlueServiceRole-AZ1')

response['Role']['Arn']

'arn:aws:iam::471807755212:role/AWSGlueServiceRole-AZ1'

#### Delete a Job

If something didn't work you can always delete the job with:

<code>glue_client.delete_job(JobName='AZJob')</code>

#### Update an ETL job

In [66]:
response = glue_client.update_job(
    JobName='AZJob',
    JobUpdate={
        'Role': 'arn:aws:iam::471807755212:role/AWSGlueServiceRole-AZ1',
        'Description': 'Testing AWS Glue ETL',
        'Command': {
            'Name': 'glueetl',
            'ScriptLocation': f's3://{bucket}/scripts/glueETL.py',
            'PythonVersion': '3'
        },
    }
)
print(response)

{'JobName': 'AZJob', 'ResponseMetadata': {'RequestId': 'd3b7b619-b553-4e52-89e4-a8c856bb0636', 'HTTPStatusCode': 200, 'HTTPHeaders': {'date': 'Sat, 21 May 2022 15:10:01 GMT', 'content-type': 'application/x-amz-json-1.1', 'content-length': '19', 'connection': 'keep-alive', 'x-amzn-requestid': 'd3b7b619-b553-4e52-89e4-a8c856bb0636'}, 'RetryAttempts': 0}}


#### Start a Job

In [84]:
start_job_response = glue_client.start_job_run(JobName='AZJob')

In [85]:
start_job_response

{'JobRunId': 'jr_42f838c9e55a3f271f64ac8e6233a07cfbce1fe64134061ed6993a3e89a6fef8',
 'ResponseMetadata': {'RequestId': '6fc1945e-33b0-459f-899e-cf3a568c7b25',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'date': 'Sat, 21 May 2022 15:31:36 GMT',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '82',
   'connection': 'keep-alive',
   'x-amzn-requestid': '6fc1945e-33b0-459f-899e-cf3a568c7b25'},
  'RetryAttempts': 0}}

#### To see the status of the Job

In [95]:
response = glue_client.get_job_run(
    JobName='AZJob',
    RunId=start_job_response['JobRunId']
)

response['JobRun']['JobRunState']

'SUCCEEDED'

Wait until it says **SUCCEEDED**... or, worst case, **FAILED**. In the latter case, good luck with debugging.
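The status check can also be wrapped in a polling loop that returns the error message on failure, which is a reasonable starting point for debugging. This is a sketch under assumptions: the set of terminal states, the helper name, and the timing values are my choices, not something the Glue API prescribes.

```python
import time

# States after which a job run will not change any more (assumed list).
TERMINAL_JOB_STATES = {"SUCCEEDED", "FAILED", "STOPPED", "TIMEOUT", "ERROR"}


def wait_for_job_run(glue_client, job_name, run_id,
                     poll_seconds=30, timeout_seconds=3600):
    """Poll get_job_run and return (state, error_message) when the run ends."""
    deadline = time.monotonic() + timeout_seconds
    while True:
        run = glue_client.get_job_run(JobName=job_name, RunId=run_id)["JobRun"]
        state = run["JobRunState"]
        if state in TERMINAL_JOB_STATES:
            # ErrorMessage is only present when something went wrong.
            return state, run.get("ErrorMessage")
        if time.monotonic() >= deadline:
            raise TimeoutError(f"job run {run_id} still {state}")
        time.sleep(poll_seconds)


# Usage (needs valid credentials and a started job run):
# state, error = wait_for_job_run(glue_client, 'AZJob',
#                                 start_job_response['JobRunId'])
# print(state, error)
```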

### References

https://hands-on.cloud/working-with-aws-glue-in-python-using-boto3/#h-creating-an-aws-glue-crawler

Boto3 IAM API: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/iam.html

JSON policy element reference: https://docs.aws.amazon.com/IAM/latest/UserGuide/reference_policies_elements.html#Principal_specifying

AWS services that work with IAM: https://docs.aws.amazon.com/IAM/latest/UserGuide/reference_aws-services-that-work-with-iam.html 

https://bobbyhadz.com/blog/aws-cli-create-role

AWS Glue - Web API reference: https://docs.aws.amazon.com/glue/latest/webapi/web-api.pdf#WebAPI_Welcome