<h2> Creating a Spark session </h2>

In [1]:
# Print the cluster's Scala version: the "_2.xx" suffix of the spark-bigquery-connector
# package configured below must match it.
!scala -version

Scala code runner version 2.11.12 -- Copyright 2002-2017, LAMP/EPFL


<p> We need to check the Scala version so that we can include the matching build of the spark-bigquery-connector jar (the <code>_2.11</code> suffix in the package name must match the Scala version). </p>

In [2]:
from pyspark.sql import SparkSession

# Maven coordinate of the BigQuery connector; the "_2.11" part must match the Scala
# version reported by `scala -version`.
bigquery_connector = 'com.google.cloud.spark:spark-bigquery-with-dependencies_2.11:0.15.1-beta'

# Create the Spark session (or reuse an existing one) with the connector on the classpath.
spark = (
    SparkSession.builder
    .appName('PySpark for Steel defults classification')
    .config('spark.jars.packages', bigquery_connector)
    .getOrCreate()
)

In [3]:
# Display the active SparkSession (the last expression of a cell is rendered by Jupyter).
spark

<h4> Enabling repl.eagerEval </h4>

In [4]:
# Evaluate DataFrames eagerly when they are the last expression of a cell, so
# intermediate results can be inspected without calling .show().
spark.conf.set(
    "spark.sql.repl.eagerEval.enabled",
    True,
)

<h2> Import Libraries </h2>

In [None]:
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import StandardScaler
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import StringIndexer

Enabling `spark.sql.repl.eagerEval.enabled` makes the notebook display DataFrame results at each step without needing to call `df.show()`, and it also improves the formatting of the output.

<h2> Data import </h2>

In [40]:
# Location of the steel-plate faults CSV in Cloud Storage.
DATA_PATH = "gs://pyspark-project-bucket/steel_faults.csv"

# The first row supplies the column names; inferSchema lets Spark detect the numeric
# column types instead of reading every column as a string.
df = spark.read.csv(DATA_PATH, header=True, inferSchema=True)

In [41]:
# Inspect the column names and the types chosen by inferSchema.
df.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- X_Minimum: integer (nullable = true)
 |-- X_Maximum: integer (nullable = true)
 |-- Y_Minimum: integer (nullable = true)
 |-- Y_Maximum: integer (nullable = true)
 |-- Pixels_Areas: integer (nullable = true)
 |-- X_Perimeter: integer (nullable = true)
 |-- Y_Perimeter: integer (nullable = true)
 |-- Sum_of_Luminosity: integer (nullable = true)
 |-- Minimum_of_Luminosity: integer (nullable = true)
 |-- Maximum_of_Luminosity: integer (nullable = true)
 |-- Length_of_Conveyer: integer (nullable = true)
 |-- TypeOfSteel_A300: integer (nullable = true)
 |-- TypeOfSteel_A400: integer (nullable = true)
 |-- Steel_Plate_Thickness: integer (nullable = true)
 |-- Edges_Index: double (nullable = true)
 |-- Empty_Index: double (nullable = true)
 |-- Square_Index: double (nullable = true)
 |-- Outside_X_Index: double (nullable = true)
 |-- Edges_X_Index: double (nullable = true)
 |-- Edges_Y_Index: double (nullable = true)
 |-- Outside_Global_Index: dou

Our fault labels are the following: 'Pastry', 'Z_Scratch', 'K_Scatch', 'Stains', 'Dirtiness', 'Bumps', 'Other_Faults'.

In [51]:
 # '_c0' is the name Spark gives the CSV's leading column, which has an empty header (presumably a saved row index).
df = df.drop("_c0")
# drop() is a no-op when the column is already gone, so re-running this cell is safe.

In [53]:
# Preview the data after dropping '_c0'.
df

X_Minimum,X_Maximum,Y_Minimum,Y_Maximum,Pixels_Areas,X_Perimeter,Y_Perimeter,Sum_of_Luminosity,Minimum_of_Luminosity,Maximum_of_Luminosity,Length_of_Conveyer,TypeOfSteel_A300,TypeOfSteel_A400,Steel_Plate_Thickness,Edges_Index,Empty_Index,Square_Index,Outside_X_Index,Edges_X_Index,Edges_Y_Index,Outside_Global_Index,LogOfAreas,Log_X_Index,Log_Y_Index,Orientation_Index,Luminosity_Index,SigmoidOfAreas,Target
42,50,270900,270944,267,17,44,24220,76,108,1687,1,0,80,0.0498,0.2415,0.1818,0.0047,0.4706,1.0,1.0,2.4265,0.9031,1.6435,0.8182,-0.2913,0.5822,Pastry
645,651,2538079,2538108,108,10,30,11397,84,123,1687,1,0,80,0.7647,0.3793,0.2069,0.0036,0.6,0.9667,1.0,2.0334,0.7782,1.4624,0.7931,-0.1756,0.2984,Pastry
829,835,1553913,1553931,71,8,19,7972,99,125,1623,1,0,100,0.971,0.3426,0.3333,0.0037,0.75,0.9474,1.0,1.8513,0.7782,1.2553,0.6667,-0.1228,0.215,Pastry
853,860,369370,369415,176,13,45,18996,99,126,1353,0,1,290,0.7287,0.4413,0.1556,0.0052,0.5385,1.0,1.0,2.2455,0.8451,1.6532,0.8444,-0.1568,0.5212,Pastry
1289,1306,498078,498335,2409,60,260,246930,37,126,1353,0,1,185,0.0695,0.4486,0.0662,0.0126,0.2833,0.9885,1.0,3.3818,1.2305,2.4099,0.9338,-0.1992,1.0,Pastry
430,441,100250,100337,630,20,87,62357,64,127,1387,0,1,40,0.62,0.3417,0.1264,0.0079,0.55,1.0,1.0,2.7993,1.0414,1.9395,0.8736,-0.2267,0.9874,Pastry
413,446,138468,138883,9052,230,432,1481991,23,199,1687,0,1,150,0.4896,0.3389999999999999,0.0795,0.0196,0.1435,0.9607,1.0,3.9567,1.5185,2.6181,0.9205,0.2791,1.0,Pastry
190,200,210936,210956,132,11,20,20007,124,172,1687,0,1,150,0.2253,0.34,0.5,0.0059,0.9091,1.0,1.0,2.1206,1.0,1.301,0.5,0.1841,0.3359,Pastry
330,343,429227,429253,264,15,26,29748,53,148,1687,0,1,150,0.3912,0.2189,0.5,0.0077,0.8667,1.0,1.0,2.4216,1.1139,1.415,0.5,-0.1197,0.5593,Pastry
74,90,779144,779308,1506,46,167,180215,53,143,1687,0,1,150,0.0877,0.4261,0.0976,0.0095,0.3478,0.982,1.0,3.1778,1.2041,2.2148,0.9024,-0.0651,1.0,Pastry


In [85]:
from pyspark.ml.feature import VectorAssembler

# Every measurement column is a model input; 'Target' (the fault class) is excluded
# here and is converted to a numeric label further down.
feature_cols = [
    'X_Minimum', 'X_Maximum', 'Y_Minimum', 'Y_Maximum',
    'Pixels_Areas', 'X_Perimeter', 'Y_Perimeter',
    'Sum_of_Luminosity', 'Minimum_of_Luminosity', 'Maximum_of_Luminosity',
    'Length_of_Conveyer', 'TypeOfSteel_A300', 'TypeOfSteel_A400', 'Steel_Plate_Thickness',
    'Edges_Index', 'Empty_Index', 'Square_Index', 'Outside_X_Index',
    'Edges_X_Index', 'Edges_Y_Index', 'Outside_Global_Index',
    'LogOfAreas', 'Log_X_Index', 'Log_Y_Index',
    'Orientation_Index', 'Luminosity_Index', 'SigmoidOfAreas',
]

# Pack the feature columns into one vector column ('raw_features') for Spark ML.
assembler = VectorAssembler(inputCols=feature_cols, outputCol="raw_features")
vector_df = assembler.transform(df)


In [69]:
from pyspark.ml.feature import StandardScaler

# Scale features to have zero mean and unit standard deviation.
# NOTE(review): the scaler is fitted on the whole dataset, before the train/test split,
# so test-set statistics leak into the preprocessing. Fit it on the training split only
# if scaling is kept (tree ensembles such as the random forest used later do not
# generally need feature scaling).
standarizer = StandardScaler(withMean=True, withStd=True,
                             inputCol='raw_features',
                             outputCol='features')

# Start from the data without any 'features' column left by a previous run of this cell:
# Spark ML refuses to write an output column that already exists, so without this drop
# the cell fails when re-executed. drop() is a no-op if the column is absent.
unscaled_df = vector_df.drop('features')

# Keep the fitted scaler under its own name instead of the generic `model`, which the
# random-forest cell later reuses.
scaler_model = standarizer.fit(unscaled_df)
vector_df = scaler_model.transform(unscaled_df)

In [70]:
# Preview the data with the assembled ('raw_features') and scaled ('features') vectors.
vector_df

X_Minimum,X_Maximum,Y_Minimum,Y_Maximum,Pixels_Areas,X_Perimeter,Y_Perimeter,Sum_of_Luminosity,Minimum_of_Luminosity,Maximum_of_Luminosity,Length_of_Conveyer,TypeOfSteel_A300,TypeOfSteel_A400,Steel_Plate_Thickness,Edges_Index,Empty_Index,Square_Index,Outside_X_Index,Edges_X_Index,Edges_Y_Index,Outside_Global_Index,LogOfAreas,Log_X_Index,Log_Y_Index,Orientation_Index,Luminosity_Index,SigmoidOfAreas,Target,raw_features,features
42,50,270900,270944,267,17,44,24220,76,108,1687,1,0,80,0.0498,0.2415,0.1818,0.0047,0.4706,1.0,1.0,2.4265,0.9031,1.6435,0.8182,-0.2913,0.5822,Pastry,"[42.0,50.0,270900...",[-1.0162194973073...
645,651,2538079,2538108,108,10,30,11397,84,123,1687,1,0,80,0.7647,0.3793,0.2069,0.0036,0.6,0.9667,1.0,2.0334,0.7782,1.4624,0.7931,-0.1756,0.2984,Pastry,"[645.0,651.0,2538...",[0.14185771262920...
829,835,1553913,1553931,71,8,19,7972,99,125,1623,1,0,100,0.971,0.3426,0.3333,0.0037,0.75,0.9474,1.0,1.8513,0.7782,1.2553,0.6667,-0.1228,0.215,Pastry,"[829.0,835.0,1553...",[0.49523450637436...
853,860,369370,369415,176,13,45,18996,99,126,1353,0,1,290,0.7287,0.4413,0.1556,0.0052,0.5385,1.0,1.0,2.2455,0.8451,1.6532,0.8444,-0.1568,0.5212,Pastry,"[853.0,860.0,3693...",[0.54132713164547...
1289,1306,498078,498335,2409,60,260,246930,37,126,1353,0,1,185,0.0695,0.4486,0.0662,0.0126,0.2833,0.9885,1.0,3.3818,1.2305,2.4099,0.9338,-0.1992,1.0,Pastry,"[1289.0,1306.0,49...",[1.37867649073726...
430,441,100250,100337,630,20,87,62357,64,127,1387,0,1,40,0.62,0.3417,0.1264,0.0079,0.55,1.0,1.0,2.7993,1.0414,1.9395,0.8736,-0.2267,0.9874,Pastry,"[430.0,441.0,1002...",[-0.2710553887578...
413,446,138468,138883,9052,230,432,1481991,23,199,1687,0,1,150,0.4896,0.3389999999999999,0.0795,0.0196,0.1435,0.9607,1.0,3.9567,1.5185,2.6181,0.9205,0.2791,1.0,Pastry,"[413.0,446.0,1384...",[-0.3037043316581...
190,200,210936,210956,132,11,20,20007,124,172,1687,0,1,150,0.2253,0.34,0.5,0.0059,0.9091,1.0,1.0,2.1206,1.0,1.301,0.5,0.1841,0.3359,Pastry,"[190.0,200.0,2109...",[-0.7319816414688...
330,343,429227,429253,264,15,26,29748,53,148,1687,0,1,150,0.3912,0.2189,0.5,0.0077,0.8667,1.0,1.0,2.4216,1.1139,1.415,0.5,-0.1197,0.5593,Pastry,"[330.0,343.0,4292...",[-0.4631079940540...
74,90,779144,779308,1506,46,167,180215,53,143,1687,0,1,150,0.0877,0.4261,0.0976,0.0095,0.3478,0.982,1.0,3.1778,1.2041,2.2148,0.9024,-0.0651,1.0,Pastry,"[74.0,90.0,779144...",[-0.9547626636125...


In [72]:
from pyspark.ml.feature import StringIndexer

# Convert the categorical fault name in 'Target' into a numeric 'label' column.
# By default StringIndexer assigns indices by descending label frequency (0.0 = most frequent).
indexer = StringIndexer(inputCol="Target", outputCol="label")

# Keep the fitted model: indexer_model.labels[i] is the fault name encoded as label i,
# which is needed to translate numeric predictions (e.g. 0.0, 3.0) back into fault names.
indexer_model = indexer.fit(vector_df)
indexed = indexer_model.transform(vector_df)

In [73]:
# Preview the data with the numeric 'label' column added.
indexed

X_Minimum,X_Maximum,Y_Minimum,Y_Maximum,Pixels_Areas,X_Perimeter,Y_Perimeter,Sum_of_Luminosity,Minimum_of_Luminosity,Maximum_of_Luminosity,Length_of_Conveyer,TypeOfSteel_A300,TypeOfSteel_A400,Steel_Plate_Thickness,Edges_Index,Empty_Index,Square_Index,Outside_X_Index,Edges_X_Index,Edges_Y_Index,Outside_Global_Index,LogOfAreas,Log_X_Index,Log_Y_Index,Orientation_Index,Luminosity_Index,SigmoidOfAreas,Target,raw_features,features,label
42,50,270900,270944,267,17,44,24220,76,108,1687,1,0,80,0.0498,0.2415,0.1818,0.0047,0.4706,1.0,1.0,2.4265,0.9031,1.6435,0.8182,-0.2913,0.5822,Pastry,"[42.0,50.0,270900...",[-1.0162194973073...,4.0
645,651,2538079,2538108,108,10,30,11397,84,123,1687,1,0,80,0.7647,0.3793,0.2069,0.0036,0.6,0.9667,1.0,2.0334,0.7782,1.4624,0.7931,-0.1756,0.2984,Pastry,"[645.0,651.0,2538...",[0.14185771262920...,4.0
829,835,1553913,1553931,71,8,19,7972,99,125,1623,1,0,100,0.971,0.3426,0.3333,0.0037,0.75,0.9474,1.0,1.8513,0.7782,1.2553,0.6667,-0.1228,0.215,Pastry,"[829.0,835.0,1553...",[0.49523450637436...,4.0
853,860,369370,369415,176,13,45,18996,99,126,1353,0,1,290,0.7287,0.4413,0.1556,0.0052,0.5385,1.0,1.0,2.2455,0.8451,1.6532,0.8444,-0.1568,0.5212,Pastry,"[853.0,860.0,3693...",[0.54132713164547...,4.0
1289,1306,498078,498335,2409,60,260,246930,37,126,1353,0,1,185,0.0695,0.4486,0.0662,0.0126,0.2833,0.9885,1.0,3.3818,1.2305,2.4099,0.9338,-0.1992,1.0,Pastry,"[1289.0,1306.0,49...",[1.37867649073726...,4.0
430,441,100250,100337,630,20,87,62357,64,127,1387,0,1,40,0.62,0.3417,0.1264,0.0079,0.55,1.0,1.0,2.7993,1.0414,1.9395,0.8736,-0.2267,0.9874,Pastry,"[430.0,441.0,1002...",[-0.2710553887578...,4.0
413,446,138468,138883,9052,230,432,1481991,23,199,1687,0,1,150,0.4896,0.3389999999999999,0.0795,0.0196,0.1435,0.9607,1.0,3.9567,1.5185,2.6181,0.9205,0.2791,1.0,Pastry,"[413.0,446.0,1384...",[-0.3037043316581...,4.0
190,200,210936,210956,132,11,20,20007,124,172,1687,0,1,150,0.2253,0.34,0.5,0.0059,0.9091,1.0,1.0,2.1206,1.0,1.301,0.5,0.1841,0.3359,Pastry,"[190.0,200.0,2109...",[-0.7319816414688...,4.0
330,343,429227,429253,264,15,26,29748,53,148,1687,0,1,150,0.3912,0.2189,0.5,0.0077,0.8667,1.0,1.0,2.4216,1.1139,1.415,0.5,-0.1197,0.5593,Pastry,"[330.0,343.0,4292...",[-0.4631079940540...,4.0
74,90,779144,779308,1506,46,167,180215,53,143,1687,0,1,150,0.0877,0.4261,0.0976,0.0095,0.3478,0.982,1.0,3.1778,1.2041,2.2148,0.9024,-0.0651,1.0,Pastry,"[74.0,90.0,779144...",[-0.9547626636125...,4.0


In [86]:
# Keep only the model inputs: the scaled feature vector and the numeric label.
data = indexed.select(['features', 'label'])

# 70/30 train/test split. Without an explicit seed, randomSplit draws a new random seed
# on every run, so the split and the reported accuracy would change after each restart.
SPLIT_SEED = 42
train, test = data.randomSplit([0.7, 0.3], seed=SPLIT_SEED)

In [87]:
# Report how many rows landed in each split (count() triggers a Spark job per split).
for split_name, split_df in (("Train", train), ("Test", test)):
    print(f"{split_name} set length: {split_df.count()} records")

Train set length: 1334 records
Test set length: 607 records


In [99]:
# Random forest model
from pyspark.ml.classification import RandomForestClassifier

# Train a RandomForest model. Bootstrap sampling and per-split feature sampling are
# random, so set an explicit seed to make the fitted forest reproducible across runs.
RF_SEED = 42
rf = RandomForestClassifier(labelCol="label", featuresCol="features",
                            numTrees=1000, seed=RF_SEED)

In [100]:
# Fit the forest on the training split only; the test split is held out for evaluation.
model = rf.fit(train)

In [101]:
# Make predictions.
predictions = model.transform(test)

In [102]:
# Preview the predictions (rendered eagerly because repl.eagerEval is enabled).
predictions

features,label,rawPrediction,probability,prediction
[-1.0968815915318...,0.0,[463.808046496273...,[0.46380804649627...,0.0
[-1.0968815915318...,3.0,[177.034323958254...,[0.17703432395825...,3.0
[-1.0968815915318...,4.0,[388.678452585006...,[0.38867845258500...,0.0
[-1.0968815915318...,0.0,[603.796972396958...,[0.60379697239695...,0.0
[-1.0968815915318...,3.0,[219.996703970035...,[0.21999670397003...,3.0
[-1.0968815915318...,0.0,[485.061564977555...,[0.48506156497755...,0.0
[-1.0968815915318...,3.0,[196.534147738326...,[0.19653414773832...,3.0
[-1.0968815915318...,0.0,[511.176024358816...,[0.51117602435881...,0.0
[-1.0968815915318...,0.0,[470.709415005937...,[0.47070941500593...,0.0
[-1.0968815915318...,0.0,[492.013380590066...,[0.49201338059006...,0.0


In [103]:
# Show the first five predictions next to their true labels as a quick sanity check.
predictions.select("prediction", "label", "features").show(5)

+----------+-----+--------------------+
|prediction|label|            features|
+----------+-----+--------------------+
|       0.0|  0.0|[-1.0968815915318...|
|       3.0|  3.0|[-1.0968815915318...|
|       0.0|  4.0|[-1.0968815915318...|
|       0.0|  0.0|[-1.0968815915318...|
|       3.0|  3.0|[-1.0968815915318...|
+----------+-----+--------------------+
only showing top 5 rows



In [104]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Score the held-out predictions: accuracy is the fraction of rows whose prediction
# equals the true label, and the test error is its complement.
evaluator = MulticlassClassificationEvaluator(
    metricName="accuracy",
    labelCol="label",
    predictionCol="prediction",
)
accuracy = evaluator.evaluate(predictions)
test_error = 1.0 - accuracy

print("Test Error = %g" % test_error)

print("Accuracy = %g" % accuracy)

Exception ignored in: <object repr() failed>
Traceback (most recent call last):
  File "/usr/lib/spark/python/pyspark/ml/wrapper.py", line 40, in __del__
    if SparkContext._active_spark_context and self._java_obj is not None:
AttributeError: 'VectorAssembler' object has no attribute '_java_obj'
Exception ignored in: <object repr() failed>
Traceback (most recent call last):
  File "/usr/lib/spark/python/pyspark/ml/wrapper.py", line 40, in __del__
    if SparkContext._active_spark_context and self._java_obj is not None:
AttributeError: 'StandardScaler' object has no attribute '_java_obj'


Test Error = 0.294893
Accuracy = 0.705107
