In [1]:
from pyspark.sql import SparkSession
from pyspark.sql import *
from pyspark.ml import *
from pyspark.sql.functions import when, col

# Create a SparkSession
spark = SparkSession.builder \
    .appName("Bank_market") \
    .getOrCreate()

In [2]:
path = 'D:\\Onedrive\\Desktop\\bank_marketing\\full.csv'

In [3]:
df = spark.read.option('header', True).option('delimiter', ';').csv(path)

In [4]:
df.show(5)

+---+---------+-------+-----------+-------+-------+----+---------+-----+---------+--------+--------+-----+--------+-----------+----------+------------+-----------+---------+----------+---+
|age|      job|marital|  education|default|housing|loan|  contact|month|dayOfWeek|duration|campaign|pdays|previous|   poutcome|empVarRate|consPriceIdx|consConfIdx|euribor3m|nrEmployed|  y|
+---+---------+-------+-----------+-------+-------+----+---------+-----+---------+--------+--------+-----+--------+-----------+----------+------------+-----------+---------+----------+---+
| 56|housemaid|married|   basic.4y|     no|     no|  no|telephone|  may|      mon|     261|       1|  999|       0|nonexistent|       1.1|      93.994|      -36.4|    4.857|      5191| no|
| 57| services|married|high.school|unknown|     no|  no|telephone|  may|      mon|     149|       1|  999|       0|nonexistent|       1.1|      93.994|      -36.4|    4.857|      5191| no|
| 37| services|married|high.school|     no|    yes|  no

In [5]:
df = df.withColumn("label", 
                   when(col("default") == 'yes', 1)
                   .when(col("default") == 'no', 0)
                   .otherwise(0))

In [6]:
df.select('label').show()

+-----+
|label|
+-----+
|    0|
|    0|
|    0|
|    0|
|    0|
|    0|
|    0|
|    0|
|    0|
|    0|
|    0|
|    0|
|    0|
|    0|
|    0|
|    0|
|    0|
|    0|
|    0|
|    0|
+-----+
only showing top 20 rows



In [7]:
df = spark.read.options(header=True, delimiter=';', inferSchema=True).csv(path)

In [8]:
df.printSchema()

root
 |-- age: integer (nullable = true)
 |-- job: string (nullable = true)
 |-- marital: string (nullable = true)
 |-- education: string (nullable = true)
 |-- default: string (nullable = true)
 |-- housing: string (nullable = true)
 |-- loan: string (nullable = true)
 |-- contact: string (nullable = true)
 |-- month: string (nullable = true)
 |-- dayOfWeek: string (nullable = true)
 |-- duration: integer (nullable = true)
 |-- campaign: integer (nullable = true)
 |-- pdays: integer (nullable = true)
 |-- previous: integer (nullable = true)
 |-- poutcome: string (nullable = true)
 |-- empVarRate: double (nullable = true)
 |-- consPriceIdx: double (nullable = true)
 |-- consConfIdx: double (nullable = true)
 |-- euribor3m: double (nullable = true)
 |-- nrEmployed: double (nullable = true)
 |-- y: string (nullable = true)



In [9]:
df.groupBy('education').count().sort('count', ascending=False).show()

+-------------------+-----+
|          education|count|
+-------------------+-----+
|  university.degree|12168|
|        high.school| 9515|
|           basic.9y| 6045|
|professional.course| 5243|
|           basic.4y| 4176|
|           basic.6y| 2292|
|            unknown| 1731|
|         illiterate|   18|
+-------------------+-----+



In [48]:
df.groupBy('age').count().sort('age', ascending=False).show()

+---+-----+
|age|count|
+---+-----+
| 98|    2|
| 95|    1|
| 94|    1|
| 92|    4|
| 91|    2|
| 89|    2|
| 88|   22|
| 87|    1|
| 86|    8|
| 85|   15|
| 84|    7|
| 83|   17|
| 82|   17|
| 81|   20|
| 80|   31|
| 79|   14|
| 78|   27|
| 77|   20|
| 76|   34|
| 75|   24|
+---+-----+
only showing top 20 rows



In [49]:
ef = df.select('age', 'education', 'duration', 'euribor3m')

In [50]:
ef.show()

+---+-------------------+--------+---------+
|age|          education|duration|euribor3m|
+---+-------------------+--------+---------+
| 56|           basic.4y|     261|    4.857|
| 57|        high.school|     149|    4.857|
| 37|        high.school|     226|    4.857|
| 40|           basic.6y|     151|    4.857|
| 56|        high.school|     307|    4.857|
| 45|           basic.9y|     198|    4.857|
| 59|professional.course|     139|    4.857|
| 41|            unknown|     217|    4.857|
| 24|professional.course|     380|    4.857|
| 25|        high.school|      50|    4.857|
| 41|            unknown|      55|    4.857|
| 25|        high.school|     222|    4.857|
| 29|        high.school|     137|    4.857|
| 57|           basic.4y|     293|    4.857|
| 35|           basic.6y|     146|    4.857|
| 54|           basic.9y|     174|    4.857|
| 35|           basic.6y|     312|    4.857|
| 46|           basic.6y|     440|    4.857|
| 50|           basic.9y|     353|    4.857|
| 39|     

In [51]:
ef.summary().show()

+-------+------------------+---------+------------------+------------------+
|summary|               age|education|          duration|         euribor3m|
+-------+------------------+---------+------------------+------------------+
|  count|             41188|    41188|             41188|             41188|
|   mean| 40.02406040594348|     null| 258.2850101971448|3.6212908128585366|
| stddev|10.421249980934057|     null|259.27924883646455|1.7344474048512557|
|    min|                17| basic.4y|                 0|             0.634|
|    25%|                32|     null|               102|             1.344|
|    50%|                38|     null|               180|             4.857|
|    75%|                47|     null|               319|             4.961|
|    max|                98|  unknown|              4918|             5.045|
+-------+------------------+---------+------------------+------------------+



In [52]:
ef.describe().show()

+-------+------------------+---------+------------------+------------------+
|summary|               age|education|          duration|         euribor3m|
+-------+------------------+---------+------------------+------------------+
|  count|             41188|    41188|             41188|             41188|
|   mean| 40.02406040594348|     null| 258.2850101971448|3.6212908128585366|
| stddev|10.421249980934057|     null|259.27924883646455|1.7344474048512557|
|    min|                17| basic.4y|                 0|             0.634|
|    max|                98|  unknown|              4918|             5.045|
+-------+------------------+---------+------------------+------------------+



In [53]:
df.columns

['age',
 'job',
 'marital',
 'education',
 'default',
 'housing',
 'loan',
 'contact',
 'month',
 'dayOfWeek',
 'duration',
 'campaign',
 'pdays',
 'previous',
 'poutcome',
 'empVarRate',
 'consPriceIdx',
 'consConfIdx',
 'euribor3m',
 'nrEmployed',
 'y']

In [54]:
df.schema

StructType(List(StructField(age,IntegerType,true),StructField(job,StringType,true),StructField(marital,StringType,true),StructField(education,StringType,true),StructField(default,StringType,true),StructField(housing,StringType,true),StructField(loan,StringType,true),StructField(contact,StringType,true),StructField(month,StringType,true),StructField(dayOfWeek,StringType,true),StructField(duration,IntegerType,true),StructField(campaign,IntegerType,true),StructField(pdays,IntegerType,true),StructField(previous,IntegerType,true),StructField(poutcome,StringType,true),StructField(empVarRate,DoubleType,true),StructField(consPriceIdx,DoubleType,true),StructField(consConfIdx,DoubleType,true),StructField(euribor3m,DoubleType,true),StructField(nrEmployed,DoubleType,true),StructField(y,StringType,true)))

In [55]:
df.schema.fields

[StructField(age,IntegerType,true),
 StructField(job,StringType,true),
 StructField(marital,StringType,true),
 StructField(education,StringType,true),
 StructField(default,StringType,true),
 StructField(housing,StringType,true),
 StructField(loan,StringType,true),
 StructField(contact,StringType,true),
 StructField(month,StringType,true),
 StructField(dayOfWeek,StringType,true),
 StructField(duration,IntegerType,true),
 StructField(campaign,IntegerType,true),
 StructField(pdays,IntegerType,true),
 StructField(previous,IntegerType,true),
 StructField(poutcome,StringType,true),
 StructField(empVarRate,DoubleType,true),
 StructField(consPriceIdx,DoubleType,true),
 StructField(consConfIdx,DoubleType,true),
 StructField(euribor3m,DoubleType,true),
 StructField(nrEmployed,DoubleType,true),
 StructField(y,StringType,true)]

In [56]:
df.select('marital').show()

+--------+
| marital|
+--------+
| married|
| married|
| married|
| married|
| married|
| married|
| married|
| married|
|  single|
|  single|
| married|
|  single|
|  single|
|divorced|
| married|
| married|
| married|
| married|
| married|
|  single|
+--------+
only showing top 20 rows



# Tạo Vectorize bằng StringIndexer for marital

In [62]:
maritalIndexer = feature.StringIndexer().setInputCol('marital').setOutputCol('maritalIndex') # Tạo một object cho Indexer

In [65]:
mModel = maritalIndexer.fit(df) #Để quét trên hết các row

In [66]:
ef = mModel.transform(df) # Áp dụng cho df

In [69]:
ef.select('marital', 'maritalIndex').show()

+--------+------------+
| marital|maritalIndex|
+--------+------------+
| married|         0.0|
| married|         0.0|
| married|         0.0|
| married|         0.0|
| married|         0.0|
| married|         0.0|
| married|         0.0|
| married|         0.0|
|  single|         1.0|
|  single|         1.0|
| married|         0.0|
|  single|         1.0|
|  single|         1.0|
|divorced|         2.0|
| married|         0.0|
| married|         0.0|
| married|         0.0|
| married|         0.0|
| married|         0.0|
|  single|         1.0|
+--------+------------+
only showing top 20 rows



In [70]:
mEncoder = feature.OneHotEncoder().setInputCol('maritalIndex').setOutputCol('maritalVector')

In [80]:
maritalEncoder = mEncoder.fit(ef).transform(ef) # fit trả về object, transform trả về dataframe

In [82]:
maritalEncoder.select('marital', 'maritalIndex', 'maritalVector').show(50) 
# Maried: Vector 3 chiều (1, 0, 0); Single: Vector 3 chiều (0, 1, 0); Divorced: Vector 3 chiều (0, 0, 0); Unknown: (0, 0, 0)

+--------+------------+-------------+
| marital|maritalIndex|maritalVector|
+--------+------------+-------------+
| married|         0.0|(3,[0],[1.0])|
| married|         0.0|(3,[0],[1.0])|
| married|         0.0|(3,[0],[1.0])|
| married|         0.0|(3,[0],[1.0])|
| married|         0.0|(3,[0],[1.0])|
| married|         0.0|(3,[0],[1.0])|
| married|         0.0|(3,[0],[1.0])|
| married|         0.0|(3,[0],[1.0])|
|  single|         1.0|(3,[1],[1.0])|
|  single|         1.0|(3,[1],[1.0])|
| married|         0.0|(3,[0],[1.0])|
|  single|         1.0|(3,[1],[1.0])|
|  single|         1.0|(3,[1],[1.0])|
|divorced|         2.0|(3,[2],[1.0])|
| married|         0.0|(3,[0],[1.0])|
| married|         0.0|(3,[0],[1.0])|
| married|         0.0|(3,[0],[1.0])|
| married|         0.0|(3,[0],[1.0])|
| married|         0.0|(3,[0],[1.0])|
|  single|         1.0|(3,[1],[1.0])|
| married|         0.0|(3,[0],[1.0])|
| married|         0.0|(3,[0],[1.0])|
|  single|         1.0|(3,[1],[1.0])|
|  single|  

# Tạo object vectorize for education

In [99]:
eduIndexer = feature.StringIndexer().setInputCol('education').setOutputCol('eduIndex')

In [100]:
eduEncoder = feature.OneHotEncoder().setInputCol('eduIndex').setOutputCol('eduVector')

# Tạo object training

In [130]:
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler

# Assembling all the feature columns into a single feature vector
assembler = VectorAssembler(inputCols=["maritalVector", "eduVector", "age"], outputCol="features")

# Logistic Regression classifier
classifier = LogisticRegression(featuresCol="features", labelCol="label")


# Tạo vectorize bằng pipeline

In [101]:
pipeline = Pipeline(stages=[maritalIndexer, mEncoder, eduIndexer, eduEncoder ])  # Vectorize cho cả marital và education sử dụng pipeline => Thêm object nào thì cần thêm vào pipeline thì sẽ tự động chạy

In [102]:
model = pipeline.fit(df)

In [103]:
modelEncoder = model.transform(df)

In [104]:
modelEncoder.select('marital', 'maritalIndex', 'maritalVector', 'eduIndex', 'eduVector').show()

+--------+------------+-------------+--------+-------------+
| marital|maritalIndex|maritalVector|eduIndex|    eduVector|
+--------+------------+-------------+--------+-------------+
| married|         0.0|(3,[0],[1.0])|     4.0|(7,[4],[1.0])|
| married|         0.0|(3,[0],[1.0])|     1.0|(7,[1],[1.0])|
| married|         0.0|(3,[0],[1.0])|     1.0|(7,[1],[1.0])|
| married|         0.0|(3,[0],[1.0])|     5.0|(7,[5],[1.0])|
| married|         0.0|(3,[0],[1.0])|     1.0|(7,[1],[1.0])|
| married|         0.0|(3,[0],[1.0])|     2.0|(7,[2],[1.0])|
| married|         0.0|(3,[0],[1.0])|     3.0|(7,[3],[1.0])|
| married|         0.0|(3,[0],[1.0])|     6.0|(7,[6],[1.0])|
|  single|         1.0|(3,[1],[1.0])|     3.0|(7,[3],[1.0])|
|  single|         1.0|(3,[1],[1.0])|     1.0|(7,[1],[1.0])|
| married|         0.0|(3,[0],[1.0])|     6.0|(7,[6],[1.0])|
|  single|         1.0|(3,[1],[1.0])|     1.0|(7,[1],[1.0])|
|  single|         1.0|(3,[1],[1.0])|     1.0|(7,[1],[1.0])|
|divorced|         2.0|(

In [131]:
pipeline = Pipeline(stages=[maritalIndexer, mEncoder, eduIndexer, eduEncoder, assembler, classifier])

In [132]:
model2 = pipeline.fit(df)

In [133]:
model2 = model2.transform(df)

In [134]:
dir(model2)

['__class__',
 '__delattr__',
 '__dict__',
 '__dir__',
 '__doc__',
 '__eq__',
 '__format__',
 '__ge__',
 '__getattr__',
 '__getattribute__',
 '__getitem__',
 '__gt__',
 '__hash__',
 '__init__',
 '__init_subclass__',
 '__le__',
 '__lt__',
 '__module__',
 '__ne__',
 '__new__',
 '__reduce__',
 '__reduce_ex__',
 '__repr__',
 '__setattr__',
 '__sizeof__',
 '__str__',
 '__subclasshook__',
 '__weakref__',
 '_collect_as_arrow',
 '_jcols',
 '_jdf',
 '_jmap',
 '_jseq',
 '_lazy_rdd',
 '_repr_html_',
 '_sc',
 '_schema',
 '_sort_cols',
 '_support_repr_html',
 '_to_corrected_pandas_type',
 'agg',
 'alias',
 'approxQuantile',
 'cache',
 'checkpoint',
 'coalesce',
 'colRegex',
 'collect',
 'columns',
 'corr',
 'count',
 'cov',
 'createGlobalTempView',
 'createOrReplaceGlobalTempView',
 'createOrReplaceTempView',
 'createTempView',
 'crossJoin',
 'crosstab',
 'cube',
 'describe',
 'distinct',
 'drop',
 'dropDuplicates',
 'drop_duplicates',
 'dropna',
 'dtypes',
 'exceptAll',
 'explain',
 'fillna',
 'fi

In [139]:
model2.prediction

['age',
 'job',
 'marital',
 'education',
 'default',
 'housing',
 'loan',
 'contact',
 'month',
 'dayOfWeek',
 'duration',
 'campaign',
 'pdays',
 'previous',
 'poutcome',
 'empVarRate',
 'consPriceIdx',
 'consConfIdx',
 'euribor3m',
 'nrEmployed',
 'y',
 'label',
 'maritalIndex',
 'maritalVector',
 'eduIndex',
 'eduVector',
 'features',
 'rawPrediction',
 'probability',
 'prediction']

In [144]:
model2.select('prediction') == model2.select('label')

False