<a href="https://colab.research.google.com/github/NousForFreedom/taxi_call_service_pubsub/blob/main/taxi_user.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Install the Solace PubSub+ Python API package (provides the `solace` modules imported below)
pip install solace-pubsubplus

Collecting solace-pubsubplus
  Downloading solace_pubsubplus-1.7.0-py36-none-manylinux_2_12_x86_64.whl (7.8 MB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 7.8/7.8 MB 19.7 MB/s eta 0:00:00
Installing collected packages: solace-pubsubplus
Successfully installed solace-pubsubplus-1.7.0


In [None]:
import os
import platform
import time
import calendar
import math

# Import Solace Python  API modules from the solace package
from solace.messaging.messaging_service import MessagingService, ReconnectionListener, ReconnectionAttemptListener, \
            ServiceInterruptionListener, RetryStrategy, ServiceEvent
from solace.messaging.resources.topic import Topic
from solace.messaging.errors.pubsubplus_client_error import PubSubPlusClientError
from solace.messaging.publisher.request_reply_message_publisher import RequestReplyMessagePublisher

# Import Solace Python  API modules
from solace.messaging.messaging_service import MessagingService, ReconnectionListener, ReconnectionAttemptListener, ServiceInterruptionListener, RetryStrategy, ServiceEvent
from solace.messaging.errors.pubsubplus_client_error import PubSubPlusClientError
from solace.messaging.publisher.direct_message_publisher import PublishFailureListener, FailedPublishEvent
from solace.messaging.resources.topic_subscription import TopicSubscription
from solace.messaging.receiver.message_receiver import MessageHandler
from solace.messaging.config.solace_properties.message_properties import APPLICATION_MESSAGE_ID
from solace.messaging.resources.topic import Topic
from solace.messaging.receiver.inbound_message import InboundMessage



In [None]:
# Broker Config
# Every setting can be overridden through an environment variable, so real
# credentials never have to be typed into (or committed with) the notebook.
# When a variable is unset, the value originally hard-coded here is used.
import os

broker_props = {
  "solace.messaging.transport.host": os.environ.get(
      "SOLACE_HOST", "tcp://mr-connection-hcnx0u899eb.messaging.solace.cloud:55555"),
  "solace.messaging.service.vpn-name": os.environ.get("SOLACE_VPN", "aiot"),
  "solace.messaging.authentication.scheme.basic.username": os.environ.get("SOLACE_USERNAME", ""),
  "solace.messaging.authentication.scheme.basic.password": os.environ.get("SOLACE_PASSWORD", ""),
}

# **Ride Request**
This cell sends a ride request using the request/reply pattern and waits for the reply.

In [None]:
# Requestor demo: publishes a request (request-reply pattern) and later collects its reply.

# On Windows, set PYTHONUNBUFFERED=1 (intended to disable stdout buffering)
if platform.system() == 'Windows':
    os.environ["PYTHONUNBUFFERED"] = "1"

# Common prefix shared by every topic built in this notebook
TOPIC_PREFIX = "solace/taxi/python"

# Keep prompting until a non-empty name is entered
while True:
    name = input("Enter your name: ")
    if name:
        break
# Same name with all space characters removed
unique_name = "".join(name.split(" "))

# Listener for connection-level service events
class ServiceEventHandler(ReconnectionListener, ReconnectionAttemptListener, ServiceInterruptionListener):
    """Print the cause and message of every service event it is notified about.

    One instance is registered below as the reconnection, reconnection-attempt
    and service-interruption listener of the messaging service.
    """

    @staticmethod
    def _report(label: str, event: ServiceEvent) -> None:
        """Print `label` on a fresh line, then the event's cause and message."""
        print(f"\n{label}")
        print(f"Error cause: {event.get_cause()}")
        print(f"Message: {event.get_message()}")

    def on_reconnected(self, e: ServiceEvent):
        """Report a reconnected event."""
        self._report("on_reconnected", e)

    def on_reconnecting(self, e: "ServiceEvent"):
        """Report a reconnecting (reconnection attempt) event."""
        self._report("on_reconnecting", e)

    def on_service_interrupted(self, e: "ServiceEvent"):
        """Report a service-interrupted event."""
        self._report("on_service_interrupted", e)



# Reconnection strategy: 20 retries with a retry interval of 3
# NOTE(review): the earlier comment called the interval "3 seconds" — confirm the unit
# against RetryStrategy.parametrized_retry.
# The reconnection strategy could also be configured through the broker properties object.
retry_strategy = RetryStrategy.parametrized_retry(20, 3)
messaging_service = (
    MessagingService.builder()
    .from_properties(broker_props)
    .with_reconnection_retry_strategy(retry_strategy)
    .build()
)

# connect() is called synchronously before the status line is printed
messaging_service.connect()
print(f'\nMessaging Service connected? {messaging_service.is_connected}')

# Register one handler object for all three kinds of service event
service_handler = ServiceEventHandler()
for register_listener in (
    messaging_service.add_reconnection_listener,
    messaging_service.add_reconnection_attempt_listener,
    messaging_service.add_service_interruption_listener,
):
    register_listener(service_handler)

# Create the request-reply publisher (the "requestor") with default builder settings
requestor_builder = messaging_service.request_reply().create_request_reply_message_publisher_builder()
direct_requestor: RequestReplyMessagePublisher = requestor_builder.build()

# Start the requestor, then report whether it is ready
direct_requestor.start()
print(f'\nDirect Requestor ready? {direct_requestor.is_ready()}')

# Fields of the ride request that is sent as the request message
user_id = "USER12345"
Start_location = "41.40338.2.17403"
destination = "123.52341.76.45786"

# Read the clock once (UTC, whole seconds since the epoch). The same reading is
# used for the body timestamp and for the message id, so the two always agree.
gmt = time.gmtime()
timestamp = calendar.timegm(gmt)

# Body format: comma-separated "Key: value" pairs
message_body = f"Timestamp: {timestamp}, USER-ID: {user_id}, StartLocation: {Start_location}, Destination: {destination}"

# Builder carrying the user properties attached to the outbound message
outbound_msg_builder = messaging_service.message_builder() \
                .with_property("application", "taxi") \
                .with_property("language", "Python")

# Use the captured timestamp as the message id (previously a second clock read,
# which could differ from the body timestamp across a second boundary)
message_id = timestamp

print('\nSend a KeyboardInterrupt to stop publishing')
try:
    print('============================')
    # The request topic ends with the requesting user's id
    topic = Topic.of(f'{TOPIC_PREFIX}/direct/ride_request/{user_id}')
    print(f'Publishing to topic:\n{topic}')

    try:
        # Build the request: set the application message id, then attach the body
        id_builder = outbound_msg_builder.with_application_message_id(f'NEW {message_id}')
        outbound_msg = id_builder.build(f'\n{message_body}')
        print('>>>>>>>>>>>>>>>>>>>>>>>>>>>>')
        print('')
        print('Publishing request (body):' + outbound_msg.get_payload_as_string())
        print('\n----------------------------')
        print(f'Publishing message:\n{outbound_msg}')
        # Send the request; reply_timeout is passed as 3000
        publish_async = direct_requestor.publish(
            request_message=outbound_msg,
            request_destination=topic,
            reply_timeout=3000,
        )
        # Collect the reply from the object returned by publish()
        response = publish_async.result()
        print('<<<<<<<<<<<<<<<<<<<<<<<<<<<<')
        print('\nReceived reply (body):\n' + response.get_payload_as_string())
        print('\n----------------------------')
        print(f'Received reply:\n{response}')
        print('============================\n')
    except KeyboardInterrupt:
        print('\nInterrupted, disconnecting Messaging Service')
    except PubSubPlusClientError as exception:
        print(f'Received a PubSubPlusClientException: {exception}')
finally:
    # Always release the requestor and the connection, whatever happened above
    print('\nTerminating Requestor')
    direct_requestor.terminate()
    print('\nDisconnecting Messaging Service')
    messaging_service.disconnect()

Enter your name: USER12345

Messaging Service connected? True

Direct Requestor ready? True

Send a KeyboardInterrupt to stop publishing
Publishing to topic:
topic : solace/taxi/python/direct/ride_request/USER12345 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>

Publishing request (body):
Timestamp: 1719106276, USER-ID: USER12345, StartLocation: 41.40338.2.17403, Destination: 123.52341.76.45786

----------------------------
Publishing message:
ApplicationMessageId:                   NEW 1719106276
Class Of Service:                       COS_1
DeliveryMode:                           DIRECT
DMQ Eligible                            
User Property Map:                      
  Key 'application' (STRING) taxi
  Key 'language' (STRING) Python
Binary Attachment String:               len=108
  0a 54 69 6d 65 73 74 61  6d 70 3a 20 31 37 31 39      .Timesta   mp: 1719
  31 30 36 32 37 36 2c 20  55 53 45 52 2d 49 44 3a      106276,    USER-ID:
  20 55 53 45 52 31 32 33  34 35 2c 20 53 74 61 72       USER123   45, S

### **RideRequestResponse**

In [None]:
from solace.messaging.receiver.request_reply_message_receiver import RequestMessageHandler
# Listener for connection-level service events
class ServiceEventHandler(ReconnectionListener, ReconnectionAttemptListener, ServiceInterruptionListener):
    """Print the cause and message of every service event it is notified about.

    One instance is registered below as the reconnection, reconnection-attempt
    and service-interruption listener of the messaging service.
    """

    @staticmethod
    def _report(label: str, event: ServiceEvent) -> None:
        """Print `label` on a fresh line, then the event's cause and message."""
        print(f"\n{label}")
        print(f"Error cause: {event.get_cause()}")
        print(f"Message: {event.get_message()}")

    def on_reconnected(self, e: ServiceEvent):
        """Report a reconnected event."""
        self._report("on_reconnected", e)

    def on_reconnecting(self, e: "ServiceEvent"):
        """Report a reconnecting (reconnection attempt) event."""
        self._report("on_reconnecting", e)

    def on_service_interrupted(self, e: "ServiceEvent"):
        """Report a service-interrupted event."""
        self._report("on_service_interrupted", e)

# Build the messaging service from the broker properties, then connect it
retry_strategy = RetryStrategy.parametrized_retry(20, 3)
messaging_service = (
    MessagingService.builder()
    .from_properties(broker_props)
    .with_reconnection_retry_strategy(retry_strategy)
    .build()
)
messaging_service.connect()
print(f'Messaging Service connected? {messaging_service.is_connected}')

# Register one handler object for all three kinds of service event
service_handler = ServiceEventHandler()
for register_listener in (
    messaging_service.add_reconnection_listener,
    messaging_service.add_reconnection_attempt_listener,
    messaging_service.add_service_interruption_listener,
):
    register_listener(service_handler)

# Topic on which RideRequestResponse messages for this user are expected
user_ride_topic = f'{TOPIC_PREFIX}/guaranteed/ride_request_response/{user_id}'
print(f'Subscribing to topic: {user_ride_topic}')

# Build a request-reply message receiver subscribed to that topic and start it
# NOTE(review): despite the "guaranteed" topic segment and the variable name, this is
# the request-reply receiver builder, not a persistent receiver — confirm intent.
receiver_builder = messaging_service.request_reply().create_request_reply_message_receiver_builder()
direct_receiver = receiver_builder.build(TopicSubscription.of(user_ride_topic))
direct_receiver.start()


# Handler for processing received RideRequestResponse messages
class RideRequestResponseHandler(RequestMessageHandler):
    """Print each received RideRequestResponse and the fields parsed from it.

    The payload is expected to be comma-separated "Key: value" pairs, e.g.
    "Timestamp: 1719106276, Estimated Time of Arrival: 224, TaxiNumber: ...".
    """

    @staticmethod
    def _parse_details(payload: str) -> dict:
        """Split a "Key: value, Key: value" payload into a dict.

        Fragments without a ": " separator are skipped instead of raising, and
        surrounding whitespace (such as a leading newline) is stripped from
        keys and values.
        """
        details = {}
        for part in payload.split(', '):
            key, separator, value = part.partition(': ')
            if separator:
                details[key.strip()] = value.strip()
        return details

    def on_message(self, message: InboundMessage, replier=None):
        """Print the raw payload and the Timestamp / ETA / TaxiNumber fields.

        Args:
            message: The inbound message whose string payload is parsed.
            replier: Accepted because the other RequestMessageHandler in this
                file is invoked as on_message(request, replier); it is not
                used here since no reply is sent. Defaults to None so a
                one-argument call still works.
        """
        payload = message.get_payload_as_string()
        print(f'Received RideRequestResponse message: {payload}')

        # Process the payload; a missing payload is treated as an empty string
        details = self._parse_details(payload or "")
        timestamp = details.get("Timestamp")
        eta = details.get("Estimated Time of Arrival")
        taxi_number = details.get("TaxiNumber")

        # Report inside the handler, where the parsed values are actually defined
        print(f"\nProcessed Values:\nTimestamp: {timestamp}\nETA: {eta} second \nTaxiNumber: {taxi_number}")

try:
    # Set up the message handler for incoming messages
    message_handler = RideRequestResponseHandler()
    direct_receiver.receive_async(message_handler)

    # Keep the cell alive; received messages are handled by message_handler
    print("\nSend a KeyboardInterrupt to stop receiving\n")

    while True:
        time.sleep(1)

except KeyboardInterrupt:
    # Expected way to stop the cell; the cleanup below does the rest
    pass
finally:
    # Release the receiver and the connection on every exit path, not only on
    # KeyboardInterrupt, so an unexpected error does not leave them open
    print('\nTerminating Receiver')
    direct_receiver.terminate()
    print('Disconnecting Messaging Service')
    messaging_service.disconnect()

print()

Messaging Service connected? True
Subscribing to topic: solace/taxi/python/guaranteed/ride_request_response/USER12345

Processed Values:
Timestamp: 1719106276
ETA: 224 second 
TaxiNumber: 24가5735

Send a KeyboardInterrupt to stop receiving


Terminating Receiver
Disconnecting Messaging Service



### **DropoffCompleteUser**

In [None]:
from solace.messaging.receiver.request_reply_message_receiver import RequestMessageHandler

# NOTE(review): this flag is not read anywhere in the visible cells — confirm whether it is still needed
service_complete = False

# Listener for connection-level service events
class ServiceEventHandler(ReconnectionListener, ReconnectionAttemptListener, ServiceInterruptionListener):
    """Print the cause and message of every service event it is notified about.

    One instance is registered below as the reconnection, reconnection-attempt
    and service-interruption listener of the messaging service.
    """

    @staticmethod
    def _report(label: str, event: ServiceEvent) -> None:
        """Print `label` on a fresh line, then the event's cause and message."""
        print(f"\n{label}")
        print(f"Error cause: {event.get_cause()}")
        print(f"Message: {event.get_message()}")

    def on_reconnected(self, e: ServiceEvent):
        """Report a reconnected event."""
        self._report("on_reconnected", e)

    def on_reconnecting(self, e: "ServiceEvent"):
        """Report a reconnecting (reconnection attempt) event."""
        self._report("on_reconnecting", e)

    def on_service_interrupted(self, e: "ServiceEvent"):
        """Report a service-interrupted event."""
        self._report("on_service_interrupted", e)

# Build the messaging service from the broker properties, then connect it
retry_strategy = RetryStrategy.parametrized_retry(20, 3)
messaging_service = (
    MessagingService.builder()
    .from_properties(broker_props)
    .with_reconnection_retry_strategy(retry_strategy)
    .build()
)
messaging_service.connect()
print(f'Messaging Service connected? {messaging_service.is_connected}')

# Register one handler object for all three kinds of service event
service_handler = ServiceEventHandler()
for register_listener in (
    messaging_service.add_reconnection_listener,
    messaging_service.add_reconnection_attempt_listener,
    messaging_service.add_service_interruption_listener,
):
    register_listener(service_handler)

# Topic subscription for DropoffCompleteUser requests; the final level is the '>' wildcard
request_topic = TOPIC_PREFIX + '/direct/dropoff_complete_user_request/>'

# Build a request-reply message receiver subscribed to that topic and start it
receiver_builder = messaging_service.request_reply().create_request_reply_message_receiver_builder()
request_reply_receiver = receiver_builder.build(TopicSubscription.of(request_topic))

request_reply_receiver.start()

# Request handler that answers DropoffCompleteUser requests
class DropoffCompleteUserRequestHandler(RequestMessageHandler):
    """Reply to each DropoffCompleteUser request with an arrival result."""

    def on_message(self, request, replier):
        """Print the request, build a reply carrying Timestamp/Result, and send it.

        Args:
            request: The inbound request; its payload is printed and its
                application message id is copied onto the reply.
            replier: Object whose reply() is called with the reply message.
        """
        ask_payload = request.get_payload_as_string()
        print(f'Received DropoffCompleteUser request: {ask_payload}')

        # Example locations: both are hard-coded to the same value, so the
        # arrival check below always succeeds in this demo
        current_location = "41.40338, 2.17403"
        destination_location = "41.40338, 2.17403"
        arrived = current_location == destination_location

        # On arrival the reply carries the current UTC epoch seconds; otherwise an empty timestamp
        timestamp = str(calendar.timegm(time.gmtime())) if arrived else ""
        result = "True" if arrived else "False"

        # Assemble the reply step by step: same message id as the request, then the two properties
        builder = messaging_service.message_builder()
        builder = builder.with_application_message_id(request.get_application_message_id())
        builder = builder.with_property("Timestamp", timestamp)
        builder = builder.with_property("Result", result)
        reply_message = builder.build(f'DropoffCompleteUserReply - {timestamp}: Result {result}')

        # Hand the reply to the replier supplied with the request
        replier.reply(reply_message)
        print(f'Replied with DropoffCompleteUserReply: {result}')


try:
    # Register the request handler
    request_handler = DropoffCompleteUserRequestHandler()
    request_reply_receiver.receive_async(request_handler)

    print(f'Listening for DropoffCompleteUser requests on topic: {request_topic}')

    # Keep the cell alive; requests are answered by request_handler
    while True:
        time.sleep(1)
except KeyboardInterrupt:
    # Expected way to stop the cell; the cleanup below does the rest
    pass
finally:
    # Release the receiver and the connection on every exit path, not only on
    # KeyboardInterrupt, so an unexpected error does not leave them open
    print('\nTerminating Request-Reply Receiver')
    request_reply_receiver.terminate()
    messaging_service.disconnect()

Messaging Service connected? True
Listening for DropoffCompleteUser requests on topic: solace/taxi/python/direct/dropoff_complete_user_request/>
Received DropoffCompleteUser request: Did you arrive safely?
Replied with DropoffCompleteUserReply: True

Terminating Request-Reply Receiver
