In [1]:
!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.4.0.tar.gz (310.8 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m310.8/310.8 MB[0m [31m4.6 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.4.0-py2.py3-none-any.whl size=311317145 sha256=15b784fd08e4cb122cd8e90a035a331f0d81000fd543aaa28cf2c117228582ba
  Stored in directory: /root/.cache/pip/wheels/9f/34/a4/159aa12d0a510d5ff7c8f0220abbea42e5d81ecf588c4fd884
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.4.0


In [1]:
from pyspark.sql.functions import col
from pyspark.ml.feature import PCA
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import VectorAssembler
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.classification import LinearSVC
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import StandardScaler
from pyspark.sql.functions import monotonically_increasing_id
from pyspark.ml.feature import MinMaxScaler
from pyspark.ml.stat import Correlation
import seaborn as sns
import matplotlib.pyplot as plt
from pyspark.sql import SparkSession
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator

spark=SparkSession.builder.appName('my_app').getOrCreate()
df = spark.read.csv("/content/ML_hw_dataset.csv", header=True, inferSchema=True)

df.show()                 

+---+-----------+--------+-----------------+-------+-------+----+---------+-----+-----------+--------+--------+-----+--------+-----------+------------+--------------+-------------+---------+-----------+---+
|age|        job| marital|        education|default|housing|loan|  contact|month|day_of_week|duration|campaign|pdays|previous|   poutcome|emp_var_rate|cons_price_idx|cons_conf_idx|euribor3m|nr_employed|  y|
+---+-----------+--------+-----------------+-------+-------+----+---------+-----+-----------+--------+--------+-----+--------+-----------+------------+--------------+-------------+---------+-----------+---+
| 44|blue-collar| married|         basic.4y|unknown|    yes|  no| cellular|  aug|        thu|     210|       1|  999|       0|nonexistent|         1.4|        93.444|        -36.1|    4.963|     5228.1|  0|
| 53| technician| married|          unknown|     no|     no|  no| cellular|  nov|        fri|     138|       1|  999|       0|nonexistent|        -0.1|          93.2|      

In [2]:
string_cols = [col_name for col_name, col_type in df.dtypes if col_type == 'string']
indexers = [StringIndexer(inputCol=col_name, outputCol=col_name + "_indexed") for col_name in string_cols]
indexer_pipeline = Pipeline(stages=indexers)
df = indexer_pipeline.fit(df).transform(df)
df = df.drop(*string_cols)
df.show()

+---+--------+--------+-----+--------+------------+--------------+-------------+---------+-----------+---+-----------+---------------+-----------------+---------------+---------------+------------+---------------+-------------+-------------------+----------------+
|age|duration|campaign|pdays|previous|emp_var_rate|cons_price_idx|cons_conf_idx|euribor3m|nr_employed|  y|job_indexed|marital_indexed|education_indexed|default_indexed|housing_indexed|loan_indexed|contact_indexed|month_indexed|day_of_week_indexed|poutcome_indexed|
+---+--------+--------+-----+--------+------------+--------------+-------------+---------+-----------+---+-----------+---------------+-----------------+---------------+---------------+------------+---------------+-------------+-------------------+----------------+
| 44|     210|       1|  999|       0|         1.4|        93.444|        -36.1|    4.963|     5228.1|  0|        1.0|            0.0|              4.0|            1.0|            0.0|         0.0|        

In [3]:
from pyspark.sql.functions import col
df_numeric = df
#numeric_cols=[c for c ,t in df.dtypes if t in ['int','double','float','long']]
numeric_cols = [column for column in df_numeric.columns if column != 'y']
for col_name in numeric_cols:
  q1=df.approxQuantile(col_name,[0.25],0.01)[0]
  q3=df.approxQuantile(col_name,[0.75],0.01)[0]
  iqr=q3-q1

  k=1.5
  lower_bound=q1-k*iqr
  upper_bound=q3+k*iqr
  
  df=df.filter((col(col_name) >= lower_bound) & (col(col_name)<=upper_bound))

df.show()

+---+--------+--------+-----+--------+------------+--------------+-------------+---------+-----------+---+-----------+---------------+-----------------+---------------+---------------+------------+---------------+-------------+-------------------+----------------+
|age|duration|campaign|pdays|previous|emp_var_rate|cons_price_idx|cons_conf_idx|euribor3m|nr_employed|  y|job_indexed|marital_indexed|education_indexed|default_indexed|housing_indexed|loan_indexed|contact_indexed|month_indexed|day_of_week_indexed|poutcome_indexed|
+---+--------+--------+-----+--------+------------+--------------+-------------+---------+-----------+---+-----------+---------------+-----------------+---------------+---------------+------------+---------------+-------------+-------------------+----------------+
| 53|     138|       1|  999|       0|        -0.1|          93.2|        -42.0|    4.021|     5195.8|  0|        2.0|            0.0|              6.0|            0.0|            1.0|         0.0|        

In [3]:
z=['duration','emp_var_rate','euribor3m','nr_employed']
assembler = VectorAssembler(inputCols=z, outputCol="features")
assembled_df = assembler.transform(df).select("features")

scaler = MinMaxScaler(inputCol="features", outputCol="scaled_features")
scaler_model = scaler.fit(assembled_df)
df_normalized = scaler_model.transform(assembled_df)

df_normalized = df_normalized.withColumn("id", monotonically_increasing_id())
df = df.withColumn("id", monotonically_increasing_id())

df_normalized = df_normalized.join(df, "id", "outer").drop(df_normalized["id"])
df.show(10)

df=df_normalized.select('scaled_features','y')
df.show()

+---+--------+--------+-----+--------+------------+--------------+-------------+---------+-----------+---+-----------+---------------+-----------------+---------------+---------------+------------+---------------+-------------+-------------------+----------------+---+
|age|duration|campaign|pdays|previous|emp_var_rate|cons_price_idx|cons_conf_idx|euribor3m|nr_employed|  y|job_indexed|marital_indexed|education_indexed|default_indexed|housing_indexed|loan_indexed|contact_indexed|month_indexed|day_of_week_indexed|poutcome_indexed| id|
+---+--------+--------+-----+--------+------------+--------------+-------------+---------+-----------+---+-----------+---------------+-----------------+---------------+---------------+------------+---------------+-------------+-------------------+----------------+---+
| 44|     210|       1|  999|       0|         1.4|        93.444|        -36.1|    4.963|     5228.1|  0|        1.0|            0.0|              4.0|            1.0|            0.0|         

In [4]:
df_numeric = df
feature_columns = [column for column in df_numeric.columns if column != 'y']

assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")
assembled_df = assembler.transform(df_numeric).select("features")

scaler = MinMaxScaler(inputCol="features", outputCol="scaled_features")
scaler_model = scaler.fit(assembled_df)
df_normalized = scaler_model.transform(assembled_df)

df_normalized = df_normalized.withColumn("id", monotonically_increasing_id())
df = df.withColumn("id", monotonically_increasing_id())

df_normalized = df_normalized.join(df, "id", "outer").drop(df_normalized["id"])
df.show(10)

df=df_normalized.select('scaled_features','y')
df.show()

df_numeric.show()

+---+--------+--------+-----+--------+------------+--------------+-------------+---------+-----------+---+-----------+---------------+-----------------+---------------+---------------+------------+---------------+-------------+-------------------+----------------+---+
|age|duration|campaign|pdays|previous|emp_var_rate|cons_price_idx|cons_conf_idx|euribor3m|nr_employed|  y|job_indexed|marital_indexed|education_indexed|default_indexed|housing_indexed|loan_indexed|contact_indexed|month_indexed|day_of_week_indexed|poutcome_indexed| id|
+---+--------+--------+-----+--------+------------+--------------+-------------+---------+-----------+---+-----------+---------------+-----------------+---------------+---------------+------------+---------------+-------------+-------------------+----------------+---+
| 53|     138|       1|  999|       0|        -0.1|          93.2|        -42.0|    4.021|     5195.8|  0|        2.0|            0.0|              6.0|            0.0|            1.0|         

In [None]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.stat import Correlation
import seaborn as sns
import matplotlib.pyplot as plt

numeric_cols = [c for c,d in df.dtypes if d in ['int','double']]
df.select(numeric_cols).show()
assembler = VectorAssembler(inputCols=numeric_cols, outputCol='features')
assembled_data = assembler.transform(df).select('features')

correlation_matrix = Correlation.corr(assembled_data, 'features').head()
corr_array = correlation_matrix[0].toArray()

col_names = [c for c in numeric_cols]

sns.set(font_scale=1.2)
fig, ax = plt.subplots(figsize=(10,10))
sns.heatmap(corr_array, cmap='cool', annot=True, square=True, xticklabels=col_names, yticklabels=col_names, ax=ax)

plt.show()

In [6]:
numeric_cols = [col_name for col_name, col_type in df.dtypes if col_type != 'string' and col_name != "y"]
assembler = VectorAssembler(inputCols=numeric_cols, outputCol="features")
assembled_df = assembler.transform(df)
pca = PCA(k=10, inputCol="features", outputCol="pca_features")
model = pca.fit(assembled_df)
df = model.transform(assembled_df).select("pca_features","y")
df.show()

+--------------------+---+
|        pca_features|  y|
+--------------------+---+
|[-0.0999546372599...|  0|
|[0.06507406236595...|  1|
|[-0.8018700894733...|  1|
|[0.41208932986766...|  0|
|[-0.3334411465658...|  0|
|[-1.2621731220604...|  0|
|[-1.1696586088691...|  0|
|[0.33875569406254...|  0|
|[-1.1519260136163...|  0|
|[-1.2631315358863...|  0|
|[-1.1594630313504...|  0|
|[-1.3171979129688...|  0|
|[0.06801899202430...|  0|
|[0.36050043913757...|  0|
|[-1.1473050751484...|  0|
|[-1.1985710866748...|  0|
|[-1.1558260541882...|  0|
|[0.38960344509810...|  0|
|[-1.2858322655068...|  0|
|[0.55329119697683...|  0|
+--------------------+---+
only showing top 20 rows



In [7]:
train_data, test_data = df.randomSplit([0.8, 0.2], seed=100)

svm = LinearSVC(featuresCol='pca_features', labelCol="y")
svm_model = svm.fit(train_data)
predictions = svm_model.transform(test_data)


print('recal svm:',svm_model.evaluate(test_data).recallByLabel)
print('accuracy svm:',svm_model.evaluate(test_data).accuracy)
print('precision svm:',svm_model.evaluate(test_data).precisionByLabel)

lr = LogisticRegression(labelCol='y', featuresCol='pca_features')
lr_model = lr.fit(train_data)
predictions = lr_model.transform(test_data)

print('##########################################')
print('recal logisticregression: ',lr_model.evaluate(test_data).recallByLabel)
print('accuracy logisticregression',lr_model.evaluate(test_data).accuracy)
print('precision logisticregression',lr_model.evaluate(test_data).precisionByLabel)

recal svm: [1.0, 0.0]
accuracy svm: 0.9820952380952381
precision svm: [0.9820952380952381, 0.0]
##########################################
recal logisticregression:  [1.0, 0.0]
accuracy logisticregression 0.9820952380952381
precision logisticregression [0.9820952380952381, 0.0]
