%md
## Entity Relationship Diagram

The following diagram shows how the synthetic data tables relate to each other for the churn prediction model:

```mermaid
erDiagram
    USER_TENANT_MAPPING ||--o{ USER_CLICK_EVENTS : "has"
    USER_TENANT_MAPPING ||--o{ CONFLUENCE_EVENTS : "has"
    USER_TENANT_MAPPING ||--o{ JIRA_EVENTS : "has"
    USER_TENANT_MAPPING ||--o{ PRODUCT_ENTITLEMENTS : "has"
    USER_TENANT_MAPPING ||--|| CHURN_LABELS : "has"
    USER_TENANT_MAPPING }o--|| TENANT_METRICS : "belongs to"
    
    USER_TENANT_MAPPING {
        string user_id PK
        string tenant_id FK
    }
    
    USER_CLICK_EVENTS {
        string user_id FK
        string tenant_id FK
        timestamp event_ts
        string action_type
        int duration_seconds
        string product
    }
    
    CONFLUENCE_EVENTS {
        string user_id FK
        string tenant_id FK
        timestamp event_ts
        string page_id
        string event_type
        int dwell_time_seconds
        int comment_count
    }
    
    JIRA_EVENTS {
        string user_id FK
        string tenant_id FK
        timestamp event_ts
        string issue_id
        string activity_type
        int reaction_count
        string jira_product
        string issue_type
    }
    
    PRODUCT_ENTITLEMENTS {
        string user_id FK
        string tenant_id FK
        timestamp entitled_ts
        string product
        boolean is_active
        int page_visits
    }
    
    TENANT_METRICS {
        string tenant_id PK, FK
        timestamp metric_ts
        int key_events_count
        int active_users_count
        int reporting_lines_count
        int linked_aris_count
    }
    
    CHURN_LABELS {
        string user_id PK, FK
        string tenant_id FK
        timestamp label_ts
        int churned
    }
```

**Key Relationships:**
* **user_tenant_mapping** is the central table linking users to tenants
* **Event tables** (user_click_events, confluence_events, jira_events) capture time-series behavioral data
* **product_entitlements** tracks product access and usage
* **tenant_metrics** provides organizational-level context
* **churn_labels** contains the target variable for model training

**Join Keys:**
* `user_id` - Primary key for user-level joins
* `tenant_id` - Foreign key for tenant-level aggregations

## 1. Setup and Imports

### Configuration Widgets

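This notebook uses **widgets** (query parameters) to make the catalog and schemas configurable:
- **Catalog Name** (`catalog`): Unity Catalog catalog where features and models will be stored
- **Schema Name** (`schema`): Schema within the catalog for organizing feature tables
- **Source Schema Name** (`source_schema`): Schema within the same catalog that holds the synthetic source tables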

You can modify these values using the widgets at the top of the notebook without changing the code.
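The configuration cell reads three widgets with `dbutils.widgets.get`, which fails if a widget has not been created yet. A minimal sketch, assuming you want the notebook to create its own widgets: it calls `dbutils.widgets.text` with placeholder defaults and falls back to those defaults when `dbutils` is unavailable (for example, when testing outside Databricks). The helper name `get_param` and the default values are hypothetical.

```python
# Hypothetical placeholder defaults; replace them with your own catalog and schemas.
DEFAULTS = {
    "catalog": "main",
    "schema": "churn_features",
    "source_schema": "churn_source",
}


def get_param(name: str, default: str) -> str:
    """Create the widget if needed and read it; use the default outside Databricks."""
    try:
        dbutils.widgets.text(name, default)  # noqa: F821 - dbutils exists only on Databricks
        return dbutils.widgets.get(name)     # noqa: F821
    except NameError:
        # `dbutils` is undefined outside a Databricks notebook, so fall back to the default.
        return default


params = {name: get_param(name, default) for name, default in DEFAULTS.items()}
print(params)
```

On Databricks the widget's current value wins over the default, so values edited in the widget bar are preserved; verify this behavior on your runtime.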

### Required Libraries

We'll install the latest pre-release version of `databricks-feature-engineering` which includes:
- Declarative feature engineering APIs
- Support for multiple window types
- Point-in-time correctness
- Feature materialization to offline/online stores
- Model logging with feature lineage

In [0]:
%pip install --pre "databricks-feature-engineering>=0.13.1a4"
dbutils.library.restartPython()

In [0]:
from datetime import timedelta

from databricks.feature_engineering import FeatureEngineeringClient
from databricks.feature_engineering.entities import (
    DeltaTableSource,
    ContinuousWindow,
    TumblingWindow,
    SlidingWindow,
    Sum,
    Avg,
    Count,
    Min,
    Max,
    StddevPop,
    ApproxCountDistinct,
)
from pyspark.sql import functions as F
import mlflow
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report, roc_auc_score

# Configuration - Get values from widgets
CATALOG = dbutils.widgets.get("catalog")
SCHEMA = dbutils.widgets.get("schema")
SOURCE_SCHEMA = dbutils.widgets.get("source_schema")
# Initialize Feature Engineering Client
fe = FeatureEngineeringClient()

print(f"Feature Engineering Client initialized for {CATALOG}.{SCHEMA}")

In [0]:
spark.sql(f"CREATE SCHEMA IF NOT EXISTS {CATALOG}.{SCHEMA}")

## 2. Define Data Sources

### Understanding DeltaTableSource

`DeltaTableSource` is a declarative way to define your source data for feature engineering. It specifies:

1. **Table Location**: Catalog, schema, and table name in Unity Catalog
2. **Entity Columns**: The key(s) that identify the entity (e.g., `user_id`, `tenant_id`)
3. **Timeseries Column**: The timestamp column for temporal aggregations

### Why Entity and Timeseries Columns Matter

- **Entity Columns**: Define the granularity of your features (user-level, tenant-level, etc.)
- **Timeseries Column**: Enables point-in-time correctness - ensures features are computed using only data available at prediction time
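To make point-in-time correctness concrete, here is a plain-Python sketch (not the library's implementation) that computes a 7-day event count for each label row using only events at or before that row's timestamp. The sample data and the `(label_ts - 7 days, label_ts]` boundary convention are assumptions for illustration.

```python
from datetime import datetime, timedelta

# Hypothetical click events: (user_id, event_ts)
events = [
    ("u1", datetime(2024, 1, 1)),
    ("u1", datetime(2024, 1, 5)),
    ("u1", datetime(2024, 1, 9)),  # happens after the label timestamp below
    ("u2", datetime(2024, 1, 3)),
]

# Hypothetical label rows: (user_id, label_ts)
labels = [("u1", datetime(2024, 1, 7)), ("u2", datetime(2024, 1, 7))]


def count_before(user_id, as_of, window=timedelta(days=7)):
    """Count a user's events in the window (as_of - window, as_of]."""
    start = as_of - window
    return sum(1 for uid, ts in events if uid == user_id and start < ts <= as_of)


# Each label row only sees events that were available at its own timestamp.
training_rows = [(uid, ts, count_before(uid, ts)) for uid, ts in labels]
print(training_rows)
```

The 2024-01-09 event is excluded for `u1` because it happens after the label timestamp; counting it would leak future information into training.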

### Our Data Sources

We'll define four data sources:

1. **user_click_events**: User engagement data (clicks, sessions, products)
2. **confluence_events**: Collaboration activity (page views, comments, dwell time)
3. **jira_events**: Project management activity (issues, reactions, activities)
4. **tenant_metrics**: Organization-level metrics (different entity: tenant_id)

Each source declares its entity key and timestamp column. The three event sources are keyed by `user_id` and the metrics source by `tenant_id`, which lets us create features at two granularities.

In [0]:
# User Click Events Source - for user engagement features
click_events_source = DeltaTableSource(
    catalog_name=CATALOG,
    schema_name=SOURCE_SCHEMA,
    table_name="user_click_events",
    entity_columns=["user_id"],
    timeseries_column="event_ts"
)

# Confluence Events Source - for collaboration features
confluence_events_source = DeltaTableSource(
    catalog_name=CATALOG,
    schema_name=SOURCE_SCHEMA,
    table_name="confluence_events",
    entity_columns=["user_id"],
    timeseries_column="event_ts"
)

# Jira Events Source - for project activity features
jira_events_source = DeltaTableSource(
    catalog_name=CATALOG,
    schema_name=SOURCE_SCHEMA,
    table_name="jira_events",
    entity_columns=["user_id"],
    timeseries_column="event_ts"
)

# Tenant Metrics Source - for tenant-level features (different entity)
tenant_metrics_source = DeltaTableSource(
    catalog_name=CATALOG,
    schema_name=SOURCE_SCHEMA,
    table_name="tenant_metrics",
    entity_columns=["tenant_id"],
    timeseries_column="metric_ts"
)

print("Data sources defined:")
print("  - click_events_source (entity: user_id)")
print("  - confluence_events_source (entity: user_id)")
print("  - jira_events_source (entity: user_id)")
print("  - tenant_metrics_source (entity: tenant_id)")

## 3. Create Features with Different Window Types

### 3.1 User Engagement Features (SlidingWindow)

#### What is SlidingWindow?

`SlidingWindow` creates **overlapping rolling aggregations** that slide forward in time:

- **window_duration**: How far back to look (e.g., 7 days, 30 days)
- **slide_duration**: How often the window moves forward (e.g., 1 day)

**Example**: A 7-day window with 1-day slide:
- Day 1: Aggregates data from Day -5 to Day 1
- Day 2: Aggregates data from Day -4 to Day 2
- Day 3: Aggregates data from Day -3 to Day 3

This creates smooth, overlapping metrics ideal for trend analysis.
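The sliding behavior can be sketched in plain Python. This illustrates the concept only, assuming daily-bucketed data and windows that end on each slide step; `sliding_sums` is a hypothetical helper, and the library may align window boundaries or handle partially filled windows differently.

```python
from datetime import date, timedelta

# Hypothetical daily click counts for one user: Jan 1 -> 1 click, ..., Jan 10 -> 10 clicks
clicks = {date(2024, 1, d): d for d in range(1, 11)}


def sliding_sums(daily, window_days=7, slide_days=1):
    """Sum daily values over overlapping windows, one window per slide step."""
    days = sorted(daily)
    first, last = days[0], days[-1]
    results = {}
    end = first + timedelta(days=window_days - 1)  # first fully populated window
    while end <= last:
        start = end - timedelta(days=window_days - 1)  # window_days calendar days, inclusive
        results[end] = sum(v for d, v in daily.items() if start <= d <= end)
        end += timedelta(days=slide_days)
    return results


for window_end, total in sliding_sums(clicks).items():
    print(window_end, total)
```

Consecutive windows share six of their seven days, which is why sliding-window values change smoothly from one day to the next.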

#### Features We'll Create

1. **user_clicks_7d**: Total clicks in last 7 days (short-term engagement)
2. **user_clicks_30d**: Total clicks in last 30 days (long-term engagement)
3. **avg_session_duration_7d**: Average session length (engagement quality)
4. **total_session_duration_30d**: Total time spent (engagement depth)
5. **unique_products_visited_30d**: Product diversity (exploration behavior)

#### Why These Features Matter for Churn

- Declining click counts indicate disengagement
- Shorter sessions suggest reduced interest
- Lower product diversity shows narrowing usage patterns

In [0]:
# Feature 1: Total clicks in last 7 days (SlidingWindow + Count)
user_clicks_7d = fe.create_feature(
    catalog_name=CATALOG,
    schema_name=SCHEMA,
    name="user_clicks_7d",
    description="Total user click events in the last 7 days",
    source=click_events_source,
    inputs=["action_type"],
    function=Count(),
    time_window=SlidingWindow(
        window_duration=timedelta(days=7),
        slide_duration=timedelta(days=1)
    )
)

# Feature 2: Total clicks in last 30 days (SlidingWindow + Count)
user_clicks_30d = fe.create_feature(
    catalog_name=CATALOG,
    schema_name=SCHEMA,
    name="user_clicks_30d",
    description="Total user click events in the last 30 days",
    source=click_events_source,
    inputs=["action_type"],
    function=Count(),
    time_window=SlidingWindow(
        window_duration=timedelta(days=30),
        slide_duration=timedelta(days=1)
    )
)

# Feature 3: Average session duration 7 days (SlidingWindow + Avg)
avg_duration_7d = fe.create_feature(
    catalog_name=CATALOG,
    schema_name=SCHEMA,
    name="avg_session_duration_7d",
    description="Average session duration in seconds over last 7 days",
    source=click_events_source,
    inputs=["duration_seconds"],
    function=Avg(),
    time_window=SlidingWindow(
        window_duration=timedelta(days=7),
        slide_duration=timedelta(days=1)
    )
)

# Feature 4: Total session duration 30 days (SlidingWindow + Sum)
total_duration_30d = fe.create_feature(
    catalog_name=CATALOG,
    schema_name=SCHEMA,
    name="total_session_duration_30d",
    description="Total session duration in seconds over last 30 days",
    source=click_events_source,
    inputs=["duration_seconds"],
    function=Sum(),
    time_window=SlidingWindow(
        window_duration=timedelta(days=30),
        slide_duration=timedelta(days=1)
    )
)

# Feature 5: Unique products visited 30 days (SlidingWindow + ApproxCountDistinct)
unique_products_30d = fe.create_feature(
    catalog_name=CATALOG,
    schema_name=SCHEMA,
    name="unique_products_visited_30d",
    description="Number of distinct products visited in last 30 days",
    source=click_events_source,
    inputs=["product"],
    function=ApproxCountDistinct(),
    time_window=SlidingWindow(
        window_duration=timedelta(days=30),
        slide_duration=timedelta(days=1)
    )
)


print("User engagement features created (SlidingWindow):")
print("  - user_clicks_7d")
print("  - user_clicks_30d")
print("  - avg_session_duration_7d")
print("  - total_session_duration_30d")
print("  - unique_products_visited_30d")

### 3.2 Confluence Collaboration Features (SlidingWindow)

#### Collaboration as a Churn Signal

Confluence activity reflects team collaboration and knowledge sharing. Declining collaboration often precedes churn:
- Users who stop contributing to documentation may be disengaging
- Reduced page views suggest decreased team involvement
- Lower comment activity indicates less communication

#### Features We'll Create

1. **confluence_dwell_time_14d**: Total time spent on Confluence pages (14-day window)
2. **confluence_avg_dwell_30d**: Average time per page visit (engagement depth)
3. **confluence_comments_30d**: Total comments made (collaboration intensity)
4. **confluence_unique_pages_30d**: Number of unique pages visited (breadth of engagement)
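5. **confluence_events_7d_lagged**: 7-day event count, intended as a lagged window for period-over-period comparison
6. **total_dwell_with_view_30d_lagged**: 30-day total dwell time on 'view' events only (filtered aggregation)

#### Advanced Features

- **Filter Conditions**: The last feature uses `filter_condition="event_type = 'view'"` to aggregate only specific event types
- **Lagged Windows**: Shifting a window back by an offset lets you compare current vs. previous-period activity. In the cell below the `offset` argument is commented out, so both `_lagged` features currently cover the most recent window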

#### Why SlidingWindow for Collaboration?

Collaboration patterns change gradually, so overlapping windows capture smooth trends better than discrete buckets.
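A lagged window is typically used to compute a period-over-period trend. The sketch below is plain Python under stated assumptions: `count_in_window` is a hypothetical helper, and its `offset_days` parameter counts days back from the reference date, which is not necessarily how the library's `offset` argument (commented out in the next cell) is defined.

```python
from datetime import date, timedelta

# Hypothetical Confluence event dates for one user
event_days = [date(2024, 3, d) for d in (1, 2, 2, 3, 5, 9, 12)]


def count_in_window(days, end, window_days=7, offset_days=0):
    """Count events in a window_days-long window ending offset_days before `end`."""
    window_end = end - timedelta(days=offset_days)
    window_start = window_end - timedelta(days=window_days)
    return sum(1 for d in days if window_start < d <= window_end)


as_of = date(2024, 3, 14)
current = count_in_window(event_days, as_of)                  # (Mar 7, Mar 14]
previous = count_in_window(event_days, as_of, offset_days=7)  # (Feb 29, Mar 7]
# Relative change vs. the previous period; undefined when the previous period is empty.
trend = (current - previous) / previous if previous else None
print(current, previous, trend)
```

A negative trend like this one (activity dropping from 5 to 2 events) is the kind of signal lagged features are meant to expose.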

In [0]:
# Feature 6: Total dwell time 14 days (SlidingWindow + Sum)
conf_dwell_time_14d = fe.create_feature(
    catalog_name=CATALOG,
    schema_name=SCHEMA,
    name="confluence_dwell_time_14d",
    description="Total Confluence dwell time in seconds over last 14 days",
    source=confluence_events_source,
    inputs=["dwell_time_seconds"],
    function=Sum(),
    time_window=SlidingWindow(
        window_duration=timedelta(days=14),
        slide_duration=timedelta(days=1)
    )
)

# Feature 7: Average dwell time 30 days (SlidingWindow + Avg)
conf_avg_dwell_30d = fe.create_feature(
    catalog_name=CATALOG,
    schema_name=SCHEMA,
    name="confluence_avg_dwell_30d",
    description="Average Confluence dwell time per page over last 30 days",
    source=confluence_events_source,
    inputs=["dwell_time_seconds"],
    function=Avg(),
    time_window=SlidingWindow(
        window_duration=timedelta(days=30),
        slide_duration=timedelta(days=1)
    )
)

# Feature 8: Total comments 30 days (SlidingWindow + Sum)
conf_comments_30d = fe.create_feature(
    catalog_name=CATALOG,
    schema_name=SCHEMA,
    name="confluence_comments_30d",
    description="Total Confluence comments in last 30 days",
    source=confluence_events_source,
    inputs=["comment_count"],
    function=Sum(),
    time_window=SlidingWindow(
        window_duration=timedelta(days=30),
        slide_duration=timedelta(days=1)
    )
)

# Feature 9: Unique pages visited 30 days (SlidingWindow + ApproxCountDistinct)
conf_unique_pages_30d = fe.create_feature(
    catalog_name=CATALOG,
    schema_name=SCHEMA,
    name="confluence_unique_pages_30d",
    description="Number of unique Confluence pages visited in last 30 days",
    source=confluence_events_source,
    inputs=["page_id"],
    function=ApproxCountDistinct(),
    time_window=SlidingWindow(
        window_duration=timedelta(days=30),
        slide_duration=timedelta(days=1)
    )
)

# Feature 10: Confluence event count, intended as a lagged window for trend comparison
# NOTE: the offset argument is commented out, so this currently covers the most recent 7 days
conf_events_7d_offset = fe.create_feature(
    catalog_name=CATALOG,
    schema_name=SCHEMA,
    name="confluence_events_7d_lagged",
    description="Confluence event count over 7 days (intended 1-day lag for trend comparison)",
    source=confluence_events_source,
    inputs=["event_type"],
    function=Count(),
    time_window=SlidingWindow(
        window_duration=timedelta(days=7),
        # offset=timedelta(days=-1),  # intended 1-day lag; currently disabled
        slide_duration=timedelta(days=1)  # window advances one day at a time
    )
)

# Feature 11: Dwell time on 'view' events only, 30 days (SlidingWindow + Sum + filter_condition)
conf_events_view_30d_offset = fe.create_feature(
    catalog_name=CATALOG,
    schema_name=SCHEMA,
    name="total_dwell_with_view_30d_lagged",
    description="Total Confluence dwell time in seconds on 'view' events over 30 days",
    source=confluence_events_source,
    inputs=["dwell_time_seconds"],
    function=Sum(),
    time_window=SlidingWindow(
        window_duration=timedelta(days=30),
        # offset=timedelta(days=-1),  # intended 1-day lag; currently disabled
        slide_duration=timedelta(days=1)  # window advances one day at a time
    ),
    filter_condition="event_type = 'view'"  # only aggregate 'view' events
)


print("Confluence collaboration features created (SlidingWindow):")
print("  - confluence_dwell_time_14d")
print("  - confluence_avg_dwell_30d")
print("  - confluence_comments_30d")
print("  - confluence_unique_pages_30d")
print("  - confluence_events_7d_lagged (offset currently disabled)")
print("  - total_dwell_with_view_30d_lagged (filter: event_type = 'view')")

### 3.3 Jira Activity Features (TumblingWindow)

#### What is TumblingWindow?

`TumblingWindow` creates **non-overlapping, fixed-size windows**:

- Each time period is independent (no overlap)
- Ideal for periodic reporting and discrete time buckets
- **window_duration**: Size of each window (e.g., 7 days for weekly, 30 days for monthly)

**Example**: Weekly tumbling windows:
- Week 1: Days 1-7
- Week 2: Days 8-14
- Week 3: Days 15-21

Each week is completely separate with no overlap.
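Tumbling-window assignment reduces to integer division of the elapsed time by the window size. A minimal sketch, assuming windows aligned to an arbitrary origin date; `tumbling_bucket` and the origin are hypothetical, and the library's alignment may differ.

```python
from datetime import date, timedelta


def tumbling_bucket(day, origin, window_days=7):
    """Return the start date of the non-overlapping window that contains `day`."""
    index = (day - origin).days // window_days
    return origin + timedelta(days=index * window_days)


origin = date(2024, 1, 1)  # hypothetical alignment point for the buckets
activity_days = [date(2024, 1, d) for d in (1, 3, 7, 8, 14, 15, 21)]

# Count Jira activities per weekly bucket
counts = {}
for d in activity_days:
    bucket = tumbling_bucket(d, origin)
    counts[bucket] = counts.get(bucket, 0) + 1
print(counts)
```

Each event lands in exactly one bucket, so the bucket counts always sum to the number of events; with a sliding window the same event would be counted in several windows.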

#### When to Use TumblingWindow vs SlidingWindow?

| Use TumblingWindow | Use SlidingWindow |
|-------------------|-------------------|
| Periodic reports (weekly/monthly) | Trend analysis |
| Discrete time buckets | Smooth rolling metrics |
| Non-overlapping aggregations | Recent activity emphasis |
| Comparing distinct periods | Continuous monitoring |

#### Features We'll Create

1. **jira_activities_weekly**: Count of Jira activities per week
2. **jira_max_reactions_weekly**: Peak engagement in each week
3. **jira_total_reactions_monthly**: Monthly reaction totals
4. **jira_unique_issues_monthly**: Issue diversity per month
5. **jira_min_reactions_weekly**: Minimum engagement baseline

#### Why These Features Matter

- Weekly/monthly patterns reveal work rhythms
- Max/min reactions show engagement range
- Unique issue counts indicate project involvement breadth

In [0]:
# Feature 11: Jira activities weekly (TumblingWindow + Count)
jira_activities_weekly = fe.create_feature(
    catalog_name=CATALOG,
    schema_name=SCHEMA,
    name="jira_activities_weekly",
    description="Count of Jira activities in weekly tumbling window",
    source=jira_events_source,
    inputs=["activity_type"],
    function=Count(),
    time_window=TumblingWindow(
        window_duration=timedelta(days=7)
    )
)

# Feature 12: Max reactions weekly (TumblingWindow + Max)
jira_max_reactions_weekly = fe.create_feature(
    catalog_name=CATALOG,
    schema_name=SCHEMA,
    name="jira_max_reactions_weekly",
    description="Maximum reaction count on any Jira activity in weekly window",
    source=jira_events_source,
    inputs=["reaction_count"],
    function=Max(),
    time_window=TumblingWindow(
        window_duration=timedelta(days=7)
    )
)

# Feature 13: Total reactions monthly (TumblingWindow + Sum)
jira_total_reactions_monthly = fe.create_feature(
    catalog_name=CATALOG,
    schema_name=SCHEMA,
    name="jira_total_reactions_monthly",
    description="Total reactions on Jira activities in monthly tumbling window",
    source=jira_events_source,
    inputs=["reaction_count"],
    function=Sum(),
    time_window=TumblingWindow(
        window_duration=timedelta(days=30)
    )
)

# Feature 14: Unique issues worked on monthly (TumblingWindow + ApproxCountDistinct)
jira_unique_issues_monthly = fe.create_feature(
    catalog_name=CATALOG,
    schema_name=SCHEMA,
    name="jira_unique_issues_monthly",
    description="Number of unique Jira issues worked on in monthly window",
    source=jira_events_source,
    inputs=["issue_id"],
    function=ApproxCountDistinct(),
    time_window=TumblingWindow(
        window_duration=timedelta(days=30)
    )
)

# Feature 15: Min reactions weekly (TumblingWindow + Min)
jira_min_reactions_weekly = fe.create_feature(
    catalog_name=CATALOG,
    schema_name=SCHEMA,
    name="jira_min_reactions_weekly",
    description="Minimum reaction count on Jira activities in weekly window",
    source=jira_events_source,
    inputs=["reaction_count"],
    function=Min(),
    time_window=TumblingWindow(
        window_duration=timedelta(days=7)
    )
)

print("Jira activity features created (TumblingWindow):")
print("  - jira_activities_weekly")
print("  - jira_max_reactions_weekly")
print("  - jira_total_reactions_monthly")
print("  - jira_unique_issues_monthly")
print("  - jira_min_reactions_weekly")

### 3.4 Jira Features with Filter Conditions

#### Advanced Feature Engineering: Filter Conditions

The `filter_condition` parameter allows you to create **filtered aggregations** without pre-filtering your source data:

**Benefits**:
1. **Reusability**: Same source table, multiple filtered features
2. **Performance**: The filter is applied as part of the aggregation rather than in a separate pre-filtering step
3. **Maintainability**: Filter logic is part of feature definition
4. **Clarity**: Feature name and filter are co-located

#### Example Use Cases

```python
# Only count specific product activities
filter_condition="jira_product = 'jsw'"

# Only aggregate high-value transactions
filter_condition="amount > 100"

# Only include specific event types
filter_condition="event_type = 'view'"
```
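Conceptually, a `filter_condition` acts as a row predicate applied before the aggregation, so one source can feed several differently filtered features. The plain-Python sketch below mimics the `jira_product = 'jsw'` and `issue_type = 'bug'` filters with lambdas; the sample rows and the `count_where` helper are hypothetical.

```python
# Hypothetical Jira events: (user_id, jira_product, issue_type)
jira_events = [
    ("u1", "jsw", "bug"),
    ("u1", "jsm", "task"),
    ("u1", "jsw", "story"),
    ("u2", "jsw", "bug"),
]


def count_where(rows, predicate):
    """Count rows per user, keeping only rows for which predicate(row) is True."""
    out = {}
    for row in rows:
        if predicate(row):
            out[row[0]] = out.get(row[0], 0) + 1
    return out


# Two features from the same source, each with its own filter
jsw_counts = count_where(jira_events, lambda r: r[1] == "jsw")  # like "jira_product = 'jsw'"
bug_counts = count_where(jira_events, lambda r: r[2] == "bug")  # like "issue_type = 'bug'"
print(jsw_counts, bug_counts)
```

Users with no matching rows are simply absent from the result here; whether the library reports them as null or zero is not covered by this sketch.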

#### Features We'll Create

1. **jsw_activities_30d**: Activities specific to Jira Software (JSW)
2. **bug_activities_30d**: Bug-related activities only
3. **issues_resolved_30d**: Distinct issues with a 'resolve' activity

These filtered features help identify product-specific or issue-type-specific engagement patterns.

In [0]:
# Feature 16: JSW-specific activities (with filter_condition)
jsw_activities_30d = fe.create_feature(
    catalog_name=CATALOG,
    schema_name=SCHEMA,
    name="jsw_activities_30d",
    description="Count of Jira Software activities in last 30 days",
    source=jira_events_source,
    inputs=["activity_type"],
    function=Count(),
    filter_condition="jira_product = 'jsw'",
    time_window=SlidingWindow(
        window_duration=timedelta(days=30),
        slide_duration=timedelta(days=1)
    )
)

# Feature 17: Bug-related activities (with filter_condition)
bug_activities_30d = fe.create_feature(
    catalog_name=CATALOG,
    schema_name=SCHEMA,
    name="bug_activities_30d",
    description="Count of bug-related Jira activities in last 30 days",
    source=jira_events_source,
    inputs=["activity_type"],
    function=Count(),
    filter_condition="issue_type = 'bug'",
    time_window=SlidingWindow(
        window_duration=timedelta(days=30),
        slide_duration=timedelta(days=1)
    )
)

# Feature 18: Distinct issues resolved (with filter_condition)
issues_resolved_30d = fe.create_feature(
    catalog_name=CATALOG,
    schema_name=SCHEMA,
    name="issues_resolved_30d",
    description="Number of distinct Jira issues resolved in last 30 days",
    source=jira_events_source,
    inputs=["issue_id"],
    function=ApproxCountDistinct(),
    filter_condition="activity_type = 'resolve'",
    time_window=SlidingWindow(
        window_duration=timedelta(days=30),
        slide_duration=timedelta(days=1)
    )
)


print("Filtered Jira features created:")
print("  - jsw_activities_30d (filter: jira_product = 'jsw')")
print("  - bug_activities_30d (filter: issue_type = 'bug')")
print("  - issues_resolved_30d (filter: activity_type = 'resolve')")

### 3.5 Statistical Features (StddevPop)

#### Why Standard Deviation Matters for Churn Prediction

`StddevPop()` (Population Standard Deviation) measures **consistency and variability** in user behavior:

**High Standard Deviation** = Inconsistent behavior
- Sporadic engagement
- Unpredictable usage patterns
- May indicate casual or declining users

**Low Standard Deviation** = Consistent behavior
- Regular engagement
- Predictable usage patterns
- Indicates habitual, committed users
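The difference between consistent and erratic users can be seen with Python's `statistics` module, where `pstdev` is the population standard deviation (the same definition `StddevPop` refers to) and `stdev` is the sample version. The durations are made-up values.

```python
import statistics

# Hypothetical session durations (seconds) for two users over the same period
steady_user = [600, 620, 580, 610, 590]
erratic_user = [60, 1500, 0, 900, 40]

for name, durations in [("steady", steady_user), ("erratic", erratic_user)]:
    pop = statistics.pstdev(durations)  # population std dev: divides by N
    samp = statistics.stdev(durations)  # sample std dev: divides by N - 1
    print(f"{name}: mean={statistics.mean(durations):.0f}s pstdev={pop:.1f} stdev={samp:.1f}")
```

With the population formula a window containing a single event yields 0, whereas the sample formula is undefined for one value.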

#### Features We'll Create

1. **stddev_session_duration_30d**: Consistency of session lengths
   - High variance: Erratic usage
   - Low variance: Stable engagement

2. **stddev_confluence_dwell_30d**: Consistency of Confluence engagement
   - Measures collaboration pattern stability

3. **stddev_jira_reactions_30d**: Consistency of Jira engagement
   - Indicates project involvement stability

#### Machine Learning Insight

Standard deviation features can add predictive signal because:
- They capture behavior patterns, not just volumes
- Disengaging users may show increasing variance (erratic behavior)
- Engaged users tend to maintain consistent patterns

In [0]:
# Feature 20: Standard deviation of session duration (engagement consistency)
stddev_duration_30d = fe.create_feature(
    catalog_name=CATALOG,
    schema_name=SCHEMA,
    name="stddev_session_duration_30d",
    description="Standard deviation of session duration (engagement consistency)",
    source=click_events_source,
    inputs=["duration_seconds"],
    function=StddevPop(),
    time_window=SlidingWindow(
        window_duration=timedelta(days=30),
        slide_duration=timedelta(days=1)
    )
)

# Feature 21: Standard deviation of Confluence dwell time
stddev_dwell_30d = fe.create_feature(
    catalog_name=CATALOG,
    schema_name=SCHEMA,
    name="stddev_confluence_dwell_30d",
    description="Standard deviation of Confluence dwell time (reading consistency)",
    source=confluence_events_source,
    inputs=["dwell_time_seconds"],
    function=StddevPop(),
    time_window=SlidingWindow(
        window_duration=timedelta(days=30),
        slide_duration=timedelta(days=1)
    )
)

# Feature 22: Standard deviation of reactions (engagement variability)
stddev_reactions_30d = fe.create_feature(
    catalog_name=CATALOG,
    schema_name=SCHEMA,
    name="stddev_jira_reactions_30d",
    description="Standard deviation of Jira reaction counts (engagement variability)",
    source=jira_events_source,
    inputs=["reaction_count"],
    function=StddevPop(),
    time_window=SlidingWindow(
        window_duration=timedelta(days=30),
        slide_duration=timedelta(days=1)
    )
)

print("Statistical features created (StddevPop):")
print("  - stddev_session_duration_30d")
print("  - stddev_confluence_dwell_30d")
print("  - stddev_jira_reactions_30d")

### 3.6 Tenant-Level Features

#### Multi-Entity Feature Engineering

So far, all features have been at the **user level** (entity: `user_id`). Now we'll create features at the **tenant level** (entity: `tenant_id`).

**Why Tenant-Level Features?**

User churn is influenced by organizational health:
- Users in struggling organizations are more likely to churn
- Tenant-level metrics provide context for individual behavior
- Combining user and tenant features can improve prediction accuracy

#### Features We'll Create

1. **tenant_avg_key_events_30d**: Average daily key events for the organization
   - Low values indicate declining organizational activity

2. **tenant_max_active_users_30d**: Peak active user count
   - Shows organizational engagement capacity

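3. **tenant_total_linked_aris_30d**: Total linked artifacts (integration usage)
   - Indicates platform adoption depth

4. **tenant_avg_reporting_lines_30d**: Average reporting lines count
   - Reflects organizational structure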

#### Joining User and Tenant Features

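During training set creation, the Feature Engineering Client will:
1. Compute user-level features for each user
2. Compute tenant-level features for each tenant
3. Join them based on the user-tenant relationship
4. Ensure point-in-time correctness for both levels

Because both entity keys are used for lookups, the labeled DataFrame needs both `user_id` and `tenant_id` columns; per the ERD above, `churn_labels` already carries both.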

This multi-entity approach captures both individual and organizational signals.
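The join described above can be sketched with plain dictionaries: each user row picks up its own features by `user_id` and its tenant's features by `tenant_id` through the mapping. All names and values here are hypothetical, and a real training set would also apply the point-in-time logic per label timestamp, which this sketch omits.

```python
# Hypothetical mapping and feature values as of a single label timestamp
user_tenant = {"u1": "t1", "u2": "t1", "u3": "t2"}
user_features = {
    "u1": {"user_clicks_7d": 12},
    "u2": {"user_clicks_7d": 0},
    "u3": {"user_clicks_7d": 5},
}
tenant_features = {
    "t1": {"tenant_max_active_users_30d": 40},
    "t2": {"tenant_max_active_users_30d": 3},
}

joined_rows = []
for user_id, tenant_id in user_tenant.items():
    row = {"user_id": user_id, "tenant_id": tenant_id}
    row.update(user_features.get(user_id, {}))      # user-level lookup by user_id
    row.update(tenant_features.get(tenant_id, {}))  # tenant-level lookup by tenant_id
    joined_rows.append(row)

for row in joined_rows:
    print(row)
```

Users `u1` and `u2` share tenant `t1`, so they receive the same tenant-level values while keeping their own user-level values.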

In [0]:
# Feature 23: Average tenant key events (organizational activity)
tenant_avg_key_events_30d = fe.create_feature(
    catalog_name=CATALOG,
    schema_name=SCHEMA,
    name="tenant_avg_key_events_30d",
    description="Average daily key events count for tenant over 30 days",
    source=tenant_metrics_source,
    inputs=["key_events_count"],
    function=Avg(),
    time_window=TumblingWindow(
        window_duration=timedelta(days=30)
    )
)

# Feature 24: Max active users (tenant health)
tenant_max_active_users_30d = fe.create_feature(
    catalog_name=CATALOG,
    schema_name=SCHEMA,
    name="tenant_max_active_users_30d",
    description="Maximum daily active users for tenant over 30 days",
    source=tenant_metrics_source,
    inputs=["active_users_count"],
    function=Max(),
    time_window=TumblingWindow(
        window_duration=timedelta(days=30)
    )
)

# Feature 25: Total linked ARIs (integration depth)
tenant_total_aris_30d = fe.create_feature(
    catalog_name=CATALOG,
    schema_name=SCHEMA,
    name="tenant_total_linked_aris_30d",
    description="Total linked ARIs for tenant over 30 days",
    source=tenant_metrics_source,
    inputs=["linked_aris_count"],
    function=Sum(),
    time_window=TumblingWindow(
        window_duration=timedelta(days=30)
    )
)

# Feature 26: Average reporting lines (organization structure)
tenant_avg_reporting_lines_30d = fe.create_feature(
    catalog_name=CATALOG,
    schema_name=SCHEMA,
    name="tenant_avg_reporting_lines_30d",
    description="Average reporting lines count for tenant over 30 days",
    source=tenant_metrics_source,
    inputs=["reporting_lines_count"],
    function=Avg(),
    time_window=TumblingWindow(
        window_duration=timedelta(days=30)
    )
)

print("Tenant-level features created:")
print("  - tenant_avg_key_events_30d")
print("  - tenant_max_active_users_30d")
print("  - tenant_total_linked_aris_30d")
print("  - tenant_avg_reporting_lines_30d")

## 4. Collect All Features

### Feature Organization

Before materializing or using features, we organize them into logical groups:

1. **User Engagement Features**: Click and session behavior
2. **Confluence Collaboration Features**: Documentation and knowledge sharing
3. **Jira Activity Features**: Project management and issue tracking
4. **Statistical Features**: Variance and consistency metrics
5. **Tenant-Level Features**: Organizational health indicators

### Why Organize Features?

- **Maintainability**: Easy to update or remove feature groups
- **Materialization**: Can materialize different groups to different tables
- **Experimentation**: Easy to test models with different feature combinations
- **Documentation**: Clear feature lineage and purpose

### Feature Naming Convention

Our features follow a consistent naming pattern:
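- `{source}_{metric}_{window}` (e.g., `user_clicks_7d`)
- `{source}_{aggregation}_{metric}_{window}` (e.g., `tenant_avg_key_events_30d`)
- Window suffixes are either a duration (`_7d`, `_14d`, `_30d`) or a tumbling period (`_weekly`, `_monthly`)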

This makes features self-documenting and easy to understand.
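Because the window is encoded in the name, it can be recovered from a feature name, for example to sanity-check that a name matches its definition. A hypothetical helper, assuming suffixes of the form `_<N>d` (optionally followed by `_lagged`) or `_weekly`/`_monthly`, and treating the latter as 7 and 30 days to mirror the `TumblingWindow` durations used in this notebook:

```python
import re
from datetime import timedelta

# Tumbling-period suffixes mapped to the durations used in this notebook (assumption)
_PERIODS = {"weekly": timedelta(days=7), "monthly": timedelta(days=30)}


def window_from_name(feature_name):
    """Infer the window length from a feature name's suffix, or return None."""
    match = re.search(r"_(\d+)d(?:_lagged)?$", feature_name)
    if match:
        return timedelta(days=int(match.group(1)))
    suffix = feature_name.rsplit("_", 1)[-1]
    return _PERIODS.get(suffix)


for name in ["user_clicks_7d", "confluence_events_7d_lagged",
             "jira_activities_weekly", "tenant_avg_key_events_30d"]:
    print(name, window_from_name(name))
```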

In [0]:
# List features in the configured catalog and schema (instead of a hard-coded personal path)
fe.list_features(full_name=f"{CATALOG}.{SCHEMA}.*")

In [0]:
# Organize features into separate lists by type

# User Engagement Features (SlidingWindow)
user_engagement_features = [
    f"{CATALOG}.{SCHEMA}.user_clicks_7d",
    f"{CATALOG}.{SCHEMA}.user_clicks_30d",
    f"{CATALOG}.{SCHEMA}.avg_session_duration_7d",
    f"{CATALOG}.{SCHEMA}.total_session_duration_30d",
    f"{CATALOG}.{SCHEMA}.unique_products_visited_30d"
]

# Confluence Collaboration Features (SlidingWindow)
confluence_features = [
    f"{CATALOG}.{SCHEMA}.confluence_dwell_time_14d",
    f"{CATALOG}.{SCHEMA}.confluence_avg_dwell_30d",
    f"{CATALOG}.{SCHEMA}.confluence_comments_30d",
    f"{CATALOG}.{SCHEMA}.confluence_unique_pages_30d",
    f"{CATALOG}.{SCHEMA}.confluence_events_7d_lagged",
    f"{CATALOG}.{SCHEMA}.total_dwell_with_view_30d_lagged"
]

# Jira Activity Features (TumblingWindow)
jira_features = [
    f"{CATALOG}.{SCHEMA}.jira_activities_weekly",
    f"{CATALOG}.{SCHEMA}.jira_max_reactions_weekly",
    f"{CATALOG}.{SCHEMA}.jira_total_reactions_monthly",
    f"{CATALOG}.{SCHEMA}.jira_unique_issues_monthly",
    f"{CATALOG}.{SCHEMA}.jira_min_reactions_weekly",
    f"{CATALOG}.{SCHEMA}.jsw_activities_30d",
    f"{CATALOG}.{SCHEMA}.bug_activities_30d",
    f"{CATALOG}.{SCHEMA}.issues_resolved_30d"
]

# Statistical Features (StddevPop)
statistical_features = [
    f"{CATALOG}.{SCHEMA}.stddev_session_duration_30d",
    f"{CATALOG}.{SCHEMA}.stddev_confluence_dwell_30d",
    f"{CATALOG}.{SCHEMA}.stddev_jira_reactions_30d"
]

# Tenant-Level Features
tenant_features = [
    f"{CATALOG}.{SCHEMA}.tenant_avg_key_events_30d",
    f"{CATALOG}.{SCHEMA}.tenant_max_active_users_30d",
    f"{CATALOG}.{SCHEMA}.tenant_total_linked_aris_30d",
    f"{CATALOG}.{SCHEMA}.tenant_avg_reporting_lines_30d"
]

# Print summary
print("Feature Lists Created:")
print(f"\n1. User Engagement Features (SlidingWindow): {len(user_engagement_features)} features")
for f in user_engagement_features:
    print(f"   - {f.split('.')[-1]}")

print(f"\n2. Confluence Collaboration Features (SlidingWindow): {len(confluence_features)} features")
for f in confluence_features:
    print(f"   - {f.split('.')[-1]}")

print(f"\n3. Jira Activity Features: {len(jira_features)} features")
for f in jira_features:
    print(f"   - {f.split('.')[-1]}")

print(f"\n4. Statistical Features (StddevPop): {len(statistical_features)} features")
for f in statistical_features:
    print(f"   - {f.split('.')[-1]}")

print(f"\n5. Tenant-Level Features: {len(tenant_features)} features")
for f in tenant_features:
    print(f"   - {f.split('.')[-1]}")

total_features = sum(len(group) for group in [
    user_engagement_features, confluence_features, jira_features,
    statistical_features, tenant_features,
])
print(f"\nTotal: {total_features} features organized")

## 5. Materialize Features

### What is Feature Materialization?

**Feature Materialization** is the process of pre-computing and storing features for reuse:

#### Benefits

1. **Performance**: Compute once, use many times
2. **Consistency**: Same feature values across training and inference
3. **Cost Efficiency**: Avoid recomputing expensive aggregations
4. **Freshness**: Scheduled updates keep features current
5. **Serving**: Enable low-latency online serving

### Offline vs Online Stores

| Offline Store | Online Store |
|--------------|-------------|
| Batch training and inference | Real-time serving |
| Delta tables in Unity Catalog | Low-latency key-value store |
| High throughput | Low latency (milliseconds) |
| Historical data | Current data |
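To make the "Historical data" vs "Current data" row concrete, here is a toy in-memory model. It is not how either store is implemented; it only illustrates that the offline side keeps every timestamped value, so it can answer "what was the value as of time t?", while the online side keeps just the latest value per key.

```python
from bisect import bisect_right
from datetime import datetime

# Toy offline store: full history of (timestamp, value) per entity key, sorted by time.
offline_history = {
    "user_1": [
        (datetime(2024, 1, 1), 3),
        (datetime(2024, 1, 8), 5),
        (datetime(2024, 1, 15), 9),
    ],
}

# Toy online store: only the most recent value per entity key.
online_latest = {key: rows[-1][1] for key, rows in offline_history.items()}


def offline_as_of(key: str, as_of: datetime):
    """Return the latest value recorded at or before `as_of`, or None."""
    rows = offline_history.get(key, [])
    timestamps = [ts for ts, _ in rows]
    idx = bisect_right(timestamps, as_of)
    return rows[idx - 1][1] if idx > 0 else None


print(offline_as_of("user_1", datetime(2024, 1, 10)))  # historical lookup
print(online_latest["user_1"])                          # current value only
```

The as-of lookup is what training and backtesting need; the latest-value lookup is what low-latency serving needs.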

### Materialization Configuration

```python
from databricks.feature_engineering.entities import OfflineStoreConfig

OfflineStoreConfig(
    catalog_name=CATALOG,
    schema_name=SCHEMA,
    table_name_prefix="feature_group_"  # Creates tables with this prefix
)
```

### Scheduling

We'll use cron scheduling (Quartz syntax: seconds, minutes, hours, day-of-month, month, day-of-week) to keep features fresh:
- `"0 0 0 * * ?"` = Daily at midnight UTC
- `pipeline_state="ACTIVE"` = Enable automatic updates
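In Quartz cron syntax the seconds field comes first, so it is easy to put the hour in the wrong position: `"0 0 * * * ?"` fires at the top of every hour, whereas `"0 0 0 * * ?"` fires once a day at 00:00:00. The minimal matcher below is a sketch for sanity-checking a schedule before activating it; it supports only `*`, `?`, and plain integers (no ranges, lists, or steps) and does not evaluate the day-of-week field.

```python
from datetime import date, datetime


def parse_quartz(expression: str) -> list:
    """Parse a 6-field Quartz expression supporting only '*', '?' and plain integers."""
    fields = expression.split()
    if len(fields) != 6:
        raise ValueError("expected: second minute hour day-of-month month day-of-week")
    if fields[5] not in ("*", "?"):
        raise ValueError("this sketch does not evaluate the day-of-week field")
    # None means "any value"; otherwise the field must equal that integer.
    return [None if field in ("*", "?") else int(field) for field in fields[:5]]


def fire_times(expression: str, day: date) -> list:
    """List every moment on `day` at which the expression would fire."""
    second, minute, hour, dom, month = parse_quartz(expression)
    if (dom is not None and dom != day.day) or (month is not None and month != day.month):
        return []
    return [
        datetime(day.year, day.month, day.day, h, m, s)
        for h in range(24) if hour is None or hour == h
        for m in range(60) if minute is None or minute == m
        for s in range(60) if second is None or second == s
    ]


day = date(2024, 1, 15)
print(len(fire_times("0 0 * * * ?", day)))  # 24: second 0 of minute 0, every hour
print(fire_times("0 0 0 * * ?", day))       # a single run at 00:00:00
```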

### What We'll Materialize

We'll create separate materialized tables for each feature group:
1. User engagement features → `user_engagement_*` tables
2. Confluence features → `confluence_collaboration_*` tables
3. Jira features → `jira_activity_*` tables
4. Statistical features → `statistical_metrics_*` tables
5. Tenant features → `tenant_level_*` tables
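Each of the five cells below repeats the same "look up every feature, print failures" loop, and a failed lookup is silently skipped before materialization. One way to factor that out is the hypothetical helper `resolve_features` sketched here: it takes the lookup function as a parameter (in the notebook that could be `lambda n: fe.get_feature(full_name=n)`) and returns both the objects it found and the names that failed, so a cell can stop instead of materializing an incomplete group. The catalog, schema, and fake lookup in the example are placeholders standing in for the real client.

```python
from typing import Any, Callable, Dict, List, Tuple


def resolve_features(
    catalog: str,
    schema: str,
    names: List[str],
    lookup: Callable[[str], Any],
) -> Tuple[List[Any], Dict[str, str]]:
    """Look up `catalog.schema.name` for each name; collect objects and failures."""
    found: List[Any] = []
    failed: Dict[str, str] = {}
    for name in names:
        full_name = f"{catalog}.{schema}.{name}"
        try:
            found.append(lookup(full_name))
        except Exception as exc:  # record the reason instead of silently skipping
            failed[name] = str(exc)
    return found, failed


# Stand-in for the real client lookup: only two features "exist".
registry = {"main.fs.user_clicks_7d": "F1", "main.fs.user_clicks_30d": "F2"}


def fake_lookup(full_name: str) -> str:
    if full_name not in registry:
        raise KeyError(full_name)
    return registry[full_name]


objs, missing = resolve_features(
    "main", "fs", ["user_clicks_7d", "user_clicks_30d", "avg_session_duration_7d"], fake_lookup
)
if missing:
    print(f"Refusing to materialize; missing features: {sorted(missing)}")
```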

In [0]:
from databricks.feature_engineering.entities import OfflineStoreConfig

# Get feature objects for user engagement features
user_engagement_feature_objects = []
for feature_name in ["user_clicks_7d", "user_clicks_30d", "avg_session_duration_7d", 
                     "total_session_duration_30d", "unique_products_visited_30d"]:
    full_name = f"{CATALOG}.{SCHEMA}.{feature_name}"
    try:
        feature = fe.get_feature(full_name=full_name)
        user_engagement_feature_objects.append(feature)
    except Exception as e:
        print(f"Failed to get {feature_name}: {e}")

# Configure offline store for user engagement features
user_engagement_offline_store = OfflineStoreConfig(
    catalog_name=CATALOG,
    schema_name=SCHEMA,
    table_name_prefix="user_engagement"
)

# Materialize user engagement features
fe.materialize_features(
    features=user_engagement_feature_objects,
    offline_config=user_engagement_offline_store,
    pipeline_state="ACTIVE",
    cron_schedule="0 0 0 * * ?"  # Daily at midnight (Quartz: sec min hour dom month dow)
)

print(f"✓ Materialized {len(user_engagement_feature_objects)} user engagement features with prefix 'user_engagement'")

In [0]:
# Get feature objects for confluence features
confluence_feature_objects = []
for feature_name in ["confluence_dwell_time_14d", "confluence_avg_dwell_30d", "confluence_comments_30d",
                     "confluence_unique_pages_30d", "confluence_events_7d_lagged", "total_dwell_with_view_30d_lagged"]:
    full_name = f"{CATALOG}.{SCHEMA}.{feature_name}"
    try:
        feature = fe.get_feature(full_name=full_name)
        confluence_feature_objects.append(feature)
    except Exception as e:
        print(f"Failed to get {feature_name}: {e}")

# Configure offline store for confluence features
confluence_offline_store = OfflineStoreConfig(
    catalog_name=CATALOG,
    schema_name=SCHEMA,
    table_name_prefix="confluence_collaboration"
)

# Materialize confluence features
fe.materialize_features(
    features=confluence_feature_objects,
    offline_config=confluence_offline_store,
    pipeline_state="ACTIVE",
    cron_schedule="0 0 0 * * ?"  # Daily at midnight (Quartz: sec min hour dom month dow)
)

print(f"✓ Materialized {len(confluence_feature_objects)} confluence features with prefix 'confluence_collaboration'")

In [0]:
# Get feature objects for jira features
jira_feature_objects = []
for feature_name in ["jira_activities_weekly", "jira_max_reactions_weekly", "jira_total_reactions_monthly",
                     "jira_unique_issues_monthly", "jira_min_reactions_weekly", "jsw_activities_30d",
                     "bug_activities_30d", "issues_resolved_30d"]:
    full_name = f"{CATALOG}.{SCHEMA}.{feature_name}"
    try:
        feature = fe.get_feature(full_name=full_name)
        jira_feature_objects.append(feature)
    except Exception as e:
        print(f"Failed to get {feature_name}: {e}")

# Configure offline store for jira features
jira_offline_store = OfflineStoreConfig(
    catalog_name=CATALOG,
    schema_name=SCHEMA,
    table_name_prefix="jira_activity"
)

# Materialize jira features
fe.materialize_features(
    features=jira_feature_objects,
    offline_config=jira_offline_store,
    pipeline_state="ACTIVE",
    cron_schedule="0 0 0 * * ?"  # Daily at midnight (Quartz: sec min hour dom month dow)
)

print(f"✓ Materialized {len(jira_feature_objects)} jira features with prefix 'jira_activity'")

In [0]:
# Get feature objects for statistical features
statistical_feature_objects = []
for feature_name in ["stddev_session_duration_30d", "stddev_confluence_dwell_30d", "stddev_jira_reactions_30d"]:
    full_name = f"{CATALOG}.{SCHEMA}.{feature_name}"
    try:
        feature = fe.get_feature(full_name=full_name)
        statistical_feature_objects.append(feature)
    except Exception as e:
        print(f"Failed to get {feature_name}: {e}")

# Configure offline store for statistical features
statistical_offline_store = OfflineStoreConfig(
    catalog_name=CATALOG,
    schema_name=SCHEMA,
    table_name_prefix="statistical_metrics"
)

# Materialize statistical features
fe.materialize_features(
    features=statistical_feature_objects,
    offline_config=statistical_offline_store,
    pipeline_state="ACTIVE",
    cron_schedule="0 0 0 * * ?"  # Daily at midnight (Quartz: sec min hour dom month dow)
)

print(f"✓ Materialized {len(statistical_feature_objects)} statistical features with prefix 'statistical_metrics'")

In [0]:
# Get feature objects for tenant features
tenant_feature_objects = []
for feature_name in ["tenant_avg_key_events_30d", "tenant_max_active_users_30d", 
                     "tenant_total_linked_aris_30d", "tenant_avg_reporting_lines_30d"]:
    full_name = f"{CATALOG}.{SCHEMA}.{feature_name}"
    try:
        feature = fe.get_feature(full_name=full_name)
        tenant_feature_objects.append(feature)
    except Exception as e:
        print(f"Failed to get {feature_name}: {e}")

# Configure offline store for tenant features
tenant_offline_store = OfflineStoreConfig(
    catalog_name=CATALOG,
    schema_name=SCHEMA,
    table_name_prefix="tenant_level"
)

# Materialize tenant features
fe.materialize_features(
    features=tenant_feature_objects,
    offline_config=tenant_offline_store,
    pipeline_state="ACTIVE",
    cron_schedule="0 0 0 * * ?"  # Daily at midnight (Quartz: sec min hour dom month dow)
)

print(f"✓ Materialized {len(tenant_feature_objects)} tenant features with prefix 'tenant_level'")

In [0]:
# Summary of all materialized feature tables
print("\n" + "="*60)
print("FEATURE MATERIALIZATION SUMMARY")
print("="*60)
print(f"\nAll features materialized to: {CATALOG}.{SCHEMA}")
print("\nTable Prefixes Created:")
print(f"  1. user_engagement_* ({len(user_engagement_feature_objects)} features)")
print(f"  2. confluence_collaboration_* ({len(confluence_feature_objects)} features)")
print(f"  3. jira_activity_* ({len(jira_feature_objects)} features)")
print(f"  4. statistical_metrics_* ({len(statistical_feature_objects)} features)")
print(f"  5. tenant_level_* ({len(tenant_feature_objects)} features)")
print(f"\nTotal Features Materialized: {len(user_engagement_feature_objects) + len(confluence_feature_objects) + len(jira_feature_objects) + len(statistical_feature_objects) + len(tenant_feature_objects)}")
print("\nSchedule: Daily at midnight (0 0 0 * * ?)")
print("Pipeline State: ACTIVE")
print("="*60)

## 6. Prepare Training Data with Labels

### Understanding Training Labels

For supervised learning, we need labeled data:
- **Label**: `churned` (1 = churned, 0 = retained)
- **Event Time**: `event_ts` - the point in time when we make the prediction
- **Entity Keys**: `user_id` and `tenant_id` to join with features

### Point-in-Time Correctness

The `event_ts` is critical for preventing **data leakage**:

**Without Point-in-Time Correctness**  
- Features might include data from AFTER the prediction time
- Model learns from "future" information
- Unrealistic performance in training, poor performance in production

**With Point-in-Time Correctness**  
- Features only use data available BEFORE event_ts
- Realistic training conditions
- Production performance matches training performance

### Example

If `event_ts = 2024-01-15`:
- `user_clicks_7d` will count clicks from 2024-01-08 to 2024-01-15
- `user_clicks_30d` will count clicks from 2023-12-16 to 2024-01-15
- No data after 2024-01-15 will be included

The Feature Engineering Client handles this automatically!
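The window arithmetic in this example can be checked with plain `datetime` code. The sketch assumes a window covers `[event_ts - duration, event_ts)`, start inclusive and end exclusive; the exact boundary inclusivity used by the Feature Engineering Client is an assumption here, so confirm it in the documentation before relying on edge cases. `count_in_window` is a hypothetical helper.

```python
from datetime import datetime, timedelta

event_ts = datetime(2024, 1, 15)

# Window start for each lookback duration used in the example.
for label, days in [("user_clicks_7d", 7), ("user_clicks_30d", 30)]:
    print(label, "starts at", (event_ts - timedelta(days=days)).date())


def count_in_window(timestamps, as_of: datetime, days: int) -> int:
    """Count events in [as_of - days, as_of); events after as_of are never counted."""
    start = as_of - timedelta(days=days)
    return sum(1 for ts in timestamps if start <= ts < as_of)


clicks = [
    datetime(2024, 1, 9),    # inside both windows
    datetime(2023, 12, 20),  # inside the 30-day window only
    datetime(2024, 1, 20),   # after event_ts: excluded (counting it would be leakage)
]
print(count_in_window(clicks, event_ts, 7), count_in_window(clicks, event_ts, 30))
```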

In [0]:
# Load churn labels with user and tenant info
labels_df = spark.table(f"{CATALOG}.{SOURCE_SCHEMA}.churn_labels")

# Rename label_ts to be used as point-in-time reference
labels_df = labels_df.withColumnRenamed("label_ts", "event_ts")

print(f"Labels loaded: {labels_df.count()} records")
print(f"Churn rate: {labels_df.filter(F.col('churned') == 1).count() / labels_df.count():.2%}")
labels_df.show(5)

## 7. Create Training Set with Point-in-Time Correctness

### The create_training_set() API

This is where the magic happens! `create_training_set()` automatically:

1. **Identifies Entity Keys**: Matches `user_id` and `tenant_id` from labels to features
2. **Performs Point-in-Time Joins**: Uses `event_ts` to compute features correctly
3. **Handles Multiple Entities**: Joins both user-level and tenant-level features
4. **Computes Aggregations**: Calculates all window-based features on-the-fly
5. **Returns Training DataFrame**: Ready-to-use data for model training

### API Parameters

```python
fe.create_training_set(
    df=labels_df,                    # Labels with event_ts
    features=feature_objects,        # List of Feature objects
    label="churned",                 # Target column name
    exclude_columns=["user_id", "tenant_id", "event_ts"]  # Columns to drop from the returned DataFrame
)
```

### What Happens Behind the Scenes

For each row in `labels_df`:
1. Extract `user_id`, `tenant_id`, and `event_ts`
2. For each feature:
   - Find the source table
   - Filter to the entity (user or tenant)
   - Apply the time window relative to `event_ts`
   - Compute the aggregation
3. Join all features together
4. Return complete training dataset
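The per-row procedure above can be emulated in plain pandas for a single feature. This is a conceptual sketch with made-up rows whose column names mirror the notebook's tables; it is not how `create_training_set()` is implemented internally, and it assumes `[event_ts - window, event_ts)` boundaries as a simplification. `point_in_time_count` is a hypothetical helper.

```python
import pandas as pd

labels = pd.DataFrame({
    "user_id": ["u1", "u1", "u2"],
    "event_ts": pd.to_datetime(["2024-01-15", "2024-02-15", "2024-01-15"]),
    "churned": [0, 1, 0],
})
clicks = pd.DataFrame({
    "user_id": ["u1", "u1", "u1", "u2"],
    "event_ts": pd.to_datetime(["2024-01-10", "2024-01-14", "2024-02-10", "2024-01-16"]),
})


def point_in_time_count(labels: pd.DataFrame, events: pd.DataFrame, days: int) -> pd.Series:
    """For each label row, count that entity's events in [event_ts - days, event_ts)."""
    window = pd.Timedelta(days=days)
    counts = []
    for row in labels.itertuples(index=False):
        # Steps 1-2: filter to the entity, then apply the window relative to event_ts.
        user_events = events.loc[events["user_id"] == row.user_id, "event_ts"]
        in_window = (user_events >= row.event_ts - window) & (user_events < row.event_ts)
        counts.append(int(in_window.sum()))  # compute the aggregation
    return pd.Series(counts, index=labels.index)


# Steps 3-4: attach the feature to the labels to form the training rows.
training = labels.assign(user_clicks_7d=point_in_time_count(labels, clicks, days=7))
print(training)
```

Note that the same user gets different feature values for different `event_ts` rows, and `u2`'s click on 2024-01-16 is ignored because it happened after that row's `event_ts`.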

### No Manual Joins Required!

Traditionally, you'd write complex Spark SQL with:
- Multiple window functions
- Careful timestamp filtering
- Manual joins across tables

The declarative API handles all of this automatically and correctly.
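For comparison, here is one hand-written piece of that logic: attaching the most recent tenant snapshot at or before each label's `event_ts` with `pandas.merge_asof`. The rows are illustrative; both frames must be sorted by their time keys, and `by="tenant_id"` restricts matches to the same tenant.

```python
import pandas as pd

labels = pd.DataFrame({
    "user_id": ["u1", "u2", "u3"],
    "tenant_id": ["t1", "t1", "t2"],
    "event_ts": pd.to_datetime(["2024-01-15", "2024-02-15", "2024-01-15"]),
}).sort_values("event_ts")

tenant_metrics = pd.DataFrame({
    "tenant_id": ["t1", "t1", "t2"],
    "metric_ts": pd.to_datetime(["2024-01-01", "2024-02-01", "2024-01-20"]),
    "active_users_count": [40, 55, 12],
}).sort_values("metric_ts")

# direction="backward" picks the latest snapshot with metric_ts <= event_ts,
# so a snapshot taken after the label time never leaks into the row.
joined = pd.merge_asof(
    labels,
    tenant_metrics,
    left_on="event_ts",
    right_on="metric_ts",
    by="tenant_id",
    direction="backward",
)
print(joined[["user_id", "tenant_id", "event_ts", "active_users_count"]])
```

`u3` gets a missing value because tenant `t2`'s only snapshot is dated after its `event_ts`; a window-based aggregate would additionally need the filtering shown in the previous step.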

In [0]:
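# label_ts was already renamed to event_ts when the labels were loaded.
# withColumnRenamed is a no-op when the source column is absent, so this line only
# matters for label tables that store the timestamp as observation_ts.
labels_df = labels_df.withColumnRenamed("observation_ts", "event_ts")
display(labels_df)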

In [0]:
# Create training set with user-level features
# The API automatically handles point-in-time joins based on event_ts
user_training_set = fe.create_training_set(
    df=labels_df,
    features=(user_engagement_feature_objects + confluence_feature_objects
              + jira_feature_objects + statistical_feature_objects),
    label="churned",
    exclude_columns=["user_id", "event_ts", "tenant_id"]  # Keys and timestamp are not model inputs
)

# Load the training DataFrame
user_training_df = user_training_set.load_df()

print(f"User training set created with {len(user_training_df.columns)} columns")
print(f"Columns: {user_training_df.columns}")
display(user_training_df)

## 8. Train Churn Prediction Model

### Model Training Workflow

Now that we have our training set with all features, we'll:

1. **Convert to Pandas**: sklearn works with Pandas DataFrames
2. **Prepare Features**: Select feature columns, handle missing values
3. **Train-Test Split**: Stratified split to preserve the churned/retained ratio in both sets
4. **Train Model**: Gradient Boosting Classifier
5. **Evaluate**: Classification metrics and feature importance
6. **Log with MLflow**: Track experiment with feature lineage
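Step 3 relies on stratification: with `stratify=y`, scikit-learn's `train_test_split` keeps the positive rate in each split equal to the overall rate, up to rounding. A quick check on synthetic labels (illustrative values, 20% churners):

```python
import numpy as np
from sklearn.model_selection import train_test_split

# Synthetic labels: 20 churners out of 100 users.
y = np.array([1] * 20 + [0] * 80)
X = np.arange(100).reshape(-1, 1)

X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42, stratify=y
)
print(f"overall={y.mean():.2f} train={y_train.mean():.2f} test={y_test.mean():.2f}")
```

Without `stratify`, a small or imbalanced test set can end up with noticeably more or fewer churners than the population, which makes ROC-AUC and recall on the test set noisier.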

### Why Gradient Boosting?

- **Handles Mixed Features**: Works well with count, average, and variance features
- **Feature Importance**: Provides interpretable feature rankings
- **Missing Data Handled in the Pipeline**: Nulls are imputed before they reach the classifier
- **Good Performance**: Strong baseline for tabular data

### Feature Importance Analysis

After training, we'll examine which features are most predictive:
- Helps validate feature engineering choices
- Identifies key churn signals
- Guides future feature development

### MLflow Integration

We'll log:
- Model parameters and metrics
- Feature metadata (counts by category)
- Feature names as JSON artifact
- Model with feature lineage (next section)

This creates a complete audit trail from raw data to predictions.

## 9. Log Model with Feature Lineage

### What is Feature Lineage?

**Feature Lineage** tracks the complete data flow from source tables to model predictions:

```
Source Tables → Features → Training Set → Model → Predictions
```

### Why Feature Lineage Matters

1. **Reproducibility**: Know exactly which data created which model
2. **Debugging**: Trace prediction issues back to source data
3. **Compliance**: Audit trail for regulated industries
4. **Impact Analysis**: Understand downstream effects of data changes
5. **Automatic Feature Computation**: Model knows how to compute its features

### The log_model() API

```python
fe.log_model(
    model=model,                     # Trained sklearn model
    artifact_path="churn_model",     # Path in MLflow
    flavor=mlflow.sklearn,           # Model flavor
    training_set=user_training_set,  # Links to feature definitions
    registered_model_name="..."      # Register in Unity Catalog
)
```

### What Gets Stored

1. **Model Artifact**: The trained model itself
2. **Feature Metadata**: Names, sources, aggregations, windows
3. **Source Table References**: Which tables contain the raw data
4. **Computation Logic**: How to compute features from sources
5. **Entity Keys**: How to join features

### Benefits for Inference

When you load this model for inference:
- Provide just the entity keys (`user_id`, `tenant_id`) and timestamp
- Model automatically computes all required features
- No need to manually replicate feature engineering logic
- Guaranteed consistency between training and inference

In [0]:
import mlflow
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.impute import SimpleImputer
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.pipeline import Pipeline
from sklearn.metrics import classification_report, roc_auc_score

with mlflow.start_run(run_name="fs_churn_prediction") as run:

    # Convert to Pandas for sklearn training
    training_pdf = user_training_df.toPandas()

    # Prepare features and labels
    feature_columns = [col for col in training_pdf.columns if col not in ["user_id", "tenant_id", "churned"]]
    X = training_pdf[feature_columns].fillna(0)
    y = training_pdf["churned"]

    # Train-test split
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42, stratify=y)

    print(f"Training set: {len(X_train)} samples")
    print(f"Test set: {len(X_test)} samples")
    print(f"Feature count: {len(feature_columns)}")

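    # Train Gradient Boosting Classifier.
    # Impute with 0 inside the pipeline so that scoring (where features may be null)
    # fills missing values the same way as the fillna(0) applied to the training data.
    model = Pipeline([
        ("imputer", SimpleImputer(strategy="constant", fill_value=0)),
        ("classifier", GradientBoostingClassifier(
            n_estimators=100,
            learning_rate=0.1,
            max_depth=5,
            random_state=42,
        )),
    ])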

    model.fit(X_train, y_train)

    # Evaluate
    y_pred = model.predict(X_test)
    y_pred_proba = model.predict_proba(X_test)[:, 1]

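    # Log model parameters and the evaluation metric to the MLflow run
    roc_auc = roc_auc_score(y_test, y_pred_proba)
    mlflow.log_params(model.named_steps["classifier"].get_params())
    mlflow.log_metric("roc_auc", roc_auc)

    print("Model Training Complete")
    print("\nClassification Report:")
    print(classification_report(y_test, y_pred))
    print(f"\nROC-AUC Score: {roc_auc:.4f}")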

    feature_importance = pd.DataFrame({
        "feature": feature_columns,
        "importance": model.named_steps['classifier'].feature_importances_
    }).sort_values("importance", ascending=False)

    print("Top 15 Most Important Features for Churn Prediction:")
    display(feature_importance.head(15))

    # Log feature metadata
    all_feature_objects = (
                        user_engagement_feature_objects + 
                        confluence_feature_objects + 
                        jira_feature_objects + 
                        statistical_feature_objects)

    # Log feature counts by category
    mlflow.log_param("num_user_engagement_features", len(user_engagement_feature_objects))
    mlflow.log_param("num_confluence_features", len(confluence_feature_objects))
    mlflow.log_param("num_jira_features", len(jira_feature_objects))
    mlflow.log_param("num_statistical_features", len(statistical_feature_objects))
    mlflow.log_param("total_features", len(all_feature_objects))

    # Log feature names as a JSON artifact
    feature_metadata = {
        "user_engagement_features": [f.name for f in user_engagement_feature_objects],
        "confluence_features": [f.name for f in confluence_feature_objects],
        "jira_features": [f.name for f in jira_feature_objects],
        "statistical_features": [f.name for f in statistical_feature_objects]
    }
    mlflow.log_dict(feature_metadata, "feature_metadata.json")

    # Log model with feature engineering lineage
    fe.log_model(
        model=model,
        artifact_path="churn_model",
        flavor=mlflow.sklearn,
        training_set=user_training_set,
        registered_model_name=f"{CATALOG}.{SCHEMA}.fs_churn_prediction_model"
    )

    print("✓ Model logged with feature lineage")
    print(f"✓ Registered model: {CATALOG}.{SCHEMA}.fs_churn_prediction_model")
    print(f"✓ Total features logged: {len(all_feature_objects)}")
    print(f"\nFeature breakdown:")
    print(f"  - User Engagement: {len(user_engagement_feature_objects)}")
    print(f"  - Confluence: {len(confluence_feature_objects)}")
    print(f"  - Jira: {len(jira_feature_objects)}")
    print(f"  - Statistical: {len(statistical_feature_objects)}")

## 10. Batch Scoring with score_batch()

### Simplified Inference

The `score_batch()` API makes inference incredibly simple:

**Traditional Approach** 
1. Load source tables
2. Manually compute all features
3. Join features together
4. Load model
5. Make predictions
6. Hope feature logic matches training

**With score_batch()** 
1. Provide entity keys and timestamp
2. Call `score_batch()`
3. Get predictions

### How It Works

```python
fe.score_batch(
    model_uri="models:/catalog.schema.model_name/version",
    df=inference_df  # Just needs: user_id, tenant_id, event_ts
)
```

The API:
1. Loads the model and its feature lineage
2. Identifies required features
3. Computes features from source tables (point-in-time correct)
4. Applies the model
5. Returns predictions with all features

### Point-in-Time Correctness in Inference

Just like training, inference respects the timestamp:
- If `event_ts = 2024-02-06`, features only use data up to that date
- Enables backtesting and historical analysis
- To score users as of now, set `event_ts` to `current_timestamp()`
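Because every row carries its own `event_ts`, one way to backtest is to score the same users at several historical dates. A minimal pandas sketch of building such a frame (user IDs, tenant IDs, and dates are placeholders); the rows have the `user_id`, `tenant_id`, `event_ts` shape described for `score_batch()` and would need converting to a Spark DataFrame, for example with `spark.createDataFrame`, before scoring.

```python
import pandas as pd

users = pd.DataFrame({"user_id": ["u1", "u2"], "tenant_id": ["t1", "t2"]})
as_of_dates = pd.DataFrame(
    {"event_ts": pd.to_datetime(["2024-01-01", "2024-02-01", "2024-03-01"])}
)

# Cross join: every user is paired with every as-of date (requires pandas >= 1.2).
backtest_df = users.merge(as_of_dates, how="cross")
print(backtest_df)
```

Comparing predictions made as of each date with churn that was later observed gives a view of how the model would have performed historically.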

### What You Get Back

The result DataFrame includes:
- Original columns (`user_id`, `tenant_id`, `event_ts`)
- All computed features
- Model predictions

This transparency helps with debugging and model monitoring.

In [0]:
# Prepare inference data (new users to score)
inference_df = spark.table(f"{CATALOG}.{SOURCE_SCHEMA}.user_tenant_mapping").limit(100)
inference_df = inference_df.withColumn("event_ts", F.current_timestamp())

print(f"Inference data: {inference_df.count()} users to score")
inference_df.show(5)

In [0]:

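# Score batch with automatic point-in-time feature computation.
# The "/2" suffix pins model version 2; use the version number created by your own
# registration in the previous step.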
predictions_df = fe.score_batch(
    model_uri=f"models:/{CATALOG}.{SCHEMA}.fs_churn_prediction_model/2",
    df=inference_df
)
display(predictions_df)
# print("Batch scoring complete")
# print(f"Predictions: {predictions_df.count()} rows")
# predictions_df.select("user_id", "tenant_id", "prediction").show(10)

In [0]:
display(predictions_df.groupBy("prediction").count())

## Summary

### What We Accomplished

This notebook demonstrated a **complete end-to-end feature engineering and ML workflow** using Databricks declarative APIs:

1.  Defined data sources with entity and timeseries columns
2.  Created 26 features using multiple aggregation functions and window types
3.  Materialized features to offline stores with scheduling
4.  Generated point-in-time correct training data
5.  Trained a churn prediction model
6.  Logged model with complete feature lineage
7.  Performed batch inference with automatic feature computation

---

### APIs Reference

| API | Purpose | Key Parameters |
|-----|---------|---------------|
| `FeatureEngineeringClient()` | Initialize client | None |
| `DeltaTableSource()` | Define source | `catalog_name`, `schema_name`, `table_name`, `entity_columns`, `timeseries_column` |
| `create_feature()` | Create feature | `name`, `source`, `inputs`, `function`, `time_window`, `filter_condition` |
| `create_training_set()` | Generate training data | `df`, `features`, `label`, `exclude_columns` |
| `materialize_features()` | Store features | `features`, `offline_config`, `online_config`, `pipeline_state`, `cron_schedule` |
| `log_model()` | Log with lineage | `model`, `artifact_path`, `flavor`, `training_set`, `registered_model_name` |
| `score_batch()` | Batch inference | `model_uri`, `df` |

---

### Aggregation Functions

| Function | Use Case | Example |
|----------|----------|--------|
| `Count()` | Event frequency | Total clicks, activities |
| `Sum()` | Cumulative values | Total duration, reactions |
| `Avg()` | Central tendency | Average session length |
| `Min()` / `Max()` | Range analysis | Peak/minimum engagement |
| `StddevPop()` | Consistency | Behavior variance |
| `ApproxCountDistinct()` | Cardinality | Unique products, pages |
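Assuming `StddevPop()` follows the usual `stddev_pop` definition, it divides by *n* rather than *n - 1*. Python's `statistics` module shows the difference on a small, illustrative list of session durations; for short windows with few events the two can differ noticeably.

```python
import statistics

# Illustrative session durations (seconds) for one user within a window.
durations = [2, 4, 4, 4, 5, 5, 7, 9]

population_sd = statistics.pstdev(durations)  # divides by n (population)
sample_sd = statistics.stdev(durations)       # divides by n - 1 (sample)
print(population_sd, round(sample_sd, 3))
```

The population form is also defined for a window with a single event (it returns 0.0), whereas `statistics.stdev` needs at least two data points.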

---

### Time Window Types

#### SlidingWindow
- **Pattern**: Overlapping rolling windows
- **Use Case**: Trend analysis, smooth metrics
- **Example**: "Clicks in last 7 days" (slides daily)
- **Parameters**: `window_duration`, `slide_duration`

#### TumblingWindow
- **Pattern**: Non-overlapping fixed windows
- **Use Case**: Periodic reports, discrete buckets
- **Example**: "Weekly activity count"
- **Parameters**: `window_duration`

#### ContinuousWindow
- **Pattern**: Lookback from event time
- **Use Case**: Point-in-time aggregations with offset
- **Example**: "Events in last 30 days with 1-day lag"
- **Parameters**: `window_duration`, `offset`
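The three patterns differ in which events each computed value covers. The pure-Python sketch below illustrates those semantics using integer day offsets instead of timestamps. The functions are hypothetical and are not the library's implementation; their parameter names mirror the ones listed above, the inclusive-start/exclusive-end boundaries are an assumption, and `offset` here is a positive lag in days.

```python
def tumbling_windows(start: int, end: int, window_duration: int):
    """Non-overlapping buckets [s, s + window_duration) covering [start, end)."""
    return [(s, s + window_duration) for s in range(start, end, window_duration)]


def sliding_windows(start: int, end: int, window_duration: int, slide_duration: int):
    """Overlapping windows [s, s + window_duration) that advance by slide_duration."""
    return [(s, s + window_duration)
            for s in range(start, end - window_duration + 1, slide_duration)]


def continuous_window(event_time: int, window_duration: int, offset: int = 0):
    """A single lookback window ending `offset` days before the event time."""
    window_end = event_time - offset
    return (window_end - window_duration, window_end)


print(tumbling_windows(0, 21, 7))     # three disjoint weeks
print(sliding_windows(0, 10, 7, 1))   # 7-day windows sliding daily
print(continuous_window(40, 30, 1))   # last 30 days with a 1-day lag
```

Each tumbling bucket counts an event exactly once, sliding windows count it in every window that overlaps it, and the continuous window is anchored to each row's own event time.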

---

### Features Created

**User Engagement (5 features)**
- Click counts (7d, 30d)
- Session duration (avg 7d, total 30d)
- Product diversity (30d)

**Confluence Collaboration (6 features)**
- Dwell time (14d, avg 30d)
- Comments (30d)
- Unique pages (30d)
- Lagged metrics (7d, 30d)

**Jira Activity (6 features)**
- Weekly activities
- Reaction statistics (max, min, total)
- Unique issues (monthly)
- Resolved issues (30d)

**Filtered Features (2 features)**
- JSW-specific activities
- Bug-related activities

**Statistical Features (3 features)**
- Session duration variance
- Confluence dwell variance
- Jira reaction variance

**Tenant-Level Features (4 features)**
- Average key events
- Max active users
- Total linked artifacts
- Average reporting lines

**Total: 26 features** across multiple entities and time windows

---

### Best Practices

1. **Use Descriptive Names**: `user_clicks_7d` is better than `feature_1`
2. **Organize by Category**: Group related features for maintainability
3. **Choose Appropriate Windows**: Match window type to use case
4. **Leverage Filter Conditions**: Create focused features without pre-filtering
5. **Include Statistical Features**: Variance often has high predictive power
6. **Multi-Entity Features**: Combine user and organizational signals
7. **Point-in-Time Correctness**: Always use observation timestamps
8. **Feature Lineage**: Log models with training sets for reproducibility
9. **Materialize Strategically**: Balance freshness, cost, and latency
10. **Monitor Feature Quality**: Track feature distributions over time

---

### Next Steps

1. **Experiment with Features**: Try different window sizes and aggregations
2. **Add More Sources**: Incorporate additional data sources
3. **Feature Selection**: Use feature importance to prune low-value features
4. **Online Serving**: Deploy model with online feature store
5. **Monitoring**: Set up feature drift detection
6. **A/B Testing**: Compare models with different feature sets

---

### Resources

- [Databricks Feature Engineering Documentation](https://docs.databricks.com/machine-learning/feature-store/index.html)
- [Unity Catalog Feature Engineering](https://docs.databricks.com/en/machine-learning/feature-store/uc/feature-tables-uc.html)
- [MLflow Model Registry](https://docs.databricks.com/mlflow/model-registry.html)