# Event-driven online price prediction 

Many companies are switching from batch processing to real-time processing so they can use dynamic data to make more relevant recommendations to customers. Static data is information that changes slowly or rarely, such as age, gender, job, or neighborhood. Dynamic data is information or actions based on what is happening right now: what you are watching, what you have just liked on Instagram, or the fact that you are searching for a driver on Uber. Knowing a user's current interests lets your systems make recommendations that are much more relevant to them.

In this example, we demonstrate a sample pipeline for **ride-hailing companies like Bolt or Uber**. It processes data from two microservices, Service A (Driver Availability) and Service B (Ride Demand), sends the events to an ML model, and returns the best possible price to show customers in real time each time they request a ride. The three microservices, Service A, Service B, and Service C (Price Prediction Pipeline), communicate through an event-driven architecture where:

- Service A (Driver Availability): Manages driver availability in real-time.
- Service B (Ride Demand): Manages customer ride requests and demand in different regions.
- Service C (Price Prediction Pipeline): Predicts the best possible price for a ride based on driver availability and ride demand.

**Steps:**

1. Service A (Driver Availability) and Service B (Ride Demand) publish events to the same GlassFlow pipeline.
2. The GlassFlow transformation function calculates the predicted price from driver availability and ride demand.
3. Service C consumes the price prediction events from the pipeline and visualizes them in real time.
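To make the publish, transform, and consume flow in these steps concrete without a GlassFlow account, here is a minimal in-memory sketch. `InMemoryPipeline`, `tag_source`, and `demo_pipeline` are hypothetical names, and a standard-library `queue.Queue` stands in for the pipeline; this is not how GlassFlow works internally, only an illustration of the event-driven pattern.

```python
import queue
from typing import Callable, Optional


class InMemoryPipeline:
    """Hypothetical stand-in for a pipeline: publish -> transform -> sink."""

    def __init__(self, transform: Callable[[dict], Optional[dict]]):
        self.transform = transform
        self.sink: "queue.Queue[dict]" = queue.Queue()

    def publish(self, event: dict) -> None:
        # Run the transformation as soon as an event arrives (event-driven).
        result = self.transform(event)
        if result:  # drop empty results
            self.sink.put(result)

    def consume(self) -> Optional[dict]:
        # Non-blocking read; returns None when nothing is waiting.
        try:
            return self.sink.get_nowait()
        except queue.Empty:
            return None


def tag_source(event: dict) -> dict:
    # Placeholder transform: record which service produced the event.
    service = "A" if event["event"] == "driver_availability" else "B"
    return {**event, "service": service}


demo_pipeline = InMemoryPipeline(tag_source)
demo_pipeline.publish({"event": "driver_availability", "region": "downtown", "available_drivers": 10})
demo_pipeline.publish({"event": "ride_demand", "region": "downtown", "ride_requests": 36})
print(demo_pipeline.consume()["service"])  # A
print(demo_pipeline.consume()["service"])  # B
print(demo_pipeline.consume())             # None
```

Both services write to the same entry point, and the consumer only sees transformed results, which mirrors the single shared GlassFlow pipeline used in this notebook.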

## Prerequisites

- Create your free GlassFlow account via the [GlassFlow WebApp](https://app.glassflow.dev).
- Get your [Personal Access Token](https://app.glassflow.dev/profile) to authorize the Python SDK to interact with GlassFlow Cloud.

## Step 1: Install and import required libraries

```python
%pip install "glassflow>=2.0.5" pandas plotly faker
```


```python
import glassflow
import pandas as pd
import random
import time
from faker import Faker
import plotly.graph_objects as go
from IPython.display import display, clear_output
```

## Step 2: Create GlassFlow Pipeline

```python
# Set the personal access token from your GlassFlow account.
# Replace the placeholder with your own token; do not share a real token in a notebook.
personal_access_token = "<YOUR_PERSONAL_ACCESS_TOKEN>"
```
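Rather than pasting the token into the notebook, you could read it from an environment variable. The sketch below assumes a variable named `GLASSFLOW_PERSONAL_ACCESS_TOKEN`; that name is our own choice, and the SDK is not assumed to read it automatically, so you still pass the value to `glassflow.GlassFlowClient` yourself. `load_personal_access_token` is a hypothetical helper.

```python
import os


def load_personal_access_token(env_var: str = "GLASSFLOW_PERSONAL_ACCESS_TOKEN") -> str:
    """Read the token from an environment variable (the name is our own choice)."""
    token = os.environ.get(env_var, "").strip()
    if not token:
        # Fail early with a clear message instead of sending an empty token.
        raise RuntimeError(f"Set the {env_var} environment variable to your GlassFlow token")
    return token
```

After exporting the variable in your shell, `personal_access_token = load_personal_access_token()` would replace the hard-coded string.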


```python
# Create a GlassFlow client
client = glassflow.GlassFlowClient(
    personal_access_token=personal_access_token
)
```

```python
# Get the space named "ride-hailing" (or create one if no space is found)
list_spaces = client.list_spaces()

space_name = "ride-hailing"
for s in list_spaces.spaces:
    if s["name"] == space_name:
        space = glassflow.Space(
            personal_access_token=client.personal_access_token,
            id=s["id"],
            name=s["name"]
        )
        break
else:
    space = client.create_space(name=space_name)

print(f"Created space {space.name} with ID: {space.id}")
```

Created space ride-hailing with ID: e439eaee-fc49-46ed-a428-1601879540ad
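The space lookup in the cell above can also be factored into a small pure function that is easy to test without calling GlassFlow. `find_space_id` is a hypothetical helper; it assumes, as the loop above does, that each entry in `list_spaces.spaces` is a dict with `"name"` and `"id"` keys.

```python
from typing import Iterable, Mapping, Optional


def find_space_id(spaces: Iterable[Mapping[str, str]], name: str) -> Optional[str]:
    """Return the ID of the first space called `name`, or None if there is none."""
    for s in spaces:
        if s["name"] == name:
            return s["id"]
    return None
```

With it, the cell above reduces to checking whether `find_space_id(list_spaces.spaces, space_name)` returns `None` before calling `client.create_space`, which also makes it easy to print a different message for a reused space than for a newly created one.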


### Transformation Function

The transformation logic lives in `transform.py`. Print the file to inspect it:

```python
%pycat transform.py
```

```python
import pandas as pd


# GlassFlow handler function
def handler(data, log):
    """
    The GlassFlow handler function processes driver availability and ride demand data,
    calculates price predictions, and returns the transformed data.

    Parameters:
    - data: Incoming event data from both Service A (driver availability) and Service B (ride demand).
    - log: Logging object for logging within the pipeline.

    Returns:
    - A dictionary containing predicted price and other related information.
    """

    try:
        log.
```

### requirements.txt

Define the external dependencies for the transformation function:

```python
with open("requirements.txt") as f:
    requirements_txt = f.read()
print(requirements_txt)
```

pandas


### Create Pipeline

Create the pipeline for price prediction:

```python
pipeline_name = "event-driven-price-prediction"

pipeline = client.create_pipeline(
    name=pipeline_name,
    transformation_file="transform.py",
    space_id=space.id,
    requirements=requirements_txt
)
print(f"Pipeline created successfully with ID: {pipeline.id}")
print(f"Pipeline URL on GlassFlow UI to discover https://app.glassflow.dev/pipelines/{pipeline.id}")
```

Pipeline created successfully with ID: 2146e908-4daa-4c87-9ba0-7348e0e03aee
Pipeline URL on GlassFlow UI to discover https://app.glassflow.dev/pipelines/2146e908-4daa-4c87-9ba0-7348e0e03aee 


## Step 3: Service A and Service B publish events to the pipeline

Both Service A (Driver Availability) and Service B (Ride Demand) will publish their data to the same pipeline.

```python
fake = Faker()

def generate_service_a_events(n):
    """Generate mock driver availability data."""
    events = []
    for _ in range(n):
        event = {
            "event": "driver_availability",
            "region": "downtown",
            "available_drivers": random.randint(5, 20),
            "datetime": fake.date_time_this_month().isoformat()
        }
        events.append(event)
    return events

def generate_service_b_events(n):
    """Generate mock ride demand data."""
    events = []
    for _ in range(n):
        event = {
            "event": "ride_demand",
            "region": "downtown",
            "ride_requests": random.randint(10, 50),
            "datetime": fake.date_time_this_month().isoformat()
        }
        events.append(event)
    return events

# GlassFlow data source to publish events
data_source = pipeline.get_source()

# Generate 20 events each from Service A and Service B
service_a_events = generate_service_a_events(20)
service_b_events = generate_service_b_events(20)

# Alternate sending events from Service A and Service B
for event_a, event_b in zip(service_a_events, service_b_events):
    print(f"Published event from Service A: {event_a}")
    data_source.publish(event_a)
    time.sleep(1)  # Simulate event delay

    print(f"Published event from Service B: {event_b}")
    data_source.publish(event_b)
    time.sleep(1)  # Simulate event delay

print("All events published to the pipeline")
```


Published event from Service A: {'event': 'driver_availability', 'region': 'downtown', 'available_drivers': 10, 'datetime': '2024-10-17T13:10:20.250304'}
Published event from Service B: {'event': 'ride_demand', 'region': 'downtown', 'ride_requests': 36, 'datetime': '2024-10-22T17:18:50.047031'}
Published event from Service A: {'event': 'driver_availability', 'region': 'downtown', 'available_drivers': 9, 'datetime': '2024-10-01T02:24:19.308502'}
Published event from Service B: {'event': 'ride_demand', 'region': 'downtown', 'ride_requests': 25, 'datetime': '2024-10-03T21:19:04.775947'}
Published event from Service A: {'event': 'driver_availability', 'region': 'downtown', 'available_drivers': 16, 'datetime': '2024-10-03T00:17:47.821989'}
Published event from Service B: {'event': 'ride_demand', 'region': 'downtown', 'ride_requests': 12, 'datetime': '2024-10-19T10:47:59.575978'}
Published event from Service A: {'event': 'driver_availability', 'region': 'downtown', 'available_drivers': 6, 'd
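`fake.date_time_this_month()` draws a random timestamp for every event, so consecutive events are not in time order and the two services' events never share a timestamp. If you want events on a shared, increasing clock, a sketch like the hypothetical `generate_ordered_events` below emits one driver-availability and one ride-demand event per time step. The function name and its `step_seconds` and `seed` parameters are illustrative, not part of the original notebook.

```python
import random
from datetime import datetime, timedelta


def generate_ordered_events(n, start, step_seconds=60, seed=None):
    """Hypothetical generator: paired events on a shared, increasing clock."""
    rng = random.Random(seed)  # local RNG so a seed makes runs reproducible
    events = []
    for i in range(n):
        ts = (start + timedelta(seconds=i * step_seconds)).isoformat()
        events.append({"event": "driver_availability", "region": "downtown",
                       "available_drivers": rng.randint(5, 20), "datetime": ts})
        events.append({"event": "ride_demand", "region": "downtown",
                       "ride_requests": rng.randint(10, 50), "datetime": ts})
    return events
```

You could publish its output with the same `data_source.publish(event)` call used above, for example `for event in generate_ordered_events(20, datetime.now()): data_source.publish(event)`.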

## Step 4: Consume and visualize events from the pipeline

Get the pipeline data sink to consume the transformed events from the pipeline.

```python
data_sink = pipeline.get_sink()
```

```python
processed_events = []

# Set up an empty DataFrame to store the real-time data
df_visual = pd.DataFrame(columns=['datetime', 'available_drivers', 'ride_requests', 'predicted_price'])

# Infinite loop to continuously consume events and update the plot
# (interrupt the kernel to stop it)
while True:
    # Consume the next event from the GlassFlow pipeline
    resp = data_sink.consume()
    if resp.status_code == 200:
        event_data = resp.json()

        # Extract the list of predicted prices from the transformed event
        predicted_prices = event_data.get('predicted_price', None)

        if predicted_prices:
            print(f"Consumed event: {event_data}")
            processed_events.append(event_data)

            # Build one row per prediction
            new_rows = []
            for price in predicted_prices:
                new_rows.append({
                    'datetime': pd.to_datetime(price['datetime']),
                    'available_drivers': price['available_drivers'],
                    'ride_requests': price['ride_requests'],
                    'predicted_price': price['predicted_price']
                })

            # Append the new rows to df_visual
            df_visual = pd.concat([df_visual, pd.DataFrame(new_rows)], ignore_index=True)

            # Sort by time so each line is drawn left to right instead of zigzagging
            df_plot = df_visual.sort_values('datetime')

            # Clear the previous output
            clear_output(wait=True)

            # Plot the updated DataFrame
            fig = go.Figure()

            fig.add_trace(go.Scatter(x=df_plot['datetime'], y=df_plot['available_drivers'],
                                     mode='lines', name='Available Drivers', line=dict(color='blue')))
            fig.add_trace(go.Scatter(x=df_plot['datetime'], y=df_plot['ride_requests'],
                                     mode='lines', name='Ride Requests', line=dict(color='red')))
            fig.add_trace(go.Scatter(x=df_plot['datetime'], y=df_plot['predicted_price'],
                                     mode='lines', name='Price Prediction', line=dict(color='green')))

            fig.update_layout(title="Real-time Price Prediction with Driver Availability and Ride Requests",
                              xaxis_title="Time",
                              yaxis_title="Value",
                              legend_title="Legend")

            # Display the updated plot
            display(fig)

    # Pause briefly before consuming the next event
    time.sleep(1)
```
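The row-building code inside the loop can be pulled out into a pure function so it can be tested on a sample payload without a live pipeline. `event_to_frame` and `COLUMNS` are hypothetical names; the helper assumes, like the loop above, that each consumed payload holds a list of dicts under `"predicted_price"` with `datetime`, `available_drivers`, `ride_requests`, and `predicted_price` keys.

```python
import pandas as pd

COLUMNS = ["datetime", "available_drivers", "ride_requests", "predicted_price"]


def event_to_frame(event_data: dict) -> pd.DataFrame:
    """Turn one consumed payload into rows; an empty frame if it has no predictions."""
    records = event_data.get("predicted_price") or []
    # Selecting COLUMNS drops any extra keys and fixes the column order.
    frame = pd.DataFrame(records, columns=COLUMNS)
    frame["datetime"] = pd.to_datetime(frame["datetime"])
    return frame
```

Inside the loop, `df_visual = pd.concat([df_visual, event_to_frame(event_data)], ignore_index=True)` would replace the manual row construction.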

## Monitor the pipeline

Open the logs of the pipeline you created to monitor events in real time.

```python
# Explore the pipeline logs on the web UI
pipeline_url = f"https://app.glassflow.dev/pipelines/{pipeline.id}/logs"
print(pipeline_url)
```

https://app.glassflow.dev/pipelines/2146e908-4daa-4c87-9ba0-7348e0e03aee/logs
