In [7]:
from datetime import date, timedelta, datetime
from pyspark.sql.functions import collect_list
from pyspark.sql import Row
import pandas as pd
import numpy as np
import ujson as json
import os.path
from collections import Counter
from matplotlib import pyplot as plt

%matplotlib inline

In [8]:
def int2date(n):
    """
    This function converts a number of days since Jan 1st 1970 <n> to a date.

    @params:
        n: [int] number of days since Jan 1st, 1970 (negative values give earlier dates)

    @returns:
        [date] the corresponding date, or date(1970, 1, 1) when <n> cannot be
        converted (not a number, or the result is outside the supported date range)
    """
    epoch = date(1970, 1, 1)
    try:
        return epoch + timedelta(days=n)
    except (TypeError, ValueError, OverflowError):
        # Best-effort fallback: unusable input maps to the epoch.
        # Only conversion errors are caught, so KeyboardInterrupt / SystemExit
        # are no longer silently swallowed as they were by a bare `except:`.
        return epoch

def date2int(d):
    """
    This function converts a date <d> to number of days since Jan 1st 1970.

    @params:
        d: [date] a datetime.date

    @returns:
        [int] number of days between Jan 1st, 1970 and <d>, or -1 when <d> cannot be
        subtracted from a date (e.g. None or a string).
        Note that -1 is also the legitimate result for date(1969, 12, 31).
    """
    try:
        return (d - date(1970, 1, 1)).days
    except (TypeError, AttributeError):
        # Best-effort fallback: unusable input maps to -1.
        # Only the errors raised by an unsupported subtraction (TypeError) or by a
        # result without a `.days` attribute (AttributeError) are caught, instead of
        # a bare `except:` that would also hide KeyboardInterrupt / SystemExit.
        return -1

def str2date(s, f="%Y%m%d"):
    """
    This function converts a string <s> in the format <f> to a date.

    @params:
        s: [string] textual date
        f: [string] strptime format describing <s> (default "%Y%m%d")

    @returns:
        [date] the parsed date, or the empty string "" when <s> is not a string
        or does not match <f>
    """
    try:
        return datetime.strptime(s, f).date()
    except (TypeError, ValueError):
        # Best-effort fallback: the empty string signals "could not parse".
        # strptime raises TypeError for non-string arguments and ValueError for
        # a mismatch between <s> and <f>; nothing else is swallowed.
        return ""

# ---- analysis window, as calendar dates -------------------------------------
TODAY = date.today()                        # day the notebook is run
D0 = date(2016, 12, 12)                     # first day of the week of analysis
T0 = 180                                    # days without activity to consider that Fx use stopped
PCD_CUTOUT_START_DATE = date(2010, 1, 1)    # profiles created before this date are removed
D0_T0 = D0 - timedelta(days=T0)             # first day of the stoppage period
D6 = D0 + timedelta(days=6)                 # last day of the week of analysis

# ---- same boundaries, as days since Jan 1st, 1970 ---------------------------
TODAY_INT = date2int(TODAY)
D0_INT = date2int(D0)
PCD_CUTOUT_START_INT = date2int(PCD_CUTOUT_START_DATE)
D0_T0_INT = D0_INT - T0
D6_INT = D0_INT + 6

In [9]:
def read_main_summary():
    """
    Load the main_summary dataset from S3 and reduce it to the pings used by this analysis.

    The returned dataframe holds the columns cid, ssd, pcd, ssl and av, restricted to pings
    that satisfy all of the following:
        - sample_id is "42" (1% sample)
        - channel is release
        - application is Firefox
        - subsession length is between 0 and 86400 (24h), bounds included
        - profile creation date is between PCD_CUTOUT_START_INT and TODAY_INT (bounds
          included) and is not one of the days in [D0_T0_INT, D0_INT)

    @returns:
        [dataframe] the filtered dataframe, `cache`d for performance
    """

    # connect to the main summary dataset
    source = sqlContext.read.load("s3://telemetry-parquet/main_summary/v3", "parquet", mergeSchema=True)

    # keep the variables of interest under short names
    pings = source.select(
        source.client_id.alias("cid"),
        source.sample_id.alias("sid"),
        source.normalized_channel.alias("channel"),
        source.subsession_start_date.alias("ssd"),
        source.app_name.alias("appname"),
        source.subsession_length.alias("ssl"),
        source.profile_creation_date.alias("pcd"),
        source.app_version.alias("av"))

    # every condition is applied as its own filter, in this order
    conditions = [
        pings.sid == "42",
        pings.channel == "release",
        pings.appname == "Firefox",
        pings.ssl >= 0,
        pings.ssl <= 86400,
        pings.pcd >= PCD_CUTOUT_START_INT,
        pings.pcd <= TODAY_INT,
        ~pings.pcd.isin(range(D0_T0_INT, D0_INT)),
    ]
    filtered = pings
    for condition in conditions:
        filtered = filtered.filter(condition)

    # drop the helper columns and keep the result in memory
    return filtered.select(["cid","ssd","pcd","ssl","av"]).cache()

In [10]:
def make_longitudinal(df):
    """
    This function creates and returns a longitudinal dataframe from a dataframe.
    Each Row from this dataframe contains, for one cid:
        - ssl: [list] subsession_length of every ping
        - ssd: [list] subsession_start_date of every ping
        - av:  [list] app_version of every ping
        - pcd: profile_creation_date (first value seen for the cid)
    Position i of ssl, ssd and av describes the same ping.

    Pings with a missing ssd, ssl or av are dropped before aggregating: collect_list
    skips null values independently for each column, so keeping such pings would
    produce lists of different lengths whose positions no longer line up.

    @params:
        df: [dataframe] dataframe returned by read_main_summary()

    @returns:
        [dataframe] one row per cid with the columns ssl, ssd, av and pcd
        (rows are consumed by column name; the column order is ssl, ssd, av, pcd)
    """

    # local import so the block does not rely on more than the file already imports
    from pyspark.sql import functions as F

    # keep only complete pings so the three collected lists stay aligned
    complete = df.dropna(subset=["ssd", "ssl", "av"])

    # explicit aliases: auto-generated aggregate names (e.g. "first(pcd)")
    # are not stable across Spark versions
    longitudinal = complete.groupBy("cid")\
                           .agg(F.collect_list("ssl").alias("ssl"),
                                F.collect_list("ssd").alias("ssd"),
                                F.collect_list("av").alias("av"),
                                F.first("pcd").alias("pcd"))

    return longitudinal

In [11]:
def mapping(row):
    """
    Applied to a longitudinal RDD, this mapping function classifies one client with respect
    to the week of analysis [D0, D6] and returns a tuple of fourteen elements:
        - cohort: [string] One of {'no: active s','no: inactive b','no: inactive w',
                                   'no: pcd in s','yes: active b, inactive s, active w','yes: pcd in w'}
        - last_version_in_b: [string] app version of the last subsession up to D0-T0 ("NA" unless
                             cohort is 'yes: active b, inactive s, active w')
        - first_version_in_w: [string] app version of the first subsession in the week ("NA" unless
                              cohort is 'yes: active b, inactive s, active w')
        - cid: [string] Client_id
        - pcd: [int] Profile creation date (in days since Jan 1st, 1970)
        - ssd: [list] Subsession start dates ("%Y-%m-%d", ascending), up to D6
        - ssl: [list] Subsession lengths (ordered with respect to ssd)
        - av: [list] App versions (ordered with respect to ssd)
        - pcd_date: [date] Profile creation date (as datetime.date)
        - num_ssd_in_w: [int] Number of subsessions that started in the week of interest
        - tot_ssl_in_w: [int] Sum of subsession lengths within the week of interest
        - num_ssd_in_b: [int] Number of subsessions up to D0-T0
        - tot_ssl_in_b: [int] Sum of subsession lengths up to D0-T0
        - days_in_s: [int] Days between the last subsession up to D0-T0 and the first subsession
                     in the week (0 unless cohort is 'yes: active b, inactive s, active w')

    @params:
        row: [Row] a row from a longitudinal RDD that includes:
            - cid: client_id
            - ssl: subsession_length (list)
            - ssd: subsession_start_date (list of strings starting with "%Y-%m-%d")
            - pcd: profile_creation_date
            - av: app_version (list)

    @logic:
        - sort the subsessions by ssd (ascending). Keep only entries up to D6 (end of week of analysis)
        - if pcd within the week: cohort is "yes: pcd in w"
        - else if pcd is on or before D0-T0:
            - if user was
                - not active during the week of analysis,
                - OR active strictly between D0-T0 and D0,
                - OR not active between pcd and D0-T0
                - THEN cohort is the matching "no: ..." label (checked in that order)
            - else: cohort is "yes: active b, inactive s, active w"
        - else (any other pcd): cohort is "no: pcd in s"

    Every per-period value is derived from the *sorted* subsessions, so "last" and "first"
    do not depend on the order in which the lists were collected.
    A client without any subsession yields empty lists instead of raising.
    """

    date_fmt = "%Y-%m-%d"
    # ISO dates compare chronologically as plain strings
    d0_str = D0.strftime(date_fmt)
    d6_str = D6.strftime(date_fmt)
    d0_t0_str = D0_T0.strftime(date_fmt)

    # sort subsessions by full start timestamp (ascending), then reduce the timestamp to its
    # day and drop everything that started after the end of the week of interest
    pings = sorted(zip(row.ssd, row.ssl, row.av))
    pings = [(start[:10], length, version)
             for (start, length, version) in pings
             if start[:10] <= d6_str]

    ssd = [p[0] for p in pings]
    ssl = [p[1] for p in pings]
    av = [p[2] for p in pings]

    # subsessions within the week of analysis (everything kept is already <= D6)
    in_w = [p for p in pings if p[0] >= d0_str]
    # subsessions up to D0-T0
    in_b = [p for p in pings if p[0] <= d0_t0_str]

    cohort = ""
    last_version_in_b = "NA"
    first_version_in_w = "NA"
    days_in_s = 0
    pcd_date = int2date(row.pcd)

    if row.pcd >= D0_INT and row.pcd <= D6_INT:
        cohort = "yes: pcd in w"
    elif row.pcd <= D0_T0_INT:
        pcd_str = pcd_date.strftime(date_fmt)
        # stagnant period: strictly between D0-T0 and D0
        active_in_s = any(d0_t0_str < day < d0_str for day in ssd)
        # beginning period: from profile creation up to D0-T0, bounds included
        active_in_beginning = any(pcd_str <= day <= d0_t0_str for day in ssd)

        if not in_w:                   # not active during week
            cohort = "no: inactive w"
        elif active_in_s:              # active during stagnant
            cohort = "no: active s"
        elif not active_in_beginning:  # not active during beginning
            cohort = "no: inactive b"
        else:
            cohort = "yes: active b, inactive s, active w"
            # in_b and in_w are both non-empty here and chronologically ordered
            last_day_in_b, _, last_version_in_b = in_b[-1]
            first_day_in_w, _, first_version_in_w = in_w[0]
            days_in_s = (datetime.strptime(first_day_in_w, date_fmt)
                         - datetime.strptime(last_day_in_b, date_fmt)).days
    else:
        cohort = "no: pcd in s"

    return (cohort,                         # cohort
            last_version_in_b,              # last Fx version in beginning
            first_version_in_w,             # first Fx version in week of interest
            row.cid,                        # cid
            row.pcd,                        # pcd (days since Jan 1st, 1970)
            ssd,                            # list of ssd (ascending)
            ssl,                            # list of ssl (based on ssd)
            av,                             # list of av (based on ssd)
            pcd_date,                       # pcd (date)
            len(in_w),                      # number of subsessions in week of analysis
            sum(p[1] for p in in_w),        # total ssl in week of analysis
            len(in_b),                      # number of subsessions up to D0-T0
            sum(p[1] for p in in_b),        # total ssl up to D0-T0
            days_in_s)                      # days spent without use of Fx

def get_positives(mapped_rdd):
    """
    Keep only the records whose cohort label (first tuple element) starts with "yes".

    @params:
        mapped_rdd: [RDD] tuples produced by mapping()
    """
    def is_positive(record):
        label = record[0]
        return label.startswith("yes")

    return mapped_rdd.filter(is_positive)

def get_cohort(positives_rdd, condition=""):
    """
    Keep only the records whose cohort label (first tuple element) equals <condition>.

    @params:
        positives_rdd: [RDD] tuples whose first element is a cohort label
        condition: [string] exact cohort label to select
    """
    def in_cohort(record):
        return record[0] == condition

    return positives_rdd.filter(in_cohort)

def toDF(positives_rdd, cols = ["cohort","last_version_in_b","first_version_in_w","cid","pcd",
                                "ssd","ssl","av","pcd_date","num_ssd_in_w","tot_ssl_in_w",
                                "num_ssd_in_b","tot_ssl_in_b","days_in_s"]):
    """
    Convert an RDD of mapping() tuples to a dataframe.

    @params:
        positives_rdd: [RDD] tuples produced by mapping()
        cols: [list] column names, in tuple order (defaults to the fourteen fields
              returned by mapping())
    """
    column_names = cols
    dataframe = positives_rdd.toDF(column_names)
    return dataframe

def upgraded_not_upgraded(df):
    """
    Split a dataframe on whether the app version changed between the last subsession
    of the beginning period and the first subsession of the week of analysis.

    @params:
        df: [dataframe] dataframe with the columns last_version_in_b and first_version_in_w

    @returns:
        [tuple] (rows where the two versions differ, rows where they are equal)
    """
    version_changed = df.last_version_in_b != df.first_version_in_w
    version_kept = df.last_version_in_b == df.first_version_in_w
    return (df[version_changed], df[version_kept])

In [12]:
def write_dict_json(fn, res_data, start_date_str, end_date_str):
    """
    Serialize a dictionary to "<fn>-<start_date_str>-<end_date_str>.json".

    The file holds a single line: the JSON encoding of <res_data> wrapped in a
    one-element JSON array. An existing file of the same name is overwritten
    (a message is printed first).

    @params:
        fn: [string] file name of output json (without suffix and extension)
        res_data: [dict] dictionary object with summary data
        start_date_str: [string] start date to append to file name
        end_date_str: [string] end date to append to file name
    """

    # <fn>-<start>-<end>.json
    file_name = "".join([fn, "-", start_date_str, "-", end_date_str, ".json"])

    already_there = os.path.exists(file_name)
    if already_there:
        print("{} exists, we will overwrite it.".format(file_name))

    # serialize before opening, so a serialization error does not truncate an existing file
    payload = "[" + json.dumps(res_data).encode('utf8') + "]\n"

    with open(file_name, "w") as json_file:
        json_file.write(payload)

In [13]:
def main_alg(start_backfill=None, end_backfill=None):
    """
    This function ties everything together: it reads the filtered pings, builds the list of
    weeks to process, and for each week counts the cohorts and writes them to a JSON file
    named "fx_newusers-<start>-<end>.json". All per-week files are finally merged into
    "fx_newusers.json" with jq.

    If both date parameters are specified (as "%Y-%m-%d" strings), the program will find all
        Sundays in-between the two dates (bounds included) and loop over the weeks ending on
        those days, most recent first. This is used for backfilling data only!
    Otherwise (none or only one of the two parameters given), a single week ending
        today-14 days is used.

    NOTE(review): the per-week start/end dates are only used for the progress messages and
        the output file suffix. mapping() and read_main_summary() read the module-level
        constants (D0, D6, D0_T0, ...), so every iteration classifies clients against that
        same fixed week. Confirm whether the loop dates were meant to drive the analysis.

    @params:
        start_backfill: [string "%Y-%m-%d"] first end_date for backfilling
        end_backfill: [string "%Y-%m-%d"] last end_date for backfilling
        ex: start_backfill="2016-10-01" and end_backfill="2017-01-01" will produce data for **all** Sundays
            between October 1st 2016 and January 1st 2017
    """
    
    # read and filter the data (read_main_summary caches the resulting dataframe)
    print "***** READING DATA...",
    filteredPingsDF = read_main_summary()
    print "DONE!"

    # get date ranges
    dates = [] # list of (start_date, end_date) tuples, one per week to process
    if start_backfill and end_backfill:
        print "***** BACKFILL DATES PROVIDED"
        print "***** FINDING ALL Sundays BETWEEN {start} AND {end}..."\
               .format(start=start_backfill, end=end_backfill),
        # get all Sundays between the two provided dates
        d1 = datetime.strptime(start_backfill, "%Y-%m-%d").date()
        d2 = datetime.strptime(end_backfill, "%Y-%m-%d").date()

        # walk backwards from d2 to d1, one day at a time
        delta = d2 - d1
        for i in range(delta.days + 1):
            day = d2 - timedelta(days=i)
            # date.weekday() == 6 is Sunday
            if day.weekday() in [6]:
                end_date = day
                start_date = day - timedelta(days=6)
                dates.append( (start_date, end_date) )
        print "{} DATES".format(len(dates))
    else:
        # only use week ending today - 14 days
        end_date = date.today() - timedelta(days=14)
        start_date = end_date - timedelta(days=6)
        dates.append( (start_date, end_date) )

    # loop through all dates
    for i, d in enumerate(dates):
        print
        print "***** DATE {curr} of {tot}".format(curr=i+1, tot=len(dates))
        start_date = d[0]
        end_date = d[1]
        # compact date strings used as suffix of the output file name
        start_date_str = start_date.strftime("%Y%m%d")
        end_date_str = end_date.strftime("%Y%m%d")
        print "***** Week of interest: {start} :: {end}".format(start=start_date, end=end_date)

        # transform into longitudinal format
        # NOTE(review): this does not depend on start_date/end_date (see docstring)
        longitudinal = make_longitudinal(filteredPingsDF)

        # apply mapping function
        mapped = longitudinal.rdd.map(mapping)

        # get counts of cohorts (the cohort label is the first element of each mapped tuple)
        cohorts = mapped.countByKey()
        # counts are multiplied by 100 for display (the data is filtered to a 1% sample);
        # the boundaries printed below are the module-level D0_T0 and D0
        print "\tNumber of profiles that created their profile in the week of analysis:"
        print "\t\t{:,}".format(cohorts["yes: pcd in w"]*100)
        print "\tNumber of profiles that were active before {}, inactive until {},\n\tand active in the week of analysis:"\
              .format(D0_T0.isoformat(), D0.isoformat())
        print "\t\t{:,}".format(cohorts["yes: active b, inactive s, active w"]*100)
        
        # get positives, convert to DF
        # NOTE(review): the RDDs/dataframes built below are not used further in this function
        print "\tConverting to DF...",
        positives_rdd = get_positives(mapped)
        cohort1_rdd = get_cohort(positives_rdd, "yes: active b, inactive s, active w")
        cohort2_rdd = get_cohort(positives_rdd, "yes: pcd in w")
        print "Done!",
        positives_df = toDF(positives_rdd)
        cohort1_df = toDF(cohort1_rdd)
        cohort2_df = toDF(cohort2_rdd)
        print "Done!",
        upgraded, not_upgraded = upgraded_not_upgraded(cohort1_df)
        print "Done!"
        
        # write the (unscaled) cohort counts to fx_newusers-<start>-<end>.json
        write_dict_json("fx_newusers", cohorts, start_date_str, end_date_str)
        print "DONE!"

    print 
    print "**** MERGING SUMMARY JSON FILES...",
    # merge summary JSON files into one
    # (IPython shell escape; picks up every fx_newusers-*.json in the working directory)
    !jq -c -s "[.[]|.[]]" fx_newusers-*.json > "fx_newusers.json"
    print "DONE!"

In [14]:
# default run: no backfill arguments, so a single week (ending today - 14 days) is processed
main_alg()

***** READING DATA... DONE!

***** DATE 1 of 1
***** Week of interest: 2017-02-07 :: 2017-02-13
	Number of profiles that created their profile in the week of analysis:
		9,186,100
	Number of profiles that were active before 2016-06-15, inactive until 2016-12-12,
	and active in the week of analysis:
		652,100
	Converting to DF... Done! Done! Done!
DataFrame[cohort: string, last_version_in_b: string, first_version_in_w: string, cid: string, pcd: bigint, ssd: array<string>, ssl: array<bigint>, av: array<string>, pcd_date: date, num_ssd_in_w: bigint, tot_ssl_in_w: bigint, num_ssd_in_b: bigint, tot_ssl_in_b: bigint, days_in_s: bigint]
DataFrame[cohort: string, last_version_in_b: string, first_version_in_w: string, cid: string, pcd: bigint, ssd: array<string>, ssl: array<bigint>, av: array<string>, pcd_date: date, num_ssd_in_w: bigint, tot_ssl_in_w: bigint, num_ssd_in_b: bigint, tot_ssl_in_b: bigint, days_in_s: bigint]
DONE!

**** MERGING SUMMARY JSON FILES... DONE!


In [None]:
#main_alg("2016-09-01", "2016-10-01")

***** READING DATA... DONE!
***** BACKFILL DATES PROVIDED
***** FINDING ALL Sundays BETWEEN 2016-09-01 AND 2016-10-01... 4 DATES

***** DATE 1 of 4
***** Week of interest: 2016-09-19 :: 2016-09-25
