# Demo - Online data sources

In [1]:
from IPython.display import HTML, display

import pandas as pd
# Lift pandas' truncation limits so query results render in full.
for _option in ('display.max_columns', 'display.max_rows', 'display.max_colwidth'):
    pd.set_option(_option, None)

## Create simplified demo dataset
Based on some parquet files

In [2]:
from libs.helpers.utils_db import query_duckdb

# Preview the first ten users (ordered by id) straight from the parquet file.
query_duckdb("SELECT id, first_name, last_name FROM '../../datasets/users.parquet' ORDER BY id LIMIT 10")

Unnamed: 0,id,first_name,last_name
0,1,VIOLET,HONEYCUTT
1,2,AMANDA,MAGANA
2,3,AUSTIN,PATEL
3,4,JAMES,BROWN
4,5,RANDY,MANESS
5,6,JUDY,FRYER
6,7,SHARON,FANE
7,8,ASHTON,MCLAMB
8,9,HEIDI,BOGART
9,10,PATRICK,BERRY


In [3]:
# Sample of 'Complete' order items created before 2020. There is no ORDER BY,
# so which ten rows come back is left to the engine.
query_duckdb("SELECT * FROM '../../datasets/order_items.parquet' WHERE created_at < '2020-01-01' and status='Complete' LIMIT 10")

Unnamed: 0,id,order_id,user_id,inventory_item_id,sale_price,status,created_at,returned_at,shipped_at,delivered_at
0,1603,1602,1142,31309,9.99,Complete,2019-01-20 19:01:26,NaT,2019-01-24,2019-01-28
1,1619,1618,1156,31317,59.950001,Complete,2019-01-20 22:12:04,NaT,2019-01-24,2019-01-28
2,1643,1642,1178,31329,12.99,Complete,2019-01-21 05:32:17,NaT,2019-01-24,2019-01-28
3,1683,1682,1205,35399,8.35,Complete,2019-01-21 20:41:01,NaT,2019-01-24,2019-01-28
4,1731,1730,1238,35424,58.0,Complete,2019-01-22 15:35:23,NaT,2019-01-24,2019-01-28
5,1760,1759,1261,39840,25.0,Complete,2019-01-22 21:54:05,NaT,2019-01-24,2019-01-28
6,1776,1775,1273,39848,64.0,Complete,2019-01-22 18:25:38,NaT,2019-01-24,2019-01-28
7,1625,1624,1161,31320,37.990002,Complete,2019-01-21 20:54:30,NaT,2019-01-24,2019-01-29
8,1648,1647,1181,31332,25.0,Complete,2019-01-21 18:15:59,NaT,2019-01-24,2019-01-29
9,1652,1651,1185,35383,7.99,Complete,2019-01-21 18:42:22,NaT,2019-01-24,2019-01-29


In [4]:
# dim_customers: one row per user_id below 100 that has at least one 'Complete'
# order item created before 2021. The email is picked up from the users file via
# the LEFT JOIN; last_order_id is the largest order_id among the matching items.
query_duckdb("""
    CREATE OR REPLACE VIEW dim_customers AS
    SELECT user_id as customer_id, max(email) as email, max(order_id) as last_order_id 
    FROM '../../datasets/order_items.parquet' o 
    LEFT JOIN '../../datasets/users.parquet' u ON u.id=o.user_id
    WHERE o.created_at < '2021-01-01' and status='Complete' and user_id<100
    GROUP BY user_id 
    ORDER BY user_id 
""")
# Preview the view that was just (re)created.
query_duckdb("SELECT * FROM dim_customers LIMIT 10")

Unnamed: 0,customer_id,email,last_order_id
0,1,vhoneycutt@gmail.com,1
1,2,amagana@aol.com,58407
2,3,austinpatel@yahoo.com,492020
3,4,jamesbrown@gmail.com,451687
4,5,randymaness@yahoo.com,271134
5,6,judyfryer@yahoo.com,506282
6,7,sharonfane@yahoo.com,394992
7,8,ashtonmclamb@gmail.com,13
8,9,heidibogart@yahoo.com,513209
9,10,pberry@gmail.com,354482


In [5]:
# dim_orders: one row per order_id, built from the same filtered order items as
# dim_customers. order_date is the latest item timestamp cast to a date and then
# to text; price is the sum of the items' sale_price.
query_duckdb("""
    CREATE OR REPLACE VIEW dim_orders AS
    SELECT order_id, MAX(user_id) as customer_id, max(created_at)::DATE::VARCHAR as order_date, sum(sale_price) as price 
    FROM '../../datasets/order_items.parquet' 
    WHERE created_at < '2021-01-01' and status='Complete' and user_id<100
    GROUP BY order_id 
    ORDER BY order_id 
""")
# Preview the view that was just (re)created.
query_duckdb("SELECT * FROM dim_orders LIMIT 10")

Unnamed: 0,order_id,customer_id,order_date,price
0,1,1,2019-01-01,19.99
1,3,2,2019-01-01,7.99
2,5,3,2019-01-01,39.990002
3,7,4,2019-01-01,44.990002
4,9,5,2019-01-01,39.990002
5,11,6,2019-01-01,13.65
6,13,8,2019-01-01,75.110001
7,15,9,2019-01-01,9.99
8,17,10,2019-01-01,64.0
9,19,12,2019-01-01,49.990002


In [6]:
# customers_view: one row per customer with its orders serialised as a JSON
# array (text), ordered by order_date. last_order_id is recomputed here as the
# largest order_id joined to the customer.
# NOTE(review): with the LEFT JOIN, a customer without orders would presumably
# still aggregate one struct of NULLs, so the '[]' fallback may never apply — confirm.
query_duckdb("""
    CREATE OR REPLACE VIEW customers_view AS
    SELECT 
        c.customer_id,
        MAX(o.order_id) as last_order_id,
        COALESCE(
            to_json(
                list({
                    'order_id': o.order_id,
                    'order_date': o.order_date,
                    'price': o.price
                } ORDER BY o.order_date)
            )::VARCHAR,
            '[]'::VARCHAR
        ) as orders_json
    FROM dim_customers c
    LEFT JOIN dim_orders o ON c.customer_id = o.customer_id
    GROUP BY c.customer_id
""")
# Preview the view that was just (re)created.
query_duckdb("SELECT * FROM customers_view LIMIT 10")

Unnamed: 0,customer_id,last_order_id,orders_json
0,1,1,"[{""order_id"":1,""order_date"":""2019-01-01"",""price"":19.989999771118164}]"
1,2,58407,"[{""order_id"":3,""order_date"":""2019-01-01"",""price"":7.989999771118164},{""order_id"":58407,""order_date"":""2019-01-17"",""price"":12.739999771118164}]"
2,3,492020,"[{""order_id"":5,""order_date"":""2019-01-01"",""price"":39.9900016784668},{""order_id"":245362,""order_date"":""2019-02-21"",""price"":73.98999786376953},{""order_id"":303650,""order_date"":""2019-04-04"",""price"":12.989999771118164},{""order_id"":470869,""order_date"":""2019-11-17"",""price"":16.989999771118164},{""order_id"":492020,""order_date"":""2020-01-22"",""price"":9.949999809265137}]"
3,4,451687,"[{""order_id"":7,""order_date"":""2019-01-01"",""price"":44.9900016784668},{""order_id"":245368,""order_date"":""2019-02-15"",""price"":65.0},{""order_id"":394972,""order_date"":""2019-07-02"",""price"":60.0},{""order_id"":451687,""order_date"":""2019-10-09"",""price"":60.0}]"
4,5,271134,"[{""order_id"":9,""order_date"":""2019-01-01"",""price"":39.9900016784668},{""order_id"":271134,""order_date"":""2019-04-17"",""price"":44.400001525878906}]"
5,6,506282,"[{""order_id"":11,""order_date"":""2019-01-01"",""price"":13.649999618530273},{""order_id"":462060,""order_date"":""2019-11-22"",""price"":54.0},{""order_id"":506282,""order_date"":""2020-05-01"",""price"":109.98999786376953}]"
6,7,394992,"[{""order_id"":394992,""order_date"":""2019-08-16"",""price"":75.11000061035156}]"
7,8,13,"[{""order_id"":13,""order_date"":""2019-01-01"",""price"":75.11000061035156}]"
8,9,513209,"[{""order_id"":15,""order_date"":""2019-01-01"",""price"":9.989999771118164},{""order_id"":354480,""order_date"":""2019-07-11"",""price"":19.989999771118164},{""order_id"":411629,""order_date"":""2019-09-08"",""price"":15.279999732971191},{""order_id"":492038,""order_date"":""2020-02-04"",""price"":36.9900016784668},{""order_id"":513209,""order_date"":""2020-05-15"",""price"":44.0}]"
9,10,354482,"[{""order_id"":17,""order_date"":""2019-01-01"",""price"":64.0},{""order_id"":354482,""order_date"":""2019-05-26"",""price"":120.0}]"


## Set up connections and import data

### Connect to redis from the notebook

In [7]:
# !pip install redis

In [8]:
import redis
import json
import pandas as pd
from libs.helpers.utils import get_redis_connection_config, get_postgres_connection_string, get_ml_service_config

# Connect to Redis
# Using Redis on port 6380 for this demo to avoid conflicts
redis_config = get_redis_connection_config(port=6380)
# decode_responses=True so the client hands back str values rather than bytes.
r = redis.Redis(**redis_config, decode_responses=True)

# Clear any existing data (optional)
# WARNING: this removes every key in the selected Redis database.
r.flushdb()

# Customers are stored twice, under two layouts, so that both the "string"
# and the "hash" read modes can be demonstrated later on.
df = query_duckdb("SELECT * FROM dim_customers")
for record in df.to_dict('records'):
    # Store as JSON string
    # (outer double quotes: reusing the same quote inside an f-string
    # replacement field is a SyntaxError before Python 3.12)
    r.set(f"customer:{record['customer_id']}", json.dumps(record))
    # Alternative: Store as Redis hash
    r.hset(f"customer_hash:{record['customer_id']}", mapping=record)

# Orders are stored as JSON strings only.
df = query_duckdb("SELECT * FROM dim_orders")
for record in df.to_dict('records'):
    # Store as JSON string
    r.set(f"order:{record['order_id']}", json.dumps(record))

In [9]:
def return_redis_data(keys):
    """Fetch the value stored behind each Redis key, whatever its type.

    Uses the notebook-level client ``r``. For every key the Redis type is
    looked up first and the matching read command is issued.

    Returns:
        dict mapping each key to its value; a key whose type has no reader
        maps to an ``"Unknown type: ..."`` string.
    """
    # One reader per supported Redis type; only the matching one is invoked.
    readers = {
        'string': lambda k: r.get(k),
        'hash': lambda k: r.hgetall(k),
        'list': lambda k: r.lrange(k, 0, -1),
        'set': lambda k: r.smembers(k),
        'zset': lambda k: r.zrange(k, 0, -1, withscores=True),
    }

    values_by_key = {}
    for redis_key in keys:
        kind = r.type(redis_key)
        reader = readers.get(kind)
        if reader is None:
            values_by_key[redis_key] = f"Unknown type: {kind}"
        else:
            values_by_key[redis_key] = reader(redis_key)
    return values_by_key

# Report how many keys were loaded under each naming scheme.
for _pattern, _label in (
    ('customer:*', 'customers'),
    ('customer_hash:*', 'customers (hash)'),
    ('order:*', 'orders'),
):
    print(f"{len(return_redis_data(r.keys(_pattern)))} {_label}")

86 customers
86 customers (hash)


204 orders


### Connect to postgres from the notebook

In [10]:
# !pip install psycopg2-binary pandas sqlalchemy

In [11]:
import pandas as pd
from sqlalchemy import create_engine, text
import numpy as np
from datetime import datetime, timedelta
import random

# Create connection string
connection_string = get_postgres_connection_string()
engine_pg = create_engine(connection_string)

with engine_pg.connect() as conn:
    # Drop existing objects so this cell can be re-run from scratch.
    # customers_view is a VIEW, so it has to be removed with DROP VIEW (DROP TABLE
    # rejects a view). Dropping it first also means the table drops below do not
    # have to rely on CASCADE to get rid of it.
    conn.execute(text("DROP VIEW IF EXISTS customers_view CASCADE"))
    conn.execute(text("DROP TABLE IF EXISTS orders CASCADE"))
    conn.execute(text("DROP TABLE IF EXISTS customers CASCADE"))
    
    # Create customers table
    conn.execute(text("""
        CREATE TABLE customers (
            customer_id BIGINT PRIMARY KEY,
            email VARCHAR(50),
            last_order_id BIGINT
        )
    """))
    
    # Create orders table (each order references its customer)
    conn.execute(text("""
        CREATE TABLE orders (
            order_id BIGINT PRIMARY KEY,
            customer_id BIGINT REFERENCES customers(customer_id),
            order_date TIMESTAMP,
            price DECIMAL(10,2)
        )
    """))
    
    # Nothing is persisted until the transaction is committed explicitly.
    conn.commit()

# Insert customers: one parameterised INSERT per row of dim_customers.
customer_insert = text("""
            INSERT INTO customers (customer_id, email, last_order_id)
            VALUES (:customer_id, :email, :last_order_id)
        """)
df = query_duckdb("SELECT * FROM dim_customers")
with engine_pg.connect() as conn:
    for record in df.to_dict('records'):
        conn.execute(customer_insert, record)
    conn.commit()

# Insert orders: same pattern, fed from dim_orders.
order_insert = text("""
            INSERT INTO orders (order_id, customer_id, order_date, price)
            VALUES (:order_id, :customer_id, :order_date, :price)
        """)
df = query_duckdb("SELECT * FROM dim_orders")
with engine_pg.connect() as conn:
    for record in df.to_dict('records'):
        conn.execute(order_insert, record)
    conn.commit()

# Create customers view
with engine_pg.connect() as conn:
    # One row per customer with the list of its orders serialised as JSON text.
    # The FILTER clause keeps the NULL-filled row that the LEFT JOIN yields for a
    # customer without orders out of json_agg; the aggregate is then NULL for
    # such a customer and COALESCE can fall back to '[]'. Without it the result
    # would be a one-element array of nulls and the fallback would never apply.
    conn.execute(text("""
        CREATE OR REPLACE VIEW customers_view AS
        SELECT 
            c.customer_id,
            MAX(o.order_id) as last_order_id,
            COALESCE(
                (
                    json_agg(
                        json_build_object(
                            'order_id', o.order_id,
                            'order_date', o.order_date,
                            'price', o.price
                        ) ORDER BY o.order_date
                    ) FILTER (WHERE o.order_id IS NOT NULL)
                )::VARCHAR,
                '[]'::VARCHAR
            ) as orders_json
        FROM customers c
        LEFT JOIN orders o ON c.customer_id = o.customer_id
        GROUP BY c.customer_id
    """))
    conn.commit()

# Verify the results: print a ten-row preview of each object created above.
previews = (
    ("Customers table:", "SELECT * FROM customers ORDER BY customer_id LIMIT 10"),
    ("\nOrders table:", "SELECT * FROM orders ORDER BY order_id, order_date LIMIT 10"),
    ("\nCustomers view with aggregated orders:", "SELECT * FROM customers_view ORDER BY customer_id LIMIT 10"),
)
for heading, preview_sql in previews:
    print(heading)
    result = pd.read_sql_query(preview_sql, engine_pg)
    print(result)

Customers table:
   customer_id                   email  last_order_id
0            1    vhoneycutt@gmail.com              1
1            2         amagana@aol.com          58407
2            3   austinpatel@yahoo.com         492020
3            4    jamesbrown@gmail.com         451687
4            5   randymaness@yahoo.com         271134
5            6     judyfryer@yahoo.com         506282
6            7    sharonfane@yahoo.com         394992
7            8  ashtonmclamb@gmail.com             13
8            9   heidibogart@yahoo.com         513209
9           10        pberry@gmail.com         354482

Orders table:
   order_id  customer_id order_date  price
0         1            1 2019-01-01  19.99
1         3            2 2019-01-01   7.99
2         5            3 2019-01-01  39.99
3         7            4 2019-01-01  44.99
4         9            5 2019-01-01  39.99
5        11            6 2019-01-01  13.65
6        13            8 2019-01-01  75.11
7        15            9 2019-

### Connect to http (internal FastAPI) from the notebook

In [12]:
import requests

# Call the internal "hello" endpoint directly as a connectivity check.
hello_url = get_ml_service_config()['hello_endpoint']
response = requests.get(hello_url, params={'name': 'Alice'}, timeout=30, verify=False)

if response.status_code != 200:
    print(f'A non HTTP 200 response occured: {response=}: {response.text}')
else:
    print(response.text)

Hello Alice!


## Create FeatureMesh features to access these data

In [13]:
import featuremesh
%reload_ext featuremesh

from libs.helpers.utils import get_featuremesh_config, get_redis_connection_string

# Access token for FeatureMesh, read from the project configuration. It is reused
# further down as the Bearer token when calling the serving endpoint.
__ACCESS_TOKEN__ = get_featuremesh_config()['access_token']

# Register an online client as the default client.
# NOTE(review): presumably this is the client the %%featureql cells run against — confirm.
client_online = featuremesh.OnlineClient(access_token=__ACCESS_TOKEN__)
featuremesh.set_default('client', client_online)

## For redis

### Create the source feature

In [14]:
# Using Redis on port 6380 for this demo
# The URI printed here is the one hard-coded in the SOURCE_REDIS definition of the next cell.
redis_conn_2 = get_redis_connection_string(port=6380)
print(f"Using Redis connection: {redis_conn_2}")

Using Redis connection: redis://host.docker.internal:6380


In [15]:
%%featureql
CREATE OR REPLACE FEATURE FM.DEMO1.REDIS_SRC AS 
    -- Redis source reached through the Docker host on port 6380, with a 500ms timeout.
    -- NOTE(review): key_patterns lists only 'customer_hash:*', yet later cells also read
    -- 'customer:' and 'order:' keys through this source; presumably the patterns only
    -- matter for table-mode (EXTERNAL_VIEW) reads. Confirm.
    SOURCE_REDIS('redis://host.docker.internal:6380' WITH (timeout='500ms', key_patterns=ARRAY['customer_hash:*']));

Unnamed: 0,FEATURE_NAME,STATUS,MESSAGE
0,FM.DEMO1.REDIS_SRC,REPLACED,Feature was replaced


In [16]:
%%featureql

SHOW FEATURES (INCLUDE ('PROPERTIES')) WHERE NAME='FM.DEMO1.REDIS_SRC'

Unnamed: 0,NAME,DATATYPE,FUNCTION,INPUTS,FORMULA,RESTRICTIONS,STATUS,META,SIGNATURE,POSITION,DEPENDENCIES,CREATED_AT,CREATED_BY,UPDATED_AT,UPDATED_BY,PROPERTIES
0,FM.DEMO1.REDIS_SRC,REDISSOURCE,SOURCE_REDIS,[],"SOURCE_REDIS( json_object( 'function', 'SOURCE_REDIS' , 'uri', 'redis://host.docker.internal:6380' , 'timeout', '500ms' , 'key_patterns', ARRAY['customer_hash:*'}] ))","[""ONLINE""]",DEV,{},SOURCE_REDIS.custom.REDISSOURCE,50,"{""FM.DEMO1.REDIS_SRC"": 2, ""UNNAMED_FEATURE_Z21J8F"": 1}",2025-11-12 14:39:00.820000,-- None --,2025-11-12 14:39:00.825000,3cd6f8dd-bfdf-44ee-87d3-4b0c9a44cb2f,"{""config_as_json"": ""{\""function\"":\""SOURCE_REDIS\"",\""uri\"":\""redis://host.docker.internal:6380\"",\""timeout\"":\""500ms\"",\""key_patterns\"":[\""customer_hash:*\""]}""}"


### Read data (Mode call string)

In [17]:
%%featureql

WITH 
    CUSTOMER_ID := INPUT(BIGINT),
    -- Fetch the JSON string stored under the key 'customer:<id>'.
    CUSTOMER_DETAILS_JSON := EXTERNAL_REDIS(KEY 'customer:' || CUSTOMER_ID::VARCHAR FROM FM.DEMO1.REDIS_SRC),
    -- Turn the JSON text into a typed row so single fields can be accessed.
    CUSTOMER_DETAILS := JSON_PARSE_AS(CUSTOMER_DETAILS_JSON, TYPE 'ROW(customer_id BIGINT, email VARCHAR, last_order_id BIGINT)'),
SELECT
    -- Evaluate for four sample customers.
    CUSTOMER_ID := BIND_TABLE(ARRAY[1,3,5,7]),
    LAST_ORDER_ID := CUSTOMER_DETAILS[last_order_id]
ORDER BY CUSTOMER_ID;

Unnamed: 0,CUSTOMER_ID,LAST_ORDER_ID
0,1,1
1,3,492020
2,5,271134
3,7,394992


### Read data (Mode call hash)

In [18]:
%%featureql

WITH 
    CUSTOMER_ID := INPUT(BIGINT)
SELECT
    CUSTOMER_ID := BIND_TABLE(ARRAY[1,3,5,7]),
    -- Read the single field 'last_order_id' of the hash stored under 'customer_hash:<id>';
    -- no JSON parsing step is involved in this mode.
    LAST_ORDER_ID := EXTERNAL_REDIS(KEY 'customer_hash:' || CUSTOMER_ID::VARCHAR FIELD 'last_order_id' FROM FM.DEMO1.REDIS_SRC)
ORDER BY CUSTOMER_ID;

Unnamed: 0,CUSTOMER_ID,LAST_ORDER_ID
0,1,1
1,3,492020
2,5,271134
3,7,394992


### Read data (Mode table)

#### Joining on an attribute
**Attention!** This is quite inefficient because the join is on a value stored inside the hash rather than on the Redis key itself.

In [19]:
%%featureql

WITH 
    CUSTOMER_ID := INPUT(BIGINT),
    -- Table-mode read: expose the customer_hash:* keys as rows and join them to the
    -- input on the customer_id value stored inside each hash.
    -- NOTE(review): as the recorded output shows, this cell currently fails with
    -- UE/ARGUMENT-SHOULD-BE-LITERAL. last_order_id is also typed VARCHAR here, whereas
    -- the key-join variant of this query types it BIGINT.
    REDIS_VIEW := EXTERNAL_VIEW(
        `SELECT customer_id, last_order_id FROM %FM.DEMO1.REDIS_SRC[customer_hash:*]`
        ON `SELF.customer_id=%CUSTOMER_ID`
        AS ROW(customer_id BIGINT, last_order_id VARCHAR)
    ),
SELECT
    CUSTOMER_ID := BIND_TABLE(ARRAY[1,2,3,4]),
    LAST_ORDER_ID := REDIS_VIEW[last_order_id],
ORDER BY CUSTOMER_ID;

*** Error(s) occurred! ***
Error code: INVALID_RESPONSE
Error message: Translation failed: no data in response
Error context: {'errors': [{'message': 'Please use metaprogramming instructions instead of this.', 'code': 'UE/ARGUMENT-SHOULD-BE-LITERAL'}]}


#### Joining on the key
This is the most efficient option, but you need to know how the keys are structured.

In [20]:
%%featureql

WITH 
    CUSTOMER_ID := INPUT(BIGINT),
    -- NOTE(review): CUSTOMER_ID_AS_KEY is defined but never referenced; the ON clause
    -- below rebuilds the key inline instead.
    CUSTOMER_ID_AS_KEY := 'customer_hash:' || CUSTOMER_ID::VARCHAR,
    -- Table-mode read joined on the Redis key itself rather than on a hash value.
    -- NOTE(review): as the recorded output shows, this cell currently fails with
    -- UE/ARGUMENT-SHOULD-BE-LITERAL.
    REDIS_VIEW := EXTERNAL_VIEW(
        `SELECT key, customer_id, last_order_id FROM %FM.DEMO1.REDIS_SRC[customer_hash:*]`  /* We need the "key" here */
        ON `SELF.key='customer_hash:' || %CUSTOMER_ID`
        AS ROW(key VARCHAR, customer_id BIGINT, last_order_id BIGINT)
    ),
SELECT
    CUSTOMER_ID := BIND_TABLE(ARRAY[1,2,3,4]),
    LAST_ORDER_ID := REDIS_VIEW[last_order_id]
ORDER BY CUSTOMER_ID;

*** Error(s) occurred! ***
Error code: INVALID_RESPONSE
Error message: Translation failed: no data in response
Error context: {'errors': [{'message': 'Please use metaprogramming instructions instead of this.', 'code': 'UE/ARGUMENT-SHOULD-BE-LITERAL'}]}


## For postgres

In [21]:
# Build a fresh engine and check that the customers table is readable.
connection_string = get_postgres_connection_string()
engine_pg = create_engine(connection_string)

# No ORDER BY here, so the row order of this preview is left to the database.
result = pd.read_sql_query("SELECT * FROM customers LIMIT 10", engine_pg)
print(result)

# Close the engine's connections now that the check is done.
engine_pg.dispose()

   customer_id                   email  last_order_id
0            1    vhoneycutt@gmail.com              1
1            2         amagana@aol.com          58407
2            3   austinpatel@yahoo.com         492020
3            4    jamesbrown@gmail.com         451687
4            5   randymaness@yahoo.com         271134
5            6     judyfryer@yahoo.com         506282
6            7    sharonfane@yahoo.com         394992
7            8  ashtonmclamb@gmail.com             13
8            9   heidibogart@yahoo.com         513209
9           10        pberry@gmail.com         354482


In [22]:
# secrets.register('my_dsn_1', get_postgres_connection_string())

In [23]:
# NOTE(review): this prints the full DSN, user name and password included, into the
# notebook's saved output.
postgres_conn = get_postgres_connection_string()
print(f"Using PostgreSQL connection: {postgres_conn}")

Using PostgreSQL connection: postgresql://featuremesh:featuremesh@host.docker.internal:5433/featuremesh?sslmode=disable


In [24]:
%%featureql
CREATE OR REPLACE FEATURE FM.DEMO1.POSTGRES_CONN AS 
    -- JDBC source for the demo Postgres database; only customers_view is exposed.
    -- NOTE(review): the DSN below embeds the user name and password in the feature definition.
    SOURCE_JDBC(
        'postgresql://featuremesh:featuremesh@host.docker.internal:5433/featuremesh?sslmode=disable'  -- DSN
        WITH (
            tables = ARRAY['customers_view'],  -- Array of tables that can be exposed
            timeout='500ms'
        )
    );

Unnamed: 0,FEATURE_NAME,STATUS,MESSAGE
0,FM.DEMO1.POSTGRES_CONN,REPLACED,Feature was replaced


In [25]:
%%featureql

WITH
    CUSTOMER_ID := INPUT(BIGINT),
    -- Table-mode read of customers_view through the JDBC source, joined on customer_id.
    POSTGRES_SRC := EXTERNAL_VIEW(
        `SELECT customer_id, last_order_id, orders_json FROM %FM.DEMO1.POSTGRES_CONN[customers_view]`
        ON `SELF.customer_id=%CUSTOMER_ID`
        AS ROW(customer_id BIGINT, last_order_id BIGINT, orders_json VARCHAR)
    ),
    ORDERS_JSON := POSTGRES_SRC[orders_json],
SELECT 
    CUSTOMER_ID := BIND_TABLE(ARRAY[1,2,4]),
    LAST_ORDER_ID := POSTGRES_SRC[last_order_id],
    -- orders_json arrives as text; parse it into an array of typed rows.
    -- The type literal closes both ARRAY( and ROW( (the closing parenthesis of ARRAY was missing).
    ORDERS_ARRAY_OF_ROWS := json_parse_as(ORDERS_JSON, TYPE 'ARRAY(ROW(order_id BIGINT, order_date VARCHAR, price FLOAT))'),
    -- ORDERS_ARRAY_OF_ROWS[order_id] -- TODO: accessors on array of rows in datafusion
;

Unnamed: 0,CUSTOMER_ID,LAST_ORDER_ID,ORDERS_ARRAY_OF_ROWS
0,1,1,"[{'order_id': 1, 'order_date': '2019-01-01T00:00:00', 'price': 19.99}]"
1,2,58407,"[{'order_id': 3, 'order_date': '2019-01-01T00:00:00', 'price': 7.99}, {'order_id': 58407, 'order_date': '2019-01-17T00:00:00', 'price': 12.74}]"
2,4,451687,"[{'order_id': 7, 'order_date': '2019-01-01T00:00:00', 'price': 44.99}, {'order_id': 245368, 'order_date': '2019-02-15T00:00:00', 'price': 65.0}, {'order_id': 394972, 'order_date': '2019-07-02T00:00:00', 'price': 60.0}, {'order_id': 451687, 'order_date': '2019-10-09T00:00:00', 'price': 60.0}]"


## For HTTP

### Read data (Mode call)

In [26]:
# The URL printed here is the one hard-coded in the SOURCE_HTTP definition of the next cell.
ml_hello_endpoint = get_ml_service_config()['hello_endpoint']
print(f"Using ML hello endpoint: {ml_hello_endpoint}")

Using ML hello endpoint: http://host.docker.internal:8010/hello


In [27]:
%%featureql
CREATE OR REPLACE FEATURES AS 
SELECT 
    -- HTTP source for the hello endpoint, with a 500ms timeout.
    -- 'default1' is declared here as the "name" query parameter; presumably it is the
    -- value used when a call does not supply its own (confirm).
    FM.DEMO1.HTTP_SOURCE1 := SOURCE_HTTP('http://host.docker.internal:8010/hello' WITH (
        query_params=ROW('default1' AS "name"),
        timeout='500ms'
    ))
;

Unnamed: 0,FEATURE_NAME,STATUS,MESSAGE
0,FM.DEMO1.HTTP_SOURCE1,REPLACED,Feature was replaced


In [28]:
%%featureql

WITH 
    FIRSTNAME := INPUT(VARCHAR),
SELECT 
    FIRSTNAME := BIND_TABLE(ARRAY['Alice', 'Bob', 'Charlie']),
    -- One HTTP call per bound first name, passed as the "name" query parameter.
    HTTP_RESPONSE := EXTERNAL_HTTP(FROM FM.DEMO1.HTTP_SOURCE1 WITH (
        query_params=ROW(FIRSTNAME AS name)
    ))
;

Unnamed: 0,FIRSTNAME,HTTP_RESPONSE
0,Alice,Hello Alice!
1,Bob,Hello Bob!
2,Charlie,Hello Charlie!


## Prepared statement with federated query

We want to know the amount of the last order for each customer:
- The details on customers are in postgres
- The details on orders are in redis
  
We then need to perform a join between two different systems.

### Prototyping of features in "free form" mode

In [29]:
%%featureql 

WITH 
    -- The details on orders are in redis
    ORDERS := ENTITY(),
    ORDER_ID := INPUT(BIGINT#ORDERS),
    -- Orders were stored under keys of the form 'order:<order_id>'.
    REDIS_KEY := 'order:' || ORDER_ID::VARCHAR,
    ORDER_DETAILS_STR := EXTERNAL_REDIS(KEY REDIS_KEY FROM FM.DEMO1.REDIS_SRC),
    ORDER_DETAILS := JSON_PARSE_AS(ORDER_DETAILS_STR, TYPE 'ROW(order_id BIGINT, price FLOAT)'), -- VARCHAR TO ROW

    -- The details on customers are in postgres
    CUSTOMERS := ENTITY(),
    CUSTOMER_ID := INPUT(BIGINT#CUSTOMERS),
    -- last_order_id is annotated with the ORDERS entity so it can be bound to ORDER_ID below.
    CUSTOMER_DETAILS := EXTERNAL_VIEW(
        `SELECT customer_id, last_order_id FROM %FM.DEMO1.POSTGRES_CONN[customers_view]`
        ON `SELF.customer_id=%CUSTOMER_ID`
        AS ROW(customer_id BIGINT, last_order_id BIGINT#ORDERS)
    ),
    LAST_ORDER_ID := CUSTOMER_DETAILS[last_order_id],

    -- Join between the two
    LAST_ORDER_JOIN := ADD_FIELDS(
        ORDER_DETAILS as order_details
        TO ROW(LAST_ORDER_ID as last_order_id)
        BINDING FIELD last_order_id WITH ORDER_ID -- TODO: Persistence without explicit binding raises an error
    ),
    LAST_ORDER_PRICE := LAST_ORDER_JOIN[order_details][price]

SELECT 
    -- Only the customer id is bound; everything else is derived from it.
    CUSTOMER_ID := BIND_TABLE(ARRAY[1,2,4]),
    LAST_ORDER_PRICE
ORDER BY CUSTOMER_ID;

Unnamed: 0,CUSTOMER_ID,LAST_ORDER_PRICE
0,1,19.99
1,2,12.74
2,4,60.0


### Persist definitions of features

In [30]:
%%featureql 

CREATE OR REPLACE FEATURES AS 
SELECT 
    -- The details on orders are in redis
    FM.DEMO1.ORDERS := ENTITY(),
    FM.DEMO1.ORDER_ID := INPUT(BIGINT#FM.DEMO1.ORDERS),
    -- Orders were stored under keys of the form 'order:<order_id>'.
    FM.DEMO1.REDIS_KEY := 'order:' || FM.DEMO1.ORDER_ID::VARCHAR,
    FM.DEMO1.ORDER_DETAILS_STR := EXTERNAL_REDIS(KEY FM.DEMO1.REDIS_KEY FROM FM.DEMO1.REDIS_SRC),
    FM.DEMO1.ORDER_DETAILS := JSON_PARSE_AS(FM.DEMO1.ORDER_DETAILS_STR, TYPE 'ROW(order_id BIGINT, price FLOAT)'), -- VARCHAR TO ROW

    -- The details on customers are in postgres
    FM.DEMO1.CUSTOMERS := ENTITY(),
    -- The entity is referenced by its fully qualified name, like every other
    -- reference among these persisted definitions (it was a bare CUSTOMERS before).
    FM.DEMO1.CUSTOMER_ID := INPUT(BIGINT#FM.DEMO1.CUSTOMERS),
    FM.DEMO1.CUSTOMER_DETAILS := EXTERNAL_VIEW(
        `SELECT customer_id, last_order_id FROM %FM.DEMO1.POSTGRES_CONN[customers_view]`
        ON `SELF.customer_id=%FM.DEMO1.CUSTOMER_ID`
        AS ROW(customer_id BIGINT, last_order_id BIGINT#FM.DEMO1.ORDERS)
    ),
    FM.DEMO1.LAST_ORDER_ID := FM.DEMO1.CUSTOMER_DETAILS[last_order_id],

    -- Join between the two
    FM.DEMO1.LAST_ORDER_JOIN := ADD_FIELDS(
        FM.DEMO1.ORDER_DETAILS as order_details
        TO ROW(FM.DEMO1.LAST_ORDER_ID as last_order_id)
        BINDING FIELD last_order_id WITH FM.DEMO1.ORDER_ID -- TODO: Persistence without explicit binding raises an error
    ),
    FM.DEMO1.LAST_ORDER_PRICE := FM.DEMO1.LAST_ORDER_JOIN[order_details][price];

Unnamed: 0,FEATURE_NAME,STATUS,MESSAGE
0,FM.DEMO1.ORDERS,REPLACED,Feature was replaced
1,FM.DEMO1.ORDER_ID,REPLACED,Feature was replaced
2,FM.DEMO1.REDIS_KEY,REPLACED,Feature was replaced
3,FM.DEMO1.ORDER_DETAILS_STR,REPLACED,Feature was replaced
4,FM.DEMO1.ORDER_DETAILS,REPLACED,Feature was replaced
5,FM.DEMO1.CUSTOMERS,REPLACED,Feature was replaced
6,FM.DEMO1.CUSTOMER_ID,REPLACED,Feature was replaced
7,FM.DEMO1.CUSTOMER_DETAILS,REPLACED,Feature was replaced
8,FM.DEMO1.LAST_ORDER_ID,REPLACED,Feature was replaced
9,FM.DEMO1.LAST_ORDER_JOIN,REPLACED,Feature was replaced


### See what we get

#### Data at customer_id granularity (postgres)

In [31]:
%%featureql 

SELECT
    -- Only the customer id is bound; the other two columns come from the persisted definitions.
    FM.DEMO1.CUSTOMER_ID := BIND_TABLE(ARRAY[1,2,4]),
    FM.DEMO1.CUSTOMER_DETAILS,
    FM.DEMO1.LAST_ORDER_ID;

Unnamed: 0,FM__DEMO1__CUSTOMER_ID,FM__DEMO1__CUSTOMER_DETAILS,FM__DEMO1__LAST_ORDER_ID
0,4,"{'customer_id': 4, 'last_order_id': 451687}",451687
1,2,"{'customer_id': 2, 'last_order_id': 58407}",58407
2,1,"{'customer_id': 1, 'last_order_id': 1}",1


#### Data at order_id granularity (redis)

In [32]:
%%featureql 

SELECT 
    -- The bound ids are the last_order_id values returned for customers 1, 2 and 4.
    FM.DEMO1.ORDER_ID := BIND_TABLE(ARRAY[1,58407,451687]),
    FM.DEMO1.ORDER_DETAILS,
    ORDER_PRICE := FM.DEMO1.ORDER_DETAILS[price];

Unnamed: 0,FM__DEMO1__ORDER_ID,FM__DEMO1__ORDER_DETAILS,ORDER_PRICE
0,1,"{'order_id': 1, 'price': 19.99}",19.99
1,58407,"{'order_id': 58407, 'price': 12.74}",12.74
2,451687,"{'order_id': 451687, 'price': 60.0}",60.0


#### Data with the join customer_id <-> order_id

In [33]:
%%featureql

SELECT 
    -- Binding the customer id is enough: the persisted join resolves the last order and its price.
    FM.DEMO1.CUSTOMER_ID := BIND_TABLE(ARRAY[1,2,4]),
    FM.DEMO1.LAST_ORDER_PRICE
ORDER BY FM.DEMO1.CUSTOMER_ID;

Unnamed: 0,FM__DEMO1__CUSTOMER_ID,FM__DEMO1__LAST_ORDER_PRICE
0,1,19.99
1,2,12.74
2,4,60.0


### Transform the "last order price" feature as a prepared statement

In [34]:
%%featureql

CREATE OR REPLACE FEATURES AS
SELECT 
    -- Wrap LAST_ORDER_PRICE as a prepared statement whose only input is the customer id.
    FM.DEMO1.LAST_ORDER_PRICE_PS := PREPARED_STATEMENT(
        FM.DEMO1.LAST_ORDER_PRICE
        USING INPUTS FM.DEMO1.CUSTOMER_ID
    );

Unnamed: 0,FEATURE_NAME,STATUS,MESSAGE
0,FM.DEMO1.LAST_ORDER_PRICE_PS,REPLACED,Feature was replaced


### Call the prepared statement

In [35]:
import requests
import json

def call_prepared_statement(feature_name: str, ids: list[int]) -> str:

    # This code is going to be registered as a stored procedure in the FeatureMesh Registry
    params_post = {
      "statement": {
        "id": feature_name,
        "inputs": {
            "input_table_1": [[id] for id in ids],
        }
      }
    }
    # print(json.dumps(params_post))
    
    headers = {
        "Content-Type": "application/json",
        "Authorization": f"Bearer {__ACCESS_TOKEN__}",
    }
    response = requests.post(f"{get_featuremesh_config()['serving.host']}/v1/featureql", json=params_post, headers=headers, timeout=30, verify=False)

    if response.status_code == 200:
        return response.json()
    else:
        print(f'A non HTTP 200 response occured: {response=}: {response.text}')

#### One customer at a time (one call per customer, up to 100 calls)

In [36]:
%%time

# LIMIT 100 is only an upper bound: the recorded run issued 86 calls.
customer_ids = query_duckdb("SELECT customer_id FROM dim_customers ORDER BY RANDOM() LIMIT 100").to_dict('list').get('customer_id')

# One call per customer, each carrying a single id.
responses = [
    call_prepared_statement("FM.DEMO1.LAST_ORDER_PRICE_PS", [customer_id])
    for customer_id in customer_ids
]

print(len(responses))
# Show one arbitrary response as a sample.
responses[17]

86
CPU times: user 259 ms, sys: 72.9 ms, total: 332 ms
Wall time: 899 ms


{'errors': [{'message': 'Engine execution error: Invalid query \'/* start_of_query reference: persistence_prepared_statement */\nwith\ncte_qu5pd3ux_0 as (\n    select\n    /*  */\n    source_unnamed_z96a1b29."fm__demo1__customer_id" as "fm__demo1__customer_id", "source_fm__demo1__customer_details_query".final_row as "fm__demo1__customer_details"\n    from (\n        \tselect 1\n    ) source_constants\n        cross join (\n            \tselect "customer_id_2db32c7f" from input_table_1_561\n        ) source_unnamed_z96a1b29\n        left join (\n            \tselect *, row(last_order_id) as final_row from (select customer_id, last_order_id from "fm.demo1.postgres_conn.customers_view")\n        ) "source_fm__demo1__customer_details_query"\n        on "source_fm__demo1__customer_details_query".customer_id=source_unnamed_z96a1b29."fm__demo1__customer_id"\n), \ncte_qu5pd3ux_1 as (\nselect\n\tcast("fm__demo1__customer_details" as struct<last_order_id bigint>) as "fm__demo1__customer_details"

#### Or many customers at a time (4 customers per call, up to 100 calls)

In [37]:
%%time

# Four independent random orderings of the customer ids, keyed 0..3.
customer_ids = {
    i: query_duckdb("SELECT customer_id FROM dim_customers ORDER BY RANDOM() LIMIT 100").to_dict('list').get('customer_id')
    for i in range(4)
}

# Call j sends the j-th id of each of the four orderings in one request.
responses = [
    call_prepared_statement(
        "FM.DEMO1.LAST_ORDER_PRICE_PS",
        [customer_ids[slot][j] for slot in range(4)],
    )
    for j in range(len(customer_ids[0]))
]

print(len(responses))

# Show one arbitrary response as a sample.
responses[17]

86
CPU times: user 385 ms, sys: 102 ms, total: 486 ms
Wall time: 995 ms


{'errors': [{'message': 'Engine execution error: Invalid query \'/* start_of_query reference: persistence_prepared_statement */\nwith\ncte_qu5pd3ux_0 as (\n    select\n    /*  */\n    source_unnamed_z96a1b29."fm__demo1__customer_id" as "fm__demo1__customer_id", "source_fm__demo1__customer_details_query".final_row as "fm__demo1__customer_details"\n    from (\n        \tselect 1\n    ) source_constants\n        cross join (\n            \tselect "customer_id_2db32c7f" from input_table_1_647\n        ) source_unnamed_z96a1b29\n        left join (\n            \tselect *, row(last_order_id) as final_row from (select customer_id, last_order_id from "fm.demo1.postgres_conn.customers_view")\n        ) "source_fm__demo1__customer_details_query"\n        on "source_fm__demo1__customer_details_query".customer_id=source_unnamed_z96a1b29."fm__demo1__customer_id"\n), \ncte_qu5pd3ux_1 as (\nselect\n\tcast("fm__demo1__customer_details" as struct<last_order_id bigint>) as "fm__demo1__customer_details"

**=> Result: Less complexity, more speed** 

With a few lines of FeatureQL, you have an already-deployed endpoint that gives you the price of the last order for any customer_id in 20-30 ms.