In [1]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import *
# Reuse the active SparkSession if there is one, otherwise start one named "testca".
spark = (
    SparkSession.builder
    .appName("testca")
    .getOrCreate()
)

#og df schema
# train.csv and test.csv share the id + feature columns; only train.csv carries the
# 7 fault-label columns. Both schemas are built from one field definition so they
# cannot drift apart.
INT_FEATURE_NAMES = [
    "X_minimum", "X_maximum", "Y_minimum", "Y_maximum", "Pixels_areas",
    "X_perimeter", "Y_perimeter", "Sum_of_luminosity", "Minimum_luminosity",
    "Maximum_luminosity", "Length_conveyer", "TypeSteel_A300", "TypeSteel_A400",
    "SteelPlate_thickness",
]
FLOAT_FEATURE_NAMES = [
    "Edges_index", "Empty_index", "Square_index", "OutsideX_index", "EdgesX_index",
    "EdgesY_index", "OutsideGlobal_index", "LogOfAreas", "LogX_index", "LogY_index",
    "Orientation_index", "Luminosity_index", "SigmoidOfAreas",
]
LABEL_NAMES = ["Pastry", "Z_scratch", "K_scratch", "Stains", "Dirtiness", "Bumps", "Other_faults"]


def _shared_fields():
    """Return a fresh list of the nullable StructFields common to train and test (id + features)."""
    return (
        [StructField("id", IntegerType(), True)]
        + [StructField(name, IntegerType(), True) for name in INT_FEATURE_NAMES]
        + [StructField(name, FloatType(), True) for name in FLOAT_FEATURE_NAMES]
    )


# StructType.add appends to a schema's field list in place, so each schema is given
# its own freshly built list rather than sharing one.
steeltrain_schema = StructType(
    _shared_fields() + [StructField(name, IntegerType(), True) for name in LABEL_NAMES]
)
steeltest_schema = StructType(_shared_fields())

#import dataset
steeltrain_path = "dataset/train.csv"
steeltest_path = "dataset/test.csv"
steel_train = spark.read.csv(steeltrain_path, header=True, schema=steeltrain_schema)
steel_test = spark.read.csv(steeltest_path, header=True, schema=steeltest_schema)

#extract col names
# Targets are the columns present only in train; features are the shared columns
# minus the "id" key. Deriving them from the schemas avoids hard-coded slice widths.
target_cols = [c for c in steel_train.columns if c not in steel_test.columns]
feature_cols = [c for c in steel_test.columns if c != "id"]
# Invariant: train = test columns followed by the target columns, in that order.
assert steel_train.columns == steel_test.columns + target_cols, "unexpected train/test column layout"
print(feature_cols,"\n", target_cols)

['X_minimum', 'X_maximum', 'Y_minimum', 'Y_maximum', 'Pixels_areas', 'X_perimeter', 'Y_perimeter', 'Sum_of_luminosity', 'Minimum_luminosity', 'Maximum_luminosity', 'Length_conveyer', 'TypeSteel_A300', 'TypeSteel_A400', 'SteelPlate_thickness', 'Edges_index', 'Empty_index', 'Square_index', 'OutsideX_index', 'EdgesX_index', 'EdgesY_index', 'OutsideGlobal_index', 'LogOfAreas', 'LogX_index', 'LogY_index', 'Orientation_index', 'Luminosity_index', 'SigmoidOfAreas'] 
 ['Pastry', 'Z_scratch', 'K_scratch', 'Stains', 'Dirtiness', 'Bumps', 'Other_faults']


In [2]:
##DROP ROWS WITH MULTIPLE TRUE LABELS

#find ids of rows with more than one target column equal to 1
# The per-row count runs inside Spark, so only the flagged ids are brought back
# to the driver (rdd.collect() used to pull the entire dataset).
# Null or non-1 values count as 0, matching the previous row-by-row check.
n_true_labels = sum(
    (F.when(F.col(c) == 1, 1).otherwise(0) for c in target_cols),
    F.lit(0),  # Column start value keeps this a Column even if target_cols is empty
)
id_values = [r["id"] for r in steel_train.filter(n_true_labels > 1).select("id").collect()]

#check correct
print(len(id_values))

[634, 752, 3572, 4416, 4530, 4664, 6448, 7589, 8075, 8538, 9103, 9388, 10225, 10315, 13318, 13990, 15992, 16307, 17503, 17815, 17870]


In [4]:
#drop every row whose id was flagged as multi-label, then show how many remain
steel_train1 = steel_train.where(~steel_train["id"].isin(id_values))
steel_train1.count()

19198

In [5]:
from pyspark.sql.functions import col

##DROP ROWS WITH NO FAULT LABEL

def find_target_column(row, cols=None):
    """Name the fault label(s) set on a row.

    Args:
        row: anything indexable by column name (e.g. a pyspark ``Row`` or a dict).
        cols: label columns to inspect, in output order; defaults to the
            notebook-level ``target_cols``.

    Returns:
        The names of the columns whose value equals 1, joined with ", ",
        or ``"No_fault"`` when none of them do.
    """
    if cols is None:
        cols = target_cols
    columns_with_ones = [column for column in cols if row[column] == 1]
    return ", ".join(columns_with_ones) if columns_with_ones else "No_fault"

#add target_col: the target column name(s) equal to 1, or "No_fault"
# Same rule as find_target_column, but evaluated as a Spark column expression,
# so rows are not shipped through Python via an RDD round trip.
# when() without otherwise() yields null for non-matching columns, and concat_ws
# skips nulls, so a row with no label produces "".
fault_names = F.concat_ws(", ", *[F.when(F.col(c) == 1, F.lit(c)) for c in target_cols])

# withColumn returns a new DataFrame. The old code called
# steel_train1.schema.add(...), which mutates steel_train1's schema object in place
# and broke re-runs with a duplicate "target_col" field.
steel_train2 = steel_train1.withColumn(
    "target_col",
    F.when(fault_names == "", F.lit("No_fault")).otherwise(fault_names),
)

#check no_fault exists in new df
steel_train2.filter(col("target_col") == "No_fault").show(5)

In [8]:
#keep only rows that carry a fault label, then confirm no "No_fault" rows are left
steel_train3 = steel_train2.where(col("target_col") != "No_fault")
steel_train3.where(col("target_col") == "No_fault").show(5)

In [11]:
##SPLIT CLEAN DATA & WRITE TO FILE

train_ratio = 0.8
test_ratio = 1 - train_ratio  # randomSplit normalises weights, so float rounding here is harmless
seed = 42
st_train, st_test = steel_train3.randomSplit([train_ratio, test_ratio], seed=seed)

#write to csv new train & test data
# mode("overwrite") keeps this cell re-runnable: the default save mode
# ("error"/"errorifexists") raises if the output path already exists.
# NOTE: Spark writes a *directory* at each path, containing part-*.csv files.
st_train.write.mode("overwrite").option("header", True).csv("dataset/train_clean.csv")
st_test.write.mode("overwrite").option("header", True).csv("dataset/test_clean.csv")

In [13]:
# Shut down the session (stops the underlying SparkContext); re-run the notebook
# from the top before using `spark` or any DataFrame built from it again.
spark.stop()