In [None]:
import torch
from transformers import TrOCRProcessor, VisionEncoderDecoderModel
from PIL import Image
import time
from typing import List, Dict
import torch.cuda as cuda

class HandwrittenOCR:
    """Handwritten text recognition using the TrOCR vision encoder-decoder.

    The processor and model are loaded once in ``__init__``. On a CUDA
    device the model and its inputs are converted to FP16. The ``process_*``
    methods report per-image failures as result dictionaries with
    ``status == 'error'`` instead of raising.
    """

    def __init__(self):
        """Initialize the TrOCR model for handwritten text recognition"""
        self.model_name = "microsoft/trocr-base-handwritten"

        # Set up device
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        if self.device.type == 'cuda':
            print(f"Using GPU: {torch.cuda.get_device_name(0)}")
            print(f"GPU Memory: {torch.cuda.get_device_properties(0).total_memory / 1024**3:.2f} GB")
        else:
            print("GPU not available, using CPU")

        # Load model and processor
        print(f"Loading {self.model_name}...")
        self.processor = TrOCRProcessor.from_pretrained(self.model_name)
        self.model = VisionEncoderDecoderModel.from_pretrained(self.model_name)

        # Move model to GPU if available
        self.model.to(self.device)

        # Optional: Enable half precision for faster processing on GPU
        if self.device.type == 'cuda':
            self.model.half()  # Convert to FP16

    @staticmethod
    def _error_result(message: str) -> Dict:
        """Build the result dictionary used for every failed image."""
        return {
            'text': message,
            'processing_time': 0,
            'status': 'error'
        }

    def _to_model_input(self, pixel_values):
        """Move pixel values to the model's device, in FP16 when running on GPU."""
        pixel_values = pixel_values.to(self.device)
        if self.device.type == 'cuda':
            # The model was converted with .half(), so inputs must match.
            pixel_values = pixel_values.half()
        return pixel_values

    def _recognize(self, pixel_values) -> List[str]:
        """Run generation on prepared pixel values and decode one string per image."""
        with torch.no_grad():  # Inference only: no gradients needed
            generated_ids = self.model.generate(pixel_values)
            return self.processor.batch_decode(generated_ids, skip_special_tokens=True)

    def preprocess_image(self, image: "Image.Image") -> torch.Tensor:
        """
        Preprocess a single PIL image for model input
        Args:
            image: PIL image in any mode; converted to RGB when necessary
        Returns:
            Pixel-value tensor on ``self.device`` (FP16 on GPU)
        """
        # Convert to RGB if needed
        if image.mode != 'RGB':
            image = image.convert('RGB')

        pixel_values = self.processor(image, return_tensors="pt").pixel_values
        return self._to_model_input(pixel_values)

    def process_single_image(self, image_path: str) -> Dict:
        """
        Process a single image and return the results
        Args:
            image_path: Path to the image file
        Returns:
            Dictionary with 'text', 'processing_time' (seconds) and 'status'
            ('success' or 'error'); on error 'text' carries the message
        """
        try:
            start_time = time.time()

            # The context manager closes the file handle as soon as the
            # pixels have been turned into a tensor.
            with Image.open(image_path) as image:
                pixel_values = self.preprocess_image(image)

            text = self._recognize(pixel_values)[0]
            process_time = time.time() - start_time

            return {
                'text': text.strip(),
                'processing_time': process_time,
                'status': 'success'
            }

        except Exception as e:
            # Deliberate catch-all: callers receive an error record, not an exception.
            return self._error_result(f"Error: {str(e)}")

    def process_batch(self, image_paths: List[str], batch_size: int = 4) -> Dict:
        """
        Process multiple images in batches
        Args:
            image_paths: Paths of the images to recognize
            batch_size: Maximum number of images sent through the model at once
        Returns:
            Dictionary mapping each path to a result dictionary shaped like
            the one returned by process_single_image
        Raises:
            ValueError: If batch_size is smaller than 1
        """
        if batch_size < 1:
            raise ValueError(f"batch_size must be at least 1, got {batch_size}")

        results = {}

        for i in range(0, len(image_paths), batch_size):
            batch_paths = image_paths[i:i + batch_size]
            batch_images = []
            valid_paths = []

            # Load batch images; an unreadable file only fails its own entry.
            for img_path in batch_paths:
                try:
                    with Image.open(img_path) as image:
                        # Same RGB conversion as the single-image path. The
                        # converted image is kept so the file can be closed.
                        batch_images.append(image.convert('RGB'))
                    valid_paths.append(img_path)
                except Exception as e:
                    results[img_path] = self._error_result(f"Error loading image: {str(e)}")

            if not batch_images:
                continue

            try:
                start_time = time.time()

                inputs = self.processor(batch_images, return_tensors="pt", padding=True)
                pixel_values = self._to_model_input(inputs.pixel_values)
                texts = self._recognize(pixel_values)

                # The batch is timed as a whole; each image gets an equal share.
                per_image_time = (time.time() - start_time) / len(batch_images)

                for img_path, text in zip(valid_paths, texts):
                    results[img_path] = {
                        'text': text.strip(),
                        'processing_time': per_image_time,
                        'status': 'success'
                    }

            except Exception as e:
                error_msg = f"Batch processing error: {str(e)}"
                for img_path in valid_paths:
                    results[img_path] = self._error_result(error_msg)

        return results

    def save_results(self, results: Dict, output_file: str = 'handwritten_ocr_results.txt'):
        """
        Save results to file
        Args:
            results: Mapping of image path to result dictionary
            output_file: Path of the text report to write (overwritten)
        """
        succeeded = [r for r in results.values() if r['status'] == 'success']
        successful = len(succeeded)
        total_time = sum(r['processing_time'] for r in succeeded)
        # Guard the average: with no successful image there is nothing to divide by.
        average_time = total_time / successful if successful else 0.0

        with open(output_file, 'w', encoding='utf-8') as f:
            f.write("Handwritten Text Recognition Results\n")
            f.write(f"Model: {self.model_name}\n")
            f.write(f"Device: {self.device}\n")
            f.write("=" * 60 + "\n\n")

            # Write summary
            f.write("Summary:\n")
            f.write(f"Total images processed: {len(results)}\n")
            f.write(f"Successful: {successful}\n")
            f.write(f"Failed: {len(results) - successful}\n")
            f.write(f"Total processing time: {total_time:.2f} seconds\n")
            f.write(f"Average time per image: {average_time:.2f} seconds\n\n")

            # Write individual results
            for img_path, result in results.items():
                f.write("-" * 60 + "\n")
                f.write(f"Image: {img_path}\n")
                f.write(f"Status: {result['status']}\n")
                f.write(f"Text: {result['text']}\n")
                if result['status'] == 'success':
                    f.write(f"Processing time: {result['processing_time']:.2f} seconds\n")

def main():
    """Run the handwritten-OCR demo on a fixed image list and write the report."""
    recognizer = HandwrittenOCR()

    # Images to run through the recognizer
    targets = ['img1.jpg', 'img2.jpg', 'img3.jpg']

    # Set to False to push images through one at a time instead of in batches
    batch_mode = True

    if not batch_mode:
        print("\nProcessing images individually...")
        outcome = {}
        for path in targets:
            print(f"\nProcessing {path}")
            outcome[path] = recognizer.process_single_image(path)
    else:
        print("\nProcessing images in batch mode...")
        outcome = recognizer.process_batch(targets, batch_size=2)

    # Persist the report
    recognizer.save_results(outcome)
    print("\nProcessing complete. Results saved to handwritten_ocr_results.txt")


if __name__ == "__main__":
    main()

In [None]:
import pytesseract
from PIL import Image
import cv2
import numpy as np
import torch
import torchvision.transforms as transforms
from typing import Dict, List
import time

class GPUAcceleratedOCR:
    """Tesseract OCR wrapper with OpenCV / PyTorch image clean-up.

    Text recognition itself is done by pytesseract, one image at a time.
    PyTorch is used only for some of the image filters, which run on the
    GPU when CUDA is available and on the CPU otherwise.
    """

    # Normalisation constants used by ``self.transform``; preprocess_image
    # undoes them again before converting the tensor back to 8-bit.
    _NORM_MEAN = 0.485
    _NORM_STD = 0.229

    def __init__(self, lang='eng'):
        """
        Initialize OCR with GPU support and enhanced image processing
        Args:
            lang: Tesseract language code passed to pytesseract
        """
        self.lang = lang
        self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
        print(f"Using device: {self.device}")

        # Grayscale image -> float tensor in [0, 1] -> mean/std normalised
        self.transform = transforms.Compose([
            transforms.ToTensor(),
            transforms.Normalize(mean=[self._NORM_MEAN], std=[self._NORM_STD])
        ])

    def enhance_image(self, image: np.ndarray) -> np.ndarray:
        """
        Convert to grayscale, boost contrast with CLAHE and blur away noise
        Args:
            image: BGR (3-dimensional) or grayscale (2-dimensional) image
        Returns:
            Denoised grayscale image as uint8
        """
        # Convert to grayscale if needed
        if len(image.shape) == 3:
            gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
        else:
            gray = image.copy()

        # Create CLAHE object for contrast enhancement
        clahe = cv2.createCLAHE(clipLimit=2.0, tileGridSize=(8,8))
        enhanced = clahe.apply(gray)

        if self.device.type != 'cuda':
            # CPU fallback for denoising
            return cv2.GaussianBlur(enhanced, (5, 5), 0)

        with torch.no_grad():
            img_tensor = torch.from_numpy(enhanced).float().unsqueeze(0).unsqueeze(0)
            img_tensor = img_tensor.to(self.device)

            # 5x5 binomial (Gaussian-like) kernel; the weights sum to 256.
            gaussian_kernel = torch.tensor([
                [1, 4, 6, 4, 1],
                [4, 16, 24, 16, 4],
                [6, 24, 36, 24, 6],
                [4, 16, 24, 16, 4],
                [1, 4, 6, 4, 1]
            ], dtype=torch.float32, device=self.device) / 256

            blurred = torch.nn.functional.conv2d(
                img_tensor,
                gaussian_kernel.view(1, 1, 5, 5),
                padding=2
            )

            # The kernel is normalised, so the result is still on the 0..255
            # scale of the input: round and clamp, do NOT rescale by 255.
            denoised = (
                blurred.squeeze(0).squeeze(0)
                .round().clamp(0, 255)
                .to(torch.uint8).cpu().numpy()
            )

        return denoised

    @staticmethod
    def _principal_axis_angle(coords: np.ndarray, device) -> float:
        """
        Angle in degrees between the principal axis of the points and the column axis
        Args:
            coords: (N, 2) array of (row, col) positions
            device: Torch device on which the covariance is computed
        Returns:
            Angle in (-90, 90]; positive when the axis runs towards larger
            rows as the column index increases
        """
        with torch.no_grad():
            points = torch.from_numpy(np.ascontiguousarray(coords)).float().to(device)
            centered = points - points.mean(dim=0)
            cov = centered.t() @ centered
            # eigh returns eigenvalues in ascending order with eigenvectors as
            # COLUMNS, so the principal axis is the last column.
            _, eigenvectors = torch.linalg.eigh(cov)
            d_row, d_col = eigenvectors[:, -1].tolist()

        # An eigenvector's sign is arbitrary; fix it so the angle is stable.
        if d_col < 0:
            d_row, d_col = -d_row, -d_col
        return float(np.degrees(np.arctan2(d_row, d_col)))

    def deskew_image(self, image: np.ndarray) -> np.ndarray:
        """
        Estimate the dominant orientation of the image and rotate to compensate
        Args:
            image: Grayscale uint8 image
        Returns:
            Rotated image of the same size, or the input unchanged when too
            few foreground pixels are found or any step fails
        """
        try:
            # Convert to binary
            # NOTE(review): THRESH_BINARY makes the BRIGHT pixels foreground; for
            # dark text on a light page that is the background - confirm intent.
            thresh = cv2.threshold(image, 0, 255, cv2.THRESH_BINARY + cv2.THRESH_OTSU)[1]

            # Find all non-zero points as (row, col) pairs
            coords = np.column_stack(np.where(thresh > 0))

            if len(coords) < 20:  # Not enough points to determine angle
                return image

            if self.device.type == 'cuda':
                angle = self._principal_axis_angle(coords, self.device)
            else:
                # np.where yields int64 indices; minAreaRect only accepts
                # 32-bit integer or float point sets.
                angle = float(cv2.minAreaRect(coords.astype(np.float32))[-1])

            # Fold the angle into [-45, 45] so the page is never turned on its side
            if angle < -45:
                angle = 90 + angle
            elif angle > 45:
                angle = -90 + angle

            # Rotate the image
            (h, w) = image.shape[:2]
            center = (w // 2, h // 2)
            M = cv2.getRotationMatrix2D(center, angle, 1.0)
            rotated = cv2.warpAffine(
                image, M, (w, h),
                flags=cv2.INTER_CUBIC,
                borderMode=cv2.BORDER_REPLICATE
            )

            return rotated

        except Exception as e:
            print(f"Warning: Deskewing failed - {str(e)}")
            return image

    def preprocess_image(self, image: np.ndarray) -> np.ndarray:
        """
        Enhanced preprocessing pipeline: enhance, deskew, smooth, sharpen, clean up
        Args:
            image: BGR or grayscale image
        Returns:
            Processed grayscale uint8 image; if any step fails, a plain
            grayscale version of the input
        """
        try:
            # Initial enhancement
            enhanced = self.enhance_image(image)

            # Deskew
            deskewed = self.deskew_image(enhanced)

            # Convert to tensor for processing on self.device
            img_tensor = self.transform(deskewed).unsqueeze(0)
            img_tensor = img_tensor.to(self.device)

            with torch.no_grad():
                # Additional denoising (2x2 window with padding 1, so the
                # output is one pixel larger in each dimension)
                img_tensor = torch.nn.functional.avg_pool2d(
                    img_tensor, 2, stride=1, padding=1
                )

                # Enhance edges
                kernel = torch.tensor([
                    [-1, -1, -1],
                    [-1,  9, -1],
                    [-1, -1, -1]
                ], dtype=torch.float32, device=self.device).view(1, 1, 3, 3)
                img_tensor = torch.nn.functional.conv2d(
                    img_tensor, kernel, padding=1
                )

                # Undo the mean/std normalisation and clamp sharpening
                # overshoot so the values are back in [0, 1] before scaling to
                # 8-bit. (An L2 normalisation over the single channel axis
                # would turn every pixel into +1 or -1.)
                img_tensor = (img_tensor * self._NORM_STD + self._NORM_MEAN).clamp(0.0, 1.0)

                # Convert back to numpy
                processed = (
                    (img_tensor.squeeze(0).squeeze(0) * 255)
                    .round().to(torch.uint8).cpu().numpy()
                )

            # Final CPU operations
            kernel = cv2.getStructuringElement(cv2.MORPH_RECT, (3,3))
            processed = cv2.dilate(processed, kernel, iterations=1)
            processed = cv2.medianBlur(processed, 3)

            return processed

        except Exception as e:
            print(f"Warning: Advanced preprocessing failed - {str(e)}")
            # Fallback to basic preprocessing; a 2-D input is already grayscale
            # and cannot go through a BGR conversion.
            if len(image.shape) == 3:
                return cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
            return image

    @staticmethod
    def _mean_confidence(conf_values) -> float:
        """
        Average the usable per-box confidences reported by Tesseract
        Args:
            conf_values: Confidence entries as numbers or numeric strings;
                negative entries (boxes without recognised text) and
                non-numeric entries are ignored
        Returns:
            Mean confidence, or 0 when no usable entry exists
        """
        scores = []
        for raw in conf_values:
            try:
                score = float(raw)
            except (TypeError, ValueError):
                continue
            if score >= 0:
                scores.append(score)
        return sum(scores) / len(scores) if scores else 0

    def image_to_text(self, image_path: str, preprocess: bool = True) -> Dict:
        """
        Extract text from an image file with Tesseract
        Args:
            image_path: Path to image file
            preprocess: Whether to apply preprocessing
        Returns:
            Dictionary with 'text', 'confidence' and 'process_time' (seconds);
            on failure 'text' holds the error message and the numbers are 0
        """
        try:
            start_time = time.time()

            # Read image
            image = cv2.imread(image_path)
            if image is None:
                raise ValueError(f"Could not read image at {image_path}")

            # Preprocess
            if preprocess:
                image = self.preprocess_image(image)

            # Extract text
            custom_config = r'--oem 3 --psm 6'
            text = pytesseract.image_to_string(image, lang=self.lang, config=custom_config)

            # Get confidence scores
            data = pytesseract.image_to_data(image, lang=self.lang, config=custom_config,
                                           output_type=pytesseract.Output.DICT)
            confidence = self._mean_confidence(data['conf'])

            process_time = time.time() - start_time

            return {
                'text': text.strip(),
                'confidence': confidence,
                'process_time': process_time
            }

        except Exception as e:
            print(f"Error processing {image_path}: {str(e)}")
            return {
                'text': f"Error: {str(e)}",
                'confidence': 0,
                'process_time': 0
            }

    def process_batch(self, image_paths: List[str], batch_size: int = 4) -> Dict:
        """
        Run image_to_text over a list of images, printing each result
        Args:
            image_paths: List of image paths
            batch_size: Size of the chunks the list is walked in; images are
                still recognised one at a time
        Returns:
            Dictionary mapping each path to its image_to_text result
        Raises:
            ValueError: If batch_size is smaller than 1
        """
        if batch_size < 1:
            raise ValueError(f"batch_size must be at least 1, got {batch_size}")

        results = {}

        for i in range(0, len(image_paths), batch_size):
            for img_path in image_paths[i:i + batch_size]:
                print(f"\nProcessing {img_path}")
                result = self.image_to_text(img_path)
                results[img_path] = result

                print(f"Detected text: {result['text']}")
                print(f"Confidence: {result['confidence']:.2f}%")
                print(f"Processing time: {result['process_time']:.3f} seconds")

        return results

    def save_results(self, results: Dict, output_file: str = 'ocr_results.txt'):
        """
        Save OCR results to file
        Args:
            results: Dictionary of OCR results
            output_file: Output file path
        """
        count = len(results)
        total_time = sum(result['process_time'] for result in results.values())
        total_confidence = sum(result['confidence'] for result in results.values())
        # Guard the averages so an empty result set still produces a report.
        avg_confidence = total_confidence / count if count else 0.0
        avg_time = total_time / count if count else 0.0

        with open(output_file, 'w', encoding='utf-8') as f:
            f.write(f"OCR Results (Using {self.device})\n")
            f.write("=" * 50 + "\n\n")

            f.write("Summary:\n")
            f.write(f"Total images processed: {count}\n")
            f.write(f"Average confidence: {avg_confidence:.2f}%\n")
            f.write(f"Total processing time: {total_time:.3f} seconds\n")
            f.write(f"Average time per image: {avg_time:.3f} seconds\n\n")

            for img_path, result in results.items():
                f.write("-" * 50 + "\n")
                f.write(f"Image: {img_path}\n")
                f.write(f"Text: {result['text']}\n")
                f.write(f"Confidence: {result['confidence']:.2f}%\n")
                f.write(f"Processing time: {result['process_time']:.3f} seconds\n")

def main():
    """Run the Tesseract OCR demo on a fixed image list and write the report."""
    engine = GPUAcceleratedOCR(lang='eng')

    # Images to run through the pipeline
    targets = ['img1.jpg', 'img2.jpg', 'img3.jpg', 'img4.jpg', 'img5.jpg']

    # Recognise everything, then persist the report
    outcome = engine.process_batch(targets, batch_size=2)
    engine.save_results(outcome)
    print("\nProcessing complete. Results saved to ocr_results.txt")


if __name__ == "__main__":
    main()

In [None]:
import spacy
from spacy.tokens import DocBin
from spacy.tokens import Doc
from typing import List, Dict, Tuple
import json
from datetime import datetime
from google.colab import userdata





class MedicationNER:
    """Rule-based extractor for medication-related entities in prescription text.

    ``self.entity_types`` maps an entity type to a list of patterns. Each
    pattern is first searched for as a plain substring; if that finds
    nothing and the pattern contains regex metacharacters, it is applied as
    a regular expression.
    """

    # Characters whose presence means a pattern may be a regular expression.
    _REGEX_META = frozenset('\\.^$*+?{}[]|()')

    # Built-in patterns for every entity type except MEDICATION, whose names
    # are fetched from openFDA in __init__.
    DEFAULT_PATTERNS = {
        # Optional decimals so "0.3 mg" is not cut down to "3 mg"; the trailing
        # \b makes "2 tablets" match in full instead of stopping at "2 tablet".
        'DOSAGE': [r'\d+(?:\.\d+)?\s*(?:mg|mcg|mL|g|grams?|tablets?|capsules?|puffs?|drops?)\b'],
        'STRENGTH': [r'\d+(?:\.\d+)?\s*(?:(?:mg|mcg)\b|%)'],
        'ROUTE': ['orally', 'topically', 'intramuscularly', 'inhale',
                  'via inhaler', 'into each eye'],
        'FREQUENCY': ['every 8 hours', 'twice daily', 'three times a day',
                      'once daily', 'single dose', 'every 12 hours',
                      'every 4 to 6 hours'],
        'DURATION': [r'\d+\s*(?:days?|weeks?|months?)\b'],
        'FORM': ['cream', 'syrup', 'ophthalmic solution', 'inhaler', 'tablet',
                 'capsule'],
        'ADMINISTRATION_SITE': ['affected area', 'into each eye'],
        'ADMINISTRATION_TIME': ['at bedtime', 'immediately'],
        'ADMINISTRATION_CONDITION': ['after meals', 'with the evening meal',
                                     'as needed for wheezing'],
    }

    def __init__(self,api_key: str = None):
        """
        Initialize the NER system with predefined patterns and rules
        Args:
            api_key: OpenFDA API key
        Raises:
            requests.exceptions.RequestException: If the medication list
                cannot be downloaded from openFDA
        """
        self.api_key = api_key
        self.nlp = spacy.blank("en")
        # MEDICATION comes first so it is also reported first among entities
        # that start at the same position.
        self.entity_types = {'MEDICATION': self.fetch_medications_from_openfda()}
        for entity_type, patterns in self.DEFAULT_PATTERNS.items():
            # Copy the lists so per-instance edits do not leak into the class.
            self.entity_types[entity_type] = list(patterns)

    def fetch_medications_from_openfda(self) -> List[str]:
        """
        Fetch a list of medication names from the openFDA API.
        Returns:
            A sorted list of title-cased generic names, brand names and
            active-ingredient names.
        Raises:
            requests.exceptions.RequestException: On network errors, timeouts
                or non-success HTTP status codes.
        """
        # Imported here because the import block above this class does not
        # bring in requests.
        import requests

        medications = set()
        base_url = "https://api.fda.gov/drug/ndc.json"
        limit = 1000  # Maximum results per request
        total_requests = 5  # Limit number of requests to avoid rate limiting

        for skip in range(0, limit * total_requests, limit):
            params = {
                'search': 'product_type:"HUMAN PRESCRIPTION DRUG"',
                'limit': limit,
                'skip': skip,
                'api_key': self.api_key
            }

            # A timeout keeps construction from hanging on a stalled connection.
            response = requests.get(base_url, params=params, timeout=30)
            response.raise_for_status()
            data = response.json()

            for result in data.get('results', []):
                # Generic and brand names may hold several comma-separated names
                if 'generic_name' in result:
                    medications.update(result['generic_name'].split(', '))
                if 'brand_name' in result:
                    medications.update(result['brand_name'].split(', '))

                # Add active ingredients if available
                for ingredient in result.get('active_ingredients', []):
                    if 'name' in ingredient:
                        medications.add(ingredient['name'])

        # Clean up medication names
        cleaned_medications = set()
        for med in medications:
            cleaned_med = med.title().strip()
            # Keep names longer than two characters made only of letters,
            # digits and spaces
            if len(cleaned_med) > 2 and cleaned_med.replace(' ', '').isalnum():
                cleaned_medications.add(cleaned_med)

        # Sorted so the pattern order is the same on every run
        return sorted(cleaned_medications)

    @staticmethod
    def _literal_spans(text: str, literal: str) -> List[Tuple[int, int]]:
        """Return (start, end) for every non-overlapping occurrence of a non-empty literal."""
        spans = []
        pos = text.find(literal)
        while pos != -1:
            spans.append((pos, pos + len(literal)))
            pos = text.find(literal, pos + len(literal))
        return spans

    @staticmethod
    def _drop_nested(spans) -> List[Tuple[int, int]]:
        """Return spans ordered by start, without those lying inside another span."""
        kept = []
        furthest_end = -1
        # Longest-first at equal starts: anything ending at or before the
        # furthest end seen so far is covered by an earlier span.
        for start, end in sorted(spans, key=lambda span: (span[0], -span[1])):
            if end <= furthest_end:
                continue
            kept.append((start, end))
            furthest_end = end
        return kept

    def extract_entities(self, text: str) -> List[Dict]:
        """
        Extract medication-related entities from text

        Args:
            text: Input medication instruction
        Returns:
            List of dictionaries with 'text', 'type', 'start' and 'end',
            sorted by start position. Within one entity type, a match that
            lies entirely inside another match is dropped.
        """
        # Imported here because the import block above this class does not
        # bring in re.
        import re

        entities = []

        for entity_type, patterns in self.entity_types.items():
            spans = set()
            for pattern in patterns:
                if not pattern:
                    continue

                # Look for exact matches first
                found = self._literal_spans(text, pattern)

                # A pattern without metacharacters matches exactly its own
                # text, so the regex pass is only needed for the others.
                if not found and self._REGEX_META.intersection(pattern):
                    found = [
                        (match.start(), match.end())
                        for match in re.finditer(pattern, text)
                        if match.end() > match.start()
                    ]

                spans.update(found)

            for start, end in self._drop_nested(spans):
                entities.append({
                    'text': text[start:end],
                    'type': entity_type,
                    'start': start,
                    'end': end
                })

        # Stable sort: entities with the same start keep entity-type order
        entities.sort(key=lambda entity: entity['start'])

        return entities

    def process_and_format(self, text: str) -> str:
        """
        Process text and return formatted output

        Args:
            text: Input medication instruction
        Returns:
            Formatted string with the sentence followed by one
            "  * <entity text>: <TYPE>" line per entity, grouped by type
        """
        entities = self.extract_entities(text)

        # Group entity texts by type, in order of first appearance
        grouped_entities = {}
        for entity in entities:
            grouped_entities.setdefault(entity['type'], []).append(entity['text'])

        lines = [f"Sentence: \"{text}\"\n", "Entities:\n"]
        for entity_type, entity_texts in grouped_entities.items():
            for entity_text in entity_texts:
                lines.append(f"  * {entity_text}: {entity_type}\n")

        return "".join(lines)
def save_results_to_file(examples, results, filename='medication_analysis_results.txt'):
    """
    Save the analysis results to a text file with clear formatting

    Args:
        examples: List of example sentences
        results: List of processed results, one per example, holding an
            "Entities:" line followed by "  * <text>: <TYPE>" lines; lines
            that do not have this shape are ignored
        filename: Output filename
    """
    # Entity types are written in this order of importance; any other type
    # found in the results is appended afterwards.
    entity_order = [
        'MEDICATION', 'DOSAGE', 'STRENGTH', 'FORM', 'ROUTE',
        'FREQUENCY', 'DURATION', 'ADMINISTRATION_SITE',
        'ADMINISTRATION_TIME', 'ADMINISTRATION_CONDITION'
    ]

    with open(filename, 'w', encoding='utf-8') as f:
        # Write header
        f.write("=" * 80 + "\n")
        f.write("MEDICATION PRESCRIPTION ANALYSIS RESULTS\n")
        f.write("=" * 80 + "\n\n")

        # Write summary
        f.write(f"Total Prescriptions Analyzed: {len(examples)}\n")
        f.write("Analysis Date: " + datetime.now().strftime("%Y-%m-%d %H:%M:%S") + "\n\n")

        # Write detailed results
        for i, (example, result) in enumerate(zip(examples, results), 1):
            f.write("-" * 80 + "\n")
            f.write(f"PRESCRIPTION #{i}\n")
            f.write("-" * 80 + "\n\n")

            # Original text
            f.write("Original Prescription:\n")
            f.write(f"\"{example}\"\n\n")

            # Extracted entities
            f.write("Extracted Entities:\n")
            # partition yields an empty block (no entities) when the marker is
            # missing, instead of failing on a missing list element.
            entity_block = result.partition('Entities:\n')[2]

            # Group entities by type
            entity_dict = {}
            for line in entity_block.splitlines():
                line = line.strip()
                if not line.startswith('* '):
                    continue
                # Split on the LAST ": " so entity text may itself contain one.
                text, separator, etype = line[2:].rpartition(': ')
                if not separator:
                    continue
                entity_dict.setdefault(etype, []).append(text)

            extra_types = [etype for etype in entity_dict if etype not in entity_order]
            for etype in entity_order + extra_types:
                if etype in entity_dict:
                    f.write(f"\n{etype}:\n")
                    for text in entity_dict[etype]:
                        f.write(f"  • {text}\n")

            f.write("\n")
def main():
    """Run the medication NER demo over sample prescriptions and save the analysis."""
    # Build the extractor with the openFDA key stored in Colab user data
    extractor = MedicationNER(userdata.get('OPENFDA_KEY'))

    # Sample prescription instructions
    examples = [
        "Administer 500 mg of Amoxicillin orally every 8 hours for 7 days.",
        "Prescribe 2 tablets of Ibuprofen 200 mg to be taken twice daily after meals.",
        "Apply 1% Hydrocortisone cream topically to the affected area three times a day.",
        "Take 10 mL of Cetirizine syrup orally once daily at bedtime.",
        "Inject 0.3 mg of Epinephrine intramuscularly immediately as a single dose.",
        "Administer 1 tablet of Metformin 500 mg orally with the evening meal.",
        "Instill 2 drops of Timolol 0.25% ophthalmic solution into each eye twice daily.",
        "Take 1 capsule of Doxycycline 100 mg orally every 12 hours for 14 days.",
        "Apply 5 grams of Clotrimazole 1% cream topically to the affected area twice daily for 2 weeks.",
        "Inhale 2 puffs of Albuterol 90 mcg via inhaler every 4 to 6 hours as needed for wheezing."
    ]

    # Format every example, echoing each report as it is produced
    reports = []
    for sentence in examples:
        formatted = extractor.process_and_format(sentence)
        reports.append(formatted)
        print(f"\n{formatted}")

    # Write the detailed report
    save_results_to_file(examples, reports)
    print("\nDetailed results have been saved to 'medication_analysis_results.txt'")


if __name__ == "__main__":
    main()

In [None]:
import requests
from datetime import datetime
from typing import List, Dict
import json
from google.colab import userdata
class DrugInteractionChecker:
    """Summarise openFDA adverse-event reports for a list of medications.

    Medication names are read from the prescription analysis report, looked
    up in the openFDA drug event endpoint, and the summaries are written to
    a plain-text safety report.
    """

    def __init__(self, api_key):
        """
        Initialize the drug interaction checker
        Args:
            api_key: OpenFDA API key (required)
        """
        self.api_key = api_key
        self.base_url = "https://api.fda.gov/drug/event.json"

    def extract_medications_from_file(self, filename: str) -> List[str]:
        """
        Extract medication names from the analysis results file
        Args:
            filename: Path of a report containing "MEDICATION:" lines, each
                followed by one "• <name>" line per medication
        Returns:
            Unique medication names in order of first appearance; an empty
            list when the file cannot be read
        """
        try:
            with open(filename, 'r', encoding='utf-8') as f:
                content = f.read()
        except (OSError, UnicodeError) as e:
            print(f"Error reading file: {str(e)}")
            return []

        medications = []
        in_medication_list = False
        for raw_line in content.splitlines():
            line = raw_line.strip()
            if line == 'MEDICATION:':
                in_medication_list = True
            elif in_medication_list and line.startswith('•'):
                # Collect every bullet under the heading, not only the first
                name = line.lstrip('•').strip()
                if name:
                    medications.append(name)
            else:
                # Any other line (including a blank one) ends the bullet list
                in_medication_list = False

        # dict.fromkeys removes duplicates while keeping first-seen order
        return list(dict.fromkeys(medications))

    def get_drug_events(self, drug_name: str) -> Dict:
        """
        Get drug event information from OpenFDA
        Args:
            drug_name: Medicinal product name to search for
        Returns:
            Summary dictionary produced by _process_events, or a dictionary
            with a single 'error' key when the request fails or has no results
        """
        # Quote the name so a multi-word product is searched as one phrase;
        # embedded double quotes would break the quoting and are removed.
        phrase = drug_name.replace('"', '').strip()
        params = {
            'search': f'patient.drug.medicinalproduct:"{phrase}"',
            'limit': 100,
            'api_key': self.api_key
        }

        try:
            # A timeout keeps one stalled request from blocking the whole run.
            response = requests.get(self.base_url, params=params, timeout=30)
            response.raise_for_status()
            data = response.json()

        except requests.exceptions.RequestException as e:
            error_msg = str(e)
            if hasattr(e, 'response') and e.response is not None:
                error_msg = f"{error_msg}\nResponse: {e.response.text}"
            return {'error': f'API request failed: {error_msg}'}
        except ValueError as e:
            # Body was not valid JSON
            return {'error': f'API request failed: {str(e)}'}

        if 'results' not in data:
            return {'error': 'No results found'}

        # Process and summarize the events
        return self._process_events(data['results'])

    def _process_events(self, events: List[Dict]) -> Dict:
        """
        Process and summarize drug events
        Args:
            events: Event records as returned in the API's 'results' list
        Returns:
            Dictionary with 'total_reports', 'reactions' (term -> count,
            most frequent first), 'interactions' (names of drugs whose
            drugcharacterization is '2', duplicates kept),
            'common_indications' (sorted, unique) and 'warnings' (always empty)
        """
        reactions = {}
        interactions = []
        indications = set()

        for event in events:
            patient = event.get('patient')
            if not patient:
                continue

            # Count each reported reaction term
            for reaction in patient.get('reaction') or []:
                term = reaction.get('reactionmeddrapt')
                if term is not None:
                    reactions[term] = reactions.get(term, 0) + 1

            for drug in patient.get('drug') or []:
                # Get indications
                if 'drugindication' in drug:
                    indications.add(drug['drugindication'])

                # Look for interactions
                if drug.get('drugcharacterization') == '2':  # Concomitant medications
                    if 'medicinalproduct' in drug:
                        interactions.append(drug['medicinalproduct'])

        return {
            'total_reports': len(events),
            # Sort reactions by frequency
            'reactions': dict(sorted(reactions.items(), key=lambda item: item[1], reverse=True)),
            'interactions': interactions,
            # Lists (not sets) for JSON serialization; sorted for stable output
            'common_indications': sorted(indications),
            'warnings': []
        }

    def save_interaction_results(self, results: Dict, filename: str = 'drug_safety_report.txt'):
        """
        Save drug safety and interaction results to a file
        Args:
            results: Mapping of medication name to get_drug_events output
            filename: Path of the text report to write (overwritten)
        """
        with open(filename, 'w', encoding='utf-8') as f:
            f.write("=" * 80 + "\n")
            f.write("DRUG SAFETY AND INTERACTION REPORT\n")
            f.write("=" * 80 + "\n\n")

            f.write(f"Analysis Date: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")
            f.write(f"Total Medications Analyzed: {len(results)}\n\n")

            for med_name, data in results.items():
                f.write("-" * 80 + "\n")
                f.write(f"MEDICATION: {med_name}\n")
                f.write("-" * 80 + "\n\n")

                if 'error' in data:
                    f.write(f"Error: {data['error']}\n\n")
                    continue

                # Total reports
                f.write(f"Total Reports Analyzed: {data['total_reports']}\n\n")

                # Common reactions
                f.write("TOP REPORTED REACTIONS:\n")
                for reaction, count in list(data['reactions'].items())[:10]:  # Top 10
                    f.write(f"• {reaction}: {count} reports\n")
                f.write("\n")

                # Drug interactions
                if data['interactions']:
                    f.write("REPORTED CONCURRENT MEDICATIONS:\n")
                    # Sorted so the report is identical from run to run
                    for interaction in sorted(set(data['interactions'])):
                        f.write(f"• {interaction}\n")
                    f.write("\n")

                # Common indications
                if data['common_indications']:
                    f.write("COMMON INDICATIONS:\n")
                    for indication in data['common_indications']:
                        f.write(f"• {indication}\n")
                    f.write("\n")

            f.write("\nDISCLAIMER:\n")
            f.write("This report is based on FDA adverse event reporting data and should not be used\n")
            f.write("to make medical decisions. Consult healthcare professionals for medical advice.\n")

def main():
    """
    Run the drug-safety pipeline end to end.

    Extracts medication names from 'medication_analysis_results.txt', fetches
    event data for each one through the checker, and writes the combined
    report with the checker's default filename. Returns early when no
    medications were extracted.
    """
    # The API key is looked up under 'OPENFDA_KEY'
    # (presumably Colab's `userdata` secrets store -- confirm against the imports)
    checker = DrugInteractionChecker(api_key=userdata.get('OPENFDA_KEY'))

    print("Extracting medications from analysis results...")
    found = checker.extract_medications_from_file('medication_analysis_results.txt')

    # Guard clause: nothing to look up
    if not found:
        print("No medications found in the analysis file.")
        return

    print(f"\nFound {len(found)} medications:")
    print(*(f"• {name}" for name in found), sep="\n")

    # Query each medication in turn, keyed by its name
    print("\nGathering drug safety information...")
    report_data = {}
    for name in found:
        print(f"Processing {name}...")
        report_data[name] = checker.get_drug_events(name)

    # Persist the collected data
    checker.save_interaction_results(report_data)
    print("\nDrug safety report has been saved to 'drug_safety_report.txt'")

# Entry point: run the drug-safety pipeline only when this cell/script is
# executed directly, not when the code is imported as a module.
if __name__ == "__main__":
    main()

In [None]:
import easyocr
import cv2
import numpy as np
from PIL import Image
from datetime import datetime
import os

def read_prescription(image_path, save_dir='prescriptions_output', min_confidence=0.3):
    """
    Run OCR on a prescription image and save the recognised text to a file.

    The image is converted to grayscale, denoised and contrast-enhanced with
    CLAHE before being passed to EasyOCR (English, CPU).

    Args:
        image_path: Path of the image file to read.
        save_dir: Directory that receives the text report; created if missing.
        min_confidence: Exclusive lower bound on the OCR confidence a
            detection needs in order to be kept. Defaults to 0.3.

    Returns:
        ``(prescription_text, output_path)``. ``prescription_text`` is a list
        of dicts with the keys 'text', 'confidence' (percentage rounded to two
        decimals) and 'position' (the bounding box as returned by the reader).
        On any failure the error is printed and ``(None, None)`` is returned
        instead of raising.
    """
    try:
        # exist_ok avoids the race of a separate exists() check followed by makedirs()
        os.makedirs(save_dir, exist_ok=True)

        # Validate the input before paying for OCR model initialisation.
        # cv2.imread does not raise for a missing/unreadable file; it returns None.
        image = cv2.imread(image_path)
        if image is None:
            raise FileNotFoundError(f"Could not read image: {image_path}")

        # Initialize the OCR reader
        reader = easyocr.Reader(['en'], gpu=False)

        # Basic preprocessing: grayscale -> denoise -> local contrast enhancement
        gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
        denoised = cv2.fastNlMeansDenoising(gray)
        clahe = cv2.createCLAHE(clipLimit=2.0, tileGridSize=(8, 8))
        enhanced = clahe.apply(denoised)

        # Perform OCR and keep only sufficiently confident detections
        prescription_text = [
            {
                'text': text,
                'confidence': round(prob * 100, 2),
                'position': bbox,
            }
            for bbox, text, prob in reader.readtext(enhanced)
            if prob > min_confidence
        ]

        # One timestamp for both the filename and the header so they always agree
        processed_at = datetime.now()
        base_image_name = os.path.splitext(os.path.basename(image_path))[0]
        output_filename = f"{base_image_name}_{processed_at.strftime('%Y%m%d_%H%M%S')}.txt"
        output_path = os.path.join(save_dir, output_filename)

        # Save results to text file
        with open(output_path, 'w', encoding='utf-8') as f:
            f.write("Prescription OCR Results\n")
            f.write(f"Image: {image_path}\n")
            f.write(f"Processed on: {processed_at.strftime('%Y-%m-%d %H:%M:%S')}\n")
            f.write("-" * 50 + "\n\n")

            for item in prescription_text:
                f.write(f"Text: {item['text']}\n")
                f.write(f"Confidence: {item['confidence']}%\n")
                f.write(f"Position: {item['position']}\n")
                f.write("-" * 30 + "\n")

        print(f"Results saved to: {output_path}")
        return prescription_text, output_path

    except Exception as e:
        # Deliberate best-effort boundary: callers check for (None, None)
        print(f"Error processing image: {str(e)}")
        return None, None

def display_results(results, output_path):
    """
    Print the recognised text entries and where the full report was saved.

    Args:
        results: List of dicts with 'text' and 'confidence' keys. When it is
            None or empty, nothing is printed.
        output_path: Path of the saved report, echoed in the closing line.
    """
    # Guard clause: nothing to show
    if not results:
        return

    rule = "-" * 50

    print("\nPrescription Details:")
    print(rule)
    for entry in results:
        recognised, confidence = entry['text'], entry['confidence']
        print(f"Text: {recognised}")
        print(f"Confidence: {confidence}%")
        print(rule)

    print(f"\nDetailed results have been saved to: {output_path}")

# Example usage: OCR a sample prescription image and print what was recognised.
# Runs only when this cell/script is executed directly.
if __name__ == "__main__":
    image_path = "test_prescription.jpg"

    # Process the image and save results; both values are None if processing failed
    results, output_path = read_prescription(image_path)

    # Display results in console (prints nothing when results is None or empty)
    display_results(results, output_path)

In [None]:
# Install required packages
!pip install transformers torch accelerate
!pip install -U bitsandbytes
import torch
from transformers import AutoTokenizer, AutoModelForCausalLM

def setup_model(model_name="mistralai/Mistral-7B-Instruct-v0.3"):
    """
    Load a causal language model with 8-bit quantization, plus its tokenizer.

    Args:
        model_name: Hugging Face model identifier (or local path) to load.
            Defaults to the Mistral 7B instruct checkpoint used by this
            notebook.

    Returns:
        ``(model, tokenizer)``.
    """
    # Imported locally so the cell's existing import line stays untouched
    from transformers import BitsAndBytesConfig

    # Load tokenizer
    tokenizer = AutoTokenizer.from_pretrained(model_name)

    # Passing load_in_8bit straight to from_pretrained is deprecated; the
    # quantization settings are supplied through a BitsAndBytesConfig instead.
    quantization_config = BitsAndBytesConfig(load_in_8bit=True)

    # Load model with lower precision to save memory
    model = AutoModelForCausalLM.from_pretrained(
        model_name,
        torch_dtype=torch.float16,  # Use float16 for efficiency
        device_map="auto",  # Automatically handle device placement
        quantization_config=quantization_config,  # 8-bit weights to reduce memory usage
    )

    return model, tokenizer

def process_text(input_text, model, tokenizer, max_length=512, max_new_tokens=None):
    """
    Generate a sampled completion for ``input_text`` and return it decoded.

    Args:
        input_text: Prompt string.
        model: Object exposing ``device`` and ``generate`` (a Hugging Face
            causal LM in this notebook).
        tokenizer: Tokenizer matching ``model``; its EOS token id is reused as
            the padding id.
        max_length: Cap passed to ``generate`` as ``max_length``, which counts
            prompt tokens as well as generated ones. Ignored when
            ``max_new_tokens`` is given.
        max_new_tokens: Optional cap on generated tokens only. Prefer this for
            prompts whose length may approach or exceed ``max_length``.

    Returns:
        The first returned sequence decoded with special tokens stripped.
        NOTE(review): ``outputs[0]`` is decoded in full, so for decoder-only
        models the text presumably begins with the prompt -- confirm this is
        what callers expect.
    """
    # Tokenize input text
    inputs = tokenizer(input_text, return_tensors="pt").to(model.device)

    # Exactly one length limit is forwarded so the two settings never conflict
    if max_new_tokens is not None:
        length_limit = {"max_new_tokens": max_new_tokens}
    else:
        length_limit = {"max_length": max_length}

    # Generate response
    with torch.no_grad():
        outputs = model.generate(
            inputs["input_ids"],
            # Pass the mask explicitly: with pad == eos the model cannot
            # reliably infer which positions are padding on its own.
            attention_mask=inputs.get("attention_mask"),
            num_return_sequences=1,
            temperature=0.7,
            do_sample=True,
            pad_token_id=tokenizer.eos_token_id,
            **length_limit,
        )

    # Decode response
    return tokenizer.decode(outputs[0], skip_special_tokens=True)

def main():
    """
    Read the prompt from 'input.txt', run it through the model and write the
    decoded response to 'output.txt'.

    Any exception raised along the way is caught and reported on stdout; the
    function itself never raises.
    """
    source_path, target_path = "input.txt", "output.txt"

    try:
        # The prompt is read first, so a missing input file fails before the model is loaded
        with open(source_path, 'r', encoding='utf-8') as src:
            prompt = src.read()

        print("Loading model and tokenizer...")
        model, tokenizer = setup_model()

        print("Processing text...")
        generated = process_text(prompt, model, tokenizer)

        with open(target_path, 'w', encoding='utf-8') as dst:
            dst.write(generated)

        print(f"Processing complete! Response saved to {target_path}")

    except Exception as exc:
        print(f"An error occurred: {exc}")

# Entry point: run the text-processing pipeline only when this cell/script is
# executed directly, not when the code is imported as a module.
if __name__ == "__main__":
    main()