In [None]:
!gcloud auth application-default login

In [None]:
!pip install --upgrade -r requirements.txt

In [None]:
#Performing Necessary Imports
import os
import configparser
from google.cloud import storage
import vertexai
from vertexai.preview.generative_models import GenerativeModel, Part
import pandas as pd
from xlsxwriter.workbook import Workbook
from datetime import datetime
import json
import logging
import os
import time

In [None]:
# Load configuration from config.ini
config = configparser.ConfigParser()
config.read('config.ini')

# Extract values from the DEFAULT section
project_name = config['DEFAULT']['project_name']
region = config['DEFAULT']['region']
bucket_name = config['DEFAULT']['bucket_name']
sample_images_path = config['DEFAULT']['sample_images_path']

def create_bucket_if_not_exists(bucket_name, project_name, region):
    """Create a GCS bucket if it does not exist."""
    storage_client = storage.Client(project=project_name)
    bucket = storage_client.lookup_bucket(bucket_name)
    
    if bucket:
        print(f"Bucket {bucket_name} already exists.")
    else:
        bucket = storage_client.create_bucket(bucket_name, location=region)
        print(f"Bucket {bucket_name} created in region {region}.")
    
    return bucket

def delete_all_files_from_bucket(bucket_name):
    """Delete all files from the specified GCS bucket."""
    storage_client = storage.Client()
    bucket = storage_client.bucket(bucket_name)
    
    # List and delete all blobs in the bucket
    blobs = bucket.list_blobs()
    for blob in blobs:
        blob.delete()
        print(f"Deleted {blob.name} from bucket {bucket_name}.")

def upload_files_to_bucket(bucket_name, folder_path):
    """Upload all files from a specified folder to the given GCS bucket."""
    storage_client = storage.Client()
    bucket = storage_client.bucket(bucket_name)
    
    # Iterate over all files in the specified folder and upload them to the bucket
    for file_name in os.listdir(folder_path):
        file_path = os.path.join(folder_path, file_name)
        if os.path.isfile(file_path):
            blob = bucket.blob(file_name)
            blob.upload_from_filename(file_path)
            print(f"Uploaded {file_name} to bucket {bucket_name}.")

# Create the bucket if it doesn't exist
create_bucket_if_not_exists(bucket_name, project_name, region)

# Delete all existing files from the bucket
delete_all_files_from_bucket(bucket_name)

# Upload files from the sample images path to the bucket
upload_files_to_bucket(bucket_name, sample_images_path)

In [None]:
# Initialize logger
logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)

def is_empty_or_whitespace(s):
    """Function to check if a string is empty or consists only of whitespace."""
    return s is None or s.isspace() or not s

def delete_all_files_in_folder(folder_path):
    """Delete all files in the specified local folder."""
    if os.path.exists(folder_path):
        for file_name in os.listdir(folder_path):
            file_path = os.path.join(folder_path, file_name)
            if os.path.isfile(file_path):
                os.remove(file_path)
                logger.info(f"Deleted local file: {file_name}")
    else:
        logger.warning(f"The folder {folder_path} does not exist.")

def generate_text_response(client, gcs_path, context, task, output_format, default_prompt_flag):
    """Generates text response based on image content using Vertex AI."""
    # Initialize Vertex AI
    vertexai.init(project=project_name_param, location=region)

    # Load the model
    multimodal_model = GenerativeModel("gemini-pro-vision")
    #multimodal_model = GenerativeModel("gemini-1.5-flash")
    

    # Default prompt setup
    default_prompt = """
        Context : Need to convert flowchart to Steps
        Task : - Generate a JSON structure representing a list of steps. Each step Desciption must have at least 10 words.
               - Additional Steps can be created to highlight relation between each steps.
               - Steps must be reference other steps if there are dependencies.
               - Each step should be a dictionary with the following keys:
                "Step number": an integer representing the step number.
                "Step Description": a string describing the step with at least 10 words.
        Format :
                [
                    {
                      "Step number":1,
                      "Step Description": "Discuss project concept with stakeholders."
                    },
                    {
                      "Step number":2,
                      "Step Description": "Do stakeholders approve of project concept?"
                    }
                ]            
    """

    # Use default prompt if necessary
    prompt = default_prompt if default_prompt_flag else "\n".join([context, task, output_format])

    # Query the model
    response = multimodal_model.generate_content(
        [
            Part.from_uri(gcs_path, mime_type="image/jpeg"),
            prompt
        ],
        generation_config={
            "max_output_tokens": 2048,
            "temperature": 0.4,
            "top_p": 1,
            "top_k": 32
        }
    )

    # Return the text part of the response
    return response.text

def process_image_file(client, bucket_name, file_name, local_file_path, writer):
    """Process an image file by generating a response and saving it to an Excel sheet."""
    sheet_name = file_name.split('.')[0][:30]
    file_name = file_name.split('/')[-1]
    logger.info(f"Processing file: {file_name} in bucket {bucket_name}")

    gcs_file_path = f"gs://{bucket_name}/{file_name}"
    local_image_path = os.path.join(local_file_path, file_name)

    # Generate the text response
    text = generate_text_response(client, gcs_file_path, context, task, output_format, default_prompt_flag)

    try:
        final_dictionary = json.loads(text) 
        df = pd.DataFrame(final_dictionary)
        df.to_excel(writer, sheet_name=sheet_name, index=False)
        workbook = writer.book
        worksheet = writer.sheets[sheet_name]

        # Formatting Excel sheet
        num_rows, num_cols = df.shape
        header_format = workbook.add_format({
            'bold': True,
            'text_wrap': True,
            'valign': 'vcenter',
            'align': 'center',
            'fg_color': '#00FFFF',
            'border': 1,
            'font_size': 12,
            'font_color': 'black'
        })

        for col_num, value in enumerate(df.columns.values):
            worksheet.write(0, col_num, value, header_format)

        # Apply cell formatting
        cell_format = workbook.add_format({'border': 1})
        worksheet.conditional_format(1, 0, num_rows, num_cols - 1, {'type': 'no_blanks', 'format': cell_format})
        worksheet.autofit()

        # Download and insert image into Excel
        bucket = client.bucket(bucket_name)
        blob = bucket.blob(file_name)
        blob.download_to_filename(local_image_path)
        worksheet.insert_image(num_rows + 2, 0, local_image_path, {'x_offset': 15, 'y_offset': 10, 'x_scale': 0.2, 'y_scale': 0.2})
        logger.info(f"Processed file: {file_name}")

    except ValueError as e:
        logger.error(f"Error parsing JSON: {e}")
    except Exception as e:
        logger.error(f"An error occurred: {e}")
        
    logger.info("Sleeping for 30 seconds before processing the next file...")
    time.sleep(30)
    
def image_to_task_generator(project_name_param, region, bucket_name_param, local_file_fixed_path, output_file_path, output_file_name_prefix, output_file_name_suffix, context, task, output_format):
    """Main function to process images and generate tasks."""
    client = storage.Client(project=project_name_param)
    extract_datetime = datetime.today().strftime("%Y%m%d_%H%M%S")
    output_file_name = f'{output_file_path}\\{output_file_name_prefix}_{extract_datetime}_{output_file_name_suffix}.xlsx'
    writer = pd.ExcelWriter(output_file_name, engine='xlsxwriter')

    # Clear the local directory before downloading new images
    delete_all_files_in_folder(local_file_fixed_path)

    blob_list = list(client.list_blobs(bucket_name_param))
    
    if not blob_list:
        logger.warning(f"No files found in the bucket: {bucket_name_param}")
    else:
        for blob in blob_list:
            file_name = str(blob.name)
            _, file_extension = os.path.splitext(file_name)
            if file_extension.lower() in ['.jpg', '.jpeg', '.png']:
                process_image_file(client, bucket_name_param, file_name, local_file_fixed_path, writer)
            else:
                logger.warning(f"Invalid file format for {file_name}. Only jpg and png files are supported.")

        writer.close()
        logger.info(f"Generated Output File: {output_file_name}")

def load_config():
    """Load configuration from the config.ini file."""
    config = configparser.ConfigParser()
    config.read('config.ini')
    return config['DEFAULT']

In [None]:
if __name__ == "__main__":
    try:
        config = load_config()
    
        project_name_param = config.get('project_name')
        region = config.get('region')
        bucket_name_param = config.get('bucket_name')
        local_file_fixed_path = config.get('image_file_path')
        output_file_path = config.get('output_file_path')
        log_folder_path = config.get('log_folder_path')
        output_file_name_prefix = config.get('output_file_name_prefix')
        default_prompt_flag=False
        
        # Configure the logging settings
        logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
        logger = logging.getLogger(__name__)
        
        # Create the log folder if it doesn't exist
        os.makedirs(log_folder_path, exist_ok=True)
        
        # Create a file handler with a dynamic log file name and folder path
        log_file_name = 'image_to_task_gen_app.log'
        log_file_path = os.path.join(log_folder_path, log_file_name)
        file_handler = logging.FileHandler(log_file_path, mode='a')
        file_handler.setLevel(logging.INFO)
        
        # Create a formatter and add it to the file handler
        formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
        file_handler.setFormatter(formatter)
        
        # Add the file handler to the logger
        logger.addHandler(file_handler)    
    
        # CTF Format used in prompt designing.Copy from here during custom input
        """
        Context : Need to convert Flowchart to Steps
        
        Task : Generate a JSON structure representing a list of steps.
                Each step should be a dictionary with the following keys:
                    "Step number": an integer representing the step number.
                    "Step Description": a string describing the step.
            
        Format : 
                        [
                            {
                              "Step number":1,
                              "Step Description": "Discuss project concept with stakeholders."
                            },
                            {
                              "Step number":2,
                              "Step Description": "Do stakeholders approve of project concept?"
                            }
                        ]     
        """
        
                    
        # Uncomment the following lines for user input
        context = input("Enter the context: ")
        task = input("Enter the task description: ")
        output_format = input("Enter the output format (in JSON structure): ")
    
        if any(is_empty_or_whitespace(param) for param in [context, task, output_format]):
            logger.info("Proceeding with default_prompt")
            output_file_name_suffix = 'DefaultPrompt'
            default_prompt_flag=True
        else:
            logger.info("Proceeding with custom prompt")
            output_file_name_suffix = 'CustomPrompt'
        
        # Invoking the image_to_task_generator function with the specified parameters
        image_to_task_generator(
            project_name_param,
            region,
            bucket_name_param,
            local_file_fixed_path,
            output_file_path,
            output_file_name_prefix,
            output_file_name_suffix,
            context,
            task,
            output_format
        )
        
    except Exception as e:
        # Log the exception and any additional information you want
        logger.error(f"An error occurred: {str(e)}", exc_info=True)
    finally:
        file_handler.close()
        logging.shutdown()