In [None]:
import os
import re
import base64
import json
import time
from io import BytesIO
from typing import Dict, Any, Optional

import pandas as pd
from flask import Flask, request, jsonify, current_app
from flask_cors import CORS
from PIL import Image
import google.generativeai as genai
from google.generativeai.types import HarmCategory, HarmBlockThreshold

# Flask application object; the request hook and route decorators further down attach to it.
app = Flask(__name__)
# Enable CORS handling for the whole app; no options are passed, so flask_cors' defaults apply.
CORS(app)


# What this code does:
# It takes an image, passes it to the Gemini API, and extracts the raw text from the response; it then processes the text using regex
# and maps abbreviations to their full meanings.
# Next part: currently trying to add the ICD code for the provisional diagnosis.
# ////////////////////////////////////////////

# Instructions for using this code >>>>>>>>>>>>>>>>>>>>>>>>
# Change your paths here.
# Configuration
# Each setting is read from an environment variable first; the literal that
# follows is the original value, kept as the fallback so existing setups keep
# working without any environment configuration.
JSON_FILE_PATH = os.environ.get(
    "JSON_FILE_PATH",
    r"C:\Users\Admin\Downloads\bajaj_project_github\output.json",
)
EXCEL_FILE_PATH = os.environ.get(
    "EXCEL_FILE_PATH",
    r"C:\Users\Admin\Downloads\bajaj_project_github\OUTPUTS\output_file.xlsx",
)
ICD_CODE_JSON_PATH = os.environ.get(
    "ICD_CODE_JSON_PATH",
    r'C:\Users\Admin\Downloads\output (1).json',
)
# SECURITY(review): a real API key is committed below. Anyone with access to
# this file can use it. Revoke it, supply the replacement via GEMINI_API_KEY,
# and then delete the literal fallback.
API_KEY = os.environ.get("GEMINI_API_KEY", "AIzaSyCQrYGVRTNivr4Dh_xhJLkVovy6kDEFhKY")

# Global variable to store the ICD data (filled in by load_icd_codes;
# None until then or after a failed load).
icd_data: Optional[Dict[str, Any]] = None

def load_icd_codes(file_path: str) -> None:
    """
    Load ICD codes from a JSON file into the module-level ``icd_data``.

    The file must contain a JSON object; ``get_icd_code`` iterates over its
    items as (medical name, ICD code) pairs. On any read/parse failure, or if
    the payload is not an object, the problem is logged and ``icd_data`` is
    reset to ``None``.

    Logging goes through ``current_app.logger``, so this must be called with
    a Flask application context active.

    Args:
        file_path: Path of the JSON file to read.
    """
    global icd_data
    try:
        # Decode explicitly instead of relying on the platform default
        # encoding; 'utf-8-sig' also accepts a file saved with a BOM.
        with open(file_path, 'r', encoding='utf-8-sig') as json_file:
            loaded = json.load(json_file)
    except (OSError, ValueError) as e:
        # ValueError covers both json.JSONDecodeError and UnicodeDecodeError.
        current_app.logger.error(f"Error loading ICD codes: {str(e)}")
        icd_data = None
        return

    if not isinstance(loaded, dict):
        current_app.logger.error(
            f"Error loading ICD codes: expected a JSON object in {file_path}, "
            f"got {type(loaded).__name__}"
        )
        icd_data = None
        return

    icd_data = loaded
    current_app.logger.info(f"Successfully loaded ICD codes from {file_path}")

def get_icd_code(provisional_diagnosis: str) -> Optional[str]:
    """
    Get the ICD code for a given provisional diagnosis.

    The diagnosis is matched case-insensitively as a substring of each
    medical name in the loaded ``icd_data``; the code of the first matching
    entry (in the mapping's iteration order) is returned.

    Args:
        provisional_diagnosis: Diagnosis text to look up. Empty, blank or
            non-string values (e.g. a JSON ``null``) never match.

    Returns:
        The matching ICD code, or ``None`` when the ICD data is not loaded,
        nothing matches, or an unexpected error occurs. This function logs
        instead of raising.
    """
    try:
        if icd_data is None:
            current_app.logger.error("ICD data is not initialized.")
            return None

        # An empty string is a substring of every name, so without this guard
        # a missing diagnosis would be assigned the first code in the table.
        if not isinstance(provisional_diagnosis, str) or not provisional_diagnosis.strip():
            current_app.logger.warning(f"No matching ICD code found for diagnosis '{provisional_diagnosis}'")
            return None

        diagnosis_lower = provisional_diagnosis.strip().lower()
        for medical_name, icd_code in icd_data.items():
            if diagnosis_lower in medical_name.lower():
                current_app.logger.info(f"Found ICD code '{icd_code}' for diagnosis '{provisional_diagnosis}'")
                return icd_code

        current_app.logger.warning(f"No matching ICD code found for diagnosis '{provisional_diagnosis}'")
        return None
    except Exception as e:
        # Best-effort lookup: the caller treats None as "no code found".
        current_app.logger.error(f"Error in get_icd_code: {str(e)}")
        return None

def process_local_image(image_path: str) -> bytes:
    """
    Read a local image file and return it re-encoded as JPEG bytes.

    Images whose mode is neither 'RGB' nor 'L' are converted to 'RGB' before
    being saved as JPEG (quality 95, optimized).

    Args:
        image_path: Path of the image file to open.

    Returns:
        The JPEG-encoded image.

    Raises:
        IOError: If the image cannot be opened or written; the original
            error is attached as ``__cause__``.
        Exception: For any other failure, including an empty encoding
            result; the original error is attached as ``__cause__``.
    """
    try:
        with Image.open(image_path) as source:
            image = source if source.mode in ('RGB', 'L') else source.convert('RGB')

            buffered = BytesIO()
            image.save(buffered, format="JPEG", quality=95, optimize=True)
            image_bytes = buffered.getvalue()

            if not image_bytes:
                raise ValueError("Processed image is empty")

            return image_bytes
    except IOError as e:
        # Chain with "from e" so the underlying traceback is not lost.
        raise IOError(f"Error opening image: {str(e)}") from e
    except Exception as e:
        raise Exception(f"Unexpected error processing image: {str(e)}") from e

# Ordered clean-up rules applied by clean_text, compiled once at import time.
# Order matters: the "on" rule rewrites a standalone "o" (so "c/o" becomes
# "c/on"), and the later "Complains of" rule is what then picks up "c/on".
_CLEANUP_RULES = (
    # Drop apostrophes that touch a word character on at least one side.
    (re.compile(r"\b'\b|\B'\b|\b'\B"), ""),
    # Unwrap a word enclosed in backslashes: \word\ -> word.
    (re.compile(r"\\(\w+)\\"), r"\1"),
    # A question mark surrounded by whitespace collapses to one space.
    (re.compile(r'\s\?\s'), ' '),
    (re.compile(r'\b(?:age|a\.g|ag|age\.)\b', re.IGNORECASE), 'age'),
    (re.compile(r'\b(?:on|o\.n|on\.|o)\b', re.IGNORECASE), 'on'),
    (re.compile(r'/\s*\(per\)'), ''),
    (re.compile(r'\b(?:c/0|clo|c/on|cho)\b', re.IGNORECASE), 'Complains of'),
    (re.compile(r'to\s*\(telephone order\)', re.IGNORECASE), 'to'),
    (re.compile(r'AGE\s*\(acute gastro enteritis\)', re.IGNORECASE), 'age'),
    (re.compile(r'\bckd\b', re.IGNORECASE), 'CKD chronic kidney disease'),
)

# Case-sensitive abbreviation expansions, applied one after another in this order.
_ABBREVIATIONS = {
    "T2DM": "Type 2 Diabetes Mellitus",
    "CABG": "Coronary Artery Bypass Grafting",
    "ACS": "Acute Coronary Syndrome",
    "MI": "Myocardial Infarction",
    "CAG": "Coronary Angiography",
    "S/P": "Status Post",
    "USG": "Ultrasound",
    "LSCS": "Lower Segment Cesarean Section",
    "LE": "Lower Extremity",
    "MHD": "Maintenance Hemodialysis",
    "HTN": "Hypertension",
    "UTI": "Urinary Tract Infection",
    "CG": "Coronary Graft",
    "IOL": "Intraocular Lens",
    "HT": "Hypertensive",
    "CAD": "Coronary Artery Disease",
}

# re.escape keeps the lookup literal even if an abbreviation containing a
# regex metacharacter is added to the table later.
_ABBREVIATION_RULES = tuple(
    (re.compile(r'\b' + re.escape(abbr) + r'\b'), full_form)
    for abbr, full_form in _ABBREVIATIONS.items()
)

# Arrow symbols and their textual replacements (each keeps a trailing space).
_ARROW_TABLE = str.maketrans({
    "↑": "increased ",
    "↓": "decreased ",
    "→": "leads to ",
})


def clean_text(text: str) -> str:
    """
    Clean and format the extracted text.

    Applies, in order: the regex clean-up rules in ``_CLEANUP_RULES``, the
    whole-word abbreviation expansions in ``_ABBREVIATIONS`` (case-sensitive),
    and finally the arrow-symbol replacements.

    Args:
        text: Text to normalise.

    Returns:
        The cleaned text.
    """
    for pattern, replacement in _CLEANUP_RULES:
        text = pattern.sub(replacement, text)

    for pattern, full_form in _ABBREVIATION_RULES:
        text = pattern.sub(full_form, text)

    # Handle arrow symbols
    return text.translate(_ARROW_TABLE)

def _clean_json_strings(value: Any) -> Any:
    """Recursively apply clean_text to every string value in parsed JSON data (keys are left as-is)."""
    if isinstance(value, str):
        return clean_text(value)
    if isinstance(value, dict):
        return {key: _clean_json_strings(item) for key, item in value.items()}
    if isinstance(value, list):
        return [_clean_json_strings(item) for item in value]
    return value


def analyze_document_from_path(image_path: str) -> Dict[str, Any]:
    """
    Analyze the document from the given image path and return extracted data.

    The image is re-encoded via ``process_local_image``, sent to the Gemini
    model together with a prompt asking for a fixed JSON structure, and the
    JSON object found in the reply is parsed. String values are then passed
    through ``clean_text`` and the ``Provisional_Diagnosis`` (if present) is
    looked up with ``get_icd_code``. The model call is attempted up to three
    times.

    Args:
        image_path: Path of the image file to analyze.

    Returns:
        The parsed and cleaned JSON object produced by the model, with
        ``Provisional_Diagnosis_ICD10_Code`` set from the ICD lookup when a
        ``Provisional_Diagnosis`` key is present.

    Raises:
        Exception: If preparation fails or all attempts fail; the message is
            prefixed with "Error in analyzing document:" and the original
            error is attached as ``__cause__``.
    """
    try:
        genai.configure(api_key=API_KEY)

        generation_config = {
            "temperature": 0.1,
            "top_p": 1,
            "top_k": 32,
            "max_output_tokens": 2048,
        }

        safety_settings = [
            {"category": category, "threshold": HarmBlockThreshold.BLOCK_MEDIUM_AND_ABOVE}
            for category in (
                HarmCategory.HARM_CATEGORY_HATE_SPEECH,
                HarmCategory.HARM_CATEGORY_DANGEROUS_CONTENT,
                HarmCategory.HARM_CATEGORY_SEXUALLY_EXPLICIT,
                HarmCategory.HARM_CATEGORY_HARASSMENT,
            )
        ]

        model = genai.GenerativeModel(
            model_name="gemini-1.5-flash",
            generation_config=generation_config,
            safety_settings=safety_settings
        )

        image_data = process_local_image(image_path)
        image_part = {"mime_type": "image/jpeg", "data": base64.b64encode(image_data).decode('utf-8')}

        prompt_template = """Analyze this image and extract all visible text and information. 
        Also then according to values present in the document analyze if provided details 
        conflict/be incongruous/incongruous each other return boolean true or false accordingly in forged field.
        default value of forged is false 
        also provide the reason in reason section for both. 
        Return it in the following JSON format:
        
        {
            "Treating_Doctor_Name": "",
            "Treating_Doctor_Contact_Number": "",
            "Nature_Of_Illness_Disease": "",
            "Nature_Of_Illness_Presenting_Complaint": "",
            "Relevant_Critical_Findings": "",
            "Duration_Of_Present_Ailment_Days": "",
            "Date_Of_First_Consultation": "",
            "Past_History_Of_Present_Ailment": "",
            "Provisional_Diagnosis": "",
            "Provisional_Diagnosis_ICD10_Code": "",
            "Medical_Management": false,
            "Surgical_Management": false,
            "Intensive_Care": false,
            "Investigation": false,
            "Non_Allopathic_Treatment": false,
            "Route_Of_Drug_Administration": "",
            "Surgical_Details_Name_Of_Surgery": "",
            "Surgical_Details_ICD10_PCS_Code": "",
            "Other_Treatment_Details": "",
            "How_Did_Injury_Occur": "",
            "Is_RTA": false,
            "Date_Of_Injury": "",
            "Report_To_Police": false,
            "FIR_No": "",
            "Substance_Abuse": false,
            "Test_Conducted": false,
            "Test_Conducted_Report_Attached": false,
            "Maternity_G": false,
            "Maternity_P": false,
            "Maternity_L": false,
            "Maternity_A": false,
            "Expected_Date_Of_Delivery": "",
            "Forged": false,
            "Reason": "",
            "Additional_Information": {}
        }
        
        If you find any information that doesn't fit into the predefined fields, 
        include it in the "Additional_Information" field with appropriate key-value pairs.
        """

        prompt_parts = [prompt_template, image_part]

        max_retries = 3
        for attempt in range(max_retries):
            is_last_attempt = attempt == max_retries - 1
            try:
                response = model.generate_content(prompt_parts)
                if not response or not response.text:
                    raise ValueError("Empty response from model")

                response_text = response.text.strip()
                current_app.logger.debug(f"Raw response from model: {response_text}")

                # Greedy match from the first "{" to the last "}" so any
                # surrounding text (e.g. a ```json fence) is ignored.
                json_match = re.search(r'(\{.*\})', response_text, re.DOTALL)
                if not json_match:
                    raise ValueError("No valid JSON found in the response")

                # Parse first, clean afterwards: running clean_text on the raw
                # JSON text lets its backslash rule eat JSON escape sequences
                # (e.g. "a\nb\nc" -> "anbnc") or produce invalid escapes.
                parsed_json = _clean_json_strings(json.loads(json_match.group(1)))
                current_app.logger.debug(f"Cleaned JSON: {parsed_json}")

                # Add ICD code lookup
                if 'Provisional_Diagnosis' in parsed_json:
                    icd_code = get_icd_code(parsed_json['Provisional_Diagnosis'])
                    parsed_json['Provisional_Diagnosis_ICD10_Code'] = (
                        icd_code or "No matching ICD code found"
                    )

                return parsed_json

            except json.JSONDecodeError as je:
                # Retried immediately (no back-off) unless this was the last attempt.
                current_app.logger.error(f"JSON decode error on attempt {attempt + 1}: {str(je)}")
                if is_last_attempt:
                    raise ValueError(f"Failed to parse JSON after {max_retries} attempts: {str(je)}") from je

            except Exception as e:
                current_app.logger.error(f"Error on attempt {attempt + 1}: {str(e)}")
                if is_last_attempt:
                    raise Exception(f"Failed after {max_retries} attempts: {str(e)}") from e
                # Exponential back-off between attempts: 1s, then 2s.
                time.sleep(2 ** attempt)

    except Exception as e:
        current_app.logger.error(f"Error in analyzing document: {str(e)}")
        raise Exception(f"Error in analyzing document: {str(e)}") from e

def update_excel_with_new_data(new_data: Dict[str, Any]) -> bool:
    """
    Append one record to the Excel file at ``EXCEL_FILE_PATH``.

    ``new_data`` is flattened with ``pd.json_normalize`` and appended to the
    rows already in the workbook; if the workbook does not exist yet it is
    created (together with its parent directory) from the new record alone.

    Args:
        new_data: The record to append.

    Returns:
        True if the workbook was written, False if anything failed (the
        error is logged, not raised).
    """
    try:
        new_df = pd.json_normalize(new_data)

        if os.path.exists(EXCEL_FILE_PATH):
            existing_df = pd.read_excel(EXCEL_FILE_PATH)
            updated_df = pd.concat([existing_df, new_df], ignore_index=True)
        else:
            # First write: make sure the target directory exists, and skip the
            # concat with an empty frame since there is nothing to append to.
            output_dir = os.path.dirname(EXCEL_FILE_PATH)
            if output_dir:
                os.makedirs(output_dir, exist_ok=True)
            updated_df = new_df

        updated_df.to_excel(EXCEL_FILE_PATH, index=False)
        current_app.logger.info("Excel file updated successfully.")
        return True
    except Exception as e:
        # Best-effort: the caller still returns the extracted data.
        current_app.logger.error(f"Error updating Excel: {str(e)}")
        return False

# Set once the ICD table load has been attempted, so it happens only once.
_icd_codes_initialized = False


@app.before_request
def initialize_icd_codes():
    """
    Load ICD codes before the first request is processed.

    ``Flask.before_first_request`` was removed in Flask 2.3, so this is
    registered as a regular ``before_request`` hook that does its work only
    on the first call. It returns ``None`` so request handling continues
    normally. The load is attempted once; a failed load is not retried.
    """
    global _icd_codes_initialized
    if _icd_codes_initialized:
        return
    _icd_codes_initialized = True
    load_icd_codes(ICD_CODE_JSON_PATH)

@app.route('/process_image', methods=['POST'])
def process_image():
    """
    Process the image named in the request and return the extracted data.

    Expects a JSON body of the form ``{"image_path": "<path on this
    machine>"}``. The extracted data is written to ``JSON_FILE_PATH``,
    appended to the Excel file, and returned as JSON.

    Returns:
        200 with the extracted data; 400 with ``{"error": ...}`` when the
        body is missing/not a JSON object, lacks ``image_path``, or the path
        is not an existing file; 500 with ``{"error": ...}`` on any other
        failure.
    """
    try:
        current_app.logger.debug("Request Headers: %s", request.headers)
        current_app.logger.debug("Content-Type: %s", request.content_type)

        # silent=True yields None for a missing or malformed JSON body instead
        # of raising, so the client gets the 400 below rather than the 500
        # produced by the catch-all handler.
        data = request.get_json(silent=True)
        current_app.logger.debug("Received JSON data: %s", data)

        if not isinstance(data, dict) or 'image_path' not in data:
            current_app.logger.error("Error: No image path provided in the request.")
            return jsonify({'error': 'No image path provided'}), 400

        # NOTE(review): the client chooses which server-side file is read.
        # Confirm this endpoint is only reachable by trusted callers.
        image_path = data['image_path']
        # isfile (not exists) so that a directory path is rejected here too.
        if not isinstance(image_path, str) or not os.path.isfile(image_path):
            current_app.logger.error("Error: Invalid image path or file does not exist.")
            return jsonify({'error': 'Invalid image path or file does not exist'}), 400

        current_app.logger.info(f"Processing image from path: {image_path}")
        extracted_data = analyze_document_from_path(image_path)
        current_app.logger.debug(extracted_data)

        with open(JSON_FILE_PATH, 'w', encoding='utf-8') as json_file:
            json.dump(extracted_data, json_file, indent=4)

        # The Excel export is best-effort; a failure does not fail the request.
        if not update_excel_with_new_data(extracted_data):
            current_app.logger.warning("Excel file was not updated for this request.")

        return jsonify(extracted_data)

    except Exception as e:
        current_app.logger.error(f"Error during image processing: {str(e)}")
        return jsonify({'error': str(e)}), 500

if __name__ == "__main__":
    # Start Flask's built-in server on port 5000 with debug mode on and the
    # auto-reloader switched off.
    app.run(port=5000, debug=True, use_reloader=False)

 * Serving Flask app '__main__'
 * Debug mode: on


 * Running on http://127.0.0.1:5000
Press CTRL+C to quit
[2024-10-12 06:14:32,566] INFO in 4182899617: Successfully loaded ICD codes from C:\Users\Admin\Downloads\output (1).json
[2024-10-12 06:14:32,569] DEBUG in 4182899617: Request Headers: Content-Type: application/json
User-Agent: PostmanRuntime/7.42.0
Accept: */*
Postman-Token: 62284379-e185-4443-9d24-5b0a75fb9fef
Host: 127.0.0.1:5000
Accept-Encoding: gzip, deflate, br
Connection: keep-alive
Content-Length: 112


[2024-10-12 06:14:32,570] DEBUG in 4182899617: Content-Type: application/json
[2024-10-12 06:14:32,573] DEBUG in 4182899617: Received JSON data: {'image_path': 'C:\\Users\\Admin\\Downloads\\PS2_Level_2 Dataset\\PS2 Level 2 Dataset\\sample_43.png'}
[2024-10-12 06:14:32,576] INFO in 4182899617: Processing image from path: C:\Users\Admin\Downloads\PS2_Level_2 Dataset\PS2 Level 2 Dataset\sample_43.png
[2024-10-12 06:14:37,145] DEBUG in 4182899617: Raw response from model: ```json
{
    "Treating_Doctor_Name": null,
    "Treat