In [None]:
############## skip this if use Databrick ####################################
# The first step of any Spark driver application is to create a SparkContext.
# The SparkContext lets the driver application access the cluster through a resource manager.
# To create a SparkContext we first create a SparkConf, which stores the configuration
# parameters that the driver application passes to the SparkContext.
# setAppName() gives the driver application a name so it can be identified in the Spark web UI.
from pyspark import SparkContext, SparkConf
# SQLContext is used at the bottom of this cell, so it must be imported here:
# the notebook-wide import cell only runs after this one.
from pyspark.sql import SQLContext

conf = SparkConf().setAppName("appName")
#conf = SparkConf().setMaster("local[*]").setAppName("Naive_Bayes")

# getOrCreate() returns the already-running context when there is one, so this cell can be
# re-run without failing on a second SparkContext.
sc = SparkContext.getOrCreate(conf)
# Single-argument print(...) behaves identically on Python 2 and Python 3.
print("Running Spark Version %s" % (sc.version))

# SparkSQL processes structured data; it implements DataFrames and a SQL query engine.
# SparkSQL has a SQLContext and a HiveContext.
sqlContext=SQLContext(sc)

In [None]:
import warnings
warnings.filterwarnings('ignore')
import pandas as pd
pd.options.display.mpl_style = 'default'
import matplotlib.pyplot as plt
import seaborn as sns
import numpy as np
from scipy.stats import skew

from pyspark.ml import *
from pyspark.ml.classification import *
from pyspark.ml.regression import *
from pyspark.ml.clustering import *
from pyspark.ml.feature import *
from pyspark.ml.param import *
from pyspark.ml.tuning import *
from pyspark.ml.evaluation import *
from pyspark.mllib.evaluation import RegressionMetrics, MulticlassMetrics
from pyspark.sql.types import DoubleType, IntegerType
from pyspark.sql.functions import *
#from pyspark.sql.functions import rand, log, sqrt, pow, size, exp, mean, sum, avg 
from sklearn.metrics import classification_report
from time import time
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext, Row


### Create an RDD

In [None]:
## Create an RDD from a text file (the author's setup reads it from HDFS)
# textFile() reads the file into a Resilient Distributed Dataset (RDD); each line of the file becomes one element.
rdd1 = sc.textFile("amazon_cells_labelled.txt")
rdd1.take(5)
#[u'So there is no way for me to plug it in here in the US unless I go by a converter.\t0',
# u'Good case, Excellent value.\t1',
# u'Great for the jawbone.\t1',
# u'Tied to charger for conversations lasting more than 45 minutes.MAJOR PROBLEMS!!\t0',
# u'The mic is great.\t1']
rdd1.count() #1000   # number of elements (lines) in the RDD
rdd1.getNumPartitions() #2
##########################################################################################
## Create an RDD from the train file (rdd1 is re-bound to the new RDD)
rdd1 = sc.textFile("allstate_train.csv")
rdd1.take(3)
#[u'id,cat1,cat2,
# u'1,A,B,
# u'2,A,B]
##########################################################################################
# Convert a Python list into an RDD
lines = sc.parallelize([1,2,3])
numbers = sc.parallelize(range(10),3)# parallelize the range output [0,1,2,..,9] into 3 partitions
# the 3 partitions are: [0,1,2],[3,4,5],[6,7,8,9]
numbers.collect() # gather the elements of all 3 partitions into one list: [0,1,2,...,9]
numbers.getNumPartitions() #3  # show the number of partitions

### RDD Map function

In [None]:
# Each line of the file is "<review>\t<label>".
rdd1 = sc.textFile("amazon_cells_labelled.txt")
# Split every line at the tab into [review, label]
rdd2 = rdd1.map(lambda x: x.split("\t"))
#[[u'So there is no way for me to plug it in here in the US unless I go by a converter.', u'0'],
# [u'Good case, Excellent value.', u'1'],
# [u'Great for the jawbone.', u'1'],
# [u'Tied to charger for conversations lasting more than 45 minutes.MAJOR PROBLEMS!!', u'0'],
# [u'The mic is great.', u'1']]

# Create an RDD of Rows with the field names label and review.
# Note that in the sample output below the fields appear as (label, review), not in the keyword order used here.
rdd3 = rdd2.map(lambda x: Row(review=x[0],label=x[1]))
#[Row(label=u'0', review=u'So there is no way for me to plug it in here in the US unless I go by a converter.'),
# Row(label=u'1', review=u'Good case, Excellent value.'),
# Row(label=u'1', review=u'Great for the jawbone.'),
# Row(label=u'0', review=u'Tied to charger for conversations lasting more than 45 minutes.MAJOR PROBLEMS!!'),
# Row(label=u'1', review=u'The mic is great.')]

# Same pipeline in one chain, but with positional Rows and explicit column names passed to toDF().
# rdd3 is re-bound here and is now a DataFrame with columns (review, label), not an RDD.
rdd3 = rdd1.map(lambda x: x.split("\t")).map(lambda x: Row(x[0],x[1])).toDF(["review","label"])
#[Row(review=u'So there is no way for me to plug it in here in the US unless I go by a converter.', label=u'0'),
# Row(review=u'Good case, Excellent value.', label=u'1'),
# Row(review=u'Great for the jawbone.', label=u'1')]
#~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# Word count over the review text.
rdd1 = sc.textFile("/FileStore/tables/c4pzqsme1501527259399/amazon_cells_labelled.txt")
# Each line is "<review>\t<label>". Drop the label before tokenising: splitting the raw line on
# spaces leaves the label glued to the last word (e.g. u'earpiece.\t0'), which pollutes the counts.
words = rdd1.flatMap(lambda line: line.split("\t")[0].split(" "))
words.take(4) #[u'So', u'there', u'is', u'no']
# Pair every word with an initial count of 1
tuples = words.map(lambda word: (word, 1))
tuples.take(4) #[(u'So', 1), (u'there', 1), (u'is', 1), (u'no', 1)]
# Sum all the counts in the tuples for each word into a new RDD `counts`
counts = tuples.reduceByKey(lambda a, b:(a+b))
counts.take(4)
# Returns (word, count) pairs such as (u'magnetic', 1) or (u'four', 1); keys no longer carry a "\t<label>" suffix.
# Merge into a single partition so the result is written as one part file.
# '....' is a placeholder: replace it with the real output directory before running.
counts.coalesce(1).saveAsTextFile('....')
########################################################
rdd1 = sc.textFile("allstate_train.csv")
#[u'id,cat1,cat2,
# u'1,A,B,
# u'2,A,B]
# Split every CSV line into a list of field strings.
# NOTE(review): a plain split(",") does not handle quoted fields that contain commas - confirm the file has none.
rdd2 = rdd1.map(lambda x: x.split(","))
########################################################
# Plain-Python counterpart on a list. A list comprehension always produces a list, whereas
# map() returns a lazy iterator on Python 3 and would not display the values shown here.
[c + "_Index" for c in ['column1','column2']] #['column1_Index', 'column2_Index']

In [None]:
#################### filter ##########################
# Find the reviews that contain 'Good'.
# A Column expression such as col(...).like(...) only works with DataFrame.filter;
# RDD.filter expects a Python function. The (review, label) DataFrame built with toDF()
# in the previous cell is bound to `rdd3`, and looks like:
#[Row(review=u'So there is no way for me to plug it in here in the US unless I go by a converter.', label=u'0'),
# Row(review=u'Good case, Excellent value.', label=u'1'),
rdd3.filter(col("review").like("%Good%")).take(2)
#[Row(review=u'Good case, Excellent value.', label=u'1'),
# Row(review=u'So Far So Good!.', label=u'1')

### createDataFrame

In [None]:
# --- Recipe 1: RDD of Rows -> DataFrame ---------------------------------
rdd1 = sc.textFile("amazon_cells_labelled.txt")
rdd2 = rdd1.map(lambda x: x.split("\t"))
rdd3 = rdd2.map(lambda x: Row(review=x[0],label=x[1]))
# convert the RDD of Rows to a DataFrame via createDataFrame
df=sqlContext.createDataFrame(rdd3)
# register this DataFrame as a temporary table (named "df") in the SQLContext
df.registerTempTable("df")
df.show()
#+-----+--------------------+
#|label|              review|
#+-----+--------------------+
#|    0|So there is no wa...|
#|    1|Good case, Excell...|
#|    1|Great for the jaw...|

#~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# --- Recipe 2: load a table registered in Databricks ---------------------
df = sqlContext.table("allstate_train_csv")
# Single-argument print(...) behaves identically on Python 2 and Python 3.
print('(Sample number, column number) in the train dataset =({},{})'.format(df.count(), len(df.columns)))

#~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# --- Recipe 3: CSV text file with a header line -> DataFrame -------------
rdd1 = sc.textFile("allstate_train.csv")
rdd2 = rdd1.map(lambda x: x.split(","))
header = rdd2.first() #extract header
# zipWithIndex() yields (row, index) pairs; index 0 is the header line, so keep index > 0 and
# strip the index again with keys(). The pair is indexed explicitly because tuple-unpacking
# lambda parameters (lambda (row, index): ...) are a syntax error on Python 3.
data = rdd2.zipWithIndex().filter(lambda row_index: row_index[1] > 0).keys()
# Converting an RDD to DataFrame, using the header fields as column names
df=sqlContext.createDataFrame(data,header)
## Registers this DataFrame as a temporary table using the given name
df.registerTempTable("df")
#~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# --- Recipe 4: RDD of tuples + explicit schema ---------------------------
from pyspark.sql.types import *
l = [('Alice',2),('May',5) ]
rdd = sc.parallelize(l)
schema =  StructType([StructField ("name" , StringType(), True) ,
                      StructField("age" , IntegerType(), True)])
df = sqlContext.createDataFrame(rdd, schema)
df.show()
#+-----+---+
#| name|age|
#+-----+---+
#|Alice|  2|
#|  May|  5|
#+-----+---+
#~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# --- Recipe 5: local list of tuples + column names -----------------------
df = sqlContext.createDataFrame([
  (0, "Hi I heard about Spark I."),
  (1, "Logistic regression models are neat.")
], ["label", "sentence"])
#~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# --- Recipe 6: rows containing missing values (None -> null) -------------
# sqlContext is used instead of `spark`: the notebook creates sc/sqlContext itself, while a
# `spark` session variable only exists in environments that predefine it.
df = sqlContext.createDataFrame([(1.0, 2.0, 3.0), (None, 4.0, 6.0)], ["column1", "column2", "column3"])
df.show()
#+-------+-------+-------+
#|column1|column2|column3|
#+-------+-------+-------+
#|    1.0|    2.0|    3.0|
#|   null|    4.0|    6.0|
#+-------+-------+-------+
#~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# --- Recipe 7: a single vector-typed column ------------------------------
# NOTE(review): these are pyspark.mllib vectors; confirm whether downstream pyspark.ml stages
# need pyspark.ml.linalg vectors instead.
from pyspark.mllib.linalg import Vectors
data = [(Vectors.dense([0.0, 0.0]),), (Vectors.dense([1.0, 1.0]),),
        (Vectors.dense([9.0, 8.0]),), (Vectors.dense([8.0, 9.0]),)]
df = sqlContext.createDataFrame(data, ["features"])
df.show()
#+---------+
#| features|
#+---------+
#|[0.0,0.0]|
#|[1.0,1.0]|
#|[9.0,8.0]|
#|[8.0,9.0]|
#+---------+
##########################################################################
# --- Inspecting a DataFrame ----------------------------------------------
# First 5 rows as a pandas DataFrame
pd.DataFrame(df.take(5), columns=df.columns)
df.show()
df.printSchema()
# Example output (taken from the review DataFrame, not from the vector DataFrame above):
#root
# |-- label: double (nullable = true)
# |-- review: string (nullable = true)
print('(Sample number, column number) in the train dataset =({},{})'.format(df.count(), len(df.columns)))

### convert DataFrame to RDD

In [None]:
# Build a small DataFrame and expose its rows as an RDD of plain tuples.
# sqlContext is used instead of `spark`: the notebook creates sc/sqlContext itself, while a
# `spark` session variable only exists in environments that predefine it.
df = sqlContext.createDataFrame([(0.0, 2.0), (1.0, 3.0), (2.0, 4.0)], ["column1", "column2"])
# df.rdd is an RDD of Row objects; map(tuple) turns every Row into a plain tuple
rdd1=df.rdd.map(tuple)
rdd1.collect() #[(0.0, 2.0), (1.0, 3.0), (2.0, 4.0)]


### convert variable types

In [None]:
def _cast_column(dataframe, name, spark_type):
    """Return `dataframe` with column `name` replaced by its values cast to `spark_type`."""
    return dataframe.withColumn(name, dataframe[name].cast(spark_type))

# Cast the string-typed 'label' column to Double.
# NOTE(review): assumes `df` has a 'label' column at this point - confirm which DataFrame is loaded.
df = _cast_column(df, "label", DoubleType())
#~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
def convertfloatColumn(dataframe, name):
    """Return `dataframe` with column `name` cast to DoubleType."""
    return _cast_column(dataframe, name, DoubleType())

# Cast every column whose name contains 'cont' to Double, one column at a time.
float_columns = [column for column in df.columns if 'cont' in column]
for column in float_columns:
    df = convertfloatColumn(df, column)

# The 'loss' column is cast to Double as well.
df = convertfloatColumn(df, 'loss')
#~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
def convertIntColumn(dataframe, name):
    """Return `dataframe` with column `name` cast to IntegerType."""
    return _cast_column(dataframe, name, IntegerType())

# The 'id' column becomes an Integer.
df = convertIntColumn(df, 'id')

### save and load a DataFrame file

In [None]:
# Option 1: collect the DataFrame to the driver and write ONE csv file with pandas
# (only suitable when the data fits in driver memory). It gets its own file name so it cannot
# collide with the Spark output directory written below.
df.toPandas().to_csv('mycsv_pandas.csv')
# Option 2: let Spark write the data. The given path becomes a DIRECTORY of part files.
# mode='overwrite' makes the cell re-runnable; without it the write fails when the path already exists.
df.write.csv('mycsv.csv', mode='overwrite') # doesn't have header
df.write.csv('mycsv1.csv', mode='overwrite', header=True) # keep the header
# Option 3: Parquet, which also stores the schema
df.write.save("df_amazon.parquet", format="parquet", mode="overwrite")

# Read back: load() is used here without an explicit format to read the Parquet data written above
df = sqlContext.read.load("df_amazon.parquet")
# Reading the CSV output as plain text gives an RDD of lines (the header line is included)
rdd = sc.textFile("mycsv1.csv")

### Dataframe select

In [None]:
# Select a subset of columns by name. The column list previously named 'column2' twice, which
# produces two identically named output columns that cannot be referenced unambiguously.
data=df.select(['column1', 'column2'])

### Dataframe filter

In [None]:
# Keep only the rows whose label equals 0 and print them.
is_label_zero = df['label'] == 0
df.filter(is_label_zero).show()
#+-----+--------------------+
#|label|              review|
#+-----+--------------------+
#|  0.0|So there is no wa...|
#|  0.0|Tied to charger f...|


### Dataframe groupby

In [None]:
# Count the rows per label once, then render the same aggregate two ways.
label_counts = df.groupBy("label").count()
# 1) as a Spark text table
label_counts.show()
#+-----+-----+
#|label|count|
#+-----+-----+
#|  1.0|  500|
#|  0.0|  500|
#+-----+-----+
# 2) as a pandas DataFrame (rich notebook rendering)
label_counts.toPandas()

### Dataframe union and join

In [None]:
df = sqlContext.createDataFrame([
  (0, "Hi I heard about Spark I."),
  (1, "Logistic regression models are neat.")
], ["label", "sentence"])
# unionAll() appends the rows of the second DataFrame; duplicates are kept (see output)
df.unionAll(df).show()
#+-----+--------------------+
#|label|            sentence|
#+-----+--------------------+
#|    0|Hi I heard about ...|
#|    1|Logistic regressi...|
#|    0|Hi I heard about ...|
#|    1|Logistic regressi...|
#+-----+--------------------+
############################################
# Combine the training data and the test data as "train_test" DataFrame
# NOTE(review): `df_test` is not created anywhere in the visible part of this notebook;
# load the test set into `df_test` before running these lines.
train_test=df.unionAll(df_test)
# Single-argument print(...) behaves identically on Python 2 and Python 3.
print('(Sample number, column number) in the train_test dataset =({},{})'.format(train_test.count(),len(train_test.columns)))
#############################################
# sqlContext is used instead of `spark`: the notebook creates sc/sqlContext itself, while a
# `spark` session variable only exists in environments that predefine it.
df1 = sqlContext.createDataFrame([(1.0, 10.0), (2.0, 20.0), (3.0, 30.0)], ["column1", "column2"])
#+-------+-------+
#|column1|column2|
#+-------+-------+
#|    1.0|   10.0|
#|    2.0|   20.0|
#|    3.0|   30.0|
#+-------+-------+
df2 = sqlContext.createDataFrame([(1.0, 100.0), (2.0, 200.0), (2.0, 300.0)], ["column1", "column3"])
#+-------+-------+
#|column1|column3|
#+-------+-------+
#|    1.0|  100.0|
#|    2.0|  200.0|
#|    2.0|  300.0|
#+-------+-------+
# Join on the shared column name: keys without a match (3.0) are dropped and a key that
# matches twice (2.0) yields two output rows, as the result table shows.
df1.join(df2, "column1").show()
#+-------+-------+-------+
#|column1|column2|column3|
#+-------+-------+-------+
#|    1.0|   10.0|  100.0|
#|    2.0|   20.0|  200.0|
#|    2.0|   20.0|  300.0|
#+-------+-------+-------+

### create a new column in DataFrame

In [None]:
# Create a new column 'label' holding log(loss + 200), and a new column 'new_id' computed as loss * 0.0
df=df.withColumn('label', log(df.loss+200)).withColumn('new_id', df.loss *0.0)


### other functions in DataFrame

In [None]:
# sqlContext is used instead of `spark`: the notebook creates sc/sqlContext itself, while a
# `spark` session variable only exists in environments that predefine it.
df = sqlContext.createDataFrame([(1.0, 2.0, 3.0), (None, 4.0, 6.0)], ["column1", "column2", "column3"])
df.show()
#+-------+-------+-------+
#|column1|column2|column3|
#+-------+-------+-------+
#|    1.0|    2.0|    3.0|
#|   null|    4.0|    6.0|
#+-------+-------+-------+
# drop() returns a NEW DataFrame and leaves `df` untouched, so the result has to be shown
# (or assigned) to have any visible effect.
df.drop('column1').show()
# Summary statistics per column, transposed so that each column becomes a row
df.describe().toPandas().transpose()
#             0     1                   2    3    4
#summary  count  mean              stddev  min  max
#column1      1   1.0                 NaN  1.0  1.0
#column2      2   3.0  1.4142135623730951  2.0  4.0
#column3      2   4.5  2.1213203435596424  3.0  6.0
df.na.drop().show() #drop all the rows missing a value in any column
df.na.drop(subset=['column1']).show()  #drop the rows with missing values in column1
#+-------+-------+-------+
#|column1|column2|column3|
#+-------+-------+-------+
#|    1.0|    2.0|    3.0|
#+-------+-------+-------+
df.na.fill(10, ['column1']).show()        #replace the missing values with 10 in column 1
#+-------+-------+-------+
#|column1|column2|column3|
#+-------+-------+-------+
#|    1.0|    2.0|    3.0|
#|   10.0|    4.0|    6.0|
#+-------+-------+-------+
df.stat.corr('column2', 'column3') #1.0  #Compute correlation between two columns
# mean/sum/avg here are the pyspark.sql.functions column aggregates pulled in by the
# star import at the top of the notebook (they shadow Python's built-in sum).
df.select(mean('column2'), sum('column2')).show()   ##Calculate average and sum
#+------------+------------+
#|avg(column2)|sum(column2)|
#+------------+------------+
#|         3.0|         6.0|
#+------------+------------+
df.agg(avg('column2')).show()
#+------------+
#|avg(column2)|
#+------------+
#|         3.0|
#+------------+
#+------------+
#|avg(column2)|
#+------------+
#|         3.0|
#+------------+

### data transformation

In [None]:
# Create a new column 'label' holding log(loss + shift).
shift=200
df=df.withColumn('label', log(df.loss+shift))

# Calculate skewness before data transform.
# '_trans' columns are excluded so that re-running the cell still measures only the raw features.
float_columns = [feat for feat in df.columns if 'cont' in feat and not feat.endswith('_trans')]
df_new=df.select(float_columns).toPandas()
# A comprehension avoids binding a variable called `skew`, which would shadow scipy.stats.skew.
skew_values_before=[df_new[f].skew() for f in float_columns]
############# Data transformation ######################
# 'cont1' : Square root transformation
# 'cont2' : Unchanged
# 'cont3' : X^ 1/3 transformation
# 'cont4' : Log transformation
# 'cont5' : Square root transformation
# Every '<name>_trans' column is derived from the column it is named after. The previous version
# computed cont3_trans/cont4_trans/cont5_trans from cont4/cont5/cont6, contradicting the table above.
# sqrt/pow/log are the pyspark.sql.functions column functions from the notebook-wide star import.
df = (df.withColumn("cont1_trans", sqrt(df.cont1))
        .withColumn("cont2_trans", df.cont2)
        .withColumn("cont3_trans", pow(df.cont3, 1.0/3))
        # the small offset keeps log() away from log(0)
        .withColumn("cont4_trans", log(df.cont4 + 0.000000001))
        .withColumn("cont5_trans", sqrt(df.cont5)))
# Calculate skewness after data transform
float_trans = [feat for feat in df.columns if feat.endswith('_trans')]
df_new=df.select(float_trans).toPandas()
skew_values_after=[df_new[f].skew() for f in float_trans]

# Pair every transformed column with the "before" skewness of its source column, so both curves
# have the same length even when only some of the continuous features were transformed.
trans_sources = [feat[:-len('_trans')] for feat in float_trans]
skew_before_by_column = dict(zip(float_columns, skew_values_before))
skew_values_before_paired = [skew_before_by_column[name] for name in trans_sources]

# Plot skewness distribution of continuous features before/after data transformation
positions = np.arange(0, len(trans_sources))
fig, ax = plt.subplots(nrows=1,ncols=1,figsize=(12,5), facecolor='white')
ax.plot(positions,skew_values_before_paired,marker='D',linewidth=3,label='Before data transformation')
ax.plot(positions,skew_values_after,marker='D',linewidth=3, color='r',label='After data transformation')
ax.set_xticks(positions)
ax.set_xticklabels(trans_sources)
ax.grid(color='black', linestyle='--', linewidth=1)
ax.set_title('Skewness of continuous features before/after data transformation')
ax.set_ylabel('Skewness')
ax.set_ylim(-0.5,1.2)
ax.legend(loc='best')
display(fig)

###  convert the distinct labels in the input dataset to index values

In [None]:
# Fit a StringIndexer on `df` that reads the "label" column and writes the index of each
# distinct label to "indexedLabel"; `labelIndexer` is the fitted model.
labelIndexer = (StringIndexer()
                .setInputCol("label")
                .setOutputCol("indexedLabel")
                .fit(df))

### Feature extraction

In [None]:
# tokenizer
# Option A: RegexTokenizer breaks a review sentence into individual words by splitting on `pattern`.
# "\\W" (a literal backslash followed by W) matches every non-word character, so punctuation
# such as , ! . is removed together with the whitespace. The backslash is escaped because a bare
# "\W" is an invalid string escape on newer Python versions.
regex_tokenizer = RegexTokenizer(inputCol="review", outputCol="words", pattern="\\W")
df_tokenized_regex = regex_tokenizer.transform(df)

# Option B (the one the rest of the notebook uses): plain Tokenizer.
# Punctuation stays attached to the words (see "case," in the output below).
tokenizer = Tokenizer(inputCol="review", outputCol="words")
df_tokenized = tokenizer.transform(df)
#+-----+--------------------+--------------------+
#|label|              review|               words|
#+-----+--------------------+--------------------+
#|  0.0|So there is no wa...|[so, there, is, n...|
#|  1.0|Good case, Excell...|[good, case,, exc...|
#################################################################################
# remove stop words
# Option A: a custom stop-word list. setStopWords() is given only these words here.
# NOTE(review): if the intent is to extend the default list rather than replace it, pass
# StopWordsRemover.loadDefaultStopWords("english") + add_stopwords instead.
add_stopwords = ["http","https","amp","rt","t","c","can"]
custom_remover = StopWordsRemover(inputCol="words", outputCol="filtered").setStopWords(add_stopwords)

# Option B (the one the rest of the notebook uses): the built-in default stop-word list
remover = StopWordsRemover(inputCol="words", outputCol="filtered")
df_removed = remover.transform(df_tokenized)

#################################################################################
# Convert to TF words vector
hashingTF = HashingTF(inputCol="filtered", outputCol="rawFeatures")
df_TF = hashingTF.transform(df_removed)
# Convert to TF*IDF words vector
# `idf` is the unfitted estimator; `idfModel` is the model fitted on df_TF.
idf = IDF(inputCol="rawFeatures", outputCol="features")
idfModel = idf.fit(df_TF)
df_idf = idfModel.transform(df_TF)
for features_label in df_idf.select("features", "label").take(3):
    print(features_label)
#Row(features=SparseVector(262144, {21872: 5.8101, 52801: 6.2156, 61625: 5.5225, 113100: 4.4238, 172477: 4.9628, 199255: 4.8293}), label=0.0)
#Row(features=SparseVector(262144, {113432: 2.5913, 117481: 3.5766, 192310: 3.5076, 206496: 5.117}), label=1.0)
#Row(features=SparseVector(262144, {138356: 2.3238, 138642: 5.5225}), label=1.0)
######################################################################################################
# Binarizer demo on a toy DataFrame.
# sqlContext is used instead of `spark`: the notebook creates sc/sqlContext itself, while a
# `spark` session variable only exists in environments that predefine it.
df = sqlContext.createDataFrame([(0.0, 2.0), (1.0, 3.0), (2.0, 4.0)], ["column1", "column2"])
#+-------+-------+
#|column1|column2|
#+-------+-------+
#|    0.0|    2.0|
#|    1.0|    3.0|
#|    2.0|    4.0|
#+-------+-------+
# Values strictly greater than the threshold become 1.0, all other values become 0.0
bi=Binarizer(threshold=0.0, inputCol="column1", outputCol="features")
bi.transform(df).show()
# (the result table is kept as a comment; as bare text it was a syntax error in this cell)
#+-------+-------+--------+
#|column1|column2|features|
#+-------+-------+--------+
#|    0.0|    2.0|     0.0|
#|    1.0|    3.0|     1.0|
#|    2.0|    4.0|     1.0|
#+-------+-------+--------+

### StandardScaler and PCA for numerical features

In [None]:
# Plot Cumulative explained variance Vs. Number of PCA components for numerical features
# The scikit-learn classes are imported under aliases: scikit-learn and pyspark.ml.feature both
# export `PCA` and `StandardScaler`, and importing them under the same name makes later code
# depend on which import happened to run last.
from sklearn.decomposition import PCA as SklearnPCA
from sklearn.preprocessing import StandardScaler as SklearnStandardScaler
float_columns = [feat for feat in df.columns if 'cont' in feat]
data=df.select(float_columns).toPandas()
# z-score every feature, then fit a full PCA to inspect the variance captured per component
z_scaler = SklearnStandardScaler()
z_data = z_scaler.fit_transform(data)
pca_trafo = SklearnPCA().fit(z_data)

fig, ax = plt.subplots(nrows=1,ncols=1,figsize=(12,5), facecolor='white')
ax.plot(pca_trafo.explained_variance_ratio_.cumsum(),'--o',linewidth=3)
# reference line at 99% explained variance, spanning as many components as there are features
ax.hlines(y=0.99, xmin=0, xmax=len(float_columns), lw=2,color='r')
ax.set_xlabel('Number of PCA components')
ax.set_ylabel('Cumulative explained variance')
ax.grid(color='black', linestyle='--', linewidth=1)
display(fig)
########################################################################
# Create a vector column of numerical variables as the input of a standard scaler:
# combine all the continuous features into a single vector column
vecAssembler = VectorAssembler(inputCols=float_columns, outputCol="numerical_features")

# normalizes each feature in "numerical_features" to have unit standard deviation and zero mean
from pyspark.ml.feature import StandardScaler
scaler = StandardScaler(inputCol="numerical_features", outputCol="numerical_scaledfeatures",withStd=True, withMean=True)

# The scaled features are fed into PCA.
# The new features obtained from PCA are stored in "pcaFeatures_numerical".
from pyspark.ml.feature import PCA
pca = PCA(k=9, inputCol="numerical_scaledfeatures", outputCol="pcaFeatures_numerical")

# Each stage could also be applied one at a time (vecAssembler.transform(df),
# scaler.fit(df).transform(df), pca.fit(df).transform(df)); the Pipeline below chains
# VectorAssembler, StandardScaler and PCA into a single fit/transform.
pipeline = Pipeline(stages=[vecAssembler,scaler,pca])
model = pipeline.fit(df)
df = model.transform(df)
df.select('pcaFeatures_numerical').take(2)
#[Row(pcaFeatures_numerical=DenseVector([-2.5706, 1.9145, -1.7275, 0.5826, 0.5813, -0.4001, -0.9643, -1.0756, -1.3186])),
# Row(pcaFeatures_numerical=DenseVector([0.875, 0.4521, 2.2523, -0.7282, -0.0405, -0.6469, 0.9521, -0.3714, 0.2652]))]


### label encoding and PCA for categorical features 

In [None]:
######################### 2-label categorical variables ####################################
# The first N_TWO_LABEL categorical columns ('cat1' ~ 'cat72') have two labels each; the rest are multi-label.
N_TWO_LABEL = 72
categorical_columns = [feat for feat in df.columns if 'cat' in feat]
data=df.select(categorical_columns).toPandas()
for f in categorical_columns:
    # Single-argument print(...) behaves identically on Python 2 and Python 3.
    print('{} unique number: {}'.format(f, data[f].nunique()))

#  Transform the labels A and B into 0 and 1 numbers for all 2-labels categorical variables ('cat1' ~ 'cat72')
for f in categorical_columns[:N_TWO_LABEL]:
    stringIndexer = StringIndexer(inputCol=f, outputCol=f+"_Index")
    model = stringIndexer.fit(df)
    df = model.transform(df)
# A real list (not map()) because the names are used more than once below; on Python 3 a map
# object would be empty on its second use.
encoded_2labels_features=[c + "_Index" for c in categorical_columns[:N_TWO_LABEL]]
#~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# Apply PCA using Python Sklearn to find suitable PCA n_components.
# Imported under an alias so it cannot be confused with pyspark.ml.feature.PCA used further down.
from sklearn.decomposition import PCA as SklearnPCA
# The '_Index' columns were just added to `df`, so they are read from `df`
# (selecting them from `train_test` fails: that DataFrame never received them).
data=df.select(encoded_2labels_features).toPandas()
pca_trafo = SklearnPCA().fit(data)
# Plot on a fresh figure instead of the `ax` left over from an earlier cell
fig, ax = plt.subplots(nrows=1,ncols=1,figsize=(12,5), facecolor='white')
ax.plot(pca_trafo.explained_variance_ratio_.cumsum(),'--o',linewidth=3)
ax.set_xlabel('Number of PCA components')
ax.set_ylabel('Cumulative explained variance')
display(fig)
#~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# Create a vector column of encoded 2-labels categorical variables as the input of PCA
vecAssembler_2labels = VectorAssembler(inputCols=encoded_2labels_features, outputCol="vec_encoded_2labels_features")

# Apply PCA
from pyspark.ml.feature import PCA
pca_2labels = PCA(k=45, inputCol="vec_encoded_2labels_features", outputCol="pcaFeatures_2labels")

#  Wrap VectorAssembler, and PCA using pipelines
pipeline = Pipeline(stages=[vecAssembler_2labels,pca_2labels])
model = pipeline.fit(df)
df = model.transform(df)
df.select('pcaFeatures_2labels').take(2)

######################### multi-label categorical variables ####################################
data=df.select(categorical_columns[N_TWO_LABEL:]).toPandas()
from scipy.sparse import csr_matrix, hstack
# Use one-hot encoding to transform their categorical labels into binary vectors for multi-label
# categorical variables ('cat73' ~ 'cat116')
one_hot_blocks = []
for f in categorical_columns[N_TWO_LABEL:]:
    dummy = pd.get_dummies(data[f].astype('category'))
    dummy = csr_matrix(dummy)
    one_hot_blocks.append(dummy)
# Stack the per-column indicator blocks side by side into one dense matrix
one_hot_features=hstack(one_hot_blocks, format = 'csr').toarray()
#~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# Apply PCA using Python Sklearn to find suitable PCA n_components
pca_trafo = SklearnPCA().fit(one_hot_features)
# Plot Cumulative explained variance Vs. Number of PCA components for multi-labels categorical variable
fig, ax = plt.subplots(nrows=1,ncols=1,figsize=(12,5), facecolor='white')
ax.plot(pca_trafo.explained_variance_ratio_.cumsum(),'--o',linewidth=3)
ax.set_xlabel('Number of PCA components')
ax.set_ylabel('Cumulative explained variance')
display(fig)
#~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
for f in categorical_columns[N_TWO_LABEL:]:
    # Encode the string labels into numerical labels
    stringIndexer = StringIndexer(inputCol=f, outputCol=f+"_Index")
    model = stringIndexer.fit(df)
    df = model.transform(df)
    # Maps a column of numerical labels to a column of binary vectors.
    # NOTE(review): OneHotEncoder is used as a transformer here (transform() without fit());
    # confirm the Spark version in use supports that.
    encoder = OneHotEncoder(dropLast=False, inputCol=f+"_Index", outputCol=f+"_onehot")
    df = encoder.transform(df)
# Assemble all the encoded vector columns of the multi-label categorical variables into a single vector column
onehot_features=[c + "_onehot" for c in categorical_columns[N_TWO_LABEL:]]
vecAssembler_onehot = VectorAssembler(inputCols=onehot_features, outputCol="vec_onehot_features")
df=vecAssembler_onehot.transform(df)
# Apply PCA
pca_multilabels = PCA(k=300, inputCol="vec_onehot_features", outputCol="pcaFeatures_multilabels")
model = pca_multilabels.fit(df)
df = model.transform(df)
df.select('pcaFeatures_multilabels').take(1)

### Create feature columns

In [None]:
# The features created by PCA on the continuous input variables, the 2-label and the multi-label
# categorical variables are combined into one vector column named "features" by VectorAssembler.
vecAssembler_total = VectorAssembler(inputCols=['pcaFeatures_numerical','pcaFeatures_2labels','pcaFeatures_multilabels'],
                                     outputCol="features")
df=vecAssembler_total.transform(df)
# Length of the assembled vector, measured on the first row (take(1) returns a one-element list).
# Single-argument print(...) behaves identically on Python 2 and Python 3.
print('The feature number in "features" column of train_test DataFrame = {}'.format(
    df.select('features').rdd.map(lambda raw : len(raw[0])).take(1)))
# A DataFrame is not callable, so df('features') raises TypeError; select() is the way to pick a column.
df.select('features').take(1)

### Split, cache data

In [None]:
# --- Recipe A: random train/test split -----------------------------------
# Split data approximately into training (80%) and test (20%); the seed makes the split repeatable
(train, test)=df.randomSplit([0.8,0.2], seed = 0)
# Cache the train and test data in-memory because they will be used more than once during model building.
train = train.cache()
test = test.cache()
# Single-argument print(...) behaves identically on Python 2 and Python 3.
print('Sample number in the train set : {}'.format(train.count()))
print('Sample number in the test set : {}'.format(test.count()))
# Class balance of the training set
train.groupby('label').count().toPandas()
##################################################################
# --- Recipe B: combined train_test DataFrame -----------------------------
# Split the "train_test" dataframe into the 'X' dataframe (the training set, rows with loss != 0) and
# the "X_test" dataframe (the test set, rows with loss == 0).
# NOTE(review): this expects `train_test` to already carry the 'features' and 'label' columns -
# confirm that the feature pipeline was run on `train_test`.
X=train_test.filter(train_test.loss!=0).select('features', 'label')
X_test=train_test.filter(train_test.loss==0).select('features')
print('Sample number in X = {}'.format(X.count()))
print('Sample number in X_test = {}'.format(X_test.count()))
# Split the 'X' dataframe into the 'train' (80%) and 'validation' (20%) sets.
# This re-binds `train` from Recipe A.
(train, validation)=X.randomSplit([0.8,0.2], seed = 0)
# Cache the train and validation data in-memory
train = train.cache()
validation = validation.cache()
print('Sample number in the "train" set : {}'.format(train.count()))
print('Sample number in the "validation" set : {}'.format(validation.count()))

In [None]:
# Keep the DataFrame in memory for reuse.
# Two alternative calls are shown; both return the DataFrame, so the result can be re-assigned.
df = df.cache()
df = df.persist() 
# Remove this DataFrame from the cache and release the memory
df.unpersist() 
# Remove all cached tables from the in-memory cache
sqlContext.clearCache()

### grid search model parameters

In [None]:
########################### classification ##################################
def grid_search(p1,p2,p3,p4):
    """Cross-validate one logistic-regression parameter combination on the text pipeline.

    Relies on the module-level `labelIndexer`, `tokenizer`, `remover`, `hashingTF`, `idf`
    and `train` objects created in earlier cells.

    Args:
        p1: value for `hashingTF.numFeatures`.
        p2: value for `lr.regParam`.
        p3: value for `lr.elasticNetParam`.
        p4: value for `lr.maxIter`.

    Returns:
        The average 4-fold cross-validation accuracy for this combination.
    """
    # Train on the same label column that the evaluator scores ("indexedLabel"); otherwise the
    # model predicts in the raw "label" space while accuracy is measured against the indexed one.
    lr = LogisticRegression(labelCol="indexedLabel")
    # The unfitted `idf` estimator is used rather than the pre-fitted `idfModel`, so the IDF
    # weights are re-learnt inside every fold (no leakage from held-out rows) and match the
    # `numFeatures` value being tried.
    pipeline = Pipeline(stages=[labelIndexer,tokenizer, remover, hashingTF, idf, lr])

    #Create ParamGrid for Cross Validation (a single point: one value per parameter)
    paramGrid = (ParamGridBuilder()
                 .addGrid(hashingTF.numFeatures, [p1])
                 .addGrid(lr.regParam, [p2])
                 .addGrid(lr.elasticNetParam, [p3])
                 .addGrid(lr.maxIter, [p4])
                 .build())
    evaluator = MulticlassClassificationEvaluator(labelCol="indexedLabel", predictionCol="prediction", metricName="accuracy")
    crossval = CrossValidator(estimator=pipeline,
                              estimatorParamMaps=paramGrid,
                              evaluator=evaluator,
                              numFolds=4)

    ########  Run cross-validation, and choose the best set of parameters.
    cvModel = crossval.fit(train)
    # avgMetrics holds one averaged metric per parameter map; the grid has exactly one entry
    average_score = cvModel.avgMetrics
    # Single-argument print(...) behaves identically on Python 2 and Python 3.
    print('average cross-validation accuracy = {}'.format(average_score[0]))
    return average_score[0]

########################### regression ##################################
def cross_valid(estimator, paramGrid, train, validation, shift=200):
    """Cross-validate a regression estimator and evaluate it on a validation set.

    Args:
        estimator: the regression estimator (or pipeline) to tune.
        paramGrid: list of parameter maps, e.g. built with ParamGridBuilder.
        train: DataFrame with "features" and "label" columns used for cross-validation.
        validation: DataFrame with "features" and "label" columns used for the final evaluation.
        shift: offset that was added to the loss before taking the log to build "label";
            it is subtracted again after exponentiating. Defaults to 200.

    Returns:
        A tuple (average cross-validation MAE of the first parameter map, prediction DataFrame).
        The prediction DataFrame carries the extra columns "loss" and "loss_prediction"
        (label and prediction transformed back with exp(x) - shift).
    """
    evaluator = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="mae")
    #"rmse" (default): root mean squared error - "mse": mean squared error - "r2": R^2^ metric - "mae": mean absolute error
    crossval = CrossValidator(estimator=estimator,
                              estimatorParamMaps=paramGrid,
                              evaluator=evaluator,
                              numFolds=4)
    ########  Run cross-validation, and choose the best set of parameters.
    cvModel = crossval.fit(train)
    average_score = cvModel.avgMetrics
    # Single-argument print(...) behaves identically on Python 2 and Python 3.
    print('average cross-validation MAE = {}'.format(average_score[0]))
    ########  Make predictions on the validation data
    prediction = cvModel.transform(validation)
    # Since the label is log(loss + shift), transform label and prediction back to the loss scale.
    # The chain is wrapped in parentheses: a line ending in "." without a continuation is a syntax error.
    prediction = (prediction
                  .withColumn("loss", exp(prediction.label) - shift)
                  .withColumn("loss_prediction", exp(prediction.prediction) - shift))
    ########  Model evaluation
    # RegressionMetrics expects (prediction, observation) pairs, in that order.
    scores=RegressionMetrics(prediction.select("prediction", "label").rdd.map(tuple))
    print('Mean absolute error in the validation data = {}'.format(scores.meanAbsoluteError))
    scores=RegressionMetrics(prediction.select("loss_prediction", "loss").rdd.map(tuple))
    print('Mean absolute error in the validation data after inverse log transformation = {}'.format(scores.meanAbsoluteError))
    return average_score[0], prediction
# Named grid_search_regression so that it no longer redefines (and silently replaces) the
# classification grid_search() defined earlier in this cell, which is the function the
# parameter loop below is written for.
def grid_search_regression(p1,p2,p3,p4):
    """Cross-validate one linear-regression parameter combination.

    Relies on the module-level `train` and `validation` DataFrames and on cross_valid().

    Args:
        p1: value for `lr.regParam`.
        p2: value for `lr.elasticNetParam`.
        p3: value for `lr.maxIter`.
        p4: value for `lr.fitIntercept` (False or True).

    Returns:
        The average cross-validation MAE reported by cross_valid() for this combination.
    """
    lr = LinearRegression()
    # Create ParamGrid for Cross Validation (a single point: one value per parameter)
    paramGrid = (ParamGridBuilder()
                 .addGrid(lr.regParam, [p1])#(default: 0.0)
                 .addGrid(lr.elasticNetParam, [p2])
                 .addGrid(lr.maxIter, [p3]) #(default: 100)
                 .addGrid(lr.fitIntercept, [p4])#False, True
                 .build())
    average_score, _ = cross_valid(lr, paramGrid, train, validation)
    return average_score
##################################loop over all parameter combinations ####################
import itertools

# Best accuracy seen so far and the parameter tuple that produced it.
score=0.0
params=None  # stays None when no combination scores above the initial 0.0
# itertools.product visits the combinations in the same order as four nested for-loops.
for p1, p2, p3, p4 in itertools.product([45000,50000,55000],
                                        [0.09,0.10,0.11],
                                        [0.09,0.10,0.11],
                                        [9,10,11]):
    t0 = time()
    # Single-argument print(...) behaves identically on Python 2 and Python 3.
    print('(numFeatures,regParam,elasticNetParam,maxIter)=({},{},{},{})'.format(p1,p2,p3,p4))
    average_score=grid_search(p1,p2,p3,p4)
    tt = time() - t0
    print("Classifier trained in {} seconds".format(round(tt,3)))
    # Higher is better here because the score is an accuracy
    if average_score > score:
        print('################ Best score ######################')
        params=(p1,p2,p3,p4)
        score=average_score
print('Best score is {} at params ={}'.format(score, params))

### Data modeling

In [None]:
##################### for classification #######################
# trained by a logistic regression
# Train on the same label column that the evaluator scores ("indexedLabel"); otherwise the
# model predicts in the raw "label" space while accuracy is measured against the indexed one.
lr = LogisticRegression(labelCol="indexedLabel")
# Single-argument print(...) behaves identically on Python 2 and Python 3.
print(lr.explainParams())
# Build a pipeline.
# The unfitted `idf` estimator is used rather than the pre-fitted `idfModel`, so the IDF weights
# are re-learnt inside every fold (no leakage from held-out rows) and match the `numFeatures`
# value set through the parameter grid.
pipeline = Pipeline(stages=[labelIndexer,tokenizer, remover, hashingTF, idf, lr])

# Create ParamGrid for Cross Validation
paramGrid = (ParamGridBuilder()
             .addGrid(hashingTF.numFeatures, [50000])
             .addGrid(lr.regParam, [0.10])
             .addGrid(lr.elasticNetParam, [0.10])
             .addGrid(lr.maxIter, [10])
             .build())

evaluator = MulticlassClassificationEvaluator(labelCol="indexedLabel", predictionCol="prediction", metricName="accuracy")
crossval = CrossValidator(estimator=pipeline,
                          estimatorParamMaps=paramGrid,
                          evaluator=evaluator,#BinaryClassificationEvaluator(), the default metric is areaUnderROC
                          numFolds=4)

########  Run cross-validation, and choose the best set of parameters.
cvModel = crossval.fit(train)
# avgMetrics holds one averaged metric per parameter map; the grid has exactly one entry
average_score = cvModel.avgMetrics
print('best average cross-validation accuracy = {}'.format(average_score[0]))

########  Make predictions on the test data
prediction = cvModel.transform(test)
# The prediction DataFrame is created with the new output columns
# "rawPrediction", "probability", and "prediction".
# "rawPrediction" holds a raw score for each possible label. It is a measure of confidence
# in each label (a larger value indicates more confidence), not a probability.
# "probability" holds the estimated probability of each class, derived from the raw prediction.
# "prediction" gives the predicted label for each data instance.
prediction.printSchema()
#root
# |-- label: double (nullable = true)
# |-- review: string (nullable = true)
# |-- indexedLabel: double (nullable = true)
# |-- words: array (nullable = true)
# |    |-- element: string (containsNull = true)
# |-- filtered: array (nullable = true)
# |    |-- element: string (containsNull = true)
# |-- rawFeatures: vector (nullable = true)
# |-- features: vector (nullable = true)
# |-- rawPrediction: vector (nullable = true)
# |-- probability: vector (nullable = true)
# |-- prediction: double (nullable = true)

In [None]:
##################### for classification #######################
# Catalogue of classifier / parameter-grid templates. Every section below
# rebinds the same `paramGrid` name, so running the whole cell leaves only the
# last grid (Gradient Boosted Tree) in `paramGrid`; run or copy just the
# section for the estimator you want to cross-validate.
# trained by a logistic regression 
lr = LogisticRegression()
paramGrid = (ParamGridBuilder()
             .addGrid(lr.regParam, [0.10]) 
             .addGrid(lr.elasticNetParam, [0.10])
             .addGrid(lr.maxIter, [10])
             .build())
#regParam (regularization parameter), 
#elasticNetParam (the ElasticNet mixing parameter), 
#maxIter (max number of iterations).

##############################################
# trained by a Naive Bayes
nb = NaiveBayes()
# Create ParamGrid for Cross Validation 
paramGrid = (ParamGridBuilder()
             .addGrid(nb.smoothing, [1.0])
             .build())
#smoothing (smoothing parameter) 0 ~ 1 

##############################################
# trained by a Decision Tree 
dt = DecisionTreeClassifier(labelCol="indexedLabel",impurity="entropy")
# Create ParamGrid for Cross Validation
paramGrid = (ParamGridBuilder()
             .addGrid(dt.maxDepth, [25])
             .addGrid(dt.minInstancesPerNode, [4])
             .build())
#maxDepth (maximum depth of the tree), 
#minInstancesPerNode (minimum number of instances each child must have after split for a Decision Tree classifier). 

##############################################
# trained by a Random Forest (fixed seed)
rf = RandomForestClassifier(labelCol="indexedLabel",impurity="entropy", seed=5043)
# Create ParamGrid for Cross Validation
paramGrid = (ParamGridBuilder()
             .addGrid(rf.numTrees, [31])
             .addGrid(rf.maxDepth, [29])
             .addGrid(rf.minInstancesPerNode, [1])
             .build())
#numTrees (number of trees to train), 
#maxDepth (maximum depth of the tree), 
#minInstancesPerNode (minimum number of instances each child must have after split for a Random Forest classifier). 

##############################################
# trained by a Gradient Boosted Tree 
gbt = GBTClassifier(labelCol="indexedLabel")
# Create ParamGrid for Cross Validation
paramGrid = (ParamGridBuilder()
             .addGrid(gbt.maxIter, [25]) #(default: 20)
             .addGrid(gbt.maxDepth, [19])
             .addGrid(gbt.minInstancesPerNode, [2])
             .build())
#maxIter (max number of iterations), 
#maxDepth (maximum depth of the tree for a Gradient Boosted Tree classifier), 
#minInstancesPerNode (minimum number of instances each child must have after split for a Gradient Boosted Tree classifier). 

##############################################

In [None]:
##################### for regression #######################
# Cross-validate a linear regression on `train` with a single-combination
# grid; cross_valid() is defined in another cell and returns two values,
# unpacked here as (average_score, prediction).
lr = LinearRegression()
#lr.explainParams()
paramGrid = (ParamGridBuilder()
             .addGrid(lr.regParam, [0.0001])#(default: 0.0)
             .addGrid(lr.elasticNetParam, [1.0]) 
             .addGrid(lr.maxIter, [1000]) #(default: 100)
             .build())
average_score, prediction = cross_valid(lr, paramGrid, train, validation)

In [None]:
##################### for regression #######################
# Catalogue of regressor / parameter-grid templates. Each section rebinds
# `paramGrid`, so after running the whole cell only the last grid (GBT)
# remains; run or copy just the section you need.
# trained by a linear regression
lr = LinearRegression()
paramGrid = (ParamGridBuilder()
             .addGrid(lr.regParam, [0.0001])#(default: 0.0)
             .addGrid(lr.elasticNetParam, [1.0]) 
             .addGrid(lr.maxIter, [1000]) #(default: 100)
             .build())
#regParam (regularization parameter), 
#elasticNetParam (ElasticNet mixing parameter in range [0, 1]. For alpha = 0, the penalty is an L2 penalty. 
#For alpha = 1, it is an L1 penalty.),
#maxIter (max number of iterations).
##############################################
# trained by a Random Forest (fixed seed)
rf = RandomForestRegressor(seed=5043)
paramGrid = (ParamGridBuilder()
             .addGrid(rf.numTrees, [2])
             .addGrid(rf.maxDepth, [2])
             .addGrid(rf.minInstancesPerNode, [2]).build())
#numTrees (number of trees to train), 
#maxDepth (maximum depth of a tree), 
#minInstancesPerNode (minimum number of instances each child must have after split). 
##############################################
# trained by a Gradient Boosted Tree
gbt = GBTRegressor()
# Create ParamGrid for Cross Validation
paramGrid = (ParamGridBuilder()
             .addGrid(gbt.maxIter, [2]) 
             .addGrid(gbt.maxDepth, [2])
             .addGrid(gbt.minInstancesPerNode, [1])
             .build())
#maxIter (max number of iterations), 
#maxDepth (maximum depth of a tree), 
#minInstancesPerNode (minimum number of instances each child must have after split). 

In [None]:
##################### for clustering #######################
# Fit a 2-cluster k-means model (fixed seed) and inspect its cluster centers.
kmeans = KMeans(k=2,seed=1)
model = kmeans.fit(train) # train DataFrame has "features" column
centers = model.clusterCenters()
len(centers) #2 # cluster center number
# Assign the test rows to clusters with the k-means model fitted above; the
# classification `cvModel` from another cell has nothing to do with clustering.
prediction = model.transform(test)

# Evaluate clustering by computing Within Set Sum of Squared Errors.
# NOTE(review): the model is fitted on `train` but scored on `train_new`,
# which is not defined in this cell -- confirm which DataFrame is intended.
wssse = model.computeCost(train_new)
print("Within Set Sum of Squared Errors = " + str(wssse))

### Metrics

In [None]:
##################### for classification #######################
######## Calculate accuracy of the prediction of the test data
evaluator = MulticlassClassificationEvaluator(labelCol="indexedLabel", predictionCol="prediction", metricName="accuracy")
accuracy_score=evaluator.evaluate(prediction)
# another way to calculate accuracy 
correct=prediction.filter(prediction['label']== prediction['prediction']).select("label","prediction")
accuracy_score = correct.count() / float(test.count())  
print 'Accuracy in the test data = {}'.format(accuracy_score)
   
######## calculate F1 score of the prediction of the test data
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="f1")
f1_score=evaluator.evaluate(prediction)
print 'F1 score in the test data = {}'.format(f1_score) 

######## Calculate area under ROC for the prediction of the test data
evaluator = BinaryClassificationEvaluator(labelCol="label", metricName="areaUnderROC")
ROC_score=evaluator.evaluate(prediction)
print 'areaUnderROC in the test data = {}'.format(ROC_score)
#print "areaUnderROC in the train data = %g" % evaluator.evaluate(model.transform(train))  
#print "areaUnderROC in the test data = %g" % evaluator.evaluate(model.transform(test))

######## Print classification_report
prediction_and_labels=prediction.select("label","prediction")
prediction_and_labels.collect()
#[Row(label=0.0, prediction=0.0),
# Row(label=0.0, prediction=0.0),....]

y_true = []
y_pred = []
for x in prediction_and_labels.collect():
    #ex:x is Row(label=0.0, prediction=0.0), xx=[0.0, 0.0]
    xx = list(x)
    try:
        tt = int(xx[1])
        pp = int(xx[0])
        y_true.append(tt)
        y_pred.append(pp)
    except:
        continue

target_names = ['neg 0', 'pos 1']
from sklearn.metrics import classification_report
print classification_report(y_true, y_pred, target_names=target_names)

In [None]:
##################### for regression #######################
# RegressionMetrics expects an RDD of (prediction, observation) pairs, so the
# prediction column must come first. Mean absolute error is symmetric in the
# two columns, but order-sensitive metrics such as r2 or explainedVariance
# would be wrong with the columns swapped.
scores=RegressionMetrics(prediction.select("prediction", "label").rdd)
print 'Mean absolute error in the validation data = {}'.format(scores.meanAbsoluteError)

### Save the model

In [None]:
# Persist the cross-validated model to the path "model", replacing any existing
# copy. In PySpark `write` and `overwrite` are methods and must be called;
# plain attribute access raises AttributeError.
# NOTE(review): saving a CrossValidatorModel from Python needs a Spark version
# that supports it -- confirm against the cluster's Spark version.
cvModel.write().overwrite().save("model")

### Plot

In [None]:
import matplotlib.pyplot as plt
# Horizontal bar chart of the recorded training time for each classifier.
classifier_names=['Logistic_Regression', 'Naive_Bayes', 'Decision_Tree', 'Random_Forest', 'Gradient_Boosted_Tree']
# Named `training_time` rather than `time`: the notebook imports the time()
# function at the top and the grid-search cell calls it, so rebinding `time`
# to a list would break those cells when they are re-run.
training_time=[6.41,2.893,141,133,3179]
accuracy=[0.8385,0.8177,0.78125,0.8125,0.8229]
fig, ax = plt.subplots(nrows=1,ncols=1,figsize=(5,5), facecolor='white')
# Center each bar on its position and put the tick at the same position, so
# labels line up with bars whatever the matplotlib default alignment is.
y_pos = np.arange(len(classifier_names))
ax.barh(y_pos, training_time, align='center')
ax.set_yticks(y_pos)
ax.set_yticklabels(classifier_names)
ax.grid(color='b', linestyle='--', linewidth=1)
ax.set_title('Model training time')
ax.set_xlabel('Time (sec)')
#ax.set_xscale('log')
display(fig)
##########################################################################
# Side-by-side histograms of the raw `loss` column and of log(loss); each
# panel title reports the skewness of the plotted values.
df_loss = df.select('loss').toPandas()
fig, ax = plt.subplots(nrows=1, ncols=2, figsize=(10, 5))

# Left panel: raw loss, x-axis clipped to 0..20000.
F = sns.distplot(df_loss.loss.values, kde=False, bins=1000, ax=ax[0])
F.set_title('Loss distribution. Skewness={:.2f}'.format(df_loss.loss.skew()))
F.set_xlim((0, 20000))

# Right panel: natural log of the loss, x-axis clipped to 4..12.
F1 = sns.distplot(np.log(df_loss.loss.values), kde=False, bins=100, ax=ax[1])
F1.set_title('log(loss) distribution. Skewness={:.2f}'.format(np.log(df_loss.loss).skew()))
F1.set_xlim((4, 12))

display(fig)
##########################################################################
## Heatmap of absolute Pearson correlations between the target ('label') and
## the continuous input variables (columns whose name contains 'cont').
float_columns = [feat for feat in df.columns if 'cont' in feat]
data = df.select(float_columns + ['label']).toPandas()

# Absolute correlation matrix over all selected columns.
df_corr = data.corr(method='pearson', min_periods=100).abs()
f, ax = plt.subplots(figsize=(10, 10))

# Keep the k columns ranked highest by |correlation with 'label'|.
# 'label' is one of the ranked columns, so presumably it takes one of the k slots.
k = 14
cols = df_corr.nlargest(k, 'label')['label'].index

# Recompute the absolute correlations on the selected columns only.
cm = data[cols].corr(method='pearson', min_periods=100).abs()

sns.set(font_scale=1.25)
hm = sns.heatmap(
    cm,
    cbar=True,
    annot=True,
    square=True,
    fmt='.2f',
    annot_kws={'size': 8},
    yticklabels=cols.values,
    xticklabels=cols.values,
)
display(f)