# Energy Consumption Analysis in the Steel Industry

-------------

### Introduction

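The dataset "Steel Industry Energy Consumption" comprises 35,040 observations of 11 variables describing the electricity consumption of a steel plant: a timestamp (`date`), energy usage (`Usage_kWh`), lagging and leading reactive power (`Lagging_Current_Reactive_Power_kVarh`, `Leading_Current_Reactive_Power_kVarh`), CO2 emissions (`CO2(tCO2)`), lagging and leading power factors (`Lagging_Current_Power_Factor`, `Leading_Current_Power_Factor`), the number of seconds since midnight (`NSM`), whether the day is a weekday or a weekend (`WeekStatus`), the day of the week (`Day_of_week`), and the load category (`Load_Type`).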

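The objective is to analyze this dataset to uncover patterns and relationships that can help improve energy efficiency. Concretely, we train a classifier that predicts the plant's load category (`Load_Type`: light, medium or maximum load) from the electrical measurements and calendar information. To do this, we use PySpark, the Python API for Apache Spark, which lets the same code scale from a single machine to large, distributed datasets.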

------------

### Dataset
The dataset chosen for this project is "Steel Industry Energy Consumption", available on Kaggle at https://www.kaggle.com/datasets/csafrit2/steel-industry-energy-consumption.

----------

### Setting Up PySpark

```python
from pyspark.sql import SparkSession
from pyspark.sql.types import DateType
from pyspark.sql.functions import to_date


spark = SparkSession.builder \
    .appName("Energy Consumption Analysis in the Steel Industry") \
    .getOrCreate()
```

-----------
### Data Loading



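```python
# Raw string so the Windows path separator is not treated as an escape sequence
df = spark.read.csv(r"archive (5)\Steel_industry_data.csv", header=True, inferSchema=True)

# Parse the timestamp string and keep only the calendar date;
# the time of day remains available through the NSM column
df = df.withColumn('date', to_date(df['date'], 'd-M-yy H.m').cast(DateType()))
```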

------------
### Data Exploration



```python
df.show(5)
```

```
+----------+---------+------------------------------------+------------------------------------+---------+----------------------------+----------------------------+----+----------+-----------+----------+
|      date|Usage_kWh|Lagging_Current_Reactive_Power_kVarh|Leading_Current_Reactive_Power_kVarh|CO2(tCO2)|Lagging_Current_Power_Factor|Leading_Current_Power_Factor| NSM|WeekStatus|Day_of_week| Load_Type|
+----------+---------+------------------------------------+------------------------------------+---------+----------------------------+----------------------------+----+----------+-----------+----------+
|2018-01-01|     3.17|                                2.95|                                 0.0|      0.0|                       73.21|                       100.0| 900|   Weekday|     Monday|Light_Load|
|2018-01-01|      4.0|                                4.46|                                 0.0|      0.0|                       66.77|                       100.0|1800|   Weekday|    
```
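
The row count is consistent with one reading every 15 minutes over a non-leap year: the first rows above have `NSM` values of 900 and 1800 seconds, i.e. 15-minute steps. The arithmetic below is a quick sanity check under that assumption, and `nsm_to_clock` is a hypothetical helper (not part of the dataset or of PySpark) that turns `NSM` into a clock time.

```python
SECONDS_PER_DAY = 24 * 60 * 60   # 86,400 seconds in a day
INTERVAL_SECONDS = 900           # assumed step between readings (NSM 900 -> 1800)

readings_per_day = SECONDS_PER_DAY // INTERVAL_SECONDS   # readings in one day
readings_per_year = readings_per_day * 365               # assumes a non-leap year


def nsm_to_clock(nsm: int) -> str:
    """Hypothetical helper: convert seconds since midnight to 'HH:MM'."""
    hours, remainder = divmod(nsm % SECONDS_PER_DAY, 3600)
    return f"{hours:02d}:{remainder // 60:02d}"


print(readings_per_day, readings_per_year)                          # 96 35040
print(nsm_to_clock(900), nsm_to_clock(1800), nsm_to_clock(27900))   # 00:15 00:30 07:45
```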

```python
df.printSchema()
```

```
root
 |-- date: date (nullable = true)
 |-- Usage_kWh: double (nullable = true)
 |-- Lagging_Current_Reactive_Power_kVarh: double (nullable = true)
 |-- Leading_Current_Reactive_Power_kVarh: double (nullable = true)
 |-- CO2(tCO2): double (nullable = true)
 |-- Lagging_Current_Power_Factor: double (nullable = true)
 |-- Leading_Current_Power_Factor: double (nullable = true)
 |-- NSM: integer (nullable = true)
 |-- WeekStatus: string (nullable = true)
 |-- Day_of_week: string (nullable = true)
 |-- Load_Type: string (nullable = true)
```



```python
df.describe().show()
```

```
+-------+------------------+------------------------------------+------------------------------------+--------------------+----------------------------+----------------------------+------------------+----------+-----------+-----------+
|summary|         Usage_kWh|Lagging_Current_Reactive_Power_kVarh|Leading_Current_Reactive_Power_kVarh|           CO2(tCO2)|Lagging_Current_Power_Factor|Leading_Current_Power_Factor|               NSM|WeekStatus|Day_of_week|  Load_Type|
+-------+------------------+------------------------------------+------------------------------------+--------------------+----------------------------+----------------------------+------------------+----------+-----------+-----------+
|  count|             35040|                               35040|                               35040|               35040|                       35040|                       35040|             35040|     35040|      35040|      35040|
|   mean|27.386892408677415|                  13.0353835
```

```python
from pyspark.sql.functions import isnan, when, count, col

# Count null or NaN values per column ('date' is excluded from the isnan() check)
df.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df.columns if c != "date"]).show()
```

```
+---------+------------------------------------+------------------------------------+---------+----------------------------+----------------------------+---+----------+-----------+---------+
|Usage_kWh|Lagging_Current_Reactive_Power_kVarh|Leading_Current_Reactive_Power_kVarh|CO2(tCO2)|Lagging_Current_Power_Factor|Leading_Current_Power_Factor|NSM|WeekStatus|Day_of_week|Load_Type|
+---------+------------------------------------+------------------------------------+---------+----------------------------+----------------------------+---+----------+-----------+---------+
|        0|                                   0|                                   0|        0|                           0|                           0|  0|         0|          0|        0|
+---------+------------------------------------+------------------------------------+---------+----------------------------+----------------------------+---+----------+-----------+---------+
```
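
The expression `count(when(condition, c))` counts only the matching rows because `when` without an `otherwise` returns null for rows where the condition is false, and `count` skips nulls. The plain-Python sketch below mimics those semantics on a small made-up column; `spark_like_when`, `spark_like_count` and `is_missing` are illustrative names, not Spark functions.

```python
import math
from typing import Optional


def spark_like_when(condition: bool, value):
    """Mimic when(cond, value) with no otherwise(): the value if cond holds, else None (null)."""
    return value if condition else None


def spark_like_count(values) -> int:
    """Mimic count(column): count only the non-null entries."""
    return sum(1 for v in values if v is not None)


def is_missing(v: Optional[float]) -> bool:
    """Mimic isnan(c) | c.isNull() for a numeric value."""
    return v is None or (isinstance(v, float) and math.isnan(v))


# Made-up example column containing one null and one NaN
usage = [3.17, 4.0, None, float("nan"), 3.2]

missing = spark_like_count(spark_like_when(is_missing(v), "Usage_kWh") for v in usage)
print(missing)  # 2
```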



```python
df.groupBy('Load_Type').count().show()
```

```
+------------+-----+
|   Load_Type|count|
+------------+-----+
| Medium_Load| 9696|
|Maximum_Load| 7272|
|  Light_Load|18072|
+------------+-----+
```
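
The classes are imbalanced: `Light_Load` alone accounts for just over half of the rows. The arithmetic below, using the counts from the output above, gives each class's share and the accuracy of a trivial model that always predicts the majority class, which is a useful reference point when judging a classifier.

```python
# Class counts from df.groupBy('Load_Type').count() above
class_counts = {"Light_Load": 18072, "Medium_Load": 9696, "Maximum_Load": 7272}

total = sum(class_counts.values())
shares = {label: n / total for label, n in class_counts.items()}

# A model that always predicts the most frequent class scores this accuracy
majority_label = max(class_counts, key=class_counts.get)
majority_baseline_accuracy = shares[majority_label]

print(total)  # 35040
for label, share in shares.items():
    print(f"{label}: {share:.1%}")  # 51.6%, 27.7%, 20.8%
print(f"Always predicting {majority_label}: {majority_baseline_accuracy:.1%} accuracy")
```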



---------------
### Data Pre-Processing

```python
# 70/30 train/test split with a fixed seed
train_data, test_data = df.randomSplit([0.7, 0.3], seed=42)
```

```python
train_data.groupBy('Load_Type').count().show()
```

```
+------------+-----+
|   Load_Type|count|
+------------+-----+
| Medium_Load| 6805|
|Maximum_Load| 5128|
|  Light_Load|12673|
+------------+-----+
```
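
`randomSplit` assigns rows at random, so the 70/30 proportions are approximate rather than exact. Because the two splits do not overlap, the test-set class counts can be derived by subtracting the training counts above from the full-dataset counts; the sketch below does this and checks the realized split fraction.

```python
full_counts = {"Light_Load": 18072, "Medium_Load": 9696, "Maximum_Load": 7272}
train_counts = {"Light_Load": 12673, "Medium_Load": 6805, "Maximum_Load": 5128}

# The splits are disjoint, so test = full - train for every class
test_counts = {label: full_counts[label] - train_counts[label] for label in full_counts}

n_train = sum(train_counts.values())
n_test = sum(test_counts.values())
train_fraction = n_train / (n_train + n_test)

print(test_counts)  # {'Light_Load': 5399, 'Medium_Load': 2891, 'Maximum_Load': 2144}
print(n_train, n_test, f"{train_fraction:.3f}")  # 24606 10434 0.702
```

Only the training set is rebalanced in the next step; the test set keeps the original class mix, with `Light_Load` making up about 51.7% (5,399 of 10,434) of it.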



```python
from pyspark.sql.functions import year, month, dayofmonth

# Extract day, month and year from the date column in both splits
train_data = train_data.withColumn('day', dayofmonth(train_data['date']))
train_data = train_data.withColumn('month', month(train_data['date']))
train_data = train_data.withColumn('year', year(train_data['date']))

test_data = test_data.withColumn('day', dayofmonth(test_data['date']))
test_data = test_data.withColumn('month', month(test_data['date']))
test_data = test_data.withColumn('year', year(test_data['date']))

# Check the new columns (the schema is printed before 'date' is dropped)
train_data.printSchema()

# The original date column is no longer needed
train_data = train_data.drop('date')
test_data = test_data.drop('date')
```

```
root
 |-- date: date (nullable = true)
 |-- Usage_kWh: double (nullable = true)
 |-- Lagging_Current_Reactive_Power_kVarh: double (nullable = true)
 |-- Leading_Current_Reactive_Power_kVarh: double (nullable = true)
 |-- CO2(tCO2): double (nullable = true)
 |-- Lagging_Current_Power_Factor: double (nullable = true)
 |-- Leading_Current_Power_Factor: double (nullable = true)
 |-- NSM: integer (nullable = true)
 |-- WeekStatus: string (nullable = true)
 |-- Day_of_week: string (nullable = true)
 |-- Load_Type: string (nullable = true)
 |-- day: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- year: integer (nullable = true)
```



```python
train_data.show(5)
```

```
+---------+------------------------------------+------------------------------------+---------+----------------------------+----------------------------+-----+----------+-----------+----------+---+-----+----+
|Usage_kWh|Lagging_Current_Reactive_Power_kVarh|Leading_Current_Reactive_Power_kVarh|CO2(tCO2)|Lagging_Current_Power_Factor|Leading_Current_Power_Factor|  NSM|WeekStatus|Day_of_week| Load_Type|day|month|year|
+---------+------------------------------------+------------------------------------+---------+----------------------------+----------------------------+-----+----------+-----------+----------+---+-----+----+
|     3.17|                                2.95|                                 0.0|      0.0|                       73.21|                       100.0|  900|   Weekday|     Monday|Light_Load|  1|    1|2018|
|      3.2|                                 3.6|                                 0.0|      0.0|                       66.44|                       100.0|27900|   We
```

```python
train_data.printSchema()
```

```
root
 |-- Usage_kWh: double (nullable = true)
 |-- Lagging_Current_Reactive_Power_kVarh: double (nullable = true)
 |-- Leading_Current_Reactive_Power_kVarh: double (nullable = true)
 |-- CO2(tCO2): double (nullable = true)
 |-- Lagging_Current_Power_Factor: double (nullable = true)
 |-- Leading_Current_Power_Factor: double (nullable = true)
 |-- NSM: integer (nullable = true)
 |-- WeekStatus: string (nullable = true)
 |-- Day_of_week: string (nullable = true)
 |-- Load_Type: string (nullable = true)
 |-- day: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- year: integer (nullable = true)
```



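```python
from pyspark.sql.functions import col

# Class counts keyed by label; groupBy() does not guarantee row order,
# so look counts up by name instead of by position
counts = {row['Load_Type']: row['count']
          for row in train_data.groupBy('Load_Type').count().collect()}

# Sampling fraction that shrinks Light_Load to roughly the size of Maximum_Load
ratio = counts['Maximum_Load'] / counts['Light_Load']

# Randomly sample the majority class without replacement; no seed is set,
# so the exact sampled count varies between runs
light_load_train = train_data.filter(col('Load_Type') == 'Light_Load').sample(False, ratio)

# Keep all rows of the other classes
other_train = train_data.filter(col('Load_Type') != 'Light_Load')

# Combine into the balanced training set
train_balanced = light_load_train.union(other_train)
```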

```python
train_balanced.groupBy('Load_Type').count().show()
```

```
+------------+-----+
|   Load_Type|count|
+------------+-----+
|  Light_Load| 5138|
| Medium_Load| 6805|
|Maximum_Load| 5128|
+------------+-----+
```
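
The balanced `Light_Load` count (5,138) is close to, but not exactly, the 5,128 `Maximum_Load` rows because `DataFrame.sample(False, fraction)` keeps each row independently with probability `fraction` (Bernoulli sampling), so the result size is itself random. The sketch below computes the expected size and its binomial standard deviation from the training counts, and simulates the same kind of sampling in plain Python; the `bernoulli_sample` helper and its seed are illustrative assumptions, not Spark's implementation.

```python
import math
import random

n_light = 12673      # Light_Load rows in train_data
n_maximum = 5128     # Maximum_Load rows in train_data

fraction = n_maximum / n_light
expected = n_light * fraction                              # equals n_maximum
std_dev = math.sqrt(n_light * fraction * (1 - fraction))   # binomial standard deviation


def bernoulli_sample(rows, fraction, seed):
    """Keep each row independently with probability `fraction` (illustrative helper)."""
    rng = random.Random(seed)
    return [row for row in rows if rng.random() < fraction]


sampled = bernoulli_sample(range(n_light), fraction, seed=42)

print(f"fraction={fraction:.4f}, expected={expected:.0f}, std={std_dev:.1f}")  # fraction=0.4046, expected=5128, std=55.3
print(len(sampled))  # close to 5128; the exact value depends on the seed
```

The observed 5,138 rows lie about 10 above the expected 5,128, well within one standard deviation.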



--------------
### Feature Engineering

```python
train_balanced.show(5)
```

```
+---------+------------------------------------+------------------------------------+---------+----------------------------+----------------------------+-----+----------+-----------+----------+---+-----+----+
|Usage_kWh|Lagging_Current_Reactive_Power_kVarh|Leading_Current_Reactive_Power_kVarh|CO2(tCO2)|Lagging_Current_Power_Factor|Leading_Current_Power_Factor|  NSM|WeekStatus|Day_of_week| Load_Type|day|month|year|
+---------+------------------------------------+------------------------------------+---------+----------------------------+----------------------------+-----+----------+-----------+----------+---+-----+----+
|     3.28|                                3.64|                                 0.0|      0.0|                       66.94|                       100.0| 8100|   Weekday|     Monday|Light_Load|  1|    1|2018|
|     3.31|                                3.74|                                 0.0|      0.0|                       66.27|                       100.0|12600|   We
```

```python
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml import Pipeline

# String indexing for categorical columns
week_status_indexer = StringIndexer(inputCol="WeekStatus", outputCol="WeekStatusIndex")
day_of_week_indexer = StringIndexer(inputCol="Day_of_week", outputCol="DayOfWeekIndex")

# One-hot encoding for categorical columns
week_status_encoder = OneHotEncoder(inputCol="WeekStatusIndex", outputCol="WeekStatusVec")
day_of_week_encoder = OneHotEncoder(inputCol="DayOfWeekIndex", outputCol="DayOfWeekVec")

# String indexing for the target column (Load_Type)
load_type_indexer = StringIndexer(inputCol="Load_Type", outputCol="label")

# Assemble the numeric and one-hot encoded features into one vector;
# the day, month and year columns created earlier are not used here
required_features = ['Usage_kWh',
                     'Lagging_Current_Reactive_Power_kVarh',
                     'Leading_Current_Reactive_Power_kVarh',
                     'CO2(tCO2)',
                     'Lagging_Current_Power_Factor',
                     'Leading_Current_Power_Factor',
                     'NSM',
                     'WeekStatusVec',
                     'DayOfWeekVec']

assembler = VectorAssembler(inputCols=required_features, outputCol='features')

# Create a Pipeline
pipeline = Pipeline(stages=[week_status_indexer, day_of_week_indexer,
                            week_status_encoder, day_of_week_encoder,
                            load_type_indexer,
                            assembler])

# Fit on the balanced training data only, then apply the same
# fitted transformations to the test data
pipeline_model = pipeline.fit(train_balanced)
train_balanced = pipeline_model.transform(train_balanced)
test_data = pipeline_model.transform(test_data)
```
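
With Spark's defaults, `StringIndexer` orders categories by descending frequency (`stringOrderType="frequencyDesc"`) and `OneHotEncoder` drops the last category (`dropLast=True`), so a column with k categories becomes a vector of length k - 1. The plain-Python sketch below reproduces that logic to show how wide the assembled `features` vector is; the helper names and the small example column are illustrative, not Spark APIs.

```python
from collections import Counter


def frequency_desc_index(values):
    """Map each category to an index, most frequent first (this sketch breaks ties alphabetically)."""
    counts = Counter(values)
    ordered = sorted(counts, key=lambda c: (-counts[c], c))
    return {category: i for i, category in enumerate(ordered)}


def one_hot_drop_last(index, num_categories):
    """One-hot vector of length num_categories - 1; the last category maps to all zeros."""
    vec = [0.0] * (num_categories - 1)
    if index < num_categories - 1:
        vec[index] = 1.0
    return vec


# Illustrative WeekStatus column: Weekday is more frequent, so it gets index 0
week_status = ["Weekday", "Weekday", "Weekend", "Weekday"]
mapping = frequency_desc_index(week_status)
print(mapping)                                   # {'Weekday': 0, 'Weekend': 1}
print(one_hot_drop_last(mapping["Weekday"], 2))  # [1.0]
print(one_hot_drop_last(mapping["Weekend"], 2))  # [0.0]

# Width of the assembled feature vector
numeric_features = 7         # Usage_kWh through NSM
week_status_width = 2 - 1    # Weekday / Weekend
day_of_week_width = 7 - 1    # Monday .. Sunday
print(numeric_features + week_status_width + day_of_week_width)  # 14
```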

```python
train_balanced.show(5)
```

```
+---------+------------------------------------+------------------------------------+---------+----------------------------+----------------------------+-----+----------+-----------+----------+---+-----+----+---------------+--------------+-------------+-------------+-----+--------------------+
|Usage_kWh|Lagging_Current_Reactive_Power_kVarh|Leading_Current_Reactive_Power_kVarh|CO2(tCO2)|Lagging_Current_Power_Factor|Leading_Current_Power_Factor|  NSM|WeekStatus|Day_of_week| Load_Type|day|month|year|WeekStatusIndex|DayOfWeekIndex|WeekStatusVec| DayOfWeekVec|label|            features|
+---------+------------------------------------+------------------------------------+---------+----------------------------+----------------------------+-----+----------+-----------+----------+---+-----+----+---------------+--------------+-------------+-------------+-----+--------------------+
|     3.28|                                3.64|                                 0.0|      0.0|                    
```

-------------------------
### Training the Model

```python
from pyspark.ml.classification import DecisionTreeClassifier

# Create a Decision Tree model with Spark's default hyperparameters
dt = DecisionTreeClassifier(labelCol="label", featuresCol="features")

# Fit the model
dt_model = dt.fit(train_balanced)

# Predict with the test dataset
dt_predictions = dt_model.transform(test_data)
```
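
`DecisionTreeClassifier` is trained here with its default settings, which include Gini impurity as the split criterion (`impurity="gini"`). At each node the tree chooses the split that most reduces the weighted Gini impurity of the child nodes. The sketch below computes Gini impurity and the impurity decrease of a single candidate split; the class counts are made up for illustration and the helper names are hypothetical.

```python
def gini(counts):
    """Gini impurity 1 - sum(p_k^2) for a list of class counts."""
    total = sum(counts)
    if total == 0:
        return 0.0
    return 1.0 - sum((c / total) ** 2 for c in counts)


def split_gain(parent, left, right):
    """Decrease in Gini impurity when `parent` is split into `left` and `right`."""
    n = sum(parent)
    weighted_children = (sum(left) / n) * gini(left) + (sum(right) / n) * gini(right)
    return gini(parent) - weighted_children


# Made-up counts for (Light, Medium, Maximum) at one node and a candidate split
parent = [30, 30, 30]
left, right = [28, 5, 2], [2, 25, 28]

print(f"parent gini = {gini(parent):.3f}")  # parent gini = 0.667
print(f"gain = {split_gain(parent, left, right):.3f}")
```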

--------------------------
### Evaluating the Model

```python
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Create evaluators; "f1" is the weighted average of the per-class F1 scores
evaluator1 = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
evaluator2 = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="weightedPrecision")
evaluator3 = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="weightedRecall")
evaluator4 = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="f1")

# Apply the evaluators to the DataFrame with the predictions
accuracy = evaluator1.evaluate(dt_predictions)
weighted_precision = evaluator2.evaluate(dt_predictions)
weighted_recall = evaluator3.evaluate(dt_predictions)
f1 = evaluator4.evaluate(dt_predictions)

print("Test Accuracy: ", accuracy)
print("Test Weighted Precision: ", weighted_precision)
print("Test Weighted Recall: ", weighted_recall)
print("Test F1 Score: ", f1)
```

```
Test Accuracy:  0.820299022426682
Test Weighted Precision:  0.8412120914514901
Test Weighted Recall:  0.820299022426682
Test F1 Score:  0.8265635109982734
```
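
The identical accuracy and weighted recall values are not a coincidence. Spark's weighted metrics average the per-class scores, weighting each class by its share of the true labels, and for recall that average collapses to accuracy. Likewise, the `f1` metric is the weighted average of the per-class F1 scores, not the harmonic mean of weighted precision and weighted recall (which here would give about 0.831 rather than 0.827). The sketch below computes these metrics in plain Python from a hypothetical 3×3 confusion matrix (not the model's actual results) to show both relationships.

```python
# Hypothetical confusion matrix: rows = true class, columns = predicted class
confusion = [
    [50, 8, 2],
    [6, 30, 4],
    [1, 9, 20],
]

k = len(confusion)
n = sum(sum(row) for row in confusion)
support = [sum(confusion[i]) for i in range(k)]                         # true count per class
predicted = [sum(confusion[i][j] for i in range(k)) for j in range(k)]  # predicted count per class
tp = [confusion[i][i] for i in range(k)]

precision = [tp[c] / predicted[c] if predicted[c] else 0.0 for c in range(k)]
recall = [tp[c] / support[c] if support[c] else 0.0 for c in range(k)]
f1 = [2 * p * r / (p + r) if p + r else 0.0 for p, r in zip(precision, recall)]


def weighted(scores):
    """Average per-class scores, weighting each class by its share of the true labels."""
    return sum(score * support[c] / n for c, score in enumerate(scores))


accuracy = sum(tp) / n
weighted_precision = weighted(precision)
weighted_recall = weighted(recall)
weighted_f1 = weighted(f1)
harmonic_of_weighted = 2 * weighted_precision * weighted_recall / (weighted_precision + weighted_recall)

print(f"accuracy           {accuracy:.4f}")
print(f"weighted recall    {weighted_recall:.4f}")       # same as accuracy
print(f"weighted F1        {weighted_f1:.4f}")
print(f"harmonic mean P/R  {harmonic_of_weighted:.4f}")  # generally differs from weighted F1
```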


-------------------
### Inference
The Decision Tree model achieved an accuracy of approximately 82.0% on the test set, meaning it predicted the correct load type for slightly more than four out of five unseen observations. The weighted precision (84.1%), weighted recall (82.0%) and weighted F1 score (82.7%) are all above 80%. Precision measures how many of the observations predicted as a given class really belong to it, recall measures how many of the observations of a class the model finds, and F1 is the harmonic mean of the two; Spark's weighted variants average these per-class scores, weighting each class by its number of test observations. Taken together, the metrics indicate solid overall performance, but because they are averages across classes they can hide a weaker class; per-class metrics or a confusion matrix would show how well each load type is distinguished.

------------------
### Conclusion
Considering the metrics above, the Decision Tree model performs reasonably well on this problem. Decision trees are known for their simplicity and interpretability, which makes it easier to understand the decisions the model makes. They also handle a mix of numerical and categorical features well, which may be beneficial for this dataset.

----------------------

### Future Work

Hyperparameter Tuning: The model above uses Spark's default hyperparameters (for example `maxDepth=5` and `maxBins=32`). Tuning parameters such as `maxDepth`, `maxBins` and `minInstancesPerNode` could improve performance. Grid search or random search could be used to work systematically through combinations of values.
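
In Spark ML, a grid search over decision-tree settings would typically be built with `ParamGridBuilder` and evaluated with `CrossValidator` or `TrainValidationSplit` from `pyspark.ml.tuning`. The plain-Python sketch below shows only the search logic: it enumerates every combination of candidate values and keeps the best-scoring one. `train_and_score` is a hypothetical stand-in for fitting a model with the given settings and returning a validation metric, and its toy scoring rule is made up.

```python
from itertools import product

# Candidate values for two DecisionTreeClassifier parameters
param_grid = {
    "maxDepth": [3, 5, 10, 15],
    "maxBins": [16, 32, 64],
}


def train_and_score(params):
    """Hypothetical stand-in: fit a model with `params` and return a validation score.

    This toy rule rewards depth up to 10, penalizes deeper trees slightly, and favors more bins.
    """
    depth_score = min(params["maxDepth"], 10) - 0.1 * max(params["maxDepth"] - 10, 0)
    return depth_score + params["maxBins"] / 100


def grid_search(grid, score_fn):
    """Evaluate every combination in `grid` and return (best_params, best_score)."""
    names = list(grid)
    best_params, best_score = None, float("-inf")
    for values in product(*(grid[name] for name in names)):
        params = dict(zip(names, values))
        score = score_fn(params)
        if score > best_score:
            best_params, best_score = params, score
    return best_params, best_score


best_params, best_score = grid_search(param_grid, train_and_score)
print(best_params, round(best_score, 2))  # {'maxDepth': 10, 'maxBins': 64} 10.64
```

A grid grows multiplicatively with each added parameter (12 combinations here), which is why random search is often preferred when many parameters are tuned at once.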

Feature Engineering: We should experiment with feature engineering to create new features that could potentially improve the model's performance. This could include polynomial features, interactions, or domain-specific features.
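
As one example of a domain-specific feature, the time of day in `NSM` wraps around at midnight: readings at 23:45 (NSM 85,500) and 00:15 (NSM 900) are 30 minutes apart but far apart numerically. A common remedy, sketched below as one option rather than something evaluated in this notebook, is to map `NSM` onto a circle with sine and cosine features; `cyclical_time_features` and `circle_distance` are hypothetical names.

```python
import math

SECONDS_PER_DAY = 86_400


def cyclical_time_features(nsm):
    """Hypothetical feature: map seconds since midnight onto the unit circle as (sin, cos)."""
    angle = 2 * math.pi * (nsm % SECONDS_PER_DAY) / SECONDS_PER_DAY
    return math.sin(angle), math.cos(angle)


def circle_distance(a, b):
    """Euclidean distance between two encoded times of day."""
    return math.dist(cyclical_time_features(a), cyclical_time_features(b))


late_evening, just_after_midnight, noon = 85_500, 900, 43_200

# Raw NSM treats 23:45 and 00:15 as far apart; the circular encoding places them close together
print(abs(late_evening - just_after_midnight))  # 84600
print(round(circle_distance(late_evening, just_after_midnight), 3))
print(round(circle_distance(just_after_midnight, noon), 3))
```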

Data Collection: If the model's performance is not sufficient for the intended use, collecting more data or improving the quality of the current data could help.

-------------------------