## KKChurn model 
### Team members: Lorena Mejía, Alfredo Carrillo and Ricardo Figueroa

In [None]:
import numpy as np
import csv
import sys
import pandas as pd
from pyspark import SparkContext, SparkConf
import os
import re
from pyspark.sql import SparkSession
spark = SparkSession(sc)

In [2]:
# We create the spark application called 'KKchurn' to use all the available cores
conf = SparkConf().setAppName('KKChurn').setMaster("local[*]").set("spark.driver.maxResultSize", "6g")
sc=SparkContext(conf=conf)

In [84]:
# Import transactions file using Pyspark
transactions=sc.textFile('./data/transactions.csv') \
    .map(lambda line: line.split(","))
transactions=transactions.toDF(transactions.first())

In [85]:
# Import transactions_v2 file using Pyspark
transactions_2=sc.textFile('./data/churn_comp_refresh/train_v2.csv') \
    .map(lambda line: line.split(","))
transactions_2=transactions_2.toDF(transactions_2.first())

In [77]:
# Import train data (before updates)
train_data=sc.textFile('./data/train.csv') \
    .map(lambda line: line.split(","))
train_data=train_data.toDF(train_data.first())

In [78]:
# Import train data v2 
train_data_2=sc.textFile('./data/churn_comp_refresh/train_v2.csv') \
    .map(lambda line: line.split(","))
train_data_2=train_data_2.toDF(train_data_2.first())

In [79]:
sample_submission_zero = sc.textFile('./data/sample_submission_zero.csv') \
    .map(lambda line: line.split(","))
sample_submission_zero=sample_submission_zero.toDF(sample_submission_zero.first())

In [86]:
sample_submission_2 = sc.textFile('./data/churn_comp_refresh/sample_submission_v2.csv') \
    .map(lambda line: line.split(","))
sample_submission_2=sample_submission_2.toDF(sample_submission_2.first())

In [93]:
user_logs = sc.textFile('./data/user_logs.csv') \
    .map(lambda line: line.split(","))
user_logs = user_logs.toDF(user_logs.first())

In [94]:
user_logs = sc.textFile('./data/user_logs.csv') \
    .map(lambda line: line.split(","))
user_logs = user_logs.toDF(user_logs.first())

In [96]:
# Example to show a dataframe from Spark
user_logs.show()

+--------------------+--------+------+------+------+-------+-------+-------+----------+
|                msno|    date|num_25|num_50|num_75|num_985|num_100|num_unq|total_secs|
+--------------------+--------+------+------+------+-------+-------+-------+----------+
|                msno|    date|num_25|num_50|num_75|num_985|num_100|num_unq|total_secs|
|rxIP2f2aN0rYNp+to...|20150513|     0|     0|     0|      0|      1|      1|  280.3350|
|rxIP2f2aN0rYNp+to...|20150709|     9|     1|     0|      0|      7|     11| 1658.9480|
|yxiEWwE9VR5utpUec...|20150105|     3|     3|     0|      0|     68|     36|17364.9560|
|yxiEWwE9VR5utpUec...|20150306|     1|     0|     1|      1|     97|     27|24667.3170|
|yxiEWwE9VR5utpUec...|20150501|     3|     0|     0|      0|     38|     38| 9649.0290|
|yxiEWwE9VR5utpUec...|20150702|     4|     0|     1|      1|     33|     10|10021.5200|
|yxiEWwE9VR5utpUec...|20150830|     3|     1|     0|      0|      4|      7| 1119.5550|
|yxiEWwE9VR5utpUec...|20151107| 

In [103]:
user_logs_2 = sc.textFile('./data/churn_comp_refresh/sample_submission_v2.csv') \
    .map(lambda line: line.split(","))
user_logs_2 = user_logs_2.toDF(user_logs_2.first())

In [108]:
members = sc.textFile('./data/members_v3.csv') \
    .map(lambda line: line.split(","))
members = members.toDF(members.first())

In [75]:
# Example of a query using pyspark
aggr_value = transactions.select("msno").show()

+--------------------+
|                msno|
+--------------------+
|                msno|
|YyO+tlZtAXYXoZhNr...|
|AZtu6Wl0gPojrEQYB...|
|UkDFI97Qb6+s2LWci...|
|M1C56ijxozNaGD0t2...|
|yvj6zyBUaqdbUQSrK...|
|KN7I82kjY0Tn76Ny9...|
|m5ptKif9BjdUghHXX...|
|uQxbyACsPOEkTIrv9...|
|LUPRfoE2r3WwVWhYO...|
|pMVjPLgVknaJYm9L0...|
|bQkbrEPdMfVfdsoz0...|
|TZVCT9pCufI/AWjrG...|
|b2AiGMFhT6fbDyN12...|
|ksInNb4D5jdSSIYUr...|
|aQKXNflQtXF92cpv4...|
|iFxPpElVK6kXnZbuh...|
|8qrtRZQTuCih4YJhj...|
|pE2FeJOBZv5snDGdF...|
|vma4rQzDa/l4Wb/My...|
+--------------------+
only showing top 20 rows



### Utils functions

In [81]:
# Util functions 
def splitComma(line:str):
    splits = COMMA_DELIMITER.split(line)
    return "{}, {}".format(splits[1], splits[6])
def comma_delimiter(line):
    return(line.split(','))
COMMA_DELIMITER = re.compile(''', (?=(?:[^"]*"[^"]*")*[^"]*$)''')
COMMA_DELIMITER.split("hisds, sdfs")

['hisds', 'sdfs']

In [None]:
# How to save to a text file 
# transactions.map(lambda row: str(row[0]) + "\t" + str(row[1])) \
#    .saveAsTextFile("out/try2.text")

In [2]:
# Definimos función para importar datos
def import_csv(file):
    temp_data = []
    with open(file,'rt', encoding='ASCII') as csvfile: 
        reader = csv.reader(csvfile, delimiter=',', quotechar='|') 
        for row in reader:
            temp_data.append(row)
    temp_data = np.array(temp_data)
    return(temp_data)

In [3]:
# Definimos una función para limpiar los datos del train y test data
def limpiar_train_test(np_array):
    np_array = np.array([[np_array[i][0].replace("=", ""), np_array[i][1]] for i in range(len(np_array))])
    return(np_array)

### Train y Test Data
The train and test set, containing the user ids and whether they have churned. 
- msno: user id
- is_churn: This is the target variable. Churn is defined as whether the user did not continue the subscription within 30 days of expiration. is_churn = 1 means churn,is_churn = 0 means renewal.


### Transactions Data
The information and variables included in this file are the following:
- MSNO: id of the user
- payment_method_id: payment method
- payment_plan_days: length of membership plan in days
- plan_list_price: in New Taiwan Dollar (NTD)
- actual_amount_paid: in New Taiwan Dollar (NTD)
- is_auto_renew
- transaction_date: format %Y%m%d
- membership_expire_date: format %Y%m%d
- is_cancel: whether or not the user canceled the membership in this transaction.

### User logs
Daily user logs describing listening behaviors of a user. Data collected until 2/28/2017. 
The information and variables included in this file are the following:
- msno: user id
- date: format %Y%m%d
- num_25: # of songs played less than 25% of the song length
- num_50: # of songs played between 25% to 50% of the song length
- num_75: # of songs played between 50% to 75% of of the song length
- num_985: # of songs played between 75% to 98.5% of the song length
- num_100: # of songs played over 98.5% of the song length
- num_unq: # of unique songs played

### Members
User information
The information and variables included in this file are the following:
- msno: user id
- date: format %Y%m%d
- num_25: # of songs played less than 25% of the song length
- num_50: # of songs played between 25% to 50% of the song length
- num_75: # of songs played between 50% to 75% of of the song length
- num_985: # of songs played between 75% to 98.5% of the song length
- num_100: # of songs played over 98.5% of the song length
- num_unq: # of unique songs played