In [1]:
ls ../../input

input.txt


In [2]:
import json
import os
import shutil

from pyspark import SparkContext

# Import pyspark etl methods from separate file to develop test-driven development
from methods_pyspark_etl import *

In [3]:
path = "../../input/input.txt"
# Reuse the SparkContext that is already running when this cell is executed
# again; calling SparkContext() a second time in the same kernel raises.
sc = SparkContext.getOrCreate()
# Create an RDD with one element per line of the input file (the file holds
# one JSON event per line, despite the csv_lines name kept for later cells)
csv_lines = sc.textFile(path)

In [4]:
# Check the version of Spark behind the context `sc`
sc.version

u'2.0.2'

In [5]:
# View the first 10 lines in the RDD (each element is one raw line of the input file)
csv_lines.take(10)

[u'{"last_name": "Thompson", "event_time": "2017-02-23T16:59:24.755Z", "verb": "NEW", "key": "9bc11d8ac1dd", "adr_state": "CO", "adr_city": "North Johnny", "type": "CUSTOMER"}',
 u'{"event_time": "2017-02-17T16:59:24.770Z", "tags": {"some key": "some value"}, "verb": "NEW", "key": "9f69ce6df5a1", "customer_id": "9bc11d8ac1dd", "type": "SITE_VISIT"}',
 u'{"camera_make": "Olympus", "event_time": "2017-02-24T16:59:24.807Z", "camera_model": "FE-45", "verb": "UPLOAD", "key": "1bd83b6247c9", "customer_id": "9bc11d8ac1dd", "type": "IMAGE"}',
 u'{"total_amount": "48.64 USD", "event_time": "2017-02-19T16:59:24.835Z", "verb": "NEW", "key": "c12ad0d93a2b", "customer_id": "9bc11d8ac1dd", "type": "ORDER"}',
 u'{"last_name": "Jones", "event_time": "2017-02-14T16:59:24.864Z", "verb": "NEW", "key": "136395ff6a28", "adr_state": "DC", "adr_city": "Colebury", "type": "CUSTOMER"}',
 u'{"event_time": "2017-02-13T16:59:24.878Z", "tags": {"some key": "some value"}, "verb": "NEW", "key": "a1e844194542", "cust

In [6]:
# Use a map transformation on the RDD of lines to remove the JSON-array wrapper
# around each event: '[' / ',' at the start of a line and ',' / ']' at the end.
# Only the ends of the line are stripped, so brackets that appear inside an
# event (for example an array value) are left untouched.
list_of_lines = csv_lines.map(lambda line: line.lstrip('[, \t\r\n').rstrip(',] \t\r\n'))
# Print first line
print(list_of_lines.first())

{"last_name": "Thompson", "event_time": "2017-02-23T16:59:24.755Z", "verb": "NEW", "key": "9bc11d8ac1dd", "adr_state": "CO", "adr_city": "North Johnny", "type": "CUSTOMER"}


In [7]:
# Round-trip every line through the json module: parse it, then serialise it
# again. The elements of `data` are therefore JSON strings (not parsed objects),
# and a line that is not valid JSON fails here.
data = list_of_lines.map(json.loads).map(json.dumps)
data.take(10)

['{"last_name": "Thompson", "event_time": "2017-02-23T16:59:24.755Z", "verb": "NEW", "key": "9bc11d8ac1dd", "adr_state": "CO", "adr_city": "North Johnny", "type": "CUSTOMER"}',
 '{"event_time": "2017-02-17T16:59:24.770Z", "tags": {"some key": "some value"}, "verb": "NEW", "key": "9f69ce6df5a1", "customer_id": "9bc11d8ac1dd", "type": "SITE_VISIT"}',
 '{"camera_make": "Olympus", "event_time": "2017-02-24T16:59:24.807Z", "camera_model": "FE-45", "verb": "UPLOAD", "key": "1bd83b6247c9", "customer_id": "9bc11d8ac1dd", "type": "IMAGE"}',
 '{"total_amount": "48.64 USD", "event_time": "2017-02-19T16:59:24.835Z", "verb": "NEW", "key": "c12ad0d93a2b", "customer_id": "9bc11d8ac1dd", "type": "ORDER"}',
 '{"last_name": "Jones", "event_time": "2017-02-14T16:59:24.864Z", "verb": "NEW", "key": "136395ff6a28", "adr_state": "DC", "adr_city": "Colebury", "type": "CUSTOMER"}',
 '{"event_time": "2017-02-13T16:59:24.878Z", "tags": {"some key": "some value"}, "verb": "NEW", "key": "a1e844194542", "customer_i

In [8]:
# Show the Python class of each RDD built so far
print(type(csv_lines))
print(type(list_of_lines))
print(type(data))

<class 'pyspark.rdd.RDD'>
<class 'pyspark.rdd.PipelinedRDD'>
<class 'pyspark.rdd.PipelinedRDD'>


## methods_pyspark_etl.customer_key_to_id(line)
#### Extracts customer id for each event

    def customer_key_to_id(line):
        """Return the customer id that a JSON event line belongs to.

        A CUSTOMER event identifies the customer through its own 'key';
        every other event type carries the id in 'customer_id'.
        """
        record = json.loads(line)
        id_field = 'key' if record['type'] == 'CUSTOMER' else 'customer_id'
        return record[id_field]

In [9]:
# Bucket the cleaned event lines by the customer they belong to
grouped_lines = list_of_lines.groupBy(customer_key_to_id)
# Turn each group into a plain list, giving (customer id, [event, ...]) tuples
byCustomer = grouped_lines.mapValues(list)
byCustomer.first()

(u'59e1334b00e9',
 [u'{"last_name": "Garcia", "event_time": "2017-02-24T16:59:28.397Z", "verb": "NEW", "key": "59e1334b00e9", "adr_state": "UT", "adr_city": "Carlosfurt", "type": "CUSTOMER"}',
  u'{"event_time": "2017-02-14T16:59:28.416Z", "tags": {"some key": "some value"}, "verb": "NEW", "key": "5ffe558928e7", "customer_id": "59e1334b00e9", "type": "SITE_VISIT"}',
  u'{"camera_make": "Kodak", "event_time": "2017-02-22T16:59:28.445Z", "camera_model": "EasyShare Z981", "verb": "UPLOAD", "key": "f6701c05b03f", "customer_id": "59e1334b00e9", "type": "IMAGE"}',
  u'{"total_amount": "60.04 USD", "event_time": "2017-02-23T16:59:28.480Z", "verb": "NEW", "key": "3c0387d94366", "customer_id": "59e1334b00e9", "type": "ORDER"}',
  u'{"last_name": "Garcia", "event_time": "2017-02-24T16:59:28.397Z", "verb": "NEW", "key": "59e1334b00e9", "adr_state": "UT", "adr_city": "Carlosfurt", "type": "CUSTOMER"}',
  u'{"event_time": "2017-02-11T16:59:28.494Z", "tags": {"some key": "some value"}, "verb": "NEW"

## methods_pyspark_etl.sort_events_by_datetime(events_list)
#### Sorts a list of events by ascending datetime and returns them as (event_time, event) tuples

    def sort_events_by_datetime(events_list):
        """Pair every JSON event with its 'event_time' and sort the pairs.

        Returns a list of (event_time, event) tuples in ascending order;
        events sharing a timestamp are ordered by the event string itself.
        """
        return sorted((json.loads(raw_event)['event_time'], raw_event)
                      for raw_event in events_list)

In [10]:
# Bucket the cleaned event lines by the customer they belong to
grouped_lines = list_of_lines.groupBy(customer_key_to_id)
# Replace each customer's events with (event_time, event) tuples sorted by time
byCustomer = grouped_lines.mapValues(lambda events: sort_events_by_datetime(list(events)))
byCustomer.first()

(u'59e1334b00e9',
 [(u'2017-02-10T16:59:28.634Z',
   u'{"total_amount": "52.38 USD", "event_time": "2017-02-10T16:59:28.634Z", "verb": "NEW", "key": "e2ddccae25e9", "customer_id": "59e1334b00e9", "type": "ORDER"}'),
  (u'2017-02-11T16:59:28.494Z',
   u'{"event_time": "2017-02-11T16:59:28.494Z", "tags": {"some key": "some value"}, "verb": "NEW", "key": "19ae5aefa034", "customer_id": "59e1334b00e9", "type": "SITE_VISIT"}'),
  (u'2017-02-13T16:59:28.522Z',
   u'{"camera_make": "Fujifilm", "event_time": "2017-02-13T16:59:28.522Z", "camera_model": "FinePix A150", "verb": "UPLOAD", "key": "dedd650bfa31", "customer_id": "59e1334b00e9", "type": "IMAGE"}'),
  (u'2017-02-14T16:59:28.416Z',
   u'{"event_time": "2017-02-14T16:59:28.416Z", "tags": {"some key": "some value"}, "verb": "NEW", "key": "5ffe558928e7", "customer_id": "59e1334b00e9", "type": "SITE_VISIT"}'),
  (u'2017-02-19T16:59:28.603Z',
   u'{"camera_make": "Kodak", "event_time": "2017-02-19T16:59:28.603Z", "camera_model": "EasyShare Z9

In [12]:
# Check the type of the grouped result (groupBy returns another RDD)
type(grouped_lines)

pyspark.rdd.PipelinedRDD

## Main ETL Methods
* **methods_pyspark_etl.parse_event_time(event_time)** Split an event timestamp into its date ("YYYY-mm-dd"), its time, and a combined "YYYY-mm-dd HH:MM:SS" string
* **methods_pyspark_etl.dt_to_day(dt)** Return day of the week from date "YYYY-mm-dd"
* **methods_pyspark_etl.str_to_dt_var(str_dt)** Transform a string (default format "YYYY-mm-dd HH:MM:SS") to a datetime variable
* **methods_pyspark_etl.dt_var_to_str(dt_var)** Transform a datetime variable to a string (default format "YYYY-mm-dd HH:MM:SS")
* **methods_pyspark_etl.week_start_end(week_range)** Create a list of tuples of date of week start and week end
* **methods_pyspark_etl.week_is_number(date,list_of_week_start_end)** Determine an integer for a week number for a date falling within a week start/end tuple
* **methods_pyspark_etl.sessionize_datetime_data(session_data)** Define week numbers and session numbers, where consecutive events occurring within 30 minutes of each other share a session number
* **methods_pyspark_etl.sessionize(time_event_tuples)**  Transform event tuples into tuples of week_numbers,session_numbers,events


    def parse_event_time(event_time):
        """Split a timestamp such as "2017-02-23T16:59:24.755Z" into parts.

        Returns a 3-tuple:
          * the date part ("2017-02-23"),
          * the time part, fractional seconds included ("16:59:24.755"),
          * date and whole-second time joined by a space
            ("2017-02-23 16:59:24").
        """
        date_part, clock_part = event_time.strip('Z').split('T')
        # Keep at most the hour, minute and second fields of the time
        clock_part = ':'.join(clock_part.split(':')[:3])
        whole_seconds = clock_part.split('.')[0]
        return date_part, clock_part, ' '.join([date_part, whole_seconds])

    def dt_to_day(dt,fmt="%Y-%m-%d"):
        """Return the full weekday name (e.g. 'Thursday') of the date string dt."""
        parsed = datetime.strptime(dt,fmt)
        return parsed.strftime('%A')

    def str_to_dt_var(str_dt,fmt="%Y-%m-%d %H:%M:%S"):
        """Parse str_dt with the strptime format fmt and return the datetime."""
        parsed = datetime.strptime(str_dt,fmt)
        return parsed

    def dt_var_to_str(dt_var,fmt="%Y-%m-%d %H:%M:%S"):
        """Format the datetime dt_var as a string using the strftime format fmt."""
        formatted = datetime.strftime(dt_var,fmt)
        return formatted

    def week_start_end(week_range):
        """List consecutive 7-day weeks as (start, end) "YYYY-mm-dd" string tuples.

        The first week starts on week_range[0]; weeks are added until one ends
        on or after week_range[-1]. The end-of-range check compares the date
        strings themselves.
        """
        day_fmt = "%Y-%m-%d"
        last_day = week_range[-1]
        weeks = []
        start = week_range[0]
        while True:
            # A week covers its start day plus the six days after it
            end = dt_var_to_str(str_to_dt_var(start,fmt=day_fmt)+timedelta(days=6),
                                fmt=day_fmt)
            weeks.append((start,end))
            if not end < last_day:
                break
            # The next week begins the day after this one ends
            start = dt_var_to_str(str_to_dt_var(end,fmt=day_fmt)+timedelta(days=1),
                                  fmt=day_fmt)
        return weeks

    def week_is_number(date,list_of_week_start_end):
        """Return the index of the first week whose start/end dates contain date.

        date and the week bounds are "YYYY-mm-dd" strings; both bounds are
        inclusive. Raises IndexError when no week contains the date.
        """
        day_fmt = "%Y-%m-%d"
        target = str_to_dt_var(date,fmt=day_fmt)
        matches = []
        for idx, bounds in enumerate(list_of_week_start_end):
            if target < str_to_dt_var(bounds[0],fmt=day_fmt):
                continue
            if target > str_to_dt_var(bounds[1],fmt=day_fmt):
                continue
            matches.append(idx)
        return matches[0]

    def sessionize_datetime_data(session_data,minute_window=30,
                                 week_def={'Sunday':0,
                                       'Monday':1,
                                       'Tuesday':2,
                                       'Wednesday':3,
                                       'Thursday':4,
                                       'Friday':5,
                                       'Saturday':6},
                                 hdr=['date','day','session_start','prior_session_start'
                                      ,'minutes_since_prior_session']):
        """Assign a week number and a session number to every entry of session_data.

        Parameters:
          session_data  -- list of (fields, event) pairs; fields is a sequence
                           whose positions are named by hdr. The first and the
                           last entry supply the start and end of the date range.
          minute_window -- an entry stays in the previous entry's session when
                           its 'minutes_since_prior_session' is at most this value.
          week_def      -- day name -> number of days since the start of the
                           week. Only read, never modified.
          hdr           -- names for the positions of fields. Only read.

        Returns (week_numbers, session_numbers): two lists parallel to
        session_data. Both start at 0; empty input gives two empty lists.
        """
        if not session_data:
            return [], []

        day_fmt = "%Y-%m-%d"
        first_fields = dict(zip(hdr,session_data[0][0]))
        last_fields = dict(zip(hdr,session_data[-1][0]))
        initial_day = first_fields['day']
        initial_date = first_fields['date']
        week_range = [initial_date,last_fields['date']]

        # TODO: make the week start day configurable by rotating the day offsets
        # and populating week_def from the rotation. For idx = range(7) and a
        # shift i, [(x-i)%len(idx) for x in idx] gives:
        #   i=0 -> [0, 1, 2, 3, 4, 5, 6]
        #   i=1 -> [6, 0, 1, 2, 3, 4, 5]
        #   i=2 -> [5, 6, 0, 1, 2, 3, 4]
        #   i=3 -> [4, 5, 6, 0, 1, 2, 3]
        #   i=4 -> [3, 4, 5, 6, 0, 1, 2]
        #   i=5 -> [2, 3, 4, 5, 6, 0, 1]
        #   i=6 -> [1, 2, 3, 4, 5, 6, 0]

        # For the initial week, back the start up to the week start day. The
        # offset of the first event's day says how many days to go back, so no
        # separate search for the day with offset 0 is needed.
        days_into_week = week_def[initial_day]
        if days_into_week:
            initial_dt_var = str_to_dt_var(initial_date,fmt=day_fmt)-timedelta(days=days_into_week)
            week_range[0] = dt_var_to_str(initial_dt_var,fmt=day_fmt)
        list_of_week_start_end = week_start_end(week_range)

        # The first entry opens week 0 and session 0
        week_numbers = [0]
        session_numbers = [0]
        n = 0
        for entry in session_data[1:]:
            session_dict = dict(zip(hdr,entry[0]))
            # A gap larger than the window starts a new session
            if not session_dict['minutes_since_prior_session'] <= minute_window:
                n += 1
            session_numbers.append(n)
            week_numbers.append(week_is_number(session_dict['date'],list_of_week_start_end))

        return week_numbers,session_numbers

        

    def sessionize(time_event_tuples):
        """Attach a week number and a session number to each event.

        time_event_tuples is a list of (event_time, event) tuples, as returned
        by sort_events_by_datetime. For every event the gap in minutes to the
        previous event is computed from the whole-second timestamps and handed
        to sessionize_datetime_data.

        Returns a one-element list holding the list of
        (week_number, session_number, event) tuples; empty input gives [[]].
        """
        if not time_event_tuples:
            return [[]]

        stamp_fmt = "%Y-%m-%d %H:%M:%S"
        session_data = []
        prior_t = None
        prior_ymd_hms = None
        for position, (event_time, event) in enumerate(time_event_tuples):
            dt,t,ymd_hms = parse_event_time(event_time)
            d = dt_to_day(dt)
            if position == 0:
                # The first event has no predecessor to measure a gap against
                minutes_since_prior_t = None
            else:
                tdelta = (str_to_dt_var(ymd_hms,fmt=stamp_fmt)
                          -str_to_dt_var(prior_ymd_hms,fmt=stamp_fmt))
                minutes_since_prior_t = tdelta.total_seconds() / 60.0
            session_data.append(([dt,d,t,prior_t,minutes_since_prior_t],event))
            prior_t,prior_ymd_hms = t,ymd_hms

        week_numbers,session_numbers = sessionize_datetime_data(session_data)
        events = [session_events[1] for session_events in session_data]
        # list(...) keeps the result a real list when zip returns an iterator
        return [list(zip(week_numbers,session_numbers,events))]


In [17]:
def mapCustomerSession_to_Rows(byCustomerSession_tuples):
    """Flatten one (customer_id, session_data) tuple into output rows.

    session_data[0] is a list of (week_id, visit_id, event_json) tuples.
    Each of them becomes [customer_id, week_id, visit_id, is_order, amount],
    where amount is the leading number of 'total_amount' for ORDER events
    and 0.0 for every other event type.
    """
    customer_id, session_data = byCustomerSession_tuples
    rows = []
    for event_tup in session_data[0]:
        event_dict = json.loads(event_tup[2])
        is_order = event_dict['type'] == 'ORDER'
        # 'total_amount' looks like "48.64 USD"; keep the numeric part only
        amount = float(event_dict['total_amount'].split()[0]) if is_order else 0.0
        rows.append([customer_id, event_tup[0], event_tup[1], is_order, amount])
    return rows

In [16]:
# Replace each customer's sorted (event_time, event) list with its sessionized form
byCustomerSession = byCustomer.mapValues(sessionize)
byCustomerSession.take(5)

[(u'59e1334b00e9',
  [[(0,
     0,
     u'{"total_amount": "52.38 USD", "event_time": "2017-02-10T16:59:28.634Z", "verb": "NEW", "key": "e2ddccae25e9", "customer_id": "59e1334b00e9", "type": "ORDER"}'),
    (0,
     1,
     u'{"event_time": "2017-02-11T16:59:28.494Z", "tags": {"some key": "some value"}, "verb": "NEW", "key": "19ae5aefa034", "customer_id": "59e1334b00e9", "type": "SITE_VISIT"}'),
    (1,
     2,
     u'{"camera_make": "Fujifilm", "event_time": "2017-02-13T16:59:28.522Z", "camera_model": "FinePix A150", "verb": "UPLOAD", "key": "dedd650bfa31", "customer_id": "59e1334b00e9", "type": "IMAGE"}'),
    (1,
     3,
     u'{"event_time": "2017-02-14T16:59:28.416Z", "tags": {"some key": "some value"}, "verb": "NEW", "key": "5ffe558928e7", "customer_id": "59e1334b00e9", "type": "SITE_VISIT"}'),
    (2,
     4,
     u'{"camera_make": "Kodak", "event_time": "2017-02-19T16:59:28.603Z", "camera_model": "EasyShare Z981", "verb": "UPLOAD", "key": "b1eb09c2258b", "customer_id": "59e1334

In [18]:
# Turn every (customer id, sessionized events) tuple into a list of output rows
customerAmountsPerWkandVisit = byCustomerSession.map(mapCustomerSession_to_Rows)
customerAmountsPerWkandVisit.take(5)

[[[u'59e1334b00e9', 0, 0, True, 52.38],
  [u'59e1334b00e9', 0, 1, False, 0.0],
  [u'59e1334b00e9', 1, 2, False, 0.0],
  [u'59e1334b00e9', 1, 3, False, 0.0],
  [u'59e1334b00e9', 2, 4, False, 0.0],
  [u'59e1334b00e9', 2, 5, True, 35.51],
  [u'59e1334b00e9', 2, 6, False, 0.0],
  [u'59e1334b00e9', 2, 7, True, 60.04],
  [u'59e1334b00e9', 2, 7, False, 0.0],
  [u'59e1334b00e9', 2, 8, False, 0.0],
  [u'59e1334b00e9', 2, 8, False, 0.0],
  [u'59e1334b00e9', 2, 8, False, 0.0]],
 [[u'b033af15661d', 0, 0, False, 0.0],
  [u'b033af15661d', 0, 0, False, 0.0],
  [u'b033af15661d', 1, 1, True, 97.61],
  [u'b033af15661d', 1, 2, True, 18.62],
  [u'b033af15661d', 1, 3, False, 0.0],
  [u'b033af15661d', 1, 3, False, 0.0],
  [u'b033af15661d', 1, 4, False, 0.0],
  [u'b033af15661d', 1, 4, False, 0.0],
  [u'b033af15661d', 1, 5, False, 0.0],
  [u'b033af15661d', 1, 5, False, 0.0],
  [u'b033af15661d', 1, 6, False, 0.0],
  [u'b033af15661d', 2, 7, True, 91.57],
  [u'b033af15661d', 2, 7, True, 22.22],
  [u'b033af15661d

In [19]:
def toCSV(data):
    """Join the str() form of every item in data with commas (no quoting or escaping)."""
    return ','.join(map(str, data))

In [20]:
hdr_output_t1 = 'customer_id,week_id,visit_id,isOrderType,amount\n'
with open('../../output/hdr_output_t1.txt','w') as f:
    # The header is one string, so write() is the direct call
    f.write(hdr_output_t1)
# One CSV line per row: flatten the per-customer row lists, then format each row
output_t1 = customerAmountsPerWkandVisit.flatMap(lambda x: x).map(toCSV)
# saveAsTextFile refuses to write into a directory that already exists, so
# remove the part files of a previous run to keep this cell re-runnable
if os.path.isdir('../../output/output_t1'):
    shutil.rmtree('../../output/output_t1')
output_t1.saveAsTextFile('../../output/output_t1')
# Concatenate the header and the part files into a single text file
os.system("cat ../../output/hdr_output_t1.txt ../../output/output_t1/p* > ../../output/output_t1.txt")

0

In [21]:
# Preview the first 10 CSV-formatted rows
output_t1.take(10)

['59e1334b00e9,0,0,True,52.38',
 '59e1334b00e9,0,1,False,0.0',
 '59e1334b00e9,1,2,False,0.0',
 '59e1334b00e9,1,3,False,0.0',
 '59e1334b00e9,2,4,False,0.0',
 '59e1334b00e9,2,5,True,35.51',
 '59e1334b00e9,2,6,False,0.0',
 '59e1334b00e9,2,7,True,60.04',
 '59e1334b00e9,2,7,False,0.0',
 '59e1334b00e9,2,8,False,0.0']

In [22]:
# Preview the nested row lists (one inner list of rows per customer)
customerAmountsPerWkandVisit.take(10)

[[[u'59e1334b00e9', 0, 0, True, 52.38],
  [u'59e1334b00e9', 0, 1, False, 0.0],
  [u'59e1334b00e9', 1, 2, False, 0.0],
  [u'59e1334b00e9', 1, 3, False, 0.0],
  [u'59e1334b00e9', 2, 4, False, 0.0],
  [u'59e1334b00e9', 2, 5, True, 35.51],
  [u'59e1334b00e9', 2, 6, False, 0.0],
  [u'59e1334b00e9', 2, 7, True, 60.04],
  [u'59e1334b00e9', 2, 7, False, 0.0],
  [u'59e1334b00e9', 2, 8, False, 0.0],
  [u'59e1334b00e9', 2, 8, False, 0.0],
  [u'59e1334b00e9', 2, 8, False, 0.0]],
 [[u'b033af15661d', 0, 0, False, 0.0],
  [u'b033af15661d', 0, 0, False, 0.0],
  [u'b033af15661d', 1, 1, True, 97.61],
  [u'b033af15661d', 1, 2, True, 18.62],
  [u'b033af15661d', 1, 3, False, 0.0],
  [u'b033af15661d', 1, 3, False, 0.0],
  [u'b033af15661d', 1, 4, False, 0.0],
  [u'b033af15661d', 1, 4, False, 0.0],
  [u'b033af15661d', 1, 5, False, 0.0],
  [u'b033af15661d', 1, 5, False, 0.0],
  [u'b033af15661d', 1, 6, False, 0.0],
  [u'b033af15661d', 2, 7, True, 91.57],
  [u'b033af15661d', 2, 7, True, 22.22],
  [u'b033af15661d

In [23]:
from pyspark.sql import SQLContext
# Wrap the SparkContext so DataFrames can be created and queried with Spark SQL
sqlContext = SQLContext(sc)

In [24]:
from operator import add
# Flatten the per-customer row lists on the executors and collect them once.
# This yields the same flat list of rows as reduce(add), without running the
# job twice or concatenating lists pairwise on the driver.
# NOTE: despite its name, `rdd` is a local Python list; the name is kept
# because the next cell reads it.
rdd = customerAmountsPerWkandVisit.flatMap(lambda x: x).collect()
print(type(customerAmountsPerWkandVisit))
print(type(rdd))
type(rdd)

<class 'pyspark.rdd.PipelinedRDD'>
<type 'list'>


list

In [25]:
columns = ['customer_id','week_id','visit_id','isOrder','amount']
# Pass the column names as the schema so the DataFrame is created with its
# final names directly, instead of renaming _1.._5 through a SQL query
rows = sqlContext.createDataFrame(rdd, columns)

sqlContext.registerDataFrameAsTable(rows,"visit_amounts")
# Later cells read this DataFrame under the name df2
df2 = rows

df2.show()

+------------+-------+--------+-------+------+
| customer_id|week_id|visit_id|isOrder|amount|
+------------+-------+--------+-------+------+
|59e1334b00e9|      0|       0|   true| 52.38|
|59e1334b00e9|      0|       1|  false|   0.0|
|59e1334b00e9|      1|       2|  false|   0.0|
|59e1334b00e9|      1|       3|  false|   0.0|
|59e1334b00e9|      2|       4|  false|   0.0|
|59e1334b00e9|      2|       5|   true| 35.51|
|59e1334b00e9|      2|       6|  false|   0.0|
|59e1334b00e9|      2|       7|   true| 60.04|
|59e1334b00e9|      2|       7|  false|   0.0|
|59e1334b00e9|      2|       8|  false|   0.0|
|59e1334b00e9|      2|       8|  false|   0.0|
|59e1334b00e9|      2|       8|  false|   0.0|
|b033af15661d|      0|       0|  false|   0.0|
|b033af15661d|      0|       0|  false|   0.0|
|b033af15661d|      1|       1|   true| 97.61|
|b033af15661d|      1|       2|   true| 18.62|
|b033af15661d|      1|       3|  false|   0.0|
|b033af15661d|      1|       3|  false|   0.0|
|b033af15661d

In [26]:
# Per customer and week: number of distinct visits and total amount spent
weekly_totals_sql = """select customer_id,
                                week_id,
                                count(distinct visit_id) as visits_per_wk,
                                sum(amount) as amount_per_wk
                                from t1
                                group by customer_id, week_id
                                order by customer_id, week_id
                        """
# Expose the per-event rows to Spark SQL as table t1, then run the query
sqlContext.registerDataFrameAsTable(df2,"t1")
ltv_prelim = sqlContext.sql(weekly_totals_sql)

ltv_prelim.show()

+------------+-------+-------------+-----------------+
| customer_id|week_id|visits_per_wk|    amount_per_wk|
+------------+-------+-------------+-----------------+
|080a257c1f0e|      0|            2|              0.0|
|080a257c1f0e|      1|            2|              0.0|
|080a257c1f0e|      2|            4|           186.69|
|0b46c0536ee4|      0|            1|              0.0|
|0b46c0536ee4|      1|            3|            75.37|
|0b46c0536ee4|      2|            2|            20.58|
|0cb3f4bdb94b|      0|            2|              0.0|
|0cb3f4bdb94b|      1|            2|            29.36|
|13103fcfb98c|      0|            1|            34.09|
|13103fcfb98c|      1|            2|              0.0|
|136395ff6a28|      0|            1|            97.58|
|136395ff6a28|      1|            3|76.32000000000001|
|136395ff6a28|      2|            3|            96.88|
|211764fe0e81|      0|            1|              0.0|
|211764fe0e81|      1|            3|             96.4|
|231995cf0

In [27]:
# Add the amount spent per visit to each customer/week row
per_visit_sql = """select customer_id,
                                week_id,
                                visits_per_wk,
                                amount_per_wk,
                                amount_per_wk/visits_per_wk as amount_per_visit
                                from t2
                                order by customer_id, week_id
                        """
# Expose the weekly totals to Spark SQL as table t2, then run the query
sqlContext.registerDataFrameAsTable(ltv_prelim,"t2")
ltv_add = sqlContext.sql(per_visit_sql)

ltv_add.show()

+------------+-------+-------------+-----------------+------------------+
| customer_id|week_id|visits_per_wk|    amount_per_wk|  amount_per_visit|
+------------+-------+-------------+-----------------+------------------+
|080a257c1f0e|      0|            2|              0.0|               0.0|
|080a257c1f0e|      1|            2|              0.0|               0.0|
|080a257c1f0e|      2|            4|           186.69|           46.6725|
|0b46c0536ee4|      0|            1|              0.0|               0.0|
|0b46c0536ee4|      1|            3|            75.37|25.123333333333335|
|0b46c0536ee4|      2|            2|            20.58|             10.29|
|0cb3f4bdb94b|      0|            2|              0.0|               0.0|
|0cb3f4bdb94b|      1|            2|            29.36|             14.68|
|13103fcfb98c|      0|            1|            34.09|             34.09|
|13103fcfb98c|      1|            2|              0.0|               0.0|
|136395ff6a28|      0|            1|  

In [28]:
hdr_output_ltv_specs = 'customer_id,week_id,visits_per_wk,amount_per_wk,amount_per_visit\n'
with open('../../output/hdr_output_ltv_specs.txt','w') as f:
    # The header is one string, so write() is the direct call
    f.write(hdr_output_ltv_specs)

# mode='overwrite' replaces the output of a previous run; without it the
# write fails as soon as the directory exists, so the cell could run only once
ltv_add.write.csv('../../output/output_ltv_specs', mode='overwrite')
# Concatenate the header and the part files into a single text file
os.system("cat ../../output/hdr_output_ltv_specs.txt ../../output/output_ltv_specs/p* > ../../output/output_ltv_specs.txt")

0

In [29]:
# Per customer: 52 * (total amount / total weekly visit counts) * 10, highest first.
# NOTE(review): 52 and 10 look like weeks per year and a lifespan in years - confirm.
# NOTE(review): the ratio is spend per visit, not spend per week - confirm that
# this matches the intended LTV definition.
customer_ltv_sql = """select customer_id,
                            52*(sum(amount_per_wk)/sum(visits_per_wk))*10 as customer_ltv
                                from t3
                                group by customer_id
                                order by customer_ltv desc
                        """
# Expose the per-week figures to Spark SQL as table t3, then run the query
sqlContext.registerDataFrameAsTable(ltv_add,"t3")
ltv_final = sqlContext.sql(customer_ltv_sql)

ltv_final.show()

+------------+------------------+
| customer_id|      customer_ltv|
+------------+------------------+
|c670daa7e51c|23516.350000000002|
|f879ce019eb7|21889.771428571432|
|136395ff6a28|20115.085714285713|
|231995cf0916|17448.228571428568|
|817a2b9f1b49|           16628.3|
|d642a91ea4e4|          16493.75|
|7784d50a5dbe|           16218.8|
|ce218f4d6b6a|          15814.24|
|5da32100006b|15506.400000000001|
|5abd3a376bbf|14989.866666666667|
|d0baf4d11996|          14889.16|
|f82d02da0bcb|14226.160000000002|
|b033af15661d|13593.745454545453|
|eaeda57e3148|13425.533333333333|
|8fff640f5c8c|12914.720000000001|
|5a4a630327a0|12818.000000000002|
|4447afe26ff7|           12680.2|
|2879a4bfbb50|12606.360000000002|
|211764fe0e81|           12532.0|
|b02ecfd0f18d|12370.800000000001|
+------------+------------------+
only showing top 20 rows



In [30]:
hdr_output_ltv_final = 'customer_id,customer_ltv\n'
with open('../../output/hdr_output_ltv_final.txt','w') as f:
    # The header is one string, so write() is the direct call
    f.write(hdr_output_ltv_final)

# mode='overwrite' replaces the output of a previous run; without it the
# write fails as soon as the directory exists, so the cell could run only once
ltv_final.write.csv('../../output/output_ltv_final', mode='overwrite')
# Concatenate the header and the part files into a single text file
os.system("cat ../../output/hdr_output_ltv_final.txt ../../output/output_ltv_final/p* > ../../output/output_ltv_final.txt")

0