In [9]:
import requests
import pytz

import json
from datetime import datetime, time as datetime_time, timedelta
import calendar


In [10]:
class BearerAuth(requests.auth.AuthBase):
    def __init__(self, token):
        self.token = token

    def __call__(self, r):
        r.headers["authorization"] = "Bearer " + self.token
        return r


In [11]:
def make_api_request(stop_id, bearer):
    open_data_api_endpoint = "https://opendata-api.stib-mivb.be/OperationMonitoring/4.0/PassingTimeByPoint/" + stop_id
    headers = {"Accept": "application/json"}

    response = requests.get(open_data_api_endpoint, auth=BearerAuth(bearer))
    
    return response.json()

In [12]:
def parse_waiting_times(json_payload, destination):
    passing_times = json_payload['points'][0]['passingTimes']
    str_expected_arrival_times = [passing_time['expectedArrivalTime'] for passing_time in passing_times if passing_time['destination']['fr']==destination]
    print(str_expected_arrival_times)
    expected_arrival_times = [datetime.fromisoformat(str_expected_arrival_time) for str_expected_arrival_time in str_expected_arrival_times]
    return expected_arrival_times 

In [13]:
def compute_time_diff(start, end):
    if isinstance(start, datetime_time): # convert to datetime
        assert isinstance(end, datetime_time)
        start, end = [datetime.combine(datetime.min, t) for t in [start, end]]
    if start <= end: # e.g., 10:33:26-11:15:49
        return end - start
    else: # end < start e.g., 23:55:00-00:25:00
        end += timedelta(1) # +day
        assert end > start
        return end - start

In [14]:
from datetime import datetime, time as datetime_time, timedelta
import pytz


class TimeUtils:

    @staticmethod
    def get_current_localized_time():
        current_time_utc = datetime.utcnow()
        return pytz.utc.localize(current_time_utc)

    @staticmethod
    def compute_time_diff(start, end):
        if isinstance(start, datetime_time): # convert to datetime
            assert isinstance(end, datetime_time)
            start, end = [datetime.combine(datetime.min, t) for t in [start, end]]
        if start <= end: # e.g., 10:33:26-11:15:49
            return TimeUtils._convert_timedelta(end - start)
        else: # end < start e.g., 23:55:00-00:25:00
            end += timedelta(1) # +day
            assert end > start
            return TimeUtils._convert_timedelta(end - start)

    @staticmethod
    def _convert_timedelta(duration):
        duration_dict = {}
        days, seconds = duration.days, duration.seconds
        duration_dict['days'] = days
        hours = seconds // 3600
        minutes = (seconds % 3600) // 60
        seconds = (seconds % 60)
        duration_dict['hours'] = hours
        duration_dict['minutes'] = minutes
        duration_dict['seconds'] = seconds

        return duration_dict


In [15]:
from dataclasses import dataclass, field
from typing import List, Optional, Any

from dataclasses_json import dataclass_json, config, LetterCase

from datetime import datetime

@dataclass_json(letter_case=LetterCase.CAMEL)
@dataclass
class Message:
    fr: str = ""
    nl: str = ""
        
@dataclass_json(letter_case=LetterCase.CAMEL)
@dataclass
class Destination:
    fr: str = ""
    nl: str = ""

@dataclass_json(letter_case=LetterCase.CAMEL)
@dataclass
class PassingTime:
    destination: Optional[Destination] = None
    message: Optional[Message] = None
    line_id: str = ""
    expected_arrival_time: str = ""
    arriving_in_dict: dict = field(init=False, repr=False)
        
    def __post_init__(self):
        self.expected_arrival_time = datetime.fromisoformat(self.expected_arrival_time)
        current_localized_time = TimeUtils.get_current_localized_time()
        self.arriving_in_dict = TimeUtils.compute_time_diff(current_localized_time, self.expected_arrival_time)


    def __str__(self):
        return f'{self.arriving_in_dict}'

        

@dataclass_json(letter_case=LetterCase.CAMEL)
@dataclass
class PointPassingTimes:
    passing_times: List[PassingTime] = field(default_factory=list)
    point_id: str = ""



In [16]:
legrand_stop_id = "1059"
bearer = '37ea5c32ad393fb55e387ebb72f17175'
response = make_api_request(legrand_stop_id, bearer)
response

JSONDecodeError: Expecting value: line 1 column 1 (char 0)

In [None]:
passages = PointPassingTimes.schema().load(response['points'], many=True) 

passages

In [None]:
expected_arrival_times = parse_waiting_times(response, 'ROODEBEEK' )

current_time_utc = datetime.utcnow()
current_time_be = pytz.utc.localize(current_time_utc)
tram_arrival_time = expected_arrival_times[0]

waiting_time = compute_time_diff(current_time_be, tram_arrival_time)
print(waiting_time)


In [None]:
def convert_timedelta(duration):
    days, seconds = duration.days, duration.seconds
    hours = seconds // 3600
    minutes = (seconds % 3600) // 60
    seconds = (seconds % 60)
    return days, hours, minutes, seconds

In [None]:
formatted_waiting_time = convert_timedelta(waiting_time)

In [None]:
formatted_waiting_time

In [None]:
def compute_token_expiration_date(token_validity_time):
    now = datetime.utcnow()
    future = now + timedelta(seconds=token_validity_time)
    return calendar.timegm(future.utctimetuple())

In [None]:
token_expiration_date = compute_token_expiration_date(10)

In [None]:
def is_token_expired(token_expiration_date):
    current_time = datetime.now()
    unix_timestamp = current_time.timestamp()
    return unix_timestamp > token_expiration_date

In [None]:
test = is_token_expired(token_expiration_date)
test

In [None]:
def get_shapefiles()
    open_data_api_endpoint = "https://opendata-api.stib-mivb.be/Files/2.0/Shapefiles" + stop_id
    headers = {"Accept": "application/json"}

    response = requests.get(open_data_api_endpoint, auth=BearerAuth(bearer))
    
    return response.json()

In [None]:
a = [1,2]
a[:1]

In [None]:
import gettext