In [7]:
import streamlit as st
from google import genai
from google.genai import types
from firebase_admin import credentials, initialize_app, firestore
from google.cloud.firestore_v1.base_query import FieldFilter
import json
import uuid
import datetime
import os
import firebase_admin
from pathlib import Path
from dotenv import load_dotenv
import requests


load_dotenv()

True

In [None]:
telegram_post_function = {
    "name": "telegram_post",
    "description": "Posts a message to Telegram channel and group",
    "parameters": {
        "type": "object",
        "properties": {
            "message": {
                "type": "string", 
                "description": "Formatted message content with Telegram-supported HTML",
            },
        },
        "required": ["message"],
    },
}

def clean_telegram_html(text: str) -> str:
    return text.replace('<br>', '\n')

def telegram_post(message: str) -> dict:
    """Posts message to Telegram without json parsing"""
    bot_token = os.getenv('TELEGRAM_BOT_TOKEN')
    targets = {
        'channel': os.getenv('CHANNEL_USERNAME'),
        'group': os.getenv('GROUP_USERNAME')
    }
    
    cleaned_msg = clean_telegram_html(message)
    results = {}
    
    for name, target in targets.items():
        try:
            response = requests.post(
                f"https://api.telegram.org/bot{bot_token}/sendMessage",
                data={
                    'chat_id': target,
                    'text': cleaned_msg,
                    'parse_mode': 'HTML',
                }
            )
            

            if response.status_code == 200:
                response = response.json()
                results[name] = {
                    'success': True,
                    'message_id': response['result']['message_id'],  
                    'target': target
                }
            else:
                results[name] = {
                    'success': False,
                    'error': response.text,  
                    'target': target
                }
                
        except Exception as e:
            results[name] = {
                'success': False,
                'error': str(e),
                'target': target
            }
    
    return results

# Initialize Gemini Client
client = genai.Client(api_key=os.getenv('GEMINI_API_KEY'))
tools = types.Tool(function_declarations=[telegram_post_function])
config = types.GenerateContentConfig(tools=[tools])

prompt = """
Create a pharmacy announcement as we have a new product called 'Vitamin C' with:
1. use stunning html tags icons etc
2. Newlines for line breaks
3. Relevant emojis
4. Use the following information while creating the announcements:
- name: Axon Pharmacy
- location: 4kilo, Addis Ababa
- telegram contact: @axon_pharmacy
- website: https://axonpharma.com
- phone: +251111234567
"""

response = client.models.generate_content(
    model="gemini-2.5-flash",
    contents=prompt,
    config=config,
)

# Process response
if response.candidates and response.candidates[0].content.parts:
    part = response.candidates[0].content.parts[0]
    
    if part.function_call:
        print("Function call detected!")
        print(f"Function: {part.function_call.name}")
        print(f"Arguments: {part.function_call.args}")
        
        if part.function_call.name == "telegram_post":
            result = telegram_post(part.function_call.args['message'])
            print("\nTelegram Post Results:")
            for name, res in result.items():
                status = "Success!" if res['success'] else "Failed"
                print(f"{name} ({res['target']}): {status}")
    else:
        print("Response:", part.text)
else:
    print("No valid response received")

python-dotenv could not parse statement starting at line 32


Function call detected!
Function: telegram_post
Arguments: {'message': "✨ <b>Exciting News from Axon Pharmacy!</b> ✨<br><br>We're thrilled to announce the arrival of our newest product: <b>VITAMIN C</b>! 🍊💊<br><br>Boost your immunity and stay healthy with our high-quality Vitamin C.<br><br>📍 Visit us at: 4kilo, Addis Ababa<br>📞 Call us: +251111234567<br>🌐 Explore more: <a href='https://axonpharma.com'>https://axonpharma.com</a><br>✉️ Connect on Telegram: <a href='https://t.me/axon_pharmacy'>@axon_pharmacy</a><br><br>Your health is our priority! 💚<br>#AxonPharmacy #VitaminC #Health #NewProduct #AddisAbaba"}

Telegram Post Results:
channel (@axon_pharma): Success!
group (@axon_pharmacy): Success!


In [None]:
import os
import json
import hashlib
import uuid
from pathlib import Path
from typing import Dict, List, Optional
from datetime import datetime

import firebase_admin
from firebase_admin import credentials, firestore
from dotenv import load_dotenv
from google.cloud.firestore_v1.base_document import DocumentSnapshot

# Load .env environment variables
load_dotenv()

# ---------------------- FIREBASE INITIALIZATION ----------------------

class FirebaseManager:
    def __init__(self):
        self.db = self._initialize_firebase()

    def _initialize_firebase(self):
        """Initialize Firebase Admin SDK with credentials"""
        try:
            # Look for credentials in current directory first
            cred_path = Path('firebase_credentials.json')
            
            if not cred_path.exists():
                # Try parent directory if not found
                cred_path = Path.cwd().parent / 'firebase_credentials.json'
                if not cred_path.exists():
                    raise FileNotFoundError("Firebase credentials file not found")

            cred = credentials.Certificate(str(cred_path))
            if not firebase_admin._apps:
                firebase_admin.initialize_app(cred)

            return firestore.client()
        except Exception as e:
            print(f"🔥 Firebase initialization failed: {e}")
            return None

# ---------------------- MEDICINE MANAGEMENT ----------------------

class PharmacyDB:
    def __init__(self):
        self.firebase = FirebaseManager()
        self.db = self.firebase.db

        if not self.db:
            raise RuntimeError("Database connection failed")

    def add_medicine(self, medicine_data: Dict) -> Optional[str]:
        """Add or update a medicine using its name as document ID"""
        try:
            if 'name' not in medicine_data:
                raise ValueError("Medicine name is required")

            medicine_id = medicine_data['name'].replace(' ', '_').lower()
            
            # Convert unit_price to float if it exists
            if 'unit_price' in medicine_data:
                medicine_data['unit_price'] = float(medicine_data['unit_price'])

            # Set default values for required fields
            medicine_data.setdefault('stock', 0)
            medicine_data.setdefault('madein', '')
            medicine_data.setdefault('description', '')
            medicine_data.setdefault('category', '')
            medicine_data.setdefault('unit_price', 0.0)
            
            # Add timestamp
            medicine_data['created_at'] = firestore.SERVER_TIMESTAMP

            doc_ref = self.db.collection('medicines').document(medicine_id)
            doc_ref.set(medicine_data)
            print(f"✅ Medicine added/updated: {medicine_id}")
            return medicine_id
        except Exception as e:
            print(f"❌ Error adding medicine: {e}")
            return None

    def get_medicine(self, medicine_name: str) -> Optional[Dict]:
        try:
            medicine_id = medicine_name.replace(' ', '_').lower()
            doc = self.db.collection('medicines').document(medicine_id).get()
            return doc.to_dict() if doc.exists else None
        except Exception as e:
            print(f"❌ Error fetching medicine: {e}")
            return None

    def update_stock(self, medicine_name: str, quantity_change: int) -> bool:
        try:
            medicine_id = medicine_name.replace(' ', '_').lower()
            product_ref = self.db.collection('medicines').document(medicine_id)
            product_ref.update({
                'stock': firestore.Increment(quantity_change),
                'last_updated': firestore.SERVER_TIMESTAMP
            })
            print(f"✅ Stock updated for {medicine_id}")
            return True
        except Exception as e:
            print(f"❌ Error updating stock: {e}")
            return False

    def list_medicines(self, limit: int = 100) -> List[Dict]:
        try:
            return [doc.to_dict() for doc in self.db.collection('medicines').limit(limit).stream()]
        except Exception as e:
            print(f"❌ Error listing medicines: {e}")
            return []

    def delete_medicine(self, medicine_name: str) -> bool:
        try:
            medicine_id = medicine_name.replace(' ', '_').lower()
            self.db.collection('medicines').document(medicine_id).delete()
            print(f"✅ Medicine deleted: {medicine_id}")
            return True
        except Exception as e:
            print(f"❌ Error deleting medicine: {e}")
            return False

# ---------------------- USER MANAGEMENT ----------------------

class UserManager:
    def __init__(self, db):
        self.db = db

    def hash_password(self, password: str) -> str:
        return hashlib.sha256(password.encode()).hexdigest()

    def add_user(self, user_data: Dict) -> bool:
        try:
            if 'email' not in user_data or 'password' not in user_data:
                raise ValueError("Email and password are required")

            email = user_data['email'].strip().lower()
            doc_ref = self.db.collection('users').document(email)

            if doc_ref.get().exists:
                print("⚠️ User already exists.")
                return False

            # Set default values
            user_data.setdefault('name', '')
            user_data.setdefault('age', 0)
            user_data.setdefault('orders', {})
            user_data.setdefault('chats', [])
            
            # Hash password
            user_data['password'] = self.hash_password(user_data['password'])
            user_data['created_at'] = firestore.SERVER_TIMESTAMP

            doc_ref.set(user_data)
            print(f"✅ User created: {email}")
            return True
        except Exception as e:
            print(f"❌ Error adding user: {e}")
            return False

    def get_user(self, email: str) -> Optional[Dict]:
        try:
            doc: DocumentSnapshot = self.db.collection('users').document(email.lower()).get()
            return doc.to_dict() if doc.exists else None
        except Exception as e:
            print(f"❌ Error retrieving user: {e}")
            return None

    def update_user_orders(self, email: str, order_id: str, medicine_name: str) -> bool:
        try:
            user_ref = self.db.collection('users').document(email.lower())
            user_ref.update({f'orders.{order_id}': medicine_name})
            print(f"✅ Order {order_id} added to user {email}")
            return True
        except Exception as e:
            print(f"❌ Error updating orders: {e}")
            return False

    def add_user_chat(self, email: str, message: str) -> bool:
        try:
            user_ref = self.db.collection('users').document(email.lower())
            user_ref.update({'chats': firestore.ArrayUnion([message])})
            print(f"💬 Chat added for {email}")
            return True
        except Exception as e:
            print(f"❌ Error adding chat: {e}")
            return False

# ---------------------- ORDER MANAGEMENT ----------------------

class OrdersManager:
    def __init__(self, db):
        self.db = db

    def place_order(self, medicinename: str, user_email: str, amount: int, unit_price: float) -> Optional[str]:
        try:
            order_id = uuid.uuid4().hex
            total_price = round(amount * unit_price, 2)

            order_data = {
                'id': order_id,
                'medicinename': medicinename,
                'user_email': user_email.lower(),
                'amount': amount,
                'unit_price': unit_price,
                'total_price': total_price,
                'order_status': "Pending",
                'created_at': firestore.SERVER_TIMESTAMP
            }

            self.db.collection('orders').document(order_id).set(order_data)
            print(f"✅ Order placed: {order_id}")
            return order_id
        except Exception as e:
            print(f"❌ Error placing order: {e}")
            return None

    def update_order_status(self, order_id: str, new_status: str) -> bool:
        try:
            order_ref = self.db.collection('orders').document(order_id)
            order_ref.update({
                'order_status': new_status,
                'updated_at': firestore.SERVER_TIMESTAMP
            })
            print(f"📦 Order {order_id} status updated to '{new_status}'")
            return True
        except Exception as e:
            print(f"❌ Error updating order status: {e}")
            return False

    def get_order(self, order_id: str) -> Optional[Dict]:
        try:
            doc = self.db.collection('orders').document(order_id).get()
            return doc.to_dict() if doc.exists else None
        except Exception as e:
            print(f"❌ Error retrieving order: {e}")
            return None

    def list_orders(self, limit: int = 100) -> List[Dict]:
        try:
            return [doc.to_dict() for doc in self.db.collection('orders').limit(limit).stream()]
        except Exception as e:
            print(f"❌ Error listing orders: {e}")
            return []

# ---------------------- ADMIN MANAGEMENT ----------------------

class AdminManager:
    def __init__(self, db):
        self.db = db

    def hash_password(self, password: str) -> str:
        return hashlib.sha256(password.encode()).hexdigest()

    def authenticate_admin(self, email: str, password: str) -> bool:
        """Hardcoded admin credentials for development"""
        return email == "admin@axon.org" and password == "123"

    def add_admin(self, admin_data: Dict) -> bool:
        try:
            if 'email' not in admin_data or 'password' not in admin_data:
                raise ValueError("Email and password are required")

            email = admin_data['email'].strip().lower()
            doc_ref = self.db.collection('admins').document(email)

            if doc_ref.get().exists:
                print("⚠️ Admin already exists.")
                return False

            # Set defaults
            admin_data.setdefault('name', 'Admin')
            
            # Hash password
            admin_data['password'] = self.hash_password(admin_data['password'])
            admin_data['created_at'] = firestore.SERVER_TIMESTAMP

            doc_ref.set(admin_data)
            print(f"✅ Admin created: {email}")
            return True
        except Exception as e:
            print(f"❌ Error adding admin: {e}")
            return False

    def get_admin(self, email: str) -> Optional[Dict]:
        try:
            doc = self.db.collection('admins').document(email.lower()).get()
            return doc.to_dict() if doc.exists else None
        except Exception as e:
            print(f"❌ Error retrieving admin: {e}")
            return None

# ---------------------- EXAMPLE USAGE ----------------------

if __name__ == "__main__":
    try:
        # Initialize all managers
        db = PharmacyDB()
        users = UserManager(db.db)
        orders = OrdersManager(db.db)
        admin_mgr = AdminManager(db.db)

        # Test admin authentication
        print("\n🔐 Testing admin authentication:")
        print(admin_mgr.authenticate_admin("admin@axon.org", "123"))  # True
        print(admin_mgr.authenticate_admin("wrong@email.com", "123"))  # False

        # Add a medicine
        print("\n💊 Adding medicine:")
        new_medicine = {
            'name': 'Aspririn',
            'stock': 300,
            'madein': 'India',
            'description': 'Used for treating bacterial infections.',
            'category': 'Antibiotics',
            'unit_price': 5.75
        }
        medicine_id = db.add_medicine(new_medicine)
        print(f"Medicine ID: {medicine_id}")

        # Add a user
        print("\n👤 Adding user:")
        new_user = {
            'email': 'test@gmail.com',
            'password': '123',
            'name': 'Test User',
            'age': 30
        }
        users.add_user(new_user)

        # Create an order
        print("\n📦 Placing order:")
        order_id = orders.place_order(
            medicinename='Amoxicillin Tablet',
            user_email='test@gmail.com',
            amount=3,
            unit_price=5.75
        )
        print(f"Order ID: {order_id}")

        # Update order status
        if order_id:
            print("\n🔄 Updating order status:")
            orders.update_order_status(order_id, "Shipped")

        # List all orders
        print("\n📋 All Orders:")
        for order in orders.list_orders():
            print(f"- {order['id']} | Status: {order['order_status']}")

    except Exception as e:
        print(f"🚨 Application error: {e}")


🔐 Testing admin authentication:
True
False

💊 Adding medicine:
✅ Medicine added/updated: aspririn
Medicine ID: aspririn

👤 Adding user:
✅ User created: test@gmail.com

📦 Placing order:
✅ Order placed: 0b527cd538844e7ab93c4f656f314cd7
Order ID: 0b527cd538844e7ab93c4f656f314cd7

🔄 Updating order status:
📦 Order 0b527cd538844e7ab93c4f656f314cd7 status updated to 'Shipped'

📋 All Orders:
- 0b527cd538844e7ab93c4f656f314cd7 | Status: Shipped
🚨 Application error: 'order_status'
