In [None]:
import pickle
import boto3
import botocore
from botocore.exceptions import ClientError
import os, time, json, time
from datetime import datetime

from misc import load_from_yaml, save_to_yaml
import s3, iam, lftn, glue, lambdafn as lfn, sns, eventbridge as event
from lambdafn import build_lambda_package, print_latest_lambda_logs

from dotenv import load_dotenv
# Load environment configuration from <MY_AWS_DIR>/.env. os.path.join yields a relative
# ".env" when MY_AWS_DIR is unset; plain string concatenation would yield "/.env" and
# read a file at the filesystem root instead.
load_dotenv(os.path.join(os.getenv("MY_AWS_DIR", ""), ".env"))

from mylogger import CustomLogger
# Project logger used by the rest of the notebook.
logger = CustomLogger()

In [None]:
# Account / network configuration read from the environment (.env loaded above).
ACCOUNT_ID        = os.environ['AWS_ACCOUNT_ID_ROOT']
REGION            = os.environ.get('AWS_DEFAULT_REGION', 'us-east-1')
VPC_ID            = os.environ['AWS_DEFAULT_VPC']
SECURITY_GROUP_ID = os.environ['AWS_DEFAULT_SG_ID']
# AWS_DEFAULT_SUBNET_IDS holds a colon-separated list of subnet IDs.
SUBNET_IDS        = os.environ["AWS_DEFAULT_SUBNET_IDS"].split(":")
SUBNET_ID         = SUBNET_IDS[0]

# Log the value that the "VPC_ID" label actually names.
logger.info(f"VPC_ID: {VPC_ID}")

In [None]:
# boto3 clients shared by the rest of the notebook. Clients created without region_name
# use the region boto3 resolves by default (e.g. from AWS_DEFAULT_REGION).
sts_client           = boto3.client('sts')
rds_client           = boto3.client('rds')
iam_client           = boto3.client('iam')
s3_client            = boto3.client('s3')
glue_client          = boto3.client('glue')
lakeformation_client = boto3.client('lakeformation')
ec2_client           = boto3.client('ec2', region_name=REGION)
ec2_resource         = boto3.resource('ec2', region_name=REGION)
dynamodb_client      = boto3.client('dynamodb')
events_client        = boto3.client('events')
lambda_client        = boto3.client('lambda')
sns_client           = boto3.client('sns')

# A single CloudWatch Logs client, reachable under both names used in this notebook.
logs_client          = boto3.client('logs')
cw_logs_client       = logs_client

### [Knowledge Amplifier: Build Serverless DataLake using Glue, Lambda, CloudWatch](https://www.youtube.com/watch?v=3f7UY5R9Q9U&t=0s)

<div style="text-align:center" ><b style="color:green">WORKING AS EXPECTED</b></div>

<div style="text-align:center" ><img src="./design_diagram.png" width="600" height="300" /></div>

- Demonstrates how to add external libraries to a Glue job.
- Demonstrates how incremental data processing (job bookmarks) works in Glue.

##### Load Pickled Variables

In [None]:
# Restore the resource names/ARNs saved by the "Pickle Variables" cell, so later cells can
# work against AWS resources that already exist without recreating them.
# NOTE(review): pickle.load can execute arbitrary code; only load a .pkl file this
# notebook wrote itself.
with open("event_based_etl_ipynb.pkl", "rb") as pickle_file:
    saved_state = pickle.load(pickle_file)

# Positional unpacking: the order must match the tuple passed to pickle.dump.
(
    GLUE_ROLE_NAME, LFN_ROLE_NAME, GLUE_ROLE_ARN, LFN_ROLE_ARN,
    S3_BUCKET_DATALAKE, S3_BUCKET_GLUE_ASSETS, GLUE_CATALOG_DB, DATALAKE_LOCATION_URI,
    TOPIC_NAME, JOB_COMPLETE_TOPIC_ARN,
    LFN_CRAWLER_NAME, LFN_CRAWLER_ARN,
    S3_CUSTOMER_CRAWLER_NAME, S3_CUSTOMER_CRAWLER_TARGET, S3_SALES_CRAWLER, S3_SALES_CRAWLER_TARGET,
    LFN_JOB_TRIGGERER_NAME, LFN_JOB_TRIGGERER_ARN,
    TEM_DIR, SPARK_EVENT_LOG_PATH, TABLE_NAME, TARGET, JOB_NAME, JOB_SCRIPT_LOCATION, ARGS,
    S3_CUSTOMER_CRAWLER_RULE_NAME, S3_CUSTOMER_CRAWLER_RULE_ARN,
    JOB_COMPLETE_RULE_NAME, JOB_COMPLETE_RULE_ARN,
) = saved_state

# Echo every restored value, one per line, in saved order.
for restored_value in saved_state:
    print(restored_value)

#### Create IAM Roles

- Create the AWS Glue role `GLUE_ROLE_NAME` and the Lambda role `LFN_ROLE_NAME`.
- Attach the AWS managed policies listed in `policy_arns` (including Power User Access, `PowerUserAccess`) to both roles.

In [None]:
# IAM roles assumed by the Glue crawlers/jobs and by the Lambda functions, respectively.
GLUE_ROLE_NAME, LFN_ROLE_NAME = 'glue-pipeline-role', 'lfn-pipeline-role'

In [None]:
# AWS managed policies attached to both the Glue role and the Lambda role.
# PowerUserAccess (full service access without IAM user/group management) is the level
# this section asks for. AdministratorAccess is intentionally not attached: it would
# grant full account control and make every other entry in this list redundant.
policy_arns = [
    "arn:aws:iam::aws:policy/service-role/AWSGlueServiceRole",
    "arn:aws:iam::aws:policy/CloudWatchFullAccess",
    "arn:aws:iam::aws:policy/AmazonS3FullAccess",
    "arn:aws:iam::aws:policy/PowerUserAccess",
]

##### Glue Role

In [None]:
# Trust policy: only the AWS Glue service may assume this role.
glue_service_principal = {"Service": "glue.amazonaws.com"}
assume_role_policy_doc = {
    "Version": "2012-10-17",
    "Statement": [
        {"Effect": "Allow", "Principal": glue_service_principal, "Action": "sts:AssumeRole"}
    ],
}

# Create the role and keep its ARN; crawlers, jobs and Lake Formation grants use it.
create_glue_role_result = iam_client.create_role(
    RoleName=GLUE_ROLE_NAME,
    AssumeRolePolicyDocument=json.dumps(assume_role_policy_doc),
    Description="Glue Service Role",
)
GLUE_ROLE_ARN = create_glue_role_result['Role']['Arn']

In [None]:
# Attach every AWS managed policy in policy_arns to the Glue role. A plain for-loop is
# used because attach_role_policy is called only for its side effect; a list
# comprehension would build (and display) a throwaway list of API responses.
for policy_arn in policy_arns:
    iam_client.attach_role_policy(RoleName=GLUE_ROLE_NAME, PolicyArn=policy_arn)

In [None]:
# glue_put_event_policy = {
#   "Version": "2012-10-17",
#   "Statement": [
#     {
#       "Effect": "Allow",
#       "Action": [
#         "events:PutEvents"
#       ],
#       "Resource": "*"
#     }
#   ]
# }

# # Attach the inline policy to the IAM role
# iam_client.put_role_policy(
#     RoleName=GLUE_ROLE_NAME,
#     PolicyName="glue_put_event",
#     PolicyDocument=json.dumps(glue_put_event_policy)
# )

##### Lambda Role

In [None]:
# Trust policy: only the AWS Lambda service may assume this role.
lambda_trust_statement = {
    "Effect": "Allow",
    "Principal": {"Service": "lambda.amazonaws.com"},
    "Action": "sts:AssumeRole",
}
assume_role_policy_document = {"Version": "2012-10-17", "Statement": [lambda_trust_statement]}

# Create the Lambda execution role and keep its ARN for create_function below.
LFN_ROLE_ARN = (
    iam_client.create_role(
        RoleName=LFN_ROLE_NAME,
        AssumeRolePolicyDocument=json.dumps(assume_role_policy_document),
    )["Role"]["Arn"]
)

In [None]:
# Attach every AWS managed policy in policy_arns to the Lambda role. A plain for-loop
# is used because the calls are made only for their side effect.
for policy_arn in policy_arns:
    iam_client.attach_role_policy(RoleName=LFN_ROLE_NAME, PolicyArn=policy_arn)

In [None]:

# #### Create IAM Role Policy (, S3, Logs Permissions)
# policy_document = {
#     "Version": "2012-10-17",
#     "Statement": [
#         {   # StartCrawler permission
#             "Effect": "Allow",
#             "Action": [
#                 "glue:StartCrawler"
#             ],
#             "Resource": f"arn:aws:glue:region:account-id:crawler/*"
#             # "Resource": f"arn:aws:glue:region:account-id:crawler/{crawler-name}"
#         },
#         {
#             "Effect": "Allow",
#             "Action": [
#                 "glue:StartJobRun",
#                 "glue:GetJob",
#                 "glue:GetJobRun"
#             ],
#             "Resource": f"arn:aws:glue:region:account-id:job/*"
#         },
#         {   # s3 full access
#             "Effect": "Allow",
#             "Action": [
#                 "s3:*",
#                 "s3-object-lambda:*"
#             ],
#             "Resource": "*"
#         },
#         {
#             "Effect": "Allow",
#             "Action": [
#                 "logs:*"
#             ],
#             "Resource": "*"
#         }
#     ]
# }

# policy_name = "s3_logs_policies"

# # Attach the inline policy to the IAM role
# iam_client.put_role_policy(
#     RoleName=LFN_ROLE_NAME,
#     PolicyName=policy_name,
#     PolicyDocument=json.dumps(policy_document)
# )
# print(f"Policy {policy_name} attached to role {LFN_ROLE_NAME}")


In [None]:
# Log the Lambda execution role ARN (LFN_ROLE_ARN).
logger.info(LFN_ROLE_ARN)

#### Create S3 Bucket and Folders

In [None]:
# Two buckets: the data lake itself, and one for Glue scripts, temp files and libraries.
S3_BUCKET_DATALAKE, S3_BUCKET_GLUE_ASSETS = "htech-datalake-bkt", "htech-glue-assets-bkt"

In [None]:
# Bucket options.
# NOTE(review): none of these three settings is passed to s3.create_s3_bucket below,
# so they have no effect on the buckets unless the helper reads these globals itself
# — confirm whether they should be passed (and whether 'public-read' is really wanted).
acl = 'public-read'
enable_versioning = False
enable_encryption = False

# "Folders" (key prefixes) created in the data-lake bucket ...
prefixes1 = ["raw/customers/", "raw/sales/", "processed/customers/", "processed/sales/", "misc/"]
# ... and in the Glue-assets bucket.
prefixes2 = ["temporary/", "sparkHistoryLogs/", "libraries/", "misc/"]

s3.create_s3_bucket(S3_BUCKET_DATALAKE, prefixes1)
s3.create_s3_bucket(S3_BUCKET_GLUE_ASSETS, prefixes2)

#### Create Glue Catalog Database

In [None]:
# Name of the Glue Data Catalog database the crawlers register their tables in.
GLUE_CATALOG_DB = 'htech-glue-catalog-db'

In [None]:
# The catalog database's default location is the root of the data-lake bucket.
DATALAKE_LOCATION_URI = f"s3://{S3_BUCKET_DATALAKE}"

catalog_database_input = {
    "Name": GLUE_CATALOG_DB,
    "Description": "",
    "LocationUri": DATALAKE_LOCATION_URI,
}
create_database_response = glue_client.create_database(
    CatalogId=ACCOUNT_ID,
    DatabaseInput=catalog_database_input,
)


In [None]:
# Log the raw CreateDatabase API response.
logger.info(create_database_response)

In [None]:
# response = glue_client.get_database(CatalogId=ACCOUNT_ID, Name=CATALOG_DB_NAME)
# print(json.dumps(response, indent=4, default=str))

- Grant the `CREATE_TABLE` and `DROP` Lake Formation permissions on the Data Catalog database to the Glue role (`GLUE_ROLE_ARN`).

In [None]:
# Lake Formation principal: the Glue role's ARN.
lf_principle = GLUE_ROLE_ARN

# Grant CREATE_TABLE and DROP on the catalog database to the Glue role (no grant option).
catalog_db_resource = {"Database": {"Name": GLUE_CATALOG_DB}}
response = lakeformation_client.grant_permissions(
    Principal={"DataLakePrincipalIdentifier": lf_principle},
    Resource=catalog_db_resource,
    Permissions=["CREATE_TABLE", "DROP"],
    PermissionsWithGrantOption=[],
)

In [None]:
# lftn.grant_table_level_permissions(GLUE_ROLE_ARN, CATALOG_DB_NAME, 'employees', ['DROP'])

#### SNS

In [None]:
TOPIC_NAME = "htech-sns-topic"
# Expected topic ARN, derived from the configured region/account rather than a
# hard-coded account ID. The next cell overwrites it with the ARN returned by create_topic.
JOB_COMPLETE_TOPIC_ARN = f"arn:aws:sns:{REGION}:{ACCOUNT_ID}:{TOPIC_NAME}"

In [None]:
# CreateTopic is idempotent: an existing topic with this name returns its ARN unchanged.
JOB_COMPLETE_TOPIC_ARN = sns_client.create_topic(Name=TOPIC_NAME)['TopicArn']

# Subscribe an e-mail address to the topic; SNS first sends a confirmation e-mail.
# Set SNS_NOTIFY_EMAIL to change the address without editing the notebook.
protocol = "email"
endpoint = os.environ.get("SNS_NOTIFY_EMAIL", "bbcredcap3@gmail.com")

sns_client.subscribe(
    TopicArn=JOB_COMPLETE_TOPIC_ARN,
    Protocol=protocol,
    Endpoint=endpoint
)

In [None]:
# Fetch the topic's attributes and log them as a single dict.
response = sns_client.get_topic_attributes(TopicArn=JOB_COMPLETE_TOPIC_ARN)
topic_attributes = response["Attributes"]
logger.info(topic_attributes)

#### **Lambda 1**:

In [None]:
# Build the deployment package for Lambda 1 from its handler script into lambdas/lfn1
# (the next cell reads ./lambdas/lfn1/package.zip).
lfn_scripts = ["lambdas/lfn1/crawler_triggerer.py"]
build_lambda_package(lfn_scripts, "lambdas/lfn1")

In [None]:
# Lambda 1 (crawler_triggerer), deployed from the package built in the previous cell.
LFN_CRAWLER_NAME = "crawler_triggerer"
zip_file = "./lambdas/lfn1/package.zip"  # Change this to the actual zip file path

with open(zip_file, 'rb') as package_stream:
    zipped_code = package_stream.read()

crawler_lambda = lambda_client.create_function(
    FunctionName=LFN_CRAWLER_NAME,
    Runtime='python3.9',
    Role=LFN_ROLE_ARN,
    Handler='crawler_triggerer.lambda_handler',
    Code={'ZipFile': zipped_code},
    Timeout=120,
    # Placeholder variable — presumably unused by the handler; TODO confirm.
    Environment={'Variables': {'foo': 'BAR'}},
)
LFN_CRAWLER_ARN = crawler_lambda['FunctionArn']

In [None]:
# lfn.update_lambda_function_code(LFN_CRAWLER_NAME, zip_file)

In [None]:
# Resource-based permission: let S3 (only for the data-lake bucket in this account)
# invoke Lambda 1. StatementId must be unique among this function's permissions.
s3_invoke_permission = dict(
    FunctionName=LFN_CRAWLER_NAME,
    StatementId='s3-invoke-permission',
    Action='lambda:InvokeFunction',
    Principal='s3.amazonaws.com',
    SourceArn=f"arn:aws:s3:::{S3_BUCKET_DATALAKE}",
    SourceAccount=ACCOUNT_ID,
)
response = lambda_client.add_permission(**s3_invoke_permission)

In [None]:
# Invoke Lambda 1 for every object created under raw/customers/ in the data-lake bucket.
# NOTE: put_bucket_notification_configuration replaces the bucket's whole notification
# configuration, so any other notification rules on the bucket are overwritten.
customer_prefix_filter = {
    'Key': {'FilterRules': [{'Name': 'prefix', 'Value': 'raw/customers/'}]}
}
crawler_lambda_notification = {
    'LambdaFunctionArn': LFN_CRAWLER_ARN,
    'Events': ['s3:ObjectCreated:*'],
    'Filter': customer_prefix_filter,
}
response = s3_client.put_bucket_notification_configuration(
    Bucket=S3_BUCKET_DATALAKE,
    NotificationConfiguration={'LambdaFunctionConfigurations': [crawler_lambda_notification]},
)

logger.info("S3 bucket notification configuration updated successfully.")

When Amazon S3 triggers a Lambda function, you cannot attach a custom payload to the Lambda event — S3 always sends a predefined event structure that describes the created object, for example:

```json
{
  "Records": [
    {
      "eventSource": "aws:s3",
      "eventName": "ObjectCreated:Put",
      "s3": {
        "bucket": {
          "name": "your-bucket",
        },
        "object": {
          "key": "some/path/file.csv",
          "size": 12345
        }
      }
    }
  ]
}
```

#### **Crawler 1**: Catalog Data from `raw/customers` as a table by the name `raw_customers`

In [None]:
# Crawler 1 scans raw/customers in the data lake; tables it creates get the "raw_" prefix.
S3_CUSTOMER_CRAWLER_NAME = "htech-s3-customer-crawler"
customer_raw_path = f"s3://{S3_BUCKET_DATALAKE}/raw/customers"
S3_CUSTOMER_CRAWLER_TARGET = {'S3Targets': [{'Path': customer_raw_path}]}
glue.create_glue_crawler(
    S3_CUSTOMER_CRAWLER_NAME, GLUE_ROLE_ARN, GLUE_CATALOG_DB, S3_CUSTOMER_CRAWLER_TARGET,
    table_prefix="raw_",
)

In [None]:
# glue_client.start_crawler(Name=S3_CUSTOMER_CRAWLER_NAME)

#### **Lambda 2**: Triggers the Glue job when invoked

In [None]:
# Build the deployment package for Lambda 2 into lambdas/lfn2/
# (the next cell reads ./lambdas/lfn2/package.zip).
lfn_scripts = ["lambdas/lfn2/glue_job_triggerer.py"]
build_lambda_package(lfn_scripts, "lambdas/lfn2/")

In [None]:
# Lambda 2 (glue_job_triggerer): read the deployment package built in the previous cell.
LFN_JOB_TRIGGERER_NAME = "glue_job_triggerer"
zip_file = "./lambdas/lfn2/package.zip"  # Change this to the actual zip file path

with open(zip_file, 'rb') as package_stream:
    zipped_code = package_stream.read()

In [None]:
# Create Lambda 2 from the package bytes read in the previous cell.
job_triggerer_env = {'Variables': {'foo': 'BAR'}}  # placeholder variable
job_triggerer_response = lambda_client.create_function(
    FunctionName=LFN_JOB_TRIGGERER_NAME,
    Runtime='python3.9',
    Role=LFN_ROLE_ARN,
    Handler='glue_job_triggerer.lambda_handler',
    Code={'ZipFile': zipped_code},
    Timeout=120,
    Environment=job_triggerer_env,
)
LFN_JOB_TRIGGERER_ARN = job_triggerer_response['FunctionArn']

In [None]:
# lfn.update_lambda_function_code()

#### **Job**: Transform data from `raw/customers` and load it into `processed/customers`

In [None]:
# Upload the job-1 script. NOTE: the S3 prefix really is "glues_scripts/" (sic); other
# cells derive JOB_SCRIPT_LOCATION from object_name1, so keep the key as-is.
file_name1, object_name1 = (
    './glue_scripts/jb1_s3csv_s3parquet.py',   # local script
    "glues_scripts/jb1_s3csv_s3parquet.py",    # key inside the Glue-assets bucket
)
s3.upload_file_to_s3(S3_BUCKET_GLUE_ASSETS, file_name1, object_name1)

In [None]:
# Glue job 1: CSV under raw/customers -> Parquet under processed/customers.
JOB_NAME = 'jb1_s3csv_s3parquet'
JOB_SCRIPT_LOCATION = f"s3://{S3_BUCKET_GLUE_ASSETS}/{object_name1}"

# Source catalog table and destination prefix (also passed to the job as parameters).
TABLE_NAME = "raw_customers"
TARGET = f"s3://{S3_BUCKET_DATALAKE}/processed/customers/"

# Scratch space and Spark UI event logs live in the Glue-assets bucket.
TEM_DIR = f"s3://{S3_BUCKET_GLUE_ASSETS}/temporary/"
SPARK_EVENT_LOG_PATH = f"s3://{S3_BUCKET_GLUE_ASSETS}/sparkHistoryLogs/"

ARGS = {
    "--TempDir": TEM_DIR,
    # NOTE(review): "--library-path" is not among Glue's documented special parameters
    # (JARs are normally passed with "--extra-jars") — confirm it has any effect.
    "--library-path": f"s3://{S3_BUCKET_GLUE_ASSETS}/libraries",
    # Python modules/packages shipped as a .zip.
    "--extra-py-files": f"s3://{S3_BUCKET_GLUE_ASSETS}/libraries/package.zip",
    "--spark-event-logs-path": SPARK_EVENT_LOG_PATH,
    # Job bookmarks: later runs skip input already processed by earlier runs.
    "--job-bookmark-option": "job-bookmark-enable",
    "--job-language": "python",
}

In [None]:
# Create job 1 from the uploaded script, passing ARGS, via the project glue helper.
# The commented-out call is the older helper variant that took TEM_DIR/SPARK_EVENT_LOG_PATH.
# glue.create_glue_job(JOB_NAME, JOB_SCRIPT_LOCATION, GLUE_ROLE_ARN, TEM_DIR, SPARK_EVENT_LOG_PATH)
glue.create_glue_job_v2(JOB_NAME, JOB_SCRIPT_LOCATION, GLUE_ROLE_ARN, ARGS)

-    Package up and upload the **external libraries**

In [None]:
# Bundle the project logger module plus its third-party dependencies into
# ./external_py_lib (uploaded to the libraries/ prefix in the next cell).
# os.environ["UTILS"] fails with a clear KeyError when UTILS is unset, instead of the
# TypeError that `None + "/mylogger.py"` would raise.
logger_module = os.path.join(os.environ["UTILS"], "mylogger.py")
build_lambda_package(
    [logger_module],
    "./external_py_lib",
    py_libs=['coloredlogs', 'termcolor']
)

In [None]:
# Upload ./external_py_lib to the libraries/ prefix of the Glue-assets bucket;
# ARGS points --extra-py-files at libraries/package.zip there.
s3.upload_folder_to_s3(S3_BUCKET_GLUE_ASSETS,"./external_py_lib", "libraries")

##### Parameterization of the Job [`SUCCESS`]

In [None]:
# s3_client.delete_object(Bucket=S3_BUCKET_GLUE_ASSETS, Key="libraries/package.zip")

In [None]:
# Custom parameters for the job script, merged into ARGS in place so the run started
# below receives both the default arguments and these values.
job_specific_args = {
    "--JOB_NAME": JOB_NAME,
    "--S3_BUCKET_DATALAKE": S3_BUCKET_DATALAKE,
    "--CATALOG_DB_NAME": GLUE_CATALOG_DB,
    "--TABLE_NAME": TABLE_NAME,
    "--TARGET": TARGET,
}
ARGS.update(job_specific_args)

In [None]:
# glue.start_glue_job(JOB_NAME, arguments=ARGS)

In [None]:
# Start a run of job 1 manually with ARGS (the response contains the JobRunId).
glue_client.start_job_run(JobName=JOB_NAME, Arguments=ARGS)

In [None]:
# ! aws logs tail --follow /aws-glue/jobs --filter-pattern "SUCCEEDED"

#### **Event Rule 1**: It matches the "Glue Crawler State Change" pattern and targets the job-trigger Lambda (LFN_JOB_TRIGGERER_NAME)

-   `Event Source`: AWS Glue Crawler (S3_CUSTOMER_CRAWLER_NAME)
-   `Event Type`: "Glue Crawler State Change" (crawler_rule_event_pattern)
-   `Event Target`: Lambda function (LFN_JOB_TRIGGERER_NAME), plus the SNS topic (JOB_COMPLETE_TOPIC_ARN) for notification

In [None]:
# Name of the EventBridge rule that reacts to the customer crawler's state changes.
S3_CUSTOMER_CRAWLER_RULE_NAME = "htech-customer-crawler-rule"

In [None]:
# Match only successful completions of the customer crawler.
crawler_rule_event_pattern = {
    "source": ["aws.glue"],
    "detail-type": ["Glue Crawler State Change"],
    "detail": {"state": ["Succeeded"], "crawlerName": [S3_CUSTOMER_CRAWLER_NAME]},
}

# Create (or update) the EventBridge rule and keep its ARN for the Lambda permission.
crawler_rule_response = events_client.put_rule(
    Name=S3_CUSTOMER_CRAWLER_RULE_NAME,
    EventPattern=json.dumps(crawler_rule_event_pattern),
    State="ENABLED",
    Description="Rule to capture AWS Glue Crawler state changes",
)
S3_CUSTOMER_CRAWLER_RULE_ARN = crawler_rule_response["RuleArn"]

In [None]:
# Constant payload delivered to the job-trigger Lambda as its `event` in place of the
# EventBridge event; it carries what the Lambda needs to start the Glue job.
input_payload = dict(
    JOB_NAME=JOB_NAME,
    S3_BUCKET_DATALAKE=S3_BUCKET_DATALAKE,
    CATALOG_DB_NAME=GLUE_CATALOG_DB,
    TABLE_NAME=TABLE_NAME,
    TARGET=TARGET,
)

job_trigger_target = {
    "Id": f"{S3_CUSTOMER_CRAWLER_RULE_NAME}_job_trigger_lfn",
    "Arn": LFN_JOB_TRIGGERER_ARN,
    "Input": json.dumps(input_payload),  # EventBridge expects a JSON string here
}
events_client.put_targets(Rule=S3_CUSTOMER_CRAWLER_RULE_NAME, Targets=[job_trigger_target])

In [None]:
# Quick check of the catalog DB name sent in the Lambda payload.
input_payload["CATALOG_DB_NAME"]

-   The `Input` payload is delivered to the Lambda function as its `event`:

```json
{
    "JOB_NAME": "jb1_s3csv_s3parquet",
    "S3_BUCKET_DATALAKE": "htech-datalake-bkt",
    "CATALOG_DB_NAME": "htech-glue-catalog-db",
    "TABLE_NAME": "raw_customers",
    "TARGET": "s3://htech-datalake-bkt/processed/customers/"
}
```

In [None]:
# Resource-based permission letting EventBridge invoke Lambda 2, restricted to this rule.
lambda_client.add_permission(
    Action="lambda:InvokeFunction",
    Principal="events.amazonaws.com",
    SourceArn=S3_CUSTOMER_CRAWLER_RULE_ARN,
    FunctionName=LFN_JOB_TRIGGERER_NAME,
    StatementId=f"{S3_CUSTOMER_CRAWLER_RULE_NAME}-invoke-permission",
)

In [None]:
# Second target on the crawler rule: forward the crawler event to the SNS topic.
# NOTE(review): EventBridge can only publish if the topic's access policy allows the
# events.amazonaws.com principal to sns:Publish; this notebook sets no topic policy — confirm.
crawler_sns_target = {
    'Id': f"{S3_CUSTOMER_CRAWLER_RULE_NAME}_sns_topic",
    'Arn': JOB_COMPLETE_TOPIC_ARN,
}
events_client.put_targets(Rule=S3_CUSTOMER_CRAWLER_RULE_NAME, Targets=[crawler_sns_target])

- Example **event** that the "Glue Crawler State Change" rule forwards to the SNS topic:

```json
{
    "version": "0",
    "id": "f971dd0e-4705-d8ba-f46c-7028e8f5e0ab",
    "detail-type": "Glue Crawler State Change",
    "source": "aws.glue",
    "account": "381492255899",
    "time": "2024-10-20T15:44:53Z",
    "region": "us-east-1",
    "resources": [],
    "detail": {
        "tablesCreated": "1",
        "warningMessage": "N/A",
        "partitionsUpdated": "0",
        "tablesUpdated": "0",
        "message": "Crawler Succeeded",
        "partitionsDeleted": "0",
        "accountId": "999999999999",
        "runningTime (sec)": "26",
        "tablesDeleted": "0",
        "crawlerName": "htech-s3crawler",
        "completionDate": "2024-10-20T15:44:53Z",
        "state": "Succeeded",
        "partitionsCreated": "0",
        "cloudWatchLogLink": "https://console.aws.amazon.com/cloudwatch/home?region=us-east-1#logEventViewer:group=/aws-glue/crawlers;stream=httx-s3crawler"
    }
}
```

#### **Event Rule 2**: It matches successful "Glue Job State Change" events for `JOB_NAME` and notifies the SNS topic

-   `Event Source`: AWS Glue Job (JOB_NAME)
-   `Event Type`: "Glue Job State Change" (job_rule_event_pattern)
-   `Event Target`: SNS topic (JOB_COMPLETE_TOPIC_ARN)

In [None]:
JOB_COMPLETE_RULE_NAME = 'htech-job-complete-rule'
# Match only successful runs of job 1.
job_rule_event_pattern = {
    "source": ["aws.glue"],
    "detail-type": ["Glue Job State Change"], # Event Type
    "detail": {
        "jobName": [JOB_NAME],
        "state": ["SUCCEEDED"]
    }
}

response = events_client.put_rule(
    Name=JOB_COMPLETE_RULE_NAME,
    EventPattern=json.dumps(job_rule_event_pattern),
    State='ENABLED',
    Description='Rule to capture AWS Glue job state changes',
)
# Keep the rule ARN (as the crawler-rule cell does for its rule); it is pickled later.
JOB_COMPLETE_RULE_ARN = response["RuleArn"]


In [None]:

# Only target of the job rule: the SNS topic that e-mails on job success.
job_sns_target = {'Id': f"{JOB_COMPLETE_RULE_NAME}_target", 'Arn': JOB_COMPLETE_TOPIC_ARN}
events_client.put_targets(Rule=JOB_COMPLETE_RULE_NAME, Targets=[job_sns_target])

- Example event forwarded to the SNS topic for a "Glue Job State Change" (this sample is a `STOPPED` run; the rule above only forwards `SUCCEEDED` runs):

```json
{
    "version": "0",
    "id": "ee14807c-1f89-490e-b4e6-43f071980d95",
    "detail-type": "Glue Job State Change",
    "source": "aws.glue",
    "account": "999999999999",
    "time": "2024-10-20T15:00:35Z",
    "region": "us-east-1",
    "resources": [],
    "detail": {
        "jobName": "jb1_s3csv_s3parquet",
        "severity": "INFO",
        "state": "STOPPED",
        "jobRunId": "jr_d8165d895a8eee53494238118f07659a75b1a0d192048ed8a7a429c9ce176d5c",
        "message": "Job run stopped"
    }
}
```

#### TEST THE PIPELINE

In [None]:
# Test reset: delete the first sample customers file from raw/customers/.
s3_client.delete_object(Bucket=S3_BUCKET_DATALAKE, Key='raw/customers/customers1.csv')
# # s3_client.delete_object(Bucket=S3_BUCKET_DATALAKE, Key='raw/customers/customers2.csv')
# # s3_client.delete_object(Bucket=S3_BUCKET_DATALAKE, Key='processed/customers')

In [None]:
# AMShah_arn = iam_client.get_user(UserName="AMShah")["User"]["Arn"]
# lftn.grant_table_level_permissions(AMShah_arn, GLUE_CATALOG_DB, "raw_customers", ["DROP"])
# glue_client.delete_table(DatabaseName=GLUE_CATALOG_DB, Name="raw_customers")

In [None]:
# Upload the first sample file under raw/customers/; the bucket notification configured
# earlier invokes the crawler-trigger Lambda for objects created under this prefix.
s3_client.upload_file('../../data/customers1.csv', S3_BUCKET_DATALAKE, 'raw/customers/customers1.csv')

-   **Verify Job Bookmarks are acting as expected**

In [None]:
# Upload a second file under the same prefix; with job bookmarks enabled, the next
# job run is expected to process only this new file.
s3_client.upload_file('../../data/customers2.csv', S3_BUCKET_DATALAKE, 'raw/customers/customers2.csv')

In [None]:
# !aws events list-rules --name-prefix {JOB_COMPLETE_RULE_NAME}

##### Pickle Variables

In [None]:
import pickle

# Snapshot of every resource name/ARN this notebook creates, written to
# event_based_etl_ipynb.pkl so the "Load Pickled Variables" cell can restore them.
# Values mirror the creation cells; ARNs are derived from ACCOUNT_ID/REGION rather
# than a hard-coded account ID.
GLUE_ROLE_NAME = "glue-pipeline-role"
LFN_ROLE_NAME = "lfn-pipeline-role"
GLUE_ROLE_ARN = f"arn:aws:iam::{ACCOUNT_ID}:role/{GLUE_ROLE_NAME}"
LFN_ROLE_ARN = f"arn:aws:iam::{ACCOUNT_ID}:role/{LFN_ROLE_NAME}"
S3_BUCKET_DATALAKE = "htech-datalake-bkt"
S3_BUCKET_GLUE_ASSETS = "htech-glue-assets-bkt"
DQ_OBJECT = "Data_Quality(DQ)"
GLUE_CATALOG_DB = "htech-glue-catalog-db"
DATALAKE_LOCATION_URI = f"s3://{S3_BUCKET_DATALAKE}"
TOPIC_NAME = "htech-sns-topic"
JOB_COMPLETE_TOPIC_ARN = f"arn:aws:sns:{REGION}:{ACCOUNT_ID}:{TOPIC_NAME}"
LFN_CRAWLER_NAME = "crawler_triggerer"
LFN_CRAWLER_ARN = f"arn:aws:lambda:{REGION}:{ACCOUNT_ID}:function:{LFN_CRAWLER_NAME}"
S3_CUSTOMER_CRAWLER_NAME = "htech-s3-customer-crawler"
S3_CUSTOMER_CRAWLER_TARGET = {"S3Targets": [{"Path": f"s3://{S3_BUCKET_DATALAKE}/raw/customers"}]}
LFN_JOB_TRIGGERER_NAME = "glue_job_triggerer"
LFN_JOB_TRIGGERER_ARN = f"arn:aws:lambda:{REGION}:{ACCOUNT_ID}:function:{LFN_JOB_TRIGGERER_NAME}"
TEM_DIR = f"s3://{S3_BUCKET_GLUE_ASSETS}/temporary/"
SPARK_EVENT_LOG_PATH = f"s3://{S3_BUCKET_GLUE_ASSETS}/sparkHistoryLogs/"
TABLE_NAME = "raw_customers"
TARGET = f"s3://{S3_BUCKET_DATALAKE}/processed/customers/"
JOB_NAME = "jb1_s3csv_s3parquet"
file_name1 = "./glue_scripts/jb1_s3csv_s3parquet.py"
object_name1 = "glues_scripts/jb1_s3csv_s3parquet.py"
JOB_SCRIPT_LOCATION = f"s3://{S3_BUCKET_GLUE_ASSETS}/{object_name1}"
# Same default arguments the job is created with in the "Job" section.
ARGS = {
    "--TempDir": TEM_DIR,
    "--library-path": f"s3://{S3_BUCKET_GLUE_ASSETS}/libraries",  # Path to external libraries (JARs, compiled libraries, JDBC drivers)
    "--extra-py-files": f"s3://{S3_BUCKET_GLUE_ASSETS}/libraries/package.zip",  # Python modules/packages (Python .zip)
    "--spark-event-logs-path": SPARK_EVENT_LOG_PATH,
    "--job-bookmark-option": "job-bookmark-enable",
    "--job-language": "python",
}

# Must match the rule name used by the "Event Rule 1" cell.
S3_CUSTOMER_CRAWLER_RULE_NAME = "htech-customer-crawler-rule"
S3_CUSTOMER_CRAWLER_RULE_ARN = f"arn:aws:events:{REGION}:{ACCOUNT_ID}:rule/{S3_CUSTOMER_CRAWLER_RULE_NAME}"
JOB_COMPLETE_RULE_NAME = "htech-job-complete-rule"
JOB_COMPLETE_RULE_ARN = f"arn:aws:events:{REGION}:{ACCOUNT_ID}:rule/{JOB_COMPLETE_RULE_NAME}"
S3_SALES_CRAWLER = "htech-s3-sales-crawler"
S3_SALES_CRAWLER_TARGET = {"S3Targets": [{"Path": f"s3://{S3_BUCKET_DATALAKE}/raw/sales"}]}

# Save to file. The tuple order must match the unpacking in "Load Pickled Variables".
with open("event_based_etl_ipynb.pkl", "wb") as f:
    pickle.dump((GLUE_ROLE_NAME, LFN_ROLE_NAME, GLUE_ROLE_ARN, LFN_ROLE_ARN, S3_BUCKET_DATALAKE, S3_BUCKET_GLUE_ASSETS, GLUE_CATALOG_DB, DATALAKE_LOCATION_URI, TOPIC_NAME, JOB_COMPLETE_TOPIC_ARN, LFN_CRAWLER_NAME, LFN_CRAWLER_ARN, S3_CUSTOMER_CRAWLER_NAME, S3_CUSTOMER_CRAWLER_TARGET, S3_SALES_CRAWLER, S3_SALES_CRAWLER_TARGET, LFN_JOB_TRIGGERER_NAME, LFN_JOB_TRIGGERER_ARN, TEM_DIR, SPARK_EVENT_LOG_PATH, TABLE_NAME, TARGET, JOB_NAME, JOB_SCRIPT_LOCATION, ARGS, S3_CUSTOMER_CRAWLER_RULE_NAME, S3_CUSTOMER_CRAWLER_RULE_ARN, JOB_COMPLETE_RULE_NAME, JOB_COMPLETE_RULE_ARN), f)


#### **Crawler 2**: NOT TESTED!

Catalog data from `processed/customers` as a table named `processed_customers`.

In [None]:
# Crawler 2 scans processed/customers; tables it creates get the "processed_" prefix.
S3_PROCESSED_CRAWLER_NAME = "httx-s3-processed-crawler"
processed_customers_path = f"s3://{S3_BUCKET_DATALAKE}/processed/customers"
S3_PROCESSED_CRAWLER_TARGET = {'S3Targets': [{'Path': processed_customers_path}]}
glue.create_glue_crawler(
    S3_PROCESSED_CRAWLER_NAME, GLUE_ROLE_ARN, GLUE_CATALOG_DB, S3_PROCESSED_CRAWLER_TARGET,
    table_prefix='processed_',
)

In [None]:
# glue_client.start_crawler(Name=S3_CRAWLER_NAME)

In [None]:
# Grant the IAM user AMominNJ the DROP permission on processed_customers
# (lftn.grant_table_level_permissions is a project helper).
# NOTE(review): hard-coded IAM user name — presumably so the table can be dropped manually.
AMominNJ_arn = iam_client.get_user(UserName='AMominNJ')['User']['Arn']
lftn.grant_table_level_permissions(AMominNJ_arn, GLUE_CATALOG_DB, 'processed_customers', ['DROP'])

#### Delete Resources

In [None]:
# Delete the Glue Data Catalog database created earlier.
glue_client.delete_database(CatalogId=ACCOUNT_ID,Name=GLUE_CATALOG_DB)

In [None]:
# Empty and delete both buckets. The S3 resource is bound to `s3_resource`, not `s3`:
# rebinding `s3` would shadow the project `s3` helper module imported at the top and
# break later cells that call s3.upload_file_to_s3 / s3.upload_folder_to_s3.
s3_resource = boto3.resource('s3')
bucket1 = s3_resource.Bucket(S3_BUCKET_DATALAKE)
bucket2 = s3_resource.Bucket(S3_BUCKET_GLUE_ASSETS)

# Delete all objects in each bucket (a bucket must be empty before it can be deleted).
bucket1.objects.all().delete()
bucket2.objects.all().delete()

# Delete all object versions (if versioning is enabled)
# bucket1.object_versions.all().delete()
# bucket2.object_versions.all().delete()

# Finally, delete the buckets.
bucket1.delete()
bucket2.delete()

In [None]:
# Delete the raw-customers crawler; uncomment the second line if the processed crawler was created.
glue_client.delete_crawler(Name=S3_CUSTOMER_CRAWLER_NAME)
# glue_client.delete_crawler(Name=S3_PROCESSED_CRAWLER_NAME)

In [None]:
# Delete Glue job 1.
glue_client.delete_job(JobName=JOB_NAME)

In [None]:
# Delete the SNS topic via the project sns helper.
sns.delete_sns_topic(JOB_COMPLETE_TOPIC_ARN)

In [None]:
# List the crawler rule's targets; they must all be removed before the rule can be deleted.
targets = events_client.list_targets_by_rule(Rule=S3_CUSTOMER_CRAWLER_RULE_NAME)['Targets']

In [None]:
# Remove every target from the crawler rule. The rule has two targets (the job-trigger
# Lambda and the SNS topic), and EventBridge refuses to delete a rule that still has
# targets, so removing only targets[0] left the rule undeletable.
# remove_targets rejects an empty Ids list, hence the guard.
target_ids = [target['Id'] for target in targets]
if target_ids:
    events_client.remove_targets(Rule=S3_CUSTOMER_CRAWLER_RULE_NAME, Ids=target_ids)

In [None]:
# Delete the crawler rule. Force=True only matters for AWS-managed rules; it does not remove targets.
events_client.delete_rule(Name=S3_CUSTOMER_CRAWLER_RULE_NAME,Force=True)

In [None]:
# List the job-complete rule's targets; they must all be removed before the rule can be deleted.
targets = events_client.list_targets_by_rule(Rule=JOB_COMPLETE_RULE_NAME)['Targets']

In [None]:
# Remove every target from the job-complete rule, not just the first, so the rule can be
# deleted. remove_targets rejects an empty Ids list, hence the guard.
target_ids = [target['Id'] for target in targets]
if target_ids:
    events_client.remove_targets(Rule=JOB_COMPLETE_RULE_NAME, Ids=target_ids)

In [None]:
# Delete the job-complete rule (its targets must already have been removed).
events_client.delete_rule(Name=JOB_COMPLETE_RULE_NAME,Force=True)

In [None]:
# response = lambda_client.remove_permission(
#     FunctionName=LFN_CRAWLER_NAME,
#     StatementId='s3-invoke-permission'
# )

In [None]:
## DELETE IAM ROLE AT THE END AFTER DELETING ALL OTHER RESOURCES.
# Delete both IAM roles via the project iam helper.
# NOTE(review): the Lambda functions are deleted in the following cell; consider running
# that cell first so that the roles really are deleted last.
iam.delete_iam_role(GLUE_ROLE_NAME)
iam.delete_iam_role(LFN_ROLE_NAME)

In [None]:
# Delete both Lambda functions (crawler trigger and Glue-job trigger).
lambda_client.delete_function(FunctionName=LFN_CRAWLER_NAME)
lambda_client.delete_function(FunctionName=LFN_JOB_TRIGGERER_NAME)

In [None]:
# CloudWatch log groups written by the Glue crawler/job runs and by the two Lambdas.
glue_log_groups = [
    "/aws-glue/crawlers",
    "/aws-glue/jobs/output",
    "/aws-glue/jobs/error",
    "/aws-glue/jobs/logs-v2",
]
lambda_log_groups = ["/aws/lambda/crawler_triggerer", "/aws/lambda/glue_job_triggerer"]
lgroups = glue_log_groups + lambda_log_groups

In [None]:
# Delete each log group. A group that does not exist (e.g. a crawler or Lambda that never
# ran) raises ResourceNotFoundException; skip it so the remaining groups are still deleted.
# A plain loop is used because the calls are made only for their side effect.
for lg in lgroups:
    try:
        logs_client.delete_log_group(logGroupName=lg)
    except logs_client.exceptions.ResourceNotFoundException:
        logger.info(f"Log group {lg} not found; skipping.")

### [KA: Complete Master Class on Pydeequ & AWS Glue Data Quality for ETL Pipelines](https://www.youtube.com/watch?v=699jUhUE9hY&list=PLO95rE9ahzRsdzmZ_ZT-3uOn1Nh2eEpWB&index=29&t=4151s)

- [Doc: PyDeequ](https://pydeequ.readthedocs.io/en/latest/)

-   os.environ["SPARK_VERSION"] = "3.1" (pydeequ reads the Spark version from `SPARK_VERSION`)
-   pydeequ=="1.1.0"
-   Glue Version: Glue 3.0 - Supports Spark 3.1, Scala 2, Python 3


-   Jar Download: https://mvnrepository.com/artifact/com.amazon.deequ/deequ/2.0.0-spark-3.1
-   PyDeequ Download: https://pypi.org/project/pydeequ/
    -   Used Python Module: pydeequ==1.1.0

```python
job_parameters = {
    "--additional-python-modules": "pydeequ==1.1.0"
}
```

In [None]:
# Extra pip modules for a Glue job. The Glue special parameter is spelled
# "--additional-python-modules"; the misspelled "--aditional-python-modules" is not a
# Glue special parameter, so the module would not be installed.
job_parameters = {
    "--additional-python-modules": "pydeequ==1.1.0"
}

```txt
ImportError: Unable to import required dependencies:
numpy: Error importing numpy: you should not try to import numpy from
        its source directory; please exit the numpy source tree, and relaunch
        your python interpreter from there.
```

In [None]:
# Create zero-byte "folder" marker objects for the iris input and DQ output prefixes.
# The trailing slash matters: without it S3 stores a plain object named "raw/iris"
# that sits next to, rather than represents, the raw/iris/ prefix.
s3_client.put_object(Bucket=S3_BUCKET_DATALAKE, Key="raw/iris/")
s3_client.put_object(Bucket=S3_BUCKET_DATALAKE, Key="Data-Quality(DQ)/iris/")

In [None]:
# s3_client.upload_file('../../data/iris.csv', S3_BUCKET_DATALAKE, 'raw/iris/iris.csv')
# Upload the Deequ JAR (built for Spark 3.5) to the libraries/ prefix of the Glue-assets
# bucket; the iris job's DEFAULT_ARGS reference this key.
s3_client.upload_file(
    "./external_java/deequ-2.0.7-spark-3.5.jar",
    S3_BUCKET_GLUE_ASSETS,
    "libraries/deequ-2.0.7-spark-3.5.jar",
)
# s3_client.delete_object(Bucket=S3_BUCKET_DATALAKE, Key="raw/iris/iris.csv")


-   **Iris Job**

In [None]:
# Upload the iris data-quality script. NOTE: this reuses (overwrites) file_name1 /
# object_name1 from job 1; the next cell builds IRIS_JOB_SCRIPT_LOCATION from object_name1.
file_name1, object_name1 = (
    './glue_scripts/iris_job_dq_val.py',   # local script
    "glues_scripts/iris_job_dq_val.py",    # key inside the Glue-assets bucket
)
s3.upload_file_to_s3(S3_BUCKET_GLUE_ASSETS, file_name1, object_name1)

In [None]:
IRIS_JOB_NAME = 'iris_dq_val_job'
# Input CSV and output prefix for the data-quality validation results.
SOURCE_DATA = f"s3://{S3_BUCKET_DATALAKE}/raw/iris/iris.csv"
DQ_VALIDATION_OUTPUT = f"s3://{S3_BUCKET_DATALAKE}/Data-Quality(DQ)/iris/"

IRIS_JOB_SCRIPT_LOCATION = f"s3://{S3_BUCKET_GLUE_ASSETS}/{object_name1}"
TEM_DIR = f"s3://{S3_BUCKET_GLUE_ASSETS}/temporary/"
SPARK_EVENT_LOG_PATH = f"s3://{S3_BUCKET_GLUE_ASSETS}/sparkHistoryLogs/"
DEFAULT_ARGS = {
    "--TempDir": TEM_DIR,
    # The Deequ JAR that pydeequ needs is added to the job's classpath with Glue's
    # "--extra-jars" parameter ("--library-path" is not a Glue job parameter).
    "--extra-jars": f"s3://{S3_BUCKET_GLUE_ASSETS}/libraries/deequ-2.0.7-spark-3.5.jar",
    # Correct spelling of the Glue parameter; "--aditional-python-modules" is ignored.
    "--additional-python-modules": "pydeequ==1.5.0",
    "--extra-py-files": f"s3://{S3_BUCKET_GLUE_ASSETS}/libraries/package.zip",  # Python modules/packages (Python .zip)
    "--spark-event-logs-path": SPARK_EVENT_LOG_PATH,
    # "--job-bookmark-option": "job-bookmark-enable",
    "--job-language": "python",
}

In [None]:
# Create the iris data-quality job on Glue 5.0, passing DEFAULT_ARGS.
glue.create_glue_job_v2(IRIS_JOB_NAME, IRIS_JOB_SCRIPT_LOCATION, GLUE_ROLE_ARN, DEFAULT_ARGS, glue_version='5.0')

-    Package up and upload the **external libraries**

In [None]:
# Bundle the project logger module, its third-party dependencies and pydeequ into
# ./external_py_lib (uploaded to the libraries/ prefix in the next cell).
# os.environ["UTILS"] fails with a clear KeyError when UTILS is unset, instead of the
# TypeError that `None + "/mylogger.py"` would raise.
logger_module = os.path.join(os.environ["UTILS"], "mylogger.py")
build_lambda_package(
    [logger_module],
    "./external_py_lib",
    py_libs=[
        "coloredlogs",
        "termcolor",
        "pydeequ==1.5.0",
    ],
)

In [None]:
# s3_client.delete_object(Bucket=S3_BUCKET_GLUE_ASSETS, Key="libraries/package.zip")
# Upload ./external_py_lib to the libraries/ prefix; DEFAULT_ARGS points
# --extra-py-files at libraries/package.zip there.
s3.upload_folder_to_s3(S3_BUCKET_GLUE_ASSETS,"./external_py_lib", "libraries")

In [None]:
# Per-run arguments for the iris data-quality job: job name, input CSV, output prefix.
JOB_ARGUMENTS = dict([
    ("--IRIS_JOB_NAME", IRIS_JOB_NAME),
    ("--SOURCE_DATA", SOURCE_DATA),
    ("--DQ_VALIDATION_OUTPUT", DQ_VALIDATION_OUTPUT),
])

In [None]:
# Start the iris job and keep its run ID for later inspection.
IRIS_JOB_RUN_ID = glue_client.start_job_run(JobName=IRIS_JOB_NAME, Arguments=JOB_ARGUMENTS)["JobRunId"]

In [None]:
# Display the run ID of the iris job run.
IRIS_JOB_RUN_ID