In [3]:
! pip install findspark

Collecting findspark
  Downloading findspark-2.0.1-py2.py3-none-any.whl (4.4 kB)
Installing collected packages: findspark
Successfully installed findspark-2.0.1


In [4]:
import os

import findspark
findspark.init()

from pyspark.sql import SparkSession
import pyspark.sql.functions as s_f

from pathlib import Path

In [5]:
spark = SparkSession.builder.appName('sparkDataManagement').getOrCreate()

In [6]:
DATA_DIR = Path("../data/faults")

feature_data_file = os.path.join(DATA_DIR, "faults_features.csv")
label_data_file = os.path.join(DATA_DIR, "faults_labels.csv")

In [7]:
feature_names_file = os.path.join(DATA_DIR, "names_features.txt")
label_names_file = os.path.join(DATA_DIR, "names_labels.txt")

In [8]:
feature_df = spark.read.csv(feature_data_file, inferSchema=True)
label_df = spark.read.csv(label_data_file, inferSchema=True)

feature_names = spark.read.text(feature_names_file)
label_names = spark.read.text(label_names_file)

In [9]:
# Acquire column names from files
# Renaming whole DataFrames is annoying because they are immutable

feature_names_list = [x[0] for x in feature_names.collect()]
feature_df = feature_df.toDF(*feature_names_list)  # Note the '*' here to unroll the list as arguments

label_names_list = [x[0] for x in label_names.collect()]
label_df = label_df.toDF(*label_names_list)  # Note the '*' here to unroll the list as arguments

In [11]:
# Join on ID
faults_df = feature_df.join(label_df, on="ID", how="left")

In [13]:
### Basic DataFrame manipulations
# Check shape
print(faults_df.count())
len(faults_df.columns)


1941


35

In [9]:
# Print schema
faults_df.printSchema()

root
 |-- ID: string (nullable = true)
 |-- X_Minimum: integer (nullable = true)
 |-- X_Maximum: integer (nullable = true)
 |-- Y_Minimum: integer (nullable = true)
 |-- Y_Maximum: integer (nullable = true)
 |-- Pixels_Areas: integer (nullable = true)
 |-- X_Perimeter: integer (nullable = true)
 |-- Y_Perimeter: integer (nullable = true)
 |-- Sum_of_Luminosity: integer (nullable = true)
 |-- Minimum_of_Luminosity: integer (nullable = true)
 |-- Maximum_of_Luminosity: integer (nullable = true)
 |-- Length_of_Conveyer: integer (nullable = true)
 |-- TypeOfSteel_A300: integer (nullable = true)
 |-- TypeOfSteel_A400: integer (nullable = true)
 |-- Steel_Plate_Thickness: integer (nullable = true)
 |-- Edges_Index: double (nullable = true)
 |-- Empty_Index: double (nullable = true)
 |-- Square_Index: double (nullable = true)
 |-- Outside_X_Index: double (nullable = true)
 |-- Edges_X_Index: double (nullable = true)
 |-- Edges_Y_Index: double (nullable = true)
 |-- Outside_Global_Index: doubl

In [14]:
# Display first 4 rows
faults_df.show(4)

+----+---------+---------+---------+---------+------------+-----------+-----------+-----------------+---------------------+---------------------+------------------+----------------+----------------+---------------------+-----------+-----------+------------+---------------+-------------+-------------+--------------------+----------+-----------+-----------+-----------------+----------------+--------------+------+---------+--------+------+---------+-----+------------+
|  ID|X_Minimum|X_Maximum|Y_Minimum|Y_Maximum|Pixels_Areas|X_Perimeter|Y_Perimeter|Sum_of_Luminosity|Minimum_of_Luminosity|Maximum_of_Luminosity|Length_of_Conveyer|TypeOfSteel_A300|TypeOfSteel_A400|Steel_Plate_Thickness|Edges_Index|Empty_Index|Square_Index|Outside_X_Index|Edges_X_Index|Edges_Y_Index|Outside_Global_Index|LogOfAreas|Log_X_Index|Log_Y_Index|Orientation_Index|Luminosity_Index|SigmoidOfAreas|Pastry|Z_Scratch|K_Scatch|Stains|Dirtiness|Bumps|Other_Faults|
+----+---------+---------+---------+---------+------------+-

In [11]:
# Get numerical summary
faults_df.describe().show()

+-------+-----+-----------------+-----------------+------------------+-----------------+------------------+------------------+------------------+------------------+---------------------+---------------------+------------------+-------------------+-------------------+---------------------+-------------------+-------------------+-------------------+--------------------+-------------------+-------------------+--------------------+------------------+-------------------+------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+------------------+-------------------+
|summary|   ID|        X_Minimum|        X_Maximum|         Y_Minimum|        Y_Maximum|      Pixels_Areas|       X_Perimeter|       Y_Perimeter| Sum_of_Luminosity|Minimum_of_Luminosity|Maximum_of_Luminosity|Length_of_Conveyer|   TypeOfSteel_A300|   TypeOfSteel_A400|Steel_Plate_Thickness|        Edges_Index|  

In [16]:
# Select X_minimum only
faults_df.select("X_Minimum").describe().show()

+-------+-----------------+
|summary|        X_Minimum|
+-------+-----------------+
|  count|             1941|
|   mean|571.1360123647604|
| stddev| 520.690671421655|
|    min|                0|
|    max|             1705|
+-------+-----------------+



In [17]:
# Rename first column
faults_df = faults_df.withColumnRenamed('X_Minimum', 'x_min')

In [18]:
## Sort Data
faults_df.sort(faults_df["x_min"]).show(5)
faults_df.sort(faults_df["x_min"].desc()).show(5)

+-----+-----+---------+---------+---------+------------+-----------+-----------+-----------------+---------------------+---------------------+------------------+----------------+----------------+---------------------+-----------+-----------+------------+---------------+-------------+-------------+--------------------+----------+-----------+-----------+-----------------+----------------+--------------+------+---------+--------+------+---------+-----+------------+
|   ID|x_min|X_Maximum|Y_Minimum|Y_Maximum|Pixels_Areas|X_Perimeter|Y_Perimeter|Sum_of_Luminosity|Minimum_of_Luminosity|Maximum_of_Luminosity|Length_of_Conveyer|TypeOfSteel_A300|TypeOfSteel_A400|Steel_Plate_Thickness|Edges_Index|Empty_Index|Square_Index|Outside_X_Index|Edges_X_Index|Edges_Y_Index|Outside_Global_Index|LogOfAreas|Log_X_Index|Log_Y_Index|Orientation_Index|Luminosity_Index|SigmoidOfAreas|Pastry|Z_Scratch|K_Scatch|Stains|Dirtiness|Bumps|Other_Faults|
+-----+-----+---------+---------+---------+------------+----------

In [19]:
## Subset data
faults_df.filter(faults_df["x_min"] > 1000).show(3)
print(faults_df.filter(faults_df["x_min"] > 1000).count())
faults_df.filter(faults_df["x_min"] > 1000).describe("x_min").show()

+-----+-----+---------+---------+---------+------------+-----------+-----------+-----------------+---------------------+---------------------+------------------+----------------+----------------+---------------------+-----------+-----------+------------+---------------+-------------+-------------+--------------------+----------+-----------+-----------+-----------------+----------------+--------------+------+---------+--------+------+---------+-----+------------+
|   ID|x_min|X_Maximum|Y_Minimum|Y_Maximum|Pixels_Areas|X_Perimeter|Y_Perimeter|Sum_of_Luminosity|Minimum_of_Luminosity|Maximum_of_Luminosity|Length_of_Conveyer|TypeOfSteel_A300|TypeOfSteel_A400|Steel_Plate_Thickness|Edges_Index|Empty_Index|Square_Index|Outside_X_Index|Edges_X_Index|Edges_Y_Index|Outside_Global_Index|LogOfAreas|Log_X_Index|Log_Y_Index|Orientation_Index|Luminosity_Index|SigmoidOfAreas|Pastry|Z_Scratch|K_Scatch|Stains|Dirtiness|Bumps|Other_Faults|
+-----+-----+---------+---------+---------+------------+----------

In [20]:
faults_df.select('*').show(3) # Select All Columns

+----+-----+---------+---------+---------+------------+-----------+-----------+-----------------+---------------------+---------------------+------------------+----------------+----------------+---------------------+-----------+-----------+------------+---------------+-------------+-------------+--------------------+----------+-----------+-----------+-----------------+----------------+--------------+------+---------+--------+------+---------+-----+------------+
|  ID|x_min|X_Maximum|Y_Minimum|Y_Maximum|Pixels_Areas|X_Perimeter|Y_Perimeter|Sum_of_Luminosity|Minimum_of_Luminosity|Maximum_of_Luminosity|Length_of_Conveyer|TypeOfSteel_A300|TypeOfSteel_A400|Steel_Plate_Thickness|Edges_Index|Empty_Index|Square_Index|Outside_X_Index|Edges_X_Index|Edges_Y_Index|Outside_Global_Index|LogOfAreas|Log_X_Index|Log_Y_Index|Orientation_Index|Luminosity_Index|SigmoidOfAreas|Pastry|Z_Scratch|K_Scatch|Stains|Dirtiness|Bumps|Other_Faults|
+----+-----+---------+---------+---------+------------+-----------+-

In [17]:
### Data cleaning
## Fix "Other_Faults": the 'unlikely' level should be 0
faults_df.select('Other_Faults').distinct().show()
faults_df.groupBy('Other_Faults').count().show()

+------------+
|Other_Faults|
+------------+
|           0|
|    Unlikely|
|           1|
+------------+

+------------+-----+
|Other_Faults|count|
+------------+-----+
|           0| 1253|
|    Unlikely|   15|
|           1|  673|
+------------+-----+



In [18]:
faults_df = faults_df.withColumn('Other_Faults',
                     s_f.when(faults_df['Other_Faults'] != 'Unlikely', faults_df['Other_Faults']).otherwise(0))

faults_df.select('Other_Faults').distinct().show()

+------------+
|Other_Faults|
+------------+
|           0|
|           1|
+------------+



In [19]:
## Dealing with missing data
faults_df.count()
faults_df.dropna().count()


1917

In [20]:
# Turns out "Pastry" has some missing values:
faults_df.select("Pastry").count()
faults_df.select("Pastry").dropna().count()

1917

In [21]:
# These can be set to zero:
faults_df = faults_df.fillna(0)


In [22]:
faults_df.count()
faults_df.dropna().count()

1941

In [23]:
# Transform the 'Other_Faults' column to show the logical inverse and rename it
faults_df = faults_df.withColumn('named_faults', 1 - faults_df['Other_Faults'])

In [26]:
# save DF

faults_df.write.parquet(os.path.join(DATA_DIR, 'cleaned.parquet'))