In [1]:
# CHANGE WORKING DIRECTORY AND ADD MY LIBRARIES
import os
import sys

# Name of the project folder, relative to the directory the kernel starts in.
PROJECT_DIR = '4.Santander_Customer_Satisfaction'

# Only change directory when we are not already inside the project folder, so that
# re-running this cell in the same kernel does not fail looking for a nested folder.
if os.path.basename(os.getcwd()) != PROJECT_DIR:
    os.chdir(os.path.join('.', PROJECT_DIR))

# Put the shared library folder first on the import path, but only once per kernel.
if '../mylib/' not in sys.path:
    sys.path.insert(0, '../mylib/')

In [3]:
# PYSPARK SETTINGS
# Raw strings keep the Windows backslashes literal. The plain-string versions only worked
# because \s, \p, \l and \h happen not to be escape sequences (newer Pythons warn about them).
sys.path.append(r"C:\spark\python\lib\pyspark.zip")
sys.path.append(r"C:\spark\python\lib\py4j-0.9-src.zip")

# Locations of the Spark and Hadoop installations, exported as environment variables.
os.environ["SPARK_HOME"] = r"C:\spark"
os.environ["HADOOP_HOME"] = r"C:\hadoop"
# see http://nishutayaltech.blogspot.co.uk/2015/04/how-to-run-apache-spark-on-windows7-in.html for more info
from pyspark import SparkContext
from pyspark import SparkConf
from pyspark.sql import SQLContext

In [4]:
# PACKAGES

from pyspark.sql.functions import stddev
from pyspark.sql.functions import min
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt

from pyspark.mllib.classification import SVMWithSGD, SVMModel
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.evaluation import BinaryClassificationMetrics

from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression

import seaborn as sns

In [5]:
# MAGIC COMMANDS
%matplotlib inline

In [5]:
# Reuse the context that is already running when this cell is executed again in the same
# kernel; calling the SparkContext constructor a second time raises an error.
# Master "local" and application name "test" are the same settings as before.
sc = SparkContext.getOrCreate(SparkConf().setMaster("local").setAppName("test"))

In [10]:
# SQL context bound to the SparkContext `sc`; used below to read the CSV files into DataFrames.
sqlContext = SQLContext(sc)

In [7]:
# Simple test: point Spark at a local text file to check that the context works.
lines = sc.textFile('README.md')

In [13]:
# Count the entries of `lines`; the result is shown as the cell output.
lines.count()

215

In [11]:
# LOAD DATA FILES (CSV)

def _load_csv(path):
    """Read a CSV file with a header row into a Spark DataFrame.

    Goes through the notebook-level ``sqlContext`` with the
    'com.databricks.spark.csv' data source, asking it to infer the column
    types and to take the column names from the header line.

    :param path: location of the CSV file, passed unchanged to ``load()``.
    :return: the DataFrame returned by the reader.
    """
    return sqlContext.read \
        .format('com.databricks.spark.csv') \
        .options(inferSchema="true", header='true') \
        .load(path)


# The three files are read with exactly the same reader settings.
training_df = _load_csv('train.csv')
test_df = _load_csv('test.csv')
sample_submission_df = _load_csv('sample_submission.csv')

# See http://spark.apache.org/docs/latest/sql-programming-guide.html for examples on how to deal with Spark SQL dataframes

In [9]:
# Example: show target
#training_df.select('TARGET').show(30) # (returns another dataframe and shows it)
# which is not the same as training_df['var15'] (returns a Column object, the equivalent to Series?)
# have a look here: http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.Column

In [9]:
# EXAMPLE: show age and target. add 1 unit to age. This is also an example of operations over columns
#training_df.select(training_df['var15'] + 1, training_df['TARGET']).show(50)
# example: 
#type(training_df.select('var15', 'TARGET'))

In [9]:
# Get a summary of the dataset as a pandas dataframe,
# so that dealing with this small amount of information (in comparison with the dataset) is much quicker.
# describe() is evaluated on the Spark DataFrame; toPandas() converts the resulting summary table.
pandas_df_training_describe = training_df.describe().toPandas()

In [None]:
# Find out which object has a std() method

#type(training_df['var15']) # Column -> stddev
#type(training_df.var15) # Column
#type(training_df.select('var15')) # DataFrame

# This was confusing at the beginning. A Column is a name or an expression: stddev('var15') to be used as selector. 
# It does not contain data (check this statement later). 
# See https://databricks.com/blog/2015/06/02/statistical-and-mathematical-functions-with-dataframes-in-spark.html

In [10]:
# Remove constant features

# Row 2 of the describe table holds the standard deviations; column 0 is skipped.
# The result maps each column name to its std as a float.
std_row = pandas_df_training_describe.iloc[2, 1:]
training_stds = std_row.apply(float).to_dict()

# A standard deviation of exactly 0.0 marks a column whose value never changes.
remove = [name for name in training_stds if training_stds[name] == 0.0]

# drop() returns a new DataFrame, so both names are rebound once per constant column.
for col in remove:
    training_df = training_df.drop(col)
    test_df = test_df.drop(col)

In [95]:
# Remove duplicated columns:
# THIS CODE IS TOO SLOW. I NEED TO FIND AN ALTERNATIVE WAY
# (corr() is called once per pair of columns, on the full Spark DataFrame)
remove = []

# Candidate names come from the describe pandas dataframe (its first column is "summary"),
# restricted to the columns training_df still has: the describe table was computed before
# the constant features were dropped, and asking corr() for a column that is no longer
# there is presumably an error.
current_columns = set(training_df.columns)
cols = [name for name in pandas_df_training_describe.columns[1:] if name in current_columns]

# from the first to the penultimate column, compare with each of the following columns
for i in range(0, len(cols)-1):
    for j in range(i+1, len(cols)):
        # Already flagged through an earlier column: skip the computation and
        # avoid listing the same name twice.
        if cols[j] in remove:
            continue
        # A correlation coefficient equal to 1 is taken as "equal columns".
        # NOTE(review): two columns that differ only by a positive scale/offset also
        # correlate at 1 -- confirm the flagged columns are true duplicates before dropping.
        if training_df.corr(cols[i], cols[j]) == 1:
            remove.append(cols[j])

remove

#for i in range(len(c)-1):
#    v = training_df[c[i]].values
#    for j in range(i+1,len(c)):
#        if np.array_equal(v,training_df[c[j]].values):
#            remove.append(c[j])
            
#training_df.drop(remove, axis=1, inplace=True)
#test_df.drop(remove, axis=1, inplace=True)

############################

In [11]:
# EXPLORATORY ANALYSIS:
# check if there is any repeated ID, which would imply to tidy the data set:
#id_counts_df = training_df['ID'].value_counts().sort_index()  # count the number of occurrences of each ID
#max(id_counts_df)  # if the max value is 1, then there are no repeated IDs = 1 row for each observation.


# Number of rows per ID, then the largest of those counts:
# a maximum of 1 means that no ID is repeated.
id_counts_df = training_df.groupBy('ID').count()
max_id_count_df = id_counts_df.groupBy().max('count')
max_id_count_df.show()

+----------+
|max(count)|
+----------+
|         1|
+----------+



In [15]:
# Write the describe table to a file, serialised as JSON text.
# The with-block closes the file even when to_json() or write() raises.
with open('describe_file_pyspark.txt', 'w+') as describe_file:
    describe_file.write(pandas_df_training_describe.to_json())

In [12]:
# Explore variables graphically with pyspark. Two options came to my mind:
#   1. get a small subset of the dataset and do traditional visualization with pandas and seaborn.
#   2. try Apache Zeppelin, which is still incubating
#
# Let's go for the first one. To choose the fraction of the training set to sample,
# it is useful to know its size first.

# Column names of the Spark DataFrame itself (not of the describe table).
column_names = training_df.columns

# (rows, columns): the row count comes from count(), the column count from the name list.
n_rows = training_df.count()
training_df_shape = (n_rows, len(column_names))
training_df_shape

(76020, 337)

In [13]:
# Once we know the size, let's take a fraction of the dataframe (without replacement)
# and convert it to a pandas dataframe.
# A fixed seed makes the sample, and everything derived from it, reproducible
# across runs; without it every run draws different rows.
SAMPLE_SEED = 42
subset_training_df = training_df.sample(False, 0.05, SAMPLE_SEED).toPandas()
subset_training_df.shape

(3842, 337)

In [14]:
# Describe the subset: summary statistics of the sampled rows, computed on the pandas dataframe.
subset_training_df.describe()

Unnamed: 0,ID,var3,var15,imp_ent_var16_ult1,imp_op_var39_comer_ult1,imp_op_var39_comer_ult3,imp_op_var40_comer_ult1,imp_op_var40_comer_ult3,imp_op_var40_efect_ult1,imp_op_var40_efect_ult3,...,saldo_medio_var33_hace2,saldo_medio_var33_hace3,saldo_medio_var33_ult1,saldo_medio_var33_ult3,saldo_medio_var44_hace2,saldo_medio_var44_hace3,saldo_medio_var44_ult1,saldo_medio_var44_ult3,var38,TARGET
count,3842.0,3842.0,3842.0,3842.0,3842.0,3842.0,3842.0,3842.0,3842.0,3842.0,...,3842.0,3842.0,3842.0,3842.0,3842.0,3842.0,3842.0,3842.0,3842.0,3842.0
mean,76828.980739,-2339.789953,33.164237,115.328769,78.298493,127.008155,5.725315,9.83676,0.167694,0.167694,...,32.771611,0.373345,45.850409,30.042283,12.673571,1.901627,38.082033,36.247137,122351.268509,0.041645
std,43866.243355,48349.345212,12.69898,2284.990662,358.626038,531.096111,114.384073,191.324687,6.619606,6.619606,...,1035.295994,19.546962,1472.095905,969.884422,686.905932,117.870184,1131.059386,1111.40804,278975.937365,0.199803
min,125.0,-999999.0,5.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,12088.95,0.0
25%,38752.5,2.0,23.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,69137.8425,0.0
50%,77705.0,2.0,28.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,105098.835,0.0
75%,113933.5,2.0,40.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,117310.979016,0.0
max,151767.0,238.0,95.0,129000.0,9946.71,11140.08,5122.29,7740.78,300.0,300.0,...,38838.39,1200.03,63317.19,42767.16,42316.44,7306.05,50415.36,50415.36,11857856.46,1.0


In [68]:
# Explore the distribution of var15 graphically:
sns.set(rc={"figure.figsize": (16, 8)})
# Keep the returned plot object under its own name: assigning it to `plt` would replace
# the matplotlib.pyplot module imported under that alias for the rest of the notebook.
var15_ax = sns.distplot(
    subset_training_df['var15'],
    hist_kws={"linewidth": 1},  # histogram
    rug_kws={"color": "g"},  # Plot datapoints in an array as sticks on an axis.
    kde_kws={"color": "b", "lw": 2, "label": "mean"},  # Fit and plot a univariate or bivariate kernel density estimate.
)

In [None]:
# The idea of exploring a subset graphically can be extended to other steps. I just thought of the words "Subset Engineering",
# which could be a new step for big data sets that have to be reduced before they can be handled easily. This step
# could be based on several heuristics to create meaningful subsets.

In [98]:
# There are several boolean variables in the dataset. Lets figure out what they mean. 
# First, I will get all their names and the proportion of 0s and 1s

# THIS CODE IS TOO SLOW. I have to find an alternative

#print(training_df.select('ID').collect(0))

#for col in column_names:
#    print training_df.select(min(col)).collect()[0][0]

#booleans = [col for col in column_names 
#            if training_df.select(min(col)).collect()[0][0] == 0 
#            if training_df.select(max(col)).collect()[0][0] == 1]

#list(training_df[booleans])

In [15]:
# DATA TRANSFORMATION

# Keep the ID column of each set in its own single-column DataFrame before removing it.
training_IDs, test_IDs = training_df.select('ID'), test_df.select('ID')

# drop() leaves the DataFrame it is called on untouched and returns a new one
# without the chosen column, so the names have to be rebound.
training_df, test_df = training_df.drop('ID'), test_df.drop('ID')

# Refresh the list of column names now that ID is gone.
column_names = training_df.columns

In [None]:
# Transform age column to: likely to change = 1, unlikely to change = 0. Let's establish the threshold above 40 years old.

# - Following http://stackoverflow.com/questions/29109916/updating-a-dataframe-column-in-spark, there are several alternatives, 
# both at DataFrame and RDD level
# - Useful tips when transitioning from pandas dataframe to spark dataframe
# http://growthintel.com/from-pandas-to-spark/

In [16]:
# DataFrame select/udf - based transformation:
def var15_tobool_f(var):
    """Map a var15 value to a 0/1 flag.

    :param var: value to classify; it is compared against 40.
    :return: 1 when ``var`` is strictly below 40, otherwise 0.
    """
    return 1 if var < 40 else 0

# Declare the return type: without one, udf() produces a string column, so the flag
# would be stored as the text '1'/'0' instead of a number.
var15_tobool = udf(var15_tobool_f, IntegerType())
index_var15 = training_df.columns.index('var15')  # position of var15 (kept for reference)

column_names = training_df.columns  # save current column names
# withColumn() with the name of an existing column replaces that column where it stands,
# so there is no need to rebuild the column list around the udf and then rename the
# generated 'PythonUDF#var15_tobool_f(var15)' column back to 'var15'.
training_df = training_df.withColumn('var15', var15_tobool(training_df['var15']))
# NOTE(review): test_df is not transformed here -- confirm it gets the same treatment
# before any model trained on this column is applied to it.

In [30]:
# * Notes on RDDs:

# why does a row of an rdd admit operations like x['var15'] or x.var15? (see the previous lambda x func)
# rdds coming from a dataframe contain Row objects, which contain the schema from the original dataframe
# Check it by doing: 
#rdd_t = training_df.rdd.map(lambda x: type(x))
#rdd_t.first()

# By checking the documentation, this can be confirmed:
# DataFrame.rdd: Returns the content as an pyspark.RDD of Row.
# Row: A row in DataFrame. The fields in it can be accessed like attributes.

In [33]:
# Feature selection (mllib): the only selector available in mllib is ChiSqSelector, which is not yet available for pyspark.

In [None]:
# Balancing the training set:

In [19]:
# CLASSIFIER & TRAINING: support vector machine, class imbalance handling

# Load and parse the data
#def prepare(row):
#    return LabeledPoint(row[-1], row[:-1])

#training_df.map(prepare) and training_df.map(lambda row: prepare(row)) are equivalent
#prepared_training_df = training_df.map(prepare)

# A compact version of the previous lines.
# Go through .rdd explicitly: DataFrame.map() only exists in Spark 1.x, whereas
# DataFrame.rdd.map() works on 1.x and 2.x alike and gives the same RDD.
# Each row becomes a LabeledPoint: the last column is the label, all the others the features.
prepared_training_df = training_df.rdd.map(lambda row: LabeledPoint(row[-1], row[:-1]))

In [18]:
# Build the model: SVMWithSGD trained on the RDD of LabeledPoint, limited to 5 iterations.
model = SVMWithSGD.train(prepared_training_df, iterations=5)

In [None]:
# Save and load model
# NOTE(review): re-running this cell presumably fails when "MLlib_SVM_model" already
# exists -- confirm, and remove the folder first if so.
model.save(sc, "MLlib_SVM_model")
# Read the model back from the same path into a separate name.
reloadedModel = SVMModel.load(sc, "MLlib_SVM_model")

In [19]:
# TEST: prepare test RDD
#prepared_training_df.first().label
#prepared_training_df.first().features

# Create an RDD of tuples (predicted label, true label), in that order. float() is important!
predictionAndLabels = prepared_training_df.map(
    lambda point: (float(model.predict(point.features)), point.label))

# Training error: number of tuples whose two elements differ, divided by the total number of tuples.
n_misclassified = predictionAndLabels.filter(lambda pair: pair[0] != pair[1]).count()
n_total = predictionAndLabels.count()
trainErr = n_misclassified / float(n_total)

print("Training Error = " + str(trainErr))

Training Error = 0.0395685345961589


In [29]:
# Test metrics:
# predictionAndLabels was built from prepared_training_df, so these are metrics on the training data.
# NOTE(review): the first element of each tuple comes from model.predict(), which looks like a hard
# class label rather than a raw score -- confirm; curve-based metrics (ROC / PR) need scores to be
# informative.
metrics = BinaryClassificationMetrics(predictionAndLabels)

In [30]:
# Read both summary values first, then report them.
area_under_pr = metrics.areaUnderPR    # area under the precision-recall curve
area_under_roc = metrics.areaUnderROC  # area under the ROC curve

print("Area under PR = %s" % area_under_pr)
print("Area under ROC = %s" % area_under_roc)

Area under PR = 0.5197842672980795
Area under ROC = 0.5


In [22]:
# Cross - validation:
# see http://spark.apache.org/docs/latest/ml-guide.html (overview of MLlib: estimators, transformers and pipelines)
# and then https://spark.apache.org/docs/1.6.1/api/python/pyspark.ml.html#module-pyspark.ml.tuning
# ml is different from mllib, keep it in mind and compare both. 
# Remember I previously did the following with mllib: 
#   model = SVMWithSGD.train(prepared_training_df, iterations=5) 
#   predictionAndLabels = prepared_training_df.map(lambda row: (float(model.predict(row.features)), row.label)) 
# ml module uses fit() and transform() instead:

# Create a LogisticRegression instance (at most 10 iterations, regularization parameter 0.01).
# This instance is an Estimator.
lr = LogisticRegression(maxIter=10, regParam=0.01)

# Learn a LogisticRegression model. I use prepared_training_df,
# as ml also uses the convention (label, features) instead of (features, label) to learn from the training data.
# Estimator.fit() takes a dataframe as input and returns a Transformer model as output.
# prepared_training_df is an RDD, which is fine for the mllib package but not for ml, which requires dataframes.
# So we have to turn the prepared_training_df RDD into a dataframe again.
prepared_training_df_ml = prepared_training_df.toDF()

# Train the model. Note that this rebinds `model`, which held the SVM model until now.
model = lr.fit(prepared_training_df_ml)  

# Make predictions using the Transformer.transform() method.
# Using the training data as test data is not a good approach. Anyway, the purpose of this code block is to understand
# the ml package. I will go back to this problem later.
prediction = model.transform(prepared_training_df_ml)



In [23]:
# Show the first row of the prediction DataFrame.
prediction.head()

Row(features=DenseVector([2.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 3.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 3.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 3.0, 0.0, 0.0, 3.0, 0.0, 3.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 99.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0