In [None]:
'''
> Goto Lambda Function > Select your lambda function

In the code tab , upload your code.

> Scroll down to Runtime settings [ edit Handler name to lambda_function.lambda_handler]

This was done because in my provided code, the Lambda handler function is named lambda_handler, not handler. 
Hence I have to update the AWS Lambda function configuration to use the correct handler
'''

In [5]:
import boto3
import awswrangler as wr
import pandas as pd
import csv
import json
import warnings 
import logging
import tempfile
import requests

from botocore.exceptions import ClientError
warnings.filterwarnings('ignore')

import configparser

config = configparser.ConfigParser()
config.read_file(open('dwh.cfg'))


KEY                    = config.get('AWS','KEY')
SECRET                 = config.get('AWS','SECRET')
REGION                 = "us-west-2"

In [2]:

import os
from io import StringIO

logger = logging.getLogger()
logger.setLevel(logging.INFO)


def create_dynamodb_table(table_name):
    dynamodb = boto3.resource('dynamodb')
    
    # Check if the table already exists
    existing_tables = dynamodb.tables.all()
    existing_table_names = [table.name for table in existing_tables]

    if table_name in existing_table_names:
        # Table already exists, return the existing table
        return dynamodb.Table(table_name)

    # Define the DynamoDB table schema
    table = dynamodb.create_table(
        TableName=table_name,
        KeySchema=[
            {'AttributeName': 'Email', 'KeyType': 'HASH'},  # Assuming 'Email' as the primary key
        ],
        AttributeDefinitions=[
            {'AttributeName': 'Email', 'AttributeType': 'S'},
        ],
        ProvisionedThroughput={
            'ReadCapacityUnits': 5,
            'WriteCapacityUnits': 5
        }
    )

    # Wait for the table to be created
    table.meta.client.get_waiter('table_exists').wait(TableName=table_name)

    return table

In [3]:
def load_data_to_dynamodb(table, csv_content):
    csvreader = csv.DictReader(StringIO(csv_content))

    for row in csvreader:
        # Convert Salary to int
        row['Salary'] = int(row['Salary'].replace(',', '')) if 'Salary' in row else None

        # Perform additional data type conversions or transformations if needed
        # Example: Convert 'Phone' to a clean phone number
        row['Phone'] = ''.join(c for c in row['Phone'] if c.isdigit())

        # Example: Set a default value for 'Department' if it's missing
        row['Department'] = row.get('Department', 'Unknown')

        table.put_item(Item=row)

In [4]:
def lambda_handler(event, context):
    
    logger.info("Lambda function started.")
    
    # Extract information from the S3 event
    bucket = event['Records'][0]['s3']['bucket']['name']
    key = event['Records'][0]['s3']['object']['key']

    # Assuming your CSV file has a header, and the first row contains column names
    table_name = os.path.splitext(os.path.basename(key))[0]

    # Create DynamoDB table
    dynamodb_table = create_dynamodb_table(table_name)

    # Download the CSV file from S3
    s3_client = boto3.client('s3')
    response = s3_client.get_object(Bucket=bucket, Key=key)
    csv_content = response['Body'].read().decode('utf-8')

    # Load data into DynamoDB table
    load_data_to_dynamodb(dynamodb_table, csv_content)

    # Trigger AWS Glue job
    glue = boto3.client('glue')
    glue_job_name = 's3-to-dynamodb-etl'
    glue_job_arguments = {
        '--s3_source_path': f's3://{bucket}/{key}',
        '--dynamodb_table_name': table_name
    }

    glue.start_job_run(JobName=glue_job_name, Arguments=glue_job_arguments)
    
    logger.info("Lambda function completed.")
    
    return {
        'statusCode': 200,
        'body': json.dumps(f'Table {table_name} created, and Glue job started successfully!')
    }


In [14]:
import boto3
import logging
import time

# Add logging configuration
logger = logging.getLogger()
logger.setLevel(logging.INFO)

def query_dynamodb_with_retry(email_to_query, retry_count=3):
    # Initialize the DynamoDB resource and specify the table name
    dynamodb = boto3.resource('dynamodb', aws_access_key_id=KEY, aws_secret_access_key=SECRET)
    table_name = 'employee_data'  # Replace with your actual table name
    table = dynamodb.Table(table_name)

    for attempt in range(retry_count):
        try:
            # Log information about DynamoDB table query
            logger.info(f"Querying DynamoDB table with name: {table_name}")

            # Perform the query
            response = table.query(
                KeyConditionExpression='#email = :email',
                ExpressionAttributeNames={'#email': 'Email'},
                ExpressionAttributeValues={':email': email_to_query}
            )

            # Log the queried items
            items = response['Items']
            for item in items:
                logger.info(item)

            return response

        except Exception as e:
            logger.warning(f"Attempt {attempt + 1} failed: {str(e)}")
            time.sleep(2 ** attempt)  # Exponential backoff

    raise Exception(f"Failed to query DynamoDB after {retry_count} attempts")

# Define the query parameters
email_to_query = 'rthompson@example.org'  # Replace with the email you want to query

# Call the function with retry logic
query_dynamodb_with_retry(email_to_query)


Attempt 1 failed: An error occurred (ResourceNotFoundException) when calling the Query operation: Requested resource not found
Attempt 2 failed: An error occurred (ResourceNotFoundException) when calling the Query operation: Requested resource not found
Attempt 3 failed: An error occurred (ResourceNotFoundException) when calling the Query operation: Requested resource not found


Exception: Failed to query DynamoDB after 3 attempts