### Import Libraries

In [1]:
from google.auth.transport.requests import Request
from email.message import EmailMessage
from google.oauth2.credentials import Credentials
from google_auth_oauthlib.flow import InstalledAppFlow
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError
from dateutil import parser
from typing import Annotated
from livekit.agents import llm
import logging
import sqlite3
import datetime
import base64
import os

# OAuth scopes requested for each Google service (Calendar and Gmail).
SCOPES_calendar = ["https://www.googleapis.com/auth/calendar"]
SCOPES_gmail= ['https://www.googleapis.com/auth/gmail.compose']

# Set up a logger to track actions in the program; its threshold is INFO.
logger = logging.getLogger("office-assistant")
logger.setLevel(logging.INFO)

### Setup Function

In [3]:
def _setup_database():
    """Initialize the database and create the customers table if it is missing.

    The database file is ``customer_data.db``, resolved against the current
    working directory. Calling this repeatedly is safe because the table is
    created with ``IF NOT EXISTS``.
    """
    db_path = os.path.abspath("customer_data.db")

    conn = sqlite3.connect(db_path)
    try:
        # `with conn` only scopes the transaction (commit on success, rollback
        # on error); it does NOT close the connection, hence the finally below.
        with conn:
            conn.execute("""
                CREATE TABLE IF NOT EXISTS customers (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    name TEXT NOT NULL,
                    email TEXT NOT NULL
                )
            """)
    finally:
        conn.close()

# Create the database file and customers table when this cell/module is run.
_setup_database()


def _setup_calendar_service():
    """Build an authenticated Google Calendar API client.

    Cached credentials are read from ``token_calendar.json`` when that file
    exists. If they are missing or not valid, they are either refreshed
    (expired credentials with a refresh token) or obtained through the local
    browser-based OAuth flow using ``credentials_calendar.json``. Newly
    obtained or refreshed credentials are written back to
    ``token_calendar.json``.

    Returns:
        The Calendar v3 service object produced by ``build``.
    """
    token_file = "token_calendar.json"

    credentials = None
    if os.path.exists(token_file):
        credentials = Credentials.from_authorized_user_file(token_file, SCOPES_calendar)

    # Usable cached credentials: nothing to refresh and nothing to persist.
    if credentials and credentials.valid:
        return build("calendar", "v3", credentials=credentials)

    refreshable = credentials and credentials.expired and credentials.refresh_token
    if refreshable:
        credentials.refresh(Request())
    else:
        # No usable credentials: the user has to authenticate via the browser.
        oauth_flow = InstalledAppFlow.from_client_secrets_file(
            "credentials_calendar.json", SCOPES_calendar
        )
        credentials = oauth_flow.run_local_server(port=0)

    # Persist the credentials for subsequent runs.
    with open(token_file, "w") as token_out:
        token_out.write(credentials.to_json())

    return build("calendar", "v3", credentials=credentials)



def _setup_gmail_service():
    """Authenticate with Gmail and return the Gmail API service.

    Cached credentials are read from ``token_gmail.json`` when that file
    exists. If they are missing or not valid, they are either refreshed
    (expired credentials with a refresh token) or obtained through the local
    browser-based OAuth flow using ``credentials_gmail.json``. Newly obtained
    or refreshed credentials are written back to ``token_gmail.json``.

    Returns:
        The Gmail v1 service object produced by ``build``.
    """
    creds = None
    # NOTE: both files are resolved against the current working directory,
    # which is not necessarily the directory that contains this script.
    base_dir = os.getcwd()
    token_path = os.path.join(base_dir, "token_gmail.json")
    credentials_path = os.path.join(base_dir, "credentials_gmail.json")

    # Load the saved credentials if the token file exists. The scopes are
    # passed explicitly so the loaded credentials are bound to the scopes this
    # module requests instead of whatever the token file happens to contain.
    if os.path.exists(token_path):
        creds = Credentials.from_authorized_user_file(token_path, SCOPES_gmail)

    # If no valid credentials are found, the user has to authenticate via browser
    if not creds or not creds.valid:
        if creds and creds.expired and creds.refresh_token:
            creds.refresh(Request())
        else:
            flow = InstalledAppFlow.from_client_secrets_file(credentials_path, SCOPES_gmail)
            creds = flow.run_local_server(port=0)

        # Save the credentials for further use
        with open(token_path, "w") as token_file:
            token_file.write(creds.to_json())

    return build('gmail', 'v1', credentials=creds)

### Accessing Google Calendar

In [11]:
def create_event(summary: str, description: str, start: str, end: str, timezone: str = "Europe/Vienna"):
    """Create a new event in the user's primary Google Calendar.

    Args:
        summary: Title of the event.
        description: Free-text description of the event.
        start: Start time as a date/time string understood by
            ``dateutil.parser.parse``.
        end: End time, in the same kind of format as ``start``.
        timezone: Time zone name sent with both the start and end time.

    Returns:
        ``"Event created: <link>"`` on success, or ``"Failed to create event."``
        when the times cannot be parsed or the Calendar API reports an error.
    """
    # Validate the input before authenticating, so a malformed date neither
    # triggers the OAuth flow nor escapes as an unhandled exception.
    try:
        start_iso = parser.parse(start).isoformat()
        end_iso = parser.parse(end).isoformat()
    except (ValueError, OverflowError) as error:
        logger.error(f"Invalid start or end time: {error}")
        return "Failed to create event."

    try:
        service = _setup_calendar_service()

        # event dictionary with necessary details
        event = {
            "summary": summary,
            "description": description,
            "start": {"dateTime": start_iso, "timeZone": timezone},
            "end": {"dateTime": end_iso, "timeZone": timezone},
        }

        # Inserting event into user's calendar
        event_result = service.events().insert(calendarId="primary", body=event).execute()
        return f"Event created: {event_result.get('htmlLink')}"

    except HttpError as error:
        logger.error(f"An error occurred: {error}")
        return "Failed to create event."

In [None]:
# Function test: create a sample three-hour event in the primary calendar.
# NOTE: running this cell calls the live Calendar API and creates a real event.

event = {
    "summary": "Meeting",
    # NOTE: "location" is defined here but is not passed to create_event below.
    "location": "123 Example St, City, Country",
    "description": "Meeting for product launch.",
    "start": "2025-02-03T11:00:00",
    "end": "2025-02-03T14:00:00",
    }

create_event(
    summary=event["summary"],
    description=event["description"],
    start=event["start"],
    end=event["end"]
)

In [4]:
def get_upcoming_events(max_results: int = 10):
    """Fetch upcoming events from the user's primary Google Calendar.

    Args:
        max_results: Maximum number of events requested from the API.

    Returns:
        The string form of a list of ``{'start': ..., 'summary': ...}`` dicts
        ordered by start time, ``"No upcoming events found."`` when there are
        none, ``"Failed to fetch events."`` on an API error, or
        ``"Invalid event data format."`` when an event lacks a ``start`` entry.
    """
    try:
        service = _setup_calendar_service()                 # setting up Calendar API service

        # Current UTC time as the lower bound. A timezone-aware "now" replaces
        # the deprecated datetime.utcnow(); the "Z" suffix format is unchanged.
        now = datetime.datetime.now(datetime.timezone.utc).isoformat().replace("+00:00", "Z")

        events_result = service.events().list(                   # fetching upcoming events
            calendarId="primary",
            timeMin=now,
            maxResults=max_results,
            singleEvents=True,
            orderBy="startTime",
        ).execute()

        events = events_result.get("items", [])                  # extracting events

        if not events:
            return "No upcoming events found."

        upcoming = [
            {
                # Timed events carry 'dateTime'; fall back to 'date' otherwise.
                "start": event["start"].get("dateTime", event["start"].get("date")),
                # An event without a title must not make the whole listing
                # fail, so a placeholder is used instead of raising KeyError.
                "summary": event.get("summary", "(No title)"),
            }
            for event in events
        ]
        return f"{upcoming}"

    except HttpError as error:
        logger.error(f"An error occurred: {error}")
        return "Failed to fetch events."
    except KeyError as e:
        logger.error(f"Missing key in event data: {e}")
        return "Invalid event data format."

In [5]:
# Function test: list upcoming events using the default max_results.

get_upcoming_events()

"[{'start': '2025-02-03T09:00:00+01:00', 'summary': 'meeting product launch'}]"

### Accessing Gmail

In [16]:
def gmail_create_draft(
        recipient: Annotated[str, llm.TypeInfo(description="Email recipient")],
        subject: Annotated[str, llm.TypeInfo(description="Email subject")],
        body: Annotated[str, llm.TypeInfo(description="Email body")],
    ):
    """Create a draft email in the authenticated user's Gmail account.

    Args:
        recipient: Address placed in the ``To`` header.
        subject: Text placed in the ``Subject`` header.
        body: Plain-text content of the message.

    Returns:
        The draft object returned by the Gmail API on success, or the string
        ``"Failed to create draft"`` when the API reports an error.
    """
    service = _setup_gmail_service()

    try:
        # Create email message
        message = EmailMessage()
        message.set_content(body)
        message["To"] = recipient
        message["Subject"] = subject

        # The API expects the raw message as URL-safe base64 text.
        encoded_message = base64.urlsafe_b64encode(message.as_bytes()).decode()
        create_message = {"message": {"raw": encoded_message}}

        # Create a draft with Gmail API
        draft = service.users().drafts().create(userId="me", body=create_message).execute()
        logger.info(f'Draft id: {draft["id"]}\nDraft message: {draft["message"]}')
        return draft

    except HttpError as error:
        # Failures are logged at ERROR level, like the calendar functions do.
        logger.error(f"An error occurred: {error}")
        return "Failed to create draft"

In [None]:
# Function test: create a draft addressed to a sample recipient.

gmail_create_draft("alice.smith@example.com", "Test mail", "Hi, this is just a test mail")

In [18]:
def gmail_send_message(
        recipient: Annotated[str, llm.TypeInfo(description="Email recipient")],
        subject: Annotated[str, llm.TypeInfo(description="Email subject")],
        body: Annotated[str, llm.TypeInfo(description="Email body")],
    ):
    """Create and send an email from the authenticated user's Gmail account.

    The id of the sent message is logged at INFO level.

    Args:
        recipient: Address placed in the ``To`` header.
        subject: Text placed in the ``Subject`` header.
        body: Plain-text content of the message.

    Returns:
        The message object returned by the Gmail API (including its id) on
        success, or the string ``"Failed to send mail"`` when the API reports
        an error.
    """
    service = _setup_gmail_service()

    try:
        # creating email message
        message = EmailMessage()
        message.set_content(body)
        message["To"] = recipient
        message["Subject"] = subject

        # The API expects the raw message as URL-safe base64 text.
        encoded_message = base64.urlsafe_b64encode(message.as_bytes()).decode()
        create_message = {"raw": encoded_message}

        # send mail using Gmail API
        send_message = (
            service.users()
            .messages()
            .send(userId="me", body=create_message)
            .execute()
        )
        logger.info(f'Message Id: {send_message["id"]}')
        return send_message

    except HttpError as error:
        # Failures are logged at ERROR level, like the calendar functions do.
        logger.error(f"An error occurred: {error}")
        return "Failed to send mail"

In [None]:
# Function test: send a mail to a sample recipient.
# NOTE: running this cell sends a real message through the Gmail API.

gmail_send_message("alice.smith@example.com", "Test mail", "Hi this is just a test mail")

### Accessing SQL Database

In [20]:
def add_customer(name: str, email: str):
    """Insert a customer row into ``customer_data.db``.

    Args:
        name: Customer name stored in the ``name`` column.
        email: Customer email stored in the ``email`` column.

    Returns:
        A success message, or ``"Customer <name> already exists."`` when the
        insert raises ``sqlite3.IntegrityError``.

    NOTE(review): the ``customers`` table created in this file has no UNIQUE
    constraint, so duplicate names are currently inserted without error and
    the IntegrityError branch can only be hit by a NOT NULL violation.
    Confirm whether a uniqueness rule was intended.
    """
    conn = sqlite3.connect("customer_data.db")
    try:
        # `with conn` commits on success and rolls back on error, but it does
        # not close the connection; that is done in the finally clause.
        with conn:
            conn.execute("INSERT INTO customers (name, email) VALUES (?, ?)", (name, email))
    except sqlite3.IntegrityError:
        return f"Customer {name} already exists."
    finally:
        conn.close()

    return f"Customer {name} added successfully."

In [21]:
# Function test: insert a sample customer row.

add_customer("Bob Johnson", "bob.johnson@example.com")

'Customer Bob Johnson added successfully.'

In [29]:
def get_customer(name: str):
    """Look up a customer by exact name in ``customer_data.db``.

    Args:
        name: Name to match against the ``name`` column.

    Returns:
        ``"Customer Name: <name>, Email: <email>"`` for the first matching
        row, or ``"Customer <name> not found."`` when there is none.
    """
    conn = sqlite3.connect("customer_data.db")
    try:
        # Columns are selected explicitly so the result does not depend on the
        # table's column order.
        row = conn.execute(
            "SELECT name, email FROM customers WHERE name = ?", (name,)
        ).fetchone()
    finally:
        # The sqlite3 connection context manager does not close the
        # connection, so close it explicitly.
        conn.close()

    if row:
        return f"Customer Name: {row[0]}, Email: {row[1]}"

    return f"Customer {name} not found."

In [30]:
# Function test: look up the sample customer by name.

get_customer("Bob Johnson")

'Customer Name: Bob Johnson, Email: bob.johnson@example.com'