In [None]:
import boto3
from botocore.exceptions import ClientError
from langchain.tools import tool
from langchain_community.llms import Ollama
from langchain.agents import initialize_agent, AgentType
from langchain_openai import ChatOpenAI
import os

In [None]:
@tool
def get_credentials(session_name:str):
    """
    Retrieves temporary AWS credentials. Use this tool when the agent needs to authenticate to AWS .
    """
    try:
        sts_client = boto3.client('sts')
        assumed_role = sts_client.assume_role(
            RoleArn=role_arn,
            RoleSessionName=session_name
        )
        global creds
        creds = assumed_role['Credentials']
        return {"status": "success"}
    except Exception as e:
        return {"status": "error", "message": e}

__CloudWatch__

In [None]:
@tool
def get_cloudWatch_logs(log_group: str):
    """
    Fetch logs from AWS CloudWatch.
    - log_group: full log group path like '/aws/lambda/function_name'
    """
    try:
        logs_client = boto3.client(
            'logs',
            aws_access_key_id=creds['AccessKeyId'],
            aws_secret_access_key=creds['SecretAccessKey'],
            aws_session_token=creds['SessionToken'],
            region_name=region
        )

        streams = logs_client.describe_log_streams(
            logGroupName=log_group,
            orderBy='LastEventTime',
            descending=True,
            limit=1
        )
        if not streams['logStreams']:
            return {"status": "error", "message": f"No log streams found for {log_group}"}

        stream_name = streams['logStreams'][0]['logStreamName']

        logs = logs_client.get_log_events(
            logGroupName=log_group,
            logStreamName=stream_name,
            limit=50
        )

        extracted_logs = []
        for event in logs['events']:
            extracted_logs.append({
                "timestamp": event['timestamp'],
                "message": event['message'].strip()
            })

        return {"status": "success", "logs": extracted_logs}

    except Exception as e:
        return {"status": "error", "message": str(e)}

__S3__

In [None]:
s3_client = None

@tool
def get_s3_client(region="us-east-1"):
    """
    Initializes S3 client using temporary AWS credentials. This tool should be called once per session. Call this tool again only if credentials expire or a new session is started.
    """
    try:
        global s3_client
        s3_client = boto3.client("s3",
            aws_access_key_id=creds['AccessKeyId'],
            aws_secret_access_key=creds['SecretAccessKey'],
            aws_session_token=creds['SessionToken'],
            region_name=region)
        return {"status": "success"}
    except Exception as e:
        return {"status": "error", "message": e}

In [None]:
@tool
def create_bucket(bucket_name: str, region="us-east-1"):
    """
    Creates a new S3 bucket in AWS.
    Parameters:
        bucket_name (str): The name for the new S3 bucket (must be globally unique).
    Returns:
        str: Success or error message.
    """
    try:
        s3_client.create_bucket(Bucket=bucket_name)
        result = {
        "Status": "success",
        "Bucket": bucket_name,
        "Region": region
        }
        return result
    except Exception as e:
        error = {
            "Status": "failure",
            "Bucket": bucket_name,
            "Region": region,
            "Error": e
        }
        return error

@tool
def delete_bucket(bucket_name: str):
    """
    Deletes the S3 bucket.
    Parameters:
        bucket_name (str)
    Returns:
        str: Success or error message.
    """
    try:
        s3_client.delete_bucket(Bucket=bucket_name)
        result = {
        "Status": "success",
        "Bucket": bucket_name
        }
        return result
    except Exception as e:
        error = {
            "Status": "error",
            "Bucket": bucket_name,
            "Region": region
        }
        return error

@tool
def list_s3_buckets(region: str):
    """
    Lists all S3 buckets in the AWS account.
    """
    try:
        response = s3_client.list_buckets()
        buckets = [bucket['Name'] for bucket in response.get('Buckets', [])]
        return {
            "Status": "success",
            "BucketCount": len(buckets),
            "Buckets": buckets
        }
    except ClientError as e:
        return {
            "Status": "error",
            "Message": str(e)
        }

@tool
def upload_file_to_bucket(file_path: str, bucket_name: str):
    """
    Uploads a file to an S3 bucket.
    Input:
      - file_path (str): Local path of the file to upload.
    """
    object_name = os.path.basename(file_path)
    try:
        s3_client.upload_file(file_path, bucket_in_use, object_name)
        return {
            "Status": "success",
            "Bucket": bucket_name,
            "Object": object_name,
            "Message": f"File '{file_path}' uploaded to bucket '{bucket_name}' as '{object_name}'."
        }
    except Exception as e:
        return {
            "Status": "error",
            "Message": str(e)
        }

@tool
def list_bucket_objects(bucket_name: str):
    """
    Lists the object inside a S3 Bucket.
    """
    try:
        obj_list = s3_client.list_objects_v2(Bucket=bucket_name)
        return {"status": "success", "objects list": obj_list, "bucket name": bucket_name}
    except Exception as e:
        return {"status": "error", "mesage": e}

@tool
def apply_bucket_policy(policy: str, bucket_name: str):
    """
    Apply a policy to an S3 Bucket.
    """
    try:
        s3_client.put_bucket_policy(Bucket= bucket_in_use, Policy=policy)
        return {"status": "success", "applied policy": policy, "bucket": bucket_in_use}
    except Exception as e:
        return {"status": "error", "mesage": e}

@tool
def delete_object(key: str, bucket_name: str):
    """
    Delete an object from a S3 bucket.
    """
    try:
        s3_client.delete_object(
            Bucket=bucket_name,
            Key=key,
        )
        return {"status": "success"}
    except Exception as e:
        return {"status": "error", "mesage": e}

@tool
def download_file(key: str, bucket_name: str):
    """
    Downloads file from a S3 bucket and store it in filepath.
    """
    try:
        s3_client.download_file(bucket_name, key, key)
        return {"status": "success", "file downloaded": key}
    except Exception as e:
        return {"status": "error", "mesage": e}

In [None]:
llm = ChatOpenAI(openai_api_key=openai_key, model="gpt-4-turbo")

tools = [get_s3_client, delete_object, delete_bucket, download_file, get_credentials, deploy_to_lambda, get_cloudWatch_logs, create_bucket, upload_file_to_bucket, list_s3_buckets, list_bucket_objects, apply_bucket_policy]

agent = initialize_agent(
    tools,
    llm,
    agent=AgentType.STRUCTURED_CHAT_ZERO_SHOT_REACT_DESCRIPTION,
    verbose=True,
    handle_parsing_errors=True
)

res = agent.run("")

print(res)