## AWS Glue DQ

### Set up environment variables

In [1]:
# 3. Import necessary libraries and load environment variables

from dotenv import load_dotenv, find_dotenv
import os
import boto3

# Load environment variables stored in a local dotenv file. override=True lets
# values from the file win over anything already set in the process environment.
local_env_filename = 'dev.env'
load_dotenv(find_dotenv(local_env_filename),override=True)

# Every setting below is needed by later cells. Check them all up front so a
# missing entry produces one readable error instead of the opaque
# "TypeError: str expected, not NoneType" that `os.environ[k] = os.getenv(k)` raises.
REQUIRED_ENV_VARS = (
    'REGION',
    'SQL_DATABASE',          # LOCAL, SQLALCHEMY, REDSHIFT
    'SQL_DIALECT',           # SQlite, PostgreSQL
    'DATABASE_SECRET_NAME',
    'S3_BUCKET_NAME',
    'GLUE_IAM_ROLE_ARN',
)
missing_env_vars = [name for name in REQUIRED_ENV_VARS if os.getenv(name) is None]
if missing_env_vars:
    raise EnvironmentError(
        f"Missing required environment variables (expected in {local_env_filename}): "
        + ", ".join(missing_env_vars)
    )

REGION = os.environ['REGION']
SQL_DATABASE = os.environ['SQL_DATABASE']
SQL_DIALECT = os.environ['SQL_DIALECT']
DATABASE_SECRET_NAME = os.environ['DATABASE_SECRET_NAME']
S3_BUCKET_NAME = os.environ['S3_BUCKET_NAME']
GLUE_IAM_ROLE_ARN = os.environ['GLUE_IAM_ROLE_ARN']
print(f"Using database: {SQL_DATABASE} with sql dialect: {SQL_DIALECT} in region: {REGION}")

Using database: REDSHIFT with sql dialect: PostgreSQL in region: us-east-1


In [2]:
# Initialize DatabaseUtil for Redshift

from utils.database import DatabaseUtil

# Project-local helper configured from the settings loaded in the first cell:
# the database kind, the AWS region, and the name of the secret passed as secret_name.
db_util = DatabaseUtil(
                sql_database= SQL_DATABASE,
                region=REGION,
                secret_name=DATABASE_SECRET_NAME
)

In [3]:
# test Redshift connection and query table
# (a row count on the target table; the value returned by run_sql is printed as-is)
sql_statement = 'SELECT COUNT(*) FROM public.syn_data'
result = db_util.run_sql(sql_statement)
print(result)

  count
0  None


In [98]:
# create AWS Glue Redshift ETL job script
#
# NOTE: the script below is an f-string template rendered in this notebook:
#   * braces that belong to the generated script are doubled ({{ and }});
#   * the only value interpolated at notebook time is {S3_BUCKET_NAME}
#     (used for the Redshift temp directory);
#   * backslashes are processed by this (non-raw) string before the script is
#     written, so escapes meant for the generated script are doubled.

glue_job_script = f'''import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.dynamicframe import DynamicFrame
from pyspark.sql.functions import col, when, lit
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, LongType, DoubleType, ShortType
from awsgluedq.transforms import EvaluateDataQuality
import logging
import boto3
import datetime
import time
from queue import Queue, Empty
from threading import Thread
from concurrent.futures import ThreadPoolExecutor, as_completed
import json
from botocore.config import Config
import base64
from io import BytesIO
import base64
import pandas as pd


# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Initialize the Glue context
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)

# Get job parameters
args = getResolvedOptions(sys.argv, ['JOB_NAME', 'input_path', 'output_path', 'redshift_connection', 'redshift_table'])
input_path = args['input_path']
# drop any trailing slash so sub-paths appended below do not contain an empty key segment
output_path = args['output_path'].rstrip('/')
redshift_connection = args['redshift_connection']
redshift_table = args['redshift_table']

logger.info(f'Input path: {{input_path}}')
logger.info(f'Redshift connection: {{redshift_connection}}')
logger.info(f'Redshift table: {{redshift_table}}')


# Set up the job
job.init(args['JOB_NAME'], args)

def get_redshift_connection_details(connection_name):
    glue_client = boto3.client('glue')
    try:
        response = glue_client.get_connection(Name=connection_name)
        connection_properties = response['Connection']['ConnectionProperties']
        
        return {{
            'jdbc_url': connection_properties['JDBC_CONNECTION_URL'],
            'username': connection_properties['USERNAME'],
            'password': connection_properties['PASSWORD']
        }}
    except Exception as e:
        logger.error(f"Error getting Redshift connection details: {{e}}")
        raise

def get_redshift_table_schema(connection_details, table_name):
    try:
        # Use Spark to query Redshift
        query = f"""
            SELECT t.tablename, t.\\\"column\\\", t.type, c.column_default
            FROM pg_table_def t
            LEFT JOIN information_schema.columns c 
                ON t.\\\"column\\\" = c.column_name 
                AND t.schemaname = c.table_schema 
                AND t.tablename = c.table_name
            WHERE t.schemaname = '{{table_name.split('.')[0]}}'
            AND t.tablename = '{{table_name.split('.')[1]}}'
        """
        df = spark.read.format("jdbc") \
            .option("url", connection_details['jdbc_url']) \
            .option("query", query) \
            .option("user", connection_details['username']) \
            .option("password", connection_details['password']) \
            .option("driver", "com.amazon.redshift.jdbc42.Driver") \
            .load()
        
        # Get column information
        columns_info = df.select("column", "type").collect()
        
        print(f'Redshift table columns info: {{columns_info}}')
        
        # Create a schema based on Redshift data types
        schema = StructType()
        for col_info in columns_info:
            col_name = col_info['column']
            data_type = col_info['type']
            if data_type == 'integer':
                schema.add(StructField(col_name, IntegerType()))
            elif data_type == 'bigint':
                schema.add(StructField(col_name, LongType()))
            elif data_type in ['double precision', 'real']:
                schema.add(StructField(col_name, DoubleType()))
            elif data_type == 'smallint':
                schema.add(StructField(col_name, ShortType()))
            else:
                schema.add(StructField(col_name, StringType()))
        
        return schema, columns_info
    except Exception as e:
        logger.error(f"Error getting Redshift table schema: {{e}}")
        raise


class BedrockLLMWrapper():
    def __init__(self,
        model_id: str = 'us.anthropic.claude-3-5-sonnet-20241022-v2:0',
        embedding_model_id: str = 'amazon.titan-embed-image-v1',
        system_prompt: str = 'You are a helpful AI Assistant.',
        region: str = 'us-east-1',
        top_k: int = 5,
        top_p: float = 0.7,
        temperature: float = 0.0,
        max_token_count: int = 4000,
        max_attempts: int = 3,
        debug: bool = False

    ):

        
        
        self.embedding_model_id = embedding_model_id
        self.system_prompt = system_prompt
        self.region = region
        self.top_k = top_k
        self.top_p = top_p
        self.temperature = temperature
        self.max_token_count = max_token_count
        self.max_attempts = max_attempts
        self.debug = debug
        config = Config(
            retries = {{
                'max_attempts': 10,
                'mode': 'standard'
            }}
        )

        self.bedrock_runtime = boto3.client(service_name="bedrock-runtime", config=config, region_name=self.region)

        #use cross-region inference if model_id starts with us
        self.model_id = model_id

    def get_valid_format(self, file_format):
        format_mapping = {{
            'jpg': 'jpeg',
            'gif': 'gif',
            'png': 'png',
            'webp': 'webp'
        }}
        return format_mapping.get(file_format.lower(), 'jpeg')  # Default to 'jpeg' if format is not recognized
    
    # def process_image(self, image_path, max_size=(512, 512)):
    #     with open(image_path, "rb") as image_file:
    #         # Read the image file
    #         image = image_file.read()
    #         image = Image.open(BytesIO(image)).convert("RGB")
            
    #         # Resize image while maintaining aspect ratio
    #         image.thumbnail(max_size, Image.LANCZOS)
            
    #         # Create a new image with the target size and paste the resized image
    #         new_image = Image.new("RGB", max_size, (255, 255, 255))
    #         new_image.paste(image, ((max_size[0] - image.size[0]) // 2,
    #                                 (max_size[1] - image.size[1]) // 2))
            
    #         # Save to BytesIO object
    #         buffered = BytesIO()
    #         new_image.save(buffered, format="JPEG")
            
    #         # Encode to base64
    #         input_image_base64 = base64.b64encode(buffered.getvalue()).decode('utf8')
        
    #     return input_image_base64

    def get_embedding(self, input_text=None, image_path=None):
        """
        This function is used to generate the embeddings for a specific chunk of text
        """
        accept = 'application/json'
        contentType = 'application/json'
        request_body = {{}}

        if input_text:
            request_body["inputText"] = input_text
        if image_path:
            # Process and encode the image
            img_base64 = '' #self.process_image(image_path)
            request_body["inputImage"] = img_base64

        # request_body["dimensions"] = 1024
        # request_body["normalize"] = True

        if 'amazon' in self.embedding_model_id:
            embeddingInput = json.dumps(request_body)
            response = self.bedrock_runtime.invoke_model(body=embeddingInput, 
                                                        modelId=self.embedding_model_id, 
                                                        accept=accept, 
                                                        contentType=contentType)
            embeddingVector = json.loads(response['body'].read().decode('utf8'))
            return embeddingVector['embedding']
                
        if 'cohere' in self.embedding_model_id:
            request_body["input_type"] = "search_document" # |search_query|classification|clustering
            request_body["truncate"] = "NONE" # NONE|START|END
            embeddingInput = json.dumps(request_body)
    
            response = self.bedrock_runtime.invoke_model(body=embeddingInput, 
                                                            modelId=self.embedding_model_id, 
                                                            accept=accept, 
                                                            contentType=contentType)
    
            response_body = json.loads(response.get('body').read())
            # print(response_body)
            embeddingVector = response_body['embedding']
            
            return embeddingVector
    
    def generate(self,prompt,attachment_file=None, image_file=None, image_file2=None):
        """
        Send one prompt (optionally with up to two images or a CSV attachment) to Bedrock
        via the Converse API and return [text, usage, latency].
        Failed calls are retried up to max_attempts times; the last error is re-raised.
        """
        if self.debug: 
            print('entered BedrockLLMWrapper generate')
        message = {{}}    
        attempt = 1
        if image_file is not None:
            if self.debug: 
                print('processing image1: ', image_file)
            # extract file format from the image file
            file_format = image_file.split('.')[-1]
            valid_format = self.get_valid_format(file_format)

            # Open and read the image file
            with open(image_file, 'rb') as img_file:
                image_bytes = img_file.read()
                if self.debug: 
                    print('image_bytes: ', image_bytes)
                    print('valid_format: ', valid_format)

            message = {{
                "role": "user",
                "content": [
                    {{ "text": "Image 1:" }},
                    {{
                        "image": {{
                            "format": valid_format,
                            "source": {{
                                "bytes": image_bytes 
                            }}
                        }}
                    }},
                    {{ "text": prompt }}
                ],
                    }}
            
        if image_file is not None and image_file2 is not None:
            if self.debug: 
                print('processing image2: ', image_file2)
            # extract file format from the image file
            file_format2 = image_file2.split('.')[-1]
            valid_format2 = self.get_valid_format(file_format2)

            with open(image_file2, 'rb') as img_file:
                image_bytes2 = img_file.read()
                if self.debug: 
                    print('image_bytes2: ', image_bytes2)
                    print('valid_format2: ', valid_format2)
            
            message = {{
            "role": "user",
            "content": [
                {{ "text": "Image 1:" }},
                {{
                    "image": {{
                        "format": valid_format,
                        "source": {{
                            "bytes": image_bytes 
                        }}
                    }}
                }},
                {{ "text": "Image 2:" }},
                {{
                    "image": {{
                        "format": valid_format2,
                        "source": {{
                            "bytes": image_bytes2 
                        }}
                    }}
                }},
                {{ "text": prompt }}
            ],
                }}
        
        if attachment_file is not None:
            # use a separate name for the file handle so the attachment_file argument is not shadowed
            with open(attachment_file, 'rb') as attachment_handle:
                attachment_bytes = attachment_handle.read()
                if self.debug: 
                    print('attachment_bytes: ', attachment_bytes)
            
            message = {{
                "role": "user",
                "content": [
                    {{
                        "document": {{
                            "name": "Document 1",
                            "format": "csv",
                            "source": {{
                                "bytes": attachment_bytes
                            }}
                        }}
                    }},
                    {{ "text": prompt }}
                ]
            }}
            
        if image_file is None and image_file2 is None and attachment_file is None:
            message = {{
                "role": "user",
                "content": [{{"text": prompt}}]
            }}
        messages = []
        messages.append(message)
        
        # model specific inference parameters to use.
        if "anthropic" in self.model_id.lower():
            system_prompts = [{{"text": self.system_prompt}}]
            # Base inference parameters to use.
            inference_config = {{
                                "temperature": self.temperature, 
                                "maxTokens": self.max_token_count,
                                "stopSequences": ["\\n\\nHuman:"],
                                "topP": self.top_p,
                            }}
            additional_model_fields = {{"top_k": self.top_k}}
        else:
            system_prompts = []
            # Base inference parameters to use.
            inference_config = {{
                                "temperature": self.temperature, 
                                "maxTokens": self.max_token_count,
                            }}
            additional_model_fields = {{"top_k": self.top_k}}

        if self.debug: 
            print('Sending: System: ',system_prompts,'Messages: ',str(messages))

        while True:
            try:

                # Send the message.
                response = self.bedrock_runtime.converse(
                    modelId=self.model_id,
                    messages=messages,
                    system=system_prompts,
                    inferenceConfig=inference_config,
                    additionalModelRequestFields=additional_model_fields
                )

                text = response['output'].get('message').get('content')[0].get('text')
                usage = response['usage']
                latency = response['metrics'].get('latencyMs')

                if self.debug: 
                    print(f'text: {{text}} ; and token usage: {{usage}} ; and query_time: {{latency}}')    
                
                break
               
            except Exception as e:
                print("Error with calling Bedrock: "+str(e))
                attempt+=1
                if attempt>self.max_attempts:
                    print("Max attempts reached!")
                    # text/usage/latency were never assigned, so falling through to the
                    # return below would fail with UnboundLocalError and hide the real
                    # Bedrock error; surface the original exception to the caller instead
                    raise
                else:# retry in 60 seconds
                    print("retry")
                    time.sleep(60)

        return [text,usage,latency]

    # Threaded function for queue processing
    def thread_request(self, q, results):
        """
        Drain (index, prompt) items from q, storing each generate() result
        (or the error text) at results[index]. Returns when the queue is empty.
        """
        while True:
            try:
                index, prompt = q.get(block=False)
            except Empty:
                # nothing was dequeued, so there is no task to mark as done
                break
            try:
                results[index] = self.generate(prompt)
            except Exception as e:
                print(f'Error with prompt: {{str(e)}}')
                results[index] = str(e)
            finally:
                q.task_done()

 
    def generate_threaded(self, prompts, attachments=None, images=None, max_workers=15):
        
        if images is None:
            images = [None] * len(prompts)
        elif len(prompts) != len(images):
            raise ValueError("The number of prompts must match the number of images (or images must be None)")
        
        if attachments is None:
            attachments = [None] * len(prompts)
        elif len(prompts) != len(attachments):
            raise ValueError("The number of prompts must match the number of attachments (or attachments must be None)")

        results = [None] * len(prompts)
        
        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            
            future_to_index = {{executor.submit(self.generate, prompt, attachment_file, image_file): i 
                               for i, (prompt, attachment_file, image_file) in enumerate(zip(prompts, attachments, images))}}
            for future in as_completed(future_to_index):
                index = future_to_index[future]
                try:
                    results[index] = future.result()
                except Exception as exc:
                    print(f'Generated an exception: {{exc}}')
                    results[index] = str(exc)
        
        return results

PROMPT_TEMPLATE_FIX = """
You are an expert data analyst specializing in data quality and anomaly detection. 
Your task is to analyze the below data quality anomaly detection result and fix the data quality issues row by row.

Data quality result:
{{DATA_QUALITY_RESULT}}

Target table schema:
{{TARGET_TABLE_SCHEMA}}

Please analyze the data thoroughly.

Return the response in the following JSON format, ensuring that all special characters
are properly escaped and the JSON is well-formed:

[
  {{{{"column_name1": "column_value1", 
  "column_name2": "column_value2", 
  "column_name3": "column_value3",
  <all fields from  source data> 
  }}}},
  {{{{"<...>"}}}},
]

Do not include the data quality result related columns DataQualityRulesPass, DataQualityRulesFail, DataQualityRulesSkip, DataQualityEvaluationResult.
Only include the columns that are in the target table schema.
Only include JSON and nothing else in the response"""

def bedrock_dq_fix(dq_results_df, redshift_schema, llm=None, prompt_template=None):
    
    if not llm:
        # default to claude sonnet 3.5
        llm = "us.anthropic.claude-3-5-sonnet-20241022-v2:0"

    if not prompt_template:
        # default to the prompt template for fixing data quality issues
        prompt_template = PROMPT_TEMPLATE_FIX

    # convert dataframe to a string
    dataframe_str = dq_results_df.to_string()
    print(f'Data Quality Result:{{dataframe_str}}')

    redshift_schema_str = str(redshift_schema)
    print(f'Target Table Schema:{{redshift_schema_str}}')

    prompt = prompt_template.format(DATA_QUALITY_RESULT=dataframe_str, TARGET_TABLE_SCHEMA=redshift_schema_str)    
    bedrock = BedrockLLMWrapper(debug=False, max_token_count=4096, model_id=llm)
    print(f'Calling Bedrock to generate response for prompt:{{prompt}}')
    result = bedrock.generate(prompt)
    print(f'Got a Bedrock response: {{str(result[0])}}')
    # parse json result[0] to a pandas dataframe
    json_data = json.loads(result[0])
    print(f'Parsed JSON data to pandas dataframe')

    dtype_mapping = {{}}
    for field in redshift_schema:
        col_name = field['column']
        data_type = field['type'].lower()
        
        if col_name == "timestamp":  # Special handling for timestamp
            dtype_mapping[col_name] = 'int64'  # Use int64 for large timestamp values
        elif data_type == 'integer':
            dtype_mapping[col_name] = 'Int64'  # Using nullable integer type
        elif data_type == 'bigint':
            dtype_mapping[col_name] = 'int64'  # Use int64 for large integers
        elif data_type in ['double precision', 'real']:
            dtype_mapping[col_name] = 'float64'
        else:
            dtype_mapping[col_name] = 'object'  # String/text data

    # Create DataFrame with explicit data types
    updated_df = pd.DataFrame(json_data).astype(dtype_mapping)
    print(f'Updated data in PandasDataFrame:{{updated_df.to_string()}}')

    return updated_df

try:
    # Get Redshift connection details
    connection_details = get_redshift_connection_details(redshift_connection)
    
    # Get Redshift table schema
    spark_schema, redshift_schema = get_redshift_table_schema(connection_details, redshift_table)
    logger.info(f"Redshift table schema: {{redshift_schema}}")
    logger.info(f"spark schema: {{spark_schema}}")

    # Read the CSV file from S3
    logger.info(f"Reading data from {{input_path}}")
    if not input_path.startswith('s3://'):
        raise ValueError(f"Invalid S3 path: {{input_path}}. Path must start with 's3://'")
    df = spark.read.csv(input_path, header=True, inferSchema=True)
    
    logger.info(f"Read {{df.count()}} records from S3")

    # AWS GLUE DQ CHECKS
    # https://docs.aws.amazon.com/glue/latest/dg/data-quality-gs-studio-notebooks.html

    EvaluateDataQuality_ruleset = """
        Rules = [
            ColumnDataType "id" = "Integer",
            ColumnDataType "version" = "Integer",
            ColumnLength "name" <= 20
            
        ]
        Analyzers = [
        RowCount ,
        ColumnCount ,
        ColumnLength "name",
        Completeness "id",
        Completeness "version"
        ]
    """

    # convert dataframe to dynamic frame
    dyf = DynamicFrame.fromDF(df, glueContext, "dynamic_frame")

    EvaluateDataQualityMultiframe = EvaluateDataQuality().process_rows(
    frame=dyf,
    ruleset=EvaluateDataQuality_ruleset,
    publishing_options={{
            "dataQualityEvaluationContext": "EvaluateDataQualityMultiframe",
            "enableDataQualityCloudWatchMetrics": False,
            "enableDataQualityResultsPublishing": False
        }},
        additional_options={{"performanceTuning.caching": "CACHE_NOTHING"}}
    )

    # review results
    ruleOutcomes = SelectFromCollection.apply(
        dfc=EvaluateDataQualityMultiframe,
        key="ruleOutcomes",
        transformation_ctx="ruleOutcomes",
    )

    ruleOutcomes.toDF().show(truncate=False)

    # review row level results
    rowLevelOutcomes = SelectFromCollection.apply(
        dfc=EvaluateDataQualityMultiframe,
        key="rowLevelOutcomes",
        transformation_ctx="rowLevelOutcomes",
    )

    rowLevelOutcomes_df = rowLevelOutcomes.toDF() # Convert Glue DynamicFrame to SparkSQL DataFrame
    rowLevelOutcomes_df_passed = rowLevelOutcomes_df.filter(rowLevelOutcomes_df.DataQualityEvaluationResult == "Passed") # Filter only the Passed records.
    rowLevelOutcomes_df.filter(rowLevelOutcomes_df.DataQualityEvaluationResult == "Failed").show(5, truncate=False) # Review the Failed records                    
    rowLevelOutcomes_df_error = rowLevelOutcomes_df.filter(rowLevelOutcomes_df.DataQualityEvaluationResult == "Failed")


    # convert back to DynamicFrame before writing
    rowLevelOutcomes_dyf_passed = DynamicFrame.fromDF(rowLevelOutcomes_df_passed, glueContext, "passed_records")
    rowLevelOutcomes_dyf_error = DynamicFrame.fromDF(rowLevelOutcomes_df_error, glueContext, "error_records")
    
    # write error records to S3 destination for review
    glueContext.write_dynamic_frame.from_options(
        frame = rowLevelOutcomes_dyf_error,
        connection_type = "s3",
        connection_options = {{"path": f'{{output_path}}/etl_detected_dq_errors'}},
        format = "json")

    # get the target table columns from the spark schema
    original_columns = [field.name for field in spark_schema.fields]
         
    # flatten the passed records
    rowLevelOutcomes_df_passed_flat = rowLevelOutcomes_df_passed.select(original_columns)

    # count the failed rows once and reuse the value, instead of running a second count() below
    error_count = rowLevelOutcomes_df_error.count()

    # also attempt to fix the data quality issues if rowLevelOutcomes_df_error is not empty
    if error_count == 0:
        print(f'No data quality issues found')

        df = rowLevelOutcomes_df_passed_flat

    else:
        # check type of rowLevelOutcomes_df_error
        print(f'rowLevelOutcomes_df_error type: {{type(rowLevelOutcomes_df_error)}}')
        # convert to pandas dataframe
        rowLevelOutcomes_df_error_pandas = rowLevelOutcomes_df_error.toPandas()
        print(f'Attempting to fix {{error_count}} data quality issues')
        updated_df = bedrock_dq_fix(rowLevelOutcomes_df_error_pandas, redshift_schema)

        # flatten the updated records (assuming updated_df is a Pandas DataFrame from bedrock_dq_fix)
        print(f'Flattening the updated records')
        updated_df_spark = spark.createDataFrame(updated_df)
        updated_df_flat = updated_df_spark.select(original_columns)

        # combine the two Spark DataFrames
        print(f'Combining the two Spark DataFrames')
        combined_df = rowLevelOutcomes_df_passed_flat.union(updated_df_flat)

        # combined_df is a Spark DataFrame ready for further processing
        print(f'Returning combined Spark DataFrame')
        df = combined_df

    # align df schema with Redshift schema
    for field in spark_schema.fields:
        if field.name in df.columns:
            if isinstance(field.dataType, (IntegerType, LongType, ShortType)):
                df = df.withColumn(field.name, col(field.name).cast(field.dataType))
            elif isinstance(field.dataType, DoubleType):
                df = df.withColumn(field.name, col(field.name).cast(DoubleType()))
            else:
                df = df.withColumn(field.name, col(field.name).cast(StringType()))
        else:
            df = df.withColumn(field.name, lit(None).cast(field.dataType))

    # handle null values (optional, adjust as needed)
    for column in df.columns:
        df = df.withColumn(column, when(col(column) == "", None).otherwise(col(column)))
    
    # convert back from Spark DataFrame to Glue DynamicFrame with only the original columns
    dynamic_frame = DynamicFrame.fromDF(df, glueContext, "flattened_passed_and_updated_records")

    # Write to Redshift
    logger.info(f"Writing data to Redshift table {{redshift_table}}")
    glueContext.write_dynamic_frame.from_jdbc_conf(
        frame=dynamic_frame,
        catalog_connection=redshift_connection,
        connection_options={{
            "dbtable": redshift_table,
            "database": connection_details['jdbc_url'].split('/')[-1]
        }},
        redshift_tmp_dir=f"s3://{S3_BUCKET_NAME}/redshift-tmp/",
        transformation_ctx="datasink"
    )
    
    logger.info("Data successfully written to Redshift")

except Exception as e:
    logger.error(f"An error occurred: {{str(e)}}")
    raise

# Commit the job
job.commit()
'''

# Upload the updated script to S3
s3_client = boto3.client('s3')
object_key = 'glue/glue-etl-llm-processing.py'

try:
    s3_client.put_object(Bucket=S3_BUCKET_NAME, Key=object_key, Body=glue_job_script)
    print(f"Successfully uploaded updated Glue job script to s3://{S3_BUCKET_NAME}/{object_key}")
except Exception as e:
    print(f"Error uploading Glue job script to S3: {e}")
    # Stop here: the following cells create and run a Glue job from this S3 object,
    # so continuing after a failed upload would execute a missing or stale script.
    raise

# Location the Glue job definition points at (used as Command.ScriptLocation)
s3_script_path = f's3://{S3_BUCKET_NAME}/{object_key}'
print(f"S3 script path: {s3_script_path}")

Successfully uploaded updated Glue job script to s3://felixh-demo/glue/glue-etl-llm-processing.py
S3 script path: s3://felixh-demo/glue/glue-etl-llm-processing.py


In [35]:
# Glue API client bound to the configured region; shared by the cells below
glue_client = boto3.client('glue', region_name=REGION)

# Name under which the ETL job is created and later started
glue_job_name = 'RedshiftETL-DQ-bedrock'

In [36]:
# create AWS Glue job via boto3 (or update its definition if it already exists,
# so this cell can be re-run after a kernel restart)

glue_connection_name = 'dev-redshift-connection'

# Wheels staged under s3://<bucket>/lib/ so the job can install a Bedrock-capable
# boto3 when it has no internet access. With internet access,
# '--additional-python-modules': 'boto3>=1.35.59' is sufficient instead.
offline_wheels = [
    'boto3-1.35.60-py3-none-any.whl',
    'botocore-1.35.60-py3-none-any.whl',
    'jmespath-1.0.1-py3-none-any.whl',
    'python_dateutil-2.9.0.post0-py2.py3-none-any.whl',
    's3transfer-0.10.3-py3-none-any.whl',
    'six-1.16.0-py2.py3-none-any.whl',
    'urllib3-1.26.20-py2.py3-none-any.whl',
]
additional_python_modules = ','.join(
    f's3://{S3_BUCKET_NAME}/lib/{wheel}' for wheel in offline_wheels
)

# Settings accepted by both create_job and update_job (Name and Tags are create-only)
job_settings = {
    'Description': 'ETL job to load data from S3 to Redshift with LLM auto-correction',
    'Role': GLUE_IAM_ROLE_ARN,
    'ExecutionProperty': {
        'MaxConcurrentRuns': 1
    },
    'DefaultArguments': {
        # required for Amazon Bedrock
        '--additional-python-modules': additional_python_modules,
        '--python-modules-installer-option': '--upgrade',
        '--connection-names': glue_connection_name
    },
    'Command': {
        'Name': 'glueetl',
        'ScriptLocation': s3_script_path,
        'PythonVersion': '3'
    },
    'MaxRetries': 0,
    'Timeout': 1440,
    'GlueVersion': '4.0',
    'NumberOfWorkers': 1,
    'WorkerType': 'Standard',
    'Connections': {
        'Connections': [glue_connection_name]
    }
}

try:
    response = glue_client.create_job(
        Name=glue_job_name,
        Tags={
            'usecase': 'Glue ETL DQ'
        },
        **job_settings
    )
except glue_client.exceptions.AlreadyExistsException:
    # The job was created by an earlier run; bring its definition in line instead of failing
    print(f"Glue job {glue_job_name} already exists; updating its definition")
    response = glue_client.update_job(JobName=glue_job_name, JobUpdate=job_settings)
print(response)

{'Name': 'RedshiftETL-DQ-bedrock', 'ResponseMetadata': {'RequestId': 'c1dc2aa4-fbfe-4bca-8a4a-765a1ecf8bc6', 'HTTPStatusCode': 200, 'HTTPHeaders': {'date': 'Thu, 14 Nov 2024 17:26:45 GMT', 'content-type': 'application/x-amz-json-1.1', 'content-length': '33', 'connection': 'keep-alive', 'x-amzn-requestid': 'c1dc2aa4-fbfe-4bca-8a4a-765a1ecf8bc6'}, 'RetryAttempts': 0}}


In [60]:
# run AWS Glue job with arguments
# (paths are built from S3_BUCKET_NAME so they stay in sync with the bucket the
# script was uploaded to and that the job uses for its Redshift temp directory)
args = {
    '--JOB_NAME': glue_job_name,
    '--input_path': f's3://{S3_BUCKET_NAME}/etl-data/data_anomaly0.csv',
    '--output_path': f's3://{S3_BUCKET_NAME}/',
    '--redshift_connection': 'dev-redshift-connection',
    '--redshift_table': 'public.syn_data'
}

response = glue_client.start_job_run(
    JobName=glue_job_name,
    Arguments=args
)
# keep the run id so the next cell can poll this specific run
job_run_id = response['JobRunId']
print(f'Job run ID: {job_run_id}')

Job run ID: jr_8370337001b52848fc6dd622f6fb0c9737e7cfa988fcf4403d578fcc1fe7915d


In [None]:
# wait on the status of the Glue job
import time

def wait_for_job_completion(job_name, run_id, max_attempts=30, poll_interval=30):
    """Poll an AWS Glue job run until it reaches a terminal state.

    Args:
        job_name: Name of the Glue job.
        run_id: JobRunId returned by start_job_run.
        max_attempts: Maximum number of status checks before giving up.
        poll_interval: Seconds to wait between checks (default 30).

    Returns:
        The terminal JobRunState reported by Glue, or 'TIMEOUT' when no terminal
        state was seen within max_attempts checks. In that second case the run
        may still be in progress.
    """
    # ERROR and EXPIRED are terminal as well; without them such runs would be
    # polled until max_attempts and then misreported as 'TIMEOUT'.
    terminal_states = {'SUCCEEDED', 'FAILED', 'STOPPED', 'TIMEOUT', 'ERROR', 'EXPIRED'}

    for attempt in range(1, max_attempts + 1):
        response = glue_client.get_job_run(JobName=job_name, RunId=run_id)
        status = response['JobRun']['JobRunState']

        print(f"Job status: {status} (attempt {attempt}/{max_attempts})")

        if status in terminal_states:
            return status

        # no point sleeping after the final check
        if attempt < max_attempts:
            time.sleep(poll_interval)

    return 'TIMEOUT'

# Wait for job completion
# (polls the run started above; the returned value is either a Glue job run state
# or 'TIMEOUT' when polling gave up after max_attempts checks)
final_status = wait_for_job_completion(glue_job_name, job_run_id)
print(f"Final job status: {final_status}")

In [61]:
# this last job should have loaded all records to Redshift 

# so let's check if any records were loaded to Redshift
sql_statement = 'SELECT * FROM public.syn_data'
result = db_util.run_sql(sql_statement)
print(result)

   id      timestamp           name  version
0   1  1683849600000     John Smith        5
1   2  1683849660000  Sarah Johnson        6
2   3  1683849720000     Mike Brown        5
3   4  1683849780000    Emily Davis        7
4   5  1683849840000  Robert Wilson        5


In [99]:
# run AWS Glue job again with different input file that has anomalies
# (paths are built from S3_BUCKET_NAME so they stay in sync with the bucket the
# script was uploaded to and that the job uses for its Redshift temp directory)
args = {
    '--JOB_NAME': glue_job_name,
    '--input_path': f's3://{S3_BUCKET_NAME}/etl-data/data_anomaly2.csv',
    '--output_path': f's3://{S3_BUCKET_NAME}/',
    '--redshift_connection': 'dev-redshift-connection',
    '--redshift_table': 'public.syn_data'
}

response = glue_client.start_job_run(
    JobName=glue_job_name,
    Arguments=args
)
# keep the run id so the next cell can poll this specific run
job_run_id = response['JobRunId']
print(f'Job run ID: {job_run_id}')

Job run ID: jr_187d814e96cdeedec7f735217fa60019cfce1754d0e813056a1265114d5b03b2


In [100]:
# Wait for job completion
# (polls the run started above; the returned value is either a Glue job run state
# or 'TIMEOUT' when polling gave up after max_attempts checks)
final_status = wait_for_job_completion(glue_job_name, job_run_id)
print(f"Final job status: {final_status}")

Job status: WAITING (attempt 1/30)
Job status: RUNNING (attempt 2/30)
Job status: RUNNING (attempt 3/30)
Job status: RUNNING (attempt 4/30)
Job status: RUNNING (attempt 5/30)
Job status: RUNNING (attempt 6/30)
Job status: SUCCEEDED (attempt 7/30)
Final job status: SUCCEEDED


In [None]:
# this last job should have loaded all records to Redshift 
# despite the original data quality issues

# so let's check if any records were loaded to Redshift
sql_statement = 'SELECT * FROM public.syn_data'
result = db_util.run_sql(sql_statement)
print(result)

Empty DataFrame
Columns: [id, timestamp, name, version]
Index: []


## Conclusion
This notebook demonstrates how to use AWS Glue Data Quality to check for data quality issues in a CSV file and then use an LLM to auto-correct the issues and load the updated records into an Amazon Redshift table.