### Test Connection to Oracle Objec Storage through boto3

In [37]:
import os, sys
import logging
import json, re
from typing import Dict, List, Any
import tempfile
from pypdf import PdfReader
import ollama

from io import BytesIO


from botocore.config import Config
import boto3, base64, hashlib

boto3.set_stream_logger("botocore", "DEBUG")

config = Config(
    signature_version='s3v4',
    s3={

        "addressing_style": "path",
        "use_expect_header": False,     
        "payload_signing_enabled": True,     
        "checksum_validation": False,
    },
    
    retries={"max_attempts": 5, "mode": "standard"},
)


OLLAMA_MODEL = os.getenv("OLLAMA_MODEL", "phi4-mini:latest")
EMBED_MODEL  = os.getenv("EMBED_MODEL",  "nomic-embed-text")


# --- Logging ---
logging.basicConfig(
    filename="trigger.log",
    level=os.getenv("LOG_LEVEL", "INFO"),
    format="%(asctime)s | %(levelname)s | %(message)s",
)

sys.path.append("/home/abhi_ubuntu/oracle_cloud/storage/")

from gen_sec_key import (
    ORACLE_S3_SECRET_KEY,
    ORACLE_S3_ACCESS_KEY,
    ORACLE_REGION,
    ORACLE_INGEST_BUCKET,
    ORACLE_S3_ENDPOINT,
    ORACLE_JSON_DESTINATION_BUCKET,
)


ALLOWED_EXT = os.getenv("ALLOWED_EXT",".pdf").split(",")

# Default checkpoint path: ./_checkpoints/<ingest_bucket>.checkpoint.json
DEFAULT_CHECKPOINT_PATH = os.path.join(
    os.getcwd(),
    "_checkpoints",
    f"{ORACLE_INGEST_BUCKET}.checkpoint.json",
)
CHECKPOINT_PATH = os.getenv("CHECKPOINT_PATH", DEFAULT_CHECKPOINT_PATH)


session = boto3.session.Session(
    aws_access_key_id=ORACLE_S3_ACCESS_KEY,
    aws_secret_access_key=ORACLE_S3_SECRET_KEY,
    region_name=ORACLE_REGION,
)

s3 = session.client("s3", endpoint_url=ORACLE_S3_ENDPOINT, config=config)
# assert s3._client_config.s3["payload_signing_enabled"] is False


# def list_keys(prefix=""):
#     resp = s3.list_objects_v2(Bucket=ORACLE_INGEST_BUCKET, Prefix=prefix)
#     return [x["Key"] for x in resp.get("Contents", [])]

# if __name__ == "__main__":
#     print(list_keys())

2025-08-24 13:09:25,894 botocore.hooks [DEBUG] Changing event name from creating-client-class.iot-data to creating-client-class.iot-data-plane
2025-08-24 13:09:25,894 botocore.hooks [DEBUG] Changing event name from creating-client-class.iot-data to creating-client-class.iot-data-plane
2025-08-24 13:09:25,894 botocore.hooks [DEBUG] Changing event name from creating-client-class.iot-data to creating-client-class.iot-data-plane
2025-08-24 13:09:25,894 botocore.hooks [DEBUG] Changing event name from creating-client-class.iot-data to creating-client-class.iot-data-plane
2025-08-24 13:09:25,904 botocore.hooks [DEBUG] Changing event name from before-call.apigateway to before-call.api-gateway
2025-08-24 13:09:25,904 botocore.hooks [DEBUG] Changing event name from before-call.apigateway to before-call.api-gateway
2025-08-24 13:09:25,904 botocore.hooks [DEBUG] Changing event name from before-call.apigateway to before-call.api-gateway
2025-08-24 13:09:25,904 botocore.hooks [DEBUG] Changing event 

In [38]:
# ORACLE_S3_ENDPOINT

In [39]:
def ensure_parent_dir(path: str):
    os.makedirs(os.path.dirname(path), exist_ok=True)



def save_checkpoint_local(processed: Dict[str, Dict[str, Any]]):

    ensure_parent_dir(CHECKPOINT_PATH)
    tmp_path = CHECKPOINT_PATH + ".tmp"
    with open(tmp_path, "w", encoding="utf-8") as f:
        json.dump({"processed": processed}, f, ensure_ascii=False, indent=2)

    os.replace(tmp_path, CHECKPOINT_PATH)
    logging.debug(f"Checkpoint saved to {CHECKPOINT_PATH}")


def load_checkpoint_local():

    try:

        if not os.path.exists(CHECKPOINT_PATH):

            logging.info(f"No checkpoint found at {CHECKPOINT_PATH}; creating a new one.")
            ensure_parent_dir(CHECKPOINT_PATH)
            save_checkpoint_local({})
            return {}

        with open(CHECKPOINT_PATH, "r", encoding="utf-8") as f:

            blob = json.load(f)
        
        processed = blob.get("processed",{})
        logging.info(f"Loaded checkpoint with {len(processed)} entries from {CHECKPOINT_PATH}")
        return processed

    except Exception as e:

        print(f"Error occured in load_checkpoint: {e}")

def list_objects(bucket: str, prefix: str = ""):

    paginator = s3.get_paginator("list_objects_v2")
    for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
        for obj in page.get("Contents", []) or []:
            yield {
                "Key": obj["Key"],
                "ETag": str(obj["ETag"]).strip('"'),
                "Size": obj["Size"],
                "LastModified": obj["LastModified"],
            }

def should_process(key: str) -> bool:

    if key.endswith("/"):
        return False
    ext = os.path.splitext(key)[1].lower()
    return ext in ALLOWED_EXT


def derive_dest_key(src_key: str) -> str:

    base = os.path.basename(src_key)
    name, _ext = os.path.splitext(base)
    return f"{name}.json"


def read_pdf(path:str):

    '''
    Read input PDF file, pagewise, and return text :str
    '''
    out = []
    r = PdfReader(path)
    
    for page in r.pages:

        t = page.extract_text() or ""
        out.append(t)

    return "\n".join(out)

import requests

def put_object_via_presigned(s3_client, bucket: str, key: str, body: bytes, content_type: str = "application/json; charset=utf-8"):
    # Generate a presigned URL for a PUT
    url = s3_client.generate_presigned_url(
        "put_object",
        Params={"Bucket": bucket, "Key": key, "ContentType": content_type},
        ExpiresIn=3600,
    )
    # Plain HTTP PUT -> requests sets Content-Length automatically for bytes
    r = requests.put(url, data=body, headers={"Content-Type": content_type})
    r.raise_for_status()

In [40]:

SYSTEM = '''
You are a precise information extractor for Duke Energy utility bills. 
Return ONLY one JSON object that matches the provided schema. 
Use bill content only; do not guess. 
Dates must be ISO-8601 (YYYY-MM-DD). 
Money values must be numbers (e.g., 142.58). 
For missing data, use null or empty lists. 
No extra keys. No commentary.

User:
Extract the following fields and return ONLY JSON.

SCHEMA (example, not data):
{
  "schema_version": "duke.v1",
  "source_file": "<filename>",
  "account": { "number": "", "service_address": "" },
  "bill": {
    "bill_date": "YYYY-MM-DD",
    "period_start": "YYYY-MM-DD",
    "period_end": "YYYY-MM-DD",
    "due_date": "YYYY-MM-DD",
    "previous_amount_due": 0.00,
    "payment_received_date": "YYYY-MM-DD",
    "payment_received_amt": 0.00,
    "current_elec_charges": 0.00,
    "taxes_amount_usd": 0.00,
    "total_amount_due_usd": 0.00,
    "after_due_amount_usd": 0.00,
    "rate_plan": ""
  },
  "charges": [
    { "item": "", "qty": null, "unit": "", "rate_usd": null, "amount_usd": 0.00,
      "rider_code": null, "item_type": "CONNECTION|ENERGY|RIDER" }
  ],
  "usage": [
    { "meter": "", "prev_read_date": "YYYY-MM-DD", "prev_read": null,
      "curr_read_date": "YYYY-MM-DD", "curr_read": null,
      "kwh": null, "billed_kwh": null, "days": null }
  ],
  "taxes": [
    { "jurisdiction": "", "tax_name": "", "amount_usd": 0.00 }
  ],
  "notes": ""
}

TEXT PAGES:
--- PAGE 1 ---
<insert page 1 text>
--- PAGE 2 ---
<insert page 2 text>
...

TABLES (optional TSV):
--- TABLE 1 (TSV) ---
<tsv>
--- TABLE 2 (TSV) ---
<tsv>
'''


def call_ollama(model, system_msg, user_msg):
    resp = ollama.chat(
        model=model,
        messages=[{"role":"system","content":system_msg},{"role":"user","content":user_msg}],
        options={"temperature":0},
        format="json"
    )
    return resp["message"]["content"]

In [None]:


def main():

    processed = load_checkpoint_local()
    print("Loaded initial Processed Checkpoint: ",processed)

    newly_processed = 0

    print("Object from ORACLE_INGEST_BUCKET")
    for obj in list_objects(ORACLE_INGEST_BUCKET):

        key = obj["Key"]
        etag = obj["ETag"]


        if not should_process(key):
            continue

        prev = processed.get(key)
        if prev and prev.get("etag") == etag:
            continue

        dest_key = derive_dest_key(key)
        logging.info(f"Processing {key} (etag={etag}) -> s3://{ORACLE_JSON_DESTINATION_BUCKET}/{dest_key}")

        suffix = os.path.splitext(key)[1]
        with tempfile.NamedTemporaryFile(suffix=suffix, delete=True) as tmp:
            tmp_path = tmp.name
            s3.download_fileobj(ORACLE_INGEST_BUCKET, key, tmp)

            text  = read_pdf(tmp_path)

            messages =[
                {"role": "system", "content": SYSTEM},
                {"role": "user", "content": text}
            ]

            response = ollama.chat(
                model = OLLAMA_MODEL,
                format="json",
                options={"temperature": 0},
                messages = messages
            )

            print("Exracted record ............... ")
            print(type(response.message.content))

            payload = response.message.content

            if isinstance(payload, (dict, list, int, float, bool)) or payload is None:
                obj = payload
            else:
                # if it's a JSON string, parse it; if plain text, wrap it
                try:
                    obj = json.loads(str(payload))
                except json.JSONDecodeError:
                    obj = {"text": str(payload)}

            # Serialize ONCE to BYTES
            body_bytes = json.dumps(obj, ensure_ascii=False, indent=2).encode("utf-8")


            print(type(body_bytes))
            print(len(body_bytes))

            print(body_bytes)
                  
            # content_md5 = base64.b64encode(hashlib.md5(body_bytes).digest()).decode('ascii')

            # Upload with explicit byte length
            # s3.put_object(
            #     Bucket=ORACLE_JSON_DESTINATION_BUCKET,
            #     Key=str(dest_key).lstrip("/"),
            #     Body=body_bytes,                                  
            #     ContentType="application/json; charset=utf-8",
            #     ContentLength=len(body_bytes),
            #     # ContentMD5=content_md5
            # )

            put_object_via_presigned(
                s3, ORACLE_JSON_DESTINATION_BUCKET, str(dest_key).lstrip("/"), body_bytes
            )

            
            # Update local checkpoint immediately
            processed[key] = {"etag": etag, "dest_key": dest_key}
            save_checkpoint_local(processed)
            newly_processed += 1
            logging.info(f"✅ Done: {key} -> {dest_key}")
            print("done")



            



In [42]:
if __name__ == "__main__":
    main()

2025-08-24 13:09:53,328 botocore.loaders [DEBUG] Loading JSON file: /home/abhi_ubuntu/.venv/lib/python3.12/site-packages/botocore/data/s3/2006-03-01/paginators-1.json
2025-08-24 13:09:53,328 botocore.loaders [DEBUG] Loading JSON file: /home/abhi_ubuntu/.venv/lib/python3.12/site-packages/botocore/data/s3/2006-03-01/paginators-1.json
2025-08-24 13:09:53,328 botocore.loaders [DEBUG] Loading JSON file: /home/abhi_ubuntu/.venv/lib/python3.12/site-packages/botocore/data/s3/2006-03-01/paginators-1.json
2025-08-24 13:09:53,328 botocore.loaders [DEBUG] Loading JSON file: /home/abhi_ubuntu/.venv/lib/python3.12/site-packages/botocore/data/s3/2006-03-01/paginators-1.json
2025-08-24 13:09:53,337 botocore.loaders [DEBUG] Loading JSON file: /home/abhi_ubuntu/.venv/lib/python3.12/site-packages/botocore/data/s3/2006-03-01/paginators-1.sdk-extras.json


2025-08-24 13:09:53,337 botocore.loaders [DEBUG] Loading JSON file: /home/abhi_ubuntu/.venv/lib/python3.12/site-packages/botocore/data/s3/2006-03-01/paginators-1.sdk-extras.json
2025-08-24 13:09:53,337 botocore.loaders [DEBUG] Loading JSON file: /home/abhi_ubuntu/.venv/lib/python3.12/site-packages/botocore/data/s3/2006-03-01/paginators-1.sdk-extras.json
2025-08-24 13:09:53,337 botocore.loaders [DEBUG] Loading JSON file: /home/abhi_ubuntu/.venv/lib/python3.12/site-packages/botocore/data/s3/2006-03-01/paginators-1.sdk-extras.json
2025-08-24 13:09:53,343 botocore.hooks [DEBUG] Event before-parameter-build.s3.ListObjectsV2: calling handler <function set_list_objects_encoding_type_url at 0x7f2f747e09a0>
2025-08-24 13:09:53,343 botocore.hooks [DEBUG] Event before-parameter-build.s3.ListObjectsV2: calling handler <function set_list_objects_encoding_type_url at 0x7f2f747e09a0>
2025-08-24 13:09:53,343 botocore.hooks [DEBUG] Event before-parameter-build.s3.ListObjectsV2: calling handler <functio

Loaded initial Processed Checkpoint:  {}
Object from ORACLE_INGEST_BUCKET


2025-08-24 13:09:53,783 botocore.hooks [DEBUG] Event before-parse.s3.ListObjectsV2: calling handler <function _handle_200_error at 0x7f2f747e1c60>
2025-08-24 13:09:53,783 botocore.hooks [DEBUG] Event before-parse.s3.ListObjectsV2: calling handler <function _handle_200_error at 0x7f2f747e1c60>
2025-08-24 13:09:53,783 botocore.hooks [DEBUG] Event before-parse.s3.ListObjectsV2: calling handler <function _handle_200_error at 0x7f2f747e1c60>
2025-08-24 13:09:53,783 botocore.hooks [DEBUG] Event before-parse.s3.ListObjectsV2: calling handler <function _handle_200_error at 0x7f2f747e1c60>
2025-08-24 13:09:53,788 botocore.hooks [DEBUG] Event before-parse.s3.ListObjectsV2: calling handler <function handle_expires_header at 0x7f2f747e1a80>
2025-08-24 13:09:53,788 botocore.hooks [DEBUG] Event before-parse.s3.ListObjectsV2: calling handler <function handle_expires_header at 0x7f2f747e1a80>
2025-08-24 13:09:53,788 botocore.hooks [DEBUG] Event before-parse.s3.ListObjectsV2: calling handler <function 

Exracted record ............... 
<class 'str'>
<class 'bytes'>
1096
b'{\n  "schema_version": "duke.v1",\n  "source_file": "",\n  "account": {\n    "number": null,\n    "service_address": "418 S GRANT ST APT 12, BLOOMINGTON IN 47401-4772"\n  },\n  "bill": {\n    "bill_date": "2024-05-06",\n    "period_start": "2024-04-03",\n    "period_end": "2024-05-02",\n    "due_date": "2024-05-28",\n    "previous_amount_due": 42.78,\n    "payment_received_date": "2024-04-09",\n    "payment_received_amt": 42.78,\n    "current_elec_charges": 38.8,\n    "taxes_amount_usd": 2.72,\n    "total_amount_due_usd": 41.52,\n    "after_due_amount_usd": null,\n    "rate_plan": "Residential Electric Service (RS)"\n  },\n  "charges": [\n    {\n      "item": "",\n      "qty": null,\n      "unit": "",\n      "rate_usd": null,\n      "amount_usd": 38.8\n    }\n  ],\n  "usage": [\n    {\n      "meter": "318452714",\n      "prev_read_date": "2024-04-03",\n      "prev_read": 43245,\n      "curr_read_date": "2024-05-02",\

['April2024.pdf', 'December2023.pdf', 'Feb2024.pdf', 'Jan2024.pdf', 'March2024.pdf', 'Nov2023.pdf', 'October2023.pdf', 'September2023.pdf']
