# 02 - Cloud Build

## Setup

If you haven't already, install the toolkit and dependencies using the [Setup](./00-Setup.ipynb) notebook.

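## Build

Load the documents for a single collection from S3 with `S3BasedDocs` and build the graph index from them. The graph and vector stores are taken from the `GRAPH_STORE` and `VECTOR_STORE` environment variables (loaded from your `.env` file).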

In [None]:
%reload_ext dotenv
%dotenv

import os

from graphrag_toolkit.lexical_graph import LexicalGraphIndex, set_logging_config
from graphrag_toolkit.lexical_graph.storage import GraphStoreFactory
from graphrag_toolkit.lexical_graph.storage import VectorStoreFactory
from graphrag_toolkit.lexical_graph.indexing.load import S3BasedDocs
from graphrag_toolkit.lexical_graph.indexing.build import Checkpoint

# Configure toolkit logging at the 'INFO' level.
set_logging_config('INFO')

# S3 location and collection id of the documents to build from.
s3_source = {
    'region': 'us-east-1',
    'bucket_name': 'ccms-rag-extract-188967239867',
    'key_prefix': 'key_prefix',
    'collection_id': 'demo123',
}
docs = S3BasedDocs(**s3_source)

# Named checkpoint handed to the build call below.
checkpoint = Checkpoint('s3-extraction-checkpoint')

# Store connection info comes from the environment (populated by %dotenv).
graph_store_info = os.environ['GRAPH_STORE']
graph_store = GraphStoreFactory.for_graph_store(graph_store_info)
vector_store_info = os.environ['VECTOR_STORE']
vector_store = VectorStoreFactory.for_vector_store(vector_store_info)

graph_index = LexicalGraphIndex(graph_store, vector_store)
graph_index.build(docs, checkpoint=checkpoint, show_progress=True)

print('Build complete')

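## Build from S3 and DynamoDB

Scan the DynamoDB table named by `DYNAMODB_NAME` for records whose `status` is `IN_PROGRESS`. For each record, build the collection from the S3 location stored on the record, then write the final status (`COMPLETED` or `FAILED`), duration and any error message back to the table.

Requires the `AWS_REGION`, `DYNAMODB_NAME`, `GRAPH_STORE` and `VECTOR_STORE` environment variables.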

In [None]:
%reload_ext dotenv
%dotenv

import os
import boto3
from datetime import datetime, timezone
import time

from graphrag_toolkit.lexical_graph import LexicalGraphIndex, set_logging_config
from graphrag_toolkit.lexical_graph.storage import GraphStoreFactory, VectorStoreFactory
from graphrag_toolkit.lexical_graph.indexing.load import S3BasedDocs
from graphrag_toolkit.lexical_graph.indexing.build import Checkpoint

set_logging_config('INFO')

# Read all configuration up front so a missing variable fails the cell before
# any DynamoDB record has been modified.
GRAPH_STORE_INFO = os.environ['GRAPH_STORE']
VECTOR_STORE_INFO = os.environ['VECTOR_STORE']
DEFAULT_CHECKPOINT_NAME = 's3-extraction-checkpoint'

# Initialize AWS clients
dynamodb = boto3.resource('dynamodb', region_name=os.environ['AWS_REGION'])
table = dynamodb.Table(os.environ['DYNAMODB_NAME'])


def scan_in_progress_records(table):
    """Return every record in ``table`` whose ``status`` is ``IN_PROGRESS``.

    A single ``scan`` call returns at most one page of results, and the filter
    is applied after the page has been read, so a page can be empty while
    matching records still exist further on. This follows ``LastEvaluatedKey``
    until the table has been fully scanned.

    Args:
        table: boto3 DynamoDB ``Table`` resource to scan.

    Returns:
        list: the matching items (possibly empty).

    Raises:
        Exception: whatever ``table.scan`` raises; not handled here.
    """
    scan_kwargs = {
        # 'status' is aliased via ExpressionAttributeNames because it is a
        # DynamoDB reserved word.
        'FilterExpression': '#st = :status',
        'ExpressionAttributeNames': {'#st': 'status'},
        'ExpressionAttributeValues': {':status': 'IN_PROGRESS'},
    }
    records = []
    while True:
        response = table.scan(**scan_kwargs)
        records.extend(response.get('Items', []))
        last_key = response.get('LastEvaluatedKey')
        if not last_key:
            return records
        scan_kwargs['ExclusiveStartKey'] = last_key


def build_collection(table, record):
    """Build one collection and record the outcome on its DynamoDB item.

    The record is first rewritten with status ``BUILD``; after the build it is
    rewritten again with ``COMPLETED`` or ``FAILED``, the duration in whole
    seconds and the error message (``None`` on success). ``record`` is mutated
    in place and written back with ``put_item``.

    Args:
        table: boto3 DynamoDB ``Table`` resource holding the record.
        record: dict for one item, as returned by the scan.

    Returns:
        The final status string (``'COMPLETED'`` or ``'FAILED'``), or ``None``
        if the record was skipped before the build started.
    """
    collection_id = record.get('collection_id')
    if collection_id is None:
        # Previously an unguarded lookup: one malformed record stopped the loop.
        print(f"Skipping record without collection_id (fields: {sorted(record)})")
        return None

    print(f"\nProcessing collection_id: {collection_id}")

    try:
        s3_region = record['aws_region']
        s3_bucket = record['s3_bucket']
        s3_prefix = record['s3_key_prefix']
    except KeyError as e:
        print(f"Skipping collection_id {collection_id} — missing field: {e}")
        return None
    checkpoint_name = record.get('checkpoint', DEFAULT_CHECKPOINT_NAME)

    started_at = datetime.now(timezone.utc)
    # Monotonic clock for the duration: unaffected by wall-clock adjustments.
    timer_start = time.monotonic()

    # Update status to BUILD
    # NOTE(review): 'completion_date' is also stamped here at build start, as
    # the original code did; confirm whether downstream readers rely on that.
    record.update({
        'status': 'BUILD',
        'start_time': started_at.isoformat(),
        'completion_date': started_at.isoformat(),
        'error_message': None
    })

    try:
        table.put_item(Item=record)
        print(f"Set collection_id {collection_id} to BUILD")
    except Exception as e:
        print(f"Error updating status to BUILD for {collection_id}: {e}")
        return None

    # Run build process. Object construction is inside the guarded region so a
    # failure here is recorded as FAILED instead of aborting the other records.
    status = 'COMPLETED'
    error_message = None
    try:
        docs = S3BasedDocs(
            region=s3_region,
            bucket_name=s3_bucket,
            key_prefix=s3_prefix,
            collection_id=collection_id
        )
        checkpoint = Checkpoint(checkpoint_name)
        graph_store = GraphStoreFactory.for_graph_store(GRAPH_STORE_INFO)
        vector_store = VectorStoreFactory.for_vector_store(VECTOR_STORE_INFO)
        graph_index = LexicalGraphIndex(graph_store, vector_store)
        graph_index.build(docs, checkpoint=checkpoint, show_progress=True)
    except Exception as e:
        status = 'FAILED'
        error_message = str(e)
        print(f"Graph build failed for {collection_id}: {error_message}")

    # Update final status
    record.update({
        'status': status,
        'completion_date': datetime.now(timezone.utc).isoformat(),
        'duration': int(time.monotonic() - timer_start),
        'error_message': error_message
    })

    try:
        table.put_item(Item=record)
        print(f"Updated collection_id {collection_id} with final status: {status}")
    except Exception as e:
        print(f"Failed to update final status for {collection_id}: {e}")

    return status


# Scan all IN_PROGRESS records
try:
    items = scan_in_progress_records(table)
except Exception as e:
    print(f"Error scanning DynamoDB: {str(e)}")
    # Re-raise rather than exit(): exit() in a notebook ends the kernel session,
    # whereas an exception just stops this cell (and a Run All) with a traceback.
    raise

if not items:
    print("No IN_PROGRESS collections found.")

# Loop over each IN_PROGRESS record (no-op when nothing was found)
for record in items:
    build_collection(table, record)
