# Data Science Case Study

### 1. Importing all the necessary packages and functions

In [1]:
import numpy as np
import pandas as pd
import glob # to be used to import multiple files at once
import findspark
findspark.init('/home/ubuntu/spark-2.3.1-bin-hadoop2.7')

import pyspark
from pyspark.sql import SparkSession
spark=SparkSession.builder.appName('spark').getOrCreate()
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import ChiSqSelector
from pyspark.ml.feature import Normalizer
from pyspark.ml.feature import PCA
from pyspark.ml.classification import LogisticRegression,LogisticRegressionModel,LinearSVC
from pyspark.ml.evaluation import BinaryClassificationEvaluator,MulticlassClassificationEvaluator
from sklearn.metrics import confusion_matrix
from pyspark.sql import SQLContext
from pyspark.sql.functions import udf
from pyspark.sql.types import *
from pyspark.ml.tuning import CrossValidator,ParamGridBuilder,TrainValidationSplit

In [2]:
# Check if all inout files are covered or not
path ='/home/ubuntu/Notebooks'
filenames = glob.glob( path+"/*.parquet")

In [18]:
print(filenames)

['/home/ubuntu/Notebooks/part-00007-tid-6056519413201330783-119422a4-5c84-4461-ba64-4a5e2a1d4acd-36-c000.snappy.parquet', '/home/ubuntu/Notebooks/part-00006-tid-6056519413201330783-119422a4-5c84-4461-ba64-4a5e2a1d4acd-35-c000.snappy.parquet', '/home/ubuntu/Notebooks/part-00000-tid-6056519413201330783-119422a4-5c84-4461-ba64-4a5e2a1d4acd-29-c000.snappy.parquet', '/home/ubuntu/Notebooks/part-00003-tid-6056519413201330783-119422a4-5c84-4461-ba64-4a5e2a1d4acd-32-c000.snappy.parquet', '/home/ubuntu/Notebooks/part-00004-tid-6056519413201330783-119422a4-5c84-4461-ba64-4a5e2a1d4acd-33-c000.snappy.parquet', '/home/ubuntu/Notebooks/part-00001-tid-6056519413201330783-119422a4-5c84-4461-ba64-4a5e2a1d4acd-30-c000.snappy.parquet', '/home/ubuntu/Notebooks/part-00002-tid-6056519413201330783-119422a4-5c84-4461-ba64-4a5e2a1d4acd-31-c000.snappy.parquet']


### 2. Loading the data into a dataframe

In [2]:
# Creating spark dataframe by reading all the input files
lucid_dataset=spark.read.format("parquet").option("header", "true").load("/home/ubuntu/Notebooks/*.parquet")

In [5]:
# Total number of rows in dataset?
lucid_dataset.count()

6883958

In [6]:
# Total number of columns? in dataset?
len(lucid_dataset.columns)

503

In [7]:
lucid_dataset.select('label').distinct().show()

+-----+
|label|
+-----+
|    1|
|    0|
+-----+



In [6]:
lucid_dataset.groupby('label').count().show()

+-----+-------+
|label|  count|
+-----+-------+
|    1|    368|
|    0|6883590|
+-----+-------+



### This is an highly imbalanced dataset with 99.995 % are 0's(negative class) and 0.005 % are 1's (positive class)

# Data Cleaning Steps:
<b> Though the data is cleaned, but below are the steps which I would have followed to clean the data and identify issues </b>

1. Check for nulls in the data. Would have written a pyspark script which will give us, what columns contain null values and how much proportion of nulls they have
2. Depending on the proportion, would have either deleted the column or replaced null with the mean value (in case of continuous and discrete value) or replaced with 0 in case of binary
3. Identifying and replacing the null value should proceed with caution since it will be dependent on the type of column
4. The next step would be to identify if we have any duplicates in our data, again a pyspark script I have written which helps in identifying duplicate rows as well as removes it
5. In case of categorical/text data, converting the data into numeric, so that it can be used for analysis (using one hot encoder)
6. Renaming of columns, each column should have a meaningful names so that it becomes easy to understrand
7. Standardizing data --> eg: if there are two columns merged into one then we have to separate such columns
8. Converting datatypes into their respective format

### 3.  Steps to identify important variables and create model :

a. <b> Separate the data into training and test. All the analysis,feature selection would be on training data only </b>

b. <b> Separate the categorical(binary) variables and continuous/discrete variables </b>


c. <b> Using Chi-Square selector, identify important features from the categorical variables </b>

d. <b> Build Logistic Regression model using the important features and identify variables which are important or have zero impact on the model </b>

e. <b> Build logistic regression/SVM model on the continuous/discrete variables and identify variables which are impactful and which are not </b>

In [5]:
# Splitting the data into train and test
train_data,test_data=lucid_dataset.randomSplit([0.8,0.2])

In [31]:
def column_binary(dataframe):
    """
    This function will separate the categorical(binary) variables with continuous/discrete and will give a list of columns which have o  and 1
    """
    cols=[]
    for i in dataframe.columns[1:]:
        if dataframe.select(i).distinct().count()==2:
            cols.append(i)
    cols.append('label') # appending label column to capture output in new dataframe
    return cols 
cols=column_binary(lucid_dataset)

In [134]:
def create_features(data):
    """
    # This function creates a dataframe containing only categorical data and uses Vector assembler to create the features and
      dataframe required to perform Chi square test
    """
    category_data=data.select(cols)
    assembler=VectorAssembler(inputCols=cols[1:],outputCol='features')
    category_data=assembler.transform(category_data)
    return category_data
category_data=create_features(train_data)

In [135]:
def Chisq(data):
    """
    # ChiSqSelector stands for Chi-Squared feature selection. It operates on labeled data with categorical features. 
    # ChiSqSelector uses the Chi-Squared test of independence to decide which features to choose
    # It takes category_data as the input since the input should contain only categorical data
    """
    selector = ChiSqSelector(numTopFeatures=50, featuresCol="features",outputCol="selectedFeatures", labelCol="label")
    model = selector.fit(data)
    feature=model.transform(data).head().selectedFeatures
    column_list=(model.selectedFeatures)
    return column_list

column_list=Chisq(category_data)

In [136]:
def useful_columns(data):
    """
    # This function takes category_data as the input and creates a list containing all the selected variables from chisquare test
    # It uses the column_list variable from Chisq function's output to build the list
    """
    q=data.columns
    output=['label']
    for i in column_list:
        output.append(q[i])
    return output    

output=useful_columns(category_data)

In [8]:
lr=LogisticRegression() # Initializing Logistic regression

In [137]:
def cat_function_features(data):
    """
    # This function takes train_data as the input and creates a dataframe containing all binary/categorical variables
    # It then transforms them into features, to be required for Logistic Regression
    # Finally it calculates all the coefficients and captures them into a dataframe containing all the columns and coefficients
    """
    cat_train_data=data.select(output)
    assembler=VectorAssembler(inputCols=cat_train_data.columns[1:],outputCol='features')
    cat_train_data=assembler.transform(cat_train_data)
    lr_model=lr.fit(cat_train_data)
    lr_model.save('category_model')
    data_array = []
    column_schema=sch=cat_train_data.columns[1:]
    for i in range (0,len(lr_model.coefficients)) :
         data_array.extend([(column_schema[i], float(lr_model.coefficients[i]))])
    df = spark.createDataFrame(data = data_array, schema = ["columns", "coefficients"])
    return df

df=cat_function_features(category_data)

In [90]:
lr_model=LogisticRegressionModel.load('category_model')

## In logistic Regression : 

-> A negative coefficient simply implies that the probability that the event identified by the dependent variable happens decreases as the value of the independent variable increases

-> A positive coefficient simply implies that the probability that the event identified by the dependent variable happens increases as the value of the independent variable increases

In [98]:
df_no_zero=df.filter('coefficients!=0')
positive_variable=df_no_zero.filter('coefficients>0.05')
negative_variable=df_no_zero.filter('coefficients<-0.05')

In [106]:
# Features which  impact the output variable by going in negative direction are (ordered from their impact/ variation):
var1=negative_variable.orderBy(negative_variable['coefficients'].asc()).select('columns').rdd.flatMap(lambda x: x).collect()
for x in var1:
    print(x, end=' ')

f445 f458 f301 f309 f265 f253 f325 f183 f451 f275 f162 f466 f259 f421 f298 f179 f153 f193 f502 f326 f305 f150 f202 f365 f223 f340 f172 f494 f306 f449 f268 f394 f395 f300 f333 f324 f187 f350 

In [111]:
var2=positive_variable.orderBy(positive_variable['coefficients'].asc()).select('columns').rdd.flatMap(lambda x: x).collect()
for x in var2:
    print(x, end=' ')

f478 f228 f173 f470 f208 f113 f163 f145 f180 f430 f154 

In [138]:
def numeric_data(data):
    """
    # This function takes train_data as the input and creates a dataframe containing all continuous/discrete variables
    # It then transforms them into features, to be required for Logistic Regression
    # Finally it calculates all the coefficients and captures them into a dataframe containing all the columns and coefficients
    """
    numeric_data=data.select([c for c in data.columns if c not in cols[0:-1]])
    assembler=VectorAssembler(inputCols=numeric_data.columns[1:],outputCol='features')
    numeric_data=assembler.transform(numeric_data)
    n_model=lr.fit(numeric_data)
    n_model.save('numeric_model')
    data_array = []
    for i in range (0,len(n_model.coefficients)) :
        data_array.extend([(numeric_columns[i], float(n_model.coefficients[i]))])
    df = spark.createDataFrame(data = data_array, schema = ["columns", "coefficients"])
    return df

df=numeric_data(train_data)

In [None]:
n_model=LogisticRegressionModel.load('numeric_model')

## 3. Steps to identify/ important variables:

#### a. We will now try to drop those variables whose coefficients values are low and zero.
#### b. Since logistic regression is log probability, bigger the number, better are the odds to increase the probability of the output


In [74]:
newdf=df.filter('coefficients!=0.0')
negativedf=newdf.filter('coefficients<-1')
positivedf=newdf.filter('coefficients>1')

In [80]:
print(negativedf.count()) # total number of variables which impact the output while they are decreasing
print(positivedf.count()) # total number of variables which impact the output while they are increasing

27
12


In [107]:
# Features which  impact the output variable by going in negative direction are (ordered from their impact/ variation):
var3=negativedf.orderBy(negativedf['coefficients'].asc()).select('columns').rdd.flatMap(lambda x: x).collect()
for x in var3:
    print(x, end=' ')

f12 f68 f18 f65 f88 f403 f73 f63 f67 f35 f158 f313 f499 f69 f199 f425 f127 f66 f465 f17 f147 f75 f101 f501 f471 f293 f89 

In [108]:
# Features which are impact the output variable by going in negative direction are:
var4=positivedf.orderBy(positivedf['coefficients'].desc()).select('columns').rdd.flatMap(lambda x: x).collect()
for x in var4:
    print(x, end=' ')

f366 f6 f489 f115 f122 f330 f272 f369 f488 f491 f483 f258 

# Feature Selection Procedure:

<b> Out of the 501 features, an important task was to identify the important features, below are the steps I took: </b>

1. The dataset has both binary and continuous variables. Separate these columns into a different dataframe
2. The result was, there were 335 binary columns and 166 continuous variables
3. Using Chi square selector on the binary column dataframe, identified the top 50 features in that dataframe and stored them
4. Created a logistic regression model on those column and got rid of those variables whose coefficients were zero
5. Majority of the features had a negative coefficients and it means that, they impact the output variable positively when they decrease or in our case , they are zero (masked as zero in case of categorical)
6. For continuous/discrete variables, I created a logistic regression model and got their coefficients
7. Out of 166 columns, 24 had zero coefficients and thus were ruled out from analysis
8. Many variables had very low coefficients(in negative exponential terms) and thus I decided to rule them out 
9. Finally out of 166, I identified 39 columns which were impactful for the output probability 
10. 27 were those which impacted the output probability inversely. Their decrease was increasing the probability
11. Since it's PySpark, I could not use Anova or RFE which would have further helped in identifying important features or would have gotten rid of more variables

# Important Features are:

 
1. Features/variables <b>"f445, f458, f301, f309, f265, f253, f325, f183, f451, f275 </b> are the top variables which impact/increase the output probability ( of 1 ) when they are 0

2. The other cateogorical/binary variables which increase the probability of the output to be 1 when they are themselves 1 are : <b> "f478 ,f228, f173, f470, f208, f113, f163" </b>

3. <b>"f12, f68, f18, f65, f88, f403, f73, f63, f67, f35, f158, f313"</b> are the top 12 continuous/discrete variables by which probability that the event identified by the dependent variable happens decreases as the value of the independent variable increases

4. <b>"f366, f6, f489, f115, f122, f330, f272, f369, f488, f491, f483, f258" </b>are top 12 continuous/discrete variables which impact output probability positively when they increase


## 4. Handling the class imbalance by creating weights for our 0's and 1's

In [59]:
# Creating a list of final variables to be used 
final_variables=var1+var2+var3+var4
final_variables.append('label')
print('total number of features used are: ',len(final_variables)-1)

total number of features used are:  88


In [6]:
# Slicing training and test data with the final features and variables
train_data=train_data.select(final_variables)
test_data=test_data.select(final_variables)

In [7]:

def balanceDataset(data): 
    """
    This function is used to created weights column.
    It takes a dataset as an input and it should have 'label' column in it
    Creates a separate dataframe with assigned weights required to handle Class Imbalance
    
    """
    #// Re-balancing (weighting) of records to be used in the logistic loss objective function
    numNegatives = data.filter("label == 0").count() #5505244
    datasetSize =  data.count() # 5505526
    balancingRatio = (datasetSize - numNegatives) / datasetSize
    calculateWeights =udf(lambda d: (1 *  balancingRatio) if d==0.0 else (1 * (1.0 - balancingRatio)), DoubleType())
    weightedDataset = data.withColumn("classWeightCol", calculateWeights(data.label))
    return weightedDataset

weightedDataset=balanceDataset(train_data)

In [8]:
# Creating our test and train features
assembler=VectorAssembler(inputCols=weightedDataset.columns[1:-1],outputCol='features')
train_data=assembler.transform(weightedDataset)
test_data=assembler.transform(test_data)

## 5. Initializing our Logistic Regression model and getting the F1 score and AUC Curve

In [46]:

lr=LogisticRegression(weightCol='classWeightCol') # weights col will take into account the class imbalance
final_model=model.fit(train_data)
prediction=final_model.transform(test_data)
f1_eval = MulticlassClassificationEvaluator(metricName='f1')
lr_f1= f1_eval.evaluate(prediction)
final_lr.save('Logistic_regression_model')

print("F1 score of model logistic was: {}".format(lr_f1))

F1 score of model logistic was: 0.8663449465447742


In [47]:
auc_eval = BinaryClassificationEvaluator()
print('AUC is', auc_eval.evaluate(prediction))

AUC is 0.882231605978899


In [48]:
# Creating a confusion matrix
c=prediction.select('label').toPandas()
d=prediction.select('prediction').toPandas()
c=np.array(c)
d=np.array(d)
print(confusion_matrix(c,d))

[[1053570  324935]
 [     11      64]]


#### Our model is predicting 64 correct 1's but has some False negatives. 
#### False Positives are quite low only (11)

### Implementing Linear Support Vector Machine

In [51]:

# Implementing LinearSVM 

# svm=LinearSVC(weightCol='classWeightCol')
# mo=svm.fit(train_data)
# svm_pred=mo.transform(test_data)
# svm_f1 = acc_eval.evaluate(svm_pred)
# print("F1 score of model svm was: {}".format(svm_f1))
# sv_c=svm_pred.select('label').toPandas()
# sv_d=svm_pred.select('prediction').toPandas()
# sv_c=np.array(sv_c)
# sv_d=np.array(sv_d)
# print(confusion_matrix(sv_c,sv_d))

### Multilayer Perceptron Classifier

In [23]:
#from pyspark.ml.classification import MultilayerPerceptronClassifier
# # specify layers for the neural network:
# # input layer of size 4 (features), two intermediate of size 5 and 4
# # and output of size 3 (classes)
# layers = [88, 88, 100, 2]

# # create the trainer and set its parameters
# trainer = MultilayerPerceptronClassifier(maxIter=100, layers=layers, blockSize=128, seed=1234)

# # train the model
# model = trainer.fit(train_data)

# # compute accuracy on the test set
# result = model.transform(test_data)
# nn_eval = MulticlassClassificationEvaluator(metricName='f1')
# nn_f1= nn_eval.evaluate(result)
# model.save('neural_model')

# print("F1 score of model NN was: {}".format(nn_f1))

### Tuning the Model for better scores and identifying best parameters

In [24]:
paramGrid = (ParamGridBuilder()
             .addGrid(lr.regParam, [ 0.5, 2.0, 1.5])
             .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0])
             .addGrid(lr.maxIter, [ 50, 100, 200])
             .build())

tvs = TrainValidationSplit(estimator=lr,
                           estimatorParamMaps=paramGrid,
                           evaluator=MulticlassClassificationEvaluator(),
                           # 80% of the data will be used for training, 20% for validation.
                           trainRatio=0.8)  
model=tvs.fit(train_data)
predictions = model.transform(test_data)


In [19]:
best_model=model.bestModel
best_pred=best_model.transform(test_data)
best_eval=MulticlassClassificationEvaluator()
f1=best_eval.evaluate(best_pred)
print('f1 score for best model is ',f1 )

f1 score for best model is  0.8405825143836786


In [16]:
# gets parameters of your best model in terms of train validation split
import numpy as np

# gets parameters of your best model in terms of train validation split
model.getEstimatorParamMaps()[np.argmax(model.validationMetrics)]

{Param(parent='LogisticRegression_47a78f2e0a1e803395bc', name='regParam', doc='regularization parameter (>= 0).'): 0.5,
 Param(parent='LogisticRegression_47a78f2e0a1e803395bc', name='elasticNetParam', doc='the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty.'): 0.0,
 Param(parent='LogisticRegression_47a78f2e0a1e803395bc', name='maxIter', doc='max number of iterations (>= 0).'): 50}

# Final Analysis
<b> 1. The F1 score of the model is 0.86 and the AUC is 0.88 </b>

<b> 2. F1 Score is the weighted average of Precision and Recall. In case of uneven class distribution like in our dataset, F1 score helps in evaluating the model </b>

<b> 3. After tuning the parameters using train-validation split, the overall model F1 score came out to be 0.84. Thus we can say that the initial model parameters are the best and the score is valid </b>

<b> 4. The confusion matrix results shows that, our model predicted total 64 correct positives ( keeping in mind that this percentage is very high). It has fewer False Positives but sufficiently high False Negatives </b>

<b> 5. F1-score could have been better if Linear SVM was used, but it ran continuously and eventually had to kill it. But I have written the code which will evaluate the performance </b>
    
<b> 6. Multilayer Perceptron Classifier also like SVM ran for couple of hours and had to kill it eventually. Again, I have attached the code used </b>

<b> 7. If my AWS server was more powerful, I would have used Python instead of Pandas and would have leveraged even more powerful algorithms with better feature selection like Anova,RFE and models like XGBoost and even ANN (Keras) </b>


## Algorithms used: Logistic Regression
## Algorithms to be used if I had more time and bigger server: LinearSVM, MultiLayer Perceptron Classifier
