In [1]:
import datetime as dt
import pandas as pd
import ujson as json

from moztelemetry import get_pings, get_pings_properties, get_one_ping_per_client

%pylab inline

Unable to parse whitelist (/home/hadoop/anaconda2/lib/python2.7/site-packages/moztelemetry/bucket-whitelist.json). Assuming all histograms are acceptable.
Populating the interactive namespace from numpy and matplotlib


Create a set of pings from "saved-session" to build a set of core client data.

In [2]:
# Channel to analyse; also reused later when naming the output files.
update_channel = "beta"

# Submission window: from 30 days ago through yesterday, relative to when this cell runs.
now = dt.datetime.now()
start = now - dt.timedelta(days=30)
end = now - dt.timedelta(days=1)

# Fetch Fennec pings for the channel and window, with a very wide build-id range.
# NOTE(review): fraction=0.001 presumably samples ~0.1% of matching pings — confirm
# against the moztelemetry documentation.
pings = get_pings(
    sc,
    app="Fennec",
    channel=update_channel,
    submission_date=tuple(day.strftime("%Y%m%d") for day in (start, end)),
    build_id=("20100101000000", "99999999999999"),
    fraction=0.001
)

# Keep only the ping fields that the rest of the notebook reads.
subset = get_pings_properties(pings, [
    "clientId",
    "application/channel",
    "application/version",
    "meta/submissionDate",
    "meta/documentId",
    "environment/profile/creationDate",
    "environment/system/os/version",
    "environment/system/memoryMB"
])

Remove any pings without a clientId.

In [3]:
# Keep only pings that carry a clientId; the de-duplication below keys on it.
subset = subset.filter(lambda p: p["clientId"] is not None)
# Print one surviving ping as a quick sanity check of the selected fields.
print subset.first()

{'meta/documentId': u'ca4cc6b7-10b4-4924-9e73-ac8806dcb01d', 'meta/submissionDate': u'20160212', 'environment/system/os/version': 21, 'application/version': u'41.0', 'application/channel': u'beta', 'clientId': u'c386a150-9a21-4929-9649-cbf930c84e10', 'environment/profile/creationDate': None, 'environment/system/memoryMB': 1817}


In [4]:
#deduplicate by documentId for each client, then output as a flat list of pings

#algo: http://stackoverflow.com/a/10024750
def remove_dupes(p):
    """Drop repeated pings for a single client.

    p is a (clientId, ping_list) pair. Pings are compared by their
    "meta/documentId" value; the first ping seen for each documentId is
    kept and the original order is preserved.

    Returns a (clientId, unique_pings) tuple.
    """
    clientId, ping_list = p
    seen_docs = set()
    unique_pings = []
    for ping in ping_list:
        doc_id = ping["meta/documentId"]
        if doc_id in seen_docs:
            # Already kept a ping with this documentId; skip the repeat.
            continue
        seen_docs.add(doc_id)
        unique_pings.append(ping)
    return (clientId, unique_pings)
    
def dedupe_by_client(rdd):
    """Remove pings with a repeated documentId within each client.

    rdd holds ping dicts that each have a "clientId" entry. The pings are
    grouped into one list per clientId, each list is passed through
    remove_dupes, and the surviving pings are flattened back out.

    Returns an RDD of individual pings.
    """
    # Wrap each ping in a one-element list so lists can be concatenated per client.
    keyed = rdd.map(lambda ping: (ping["clientId"], [ping]))
    per_client = keyed.reduceByKey(lambda left, right: left + right)
    deduped = per_client.map(lambda pair: remove_dupes(pair))
    # Discard the clientId key and emit the pings themselves.
    return deduped.flatMap(lambda pair: pair[1])
    
# Drop pings with a repeated documentId, per client, from the filtered set.
cleaned = dedupe_by_client(subset)

In [7]:
# Peek at a few de-duplicated pings.
cleaned.take(5)

[{'application/channel': u'beta',
  'application/version': u'41.0',
  'clientId': u'28bb43b1-4c1a-42c2-a1cb-d57a0d34c770',
  'environment/profile/creationDate': None,
  'environment/system/memoryMB': 1739,
  'environment/system/os/version': 22,
  'meta/documentId': u'3cd5aaee-964d-4908-ad1b-ef83c535de64',
  'meta/submissionDate': u'20160208'},
 {'application/channel': u'beta',
  'application/version': u'41.0',
  'clientId': u'28bb43b1-4c1a-42c2-a1cb-d57a0d34c770',
  'environment/profile/creationDate': None,
  'environment/system/memoryMB': 1739,
  'environment/system/os/version': 22,
  'meta/documentId': u'5a0cda70-28e5-4ee1-aad6-7cd0d5874853',
  'meta/submissionDate': u'20160208'},
 {'application/channel': u'beta',
  'application/version': u'41.0',
  'clientId': u'28bb43b1-4c1a-42c2-a1cb-d57a0d34c770',
  'environment/profile/creationDate': None,
  'environment/system/memoryMB': 1739,
  'environment/system/os/version': 22,
  'meta/documentId': u'3610144d-77a4-4130-94d6-12bbd175ad46',
 

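Sanitize the pings, flattening each one into a row of fields. Note that this step does not itself reduce the set to one ping per client per day; the only de-duplication is the per-client documentId pass above.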

In [8]:
def transform(ping):
    """Flatten one ping into a row of sanitized values.

    ping is a dict keyed by the property paths selected earlier in the
    notebook ("clientId", "application/channel", ...).

    Returns a list:
        [clientId, channel, profileDate, submissionDate, version, os_version, memory]
    where
      - profileDate is a "YYYYMMDD" string built from the profile creation
        day number (days since 1970-01-01), or None when it is missing;
      - os_version and memory are ints, with 0 standing in for a missing value.
    """
    clientId = ping["clientId"] # Should not be None since we filter those out

    # The creation date arrives as a day count, which is added to 1970-01-01.
    profileDate = None
    profileDaynum = ping["environment/profile/creationDate"]
    if profileDaynum is not None:
        profileDate = (dt.date(1970, 1, 1) + dt.timedelta(int(profileDaynum))).strftime("%Y%m%d")

    submissionDate = ping["meta/submissionDate"] # Added via the ingestion process so should not be None

    channel = ping["application/channel"]
    version = ping["application/version"]

    # A missing OS version must not abort the whole job with a TypeError from
    # int(None); fall back to 0, the same sentinel used for missing memory.
    os_version = ping["environment/system/os/version"]
    os_version = 0 if os_version is None else int(os_version)

    memory = ping["environment/system/memoryMB"]
    memory = 0 if memory is None else int(memory)

    return [clientId, channel, profileDate, submissionDate, version, os_version, memory]

# Convert every cleaned ping into a flat row via transform().
transformed = cleaned.map(transform)
# Show the first row to confirm the field order.
print transformed.first()

[u'28bb43b1-4c1a-42c2-a1cb-d57a0d34c770', u'beta', None, u'20160208', u'41.0', 22, 1739]


Output the data to CSV.

In [9]:
grouped = pd.DataFrame(transformed.collect(), columns=["clientid", "channel", "profiledate", "submissiondate", "version", "osversion", "memory"])
!mkdir -p ./output
grouped.to_csv("./output/fennec-clients-" + update_channel + "-" + end.strftime("%Y%m%d") + ".csv", index=False)

s3_output = "s3n://net-mozaws-prod-us-west-2-pipeline-analysis/mfinkle/fennec-clients-" + update_channel + "-" + end.strftime("%Y%m%d") 
grouped = sqlContext.createDataFrame(transformed, ["clientid", "channel", "profiledate", "submissiondate", "version", "osversion", "memory"])
grouped.saveAsParquetFile(s3_output)


