In [55]:
import ujson as json
import matplotlib.pyplot as plt
import pandas as pd
import numpy as np
import math
import plotly.plotly as py
import IPython
import pyspark.sql.functions as fun
from pyspark.sql import Row
from datetime import date
import feather
from collections import defaultdict
import os

from __future__ import division
from moztelemetry.spark import get_pings, get_one_ping_per_client, get_pings_properties
from moztelemetry.dataset import Dataset
from montecarlino import grouped_permutation_test

%pylab inline
IPython.core.pylabtools.figsize(16, 7)

Populating the interactive namespace from numpy and matplotlib


In [2]:
sc.defaultParallelism

96

In [3]:
sc.version

u'2.0.0'

### Load beta data to test

How many sessions do we have on recent Beta builds?

In [4]:
DF = sqlContext.read.parquet("s3://telemetry-parquet/main_summary/v3/")

In [5]:
DF_beta_builds = DF.filter("submission_date_s3 > '20161015'")\
    .filter("app_name = 'Firefox' and channel = 'beta' and app_build_id >= '20161018000000'")\
    .groupBy("app_build_id", "app_version").count()

In [6]:
DF_beta_builds.orderBy("count", ascending = False).toPandas()[:20]

Unnamed: 0,app_build_id,app_version,count
0,20161027110534,50.0,28370508
1,20161101104304,50.0,27816617
2,20161020152750,50.0,21507597
3,20161024172922,50.0,19329942
4,20161104212021,50.0,12624879
5,20161019084923,49.0.2,111223
6,20161031133903,47.0.2,37583
7,20161028075540,50.0,24749
8,20161028075404,50.0,18482
9,20161021085142,50.0,11410


### Check hang occurrences

When comparing single builds, we saw hardly any hangs on the build after the change. Here we look at hang counts across multiple builds from before and after the change.

In [7]:
sample_before = get_pings(sc, app="Firefox", channel="beta", build_id=("20161018000000", "20161027000000"),
                          fraction=0.05)
%time print("Num sessions before: {:,}\n".format(sample_before.count()))

Num sessions before: 1,712,592

CPU times: user 32 ms, sys: 12 ms, total: 44 ms
Wall time: 3min 32s


In [3]:
sample_after = get_pings(sc, app="Firefox", channel="beta", build_id=("20161031000000", "20161108000000"),
                          fraction=0.05)
%time print("Num sessions after: {:,}\n".format(sample_after.count()))

Num sessions after: 1,947,880

CPU times: user 24 ms, sys: 16 ms, total: 40 ms
Wall time: 2min 3s


In [30]:
sample_full = sample_before.union(sample_after)

In [72]:
sample_full = get_pings(sc, app="Firefox", channel="beta",
                        build_id=("20161015000000", "20161108000000"),
                        fraction=0.1)

%time print("Num sessions: {:,}\n".format(sample_full.count()))

Num sessions: 11,024,531

CPU times: user 44 ms, sys: 20 ms, total: 64 ms
Wall time: 7min 24s


In [71]:
## Not working?

#spark_dataset = Dataset.from_source('telemetry')
#sample_full = spark_dataset.where(docType="saved_session", sourceName="telemetry", sourceVersion="4")\
#    .where(appName="Firefox", appUpdateChannel="beta")\
#    .where(submissionDate=lambda x: x >= "20161015")\
#    .where(appBuildId=lambda x: x >= "20161015000000")\
#    .records(sc, sample=0.1)

#%time print("Num sessions: {:,}\n".format(sample_full.count()))

Num sessions: 0

CPU times: user 0 ns, sys: 24 ms, total: 24 ms
Wall time: 1.47 s


In [73]:
def get_hang_stats(ping):
    """Extract main- and child-process thread hang stats from a raw ping.

    Returns a dict with:
      - "build_id": the application build ID (None if absent)
      - "main_hangs": the parent payload's `threadHangStats` list
      - "child_hangs": one `threadHangStats` list per entry of `childPayloads`
    Sections that are missing or present-but-null are treated as empty.
    """
    # `or {}` / `or []` also guard against keys that exist but hold null.
    payload = ping.get("payload") or {}
    application = ping.get("application") or {}
    # childPayloads is expected to be a list of per-child payload dicts; build a
    # real list so the result is the same under Python 2 and 3.
    child_payloads = payload.get("childPayloads") or []
    return {
        "build_id": application.get("buildId"),
        "main_hangs": payload.get("threadHangStats") or [],
        "child_hangs": [p.get("threadHangStats") or [] for p in child_payloads],
    }

In [74]:
sample_hangs = sample_full.map(get_hang_stats).cache()
sample_hangs.count()

11024531

In [75]:
## One Row per session: its build ID, whether the main (parent) process reported any
## thread hang stats, and whether any child process did. The before/after `period`
## column is currently disabled.
sample_hangs_rdd = sample_hangs.map(lambda r:
    Row(build=r["build_id"],
        #period="before" if r["build_id"] <= "20161028000000" else "after",
        has_main_hangs = len(r["main_hangs"]) > 0,
        ## True if at least one child payload has a non-empty hang list.
        ## (len(filter(...)) relies on Python 2's list-returning filter.)
        has_child_hangs = len(filter(lambda ch: len(ch) > 0, r["child_hangs"])) > 0))

sample_hangs_DF = spark.createDataFrame(sample_hangs_rdd)

In [76]:
sample_hangs_DF.groupBy("build").count().orderBy("build").toPandas()

Unnamed: 0,build,count
0,20161015124135,92
1,20161016004034,3
2,20161017130949,1838388
3,20161018061921,2
4,20161018084128,64
5,20161018084136,326
6,20161018084432,132
7,20161018084433,337
8,20161018084600,14
9,20161018084854,109


In [86]:
## Per build date (first 8 characters of the build ID, YYYYMMDD): number of sessions
## with main-process hangs, number with child-process hangs, and total sessions
## (count of the non-null has_main_hangs column).
pct_hangs_DF = sample_hangs_DF.groupBy(fun.substring("build", 1, 8).alias("build_date"))\
    .agg(fun.sum(fun.when(sample_hangs_DF.has_main_hangs, 1).otherwise(0)).alias("has_main_hangs"),
         fun.sum(fun.when(sample_hangs_DF.has_child_hangs, 1).otherwise(0)).alias("has_child_hangs"),
         fun.count("has_main_hangs").alias("n_sessions"))
## Note: pct_has_hangs covers main-process hangs only.
pct_hangs_DF = pct_hangs_DF.withColumn("pct_has_hangs",
    pct_hangs_DF.has_main_hangs / pct_hangs_DF.n_sessions * 100)
pct_hangs_DF.orderBy("build_date").toPandas()

Unnamed: 0,build_date,has_main_hangs,has_child_hangs,n_sessions,pct_has_hangs
0,20161015,6,0,92,6.521739
1,20161016,3,1,3,100.0
2,20161017,1830853,656465,1838388,99.59013
3,20161018,1475,364,1515,97.359736
4,20161019,1,1,9799,0.010205
5,20161020,1548771,551836,1555504,99.56715
6,20161021,2216,493,2343,94.579599
7,20161023,0,0,18,0.0
8,20161024,1636629,590779,1644011,99.550976
9,20161025,1240,296,1332,93.093093


In [None]:
def has_addon_hangs(h):
    """Return True if the main-thread ("Gecko") hang stats contain an add-on SDK hang.

    `h` is a session's threadHangStats list: one entry per thread, each with
    "name" and "hangs" keys. An empty or missing (None) list yields False.
    Relies on `addon_sdk_hang`, which is defined further down in this notebook.

    NOTE(review): this function is redefined identically under "Sanity-check
    hangs"; the later definition shadows this one.
    """
    if not h:
        return False
    gecko = [thread for thread in h if thread.get("name") == "Gecko"]
    if not gecko:
        return False
    # any() stops at the first SDK hang instead of filtering the whole list.
    return any(addon_sdk_hang(hang) for hang in (gecko[0].get("hangs") or []))

In [None]:
## NOTE(review): this cell uses `dataset`, which is only built further down (under
## "Data for comparison"), and `addon_sdk_hang`, also defined later, so it fails on a
## fresh kernel at this position. The same cell is repeated under "Sanity-check hangs".
## One Row per session: period, e10s flag, whether non-system add-ons are installed,
## whether any hang stats were recorded, and whether an add-on SDK hang was recorded.
hang_counts_rdd = dataset.map(lambda r: Row(period=r["period"], e10s=r["e10sEnabled"],
                       has_addons=len(r["addons"]["non_system"]) > 0,
                       has_hangs=len(r["hangs"]) > 0,
                       has_addon_hangs=has_addon_hangs(r["hangs"])))
hang_counts = spark.createDataFrame(hang_counts_rdd)

### Data for comparison

Based on the per-build session counts above, selecting a single build from among the top builds on each side of the change (one before, one after) will provide us with enough data.

In [4]:
dataset_before = get_pings(sc, app="Firefox", channel="beta", build_id="20161020152750")
%time print("Num sessions before: {:,}\n".format(dataset_before.count()))

Num sessions before: 17,312,866

CPU times: user 76 ms, sys: 24 ms, total: 100 ms
Wall time: 8min 7s


In [5]:
dataset_after = get_pings(sc, app="Firefox", channel="beta", build_id="20161101104304")
%time print("Num sessions after: {:,}\n".format(dataset_after.count()))

Num sessions after: 22,237,606

CPU times: user 64 ms, sys: 24 ms, total: 88 ms
Wall time: 8min 15s


Combine data and restrict to data of interest.

In [21]:
def extract_data(ping, before_build_id="20161020152750"):
    """Reduce a raw main ping to the fields needed for the before/after comparison.

    Returns a dict with: clientId; period ("before" if the ping's build ID equals
    `before_build_id`, otherwise "after"); architecture; e10sEnabled; the system
    section without its "hdd"/"gfx" entries; active add-ons split into system and
    non-system (guid, version) pairs; simpleMeasurements; threadHangStats; and the
    MEMORY_JS_COMPARTMENTS_SYSTEM and GC_MS histograms.

    Returns None if the ping has no environment or no payload. The input ping is
    left unmodified.
    """
    env = ping.get("environment")
    payload = ping.get("payload")
    if not env or not payload:
        return None

    application = ping.get("application") or {}
    data = {
        "clientId": ping.get("clientId"),
        "period": "before" if application.get("buildId") == before_build_id else "after",
    }
    # `or {}` also guards against sections that are present but null.
    data["architecture"] = (env.get("build") or {}).get("architecture")
    data["e10sEnabled"] = (env.get("settings") or {}).get("e10sEnabled")

    ## Some system data won't be relevant here. Build a filtered copy instead of
    ## deleting keys, so the caller's ping is not mutated.
    system = env.get("system") or {}
    data["system"] = {k: v for k, v in system.items() if k not in ("hdd", "gfx")}

    ## Only need IDs/versions of installed add-ons, split according to whether or not
    ## they are system add-ons.
    active_addons = (env.get("addons") or {}).get("activeAddons") or {}
    sys_addons = []
    nonsys_addons = []
    for guid, meta in active_addons.items():
        listing = (guid, meta.get("version"))
        if meta.get("isSystem"):
            sys_addons.append(listing)
        else:
            nonsys_addons.append(listing)
    data["addons"] = {"system": sys_addons, "non_system": nonsys_addons}

    histograms = payload.get("histograms") or {}
    data["simpleMeasurements"] = payload.get("simpleMeasurements", {})
    data["hangs"] = payload.get("threadHangStats", [])
    data["MEMORY_JS_COMPARTMENTS_SYSTEM"] = histograms.get("MEMORY_JS_COMPARTMENTS_SYSTEM", {})
    data["GC_MS"] = histograms.get("GC_MS", {})

    return data

def good_payload(data):
    """Return True if `data` is an extracted record with a client ID and an e10s setting.

    Rejects None (the value extract_data returns for unusable pings) and records
    whose "clientId" or "e10sEnabled" entry is None.
    """
    if data is None:
        return False
    return all(data[field] is not None for field in ("clientId", "e10sEnabled"))

In [22]:
full_data = dataset_before.union(dataset_after)
dataset = full_data.map(extract_data).filter(good_payload)
dataset = dataset.persist(StorageLevel.MEMORY_AND_DISK_SER)

In [26]:
%time print("Overall num sessions: {:,}\n".format(dataset.count()))

Overall num sessions: 39,550,472

CPU times: user 44 ms, sys: 28 ms, total: 72 ms
Wall time: 41.8 s


In [24]:
## Save this dataset to S3.
#s3_path = "s3://mozilla-metrics/user/dzeber/tmp/addon-sdk-fix/beta_{}/".format(date.today().isoformat())

In [25]:
#dataset.saveAsPickleFile(s3_path)

In [2]:
s3_path = "s3://mozilla-metrics/user/dzeber/tmp/addon-sdk-fix/beta_2016-11-09/"
dataset = sc.pickleFile(s3_path).persist(StorageLevel.MEMORY_AND_DISK_SER)

### Create a dataset to analyze

#### Longitudinal properties

Ideally, we would compare metrics between builds before and after the changes, within each profile. For this we need profiles to have sessions both before and after the change, with other factors (add-ons and e10s setting) staying constant over the sessions we observe.

How many profiles have these properties?

In [3]:
def session_vals_for_check(session):
    """Key a session by client ID, keeping only the fields the profile checks need.

    Returns (clientId, {"e10s": ..., "addons": <non-system add-on list>, "period": ...}).
    """
    client_id = session["clientId"]
    check_fields = dict(
        e10s=session["e10sEnabled"],
        addons=session["addons"]["non_system"],
        period=session["period"],
    )
    return client_id, check_fields

def constant_e10s_setting(session_vals):
    """Return True if exactly one distinct e10s value appears across the sessions.

    An empty iterable has no distinct values and yields False.
    """
    distinct_settings = {s["e10s"] for s in session_vals}
    return len(distinct_settings) == 1

def constant_active_addons(session_vals):
    """Return True if every session lists the same set of non-system add-on IDs.

    `session_vals` is an iterable of dicts whose "addons" entry is a list of
    (guid, version) pairs. Versions are ignored; only the IDs must match.
    An empty iterable yields False.
    """
    addon_lists = [s["addons"] for s in session_vals]
    ## All add-on lists must have the same length...
    if len(set(len(a) for a in addon_lists)) != 1:
        return False
    ## ...and they must contain the same add-on IDs: the union across sessions may be
    ## no larger than any single session's list. Build real lists (not lazy `map`
    ## objects) so indexing works on Python 3 as well as Python 2.
    addon_ids = [[guid for (guid, ver) in a] for a in addon_lists]
    all_addon_ids = set().union(*addon_ids)
    return len(all_addon_ids) == len(addon_ids[0])

def both_periods(session_vals):
    """Return True if the sessions include exactly two distinct periods (before and after)."""
    distinct_periods = set()
    for sess in session_vals:
        distinct_periods.add(sess["period"])
    return len(distinct_periods) == 2

In [4]:
prof_data = dataset.map(session_vals_for_check).groupByKey()
n_prof = prof_data.count()
print("Num unique profiles represented in the dataset: {:,}".format(n_prof))

Num unique profiles represented in the dataset: 2,611,968


In [10]:
prof_data_1 = prof_data.filter(lambda (cid, vals): constant_e10s_setting(vals))
n_prof_1 = prof_data_1.count()
n_prof_dropped = n_prof - n_prof_1
print("Num profiles with changing e10s: {:,} ({:.2f}%)"\
          .format(n_prof_dropped, n_prof_dropped / n_prof * 100))

Num profiles with changing e10s: 82,315 (3.15%)


In [11]:
prof_data_2 = prof_data.filter(lambda (cid, vals): constant_active_addons(vals))
n_prof_2 = prof_data_2.count()
n_prof_dropped = n_prof - n_prof_2
print("Num profiles with changing (non-system) add-on IDs: {:,} ({:.2f}%)"\
          .format(n_prof_dropped, n_prof_dropped / n_prof * 100))

Num profiles with changing (non-system) add-on IDs: 278,703 (10.67%)


In [12]:
prof_data_3 = prof_data.filter(lambda (cid, vals): both_periods(vals))
n_prof_3 = prof_data_3.count()
n_prof_dropped = n_prof - n_prof_3
print("Num profiles without both periods: {:,} ({:.2f}%)"\
          .format(n_prof_dropped, n_prof_dropped / n_prof * 100))

Num profiles without both periods: 980,214 (37.53%)


In [5]:
prof_data_comb = prof_data.filter(lambda (cid, vals):
    constant_e10s_setting(vals) and constant_active_addons(vals) and both_periods(vals))
n_prof_comb = prof_data_comb.count()
print("Num profiles meeting all conditions: {:,} ({:.2f}%)"\
          .format(n_prof_comb, n_prof_comb / n_prof * 100))

Num profiles meeting all conditions: 1,399,069 (53.56%)


Restrict to profiles with these properties.

In [6]:
## Client IDs of profiles meeting all the conditions. prof_data_comb comes from
## groupByKey(), so its keys are already unique and no extra distinct() shuffle is needed.
good_clients = set(prof_data_comb.keys().collect())
dataset_longit = dataset.filter(lambda d: d["clientId"] in good_clients)

In [9]:
print("Num sessions remaining: {:,}".format(dataset_longit.count()))

Num sessions remaining: 26,515,128


In [None]:
## Sanity check
#print("Num unique profiles: {:,}".format(dataset_longit.map(lambda d: d["clientId"]).distinct().count()))

First create an RDD with all the measurements we will be working with.

In [7]:
def get_hist_values(hist):
    """Return the non-zero buckets of a histogram as a {bucket: count} dict.

    `hist` is a histogram dict whose "values" entry maps bucket labels to counts.
    A missing or None histogram, or a missing or None "values" entry, gives {}.
    """
    ## Keep only non-zero histogram values. `.items()` works on Python 2 and 3, and
    ## `or {}` also covers sections that are present but null.
    values = (hist or {}).get("values") or {}
    return {k: v for k, v in values.items() if v > 0}

def addon_sdk_hang(hang):
    """Return True if a hang's stack mentions the add-on SDK runner (`sdk/addon/runner.js`).

    `hang` is one entry of a thread's "hangs" list, with an optional "stack" list of
    frame strings. Malformed entries (not a dict, null or non-iterable stack,
    non-string frames) are treated as non-SDK hangs instead of raising.
    """
    try:
        ## Check the stack info for add-on SDK code; any() stops at the first match.
        return any("sdk/addon/runner.js" in line for line in hang.get("stack", []))
    except (AttributeError, TypeError):
        ## Only swallow malformed-input errors, not unrelated failures.
        return False

def get_addon_hang_data(data):
    """Return the add-on SDK hangs recorded on the main thread for one session.

    Finds the "Gecko" (main-thread) entry in `data["hangs"]` and keeps the hangs for
    which `addon_sdk_hang` is true. Each returned item is
    {"stack": <stack list>, "values": <non-zero histogram buckets>}.
    Returns None when the session has no main-thread hang stats, and an empty list
    when it has some but none come from the add-on SDK.
    """
    ## Only need hangs from the main thread. Tolerate a null hang list and entries
    ## lacking "name"/"hangs" instead of failing the whole Spark job.
    gecko = [h for h in (data["hangs"] or []) if h.get("name") == "Gecko"]
    if not gecko:
        return None
    sdk_hangs = [h for h in (gecko[0].get("hangs") or []) if addon_sdk_hang(h)]
    return [{"stack": h.get("stack"), "values": get_hist_values(h.get("histogram", {}))}
            for h in sdk_hangs]


def get_hist_data(data, hist_name):
    """Return the non-zero buckets of histogram `hist_name` for one session record.

    Looks in `data["histograms"]` when that key exists; otherwise falls back to a
    top-level `data[hist_name]` entry, which is where extract_data stores the
    histograms it keeps (MEMORY_JS_COMPARTMENTS_SYSTEM, GC_MS). A missing
    histogram yields an empty dict.
    """
    ## extract_data no longer stores a "histograms" dict, so indexing it directly
    ## raised KeyError on every record.
    if "histograms" in data:
        hist = data["histograms"].get(hist_name, {})
    else:
        hist = data.get(hist_name, {})
    ## Keep only non-zero histogram values.
    return get_hist_values(hist)

def longit_row(data):
    """Flatten one extracted session record into a row for the longitudinal analysis.

    The row holds the client/period/e10s identifiers, the non-system add-on list and
    the number of system add-ons, a few system covariates, startup and shutdown
    timings from simpleMeasurements (-1 when a timing is missing), the non-zero
    buckets of the MEMORY_JS_COMPARTMENTS_SYSTEM and GC_MS histograms, and the
    main-thread add-on SDK hangs from `get_addon_hang_data`.
    """
    sm = data["simpleMeasurements"]
    sys_info = data["system"]
    os_info = sys_info.get("os", {})

    row = {
        "client_id": data["clientId"],
        "period": data["period"],
        "e10s": data["e10sEnabled"],
        "addon_nonsys": data["addons"]["non_system"],
        ## Keep only the count of system add-ons.
        "addons_sys_num": len(data["addons"]["system"]),
        ## Some system covariates.
        "sys_arch": data["architecture"],
        "sys_mem": sys_info.get("memoryMB"),
        "sys_cpu_count": sys_info.get("cpu", {}).get("count"),
        "sys_os": os_info.get("name"),
        "sys_os_version": os_info.get("version"),
        "was_startup_interrupted": bool(sm.get("startupInterrupted", 0)),
    }

    ## Startup milestones, plus shutdown duration (which may also show an effect).
    ## Missing timings are recorded as -1.
    timing_fields = [
        ("startup_main", "main"),
        ("startup_AMIstart", "AMI_startup_begin"),
        ("startup_XPIstart", "XPI_bootstrap_addons_begin"),
        ("startup_AMIend", "AMI_startup_end"),
        ("startup_toplevelwindow", "createTopLevelWindow"),
        ("startup_firstpaint", "firstPaint"),
        ("startup_sessionrestored", "sessionRestored"),
        ("shutdown", "shutdownDuration"),
    ]
    for column, measure in timing_fields:
        row[column] = sm.get(measure, -1)

    ## Non-zero histogram buckets and main-thread add-on SDK hangs.
    row["hist_compartments"] = get_hist_values(data["MEMORY_JS_COMPARTMENTS_SYSTEM"])
    row["hist_gc"] = get_hist_values(data["GC_MS"])
    row["hangs"] = get_addon_hang_data(data)
    return row

In [8]:
dataset_rows = dataset_longit.map(longit_row)

In [9]:
## Add a session ID.
def add_sess_id(d_with_i):
    """Store the ID from zipWithUniqueId() on its record as "session_id".

    Takes a (record_dict, id) pair, sets the key on the dict in place, and
    returns that same dict.
    """
    record, unique_id = d_with_i
    record.update(session_id=unique_id)
    return record
dataset_rows = dataset_rows.zipWithUniqueId().map(add_sess_id)

Shorten client IDs for convenience.

In [10]:
client_ids = dataset_rows.map(lambda d: d["client_id"]).distinct().zipWithUniqueId()

In [11]:
def replace_client_id(d_with_new_cid):
    """Overwrite a record's "client_id" with its shortened ID.

    Takes a (record_dict, new_id) pair, updates the dict in place, and returns it.
    """
    record, short_id = d_with_new_cid
    record.update(client_id=short_id)
    return record

## Swap each full client ID for its short integer ID. `client_ids` was built from
## the same RDD's client IDs, so the left outer join always finds a match.
dataset_rows = dataset_rows.map(lambda d: (d["client_id"], d))\
    .leftOuterJoin(client_ids)\
    .mapValues(replace_client_id)\
    .map(lambda (cid, d): d)

In [12]:
## Need to cache at this point to ensure stability of the IDs added with zipWithUniqueId().
## Otherwise, they keep getting recomputed.
## Materialize `dataset_rows` before releasing `dataset`: unpersisting first would make
## this count rebuild `dataset` from the S3 pickle.
dataset_rows.cache()
n_rows = dataset_rows.count()
dataset.unpersist()
n_rows

26515128

In [7]:
dataset_rows.take(5)

[{'addon_nonsys': [],
  'addons_sys_num': 4,
  'client_id': 213600,
  'e10s': True,
  'hangs': None,
  'hist_compartments': {u'243': 1},
  'hist_gc': {u'57': 1},
  'period': 'after',
  'session_id': 69138302,
  'shutdown': 9110,
  'startup_AMIend': 334,
  'startup_AMIstart': 210,
  'startup_XPIstart': 262,
  'startup_firstpaint': 922,
  'startup_main': 75,
  'startup_sessionrestored': 1141,
  'startup_toplevelwindow': 544,
  'sys_arch': u'x86',
  'sys_cpu_count': 4,
  'sys_mem': 6108,
  'sys_os': u'Windows_NT',
  'sys_os_version': u'6.1',
  'was_startup_interrupted': False},
 {'addon_nonsys': [],
  'addons_sys_num': 4,
  'client_id': 213600,
  'e10s': True,
  'hangs': None,
  'hist_compartments': {u'273': 1},
  'hist_gc': {u'40': 2, u'57': 1, u'68': 2},
  'period': 'after',
  'session_id': 69145103,
  'shutdown': 534,
  'startup_AMIend': 21495,
  'startup_AMIstart': 16764,
  'startup_XPIstart': 20528,
  'startup_firstpaint': 39146,
  'startup_main': 12558,
  'startup_sessionrestored': 

Sanity-check hangs.

In [None]:
def has_addon_hangs(h):
    """Return True if the main-thread ("Gecko") hang stats contain an add-on SDK hang.

    `h` is a session's threadHangStats list: one entry per thread, each with
    "name" and "hangs" keys. An empty or missing (None) list yields False.
    Relies on `addon_sdk_hang`, defined earlier in this notebook.

    NOTE(review): an identical definition appears earlier in the notebook; this one
    shadows it.
    """
    if not h:
        return False
    gecko = [thread for thread in h if thread.get("name") == "Gecko"]
    if not gecko:
        return False
    # any() stops at the first SDK hang instead of filtering the whole list.
    return any(addon_sdk_hang(hang) for hang in (gecko[0].get("hangs") or []))

In [None]:
## One Row per session: period, e10s flag, whether non-system add-ons are installed,
## whether any hang stats were recorded, and whether an add-on SDK hang was recorded.
## NOTE(review): `dataset` was unpersisted in an earlier cell, so this re-reads it
## from the S3 pickle. `r["hangs"]` is assumed to be a list; len() fails on None.
hang_counts_rdd = dataset.map(lambda r: Row(period=r["period"], e10s=r["e10sEnabled"],
                       has_addons=len(r["addons"]["non_system"]) > 0,
                       has_hangs=len(r["hangs"]) > 0,
                       has_addon_hangs=has_addon_hangs(r["hangs"])))
hang_counts = spark.createDataFrame(hang_counts_rdd)

In [None]:
hang_counts_coll = hang_counts.groupBy(hang_counts.columns).count().toPandas()

In [None]:
hang_counts_coll.sort_values(["period", "e10s", "has_addons", "has_hangs", "has_addon_hangs"])

Save this final dataset in case something fails later.

In [4]:
## NOTE(review): the path embeds today's date, but the save below is commented out,
## so the reload in the next cell only works on the day the pickle was written.
## Pin the date (as was done for the beta_2016-11-09 path) before re-running.
s3_path = "s3://mozilla-metrics/user/dzeber/tmp/addon-sdk-fix/beta-longit_{}/".format(date.today().isoformat())
#dataset_rows.saveAsPickleFile(s3_path)

In [5]:
dataset_rows = sc.pickleFile(s3_path).cache()

In [6]:
dataset_rows.count()

26515128

Sample profiles at 10% to reduce the size of the data.

In [14]:
## Sample whole profiles (not individual sessions), so every kept client retains all
## of its sessions. The fixed (arbitrary) seed keeps the 10% subset the same across
## re-runs, as long as the upstream partitioning is unchanged.
SAMPLE_FRACTION = 0.1
SAMPLE_SEED = 20161109
client_ids = dataset_rows.map(lambda d: d["client_id"]).distinct()
client_ids_subset = client_ids.sample(withReplacement=False, fraction=SAMPLE_FRACTION,
                                      seed=SAMPLE_SEED)
dataset_subset = dataset_rows.map(lambda d: (d["client_id"], d))\
    .join(client_ids_subset.map(lambda v: (v, None)))

In [15]:
## Materialize the subset before releasing its parent: unpersisting `dataset_rows`
## first would make this count recompute it from the S3 pickle.
dataset_subset.cache()
n_subset_sessions = dataset_subset.count()
dataset_rows.unpersist()
n_subset_sessions

2649175

In [18]:
dataset_rows = dataset_subset.map(lambda (cid, (d, nn)): d)

In [19]:
dataset_rows.map(lambda d: d["client_id"]).distinct().count()

139628

Split the complex (nested) fields out into their own DataFrames for easier handling and summarization.

#### Add-ons

In [20]:
def addon_rows(r):
    """Return one Row (client_id, session_id, guid, version) per non-system add-on in session `r`."""
    rows = []
    for guid, ver in r["addon_nonsys"]:
        rows.append(Row(client_id=r["client_id"], session_id=r["session_id"],
                        guid=guid, version=ver))
    return rows
    
rdd_addons = dataset_rows.flatMap(addon_rows)
DF_addons = spark.createDataFrame(rdd_addons)
DF_addons.printSchema()

root
 |-- client_id: long (nullable = true)
 |-- guid: string (nullable = true)
 |-- session_id: long (nullable = true)
 |-- version: string (nullable = true)



In [21]:
DF_addons.count()

2624677

In [22]:
## Double-check that the session IDs correspond.
s1 = DF_addons.select("session_id").distinct().rdd.map(lambda r: r.session_id)
s2 = dataset_rows.filter(lambda r: r["addon_nonsys"]).map(lambda r: r["session_id"]).distinct()
ss = s1.union(s2).distinct()
ss.count() == s1.count() and s1.count() == s2.count()

True

In [23]:
## Double-check that the client IDs correspond.
s1 = DF_addons.select("client_id").distinct().rdd.map(lambda r: r.client_id)
s2 = dataset_rows.filter(lambda r: r["addon_nonsys"]).map(lambda r: r["client_id"]).distinct()
ss = s1.union(s2).distinct()
ss.count() == s1.count() and s1.count() == s2.count()

True

#### Hangs

In [24]:
dataset_rows.map(lambda r: r["hangs"]).filter(lambda r: r).take(3)

[[{'stack': [u'Startup::XRE_Main',
    u'gre/modules/Promise-backend.js:750',
    u'self-hosted:868',
    u'gre/modules/commonjs/sdk/addon/runner.js:87',
    u'(chrome script)'],
   'values': {u'2047': 1}}],
 [{'stack': [u'Startup::XRE_Main',
    u'gre/modules/Promise-backend.js:750',
    u'self-hosted:868',
    u'gre/modules/commonjs/sdk/addon/runner.js:68',
    u'gre/modules/commonjs/toolkit/loader.js:617',
    u'gre/modules/commonjs/sdk/l10n/loader.js:10',
    u'gre/modules/commonjs/toolkit/loader.js:617',
    u'gre/modules/commonjs/sdk/l10n/locale.js:10',
    u'gre/modules/commonjs/toolkit/loader.js:617',
    u'gre/modules/NetUtil.jsm:191'],
   'values': {u'255': 1}},
  {'stack': [u'Startup::XRE_Main',
    u'gre/modules/Promise-backend.js:750',
    u'self-hosted:868',
    u'gre/modules/commonjs/sdk/addon/runner.js:87',
    u'gre/modules/commonjs/toolkit/loader.js:617',
    u'gre/modules/commonjs/sdk/l10n/html.js:10',
    u'gre/modules/commonjs/toolkit/loader.js:617',
    u'gre/modu

What are the unique combinations of `runner.js` lines mentioned in the hang stack traces?

In [25]:
sorted(dataset_rows.filter(lambda r: r["hangs"])\
    .flatMap(lambda r: [h["stack"] for h in r["hangs"]])\
    .map(lambda s: filter(lambda sl: "runner.js" in sl, s))\
    .map(lambda s: ",".join(sorted(set(map(lambda sl: sl.split(":")[-1], s)))))\
    .distinct().collect())

[u'41,87', u'66', u'68', u'78', u'84', u'87', u'9']

For now, lump all `runner.js` hangs for the session together, and convert the hang stats to a DF.

In [26]:
def hang_rows(r):
    """Collapse a session's add-on SDK hangs into one Row per hang-time bucket.

    Bucket counts are summed across all SDK hang stacks in the session. Each Row has
    client_id, session_id, num_hang_stats (number of SDK hang stacks), hang_time
    (bucket label as int) and count. Sessions without SDK hangs give [].
    """
    session_hangs = r["hangs"]
    if not session_hangs:
        return []
    ## Lump all runner.js stacks of the session together.
    bucket_totals = defaultdict(int)
    for hang in session_hangs:
        for bucket, n in hang["values"].iteritems():
            bucket_totals[bucket] += n
    n_stats = len(session_hangs)
    rows = []
    for bucket, total in bucket_totals.iteritems():
        rows.append(Row(client_id=r["client_id"], session_id=r["session_id"],
                        num_hang_stats=n_stats, hang_time=int(bucket), count=total))
    return rows

rdd_hangs = dataset_rows.flatMap(hang_rows)
DF_hangs = spark.createDataFrame(rdd_hangs)
DF_hangs.printSchema()

root
 |-- client_id: long (nullable = true)
 |-- count: long (nullable = true)
 |-- hang_time: long (nullable = true)
 |-- num_hang_stats: long (nullable = true)
 |-- session_id: long (nullable = true)



In [27]:
DF_hangs.count()

331491

In [28]:
## Double-check that the session IDs correspond.
s1 = DF_hangs.select("session_id").distinct().rdd.map(lambda r: r.session_id)
s2 = dataset_rows.filter(lambda r: r["hangs"]).map(lambda r: r["session_id"]).distinct()
ss = s1.union(s2).distinct()
ss.count() == s1.count() and s1.count() == s2.count()

True

In [29]:
## Double-check that the client IDs correspond.
s1 = DF_hangs.select("client_id").distinct().rdd.map(lambda r: r.client_id)
s2 = dataset_rows.filter(lambda r: r["hangs"]).map(lambda r: r["client_id"]).distinct()
ss = s1.union(s2).distinct()
ss.count() == s1.count() and s1.count() == s2.count()

True

#### Histograms

Collect histogram info aggregated by profile and period.

In [30]:
def hist_row(r, hist_fld):
    """Return one Row per bucket stored in `r[hist_fld]`.

    Each Row carries client_id, session_id, period, the histogram field name
    (`hist`), the bucket label as int (`hist_value`) and its count. An empty or
    missing-valued field gives [].
    """
    buckets = r[hist_fld]
    if not buckets:
        return []
    rows = []
    for label, n in buckets.iteritems():
        rows.append(Row(client_id=r["client_id"], session_id=r["session_id"],
                        period=r["period"], hist=hist_fld,
                        hist_value=int(label), count=n))
    return rows

def hist_row_all(r):
    """Histogram Rows for both retained histograms of session `r` (GC rows first, then compartments)."""
    rows = []
    for field in ("hist_gc", "hist_compartments"):
        rows.extend(hist_row(r, field))
    return rows

rdd_hist = dataset_rows.flatMap(hist_row_all)
DF_hist = spark.createDataFrame(rdd_hist)
DF_hist = DF_hist.groupBy("client_id", "period", "hist", "hist_value")\
    .agg(fun.sum("count").alias("count"))
DF_hist.printSchema()

root
 |-- client_id: long (nullable = true)
 |-- period: string (nullable = true)
 |-- hist: string (nullable = true)
 |-- hist_value: long (nullable = true)
 |-- count: long (nullable = true)



In [31]:
DF_hist.count()

6030102

#### Scalar data

Create a main DF for the remaining scalar measures.

In [32]:
scalar_flds = dataset_rows.first().keys()
for fld in ["addon_nonsys", "hangs", "hist_compartments", "hist_gc"]:
    scalar_flds.remove(fld)

def main_row(r):
    """Build the Row of scalar measures for one session.

    Copies every field named in the notebook-level `scalar_flds` list, then adds
    `has_hangs` (the session's hang list is non-empty) and `num_addons_nonsys`
    (number of non-system add-ons).
    """
    entries = dict((fld, r[fld]) for fld in scalar_flds)
    session_hangs = r["hangs"]
    entries["has_hangs"] = session_hangs is not None and len(session_hangs) > 0
    entries["num_addons_nonsys"] = len(r["addon_nonsys"])
    return Row(**entries)

rdd_main = dataset_rows.map(main_row)
DF_main = spark.createDataFrame(rdd_main)
DF_main.printSchema()

root
 |-- addons_sys_num: long (nullable = true)
 |-- client_id: long (nullable = true)
 |-- e10s: boolean (nullable = true)
 |-- has_hangs: boolean (nullable = true)
 |-- num_addons_nonsys: long (nullable = true)
 |-- period: string (nullable = true)
 |-- session_id: long (nullable = true)
 |-- shutdown: long (nullable = true)
 |-- startup_AMIend: long (nullable = true)
 |-- startup_AMIstart: long (nullable = true)
 |-- startup_XPIstart: long (nullable = true)
 |-- startup_firstpaint: long (nullable = true)
 |-- startup_main: long (nullable = true)
 |-- startup_sessionrestored: long (nullable = true)
 |-- startup_toplevelwindow: long (nullable = true)
 |-- sys_arch: string (nullable = true)
 |-- sys_cpu_count: long (nullable = true)
 |-- sys_mem: long (nullable = true)
 |-- sys_os: string (nullable = true)
 |-- sys_os_version: string (nullable = true)
 |-- was_startup_interrupted: boolean (nullable = true)



In [33]:
DF_main.count()

2649175

Sanity-check our assumptions about the dataset.

- Was the e10s setting consistent across all client sessions?
- Does the client have sessions both before and after the change?

In [34]:
client_stats = DF_main.groupBy("client_id").agg(
    (fun.countDistinct("e10s") == 1).alias("constant_e10s"),
    (fun.countDistinct("period") == 2).alias("both_periods")
)

client_stats.select("constant_e10s", "both_periods").distinct().collect()

[Row(constant_e10s=True, both_periods=True)]

- Were add-ons consistent across client sessions?

In [35]:
def consistent_addons(rows_for_client, include_versions=True):
    """Return True if a client's add-on rows describe the same add-ons in every session.

    `rows_for_client` holds one row per (session, add-on) with `session_id`, `guid`
    and `version` attributes. Add-ons are compared as (guid, version) pairs, or by
    guid alone when `include_versions` is False. The check is that the number of
    distinct add-ons across all sessions equals the number listed in the session of
    the first row.
    """
    rows = list(rows_for_client)
    if include_versions:
        key = lambda r: (r.guid, r.version)
    else:
        key = lambda r: r.guid
    n_distinct = len(set(key(r) for r in rows))
    first_session = rows[0].session_id
    n_in_first = sum(1 for r in rows if r.session_id == first_session)
    return n_distinct == n_in_first

## Per client: are the (guid, version) add-on pairs the same in every session
## (constant_addons), and are the add-on IDs alone the same (constant_addons_guid)?
## Clients with no non-system add-ons have no rows in DF_addons, so they are not
## counted here.
addon_stats = DF_addons.rdd.groupBy(lambda r: r.client_id)\
    .map(lambda (cid, gp): Row(client_id = cid,
                               constant_addons = consistent_addons(gp),
                               constant_addons_guid = consistent_addons(gp, False)))
addon_stats = spark.createDataFrame(addon_stats)

#DF_main = DF_main.join(addon_stats, "client_id", "outer")
addon_stats.groupBy("constant_addons", "constant_addons_guid").count().collect()

[Row(constant_addons=True, constant_addons_guid=True, count=43256),
 Row(constant_addons=False, constant_addons_guid=True, count=18573)]

#### Write datasets to file

In [37]:
## Local output directory for the feather files, stamped with today's date.
## Create it only if missing so the cell can be re-run.
datasets_path = "addon-sdk-fix-data_beta_{}".format(date.today().isoformat())
if not os.path.isdir(datasets_path):
    os.mkdir(datasets_path)

In [38]:
feather.write_dataframe(DF_main.toPandas(), datasets_path + "/main.feather")

In [39]:
feather.write_dataframe(DF_addons.toPandas(), datasets_path + "/addons.feather")
feather.write_dataframe(DF_hangs.toPandas(), datasets_path + "/hangs.feather")
feather.write_dataframe(DF_hist.toPandas(), datasets_path + "/hist.feather")

In [40]:
os.system("tar cfz {}.tar.gz {}/*.feather".format(datasets_path, datasets_path))

0