<a href="https://colab.research.google.com/github/SamanMansoor/Data-Science/blob/main/Big_Data_Steel_Industry.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [2]:
!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.4.0.tar.gz (310.8 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m310.8/310.8 MB[0m [31m4.4 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.4.0-py2.py3-none-any.whl size=311317130 sha256=6bcf52a868d42ba0cd97aeca1eaa339bc3a44ab0a38ff966e38d8c9bcaee9be1
  Stored in directory: /root/.cache/pip/wheels/7b/1b/4b/3363a1d04368e7ff0d408e57ff57966fcdf00583774e761327
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.4.0


In [3]:
#Connection with spark context
#First of all, we need a connection to the cluster. This is done using creating an instance of the SparkContext class.

import pyspark
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("Big data") \
                            .master("local[*]")\
                            .config("spark.some.config.option", "some-value") \
                            .getOrCreate()


In [5]:

#load the Dataset
Data = spark.read.csv('/content/Steel_industry.csv',header = True,inferSchema=True)

Data.limit(5).toPandas()


Unnamed: 0,Usage_kWh,Lagging_Current_Reactive.Power_kVarh,Leading_Current_Reactive_Power_kVarh,CO2(tCO2),Lagging_Current_Power_Factor,Leading_Current_Power_Factor,NSM,WeekStatus,Day_of_week,Load_Type
0,3.17,2.95,0.0,0.0,73.21,100.0,900,Weekday,Monday,Light_Load
1,4.0,4.46,0.0,0.0,66.77,100.0,1800,Weekday,Monday,Light_Load
2,3.24,3.28,0.0,0.0,70.28,100.0,2700,Weekday,Monday,Light_Load
3,3.31,3.56,0.0,0.0,68.09,100.0,3600,Weekday,Monday,Light_Load
4,3.82,4.5,0.0,0.0,64.72,100.0,4500,Weekday,Monday,Light_Load


# **Pyspark code to display Interquartile Range of the**
#**‘Usage_kWh’.**

In [11]:
column = "Usage_kWh"
sorted_column = Data.select(column).orderBy(column)
num_rows = sorted_column.count()
q1_index = int((num_rows + 1) * 0.25)
q3_index = int((num_rows + 1) * 0.75)

q1 = sorted_column.take(q1_index)[q1_index - 1][0]
q3 = sorted_column.take(q3_index)[q3_index - 1][0]

iqr = q3 - q1
print(f"The interquartile range of '{column}' is: {iqr}")


The interquartile range of 'Usage_kWh' is: 48.029999999999994


#**Mean of ‘Usage_kWh’ round to next integer and break it into 3 whole**
#**numbers such that they sum up to the original number and count number of ways.**

In [13]:
mean = Data.selectExpr("round(avg(Usage_kWh))").first()[0]
target_sum = int(mean * 3)

count = 0
for a in range(target_sum + 1):
    for b in range(target_sum + 1):
        c = target_sum - a - b
        if c >= 0:
            count += 1

print(f"The mean of 'Usage_kWh' is: {mean}")
print(f"The number of ways to break it into 3 whole numbers that sum up to {target_sum} is: {count}")

The mean of 'Usage_kWh' is: 27.0
The number of ways to break it into 3 whole numbers that sum up to 81 is: 3403


#**Spark SQL query to display the details of highest ‘Usage_kwh’ in each load type (top 3).**

In [14]:
Data.createOrReplaceTempView("load_data")

query = """
    SELECT load_type, Usage_kWh
    FROM (
        SELECT load_type, Usage_kWh,
            ROW_NUMBER() OVER (PARTITION BY load_type ORDER BY Usage_kWh DESC) as rank
        FROM load_data
    ) ranked
    WHERE rank <= 3
"""

result = spark.sql(query)
result.show()

+------------+---------+
|   load_type|Usage_kWh|
+------------+---------+
|            |   145.01|
|            |    140.8|
|            |   138.06|
|  Light_Load|   140.29|
|  Light_Load|   139.43|
|  Light_Load|   139.03|
|Maximum_Load|   151.67|
|Maximum_Load|   149.65|
|Maximum_Load|   146.88|
| Medium_Load|   157.18|
| Medium_Load|   153.14|
| Medium_Load|   151.31|
+------------+---------+



# **Apply any machine learning algorithm on data using Spark ML-lib**

In [16]:
import pandas as pd
import numpy as np
import seaborn as sns
from matplotlib import pyplot as plt
import matplotlib
%matplotlib inline
import warnings
warnings.filterwarnings('ignore')

In [17]:
Data.show()

+---------+------------------------------------+------------------------------------+---------+----------------------------+----------------------------+-----+----------+-----------+----------+
|Usage_kWh|Lagging_Current_Reactive.Power_kVarh|Leading_Current_Reactive_Power_kVarh|CO2(tCO2)|Lagging_Current_Power_Factor|Leading_Current_Power_Factor|  NSM|WeekStatus|Day_of_week| Load_Type|
+---------+------------------------------------+------------------------------------+---------+----------------------------+----------------------------+-----+----------+-----------+----------+
|     3.17|                                2.95|                                 0.0|      0.0|                       73.21|                       100.0|  900|   Weekday|     Monday|Light_Load|
|      4.0|                                4.46|                                 0.0|      0.0|                       66.77|                       100.0| 1800|   Weekday|     Monday|Light_Load|
|     3.24|                   

In [34]:

#We are converting required columns from string to float
from pyspark.sql.functions import col
selected_features = ['Usage_kWh','Leading_Current_Reactive_Power_kVarh','CO2(tCO2)','Lagging_Current_Power_Factor','Leading_Current_Power_Factor','NSM']
features_df = Data.select(*(col(c).cast("float").alias(c) for c in selected_features))
features_df.show()

+---------+------------------------------------+---------+----------------------------+----------------------------+-------+
|Usage_kWh|Leading_Current_Reactive_Power_kVarh|CO2(tCO2)|Lagging_Current_Power_Factor|Leading_Current_Power_Factor|    NSM|
+---------+------------------------------------+---------+----------------------------+----------------------------+-------+
|     3.17|                                 0.0|      0.0|                       73.21|                       100.0|  900.0|
|      4.0|                                 0.0|      0.0|                       66.77|                       100.0| 1800.0|
|     3.24|                                 0.0|      0.0|                       70.28|                       100.0| 2700.0|
|     3.31|                                 0.0|      0.0|                       68.09|                       100.0| 3600.0|
|     3.82|                                 0.0|      0.0|                       64.72|                       100.0| 4500.0|


In [37]:

df1=features_df.toPandas()

In [38]:
#Sum of all the NULL values
df1.isnull().sum()

Usage_kWh                               0
Leading_Current_Reactive_Power_kVarh    0
CO2(tCO2)                               0
Lagging_Current_Power_Factor            0
Leading_Current_Power_Factor            0
NSM                                     0
dtype: int64

In [39]:
#Remove Null Values
df1=df1.fillna('0')

In [40]:
df1.isnull().sum()

Usage_kWh                               0
Leading_Current_Reactive_Power_kVarh    0
CO2(tCO2)                               0
Lagging_Current_Power_Factor            0
Leading_Current_Power_Factor            0
NSM                                     0
dtype: int64

In [42]:
from pyspark.ml.feature import VectorAssembler
# BEFORE USING SPARKML IT IS COMPULSORY TO CONVERT YOUR DATAFRAME INTO DENSE VECTOR. WITHOUT CONVERTING INTO DENSE VECTOR ALGORITHM WILL NOT WORK.
# DENSE VECTOR JUST CONVERT THAT COLUMNS INTO POINTS. THAT'S ALL

In [44]:
required_features=['Usage_kWh','Leading_Current_Reactive_Power_kVarh','CO2(tCO2)','Lagging_Current_Power_Factor','Leading_Current_Power_Factor','NSM']

In [45]:
assembler = VectorAssembler(inputCols=required_features, outputCol='features')

In [46]:
transformed_data = assembler.transform(features_df)

In [47]:
transformed_data.show(5)

+---------+------------------------------------+---------+----------------------------+----------------------------+------+--------------------+
|Usage_kWh|Leading_Current_Reactive_Power_kVarh|CO2(tCO2)|Lagging_Current_Power_Factor|Leading_Current_Power_Factor|   NSM|            features|
+---------+------------------------------------+---------+----------------------------+----------------------------+------+--------------------+
|     3.17|                                 0.0|      0.0|                       73.21|                       100.0| 900.0|[3.17000007629394...|
|      4.0|                                 0.0|      0.0|                       66.77|                       100.0|1800.0|[4.0,0.0,0.0,66.7...|
|     3.24|                                 0.0|      0.0|                       70.28|                       100.0|2700.0|[3.24000000953674...|
|     3.31|                                 0.0|      0.0|                       68.09|                       100.0|3600.0|[3.3099

In [49]:
#Sp;it the data
(training_data, test_data) = transformed_data.randomSplit([0.7,0.3])

# **Linear Regression**

In [50]:
from pyspark.ml.regression import LinearRegression

In [51]:
Lr = LinearRegression(labelCol='Usage_kWh',
                            featuresCol='features',maxIter=10)

In [52]:
model = Lr.fit(training_data)
predictions = model.transform(test_data)
predictions.select("features","prediction").show(5)

+--------------------+--------------------+
|            features|          prediction|
+--------------------+--------------------+
|           (6,[],[])|9.035455182527636...|
|[2.45000004768371...|  2.4500000476836985|
|[2.45000004768371...|  2.4500000476836985|
|[2.48000001907348...|  2.4800000190734752|
|[2.48000001907348...|  2.4800000190734757|
+--------------------+--------------------+
only showing top 5 rows



# **Decision Tree**

In [57]:
from pyspark.ml.regression import DecisionTreeRegressor

In [58]:
dt = DecisionTreeRegressor(featuresCol='features',labelCol='NSM')

In [56]:
from pyspark.ml.regression import DecisionTreeRegressor
dt = DecisionTreeRegressor(featuresCol='features',labelCol='NSM')
modell=dt.fit(training_data)
pred=modell.transform(test_data)
pred.select('prediction','NSM').show(5)

+------------------+-------+
|        prediction|    NSM|
+------------------+-------+
|1357.7437858508604|    0.0|
|           84604.8|83700.0|
|           84604.8|84600.0|
| 27887.12613784135|27900.0|
|22523.166023166024|23400.0|
+------------------+-------+
only showing top 5 rows

