### Anomaly Detection: German Credit Risk

1. Import all libraries.

In [2]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import StandardScaler
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import PCA
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression

from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.clustering import GaussianMixture
from pyspark.mllib.classification import LogisticRegressionWithLBFGS, LogisticRegressionModel
import numpy as np

from matplotlib import  pyplot as plt

import matplotlib as mpl

### Load german credit data

In [4]:
from pyspark.sql.types import *
# Column names of the German credit CSV, in file order. The file is read
# without a header row, so the names are attached through the schema below.
GERMAN_COLUMNS = [
    "creditability", "balance", "duration", "history", "purpose", "amount",
    "savings", "employment", "instPercent", "sexMarried", "guarantors",
    "residenceDuration", "assets", "age", "concCredit", "apartment",
    "credits", "occupation", "dependents", "hasPhone", "foreign",
]

# Every column is declared as a nullable integer.
schema = StructType(
    [StructField(column_name, IntegerType(), True) for column_name in GERMAN_COLUMNS])

# An explicit schema is passed to load(), so schema inference is not requested:
# asking for both is contradictory and the explicit schema is the one applied.
GermanDF = (sqlContext.read
            .format('com.databricks.spark.csv')
            .options(header='false')
            .load('/FileStore/tables/uukouu8x1474353982100/German_cleanData.csv',
                  schema=schema))

# Preview the first rows to confirm the columns were parsed as expected.
GermanDF.show(4)


Apply basic statistics here to process data

In [6]:
# Expose the credit DataFrame to Spark SQL under the table name 'credit'.
GermanDF.registerTempTable('credit')

# For each creditability class (1 and 0): average balance, average loan
# amount and average duration.
credit_summary_sql = (
    "SELECT creditability, avg(balance) as avgbalance, "
    "avg(amount) as avgamt,avg(duration) as avgdur  "
    "FROM credit GROUP BY creditability"
)
results = sqlContext.sql(credit_summary_sql)

# Show the aggregated rows.
results.show()

In [7]:
# Summary statistics for the first five columns. The remaining columns can be
# summarised the same way by passing their names to describe().
Basic_stat = GermanDF.describe(
    'creditability', 'balance', 'duration', 'history', 'purpose')

# Plain-text rendering of the statistics ...
Basic_stat.show()

# ... followed by the notebook's rich table rendering of the same frame.
display(Basic_stat)

###Pre-process the data

In [9]:
# Rename the "foreign" column to "label"; later cells index and predict this column.
# NOTE(review): "creditability" stays among the assembled feature columns —
# confirm that "foreign" is really the intended target.
GermanDF = GermanDF.withColumnRenamed(existing="foreign", new="label")

### Transform 20 features into MLlib vectors

In [11]:
# The 20 input columns, in the order they are packed into the feature vector.
feature_columns = [
    "creditability", "balance", "duration", "history", "purpose",
    "amount", "savings", "employment", "instPercent", "sexMarried",
    "guarantors", "residenceDuration", "assets", "age", "concCredit",
    "apartment", "credits", "occupation", "dependents", "hasPhone",
]

# Combine the individual columns into a single vector column named "features".
assembler = VectorAssembler(outputCol="features", inputCols=feature_columns)
output = assembler.transform(GermanDF)

output.show(4)

### StandardScaler: a feature transformer
### Scale features to zero mean and unit standard deviation

Whether to standardize the data prior to a PCA on the covariance matrix depends on the measurement scales of the original features. Since PCA yields a feature subspace that maximizes the variance along the axes, it makes sense to standardize the data, especially, if it was measured on different scales.

In [13]:

# Centre each feature (withMean) and scale it to unit standard deviation (withStd).
standardizer = StandardScaler(
    inputCol='features',
    outputCol='std_features',
    withStd=True,
    withMean=True,
)

# Fit the scaling statistics on the assembled data, then append the
# 'std_features' column to it.
model = standardizer.fit(output)
output = model.transform(output)



In [14]:
# Preview the first four rows, now including the 'std_features' column.
output.show(n=4)


###Convert label to numeric index

StringIndexer encodes a string column of labels to a column of label indices. The indices are in [0, numLabels), ordered by label frequency, so the most frequent label gets index 0. If the input column is numeric, it is cast to string and the string values are indexed. When downstream pipeline components such as an Estimator or Transformer make use of this string-indexed label, you must set the input column of the component to the string-indexed column name. In many cases, you can set the input column with setInputCol.

In [16]:
# Map each distinct value of 'label' to a numeric index stored in 'label_idx'.
indexer = StringIndexer(inputCol="label", outputCol="label_idx")
indexed = indexer.fit(output).transform(output)

# show() prints the table itself and returns None, so it is called directly;
# wrapping it in print would emit a stray "None" after the table.
indexed.show(4)

###Extract only columns of interest

In [18]:
# Keep only the standardized feature vector, the original label and its index.
german = indexed.select('std_features', 'label', 'label_idx')



In [19]:
# Print three rows without truncating the long feature vectors.
german.show(3, False)

###Data conversion

We will first fit a Gaussian Mixture Model with 2 components to the first 2 principal components of the data, as an example of unsupervised learning. The GaussianMixture model requires an RDD of vectors, not a DataFrame. Note that pyspark converts numpy arrays to Spark vectors.

In [21]:
# Project the standardized features onto their first two principal components;
# the projection is stored in a new 'pca' column.
pca = PCA(inputCol="std_features", outputCol="pca", k=2)
model = pca.fit(german)
transformed = model.transform(german)

# Alias used by the following cells.
after_pca_transformed_Data = transformed

# Report the type of the transformed data, preview it, then report the type
# of the fitted PCA model.
print(type(after_pca_transformed_Data))

after_pca_transformed_Data.show(4)

print(type(model))

###We split the data into training (75%) and use the rest (25%) as test set.

In [23]:
# 75% / 25% train/test split with a fixed seed so the split is repeatable.
train_f, test_f = after_pca_transformed_Data.randomSplit([0.75, 0.25], seed=123)

print(type(test_f))

# Preview a few training rows instead of collect()-ing the entire training
# set to the driver and dumping it into the cell output.
train_f.show(4)

In [24]:
from numpy import array
import numpy as np
# GaussianMixture.train needs an RDD of vectors, so pull the 2-D PCA
# projection out of every training row as a numpy array.
# Go through .rdd explicitly (as the label extraction further down does):
# it works on every Spark version, whereas DataFrame.map is not available
# on newer releases.
features = train_f.rdd.map(lambda row: array(row.pca))


features.take(2)


###Build Model :A clustering model derived from the Gaussian Mixture Model method.
`gaussians` is an array of MultivariateGaussian, where `gaussians[i]` represents the multivariate Gaussian (normal) distribution of component i.

k - Number of gaussians in mixture.

In [26]:
# Fit a 2-component Gaussian mixture to the PCA-projected training features.
# The fit is seeded (same value as the train/test split) so that
# re-running the notebook yields the same mixture.
gmm = GaussianMixture.train(features, 2, seed=123)

print(gmm)

In [27]:
# Print the mixing weight, mean vector and covariance matrix of each of the
# two fitted components.
for weight, gaussian in zip(gmm.weights, gmm.gaussians):
    print ("weight = ", weight, "mu = ", gaussian.mu,
        "sigma = ", gaussian.sigma.toArray())

In [28]:
# Bring the training feature vectors to the driver and stack them into one
# numpy array; squeeze() drops any single-dimensional axes.
collected_features = features.map(lambda vec: array(vec)).collect()
x_train = np.array(collected_features).squeeze()

x_train

In [29]:
from scipy import linalg

# One colour per mixture component.
color_iter = ['r', 'g']

# Explicit figure/axes so the figure can be handed to display() directly.
fig_gmm, splot = plt.subplots()

# The training points do not depend on the component, so draw them once
# (neutral colour) instead of re-drawing the full cloud on every loop pass.
splot.scatter(x_train[:, 0], x_train[:, 1], .8, color='k')

for i in range(2):
    mean = gmm.gaussians[i].mu
    covar = gmm.gaussians[i].sigma.toArray()
    color = color_iter[i]

    # eigh returns eigenvalues in ascending order and the matching
    # eigenvectors as the COLUMNS of w.
    v, w = linalg.eigh(covar)
    # Eigenvalues are variances; axis lengths must scale with their square root.
    v = 2. * np.sqrt(2.) * np.sqrt(v)
    u = w[:, 0] / linalg.norm(w[:, 0])

    # Orientation of the first principal axis in degrees; arctan2 avoids a
    # division by zero when that axis is vertical.
    angle = 180 * np.arctan2(u[1], u[0]) / np.pi

    ell = mpl.patches.Ellipse(mean, v[0], v[1], angle=180 + angle, color=color)
    # Clip the ellipse to the axes' bounding box.
    ell.set_clip_box(splot.bbox)
    ell.set_alpha(0.5)
    splot.add_artist(ell)

splot.set_title("PySpark dataset with Gaussian Models aligned")

# display() needs the figure object; plt.show() returns None.
display(fig_gmm)

###Optimize and fit the model to data

In [31]:
# Assign every test row to its most likely mixture component.
# The conversion goes through .rdd explicitly, matching the label extraction
# below and staying valid on Spark versions without DataFrame.map.
test_pca_features = test_f.rdd.map(lambda row: array(row.pca))
predict = gmm.predict(test_pca_features).collect()

# Indexed labels of the same test rows, collected in the same row order.
labels = test_f.select('label_idx').rdd.map(lambda r: r[0]).collect()

print(predict)

###Post-processing and model evaluation

In [33]:
# Correlation matrix between the predicted cluster ids and the indexed labels.
np.corrcoef(x=predict, y=labels)



#### Plot discrepancy between predicted and labels

In [35]:

# Collect the 2-D PCA projection of the test rows into one numpy array;
# squeeze() removes single-dimensional entries from the array's shape.
# .rdd.map is used (rather than DataFrame.map) for consistency with the label
# extraction and for compatibility with newer Spark versions.
xs = np.array(test_f.rdd.map(lambda row: array(row.pca)).collect()).squeeze()

fig, axes = plt.subplots(1, 2, figsize=(10, 4))

# Left panel: points coloured by the GMM cluster assignment.
axes[0].scatter(xs[:, 0], xs[:, 1], c=predict)
axes[0].set_title('Predicted')

# Right panel: the same points coloured by the indexed label.
axes[1].scatter(xs[:, 0], xs[:, 1], c=labels)
axes[1].set_title('Labels')

display(fig)

### Apply Supervised MLLIB

###Convert to format expected by regression functions in mllib

In [38]:

# Convert each training row into an mllib LabeledPoint(label, features).
# Fields are read by name instead of by position (x[2] / x[0]), so the
# conversion no longer depends on the column order of the DataFrame, and the
# map goes through .rdd so it also works where DataFrame.map is unavailable.
# Note: this rebinds train_f from a DataFrame to an RDD of LabeledPoint, so
# this cell must be run only once per split.
train_f = train_f.rdd.map(
    lambda row: LabeledPoint(row.label_idx, row.std_features))

print(type(train_f))

train_f.take(4)

In [39]:
# Preview the first four rows of the held-out test DataFrame.
test_f.show(n=4)

### Model on train data using logistic regression for Binary classification data

In [41]:
# Train a logistic regression classifier (L-BFGS optimiser, default settings)
# on the LabeledPoint training RDD.
model = LogisticRegressionWithLBFGS.train(data=train_f)


### Evaluate on test data

In [43]:
# (true label, predicted label) for every test row. The map goes through .rdd
# for consistency with the rest of the notebook and for compatibility with
# Spark versions that no longer offer DataFrame.map.
eval_data = test_f.rdd.map(
    lambda x: (x.label_idx, float(model.predict(x.std_features))))

# Count the test rows once; the original launched the same count job three times.
test_count = test_f.count()

# Fraction of test rows whose prediction differs from the label.
err = eval_data.filter(lambda x: x[0] != x[1]).count() / float(test_count)
print("Error = " + str(err))

print("Test Count =")
print(test_count)

# Number of correctly classified test rows (reused by the ROC cell's title).
success_count_new = eval_data.filter(lambda rec: rec[0] == rec[1]).count()

print("Accuracy data :Successful prediction percentage: " +
    str(round(success_count_new / float(test_count), 4)))


eval_data.take(4)



### Final Result :
### Create a confusion matrix to understand the true positive and false positive rates

In [45]:
from sklearn import metrics
import seaborn as sn 


# Bring the (label, prediction) pairs to the driver as a pandas frame; toDF()
# names the two tuple fields _1 (label) and _2 (prediction).
labelsAndPreds_Eval_data = eval_data.toDF().toPandas()
cm = metrics.confusion_matrix(labelsAndPreds_Eval_data._1, labelsAndPreds_Eval_data._2)

# Draw on an explicit axes object rather than relying on the implicit
# "current axes" state.
f, ax_cm = plt.subplots(figsize=(10, 8))
# The matrix holds integer counts, so annotate with 'd' rather than '.2f'.
sn.heatmap(cm, annot=True, fmt='d', ax=ax_cm)
ax_cm.set_title('Confusion matrix of the classifier')
# confusion_matrix puts the first argument (labels) on rows and the second
# (predictions) on columns.
ax_cm.set_xlabel('Predicted label')
ax_cm.set_ylabel('True label')
plt.close(f)
display(f)

### Final Result: Evaluation

Anomaly detectors can be evaluated on the same metrics as binary classifiers. The area under the ROC curve provides a good way to measure the discriminatory power of the model.

In [47]:
import matplotlib.pyplot as plt



def prepare_plot(xticks, yticks, figsize=(10.5, 6), hide_labels=False, grid_color='#999999',
                 grid_width=1.0):
    """Create an empty, lightly styled figure with a grid and return it.

    The current pyplot figure is closed first, then a new single-axes figure
    is created with grey tick labels, no tick marks, a solid grid and all
    four spines hidden.

    Args:
        xticks: Tick positions for the x axis.
        yticks: Tick positions for the y axis.
        figsize: Figure size passed to ``plt.subplots``.
        hide_labels: When true, tick labels are removed from both axes.
        grid_color: Colour of the grid lines.
        grid_width: Line width of the grid lines.

    Returns:
        The ``(fig, ax)`` pair of the newly created figure.
    """
    plt.close()
    fig, ax = plt.subplots(figsize=figsize, facecolor='white', edgecolor='white')
    ax.axes.tick_params(labelcolor='#999999', labelsize='10')
    for axis, ticks in [(ax.get_xaxis(), xticks), (ax.get_yaxis(), yticks)]:
        axis.set_ticks_position('none')
        axis.set_ticks(ticks)
        axis.label.set_color('#999999')
        if hide_labels:
            axis.set_ticklabels([])
    plt.grid(color=grid_color, linewidth=grid_width, linestyle='-')
    # A plain loop instead of map(): map() used only for its side effects is
    # silently skipped wherever map is lazy, leaving the spines visible.
    for position in ('bottom', 'top', 'left', 'right'):
        ax.spines[position].set_visible(False)
    return fig, ax

### We will now visualize how well the model predicts our target. To do this we generate a plot of the ROC curve. The ROC curve shows us the trade-off between the false positive rate and true positive rate, as we liberalize the threshold required to predict a positive outcome. A random model is represented by the dashed line.

In [49]:
# A ROC curve needs continuous scores. With its threshold set, model.predict
# returns hard 0/1 labels, which collapses the curve to a single step. The
# threshold is therefore cleared while the scores are collected and restored
# afterwards so that other cells keep getting class predictions.
original_threshold = model.threshold
model.clearThreshold()
try:
    labels_and_scores = test_f.rdd.map(
        lambda x: (x.label_idx, float(model.predict(x.std_features))))
    # Collect immediately: the RDD is lazy and would produce hard labels
    # again if it were evaluated after the threshold is restored.
    labels_and_weights = labels_and_scores.collect()
finally:
    if original_threshold is not None:
        model.setThreshold(original_threshold)

# Sort by descending score (index-based key: tuple-unpacking lambda
# parameters are Python 2-only syntax).
labels_and_weights.sort(key=lambda label_score: label_score[1], reverse=True)
labels_by_weight = np.array([k for (k, v) in labels_and_weights])

# Walking down the ranking, count how many positives / negatives have been
# seen so far; each prefix corresponds to one threshold setting.
length = labels_by_weight.size
true_positives = labels_by_weight.cumsum()
num_positive = true_positives[-1]
false_positives = np.arange(1.0, length + 1, 1.) - true_positives

true_positive_rate = true_positives / num_positive
false_positive_rate = false_positives / (length - num_positive)

# Area under the ROC curve by the trapezoid rule, with the curve anchored at (0, 0).
roc_x = np.concatenate(([0.], false_positive_rate))
roc_y = np.concatenate(([0.], true_positive_rate))
AUC = float(np.sum(np.diff(roc_x) * (roc_y[1:] + roc_y[:-1]) / 2.))

# Generate layout and plot data
fig, ax = prepare_plot(np.arange(0., 1.1, 0.1), np.arange(0., 1.1, 0.1))
ax.set_xlim(-.05, 1.05)
ax.set_ylim(-.05, 1.05)
ax.set_ylabel('True Positive Rate')
ax.set_xlabel('False Positive Rate')
plt.title("Accuracy data :Successful prediction percentage: " +
    str(round(success_count_new / float(test_f.count()), 4)))
plt.plot(false_positive_rate, true_positive_rate, color='r', linestyle='-', linewidth=3.)
plt.plot((0., 1.), (0., 1.), linestyle='--', color='g', linewidth=2.)  # Baseline model
display(fig)

In [50]:
# Inspect the first 70 false-positive-rate values of the ROC curve.
print(false_positive_rate[:70])

In [51]:
from sklearn.metrics import precision_recall_fscore_support as score

# Per-class precision, recall, F-score and support of the GMM cluster
# assignments measured against the indexed labels.
precision, recall, fscore, support = score(labels, predict)

# Print each metric on its own line.
metric_report = (
    ('precision: {}', precision),
    ('recall: {}', recall),
    ('fscore: {}', fscore),
    ('support: {}', support),
)
for template, values in metric_report:
    print(template.format(values))

#print labels,predict

###Plot risky and non risky data

In [53]:


# Split the rows by their original label value: 1 is plotted as "normal",
# 2 as "risky".
norm_data = german.filter(german.label == 1)

risky_data = german.filter(german.label == 2)

# Standardized feature vectors as numpy arrays. The maps go through .rdd
# (as the label extraction earlier in the notebook does) so they also work on
# Spark versions without DataFrame.map.
common_data = german.rdd.map(lambda l: array(l.std_features))

norm_Features = norm_data.rdd.map(lambda l: array(l.std_features))
risky_Features = risky_data.rdd.map(lambda l: array(l.std_features))


# Collect to the driver; squeeze() removes single-dimensional entries from
# the shape of each array.
xs = np.array(norm_Features.collect()).squeeze()
xs_r = np.array(risky_Features.collect()).squeeze()

xs_comman = np.array(common_data.collect()).squeeze()


# NOTE(review): each panel scatters rows 0-9 against rows 10-19 of the
# collected arrays (row slices, not feature columns) — confirm this is the
# intended comparison.
fig, axes = plt.subplots(1, 3, figsize=(10, 4))
axes[0].scatter(xs[0:10], xs[10:20], c='g')
axes[0].set_title('Normal data')
axes[1].scatter(xs_r[0:10], xs_r[10:20], c='r')
axes[1].set_title('Risky Data')
axes[2].scatter(xs_comman[0:10], xs_comman[10:20])
axes[2].set_title('Comman Data')

display(fig)

In [54]:
import matplotlib.pyplot as plt

# NOTE(review): x and y are not referenced by any visible cell; they are kept
# only in case a later cell relies on them.
x = range(100)
y = range(100, 200)
fig = plt.figure()
ax1 = fig.add_subplot(111)

# Each series plots the first half of its rows against the second half. The
# split point is derived from the data instead of the hard-coded 481/962 and
# 18/36, so the cell keeps working when the row counts change.
# NOTE(review): as in the previous cell, the halves are row slices, not
# feature columns — confirm this is the intended comparison.
half_norm = len(xs) // 2
half_risky = len(xs_r) // 2

ax1.scatter(xs[:half_norm], xs[half_norm:2 * half_norm],
            s=10, c='b', marker="s", label='Normal Data')
ax1.scatter(xs_r[:half_risky], xs_r[half_risky:2 * half_risky],
            s=10, c='r', marker="o", label='Risky Data')
plt.legend(loc='upper left');
display(fig)