In [None]:
import boto3
# Shared boto3 clients for the notebook: lambda_client invokes the corpus
# Lambda functions, s3_client lists the source documents.
lambda_client = boto3.client("lambda")
s3_client = boto3.client("s3")

In [None]:
import json

# Name or ARN of the Lambda function that creates a corpus.
# Must be filled in before create_corpus() is called without function_name.
create_corpus_arn = ""


def create_corpus(name, s3_uri, function_name=None, client=None):
    """Synchronously invoke the corpus-creation Lambda and return its decoded body.

    Args:
        name: Value sent to the Lambda as ``CorpusName``.
        s3_uri: Value sent to the Lambda as ``S3Uri``.
        function_name: Lambda name or ARN to invoke. Defaults to the
            module-level ``create_corpus_arn``.
        client: Object exposing a boto3-style ``invoke(**kwargs)``. Defaults to
            the module-level ``lambda_client``.

    Returns:
        The JSON-decoded ``body`` field of the Lambda's response payload.

    Raises:
        ValueError: If no Lambda function name/ARN is configured.
        RuntimeError: If the invocation response carries a ``FunctionError``.
    """
    target = create_corpus_arn if function_name is None else function_name
    if not target:
        # An empty FunctionName can never succeed; fail with an actionable message.
        raise ValueError(
            "No Lambda function configured: set create_corpus_arn or pass function_name"
        )
    invoker = lambda_client if client is None else client

    # The Lambda expects an API-Gateway-style event whose "body" is a JSON string.
    payload = json.dumps(
        {"body": json.dumps({"CorpusName": name, "S3Uri": s3_uri})}
    )

    response = invoker.invoke(
        FunctionName=target,
        InvocationType='RequestResponse',
        Payload=payload,
    )

    print(response)

    raw = response['Payload'].read()
    function_error = response.get('FunctionError')
    if function_error:
        # On a function error the payload holds error details, not a "body".
        detail = raw.decode('utf-8', errors='replace') if isinstance(raw, bytes) else raw
        raise RuntimeError(f"create_corpus Lambda failed ({function_error}): {detail}")

    json_obj = json.loads(raw)
    data = json.loads(json_obj['body'])
    return data

In [None]:
import time

# Step Functions client polled by wait_for_sfn_sm below.
sfn_client=boto3.client('stepfunctions')
def wait_for_sfn_sm(sm_execution_arn, poll_interval=15, timeout=None, client=None):
    """Block until a Step Functions execution is no longer ``RUNNING``.

    Args:
        sm_execution_arn: ARN of the execution to poll.
        poll_interval: Seconds to sleep between polls (default 15).
        timeout: Optional maximum number of seconds to wait. ``None`` (the
            default) waits indefinitely.
        client: Object exposing a boto3-style ``describe_execution``. Defaults
            to the module-level ``sfn_client``.

    Returns:
        The first status reported that is not ``'RUNNING'`` (``None`` if the
        response has no ``status`` field).

    Raises:
        TimeoutError: If ``timeout`` elapses while the execution is still running.
    """
    sfn = sfn_client if client is None else client
    deadline = None if timeout is None else time.monotonic() + timeout

    while True:
        status = sfn.describe_execution(executionArn=sm_execution_arn).get('status')
        if status != 'RUNNING':
            return status
        if deadline is not None and time.monotonic() >= deadline:
            raise TimeoutError(
                f"Execution {sm_execution_arn} still RUNNING after {timeout} seconds"
            )
        time.sleep(poll_interval)

In [None]:
def s3_bucket_keys(s3_client, bucket_name, bucket_prefix):
    """Yield the key of every object in a bucket whose key starts with a prefix.

    Follows ``NextContinuationToken`` so all result pages are consumed.

    Args:
        s3_client: Object exposing a boto3-style ``list_objects_v2(**kwargs)``.
        bucket_name: Bucket to list.
        bucket_prefix: Key prefix to filter on.

    Yields:
        Object keys (``str``), in the order the listing returns them. Yields
        nothing when no object matches the prefix.
    """
    kwargs = {'Bucket': bucket_name, 'Prefix': bucket_prefix}
    while True:
        resp = s3_client.list_objects_v2(**kwargs)
        # 'Contents' is omitted from the response when the page has no objects,
        # so an empty prefix must not raise KeyError.
        for obj in resp.get('Contents', []):
            yield obj['Key']

        token = resp.get('NextContinuationToken')
        if not token:
            break
        kwargs['ContinuationToken'] = token

In [None]:
# Name or ARN of the Lambda function that deletes a corpus.
# Must be filled in before delete_corpus() is called without function_name.
delete_corpus_arn = ""


def delete_corpus(corpus_id, function_name=None, client=None):
    """Synchronously invoke the corpus-deletion Lambda for one corpus.

    Prints the request payload and the raw invocation response.

    Args:
        corpus_id: Value sent to the Lambda as ``CorpusId``.
        function_name: Lambda name or ARN to invoke. Defaults to the
            module-level ``delete_corpus_arn``.
        client: Object exposing a boto3-style ``invoke(**kwargs)``. Defaults to
            the module-level ``lambda_client``.

    Raises:
        ValueError: If no Lambda function name/ARN is configured.
    """
    target = delete_corpus_arn if function_name is None else function_name
    if not target:
        # An empty FunctionName can never succeed; fail with an actionable message.
        raise ValueError(
            "No Lambda function configured: set delete_corpus_arn or pass function_name"
        )
    invoker = lambda_client if client is None else client

    # Serialise the body with json.dumps so quotes/backslashes in the id are
    # escaped instead of corrupting the hand-assembled JSON.
    payload = json.dumps({"body": json.dumps({"CorpusId": corpus_id})})

    print(payload)
    response = invoker.invoke(
        FunctionName=target,
        InvocationType='RequestResponse',
        Payload=payload,
    )

    print(response)

In [None]:
universe_bucket=""
bucket_prefix = "mkduc-01/documents/"

# Collect the distinct "folder" (everything before the last '/') of every
# object under the prefix, expressed as an s3:// URI with a trailing slash.
folder_uris = set()
for key in s3_bucket_keys(s3_client=s3_client, bucket_name=universe_bucket, bucket_prefix=bucket_prefix):
    folder = key.rsplit("/", 1)[0]
    folder_uris.add(f"s3://{universe_bucket}/{folder}/")

# Sort so the order is the same on every run; set iteration order of strings
# varies between interpreter sessions, which would change which URIs a later
# count-limited loop picks up.
s3_uris = sorted(folder_uris)

In [None]:
import re

# Each entry: (corpus name, state machine execution ARN, corpus id, source S3 URI).
sms = []
count = 0
max_count = 1000

# Optional allow-list of corpus names; empty means "create all".
filter_names = []
for s3_uri in s3_uris:
    # Only URIs containing an ".../id=<word>/" segment produce a corpus.
    m = re.match(r".+\/id=(\w+)\/", s3_uri)
    if m is not None:
        name = f"mkduc01-{m[1]}"
        excluded = bool(filter_names) and name not in filter_names
        if excluded:
            continue
        response = create_corpus(name=name, s3_uri=s3_uri)
        entry = (name, response['CorpusStateMachine'], response['CorpusId'], s3_uri)
        sms.append(entry)
        count = count + 1
    # Stop once the cap on started corpora is reached.
    if max_count <= count:
        break

print(f"Fast Corpus State Machines running count: {len(sms)}")

In [None]:
# Wait for every started state machine; any that does not end in SUCCEEDED has
# its corpus deleted and is remembered as (name, s3_uri) for a later retry.
create_corpus_failed = []
for name, sm, corpus_id, s3_uri in sms:
    status = wait_for_sfn_sm(sm_execution_arn=sm)
    if status == "SUCCEEDED":
        continue
    delete_corpus(corpus_id=corpus_id)
    create_corpus_failed.append((name, s3_uri))

if len(create_corpus_failed) > 0:
    print(f"Fast Corpus Failed: {create_corpus_failed}")

In [None]:

# Retry failed corpora in rounds. The number of rounds is capped so that a
# corpus which can never succeed does not keep this cell creating and deleting
# resources forever.
max_retry_rounds = 5
retry_round = 0

while len(create_corpus_failed) > 0 and retry_round < max_retry_rounds:
    retry_round += 1

    # Re-submit every corpus that failed in the previous round.
    sms.clear()
    for name, s3_uri in create_corpus_failed:
        response = create_corpus(name=name, s3_uri=s3_uri)
        sms.append( (name, response['CorpusStateMachine'],  response['CorpusId'], s3_uri) )
        # Space out submissions by a minute each.
        time.sleep(60)

    # Wait for this round's executions; roll back and re-queue the failures.
    create_corpus_failed.clear()
    for name, sm, corpus_id, s3_uri in sms:
        status = wait_for_sfn_sm(sm_execution_arn=sm)
        if status != "SUCCEEDED":
            delete_corpus(corpus_id=corpus_id)
            create_corpus_failed.append((name, s3_uri))

    if create_corpus_failed:
        print(f"Fast corpus Failed: {create_corpus_failed}")

if create_corpus_failed:
    print(f"Giving up after {retry_round} retry rounds; still failing: {create_corpus_failed}")