In [10]:
def get_samples(spark):
    """Build the release-channel Firefox sample from ``clients_daily``.

    Rows are kept when ``active_addons`` is non-null and its size is
    strictly between 2 and 100 (i.e. 3..99 entries), the channel is
    'release' and the application is 'Firefox'. The result is projected
    down to client identity, the add-on list, a few descriptive fields and
    the engagement counters (renamed to shorter aliases).

    :param spark: session object exposing ``sql`` (presumably a
        ``SparkSession`` provided by the notebook kernel).
    :return: the filtered, projected DataFrame.
    """
    row_filters = (
        "active_addons IS NOT null",
        "size(active_addons) > 2",
        "size(active_addons) < 100",
        "channel = 'release'",
        "app_name = 'Firefox'",
    )
    projection = (
        "client_id as client_id",
        "active_addons as active_addons",
        "city as city",
        "subsession_hours_sum as subsession_hours_sum",
        "locale as locale",
        "os as os",
        "places_bookmarks_count_mean AS bookmark_count",
        "scalar_parent_browser_engagement_tab_open_event_count_sum AS tab_open_count",
        "scalar_parent_browser_engagement_total_uri_count_sum AS total_uri",
        "scalar_parent_browser_engagement_unique_domains_count_mean AS unique_tlds",
    )

    sample = spark.sql("SELECT * FROM clients_daily")
    # Apply the predicates one at a time, in the same order as listed above.
    for condition in row_filters:
        sample = sample.where(condition)
    return sample.selectExpr(*projection)

In [11]:
def get_addons_per_client(users_df, addon_whitelist, minimum_addons_count):
    """ Extracts a DataFrame that contains one row
    for each client along with the list of active add-on GUIDs.

    An add-on is kept only if it is not a system add-on, not app- or
    user-disabled, not a foreign install, has type "extension", and its
    GUID is in ``addon_whitelist``. A client is kept only if it has
    strictly MORE than ``minimum_addons_count`` such add-ons.

    :param users_df: DataFrame with ``client_id`` and ``active_addons``
        columns (e.g. the output of ``get_samples``). Each
        ``active_addons`` entry must expose ``addon_id`` by key and
        ``is_system``, ``app_disabled``, ``type``, ``user_disabled`` and
        ``foreign_install`` by attribute (presumably Spark ``Row`` structs).
        A null ``active_addons`` is treated as an empty list.
    :param addon_whitelist: iterable of allowed add-on GUIDs.
    :param minimum_addons_count: exclusive lower bound on the number of
        valid add-ons a client needs in order to be kept.
    :return: DataFrame with columns ``client_id`` and ``addon_ids``.
    """
    # Membership is checked for every add-on of every client; a hashed
    # collection avoids a linear scan of the whitelist on each check.
    allowed_guids = frozenset(addon_whitelist)

    def is_valid_addon(guid, addon):
        return not (
            addon.is_system
            or addon.app_disabled
            or addon.type != "extension"
            or addon.user_disabled
            or addon.foreign_install
            or guid not in allowed_guids
        )

    # Un-nest each client's add-on list into a flat list of GUIDs,
    # dropping undesired add-ons.
    #
    # Note that this was restructured from the original longitudinal
    # query: there, 'active_addons' was a map of {guid: metadata}; here
    # it is a list of add-on structs, each carrying its own 'addon_id'.
    def flatten_valid_guid_generator(p):
        for data in p["active_addons"] or ():
            addon_guid = data["addon_id"]
            if is_valid_addon(addon_guid, data):
                yield addon_guid

    return (
        users_df.rdd.map(
            lambda p: (p["client_id"], list(flatten_valid_guid_generator(p)))
        )
        # Strict comparison: exactly `minimum_addons_count` is NOT enough.
        .filter(lambda p: len(p[1]) > minimum_addons_count)
        .toDF(["client_id", "addon_ids"])
    )


In [16]:
import boto3
import json

def read_from_s3(s3_dest_file_name, s3_prefix, bucket, region_name='us-west-2'):
    """
    Read a JSON object from an S3 bucket and return the decoded value.

    The object key is ``s3_prefix`` immediately followed by
    ``s3_dest_file_name`` (plain string concatenation, so the prefix should
    normally end with ``/``). The body is decoded as UTF-8.

    :param s3_dest_file_name: file-name part of the object key
    :param s3_prefix: key prefix, prepended verbatim
    :param bucket: S3 bucket name
    :param region_name: AWS region for the boto3 S3 resource; defaults to
        'us-west-2', the value this function previously hard-coded
    :return: the JSON-decoded object body
    """
    full_s3_name = '{}{}'.format(s3_prefix, s3_dest_file_name)
    conn = boto3.resource('s3', region_name=region_name)
    body = conn.Object(bucket, full_s3_name).get()['Body']
    try:
        raw = body.read()
    finally:
        # Close the streaming response body even if reading it fails.
        body.close()
    return json.loads(raw.decode('utf-8'))

def load_amo_curated_whitelist():
    """
    Fetch the curated add-on whitelist stored on S3.

    :return: a list built from the decoded JSON document
        (presumably a list of add-on GUIDs; if the document were a JSON
        object, this would be its keys)
    """
    bucket = 'telemetry-parquet'
    prefix = 'telemetry-ml/addon_recommender/'
    file_name = 'only_guids_top_200.json'
    return list(read_from_s3(file_name, prefix, bucket))

In [17]:
# Fetch the curated add-on GUID whitelist from S3 (network I/O).
# `whitelist` is passed to get_addons_per_client further down.
whitelist = load_amo_curated_whitelist()

In [19]:
# Build the filtered clients_daily sample.
# NOTE(review): `spark` is never defined in this notebook; presumably the
# kernel injects a SparkSession — confirm before running elsewhere.
users_sample = get_samples(spark)

In [20]:
# Clients must have strictly more than this many whitelisted, enabled
# extensions to be kept (get_addons_per_client uses a strict `>`).
MINIMUM_ADDONS_COUNT = 2

addons_df = get_addons_per_client(users_sample, whitelist, MINIMUM_ADDONS_COUNT)

# Port of future_contents_of_load_training_data.py: done

In [21]:
# TODO: port over the rest of 
# https://gist.github.com/mlopatka/da6b435d23424ea873615d0fd039a9c3#file-new_ensemble_example-py-L112
#
# and add in Amazon Athena query tools to join sqltelemetry to the dataframe