In [None]:
!pip install pyspark

In [None]:
import numpy as np
from pyspark import SparkConf, SparkContext
from pyspark.mllib.linalg import Vectors
from pyspark.sql import SparkSession

# --- Spark entry points -----------------------------------------------------
# Local, single-JVM Spark application named 'abc'.
conf = SparkConf().setAppName('abc').setMaster('local')
# getOrCreate() reuses a context that is already running, so this cell can be
# re-executed without raising "Cannot run multiple SparkContexts at once".
sc = SparkContext.getOrCreate(conf=conf)
sc.setLogLevel('ERROR')  # silence INFO/WARN chatter in the cell output
# The former `.config('', '')` call set an empty key and has been dropped.
spark = SparkSession.builder.appName('abc').getOrCreate()

# --- Vector representations -------------------------------------------------
# NumPy dense vector
v1 = np.array([1, 2, 3, 4, 5])
print(v1)

# Plain Python list
v2 = [1, 2, 3, 4, 5, 6]
print(v2)

# Sparse & dense Spark vectors
v3 = Vectors.dense([3, 4, 5, 6])
print(v3)
# Sparse vector of size 3 with values 1.0 and 3.0 at indices 0 and 2.
v4 = Vectors.sparse(3, [0, 2], [1.0, 3.0])
print(v4)

In [None]:
from google.colab import drive

# Mount Google Drive so files stored there are reachable from this runtime.
drive.mount('/content/drive')

# Read the bank CSV, using the first row as header and letting Spark infer
# the column types; then print the resulting schema.
bank_csv_path = '/content/drive/MyDrive/ColabInputs/bank.csv'
df = spark.read.csv(bank_csv_path, inferSchema=True, header=True)
df.printSchema()

In [None]:
# Print the first five rows without truncating long cell values.
df.show(n=5, truncate=False)

In [None]:
# Number of rows in `df`; shown as the cell's output.
df.count()

In [None]:
# Names of every column whose Spark dtype string is exactly 'int'.
numeric_features = [col_name for col_name, col_type in df.dtypes if col_type == 'int']

# describe() statistics for those columns, transposed so each feature is a row.
numeric_summary = df.select(numeric_features).describe().toPandas()
numeric_summary.transpose()

In [None]:
import seaborn as sns

# Bring the integer-typed columns to the driver as a pandas DataFrame and
# draw a pairwise scatter/histogram grid over them.
numeric_data = df.select(*numeric_features).toPandas()
sns.pairplot(numeric_data)

In [None]:
# Keep only the columns used for modelling; any column not listed here is
# dropped. Note that 'age' IS retained.
# NOTE(review): in the usual bank.csv layout the dropped columns would be
# 'day' and 'month' - confirm against the schema printed when loading.
df = df.select(
    'age', 'job', 'marital', 'education', 'default', 'balance',
    'housing', 'loan', 'contact', 'duration', 'campaign', 'pdays',
    'previous', 'poutcome', 'deposit',
)

# Remember the raw input column names; they are re-attached after the
# feature pipeline has produced 'label' and 'features'.
cols = df.columns
df.printSchema()

In [None]:
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler

# String-valued predictors and numeric predictors fed to the model.
categoricalColumns = ['job', 'marital', 'education', 'default', 'housing','loan', 'contact', 'poutcome']
numericCols = ['age', 'balance', 'duration', 'campaign', 'pdays', 'previous']

# For each categorical column: index the strings (<col>Index), then one-hot
# encode that index (<col>classVec). Stage order is indexer, encoder per column.
stages = []
for cat_name in categoricalColumns:
  indexer = StringIndexer(inputCol=cat_name, outputCol=cat_name + 'Index')
  one_hot = OneHotEncoder(
      inputCols=[indexer.getOutputCol()],
      outputCols=[cat_name + "classVec"],
  )
  stages.extend([indexer, one_hot])

# Index the target column 'deposit' into 'label'.
label_stringIdx = StringIndexer(inputCol='deposit', outputCol='label')
stages.append(label_stringIdx)

# Concatenate the encoded categorical vectors and the numeric columns into a
# single 'features' vector column.
assemblerInputs = [cat_name + "classVec" for cat_name in categoricalColumns] + numericCols
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")
stages.append(assembler)

In [None]:
# Display the ordered list of column names passed to the VectorAssembler.
assemblerInputs

In [None]:
# Display the list of pipeline stages assembled so far.
stages

In [None]:
from pyspark.ml import Pipeline

pipeline = Pipeline(stages=stages)

# Always start from the raw input columns only. On the first run this is the
# same frame as `df`; on a re-run `df` already carries 'label' and 'features',
# and feeding those back in makes the pipeline fail because its output
# columns already exist. Selecting `cols` keeps this cell re-runnable.
raw_inputs = df.select(cols)
pipelineModel = pipeline.fit(raw_inputs)

# Keep the model-ready columns first, followed by the original inputs; the
# intermediate *Index / *classVec columns are discarded.
selectedCols = ['label', 'features'] + cols
df = pipelineModel.transform(raw_inputs).select(selectedCols)
df.printSchema()

In [None]:
# Print the first five rows of the current `df` without truncating values.
df.show(n=5, truncate=False)

In [None]:
# Print only the 'label' and 'features' columns for the first five rows,
# untruncated.
label_and_features = df.select('label', 'features')
label_and_features.show(n=5, truncate=False)

In [None]:
import pandas as pd

# Let pandas print column contents in full instead of eliding them.
pd.set_option('display.max_colwidth', None)

# Fetch five rows to the driver, wrap them in a pandas DataFrame and keep
# only the two leading columns.
first_rows = df.take(5)
df2 = pd.DataFrame(first_rows, columns=df.columns).iloc[:, :2]
print(df2)

In [None]:
# Total row count before splitting.
print(df.count())

# Random 70/30 split with a fixed seed.
train, test = df.randomSplit([0.7, 0.3], seed=123)
print(f"Training Dataset Count: {train.count()}")
print(f"Test Dataset Count: {test.count()}")

In [None]:
from pyspark.ml.classification import LogisticRegression

# Logistic regression on the assembled 'features' vector against 'label',
# capped at 10 optimizer iterations, fitted on the training split.
lr = LogisticRegression(
    maxIter=10,
    labelCol='label',
    featuresCol='features',
)
lrModel = lr.fit(train)

In [None]:
# Display the coefficient vector of the fitted logistic-regression model.
lrModel.coefficients

In [None]:
# Keep a handle on the fitted model's summary object; its `roc` and
# `areaUnderROC` attributes are read by the ROC-curve cell.
trainingSummary = lrModel.summary

In [None]:
import matplotlib.pyplot as plt

# ROC points of the training summary as a pandas DataFrame with
# 'FPR' and 'TPR' columns.
roc = trainingSummary.roc.toPandas()

# x = false-positive rate, y = true-positive rate. The axis labels were
# previously swapped relative to the plotted data; they now match it.
plt.plot(roc['FPR'], roc['TPR'])
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.title('ROC Curve')
plt.show()
print('Training set areaUnderROC: ' + str(trainingSummary.areaUnderROC))