Reorganizes the TAAR shield add-on pings and writes them to S3 as Parquet, via a Spark DataFrame.

Some things to note:

* Shield pings send the following data by default:

  ```
  [
      'clientId',
      'id',
      'environment',
      'application',
      'version',
      'meta',
      'creationDate',
      'type'
  ]
  ```
     
    + This gives us some fields for free, notably `locale` under `environment/settings`.

* The `payload` field contains all the data that the shield add-on sends explicitly.
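
To make the ping layout concrete, here is a minimal sketch of looking up slash-separated paths such as `environment/settings/locale` in a nested dict. The helper `get_path` and the trimmed `sample_ping` are hypothetical illustrations; this is not how `moztelemetry` implements `get_pings_properties`, only a plain-Python stand-in for the same idea.

```python
def get_path(ping, path, sep="/"):
    """Walk a nested dict along a separator-delimited path.

    Returns None as soon as a step is missing (hypothetical helper).
    """
    cursor = ping
    for key in path.split(sep):
        if not isinstance(cursor, dict):
            return None
        cursor = cursor.get(key)
    return cursor


# Hypothetical, heavily trimmed ping; real pings carry many more fields.
sample_ping = {
    "clientId": "example-client-id",
    "environment": {"settings": {"locale": "en-US"}},
    "payload": {"study_name": "TAARExperiment", "branch": "example-branch"},
}

locale = get_path(sample_ping, "environment/settings/locale")
branch = get_path(sample_ping, "payload/branch")
missing = get_path(sample_ping, "payload/data/attributes")  # absent in this ping
```

Here `locale` comes from the default ping data, `branch` comes from the add-on's `payload`, and the absent path resolves to `None` rather than raising.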
    

In [1]:
from moztelemetry import Dataset, get_pings_properties
from pyspark.sql import Row
import datetime as dt

sc.setLogLevel("INFO")

In [2]:
yesterday = dt.datetime.strftime(dt.datetime.today() + dt.timedelta(-1), "%Y%m%d")
print(yesterday)

20170912
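
The date string above depends on when the notebook runs. As a sketch, the same computation can take the reference date as an argument, which makes it easy to check; `submission_date_for` is a hypothetical helper name, not part of the notebook.

```python
import datetime as dt


def submission_date_for(today):
    """Return the day before `today`, formatted as YYYYMMDD."""
    return (today - dt.timedelta(days=1)).strftime("%Y%m%d")


# Running on 2017-09-13 selects the 2017-09-12 submissions.
example = submission_date_for(dt.date(2017, 9, 13))
# Subtracting a timedelta handles month boundaries correctly.
month_edge = submission_date_for(dt.date(2017, 10, 1))
```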


In [3]:
pings = Dataset.from_source("telemetry")\
               .where(docType="shield-study-addon")\
               .where(submissionDate=yesterday).records(sc)

fetching 60.80304MB in 1641 files...


In [4]:
taar_pings = pings.filter(lambda x: x.get("payload", {}).get("study_name") == "TAARExperiment")

In [5]:
taar_pings.count()

7
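
The study-name filter can be tried on plain dicts without a Spark context. This is a sketch under assumptions: `is_taar_ping` is a hypothetical named version of the lambda in cell 4, the sample pings are invented, and treating a `None` payload as empty is an added safeguard rather than something the notebook does.

```python
def is_taar_ping(ping, study_name="TAARExperiment"):
    """True if the ping's payload belongs to the given shield study."""
    # `or {}` also covers a payload key that is present but None.
    payload = ping.get("payload") or {}
    return payload.get("study_name") == study_name


# Invented sample pings for illustration only.
sample_pings = [
    {"payload": {"study_name": "TAARExperiment"}},
    {"payload": {"study_name": "SomeOtherStudy"}},
    {"payload": None},
    {},
]

taar_only = [p for p in sample_pings if is_taar_ping(p)]
```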

In [6]:
subset = get_pings_properties(taar_pings, ["clientId", 
                              "environment/settings/locale", 
                              "application/platformVersion",
                              "payload/branch",
                              "payload/data/attributes",
                              "payload/testing",
                              "meta/submissionDate"])

def collapse_fields(x):
    # use `or {}` so a key that is present with a None value does not break the lookups below
    data = x.get("payload/data/attributes") or {}
    result = Row(
        client_id=x.get("clientId"),
        locale=x.get('environment/settings/locale'),
        branch=x.get("payload/branch"),
        addon_id=data.get("addon_id"),
        clicked_button=data.get("clickedButton"),
        ping_type=data.get("pingType"),
        saw_popup=data.get("sawPopup"),
        src=data.get("srcURI"),
        start_time=data.get("startTime"),
        testing=x.get("payload/testing"),
        submission_date_s3=x.get("meta/submissionDate")
        )
    return result
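
A minimal sketch of the same flattening step that runs without Spark. It uses a `namedtuple` as a stand-in for `pyspark.sql.Row`, keeps only a subset of the fields, and treats missing or `None` attributes as an empty dict; the names `FlatPing` and `collapse` and the sample values are hypothetical.

```python
from collections import namedtuple

# Stand-in for pyspark.sql.Row so the sketch runs without Spark.
FlatPing = namedtuple(
    "FlatPing", ["client_id", "locale", "branch", "addon_id", "ping_type"]
)


def collapse(props):
    """Flatten the slash-keyed property dict into one flat record."""
    attrs = props.get("payload/data/attributes") or {}
    return FlatPing(
        client_id=props.get("clientId"),
        locale=props.get("environment/settings/locale"),
        branch=props.get("payload/branch"),
        addon_id=attrs.get("addon_id"),
        ping_type=attrs.get("pingType"),
    )


# Invented inputs: one complete ping and one with no attributes.
full = collapse({
    "clientId": "a",
    "environment/settings/locale": "en-US",
    "payload/branch": "example-branch",
    "payload/data/attributes": {"addon_id": "x@example", "pingType": "init"},
})
sparse = collapse({"clientId": "b", "payload/data/attributes": None})
```

The sparse ping still yields a record, with `None` in every field it did not supply.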

In [7]:
collapsed_subset = subset.map(collapse_fields)

Our analysis will aggregate by branch. Because this job runs daily, we partition the output by `submission_date_s3` and then by `branch`, so the experiment analysis only has to read the partitions it needs.

In [15]:
# convert the RDD to a DataFrame once, outside the loop
collapsed_df = collapsed_subset.toDF()

for branch in collapsed_subset.map(lambda x: x.branch).distinct().collect():
    # keep only this branch's rows, repartition, and drop the columns
    # that are already encoded in the partition path
    df = collapsed_df.filter(collapsed_df.branch == branch)\
                     .repartition(10)\
                     .drop("submission_date_s3")\
                     .drop("branch")

    df.write.format("parquet")\
      .save('s3://telemetry-test-bucket/bmiroglio/TAAR/submission_date_s3={}/branch={}'\
            .format(yesterday, branch), mode='overwrite')
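
The layout this loop is meant to produce is one directory per date and branch, each holding only that branch's rows. The sketch below shows it in plain Python, with dicts in place of DataFrame rows. The helpers `partition_path` and `rows_by_branch` and the branch names `branch-a` and `branch-b` are hypothetical; only the base path and the `key=value` directory naming come from the cell above.

```python
BASE = "s3://telemetry-test-bucket/bmiroglio/TAAR"


def partition_path(base, submission_date, branch):
    """Build the key=value partition directory for one date and branch."""
    return "{}/submission_date_s3={}/branch={}".format(
        base.rstrip("/"), submission_date, branch
    )


def rows_by_branch(rows):
    """Group rows by their branch so each group maps to one directory."""
    groups = {}
    for row in rows:
        groups.setdefault(row["branch"], []).append(row)
    return groups


# Invented rows with hypothetical branch names.
rows = [
    {"client_id": "a", "branch": "branch-a"},
    {"client_id": "b", "branch": "branch-b"},
    {"client_id": "c", "branch": "branch-a"},
]

layout = {
    partition_path(BASE, "20170912", branch): [r["client_id"] for r in group]
    for branch, group in rows_by_branch(rows).items()
}
```

Because the date and branch are carried by the directory names, the written files do not need to repeat those two columns.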