In [1]:
# notebook parameters

import os

# Spark connection: master URL and the name passed to SparkSession.builder.appName
spark_master = "local[*]"
app_name = "augment"

# input: the original Telco customer churn CSV
input_file = os.path.join("data", "WA_Fn-UseC_-Telco-Customer-Churn-.csv")

# output settings read by write_df:
#   output_kind   -- name of the DataFrameWriter method to call ("csv", "parquet", ...)
#   output_mode   -- DataFrameWriter save mode
#   output_prefix -- optional "<prefix>-" prepended to every output path; "" disables it
output_kind = "csv"
output_mode = "overwrite"
output_prefix = ""


# Sanity-checking

First, check which JVM we're running with: on macOS in particular, the default JVM may not be compatible with Scala (and therefore with Spark). The cell below displays `JAVA_HOME` so you can confirm it points at a supported JDK.

In [2]:
from os import getenv

In [3]:
# Show which JDK JAVA_HOME points at; the result is None when the variable is unset
getenv(key="JAVA_HOME", default=None)

# Spark setup

In [4]:
import pyspark

In [5]:
# Reuse the kernel's active SparkSession if there is one; otherwise start a new
# one with the master URL and application name from the parameters cell.
session = (
    pyspark.sql.SparkSession.builder
    .master(spark_master)
    .appName(app_name)
    .getOrCreate()
)
session

# Schema definition

Most of the fields are strings representing booleans or categoricals, but a few (`tenure`, `MonthlyCharges`, and `TotalCharges`) are numeric.

In [6]:
from pyspark.sql.types import StructType, StructField, StringType, DoubleType

# Column names in the order they appear in the CSV.
fields = [
    "customerID", "gender", "SeniorCitizen", "Partner", "Dependents", "tenure",
    "PhoneService", "MultipleLines", "InternetService", "OnlineSecurity",
    "OnlineBackup", "DeviceProtection", "TechSupport", "StreamingTV",
    "StreamingMovies", "Contract", "PaperlessBilling", "PaymentMethod",
    "MonthlyCharges", "TotalCharges", "Churn",
]

# Only these columns are read as doubles; every other column is read as a string.
double_fields = {"tenure", "MonthlyCharges", "TotalCharges"}

schema = StructType([
    StructField(column, DoubleType() if column in double_fields else StringType())
    for column in fields
])

In [7]:
# Skip the header line and apply the explicit schema, so tenure, MonthlyCharges,
# and TotalCharges come back as doubles rather than strings.
original_df = (
    session.read
    .schema(schema)
    .option("header", True)
    .csv(input_file)
)
# `df` starts out as an alias of the raw frame
df = original_df

# Categorical and boolean features

In [8]:
# Categorical and boolean columns whose distinct values we want to inspect.
columns = [
    "SeniorCitizen", "Partner", "Dependents",
    "PhoneService", "MultipleLines",
    "InternetService", "OnlineSecurity", "OnlineBackup", "DeviceProtection",
    "TechSupport", "StreamingTV", "StreamingMovies",
    "Contract", "PaperlessBilling", "PaymentMethod",
]

for column_name in columns:
    distinct_rows = original_df.select(column_name).distinct().rdd.collect()
    print(column_name, [r[0] for r in distinct_rows])

SeniorCitizen ['0', '1']
Partner ['No', 'Yes']
Dependents ['No', 'Yes']
PhoneService ['No', 'Yes']
MultipleLines ['No phone service', 'No', 'Yes']
InternetService ['Fiber optic', 'No', 'DSL']
OnlineSecurity ['No', 'Yes', 'No internet service']
OnlineBackup ['No', 'Yes', 'No internet service']
DeviceProtection ['No', 'Yes', 'No internet service']
TechSupport ['No', 'Yes', 'No internet service']
StreamingTV ['No', 'Yes', 'No internet service']
StreamingMovies ['No', 'Yes', 'No internet service']
Contract ['Month-to-month', 'One year', 'Two year']
PaperlessBilling ['No', 'Yes']
PaymentMethod ['Credit card (automatic)', 'Mailed check', 'Bank transfer (automatic)', 'Electronic check']


# Splitting the data frame

The training data schema looks like this:

- customerID
- gender
- SeniorCitizen
- Partner
- Dependents
- tenure
- PhoneService
- MultipleLines
- InternetService
- OnlineSecurity
- OnlineBackup
- DeviceProtection
- TechSupport
- StreamingTV
- StreamingMovies
- Contract
- PaperlessBilling
- PaymentMethod
- MonthlyCharges
- TotalCharges
- Churn

We want to divide the data frame into several frames that we can join together in an ETL job.

Those frames will look like this:

- **Customer metadata**
  - customerID
  - gender
  - date of birth (we'll derive age and senior citizen status from this)
  - Partner
  - Dependents
  - (nominal) MonthlyCharges
- **Billing events**
  - customerID
  - date (we'll derive tenure from the number/duration of billing events)
  - kind (one of "AccountCreation", "Charge", or "AccountTermination")
  - value (either a positive nonzero amount or 0.00; we'll derive TotalCharges from the sum of amounts and Churn from the existence of an AccountTermination event)
- **Customer phone features**
  - customerID
  - feature (one of "PhoneService" or "MultipleLines")
  - value (always "Yes")
- **Customer internet features**
  - customerID
  - feature (one of "InternetService", "OnlineSecurity", "OnlineBackup", "DeviceProtection", "TechSupport", "StreamingTV", "StreamingMovies")
  - value (one of "Fiber optic", "DSL", or "Yes")
- **Customer account features**
  - customerID
  - feature (one of "Contract", "PaperlessBilling", "PaymentMethod")
  - value (one of "Month-to-month", "One year", "Two year", "Yes", "Credit card (automatic)", "Mailed check", "Bank transfer (automatic)", "Electronic check")

In [9]:
# Print the schema `original_df` was read with (the explicit `schema` defined above)
original_df.printSchema()

root
 |-- customerID: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- SeniorCitizen: string (nullable = true)
 |-- Partner: string (nullable = true)
 |-- Dependents: string (nullable = true)
 |-- tenure: double (nullable = true)
 |-- PhoneService: string (nullable = true)
 |-- MultipleLines: string (nullable = true)
 |-- InternetService: string (nullable = true)
 |-- OnlineSecurity: string (nullable = true)
 |-- OnlineBackup: string (nullable = true)
 |-- DeviceProtection: string (nullable = true)
 |-- TechSupport: string (nullable = true)
 |-- StreamingTV: string (nullable = true)
 |-- StreamingMovies: string (nullable = true)
 |-- Contract: string (nullable = true)
 |-- PaperlessBilling: string (nullable = true)
 |-- PaymentMethod: string (nullable = true)
 |-- MonthlyCharges: double (nullable = true)
 |-- TotalCharges: double (nullable = true)
 |-- Churn: string (nullable = true)



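We'll start by generating a series of monthly charges (in the `charges` data frame), then a series of account creation events (`serviceStarts`) and churn events (`serviceTerminations`). Each customer gets `tenure` charges, each equal to their average monthly charge (`TotalCharges / tenure`). The charges are dated one month apart, ending one month before the current time. The account-creation event is dated one month before the earliest charge, and churned customers get a termination event dated today.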

In [10]:
import pyspark.sql.functions as F
import datetime

# Build the three kinds of billing events from the raw frame. All three frames
# share the column order (customerID, kind, value, date) so they can be unioned.
#
# This cell reads `original_df` rather than the `df` alias, so its result
# does not depend on what other cells have bound to `df`.
now = datetime.datetime.now(datetime.timezone.utc)

# Numbers each customer's exploded charge rows 1..tenure. The rows for one
# customer all carry the same value, so a constant ordering key is sufficient.
w = pyspark.sql.Window.partitionBy(original_df.customerID).orderBy(F.lit(''))

# One "Charge" per month of tenure, each for the customer's average monthly
# amount, dated 1..tenure months before `now`. The when() guard avoids a
# division by zero (an error under ANSI mode) for tenure-0 customers; their
# array is empty anyway, so they produce no charges.
monthly_amount = F.when(
    original_df.tenure > 0, original_df.TotalCharges / original_df.tenure
)

charges = original_df.select(
    original_df.customerID,
    F.lit("Charge").alias("kind"),
    F.explode(F.array_repeat(monthly_amount, original_df.tenure.cast("int"))).alias("value")
).withColumn("now", F.lit(now)
).withColumn("month_number", -F.row_number().over(w)
).withColumn("date", F.expr("add_months(now, month_number)")).drop(
    "now", "month_number"
)

# One "AccountCreation" event per customer, dated one month before the
# earliest charge. add_months takes an integer month count and tenure is a
# double, so cast explicitly instead of relying on implicit (non-ANSI) coercion.
serviceStarts = original_df.select(
    original_df.customerID,
    F.lit("AccountCreation").alias("kind"),
    F.lit(0.0).alias("value"),
    F.lit(now).alias("now"),
    (-original_df.tenure - 1).cast("int").alias("month_number")
).withColumn("date", F.expr("add_months(now, month_number)")).drop(
    "now", "month_number"
)

# One "AccountTermination" event, dated at `now`, for every churned customer.
serviceTerminations = original_df.where(original_df.Churn == "Yes").select(
    original_df.customerID,
    F.lit("AccountTermination").alias("kind"),
    F.lit(0.0).alias("value"),
    F.add_months(F.lit(now), 0).alias("date")
)



`billingEvents` combines all of these events (account creation, account termination, and individual monthly charges), sorted by date.

In [11]:
# union() matches columns by position; all three frames use the column order
# (customerID, kind, value, date).
billingEvents = (
    charges
    .union(serviceStarts)
    .union(serviceTerminations)
    .orderBy("date")
)


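We'll define a small helper function that writes a data frame using the output parameters defined at the top of the notebook (`output_kind`, `output_mode`, and `output_prefix`). By default it writes CSV files with a header row.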

In [12]:
def write_df(df, name):
    """Write a data frame using the notebook's output parameters.

    The output path is "<name>.<output_kind>". It gets a "<output_prefix>-"
    prefix when output_prefix is not the empty string. The DataFrameWriter
    method named by output_kind (e.g. "csv" or "parquet") is called in
    save mode output_mode. CSV output also gets a header row.

    Args:
        df: the data frame to write.
        name: base name of the output path, without extension.
    """
    path = "{}.{}".format(name, output_kind)
    if output_prefix != "":
        path = "{}-{}".format(output_prefix, path)

    options = {"header": True} if output_kind == "csv" else {}

    writer = df.write.mode(output_mode)
    save = getattr(writer, output_kind)
    save(path, **options)

In [13]:
# Persist the combined billing events under the base name "billing_events".
write_df(df=billingEvents, name="billing_events")

Our next step is to generate customer metadata, which includes the following fields:

  - gender
  - date of birth (we'll derive age and senior citizen status from this)
  - Partner
  - Dependents
  
We'll calculate date of birth by using a hash of the customer ID as a pseudorandom number. Customers who are not senior citizens get an age uniformly distributed between 18 and 64. Senior citizens get an age of 65 plus an exponentially distributed offset (mean 6.3 years).

In [14]:
# Constants for synthesizing a date of birth for each customer.
SENIOR_CUTOFF = 65            # age (years) at which customers are treated as senior citizens
ADULT_CUTOFF = 18             # youngest age (years) generated for non-senior customers
DAYS_IN_YEAR = 365.25
EXPONENTIAL_DIST_SCALE = 6.3  # scale (years) of the exponential offset added to SENIOR_CUTOFF

# `choice` is a pseudorandom value in [0, 1) derived from a hash of the
# customer ID. The same ID always yields the same value, so a customer's
# age in days is reproducible across runs.
customerMetaRaw = original_df.select(
    "customerID",
    F.lit(now).alias("now"),
    (F.abs(F.hash(original_df.customerID)) % 4096 / 4096).alias("choice"),
    "SeniorCitizen",
    "gender",
    "Partner",
    "Dependents",
    "MonthlyCharges"
)

customerMetaRaw = customerMetaRaw.withColumn(
    "ageInDays",
    F.floor(
        F.when(
            # SeniorCitizen is read as a string ("0"/"1"). Compare with a string
            # literal rather than relying on implicit string->int coercion.
            customerMetaRaw.SeniorCitizen == "0",
            # non-seniors: uniform between ADULT_CUTOFF and SENIOR_CUTOFF - 1 years
            (customerMetaRaw.choice *
             ((SENIOR_CUTOFF - ADULT_CUTOFF - 1) * DAYS_IN_YEAR))
            + (ADULT_CUTOFF * DAYS_IN_YEAR)
        ).otherwise(
            # seniors (and rows with a missing flag): SENIOR_CUTOFF years plus an
            # exponential offset; -log1p(-u) is the inverse CDF of Exp(1)
            (SENIOR_CUTOFF * DAYS_IN_YEAR) +
            (DAYS_IN_YEAR * (-F.log1p(-customerMetaRaw.choice) * EXPONENTIAL_DIST_SCALE))
        )
    ).cast("int")
)

# Date of birth = the reference timestamp `now` minus the generated age.
customerMetaRaw = customerMetaRaw.withColumn(
    "dateOfBirth",
    F.expr("date_sub(now, ageInDays)")
)

# Output columns; the reference timestamp `now` is written alongside dateOfBirth.
customerMeta = customerMetaRaw.select(
    "customerID",
    "dateOfBirth",
    "gender",
    "SeniorCitizen",
    "Partner",
    "Dependents",
    "MonthlyCharges",
    "now"
).orderBy("customerID")

In [15]:
# Persist the synthesized customer metadata under the base name "customer_meta".
write_df(df=customerMeta, name="customer_meta")


Now we can generate customer phone features, which include:

  - customerID
  - feature (one of "PhoneService" or "MultipleLines")
  - value (always "Yes"; there are no records for "No" or "No Phone Service")

In [16]:
# Each phone feature yields one ("customerID", feature, "Yes") row per customer
# who has it. Customers without the feature get no row at all.
phoneService = original_df.where(original_df.PhoneService == "Yes").select(
    "customerID",
    F.lit("PhoneService").alias("feature"),
    F.lit("Yes").alias("value"),
)

multipleLines = original_df.where(original_df.MultipleLines == "Yes").select(
    "customerID",
    F.lit("MultipleLines").alias("feature"),
    F.lit("Yes").alias("value"),
)

customerPhoneFeatures = (
    phoneService
    .union(multipleLines)
    .orderBy("customerID")
)


In [17]:
# Persist the phone features under the base name "customer_phone_features".
write_df(df=customerPhoneFeatures, name="customer_phone_features")


Customer internet features include:
  - customerID
  - feature (one of "InternetService", "OnlineSecurity", "OnlineBackup", "DeviceProtection", "TechSupport", "StreamingTV", "StreamingMovies")
  - value (one of "Fiber optic", "DSL", or "Yes" -- there are no records for "No" or "No internet service")

In [18]:
# InternetService is recorded with its actual value ("DSL" or "Fiber optic")
# for every customer who has internet service at all.
internet_service = original_df.select(
    "customerID",
    F.lit("InternetService").alias("feature"),
    original_df.InternetService.alias("value")
).where(original_df.InternetService != "No")

# The add-on internet features are recorded only when their value is "Yes".
# InternetService is handled above rather than here. Its values are never
# "Yes" (see the distinct values printed earlier), so a `== "Yes"` filter
# would never match it, and if it did it would duplicate the row above.
INTERNET_ADDON_FEATURES = [
    "OnlineSecurity", "OnlineBackup", "DeviceProtection",
    "TechSupport", "StreamingTV", "StreamingMovies",
]

customerInternetFeatures = internet_service

for feature in INTERNET_ADDON_FEATURES:
    # Use a dedicated name instead of `df`, so running this cell does not
    # change what the notebook-wide `df` alias refers to.
    feature_df = original_df.select(
        "customerID",
        F.lit(feature).alias("feature"),
        original_df[feature].alias("value")
    ).where(original_df[feature] == "Yes")

    customerInternetFeatures = customerInternetFeatures.union(feature_df)

write_df(customerInternetFeatures.orderBy("customerID"), "customer_internet_features")


Customer account features include:

  - customerID
  - feature (one of "Contract", "PaperlessBilling", "PaymentMethod")
  - value (one of "Month-to-month", "One year", "Two year", "Yes", "Credit card (automatic)", "Mailed check", "Bank transfer (automatic)", "Electronic check")

In [19]:
# Schema of the empty seed frame that the per-feature frames are unioned onto.
accountSchema = pyspark.sql.types.StructType(
    [ pyspark.sql.types.StructField(f, StringType()) for f in ["customerID", "feature", "value"]]
)

customerAccountFeatures = session.createDataFrame(schema=accountSchema, data=[])

for feature in ["Contract", "PaperlessBilling", "PaymentMethod"]:
    # Keep every value except "No" (only PaperlessBilling has "No" values).
    # Use a dedicated name instead of `df`, so running this cell does not
    # change what the notebook-wide `df` alias refers to.
    feature_df = original_df.select(
        "customerID",
        F.lit(feature).alias("feature"),
        original_df[feature].alias("value")
    ).where(original_df[feature] != "No")

    customerAccountFeatures = customerAccountFeatures.union(feature_df)

# Sort by customer, consistent with the other customer feature tables.
write_df(customerAccountFeatures.orderBy("customerID"), "customer_account_features")

    