# gHealth by Brandon Fetters

This is the final project created during my Data Engineering course at [GalvanizeU](http://www.galvanizeu.com/). We were required to create an application architecture which would be able to support big data requirements.

I chose to develop an application for tracking individual health information with [myfitnesspal](https://www.myfitnesspal.com/) as my inspiration.

The project was divided into the following steps:
1. [Define Avro Schema](#Define-Avro-Schema)
2. [Generate Data](#Generate-Data)
3. [Data Serialization](#Data-Serialization)
4. [Vertical Partitioning](#Vertical-Partitioning)
5. [Batch Views](#Batch-Views) ([Create](#Create-Batch-Views),[Query](#Query-Batch-Views))
6. [Speed Layer](#Speed-Layer)

### Define Avro Schema

<img src='gHealth_GraphSchema.png'>

In [1]:
import avro.io
import avro.schema
from avro.datafile import DataFileReader, DataFileWriter
from avro.io import DatumReader, DatumWriter

In [2]:
%%writefile gHealth_schema.avsc
[
    {
        "namespace": "gHealth.avro",
        "type": "record",
        "name": "Pedigree",
        "fields": [{"name": "true_as_of_secs", "type": "int"}]
    },
    {
        "namespace": "gHealth.avro",
        "type": "record",
        "name": "UserID1",
        "fields": [{"name": "cookie", "type": "string"}]
    },
    {
        "namespace": "gHealth.avro",
        "type": "record",
        "name": "UserID2",
        "fields": [{"name": "user_id", "type": "string"}]
    },
    {
        "namespace": "gHealth.avro",
        "type": "record",
        "name": "GroupID",
        "fields": [{"name": "group_id", "type": "string"}]
    },
    {
        "namespace": "gHealth.avro",
        "type": "record",
        "name": "GoalID",
        "fields": [{"name": "goal_id", "type": "string"}]
    },
    {
        "namespace": "gHealth.avro",
        "type": "record",
        "name": "MealID",
        "fields": [{"name": "meal_id", "type": "string"}]
    },
    {
        "namespace": "gHealth.avro",
        "type": "record",
        "name": "ActivityID",
        "fields": [{"name": "activity_id", "type": "string"}]
    },
    {
        "namespace": "gHealth.avro",
        "type": "record",
        "name": "UserProperty",
        "fields": [
            {
                "name": "user_id",
                "type": ["UserID1","UserID2"]
            },
            {
                "name": "property",
                "type": [
                    {
                        "type": "record",
                        "name": "UserPropertyValue1",
                        "fields": [{"name": "name", "type": "string"}]
                    },
                    {
                        "type": "record",
                        "name": "UserPropertyValue3",
                        "fields": [{"name": "dob", "type": "int"}]
                    },
                    {
                        "type": "record",
                        "name": "UserPropertyValue4",
                        "fields": [
                            {
                                "name": "gender", 
                                "type": {
                                    "type": "enum",
                                    "name": "GenderType",
                                    "symbols": ["MALE", "FEMALE"]
                                }
                            }
                        ]
                    },
                    {
                        "type": "record",
                        "name": "UserPropertyValue5",
                        "fields": [
                            {
                                "name": "location", 
                                "type": {
                                    "type": "record",
                                    "name": "Location",
                                    "fields": [
                                        {"name": "zipcode", "type": ["int", "null"]},
                                        {"name": "city", "type": ["string", "null"]},
                                        {"name": "state", "type": ["string", "null"]},
                                        {"name": "country", "type": [ "string","null"]}
                                    ]
                                }
                            }
                        ]
                    }
                ]
            }
        ]
    },
    {
        "namespace": "gHealth.avro",
        "type": "record",
        "name": "GroupProperty",
        "fields": [
            {
                "name": "group_id",
                "type": "GroupID"
            },
            {
                "name": "property",
                "type": [
                    {
                        "type": "record",
                        "name": "GroupPropertyValue1",
                        "fields": [{"name": "group_name", "type": "string"}]
                    }
                ]
            }
        ]
    },
    {
        "namespace": "gHealth.avro",
        "type": "record",
        "name": "GoalProperty",
        "fields": [
            {
                "name": "goal_id",
                "type": "GoalID"
            },
            {
                "name": "property",
                "type": [
                    {
                        "type": "record",
                        "name": "GoalPropertyValue1",
                        "fields": [{"name": "goal_name", "type": "string"}]
                    },
                    {
                        "type": "record",
                        "name": "GoalPropertyValue2",
                        "fields": [{"name": "goal_desc", "type": "string"}]
                    },
                    {
                        "type": "record",
                        "name": "GoalPropertyValue3",
                        "fields": [{"name": "goal_value", "type": "int"}]
                    },
                    {
                        "type": "record",
                        "name": "GoalPropertyValue4",
                        "fields": [{"name": "goal_unit", "type": "string"}]
                    },
                    {
                        "type": "record",
                        "name": "GoalPropertyValue5",
                        "fields": [{"name": "goal_status", "type": "string"}]
                    }
                ]
            }      
        ]
    },
    {
        "namespace": "gHealth.avro",
        "type": "record",
        "name": "MealProperty",
        "fields": [
            {
                "name": "meal_id",
                "type": "MealID"
            },
            {
                "name": "property",
                "type": [
                    {
                        "type": "record",
                        "name": "MealPropertyValue1",
                        "fields": [{"name": "meal_type", "type": "string"}]
                    },
                    {
                        "type": "record",
                        "name": "MealPropertyValue2",
                        "fields": [{"name": "meal_name", "type": "string"}]
                    },
                    {
                        "type": "record",
                        "name": "MealPropertyValue3",
                        "fields": [{"name": "meal_desc", "type": "string"}]
                    }
                ]
            }
        ]
    },     
    {
        "namespace": "gHealth.avro",
        "type": "record",
        "name": "ActivityProperty",
        "fields": [
            {
                "name": "activity_id",
                "type": "ActivityID"
            },
            {
                "name": "property",
                "type": [
                    {
                        "type": "record",
                        "name": "ActivityPropertyValue1",
                        "fields": [{"name": "activity_type", "type": "string"}]
                    },
                    {
                        "type": "record",
                        "name": "ActivityPropertyValue2",
                        "fields": [{"name": "activity_length", "type": "int"}]
                    },
                    {
                        "type": "record",
                        "name": "ActivityPropertyValue3",
                        "fields": [{"name": "activity_unit", "type": "string"}]
                    }
                ]
            }
        ]
    },    
    {
        "namespace": "gHealth.avro",
        "type": "record",
        "name": "EquivEdge",
        "fields": [
            {"name": "user_id1", "type": ["UserID1", "UserID2"]},
            {"name": "user_id2", "type": ["UserID1", "UserID2"]}
        ]
    },
    {
        "namespace": "gHealth.avro",
        "type": "record",
        "name": "GroupEdge",
        "fields": [
            {"name": "group_id", "type": "GroupID"},
            {"name": "user_id", "type": ["UserID1", "UserID2"]},
            {"name": "role", "type": {"type": "enum",
                                      "name": "RoleType",
                                      "symbols": ["OWNER", "MEMBER"]}}
        ]
    },
    {
        "namespace": "gHealth.avro",
        "type": "record",
        "name": "GoalEdge",
        "fields": [
            {"name": "user_id", "type": ["UserID1", "UserID2"]},
            {"name": "goal_id", "type": "GoalID"}
        ]
    },
    {
        "namespace": "gHealth.avro",
        "type": "record",
        "name": "MealEdge",
        "fields": [
            {"name": "user_id", "type": ["UserID1", "UserID2"]},
            {"name": "meal_id", "type": "MealID"}
        ]
    },
    {
        "namespace": "gHealth.avro",
        "type": "record",
        "name": "ActivityEdge",
        "fields": [
            {"name": "user_id", "type": ["UserID1", "UserID2"]},
            {"name": "activity_id", "type": "ActivityID"}
        ]
    },
    {
        "namespace": "gHealth.avro",
        "type": "record",
        "name": "Data",
        "fields": [
            {
                "name": "pedigree",
                "type": "Pedigree"
            },
            {
                "name": "dataunit",
                "type": [
                    {
                        "type": "record",
                        "name": "DataUnit1",
                        "fields": [{"name": "user_property", "type": "UserProperty"}]
                    },
                    {
                        "type": "record",
                        "name": "DataUnit2",
                        "fields": [{"name": "equiv", "type": "EquivEdge"}]
                    },
                    {
                        "type": "record",
                        "name": "DataUnit3",
                        "fields": [{"name": "group_property", "type": "GroupProperty"}]
                    },
                    {
                        "type": "record",
                        "name": "DataUnit4",
                        "fields": [{"name": "group_edge", "type": "GroupEdge"}]
                    },
                    {
                        "type": "record",
                        "name": "DataUnit5",
                        "fields": [{"name": "goal_property", "type": "GoalProperty"}]
                    },
                    {
                        "type": "record",
                        "name": "DataUnit6",
                        "fields": [{"name": "goal_edge", "type": "GoalEdge"}]
                    },
                    {
                        "type": "record",
                        "name": "DataUnit7",
                        "fields": [{"name": "meal_property", "type": "MealProperty"}]
                    },
                    {
                        "type": "record",
                        "name": "DataUnit8",
                        "fields": [{"name": "meal_edge", "type": "MealEdge"}]
                    },
                    {
                        "type": "record",
                        "name": "DataUnit9",
                        "fields": [{"name": "activity_property", "type": "ActivityProperty"}]
                    },
                    {
                        "type": "record",
                        "name": "DataUnit10",
                        "fields": [{"name": "activity_edge", "type": "ActivityEdge"}]
                    }
                ]
            }
        ]
    }
]

Overwriting gHealth_schema.avsc


### Generate Data
[top](#gHealth-by-Brandon-Fetters)

In [3]:
%%file gen_gHealth_data.py
from datetime import datetime
import calendar,random,time,uuid

# Various libraries for data generation
import names
from barnum import gen_data
    
### Create data generator ###
def generate_data(n_users=1,n_meals=1,n_activities=1,gen_goals=True,gen_groups=True):
    
    # List to store all created data which is returned as output
    data = []
    
    # Create generator for unique id's to be used for nodes
    get_uuid = gen_uuids()
    user_ids = [get_uuid.next() for i in xrange(n_users)]
    users_sample = perform_ratio_sampling(user_ids,n_users,.5)
    
    ### GENERATE NODES
    # USERS
    locations = generate_locations(n_users)
    for user_id in user_ids:
        data += generate_user(user_id,locations)

    # MEALS
    for i in xrange(n_meals):
        data += generate_meal(get_uuid,users_sample)
        
    # ACTIVITIES
    for i in xrange(n_activities):
        data += generate_activity(get_uuid,users_sample)

    # GOALS
    if gen_goals:
        data += generate_goals(get_uuid,user_ids,n_users)

    # GROUPS
    if gen_groups:
        group_owners = perform_ratio_sampling(user_ids,n_users,.15)
        for group_owner in group_owners:
            data += generate_group(get_uuid,user_ids,group_owner,n_users)

    return data

def generate_user(user_id,locations):
    '''    
    [{"name": "user_property", "type": "UserProperty"}]
    user_id, name, dob, height, weight, gender, location
    '''
    
    # Reference lists for randomly generating values
    genders = ['MALE','FEMALE']
    
    # Create variables for any random generated values which need to be used multiple times
    # (e.g., names and cookies)
    gender = random.choice(genders)
    name = names.get_full_name(gender=gender.lower())
    dob = random.randrange(-315619200,1104451200) # (1/1/1960 - 12/31/2004) in seconds since epoch
    zipcode,city,state,country = random.choice(locations)
    height = random.randint(60,78)
    weight = random.randint(90,350)
    
    # Generate random timestamp from range.
    # TODO: Need to account for cases when "child" has timestamp earlier than "parent".
    ts = create_timestamp(90,14)
    cookie = ''.join(random.sample(str(ts),len(str(ts))))
    
    # Generate single user
    user = [
           {"pedigree": {"true_as_of_secs": ts},
            "dataunit": {"user_property": {"user_id": {"user_id": user_id},
                                           "property": {"name": name}}}},
           {"pedigree": {"true_as_of_secs": ts},
            "dataunit": {"user_property": {"user_id": {"user_id": user_id},
                                           "property": {"dob": int(dob)}}}},
           {"pedigree": {"true_as_of_secs": ts},
            "dataunit": {"user_property": {"user_id": {"user_id": user_id},
                                           "property": {"gender": gender}}}},
           {"pedigree": {"true_as_of_secs": ts},
            "dataunit": {"user_property": {"user_id": {"user_id": user_id},
                                           "property": {"location": {"zipcode" : zipcode,
                                                                     "city" : city,
                                                                     "state": state,
                                                                     "country": country}}}}}
           ]
    
    ## Also generate equiv edge for every user
    # [{"name": "equiv", "type": "EquivEdge"}]
    # user_id1,user_id2
    equiv = [{"pedigree": {"true_as_of_secs": ts},
              "dataunit": {"equiv": {"user_id1": {"cookie": cookie},
                                     "user_id2": {"user_id": user_id}}}}]
    
    return user + equiv

def generate_meal(get_uuid,users_sample):
    '''
    [{"name": "meal_property", "type": "MealProperty"}]
    meal_id, meal_type, meal_name, meal_desc
    '''
    
    # Reference lists for randomly generating values
    meal_types = ['Breakfast','Lunch','Dinner','Snack']
    meal_desc = ['Popcorn','Doritos','Seafood Paella','Som Tam','Chicken Rice','Poutine',
                 'Fish Tacos','French Toast','Chicken Parmigiana', 'Pulled Pork Sandwich',
                 'Chili Crab','Pancakes with Syrup','Fish N Chips','Ankimo','Sushi',
                 'Steak','Chicken Fajitas','Lasagna','Brownie and Vanilla Ice Cream',
                 'Almond Croissant','Twix','Arepas','Nam Tok Moo','Chicken Kebab','Lobster',
                 'Sour Patch Kids','Chocolate Covered Donut','Shepherds Pie','Rendang',
                 'Chicken Muamba','Tom Yum Goong','Penang Assam Laksa','Cheesburger','Pizza',
                 'Spinach Salad','Protein Shake','Peking Duck','Larb Gai','Palak Paneer',
                 'Cheerios','Mac N Cheese','Hot Dog','NY Cheesecake','PB & J','Cashews',
                 'Chile Rellenos','Nachos','Chicken Wings','Meatloaf','Egg Custard']
    
    # Create variables for any random generated values which need to be used multiple times
    # Generate random timestamp from range.
    meal_id = get_uuid.next()
    # TODO: Need to account for cases when "child" (edge node) has timestamp
    # earlier than "parent" (user node).
    ts = create_timestamp(90,14)
    
    # Generate single meal
    meal = [
           {"pedigree": {"true_as_of_secs": ts},
            "dataunit": {"meal_property": {"meal_id": {"meal_id": meal_id},
                                           "property": {"meal_type": random.choice(meal_types)}}}},
           {"pedigree": {"true_as_of_secs": ts},
            "dataunit": {"meal_property": {"meal_id": {"meal_id": meal_id},
                                           "property": {"meal_name": random.choice(meal_desc)}}}},
           {"pedigree": {"true_as_of_secs": ts},
            "dataunit": {"meal_property": {"meal_id": {"meal_id": meal_id},
                                           "property": {"meal_desc": gen_data.create_sentence()}}}}
           ]
    
    ## Also generate meal_edge for at least 1 user
    # [{"name": "meal_edge", "type": "MealEdge"}]
    # user_id, meal_id
    meal_edge = [{"pedigree": {"true_as_of_secs": ts},
                  "dataunit": {"meal_edge": {"meal_id": {"meal_id": meal_id},
                                             "user_id": {"user_id": random.choice(users_sample)}}}}]
        
    return meal + meal_edge

def generate_activity(get_uuid,users_sample):
    '''
    [{"name": "activity_property", "type": "ActivityProperty"}]
    activity_id, activity_type, activity_length, activity_unit
    '''
    
    # Reference lists for randomly generating values
    activity_types = ['Running','Cycling','Mountain Biking','Walking','Hiking','Downhill Skiing',
                      'Cross Country Skiing','Snowboarding','Skating','Swimming','Rowing',
                      'Crossfit','Weight Training','Other']
    activity_units = ['Miles','Laps','Hours','Minutes']
    
    # Create variables for any random generated values which need to be used multiple times
    # (e.g., names and cookies)
    activity_id = get_uuid.next()
    activity_length = random.randint(1,50)
    # Generate random timestamp from range.
    # TODO: Need to account for cases when "child" has timestamp earlier than "parent".
    ts = create_timestamp(90,14)
    
    # Generate single activity
    activity = [
               {"pedigree": {"true_as_of_secs": ts},
                "dataunit": {"activity_property": {"activity_id": {"activity_id": activity_id},
                                                   "property": {"activity_type": random.choice(activity_types)}}}},
               {"pedigree": {"true_as_of_secs": ts},
                "dataunit": {"activity_property": {"activity_id": {"activity_id": activity_id},
                                                   "property": {"activity_length": activity_length}}}},
               {"pedigree": {"true_as_of_secs": ts},
                "dataunit": {"activity_property": {"activity_id": {"activity_id": activity_id},
                                                   "property": {"activity_unit": random.choice(activity_units)}}}}
               ]
    
    ## Also generate activity_edge for at least 1 user
    # [{"name": "activity_edge", "type": "ActivityEdge"}]
    # user_id, activity_id
    activity_edge = [{"pedigree": {"true_as_of_secs": ts},
                      "dataunit": {"activity_edge": {"activity_id": {"activity_id": activity_id},
                                                     "user_id": {"user_id": random.choice(users_sample)}}}}]
    
    return activity + activity_edge

def generate_goals(get_uuid,user_ids,n_users):
    '''    
    [{"name": "goal_property", "type": "GoalProperty"}]
    goal_id, goal_name, goal_desc, goal_value, goal_unit, goal_status
    [{"name": "goal_edge", "type": "GoalEdge"}]
    user_id, goal_id
    '''
    
    # Reference lists for randomly generating values
    # Create goals for a random sample of 10% of users.
    goal_users = perform_ratio_sampling(user_ids,n_users,.1)
    goals_text = [('Lose Weight','Lose ' + str(random.randint(5,30)) + ' pounds.'),
                  ('Exercise','Exercise ' + str(random.randint(3,7)) + ' times this week.'),
                  ('Eat out less.','Only eat out ' + str(random.randint(1,3)) + ' times per week.'),
                  ('Get more sleep', 'Get ' + str(random.randint(8,12)) + ' hours of sleep per night'),
                  ('Cut out sugar','Cut sugar intake by ' + str(random.choice(['50%','60%','75%']))),
                  ('Workout!','Workout for at least 30 minutes every day this week.')]
    goal_status = ['Not Started','Underway','Continuous','Completed']
    
    goals = []
    for user_id in goal_users:
        # Create variables for any random generated values which need to be used multiple times
        goal_id = get_uuid.next()
        # Generate random timestamp from range.
        # TODO: Need to account for cases when "child" has timestamp earlier than "parent".
        ts = create_timestamp(90,14)
        goal_name,goal_desc = random.choice(goals_text)
        
        goal = [
               {"pedigree": {"true_as_of_secs": ts},
                "dataunit": {"goal_property": {"goal_id": {"goal_id": goal_id},
                                               "property": {"goal_name": goal_name}}}},
               {"pedigree": {"true_as_of_secs": ts},
                "dataunit": {"goal_property": {"goal_id": {"goal_id": goal_id},
                                               "property": {"goal_desc": goal_desc}}}},
               {"pedigree": {"true_as_of_secs": ts},
                "dataunit": {"goal_property": {"goal_id": {"goal_id": goal_id},
                                               "property": {"goal_status": random.choice(goal_status)}}}},
               # Go ahead and create goal_edge inline with the goal itself.
               {"pedigree": {"true_as_of_secs": ts},
                "dataunit": {"goal_edge": {"goal_id": {"goal_id": goal_id},
                                           "user_id": {"user_id": user_id}}}}
               ]
        goals += goal
    
    return goals

def generate_group(get_uuid,user_ids,group_owner,n_users):
    '''
    [{"name": "group_property", "type": "GroupProperty"}]
    group_id, name
    [{"name": "group_edge", "type": "GroupEdge"}]
    group_id, user_id, role
    '''
    
    # Reference lists for randomly generating values
    group_names = ['Family','Friends','Coworkers','Elite','Workout Fun','Siblings','Freedom']
    
    # Create variables for any random generated values which need to be used multiple times
    group_id = get_uuid.next()
    group_name = random.choice(group_names) + '_' + str(group_owner)
    ts = create_timestamp(90,14)
    
    group = [{"pedigree": {"true_as_of_secs": ts},
             "dataunit": {"group_property": {"group_id": {"group_id": group_id},
                                             "property": {"group_name": group_name}}}}]
    
    group_edges = [{"pedigree": {"true_as_of_secs": ts},
                    "dataunit": {"group_edge": {"group_id": {"group_id": group_id},
                                                "user_id": {"user_id": group_owner},
                                                "role": 'OWNER'}}}]
    for group_member in random.sample(user_ids,random.randint(3,min(n_users,15))):
        group_edge = [{"pedigree": {"true_as_of_secs": ts},
                       "dataunit": {"group_edge": {"group_id": {"group_id": group_id},
                                                   "user_id": {"user_id": group_member},
                                                   "role": 'MEMBER'}}}]
        group_edges += group_edge
    
    return group + group_edges    

### HELPER METHODS ###
def gen_uuids():
    while True:
        yield str(uuid.uuid4())
        
def create_timestamp(back,forward):
    '''
    Using the current time generate a timestamp within the range of 90 days in the past
    and 14 days in the future.
    INPUTS: 
        - back(int): how many days back should the range start
        - forward(int): how many days forward should the range end
    OUTPUTS:
        - ts(int): randomly generated timestamp from range
    '''
    start = int(time.time() - (back * 86400))
    end = int(time.time() + (forward * 86400))
    return random.randrange(start,end)

def generate_locations(n_users,location_ratio=.1):
    '''
    Generate a list of locations stored as a tuple of (city,state,country). The number of locations
    generated will depend on the desired ratio of users to locations. The default location_ratio
    is low to ensure a percentage of users will share locations.
    INPUTS:
        - n_users(int): Number of users being generated
        - location_ratio(float): The desired ratio for n_users by n_locations.
    OUTPUTS:
        - locs (list): List of generated location tuples in the form (city,state,country)
    '''
    locs = []
    # Based on the desired ratio of locations to users
    if int(n_users * location_ratio) < 1:
        n_locs = 1
    else:
        n_locs = int(n_users * location_ratio)
    
    for n in xrange(n_locs):
        zipcode,city,state = gen_data.create_city_state_zip()
        locs.append((int(zipcode),city,state,'US'))
    
    return locs

def perform_ratio_sampling(population,count,percent):
    '''
    Control number of non-primary nodes created by randomly sampling a
    population based on a ratio. For example, not all users would be expected to
    create activities, so randomly sample the population of users and only create
    activities for a percentage of the users.
    INPUTS:
        - population(list): data to be sampled
        - count(int): total count to be used for sampling. this is required because
                      you cannot just use len() for generators
        - percent(float): the percent to use for our sampling ratio
    '''
    # The number of samples must be at least 1
    if count < 2:
        n_samples = 1
    else:
        n_samples = int(count * percent)
    return random.sample(population,n_samples)

Overwriting gen_gHealth_data.py


In [3]:
from pretty import pprint

VALIDATE = False

# Generate testing data and output for validation
n_users = 5000
n_meals = n_users * 4
n_activities = n_users * 3
test_data = generate_data(n_users=n_users,n_meals=n_meals,n_activities=n_activities)

schema = avro.schema.parse(open("gHealth_schema.avsc").read())
def test_good_data_health(datum, schema=schema):
    return avro.io.validate(schema, datum)

# print data for quick sanity check on values
if VALIDATE:
#     pprint(test_data)
    print map(test_good_data_health, test_data)

### Data Serialization
[top](#gHealth-by-Brandon-Fetters)

In [112]:
# Delete old file
!rm gHealth.avro

In [113]:
# Importing required packages For file Read & Write 
import avro.schema
from avro.datafile import DataFileReader, DataFileWriter  
from avro.io import DatumReader, DatumWriter
 
# Schema parsing from a schema file
schema = avro.schema.parse(open("gHealth_schema.avsc").read())
 
# Creation of DataFileWriter instance with above schema
writer = DataFileWriter(open("gHealth.avro", "w"), DatumWriter(), schema)
 
# Loop through generated data and append one at a time to the file.
total = len(test_data)-1 # Subtract one because i will start at 0
for i,dataunit in enumerate(test_data):
    if i % 10000 == 0 or i == total:
        print "{} of {} records processed.".format(i,total)
    writer.append(dataunit)
    
# Close the data file
writer.close()

0 of 175306 records processed.
10000 of 175306 records processed.
20000 of 175306 records processed.
30000 of 175306 records processed.
40000 of 175306 records processed.
50000 of 175306 records processed.
60000 of 175306 records processed.
70000 of 175306 records processed.
80000 of 175306 records processed.
90000 of 175306 records processed.
100000 of 175306 records processed.
110000 of 175306 records processed.
120000 of 175306 records processed.
130000 of 175306 records processed.
140000 of 175306 records processed.
150000 of 175306 records processed.
160000 of 175306 records processed.
170000 of 175306 records processed.
175306 of 175306 records processed.


In [114]:
# Read the above created avro data file. Opening file in read mode for quick sanity check. 
reader = DataFileReader(open("gHealth.avro", "r"), DatumReader())
for i,dataunit in enumerate(reader):
    if i < 3:
        print pprint(dataunit)
    else:
        break

# Close the avro data file after completion of reading it.
reader.close()

{u'dataunit': {u'user_property': {u'property': {u'name': u'Janet Crumpton'},
   u'user_id': {u'user_id': u'96708369-055f-422e-abed-91bca7d16a12'}}},
 u'pedigree': {u'true_as_of_secs': 1441214997}}
None
{u'dataunit': {u'user_property': {u'property': {u'dob': 471118225},
   u'user_id': {u'user_id': u'96708369-055f-422e-abed-91bca7d16a12'}}},
 u'pedigree': {u'true_as_of_secs': 1441214997}}
None
{u'dataunit': {u'user_property': {u'property': {u'gender': u'FEMALE'},
   u'user_id': {u'user_id': u'96708369-055f-422e-abed-91bca7d16a12'}}},
 u'pedigree': {u'true_as_of_secs': 1441214997}}
None


In [83]:
# Connect to AWS S3 bucket
from boto.s3.connection import S3Connection
conn = S3Connection(AWS_ACCESS_KEY,AWS_SECRET_ACCESS_KEY)
bucket = conn.get_bucket('gu6007')

# Upload generated data avro file to S3 bucket
filename = "gHealth.avro"
print 'Uploading %s to Amazon S3 bucket %s' % (filename, bucket)

# Output to track status
import sys
def percent_cb(complete, total):
    sys.stdout.write('.')
    sys.stdout.flush()

from boto.s3.key import Key
k = Key(bucket)
k.key = filename
k.set_contents_from_filename(filename,cb=percent_cb, num_cb=10)

### Vertical Partitioning
[top](#gHealth-by-Brandon-Fetters)

In [80]:
import os
import fastavro

from cStringIO import StringIO
import pyspark as ps

sc = ps.SparkContext()
sqlContext = ps.HiveContext(sc)

In [81]:
def partition_data(datum):
    datatype = datum['dataunit'].keys()[0]
    if datatype.endswith('property'):
        return '/'.join((datatype, datum['dataunit'][datatype]['property'].keys()[0])), datum
    else:
        return datatype, datum

In [113]:
keys = sc.parallelize(bucket.get_all_keys(prefix='gHealth'))
avro_data = keys.map(lambda key: StringIO(key.get_contents_as_string()))
json_data = avro_data.flatMap(fastavro.reader)
partitioned_json = json_data.map(partition_data)
partitioned_json.cache()

PythonRDD[34] at RDD at PythonRDD.scala:43

In [119]:
# Remove old partitioned data
!rm -R /Users/bfetters/projects/github/gHealth/

In [120]:
partition_names = partitioned_json.map(lambda t: t[0]).distinct().collect()

for p in partition_names:
    path = "../gHealth/master/{}".format(p)
    if os.path.exists(path):
        print "{} exists".format(path)
    else:
        partitioned_json.filter(lambda t: t[0] == p).values().saveAsPickleFile(path)

In [121]:
# Quick validation that vertical partition was created as expected
!tree ../gHealth/

../gHealth/
└── master
    ├── activity_edge
    │   ├── _SUCCESS
    │   ├── part-00000
    │   ├── part-00001
    │   ├── part-00002
    │   └── part-00003
    ├── activity_property
    │   ├── activity_length
    │   │   ├── _SUCCESS
    │   │   ├── part-00000
    │   │   ├── part-00001
    │   │   ├── part-00002
    │   │   └── part-00003
    │   ├── activity_type
    │   │   ├── _SUCCESS
    │   │   ├── part-00000
    │   │   ├── part-00001
    │   │   ├── part-00002
    │   │   └── part-00003
    │   └── activity_unit
    │       ├── _SUCCESS
    │       ├── part-00000
    │       ├── part-00001
    │       ├── part-00002
    │       └── part-00003
    ├── equiv
    │   ├── _SUCCESS
    │   ├── part-00000
    │   ├── part-00001
    │   ├── part-00002
    │   └── part-00003
    ├── goal_edge
    │   ├── _SUCCESS
    │   ├── part-00000
    │   ├── part-00001
    │   ├── part-00002
    │   └── part-00003
    ├── goal_property
    │   ├── goal_

### Batch Views
[top](#gHealth-by-Brandon-Fetters)

#### Create Batch Views
- [User Features Aggregation](#User-Features-Aggregation)
- [Count of Activities by User by Type](#Count-of-Activities-by-User-by-Type)
- [Count of Meals by User by Type](#Count-of-Meals-by-User-by-Type)
- [Count of Activities by Location by Type](#Count-of-Activities-by-Location-by-Type)

In [122]:
from pretty import pprint

In [123]:
# For now, go ahead and create an initial rdd for each node and edge to make our lives easier
# later when we create batch views.
path = '../gHealth/master/'

# USER
user_dob_pkl = sc.pickleFile(path + 'user_property/dob')
user_loc_pkl = sc.pickleFile(path + 'user_property/location/')
user_name_pkl = sc.pickleFile(path + 'user_property/name')
user_gender_pkl = sc.pickleFile(path + 'user_property/gender')
equiv_edge_pkl = sc.pickleFile(path + 'equiv')

# ACTIVITY
act_type_pkl = sc.pickleFile(path + 'activity_property/activity_type')
act_length_pkl = sc.pickleFile(path + 'activity_property/activity_length')
act_unit_pkl = sc.pickleFile(path + 'activity_property/activity_unit')
act_edge_pkl = sc.pickleFile(path + 'activity_edge')

# MEAL
meal_name_pkl = sc.pickleFile(path + 'meal_property/meal_name')
meal_type_pkl = sc.pickleFile(path + 'meal_property/meal_type')
meal_desc_pkl = sc.pickleFile(path + 'meal_property/meal_desc')
meal_edge_pkl = sc.pickleFile(path + 'meal_edge')

# GOAL
goal_name_pkl = sc.pickleFile(path + 'goal_property/goal_name')
goal_name_pkl = sc.pickleFile(path + 'goal_property/goal_name')
goal_desc_pkl = sc.pickleFile(path + 'goal_property/goal_desc')
goal_status_pkl = sc.pickleFile(path + 'goal_property/goal_status')
goal_edge_pkl = sc.pickleFile(path + 'goal_edge')
    
# GROUP
group_name_pkl = sc.pickleFile(path + 'group_property/group_name')
group_edge_pkl = sc.pickleFile(path + 'group_edge')

##### User Features Aggregation

In [124]:
import time
from datetime import datetime,date

# TODO: Need to come back to this and make it more generic, but closing the loop
# on the end to end process first.
def user_kv_to_dict(joined_data):
    '''
    Given the cumulative joined (key,value) tuples from multiple joins we need to parse the data out
    with the intention of returning a dictionary which will provide the schema for our spark dataframe.
    
    NOTE: The method as currently constructed only works for the User Features Aggregated joined data.
    INPUTS:
        - joined_data(tuple): key-value tuple with nested value tuples from a series of joins
    OUTPUTS:
        - result_dict(dict): parsed and reformated data from user joined key-value tuple to dictionary
    '''
    (user_id,((((((((name,gender),age),(city,state)),last_ts),act_count),meal_count),goal_count),group_count)) = joined_data
    result_dict = {'user_id'    : user_id,
                   'name'       : name,
                   'gender'     : gender,
                   'age'        : age,
                   'city'       : city,
                   'state'      : state,
                   'status'     : is_active(last_ts),
                   'act_count'  : act_count,
                   'meal_count' : meal_count,
                   'goal_count' : goal_count,
                   'group_count': group_count}
    return result_dict

def calculate_age(dob):
    '''
    Given a date of birth in seconds since epoch, convert to age.
    INPUTS:
        - dob(int): date of birth stored in seconds since epoch
    OUTPUTS:
        - age(int): current age based on date of birth
    '''
    today = date.today()
    born = datetime.fromtimestamp(dob)
    age = today.year - born.year - ((today.month, today.day) < (born.month, born.day))
    return age

def is_active(ts):
    '''
    Calculates whether a given event has occurred in the last 30-60 days. If within the
    last 30 days a user is considered to be active, if within the last 60 days a user
    is considered to be inactive, and if longer than 60 days a user is considered to have
    stopped using the app.
    INPUTS:
        - ts(int): event timestamp stored as seconds since epoch
    OUTPUTS:
        - status(str): status of user given last event: [active, inactive, or churn]
    '''
    now = int(time.time())
    sixty_days = now - (60 * 86400)
    thirty_days = now - (30 * 86400)
    if ts < sixty_days:
        return 'churn'
    elif ts < thirty_days:
        return 'inactive'
    else:
        return 'active'

In [125]:
'''
--- LAST USER ACTION ---
(used later during user feature batch view aggregation)

1. Create (k,v) rdd for (user_id,pedigree) of last activity
2. Create (k,v) rdd for (user_id,pedigree) of last meal
3. Join last activity and last meal to find latest event for each user.
'''

# 1. Create (k,v) rdd for (user_id,pedigree) of last activity
last_user_act = act_edge_pkl.map(lambda datum: (datum['dataunit']['activity_edge']['user_id']['user_id'],
                                                datum['pedigree']['true_as_of_secs']))\
                            .reduceByKey(lambda ts1,ts2: ts1)
# last_user_act.take(5)

# 2. Create (k,v) rdd for (user_id,pedigree) of last meal
last_user_meal = meal_edge_pkl.map(lambda datum: (datum['dataunit']['meal_edge']['user_id']['user_id'],
                                                  datum['pedigree']['true_as_of_secs']))\
                              .reduceByKey(lambda ts1,ts2: ts1)
# last_user_meal.take(5)

# 3. Join last activity and last meal to find latest event for each user.
last_user_event = last_user_act.join(last_user_meal).map(lambda (user_id,(ts1,ts2)): (user_id,max(ts1,ts2)))
# last_user_event.take(5)

In [156]:
'''
--- USER FEATURES AGGREGATION ---
1. Create (user,name) pair rdd
2. Create (user,gender) pair rdd
3. Create (user,age) pair rdd
4. Create (user,location) pair rdd
5. Create (user,activity_count) pair rdd
6. Create (user,meal_count) pair rdd
7. Create (k,v) rdd for (user_id,goal_count)
8. Create (k,v) rdd for (user_id,group_count)
9. Join 1-8 into single rdd for "user feature" rdd
'''

# 1. Create (k,v) rdd for (user_id,user_name)
user_name_pair = user_name_pkl.map(lambda datum: (datum['dataunit']['user_property']['user_id']['user_id'],
                                                  datum['dataunit']['user_property']['property']['name']))
# pprint(user_name_pair.take(5))

# 2. Create (k,v) rdd for (user_id,gender)
user_gender_pair = user_gender_pkl.map(lambda datum: (datum['dataunit']['user_property']['user_id']['user_id'],
                                                      datum['dataunit']['user_property']['property']['gender']))
# pprint(user_gender_pair.take(5))

# 3. Create (k,v) rdd for (user_id,(dob,age))
user_dob_pair = user_dob_pkl.map(lambda datum: (datum['dataunit']['user_property']['user_id']['user_id'],
                                                datum['dataunit']['user_property']['property']['dob']))\
                            .map(lambda (user_id,dob): (user_id,calculate_age(dob)))
# pprint(user_dob_pair.take(5))

# 4. Create (k,v) rdd for (user_id,(city,state))
user_loc_pair = user_loc_pkl.map(lambda datum: (datum['dataunit']['user_property']['user_id']['user_id'],
                                                (datum['dataunit']['user_property']['property']['location']['city'],
                                                 datum['dataunit']['user_property']['property']['location']['state'])))
# pprint(user_loc_pair.take(5))

# 5. Create (k,v) rdd for (user_id,act_count)
user_act_count = act_edge_pkl.map(lambda datum: (datum['dataunit']['activity_edge']['user_id']['user_id'],1))\
                             .reduceByKey(lambda cnt1,cnt2: cnt1 + cnt2)
# pprint(user_act_count.take(5))

# 6. Create (k,v) rdd for (user_id,meal_count)
user_meal_count = meal_edge_pkl.map(lambda datum: (datum['dataunit']['meal_edge']['user_id']['user_id'],1))\
                               .reduceByKey(lambda cnt1,cnt2: cnt1 + cnt2)
# pprint(user_meal_count.take(5))

# 7. Create (k,v) rdd for (user_id,goal_count)
user_goal_count = goal_edge_pkl.map(lambda datum: (datum['dataunit']['goal_edge']['user_id']['user_id'],1))\
                               .reduceByKey(lambda cnt1,cnt2: cnt1 + cnt2)
# pprint(user_goal_count.take(5))

# 8. Create (k,v) rdd for (user_id,group_count)
user_group_count = group_edge_pkl.map(lambda datum: (datum['dataunit']['group_edge']['user_id']['user_id'],1))\
                               .reduceByKey(lambda cnt1,cnt2: cnt1 + cnt2)
# pprint(user_group_count.take(5))

# 9. Join all the user features into single aggregated rdd
users_pair = user_name_pair.join(user_gender_pair)\
                           .join(user_dob_pair)\
                           .join(user_loc_pair)\
                           .join(last_user_event)\
                           .leftOuterJoin(user_act_count)\
                           .leftOuterJoin(user_meal_count)\
                           .leftOuterJoin(user_goal_count)\
                           .leftOuterJoin(user_group_count)\
                           .map(lambda joined_data: user_kv_to_dict(joined_data))
# pprint(users_pair.take(5))

# Read in rdd to create DataFrame, register table, and print schema for validation
users_df = sqlContext.createDataFrame(users_pair.collect())
users_df.registerTempTable('users')

# Print schema and head of dataframe for validation
users_df.printSchema()
user_features = sqlContext.sql('select name,gender,age,city,state,act_count, \
                                       meal_count,goal_count,group_count,status \
                                from users order by user_id')
user_features.show(10)

# Write batch view to parquet file
user_features.write.parquet('../gHealth/batch/user_features/', mode='overwrite')

root
 |-- act_count: long (nullable = true)
 |-- age: long (nullable = true)
 |-- city: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- goal_count: long (nullable = true)
 |-- group_count: long (nullable = true)
 |-- meal_count: long (nullable = true)
 |-- name: string (nullable = true)
 |-- state: string (nullable = true)
 |-- status: string (nullable = true)
 |-- user_id: string (nullable = true)

+-------------+------+---+-------------+-----+---------+----------+----------+-----------+--------+
|         name|gender|age|         city|state|act_count|meal_count|goal_count|group_count|  status|
+-------------+------+---+-------------+-----+---------+----------+----------+-----------+--------+
|  Bettie Holt|FEMALE| 22|       Starke|   FL|        5|         7|      null|          2|  active|
|   Jill Flynn|FEMALE| 21|     Thompson|   OH|        8|        10|      null|          1|   churn|
|Robert Foster|  MALE| 21|    Lafayette|   MN|        7|        12|      null

##### Count of Activities by User by Type

In [157]:
'''
1. Create (k,v) rdd for (activity_id,user_id)
2. Create (k,v) rdd for (activity_id,activity_type)
3. Create (k,v) rdd for (user_id,(activity_type,count))
4. Join with user_name_pair to get name and output as dict
'''

# 1. Create (k,v) rdd for (activity_id,user_id)
act_user_id_pair = act_edge_pkl.map(lambda datum: (datum['dataunit']['activity_edge']['activity_id']['activity_id'],
                                                   datum['dataunit']['activity_edge']['user_id']['user_id']))
# pprint(act_user_id_pair.take(5))

# 2. Create (k,v) rdd for (activity_id,activity_type)
act_id_type_pair = act_type_pkl.map(lambda datum: (datum['dataunit']['activity_property']['activity_id']['activity_id'],
                                                   datum['dataunit']['activity_property']['property']['activity_type']))
# pprint(act_id_type_pair.take(5))

# 3. Create (k,v) rdd for (user_id,(activity_type,count))
user_act_type_pair = act_user_id_pair.join(act_id_type_pair)\
                                     .map(lambda (act_id,(user_id,act_type)): ((user_id,act_type),1))\
                                     .reduceByKey(lambda cnt1,cnt2: cnt1 + cnt2)\
                                     .map(lambda ((user_id,act_type),count): (user_id,(act_type,count)))
# pprint(user_act_type_pair.take(5))

# 4. Join with user_name_pair to get name and output as dict
act_type_by_user = user_name_pair.join(user_act_type_pair)\
                                 .map(lambda (user_id,(name,(act_type,count))): {'user_id': user_id,
                                                                                 'name': name,
                                                                                 'act_type': act_type,
                                                                                 'count': count})
# pprint(act_by_user.take(5))

# # Read in rdd to create DataFrame, register table, and print schema for validation
act_type_by_user_df = sqlContext.createDataFrame(act_type_by_user.collect())
act_type_by_user_df.registerTempTable('act_by_user')

# Print schema and head of dataframe for validation
act_type_by_user_df.printSchema()
activities_user = sqlContext.sql('select name,act_type,count from act_by_user')
activities_user.show(10)

# Write batch view to parquet file
activities_user.write.parquet('../gHealth/batch/activities_user/', mode='overwrite')

root
 |-- act_type: string (nullable = true)
 |-- count: long (nullable = true)
 |-- name: string (nullable = true)
 |-- user_id: string (nullable = true)

+------------+--------+-----+
|        name|act_type|count|
+------------+--------+-----+
|David Wilson|Swimming|    2|
|David Wilson|  Rowing|    1|
|David Wilson| Skating|    1|
|David Wilson|   Other|    1|
|David Wilson| Walking|    1|
|David Wilson|  Hiking|    1|
|David Wilson| Cycling|    1|
|   Adam Koch| Walking|    1|
|   Adam Koch| Cycling|    1|
|   Adam Koch| Running|    2|
+------------+--------+-----+



##### Count of Meals by User by Type

In [158]:
'''
1. Create (k,v) rdd for (meal_id,user_id)
2. Create (k,v) rdd for (meal_id,meal_type)
3. Create (k,v) rdd for (user_id,(meal_type,count))
4. Join with user_name_pair to get name and output as dict
'''

# 1. Create (k,v) rdd for (user_id,meal_id)
meal_user_id_pair = meal_edge_pkl.map(lambda datum: (datum['dataunit']['meal_edge']['meal_id']['meal_id'],
                                                     datum['dataunit']['meal_edge']['user_id']['user_id']))
# pprint(meal_user_id_pair.take(5))

# 2. Create (k,v) rdd for (meal_id,meal_type)
meal_id_type_pair = meal_type_pkl.map(lambda datum: (datum['dataunit']['meal_property']['meal_id']['meal_id'],
                                                     datum['dataunit']['meal_property']['property']['meal_type']))
# pprint(meal_id_type_pair.take(5))

# 3. Create (k,v) rdd for (user_id,(meal_type,count))
user_meal_type_pair = meal_user_id_pair.join(meal_id_type_pair)\
                                       .map(lambda (meal_id,(user_id,meal_type)): ((user_id,meal_type),1))\
                                       .reduceByKey(lambda cnt1,cnt2: cnt1 + cnt2)\
                                       .map(lambda ((user_id,meal_type),count): (user_id,(meal_type,count)))
# pprint(user_meal_type_pair.take(5))

# 4. Join with user_name_pair to get name and output as dict
meal_type_by_user = user_name_pair.join(user_meal_type_pair)\
                                   .map(lambda (user_id,(name,(meal_type,count))): {'user_id': user_id,
                                                                                    'name': name,
                                                                                    'meal_type': meal_type,
                                                                                    'count': count})
# pprint(meal_type_by_user.take(5))

# Read in rdd to create DataFrame, register table, and print schema for validation
meal_type_by_user_df = sqlContext.createDataFrame(meal_type_by_user.collect())
meal_type_by_user_df.registerTempTable('meal_by_user')

# Print schema and head of dataframe for validation
meal_type_by_user_df.printSchema()
meals_user = sqlContext.sql('select name,meal_type,count from meal_by_user')
meals_user.show(10)

# Write batch view to parquet file
meals_user.write.parquet('../gHealth/batch/meals_user/', mode='overwrite')

root
 |-- count: long (nullable = true)
 |-- meal_type: string (nullable = true)
 |-- name: string (nullable = true)
 |-- user_id: string (nullable = true)

+-------------+---------+-----+
|         name|meal_type|count|
+-------------+---------+-----+
| David Wilson|    Snack|    3|
| David Wilson|    Lunch|    1|
| David Wilson|Breakfast|    2|
|    Adam Koch|    Lunch|    2|
|    Adam Koch|   Dinner|    2|
|    Adam Koch|    Snack|    3|
|Mary Armstead|Breakfast|    3|
|Mary Armstead|    Snack|    1|
|Mary Armstead|    Lunch|    1|
|Mary Armstead|   Dinner|    3|
+-------------+---------+-----+



##### Count of Activities by Location by Type

In [159]:
'''
1. Create (user,location) pair rdd
2. Join with earlier created user_act_type_pair
'''

# 1. Create (k,v) rdd for (user_id,(city,state))
user_loc_pair = user_loc_pkl.map(lambda datum: (datum['dataunit']['user_property']['user_id']['user_id'],
                                                (datum['dataunit']['user_property']['property']['location']['city'],
                                                 datum['dataunit']['user_property']['property']['location']['state'])))
# pprint(user_loc_pair.take(5))

# 2. Create (k,v) rdd for (location,(activity_type,count))
act_type_per_loc = user_loc_pair.join(user_act_type_pair)\
                                .map(lambda (user_id,(location,(act_type, count))): ((location,act_type),count))\
                                .reduceByKey(lambda cnt1,cnt2: cnt1 + cnt2)\
                                .map(lambda ((location,act_type),count): (location,(act_type,count)))\
                                .sortBy(lambda (location,act_count): location,ascending=True)\
                                .map(lambda ((city,state),(act_type,count)): {'city':city,
                                                                              'state':state,
                                                                              'act_type':act_type,
                                                                              'count':count})
                        
# pprint(act_type_per_loc.take(5))

# Read in rdd to create DataFrame, register table, and print schema for validation
act_type_per_loc_df = sqlContext.createDataFrame(act_type_per_loc)
act_type_per_loc_df.registerTempTable('act_by_loc')

# Print schema and head of dataframe for validation
act_type_per_loc_df.printSchema()
activities_loc = sqlContext.sql('select state,city,act_type,count from act_by_loc')
activities_loc.show(10)

# Write batch view to parquet file
activities_loc.write.parquet('../gHealth/batch/activities_loc/', mode='overwrite')

root
 |-- act_type: string (nullable = true)
 |-- city: string (nullable = true)
 |-- count: long (nullable = true)
 |-- state: string (nullable = true)

+-----+-----+---------------+-----+
|state| city|       act_type|count|
+-----+-----+---------------+-----+
|   SC|Aiken|        Walking|    1|
|   SC|Aiken|          Other|    1|
|   SC|Aiken|         Rowing|    1|
|   SC|Aiken|        Cycling|    1|
|   OH|Akron|        Skating|    1|
|   OH|Akron|       Crossfit|    1|
|   TX|Alice|        Running|    3|
|   TX|Alice|Weight Training|    3|
|   TX|Alice|        Cycling|    3|
|   TX|Alice|         Rowing|    3|
+-----+-----+---------------+-----+



#### Query Batch Views

In [160]:
''' User Features '''
# Load user features batch view, register, and query
# users = sqlContext.read.parquet('../gHealth/batch/user_features')
# users.registerTempTable("users");

print "Count of users by state"
sqlContext.sql("""select state,count(*) as total
                  from users
                  group by state
                  order by total desc""").show()

print "Average age of users by state"
sqlContext.sql("""select state,floor(avg(age)) as avg_age
                  from users
                  group by state
                  order by avg_age desc""").show()

print "Customer Churn Metrics"
sqlContext.sql("""select status,count(*) as count
                  from users
                  group by status""").show()

Count of users by state
+-----+-----+
|state|total|
+-----+-----+
|   PA|  166|
|   TX|  153|
|   CA|  141|
|   NY|  118|
|   OH|  103|
|   IL|   96|
|   WV|   80|
|   NJ|   76|
|   MN|   69|
|   MA|   69|
|   AL|   68|
|   MO|   68|
|   NE|   66|
|   FL|   65|
|   WI|   64|
|   LA|   63|
|   NC|   62|
|   VA|   58|
|   MI|   57|
|   GA|   56|
+-----+-----+

Average age of users by state
+-----+-------+
|state|avg_age|
+-----+-------+
|   DC|     46|
|   SD|     39|
|   AR|     37|
|   DE|     37|
|   NM|     36|
|   IA|     35|
|   MS|     35|
|   CO|     35|
|   SC|     34|
|   IL|     34|
|   WV|     34|
|   VT|     34|
|   OK|     34|
|   VA|     33|
|   NH|     33|
|   ND|     33|
|   NJ|     33|
|   AK|     33|
|   WY|     33|
|   AZ|     33|
+-----+-------+

Customer Churn Metrics
+--------+-----+
|  status|count|
+--------+-----+
|  active| 1631|
|   churn|  221|
|inactive|  642|
+--------+-----+



In [163]:
''' Activities by User '''
# Load activities by user batch view, register, and query
activities_user = sqlContext.read.parquet('../gHealth/batch/activities_user')
activities_user.registerTempTable("activities_user");

print "Count of Activities by User > 3"
sqlContext.sql("""select name,act_type,count
                  from activities_user
                  where count > 3""").show()

Count of Activities by User > 3
+-----------------+--------------------+-----+
|             name|            act_type|count|
+-----------------+--------------------+-----+
|    Thomas Barris|     Weight Training|    4|
|    Karen Mcateer|        Snowboarding|    5|
|       Lula Davis|Cross Country Skiing|    4|
|   Donald Herring|     Mountain Biking|    4|
|      Betty Drake|             Skating|    4|
|     Joseph Neese|            Crossfit|    4|
|      Nancy Magee|             Running|    4|
|  Carolyn Tindell|            Crossfit|    4|
|     Kelly Monroy|        Snowboarding|    4|
|   Thelma Leatham|             Running|    4|
|  Charles Carroll|        Snowboarding|    4|
|       Penny Cahn|     Downhill Skiing|    4|
|   Kevin Fountain|             Skating|    4|
|    Kim Alexander|             Running|    4|
|Thomasina Simmons|            Crossfit|    5|
|    Linda Calumag|            Swimming|    5|
|    Randy Compton|             Running|    4|
|      Janet Inman|     Down

In [164]:
''' Activities by Location '''
# Load activities by location batch view, register, and query
activities_loc = sqlContext.read.parquet('../gHealth/batch/activities_loc')
activities_loc.registerTempTable("activities_loc");

print "Most popular activity (or activities) in each state"
act_cnt_by_state = sqlContext.sql("""select state,act_type,count(*) as cnt
                                     from activities_loc
                                     group by state,act_type
                                     order by state,act_type,cnt desc""")
act_cnt_by_state.registerTempTable("act_cnt_by_state");

sqlContext.sql("""select acbs2.state,acbs2.act_type,acbs2.cnt
                  from (select state,max(cnt) as max_cnt
                        from act_cnt_by_state
                        group by state
                        order by state) as acbs1
                  inner join act_cnt_by_state as acbs2
                  on acbs2.state = acbs1.state and acbs2.cnt = acbs1.max_cnt
                  order by acbs2.state""").show()

Most popular activity (or activities) in each state
+-----+--------------------+---+
|state|            act_type|cnt|
+-----+--------------------+---+
|   AK|             Cycling|  4|
|   AL|            Crossfit| 12|
|   AL|        Snowboarding| 12|
|   AL|             Walking| 12|
|   AL|     Downhill Skiing| 12|
|   AR|               Other|  6|
|   AR|     Downhill Skiing|  6|
|   AR|             Walking|  6|
|   AR|Cross Country Skiing|  6|
|   AZ|              Rowing|  5|
|   AZ|            Crossfit|  5|
|   AZ|     Weight Training|  5|
|   AZ|     Mountain Biking|  5|
|   AZ|             Cycling|  5|
|   CA|     Weight Training| 24|
|   CO|               Other|  6|
|   CO|              Hiking|  6|
|   CO|     Downhill Skiing|  6|
|   CO|             Cycling|  6|
|   CO|             Running|  6|
+-----+--------------------+---+



### Speed Layer
[top](#gHealth-by-Brandon-Fetters)

In [None]:
# ./bin/zookeeper-server-start.sh config/zookeeper.properties
# ./bin/kafka-server-start.sh config/server.properties

In [1]:
import io, random, threading, logging, time, json

import avro.io
import avro.schema

from kafka.client   import KafkaClient
from kafka.consumer import KafkaConsumer
from kafka.producer import SimpleProducer

from gen_gHealth_data import generate_data

KAFKA_TOPIC = 'gHealth'

def get_next_event():
    new_data = generate_data(n_users=1,n_meals=0,n_activities=0,gen_goals=False,gen_groups=False)
    return new_data

class Producer(threading.Thread):
    '''Produces login events and publishes them to Kafka topic.'''
    daemon = True
    def run(self):
        client = KafkaClient('localhost:9092')
        producer = SimpleProducer(client)
        while True:
            new_data = get_next_event()
            for datum in new_data:
                json_datum = json.dumps(datum).encode('utf-8')
                producer.send_messages(KAFKA_TOPIC, json_datum)
                time.sleep(1)

#### When integrating with Spark the KafkaConsumer is replace by Spark Streaming
# class Consumer(threading.Thread):
#     '''Consumes users from Kafka topic.'''
#     pass

def main():
    '''Starts producer and consumer threads.'''
    threads = [ Producer() ]
    for t in threads: t.start()
    time.sleep(1)

if __name__ == '__main__':
    logging.basicConfig(
        format='%(asctime)s.%(name)s:%(message)s',
        level=logging.DEBUG)
    main()

In [7]:
%%file kafka_gHealth.py

from __future__ import print_function

import sys,json, happybase, datetime

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

KAFKA_TOPIC = 'gHealth'

def partition_data(datum):
    datatype = datum['dataunit'].keys()[0]
    if datatype.endswith('property'):
        return '/'.join((datatype, datum['dataunit'][datatype]['property'].keys()[0])), datum
    else:
        return datatype, datum

def isLocation(data):
    return ('location' in str(data['dataunit']))
    
def reformat_for_hbase(data):
    ts = datetime.datetime.strftime(datetime.datetime.now(), '%Y-%m-%dT%H')
    
    state,count = data
    
    rowkey = state + ':' + ts
    value = state + ',' + str(count)
    row = [rowkey,'d','statecount',value]
    
    return (rowkey,row)
    
if __name__ == "__main__":
    
    sc = SparkContext(appName="PythonStreamingKafka_gHealth")
    ssc = StreamingContext(sc, 1)

    zkQuorum, topic = 'localhost:2181', KAFKA_TOPIC
    kvs = KafkaUtils.createStream(ssc, zkQuorum, "spark-streaming-consumer", {topic: 1})
    
    table = 'user_loc'
    
    conf = {"hbase.zookeeper.quorum": zkQuorum,
            "hbase.mapred.outputtable": table,
            "mapreduce.outputformat.class": "org.apache.hadoop.hbase.mapreduce.TableOutputFormat",
            "mapreduce.job.output.key.class": "org.apache.hadoop.hbase.io.ImmutableBytesWritable",
            "mapreduce.job.output.value.class": "org.apache.hadoop.io.Writable"}
    keyConv = "org.apache.spark.examples.pythonconverters.StringToImmutableBytesWritableConverter"
    valueConv = "org.apache.spark.examples.pythonconverters.StringListToPutConverter"    
    
    ds = kvs.window(windowDuration=100,slideDuration=10)\
            .map(lambda dataunit: (partition_data(json.loads(dataunit[1]))[1]))\
            .filter(lambda datum: isLocation(datum))\
            .map(lambda datum: (datum['dataunit']['user_property']['property']['location']['state'],1))\
            .reduceByKey(lambda cnt1,cnt2: cnt1+cnt2)\
            .map(lambda result: reformat_for_hbase(result))\
            .foreachRDD(lambda rdd: rdd.saveAsNewAPIHadoopDataset(conf=conf,
                                                                  keyConverter=keyConv,
                                                                  valueConverter=valueConv))
    
    ssc.start()
    ssc.awaitTermination()

Overwriting kafka_gHealth.py


```
$SPARK_HOME/bin/spark-submit \
    --driver-class-path spark-1.4.1-bin-hadoop2.4/lib/spark-examples-1.4.1-hadoop2.4.0.jar \
    --packages org.apache.spark:spark-streaming-kafka_2.10:1.5.1 \
    /Users/bfetters/projects/github/DSCI6007-student/kafka_gHealth.py \
    localhost:2181 gHealth
```

In [2]:
!/Users/bfetters/hbase-1.1.2/bin/start-hbase.sh
!/Users/bfetters/hbase-1.1.2/bin/hbase-daemon.sh start thrift

starting master, logging to /Users/bfetters/hbase-1.1.2/bin/../logs/hbase-bfetters-master-m4cb00kpr0.local.out
starting thrift, logging to /Users/bfetters/hbase-1.1.2/bin/../logs/hbase-bfetters-thrift-m4cb00kpr0.local.out


In [2]:
# !/Users/bfetters/hbase-1.1.2/bin/stop-hbase.sh
# !/Users/bfetters/hbase-1.1.2/bin/hbase-daemon.sh stop thrift

In [59]:
import happybase
def read_from_hbase(tablename):
    connection = happybase.Connection(host='localhost')
    table = connection.table(tablename)
    for row in table.scan():
        print row
        
read_from_hbase('user_count_by_state')