
## Create feature tables in UC with Spark Declarative Pipelines

<img src="https://github.com/databricks-demos/dbdemos-resources/blob/main/images/product/feature_store/2025Q4_03_image1.png?raw=true" style="float: right" width="500px" />

Any streaming table or materialized view in Unity Catalog with a primary key can be a feature table in Unity Catalog, and you can use the Features UI and API with the table.

To run this notebook, open the Pipelines UI and create a pipeline that uses this notebook as its source code.




In [0]:
import dlt
from pyspark.sql import functions as F, Window as W
from pyspark import pipelines as dp

In [0]:
@dp.materialized_view(
    name="travel_purchase_sdp",
    schema="""
        destination_id BIGINT NOT NULL,
        user_id BIGINT NOT NULL,
        ts TIMESTAMP,
        clicked BOOLEAN,
        purchased BOOLEAN,
        price DOUBLE
    """
)
def travel_purchase_sdp():
    """Project the ``travel_purchase`` table onto the columns declared in the schema.

    Returns:
        A DataFrame with ``destination_id`` and ``user_id`` cast to BIGINT and
        ``ts``, ``clicked``, ``purchased`` and ``price`` passed through as-is.
    """
    # The two id columns are cast so they line up with the BIGINT types
    # declared in the decorator; the remaining columns are selected unchanged.
    projection = [
        "CAST(destination_id AS BIGINT) AS destination_id",
        "CAST(user_id AS BIGINT) AS user_id",
        "ts",
        "clicked",
        "purchased",
        "price",
    ]
    return spark.table("travel_purchase").selectExpr(*projection)


In [0]:
@dp.materialized_view(
    name="user_demography_sdp",
    schema="""
        user_id BIGINT NOT NULL,
        age BIGINT,
        gender STRING,
        income_bracket STRING,
        loyalty_tier STRING,
        first_login_date TIMESTAMP,
        billing_state STRING,
        billing_city STRING
    """
)
def user_demography_sdp():
    """Project the ``user_demography`` table onto the columns declared in the schema.

    Returns:
        A DataFrame with ``user_id`` and ``age`` cast to BIGINT and the
        remaining demographic columns passed through as-is.
    """
    # Only the numeric columns need a cast to match the declared BIGINT types.
    projection = [
        "CAST(user_id AS BIGINT) AS user_id",
        "CAST(age AS BIGINT) AS age",
        "gender",
        "income_bracket",
        "loyalty_tier",
        "first_login_date",
        "billing_state",
        "billing_city",
    ]
    return spark.table("user_demography").selectExpr(*projection)

In [0]:
@dp.materialized_view(
    name="destination_features_advanced_sdp",
    schema="""
        destination_id BIGINT NOT NULL,
        ts TIMESTAMP NOT NULL,
        sum_clicks_7d DOUBLE,
        sum_impressions_7d DOUBLE,
        CONSTRAINT destination_features_advanced_pk_sdp PRIMARY KEY (destination_id, ts TIMESERIES)
    """
)
def destination_features_advanced():
    """Build rolling 7-day click and impression counts per destination.

    One output row is produced per distinct ``(destination_id, ts)`` found in
    ``travel_purchase_sdp``. Because a destination appears once per event, the
    table is keyed on ``(destination_id, ts)`` with ``ts`` declared as the
    TIMESERIES column; a key on ``destination_id`` alone would not be unique.

    Returns:
        A DataFrame with columns ``destination_id``, ``ts``, ``sum_clicks_7d``
        and ``sum_impressions_7d``.
    """
    # NOTE(review): the key now includes ``ts``; any downstream FeatureLookup on
    # this table presumably needs a timestamp lookup key - confirm with callers.
    events = (
        spark.table("travel_purchase_sdp")
        .select("destination_id", "ts", "clicked")
        # Primary-key columns must be NOT NULL, so rows without a timestamp
        # cannot be represented in this table.
        .filter(F.col("ts").isNotNull())
    )

    # Range frame over epoch seconds: the current row plus everything in the
    # preceding 7 days for the same destination.
    last_7_days = (
        W.partitionBy("destination_id")
        .orderBy(F.col("ts").cast("long"))
        .rangeBetween(-7 * 86400, 0)
    )

    features = (
        events.withColumn("clicked_i", F.col("clicked").cast("int"))
        .withColumn("sum_clicks_7d", F.sum("clicked_i").over(last_7_days).cast("double"))
        .withColumn("sum_impressions_7d", F.count("*").over(last_7_days).cast("double"))
        # Cast explicitly so the output types match the declared schema.
        .withColumn("destination_id", F.col("destination_id").cast("bigint"))
        .withColumn("ts", F.col("ts").cast("timestamp"))
        .select("destination_id", "ts", "sum_clicks_7d", "sum_impressions_7d")
    )

    # A window whose rows all have NULL ``clicked`` sums to NULL; report it as 0.
    features = features.na.fill({"sum_clicks_7d": 0.0, "sum_impressions_7d": 0.0})

    # Events sharing the same (destination_id, ts) are peers in the range frame
    # and therefore carry identical feature values; collapsing them keeps the
    # declared primary key unique.
    return features.distinct()



In [0]:
@dp.materialized_view(
    name="user_features_advanced_sdp",
    schema="""
        user_id BIGINT NOT NULL,
        ts TIMESTAMP NOT NULL,
        mean_price_7d DOUBLE,
        last_6m_purchases DOUBLE,
        tenure_days DOUBLE,
        age BIGINT,
        gender STRING,
        income_bracket STRING,
        loyalty_tier STRING,
        billing_state STRING,
        billing_city STRING,
        CONSTRAINT user_features_advanced_pk_sdp PRIMARY KEY (user_id, ts TIMESERIES)
    """
)
def user_features_advanced():
    """Build rolling purchase features per user, enriched with demographics.

    Reads ``travel_purchase_sdp`` and ``user_demography_sdp``. For every event
    row it computes:

    * ``mean_price_7d``: sum of ``price`` over the trailing 7 days divided by
      the number of rows in that window.
    * ``last_6m_purchases``: number of purchased rows over the trailing
      ``6 * 30`` days.
    * ``tenure_days``: days between ``first_login_date`` and the current date.

    A user appears once per event, so the table is keyed on ``(user_id, ts)``
    with ``ts`` declared as the TIMESERIES column; a key on ``user_id`` alone
    would not be unique.

    Returns:
        A DataFrame with the columns declared in the decorator's ``schema``.
    """
    # NOTE(review): the key now includes ``ts``; any downstream FeatureLookup on
    # this table presumably needs a timestamp lookup key - confirm with callers.
    travel_purchase = (
        spark.table("travel_purchase_sdp")
        .select("user_id", "price", "purchased", "ts")
        # Primary-key columns must be NOT NULL, so rows without a timestamp
        # cannot be represented in this table.
        .filter(F.col("ts").isNotNull())
    )
    user_demo = spark.table("user_demography_sdp")

    # Range frames over epoch seconds, ending at the current row.
    by_user_time = W.partitionBy("user_id").orderBy(F.col("ts").cast("long"))
    last_7_days = by_user_time.rangeBetween(-7 * 86400, 0)
    last_6_months = by_user_time.rangeBetween(-6 * 30 * 86400, 0)

    user_feat = (
        travel_purchase
        .withColumn("lookedup_price_7d_rolling_sum", F.sum("price").over(last_7_days))
        .withColumn("lookups_7d_rolling_sum", F.count("*").over(last_7_days))
        .withColumn(
            "mean_price_7d",
            (F.col("lookedup_price_7d_rolling_sum") / F.col("lookups_7d_rolling_sum")).cast("double"),
        )
        .withColumn("tickets_purchased", F.col("purchased").cast("int"))
        .withColumn(
            "last_6m_purchases",
            F.sum("tickets_purchased").over(last_6_months).cast("double"),
        )
    )

    user_feat = (
        # Left join keeps users that have no demography row; their demographic
        # columns stay NULL.
        user_feat.join(user_demo, on="user_id", how="left")
        .withColumn(
            "tenure_days",
            F.datediff(F.current_date(), F.col("first_login_date")).cast("double"),
        )
        # Cast explicitly so the output types match the declared schema.
        .withColumn("user_id", F.col("user_id").cast("bigint"))
        .withColumn("ts", F.col("ts").cast("timestamp"))
        .select(
            "user_id", "ts", "mean_price_7d", "last_6m_purchases", "tenure_days",
            "age", "gender", "income_bracket", "loyalty_tier", "billing_state", "billing_city",
        )
    )

    # Replace NULL numeric features (e.g. no price in the window, or no
    # demography row for tenure) with 0.
    user_feat = user_feat.na.fill({
        "mean_price_7d": 0.0,
        "last_6m_purchases": 0.0,
        "tenure_days": 0.0,
    })

    # Events sharing the same (user_id, ts) are peers in the range frames and
    # carry identical window values, so collapsing exact duplicates keeps the
    # declared primary key unique.
    # NOTE(review): this assumes ``user_demography_sdp`` has one row per
    # ``user_id`` - confirm against the source table.
    return user_feat.distinct()
