In [1]:
# Simulation configuration: experiment-arm ids, price levels and traffic levels.
CTRL_ID = 101 # Control arm experiment id
EXP_ID = 102  # Experiment arm experiment id
LE_MEAN_ASP_CTRL = 1.0 # Low-end products average sale price (asp) (in Control)
LE_MEAN_ASP_EXP = 0.9 # Low-end products average sale price (asp) (in Experiment)
HE_MEAN_ASP_CTRL = 2 # High-end products average sale price (asp) (in Control)
HE_MEAN_ASP_EXP = 2.2 # High-end products average sale price (asp) (in Experiment)
    
NUM_LE = 10 # Number of low-end products
NUM_HE = 12 # Number of high-end products
MEAN_LE_IMPS_CTRL = 5 # Average number of impressions for low-end products (in Control)
MEAN_HE_IMPS_CTRL = 20 # Average number of impressions for high-end products (in Control)
MEAN_LE_IMPS_EXP = 8 # Average number of impressions for low-end products (in Experiment)
MEAN_HE_IMPS_EXP = 24 # Average number of impressions for high-end products (in Experiment)

In [2]:
import numpy as np
'''
Creating fake data

Let's make two categories of products: low-end and high-end. The number of products in
each category (NUM_LE, NUM_HE) and the mean impressions per product (MEAN_LE_IMPS_*,
MEAN_HE_IMPS_*) are given by the simulation constants.
Each product has its own average click-through rate (ctr), sampled from a Beta distribution:
Beta(3, 15) -- mean of ~0.167 -- and the same ctr is used for both control and experiment.

The average sale prices (asp) are Normally distributed: N(1, 0.2), N(2, 0.2), N(0.9, 0.2), N(2.2, 0.2)
for low-end control, high-end control, low-end experiment and high-end experiment, respectively.
First, get the total number of simulated impressions per product: sample Poisson(lambda).
'''

def calc_sim_data(exp_id, product_ids, ctrs, mean_imps, mean_asp, size, asp_std=0.2):
    """Simulate one row per impression for `size` products in one experiment arm.

    Parameters
    ----------
    exp_id : number
        Experiment-arm id written into column 0 of every row.
    product_ids : array-like of length `size`
        Product ids, written into column 1.
    ctrs : array-like of length `size`
        Per-product click probability (must lie in [0, 1]).
    mean_imps : float
        Poisson mean of the number of impressions drawn for each product.
    mean_asp : float
        Mean of the Normal distribution the per-impression price is drawn from.
    size : int
        Number of products.
    asp_std : float, optional
        Standard deviation of the price distribution (default 0.2, the value
        that used to be hard-coded).

    Returns
    -------
    numpy.ndarray of shape (total_impressions, 5), dtype float
        Columns: exp_id, product_id, impressions (always 1), price, clicks (0/1).
        A product that draws zero impressions contributes no rows.
    """
    # Number of impressions (= number of output rows) for each product.
    imps = np.random.poisson(mean_imps, size)

    # One template row per product; column 3 is a price placeholder and
    # column 4 temporarily carries the product's ctr.
    per_product = np.column_stack((
        exp_id * np.ones(size),
        product_ids,
        np.ones(size),
        np.zeros(size),
        ctrs,
    ))
    full_data = np.repeat(per_product, imps, axis=0)

    # Process-wide display option (suppresses scientific notation); kept so
    # arrays printed by later cells look the same as before.
    np.set_printoptions(suppress=True)

    # Draw every price and click in one vectorized call instead of looping
    # over rows in Python; the click draw uses each row's own ctr.
    n_rows = full_data.shape[0]
    full_data[:, 3] = np.random.normal(mean_asp, asp_std, n_rows)
    full_data[:, 4] = np.random.binomial(1, full_data[:, 4])
    return full_data
    
# The simulation constants (CTRL_ID, EXP_ID, *_MEAN_ASP_*, NUM_LE, NUM_HE,
# MEAN_*_IMPS_*) are defined once in the configuration cell at the top of the
# notebook; they are intentionally not re-declared here so there is a single
# source of truth.

# Seed the global NumPy RNG so that re-running the notebook regenerates the
# same CSV.
RANDOM_SEED = 42
np.random.seed(RANDOM_SEED)

le_product_ids = np.arange(1000, 1000 + NUM_LE)
he_product_ids = np.arange(10000, 10000 + NUM_HE)

# One ctr per product, shared by the control and experiment arms.
le_ctr = np.random.beta(3, 15, NUM_LE)
he_ctr = np.random.beta(3, 15, NUM_HE)

le_data_ctrl = calc_sim_data(CTRL_ID, le_product_ids, le_ctr, MEAN_LE_IMPS_CTRL, LE_MEAN_ASP_CTRL, NUM_LE)
le_data_exp = calc_sim_data(EXP_ID, le_product_ids, le_ctr, MEAN_LE_IMPS_EXP, LE_MEAN_ASP_EXP, NUM_LE)
he_data_ctrl = calc_sim_data(CTRL_ID, he_product_ids, he_ctr, MEAN_HE_IMPS_CTRL, HE_MEAN_ASP_CTRL, NUM_HE)
he_data_exp = calc_sim_data(EXP_ID, he_product_ids, he_ctr, MEAN_HE_IMPS_EXP, HE_MEAN_ASP_EXP, NUM_HE)

# Stack the four (rows, 5) blocks row-wise into a single table.
all_data = np.concatenate((le_data_ctrl, le_data_exp,
                           he_data_ctrl, he_data_exp), axis=0)

# The on-disk format is fully determined by `fmt`; the header line is written
# with np.savetxt's comment prefix, which the readers further down rely on to
# recognise and drop it.
np.savetxt('sim_data_{0}_{1}.csv'.format(NUM_LE, NUM_HE), all_data, fmt='%i,%i,%i,%5.2f,%i',
           header="exp_id,product_id,impressions,price,clicks")

In [1]:
import os
import sys

# Bootstrap PySpark inside this kernel from the installation at $SPARK_HOME.
spark_home = os.environ.get('SPARK_HOME', None)
print(spark_home)
if not spark_home:
    # Fail with an actionable message instead of an obscure error raised from
    # inside os.path.join(None, ...).
    raise EnvironmentError('SPARK_HOME is not set; point it at a Spark installation '
                           'before running this cell.')

sys.path.insert(0, os.path.join(spark_home, 'python'))
# Spark bundles a version-specific py4j archive; add whichever one(s) this
# installation actually ships (same precedence as before: the newest ends up first).
for py4j_zip in ('python/lib/py4j-0.8.2.1-src.zip',  # for Spark 1.4
                 'python/lib/py4j-0.10.1-src.zip'):  # for Spark 2.0
    py4j_path = os.path.join(spark_home, py4j_zip)
    if os.path.exists(py4j_path):
        sys.path.insert(0, py4j_path)

# Run pyspark's interactive shell start-up script in this namespace.
# exec(compile(...)) works on both Python 2 and 3, unlike the Python-2-only execfile().
shell_path = os.path.join(spark_home, 'python/pyspark/shell.py')
with open(shell_path) as shell_file:
    exec(compile(shell_file.read(), shell_path, 'exec'))

/Users/davidada/apps/spark-2.0.0-bin-hadoop2.7
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 2.0.0
      /_/

Using Python version 2.7.10 (default, Oct 23 2015 19:19:21)
SparkSession available as 'spark'.


In [2]:
# Calculate the numerator:
# sum_i price_{ctrlid,i} * click_{expid,i} * w_i
# and the denominator:
# sum_i price_{expid,i} * click_{ctrlid,i} * w_i
# where w_i = 1/(click_{ctrlid,i} + click_{expid,i})
def get_ctrl_exp(product_data):
    """Split one product's grouped records into (control, experiment).

    Parameters
    ----------
    product_data : tuple
        ``(product_id, iterable_of_records)`` where every record is indexable
        as ``(exp_id, price, clicks)``.

    Returns
    -------
    (ctrl, exp) : tuple
        The record whose exp_id equals CTRL_ID and the one whose exp_id equals
        EXP_ID.  If the product was seen in only one arm (e.g. it drew zero
        impressions in the other), the missing arm is filled with a zero
        record ``(arm_id, 0.0, 0)``, so the product contributes nothing to
        either weighted sum instead of aborting the whole job.

    Raises
    ------
    ValueError
        If there are no records, an arm appears more than once, or a record
        carries an unknown exp_id.
    """
    exp_idx = 0
    ctrl = None
    exp = None
    for record in product_data[1]:
        arm = int(record[exp_idx])
        if arm == CTRL_ID and ctrl is None:
            ctrl = record
        elif arm == EXP_ID and exp is None:
            exp = record
        else:
            # Explicit exception rather than `assert`: carries a message and
            # is not stripped when Python runs with -O.
            raise ValueError('product %r: duplicate or unknown experiment id %r'
                             % (product_data[0], arm))
    if ctrl is None and exp is None:
        raise ValueError('product %r has no records' % (product_data[0],))
    if ctrl is None:
        ctrl = (CTRL_ID, 0.0, 0)
    if exp is None:
        exp = (EXP_ID, 0.0, 0)
    return ctrl, exp


def calc_numerator(product_data):
    """Numerator term for a single product.

    Takes the control and experiment records from ``get_ctrl_exp`` and
    evaluates ``price_ctrl * clicks_exp / (clicks_ctrl + clicks_exp)``, reading
    index 1 of a record as the price and index 2 as the click count.
    Returns 0 when the combined click count is not positive.
    """
    price_col = 1
    click_col = 2
    ctrl_rec, exp_rec = get_ctrl_exp(product_data)
    total_clicks = ctrl_rec[click_col] + exp_rec[click_col]
    # Guard clause: no clicks in either arm means the weight is undefined.
    if not total_clicks > 0:
        return 0
    return (ctrl_rec[price_col] * exp_rec[click_col]) / total_clicks
            
def calc_denominator(product_data):
    """Denominator term for a single product.

    Takes the control and experiment records from ``get_ctrl_exp`` and
    evaluates ``price_exp * clicks_ctrl / (clicks_ctrl + clicks_exp)``, reading
    index 1 of a record as the price and index 2 as the click count.
    Returns 0 when the combined click count is not positive.
    """
    price_col = 1
    click_col = 2
    ctrl_rec, exp_rec = get_ctrl_exp(product_data)
    total_clicks = ctrl_rec[click_col] + exp_rec[click_col]
    # Guard clause: no clicks in either arm means the weight is undefined.
    if not total_clicks > 0:
        return 0
    return (exp_rec[price_col] * ctrl_rec[click_col]) / total_clicks

# Column positions within a parsed CSV row.
exp, prod, imps, price, clicks = range(5)


def convert_line(l):
    """Turn a list of CSV string fields into typed values.

    Returns ``[exp_id, product_id, impressions, price, clicks]`` where the
    price is a float and every other field is an int.
    """
    casts = ((exp, int), (prod, int), (imps, int), (price, float), (clicks, int))
    return [cast(l[col]) for col, cast in casts]


SyntaxError: invalid syntax (<ipython-input-2-896be7c1c5f0>, line 28)

In [7]:
from operator import add

# We want to calculate MH(k_{a,i},n_{a,i},k_{b,i},n_{b,i}), where a and b are control and experiment
# and where k and n in our case are the summed sale prices and the clicks.
input_rdd = sc.textFile('sim_data_{0}_{1}.csv'.format(NUM_LE, NUM_HE))
header = input_rdd.first()  # The first line is the CSV header; it is filtered out below.
parsed_input_rdd = input_rdd.filter(lambda x: x != header).map(lambda x: convert_line(x.split(',')))
# Key every impression by (exp_id, product_id); the value is (price if clicked else 0, clicks).
transformed = parsed_input_rdd.map(lambda x: ((x[exp], x[prod]), (x[clicks]*x[price], x[clicks])))

(sp, clks) = (0, 1) # sale price and clicks
# NOTE: the value index must not be called `sc` -- that name is the SparkContext
# used above, and rebinding it to 1 breaks `sc.textFile` when this cell is re-run.
(ep, spc) = (0, 1) # exp_id&product_id, sp&clicks
(exp2, prod2) = (0, 1) # exp_id, product_id
# For each product cross exp_id, sum the sale prices and clicks
grouped_result = transformed.reduceByKey(lambda x,y: (x[sp]+y[sp], x[clks]+y[clks]))
# Re-key by product_id so each product's per-arm records end up together.
# Cached because both the numerator and the denominator pass consume it.
grouped_by_product = grouped_result.map(
    lambda x: ((x[ep][prod2]), (x[ep][exp2], x[spc][sp], x[spc][clks]))).groupByKey().cache()

numerator_sum = grouped_by_product.map(lambda x: calc_numerator(x)).reduce(add)
denominator_sum = grouped_by_product.map(lambda x: calc_denominator(x)).reduce(add)
print(numerator_sum, denominator_sum)


(51.28058705475811, 55.114265835042154)


In [3]:
import google.cloud.dataflow as df

def t_sum(values):
    """Component-wise sum of an iterable of pairs.

    Adds up element 0 and element 1 of every item in `values` (consumed in a
    single pass) and returns the two totals as a tuple; an empty iterable
    gives ``(0, 0)``.
    """
    first_total = 0
    second_total = 0
    for pair in values:
        first_total = first_total + pair[0]
        second_total = second_total + pair[1]
    return (first_total, second_total)

# Same computation as the Spark cell, expressed as a Dataflow pipeline.
# Create a pipeline executing on a direct runner (local, non-cloud).
p = df.Pipeline('DirectPipelineRunner')
# Read the simulated CSV, drop every line that starts with '#', and parse the
# remaining lines with convert_line.
# NOTE(review): the '#' test presumably targets the header line written by
# np.savetxt(header=...) -- confirm the file has no other '#' lines.
# NOTE(review): despite its name this is a Dataflow collection, not a Spark RDD,
# and it rebinds the variable of the same name used by the Spark cell.
parsed_input_rdd = (p
 | df.Read('load records', df.io.TextFileSource('sim_data_{0}_{1}.csv'.format(NUM_LE, NUM_HE)))
 | df.Filter('filter header', lambda x: x[0] != '#')
 | df.Map('split line', lambda x: convert_line(x.split(','))))
# Key every row by (exp_id, product_id); the value is (price * clicks, clicks).
transformed = (parsed_input_rdd
 | df.Map((lambda x: ((x[exp], x[prod]), (x[price]*x[clicks], x[clicks])))))

# Positional indices used by the lambdas below.
(sp, clks) = (0, 1) # sale price and clicks
# NOTE(review): this rebinds `sc`, the name the Spark cell uses for its
# context object (sc.textFile) -- re-running that cell afterwards would fail.
(ep, sc) = (0, 1) # exp_id&product_id, sp&clicks
(exp2, prod2) = (0, 1) # exp_id, product_id

# For each product cross exp_id, sum the sale prices and clicks
grouped_result = (transformed
 | df.CombinePerKey('combine per product/id', t_sum))
# Re-key by product_id with value (exp_id, summed price, summed clicks), then
# group so each product's per-arm records arrive together.
grouped_by_product = (grouped_result
 | df.Map(lambda x: ((x[ep][prod2]), (x[ep][exp2], x[sc][sp], x[sc][clks])))
 | df.GroupByKey())

# Sum the per-product numerator terms and write the total to ./numerator_sum.
numerator_sum = (grouped_by_product
 | df.Map(lambda x: calc_numerator(x))
 | df.CombineGlobally('num', sum))
numerator_sum | df.Write('save numerator', df.io.TextFileSink('./numerator_sum'))

# Sum the per-product denominator terms and write the total to ./denominator_sum.
denominator_sum = (grouped_by_product
 | df.Map(lambda x: calc_denominator(x))
 | df.CombineGlobally('denom', sum))
denominator_sum | df.Write('save denominator', df.io.TextFileSink('./denominator_sum'))
# Nothing above executes until the pipeline is run.
# NOTE(review): convert_line, calc_numerator and calc_denominator must already
# be defined in this kernel; the recorded run of this cell failed with
# NameError on convert_line.
p.run()


ERROR:root:Error while visiting split line


NameError: global name 'convert_line' is not defined [while running 'split line']