# Imports

In [1]:
from operator import add
import numpy as np

## Get data

In [2]:
frame = sqlContext.sql("SELECT * FROM longitudinal")

## Convert to RDD

In [3]:
rdd = frame.rdd

In [4]:
# uncomment to see what one row of data looks like
#rdd.take(1)

In [5]:
# uncomment to see how to get one value from a row (i.e. 'client_id')
#rdd.take(1)[0]['client_id']

In [6]:
# easy example of applying a function to a df/rdd
# count all the rows in the df/rdd
frame.select("client_id").rdd.count()

7834101

In [8]:
# uncomment to see how to get one value from a list of values (i.e. 'application_name' in 'build')
#rdd.take(1)[0].build[0].application_name

## Create RDD with only the columns that we need

In [4]:
rdd1 = frame.select("client_id", "subsession_start_date").rdd

In [5]:
# small sample for testing
rdd2 = frame.select("client_id", "subsession_start_date").limit(1000).rdd

### Create list with all dates in September

In [11]:
from dateutil import rrule
from datetime import datetime

start_date = '2016-09-01'
end_date = '2016-09-30'

september_dates = [dt.strftime('%Y-%m-%d') for dt in rrule.rrule(rrule.DAILY,
                      dtstart=datetime.strptime(start_date, '%Y-%m-%d'),
                      until=datetime.strptime(end_date, '%Y-%m-%d'))]
#print september_dates
#['2016-09-01', '2016-09-02', '2016-09-03', '2016-09-04', '2016-09-05', '2016-09-06',
#'2016-09-07', '2016-09-08', '2016-09-09', '2016-09-10', '2016-09-11', '2016-09-12',
#'2016-09-13', '2016-09-14', '2016-09-15', '2016-09-16', '2016-09-17', '2016-09-18',
#'2016-09-19', '2016-09-20', '2016-09-21', '2016-09-22', '2016-09-23', '2016-09-24',
#'2016-09-25', '2016-09-26', '2016-09-27', '2016-09-28', '2016-09-29', '2016-09-30']

start_date = '2016-10-01'
end_date = '2016-10-31'

october_dates = [dt.strftime('%Y-%m-%d') for dt in rrule.rrule(rrule.DAILY,
                      dtstart=datetime.strptime(start_date, '%Y-%m-%d'),
                      until=datetime.strptime(end_date, '%Y-%m-%d'))]

## MAU in September

I think this is correct, but there may be a better implementation possible

- set: used to only keep one entry per user per month
- subset for September (is there a better way to do this?)

### MapReduce
For each row (unique user), map when the user was active using a flatMap

```
user1 -> (month1, 1)
         (month4, 1)
         ...
user2 -> (month1, 1)
user3 -> (month2, 1)
         (month3, 1)
         (month4, 1)
         ...
...
```

Then reduce by key (months) while adding the values together (i.e. ~ `select sum() group by keys`)

```
(month1, 244)
(month2, 352)
(month3, 187)
(month4, 555)
...
```

In [22]:
# Example to see output of a **map** (not flatMap) on small subset of data
#rdd2\
#     .map(lambda x: list(set([(x['subsession_start_date'][i][:7], 1) for i in range(len(x['subsession_start_date']))])))\
#     .collect()

In [7]:
MAU = rdd1\
     .flatMap(lambda x: list(set([(x['subsession_start_date'][i][:7], 1) for i in range(len(x['subsession_start_date']))])))\
     .reduceByKey(add)\
     .collectAsMap()


In [8]:
MAU

{u'2041-10': 7,
 u'2103-02': 1,
 u'2514-07': 1,
 u'2099-11': 11,
 u'2062-05': 4,
 u'3012-07': 1,
 u'9900-07': 1,
 u'2099-12': 43,
 u'2103-01': 8,
 u'2081-07': 5,
 u'2081-06': 2,
 u'2081-05': 8,
 u'2081-04': 2,
 u'2081-03': 1,
 u'2081-02': 2,
 u'2081-01': 3,
 u'6240-01': 1,
 u'2081-09': 5,
 u'2081-08': 5,
 u'2020-01': 50,
 u'2035-04': 3,
 u'5009-10': 1,
 u'4643-01': 1,
 u'2035-05': 13,
 u'2020-04': 31,
 u'10004-0': 3,
 u'2020-05': 61,
 u'2095-11': 3,
 u'2035-08': 4,
 u'2095-12': 1,
 u'2020-08': 72,
 u'19222-1': 6,
 u'2158-06': 1,
 u'2089-09': 4,
 u'2089-08': 4,
 u'2089-07': 7,
 u'2089-06': 1,
 u'2089-05': 5,
 u'2089-04': 2,
 u'2089-02': 2,
 u'2089-01': 4,
 u'3544-08': 1,
 u'2255-03': 1,
 u'2132-01': 1,
 u'3216-06': 1,
 u'3216-07': 2,
 u'3216-08': 1,
 u'3216-09': 3,
 u'13601-0': 1,
 u'2086-11': 1,
 u'2086-10': 1,
 u'2086-12': 2,
 u'5039-07': 13,
 u'15300-0': 1,
 u'2766-05': 1,
 u'12561-0': 1,
 u'4680-09': 1,
 u'16516-1': 1,
 u'16516-0': 5,
 u'2116-08': 7,
 u'2116-09': 4,
 u'2116-04': 3,


In [9]:
MAU['2016-09']

3538471

## DAU in September
I think this is correct, but there may be a better implementation possible
- if user had two submissions the same day, then only keep one (done: set)
- keep only september (done: maybe there's a better solution)

### MapReduce
For each row (unique user), map when the user was active using a flatMap

```
user1 -> (day1, 1)
         (day2, 1)
         (day5, 1)
         (day6, 1)
         ...
user2 -> (day1, 1)
         (day4, 1)
         (day5, 1)
         ...
...
```

Then reduce by key (dates) while adding the values together (i.e. ~ `select sum() group by keys`)

```
(day1, 24)
(day2, 3)
(day3, 18)
(day4, 0)
...
(day54, 63)
...
```

In [24]:
# Example to see output of a **map** (not flatMap) on small subset of data
#rdd2\
#     .map(lambda x: list(set([(x['subsession_start_date'][i][:10], 1) for i in range(len(x['subsession_start_date']))])))\
#     .collect()

In [10]:
DAU = rdd1\
     .flatMap(lambda x: list(set([(x['subsession_start_date'][i][:10], 1) for i in range(len(x['subsession_start_date']))])))\
     .reduceByKey(add)\
     .collectAsMap()


### Subset for dates that we want

In [12]:
DAU_September = {key: value for key, value in DAU.items() if key in september_dates}
DAU_October = {key: value for key, value in DAU.items() if key in october_dates}

### Final output

In [13]:
DAU_October

{u'2016-10-01': 942261,
 u'2016-10-02': 906932,
 u'2016-10-03': 1279734,
 u'2016-10-04': 1316103,
 u'2016-10-05': 1308159,
 u'2016-10-06': 1295709,
 u'2016-10-07': 1236799,
 u'2016-10-08': 949844,
 u'2016-10-09': 924480,
 u'2016-10-10': 1288736,
 u'2016-10-11': 1298171,
 u'2016-10-12': 1262745,
 u'2016-10-13': 1321990,
 u'2016-10-14': 1263683,
 u'2016-10-15': 959830,
 u'2016-10-16': 931143,
 u'2016-10-17': 1341367,
 u'2016-10-18': 1342128,
 u'2016-10-19': 1331150,
 u'2016-10-20': 1312737,
 u'2016-10-21': 1247593,
 u'2016-10-22': 954651,
 u'2016-10-23': 925447,
 u'2016-10-24': 1322436,
 u'2016-10-25': 1327730,
 u'2016-10-26': 1303623,
 u'2016-10-27': 1283953,
 u'2016-10-28': 1220465,
 u'2016-10-29': 926534,
 u'2016-10-30': 881221,
 u'2016-10-31': 1189876}

# Mean of the mean inter-day activity of a profile in June-July 16

In [None]:
## ERROR: rdd1 is too big for this job! Need to reduce (works fine with rdd2 though)
# List of lists of unique subsession_start_date per profile
ssd_per_profile = rdd2\
     .map(lambda x: list(set([(x['subsession_start_date'][i][:10], 1) for i in range(len(x['subsession_start_date']))])))\
     .collect()

In [None]:
# Keep only those that were active more than once
ssd_per_profile_gt1 = [entry for entry in ssd_per_profile if len(entry)>1]

In [None]:
# Proportion of profile with >1 activity?
print "%s percent of profiles are active more than one day" %(1.0*len(ssd_per_profile_gt1)/len(ssd_per_profile) * 100)

In [None]:
# Filter out entries outside our dates range
# '2016-06-01' and '2016-07-31'
# keep only those with >1 entry
ssd_per_profile_june_july = []
for profile in ssd_per_profile_gt1:
    profile_entries = []
    for entry in profile:
        if '2016-06-01' <= entry[0] <= '2016-07-31':
            profile_entries.append(entry[0])
    if len(profile_entries)>1:
        ssd_per_profile_june_july.append(sorted(profile_entries))

In [None]:
 # For each profile, get all inter-day intervals
interday_per_profile = []
for profile in ssd_per_profile_june_july:
    interday = []
    for i in range(len(profile)-1):
        curr_date = datetime.strptime(profile[i], '%Y-%m-%d')
        next_date = datetime.strptime(profile[i+1], '%Y-%m-%d')
        diff_days = abs(curr_date - next_date).days
        interday.append(diff_days)
    interday_per_profile.append(interday)

In [None]:
# For each profile, get the mean inter-day interval length
mean_interdays_per_profile = [np.mean(profile) for profile in interday_per_profile]

In [None]:
# Mean inter-day interval length
print "The mean inter-day interval length is %f days" %np.mean(mean_interdays_per_profile)

# Frequeny distribution of the values of this mean observed in the data set

In [None]:
mean_interdays_per_profile_rounded = [round(value, 2) for value in mean_interdays_per_profile]

In [None]:
import matplotlib.pyplot as plt

plt.hist(mean_interdays_per_profile_rounded, bins=50, normed=True)
plt.show()

In [None]:
print "25th percentile: %f" % np.percentile(mean_interdays_per_profile_rounded, 25)
print "50th percentile: %f" % np.percentile(mean_interdays_per_profile_rounded, 50)
print "75th percentile: %f" % np.percentile(mean_interdays_per_profile_rounded, 75)

# Testing

In [20]:
# still too big
# need to figure out the "reduce" part for this
## 1) keep only specified dates
## 2) get distance between each entry
## 3) get mean per profile
## 4) optional: get mean of means
test = rdd1\
     .map(lambda x: list(set([(x['subsession_start_date'][i][:10], 1) for i in range(len(x['subsession_start_date']))])))\
     .filter(lambda x: len(x)>1)\
     .collect()

KeyboardInterrupt: 

In [18]:
len(test)

597