In [1]:
from pyspark.sql import SparkSession
# Build (or reuse) the notebook's SparkSession.
# For cluster mode, insert `.master("<master URL>")` before `.getOrCreate()`;
# note that `Builder.config` takes key/value pairs, so `.config(master=...)` is not valid.
spark = (
    SparkSession.builder
    .appName('SparkApp')
    .getOrCreate()
)

In [2]:
# Echo the session object so the notebook renders its summary.
spark

In [3]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType,FloatType

In [4]:
# Explicit schema for LoanData.csv as (column name, Spark type) pairs.
# Presumably these must be listed in the same order as the CSV's columns — confirm.
# NOTE(review): if `Dependents` holds non-numeric values (e.g. "3+"), IntegerType
# cannot parse them — TODO confirm against the data and consider StringType.
Myschema = StructType([
    StructField(field_name, field_type)
    for field_name, field_type in [
        ("Loan_ID", StringType()),
        ("Gender", StringType()),
        ("Married", StringType()),
        ("Dependents", IntegerType()),
        ("Education", StringType()),
        ("Self_Employed", StringType()),
        ("ApplicantIncome", FloatType()),
        ("CoapplicantIncome", FloatType()),
        ("LoanAmount", FloatType()),
        ("Loan_Amount_Term", IntegerType()),
        ("Credit_History", IntegerType()),
        ("Property_Area", StringType()),
        ("Loan_Status", StringType()),
    ]
])

In [5]:
# Load the loan dataset using the explicit schema defined above (instead of inferSchema).
training = spark.read.csv(
    path='/FileStore/tables/LoanData.csv',
    schema=Myschema,
    header=True,
)

In [6]:
# Check what kind of object spark.read.csv returned.
type(training)

In [7]:
# Verify the explicit schema was applied to the loaded data.
training.printSchema()

In [8]:
# Load the Universal Bank dataset into its OWN variable. Previously this was assigned
# to `training`, silently replacing the loan DataFrame loaded above; every later cell
# selects loan columns (Gender, Married, LoanAmount, Loan_Status, ...), which this
# file presumably does not have.
universal_bank = spark.read.csv(
    path='/FileStore/tables/g8zdj67d1494614178462/universalBank.csv',
    inferSchema=True,
    header=True,
)

In [9]:
# Check the type of the current `training` object.
type(training)

In [10]:
# Preview the first 5 rows as a text table.
training.show(5)

In [11]:
# Print the column names and types of `training`.
training.printSchema()

In [12]:
# Render the DataFrame with the notebook's rich table display.
display(training)

In [13]:
# NOTE(review): duplicate of the previous cell's display — consider removing.
display(training)

In [14]:
# Preview a subset of applicant attributes alongside the loan outcome.
training.select(
    'Gender',
    'Married',
    'Dependents',
    'Education',
    'ApplicantIncome',
    'LoanAmount',
    'Loan_Status',
).show()

In [15]:
# Row counts for every (Gender, Loan_Status) combination.
(
    training
    .groupBy('Gender', 'Loan_Status')
    .count()
    .show()
)

In [16]:
# Peek at the first 5 rows as Row objects via the underlying RDD.
training.rdd.take(5)

In [17]:
# Expose the DataFrame to Spark SQL under the name "training_tbl" (replacing any existing view).
training.createOrReplaceTempView("training_tbl")

In [18]:
# Number of rows per Loan_Status value, via SQL on the temp view.
spark.sql('Select Loan_Status , count(*) as cnt from training_tbl group by Loan_Status').show()

In [19]:
# Range of LoanAmount; presumably the source of the hard-coded 9/700 bounds in the next cell.
spark.sql('select min(LoanAmount), max(LoanAmount) from training_tbl').show()

In [20]:
def Normalize(x, lo=9, hi=700):
  """Min-max scale ``x`` into [0, 1] using fixed bounds.

  The defaults (9, 700) are hard-coded LoanAmount bounds, presumably copied from
  the min/max query output. A later cell redefines ``Normalize`` with bounds
  computed from the data, so this version is only used until then.

  Args:
    x: value to scale; ``None`` (SQL NULL when used as a UDF) is passed through.
    lo: value mapped to 0.
    hi: value mapped to 1.

  Returns:
    ``(x - lo) / (hi - lo)``, ``None`` if ``x`` is ``None``, or ``0.0`` when
    ``hi == lo`` (a zero-width range) instead of raising ZeroDivisionError.
  """
  if x is None:
    return None
  span = hi - lo
  if span == 0:
    return 0.0
  return (x - lo) / span

In [21]:
# NOTE: this import shadows Python's built-in min/max for the rest of the notebook.
from pyspark.sql.functions import min, max
# Smallest LoanAmount (SQL aggregates skip nulls).
training.select(min('LoanAmount')).first()[0]

In [22]:
from pyspark.sql.functions import mean, min, max
# NOTE: these imports shadow Python's built-in min/max in the notebook namespace.
# Compute both LoanAmount bounds in a single aggregation: one pass over the data
# instead of two separate Spark jobs. Aggregates skip nulls; both values are None
# if LoanAmount is entirely null.
minVal, maxVal = training.agg(min('LoanAmount'), max('LoanAmount')).first()

In [23]:
## Using a UDF with Spark SQL tables

In [24]:
def Normalize(x, lo=None, hi=None):
  """Min-max scale ``x`` into [0, 1] using the LoanAmount bounds.

  This function is registered as a Spark UDF, and Spark passes SQL NULLs to
  Python UDFs as ``None``. Previously ``None - minVal`` raised TypeError and
  failed the whole job, so ``None`` is now passed through.

  Args:
    x: value to scale, or ``None``.
    lo: lower bound mapped to 0; defaults to the module-level ``minVal``,
      looked up at call time.
    hi: upper bound mapped to 1; defaults to the module-level ``maxVal``,
      looked up at call time.

  Returns:
    ``(x - lo) / (hi - lo)``; ``None`` if ``x`` or either bound is ``None``;
    ``0.0`` when ``hi == lo`` (constant column) instead of raising
    ZeroDivisionError.
  """
  if lo is None:
    lo = minVal
  if hi is None:
    hi = maxVal
  if x is None or lo is None or hi is None:
    return None
  span = hi - lo
  if span == 0:
    return 0.0
  return (x - lo) / span

In [25]:
from pyspark.sql.functions import udf
# Make Normalize callable from Spark SQL as Normalize_function, with declared return type FloatType.
spark.udf.register(name='Normalize_function', f=Normalize, returnType=FloatType())

In [26]:
# Apply the registered SQL UDF and show raw vs. normalized LoanAmount side by side.
spark.sql('select LoanAmount, Normalize_function(LoanAmount) as NormalizedLoan from training_tbl').show()

In [27]:
## Using a UDF with DataFrames

In [28]:
from pyspark.sql.functions import udf
# Wrap Normalize as a column function for the DataFrame API (declared return type FloatType).
Normalize_fn = udf(f=Normalize, returnType=FloatType())

In [29]:
# Apply the DataFrame UDF to LoanAmount and preview 5 results in column "new".
training.select(Normalize_fn("LoanAmount").alias('new')).show(5)

In [30]:
# Pearson correlation between ApplicantIncome and LoanAmount.
training.stat.corr('ApplicantIncome','LoanAmount')

In [31]:
# Re-check column types before feature engineering.
training.printSchema()

In [32]:
# Rich display of the data before indexing categorical columns.
display(training)

In [33]:
from pyspark.ml.feature import StringIndexer

In [34]:
# Configure (not yet fit) an estimator that maps Gender labels to numeric indices.
indexer = StringIndexer(inputCol="Gender", outputCol="Gender_indexed")

In [35]:
# Fit for inspection only: the returned model is displayed but not stored.
indexer.fit(training)

In [36]:
# Rebind `indexer` to the fitted StringIndexerModel so it can transform data.
indexer = StringIndexer(inputCol="Gender", outputCol="Gender_indexed").fit(training)

In [37]:
# Add the Gender_indexed column to `training`.
training = indexer.transform(training)

In [38]:
# The previous cell already added Gender_indexed, and StringIndexer refuses to fit
# when its output column already exists (IllegalArgumentException). Only index
# here if the column is missing, so the cell is idempotent under re-runs / Run All.
if "Gender_indexed" not in training.columns:
    training = StringIndexer(inputCol="Gender", outputCol="Gender_indexed").fit(training).transform(training)

In [39]:
# Preview 10 rows including the new Gender_indexed column.
training.show(10)

In [40]:
# Index the remaining categorical string columns (Gender was indexed above) into
# "<column>_indexed". StringIndexer raises if its output column already exists,
# so skip columns that are already indexed; this makes the cell safe to re-run.
# NOTE(review): StringIndexer's default handleInvalid="error" fails on NULL inputs;
# if these columns contain missing values, consider handleInvalid="keep" — confirm.
for col_name in ("Married", "Loan_Status", "Education"):
    out_col = col_name + "_indexed"
    if out_col not in training.columns:
        training = StringIndexer(inputCol=col_name, outputCol=out_col).fit(training).transform(training)

In [41]:
# Inspect the data with all indexed columns added.
display(training)

In [42]:
# Keep only the model inputs (indexed categoricals plus numeric Dependents and
# LoanAmount) and the indexed label.
training = training.select(
    'Gender_indexed',
    'Married_indexed',
    'Dependents',
    'Education_indexed',
    'LoanAmount',
    'Loan_Status_indexed',
)

In [43]:

# Preview the reduced modelling columns.
training.show(5)

In [44]:
from pyspark.ml.feature import VectorAssembler

In [45]:
# Pack the five input columns into a single vector column named "features".
# NOTE(review): VectorAssembler errors on null inputs by default; if Dependents or
# LoanAmount can be null here, drop/impute first or set handleInvalid — confirm.
assembler = VectorAssembler(
    outputCol="features",
    inputCols=[
        'Gender_indexed',
        'Married_indexed',
        'Dependents',
        'Education_indexed',
        'LoanAmount',
    ],
)
training = assembler.transform(training)

In [46]:
# Preview rows with the assembled "features" vector.
training.show(5)

In [47]:
# Confirm "features" was added as a vector column.
training.printSchema()

In [48]:
# Reduce to the feature vector and the label.
training = training.select('features','Loan_Status_indexed')

In [49]:
# Preview the (features, label) pairs.
training.show(5)

In [50]:
#from pyspark.ml import Pipeline
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.feature import StringIndexer, VectorIndexer, VectorAssembler
from pyspark.ml.evaluation import BinaryClassificationEvaluator

In [51]:
# Treat vector slots with at most 4 distinct values as categorical (re-indexed) and
# leave the rest continuous; the result goes to "indexedFeatures".
training = (
    VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=4)
    .fit(training)
    .transform(training)
)

In [52]:
# Preview the raw and indexed feature vectors.
training.show(5)

In [53]:
# 70/30 train/test split. Without a seed, each run produces a different split, so the
# trained tree and reported metrics change on every Restart & Run All. A fixed seed
# makes the split reproducible (given the same input data and partitioning).
SPLIT_SEED = 42
(trainingData, testData) = training.randomSplit([0.7, 0.3], seed=SPLIT_SEED)

In [54]:
from pyspark.ml.classification import DecisionTreeClassifier

In [55]:
# Fit a depth-limited decision tree on the indexed features, then score the held-out split.
dt = DecisionTreeClassifier(
    featuresCol="indexedFeatures",
    labelCol="Loan_Status_indexed",
    maxDepth=8,
)
model = dt.fit(trainingData)
predictions = model.transform(testData)

In [56]:
# Preview predictions on the test split.
predictions.show(5)

In [57]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

In [58]:
# Binary evaluator scoring the area under the ROC curve (its default metric, stated explicitly).
evaluator = BinaryClassificationEvaluator(
    labelCol="Loan_Status_indexed",
    metricName="areaUnderROC",
)

In [59]:
# `evaluator` is a BinaryClassificationEvaluator with its default metricName, so it
# returns area under the ROC curve, not accuracy. Keep that value as `auc` and
# compute real accuracy (fraction of correct predictions) separately.
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
auc = evaluator.evaluate(predictions)
accuracy = MulticlassClassificationEvaluator(
    labelCol="Loan_Status_indexed",
    predictionCol="prediction",
    metricName="accuracy",
).evaluate(predictions)

In [60]:
# Display the evaluation result computed in the previous cell.
accuracy

In [61]:
# Print the learned tree's split rules; print() keeps the multi-line text readable.
print(model.toDebugString)