# Transforming and joining raw data

The "raw" data is divided among the following tables:

- **Customer metadata**
  - customerID
  - gender
  - date of birth (we'll derive age and senior citizen status from this)
  - Partner
  - Dependents
  - (nominal) MonthlyCharges
- **Billing events**
  - customerID
  - date (we'll derive tenure from the number/duration of billing events)
  - kind (one of "AccountCreation", "Charge", or "AccountTermination")
  - value (either a positive nonzero amount or 0.00; we'll derive TotalCharges from the sum of amounts and Churn from the existence of an AccountTermination event)
- **Customer phone features**
  - customerID
  - feature (one of "PhoneService" or "MultipleLines")
- **Customer internet features**
  - customerID
  - feature (one of "InternetService", "OnlineSecurity", "OnlineBackup", "DeviceProtection", "TechSupport", "StreamingTV", "StreamingMovies")
  - value (one of "Fiber optic", "DSL", "Yes", "No")
- **Customer account features**
  - customerID
  - feature (one of "Contract", "PaperlessBilling", "PaymentMethod")
  - value (one of "Month-to-month", "One year", "Two year", "No", "Yes", "Credit card (automatic)", "Mailed check", "Bank transfer (automatic)", "Electronic check")

We want to join these together to reconstitute a training data set with this schema:

- customerID
- gender
- SeniorCitizen
- Partner
- Dependents
- tenure
- PhoneService
- MultipleLines
- InternetService
- OnlineSecurity
- OnlineBackup
- DeviceProtection
- TechSupport
- StreamingTV
- StreamingMovies
- Contract
- PaperlessBilling
- PaymentMethod
- MonthlyCharges
- TotalCharges
- Churn

In [None]:
# notebook parameters

import os

# Spark connection settings.
spark_master = "local[*]"
app_name = "churn-etl"

# Base names (without extension) of the raw input tables, keyed by the
# short name this notebook uses to look them up.
input_files = {
    "billing": "billing_events",
    "account_features": "customer_account_features",
    "internet_features": "customer_internet_features",
    "meta": "customer_meta",
    "phone_features": "customer_phone_features",
}

# File formats for reading the inputs and writing the joined output; the
# format name is also used as the file extension.
input_kind = "csv"
output_kind = "csv"

# Output target: base name, optional prefix ("" means no prefix) and the
# writer's save mode.
output_file = "churn-etl"
output_prefix = ""
output_mode = "overwrite"


In [None]:
import pyspark

# Build (or reuse) the Spark session for this notebook. Event logging is
# switched on and pointed at the current working directory.
session = (
    pyspark.sql.SparkSession.builder
    .master(spark_master)
    .appName(app_name)
    .config("spark.eventLog.enabled", True)
    .config("spark.eventLog.dir", ".")
    .getOrCreate()
)
# Bare expression so the notebook renders the session as the cell output.
session

In [None]:
def read_df(fn):
    """Load one input table through the shared Spark session.

    ``fn`` is the table's base name. The configured ``input_kind`` supplies
    both the file extension and the name of the ``session.read`` method to
    call. For CSV input the reader is asked to treat the first line as a
    header.
    """
    reader = getattr(session.read, input_kind)
    path = "{}.{}".format(fn, input_kind)
    if input_kind == "csv":
        return reader(path, header=True)
    return reader(path)

# Reconstructing billing events and charges

In [None]:
# Load the raw billing events table and print the schema Spark read for it.
billing_events = read_df(input_files["billing"])
billing_events.printSchema()

In [None]:
# Parse the charge amounts as numbers so they can be aggregated below.
# "double" is used rather than "float": Spark's float is 32-bit single
# precision, which cannot hold typical currency amounts closely enough, and
# the representation error becomes visible once the values are summed into
# TotalCharges.
billing_events = billing_events.withColumn("value", billing_events.value.cast("double"))

In [None]:
import pyspark.sql.functions as F

# For every (customer, event kind) pair: the number of events with a
# non-null value and the sum of those values.
counts_and_charges = (
    billing_events
    .groupBy("customerID", "kind")
    .agg(
        F.count(billing_events["value"]).alias("event_counts"),
        F.sum(billing_events["value"]).alias("total_charges"),
    )
)


In [None]:
# Every customer that appears in at least one billing event.
customers = billing_events.select("customerID").distinct()

# Customer IDs that have an AccountTermination event, exposed under the
# column name "Churn" so the outer join below can test it for null.
terminations = (
    billing_events
    .filter(F.col("kind") == "AccountTermination")
    .select(F.col("customerID").alias("Churn"))
)

# Churn is "Yes" when the customer matched a termination row, else "No".
churned = (
    customers
    .join(terminations, customers["customerID"] == terminations["Churn"], "leftouter")
    .select(
        "customerID",
        F.when(F.col("Churn").isNotNull(), "Yes").otherwise("No").alias("Churn"),
    )
)

# tenure = number of Charge events, TotalCharges = sum of their values.
# NOTE(review): this is an inner join, so customers without any Charge
# event are absent here and therefore from customer_billing as well --
# confirm that is intended.
customer_charges = (
    customers
    .join(counts_and_charges.filter(F.col("kind") == "Charge"), on="customerID")
    .select(
        "customerID",
        F.col("event_counts").alias("tenure"),
        F.col("total_charges").alias("TotalCharges"),
    )
)

customer_billing = churned.join(customer_charges, on="customerID")

In [None]:
# Preview the per-customer billing summary (Churn, tenure, TotalCharges).
customer_billing.show()

# Reconstructing phone features


In [None]:
# Load the raw phone features table and print the schema Spark read for it.
phone_features = read_df(input_files["phone_features"])
phone_features.printSchema()

In [None]:
# One frame per phone feature. Each matching row is tagged with a constant
# "Yes"; customers without a row are handled by the outer joins that follow.
phone_service = (
    phone_features
    .filter(F.col("feature") == "PhoneService")
    .select("customerID", F.lit("Yes").alias("PhoneService"))
)

multiple_lines = (
    phone_features
    .filter(F.col("feature") == "MultipleLines")
    .select("customerID", F.lit("Yes").alias("MultipleLines"))
)


In [None]:
customer_phone_features = (
    customers
    .join(phone_service, on="customerID", how="leftouter")
    .join(multiple_lines, on="customerID", how="leftouter")
    # First pass: a customer with no PhoneService row gets an explicit "No".
    .select(
        "customerID",
        F.when(F.col("PhoneService").isNotNull(), "Yes").otherwise("No").alias("PhoneService"),
        "MultipleLines",
    )
    # Second pass: MultipleLines depends on the resolved PhoneService value,
    # which is why it is computed in a separate select.
    .select(
        "customerID",
        "PhoneService",
        F.when(F.col("PhoneService") == "No", "No phone service")
        .when(F.col("MultipleLines").isNull(), "No")
        .otherwise("Yes")
        .alias("MultipleLines"),
    )
)

# Reconstructing internet features

Whereas the phone features only record whether a customer has phone service and multiple lines, accounts can carry several internet-specific features:

- `InternetService` (one of `Fiber optic` or `DSL` in the "raw" data; its absence translates to `No` in the processed data)
- `OnlineSecurity` (`Yes` in the "raw" data if present; one of `No`, `Yes`, or `No internet service` in the processed data)
- `OnlineBackup` (`Yes` in the "raw" data if present; one of `No`, `Yes`, or `No internet service` in the processed data)
- `DeviceProtection` (`Yes` in the "raw" data if present; one of `No`, `Yes`, or `No internet service` in the processed data)
- `TechSupport` (`Yes` in the "raw" data if present; one of `No`, `Yes`, or `No internet service` in the processed data)
- `StreamingTV` (`Yes` in the "raw" data if present; one of `No`, `Yes`, or `No internet service` in the processed data)
- `StreamingMovies` (`Yes` in the "raw" data if present; one of `No`, `Yes`, or `No internet service` in the processed data)

This will lead to some slightly more interesting joins!

In [None]:
# Load the raw internet features table, print its schema and preview rows.
internet_features = read_df(input_files["internet_features"])
internet_features.printSchema()
internet_features.show()

In [None]:
def untidy_feature(df, feature):
    """Pull one feature out of a long-format table into its own column.

    Keeps the rows of ``df`` whose ``feature`` column equals ``feature`` and
    returns ``customerID`` together with the ``value`` column renamed to the
    feature's name.
    """
    matching_rows = df.filter(F.col("feature") == feature)
    renamed_value = F.col("value").alias(feature)
    return matching_rows.select("customerID", renamed_value)

# One single-feature frame per internet feature, built in the order the
# names are listed.
(
    internet_service,
    online_security,
    online_backup,
    device_protection,
    tech_support,
    streaming_tv,
    streaming_movies,
) = (
    untidy_feature(internet_features, feature_name)
    for feature_name in (
        "InternetService",
        "OnlineSecurity",
        "OnlineBackup",
        "DeviceProtection",
        "TechSupport",
        "StreamingTV",
        "StreamingMovies",
    )
)


In [None]:
def chained_join(column, base_df, dfs, how="leftouter"):
    """Join every frame in ``dfs`` onto ``base_df``, one after another.

    Each join uses ``column`` as the join key and ``how`` as the join type.
    The frames are joined in iteration order; ``base_df`` is returned
    unchanged when ``dfs`` is empty.
    """
    joined = base_df
    for other in dfs:
        joined = joined.join(other, column, how=how)
    return joined

# Left-join every single-feature frame onto the full customer list, so a
# customer without a given feature ends up with null in that column.
customer_internet_features = chained_join(
    column="customerID",
    base_df=customers,
    dfs=[
        internet_service,
        online_security,
        online_backup,
        device_protection,
        tech_support,
        streaming_tv,
        streaming_movies,
    ],
)


In [None]:
def resolve_nullable_column(df, col, null_val="No"):
    """Build a column expression that fills nulls in ``df[col]``.

    The expression yields ``null_val`` where ``df[col]`` is null and the
    original value everywhere else; it is aliased back to ``col``.
    """
    source = df[col]
    filled = F.when(source.isNull(), null_val).otherwise(source)
    return filled.alias(col)

def resolve_dependent_column(df, col, parent_col="InternetService",
                             null_val="No",
                             null_parent_val="No internet service"):
    """Build a column expression for a feature that depends on a parent.

    The expression, aliased to ``col``, yields:

    - ``null_parent_val`` when the parent feature is absent, i.e.
      ``df[parent_col]`` is null or equal to ``"No"``;
    - ``null_val`` when the parent is present but ``df[col]`` is null;
    - the value of ``df[col]`` otherwise.
    """
    parent = df[parent_col]
    child = df[col]
    # The null test is required: under SQL semantics NULL == "No" evaluates
    # to NULL rather than true, so the equality check alone never matches a
    # parent column whose nulls have not been replaced with "No" yet.
    parent_absent = parent.isNull() | (parent == "No")
    return (
        F.when(parent_absent, null_parent_val)
        .when(child.isNull(), null_val)
        .otherwise(child)
        .alias(col)
    )

# Replace the nulls left by the outer joins: InternetService gets the plain
# default, and every add-on column is resolved relative to InternetService.
customer_internet_features = customer_internet_features.select(
    "customerID",
    resolve_nullable_column(customer_internet_features, "InternetService"),
    *[
        resolve_dependent_column(customer_internet_features, addon, "InternetService")
        for addon in (
            "OnlineSecurity",
            "OnlineBackup",
            "DeviceProtection",
            "TechSupport",
            "StreamingTV",
            "StreamingMovies",
        )
    ]
)

# Reconstructing account features

In [None]:
# Load the raw account features table, print its schema and preview rows.
account_features = read_df(input_files["account_features"])
account_features.printSchema()
account_features.show()

In [None]:
# One single-feature frame per account feature.
contracts, paperless, payment = (
    untidy_feature(account_features, feature_name)
    for feature_name in ("Contract", "PaperlessBilling", "PaymentMethod")
)

customer_account_features = chained_join(
    column="customerID",
    base_df=customers,
    dfs=[contracts, paperless, payment],
)

# Only PaperlessBilling gets a default ("No") for customers without a row;
# Contract and PaymentMethod are selected as they are.
customer_account_features = customer_account_features.select(
    "customerID",
    "Contract",
    resolve_nullable_column(customer_account_features, "PaperlessBilling"),
    "PaymentMethod",
)

# Account metadata

In [None]:
# Load the customer metadata table and print the schema Spark read for it.
account_meta = read_df(input_files["meta"])

account_meta.printSchema()

In [None]:
# SeniorCitizen is "Yes" when the customer had reached 65 years of age by
# the row's "now" value and "No" otherwise.
# NOTE(review): assumes account_meta carries "now" and "dateOfBirth"
# columns -- confirm against the printed schema.
senior_citizen = F.when(
    F.col("now") >= F.add_months(F.col("dateOfBirth"), 65 * 12),
    "Yes"
).otherwise("No").alias("SeniorCitizen")

# The final join selects a SeniorCitizen column coming from account_meta, so
# the derived value has to be stored on the frame rather than only shown.
# A SeniorCitizen column that already ships with the data is left untouched.
if "SeniorCitizen" not in account_meta.columns:
    account_meta = account_meta.withColumn("SeniorCitizen", senior_citizen)

# Preview the metadata columns that feed the final data set.
account_meta.select(
    "customerID",
    senior_citizen,
    "Partner",
    "Dependents",
    "gender",
    "MonthlyCharges"
).show()

# Putting it all together

In [None]:
# Left-join every per-customer frame onto the customer list, then keep the
# columns of the target schema in their final order.
wide_data = chained_join(
    column="customerID",
    base_df=customers,
    dfs=[
        customer_billing,
        customer_phone_features,
        customer_internet_features,
        customer_account_features,
        account_meta,
    ],
).select([
    "customerID",
    "gender",
    "SeniorCitizen",
    "Partner",
    "Dependents",
    "tenure",
    "PhoneService",
    "MultipleLines",
    "InternetService",
    "OnlineSecurity",
    "OnlineBackup",
    "DeviceProtection",
    "TechSupport",
    "StreamingTV",
    "StreamingMovies",
    "Contract",
    "PaperlessBilling",
    "PaymentMethod",
    "MonthlyCharges",
    "TotalCharges",
    "Churn",
])

In [None]:
# Print the query plan Spark will use to compute the joined data set.
wide_data.explain()

In [None]:
def write_df(df, name):
    """Write ``df`` using the configured output format and save mode.

    The target is ``name`` plus the ``output_kind`` extension, with
    ``output_prefix`` and a dash put in front when the prefix is not the
    empty string. ``output_kind`` also names the writer method that is
    called; CSV output is written with a header row.
    """
    target = "{}.{}".format(name, output_kind)
    if output_prefix != "":
        target = "{}-{}".format(output_prefix, target)

    writer = getattr(df.write.mode(output_mode), output_kind)
    if output_kind == "csv":
        writer(target, header=True)
    else:
        writer(target)

In [None]:
# Write the joined data set to the configured output location.
write_df(wide_data, output_file)

In [None]:
# Shut down the Spark session now that the output has been written.
session.stop()