In [None]:
#Installing dependencies needed

!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://www-us.apache.org/dist/spark/spark-2.4.4/spark-2.4.4-bin-hadoop2.7.tgz
!tar xf spark-2.4.4-bin-hadoop2.7.tgz
!pip install -q findspark

# Instructions

*   Download the dataset and uncompress it.

*   Put the uncompressed file in the "bigdata" folder described in the next step.

*   Create a "bigdata" folder in "My Drive" on Google Drive.

Note: Colab needs permission to mount that folder. When prompted, open the link, then copy and paste the generated access code.


In [None]:
ls

[0m[01;34mdrive[0m/                                       [01;34mspark-2.4.4-bin-hadoop2.7[0m/
[01;34mnormalized_household_power_consumption.csv[0m/  spark-2.4.4-bin-hadoop2.7.tgz
[01;34msample_data[0m/


In [None]:
!rm -rf spark-2.4.4-bin-hadoop2.7.tgz

In [None]:
!ls

drive					    sample_data
normalized_household_power_consumption.csv  spark-2.4.4-bin-hadoop2.7


In [None]:
# Setting environment variables: point findspark/pyspark at the Java 8 JDK
# and at the Spark distribution extracted in /content.
import os

os.environ.update({
    "JAVA_HOME": "/usr/lib/jvm/java-8-openjdk-amd64",
    "SPARK_HOME": "/content/spark-2.4.4-bin-hadoop2.7",
})

In [None]:
# Optional debugging aid: uncomment the line below to print the environment
# and check that JAVA_HOME and SPARK_HOME were set by the previous cell.
#!printenv

In [None]:
# findspark uses SPARK_HOME (set above) to make pyspark importable,
# so init() has to run before the pyspark import below.
import findspark

findspark.init()

from pyspark.sql import SparkSession

In [None]:
# Start (or reuse) a local Spark session that uses all available cores.
spark = (
    SparkSession.builder
    .master("local[*]")
    .getOrCreate()
)

In [None]:
# Mount the personal Google Drive so the dataset in My Drive/bigdata is reachable.
from google.colab import drive

drive.mount("/content/drive")

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
!ls "/content/drive/My Drive/bigdata"

household_power_consumption.txt  normalized_household_power_consumption.csv


In [None]:
# Path of the raw ';'-separated dataset inside the mounted personal Drive.
file = "/content/drive/My Drive/bigdata/household_power_consumption.txt"

In [None]:
# Load the raw file: ';' separator, first line is the header, let Spark infer types.
df = spark.read.csv(file, sep=";", header=True, inferSchema=True)

In [None]:
# Peek at the first rows of the loaded data.
df.show(n=5)

+----------+--------+-------------------+---------------------+-------+----------------+--------------+--------------+--------------+
|      Date|    Time|Global_active_power|Global_reactive_power|Voltage|Global_intensity|Sub_metering_1|Sub_metering_2|Sub_metering_3|
+----------+--------+-------------------+---------------------+-------+----------------+--------------+--------------+--------------+
|16/12/2006|17:24:00|              4.216|                0.418|234.840|          18.400|         0.000|         1.000|          17.0|
|16/12/2006|17:25:00|              5.360|                0.436|233.630|          23.000|         0.000|         1.000|          16.0|
|16/12/2006|17:26:00|              5.374|                0.498|233.290|          23.000|         0.000|         2.000|          17.0|
|16/12/2006|17:27:00|              5.388|                0.502|233.740|          23.000|         0.000|         1.000|          17.0|
|16/12/2006|17:28:00|              3.666|                0.528

In [None]:
# Inspect the inferred schema: most measurement columns come back as strings.
df.printSchema()

root
 |-- Date: string (nullable = true)
 |-- Time: string (nullable = true)
 |-- Global_active_power: string (nullable = true)
 |-- Global_reactive_power: string (nullable = true)
 |-- Voltage: string (nullable = true)
 |-- Global_intensity: string (nullable = true)
 |-- Sub_metering_1: string (nullable = true)
 |-- Sub_metering_2: string (nullable = true)
 |-- Sub_metering_3: double (nullable = true)



2075259

## Casting types


In [None]:
# Keep only the four working measurements and cast each one from string to float.
working_names = ["Global_active_power", "Global_reactive_power", "Voltage", "Global_intensity"]
df2 = df.select(*[df[name].cast("float") for name in working_names])

In [None]:
# Confirm that every working column is now a float.
df2.printSchema()

root
 |-- Global_active_power: float (nullable = true)
 |-- Global_reactive_power: float (nullable = true)
 |-- Voltage: float (nullable = true)
 |-- Global_intensity: float (nullable = true)



In [None]:
# Import only the pyspark SQL functions used below. A wildcard import would
# also pull in dozens of names (sum, round, abs, ...) that silently shadow
# Python built-ins.
# NOTE: min, max and count still shadow the built-ins on purpose; later cells
# use them as Spark aggregate functions.
from pyspark.sql.functions import col, count, max, mean, min, stddev

In [None]:
# Count, per working column, how many values are null after the cast
# (values that could not be cast to float end up null).
columns = ["Global_active_power", "Global_reactive_power", "Voltage", "Global_intensity"]
for name in columns:
    n_missing = df2.filter(col(name).isNull()).count()
    print("Nulls in", name, ":", n_missing)

Nulls in Global_active_power : 25979
Nulls in Global_reactive_power : 25979
Nulls in Voltage : 25979
Nulls in Global_intensity : 25979


## Dealing with missing values

In [None]:
# Fill missing values in each working column with that column's mean.
# Spark DataFrames are immutable: fillna returns a NEW DataFrame, so the result
# must be reassigned to df2 or the nulls are silently kept.
# All means are computed in a single aggregation job rather than one job per column.

column_means = df2.select([mean(c).alias(c) for c in columns]).first().asDict()
# A column with no non-null values has mean None; leave such columns untouched.
fill_values = {c: v for c, v in column_means.items() if v is not None}
if fill_values:
  df2 = df2.fillna(fill_values)

for c in columns:
  df2.describe([c]).show()



+-------+-------------------+
|summary|Global_active_power|
+-------+-------------------+
|  count|            2049280|
|   mean| 1.0916150366540094|
| stddev| 1.0572941611180025|
|    min|              0.076|
|    max|             11.122|
+-------+-------------------+

+-------+---------------------+
|summary|Global_reactive_power|
+-------+---------------------+
|  count|              2049280|
|   mean|   0.1237144765251571|
| stddev|  0.11272197958641315|
|    min|                  0.0|
|    max|                 1.39|
+-------+---------------------+

+-------+------------------+
|summary|           Voltage|
+-------+------------------+
|  count|           2049280|
|   mean|240.83985796672414|
| stddev|3.2399866612063435|
|    min|             223.2|
|    max|            254.15|
+-------+------------------+

+-------+------------------+
|summary|  Global_intensity|
+-------+------------------+
|  count|           2049280|
|   mean| 4.627759313004169|
| stddev|4.4443962589812385|
|   

In [None]:
# Output (1): minimum, maximum and non-null count of each working column,
# timed from the start of this cell.
import time

start = time.time()

for name in columns:
    print("Min, max, count values for:", name)
    df2.select([min(name), max(name), count(name)]).show()

end = time.time()
print("Execution time: ", (end - start), "seconds")

Min, max, count values for: Global_active_power
+------------------------+------------------------+--------------------------+
|min(Global_active_power)|max(Global_active_power)|count(Global_active_power)|
+------------------------+------------------------+--------------------------+
|                   0.076|                  11.122|                   2049280|
+------------------------+------------------------+--------------------------+

Min, max, count values for: Global_reactive_power
+--------------------------+--------------------------+----------------------------+
|min(Global_reactive_power)|max(Global_reactive_power)|count(Global_reactive_power)|
+--------------------------+--------------------------+----------------------------+
|                       0.0|                      1.39|                     2049280|
+--------------------------+--------------------------+----------------------------+

Min, max, count values for: Voltage
+------------+------------+--------------+
|

In [None]:
#Output 2: mean and sample standard deviation of each working column.
import time

# Restart the timer so the reported time covers only this cell; without this
# it would also include everything since Output (1) started.
start = time.time()

for c in columns:
  print("Mean, standard deviation for:",c)
  result = df2.select([mean(c), stddev(c)])
  result.show()

end = time.time()
print("Execution time: ",(end - start), "seconds")

Mean, standard deviation for: Global_active_power
+------------------------+--------------------------------+
|avg(Global_active_power)|stddev_samp(Global_active_power)|
+------------------------+--------------------------------+
|      1.0916150366540094|              1.0572941611180025|
+------------------------+--------------------------------+

Mean, standard deviation for: Global_reactive_power
+--------------------------+----------------------------------+
|avg(Global_reactive_power)|stddev_samp(Global_reactive_power)|
+--------------------------+----------------------------------+
|        0.1237144765251571|               0.11272197958641315|
+--------------------------+----------------------------------+

Mean, standard deviation for: Voltage
+------------------+--------------------+
|      avg(Voltage)|stddev_samp(Voltage)|
+------------------+--------------------+
|240.83985796672414|  3.2399866612063435|
+------------------+--------------------+

Mean, standard deviation fo

In [None]:
# Min-max scaling tools: VectorAssembler packs the columns into one vector
# column, which MinMaxScaler then rescales.
from pyspark.ml.feature import MinMaxScaler, VectorAssembler




In [None]:
# Pack the working columns into a single "features" vector for the MinMaxScaler.
# handleInvalid="keep" keeps rows with invalid (null/NaN) inputs instead of
# raising an error.
assembler = (
    VectorAssembler(inputCols=columns, outputCol="features")
    .setHandleInvalid("keep")
)
output = assembler.transform(df2)
output.show(5)


+-------------------+---------------------+-------+----------------+--------------------+
|Global_active_power|Global_reactive_power|Voltage|Global_intensity|            features|
+-------------------+---------------------+-------+----------------+--------------------+
|              4.216|                0.418| 234.84|            18.4|[4.21600008010864...|
|               5.36|                0.436| 233.63|            23.0|[5.36000013351440...|
|              5.374|                0.498| 233.29|            23.0|[5.37400007247924...|
|              5.388|                0.502| 233.74|            23.0|[5.38800001144409...|
|              3.666|                0.528| 235.68|            15.8|[3.66599988937377...|
+-------------------+---------------------+-------+----------------+--------------------+
only showing top 5 rows



In [None]:
# Row count of the assembled data (a projection never drops rows).
output.count()

2075259

In [None]:
# MinMax scaling: learn per-feature min/max from the feature vectors, then
# rescale every row into a new "scaledFeatures" column.
feature_vectors = output.select("features")
scaler = MinMaxScaler(inputCol="features", outputCol="scaledFeatures")
scalerModel = scaler.fit(feature_vectors)
scaledData = scalerModel.transform(output)
scaledData.show(5)

+-------------------+---------------------+-------+----------------+--------------------+--------------------+
|Global_active_power|Global_reactive_power|Voltage|Global_intensity|            features|      scaledFeatures|
+-------------------+---------------------+-------+----------------+--------------------+--------------------+
|              4.216|                0.418| 234.84|            18.4|[4.21600008010864...|[0.37479632254738...|
|               5.36|                0.436| 233.63|            23.0|[5.36000013351440...|[0.47836323183743...|
|              5.374|                0.498| 233.29|            23.0|[5.37400007247924...|[0.47963065346431...|
|              5.388|                0.502| 233.74|            23.0|[5.38800001144409...|[0.48089807509119...|
|              3.666|                0.528| 235.68|            15.8|[3.66599988937377...|[0.32500452429198...|
+-------------------+---------------------+-------+----------------+--------------------+--------------------+
o

In [None]:
# Compare raw and scaled vectors side by side.
scaledData.select(["features", "scaledFeatures"]).show(5)

+--------------------+--------------------+
|            features|      scaledFeatures|
+--------------------+--------------------+
|[4.21600008010864...|[0.37479632254738...|
|[5.36000013351440...|[0.47836323183743...|
|[5.37400007247924...|[0.47963065346431...|
|[5.38800001144409...|[0.48089807509119...|
|[3.66599988937377...|[0.32500452429198...|
+--------------------+--------------------+
only showing top 5 rows



In [None]:
# Disassembling the vector back into separate values.
def extract(row):
    """Return the row's ``scaledFeatures`` vector as a tuple of plain Python values.

    Used with ``rdd.map`` so each tuple becomes one row of a new DataFrame.
    """
    vector = row.scaledFeatures
    as_list = vector.toArray().tolist()
    return tuple(as_list)

In [None]:
# Build a new DataFrame with one named column per normalized measurement.
normalized_columns = [
    "normalized_global_active_power",
    "normalized_global_reactive_power",
    "normalized_voltage",
    "normalized_global_intensity",
]
scaled_rows = scaledData.rdd.map(extract)
normal_out = scaled_rows.toDF(normalized_columns)
normal_out.show(5)

+------------------------------+--------------------------------+-------------------+---------------------------+
|normalized_global_active_power|normalized_global_reactive_power| normalized_voltage|normalized_global_intensity|
+------------------------------+--------------------------------+-------------------+---------------------------+
|            0.3747963225473887|              0.3007194366460509|0.37609048586059823|        0.37759334108949466|
|           0.47836323183743595|              0.3136690604292478|0.33699544306942775|        0.47302903063577767|
|           0.47963065346431577|             0.35827338206620574|0.32600960687537933|        0.47302903063577767|
|           0.48089807509119564|              0.3611510643288495| 0.3405495826881009|        0.47302903063577767|
|           0.32500452429198895|              0.3798561169588938| 0.4032309194863383|         0.3236514459515824|
+------------------------------+--------------------------------+-------------------+---

In [None]:
#Output(3)
# Save the normalized data as ';'-separated CSV with a header. Spark writes a
# directory of part files at this path; repartition(1) makes it a single part file.
# mode="overwrite" is required because the path already exists in Drive from
# a previous run; the default save mode raises an error in that case.

normal_out.repartition(1).write.csv("/content/drive/My Drive/bigdata/normalized_household_power_consumption.csv", mode="overwrite", sep=';', header=True)

