## Energy Consumption Analysis


**Business problem statement:**

To track energy consumption more accurately, the government plans to install smart meters in every household. Data is being collected for this purpose, and it has to be processed.
Some basic details about the incoming data are needed in order to identify the main trends in energy consumption and to plan ahead.

The first step is to explore and understand the data.


**Objective:**

As a data engineer, you will experiment with the incoming data in order to understand it. The questions about the data are listed below; work through the data to answer each of them.

**Data Description:**

The dataset describes the ACORN groups and the profile of the people in each group. It comes from the spreadsheet acorn_details.xlsx. The first three columns are the attributes studied, and each ACORN-X column holds the index of that attribute for the corresponding group. The national index is 100, so a value of 150 in a column means that there are 1.5 times as many people with this attribute in the ACORN group as at the national scale.
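As a quick illustration of how to read an index value, the sketch below converts an ACORN index into a ratio against the national baseline of 100. The helper name `index_to_ratio` and the constant `NATIONAL_BASELINE` are hypothetical and not part of the notebook.

```python
# National baseline: an index of 100 means "same share as the national average".
NATIONAL_BASELINE = 100.0


def index_to_ratio(index: float) -> float:
    """Return how many times as common an attribute is compared with the national scale."""
    return index / NATIONAL_BASELINE


print(index_to_ratio(150))   # 1.5  -> 1.5 times as common as nationally
print(index_to_ratio(100))   # 1.0  -> exactly the national share
```

Values below 100 read the same way: an index of 50 would mean the attribute is half as common in that group as nationally.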

### Import the libraries & Establish the connection to the spark UI

In [1]:
# Import the libraries
import pandas as pd
import numpy as np
from datetime import date, timedelta, datetime
import time
import warnings
warnings.filterwarnings('ignore')

import pyspark 
from pyspark.sql import SparkSession, SQLContext
from pyspark.context import SparkContext
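# Import functions under a namespace; a wildcard import would shadow Python built-ins such as sum and max
from pyspark.sql import functions as F
from pyspark.sql.types import StringType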

# Create app_name and define the SparkSession
app_name = "Data_Eng"
master = "local[*]"
spark = SparkSession\
        .builder\
        .appName(app_name)\
        .master(master)\
        .config("spark.ui.port","42229")\
        .getOrCreate()
sc = spark.sparkContext



Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/11/21 11:08:27 INFO org.apache.spark.SparkEnv: Registering MapOutputTracker
22/11/21 11:08:27 INFO org.apache.spark.SparkEnv: Registering BlockManagerMaster
22/11/21 11:08:27 INFO org.apache.spark.SparkEnv: Registering BlockManagerMasterHeartbeat
22/11/21 11:08:27 INFO org.apache.spark.SparkEnv: Registering OutputCommitCoordinator


In [2]:
sc

### Load the data

In [3]:
acorn = spark.read.load("gs://dataproc-staging-us-central1-683341431839-6heyc80d/notebooks/jupyter/acorn_details.csv",
                        format="csv", 
                        sep=",", 
                        inferSchema="true", 
                        header="true")

                                                                                

### Display the records

In [4]:
# Display the first 5 records
acorn.show(5)

+---------------+----------+---------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+
|MAIN CATEGORIES|CATEGORIES|REFERENCE|ACORN-A|ACORN-B|ACORN-C|ACORN-D|ACORN-E|ACORN-F|ACORN-G|ACORN-H|ACORN-I|ACORN-J|ACORN-K|ACORN-L|ACORN-M|ACORN-N|ACORN-O|ACORN-P|ACORN-Q|
+---------------+----------+---------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+
|     POPULATION|       Age|  Age 0-4|   77.0|   83.0|   72.0|  100.0|  120.0|   77.0|   97.0|   97.0|   63.0|  119.0|   67.0|  114.0|  113.0|   89.0|  123.0|  138.0|  133.0|
|     POPULATION|       Age| Age 5-17|  117.0|  109.0|   87.0|   69.0|   94.0|   95.0|  102.0|  106.0|   67.0|   95.0|   64.0|  108.0|  116.0|   86.0|   89.0|  136.0|  106.0|
|     POPULATION|       Age|Age 18-24|   64.0|   73.0|   67.0|  107.0|  100.0|   71.0|   83.0|   89.0|   62.0|  104.0|  459.0

### Schema of the data

In [5]:
# Print the schema of the DataFrame (column names and types)
acorn.printSchema()

root
 |-- MAIN CATEGORIES: string (nullable = true)
 |-- CATEGORIES: string (nullable = true)
 |-- REFERENCE: string (nullable = true)
 |-- ACORN-A: double (nullable = true)
 |-- ACORN-B: double (nullable = true)
 |-- ACORN-C: double (nullable = true)
 |-- ACORN-D: double (nullable = true)
 |-- ACORN-E: double (nullable = true)
 |-- ACORN-F: double (nullable = true)
 |-- ACORN-G: double (nullable = true)
 |-- ACORN-H: double (nullable = true)
 |-- ACORN-I: double (nullable = true)
 |-- ACORN-J: double (nullable = true)
 |-- ACORN-K: double (nullable = true)
 |-- ACORN-L: double (nullable = true)
 |-- ACORN-M: double (nullable = true)
 |-- ACORN-N: double (nullable = true)
 |-- ACORN-O: double (nullable = true)
 |-- ACORN-P: double (nullable = true)
 |-- ACORN-Q: double (nullable = true)



### Shape of the data

In [6]:
# Check the shape of the data
print((acorn.count(), len(acorn.columns)))

(826, 20)


### Describe the data

In [7]:
# Display the statistical summary of the data
acorn.describe().toPandas()

                                                                                

Unnamed: 0,summary,MAIN CATEGORIES,CATEGORIES,REFERENCE,ACORN-A,ACORN-B,ACORN-C,ACORN-D,ACORN-E,ACORN-F,...,ACORN-H,ACORN-I,ACORN-J,ACORN-K,ACORN-L,ACORN-M,ACORN-N,ACORN-O,ACORN-P,ACORN-Q
0,count,826,826,826,826.0,826.0,826.0,826.0,826.0,826.0,...,826.0,826.0,826.0,826.0,826.0,826.0,826.0,826.0,826.0,826.0
1,mean,,,3.5,131.3134950608269,110.860256091746,100.0807886396692,136.8575066657916,117.89475695575238,95.57453484940108,...,97.29891471340824,87.02854474249719,104.2165629465236,127.4829114804938,93.72420907783804,91.4102767170588,79.91237931681677,95.57933466862237,100.14130875332069,90.85542318429532
2,stddev,,,1.2909944487358056,201.44821153680863,42.46404950531124,30.09952949887456,97.74079395405704,35.76880726318079,33.63666093169336,...,18.22923401194174,30.33779445916609,19.92403348807329,97.42815928528232,22.17704111403727,22.909601664226,33.99519171911664,25.93577021666465,37.21028809745696,37.63401716891314
3,min,COMMUNITY SAFETY,Action,1-10.,12.0,0.9570113632792828,0.2819676825794458,2.0,21.0,0.0,...,1.155447725724499,6.363259332739113,16.05070751613441,17.0,0.3935460053464759,0.7148567220403874,2.0,11.0,9.0,1.0
4,max,TRANSPORT,Visit Pubs for a Meal - Evening,"£80,000-£100,000",3795.0,419.0,272.0,1159.03465007297,286.0,462.0,...,192.0,410.0,197.0,1821.0,280.0,161.0,295.0,252.0,389.0,326.0
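A small worked example may help when reading the `mean` and `stddev` rows. `describe()` reports the sample standard deviation (dividing by n - 1). For a string column such as REFERENCE, it appears to compute `mean` and `stddev` only over the values that can be cast to a number. The notebook does not show which REFERENCE values are numeric; the four values below are a hypothetical set, chosen only because they reproduce the reported figures of 3.5 and 1.29.

```python
import statistics

# Hypothetical numeric values; the real REFERENCE entries are not shown in the notebook.
values = [2, 3, 4, 5]

mean = statistics.mean(values)
sample_std = statistics.stdev(values)        # divides by n - 1, like describe()
population_std = statistics.pstdev(values)   # divides by n, shown for contrast

print(mean)
print(sample_std)
print(population_std)
```

The sample figure is always the larger of the two, which matters most for columns with very few numeric values.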


### Transformation and Feature engineering

In [8]:
# Convert the CATEGORIES column to lowercase
from pyspark.sql import functions as F
acorn1 = acorn.withColumn('CATEGORIES', F.lower(acorn.CATEGORIES))
# Display the first 5 records after applying the lowercase transformation
acorn1.show(5)

+---------------+----------+---------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+
|MAIN CATEGORIES|CATEGORIES|REFERENCE|ACORN-A|ACORN-B|ACORN-C|ACORN-D|ACORN-E|ACORN-F|ACORN-G|ACORN-H|ACORN-I|ACORN-J|ACORN-K|ACORN-L|ACORN-M|ACORN-N|ACORN-O|ACORN-P|ACORN-Q|
+---------------+----------+---------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+
|     POPULATION|       age|  Age 0-4|   77.0|   83.0|   72.0|  100.0|  120.0|   77.0|   97.0|   97.0|   63.0|  119.0|   67.0|  114.0|  113.0|   89.0|  123.0|  138.0|  133.0|
|     POPULATION|       age| Age 5-17|  117.0|  109.0|   87.0|   69.0|   94.0|   95.0|  102.0|  106.0|   67.0|   95.0|   64.0|  108.0|  116.0|   86.0|   89.0|  136.0|  106.0|
|     POPULATION|       age|Age 18-24|   64.0|   73.0|   67.0|  107.0|  100.0|   71.0|   83.0|   89.0|   62.0|  104.0|  459.0

### Aggregation & Groupby

In [9]:
# Group by CATEGORIES and REFERENCE, then take the sum of ACORN-C and the max of ACORN-A
acorn2 = acorn.groupBy("CATEGORIES", "REFERENCE").agg(F.sum("ACORN-C"), F.max("ACORN-A"))

# Display the top 5 dataframe records
acorn2.show(5)

+--------------------+--------------------+----------------+----------------+
|          CATEGORIES|           REFERENCE|    sum(ACORN-C)|    max(ACORN-A)|
+--------------------+--------------------+----------------+----------------+
|Expenditure per p...|   Total Expenditure|           102.0|           124.0|
|Purchased on the ...|           Mortgages|            89.0|           105.0|
|Sites regularly v...|Notonthehighstree...|           116.0|           116.0|
|High Street Retai...|          John Lewis|161.503555141006|152.182843393153|
|Highest Level of ...|ONC / BTEC / appr...|           128.0|            56.0|
+--------------------+--------------------+----------------+----------------+
only showing top 5 rows
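To make the semantics of the grouped aggregation concrete, here is a plain-Python sketch of what a group-by with a sum and a max computes: one output row per distinct (CATEGORIES, REFERENCE) pair. It does not use Spark. The rows are a small hypothetical sample (the second row is made up to show two rows sharing a key), and the tuple layout is an assumption for illustration.

```python
from collections import defaultdict

# Hypothetical rows: (CATEGORIES, REFERENCE, ACORN-A, ACORN-C)
rows = [
    ("Age", "Age 0-4", 77.0, 72.0),
    ("Age", "Age 0-4", 80.0, 70.0),    # made-up row sharing the key above
    ("Age", "Age 5-17", 117.0, 87.0),
]

sum_c = defaultdict(float)  # running sum of ACORN-C per group
max_a = {}                  # running max of ACORN-A per group

for category, reference, acorn_a, acorn_c in rows:
    key = (category, reference)
    sum_c[key] += acorn_c
    max_a[key] = max(acorn_a, max_a.get(key, acorn_a))

# One entry per group: (sum of ACORN-C, max of ACORN-A)
result = {key: (sum_c[key], max_a[key]) for key in sum_c}
for key, aggregates in result.items():
    print(key, aggregates)
```

When a group contains a single row, the sum is simply that row's value.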



### Spark SQL
Query the data to answer the following:

##### 1. Fetch the records where REFERENCE = 'Age 18-24' or CATEGORIES = 'Geography'

In [10]:
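# Register the DataFrame as a temporary view, then query it and display the first 5 records
# (createOrReplaceTempView replaces registerTempTable, which is deprecated since Spark 2.0)
acorn.createOrReplaceTempView('acorn')
spark.sql("""
    select *
    from acorn
    where REFERENCE = 'Age 18-24'
       or CATEGORIES = 'Geography'
""").show(5)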


+---------------+----------+----------------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+
|MAIN CATEGORIES|CATEGORIES|       REFERENCE|ACORN-A|ACORN-B|ACORN-C|ACORN-D|ACORN-E|ACORN-F|ACORN-G|ACORN-H|ACORN-I|ACORN-J|ACORN-K|ACORN-L|ACORN-M|ACORN-N|ACORN-O|ACORN-P|ACORN-Q|
+---------------+----------+----------------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+
|     POPULATION|       Age|       Age 18-24|   64.0|   73.0|   67.0|  107.0|  100.0|   71.0|   83.0|   89.0|   62.0|  104.0|  459.0|   97.0|   96.0|   86.0|  117.0|  109.0|  110.0|
|     POPULATION| Geography|         England|  107.0|  101.0|  103.0|  114.0|  106.0|   75.0|  107.0|  106.0|  102.0|  106.0|   95.0|   93.0|   97.0|   89.0|   97.0|  110.0|   97.0|
|     POPULATION| Geography|Northern Ireland|   30.0|   95.0|   45.0|    2.0|   49.0|  462
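String comparison with `=` is case-sensitive in Spark SQL by default, so the literal `'Geography'` has to match the stored casing. The sketch below uses Python's built-in `sqlite3` module rather than Spark, which behaves the same way for this comparison. It runs on three rows copied from the outputs above to show the same `OR` filter, plus a case-insensitive variant using `LOWER`. The table name `acorn_sample` and the reduced column set are assumptions for illustration.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE acorn_sample (CATEGORIES TEXT, REFERENCE TEXT, ACORN_A REAL)"
)
conn.executemany(
    "INSERT INTO acorn_sample VALUES (?, ?, ?)",
    [
        ("Age", "Age 0-4", 77.0),
        ("Age", "Age 18-24", 64.0),
        ("Geography", "England", 107.0),
    ],
)

# Same predicate as the notebook query: either condition is enough to keep a row.
exact = conn.execute(
    "SELECT REFERENCE FROM acorn_sample "
    "WHERE REFERENCE = 'Age 18-24' OR CATEGORIES = 'Geography' "
    "ORDER BY REFERENCE"
).fetchall()

# A lowercase literal does not match the stored value 'Geography'.
wrong_case = conn.execute(
    "SELECT REFERENCE FROM acorn_sample WHERE CATEGORIES = 'geography'"
).fetchall()

# Lowercasing the column first makes the comparison case-insensitive.
insensitive = conn.execute(
    "SELECT REFERENCE FROM acorn_sample WHERE LOWER(CATEGORIES) = 'geography'"
).fetchall()
conn.close()

print(exact)
print(wrong_case)
print(insensitive)
```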

##### 2. Add 200 to ACORN-B and display the output as a new column, ACORN-B_Updated

In [11]:
# Add 200 to ACORN-B and store the result in a new column, ACORN-B_Updated
acorn3 = acorn.withColumn("ACORN-B_Updated", 200 + F.col("ACORN-B"))

# Display the top 5 dataframe records
acorn3.show(5)

+---------------+----------+---------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+---------------+
|MAIN CATEGORIES|CATEGORIES|REFERENCE|ACORN-A|ACORN-B|ACORN-C|ACORN-D|ACORN-E|ACORN-F|ACORN-G|ACORN-H|ACORN-I|ACORN-J|ACORN-K|ACORN-L|ACORN-M|ACORN-N|ACORN-O|ACORN-P|ACORN-Q|ACORN-B_Updated|
+---------------+----------+---------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+---------------+
|     POPULATION|       Age|  Age 0-4|   77.0|   83.0|   72.0|  100.0|  120.0|   77.0|   97.0|   97.0|   63.0|  119.0|   67.0|  114.0|  113.0|   89.0|  123.0|  138.0|  133.0|          283.0|
|     POPULATION|       Age| Age 5-17|  117.0|  109.0|   87.0|   69.0|   94.0|   95.0|  102.0|  106.0|   67.0|   95.0|   64.0|  108.0|  116.0|   86.0|   89.0|  136.0|  106.0|          309.0|
|     POPULATION|       Age|Age 18-24|   64.0

### Spark UDF

In [12]:
# Create a function that flags the kids age group: records whose REFERENCE is "Age 0-4" are labelled KIDS, all other records NOT KIDS

def age_interval(reference):
    if reference == "Age 0-4":
        return "KIDS"
    else:
        return "NOT KIDS"

# Convert the function into a UDF
age_interval_udf = F.udf(age_interval, StringType())

# Create a new column REFERENCE_age_groups by applying the UDF to REFERENCE
age_group = acorn3.withColumn("REFERENCE_age_groups", age_interval_udf("REFERENCE"))
# Display the first 5 records
age_group.show(5)

+---------------+----------+---------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+---------------+--------------------+
|MAIN CATEGORIES|CATEGORIES|REFERENCE|ACORN-A|ACORN-B|ACORN-C|ACORN-D|ACORN-E|ACORN-F|ACORN-G|ACORN-H|ACORN-I|ACORN-J|ACORN-K|ACORN-L|ACORN-M|ACORN-N|ACORN-O|ACORN-P|ACORN-Q|ACORN-B_Updated|REFERENCE_age_groups|
+---------------+----------+---------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+---------------+--------------------+
|     POPULATION|       Age|  Age 0-4|   77.0|   83.0|   72.0|  100.0|  120.0|   77.0|   97.0|   97.0|   63.0|  119.0|   67.0|  114.0|  113.0|   89.0|  123.0|  138.0|  133.0|          283.0|                KIDS|
|     POPULATION|       Age| Age 5-17|  117.0|  109.0|   87.0|   69.0|   94.0|   95.0|  102.0|  106.0|   67.0|   95.0|   64.0|  108.0|  116.0|   86.0|  
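Because a UDF wraps an ordinary Python function, the labelling logic can be checked without a Spark session before it is registered. The sketch below repeats the function and calls it on a few sample values. It includes `None`, which is how Spark passes a SQL NULL to a Python UDF. The `samples` list is hypothetical.

```python
def age_interval(reference):
    """Label the 'Age 0-4' reference value as KIDS and everything else as NOT KIDS."""
    if reference == "Age 0-4":
        return "KIDS"
    return "NOT KIDS"


# Hypothetical inputs, including a missing value.
samples = ["Age 0-4", "Age 5-17", "Age 18-24", None]
labels = [age_interval(value) for value in samples]
print(labels)  # ['KIDS', 'NOT KIDS', 'NOT KIDS', 'NOT KIDS']
```

Missing values fall into the NOT KIDS branch; whether that is the desired label for NULL records is a choice the notebook leaves open.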

                                                                                

### Stop the Spark session

In [13]:
# Stop the Spark session on the cloud (in this case, Google Cloud Platform)
spark.stop()

