
This is a Glass Identification Data Set from UCI. It contains 10 attributes including id. 
The response is glass type(discrete 7 values)



In [None]:
pip install pyspark



In [None]:
import numpy as np
import pandas as pd
import pyspark
from pyspark import SparkConf, SparkContext, SQLContext
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf # @udf("integer") def myfunc(x,y): return x - y
from pyspark.sql import functions as F # stddev format_number date_format, dayofyear, when
from pyspark.sql.types import StructField, StringType, IntegerType, StructType

print([(x.__name__,x.__version__) for x in [np, pd, pyspark]])

spark = pyspark.sql.SparkSession.builder.appName('Glass_Classification').getOrCreate()
sc = spark.sparkContext
sqlContext = SQLContext(sc)
sc.setLogLevel("INFO")

[('numpy', '1.21.6'), ('pandas', '1.3.5'), ('pyspark', '3.2.1')]




In [None]:
df = spark.read.csv('glass.csv',header=True,inferSchema=True)
df.show()

+-------+-----+----+----+-----+----+----+---+----+----+
|     RI|   Na|  Mg|  Al|   Si|   K|  Ca| Ba|  Fe|Type|
+-------+-----+----+----+-----+----+----+---+----+----+
|1.52101|13.64|4.49| 1.1|71.78|0.06|8.75|0.0| 0.0|   1|
|1.51761|13.89| 3.6|1.36|72.73|0.48|7.83|0.0| 0.0|   1|
|1.51618|13.53|3.55|1.54|72.99|0.39|7.78|0.0| 0.0|   1|
|1.51766|13.21|3.69|1.29|72.61|0.57|8.22|0.0| 0.0|   1|
|1.51742|13.27|3.62|1.24|73.08|0.55|8.07|0.0| 0.0|   1|
|1.51596|12.79|3.61|1.62|72.97|0.64|8.07|0.0|0.26|   1|
|1.51743| 13.3| 3.6|1.14|73.09|0.58|8.17|0.0| 0.0|   1|
|1.51756|13.15|3.61|1.05|73.24|0.57|8.24|0.0| 0.0|   1|
|1.51918|14.04|3.58|1.37|72.08|0.56| 8.3|0.0| 0.0|   1|
|1.51755| 13.0| 3.6|1.36|72.99|0.57| 8.4|0.0|0.11|   1|
|1.51571|12.72|3.46|1.56| 73.2|0.67|8.09|0.0|0.24|   1|
|1.51763| 12.8|3.66|1.27|73.01| 0.6|8.56|0.0| 0.0|   1|
|1.51589|12.88|3.43| 1.4|73.28|0.69|8.05|0.0|0.24|   1|
|1.51748|12.86|3.56|1.27|73.21|0.54|8.38|0.0|0.17|   1|
|1.51763|12.61|3.59|1.31|73.29|0.58| 8.5|0.0| 0.

In [None]:
df.groupBy('Type').count().show()

+----+-----+
|Type|count|
+----+-----+
|   1|   70|
|   6|    9|
|   3|   17|
|   5|   13|
|   7|   29|
|   2|   76|
+----+-----+



In [None]:
from pyspark.sql.functions import col, explode, array, lit


In [None]:

type1_df = df.filter(col("Type") == 1)
type2_df = df.filter(col("Type") == 2)
type3_df = df.filter(col("Type") == 3)
type5_df = df.filter(col("Type") == 5)
type6_df = df.filter(col("Type") == 6)
type7_df = df.filter(col("Type") == 7)



In [None]:
ratio1 = int(type2_df.count()/type1_df.count())
ratio3 = int(type2_df.count()/type3_df.count())
ratio5 = int(type2_df.count()/type5_df.count())
ratio6 = int(type2_df.count()/type6_df.count())
ratio7 = int(type2_df.count()/type7_df.count())

In [None]:
a = range(ratio1)
# duplicate the minority rows
oversampled_df1 = type1_df.withColumn("dummy", explode(array([lit(x) for x in a]))).drop('dummy')

In [None]:
a = range(ratio3)
# duplicate the minority rows
oversampled_df3 = type3_df.withColumn("dummy", explode(array([lit(x) for x in a]))).drop('dummy')

In [None]:
a = range(ratio5)
# duplicate the minority rows
oversampled_df5 = type5_df.withColumn("dummy", explode(array([lit(x) for x in a]))).drop('dummy')

In [None]:
a = range(ratio6)
# duplicate the minority rows
oversampled_df6 = type6_df.withColumn("dummy", explode(array([lit(x) for x in a]))).drop('dummy')

In [None]:
a = range(ratio7)
# duplicate the minority rows
oversampled_df7 = type7_df.withColumn("dummy", explode(array([lit(x) for x in a]))).drop('dummy')

In [None]:

oversampled_df5.groupBy('Type').count().show()

+----+-----+
|Type|count|
+----+-----+
|   5|   65|
+----+-----+



In [None]:
df = df.unionAll(oversampled_df7)
df.groupby('Type').count().show()

+----+-----+
|Type|count|
+----+-----+
|   1|   70|
|   6|   81|
|   3|   85|
|   5|   78|
|   7|   87|
|   2|   76|
+----+-----+



In [None]:
print(df.count())

477


In [None]:
df.groupby('Type').count().show()

+----+-----+
|Type|count|
+----+-----+
|   1|   70|
|   6|   81|
|   3|   85|
|   5|   78|
|   7|   87|
|   2|   76|
+----+-----+



Imbalanced data

In [None]:
print(df.count())

477


In [None]:
from pyspark.sql.functions import col,isnan, when, count
df.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df.columns]).show()

+---+---+---+---+---+---+---+---+---+----+
| RI| Na| Mg| Al| Si|  K| Ca| Ba| Fe|Type|
+---+---+---+---+---+---+---+---+---+----+
|  0|  0|  0|  0|  0|  0|  0|  0|  0|   0|
+---+---+---+---+---+---+---+---+---+----+



In [None]:
df.printSchema()

root
 |-- RI: double (nullable = true)
 |-- Na: double (nullable = true)
 |-- Mg: double (nullable = true)
 |-- Al: double (nullable = true)
 |-- Si: double (nullable = true)
 |-- K: double (nullable = true)
 |-- Ca: double (nullable = true)
 |-- Ba: double (nullable = true)
 |-- Fe: double (nullable = true)
 |-- Type: integer (nullable = true)



In [None]:
df.columns

['RI', 'Na', 'Mg', 'Al', 'Si', 'K', 'Ca', 'Ba', 'Fe', 'Type']

In [None]:
from pyspark.ml.feature import VectorAssembler

In [None]:
assembler = VectorAssembler(inputCols = ['RI', 'Na', 'Mg', 'Al', 'Si', 'K', 'Ca', 'Ba', 'Fe'], outputCol = 'features')

Logistic Regression

In [None]:
from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Pipeline


In [None]:
lr = LogisticRegression(featuresCol='features',labelCol='Type')

In [None]:
from sklearn.preprocessing import StandardScaler

In [None]:
pipeline = Pipeline(stages=[assembler,  lr])

In [None]:
train, test = df.randomSplit([0.7, 0.3],seed=2)

In [None]:
print(df.count())

477


In [None]:
test.count()

157

In [None]:
lr_model = pipeline.fit(train)

In [None]:
results = lr_model.transform(test)
results.show(5)

+-------+-----+----+----+-----+----+----+---+----+----+--------------------+--------------------+--------------------+----------+
|     RI|   Na|  Mg|  Al|   Si|   K|  Ca| Ba|  Fe|Type|            features|       rawPrediction|         probability|prediction|
+-------+-----+----+----+-----+----+----+---+----+----+--------------------+--------------------+--------------------+----------+
|1.51316|13.02| 0.0|3.04|70.48|6.21|6.96|0.0| 0.0|   5|[1.51316,13.02,0....|[-2.7822387363258...|[1.57645880528239...|       5.0|
|1.51514|14.01|2.68| 3.5|69.89|1.68|5.87|2.2| 0.0|   5|[1.51514,14.01,2....|[-2.3336190298122...|[8.16593658871527...|       5.0|
|1.51569|13.24|3.49|1.47|73.25|0.38|8.03|0.0| 0.0|   2|[1.51569,13.24,3....|[-4.3400557423882...|[5.63099070389085...|       1.0|
|1.51574|14.86|3.67|1.74|71.87|0.16|7.36|0.0|0.12|   2|[1.51574,14.86,3....|[-3.9810641786580...|[1.62328709768353...|       6.0|
| 1.5159|13.02|3.58|1.51|73.12|0.69|7.96|0.0| 0.0|   2|[1.5159,13.02,3.5...|[-4.3624178692

In [None]:
results.select('Type','prediction').show()

+----+----------+
|Type|prediction|
+----+----------+
|   5|       5.0|
|   5|       5.0|
|   1|       3.0|
|   2|       3.0|
|   1|       2.0|
|   2|       6.0|
|   2|       3.0|
|   2|       3.0|
|   3|       3.0|
|   7|       7.0|
|   7|       7.0|
|   7|       7.0|
|   2|       2.0|
|   2|       2.0|
|   2|       2.0|
|   2|       1.0|
|   7|       5.0|
|   2|       3.0|
|   2|       2.0|
|   2|       2.0|
+----+----------+
only showing top 20 rows



MultiClass Evaluator

In [None]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [None]:
eval = MulticlassClassificationEvaluator(predictionCol = 'prediction', labelCol = 'Type')

In [None]:
auc = eval.evaluate(results)
auc

0.8411587801431619

Decision Tree Classifier

In [None]:
from pyspark.ml.classification import DecisionTreeClassifier

In [None]:
dt = DecisionTreeClassifier(featuresCol='features',labelCol='Type',maxDepth=8,impurity='entropy',seed=1)

In [None]:
pipeline = Pipeline(stages=[assembler, dt])

In [None]:
dt_model = pipeline.fit(train)

In [None]:
results = dt_model.transform(test)
results.show(5)

+-------+-----+----+----+-----+----+----+---+----+----+--------------------+--------------------+--------------------+----------+
|     RI|   Na|  Mg|  Al|   Si|   K|  Ca| Ba|  Fe|Type|            features|       rawPrediction|         probability|prediction|
+-------+-----+----+----+-----+----+----+---+----+----+--------------------+--------------------+--------------------+----------+
|1.51316|13.02| 0.0|3.04|70.48|6.21|6.96|0.0| 0.0|   5|[1.51316,13.02,0....|[0.0,0.0,0.0,0.0,...|[0.0,0.0,0.0,0.0,...|       5.0|
|1.51514|14.01|2.68| 3.5|69.89|1.68|5.87|2.2| 0.0|   5|[1.51514,14.01,2....|[0.0,0.0,0.0,0.0,...|[0.0,0.0,0.0,0.0,...|       5.0|
|1.51569|13.24|3.49|1.47|73.25|0.38|8.03|0.0| 0.0|   2|[1.51569,13.24,3....|[0.0,0.0,7.0,0.0,...|[0.0,0.0,1.0,0.0,...|       2.0|
|1.51574|14.86|3.67|1.74|71.87|0.16|7.36|0.0|0.12|   2|[1.51574,14.86,3....|[0.0,0.0,7.0,0.0,...|[0.0,0.0,1.0,0.0,...|       2.0|
| 1.5159|13.02|3.58|1.51|73.12|0.69|7.96|0.0| 0.0|   2|[1.5159,13.02,3.5...|[0.0,0.0,7.0,0

In [None]:
results.select('Type','prediction').show()

+----+----------+
|Type|prediction|
+----+----------+
|   5|       5.0|
|   5|       7.0|
|   2|       2.0|
|   2|       2.0|
|   2|       2.0|
|   1|       2.0|
|   2|       3.0|
|   3|       3.0|
|   7|       7.0|
|   7|       7.0|
|   2|       2.0|
|   2|       2.0|
|   7|       7.0|
|   2|       2.0|
|   7|       7.0|
|   2|       3.0|
|   5|       5.0|
|   7|       7.0|
|   2|       2.0|
|   2|       3.0|
+----+----------+
only showing top 20 rows



In [None]:
auc = eval.evaluate(results)
auc

0.9473292424007043

Random Forest Classifier

In [None]:
from pyspark.ml.classification import RandomForestClassifier


In [None]:
rf = RandomForestClassifier(featuresCol='features',labelCol='Type',numTrees=20,maxDepth=10,impurity='entropy')

In [None]:
pipeline = Pipeline(stages = [assembler, rf ])

In [None]:
rf_model = pipeline.fit(train)

In [None]:
results = rf_model.transform(test)
results.show(5)

+-------+-----+----+----+-----+----+----+---+----+----+--------------------+--------------------+--------------------+----------+
|     RI|   Na|  Mg|  Al|   Si|   K|  Ca| Ba|  Fe|Type|            features|       rawPrediction|         probability|prediction|
+-------+-----+----+----+-----+----+----+---+----+----+--------------------+--------------------+--------------------+----------+
|1.51316|13.02| 0.0|3.04|70.48|6.21|6.96|0.0| 0.0|   5|[1.51316,13.02,0....|[0.0,0.0,0.0,0.0,...|[0.0,0.0,0.0,0.0,...|       5.0|
|1.51514|14.01|2.68| 3.5|69.89|1.68|5.87|2.2| 0.0|   5|[1.51514,14.01,2....|[0.0,0.0,0.0,0.0,...|[0.0,0.0,0.0,0.0,...|       5.0|
|1.51569|13.24|3.49|1.47|73.25|0.38|8.03|0.0| 0.0|   2|[1.51569,13.24,3....|[0.0,6.0,14.0,0.0...|[0.0,0.3,0.7,0.0,...|       2.0|
|1.51574|14.86|3.67|1.74|71.87|0.16|7.36|0.0|0.12|   2|[1.51574,14.86,3....|[0.0,9.0,9.0,2.0,...|[0.0,0.45,0.45,0....|       1.0|
| 1.5159|13.02|3.58|1.51|73.12|0.69|7.96|0.0| 0.0|   2|[1.5159,13.02,3.5...|[0.0,1.0,19.0,

In [None]:
auc = eval.evaluate(results)
auc

0.961682869594368

In [None]:
results.select('Type','prediction').show()

+----+----------+
|Type|prediction|
+----+----------+
|   5|       5.0|
|   5|       5.0|
|   2|       2.0|
|   2|       1.0|
|   2|       2.0|
|   1|       2.0|
|   2|       2.0|
|   3|       3.0|
|   7|       7.0|
|   7|       7.0|
|   2|       2.0|
|   2|       2.0|
|   7|       7.0|
|   2|       2.0|
|   7|       7.0|
|   2|       2.0|
|   5|       5.0|
|   7|       7.0|
|   2|       2.0|
|   2|       2.0|
+----+----------+
only showing top 20 rows



Naive Bayes Classifier

In [None]:
from pyspark.ml.classification import NaiveBayes


In [None]:
nb = NaiveBayes(labelCol="Type", featuresCol="features")


In [None]:
pipeline = Pipeline(stages= [assembler,nb])

In [None]:
nb_model = pipeline.fit(train)

In [None]:
results = nb_model.transform(test)
results.show(5)

+-------+-----+----+----+-----+----+----+---+----+----+--------------------+--------------------+--------------------+----------+
|     RI|   Na|  Mg|  Al|   Si|   K|  Ca| Ba|  Fe|Type|            features|       rawPrediction|         probability|prediction|
+-------+-----+----+----+-----+----+----+---+----+----+--------------------+--------------------+--------------------+----------+
|1.51316|13.02| 0.0|3.04|70.48|6.21|6.96|0.0| 0.0|   5|[1.51316,13.02,0....|[-121.92487880846...|[7.99373595760919...|       3.0|
|1.51514|14.01|2.68| 3.5|69.89|1.68|5.87|2.2| 0.0|   5|[1.51514,14.01,2....|[-126.62332559654...|[8.67272618644165...|       5.0|
|1.51569|13.24|3.49|1.47|73.25|0.38|8.03|0.0| 0.0|   2|[1.51569,13.24,3....|[-99.579979388821...|[0.32654271549650...|       0.0|
|1.51574|14.86|3.67|1.74|71.87|0.16|7.36|0.0|0.12|   2|[1.51574,14.86,3....|[-102.27014162891...|[0.30890197694621...|       2.0|
| 1.5159|13.02|3.58|1.51|73.12|0.69|7.96|0.0| 0.0|   2|[1.5159,13.02,3.5...|[-101.05032992

In [None]:
auc=eval.evaluate(results)
auc

0.058689717925386714