In [None]:
from boto3client import create_dynamodb_client, create_dynamodb_resource

In [None]:
client = create_dynamodb_client()
resource = create_dynamodb_resource()

In [None]:
def create_table(table_name: str, pk: str):
    """Create a dynamodb table

    Reference:
        https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/dynamodb/client/create_table.html

    Table name: sample
    Partition key: ID

    Required Parameters:
        TableName
        KeySchema
        AttributeDefinitions
        BillingMode or ProvisionedThroughput
    """

    response = client.create_table(
        AttributeDefinitions=[{"AttributeName": pk, "AttributeType": "S"}],
        TableName=table_name,
        KeySchema=[{"AttributeName": pk, "KeyType": "HASH"}],
        BillingMode="PAY_PER_REQUEST",
    )
    return response


create_table(table_name="sample", pk="ID")

In [None]:
def put_item(table_name: str, pk: str, pk_value: str, name: str, age: str):
    """Put some items into the table

    https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/dynamodb/client/put_item.html

    Required Parameters:
        TableName
        Item
            Partition key
    """
    response = client.put_item(
        TableName=table_name,
        Item={pk: {"S": pk_value}, "Name": {"S": name}, "Age": {"S": age}},
    )
    return response


put_item(table_name="sample", pk="ID", pk_value="1", name="John", age="30")
put_item(table_name="sample", pk="ID", pk_value="2", name="Jane", age="25")
put_item(table_name="sample", pk="ID", pk_value="3", name="Jack", age="20")
put_item(table_name="sample", pk="ID", pk_value="4", name="Jill", age="15")
put_item(table_name="sample", pk="ID", pk_value="5", name="Joe", age="10")

In [None]:
def scan_table(table_name: str):
    """Scan the table

    Reference:
        https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/dynamodb/client/scan.html

    Required Parameters:
        TableName
    """

    response = client.scan(TableName=table_name)
    return response


scan_table(table_name="sample")

In [None]:
def query_table(table_name: str, pk: str, pk_value: str):
    """Query the table

    Reference:
        https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/dynamodb/client/query.html

    Required Parameters:
        TableName
        KeyConditionExpression
        ExpressionAttributeValues
    """

    response = client.query(
        TableName=table_name,
        KeyConditionExpression=f"{pk} = :pk_value",
        ExpressionAttributeValues={":pk_value": {"S": pk_value}},
    )
    return response


query_table(table_name="sample", pk="ID", pk_value="1")

In [None]:
table = resource.Table("sample")
response = table.get_item(Key={"ID": "1"})
print(response["Item"])