In [None]:
from pyspark import SparkContext
from pyspark.mllib.classification import LogisticRegressionWithLBFGS,SVMWithSGD,LogisticRegressionWithSGD, NaiveBayes
from pyspark.mllib.regression import LabeledPoint,LinearRegressionWithSGD,RidgeRegressionWithSGD,LassoWithSGD
from pyspark.mllib.tree import DecisionTree, RandomForest, GradientBoostedTrees
from pyspark.mllib.feature import StandardScaler,ChiSqSelector,Normalizer,PCA
from pyspark.mllib.evaluation import RegressionMetrics, BinaryClassificationMetrics
from pyspark.mllib.stat import Statistics, MultivariateStatisticalSummary
from pyspark.mllib.util import MLUtils
import time

def importRawData(sc, filePath):
    """
    :param sc: Spark Context
    :param filePath: path to data .csv file
    :return: RDD of (LabeledPoint, index)
    """
    rdd = sc.textFile(filePath)
    # index was keep for this dataset, we need index to perform partition
    # spark DOES NOT gurantee tranformation and action execute order for RDD[Vector]
    return rdd.map(lambda line: line.split(",")) \
                .map(lambda array: [float(n) for n in array]) \
                .zipWithIndex()

def exploreData(rawData, corrThreshold = 0.05, colinearThreshold = 0.8):
    """
    this function is use to print out some basic statistic of data
    :param rawData: RDD[Vector] of raw data
    :param corrThreshold: correlation threshold for print
    :param colinearThreshold: colinear threshold for print
    :return: local matrix of correliation matrix, corr[0] as label, rest are features
    """
    print "=============================================="
    print "EXPLORE DATA"
    print "=============================================="
    matrix = rawData.map(lambda t:t[0])
    trait = Statistics.colStats(matrix)
    print "Features:"
    print "Format:\t\t[% 10s\t% 12s\t% 10s\t% 12s\t% 10s]" % ("mean","variance","numNonzeors","max","min")
    for i in range(1, len(trait.mean())):
        print "Feature %s:\t[ % 10.2f\t% 12.2f\t% 10d\t% 12.2f\t% 10.2f ] " % (i+1, trait.mean()[i], trait.variance()[i],\
                                                    trait.numNonzeros()[i], trait.max()[i], trait.min()[i])
    print "Label:"
    print "Label :\t[ % 10.2f\t% 12.2f\t% 10d\t% 12.2f\t% 10.2f ] " % (trait.mean()[0], trait.variance()[0], \
                                            trait.numNonzeros()[0], trait.max()[0], trait.min()[0])
    corr = Statistics.corr(matrix, method="pearson")
    print "Correlation between Features and Label (only show corr > %s): " % corrThreshold
    labelCorr = zip([row[0] for row in corr[1:] if abs(row[0])>=corrThreshold], range(1, len(corr)))
    labelCorr.sort(key=lambda t:abs(t[0]), reverse=True)
    for corrScore, i in labelCorr:
        print "Feature %s: % 6.4f" % (i, corrScore)
    print "Correlation between Features (only show colinear > %s): " % colinearThreshold
    for i, row in enumerate(corr):
        for j, r in enumerate(row):
            if (i!=0 and j!=0 and i<j and abs(r)>colinearThreshold):
                print "Feature %s and Feature %s have r=%.4f" % (i, j, r)
    return corr

def featureEngineering(rawData, corrSelection = None, zNorm = True, l2Norm = True, categorical = False, topFeature = 10):
    """
    this function is to provide feature engineering and transformation
    including partition, normalization, feature selection, dimension reduction
    :param rawData: RDD[Vector] of raw data
    :param corrSelection: local matrix of correliation matrix, if provided will perform CFS
    :param zNorm: boolean of whether to perform Z normalization or not
    :param l2Norm: boolean of whether to perform L2 normalization or not
    :param categorical: boolean of whether to perform chi square feature selection or not
    :param topFeature: select top features if using chi square selection
    :return: tuple of (RDD[LabeledPoint] trainingData, RDD[LabeledPoint] validationData)
    """
    print "=============================================="
    print "FEATURE ENGINEERING"
    print "=============================================="
    print "partitioning..."
    # beware this partitioning is HARD CODED!
    # because it was suggested by data set creator
    # first 463715 used as trainning, last 51630 used as validation
    tFeatures = rawData.filter(lambda a:a[1]<463715).map(lambda a:a[0][1:])
    vFeatures = rawData.filter(lambda a:a[1]>=463715).map(lambda a:a[0][1:])
    tLabel = rawData.filter(lambda a:a[1]<463715).map(lambda a:a[0][0])
    vLabel = rawData.filter(lambda a:a[1]>=463715).map(lambda a:a[0][0])
    print "Normalization and Scaling... "
    if(zNorm):
        zN = StandardScaler(withMean=True, withStd=True).fit(tFeatures)
        tFeatures = zN.transform(tFeatures)
        vFeatures = zN.transform(vFeatures)
    if(l2Norm):
        l2N = Normalizer()
        tFeatures = l2N.transform(tFeatures)
        vFeatures = l2N.transform(vFeatures)
    print "Feature Selection... "
    # only categorical value for classification problem could use chi square selector
    # otherwise, use correlation based selector instead
    if categorical or (corrSelection is None):
        selector = ChiSqSelector(topFeature).fit(tFeatures)
        tFeatures = selector.transform(tFeatures)
        vFeatures = selector.transform(vFeatures)
    else:
        bestFeatures = cfs(corrSelection)
        tFeatures = tFeatures.map(lambda a:[a[n-1] for n in bestFeatures])
        vFeatures = vFeatures.map(lambda a:[a[n-1] for n in bestFeatures])
    featureCount = len(tFeatures.first())
    print "Selected %s Features: " % featureCount
    print "Dimension Reduction..."
    # PCA dimension reduction = EVD of cov matrix = SVD of data matrix
    # spark use EVD implement, hence we need to calculate eigen vector ourself to decide cutoff
    # cutoff point the reduced dimension could represent 90% of original total variance
    selector = PCA(len(bestFeatures)).fit(tFeatures.map(lambda a: LabeledPoint(1,a).features))
    temp = selector.transform(tFeatures)
    eigen = eigenvalues(temp)
    print "PCA Eigen Vector: %s" % eigen
    accVarPor = [sum(eigen[:i+1])/sum(eigen) for i in range(len(eigen))]
    print "Accumulate Variance Porpotion:"
    reduction = 0
    for i in range(len(accVarPor)):
        print "%s Dimensions:  %.2f%%" % (i+1, accVarPor[i]*100)
        if(reduction==0 and accVarPor[i]>0.9): reduction = i+1
    print "Reduce to %s dimensions..." % reduction
    selector = PCA(reduction).fit(tFeatures.map(lambda a: LabeledPoint(1,a).features))
    return (tLabel.zip(tFeatures).map(lambda lp:LabeledPoint(lp[0], lp[1])).cache(), \
            vLabel.zip(vFeatures).map(lambda lp:LabeledPoint(lp[0], lp[1])).cache())

def cfs(corr):
    """
    helper function of correlation feature selection, implemented in greedy algorithm O(n^2logn)
    :param corr: local matrix of correliation matrix
    :return: local list of best k features, list element as feature index (1 based)
    """
    features = range(1,len(corr))
    bestK = []
    queue = []
    merit = -1
    sumL = 0
    sumF = 0
    print "Greedy Correlation Feature Selection (CFS)"
    # the stop condition of this algorithm is we reach maximum merit
    # i.e. merit start to decrease as we include more features
    # because of the increase correlation between features-to-features outweight featur-to-labels'
    # that's a bad thing to avoid, hence we stop there
    while(len(features)>0):
        for f in features:
            bestK.append(f)
            tempL = sumL+abs(corr[0][f])
            tempF = sumF
            for cur in bestK:
                if cur!=f:
                    tempF += abs(corr[cur][f])
            queue.append((tempL/((len(bestK)+1+2*tempF)**0.5), f, tempL, tempF))
            bestK.remove(f)
        queue.sort(key=lambda e:e[0], reverse=True)
        if(queue[0][0]<=merit):
            break
        merit = queue[0][0]
        bestK.append(queue[0][1])
        features.remove(queue[0][1])
        sumL = queue[0][2]
        sumF = queue[0][3]
        print "k=%s\tHighest Merit=%.4f\tSelected:%s" % (len(bestK),merit,bestK)
    return bestK

def eigenvalues(data):
    """
    helper function of calculate eigen vector of PCA, to determine how many dimension after reduction
    :param data: RDD[Vector] of data after PCA transformation
    :return: local list of eigen vector based on input data
    """
    # only eigen value is calculated, not the whole covariance because that would be too slow
    count = data.count()
    miu = data.reduce(lambda a,b:[a[i]+b[i] for i in range(len(a))])
    miu = map(lambda s:s/count, miu)
    eigen = data.map(lambda a:[(a[i]-miu[i])**2 for i in range(len(a))]) \
                .reduce(lambda a,b:[a[i]+b[i] for i in range(len(a))])
    eigen = map(lambda s:s/count, eigen)
    return eigen

def selectClassificationModel(sc, trainingData, validationData):
    """
    wrapper function to evaluate and select all the classification models
    :param sc: spark context
    :param trainingData: RDD[LabeledPoint] of training data
    :param validationData: RDD[LabeledPoint] of validation data
    :return: None
    """
    print "=============================================="
    print "CLASSIFICATION"
    print "=============================================="
    classificationModels = [
                                (SVMWithSGD, {"intercept":True, "regType":None}), 
                                (SVMWithSGD, {"intercept":True, "regType":"l1"}), 
                                (SVMWithSGD, {"intercept":True, "regType":"l2"}), 
                                (LogisticRegressionWithLBFGS, {"intercept":True, "regType":"l1"}), 
                                (LogisticRegressionWithLBFGS, {"intercept":True, "regType":"l2"}), 
                                (LogisticRegressionWithSGD, {"intercept":True, "regType":None}), 
                                (LogisticRegressionWithSGD, {"intercept":True, "regType":"l1"}), 
                                (LogisticRegressionWithSGD, {"intercept":True, "regType":"l2"})
                           ]
    for modelClass, kwargs in classificationModels:
        trainClassificationModel(sc, trainingData, validationData, modelClass, **kwargs)
    #GBT is waaaaaay too slow for this dataset
    classificationModels = [
                                (DecisionTree, {"numClasses":2,"categoricalFeaturesInfo":{},"minInstancesPerNode":100 ,"impurity":"gini"}), 
                                (DecisionTree, {"numClasses":2,"categoricalFeaturesInfo":{},"minInstancesPerNode":100 , "impurity":"entropy"}), 
                                (RandomForest, {"numClasses":2,"categoricalFeaturesInfo":{},"numTrees":20, "impurity":"gini"}), 
                                (RandomForest, {"numClasses":2,"categoricalFeaturesInfo":{},"numTrees":20, "impurity":"entropy"}) 
                           ]
    for modelClass, kwargs in classificationModels:
        trainClassificationTreeModel(sc, trainingData, validationData, modelClass, **kwargs)

def trainClassificationModel(sc, trainingData, validationData, modelClass, **kwargs):
    """
    train classification models for NOT-TREE based model
    :param sc: spark context
    :param trainingData: RDD[LabeledPoint] of training data
    :param validationData: RDD[LabeledPoint] of validation data
    :modelClass: model CLASS that use to train
    :kwargs: key-value paired arguments for modelClass, would be passes in directly
    :return: None
    """
    print "Classification Model: %s %s" % (modelClass.__name__, kwargs)
    startTime = time.time()
    trainingData = trainingData \
                    .map(lambda lp:LabeledPoint(1, lp.features) if lp.label>=2000 else LabeledPoint(0, lp.features))
    validationData = validationData \
                    .map(lambda lp:LabeledPoint(1, lp.features) if lp.label>=2000 else LabeledPoint(0, lp.features))
    model = modelClass.train(trainingData, **kwargs)
    model.clearThreshold()
    validationsResult = validationData.map(lambda lp:(float(model.predict(lp.features)), lp.label))
    metric = BinaryClassificationMetrics(validationsResult)
    # the error rate search is to search for overall best error rate
    # regardless of precision and recall, however they could be evaluate by PR area and ROC area
    errors = []
    for i in range(1, 11):
        err = validationsResult.filter(lambda (predict,label):(1 if predict>i/10.0 else 0)!=label).count() \
                                            / float(validationsResult.count())
        errors.append((err, i/10.0))
    errors.sort(key=lambda t:t[0])
    print "[ Error: %.4f\t\tPrecision-recall: %.4f\tROC: %.4f ] - %s sec" \
            % (errors[0][0], metric.areaUnderPR, metric.areaUnderROC, (time.time()-startTime))
        
def trainClassificationTreeModel(sc, trainingData, validationData, modelClass, **kwargs):
    """
    train classification models for TREE based model
    :param sc: spark context
    :param trainingData: RDD[LabeledPoint] of training data
    :param validationData: RDD[LabeledPoint] of validation data
    :modelClass: model CLASS that use to train
    :kwargs: key-value paired arguments for modelClass, would be passes in directly
    :return: None
    """
    print "Classification Model: %s %s" % (modelClass.__name__, kwargs)
    startTime = time.time()
    trainingData = trainingData \
                    .map(lambda lp:LabeledPoint(1, lp.features) if lp.label>=2000 else LabeledPoint(0, lp.features))
    validationData = validationData \
                    .map(lambda lp:LabeledPoint(1, lp.features) if lp.label>=2000 else LabeledPoint(0, lp.features))
    model = modelClass.trainClassifier(trainingData, **kwargs)
    validationFeatures = validationData.map(lambda lp:lp.features)
    # !!!beware, due to some stange bug, DO NOT chain RDD transformation on tree model predict, count() immediately!!!
    validationsResult = model.predict(validationFeatures)
    totalCount = validationsResult.count()
    validationsResult = validationsResult.zip(validationData.map(lambda lp:lp.label))
    errCount = validationsResult.filter(lambda (predict,label):predict!=label).count()
    validationsResult = validationsResult.zip(validationData.map(lambda lp:lp.label))
    err = float(errCount) / totalCount
    print "[ Error: %.4f ] - %s sec" % (err, (time.time()-startTime))
    
def selectRegressionModel(sc, trainingData, validationData):
    """
    wrapper function to evaluate and select all the regression models
    :param sc: spark context
    :param trainingData: RDD[LabeledPoint] of training data
    :param validationData: RDD[LabeledPoint] of validation data
    :return: None
    """
    print "=============================================="
    print "REGRESSION"
    print "=============================================="
    regressionModels = [
                            (LinearRegressionWithSGD, {"intercept":True, "regType":None}), 
                            (RidgeRegressionWithSGD, {"intercept":True}), 
                            (LassoWithSGD, {"intercept":True})
                       ]
    for modelClass, kwargs in regressionModels:
        trainRegressionModel(sc, trainingData, validationData, modelClass, **kwargs)
    regressionModels = [
                            (DecisionTree, {"categoricalFeaturesInfo":{},"minInstancesPerNode":100 ,"impurity":"variance"}), 
                            (RandomForest, {"categoricalFeaturesInfo":{},"numTrees":20, "impurity":"variance"}), 
                       ]
    for modelClass, kwargs in regressionModels:
        trainRegressionTreeModel(sc, trainingData, validationData, modelClass, **kwargs)
        
def trainRegressionModel(sc, trainingData, validationData, modelClass, **kwargs):
    """
    train regression models for NOT-TREE based model
    :param sc: spark context
    :param trainingData: RDD[LabeledPoint] of training data
    :param validationData: RDD[LabeledPoint] of validation data
    :modelClass: model CLASS that use to train
    :kwargs: key-value paired arguments for modelClass, would be passes in directly
    :return: None
    """
    print "Regression Model: %s %s" % (modelClass.__name__, kwargs)
    startTime = time.time()
    model = modelClass.train(trainingData, **kwargs)
    validationsResult = validationData.map(lambda lp:(float(model.predict(lp.features)), lp.label))
    metric = RegressionMetrics(validationsResult)
    print "[ MSE: %.4f\t\tR^2: %.4f\tExplained Variance: %.4f ] - %s sec" \
            % (metric.meanSquaredError, metric.r2, metric.explainedVariance, (time.time()-startTime))
        
def trainRegressionTreeModel(sc, trainingData, validationData, modelClass, **kwargs):
    """
    train regression models for TREE based model
    :param sc: spark context
    :param trainingData: RDD[LabeledPoint] of training data
    :param validationData: RDD[LabeledPoint] of validation data
    :modelClass: model CLASS that use to train
    :kwargs: key-value paired arguments for modelClass, would be passes in directly
    :return: None
    """
    print "Regression Model: %s %s" % (modelClass.__name__, kwargs)
    startTime = time.time()
    model = modelClass.trainRegressor(trainingData, **kwargs)
    validationFeatures = validationData.map(lambda lp:lp.features)
    # !!!beware, due to some stange bug, DO NOT chain RDD transformation on tree model predict, count() immediately!!!
    validationsResult = model.predict(validationFeatures)
    validationsResult.count()
    validationsResult = validationsResult.zip(validationData.map(lambda lp:lp.label))
    metric = RegressionMetrics(validationsResult)
    print "[ MSE: %.4f\t\tR^2: %.4f\tExplained Variance: %.4f ] - %s sec" \
            % (metric.meanSquaredError, metric.r2, metric.explainedVariance, (time.time()-startTime))
        
if __name__=="__main__":
    filePath = "file:///ipython/YearPredictionMSD.txt"
    sc = SparkContext(appName="MainContext")
    rawData = importRawData(sc, filePath).cache()
    try:
        corrSelection = exploreData(rawData)
        trainingData, validationData = featureEngineering(rawData, corrSelection = corrSelection)
        selectClassificationModel(sc, trainingData, validationData)
        selectRegressionModel(sc, trainingData, validationData)
    except Exception:
        raise
    finally:
        sc.stop()

EXPLORE DATA
Features:
Format:		[      mean	    variance	numNonzeors	         max	       min]
Feature 2:	[      43.39	       36.82	    515345	       61.97	      1.75 ] 
Feature 3:	[       1.29	     2660.53	    515345	      384.07	   -337.09 ] 
Feature 4:	[       8.66	     1243.87	    515345	      322.85	   -301.01 ] 
Feature 5:	[       1.16	      266.43	    515345	      335.77	   -154.18 ] 
Feature 6:	[      -6.55	      522.62	    515345	      262.07	   -181.95 ] 
Feature 7:	[      -9.52	      165.32	    515345	      166.24	    -81.79 ] 
Feature 8:	[      -2.39	      212.34	    515345	      172.40	   -188.21 ] 
Feature 9:	[      -1.79	       63.42	    515345	      126.74	    -72.50 ] 
Feature 10:	[       3.73	      112.00	    515345	      146.30	   -126.48 ] 
Feature 11:	[       1.88	       42.64	    515345	       60.35	    -41.63 ] 
Feature 12:	[      -0.15	       19.10	    515343	       88.02	    -69.68 ] 
Feature 13:	[       2.55	       69.23	    515345	       87.91	    -94.04 ] 
Fe