In [1]:
import boto3
import yaml
from typing import Any, Dict

In [9]:
with open("../config/tenants/dev.yaml", "r", encoding="utf-8") as f:
    tenant_config: Dict[str, Any] = yaml.safe_load(f)

dynamodb_config = tenant_config.get("dynamodb", {}) or {}
dynamodb = boto3.resource( "dynamodb", region_name=dynamodb_config.get("region"))

messages_table = dynamodb.Table(dynamodb_config["messages_table"])
processes_table = dynamodb.Table(dynamodb_config["processes_table"])
tasks_table = dynamodb.Table(dynamodb_config["tasks_table"])

s3_config = tenant_config.get("s3", {}) or {}
_bucket = s3_config.get("bucket")
_s3 = boto3.client("s3", region_name=s3_config.get("region"))

folder = "whatsapp_media/525571969848/"

In [5]:
def clear_table(table):
    key_names = [k["AttributeName"] for k in table.key_schema]

    scan_kwargs = {}
    while True:
        resp = table.scan(**scan_kwargs)
        items = resp.get("Items", [])

        with table.batch_writer() as batch:
            for item in items:
                batch.delete_item(
                    Key={k: item[k] for k in key_names}
                )

        lek = resp.get("LastEvaluatedKey")
        if not lek:
            break
        scan_kwargs["ExclusiveStartKey"] = lek

In [6]:
clear_table(messages_table)
clear_table(processes_table)
clear_table(tasks_table)

In [7]:
def clear_s3_folder(s3_client, bucket: str, prefix: str):
    paginator = s3_client.get_paginator("list_objects_v2")

    for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
        contents = page.get("Contents", [])
        if not contents:
            continue

        objects = [{"Key": obj["Key"]} for obj in contents]

        s3_client.delete_objects(
            Bucket=bucket,
            Delete={"Objects": objects}
        )

In [10]:
clear_s3_folder(_s3, _bucket, folder)