### Single vectorization and event trigger

You can fetch raw files from S3 either with boto3, the Python client library for S3, or with Hasura's S3 data connector.
The advantage of the Hasura S3 connector is that your data queries are secured with authentication and RBAC support out of the box.

##### Chunking

In [1]:
# Recursive character splitter: divides the input into smaller chunks in a hierarchical manner.
# If the first pass does not produce chunks of the desired size,
# it recursively re-splits them with a different separator or criterion until the desired size is reached.
from langchain.text_splitter import RecursiveCharacterTextSplitter

def chunking(text, chunk_size=1000, chunk_overlap=500, separators=None):
    """Split ``text`` into overlapping chunks with a recursive character splitter.

    Args:
        text: The string to split.
        chunk_size: Chunk size forwarded to ``RecursiveCharacterTextSplitter``.
        chunk_overlap: Chunk overlap forwarded to the splitter.
        separators: Separators forwarded to the splitter. When omitted,
            ``["."]`` is used.

    Returns:
        The chunks produced by ``splitter.split_text(text)``.
    """
    # Build the default per call so no list object is shared between calls
    # (a list literal in the signature would be created only once).
    if separators is None:
        separators = ["."]
    splitter = RecursiveCharacterTextSplitter(
        chunk_size=chunk_size,
        chunk_overlap=chunk_overlap,
        separators=separators,
    )
    return splitter.split_text(text)

##### Utility - GraphQL query execution

In [2]:
# write the chunks to the database
# Utility method to execute GQL queries
import os
from gql import gql, Client
from gql.transport.requests import RequestsHTTPTransport
from dotenv import load_dotenv

load_dotenv()

def execute_gql(query, variable_values=None):
    """Execute a GraphQL document against the Hasura endpoint.

    The endpoint URL and the admin secret are read from the
    ``HASURA_GRAPHQL_URL`` and ``HASURA_GRAPHQL_ADMIN_SECRET`` environment
    variables on every call.

    Args:
        query: GraphQL query or mutation as a string; it is parsed with ``gql``.
        variable_values: Optional dict of GraphQL variables. When omitted, an
            empty dict is sent.

    Returns:
        Whatever ``Client.execute`` returns for the request.
    """
    # Create the default per call instead of sharing one dict object across
    # every invocation (mutable-default pitfall).
    if variable_values is None:
        variable_values = {}
    transport = RequestsHTTPTransport(
        url=os.environ.get('HASURA_GRAPHQL_URL'),
        headers={'x-hasura-admin-secret': os.environ.get('HASURA_GRAPHQL_ADMIN_SECRET')},
    )
    client = Client(transport=transport)
    return client.execute(gql(query), variable_values=variable_values)



##### AWS s3 utilities

In [3]:
# Function to fetch file from S3 using Boto3
import boto3
from io import BytesIO
from PyPDF2 import PdfReader

def read_pdf_file(f):
    """Read a PDF and return the text of all its pages as a single string.

    ``f`` is passed straight to ``PdfReader``. The text extracted from each
    page is followed by a separator of two newlines and two spaces.
    """
    page_separator = "\n\n  "
    reader = PdfReader(f)
    return "".join(page.extract_text() + page_separator for page in reader.pages)

def read_list_of_files_from_bucket(bucket_name, prefix):
    """Return the keys of all objects in ``bucket_name`` matching ``prefix``.

    The keys are returned exactly as reported by the bucket listing, in
    listing order.
    """
    matching_objects = boto3.resource('s3').Bucket(bucket_name).objects.filter(Prefix=prefix)
    return [item.key for item in matching_objects]

def read_file_from_bucket(bucket_name, prefix, filename):
    """Download a PDF object from S3 and return its extracted text.

    Args:
        bucket_name: Name of the S3 bucket.
        prefix: Key prefix of the object. May be empty for an object at the
            bucket root, and may or may not end with "/".
        filename: Last component of the object key.

    Returns:
        The text returned by ``read_pdf_file`` for the downloaded object.
    """
    # Join prefix and filename without producing a leading "/" (empty prefix)
    # or a doubled "/" (prefix already ending with a slash).
    if not prefix:
        key = filename
    elif prefix.endswith("/"):
        key = prefix + filename
    else:
        key = prefix + "/" + filename

    obj = boto3.resource('s3').Bucket(bucket_name).Object(key)
    pdf_bytes = obj.get()['Body'].read()
    # PdfReader needs a path or a seekable binary stream, not raw bytes,
    # so wrap the downloaded payload in an in-memory stream.
    return read_pdf_file(BytesIO(pdf_bytes))

def read_file_from_uri(uri):
    """Fetch a file by URI (s3://<bucket_name>/<prefix>/<filename>) and return its text.

    The URI is split on "/": the third piece is taken as the bucket name, the
    last piece as the filename, and everything in between as the prefix.
    """
    pieces = uri.split("/")
    return read_file_from_bucket(
        pieces[2],
        "/".join(pieces[3:-1]),
        pieces[-1],
    )


##### GQL queries

In [4]:
# GraphQL mutation that inserts a list of chunk rows into ResumeChunks
# and reports how many rows were affected.
query_insert_list_of_chunk = """
mutation InsertChunks($objects: [ResumeChunks_insert_input!]!) {
  insert_ResumeChunks(objects: $objects) {
    affected_rows
  }
}
"""
# -- Test
# Smoke test: inserts two sample chunks for application "1".
# Comment this out when not testing.
list_of_chunks = {
    "objects": [
        {"application_id": "1", "chunk_id": position, "content": content}
        for position, content in enumerate(["first chunk", "second chunk"], start=1)
    ]
}
execute_gql(query_insert_list_of_chunk, variable_values=list_of_chunks)


{'insert_ResumeChunks': {'affected_rows': 2}}

##### Hasura workflow and event trigger endpoint

In [7]:
from hasura.workflow import Step, Workflow

# Application the chunks are attributed to; the structure_chunks lambda reads
# this global each time it is called.
application_id = 4

# Pipeline: read a local PDF -> chunk its text -> shape the chunks into the
# mutation's variables -> run the insert mutation.
# NOTE(review): presumably each step's result is fed to the next step by
# hasura.workflow - confirm against that library.
test_workflow1 = Workflow(
    name="write_chunked_resume_to_db",
    elements=[
        Step(name="read_file", function=read_pdf_file, required_args=["f"], return_result=True),
        Step(name="chunking", function=chunking, return_result=True),
        Step(
            name="structure_chunks",
            function=lambda input_chunks: {
                "variable_values": {
                    "objects": [
                        {
                            "application_id": str(application_id),
                            "chunk_id": position,
                            "content": chunk_text,
                        }
                        for position, chunk_text in enumerate(input_chunks, start=1)
                    ]
                }
            },
            return_result=True,
        ),
        Step(name="write_to_db", function=execute_gql, required_args=["query"]),
    ],
)

test_workflow1.execute(f="sample.pdf", query=query_insert_list_of_chunk)

{'insert_ResumeChunks': {'affected_rows': 10}}

In [8]:
# Deploy the workflow so that it can be invoked by an event trigger.
# NOTE(review): the call appears to return a status string containing the
# deployment URL - confirm against hasura.workflow.
test_workflow1.deploy()

'Successfully deployed workflow at http://'

##### Bulk vectorization

In [None]:
# Test integration with S3
# read_file_from_bucket(bucket_name="", prefix="", filename="")

In [None]:
# Same pipeline as the single-file workflow, except that the first step
# downloads the PDF from S3 instead of reading a local file.
# NOTE(review): this workflow reuses the name "write_chunked_resume_to_db"
# of test_workflow1 - confirm that this is intended before deploying both.
test_workflow2 = Workflow(
    name="write_chunked_resume_to_db",
    elements=[
        Step(
            name="read_file",
            function=read_file_from_bucket,
            required_args=["bucket_name", "prefix", "filename"],
            return_result=True,
        ),
        Step(name="chunking", function=chunking, return_result=True),
        Step(
            name="structure_chunks",
            function=lambda input_chunks: {
                "variable_values": {
                    "objects": [
                        {
                            "application_id": str(application_id),
                            "chunk_id": idx + 1,
                            "content": c,
                        }
                        for idx, c in enumerate(input_chunks)
                    ]
                }
            },
            return_result=True,
        ),
        Step(name="write_to_db", function=execute_gql, required_args=["query"]),
    ],
)

# Run the S3-backed workflow defined above. The S3 arguments belong to its
# read_file step; test_workflow1's read_file step takes "f" instead.
test_workflow2.execute(bucket_name="", prefix="", filename="", query=query_insert_list_of_chunk)

In [None]:
# Batch processing of files but not concurrent

bucket_name = ""
prefix = ""

for key in read_list_of_files_from_bucket(bucket_name, prefix):
    # Keys ending with "/" are folder placeholders, not files to process.
    if key.endswith("/"):
        continue
    # The listing yields full object keys (prefix included). Split each key
    # into its own prefix and filename so that read_file_from_bucket, which
    # joins the two, reconstructs the key instead of prepending the prefix
    # a second time.
    key_prefix, _sep, filename = key.rpartition("/")
    test_workflow2.execute(
        bucket_name=bucket_name,
        prefix=key_prefix,
        filename=filename,
        query=query_insert_list_of_chunk,
    )

In [None]:
# Batch processing of files with concurrency

from concurrent.futures import ThreadPoolExecutor

# Fan the per-file workflow runs out over a thread pool and remember which
# key each future belongs to, so failures can be reported afterwards.
submitted = {}
with ThreadPoolExecutor(max_workers=10) as executor:
    for key in read_list_of_files_from_bucket(bucket_name, prefix):
        # Keys ending with "/" are folder placeholders, not files to process.
        if key.endswith("/"):
            continue
        # The listing yields full object keys (prefix included). Split each
        # key into its own prefix and filename so that read_file_from_bucket
        # reconstructs the key instead of prepending the prefix twice.
        key_prefix, _sep, filename = key.rpartition("/")
        future = executor.submit(
            test_workflow2.execute,
            bucket_name=bucket_name,
            prefix=key_prefix,
            filename=filename,
            query=query_insert_list_of_chunk,
        )
        submitted[future] = key

# Leaving the with-block waits for every task. An exception raised inside a
# worker is stored on its future, so report it here instead of dropping it;
# one failed file does not abort the rest of the batch.
for future, key in submitted.items():
    error = future.exception()
    if error is not None:
        print(f"Failed to process {key}: {error!r}")