# 02 Feature Engineering (Telecom Churn / Cease)

This notebook builds a **leakage-safe modelling dataset** for churn/cease prediction aligned to the business objective:

> **Prioritise retention resources by identifying customers most likely to place a cease in the next 30 days.**

## What this notebook does
- Loads `customer_info`, `calls`, `usage`, `cease`
- Standardises schemas and types
- Removes duplicate snapshots and duplicate event rows
- Builds a **leakage-safe target**: `target_cease_30d`
- Engineers high-value predictive features (contract, payment, calls, usage, trend, recency)
- Produces a clean feature table ready for modelling
- Saves the modelling dataset for the next notebook


In [1]:
# Core libraries
from pathlib import Path
import os
import warnings
warnings.filterwarnings("ignore")

import duckdb
import numpy as np
import pandas as pd

# Widen pandas' rendering limits so wide feature tables are shown in full.
for display_option in ("display.max_columns", "display.width"):
    pd.set_option(display_option, 200)


In [2]:
# Resolve the repository root whether the kernel was started inside the
# notebook(s) folder or directly in the repo root.
cwd = Path.cwd()
if cwd.name.lower() in {"notebook", "notebooks"}:
    repo_dir = cwd.parent
else:
    repo_dir = cwd

# Standard project folders; the output folders are created if missing.
data_dir = repo_dir / "data"
outputs_dir = repo_dir / "outputs"
features_dir = outputs_dir / "features"
for out_folder in (outputs_dir, features_dir):
    out_folder.mkdir(parents=True, exist_ok=True)

# Default locations of the four raw input files.
cease_path_default = data_dir / "cease.csv"
calls_path_default = data_dir / "calls.csv"
customer_path_default = data_dir / "customer_info.parquet"
usage_path_default = data_dir / "usage.parquet"




In [12]:
# Open (or create) the on-disk DuckDB database under <repo>/notebooks so that
# tables built in this notebook persist between sessions.
db_path = repo_dir.joinpath("notebooks", "K_telecom_session2.duckdb")
db_folder = db_path.parent
db_folder.mkdir(parents=True, exist_ok=True)
con = duckdb.connect(str(db_path))
print("DuckDB:", db_path)


DuckDB: c:\Users\Admin\OneDrive - University of West London\Desktop\AA\TECH_REYAL_project\Talk_talk\Churn_retention_taltalk\notebooks\K_telecom_session2.duckdb


In [16]:
# Working input paths used by the loaders below. They start out as the
# defaults and can be re-pointed here without editing the path-detection cell.
cease_path, calls_path = cease_path_default, calls_path_default
customer_path, usage_path = customer_path_default, usage_path_default



In [15]:
# Expose each raw file as a DuckDB view. DuckDB scans the files in place, so
# nothing is copied into the database at this point.
raw_sources = (
    ("customer_raw", "read_parquet", customer_path),
    ("calls_raw", "read_csv_auto", calls_path),
    ("cease_raw", "read_csv_auto", cease_path),
    ("usage_raw", "read_parquet", usage_path),
)
for view_name, reader_fn, file_path in raw_sources:
    con.execute(
        f"CREATE OR REPLACE VIEW {view_name:<12} AS SELECT * FROM {reader_fn}('{file_path.as_posix()}')"
    )

# Show the inferred schema of every raw view, in registration order.
print("Schemas")
for view_name, _reader_fn, _file_path in raw_sources:
    display(con.execute(f"DESCRIBE {view_name}").df())


Schemas


Unnamed: 0,column_name,column_type,null,key,default,extra
0,unique_customer_identifier,VARCHAR,YES,,,
1,datevalue,DATE,YES,,,
2,contract_status,VARCHAR,YES,,,
3,contract_dd_cancels,BIGINT,YES,,,
4,dd_cancel_60_day,INTEGER,YES,,,
5,ooc_days,INTEGER,YES,,,
6,technology,VARCHAR,YES,,,
7,speed,INTEGER,YES,,,
8,line_speed,DOUBLE,YES,,,
9,sales_channel,VARCHAR,YES,,,


Unnamed: 0,column_name,column_type,null,key,default,extra
0,unique_customer_identifier,VARCHAR,YES,,,
1,event_date,DATE,YES,,,
2,call_type,VARCHAR,YES,,,
3,talk_time_seconds,DOUBLE,YES,,,
4,hold_time_seconds,DOUBLE,YES,,,


Unnamed: 0,column_name,column_type,null,key,default,extra
0,unique_customer_identifier,VARCHAR,YES,,,
1,cease_placed_date,DATE,YES,,,
2,cease_completed_date,VARCHAR,YES,,,
3,reason_description,VARCHAR,YES,,,
4,reason_description_insight,VARCHAR,YES,,,


Unnamed: 0,column_name,column_type,null,key,default,extra
0,unique_customer_identifier,VARCHAR,YES,,,
1,calendar_date,DATE,YES,,,
2,usage_download_mbs,VARCHAR,YES,,,
3,usage_upload_mbs,VARCHAR,YES,,,


## 1) Clean and standardise base tables

We normalise date and numeric types, then de-duplicate records before feature engineering.


In [17]:
# Customer snapshot table (standardised).
# - Exposes `datevalue` as `snapshot_date` (DATE).
# - Text attributes are cast to VARCHAR; numeric attributes go through
#   TRY_CAST to DOUBLE, so values that cannot be parsed become NULL instead of
#   aborting the query.
# - Rows without a customer id or without a snapshot date are dropped.
# NOTE(review): `crm_package_name` and `tenure_days` are selected here but do
# not appear in the `DESCRIBE customer_raw` output saved with this notebook --
# confirm the parquet actually carries them (the saved output may be stale).
con.execute("""
CREATE OR REPLACE VIEW customer_info_std AS
SELECT
    unique_customer_identifier,
    CAST(datevalue AS DATE) AS snapshot_date,
    CAST(contract_status AS VARCHAR) AS contract_status,
    TRY_CAST(ooc_days AS DOUBLE) AS ooc_days,
    TRY_CAST(dd_cancel_60_day AS DOUBLE) AS dd_cancel_60_day,
    TRY_CAST(contract_dd_cancels AS DOUBLE) AS contract_dd_cancels,
    CAST(Technology AS VARCHAR) AS technology,
    CAST(crm_package_name AS VARCHAR) AS crm_package_name,
    CAST(sales_channel AS VARCHAR) AS sales_channel,
    TRY_CAST(speed AS DOUBLE) AS speed,
    TRY_CAST(line_speed AS DOUBLE) AS line_speed,
    TRY_CAST(tenure_days AS DOUBLE) AS tenure_days
FROM customer_raw
WHERE unique_customer_identifier IS NOT NULL
  AND datevalue IS NOT NULL
""")

# De-duplicate by customer + snapshot_date (one row per key).
# The previous window ordered by `snapshot_date`, which is itself a partition
# key, so every row in a partition tied and the surviving duplicate was
# arbitrary -- it could change between runs. Ordering by the remaining
# attribute columns makes the pick deterministic: rows can now only tie when
# they are identical in every column, in which case the choice is immaterial.
# The helper column `rn` is kept in the output so the view's columns are
# unchanged for downstream consumers.
con.execute("""
CREATE OR REPLACE VIEW customer_info_dedup AS
SELECT *
FROM (
    SELECT *,
           ROW_NUMBER() OVER (
               PARTITION BY unique_customer_identifier, snapshot_date
               ORDER BY
                   contract_status, ooc_days, dd_cancel_60_day, contract_dd_cancels,
                   technology, crm_package_name, sales_channel,
                   speed, line_speed, tenure_days
           ) AS rn
    FROM customer_info_std
)
WHERE rn = 1
""")

# Calls standardisation.
# Casts `event_date` to DATE and `call_type` to VARCHAR; talk/hold durations go
# through TRY_CAST to DOUBLE, so unparseable values become NULL. Rows without a
# customer id or an event date are dropped.
con.execute("""
CREATE OR REPLACE VIEW calls_std AS
SELECT
    unique_customer_identifier,
    CAST(event_date AS DATE) AS event_date,
    CAST(call_type AS VARCHAR) AS call_type,
    TRY_CAST(talk_time_seconds AS DOUBLE) AS talk_time_seconds,
    TRY_CAST(hold_time_seconds AS DOUBLE) AS hold_time_seconds
FROM calls_raw
WHERE unique_customer_identifier IS NOT NULL
  AND event_date IS NOT NULL
""")

# De-duplicate call rows (exact duplicates).
# The partition key spans every column of calls_std, so rows in one partition
# are copies of each other and it does not matter which one is kept; the
# ORDER BY repeats a partition key and therefore has no effect on the pick.
# NULL durations are mapped to -1 in the key so that rows with missing
# durations group together (a genuine -1 would share that group).
# The helper column `rn` remains in the view's output.
con.execute("""
CREATE OR REPLACE VIEW calls_dedup AS
SELECT *
FROM (
    SELECT *,
           ROW_NUMBER() OVER (
               PARTITION BY unique_customer_identifier, event_date, call_type,
                            coalesce(talk_time_seconds, -1), coalesce(hold_time_seconds, -1)
               ORDER BY event_date DESC
           ) AS rn
    FROM calls_std
)
WHERE rn = 1
""")

# Cease standardisation.
# `cease_completed_date` is read from the CSV as VARCHAR (see the cease_raw
# schema), i.e. the reader could not infer it as a date. A hard CAST would
# abort any query that touches the column as soon as one value fails to parse,
# so it goes through TRY_CAST (unparseable -> NULL), matching how the other
# standardised views treat loosely typed columns.
# `cease_placed_date` is already typed DATE by the reader and is required
# (non-NULL) because the target is built from it.
con.execute("""
CREATE OR REPLACE VIEW cease_std AS
SELECT
    unique_customer_identifier,
    CAST(cease_placed_date AS DATE) AS cease_placed_date,
    TRY_CAST(cease_completed_date AS DATE) AS cease_completed_date,
    CAST(reason_description AS VARCHAR) AS reason_description,
    CAST(reason_description_insight AS VARCHAR) AS reason_description_insight
FROM cease_raw
WHERE unique_customer_identifier IS NOT NULL
  AND cease_placed_date IS NOT NULL
""")

# Usage standardisation.
# Exposes `calendar_date` as `usage_date` (DATE). The raw download/upload
# columns are VARCHAR (see the usage_raw schema), so they are parsed with
# TRY_CAST to DOUBLE; values that cannot be parsed become NULL.
# Rows without a customer id or a calendar date are dropped.
con.execute("""
CREATE OR REPLACE VIEW usage_std AS
SELECT
    unique_customer_identifier,
    CAST(calendar_date AS DATE) AS usage_date,
    TRY_CAST(usage_download_mbs AS DOUBLE) AS usage_download_mbs,
    TRY_CAST(usage_upload_mbs AS DOUBLE) AS usage_upload_mbs
FROM usage_raw
WHERE unique_customer_identifier IS NOT NULL
  AND calendar_date IS NOT NULL
""")

# Deduplicate usage by customer/date (one row per key).
# The previous window ordered by `usage_date`, which is itself a partition key,
# so all duplicates tied and the surviving row was arbitrary (and could change
# between runs). The tie-break below is deterministic and prefers the row with
# the largest parsed download, then upload, pushing rows whose values failed to
# parse (NULL) to the back.
# NOTE(review): "keep the largest reading" is a reproducibility choice, not a
# confirmed business rule -- verify how duplicate usage rows should be resolved.
# The helper column `rn` is kept in the output so the view's columns are
# unchanged for downstream consumers.
con.execute("""
CREATE OR REPLACE VIEW usage_dedup AS
SELECT *
FROM (
    SELECT *,
           ROW_NUMBER() OVER (
               PARTITION BY unique_customer_identifier, usage_date
               ORDER BY usage_download_mbs DESC NULLS LAST,
                        usage_upload_mbs DESC NULLS LAST
           ) AS rn
    FROM usage_std
)
WHERE rn = 1
""")

# Report how many rows each standardised / de-duplicated view returns.
print("Row counts (raw -> dedup)")
views_to_count = (
    "customer_info_std",
    "customer_info_dedup",
    "calls_std",
    "calls_dedup",
    "cease_std",
    "usage_std",
    "usage_dedup",
)
for view_name in views_to_count:
    n_rows = con.execute(f"SELECT COUNT(*) AS n FROM {view_name}").fetchone()[0]
    print(view_name, n_rows)


Row counts (raw -> dedup)
customer_info_std 3545538


FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

customer_info_dedup 3532720
calls_std 628437
calls_dedup 621951
cease_std 146363
usage_std 83185050


FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

usage_dedup 82869427


## 2) Leakage-safe target (`target_cease_30d`)

For each customer snapshot, the target is:

- `1` if a cease is placed **after** the snapshot date and **within 30 days**
- else `0`

This matches the retention prioritisation use case (who to call now).


In [19]:
# Label every de-duplicated customer snapshot.
# target_cease_30d = 1 when cease_std holds at least one cease for the same
# customer placed strictly after the snapshot date and no later than 30 days
# after it; otherwise 0. Only events after the snapshot feed the label.
# `c.*` also carries the helper column `rn` from customer_info_dedup.
# NOTE(review): snapshots taken less than 30 days before the end of the cease
# extract cannot have a complete look-ahead window and are labelled 0 by
# construction -- check the extract's date range and consider excluding them.
con.execute("""
CREATE OR REPLACE VIEW snapshot_target AS
SELECT
    c.*,
    CASE WHEN EXISTS (
        SELECT 1
        FROM cease_std z
        WHERE z.unique_customer_identifier = c.unique_customer_identifier
          AND z.cease_placed_date > c.snapshot_date
          AND z.cease_placed_date <= c.snapshot_date + INTERVAL 30 DAY
    ) THEN 1 ELSE 0 END AS target_cease_30d
FROM customer_info_dedup c
""")

# Class balance of the target (number of snapshots per label value).
display(con.execute("""
SELECT target_cease_30d, COUNT(*) AS n
FROM snapshot_target
GROUP BY target_cease_30d
ORDER BY target_cease_30d
""").df())


FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

Unnamed: 0,target_cease_30d,n
0,0,3394382
1,1,138338


## 3) Engineer predictive features

### Feature families
- **Contract / lifecycle:** out-of-contract status, contract-end proximity, tenure
- **Payment distress:** direct debit cancels
- **Call behaviour:** total calls, loyalty calls, tech calls, hold/talk ratios, recency
- **Usage trends:** recent usage, prior usage, decline %, recency
- **Product context:** technology, package, sales channel


In [20]:
# Snapshot-level customer features (safe; all available at snapshot time)
con.execute("""
CREATE OR REPLACE VIEW customer_feats AS
SELECT
    unique_customer_identifier,
    snapshot_date,
    target_cease_30d,

    -- raw core attributes
    contract_status,
    technology,
    crm_package_name,
    sales_channel,
    ooc_days,
    tenure_days,
    dd_cancel_60_day,
    contract_dd_cancels,
    speed,
    line_speed,

    -- engineered contract/payment features
    CASE WHEN ooc_days > 0 THEN 1 ELSE 0 END AS is_out_of_contract,
    CASE WHEN ooc_days BETWEEN -30 AND 0 THEN 1 ELSE 0 END AS is_near_contract_end,
    CASE
        WHEN ooc_days < -90 THEN 'In contract >90d'
        WHEN ooc_days BETWEEN -90 AND -31 THEN 'In contract 31-90d'
        WHEN ooc_days BETWEEN -30 AND 0 THEN 'Near contract end'
        WHEN ooc_days BETWEEN 1 AND 30 THEN 'OOC 1-30d'
        WHEN ooc_days BETWEEN 31 AND 90 THEN 'OOC 31-90d'
        WHEN ooc_days > 90 THEN 'OOC 90d+'
        ELSE 'Unknown'
    END AS ooc_days_bucket,
    CASE WHEN coalesce(dd_cancel_60_day,0) > 0 THEN 1 ELSE 0 END AS dd_cancel_flag_60d,
    CASE
        WHEN dd_cancel_60_day IS NULL THEN 'Unknown'
        WHEN dd_cancel_60_day = 0 THEN '0'
        WHEN dd_cancel_60_day = 1 THEN '1'
        WHEN dd_cancel_60_day = 2 THEN '2'
        ELSE '3+'
    END AS dd_cancel_60d_bucket,

    -- speed quality features
    (speed - line_speed) AS speed_gap,
    CASE WHEN speed > 0 THEN (line_speed / speed) ELSE NULL END AS speed_ratio
FROM snapshot_target
""")


<_duckdb.DuckDBPyConnection at 0x22bb73c1f30>

<b>Daily call aggregates (one row per customer per day)</b>

In [45]:
# Materialise (as a TABLE, not a view) one row per customer per call date from
# the de-duplicated calls.
# - calls_cnt: number of calls that day.
# - loyalty/tech/cs counts: calls whose lower-cased call_type contains the
#   given substring (NULL call_type is treated as an empty string).
#   NOTE(review): '%cs%' matches any call_type containing "cs" -- verify
#   against the distinct call_type values that nothing unintended is counted.
# - avg_talk_time / avg_hold_time: per-day means of the call durations.
# - avg/max_hold_ratio: hold/talk ratio, computed only for calls with
#   talk_time_seconds > 0 (other calls are ignored by the aggregate).
con.execute("""
CREATE OR REPLACE TABLE calls_daily AS
SELECT
    unique_customer_identifier,
    event_date,
    COUNT(*) AS calls_cnt,
    SUM(CASE WHEN lower(coalesce(call_type,'')) LIKE '%loyalty%' THEN 1 ELSE 0 END) AS loyalty_calls_cnt,
    SUM(CASE WHEN lower(coalesce(call_type,'')) LIKE '%tech%' THEN 1 ELSE 0 END) AS tech_calls_cnt,
    SUM(CASE WHEN lower(coalesce(call_type,'')) LIKE '%cs%' THEN 1 ELSE 0 END) AS cs_calls_cnt,
    AVG(talk_time_seconds) AS avg_talk_time,
    AVG(hold_time_seconds) AS avg_hold_time,
    AVG(CASE WHEN talk_time_seconds > 0 THEN hold_time_seconds / talk_time_seconds END) AS avg_hold_ratio,
    MAX(CASE WHEN talk_time_seconds > 0 THEN hold_time_seconds / talk_time_seconds END) AS max_hold_ratio
FROM calls_dedup
GROUP BY 1,2
""")

FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

<_duckdb.DuckDBPyConnection at 0x22bb73c1f30>

<b>Snapshot call features from daily aggregates</b>

In [46]:
# Per-snapshot call features built from calls_daily.
# - The LEFT JOIN keeps every snapshot and only pulls call days in the 60 days
#   up to and including the snapshot date, so no post-snapshot call can leak
#   into a feature. The 60-day FILTER therefore repeats the join condition.
# - Each window is "(snapshot - N days, snapshot]", i.e. it includes calls made
#   on the snapshot date itself.
# - Counts are SUMs with a FILTER: a snapshot with no matching call days gets
#   NULL (not 0) for these columns.
# - The *_30d duration/ratio columns average the per-day values from
#   calls_daily, so each active day weighs the same regardless of call volume.
# - Recency columns are measured inside the joined 60-day window only and are
#   NULL when there was no (loyalty) call in that window.
con.execute("""
CREATE OR REPLACE VIEW call_feats AS
SELECT
    s.unique_customer_identifier,
    s.snapshot_date,

    -- rolling counts
    SUM(cd.calls_cnt) FILTER (
      WHERE cd.event_date > s.snapshot_date - INTERVAL 7 DAY
        AND cd.event_date <= s.snapshot_date
    ) AS calls_7d,

    SUM(cd.calls_cnt) FILTER (
      WHERE cd.event_date > s.snapshot_date - INTERVAL 30 DAY
        AND cd.event_date <= s.snapshot_date
    ) AS calls_30d,

    SUM(cd.calls_cnt) FILTER (
      WHERE cd.event_date > s.snapshot_date - INTERVAL 60 DAY
        AND cd.event_date <= s.snapshot_date
    ) AS calls_60d,

    -- rolling call-type counts
    SUM(cd.loyalty_calls_cnt) FILTER (
      WHERE cd.event_date > s.snapshot_date - INTERVAL 30 DAY
        AND cd.event_date <= s.snapshot_date
    ) AS loyalty_calls_30d,

    SUM(cd.tech_calls_cnt) FILTER (
      WHERE cd.event_date > s.snapshot_date - INTERVAL 30 DAY
        AND cd.event_date <= s.snapshot_date
    ) AS tech_calls_30d,

    SUM(cd.cs_calls_cnt) FILTER (
      WHERE cd.event_date > s.snapshot_date - INTERVAL 30 DAY
        AND cd.event_date <= s.snapshot_date
    ) AS cs_calls_30d,

    -- duration/hold behaviour (averaged across days with activity)
    AVG(cd.avg_talk_time) FILTER (
      WHERE cd.event_date > s.snapshot_date - INTERVAL 30 DAY
        AND cd.event_date <= s.snapshot_date
    ) AS avg_talk_time_30d,

    AVG(cd.avg_hold_time) FILTER (
      WHERE cd.event_date > s.snapshot_date - INTERVAL 30 DAY
        AND cd.event_date <= s.snapshot_date
    ) AS avg_hold_time_30d,

    AVG(cd.avg_hold_ratio) FILTER (
      WHERE cd.event_date > s.snapshot_date - INTERVAL 30 DAY
        AND cd.event_date <= s.snapshot_date
    ) AS avg_hold_ratio_30d,

    MAX(cd.max_hold_ratio) FILTER (
      WHERE cd.event_date > s.snapshot_date - INTERVAL 30 DAY
        AND cd.event_date <= s.snapshot_date
    ) AS max_hold_ratio_30d,

    -- recency
    DATE_DIFF('day', MAX(cd.event_date), s.snapshot_date) AS days_since_last_call,
    DATE_DIFF('day',
      MAX(CASE WHEN cd.loyalty_calls_cnt > 0 THEN cd.event_date END),
      s.snapshot_date
    ) AS days_since_last_loyalty_call

FROM snapshot_target s
LEFT JOIN calls_daily cd
  ON s.unique_customer_identifier = cd.unique_customer_identifier
 AND cd.event_date <= s.snapshot_date
 AND cd.event_date > s.snapshot_date - INTERVAL 60 DAY
GROUP BY 1,2
""")

<_duckdb.DuckDBPyConnection at 0x22bb73c1f30>

<b>Daily usage aggregate (one row per customer per day)</b>

<b>Create a snapshot sample first (e.g., 5% or fixed N) </b>

In [48]:
# Draw a reproducible ~5% sample of snapshots.
# The previous `WHERE random() <= 0.05` was unseeded, so every run produced a
# different sample and downstream feature tables / model metrics could not be
# reproduced. Hash-bucketing the snapshot key instead gives the same rows on
# every run, independent of thread count or scan order, because membership
# depends only on the row's own key and the salt.
# NOTE(review): hash() values are not guaranteed stable across DuckDB versions;
# pin the DuckDB version if the exact sample must be reproducible long-term.
SNAPSHOT_SAMPLE_PCT = 5        # whole-number percentage of snapshots to keep
SNAPSHOT_SAMPLE_SALT = "v1"    # change to draw a different, still repeatable, sample

con.execute(f"""
CREATE OR REPLACE TABLE snapshot_sample AS
SELECT *
FROM snapshot_target
WHERE hash(
          unique_customer_identifier
          || '|' || CAST(snapshot_date AS VARCHAR)
          || '|{SNAPSHOT_SAMPLE_SALT}'
      ) % 100 < {SNAPSHOT_SAMPLE_PCT}
""")

FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

<_duckdb.DuckDBPyConnection at 0x22bb73c1f30>

<b> Create a list of customers in the sample </b>

In [49]:
# Distinct customer ids that appear in the snapshot sample, stored as a table.
# NOTE(review): no later cell visible in this notebook reads sample_customers;
# confirm whether it is still needed (e.g. to pre-filter usage rows).
con.execute("""
CREATE OR REPLACE TABLE sample_customers AS
SELECT DISTINCT unique_customer_identifier
FROM snapshot_sample
""")

<_duckdb.DuckDBPyConnection at 0x22bb73c1f30>

<b> Compute usage features directly</b>

In [51]:
# Low-memory settings (run BEFORE the heavy usage query).
con.execute("SET threads=1")
con.execute("SET preserve_insertion_order=false")
con.execute("SET memory_limit='11GB'")

# Scratch folder for DuckDB spill files. It is anchored to the detected repo
# root rather than to "../", so it lands in the same place whether the kernel
# runs from the notebooks folder or from the repo root (a cwd-relative "../"
# would escape the repository in the latter case). `Path` is already imported
# in the first cell, so it is not re-imported here.
temp_duckdb_dir = repo_dir / "temp_duckdb"
temp_duckdb_dir.mkdir(parents=True, exist_ok=True)
# The folder is not wired into DuckDB yet; to use it for spilling:
#con.execute(f"SET temp_directory='{temp_duckdb_dir.as_posix()}'")

# Per-snapshot usage features, computed for the sampled snapshots only.
# - The LEFT JOIN keeps every sampled snapshot and only pulls usage days in the
#   60 days up to and including the snapshot date (no post-snapshot usage).
# - usage_30d_total_mb: download + upload over (snapshot - 30d, snapshot].
# - usage_prev_30d_total_mb: the same sum over (snapshot - 60d, snapshot - 30d].
#   A NULL download or upload counts as 0 within a row; a snapshot with no
#   usage rows in a window gets NULL for that window's total.
# - days_since_last_usage: days from the latest usage date inside the joined
#   60-day window to the snapshot; NULL when there is none.
con.execute("""
CREATE OR REPLACE VIEW usage_feats_sample AS
SELECT
    s.unique_customer_identifier,
    s.snapshot_date,

    -- 30d total usage (download + upload)
    SUM(coalesce(u.usage_download_mbs,0) + coalesce(u.usage_upload_mbs,0)) FILTER (
      WHERE u.usage_date > s.snapshot_date - INTERVAL 30 DAY
        AND u.usage_date <= s.snapshot_date
    ) AS usage_30d_total_mb,

    -- previous 30d usage
    SUM(coalesce(u.usage_download_mbs,0) + coalesce(u.usage_upload_mbs,0)) FILTER (
      WHERE u.usage_date > s.snapshot_date - INTERVAL 60 DAY
        AND u.usage_date <= s.snapshot_date - INTERVAL 30 DAY
    ) AS usage_prev_30d_total_mb,

    DATE_DIFF('day', MAX(u.usage_date), s.snapshot_date) AS days_since_last_usage

FROM snapshot_sample s
LEFT JOIN usage_dedup u
  ON s.unique_customer_identifier = u.unique_customer_identifier
 AND u.usage_date <= s.snapshot_date
 AND u.usage_date > s.snapshot_date - INTERVAL 60 DAY
GROUP BY 1,2
""")

<_duckdb.DuckDBPyConnection at 0x22bb73c1f30>

In [30]:
# Resource limits for the following queries: two worker threads, unordered
# output, and a 12GB memory cap (keep the cap below the machine's RAM).
# execute() hands back the connection, so the settings can be chained.
(
    con.execute("SET threads=2")
       .execute("SET preserve_insertion_order=false")
       .execute("SET memory_limit='12GB'")
)
#con.execute("PRAGMA temp_directory='../temp_duckdb'")  # allow disk spilling

<_duckdb.DuckDBPyConnection at 0x22bb73c1f30>

## 4) Save modelling dataset for Notebook 03

We save both **Parquet** (fast, typed) and **CSV** (portable).


In [41]:
# Tighten resources before the export: single thread, unordered output and a
# 10 GB memory cap. execute() hands back the connection, so calls are chained.
(
    con.execute("SET threads=1")
       .execute("SET preserve_insertion_order=false")
       .execute("SET memory_limit='10 GB'")
)

<_duckdb.DuckDBPyConnection at 0x22bb73c1f30>

In [None]:
# Export a random sample of the model feature table to Parquet for Notebook 03.
#
# `features_dir` is the repo-anchored folder defined in the path-detection cell.
# It is no longer overwritten with the cwd-relative "../outputs/features",
# which pointed outside the repository when the kernel ran from the repo root.
# `Path` is already imported in the first cell, so it is not re-imported here.
features_dir.mkdir(parents=True, exist_ok=True)

sample_out = features_dir / "telecom_churn_model_features_slim_5pct_random.parquet"

# NOTE(review): `model_features_v1` is not created by any cell visible in this
# notebook, and columns such as usage_7d_total_mb, avg_daily_usage_30d,
# usage_change_*_30d, usage_decline_bucket and the *_x_* interactions are not
# produced by the visible feature views -- confirm where the table is built.
# NOTE(review): the filter keeps a 0.00005 fraction (0.005%) of rows while the
# file name says "5pct"; confirm which is intended. random() is unseeded, so
# the exported rows differ on every run.
con.execute(f"""
    COPY (
        SELECT
            unique_customer_identifier,
            snapshot_date,
            target_cease_30d,
            contract_status, technology, crm_package_name, sales_channel,
            ooc_days, tenure_days, dd_cancel_60_day, contract_dd_cancels,
            is_out_of_contract, is_near_contract_end,
            speed, line_speed, speed_gap, speed_ratio,
            calls_7d, calls_30d, calls_60d,
            loyalty_calls_30d, tech_calls_30d, cs_calls_30d,
            avg_talk_time_30d, avg_hold_time_30d, avg_hold_ratio_30d, max_hold_ratio_30d,
            days_since_last_call, days_since_last_loyalty_call,
            usage_7d_total_mb, usage_30d_total_mb, usage_prev_30d_total_mb,
            avg_daily_usage_30d, days_since_last_usage,
            usage_change_abs_30d, usage_change_pct_30d,
            usage_decline_bucket,
            calls_30d_x_ooc, loyalty_calls_x_ddcancel, usage30d_x_ooc
        FROM model_features_v1
        WHERE random() <= 0.00005
    )
    TO '{sample_out.as_posix()}'
    (FORMAT PARQUET)
""")

print("Saved:", sample_out)