In [1]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.2.1.tar.gz (281.4 MB)
[K     |████████████████████████████████| 281.4 MB 35 kB/s 
[?25hCollecting py4j==0.10.9.3
  Downloading py4j-0.10.9.3-py2.py3-none-any.whl (198 kB)
[K     |████████████████████████████████| 198 kB 51.6 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.2.1-py2.py3-none-any.whl size=281853642 sha256=2f7fc4f4b2b5848ff18530e7ec1e8200d56ef8f8118f3d39ccf4cf0e0a73adf4
  Stored in directory: /root/.cache/pip/wheels/9f/f5/07/7cd8017084dce4e93e84e92efd1e1d5334db05f2e83bcef74f
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.3 pyspark-3.2.1


In [2]:
from pyspark.sql import SparkSession

In [3]:
# Obtain the Spark session for this notebook under the app name "MyCode";
# getOrCreate() hands back an already-running session when there is one.
session = (
    SparkSession.builder
    .appName("MyCode")
    .getOrCreate()
)

In [4]:
# Load the loan CSV: the first line is treated as the header row and column
# types are inferred from the file contents.
data = session.read.csv(
    "loan_data.csv",
    header=True,
    inferSchema=True,
)

In [5]:
# Preview the first four rows of the loaded DataFrame.
data.show(4)

+-------------+------------------+--------+-----------+--------------+-----+----+-----------------+---------+----------+--------------+-----------+-------+--------------+
|credit_policy|           purpose|int_rate|installment|log_annual_inc|  dti|fico|days_with_cr_line|revol_bal|revol_util|inq_last_6mths|delinq_2yrs|pub_rec|not_fully_paid|
+-------------+------------------+--------+-----------+--------------+-----+----+-----------------+---------+----------+--------------+-----------+-------+--------------+
|            1|debt_consolidation|  0.1189|      829.1|   11.35040654|19.48| 737|      5639.958333|    28854|      52.1|             0|          0|      0|             0|
|            1|       credit_card|  0.1071|     228.22|   11.08214255|14.29| 707|           2760.0|    33623|      76.7|             0|          0|      0|             0|
|            1|debt_consolidation|  0.1357|     366.86|   10.37349118|11.63| 682|           4710.0|     3511|      25.6|             1|          

In [101]:
# List the column names of the loaded DataFrame.
data.columns

['credit_policy',
 'purpose',
 'int_rate',
 'installment',
 'log_annual_inc',
 'dti',
 'fico',
 'days_with_cr_line',
 'revol_bal',
 'revol_util',
 'inq_last_6mths',
 'delinq_2yrs',
 'pub_rec',
 'not_fully_paid']

In [None]:
from pyspark.ml.feature import VectorAssembler, StringIndexer, OneHotEncoder

In [141]:
str_index = StringIndexer(inputCols = ["credit_policy", "purpose", "int_rate", "installment", "log_annual_inc", "dti", "fico", "days_with_cr_line", "revol_bal", "revol_util", "inq_last_6mths", "delinq_2yrs", "pub_rec", "not_fully_paid"], outputCols = ["newcredit_policy", "newpurpose", "newint_rate", "newinstallment", "newlog_annual_inc", "newdti", "newfico", "newdays_with_cr_line", "newrevol_bal", "newrevol_util", "newinq_last_6mths", "newdeling_2yrs", "newpub_rec", "newnot_fully_paid"], handleInvalid = "skip", stringOrderType= "frequencyDesc")

In [142]:
one_hot = OneHotEncoder(inputCols =["newpurpose"],  outputCols=["newpurpose1"] )

In [154]:
vector_ass = VectorAssembler(inputCols = ["newcredit_policy","newpurpose1","newint_rate", "newinstallment", "newdays_with_cr_line", "newrevol_bal", "newrevol_util", "newinq_last_6mths", "newdeling_2yrs", "newpub_rec"], outputCol = "allfeatures")

In [155]:
from pyspark.ml.classification import LogisticRegression

In [156]:
# Logistic-regression estimator: reads the assembled feature vector from
# "allfeatures" and the indexed target from "newnot_fully_paid".
log_reg = LogisticRegression(
    labelCol="newnot_fully_paid",
    featuresCol="allfeatures",
)

In [157]:
from pyspark.ml import Pipeline

In [158]:
# Chain the stages in execution order so they are fitted and applied together.
mypipeline = Pipeline(
    stages=[
        str_index,   # string indexing
        one_hot,     # one-hot encoding of the indexed purpose
        vector_ass,  # feature vector assembly
        log_reg,     # classifier
    ]
)

In [159]:
# 80/20 train/test split. A fixed seed makes the split - and therefore the
# fitted model and the reported metric - reproducible across re-runs.
training, test = data.randomSplit([0.8, 0.2], seed=42)

In [160]:
# Fit the pipeline on the training split and keep the fitted result.
log_reg_model = mypipeline.fit(training)

In [161]:
# Apply the fitted pipeline to the held-out test split.
result = log_reg_model.transform(test)

In [162]:
# Display the leading rows of the transformed test split.
result.show()

+-------------+------------------+--------+-----------+--------------+-----+----+-----------------+---------+----------+--------------+-----------+-------+--------------+----------------+----------+-----------+--------------+-----------------+------+-------+--------------------+------------+-------------+-----------------+--------------+----------+-----------------+-------------+--------------------+--------------------+--------------------+----------+
|credit_policy|           purpose|int_rate|installment|log_annual_inc|  dti|fico|days_with_cr_line|revol_bal|revol_util|inq_last_6mths|delinq_2yrs|pub_rec|not_fully_paid|newcredit_policy|newpurpose|newint_rate|newinstallment|newlog_annual_inc|newdti|newfico|newdays_with_cr_line|newrevol_bal|newrevol_util|newinq_last_6mths|newdeling_2yrs|newpub_rec|newnot_fully_paid|  newpurpose1|         allfeatures|       rawPrediction|         probability|prediction|
+-------------+------------------+--------+-----------+--------------+-----+----+-----

In [163]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator
# Binary-classification evaluator over the model's raw prediction column and
# the indexed label. The metric is spelled out (it is the evaluator's default)
# so the reported score is unambiguous.
# NOTE(review): the name `eval` shadows Python's builtin eval(); it is kept
# because other cells reference it - consider renaming everywhere.
eval = BinaryClassificationEvaluator(
    labelCol="newnot_fully_paid",
    rawPredictionCol="rawPrediction",
    metricName="areaUnderROC",
)

In [164]:
# Score the test-set predictions with the evaluator configured above.
eval.evaluate(result)

0.6011997913406363