In [1]:
import findspark
findspark.init()

In [2]:
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.ml.feature import PCA, VectorAssembler
from pyspark.ml.linalg import Vectors
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.util import MLUtils
import numpy as np
from pyspark.ml.feature import StandardScaler
import pyspark.sql.functions as f
import pyspark.sql.types
import pandas as pd
from pyspark.sql import Row


In [3]:
spark = SparkSession.builder.getOrCreate()
sc = SparkContext.getOrCreate()

In [4]:
trans_data = spark.read.csv("New_Aggregated_data_final.csv", inferSchema=True, header=True)

In [5]:
customer_data = spark.read.csv("Customer_data1.csv", inferSchema=True, header=True)

In [6]:
trans_data = trans_data.withColumn("sum_prev_day_onl", trans_data["sum_prev_day_onl"].cast("integer"))
trans_data = trans_data.withColumn("sum_prev_day_mon_onl", trans_data["sum_prev_day_mon_onl"].cast("integer"))

In [7]:
trans_data1 = trans_data.select('_c0','amt', 'Balance',
      'sum_prev_day', 'cnt_prev_day_onl', 'sum_prev_day_onl',
       '24hrsAvg','qtrAvg','wkAvg', 'monAvg','yrAvg').fillna(0)

In [8]:
# from pyspark.sql.functions import *
# train.where(col('cc_num').isNull()).count()
# df.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df.columns]).show()
# customer_data.select([count(when(col(c).isNull(),c)).alias(c) for c in customer_data.columns]).show()

In [9]:
# # merging the data together by their unique "id"
# train = trans_data.join(customer_data,how='left',on='cc_num')
# # all_data.show()

### PCA

In [10]:
#Create a single vector column
cols = trans_data1.drop('_c0').columns
cols

['amt',
 'Balance',
 'sum_prev_day',
 'cnt_prev_day_onl',
 'sum_prev_day_onl',
 '24hrsAvg',
 'qtrAvg',
 'wkAvg',
 'monAvg',
 'yrAvg']

In [11]:
assembler = VectorAssembler(inputCols=cols, outputCol='features')
output_dat = assembler.transform(trans_data1).select('_c0','features')
output_dat.show(5, truncate = False)

+---+-------------------------------------------------------------+
|_c0|features                                                     |
+---+-------------------------------------------------------------+
|0  |(10,[0,1],[95.0,942.0])                                      |
|1  |(10,[0],[90.0])                                              |
|2  |(10,[0,1],[188.0,6746.0])                                    |
|3  |[100.0,133.0,373.0,0.0,0.0,124.33,124.33,124.33,124.33,124.0]|
|4  |[79.0,3115.0,100.0,0.0,0.0,100.0,118.25,118.25,118.25,118.0] |
+---+-------------------------------------------------------------+
only showing top 5 rows



In [12]:
#Center and scale data
scaler = StandardScaler(inputCol='features', outputCol='scaledFeatures', withStd=True, withMean=True)

# Compute summary statistics by fitting the StandardScaler
scalerModel = scaler.fit(output_dat)

# Normalize each feature to have unit standard deviation.
scaledData = scalerModel.transform(output_dat)
scaledData.select(['_c0','scaledFeatures']).show(5, truncate=False) #sample centered data

+---+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|_c0|scaledFeatures                                                                                                                                                                                                |
+---+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|0  |[-0.15324509141918674,-0.2082417185038452,-0.7332778530194066,-0.3436387790372,-0.21640155213233406,-0.6804918641670585,-1.2617558227807781,-1.150575009187998,-1.2597350985596223,-1.2583192791022992]       |
|1  |[-0.17600379267423114,-0.49825834867770463,-0.7332778530194066,-0.3436387790372,-0.21640155213233406,-0.6804918641670585,-1.2617558227807781,-1

In [13]:
#apply PCA
pca = PCA(k=8, inputCol=scaler.getOutputCol(), outputCol='pcaFeatures')

model = pca.fit(scaledData)
transformed_feature = model.transform(scaledData)

In [14]:
scaler.getOutputCol()

'scaledFeatures'

In [15]:
# from pyspark.mllib.linalg.distributed import RowMatrix
# pca_features = scaledData.select("scaledFeatures").rdd.map(lambda row : row[0])
# mat = RowMatrix(pca_features)
# svd = mat.computeSVD(5,True)
# svd

In [16]:
# percentage of variance explained by each PC
np.round(100.00*model.explainedVariance.toArray(),4)

array([53.7645, 15.0258,  9.997 ,  9.0107,  6.3449,  2.2406,  1.8031,
        1.7031])

In [17]:
# compute loadings of each feature
pcs = np.round(model.pc.toArray(),4)
pcs

array([[-0.1963,  0.2095, -0.0126, -0.7499,  0.5859,  0.0489, -0.0032,
        -0.0969],
       [-0.0012, -0.0318,  0.9992, -0.0167,  0.0091, -0.0012,  0.0048,
        -0.0127],
       [-0.3   ,  0.4045,  0.0115,  0.4062,  0.1406,  0.4948,  0.0026,
        -0.5627],
       [-0.2639,  0.4167,  0.0089, -0.2755, -0.6029, -0.4606,  0.0824,
        -0.3136],
       [-0.3331,  0.4029,  0.0238, -0.0495, -0.2335,  0.3855, -0.1155,
         0.7121],
       [-0.3105,  0.241 ,  0.0118,  0.4313,  0.4535, -0.5859,  0.2194,
         0.2495],
       [-0.3878, -0.3387, -0.0131, -0.0391, -0.065 ,  0.0794,  0.2516,
        -0.0115],
       [-0.3784, -0.2318, -0.0041,  0.0623,  0.034 , -0.1801, -0.8713,
        -0.0737],
       [-0.3879, -0.3381, -0.0109, -0.0319, -0.061 ,  0.0607,  0.212 ,
        -0.0211],
       [-0.3878, -0.3386, -0.0136, -0.0398, -0.0648,  0.0796,  0.2537,
        -0.009 ]])

In [18]:
pcs = np.round(model.pc.toArray(),4)
df_pc = pd.DataFrame(pcs, columns = ['PC'+str(i) for i in range(1, 9)], index = cols)
df_pc

Unnamed: 0,PC1,PC2,PC3,PC4,PC5,PC6,PC7,PC8
amt,-0.1963,0.2095,-0.0126,-0.7499,0.5859,0.0489,-0.0032,-0.0969
Balance,-0.0012,-0.0318,0.9992,-0.0167,0.0091,-0.0012,0.0048,-0.0127
sum_prev_day,-0.3,0.4045,0.0115,0.4062,0.1406,0.4948,0.0026,-0.5627
cnt_prev_day_onl,-0.2639,0.4167,0.0089,-0.2755,-0.6029,-0.4606,0.0824,-0.3136
sum_prev_day_onl,-0.3331,0.4029,0.0238,-0.0495,-0.2335,0.3855,-0.1155,0.7121
24hrsAvg,-0.3105,0.241,0.0118,0.4313,0.4535,-0.5859,0.2194,0.2495
qtrAvg,-0.3878,-0.3387,-0.0131,-0.0391,-0.065,0.0794,0.2516,-0.0115
wkAvg,-0.3784,-0.2318,-0.0041,0.0623,0.034,-0.1801,-0.8713,-0.0737
monAvg,-0.3879,-0.3381,-0.0109,-0.0319,-0.061,0.0607,0.212,-0.0211
yrAvg,-0.3878,-0.3386,-0.0136,-0.0398,-0.0648,0.0796,0.2537,-0.009


In [19]:
df_pc['PC1']

amt                -0.1963
Balance            -0.0012
sum_prev_day       -0.3000
cnt_prev_day_onl   -0.2639
sum_prev_day_onl   -0.3331
24hrsAvg           -0.3105
qtrAvg             -0.3878
wkAvg              -0.3784
monAvg             -0.3879
yrAvg              -0.3878
Name: PC1, dtype: float64

#### Get eigenvalues

In [20]:
from pyspark import SparkConf,SparkContext
from pyspark.sql.context import HiveContext
from pyspark.sql import Row
from pyspark.ml.feature import VectorAssembler,PCA,StandardScaler
from pyspark.ml.clustering import KMeans
from pyspark.ml.param import Param,Params
from pyspark.sql.types import *
from pyspark.mllib.common import callMLlibFunc, JavaModelWrapper
from pyspark.mllib.linalg.distributed import RowMatrix
from pyspark.mllib.linalg import _convert_to_vector, Vectors, Matrix, DenseMatrix
from pyspark.sql.functions import array, col, explode, struct, lit, udf, sum, when,avg,pow,sqrt,mean,log,desc
from pyspark.mllib.linalg import SparseVector, DenseVector, VectorUDT
from pyspark.sql.window import Window
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.regression import LinearRegression
import time, os, sys, json, math
import datetime as dt
import subprocess
import getpass
import pdb
import csv
import pandas as pd
import numpy as np
from copy import copy
from numpy.linalg import eigh
import itertools

class princomp:
    def __init__(self,n=5,std=True,prefix='pcomp'):
        self.n=n
        self.std=std
        self.prefix=prefix
        self.cols = [self.prefix+str(i) for i in list(range(1,self.n+1))]
        self.inputcol = "std_features"
        self.outputcol = "pca_features"
        self.vars = 'all'
        self.model = PCA(k=self.n,inputCol=self.inputcol,outputCol=self.outputcol)
        self.components = spark.createDataFrame(pd.DataFrame([0]))
        self.projections = spark.createDataFrame(pd.DataFrame([0]))
        self.result = spark.createDataFrame(pd.DataFrame([0]))
    def helpme(self):
        print ('|-- Please note that output_parameters will have null values before calling the fit method')
        print ('|-- n : input_parameter : sets the number of principal components, default = 5')
        print ('|-- std : input_parameter : True/False value specifying whether to standardize the principal components, default = True')
        print ('|-- prefix : input_parameter : string specifying the prefix for columns of principal components , default = pcomp')
        print ('|-- vars : input_parameter : list of variable names to be used for principal components, default = all')
        print ('|-- components : output_parameter : pandas dataframe of coefficients of different input columns for computing principal components')
        print ('|-- result : output_parameter : spark dataframe of original variables joined with projections of principal components')
        print ('|-- covariance : output_parameter : Numpy array of the covariance matrix')
        print ('|-- eigvals : output_parameter : Numpy array of all eigenvalues')
        print ('|-- eigvecs : output_parameter : Numpy array of all eigenvectors')
        print ('|-- varianceexplained : output_parameter : variance explained by the n principal components')
        print ('|-- outputcompfile(file) : method : outputs the components matrix to the specified file')
        print ('|-- fit(inputdf,myfeatures) : method : fit method which computes all output parameters')
    # SET methods
    def setn(self,val):
        self.n = val
        self.cols = [self.prefix+str(i) for i in list(range(1,self.n+1))]
        self.model = PCA(k=self.n,inputCol=self.inputcol,outputCol=self.outputcol)
    def setstd(self,val):
        self.std = val
    def setprefix(self,val):
        self.prefix=val
        self.cols = [self.prefix+str(i) for i in list(range(1,self.n+1))]
    def setcols(self,val):
        self.cols = val
    def setinputcol(self,val):
        self.inputcol = val
        self.model = PCA(k=self.n,inputCol=self.inputcol,outputCol=self.outputcol)
    def setoutputcol(self,val):
        self.outputcol = val
        self.model = PCA(k=self.n,inputCol=self.inputcol,outputCol=self.outputcol)
    def setvars(self,val):
        self.vars = val
    def setmodel(self,val):
        self.model = val
    def setcomponents(self,val):
        self.components=val
    def setprojections(self,val):
        self.projections=val
    def setresult(self,val):
        self.result = val
    # GET methods
    def getn(self):
        return self.n
    def getstd(self):
        return self.std
    def getprefix(self):
        return self.prefix
    def getcols(self):
        return self.cols
    def getinputcol(self):
        return self.inputcol
    def getoutputcol(self):
        return self.outputcol
    def getvars(self):
        return self.vars
    def getmodel(self):
        return self.model
    def getcomponents(self):
        return self.components
    def getprojections(self):
        return self.projections
    def getresult(self):
        return self.result
    # CORE methods
    def vectorizedf(self,inputdf,vars='all'):
        """Returns the input spark dataframe with an additional column of dense vector features"""
        if vars=='all':
            myfeatures = inputdf.columns
        else:
            myfeatures=vars

        assembler = VectorAssembler(inputCols = myfeatures,outputCol="features")
        assembled = assembler.transform(inputdf)

        as_dense = udf(
            lambda v: DenseVector(v.toArray()) if v is not None else None,
            VectorUDT()
        )

        df_dense = assembled.withColumn("features1", as_dense(assembled.features))
        df_dense2 = df_dense.drop("features")
        df_dense3 = df_dense2.withColumnRenamed("features1","features")
        return df_dense3
    def outputcompfile(self,filewlocation):
        """ Outputs the loading of principal components to the file specified"""
        df = self.components
        df.to_csv(filewlocation,index=False)
        print ("Component matrix is now available at the location : "+filewlocation)
    def identity(self):
        """ Outputs an identity matrix in the form of features column in a dataframe"""
        iden = np.identity(len(self.vars)).tolist()
        rddi = sc.parallelize(iden)
        df_identity = rddi.map(lambda line:Row(std_features=Vectors.dense(line))).toDF()
        return df_identity
    def fit(self,inputdf,myfeatures):
        """ Fits the input dataframe in a PCA model with the given features """
        start_time = time.time()      # Start Timer
        if myfeatures=='all':
            self.vars = inputdf.columns
        else:
            self.vars = myfeatures
        # vectorize and scale
        df_dense = self.vectorizedf(inputdf,self.vars)
        df_normalized = self.scalemeanstd(df_dense)
        # Compute covariance matrix, eigenvalues and eigenvectors
        dfzeromean = df_normalized.select(self.inputcol)
        self.covariance = dfzeromean.map(lambda x:np.outer(x,x)).sum()/dfzeromean.count()
        col1 = self.covariance.shape[1]
        eigvals,eigvecs = eigh(self.covariance)
        inds = np.argsort(eigvals)
        self.eigvals = eigvals[inds[-1:-(col1+1):-1]]
        self.eigvecs = -1*eigvecs.T[inds[-1:-(col1+1):-1]]
        self.varianceexplained = np.sum(self.eigvals[0:self.n])/np.sum(self.eigvals)
        # Fit PCA model
        model1 = self.model.fit(df_normalized)
        df_features = model1.transform(df_normalized)
        # Compute components and put in a pandas dataframe
        df_identity = self.identity()
        components = model1.transform(df_identity)
        components = components.withColumnRenamed('pca_features','components')
        edf_rdd = components.select("components").rdd.map(lambda x: tuple(x.components.toArray().tolist()))
        edf_pandas = edf_rdd.toDF(self.cols).toPandas()
        comp_ind = sqlContext.createDataFrame([Row(industries=self.vars)])
        comp_ind_pandas = comp_ind.select(explode(comp_ind.industries).alias("Variable")).toPandas()
        self.components = pd.concat([comp_ind_pandas,edf_pandas],axis=1)
        # Compute and standardize projections if self.std = True
        if self.std:
            projections1=self.scalemeanstd(df_features,inputcol = "pca_features",outputcol = "projections")
        else:
            projections1 = df_features.withColumnRenamed('pca_features','projections')
        # Prepare data for output
        self.projections = projections1.select('projections')
        drop_list = ['features','std_features','pca_features']
        projections2 = projections1.select([c for c in projections1.columns if c not in drop_list])
        l = ['x.'+c for c in inputdf.columns]
        cst = '['+",".join(l)+']'
        final_df = projections1.rdd.map(lambda x: tuple(eval(cst))+tuple(x.projections.toArray().tolist())).toDF(inputdf.columns+self.cols)
        self.result = final_df
        print("PCA fitting took a total of %s seconds " % (time.time() - start_time))

In [21]:
fs = princomp(n=15)
fs.fit(trans_data1,cols) # here select_features is the list of columns that you wish to perform PCA on
fs.outputcompfile('components.csv')
print (fs.eigvals)
print (fs.varianceexplained)

AttributeError: 'princomp' object has no attribute 'scalemeanstd'

In [23]:
#  # Compute covariance matrix, eigenvalues and eigenvectors
dfzeromean = scaledData.select('scaledFeatures')
# cov = as_cov(assembled.features)

dfzeromean.map(lambda x:np.outer(x,x)).sum()/dfzeromean.count()

as_cov = udf(
            lambda x:np.outer(x,x)
        )

# # col1 = self.covariance.shape[1]
# # eigvals,eigvecs = eigh(self.covariance)
# # inds = np.argsort(eigvals)
# # self.eigvals = eigvals[inds[-1:-(col1+1):-1]]
# # self.eigvecs = -1*eigvecs.T[inds[-1:-(col1+1):-1]]
# # self.varianceexplained = np.sum(self.eigvals[0:self.n])/np.sum(self.eigvals)

AttributeError: 'DataFrame' object has no attribute 'map'

In [40]:
def estimateCovariance(df):
    """Compute the covariance matrix for a given dataframe.

    Note:
        The multi-dimensional covariance array should be calculated using outer products.  Don't
        forget to normalize the data by first subtracting the mean.

    Args:
        df:  A Spark dataframe with a column named 'features', which (column) consists of DenseVectors.

    Returns:
        np.ndarray: A multi-dimensional array where the number of rows and columns both equal the
            length of the arrays in the input dataframe.
    """
    m = df.select(df['features']).rdd.map(lambda x: x[0]).mean()
    dfZeroMean = df.select(df['features']).rdd.map(lambda x:   x[0]).map(lambda x: x-m)  # subtract the mean

    return dfZeroMean.map(lambda x: np.outer(x,x)).sum()/df.count()

In [44]:
from numpy.linalg import eigh

def pca(df, k=2):
    """Computes the top `k` principal components, corresponding scores, and all eigenvalues.

    Note:
        All eigenvalues should be returned in sorted order (largest to smallest). `eigh` returns
        each eigenvectors as a column.  This function should also return eigenvectors as columns.

    Args:
        df: A Spark dataframe with a 'features' column, which (column) consists of DenseVectors.
        k (int): The number of principal components to return.

    Returns:
        tuple of (np.ndarray, RDD of np.ndarray, np.ndarray): A tuple of (eigenvectors, `RDD` of
        scores, eigenvalues).  Eigenvectors is a multi-dimensional array where the number of
        rows equals the length of the arrays in the input `RDD` and the number of columns equals
        `k`.  The `RDD` of scores has the same number of rows as `data` and consists of arrays
        of length `k`.  Eigenvalues is an array of length d (the number of features).
     """
    cov = estimateCovariance(df)
    col = cov.shape[1]
    eigVals, eigVecs = eigh(cov)
    inds = np.argsort(eigVals)
    eigVecs = eigVecs.T[inds[-1:-(col+1):-1]]  
    components = eigVecs[0:k]
    eigVals = eigVals[inds[-1:-(col+1):-1]]  # sort eigenvals
    score = df.select(df['features']).rdd.map(lambda x: x[0]).map(lambda x: np.dot(x, components.T) )
    # Return the `k` principal components, `k` scores, and all eigenvalues

    return components.T, score, eigVals

In [42]:
 data = [(Vectors.dense([0.0, 1.0, 0.0, 7.0, 0.0]),),
...     (Vectors.dense([2.0, 0.0, 3.0, 4.0, 5.0]),),
...     (Vectors.dense([4.0, 0.0, 0.0, 6.0, 7.0]),)]
>>> df = spark.createDataFrame(data,["features"])

In [46]:
>>> score.collect()
# [Row(pca_features=DenseVector([1.6486, 4.0133])),
#  Row(pca_features=DenseVector([-4.6451, 1.1168])),
#  Row(pca_features=DenseVector([-6.4289, 5.338]))]

[array([1.64857282, 4.0132827 ]),
 array([-4.64510433,  1.11679727]),
 array([-6.42888054,  5.33795143])]

In [47]:
eigVals

array([ 1.20041647e+01,  3.10694640e+00,  1.16385189e-15,  1.96900417e-17,
       -2.23965166e-15])

In [49]:
output = output_dat.select('features')
comp, score, eigVals = pca(output)
eigVals

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 25.0 failed 1 times, most recent failure: Lost task 0.0 in stage 25.0 (TID 61, localhost, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "C:\spark\spark-2.3.2-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\worker.py", line 253, in main
  File "C:\spark\spark-2.3.2-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\worker.py", line 248, in process
  File "C:\spark\spark-2.3.2-bin-hadoop2.7\python\pyspark\rdd.py", line 2440, in pipeline_func
    return func(split, prev_func(split, iterator))
  File "C:\spark\spark-2.3.2-bin-hadoop2.7\python\pyspark\rdd.py", line 2440, in pipeline_func
    return func(split, prev_func(split, iterator))
  File "C:\spark\spark-2.3.2-bin-hadoop2.7\python\pyspark\rdd.py", line 350, in func
    return f(iterator)
  File "C:\spark\spark-2.3.2-bin-hadoop2.7\python\pyspark\rdd.py", line 1063, in <lambda>
    return self.mapPartitions(lambda i: [StatCounter(i)]).reduce(redFunc)
  File "C:\spark\spark-2.3.2-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\statcounter.py", line 43, in __init__
    self.merge(v)
  File "C:\spark\spark-2.3.2-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\statcounter.py", line 47, in merge
    delta = value - self.mu
TypeError: unsupported operand type(s) for -: 'SparseVector' and 'float'

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:330)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:470)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:453)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:284)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$class.foreach(Iterator.scala:893)
	at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
	at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
	at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
	at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
	at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
	at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:945)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:945)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2074)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2074)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:109)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
	at java.lang.Thread.run(Unknown Source)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1651)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1639)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1638)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1638)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:831)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1872)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1821)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1810)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:642)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2034)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2055)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2074)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2099)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:945)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:944)
	at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:165)
	at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
	at java.lang.reflect.Method.invoke(Unknown Source)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Unknown Source)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "C:\spark\spark-2.3.2-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\worker.py", line 253, in main
  File "C:\spark\spark-2.3.2-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\worker.py", line 248, in process
  File "C:\spark\spark-2.3.2-bin-hadoop2.7\python\pyspark\rdd.py", line 2440, in pipeline_func
    return func(split, prev_func(split, iterator))
  File "C:\spark\spark-2.3.2-bin-hadoop2.7\python\pyspark\rdd.py", line 2440, in pipeline_func
    return func(split, prev_func(split, iterator))
  File "C:\spark\spark-2.3.2-bin-hadoop2.7\python\pyspark\rdd.py", line 350, in func
    return f(iterator)
  File "C:\spark\spark-2.3.2-bin-hadoop2.7\python\pyspark\rdd.py", line 1063, in <lambda>
    return self.mapPartitions(lambda i: [StatCounter(i)]).reduce(redFunc)
  File "C:\spark\spark-2.3.2-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\statcounter.py", line 43, in __init__
    self.merge(v)
  File "C:\spark\spark-2.3.2-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\statcounter.py", line 47, in merge
    delta = value - self.mu
TypeError: unsupported operand type(s) for -: 'SparseVector' and 'float'

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:330)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:470)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:453)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:284)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$class.foreach(Iterator.scala:893)
	at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
	at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
	at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
	at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
	at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
	at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:945)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:945)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2074)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2074)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:109)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
	... 1 more


### Feature Engineering

In [22]:
from pyspark.sql.functions import *
from pyspark.sql.functions import unix_timestamp

In [60]:
train = train.withColumn("Birthdate",from_unixtime(unix_timestamp(train['dob'], 'MM/dd/yyyy')))

In [61]:
train=(train.withColumn('Yearofbirth',year(train['Birthdate'])))

In [62]:
train = train.withColumn("trans_date",from_unixtime(unix_timestamp(train['trans_date'], 'MM/dd/yyyy')))

In [64]:
train=(train.withColumn('Month',month(train['fulltime'])))
train = train.withColumn("Time",hour(train["fulltime"]))

In [65]:
train = train.withColumn('today_date',lit(2019))
train = train.withColumn('Age',train['today_date']-train['Yearofbirth'])
train.select('today_date','Age','dob').show(5)

+----------+---+---------+
|today_date|Age|      dob|
+----------+---+---------+
|      2019| 45|9/23/1974|
|      2019| 45|9/23/1974|
|      2019| 45|9/23/1974|
|      2019| 45|9/23/1974|
|      2019| 45|9/23/1974|
+----------+---+---------+
only showing top 5 rows



In [68]:
#spark.conf.set("spark.sql.execution.arrow.fallback.enabled", "false")
a=train.drop('first','last')
train2 = a.toPandas()

In [None]:
train2["fulltimepd"] =  pd.to_datetime(train2['unix_time'],unit='s')

train2["Weekday"] = train2["fulltimepd"].dt.strftime("%A")

In [70]:
#train["Time_short_for_grouping"] = train["Time_short_for_grouping"].astype(int)
bins = [2,6,11,18,22]
labels = ["Early Morning","Morning","Afternoon","Evening"]
train2["Time of day"] = pd.cut(train2.Time,bins=bins,labels=labels)
train2["Time of day"]=train2["Time of day"].cat.add_categories('Midnight') 
train2["Time of day"] = train2["Time of day"].fillna('Midnight')

In [71]:
train2["Month"] = train2["fulltimepd"].dt.strftime("%B")

In [72]:
def haversine_(lat1, lng1, lat2, lng2):
    """function to calculate haversine distance between two co-ordinates"""
    lat1, lng1, lat2, lng2 = map(np.radians, (lat1, lng1, lat2, lng2))
    AVG_EARTH_RADIUS = 6371  # in km
    lat = lat2 - lat1
    lng = lng2 - lng1
    d = np.sin(lat * 0.5) ** 2 + np.cos(lat1) * np.cos(lat2) * np.sin(lng * 0.5) ** 2
    h = 2 * AVG_EARTH_RADIUS * np.arcsin(np.sqrt(d))
    return(h)

In [73]:
def manhattan_distance_pd(lat1, lng1, lat2, lng2):
    """function to calculate manhatten distance between pick_drop"""
    a = haversine_(lat1, lng1, lat1, lng2)
    b = haversine_(lat1, lng1, lat2, lng1)
    return a + b

In [74]:
train2['hvsine']= haversine_(train2['lat'].values,
                                 train2['long'].values, train2['merch_lat'].values,
                                             train2['merch_long'].values)

In [75]:
train2['mnhtn']= manhattan_distance_pd(train2['lat'].values,
                                 train2['long'].values, train2['merch_lat'].values,
                                             train2['merch_long'].values)

In [76]:
train2['hvsine2']= haversine_(train2['prev_lat'].values,
                                 train2['prev_long'].values, train2['merch_lat'].values,
                                             train2['merch_long'].values)



In [77]:
train2['mnhtn2']= manhattan_distance_pd(train2['prev_lat'].values,
                                 train2['prev_long'].values, train2['merch_lat'].values,
                                             train2['merch_long'].values)



In [78]:
#Distance and time dfference
train2['distandtime1'] = train2['mnhtn2']/train2['time_diff_min']
# train2['distandtime2'] = train2['hvsine2']/train2['time_diff_min']
# train['distandtime3'] = train['bearing2']/train['time_diff_min']

#train2['distandtime'] =train2['manhtn']/train2['time_diff_min']

In [79]:
# Amount versus year average
train2['amt_yrAvg'] = train2['amt']/train2['yrAvg']

In [80]:
#
train3=train2.copy(deep=True)

In [81]:
trainimp_f = train3[['Channel', 'Transaction Type', 'gender', 'amt', 'Balance',
       'Month', 'Weekday','Time of day','Age','mnhtn','sum_prev_day', 'cnt_prev_day_onl', 'sum_prev_day_onl',
       '24hrsAvg','qtrAvg','wkAvg', 'monAvg','yrAvg','mnhtn2','distandtime1','amt_yrAvg','is_fraud']]

In [82]:
trainimp_f.isnull().sum()

Channel                 0
Transaction Type        0
gender                  0
amt                     0
Balance                 0
Month                   0
Weekday                 0
Time of day             0
Age                     0
mnhtn                   0
sum_prev_day         1496
cnt_prev_day_onl    15761
sum_prev_day_onl    17759
24hrsAvg             1496
qtrAvg                332
wkAvg                 332
monAvg                332
yrAvg                 332
mnhtn2                 98
distandtime1           98
amt_yrAvg             332
is_fraud                0
dtype: int64

In [83]:
colsna= ['sum_prev_day','cnt_prev_day_onl','sum_prev_day_onl',
         '24hrsAvg','qtrAvg','wkAvg','monAvg','yrAvg','mnhtn2','distandtime1','amt_yrAvg']

In [84]:
for col in colsna:
    trainimp_f[col] = trainimp_f[col].fillna(0)

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: http://pandas.pydata.org/pandas-docs/stable/indexing.html#indexing-view-versus-copy
  from ipykernel import kernelapp as app


In [85]:
trainimp_f.head()

Unnamed: 0,Channel,Transaction Type,gender,amt,Balance,Month,Weekday,Time of day,Age,mnhtn,...,sum_prev_day_onl,24hrsAvg,qtrAvg,wkAvg,monAvg,yrAvg,mnhtn2,distandtime1,amt_yrAvg,is_fraud
0,POS,POS,F,95,942,January,Friday,Midnight,45,1.688628,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0
1,POS,POS,F,90,0,January,Friday,Evening,45,2.709298,...,0.0,0.0,0.0,0.0,0.0,0.0,1.020535,0.001001,0.0,0
2,Web,Online,F,188,6746,January,Friday,Evening,45,1.880668,...,0.0,0.0,0.0,0.0,0.0,0.0,2.175526,0.022514,0.0,0
3,Mobile,USSD,F,100,133,January,Saturday,Afternoon,45,1.911466,...,0.0,124.33,124.33,124.33,124.33,124.0,3.791882,0.003087,0.806452,0
4,ATM,ATM,F,79,3115,January,Sunday,Morning,45,1.804176,...,0.0,100.0,118.25,118.25,118.25,118.0,1.52947,0.001447,0.669492,0


In [86]:
from pyspark.sql.types import (StructField, StringType,
                              IntegerType, StructType, DoubleType)

In [87]:
data_schema = StructType([StructField("Channel", StringType(), True),
               StructField("Transaction Type", StringType(), True),
               StructField("gender", StringType(), True),
               StructField("amt", IntegerType(), True),
               StructField("Balance", IntegerType(), True),
               StructField("Month", StringType(), True),
               StructField("Weekday", StringType(), True),
               StructField("Time of day", StringType(), True),
               StructField("Age", IntegerType(), True),
               StructField("mnhtn",  DoubleType(), True),
               StructField("sum_prev_day", DoubleType(), True),
               StructField("sum_prev_day_onl", DoubleType(), True),
               StructField("cnt_prev_day_onl", DoubleType(), True),
               StructField("24hrsAvg", DoubleType(), True),
               StructField("wkAvg", DoubleType(), True),
               StructField("monAvg", DoubleType(), True),
               StructField("qtrAvg", DoubleType(), True),
               StructField("yrAvg", DoubleType(), True),
               StructField("mnhtn2", DoubleType(), True),
               StructField("distandtime1", DoubleType(), True),
               StructField("amt_yrAvg", DoubleType(), True),
               StructField("is_fraud", IntegerType(), True)])

In [88]:
#Convert
#spark.conf.set("spark.sql.execution.arrow.fallback.enabled", "false")
#from pyspark.sql.types import *
#from pyspark.sql import SqlContext
trainimp_f = spark.createDataFrame(trainimp_f,schema=data_schema)

In [89]:
from pyspark.ml.feature import StringIndexer
from pyspark.ml import Pipeline

### MLlib Transformer Pipeline

In [90]:
from pyspark.ml.feature import StringIndexer
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import (RandomForestClassifier,
                                       GBTClassifier)

indexers = [StringIndexer(inputCol=column, 
                          outputCol=column+"_index",
                          handleInvalid="keep").fit(trainimp_f) 
            for column in list(['Transaction Type',
                         'gender',
                        "Time of day"]) ]



assembler = VectorAssembler(inputCols= ['amt',
                                        "Age",
                                        "Time of day_index",
                                        'amt_yrAvg','mnhtn2','distandtime1'], 
                            outputCol='features')


rfc = RandomForestClassifier(labelCol='is_fraud',
                             featuresCol= 'features',
                             maxDepth = 11,numTrees=40,seed=1)    

pipeline = Pipeline(stages=indexers+[assembler,rfc])

In [91]:
train_data,test_data = trainimp_f.randomSplit([0.7,0.3])

In [92]:
rfc_model2 = pipeline.fit(train_data)

In [94]:
rfc_preds3 = rfc_model2.transform(test_data)

In [96]:
rfc_preds3.select('amt','mnhtn2','distandtime1','amt_yrAvg','is_fraud','prediction').show()

+---+-------------------+--------------------+--------------------+--------+----------+
|amt|             mnhtn2|        distandtime1|           amt_yrAvg|is_fraud|prediction|
+---+-------------------+--------------------+--------------------+--------+----------+
|  6| 0.6004767007199595|7.269931350297886E-5| 0.06976744186046512|       0|       0.0|
|  6|  5.278624833901226| 0.04529064636551888|0.047619047619047616|       0|       0.0|
|  6|0.21698341972920043|0.007215943456242116| 0.07407407407407407|       0|       0.0|
|  6|  2.817629414586735|0.020340957367793348| 0.06451612903225806|       0|       0.0|
|  7| 0.6871846682760476|0.002072017694183771| 0.07865168539325842|       0|       0.0|
|  7| 1.3819106657909628|0.002747992892521...| 0.07777777777777778|       0|       0.0|
|  7|   3.03338696845326|0.003177818834480394| 0.12727272727272726|       0|       0.0|
|  8|   2.28200209680964|0.018108253426516747| 0.07920792079207921|       0|       0.0|
|  8|  1.996620501516868|0.02414

In [72]:
my_binary_eval = BinaryClassificationEvaluator(labelCol='is_fraud')
print(my_binary_eval.evaluate(rfc_preds))

0.9995282126010989


### Model Evaluation

In [73]:
tp = rfc_preds[(rfc_preds.is_fraud == 1) & (rfc_preds.prediction == 1)].count()
tn = rfc_preds[(rfc_preds.is_fraud == 0) & (rfc_preds.prediction == 0)].count()
fp = rfc_preds[(rfc_preds.is_fraud == 0) & (rfc_preds.prediction == 1)].count()
fn = rfc_preds[(rfc_preds.is_fraud == 1) & (rfc_preds.prediction == 0)].count()
print ("True Positives:", tp)
print ("True Negatives:", tn)
print ("False Positives:", fp)
print ("False Negatives:", fn)
print ("Total", rfc_preds.count())

True Positives: 170
True Negatives: 6058
False Positives: 19
False Negatives: 3
Total 6250


In [54]:
tp = gbt_preds[(gbt_preds.is_fraud == 1) & (gbt_preds.prediction == 1)].count()
tn = gbt_preds[(gbt_preds.is_fraud == 0) & (gbt_preds.prediction == 0)].count()
fp = gbt_preds[(gbt_preds.is_fraud == 0) & (gbt_preds.prediction == 1)].count()
fn = gbt_preds[(gbt_preds.is_fraud == 1) & (gbt_preds.prediction == 0)].count()
print ("True Positives:", tp)
print ("True Negatives:", tn)
print ("False Positives:", fp)
print ("False Negatives:", fn)
print ("Total", gbt_preds.count())

True Positives: 156
True Negatives: 6065
False Positives: 5
False Negatives: 19
Total 6245


### Saving & Loading Models

In [55]:
rfc_model.write().overwrite().save("models/rfc_model")

In [56]:
type(rfc_model)

pyspark.ml.classification.RandomForestClassificationModel

In [70]:
from pyspark.ml.feature import IndexToString, StringIndexer

df = spark.createDataFrame(
    [(0, "a"), (1, "b"), (2, "c"), (3, "a"), (4, "a"), (5, "c")],
    ["id", "category"])

indexer = StringIndexer(inputCol="category", outputCol="categoryIndex")
model = indexer.fit(df)
indexed = model.transform(df)

print("Transformed string column '%s' to indexed column '%s'"
      % (indexer.getInputCol(), indexer.getOutputCol()))
indexed.show()

print("StringIndexer will store labels in output column metadata\n")

converter = IndexToString(inputCol="categoryIndex", outputCol="originalCategory")
converted = converter.transform(indexed)

print("Transformed indexed column '%s' back to original string column '%s' using "
      "labels in metadata" % (converter.getInputCol(), converter.getOutputCol()))
converted.select("id", "categoryIndex", "originalCategory").show()

Transformed string column 'category' to indexed column 'categoryIndex'
+---+--------+-------------+
| id|category|categoryIndex|
+---+--------+-------------+
|  0|       a|          0.0|
|  1|       b|          2.0|
|  2|       c|          1.0|
|  3|       a|          0.0|
|  4|       a|          0.0|
|  5|       c|          1.0|
+---+--------+-------------+

StringIndexer will store labels in output column metadata

Transformed indexed column 'categoryIndex' back to original string column 'originalCategory' using labels in metadata
+---+-------------+----------------+
| id|categoryIndex|originalCategory|
+---+-------------+----------------+
|  0|          0.0|               a|
|  1|          2.0|               b|
|  2|          1.0|               c|
|  3|          0.0|               a|
|  4|          0.0|               a|
|  5|          1.0|               c|
+---+-------------+----------------+

