In [1]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Pipeline

In [2]:
spark=SparkSession.builder.appName("logistic").getOrCreate()

In [8]:
file_path="hdfs://localhost:9000/user/hadoop/kaggel/bank.csv"
bankDF= spark.read.csv(file_path,header=True, inferSchema=True,sep=";")
bankDF.show()

+---+-------------+-------+---------+-------+-------+-------+----+--------+---+-----+--------+--------+-----+--------+--------+---+
|age|          job|marital|education|default|balance|housing|loan| contact|day|month|duration|campaign|pdays|previous|poutcome|  y|
+---+-------------+-------+---------+-------+-------+-------+----+--------+---+-----+--------+--------+-----+--------+--------+---+
| 30|   unemployed|married|  primary|     no|   1787|     no|  no|cellular| 19|  oct|      79|       1|   -1|       0| unknown| no|
| 33|     services|married|secondary|     no|   4789|    yes| yes|cellular| 11|  may|     220|       1|  339|       4| failure| no|
| 35|   management| single| tertiary|     no|   1350|    yes|  no|cellular| 16|  apr|     185|       1|  330|       1| failure| no|
| 30|   management|married| tertiary|     no|   1476|    yes| yes| unknown|  3|  jun|     199|       4|   -1|       0| unknown| no|
| 59|  blue-collar|married|secondary|     no|      0|    yes|  no| unknown| 

In [None]:


categorical_cols = ['job', 'marital', 'education', 'default', 'housing', 
                    'loan', 'contact', 'month', 'poutcome']
indexers = [StringIndexer(inputCol=col, outputCol=col+"_Index") for col in categorical_cols]
encoders = [OneHotEncoder(inputCol=col+"_Index", outputCol=col+"_Vec") for col in categorical_cols]

label_indexer = StringIndexer(inputCol="y", outputCol="label")

assembler = VectorAssembler(
    inputCols=['age', 'balance', 'day', 'duration', 'campaign', 'pdays', 'previous'] +
              [col+"_Vec" for col in categorical_cols],
    outputCol="features"
)

lr = LogisticRegression(featuresCol='features', labelCol='label')

pipeline = Pipeline(stages=indexers + encoders + [label_indexer, assembler, lr])

model = pipeline.fit(bankDF)
      
predictions = model.transform(bankDF)
predictions.select("features", "label", "prediction", "probability").show()


+--------------------+-----+----------+--------------------+
|            features|label|prediction|         probability|
+--------------------+-----+----------+--------------------+
|(42,[0,1,2,3,4,5,...|  0.0|       0.0|[0.86876321156153...|
|(42,[0,1,2,3,4,5,...|  0.0|       0.0|[0.97209420115662...|
|(42,[0,1,2,3,4,5,...|  0.0|       0.0|[0.88043859827153...|
|(42,[0,1,2,3,4,5,...|  0.0|       0.0|[0.98339271254021...|
|(42,[0,2,3,4,5,8,...|  0.0|       0.0|[0.99195566268778...|
|(42,[0,1,2,3,4,5,...|  0.0|       0.0|[0.84148117127748...|
|(42,[0,1,2,3,4,5,...|  0.0|       0.0|[0.83851917122759...|
|(42,[0,1,2,3,4,5,...|  0.0|       0.0|[0.97027542211690...|
|(42,[0,1,2,3,4,5,...|  0.0|       0.0|[0.99325792586304...|
|(42,[0,1,2,3,4,5,...|  0.0|       0.0|[0.93355075190214...|
|(42,[0,1,2,3,4,5,...|  0.0|       0.0|[0.98329249843300...|
|(42,[0,1,2,3,4,5,...|  0.0|       0.0|[0.94269352693287...|
|(42,[0,1,2,3,4,5,...|  0.0|       0.0|[0.8733880694049,...|
|(42,[0,1,2,3,4,5,...|  