In [1]:
!pip install pyspark


Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.4.0.tar.gz (310.8 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m310.8/310.8 MB[0m [31m4.4 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.4.0-py2.py3-none-any.whl size=311317145 sha256=dda095419e0fdc07d59ae0ba5f150620e169e3785cc603f3def115d0a0f03bd3
  Stored in directory: /root/.cache/pip/wheels/9f/34/a4/159aa12d0a510d5ff7c8f0220abbea42e5d81ecf588c4fd884
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.4.0


In [19]:
from pyspark.sql.functions import col,count,when,isnan
from pyspark.ml.feature import PCA
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import VectorAssembler
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.classification import LinearSVC
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import MinMaxScaler


In [9]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [11]:
pathh = "/content/drive/MyDrive/ML_hw_dataset (1).csv"

In [12]:
spark = SparkSession.builder.appName("DataPreprocessing").getOrCreate()
df = spark.read.csv(pathh, header=True, inferSchema=True)
df.show()

+---+-----------+--------+-----------------+-------+-------+----+---------+-----+-----------+--------+--------+-----+--------+-----------+------------+--------------+-------------+---------+-----------+---+
|age|        job| marital|        education|default|housing|loan|  contact|month|day_of_week|duration|campaign|pdays|previous|   poutcome|emp_var_rate|cons_price_idx|cons_conf_idx|euribor3m|nr_employed|  y|
+---+-----------+--------+-----------------+-------+-------+----+---------+-----+-----------+--------+--------+-----+--------+-----------+------------+--------------+-------------+---------+-----------+---+
| 44|blue-collar| married|         basic.4y|unknown|    yes|  no| cellular|  aug|        thu|     210|       1|  999|       0|nonexistent|         1.4|        93.444|        -36.1|    4.963|     5228.1|  0|
| 53| technician| married|          unknown|     no|     no|  no| cellular|  nov|        fri|     138|       1|  999|       0|nonexistent|        -0.1|          93.2|      

In [13]:
df.describe().show()

+-------+------------------+-------+--------+---------+-------+-------+-----+---------+-----+-----------+------------------+------------------+------------------+-------------------+--------+-------------------+------------------+------------------+------------------+-----------------+-------------------+
|summary|               age|    job| marital|education|default|housing| loan|  contact|month|day_of_week|          duration|          campaign|             pdays|           previous|poutcome|       emp_var_rate|    cons_price_idx|     cons_conf_idx|         euribor3m|      nr_employed|                  y|
+-------+------------------+-------+--------+---------+-------+-------+-----+---------+-----+-----------+------------------+------------------+------------------+-------------------+--------+-------------------+------------------+------------------+------------------+-----------------+-------------------+
|  count|             41188|  41188|   41188|    41188|  41188|  41188|41188|  

In [14]:
string_cols = [c for c, t in df.dtypes if t == "string"]
indexers = {c: StringIndexer(inputCol=c, outputCol=c+"_index") for c in string_cols}

indexed_df = df
for c, indexer in indexers.items():
    indexed_df = indexer.fit(indexed_df).transform(indexed_df)
print("Before:")
indexed_df.show(3)
numeric_cols = [c for c, t in indexed_df.dtypes if t in ["int", "double", "float", "long"]]
numeric_df = indexed_df.select(numeric_cols)
print("**********************************************************************************\nAfter:")
numeric_df.show(3)

Before:
+---+-----------+-------+-----------------+-------+-------+----+--------+-----+-----------+--------+--------+-----+--------+-----------+------------+--------------+-------------+---------+-----------+---+---------+-------------+---------------+-------------+-------------+----------+-------------+-----------+-----------------+--------------+
|age|        job|marital|        education|default|housing|loan| contact|month|day_of_week|duration|campaign|pdays|previous|   poutcome|emp_var_rate|cons_price_idx|cons_conf_idx|euribor3m|nr_employed|  y|job_index|marital_index|education_index|default_index|housing_index|loan_index|contact_index|month_index|day_of_week_index|poutcome_index|
+---+-----------+-------+-----------------+-------+-------+----+--------+-----+-----------+--------+--------+-----+--------+-----------+------------+--------------+-------------+---------+-----------+---+---------+-------------+---------------+-------------+-------------+----------+-------------+---------

In [20]:
null_counts = numeric_df.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in numeric_df.columns])
print("Nulls:")
null_counts.show()
print("We have no null values in this dataset.")

Nulls:
+---+--------+--------+-----+--------+------------+--------------+-------------+---------+-----------+---+---------+-------------+---------------+-------------+-------------+----------+-------------+-----------+-----------------+--------------+
|age|duration|campaign|pdays|previous|emp_var_rate|cons_price_idx|cons_conf_idx|euribor3m|nr_employed|  y|job_index|marital_index|education_index|default_index|housing_index|loan_index|contact_index|month_index|day_of_week_index|poutcome_index|
+---+--------+--------+-----+--------+------------+--------------+-------------+---------+-----------+---+---------+-------------+---------------+-------------+-------------+----------+-------------+-----------+-----------------+--------------+
|  0|       0|       0|    0|       0|           0|             0|            0|        0|          0|  0|        0|            0|              0|            0|            0|         0|            0|          0|                0|             0|
+---+--------

In [21]:
numeric_df.show()

+---+--------+--------+-----+--------+------------+--------------+-------------+---------+-----------+---+---------+-------------+---------------+-------------+-------------+----------+-------------+-----------+-----------------+--------------+
|age|duration|campaign|pdays|previous|emp_var_rate|cons_price_idx|cons_conf_idx|euribor3m|nr_employed|  y|job_index|marital_index|education_index|default_index|housing_index|loan_index|contact_index|month_index|day_of_week_index|poutcome_index|
+---+--------+--------+-----+--------+------------+--------------+-------------+---------+-----------+---+---------+-------------+---------------+-------------+-------------+----------+-------------+-----------+-----------------+--------------+
| 44|     210|       1|  999|       0|         1.4|        93.444|        -36.1|    4.963|     5228.1|  0|      1.0|          0.0|            4.0|          1.0|          0.0|       0.0|          0.0|        2.0|              0.0|           0.0|
| 53|     138|      

In [22]:
numeric_df.show()

+---+--------+--------+-----+--------+------------+--------------+-------------+---------+-----------+---+---------+-------------+---------------+-------------+-------------+----------+-------------+-----------+-----------------+--------------+
|age|duration|campaign|pdays|previous|emp_var_rate|cons_price_idx|cons_conf_idx|euribor3m|nr_employed|  y|job_index|marital_index|education_index|default_index|housing_index|loan_index|contact_index|month_index|day_of_week_index|poutcome_index|
+---+--------+--------+-----+--------+------------+--------------+-------------+---------+-----------+---+---------+-------------+---------------+-------------+-------------+----------+-------------+-----------+-----------------+--------------+
| 44|     210|       1|  999|       0|         1.4|        93.444|        -36.1|    4.963|     5228.1|  0|      1.0|          0.0|            4.0|          1.0|          0.0|       0.0|          0.0|        2.0|              0.0|           0.0|
| 53|     138|      

In [23]:
numeric_df.count()
#numeric_cols

41188

In [None]:
numeric_df.count()

41188

In [30]:
numeric_df.printSchema()

root
 |-- age: integer (nullable = true)
 |-- duration: integer (nullable = true)
 |-- campaign: integer (nullable = true)
 |-- pdays: integer (nullable = true)
 |-- previous: integer (nullable = true)
 |-- emp_var_rate: double (nullable = true)
 |-- cons_price_idx: double (nullable = true)
 |-- cons_conf_idx: double (nullable = true)
 |-- euribor3m: double (nullable = true)
 |-- nr_employed: double (nullable = true)
 |-- y: integer (nullable = true)
 |-- job_index: double (nullable = false)
 |-- marital_index: double (nullable = false)
 |-- education_index: double (nullable = false)
 |-- default_index: double (nullable = false)
 |-- housing_index: double (nullable = false)
 |-- loan_index: double (nullable = false)
 |-- contact_index: double (nullable = false)
 |-- month_index: double (nullable = false)
 |-- day_of_week_index: double (nullable = false)
 |-- poutcome_index: double (nullable = false)



In [31]:
featureforVec = []
for c in numeric_df.columns:
  if c !='y':
    featureforVec.append(c)


In [32]:
featureforVec

['age',
 'duration',
 'campaign',
 'pdays',
 'previous',
 'emp_var_rate',
 'cons_price_idx',
 'cons_conf_idx',
 'euribor3m',
 'nr_employed',
 'job_index',
 'marital_index',
 'education_index',
 'default_index',
 'housing_index',
 'loan_index',
 'contact_index',
 'month_index',
 'day_of_week_index',
 'poutcome_index']

In [33]:
numeric_df3 = numeric_df

In [34]:
feature_assembeled2 = VectorAssembler(inputCols=featureforVec,outputCol='features')
numeric_df2_vectored= feature_assembeled2.transform(numeric_df3)
numeric_df2_vectored.show()

+---+--------+--------+-----+--------+------------+--------------+-------------+---------+-----------+---+---------+-------------+---------------+-------------+-------------+----------+-------------+-----------+-----------------+--------------+--------------------+
|age|duration|campaign|pdays|previous|emp_var_rate|cons_price_idx|cons_conf_idx|euribor3m|nr_employed|  y|job_index|marital_index|education_index|default_index|housing_index|loan_index|contact_index|month_index|day_of_week_index|poutcome_index|            features|
+---+--------+--------+-----+--------+------------+--------------+-------------+---------+-----------+---+---------+-------------+---------------+-------------+-------------+----------+-------------+-----------+-----------------+--------------+--------------------+
| 44|     210|       1|  999|       0|         1.4|        93.444|        -36.1|    4.963|     5228.1|  0|      1.0|          0.0|            4.0|          1.0|          0.0|       0.0|          0.0|   

In [35]:
numeric_df2 = numeric_df2_vectored.select('features','y')

In [36]:
numeric_df2.show()

+--------------------+---+
|            features|  y|
+--------------------+---+
|[44.0,210.0,1.0,9...|  0|
|[53.0,138.0,1.0,9...|  0|
|[28.0,339.0,3.0,6...|  1|
|[39.0,185.0,2.0,9...|  0|
|[55.0,137.0,1.0,3...|  1|
|[30.0,68.0,8.0,99...|  0|
|(20,[0,1,2,3,5,6,...|  0|
|[39.0,191.0,1.0,9...|  0|
|[36.0,174.0,1.0,3...|  1|
|[27.0,191.0,2.0,9...|  0|
|[34.0,62.0,2.0,99...|  0|
|(20,[0,1,2,3,5,6,...|  0|
|[55.0,372.0,3.0,9...|  1|
|[33.0,75.0,5.0,99...|  0|
|[26.0,1021.0,1.0,...|  0|
|[52.0,117.0,2.0,9...|  0|
|[35.0,1034.0,2.0,...|  1|
|[27.0,540.0,1.0,9...|  1|
|[28.0,140.0,1.0,9...|  0|
|[26.0,104.0,4.0,9...|  0|
+--------------------+---+
only showing top 20 rows



In [37]:
scaler=MinMaxScaler(inputCol="features",outputCol="scaled_features")

In [38]:

scalerModel = scaler.fit(numeric_df2)


In [39]:
scaledData = scalerModel.transform(numeric_df2)


In [40]:
scaledData2 = scaledData.select('scaled_features','y')
scaledData2.show()

+--------------------+---+
|     scaled_features|  y|
+--------------------+---+
|(20,[0,1,3,5,6,7,...|  0|
|[0.44444444444444...|  0|
|[0.13580246913580...|  1|
|[0.27160493827160...|  0|
|[0.46913580246913...|  1|
|[0.16049382716049...|  0|
|(20,[0,1,3,5,6,7,...|  0|
|(20,[0,1,3,5,6,7,...|  0|
|[0.23456790123456...|  1|
|[0.12345679012345...|  0|
|[0.20987654320987...|  0|
|(20,[0,1,3,5,6,7,...|  0|
|[0.46913580246913...|  1|
|[0.19753086419753...|  0|
|[0.11111111111111...|  0|
|[0.43209876543209...|  0|
|[0.22222222222222...|  1|
|[0.12345679012345...|  1|
|[0.13580246913580...|  0|
|[0.11111111111111...|  0|
+--------------------+---+
only showing top 20 rows



In [41]:
train_data, test_data =scaledData2.randomSplit([0.7, 0.3])

In [42]:
from pyspark.ml.classification import LinearSVC


In [43]:
lsvc = LinearSVC(featuresCol='scaled_features',labelCol='y',maxIter=5).fit(train_data)


In [44]:
lsvc

LinearSVCModel: uid=LinearSVC_84c9396c1b90, numClasses=2, numFeatures=20

In [45]:
train_result = lsvc.evaluate(train_data).predictions

In [46]:
train_result.show()

+--------------------+---+--------------------+----------+
|     scaled_features|  y|       rawPrediction|prediction|
+--------------------+---+--------------------+----------+
|(20,[0,1,2,3,4,5,...|  0|[1.46313634191875...|       0.0|
|(20,[0,1,2,3,4,5,...|  0|[1.31728935869051...|       0.0|
|(20,[0,1,2,3,4,5,...|  0|[1.33316555419792...|       0.0|
|(20,[0,1,2,3,4,5,...|  0|[1.35208185321906...|       0.0|
|(20,[0,1,2,3,4,5,...|  0|[1.34599891519562...|       0.0|
|(20,[0,1,2,3,4,5,...|  0|[0.79988823853899...|       0.0|
|(20,[0,1,2,3,4,5,...|  0|[0.21911355713001...|       0.0|
|(20,[0,1,2,3,4,5,...|  0|[1.39743815637322...|       0.0|
|(20,[0,1,2,3,4,5,...|  0|[1.51563695994411...|       0.0|
|(20,[0,1,2,3,4,5,...|  1|[-0.0336256652107...|       1.0|
|(20,[0,1,2,3,4,5,...|  0|[-0.0911890401319...|       1.0|
|(20,[0,1,2,3,4,5,...|  0|[1.33423790135531...|       0.0|
|(20,[0,1,2,3,4,5,...|  0|[0.24898938275548...|       0.0|
|(20,[0,1,2,3,4,5,...|  1|[0.70065791499685...|       0.

In [47]:
results = lsvc.evaluate(test_data).predictions
results.show()

+--------------------+---+--------------------+----------+
|     scaled_features|  y|       rawPrediction|prediction|
+--------------------+---+--------------------+----------+
|(20,[0,1,2,3,4,5,...|  0|[1.21636384564524...|       0.0|
|(20,[0,1,2,3,4,5,...|  0|[1.73448464132129...|       0.0|
|(20,[0,1,2,3,4,5,...|  0|[1.62242738223775...|       0.0|
|(20,[0,1,2,3,4,5,...|  0|[1.45759826466453...|       0.0|
|(20,[0,1,2,3,4,5,...|  0|[-0.1545137593413...|       1.0|
|(20,[0,1,2,3,4,5,...|  0|[1.38826493392795...|       0.0|
|(20,[0,1,2,3,4,5,...|  0|[0.48954625406724...|       0.0|
|(20,[0,1,2,3,4,5,...|  0|[0.33139551299294...|       0.0|
|(20,[0,1,2,3,4,5,...|  0|[1.12773913079154...|       0.0|
|(20,[0,1,2,3,4,5,...|  0|[1.61658933597707...|       0.0|
|(20,[0,1,2,3,4,5,...|  0|[1.15039835924869...|       0.0|
|(20,[0,1,2,3,4,5,...|  0|[1.41040050546555...|       0.0|
|(20,[0,1,2,3,4,5,...|  1|[-0.4428570692944...|       1.0|
|(20,[0,1,2,3,4,5,...|  1|[0.16147662711969...|       0.

In [48]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator


In [49]:

evaluator = MulticlassClassificationEvaluator(predictionCol="prediction", labelCol="y", metricName="accuracy")
accuracy = evaluator.evaluate(results)
print("Accuracy = %g" % accuracy)

Accuracy = 0.90326


In [50]:
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction", labelCol="y", metricName="weightedPrecision")
precision = evaluator.evaluate(results)
print("Precision = %g" % precision)

evaluator = MulticlassClassificationEvaluator(predictionCol="prediction", labelCol="y", metricName="weightedRecall")
recall = evaluator.evaluate(results)
print("Recall = %g" % recall)

Precision = 0.887843
Recall = 0.90326
