## Energy Consumption Analysis


**Business problem statement:**

To track energy consumption more accurately, the government plans to install smart meters in every household. Data is already being collected for this purpose and needs to be processed.
Basic details about the incoming data are needed to identify the main trends in energy consumption and to plan ahead.

The first step is to explore and understand the data.


**Objective:**

As a data engineer, you need to experiment with the incoming data and understand it. A few questions about the data are listed below. Work through the data to answer them.

**Data Description:**

The data describes the ACORN groups and the profile of the people in each group. It comes from the spreadsheet acorn_details.xlsx. The first three columns are the attributes studied, and each ACORN-X column holds the index of that attribute for the group. The national index is 100, so a value of 150 in a column means that 1.5 times as many people have that attribute in the ACORN group as at the national scale.
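The index arithmetic described above can be sketched in plain Python. The helper name `index_to_ratio` is hypothetical and is not part of the dataset or of any library; it only divides an index by the national baseline of 100.

```python
NATIONAL_BASELINE = 100.0  # the national index, as stated in the data description


def index_to_ratio(acorn_index: float) -> float:
    """Return how many times as common an attribute is in an ACORN group
    as it is nationally, given the attribute's index value."""
    return acorn_index / NATIONAL_BASELINE


print(index_to_ratio(150.0))  # 1.5 times the national level
print(index_to_ratio(100.0))  # 1.0, the same as the national level
```

A value below 100 gives a ratio below 1, meaning the attribute is less common in that group than nationally.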

### Import the libraries & Establish the connection to the spark UI

In [1]:
# Import the libraries
import warnings
warnings.filterwarnings('ignore')
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StringType

# Create app_name and define the SparkSession
app_name = "project_bde"
master = "local[*]"
spark = SparkSession\
        .builder\
        .appName(app_name)\
        .master(master)\
        .config("spark.ui.port","42229")\
        .getOrCreate()
sc = spark.sparkContext

:: loading settings :: url = jar:file:/usr/lib/spark/jars/ivy-2.4.0.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /root/.ivy2/cache
The jars for the packages stored in: /root/.ivy2/jars
org.apache.spark#spark-avro_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-96669d09-3f46-4105-903f-48643c2bbef1;1.0
	confs: [default]
	found org.apache.spark#spark-avro_2.12;3.1.2 in central
	found org.spark-project.spark#unused;1.0.0 in central
downloading https://repo1.maven.org/maven2/org/apache/spark/spark-avro_2.12/3.1.2/spark-avro_2.12-3.1.2.jar ...
	[SUCCESSFUL ] org.apache.spark#spark-avro_2.12;3.1.2!spark-avro_2.12.jar (24ms)
downloading https://repo1.maven.org/maven2/org/spark-project/spark/unused/1.0.0/unused-1.0.0.jar ...
	[SUCCESSFUL ] org.spark-project.spark#unused;1.0.0!unused.jar (13ms)
:: resolution report :: resolve 1159ms :: artifacts dl 39ms
	:: modules in use:
	org.apache.spark#spark-avro_2.12;3.1.2 from central in [default]
	org.spark-project.spark#unused;1.0.0 from central in [default]
	-----------------------------------

### Load the data

In [5]:
df = spark.read.csv("gs://dataproc-staging-us-central1-971073407971-lreefamw/notebooks/jupyter/acorn_details.csv", header=True)

### Display the records

In [6]:
# Display the top 5 records
df.limit(5).toPandas()

Unnamed: 0,MAIN CATEGORIES,CATEGORIES,REFERENCE,ACORN-A,ACORN-B,ACORN-C,ACORN-D,ACORN-E,ACORN-F,ACORN-G,ACORN-H,ACORN-I,ACORN-J,ACORN-K,ACORN-L,ACORN-M,ACORN-N,ACORN-O,ACORN-P,ACORN-Q
0,POPULATION,Age,Age 0-4,77.0,83.0,72.0,100.0,120.0,77.0,97.0,97.0,63.0,119.0,67.0,114.0,113.0,89.0,123.0,138.0,133.0
1,POPULATION,Age,Age 5-17,117.0,109.0,87.0,69.0,94.0,95.0,102.0,106.0,67.0,95.0,64.0,108.0,116.0,86.0,89.0,136.0,106.0
2,POPULATION,Age,Age 18-24,64.0,73.0,67.0,107.0,100.0,71.0,83.0,89.0,62.0,104.0,459.0,97.0,96.0,86.0,117.0,109.0,110.0
3,POPULATION,Age,Age 25-34,52.0,63.0,62.0,197.0,151.0,66.0,90.0,88.0,63.0,132.0,145.0,109.0,96.0,90.0,140.0,120.0,120.0
4,POPULATION,Age,Age 35-49,102.0,105.0,91.0,124.0,118.0,93.0,102.0,103.0,76.0,111.0,67.0,99.0,98.0,90.0,102.0,103.0,100.0


### Schema of the data

In [7]:
# Print the schema (column names and types) of the DataFrame
df.printSchema()

root
 |-- MAIN CATEGORIES: string (nullable = true)
 |-- CATEGORIES: string (nullable = true)
 |-- REFERENCE: string (nullable = true)
 |-- ACORN-A: string (nullable = true)
 |-- ACORN-B: string (nullable = true)
 |-- ACORN-C: string (nullable = true)
 |-- ACORN-D: string (nullable = true)
 |-- ACORN-E: string (nullable = true)
 |-- ACORN-F: string (nullable = true)
 |-- ACORN-G: string (nullable = true)
 |-- ACORN-H: string (nullable = true)
 |-- ACORN-I: string (nullable = true)
 |-- ACORN-J: string (nullable = true)
 |-- ACORN-K: string (nullable = true)
 |-- ACORN-L: string (nullable = true)
 |-- ACORN-M: string (nullable = true)
 |-- ACORN-N: string (nullable = true)
 |-- ACORN-O: string (nullable = true)
 |-- ACORN-P: string (nullable = true)
 |-- ACORN-Q: string (nullable = true)



### Shape of the data

In [8]:
# Check the shape of the data
print('Shape of Acorn data:',(df.count(), len(df.columns)))

Shape of Acorn data: (826, 20)


### Describe the data

In [9]:
# Display the statistical summary of the data
df.describe().toPandas().T

                                                                                

Unnamed: 0,0,1,2,3,4
summary,count,mean,stddev,min,max
MAIN CATEGORIES,826,,,COMMUNITY SAFETY,TRANSPORT
CATEGORIES,826,,,Action,Visit Pubs for a Meal - Evening
REFERENCE,826,3.5,1.2909944487358056,1-10.,"£80,000-£100,000"
ACORN-A,826,131.3134950608269,201.44821153680863,100.0,99.0
ACORN-B,826,110.86025609174601,42.46404950531124,0.9570113632792827,99.0
ACORN-C,826,100.08078863966921,30.09952949887456,0.2819676825794458,99.0
ACORN-D,826,136.8575066657916,97.74079395405704,10.0,99.0
ACORN-E,826,117.89475695575238,35.76880726318079,100.0,99.0
ACORN-F,826,95.57453484940108,33.636660931693356,0.0,99.0
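The `min` and `max` rows for the ACORN columns look inconsistent (for example, a minimum of 100.0 and a maximum of 99.0 for ACORN-A). A likely explanation is that, as the schema shows, these columns are strings, so they are ordered character by character rather than numerically. The sketch below reproduces the effect in plain Python with the five ACORN-A values from the displayed records. It illustrates the ordering only and is not the Spark computation itself.

```python
# ACORN-A values from the first five records, kept as strings (as in the schema)
acorn_a_as_text = ["77.0", "117.0", "64.0", "52.0", "102.0"]

# String comparison is lexicographic: anything starting with "1" sorts
# before anything starting with "5", "6" or "7"
text_min = min(acorn_a_as_text)
text_max = max(acorn_a_as_text)

# Converting to float first gives the numeric ordering
acorn_a_as_numbers = [float(value) for value in acorn_a_as_text]
numeric_min = min(acorn_a_as_numbers)
numeric_max = max(acorn_a_as_numbers)

print(text_min, text_max)        # 102.0 77.0
print(numeric_min, numeric_max)  # 52.0 117.0
```

In Spark, one option would be to cast the ACORN columns to a numeric type, for example with `F.col("ACORN-A").cast("double")`, or to pass `inferSchema=True` to `spark.read.csv`, before calling `describe()`.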


### Transformation and Feature engineering

In [10]:
# Convert the CATEGORIES column to lowercase
df_transform = df.withColumn('CATEGORIES', F.lower(df.CATEGORIES))


# Display the top 5 dataframe records after applying lowercase transformation on the data
df_transform.limit(5).toPandas()

Unnamed: 0,MAIN CATEGORIES,CATEGORIES,REFERENCE,ACORN-A,ACORN-B,ACORN-C,ACORN-D,ACORN-E,ACORN-F,ACORN-G,ACORN-H,ACORN-I,ACORN-J,ACORN-K,ACORN-L,ACORN-M,ACORN-N,ACORN-O,ACORN-P,ACORN-Q
0,POPULATION,age,Age 0-4,77.0,83.0,72.0,100.0,120.0,77.0,97.0,97.0,63.0,119.0,67.0,114.0,113.0,89.0,123.0,138.0,133.0
1,POPULATION,age,Age 5-17,117.0,109.0,87.0,69.0,94.0,95.0,102.0,106.0,67.0,95.0,64.0,108.0,116.0,86.0,89.0,136.0,106.0
2,POPULATION,age,Age 18-24,64.0,73.0,67.0,107.0,100.0,71.0,83.0,89.0,62.0,104.0,459.0,97.0,96.0,86.0,117.0,109.0,110.0
3,POPULATION,age,Age 25-34,52.0,63.0,62.0,197.0,151.0,66.0,90.0,88.0,63.0,132.0,145.0,109.0,96.0,90.0,140.0,120.0,120.0
4,POPULATION,age,Age 35-49,102.0,105.0,91.0,124.0,118.0,93.0,102.0,103.0,76.0,111.0,67.0,99.0,98.0,90.0,102.0,103.0,100.0


### Aggregation & Groupby

In [11]:
# Group by CATEGORIES and REFERENCE, then apply sum on ACORN-C and max on ACORN-A
df_group = df_transform.groupBy(["CATEGORIES", "REFERENCE"]).agg(F.sum("ACORN-C"), F.max("ACORN-A"))

# Display the top 5 dataframe records
df_group.limit(5).toPandas()

Unnamed: 0,CATEGORIES,REFERENCE,sum(ACORN-C),max(ACORN-A)
0,action,Make an effort to cut down on water usage at home,107.0,104.0
1,action,Rarely Leave TV or PC on standby for long peri...,107.0,102.0
2,action,Take own shopping bag when shopping,112.0,112.0
3,age,Age 50-64,120.0,124.0
4,attitudes,I am prepared to pay more for products that ma...,111.0,125.0


### Spark SQL
Query the data to answer the following:

##### 1. Fetch the records where REFERENCE is 'Age 18-24' or CATEGORIES is 'geography'

In [12]:
# Keep the rows where REFERENCE is 'Age 18-24' or CATEGORIES is 'geography'
df_1 = df_transform.filter((df_transform.REFERENCE == 'Age 18-24') | (df_transform.CATEGORIES == 'geography'))

# Display the top 5 dataframe records
df_1.limit(5).toPandas()

Unnamed: 0,MAIN CATEGORIES,CATEGORIES,REFERENCE,ACORN-A,ACORN-B,ACORN-C,ACORN-D,ACORN-E,ACORN-F,ACORN-G,ACORN-H,ACORN-I,ACORN-J,ACORN-K,ACORN-L,ACORN-M,ACORN-N,ACORN-O,ACORN-P,ACORN-Q
0,POPULATION,age,Age 18-24,64.0,73.0,67.0,107.0,100.0,71.0,83.0,89.0,62.0,104.0,459.0,97.0,96.0,86.0,117.0,109.0,110.0
1,POPULATION,geography,England,107.0,101.0,103.0,114.0,106.0,75.0,107.0,106.0,102.0,106.0,95.0,93.0,97.0,89.0,97.0,110.0,97.0
2,POPULATION,geography,Northern Ireland,30.0,95.0,45.0,2.0,49.0,462.0,53.0,104.0,30.0,91.0,56.0,87.0,131.0,67.0,95.0,75.0,43.0
3,POPULATION,geography,Scotland,93.0,105.0,87.0,47.0,93.0,144.0,54.0,46.0,97.0,53.0,167.0,114.0,121.0,194.0,139.0,31.0,183.0
4,POPULATION,geography,Wales,22.0,73.0,99.0,10.0,46.0,249.0,77.0,84.0,113.0,73.0,98.0,211.0,104.0,150.0,88.0,54.0,45.0


##### 2. Add 200 to ACORN-B and store the result in a new column, ACORN-B_updated

In [13]:
# Add 200 to ACORN-B and store the result in ACORN-B_updated
df_2 = df_transform.withColumn("ACORN-B_updated", 200 + F.col("ACORN-B"))

# Display the top 5 dataframe records
df_2.limit(5).toPandas()

Unnamed: 0,MAIN CATEGORIES,CATEGORIES,REFERENCE,ACORN-A,ACORN-B,ACORN-C,ACORN-D,ACORN-E,ACORN-F,ACORN-G,...,ACORN-I,ACORN-J,ACORN-K,ACORN-L,ACORN-M,ACORN-N,ACORN-O,ACORN-P,ACORN-Q,ACORN-B_updated
0,POPULATION,age,Age 0-4,77.0,83.0,72.0,100.0,120.0,77.0,97.0,...,63.0,119.0,67.0,114.0,113.0,89.0,123.0,138.0,133.0,283.0
1,POPULATION,age,Age 5-17,117.0,109.0,87.0,69.0,94.0,95.0,102.0,...,67.0,95.0,64.0,108.0,116.0,86.0,89.0,136.0,106.0,309.0
2,POPULATION,age,Age 18-24,64.0,73.0,67.0,107.0,100.0,71.0,83.0,...,62.0,104.0,459.0,97.0,96.0,86.0,117.0,109.0,110.0,273.0
3,POPULATION,age,Age 25-34,52.0,63.0,62.0,197.0,151.0,66.0,90.0,...,63.0,132.0,145.0,109.0,96.0,90.0,140.0,120.0,120.0,263.0
4,POPULATION,age,Age 35-49,102.0,105.0,91.0,124.0,118.0,93.0,102.0,...,76.0,111.0,67.0,99.0,98.0,90.0,102.0,103.0,100.0,305.0


### Spark UDF

In [15]:
# Create a function that flags the kids age group: records whose REFERENCE is 'Age 0-4'
# are flagged KIDS, and all other records are flagged NOT KIDS
def age_interval(age):
    if age == 'Age 0-4':
        return 'KIDS'
    else:
        return 'NOT KIDS'


# Convert the function into a UDF
age_interval_UDF = F.udf(age_interval, StringType())

# Create a new column REFERENCE_age_groups by applying the UDF to REFERENCE
df_age = df_transform.withColumn("REFERENCE_age_groups", age_interval_UDF("REFERENCE"))

# Display the top 5 dataframe records
df_age.limit(5).toPandas()

                                                                                

Unnamed: 0,MAIN CATEGORIES,CATEGORIES,REFERENCE,ACORN-A,ACORN-B,ACORN-C,ACORN-D,ACORN-E,ACORN-F,ACORN-G,...,ACORN-I,ACORN-J,ACORN-K,ACORN-L,ACORN-M,ACORN-N,ACORN-O,ACORN-P,ACORN-Q,REFERENCE_age_groups
0,POPULATION,age,Age 0-4,77.0,83.0,72.0,100.0,120.0,77.0,97.0,...,63.0,119.0,67.0,114.0,113.0,89.0,123.0,138.0,133.0,KIDS
1,POPULATION,age,Age 5-17,117.0,109.0,87.0,69.0,94.0,95.0,102.0,...,67.0,95.0,64.0,108.0,116.0,86.0,89.0,136.0,106.0,NOT KIDS
2,POPULATION,age,Age 18-24,64.0,73.0,67.0,107.0,100.0,71.0,83.0,...,62.0,104.0,459.0,97.0,96.0,86.0,117.0,109.0,110.0,NOT KIDS
3,POPULATION,age,Age 25-34,52.0,63.0,62.0,197.0,151.0,66.0,90.0,...,63.0,132.0,145.0,109.0,96.0,90.0,140.0,120.0,120.0,NOT KIDS
4,POPULATION,age,Age 35-49,102.0,105.0,91.0,124.0,118.0,93.0,102.0,...,76.0,111.0,67.0,99.0,98.0,90.0,102.0,103.0,100.0,NOT KIDS
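Because a UDF wraps an ordinary Python function, the flagging logic can be checked without Spark before it is registered. The sketch below repeats `age_interval` from the cell above and applies it to the REFERENCE values shown in the output.

```python
def age_interval(age):
    """Flag the 'Age 0-4' reference as KIDS and everything else as NOT KIDS."""
    if age == 'Age 0-4':
        return 'KIDS'
    return 'NOT KIDS'


# REFERENCE values taken from the first five records
references = ['Age 0-4', 'Age 5-17', 'Age 18-24', 'Age 25-34', 'Age 35-49']
flags = [age_interval(reference) for reference in references]
print(flags)
# ['KIDS', 'NOT KIDS', 'NOT KIDS', 'NOT KIDS', 'NOT KIDS']
```

For a simple condition like this one, the built-in `F.when(...).otherwise(...)` expression would likely avoid the serialization overhead of a Python UDF.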


### Stop the Spark session

In [26]:
# Stop the Spark session
# Note: As a best practice, take the required screenshots before you stop the session.
# Once you stop Spark, you will lose the connection to the Spark UI.
spark.stop()



### Screenshots

In [None]:
# Submit 2 screenshots
# 1. Spark Stages, after running all the code. Instruction: go to the Spark UI, click the Stages tab, and take a screenshot
# 2. DAG Visualization. Instruction: go to the Spark UI, open the Jobs tab, click the latest job that ran, and take a screenshot