In [1]:
from kafka.admin import KafkaAdminClient, NewTopic


admin_client = KafkaAdminClient(
    bootstrap_servers="localhost:9092", 
    client_id='tracking'
)

In [4]:
import json
import folium
import time
import re
import requests
from google.transit import gtfs_realtime_pb2
from json import dumps
from kafka import KafkaProducer
from kafka import KafkaConsumer

api_url = 'https://realtime.hsl.fi/realtime/vehicle-positions/v2/hsl'  # Replace with your API endpoint
KAFKA_SERVER = 'localhost:9092'
TOPIC = 'coordinates'


def fetch_coordinates():
    response = requests.get(api_url)
    if response.status_code == 200:
        # Fetch the GTFS-RT payload
        response = requests.get(api_url)
        payload = response.content

        # Create a GTFS-RT feed message
        feed = gtfs_realtime_pb2.FeedMessage()

        # Parse and decode the payload
        feed.ParseFromString(payload)

        # Create a list to hold the decoded entities
        decoded_entities = []

        # Iterate over the feed entities
        for entity in feed.entity:
            # Convert the entity to a dictionary
            if re.search(r'47/668', entity.id):
                entity_dict = {
                    'id': entity.id,
                    'vehicle': {
                        'trip': {
                            'route_id': entity.vehicle.trip.route_id,
                            'start_time': entity.vehicle.trip.start_time,
                            'start_date': entity.vehicle.trip.start_date
                            # Add other trip attributes as needed
                        },
                        'position': {
                            'latitude': entity.vehicle.position.latitude,
                            'longitude': entity.vehicle.position.longitude,
                            'bearing': entity.vehicle.position.bearing
                            # Add other trip attributes as needed
                        },
                        'position': {
                            'latitude': entity.vehicle.position.latitude,
                            'longitude': entity.vehicle.position.longitude,
                            'bearing': entity.vehicle.position.bearing,
                            'odometer': entity.vehicle.position.odometer,
                            'speed' : entity.vehicle.position.speed
                            # Add other trip attributes as needed
                        },
                        'stop_id': entity.vehicle.stop_id,
                        'current_status': entity.vehicle.current_status,
                        'timestamp': entity.vehicle.timestamp
                    }
                }


                # Append the entity dictionary to the list
                decoded_entities.append(entity_dict)

        # Convert the list of decoded entities to a JSON array
        json_array = json.dumps(entity_dict)
        # print(type(json_array))
        return json_array
    else:
        print(f"Error fetching coordinates: {response.status_code}")
        return None


def produce_kafka_messages():
    producer = KafkaProducer(bootstrap_servers=KAFKA_SERVER, 
                             api_version=(0,11,5),
                             value_serializer=lambda x: json.dumps(x).encode('utf-8'))

    while True:
        coordinates = fetch_coordinates()
        if coordinates:
            producer.send(TOPIC, value=coordinates)
            print(f"Message produced: {coordinates}")

        time.sleep(2)  # Wait for 5 seconds before fetching coordinates again

def consume_kafka_messages():
    # Kafka server and topic information
    # Create a Kafka consumer
    consumer = KafkaConsumer(TOPIC,
                             bootstrap_servers=KAFKA_SERVER,
                             api_version=(0, 11, 5),
                             value_deserializer=lambda x: json.loads(x.decode('utf-8')))

    # Consume messages indefinitely
    for message in consumer:
        coordinates = message.value
        print(f"Message consumed: {coordinates}")
        
        # Example: Access individual fields in the coordinates

In [6]:
consume_kafka_messages()

Message consumed: {"id": "vehicle_position_47/668", "vehicle": {"trip": {"route_id": "1016", "start_time": "07:30:00", "start_date": "20240304"}, "position": {"latitude": 60.17913055419922, "longitude": 24.95655059814453, "bearing": 168.0, "odometer": 6801.0, "speed": 8.329999923706055}, "stop_id": "1010110", "current_status": 2, "timestamp": 1709531444}}
Message consumed: {"id": "vehicle_position_47/668", "vehicle": {"trip": {"route_id": "1016", "start_time": "07:30:00", "start_date": "20240304"}, "position": {"latitude": 60.1787223815918, "longitude": 24.95677947998047, "bearing": 162.0, "odometer": 6841.0, "speed": 9.470000267028809}, "stop_id": "1010110", "current_status": 2, "timestamp": 1709531449}}
Message consumed: {"id": "vehicle_position_47/668", "vehicle": {"trip": {"route_id": "1016", "start_time": "07:30:00", "start_date": "20240304"}, "position": {"latitude": 60.178287506103516, "longitude": 24.957056045532227, "bearing": 162.0, "odometer": 6890.0, "speed": 10.23999977111

KeyboardInterrupt: 