### general

In [54]:
import requests
from datetime import datetime, timedelta
import json
import msal
import webbrowser
from typing import List, Dict, Union, Optional, TypedDict
import os
os.chdir("/home/daniel/de_AI_Fabriek/belasting-agent-playground/")

from keys import O365_CLIENT_SECRET, O365_CLIENT_ID

SCOPES = ['Calendars.ReadWrite']
GRAPH_API_ENDPOINT = 'https://graph.microsoft.com/v1.0'

AUTHORITY = 'https://login.microsoftonline.com/common'

# experimental

## Authentication

### Type Definitions 

In [None]:
class TokenResponse(TypedDict, total=False):
    token_type: str
    scope: str
    expires_in: int
    ext_expires_in: int
    access_token: str
    refresh_token: str
    id_token: str
    client_info: str
    token_source: str
    id_token_claims: Dict[str, str | int]  # Can contain mixed types

### generate acces token

In [None]:

def generate_access_token(app_id: str, scopes: List[str]) -> TokenResponse:
    """ Summary:
    This function generates an access token for the Microsoft Graph API.
    It uses the MSAL library to authenticate the user and generate the token or
    use the token from the cache if it is still valid, otherwise it will refresh the token.
        - Tokens are valid for 10 minutes and refresh tokens are valid for 90 days (school and work accounts) or 1 year
        and can be used to refresh the access token and get a new refresh token.
        
    The token is saved in a file called 'msg_api_token_access.json' and is used to authenticate the user in the future.

    Args:
        app_id : Client ID of the Azure App Registration
        scopes : List of scopes that the token should have access to

    Returns:
        Dict : Dictionary containing the access token and
        other information like the token type, expiry time and other account information. 
    """

    # Save Session Token as a token file
    access_token_cache = msal.SerializableTokenCache()

    # read the token file
    if os.path.exists('msg_api_token_access.json'):
        access_token_cache.deserialize(open("msg_api_token_access.json", "r").read())
        token_detail = json.load(open('msg_api_token_access.json',))
        if not token_detail:
            os.remove('msg_api_token_access.json')
            access_token_cache = msal.SerializableTokenCache()

        # check if the token is still valid
        if token_detail:
            token_detail_key = list(token_detail['RefreshToken'].keys())[0]
            modified = datetime.fromtimestamp(int(token_detail['RefreshToken'][token_detail_key]['last_modification_time']))
            end_date = modified + timedelta(days=90)
            if datetime.now() > end_date:
                os.remove('msg_api_token_access.json')
                access_token_cache = msal.SerializableTokenCache()

    # assign a SerializableTokenCache object to the client instance
    client = msal.PublicClientApplication(client_id=app_id, token_cache=access_token_cache)

    accounts = client.get_accounts()
    if accounts:
        # load the session
        token_response = client.acquire_token_silent(scopes, accounts[0])

    # if there are no accounts or the token is not valid (expired or out of scope) request a login form the user
    if not accounts or not token_response:
        # authetnicate your accoutn as usual
        flow = client.initiate_device_flow(scopes=scopes)
        webbrowser.open('https://microsoft.com/devicelogin')
        token_response = client.acquire_token_by_device_flow(flow)

    with open('msg_api_token_access.json', 'w') as _f:
        _f.write(access_token_cache.serialize())

    return token_response

             


## Calendar

### Type Definitions

OutlookEvent type

In [None]:
class ResponseStatus(TypedDict):
    response: str
    time: str

class Body(TypedDict):
    contentType: str
    content: str

class TimeZoneData(TypedDict):
    dateTime: str
    timeZone: str

class Location(TypedDict):
    displayName: str
    locationType: str
    uniqueIdType: str
    address: dict
    coordinates: dict

class EmailAddress(TypedDict):
    name: str
    address: str

class Organizer(TypedDict):
    emailAddress: EmailAddress

class OutlookEvent(TypedDict, total=False):  # total=False allows missing keys
    id: str
    createdDateTime: str
    lastModifiedDateTime: str
    changeKey: str
    categories: List[str]
    transactionId: Optional[str]
    originalStartTimeZone: str
    originalEndTimeZone: str
    iCalUId: str
    uid: str
    reminderMinutesBeforeStart: int
    isReminderOn: bool
    hasAttachments: bool
    subject: str
    bodyPreview: str
    importance: str
    sensitivity: str
    isAllDay: bool
    isCancelled: bool
    isOrganizer: bool
    responseRequested: bool
    seriesMasterId: Optional[str]
    showAs: str
    type: str
    webLink: str
    onlineMeetingUrl: Optional[str]
    isOnlineMeeting: bool
    onlineMeetingProvider: str
    allowNewTimeProposals: bool
    occurrenceId: Optional[str]
    isDraft: bool
    hideAttendees: bool
    responseStatus: ResponseStatus
    body: Body
    start: TimeZoneData
    end: TimeZoneData
    location: Location
    locations: List[Location]
    recurrence: Optional[str]
    attendees: List[dict]
    organizer: Organizer
    onlineMeeting: Optional[dict]

### Predecessors 

In [56]:
access_token = generate_access_token(O365_CLIENT_ID, SCOPES)
headers = {
    'Authorization': 'Bearer ' + access_token['access_token']
}

# create an event


In [17]:
def get_headers() -> Dict[str, str]:
    access_token = generate_access_token(O365_CLIENT_ID, SCOPES)
    headers = {
        'Authorization': 'Bearer ' + access_token['access_token']
    }
    return headers

In [18]:
def construct_event_detail(event_name: Dict[str, str], **kwargs : Dict[str, str]) -> Dict[str, str]:
    request_body = {
        'subject': event_name,
    }
    for key, value in kwargs.items():
        request_body[key] = value
    return request_body


In [None]:
def write_event(event_name: str, start: datetime, end: datetime, **kwargs) -> List[OutlookEvent]:
    headers = get_headers()
    response = requests.post(
        GRAPH_API_ENDPOINT + f'/me/events',
        headers=headers,
        json=construct_event_detail(
            event_name,
            start=start,
            end=end,
            **kwargs
        )
    )
    return response

### Create event

Examples of the request body for creating an event:

In [20]:
# event_name = 'Work Holiday'
# body = {
#     # html or text
#     'contentType': 'html',
#     'content': '<b>2 weeks vacation</b>'
# }
# start = {
#     'dateTime': '2025-02-20T09:00:00',
#     'timeZone': 'Europe/Amsterdam'
# }
# end = {
#     'dateTime': '2025-02-21T17:00:00',
#     'timeZone': 'Europe/Amsterdam'
# }
# location = {
#     'displayName': 'Paris, France'
# }
# attendees = [
#     {
#         'emailAddress': {
#             'address': 'daniel@vanoosteroom.com'
#         },
#          'type': 'required' # or optional
#     }
# ]

In [21]:
# response2_create = requests.post(
#     GRAPH_API_ENDPOINT + f'/me/events',
#     headers=headers,
#     json=construct_event_detail(
#             event_name,
#             body=body,
#             location=location,
#             start=start,
#             end=end,
#             #attendees=attendees,
#         )
# )

### Create test environment workweek

In [35]:

import random
def generate_schedule() -> List[List[timedelta]]:
    week_schedule = []

    for _ in range(5):

        times = [(timedelta(hours=9) + timedelta(minutes=random.choice([0, 15, 30, 45])))]


        while True:
            random_value = times[-1] + timedelta(minutes=random.choice(range(30, 180, 15)))
            times.append(random_value)
            if random_value >= timedelta(hours=16, minutes=30):
                times.pop(-1)
                times.append(timedelta(hours=17))
                break
        week_schedule.append(times)
    return week_schedule

In [None]:
# Create a randon da

def to_iso_date(date: datetime) -> str:
    return date.strftime("%Y-%m-%d")

def to_iso_time(time : timedelta) -> str:
    string_time = str(time)
    dt = datetime.strptime(string_time, "%H:%M:%S")
    return dt.strftime("%H:%M:%S")

def date_and_time_to_iso(date: datetime, time: timedelta) -> str:
    return to_iso_date(date) + 'T' + to_iso_time(time)

def time_json(date: datetime, time: timedelta) -> Dict[str, str]:
    return {
        'dateTime': date_and_time_to_iso(date, time),
        'timeZone': 'Europe/Amsterdam'
    }
    
def convert_to_date_time(time: datetime) -> timedelta:
    hours, minutes, seconds = map(int, str(time).split(':'))
    time_checked = timedelta(hours=hours, minutes=minutes, seconds=seconds)
    return time_checked



def write_week(week_schedule: List[List[timedelta]], start_date: datetime = datetime.now()) -> List[OutlookEvent]:

    log = []

    day = start_date - timedelta(days=1)
    for day_schedule in week_schedule:
        if day.weekday() == 4:
            day = day + timedelta(days=2)
        elif day.weekday() == 5:
            day = day + timedelta(days=1)
        
        day = day + timedelta(days=1)
            
        for j, time_indices in enumerate(day_schedule):
            log.append(write_event(
                'Meeting',
                time_json(day, day_schedule[j -1]),
                time_json(day, time_indices)
            ))
    return log



In [50]:
logs = write_week(generate_schedule())

### Get and Delete

In [None]:
def get_and_delete(start_datetime = (datetime.now() - timedelta(days=1)),
                    end_datetime = (datetime.now() + timedelta(days=8))) -> str:
    start_datetime = start_datetime.isoformat()
    end_datetime = end_datetime.isoformat()
    headers = get_headers()
    appointments_ids = set()
    while True:
        response1_get = requests.get(
        GRAPH_API_ENDPOINT + f'/me/calendar/calendarView?startDateTime={start_datetime}&endDateTime={end_datetime}',
        headers=headers
        )
        
        for i in response1_get.json()['value']:
            appointments_ids.add(i['id'])
        
        if len(appointments_ids) == 0:
            break
            
        new_start_datetime = response1_get.json()['value'][-1]['end']['dateTime']

        if start_datetime == new_start_datetime:
            break
        else:
            start_datetime = new_start_datetime
    
    for i in appointments_ids:
        response = requests.delete(
            GRAPH_API_ENDPOINT + f'/me/events/{i}',
            headers=headers
        )
    return f"{len(appointments_ids)} appointments deleted"
         
print(get_and_delete(start_datetime = (datetime(2025, 2, 20) - timedelta(days=1))))

76 appointments deleted


### Find open space

In [None]:
def retrieve_unique_events(start_datetime = (datetime.now() - timedelta(days=1)).isoformat(),
                    end_datetime = (datetime.now() + timedelta(days=8)).isoformat()):
    headers = get_headers()
    appointments_ids = set()
    appointments_jsons = []
    
    while True:
        response1_get = requests.get(
        GRAPH_API_ENDPOINT + f'/me/calendar/calendarView?startDateTime={start_datetime}&endDateTime={end_datetime}',
        headers=headers
        )
        
        for i in response1_get.json()['value']:
            if i['id'] not in appointments_ids:
                appointments_ids.add(i['id'])
                appointments_jsons.append(i)
        
        if len(appointments_ids) == 0:
            print(len(appointments_ids))
            break
            
        new_start_datetime = response1_get.json()['value'][-1]['end']['dateTime']

        if start_datetime == new_start_datetime:
            break
        else:
            start_datetime = new_start_datetime
    return appointments_jsons

In [17]:
unique_events = retrieve_unique_events()

In [None]:
from typing import List, Dict, Optional, Tuple
 
def convert_to_timedelta(date: datetime) -> timedelta:
    return timedelta(hours=date.hour, minutes=date.minute, seconds=date.second)

def find_first_opening(appointments: List[Dict], time_lenght: int) -> Optional[Tuple[datetime, datetime]]:
    WORK_START = datetime.strptime("09:00:00", "%H:%M:%S").time()
    WORK_END = datetime.strptime("17:00:00", "%H:%M:%S").time()
    MIN_DURATION = timedelta(minutes=time_lenght)

    appointments_dt = [
        (datetime.strptime(i['start']['dateTime'][:19], "%Y-%m-%dT%H:%M:%S") + timedelta(hours=1),
         datetime.strptime(i['end']['dateTime'][:19], "%Y-%m-%dT%H:%M:%S") + timedelta(hours=1))
        for i in appointments
    ]

    appointments_dt.sort()

    merged_appointments = []
    for start, end in appointments_dt:
        if merged_appointments and (start < merged_appointments[-1][1]):
            inserted = (min(min(merged_appointments[-1][0], start), merged_appointments[-1][0]), max(merged_appointments[-1][1], end))
            merged_appointments[-1] = inserted
        else:
            merged_appointments.append((start, end))

    # Check if there is an opening before the first appointment        
    if merged_appointments and (convert_to_timedelta(merged_appointments[0][0]) - convert_to_timedelta(WORK_START)) >= MIN_DURATION:
        print("this")
        return (merged_appointments[0][0] + convert_to_timedelta(WORK_START), merged_appointments[0][0])

    # Check if there is an opening between appointments
    for i in range(len(merged_appointments) - 1):
        end_current = merged_appointments[i][1]
        start_next = merged_appointments[i + 1][0]
        if (convert_to_timedelta(start_next) - convert_to_timedelta(end_current)) >= MIN_DURATION:
            return (end_current, end_current + MIN_DURATION)

    # Check if there is an opening after the last appointment
    if merged_appointments and (convert_to_timedelta(WORK_END) - convert_to_timedelta(merged_appointments[-1][1])) >= MIN_DURATION:
        print("that")
        return (merged_appointments[-1][1], merged_appointments + MIN_DURATION)

    return None

In [None]:
def date_time_to_iso(date: datetime) -> str:
    return date.strftime("%Y-%m-%dT%H:%M:%S")

def date_time_to_dateTimeZone(date: datetime) -> Dict:
    return {
        'dateTime': date_time_to_iso(date),
        'timeZone': 'Europe/Amsterdam'
    }

In [20]:
found_time = find_first_opening(unique_events, 60)
print(found_time)

(datetime.datetime(2025, 2, 21, 13, 30), datetime.datetime(2025, 2, 21, 14, 30))


In [None]:
found_time_iso = map(time_json, found_time)
write_event('Meeting found', found_time_iso[0], found_time_iso[1])

NameError: name 'time_json' is not defined