In [1]:
# Echo the `spark` session object used by the cells below.
# NOTE(review): `spark` is not defined anywhere in this notebook; presumably the
# PySpark kernel pre-creates it as a SparkSession — confirm.
spark

In [19]:
from pyspark.sql.functions import to_date, expr, monotonically_increasing_id

infile = 'file:///home/cloudera/2.kkbox_churn/raw_data/transactions.csv'

# Projection applied to the raw CSV: a synthetic row id, shorter column names,
# and explicit casts for the integer / boolean / date fields.  Columns that are
# only renamed (pay_method, list_price, actual_paid) stay strings, because the
# file is read without schema inference.
selected = [
    monotonically_increasing_id().alias('rid'),
    'msno',
    expr('payment_method_id').alias('pay_method'),
    expr('payment_plan_days').cast('int').alias('plan_days'),
    expr('plan_list_price').alias('list_price'),
    expr('actual_amount_paid').alias('actual_paid'),
    expr('is_auto_renew').cast('boolean').alias('auto_renew'),
    expr('is_cancel').cast('boolean').alias('is_cancel'),
    # Dates are stored as compact yyyyMMdd strings in the raw file.
    to_date('transaction_date', 'yyyyMMdd').alias('trans_date'),
    to_date('membership_expire_date', 'yyyyMMdd').alias('expire_date'),
]

# The first line of the file supplies the column names.
df0 = spark.read.csv(infile, header=True).select(*selected)

df0.printSchema()

root
 |-- rid: long (nullable = false)
 |-- msno: string (nullable = true)
 |-- pay_method: string (nullable = true)
 |-- plan_days: integer (nullable = true)
 |-- list_price: string (nullable = true)
 |-- actual_paid: string (nullable = true)
 |-- auto_renew: boolean (nullable = true)
 |-- is_cancel: boolean (nullable = true)
 |-- trans_date: date (nullable = true)
 |-- expire_date: date (nullable = true)



In [24]:
# Keep only the transactions dated 2015-11-01 or later.
from pyspark.sql.functions import col, lit
cutoff = to_date(lit('2015-11-01'))
df1 = df0.filter(df0['trans_date'] >= cutoff)
df1.show(n=5)

+---+--------------------+----------+---------+----------+-----------+----------+---------+----------+-----------+
|rid|                msno|pay_method|plan_days|list_price|actual_paid|auto_renew|is_cancel|trans_date|expire_date|
+---+--------------------+----------+---------+----------+-----------+----------+---------+----------+-----------+
| 44|FT4moGxOj6tzwkTSA...|        34|       30|       149|        149|      true|    false|2015-11-30| 2015-12-31|
| 45|z1s1E/gm6xiwjNb8T...|        34|       30|       149|        149|      true|    false|2015-11-30| 2015-12-31|
| 46|lZyYiuAJW3qzDnicN...|        34|       30|       149|        149|      true|    false|2015-11-30| 2015-12-31|
| 47|pXpFcJbT8/FDkhnSU...|        34|       30|       149|        149|      true|    false|2015-11-30| 2015-12-31|
| 48|0f9IUy6wP6pEUntXg...|        31|       30|       149|        149|      true|    false|2015-11-30| 2015-12-31|
+---+--------------------+----------+---------+----------+-----------+----------

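# Date Correction

For every transaction we derive a corrected membership period. For a regular (non-cancel) transaction the period ends on `expire_date` and starts on `expire_date - (plan_days - 1)`, unless the transaction happened later than that, in which case it starts on `trans_date`; if `trans_date` is after `expire_date`, both corrected dates are left empty. For a cancellation only the corrected expiration date is set, to the later of `trans_date` and `expire_date`.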

In [52]:
import datetime as dt

def date_minus_days(d, n):
    '''
    Shift a calendar date back by a number of days.

    `d`: a `datetime.date` instance
    `n`: integer number of days to subtract (a negative value moves forward)
    Returns a new `datetime.date` object.
    '''
    # Promote the date to a midnight datetime, shift it, then drop the time part.
    midnight = dt.datetime.combine(d, dt.time.min)
    shifted = midnight - dt.timedelta(days=n)
    return shifted.date()
    

def dates_correction(is_cancel, trans_date, expire_date, plan_days):
    '''
    Derive the corrected membership period for a single transaction.

    `is_cancel`: boolean (a None flag is falsy and is handled as "not a cancel")
    `trans_date`, `expire_date`: datetime.date objects, or None when missing
    `plan_days`: integer, or None when missing
    Returns [corrected-start-date, corrected-expiration-date]; either entry
    may be None.  [None, None] is returned when the inputs are missing or
    when `trans_date` falls after `expire_date` on a non-cancel transaction.
    '''
    # The date columns feeding this function are nullable.  Comparing None
    # with a date raises TypeError, so bail out before any comparison.
    if trans_date is None or expire_date is None:
        return [None, None]

    if is_cancel:
        # A cancellation carries no start date; it expires on the later of
        # the transaction date and the recorded expiration date.
        return [None, max(trans_date, expire_date)]

    # Not a cancellation: the plan length is needed to locate the start date.
    if plan_days is None:
        return [None, None]

    # A plan of N days that ends on `expire_date` starts N-1 days earlier.
    start_date = date_minus_days(expire_date, plan_days - 1)

    if trans_date <= start_date:
        # Paid ahead of time: the period covers the full plan length.
        return [start_date, expire_date]
    if trans_date <= expire_date:
        # Paid after the nominal start: the period begins at the payment.
        return [trans_date, expire_date]

    # Transaction recorded after its own expiration date: no usable period.
    return [None, None]


def _corrected_dates_record(row):
    '''Build [rid, corrected-start-date, corrected-expiration-date] for one `df1` row.'''
    corrected = dates_correction(row['is_cancel'], row['trans_date'],
                                 row['expire_date'], row['plan_days'])
    return [row['rid']] + corrected


# Apply the date correction to each row through the RDD API; `rid` is carried
# along so the result can be matched back to `df1` later.
rdd_corr1 = df1.rdd.map(_corrected_dates_record)
rdd_corr1.take(3)

[[44, datetime.date(2015, 12, 2), datetime.date(2015, 12, 31)],
 [45, datetime.date(2015, 12, 2), datetime.date(2015, 12, 31)],
 [46, datetime.date(2015, 12, 2), datetime.date(2015, 12, 31)]]

In [53]:
from pyspark.sql import Row
# Wrap each [rid, start, exp] list produced in `rdd_corr1` in a named Row so
# that the fields carry column names.  The source must be `rdd_corr1`: no
# `df_corr1` is defined in this notebook, so that name fails on a fresh kernel.
rdd_corr2 = rdd_corr1.map(lambda rec: Row(rid=rec[0], corr_start_date=rec[1], corr_exp_date=rec[2]))
rdd_corr2.take(3)

[Row(corr_exp_date=datetime.date(2015, 12, 31), corr_start_date=datetime.date(2015, 12, 2), rid=44),
 Row(corr_exp_date=datetime.date(2015, 12, 31), corr_start_date=datetime.date(2015, 12, 2), rid=45),
 Row(corr_exp_date=datetime.date(2015, 12, 31), corr_start_date=datetime.date(2015, 12, 2), rid=46)]

In [56]:
# Turn the Row records into a DataFrame; no schema is passed, so column names
# and types are taken from the Row fields themselves.
df_corr = spark.createDataFrame(rdd_corr2)
df_corr.show(n=3)

+-------------+---------------+---+
|corr_exp_date|corr_start_date|rid|
+-------------+---------------+---+
|   2015-12-31|     2015-12-02| 44|
|   2015-12-31|     2015-12-02| 45|
|   2015-12-31|     2015-12-02| 46|
+-------------+---------------+---+
only showing top 3 rows



# Merge Tables
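Join `df1` with `df_corr` on the row id `rid`, so that every transaction carries its corrected start and expiration dates. The helper id columns are dropped after the join.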

In [63]:
# Rename the key on the corrected side so the two key columns stay
# distinguishable once both tables are joined.
df_corr_ = df_corr.withColumnRenamed('rid', 'rid_')

# Attach the corrected dates to each transaction by row id, then discard both
# key columns, which were only needed for the match.
# NOTE(review): this relies on `rid` being identical each time the `df0`
# lineage is evaluated — confirm, or persist `df0` before deriving `df_corr`.
rid_match = df1['rid'] == df_corr_['rid_']
df_final = df1.join(df_corr_, on=rid_match, how='inner').drop('rid', 'rid_')

In [64]:
# Inspect the merged schema: the `rid` / `rid_` keys were dropped and the two
# corrected-date columns come after the original transaction columns.
df_final.printSchema()

root
 |-- msno: string (nullable = true)
 |-- pay_method: string (nullable = true)
 |-- plan_days: integer (nullable = true)
 |-- list_price: string (nullable = true)
 |-- actual_paid: string (nullable = true)
 |-- auto_renew: boolean (nullable = true)
 |-- is_cancel: boolean (nullable = true)
 |-- trans_date: date (nullable = true)
 |-- expire_date: date (nullable = true)
 |-- corr_exp_date: date (nullable = true)
 |-- corr_start_date: date (nullable = true)



In [65]:
# Persist the merged table as a CSV dataset with a header row.
outfile = 'file:///home/cloudera/2.kkbox_churn/data01/from_raw_transactions-v1/with_corrected_dates'
df_final.write.csv(outfile, header=True)