# Joining Session-Scoped UDTFs

This notebook demonstrates how to join data from different UDTFs based on `external_id` and `space`.

## Prerequisites

- Multiple UDTFs must be registered (see `basic_registration.ipynb`)
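- CDF credentials configured in Secret Manager under the scopes the queries read via `SECRET(scope, key)`: `cdf_sailboat_sailboat` and `cdf_power_windturbine` (keys `client_id`, `client_secret`, `tenant_id`, `cdf_cluster`, `project`)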


## Step 1: Join on external_id


In [None]:
# Join two UDTFs on external_id: each vessel row is paired with the sensor rows
# whose vessel_external_id equals the vessel's external_id, restricted to
# vessels in the 'sailboat' space.
# Each UDTF call receives its CDF credentials through SECRET(scope, key);
# name and description are passed as NULL (presumably "no filter" — confirm
# against the UDTF implementation).
# NOTE(review): vessel_udtf reads the 'cdf_sailboat_sailboat' scope while
# sensor_udtf reads 'cdf_power_windturbine'; the other queries visible in this
# notebook use the sailboat scope only. Confirm the cross-scope join is
# intentional and not a copy-paste slip.
# LIMIT is applied without ORDER BY, so which 10 rows come back is not fixed.
query = """
SELECT 
    v.external_id,
    v.name AS vessel_name,
    s.external_id AS sensor_id,
    s.value AS sensor_value
FROM vessel_udtf(
    client_id => SECRET('cdf_sailboat_sailboat', 'client_id'),
    client_secret => SECRET('cdf_sailboat_sailboat', 'client_secret'),
    tenant_id => SECRET('cdf_sailboat_sailboat', 'tenant_id'),
    cdf_cluster => SECRET('cdf_sailboat_sailboat', 'cdf_cluster'),
    project => SECRET('cdf_sailboat_sailboat', 'project'),
    name => NULL,
    description => NULL
) v
JOIN sensor_udtf(
    client_id => SECRET('cdf_power_windturbine', 'client_id'),
    client_secret => SECRET('cdf_power_windturbine', 'client_secret'),
    tenant_id => SECRET('cdf_power_windturbine', 'tenant_id'),
    cdf_cluster => SECRET('cdf_power_windturbine', 'cdf_cluster'),
    project => SECRET('cdf_power_windturbine', 'project'),
    name => NULL,
    description => NULL
) s ON v.external_id = s.vessel_external_id
WHERE v.space = 'sailboat'
LIMIT 10;
"""

# Run the statement on the notebook's Spark session and print the rows with
# column values left untruncated.
result = spark.sql(query)
result.show(truncate=False)


## Step 2: Join on space + external_id


In [None]:
# Join on space and external_id (more precise): a child row matches a parent
# row only when both the space and the parent's external_id agree, so equal
# external_ids in different spaces are not paired.
# Both UDTF calls read their credentials from the 'cdf_sailboat_sailboat'
# scope; name and description are passed as NULL.
# LIMIT is applied without ORDER BY, so which 10 rows come back is not fixed.
query = """
SELECT 
    a.external_id,
    a.space,
    b.parent_external_id,
    b.child_external_id
FROM parent_udtf(
    client_id => SECRET('cdf_sailboat_sailboat', 'client_id'),
    client_secret => SECRET('cdf_sailboat_sailboat', 'client_secret'),
    tenant_id => SECRET('cdf_sailboat_sailboat', 'tenant_id'),
    cdf_cluster => SECRET('cdf_sailboat_sailboat', 'cdf_cluster'),
    project => SECRET('cdf_sailboat_sailboat', 'project'),
    name => NULL,
    description => NULL
) a
JOIN child_udtf(
    client_id => SECRET('cdf_sailboat_sailboat', 'client_id'),
    client_secret => SECRET('cdf_sailboat_sailboat', 'client_secret'),
    tenant_id => SECRET('cdf_sailboat_sailboat', 'tenant_id'),
    cdf_cluster => SECRET('cdf_sailboat_sailboat', 'cdf_cluster'),
    project => SECRET('cdf_sailboat_sailboat', 'project'),
    name => NULL,
    description => NULL
) b 
    ON a.space = b.space 
    AND a.external_id = b.parent_external_id
LIMIT 10;
"""

# Run the statement on the notebook's Spark session and print the rows with
# column values left untruncated.
result = spark.sql(query)
result.show(truncate=False)


## Step 3: CROSS JOIN LATERAL with Time Series UDTFs


# Register the time series datapoints UDTF under the SQL name
# "time_series_datapoints_udtf" so the CROSS JOIN LATERAL queries can call it.
# The registration is unconditional: no "already registered" check is made.
from pyspark.sql.functions import udtf
from cognite.pygen_spark.time_series_udtfs import TimeSeriesDatapointsUDTF

time_series_datapoints_udtf = udtf(TimeSeriesDatapointsUDTF)
spark.udtf.register("time_series_datapoints_udtf", time_series_datapoints_udtf)
# The check mark was stored as mojibake (U+2713 mis-decoded); restored here.
print("✓ Time Series UDTF registered for CROSS JOIN LATERAL example")

In [None]:
# Use vessel data to query time series: for every vessel row the lateral
# subquery calls time_series_datapoints_udtf with that row's space and
# speed_ts_external_id, over the window '1d-ago' .. 'now'.
# Vessels without a speed time series reference are excluded by the
# `speed_ts_external_id IS NOT NULL` predicate, matching the guard used by the
# multi-time-series query in Step 4; it is intended to keep NULL references
# out of the UDTF call.
# NOTE(review): `space` is taken from v.space here, whereas the Step 4 query
# passes v.speed_ts_space — confirm the time series lives in the vessel's space.
query = """
SELECT 
    v.external_id AS vessel_id,
    v.name AS vessel_name,
    ts.timestamp,
    ts.value AS speed
FROM vessel_udtf(
    client_id => SECRET('cdf_sailboat_sailboat', 'client_id'),
    client_secret => SECRET('cdf_sailboat_sailboat', 'client_secret'),
    tenant_id => SECRET('cdf_sailboat_sailboat', 'tenant_id'),
    cdf_cluster => SECRET('cdf_sailboat_sailboat', 'cdf_cluster'),
    project => SECRET('cdf_sailboat_sailboat', 'project'),
    name => NULL,
    description => NULL
) v
CROSS JOIN LATERAL (
    SELECT * FROM time_series_datapoints_udtf(
        space => v.space,
        external_id => v.speed_ts_external_id,
        start => '1d-ago',
        end => 'now',
        client_id => SECRET('cdf_sailboat_sailboat', 'client_id'),
        client_secret => SECRET('cdf_sailboat_sailboat', 'client_secret'),
        tenant_id => SECRET('cdf_sailboat_sailboat', 'tenant_id'),
        cdf_cluster => SECRET('cdf_sailboat_sailboat', 'cdf_cluster'),
        project => SECRET('cdf_sailboat_sailboat', 'project')
    )
) ts
WHERE v.space = 'sailboat'
  AND v.speed_ts_external_id IS NOT NULL
ORDER BY v.external_id, ts.timestamp
LIMIT 100;
"""

# Run the statement on the notebook's Spark session and print the rows with
# column values left untruncated.
result = spark.sql(query)
result.show(truncate=False)


## Step 4: Join with Data Model Views (Multiple Time Series)


In [None]:
# Query multiple time series referenced in a view: one lateral UDTF call per
# referenced time series (speed and course), each driven by the matching
# *_ts_space / *_ts_external_id columns of the vessel row. Vessels missing
# either reference are filtered out.
# NOTE(review): the two lateral subqueries are not correlated with each other,
# so each vessel yields every (speed datapoint, course datapoint) pair. If
# time-aligned rows are wanted, add a predicate such as
# `speed_ts.timestamp = course_ts.timestamp` — confirm the intent.
# course_ts.timestamp is part of ORDER BY so that rows sharing a vessel and
# speed timestamp have a defined order and the LIMIT 100 cut-off is stable.
query = """
SELECT 
    v.external_id AS vessel_id,
    v.name AS vessel_name,
    speed_ts.timestamp AS speed_timestamp,
    speed_ts.value AS speed_value,
    course_ts.timestamp AS course_timestamp,
    course_ts.value AS course_value
FROM vessel_udtf(
    client_id => SECRET('cdf_sailboat_sailboat', 'client_id'),
    client_secret => SECRET('cdf_sailboat_sailboat', 'client_secret'),
    tenant_id => SECRET('cdf_sailboat_sailboat', 'tenant_id'),
    cdf_cluster => SECRET('cdf_sailboat_sailboat', 'cdf_cluster'),
    project => SECRET('cdf_sailboat_sailboat', 'project'),
    name => NULL,
    description => NULL
) v
CROSS JOIN LATERAL (
    SELECT * FROM time_series_datapoints_udtf(
        space => v.speed_ts_space,
        external_id => v.speed_ts_external_id,
        start => '1d-ago',
        end => 'now',
        client_id => SECRET('cdf_sailboat_sailboat', 'client_id'),
        client_secret => SECRET('cdf_sailboat_sailboat', 'client_secret'),
        tenant_id => SECRET('cdf_sailboat_sailboat', 'tenant_id'),
        cdf_cluster => SECRET('cdf_sailboat_sailboat', 'cdf_cluster'),
        project => SECRET('cdf_sailboat_sailboat', 'project')
    )
) speed_ts
CROSS JOIN LATERAL (
    SELECT * FROM time_series_datapoints_udtf(
        space => v.course_ts_space,
        external_id => v.course_ts_external_id,
        start => '1d-ago',
        end => 'now',
        client_id => SECRET('cdf_sailboat_sailboat', 'client_id'),
        client_secret => SECRET('cdf_sailboat_sailboat', 'client_secret'),
        tenant_id => SECRET('cdf_sailboat_sailboat', 'tenant_id'),
        cdf_cluster => SECRET('cdf_sailboat_sailboat', 'cdf_cluster'),
        project => SECRET('cdf_sailboat_sailboat', 'project')
    )
) course_ts
WHERE v.space = 'sailboat'
  AND v.speed_ts_external_id IS NOT NULL
  AND v.course_ts_external_id IS NOT NULL
ORDER BY v.external_id, speed_ts.timestamp, course_ts.timestamp
LIMIT 100;
"""

# Run the statement on the notebook's Spark session and print the rows with
# column values left untruncated.
result = spark.sql(query)
result.show(truncate=False)
