In [13]:
from moztelemetry import get_pings, get_pings_properties
import datetime as dt
from pyspark.sql.types import *
import numpy as np

%pylab inline

Populating the interactive namespace from numpy and matplotlib


Take the set of pings, drop any that have no clientId, and remove duplicates by keeping a single ping per documentId.

In [14]:
def dedupe_pings(rdd):
    """Return `rdd` without client-less pings and without repeated documents.

    Pings whose "meta/clientId" is None are dropped first. The remainder is
    keyed by "meta/documentId" and reduced so that a single ping survives per
    document id (the reducer always returns its first argument).
    """
    def has_client_id(ping):
        return ping["meta/clientId"] is not None

    def key_by_document(ping):
        return (ping["meta/documentId"], ping)

    def keep_first(kept, _duplicate):
        return kept

    def drop_key(pair):
        return pair[1]

    with_client = rdd.filter(has_client_id)
    one_per_document = with_client.map(key_by_document).reduceByKey(keep_first)
    return one_per_document.map(drop_key)


In [15]:
def only_update_pings(rdd):
    """Keep only the pings that carry an UPDATE_PING_COUNT_NOTIFY histogram."""
    def has_update_ping_count(ping):
        return ping["payload/histograms/UPDATE_PING_COUNT_NOTIFY"] is not None

    return rdd.filter(has_update_ping_count)


Transform and sanitize the pings into arrays.

In [16]:
def transform(ping):
    """Flatten one ping's properties into a row (a plain list).

    Args:
        ping: mapping from property path (e.g. "meta/clientId") to value.
            "meta/submissionDate" must be a "%Y%m%d" string. When
            "payload/info/subsessionStartDate" is not None, its first ten
            characters must be "%Y-%m-%d". The UPDATE_CHECK_CODE_NOTIFY
            value, when present, must offer `.astype(...).tolist()`
            (presumably a pandas Series or numpy array -- confirm against
            get_pings_properties).

    Returns:
        A list, in this order:
        [clientId, submissionDate, appVersion, subsessionStartDate,
         subsessionLength, settingsUpdateEnabled, settingsUpdateAuto,
         updateEnabled, updateAuto, updatePingCount, updateCheckCodeNotify]

        * Both dates are "%Y%m%d" strings (subsessionStartDate may be None).
        * updateEnabled / updateAuto are None unless updatePingCount > 0.
          Otherwise each is True, or False when the matching
          UPDATE_NOT_PREF_* histogram value is > 0.
        * updateCheckCodeNotify is a list of ints; it is empty unless
          updatePingCount > 0 and the check-code histogram is present.

    Raises:
        ValueError: if a date string does not match its expected format.

    Note: the profile creation date is intentionally not part of the row.
    """
    # Should not be None since pings without a clientId are filtered out.
    clientId = ping["meta/clientId"]

    # Added via the ingestion process so should not be None. The round trip
    # through datetime makes a malformed value fail here rather than later.
    submissionDate = dt.datetime.strptime(ping["meta/submissionDate"], "%Y%m%d").strftime('%Y%m%d')

    appVersion = ping["meta/appVersion"]
    updatePingCount = ping["payload/histograms/UPDATE_PING_COUNT_NOTIFY"]

    # Explicit None guard: comparing None with an int only "works" on
    # Python 2 (it is False there) and raises TypeError on Python 3.
    hasUpdatePing = updatePingCount is not None and updatePingCount > 0

    updateCheckCodeNotify = []
    checkCode = ping["payload/histograms/UPDATE_CHECK_CODE_NOTIFY"]
    if hasUpdatePing and checkCode is not None:
        # tolist() yields plain Python ints.
        updateCheckCodeNotify = checkCode.astype(np.int32).tolist()

    settingsUpdateEnabled = ping["environment/settings/update/enabled"]
    settingsUpdateAuto = ping["environment/settings/update/autoDownload"]

    subsessionStartDate = ping["payload/info/subsessionStartDate"]
    if subsessionStartDate is not None:
        # Only the leading YYYY-MM-DD part is used; time and offset are dropped.
        subsessionStartDate = dt.datetime.strptime(subsessionStartDate[:10], "%Y-%m-%d").strftime('%Y%m%d')

    subsessionLength = ping["payload/info/subsessionLength"]

    updateAuto = None
    updateEnabled = None
    if hasUpdatePing:
        notPrefAuto = ping["payload/histograms/UPDATE_NOT_PREF_UPDATE_AUTO_NOTIFY"]
        notPrefEnabled = ping["payload/histograms/UPDATE_NOT_PREF_UPDATE_ENABLED_NOTIFY"]
        # A missing (None) histogram counts as "not reported", i.e. True.
        updateAuto = not (notPrefAuto is not None and notPrefAuto > 0)
        updateEnabled = not (notPrefEnabled is not None and notPrefEnabled > 0)

    return [clientId, submissionDate, appVersion, subsessionStartDate, subsessionLength,
            settingsUpdateEnabled, settingsUpdateAuto, updateEnabled, updateAuto,
            updatePingCount, updateCheckCodeNotify]


Create a set of pings from "core" to build a set of core client data. Output the data to CSV or Parquet.

This script is designed to loop over a range of days and output a single day for the given channels. Use explicit date ranges for backfilling, or now() - '1day' for automated runs.

In [17]:
# Load, de-duplicate and transform one day of release-channel pings for each
# requested version, then build the `grouped` DataFrame used by later cells.
channels = ["nightly", "aurora", "beta", "release"]
versions = ["43.0.1"]

# For automated or backfill runs, supply the day as a YYYYMMDD string,
# e.g. batch_date = os.environ.get('date').
batch_date = None
if batch_date:
    start = end = dt.datetime.strptime(batch_date, '%Y%m%d')
else:
    # Derive "yesterday" from a single clock reading so that start and end
    # cannot fall on different days when the cell runs across midnight.
    start = end = dt.datetime.now() - dt.timedelta(1)

channel = "release"
print("\nstart date: " + start.strftime("%Y%m%d"))
print("end date  : " + end.strftime("%Y%m%d"))
day = start

# Column order must match the list returned by transform().
schema = StructType([
    StructField("client_id", StringType(), False),
    StructField("submission_date", StringType(), False),
    StructField("app_version", StringType(), True),
    StructField("subsession_start_date", StringType(), True),
    StructField("subsession_length", LongType(), True),
    StructField("settings_update_enabled", BooleanType(), True),
    StructField("settings_update_auto", BooleanType(), True),
    StructField("update_enabled", BooleanType(), True),
    StructField("update_auto", BooleanType(), True),
    StructField("update_ping_count", IntegerType(), True),
    StructField("update_check_code_notify", ArrayType(IntegerType(), False), False)
])

for version in versions:
    print("\nversion: " + version + ", date: " + day.strftime("%Y%m%d"))

    kwargs = dict(
        submission_date=(start.strftime("%Y%m%d"), end.strftime("%Y%m%d")),
        channel=channel,
        app="Firefox",
        version=version,
        fraction=1
    )

    # Grab all available source_version pings
    pings = get_pings(sc, source_version="*", **kwargs)

    subset = get_pings_properties(pings, ["meta/clientId",
                                          "meta/documentId",
                                          "meta/submissionDate",
                                          "meta/appVersion",
                                          "payload/info/subsessionStartDate",
                                          "payload/info/subsessionLength",
                                          "payload/histograms/UPDATE_CHECK_CODE_NOTIFY",
                                          "payload/histograms/UPDATE_CHECK_NO_UPDATE_NOTIFY",
                                          "payload/histograms/UPDATE_NOT_PREF_UPDATE_AUTO_NOTIFY",
                                          "payload/histograms/UPDATE_NOT_PREF_UPDATE_ENABLED_NOTIFY",
                                          "payload/histograms/UPDATE_PING_COUNT_NOTIFY",
                                          "environment/settings/update/enabled",
                                          "environment/settings/update/autoDownload"])

    print("\npings:" + str(subset.count()))

    subset = dedupe_pings(subset)
    print("\nDe-duped pings:" + str(subset.count()))
    print(subset.first())

    transformed = subset.map(transform)
    print("\nTransformed pings:" + str(transformed.count()))
    print(transformed.first())

    # NOTE(review): `grouped` is rebound on every iteration, so with more than
    # one entry in `versions` only the last version's rows reach later cells.
    grouped = sqlContext.createDataFrame(transformed, schema)



start date: 20161026
end date  : 20161026

version: 43.0.1, date: 20161026

pings:75981

De-duped pings:71017
{'meta/clientId': u'b710a107-23bc-43c6-aed3-9cbd943b7845', 'meta/submissionDate': u'20161026', 'environment/settings/update/autoDownload': False, 'payload/histograms/UPDATE_PING_COUNT_NOTIFY': None, 'payload/info/subsessionStartDate': u'2016-10-26T00:00:00.0+05:30', 'payload/histograms/UPDATE_CHECK_NO_UPDATE_NOTIFY': None, 'meta/appVersion': u'43.0.1', 'payload/info/subsessionLength': 80, 'payload/histograms/UPDATE_CHECK_CODE_NOTIFY': None, 'payload/histograms/UPDATE_NOT_PREF_UPDATE_ENABLED_NOTIFY': None, 'payload/histograms/UPDATE_NOT_PREF_UPDATE_AUTO_NOTIFY': None, 'environment/settings/update/enabled': True, 'meta/documentId': u'709ffff1-b8ec-41e9-bf53-ccd774c05325'}

Transformed pings:71017
[u'c83434cf-59ab-4ffe-9213-4b9cc9bf24c0', '20161026', u'43.0.1', '20100225', 2, True, True, None, None, None, []]


In [18]:
# Expose the DataFrame to Spark SQL under the table name "grouped" so the
# queries in the following cells can reference it in their FROM clauses.
grouped.registerTempTable("grouped")

Join with MainSummary to get the latest record's Firefox version

How do we get combined subsession length over X weeks when we restrict the query to 1 week?

In [19]:
# Peek at the first 10 rows to sanity-check the transformed data.
grouped.take(10)

[Row(client_id=u'54ccbda9-9efd-473c-9934-c83a8bf20299', submission_date=u'20161026', app_version=u'43.0.1', subsession_start_date=u'20161022', subsession_length=211, settings_update_enabled=True, settings_update_auto=True, update_enabled=None, update_auto=None, update_ping_count=None, update_check_code_notify=[]),
 Row(client_id=u'd9a13d58-8bd6-4c5f-987a-22e0b6fbb945', submission_date=u'20161026', app_version=u'43.0.1', subsession_start_date=u'20161002', subsession_length=41, settings_update_enabled=False, settings_update_auto=False, update_enabled=None, update_auto=None, update_ping_count=None, update_check_code_notify=[]),
 Row(client_id=u'8a7067dc-4535-45d7-a7f0-720962edfa1c', submission_date=u'20161026', app_version=u'43.0.1', subsession_start_date=u'20161026', subsession_length=2503, settings_update_enabled=True, settings_update_auto=True, update_enabled=True, update_auto=True, update_ping_count=1, update_check_code_notify=[0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 

In [20]:
# Count how many distinct clients appear in the "grouped" temp table.
query = "SELECT COUNT(DISTINCT client_id) FROM grouped"

In [21]:
# Hand the SQL text currently bound to `query` to Spark SQL and time the call.
# NOTE(review): the timing presumably covers only the sql() call itself, not
# the later fetch of rows -- confirm before quoting it as query cost.
%time results = sqlContext.sql(query)

CPU times: user 0 ns, sys: 0 ns, total: 0 ns
Wall time: 42.4 ms


In [22]:
# Fetch up to 10 rows of the query result for display.
results.take(10)

[Row(_c0=23419)]

In [35]:
# Build the SQL that, for every client in the "grouped" temp table, looks up
# the app_version reported in main_summary on that client's most recent
# release-channel submission date, then counts distinct clients per version.
mainSummaryParquetPath = "s3://telemetry-parquet/main_summary/v3"
minSubmissionDate = "20161001"

# Innermost sub-query: latest submission date per client since {since}.
_latestSubmissionSql = (
    "SELECT DISTINCT client_id, "
    "MAX(submission_date_s3) AS lastVersionSubmissionDate "
    "FROM parquet.`{path}` "
    "WHERE channel = 'release' AND submission_date_s3 > '{since}' "
    "GROUP BY client_id"
)

# Middle sub-query: the version seen on that latest submission date.
# NOTE(review): MAX(b.app_version) looks like a string comparison, so it may
# not pick the numerically highest version -- confirm the column type.
# NOTE(review): DISTINCT next to GROUP BY on the same key looks redundant.
_latestVersionSql = (
    "SELECT DISTINCT b.client_id, "
    "MAX(b.app_version) AS lastVersion, "
    "MAX(c.lastVersionSubmissionDate) AS lastVersionSubmissionDate "
    "FROM parquet.`{path}` b "
    "INNER JOIN (" + _latestSubmissionSql + ") c "
    "ON b.client_id = c.client_id AND b.submission_date_s3 = c.lastVersionSubmissionDate "
    "WHERE b.channel = 'release' AND b.submission_date_s3 > '{since}' "
    "GROUP BY b.client_id"
)

# Outer query: distinct clients from "grouped" per latest version.
_clientsPerVersionSql = (
    "SELECT DISTINCT d.lastVersion, "
    "COUNT(DISTINCT a.client_id) AS Total "
    "FROM grouped a "
    "INNER JOIN (" + _latestVersionSql + ") d "
    "ON a.client_id = d.client_id "
    "GROUP BY d.lastVersion"
)

query = _clientsPerVersionSql.format(path=mainSummaryParquetPath, since=minSubmissionDate)
query

"SELECT DISTINCT d.lastVersion, COUNT(DISTINCT a.client_id) AS Total FROM grouped a INNER JOIN (SELECT DISTINCT b.client_id, MAX(b.app_version) AS lastVersion, MAX(c.lastVersionSubmissionDate) AS lastVersionSubmissionDate FROM parquet.`s3://telemetry-parquet/main_summary/v3` b INNER JOIN (SELECT DISTINCT client_id, MAX(submission_date_s3) AS lastVersionSubmissionDate FROM parquet.`s3://telemetry-parquet/main_summary/v3` WHERE channel = 'release' AND submission_date_s3 > '20161001' GROUP BY client_id) c ON b.client_id = c.client_id AND b.submission_date_s3 = c.lastVersionSubmissionDate WHERE b.channel = 'release' AND b.submission_date_s3 > '20161001' GROUP BY b.client_id) d ON a.client_id = d.client_id GROUP BY d.lastVersion"

In [36]:
# Hand the main_summary join query currently bound to `query` to Spark SQL
# and time the call; `results` is rebound to the new DataFrame.
%time results = sqlContext.sql(query)

CPU times: user 4 ms, sys: 4 ms, total: 8 ms
Wall time: 44.4 s


In [37]:
# Collect the per-version client counts into a pandas DataFrame for display.
results.toPandas()

Unnamed: 0,lastVersion,Total
0,45.0.1,49
1,45.0.2,30
2,45.0,7
3,47.0.1,1379
4,44.0,7
5,49.0,6
6,44.0.1,4
7,44.0.2,58
8,49.0.1,72
9,49.0.2,491
