# Crash Rates ETL
## Anomaly Detection and Explanation of Crash Rates

This notebook examines crash rates, using metrics derived from the [e10s stability dashboard][1]. The crash rate is defined as

$$
\text{rate} = \frac
    {\text{# crashes}}
    {\text{usage khours}}
$$

This definition provides a normalized crash rate that can be used to compare clients across segments. The stability dashboard also includes a color-coded table for comparing rates relative to the leading week's rate.
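
As a small worked example of the definition above, the following plain-Python sketch computes the rate for two segments. The function name `crash_rate_per_khour` and the numbers are hypothetical and chosen only for illustration; they are not taken from the dashboard.

```python
def crash_rate_per_khour(crashes, usage_hours):
    """Crashes per 1,000 usage hours; None when there is no usage."""
    usage_khours = usage_hours / 1000.0
    if usage_khours <= 0:
        return None
    return crashes / usage_khours


# Hypothetical numbers, for illustration only.
segment_a = crash_rate_per_khour(crashes=12, usage_hours=4000)   # 3.0
segment_b = crash_rate_per_khour(crashes=30, usage_hours=20000)  # 1.5
```

Segment B has more crashes in absolute terms, but its rate is lower once usage is taken into account, which is why the normalized rate is the quantity compared across segments.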

[1]: https://chutten.github.io/telemetry-dashboard/crashes/ "Stability Dashboard (Telemetry) - e10s"


In [None]:
from pyspark.sql import functions as F

path = (
    "s3://net-mozaws-prod-us-west-2-pipeline-analysis"
    "/amiyaguchi/macrobase/crash_rates/v1/"
)
path

## Data Preparation

###  Feature Selection
#### Attribute Selection
The following features have been used for the e10s rollout, and should be useful for finding anomalous sub-populations.

In [None]:
attributes = [
    'normalized_channel',
    'env_build_version',
    'env_build_id',
    'app_name',
    'os',
    'os_version',
    'env_build_arch',
    'country',
    'active_experiment_id',
    'active_experiment_branch',
    'e10s_enabled',
    'e10s_cohort',
    'gfx_compositor',
    'submission_date_s3',
]

#### Metric Selection

`usage_khours` is derived from the client subsession length and is expressed in thousands of hours. As of `main_summary/v4`, the main_summary dataset exposes three crash counts: content, plugin, and gmplugin crashes. Each crash rate is the corresponding crash count divided by `usage_khours`.

In [None]:
seconds_per_hour = 60 * 60
seconds_per_day = seconds_per_hour * 24

def crash_rate(crashes, usage="usage_khours"):
    return (
        (F.col(crashes) / F.col(usage))
        .alias("{}_rate".format(crashes))
    )


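# Convert subsession length (seconds) to thousands of hours. Negative or
# implausibly long (>= 180 days) subsessions count as zero usage.
usage_khours = (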
    F.when((F.col("subsession_length") >= 0) &
           (F.col("subsession_length") < 180 * seconds_per_day),
           (F.col("subsession_length") / seconds_per_hour / 1000))
    .otherwise(0.0)
    .cast('double')
    .alias("usage_khours")
)

crash_fields = [
    "crashes_detected_content",
    "crashes_detected_plugin",
    "crashes_detected_gmplugin",
]
crash_metrics = crash_fields + [crash_rate(x) for x in crash_fields]

metrics = [F.col("usage_khours")] + crash_metrics
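
The `usage_khours` expression above can be checked by hand with a plain-Python mirror of the same logic. This is a sketch for sanity-checking individual values, not part of the ETL; the name `usage_khours_for` is hypothetical, and treating a missing subsession length as zero usage is an assumption based on how `when(...).otherwise(0.0)` behaves when the condition evaluates to null.

```python
SECONDS_PER_HOUR = 60 * 60
SECONDS_PER_DAY = SECONDS_PER_HOUR * 24
MAX_SUBSESSION_SECONDS = 180 * SECONDS_PER_DAY


def usage_khours_for(subsession_length):
    """Plain-Python mirror of the `usage_khours` column expression."""
    if subsession_length is None:
        # Assumption: a null length falls through to the `otherwise` branch.
        return 0.0
    if 0 <= subsession_length < MAX_SUBSESSION_SECONDS:
        return subsession_length / float(SECONDS_PER_HOUR) / 1000
    return 0.0


one_hour = usage_khours_for(3600)                      # 0.001 khours
negative = usage_khours_for(-5)                        # clamped to 0.0
too_long = usage_khours_for(MAX_SUBSESSION_SECONDS)    # clamped to 0.0
```

Rows clamped to zero usage have an undefined crash rate, so they are worth handling explicitly in any downstream analysis.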

### Extract relevant features from `main_summary`

In [None]:
import operator
from functools import reduce  # builtin in Python 2, must be imported in Python 3

main_summary = (
    spark
    .read
    .option("mergeSchema", "true")
    .parquet("s3://telemetry-parquet/main_summary/v4")
)

# take a 1% sample, bucket #27
crash_rates = (
    main_summary
    .where(F.col("sample_id") == 27)
    .withColumn("usage_khours", usage_khours)
    .select(["timestamp", "sample_id"] + attributes + metrics)
    .where(
        reduce(operator.or_,
               [F.col(x).isNotNull() for x in crash_fields]))
)
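
The final `where` clause folds one not-null test per crash field into a single OR condition, so a row is kept when at least one crash count is present. The same fold can be sketched on plain Python dictionaries; the helper `has_any_crash_field` and the sample rows are hypothetical and only illustrate the pattern.

```python
import operator
from functools import reduce

crash_fields = [
    "crashes_detected_content",
    "crashes_detected_plugin",
    "crashes_detected_gmplugin",
]


def has_any_crash_field(row):
    """True when at least one crash field in the row is not None."""
    return reduce(operator.or_,
                  [row.get(field) is not None for field in crash_fields])


# Hypothetical rows, for illustration only.
rows = [
    {"crashes_detected_content": 2, "crashes_detected_plugin": None},
    {"crashes_detected_content": None, "crashes_detected_plugin": None},
    {"crashes_detected_gmplugin": 0},
]
kept = [row for row in rows if has_any_crash_field(row)]
```

Note that a count of zero is still a non-null value, so the third row is kept; only rows where every crash field is missing are dropped.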

### Repartition and Persist data

In [None]:
# `timestamp` is assumed to be in nanoseconds, hence the division by 10**9
timestamp = F.from_unixtime(F.col("timestamp")/10**9)

crash_rates_by_day = (
    crash_rates
    .withColumn("submission_day", F.dayofyear(timestamp))
    .orderBy("timestamp")
)

(
    crash_rates_by_day
    .write
    .partitionBy("submission_date_s3")
    .parquet(path, mode="overwrite")
)
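
The `submission_day` derivation can be sanity-checked outside Spark. The sketch below assumes the timestamp is in nanoseconds since the Unix epoch and interprets it in UTC; `from_unixtime` uses the Spark session time zone, so results near midnight may differ from this sketch. The helper name `day_of_year_utc` is hypothetical.

```python
from datetime import datetime, timedelta

EPOCH = datetime(1970, 1, 1)


def day_of_year_utc(timestamp_ns):
    """Day of year (1-366) for a nanosecond Unix timestamp, in UTC."""
    seconds = timestamp_ns // 10**9
    return (EPOCH + timedelta(seconds=seconds)).timetuple().tm_yday


new_year = day_of_year_utc(1483228800 * 10**9)   # 2017-01-01 00:00:00 UTC
february = day_of_year_utc(1485907200 * 10**9)   # 2017-02-01 00:00:00 UTC
```

Because the day of year repeats every year, `submission_day` alone does not identify a date when the data spans more than one year; `submission_date_s3` remains the unambiguous partition key.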