In [None]:
import time, logging
import boto3, s3fs, pandas as pd
import os
import zlib
import base64
from botocore.exceptions import ClientError

# AWS region for the clients below; os.getenv returns None when AWS_REGION is unset.
region = os.getenv('AWS_REGION')
# NOTE(review): access_key / secret_key are read here but not referenced anywhere
# else in the visible code — presumably boto3 picks credentials up itself; confirm
# before removing.
access_key = os.environ.get('AWS_ACCESS_KEY_ID')
secret_key = os.environ.get('AWS_SECRET_ACCESS_KEY')

# NOTE(review): `session` and `credentials` are likewise not used by the visible code.
session = boto3.Session()
credentials = session.get_credentials()

# ── CONFIG ────────────────────────────────────────────
S3_PATH    = "s3://manual-ref-data/bger-2024-3-text.parquet"   # Parquet file read by load_parquet()
TABLE_NAME = "manual-fed-court-decisions"   # your DynamoDB table
BATCH_SIZE = 25      # rows handed to batch_write() per call
MAX_BACKOFF = 32     # upper bound (seconds) for the retry sleep in batch_write()

# ── SETUP ─────────────────────────────────────────────
# Root logger: INFO level, timestamped messages.
logging.basicConfig(level=logging.INFO,
                    format="%(asctime)s %(levelname)s %(message)s")


# Low-level DynamoDB client; batch_write() looks this name up at call time.
ddb = boto3.client("dynamodb", region_name=region)
# S3 filesystem handle passed to pandas in load_parquet(); anon=False means signed requests.
fs  = s3fs.S3FileSystem(anon=False, client_kwargs={"region_name": region})

def compress_text(txt: str) -> str:
    """Return ``txt`` as UTF-8 bytes, zlib-compressed at level 9 and base64-encoded to ASCII text."""
    utf8_bytes = txt.encode("utf-8")
    deflated = zlib.compress(utf8_bytes, 9)
    encoded = base64.b64encode(deflated)
    return str(encoded, "ascii")

def load_parquet(s3_path):
    """
    Read the Parquet file at ``s3_path`` and return it as a DataFrame.

    The read goes through the module-level ``fs`` filesystem object; the path
    and the resulting row count are logged at INFO level.
    """
    logging.info(f"Reading {s3_path} …")
    frame = pd.read_parquet(s3_path, filesystem=fs)
    row_count = len(frame)
    logging.info(f"Loaded {row_count} rows.")
    return frame

def batch_write(items: list[dict]):
    """
    Write ``items`` to the DynamoDB table ``TABLE_NAME`` via BatchWriteItem.

    items: list of dicts with keys 'docref', 'url', 'text_compressed'; each
           value is stored as a DynamoDB string ("S") attribute.

    An empty list is a no-op (no request is sent). Lists longer than 25 are
    sent in consecutive groups of at most 25 put requests, since a single
    BatchWriteItem call accepts no more than that.

    Whatever DynamoDB reports back under ``UnprocessedItems`` is re-sent after
    a sleep that doubles each round, capped at ``MAX_BACKOFF`` seconds. The
    retry loop has no attempt limit: it runs until nothing is left unprocessed.

    Raises:
        botocore.exceptions.ClientError: logged and re-raised unchanged.
    """
    if not items:
        # Nothing to send; an empty request list would be rejected by the API.
        return

    max_per_call = 25  # BatchWriteItem hard limit on put/delete requests per call

    # build PutRequest list
    requests = [
        {
            "PutRequest": {
                "Item": {
                    "docref":           {"S": row["docref"]},
                    "url":              {"S": row["url"]},
                    "text_compressed":  {"S": row["text_compressed"]}
                }
            }
        }
        for row in items
    ]

    for start in range(0, len(requests), max_per_call):
        pending = requests[start:start + max_per_call]
        backoff = 1  # each group starts with a fresh 1-second backoff
        while pending:
            try:
                resp = ddb.batch_write_item(RequestItems={TABLE_NAME: pending})
            except ClientError as e:
                logging.error("DynamoDB ClientError: %s", e)
                raise

            # retry only the unprocessed subset
            pending = resp.get("UnprocessedItems", {}).get(TABLE_NAME, [])
            if pending:
                logging.warning("Retrying %d unprocessed items after %ds", len(pending), backoff)
                time.sleep(backoff)
                backoff = min(backoff * 2, MAX_BACKOFF)


def main():
    """
    Load the Parquet file at ``S3_PATH`` and push its rows to DynamoDB.

    Rows are processed in slices of ``BATCH_SIZE``: each row in the slice gets
    a ``text_compressed`` entry derived from its ``text`` value, the slice is
    handed to ``batch_write``, and a running total is logged.
    """
    rows = load_parquet(S3_PATH).to_dict(orient="records")
    n_rows = len(rows)
    written = 0

    for start in range(0, n_rows, BATCH_SIZE):
        chunk = rows[start:start + BATCH_SIZE]
        # attach the compressed payload to every row in this slice
        for record in chunk:
            record["text_compressed"] = compress_text(record["text"])
        batch_write(chunk)
        written = written + len(chunk)
        logging.info("Written %d/%d items", written, n_rows)

    logging.info("All done — %d items loaded into %s", n_rows, TABLE_NAME)

# Kick off the S3 → DynamoDB load as soon as this cell is executed.
main()


2025-04-30 21:06:28,484 INFO Found credentials in environment variables.
2025-04-30 21:06:28,486 INFO Reading s3://manual-ref-data/bger-2024-3-text.parquet …
2025-04-30 21:09:03,289 INFO Loaded 127477 rows.
2025-04-30 21:09:05,304 INFO Written 25/127477 items
2025-04-30 21:09:05,532 INFO Written 50/127477 items
2025-04-30 21:09:05,668 INFO Written 75/127477 items
2025-04-30 21:09:05,802 INFO Written 100/127477 items
2025-04-30 21:09:05,940 INFO Written 125/127477 items
2025-04-30 21:09:06,076 INFO Written 150/127477 items
2025-04-30 21:09:06,209 INFO Written 175/127477 items
2025-04-30 21:09:06,359 INFO Written 200/127477 items
2025-04-30 21:09:06,489 INFO Written 225/127477 items
2025-04-30 21:09:06,629 INFO Written 250/127477 items
2025-04-30 21:09:06,772 INFO Written 275/127477 items
2025-04-30 21:09:06,917 INFO Written 300/127477 items
2025-04-30 21:09:07,051 INFO Written 325/127477 items
2025-04-30 21:09:07,188 INFO Written 350/127477 items
2025-04-30 21:09:07,338 INFO Written 375

In [5]:

# ── CONFIG ───────────────────────────────────────────────────────────────
KEY_NAME   = "docref"                          # your PK name
PAGE_SIZE  = 1000                              # passed as the Scan "Limit" (upper bound per page)
THROTTLE_DELAY = 0.01                          # seconds between updates

# ── SETUP ────────────────────────────────────────────────────────────────
# NOTE: logging.basicConfig does nothing if the root logger already has handlers
# (e.g. when the earlier loader cell has already run in this kernel).
logging.basicConfig(level=logging.INFO,
                    format="%(asctime)s %(levelname)s %(message)s")
log = logging.getLogger()  # root logger

# Unlike the loader cell, this falls back to "us-east-1" when AWS_REGION is unset.
region = os.getenv("AWS_REGION", "us-east-1")
# NOTE(review): this rebinds `ddb`, which the loader cell created with boto3.client(...)
# and which batch_write() reads at call time. After this cell has run, batch_write()
# would be talking to this resource object instead — consider a distinct name.
ddb = boto3.resource("dynamodb", region_name=region)
# Table handle used by remove_text_attribute() and main(); TABLE_NAME comes from the loader cell.
table = ddb.Table(TABLE_NAME)

def remove_text_attribute(key_value):
    """
    Remove the ``text`` attribute from the item whose ``KEY_NAME`` equals ``key_value``.

    Returns:
        True if the update call completed, False if DynamoDB answered with a
        ClientError. The error is logged rather than raised so that one failing
        item does not abort the whole sweep.
    """
    try:
        table.update_item(
            Key={KEY_NAME: key_value},
            UpdateExpression="REMOVE #t",
            # "#t" is a placeholder for the attribute name; `text` is on
            # DynamoDB's reserved-word list and cannot appear literally.
            ExpressionAttributeNames={"#t": "text"},
            ReturnValues="NONE"
        )
    except ClientError as e:
        log.error("Failed to remove text from %s=%s: %s", KEY_NAME, key_value, e)
        return False
    return True


def main():
    """
    Scan the whole table page by page and strip ``text`` from every item found.

    Only the key attribute is projected. Each page's keys are passed one at a
    time to ``remove_text_attribute`` with a ``THROTTLE_DELAY`` pause after
    each call. Successes and failures are tallied separately so the final
    log line reports only updates that actually went through.
    """
    log.info("Starting scan of %s …", TABLE_NAME)
    scan_kwargs = {
        "ProjectionExpression": KEY_NAME,
        "Limit": PAGE_SIZE
    }
    removed = 0
    failed = 0

    while True:
        resp = table.scan(**scan_kwargs)
        items = resp.get("Items", [])
        log.info("Scanned %d keys", len(items))

        for itm in items:
            if remove_text_attribute(itm[KEY_NAME]):
                removed += 1
            else:
                failed += 1
            # tiny delay to avoid hot-spotting
            time.sleep(THROTTLE_DELAY)

        # No LastEvaluatedKey means the scan has reached the end of the table.
        last_evaluated_key = resp.get("LastEvaluatedKey")
        if not last_evaluated_key:
            break
        scan_kwargs["ExclusiveStartKey"] = last_evaluated_key

    if failed:
        log.warning("%d items could not be updated; see errors above.", failed)
    log.info("Done! Removed text from %d items.", removed)

# Runs the cleanup sweep: the `main` defined in this cell replaces the loader's
# `main` from the earlier cell, since both use the same name.
main()


2025-04-30 21:25:13,480 INFO Starting scan of manual-fed-court-decisions …
2025-04-30 21:25:13,859 INFO Scanned 151 keys
2025-04-30 21:25:33,393 INFO Scanned 147 keys
2025-04-30 21:25:54,760 INFO Scanned 162 keys
2025-04-30 21:26:18,366 INFO Scanned 136 keys
2025-04-30 21:26:38,334 INFO Scanned 175 keys
2025-04-30 21:27:02,872 INFO Scanned 158 keys
2025-04-30 21:27:24,854 INFO Scanned 170 keys
2025-04-30 21:27:48,465 INFO Scanned 135 keys
2025-04-30 21:28:06,411 INFO Scanned 159 keys
2025-04-30 21:28:28,341 INFO Scanned 145 keys
2025-04-30 21:28:47,970 INFO Scanned 162 keys
2025-04-30 21:29:11,916 INFO Scanned 158 keys
2025-04-30 21:29:31,594 INFO Scanned 159 keys
2025-04-30 21:29:51,546 INFO Scanned 159 keys
2025-04-30 21:30:12,511 INFO Scanned 153 keys
2025-04-30 21:30:32,557 INFO Scanned 149 keys
2025-04-30 21:30:52,522 INFO Scanned 148 keys
2025-04-30 21:31:14,814 INFO Scanned 155 keys
2025-04-30 21:31:36,746 INFO Scanned 169 keys
2025-04-30 21:32:00,362 INFO Scanned 148 keys
2025-