In [1]:
import findspark
# Point findspark at the Spark installation under /usr/local/spark.
# This runs before `import pyspark` in the next cell.
findspark.init('/usr/local/spark')

from IPython.core.interactiveshell import InteractiveShell
# Display the value of every top-level expression in a cell, not only the last one
InteractiveShell.ast_node_interactivity = 'all'

In [None]:
import pyspark
import random

# Number of random points thrown at the unit square
num_samples = 100000000

def inside(p):
    """Return True if a random point in the unit square falls inside the unit quarter circle.

    The argument `p` is the RDD element and is ignored; it only drives how many
    samples are drawn.
    """
    x, y = random.random(), random.random()
    return x*x + y*y < 1

sc = pyspark.SparkContext(appName="Pi")
try:
    count = sc.parallelize(range(0, num_samples)).filter(inside).count()
finally:
    # Always release the context: a leaked SparkContext prevents later cells
    # from creating a new one if this job fails part-way through.
    sc.stop()

# Fraction of points inside the quarter circle approximates pi / 4
pi = 4 * count / num_samples
print(pi)

In [None]:
import pandas as pd 
import numpy as np
import os

# Root directory holding the partitioned data
base_dir = '/data/churn/partitions/'
# One integer per entry found in base_dir: 0, 1, ..., n_entries - 1
# NOTE(review): assumes every entry is a partition directory named p0..p{n-1} -- confirm
partitions = [index for index, _ in enumerate(os.listdir(base_dir))]

In [None]:
def generate_labels(customer_id, trans, label_type, churn_period = 30, return_cust = False):
    """Make churn labels for one customer at every cutoff time in their history
    
    A churn occurs when more than `churn_period` days pass between a membership
    expiring and the next transaction. The churn is dated `churn_period` days
    after that expiration.
    
    Params
    --------
        customer_id (str): string used to select customer
        trans (dataframe): transactions for customers. Must have the columns 'msno',
                           'transaction_date' and 'membership_expire_date' (datetimes)
        label_type (str): either 'MS' for monthly labels at the start of the month or 
                          'SMS' for twice a month labels (on 1 and 15 of month)
        churn_period (int): number of days without membership required for a churn [default 30 days]
        return_cust (bool): whether or not to return the customer dataframe. Useful for debugging
        
    Return
    --------
        labels (dataframe): labels for all cutoff times in customer history
                            columns are ['msno', 'cutoff_time', 'churn', 'days_to_next_churn']
        cust (dataframe): if return_cust == True, a dataframe of the customers transactions
                          is returned first, i.e. the result is the tuple (cust, labels)
    """
    assert label_type in ['MS', 'SMS'], 'label_type must be either "MS" or "SMS"'
    
    label_columns = ['msno', 'cutoff_time', 'churn', 'days_to_next_churn']
    
    cust = trans.loc[trans['msno'] == customer_id].copy()
    
    # Make sure to sort transactions and drop the index
    cust = cust.sort_values(['transaction_date', 'membership_expire_date']).reset_index(drop = True)
    
    # Find gap between membership expiration and start of next membership
    # (NaN for the last transaction, which has no successor)
    cust['gap'] = (cust['transaction_date'].shift(-1) - cust['membership_expire_date']).dt.days 
    
    # Determine if churn occur; rows with a NaN gap keep a NaN churn flag
    cust.loc[cust['gap'] > churn_period, 'churn']  = 1
    cust.loc[cust['gap'] <= churn_period, 'churn'] = 0
    
    # Calculate date range for labels: first day of the month of the first transaction
    # through the first day of the month after the last expiration
    first_trans = cust['transaction_date'].min()
    last_trans = cust['membership_expire_date'].max()
    # pd.Timestamp instead of the pd.datetime alias, which newer pandas no longer provides
    start_date = pd.Timestamp(year = first_trans.year, month = first_trans.month, day = 1)
    
    # Handle case where last transaction month was december
    if last_trans.month == 12:
        end_date = pd.Timestamp(year = last_trans.year + 1, month = 1, day = 1)
    else:
        end_date = pd.Timestamp(year = last_trans.year, month = last_trans.month + 1, day = 1)

    # Create a range of dates for labels 
    # 'MS' = month starts, 'SMS' = twice a month on the 1st and 15th
    date_range = pd.date_range(start_date, end_date, freq = label_type)
    
    # Create a label dataframe
    labels = pd.DataFrame({'cutoff_time': date_range})
    labels['next_cutoff_time'] = labels['cutoff_time'].shift(-1)
    labels['msno'] = customer_id
    
    # Handle case where there are no churns
    if not np.any(cust['churn'] == 1):
        labels['churn'] = 0
        labels['days_to_next_churn'] = np.nan
        labels = labels[label_columns]
        # Honor return_cust here too so the return shape does not depend on the data
        if return_cust:
            return cust, labels
        return labels
    
    # If customer did churn set the churn date
    cust['potential_churn_date'] = cust['membership_expire_date'] + pd.Timedelta(churn_period, unit = 'd')
    cust.loc[cust['churn'] == 1, 'churn_date'] = cust.loc[cust['churn'] == 1, 'potential_churn_date']
    
    previous_churn = None

    # Iterate through the churn dates (in transaction order)
    for churn_date in cust.loc[cust['churn_date'].notnull(), 'churn_date']:
        
        # Assign the label 1 if the customer churned during the cutoff_time period
        churned_in_period = (labels['cutoff_time'] <= churn_date) & (labels['next_cutoff_time'] > churn_date)
        labels.loc[churned_in_period, 'churn'] = 1

        # Cutoff times leading up to this churn: everything up to the churn date,
        # restricted to after the previous churn if there was one
        leads_to_churn = labels['cutoff_time'] <= churn_date
        if previous_churn is not None:
            leads_to_churn = leads_to_churn & (labels['cutoff_time'] > previous_churn)
        
        # Calculate the days until the churn
        labels.loc[leads_to_churn, 'days_to_next_churn'] = (
            churn_date - labels.loc[leads_to_churn, 'cutoff_time']).dt.days
        
        previous_churn = churn_date
    
    # Cutoff periods without a churn were never assigned and are still NaN
    labels['churn'] = labels['churn'].fillna(0)
    
    # Subset to relevant columns
    labels = labels[label_columns]
    
    # Sometimes want to return customer information for debugging
    if return_cust:
        return cust, labels
    
    return labels

In [None]:
def partition_to_labels(partition, label_type, churn_period):
    """Make labels for all customers in one partition and save them as a csv
    Either for one month or twice a month
    
    Params
    --------
        partition (int): number of partition; transactions are read from
                         '{base_dir}p{partition}/transactions.csv'
        label_type (str): either 'monthly' for monthly labels or
                          'bimonthly' for twice a month labels
        churn_period (int): number of days required without a membership for a churn
    
    Returns
    --------
        None: saves the label dataframe to
              '{base_dir}p{partition}/{label_type}_labels_{churn_period}.csv'
    
    Raises
    --------
        ValueError: if label_type is neither 'monthly' nor 'bimonthly'
    """
    
    # Map the label type onto the frequency code expected by generate_labels
    frequencies = {'monthly': 'MS', 'bimonthly': 'SMS'}
    
    # Fail before doing any work instead of silently writing nothing
    if label_type not in frequencies:
        raise ValueError(f'label_type must be either "monthly" or "bimonthly", got {label_type!r}')
    frequency = frequencies[label_type]
    
    partition_dir = f'{base_dir}p{partition}'
    
    # Read in data and filter anomalies (memberships that expire before they were bought)
    # infer_datetime_format is not passed: pandas 2.0 deprecated it and it has no effect there
    trans = pd.read_csv(f'{partition_dir}/transactions.csv',
                        parse_dates=['transaction_date', 'membership_expire_date'])
    trans = trans.loc[trans['membership_expire_date'] >= trans['transaction_date']]
    
    # Group once so each customer's rows are handed over directly instead of
    # re-scanning the whole partition for every customer. sort = False keeps the
    # customers in order of first appearance. Rows with a missing msno are skipped.
    cutoff_list = [generate_labels(customer_id, customer_trans, 
                                   label_type = frequency, churn_period = churn_period)
                   for customer_id, customer_trans in trans.groupby('msno', sort = False)]
    
    if cutoff_list:
        cutoff_times = pd.concat(cutoff_list)
    else:
        # Nothing left after filtering: write a header-only file rather than failing in concat
        cutoff_times = pd.DataFrame(columns = ['msno', 'cutoff_time', 'churn', 'days_to_next_churn'])
    
    cutoff_times.to_csv(f'{partition_dir}/{label_type}_labels_{churn_period}.csv', index = False)

In [None]:
from timeit import default_timer as timer

In [None]:
# Time monthly labeling (30 day churn period) on partition 1 before running it on every partition
start = timer()
partition_to_labels(partition = 1, label_type = 'monthly', churn_period = 30)
end = timer()
print(f'{round(end - start)} seconds elapsed.')

In [None]:
# Time twice-a-month labeling (14 day churn period) on partition 1 before running it on every partition
start = timer()
partition_to_labels(partition = 1, label_type = 'bimonthly', churn_period = 14)
end = timer()
print(f'{round(end - start)} seconds elapsed.')

In [None]:
# Run Spark's sbin/stop-all.sh (presumably shuts down the standalone master and workers -- confirm)
!/usr/local/spark/sbin/stop-all.sh

In [None]:
# Run Spark's sbin/start-all.sh (presumably brings the standalone master and workers back up -- confirm)
!/usr/local/spark/sbin/start-all.sh

In [None]:
# Spark configuration shared by the labeling jobs below
conf = pyspark.SparkConf()
# Turn on Spark event logging and write the logs to /usr/local/spark/tmp
# (trailing ';' suppresses the cell output of the returned value)
conf.set('spark.eventLog.enabled', True);
conf.set('spark.eventLog.dir', '/usr/local/spark/tmp');

In [None]:
# Make monthly labels (30 day churn period) for every partition on the Spark cluster
start = timer()
sc = pyspark.SparkContext(master = 'spark://ip-172-31-23-133.ec2.internal:7077', 
                          appName = 'labeling_month', conf = conf)
try:
    # Each task labels one partition and writes its csv; collect() only forces execution
    # and returns one None per partition
    r = sc.parallelize(partitions, numSlices=1000).map(lambda x: partition_to_labels(x, 
                                                       label_type = 'monthly', 
                                                       churn_period = 30)).collect()
finally:
    # Release the context even if the job fails, otherwise the next
    # SparkContext(...) call in this kernel cannot create a new one
    sc.stop()
end = timer()
print(f'{round(end - start)} seconds elapsed.')

In [None]:
# Make twice-a-month labels (14 day churn period) for every partition on the Spark cluster
start = timer()
sc = pyspark.SparkContext(master = 'spark://ip-172-31-23-133.ec2.internal:7077', 
                          appName = 'labeling_bimonthly', conf = conf)
try:
    # Each task labels one partition and writes its csv; collect() only forces execution
    # and returns one None per partition
    r = sc.parallelize(partitions, numSlices=1000).map(lambda x: partition_to_labels(x, 
                                                       label_type = 'bimonthly', 
                                                       churn_period = 14)).collect()
finally:
    # Release the context even if the job fails, otherwise the next
    # SparkContext(...) call in this kernel cannot create a new one
    sc.stop()
end = timer()
print(f'{round(end - start)} seconds elapsed.')

In [None]:
# Stop whatever SparkContext `sc` currently refers to
# NOTE(review): looks like a manual safety net for when an earlier cell failed before its own sc.stop() -- confirm
sc.stop()