In [None]:
import json
import os
from urllib.parse import unquote_plus

import boto3

def lambda_handler(event, context):
    """Start a Step Functions execution for the object named in an S3 event.

    Args:
        event: S3 event notification. Only the first entry of
            ``event['Records']`` is used.
        context: Lambda context object (unused).

    Returns:
        dict: The state-machine input that was submitted, with keys
        ``s3Bucket`` and ``s3ObjectKey``.

    Raises:
        KeyError: If the ``STATEMACHINEARN`` environment variable is not set or
            the event does not have the expected S3 record structure.
    """
    step_function_client = boto3.client('stepfunctions')
    state_machine_arn = os.environ['STATEMACHINEARN']

    # Extract relevant information from the S3 event
    s3_record = event['Records'][0]['s3']
    s3_bucket = s3_record['bucket']['name']
    # S3 event notifications deliver the object key URL-encoded (a space
    # arrives as '+', other special characters as %XX). Decode it so the later
    # steps can use the key directly in S3 API calls.
    s3_object_key = unquote_plus(s3_record['object']['key'])

    # Prepare the input for the state machine
    step_state = {
        "s3Bucket": s3_bucket,
        "s3ObjectKey": s3_object_key
    }
    step_input = json.dumps(step_state)

    # Start the execution of the state machine
    step_function_client.start_execution(
        stateMachineArn=state_machine_arn,
        input=step_input
    )
    print(step_input)
    return step_state

In [None]:
##### doc-to-json-textract LAMBDA ######
#### "Start extraction" step in step function #####
##### USING CALL_TEXTRACT FUNCTION #####
##### REQUESTS FORMS + TABLES FEATURES (ONLY TABLES ARE PARSED IN THE NEXT STEP) #####


import boto3
import json

from textractcaller.t_call import call_textract, Textract_Features

def lambda_handler(event, context):
    """Run Textract on the uploaded document and store the result next to it.

    Reads ``s3Bucket`` and ``s3ObjectKey`` from ``event['Input']``, calls
    ``call_textract`` with the FORMS and TABLES features, writes the returned
    JSON to ``<s3ObjectKey>_textract.json`` in the same bucket, and returns the
    step state extended with ``s3TextractJsonKey``.
    """
    print(json.dumps(event))
    textract = boto3.client('textract', region_name='us-east-1')
    s3 = boto3.client('s3')

    # The state machine passes the upload bucket and object under "Input".
    incoming_state = event['Input']
    upload_bucket = incoming_state['s3Bucket']
    document_key = incoming_state['s3ObjectKey']

    analysis = call_textract(
        input_document=f's3://{upload_bucket}/{document_key}',
        features=[Textract_Features.FORMS, Textract_Features.TABLES],
        boto3_textract_client=textract,
    )

    # Persist the Textract response beside the source document.
    textract_json_key = document_key + "_textract.json"
    s3.put_object(Body=json.dumps(analysis), Bucket=upload_bucket, Key=textract_json_key)

    # Hand the enriched state back to the state machine.
    return {
        "s3Bucket": upload_bucket,
        "s3ObjectKey": document_key,
        "s3TextractJsonKey": textract_json_key,
    }


In [None]:
##### THIS IS PARSE_TEXTRACT_JSON_OBJ_FOR_RDS LAMBDA #####
###### "Parse Textract JSON" step in step function #####

import pandas as pd
import numpy as np
import json
from trp import Document
import boto3
import datetime
import uuid
import re

def textract_to_dataframes(textract_json):
    """Build one DataFrame per table found in a Textract response.

    The response is wrapped in a ``Document``; its tables are visited page by
    page and stored under the keys ``df1``, ``df2``, ... in that order. Each
    frame holds the cell texts, one list per table row.

    Returns:
        dict: Mapping of ``'df<N>'`` to ``pandas.DataFrame``.
    """
    parsed = Document(textract_json)

    # Flatten pages -> tables so the numbering runs across the whole document.
    tables_in_order = (tbl for pg in parsed.pages for tbl in pg.tables)

    df_dict = {
        f'df{number}': pd.DataFrame([[c.text for c in r.cells] for r in tbl.rows])
        for number, tbl in enumerate(tables_in_order, start=1)
    }
    print(df_dict)
    return df_dict




def lambda_handler(event, context):
    """Convert a stored Textract result into menu / food / choices CSV files.

    Reads ``s3Bucket``, ``s3TextractJsonKey`` and ``s3ObjectKey`` from
    ``event['Input']['Payload']``, loads the Textract JSON from S3 and turns
    every table into a DataFrame with ``textract_to_dataframes``. It then looks
    for three tables, each identified by a marker string in its first column:

    * ``'WEEK 1'``  -> ``<s3ObjectKey>_menu.csv``    (``menuCsvKey``)
    * ``'Entrees'`` -> ``<s3ObjectKey>_food.csv``    (``foodCsvKey``)
    * ``'# of entrée choices daily'`` -> ``<s3ObjectKey>_choices.csv``
      (``choicesCsvKey``)

    Only the first matching table of each kind is processed. Each CSV is
    written to the hard-coded validation bucket and gets ``year``, ``school``,
    ``pre_post`` (all parsed from the object key, ``'N/A'`` when not found)
    and ``uuid`` columns appended.

    Args:
        event: Step Functions task input; the state is read from
            ``event['Input']['Payload']``.
        context: Lambda context object (unused).

    Returns:
        dict: The incoming step state, updated in place with
        ``validationBucket`` and a ``*CsvKey`` entry for every table that was
        found. Keys for tables that were not found are not added.
    """
    print(json.dumps(event))
    s3_client = boto3.client('s3')

    # Retrieve input from the state machine
    step_state = event['Input']['Payload']
    s3_bucket = step_state['s3Bucket']   
    s3_textract_json_key = step_state['s3TextractJsonKey']
    s3_object_key = step_state['s3ObjectKey']

    # Define validation bucket (hard-coded destination for all three CSVs)
    validation_bucket = "validation-bucket---doc-parser"

    # Get the Textract JSON from S3
    textract_json_object = s3_client.get_object(Bucket=s3_bucket, Key=s3_textract_json_key)
    textract_json = json.load(textract_json_object['Body'])

    # Convert the JSON to DataFrames
    df_dict = textract_to_dataframes(textract_json)

    
    # Find first occurrence of the years 2018-2022 in s3_object_key
    match = re.search(r'2018|2019|2020|2021|2022', s3_object_key)
    year = match.group() if match else 'N/A'

    # Find school name in s3_object_key (case-insensitive via lower())
    match = re.search(r'bixby|mcalester|altus|middleberg|middleburg|newcastle|thomas|wynona', s3_object_key.lower())
    school = match.group() if match else 'N/A'


    # Find 'pre' or 'post' in s3_object_key (case-insensitive via lower()).
    # NOTE(review): this is a plain substring search, so any key containing
    # the letters "pre" or "post" anywhere will match - confirm this is intended.
    match = re.search(r'pre|post', s3_object_key.lower())
    pre_post = match.group() if match else 'N/A'



    ########################## Write DataFrame to "menu.csv" in validation bucket
    for df_name, df in df_dict.items():
        # The menu table is recognised by 'WEEK 1' appearing in its first column.
        if df[0].str.contains('WEEK 1').any():

            # Create entree 2 empty df
            # NOTE(review): entree_2_df is never read after these three lines.
            entree_2_df = pd.DataFrame()
            new_columns = ['week_num', 'monday', 'tuesday', 'wednesday', 'thursday', 'friday']
            entree_2_df.columns = new_columns[:len(entree_2_df.columns)]

            # Remove first row
            fixed_df = df.copy().drop(df.index[0])

            # Strip extra spaces from column names and cells
            # NOTE(review): DataFrame.applymap is deprecated in recent pandas
            # releases in favour of DataFrame.map - confirm the pinned version.
            fixed_df = fixed_df.applymap(str.strip)
            fixed_df.columns = fixed_df.columns.astype(str).str.strip()
            # A second leading row is dropped here, so two rows are removed from
            # the top of the table in total.
            # NOTE(review): presumably both are header rows - confirm.
            fixed_df = fixed_df.drop(fixed_df.index[0])

            # Rename columns (the name list is truncated to the actual column count)
            new_columns = ['week_num', 'monday', 'tuesday', 'wednesday', 'thursday', 'friday']
            fixed_df.columns = new_columns[:len(fixed_df.columns)]

            # Replace empty strings with np.nan
            fixed_df.replace("", np.nan, inplace=True)

            # Fill missing values in 'WEEK_NUM' column
            # NOTE(review): this is an in-place call on a column selection; with
            # pandas copy-on-write enabled it may not update fixed_df - verify.
            fixed_df['week_num'].ffill(inplace=True)

            # Delete any instances of "Entrée" and the rest of the string till a digit and a colon appears
            for index, row in fixed_df.iterrows():
                for col in fixed_df.columns:
                    if pd.notnull(row[col]): 
                        fixed_df.at[index, col] = re.sub(r'Entrée[^0-9]*[0-9]:\s*', '', row[col])

            # Remove rows that don't contain any alphanumeric characters in the weekday columns
            weekday_columns = ['monday', 'tuesday', 'wednesday', 'thursday', 'friday']
            fixed_df = fixed_df[fixed_df[weekday_columns].apply(lambda row: row.astype(str).str.strip().str.contains('[A-Za-z0-9]').any(), axis=1)]
            


            def splitDataFrameList(df,target_column,separator):
                ''' Split one column's values and give every piece its own row.

                df = dataframe to split,
                target_column = the column containing the values to split
                separator = the symbol used to perform the split
                returns: a new dataframe with each entry for the target column separated,
                with each element moved into a new row. The values in the other columns
                are duplicated across the newly divided rows. The result has a fresh
                default index because it is rebuilt from a list of row dicts.
                '''
                def splitListToRows(row,row_accumulator,target_column,separator):
                    '''Append one copy of `row` to row_accumulator per piece of row[target_column].'''
                    split_row = row[target_column].split(separator)
                    for s in split_row:
                        new_row = row.to_dict()
                        new_row[target_column] = s
                        row_accumulator.append(new_row)
                new_rows = []
                # apply() is used only for its side effect of filling new_rows.
                df.apply(splitListToRows,axis=1,args = (new_rows,target_column,separator))
                new_df = pd.DataFrame(new_rows)
                return new_df

            # The four branches below differ only in the list of row positions
            # kept after each split. The branch is chosen by whether week_num
            # contains a cell exactly equal to "4", "5" or "6".
            # NOTE(review): the positional lists assume a fixed table layout
            # (two rows per week, two space-separated pieces per cell) - confirm
            # against real documents; .iloc with a position beyond the frame's
            # length raises IndexError.

            # filter out duplicates if there are only three weeks of data in the df
            if not fixed_df['week_num'].isin(["4"]).any() and not fixed_df['week_num'].isin(["5"]).any() and not fixed_df['week_num'].isin(["6"]).any():
                fixed_df = splitDataFrameList(fixed_df, 'monday', ' ')
                fixed_df = splitDataFrameList(fixed_df, 'tuesday', ' ')
                fixed_df = fixed_df.iloc[[0, 1, 2, 5, 6, 9]]
                fixed_df = fixed_df.reset_index(drop=True)
                fixed_df = splitDataFrameList(fixed_df, 'wednesday', ' ')
                fixed_df = fixed_df.iloc[[0, 1, 2, 5, 6, 9]]
                fixed_df = fixed_df.reset_index(drop=True)
                fixed_df = splitDataFrameList(fixed_df, 'thursday', ' ')
                fixed_df = fixed_df.iloc[[0, 1, 2, 5, 6, 9]]
                fixed_df = fixed_df.reset_index(drop=True)
                fixed_df = splitDataFrameList(fixed_df, 'friday', ' ')
                fixed_df = fixed_df.iloc[[0, 1, 2, 5, 6, 9]]
                fixed_df = fixed_df.reset_index(drop=True)

            # filter out duplicates if there are four weeks of data in the df
            elif not fixed_df['week_num'].isin(["5"]).any() and not fixed_df['week_num'].isin(["6"]).any():
                fixed_df = splitDataFrameList(fixed_df, 'monday', ' ')
                fixed_df = splitDataFrameList(fixed_df, 'tuesday', ' ')
                fixed_df = fixed_df.iloc[[0, 1, 2, 5, 6, 9, 10, 13]]
                fixed_df = fixed_df.reset_index(drop=True)
                fixed_df = splitDataFrameList(fixed_df, 'wednesday', ' ')
                fixed_df = fixed_df.iloc[[0, 1, 2, 5, 6, 9, 10, 13]]
                fixed_df = fixed_df.reset_index(drop=True)
                fixed_df = splitDataFrameList(fixed_df, 'thursday', ' ')
                fixed_df = fixed_df.iloc[[0, 1, 2, 5, 6, 9, 10, 13]]
                fixed_df = fixed_df.reset_index(drop=True)
                fixed_df = splitDataFrameList(fixed_df, 'friday', ' ')
                fixed_df = fixed_df.iloc[[0, 1, 2, 5, 6, 9, 10, 13]]
                fixed_df = fixed_df.reset_index(drop=True)

            # filter out duplicates if there are five weeks of data in the df
            elif not fixed_df['week_num'].isin(["6"]).any():
                fixed_df = splitDataFrameList(fixed_df, 'monday', ' ')
                fixed_df = splitDataFrameList(fixed_df, 'tuesday', ' ')
                fixed_df = fixed_df.iloc[[0, 1, 2, 5, 6, 9, 10, 13, 14, 17]]
                fixed_df = fixed_df.reset_index(drop=True)
                fixed_df = splitDataFrameList(fixed_df, 'wednesday', ' ')
                fixed_df = fixed_df.iloc[[0, 1, 2, 5, 6, 9, 10, 13, 14, 17]]
                fixed_df = fixed_df.reset_index(drop=True)
                fixed_df = splitDataFrameList(fixed_df, 'thursday', ' ')
                fixed_df = fixed_df.iloc[[0, 1, 2, 5, 6, 9, 10, 13, 14, 17]]
                fixed_df = fixed_df.reset_index(drop=True)
                fixed_df = splitDataFrameList(fixed_df, 'friday', ' ')
                fixed_df = fixed_df.iloc[[0, 1, 2, 5, 6, 9, 10, 13, 14, 17]]
                fixed_df = fixed_df.reset_index(drop=True)

            # filter out duplicates if there are six weeks of data in the df
            else:
                fixed_df = splitDataFrameList(fixed_df, 'monday', ' ')
                fixed_df = splitDataFrameList(fixed_df, 'tuesday', ' ')
                fixed_df = fixed_df.iloc[[0, 1, 2, 5, 6, 9, 10, 13, 14, 17, 18, 21]]
                fixed_df = fixed_df.reset_index(drop=True)
                fixed_df = splitDataFrameList(fixed_df, 'wednesday', ' ')
                fixed_df = fixed_df.iloc[[0, 1, 2, 5, 6, 9, 10, 13, 14, 17, 18, 21]]
                fixed_df = fixed_df.reset_index(drop=True)
                fixed_df = splitDataFrameList(fixed_df, 'thursday', ' ')
                fixed_df = fixed_df.iloc[[0, 1, 2, 5, 6, 9, 10, 13, 14, 17, 18, 21]]
                fixed_df = fixed_df.reset_index(drop=True)
                fixed_df = splitDataFrameList(fixed_df, 'friday', ' ')
                fixed_df = fixed_df.iloc[[0, 1, 2, 5, 6, 9, 10, 13, 14, 17, 18, 21]]
                fixed_df = fixed_df.reset_index(drop=True)

            # Add new 'Entree' column: '1' on even row positions, '2' on odd ones
            fixed_df['entree'] = np.where(fixed_df.index % 2 == 0, '1', '2')

            # Add new 'Year' column
            fixed_df['year'] = year

            # Add new 'School' column
            fixed_df['school'] = school
            
            # Add new 'Pre_Post' column
            fixed_df['pre_post'] = pre_post

            # Add new 'uuid' column (one random UUID per row)
            fixed_df['uuid'] = [uuid.uuid4() for i in range(len(fixed_df))]

############## END OF ADDING COLUMNS ###################


            csv_string = fixed_df.to_csv(index=False)
            # create the menu.csv key with s3ObjectKey
            menu_key = f"{s3_object_key}_menu.csv"
            s3_client.put_object(Body=csv_string, Bucket=validation_bucket, Key=menu_key)
            step_state["menuCsvKey"] = menu_key
            print(csv_string)
            # Only the first matching table is used.
            break

    #################### Write DataFrame to "food.csv" in validation bucket
    for df_name, df in df_dict.items():
        # The food table is recognised by 'Entrees' appearing in its first column.
        if df[0].str.contains('Entrees').any():
            # Strip extra spaces from column names and cells
            df = df.applymap(str.strip)
            df.columns = df.columns.astype(str).str.strip()

            # Set column names to first row's values
            # (these names are overwritten by the rename just below)
            df.columns = df.iloc[0]
            # NOTE(review): df is now a slice; the column assignments below may
            # trigger pandas' SettingWithCopyWarning - verify.
            df = df[1:]

            # rename columns (the name list is truncated to the actual column count)
            new_columns = ['entree', 'veg', 'fruit', 'grain']
            df.columns = new_columns[:len(df.columns)]

            # Add new 'Year' column
            df['year'] = year

            # Add new 'School' column
            df['school'] = school
                
            # Add new 'Pre_Post' column
            df['pre_post'] = pre_post

            # Add new 'uuid' column (one random UUID per row)
            df['uuid'] = [uuid.uuid4() for i in range(len(df))]

            csv_string = df.to_csv(index=False)
            # create the food.csv key with s3ObjectKey
            food_key = f"{s3_object_key}_food.csv"
            s3_client.put_object(Body=csv_string, Bucket=validation_bucket, Key=food_key)
            step_state["foodCsvKey"] = food_key
            print(csv_string)
            # Only the first matching table is used.
            break


    ################# Write DataFrame to "choices.csv" in validation bucket
    for df_name, df in df_dict.items():
        # The choices table is recognised by this marker text in its first column.
        if df[0].str.contains("# of entrée choices daily").any():
            # Strip extra spaces from column names and cells
            df = df.applymap(str.strip)
            df.columns = df.columns.astype(str).str.strip()
            
            # Set column names to first row's values
            # (these names are overwritten by the rename just below)
            df.columns = df.iloc[0]
            df = df[1:]

            # renaming columns (the name list is truncated to the actual column count):
            new_columns = ["num_entree", "num_veg", "num_fruit"]
            df.columns = new_columns[:len(df.columns)]

            # Add new 'Year' column
            df['year'] = year

            # Add new 'School' column
            df['school'] = school

            # Add new 'Pre_Post' column
            df['pre_post'] = pre_post

            # Add new 'uuid' column (one random UUID per row)
            df['uuid'] = [uuid.uuid4() for i in range(len(df))]

            csv_string = df.to_csv(index=False)
            # create the choices.csv key with s3ObjectKey
            choices_key = f"{s3_object_key}_choices.csv"
            s3_client.put_object(Body=csv_string, Bucket=validation_bucket, Key=choices_key)
            step_state["choicesCsvKey"] = choices_key
            print(csv_string)
            # Only the first matching table is used.
            break



    # Add validation bucket to step_state
    step_state["validationBucket"] = validation_bucket

    print(json.dumps(step_state, default=str))
    # This returns the updated step_state back to the state machine.
    return step_state

In [None]:
####### "Validate table" step in step function #####
####### THIS IS table_validator LAMBDA #####


##### (not inserted in lambda function yet) #####

import pandas as pd
import json
import boto3
import datetime
from io import StringIO
import re

def _menu_table_ok(table):
    """Menu CSV is valid when it has at least one row and exactly 11 columns."""
    return table.shape[0] >= 1 and table.shape[1] == 11


def _food_table_ok(table):
    """Food CSV is valid with >= 1 row, 8 columns and an alphanumeric first row."""
    return (
        table.shape[0] >= 1
        and table.shape[1] == 8
        and table.iloc[0].astype(str).str.contains(r'[a-zA-Z0-9]').any()
    )


def _choices_table_ok(table):
    """Choices CSV is valid when it has exactly one row and 7 columns."""
    return table.shape[0] == 1 and table.shape[1] == 7


def lambda_handler(event, context):
    """Validate the staged menu / food / choices CSVs and log the outcome.

    For each of ``menuCsvKey``, ``foodCsvKey`` and ``choicesCsvKey`` in the
    step state, the CSV is loaded from the validation bucket and its shape is
    checked. The result is stored as ``menuIsValid`` / ``foodIsValid`` /
    ``choicesIsValid``. A table whose key is absent from the step state is
    marked invalid and its key is written back as ``None``; previously the
    handler raised ``UnboundLocalError`` in that case when copying the keys
    back into the state.

    One row with a timestamp and the three flags is appended to
    ``validation.csv`` in the validation bucket (created if it does not exist).

    Args:
        event: Step Functions task input; the state is read from
            ``event['Input']['Payload']``.
        context: Lambda context object (unused).

    Returns:
        dict: The step state, updated in place with the three ``*IsValid``
        flags, the three ``*CsvKey`` entries and ``validationCsvKey``.

    Raises:
        KeyError: If ``validationBucket`` is missing from the step state.
    """
    step_state = event['Input']['Payload']
    s3_client = boto3.client('s3')

    # Retrieve input from the state machine
    validation_bucket = step_state['validationBucket']
    validation_key = 'validation.csv'

    try:
        # Try to load existing validation data from S3
        validation_object = s3_client.get_object(Bucket=validation_bucket, Key=validation_key)
        validation_df = pd.read_csv(validation_object['Body'])
    except s3_client.exceptions.NoSuchKey:
        # If validation.csv does not exist yet, create an empty DataFrame
        validation_df = pd.DataFrame(columns=["Timestamp", "MenuIsValid", "FoodIsValid", "ChoicesIsValid"])

    # (state key holding the CSV key, result flag, log message when the key is
    # missing, shape check)
    table_checks = (
        ("menuCsvKey", "menuIsValid", "MenuCsvKeyError", _menu_table_ok),
        ("foodCsvKey", "foodIsValid", "FoodCsvKeyError", _food_table_ok),
        ("choicesCsvKey", "choicesIsValid", "ChoicesCsvKeyError", _choices_table_ok),
    )

    for key_field, flag_field, missing_message, table_ok in table_checks:
        csv_key = step_state.get(key_field)
        if csv_key is None:
            # The parse step found no such table, so there is nothing to load.
            step_state[flag_field] = False
            print(missing_message)
        else:
            csv_object = s3_client.get_object(Bucket=validation_bucket, Key=csv_key)
            # bool() keeps the flag a plain Python bool (the food check ends in
            # a numpy boolean), so the returned state stays JSON-serialisable.
            step_state[flag_field] = bool(table_ok(pd.read_csv(csv_object['Body'])))
        # Always write the key back (None when absent) so the next step can
        # read all three keys without a KeyError.
        step_state[key_field] = csv_key

    # Add new validation results
    new_results = {
        "Timestamp": datetime.datetime.now(),
        "MenuIsValid": step_state["menuIsValid"],
        "FoodIsValid": step_state["foodIsValid"],
        "ChoicesIsValid": step_state["choicesIsValid"]
    }

    new_results_df = pd.DataFrame([new_results])

    validation_df = pd.concat([validation_df, new_results_df])

    # Save the updated validation_df back to S3
    csv_buffer = StringIO()
    validation_df.to_csv(csv_buffer, index=False)
    s3_client.put_object(Body=csv_buffer.getvalue(), Bucket=validation_bucket, Key=validation_key)

    # put the validation log key in step_state
    step_state["validationCsvKey"] = validation_key

    return step_state

In [None]:
####### INSERT-TO-PROD LAMBDA #########
#######  This is INSERT TO PROD Step function step ######



import pandas as pd
import json
import boto3
import datetime
from io import StringIO
import re

def _append_to_production(s3_client, validation_bucket, production_bucket,
                          staged_key, production_key, columns):
    """Append one staged CSV to its production table, then delete the staged file.

    Loads ``production_key`` from ``production_bucket`` (or starts an empty
    table with ``columns`` when it does not exist yet), appends the rows of
    ``staged_key`` from ``validation_bucket``, and uploads the combined table
    once. The staged file is deleted only when the upload response contains an
    ``ETag``.
    """
    try:
        # Try to load existing production data from S3
        prod_object = s3_client.get_object(Bucket=production_bucket, Key=production_key)
        prod_df = pd.read_csv(prod_object['Body'])
    except s3_client.exceptions.NoSuchKey:
        # If the production table does not exist yet, create an empty DataFrame
        prod_df = pd.DataFrame(columns=columns)

    staged_df = pd.read_csv(s3_client.get_object(Bucket=validation_bucket, Key=staged_key)['Body'])

    # reset the index, dropping the old one (bc concat keeps the old index)
    prod_df = pd.concat([prod_df, staged_df]).reset_index(drop=True)

    # Save the updated table back to S3 (a single upload)
    csv_buffer = StringIO()
    prod_df.to_csv(csv_buffer, index=False)
    response = s3_client.put_object(Body=csv_buffer.getvalue(), Bucket=production_bucket, Key=production_key)

    # if successful, delete the staged CSV from the validation bucket
    if "ETag" in response:
        s3_client.delete_object(Bucket=validation_bucket, Key=staged_key)
    else:
        print("Error while saving to production bucket. Did not delete from validation bucket.")


def lambda_handler(event, context):
    """Append every validated CSV to its production table in S3.

    For each of the menu, food and choices tables whose ``*IsValid`` flag in
    the step state equals ``True``, the staged CSV is appended to
    ``prod_menu_table.csv`` / ``prod_food_table.csv`` /
    ``prod_choices_table.csv`` in the production bucket and then removed from
    the validation bucket. Tables flagged invalid (or without a flag) are
    skipped, and their CSV key is not required to be present.

    Args:
        event: Step Functions task input; the state is read from
            ``event['Input']['Payload']``.
        context: Lambda context object (unused).

    Returns:
        dict: The step state, updated in place with ``productionBucket``.

    Raises:
        KeyError: If ``validationBucket`` is missing, or a table is flagged
            valid but its ``*CsvKey`` entry is missing.
    """
    step_state = event['Input']['Payload']
    s3_client = boto3.client('s3')

    # Retrieve input from the state machine
    validation_bucket = step_state['validationBucket']

    step_state['productionBucket'] = 'prod-bucket---------doc-parser'
    production_bucket = step_state['productionBucket']

    # (validity flag, state key of the staged CSV, production table key,
    # columns used when the production table has to be created)
    production_tables = (
        ("menuIsValid", "menuCsvKey", "prod_menu_table.csv",
         ["week_num", "monday", "tuesday", "wednesday", "thursday", "friday", "entree", "year", "school", "pre_post", "uuid"]),
        ("foodIsValid", "foodCsvKey", "prod_food_table.csv",
         ["entree", "veg", "fruit", "grain", "year", "school", "pre_post", "uuid"]),
        ("choicesIsValid", "choicesCsvKey", "prod_choices_table.csv",
         ["num_entree", "num_veg", "num_fruit", "year", "school", "pre_post", "uuid"]),
    )

    ######## APPEND TO PRODUCTION BUCKET or CREATE IF DOESN'T EXIST #########
    for flag_field, key_field, production_key, columns in production_tables:
        if step_state.get(flag_field) == True:  # noqa: E712 - same comparison as before
            # The staged key is only looked up for tables that are promoted.
            _append_to_production(
                s3_client, validation_bucket, production_bucket,
                step_state[key_field], production_key, columns,
            )

    # Return the state like the other pipeline steps do, so productionBucket
    # is visible to the state machine.
    return step_state

