#  Energy Meter Analysis
# Data Engineer Assignment

<img src="logo.png">

# Scenario (Assignment)

In this repository you'll find a directory with files containing energy meter-data.

Please build a PySpark solution that reads in the data and writes out a table with the energy 
usage for each meter at each 15 minute interval, including additional data fields with 
the hourly averaged energy usage and a flag indicating missing intervals. 
The only requirement is that the solution should only include native PySpark (and thereby no UDFs).

The solution should be a generic solution for this problem and not tailored to the data provided.

Please provide your Pyspark code to this assignment along with your 
application to the data engineering position at XYZ. We are looking forward to your solution!

In [1]:
import os
from pyspark.sql.session import SparkSession
from pyspark.sql.types import DoubleType , DateType,TimestampType
from pyspark.sql.functions import lit

building session object

In [2]:
sess=SparkSession.builder.master('local').appName("Energy_Meter_Analysis").getOrCreate()

In [3]:
DataDirectory = "data//"

In [4]:
print(os.listdir(DataDirectory))

['input_0.csv', 'input_1.csv', 'input_2.csv', 'input_3.csv', 'input_4.csv', 'input_5.csv', 'input_6.csv', 'input_7.csv', 'input_8.csv', 'input_9.csv']


In [5]:
#Reading Data CSV file
df=sess.read.option("header",True).csv(os.path.join(DataDirectory,"input_0.csv"))

In [6]:
df.printSchema()

root
 |-- _c0: string (nullable = true)
 |-- meter_id: string (nullable = true)
 |-- interval_date_time: string (nullable = true)
 |-- energy_wh: string (nullable = true)



In [7]:
print(df.show(3))

+---+--------------------+--------------------+---------+
|_c0|            meter_id|  interval_date_time|energy_wh|
+---+--------------------+--------------------+---------+
|  0|475bd821-23e6-463...|2020-06-06T09:15:...|    312.4|
|  1|475bd821-23e6-463...|2020-06-04T11:45:...|    208.2|
|  2|475bd821-23e6-463...|2020-06-04T15:15:...|    113.5|
+---+--------------------+--------------------+---------+
only showing top 3 rows

None


# Reading all the CSV data files in single shot

In [8]:
test=sess.read.option("header", True).csv(DataDirectory+"*.csv")

In [9]:
test.count()

1246

In [10]:
test.printSchema()

root
 |-- _c0: string (nullable = true)
 |-- meter_id: string (nullable = true)
 |-- interval_date_time: string (nullable = true)
 |-- energy_wh: string (nullable = true)



In [11]:
print(type(test))

<class 'pyspark.sql.dataframe.DataFrame'>


In [12]:
test1=test.rdd.map(lambda x:(x[1],1)).reduceByKey(lambda x,y:x+y)

In [13]:
test1.take(3)

[('475bd821-23e6-4634-83c3-d85aa0c548c9', 572),
 ('a55b578f-5bbf-4a46-a83c-1d911ed1ff95', 672),
 ('108081ae-a03d-4513-b118-b98cbe84a766', 2)]

In [14]:
df_sorted=test.orderBy([test[1],test[2]],ascending=[False,True]) # meter ID and Date

In [15]:
df_sorted.show(5)

+---+--------------------+--------------------+---------+
|_c0|            meter_id|  interval_date_time|energy_wh|
+---+--------------------+--------------------+---------+
| 63|a55b578f-5bbf-4a4...|2020-06-01T00:00:...|    111.5|
|121|a55b578f-5bbf-4a4...|2020-06-01T00:15:...|    111.5|
|122|a55b578f-5bbf-4a4...|2020-06-01T00:30:...|    111.5|
| 76|a55b578f-5bbf-4a4...|2020-06-01T00:45:...|    111.5|
| 78|a55b578f-5bbf-4a4...|2020-06-01T01:00:...|    146.8|
+---+--------------------+--------------------+---------+
only showing top 5 rows



In [16]:
df_sorted=df_sorted.select(df_sorted[0].alias("index").cast(DoubleType()),df_sorted[1].alias("Meter_ID"),df_sorted[2].alias("Interval").cast(TimestampType()),df_sorted[3].alias("EnergyConsumption[15Mins]").cast(DoubleType()))

In [17]:
df_sorted.show(3)
df_sorted.printSchema()

+-----+--------------------+-------------------+-------------------------+
|index|            Meter_ID|           Interval|EnergyConsumption[15Mins]|
+-----+--------------------+-------------------+-------------------------+
| 63.0|a55b578f-5bbf-4a4...|2020-06-01 05:30:00|                    111.5|
|121.0|a55b578f-5bbf-4a4...|2020-06-01 05:45:00|                    111.5|
|122.0|a55b578f-5bbf-4a4...|2020-06-01 06:00:00|                    111.5|
+-----+--------------------+-------------------+-------------------------+
only showing top 3 rows

root
 |-- index: double (nullable = true)
 |-- Meter_ID: string (nullable = true)
 |-- Interval: timestamp (nullable = true)
 |-- EnergyConsumption[15Mins]: double (nullable = true)



In [18]:
sample=df_sorted

# Total Consumption Per Meter

In [19]:
total_consumption_per_meter=sample.rdd.map(lambda x:(x[1],x[3])).reduceByKey(lambda a,b:round(a+b,2)).collect()
# total_consumption_per_meter=[total_consumption[0][0],total_consumption[0][1]]
total_consumption_per_meter

[('475bd821-23e6-4634-83c3-d85aa0c548c9', 192638.7),
 ('108081ae-a03d-4513-b118-b98cbe84a766', 86.0),
 ('a55b578f-5bbf-4a46-a83c-1d911ed1ff95', 196805.2)]

# Total number of hours per meter (excluding NULL values)

In [20]:
total_hours_per_meter=sample.rdd.filter(lambda x:x[3] >0).map(lambda x:(x[1],1)).reduceByKey(lambda x,y:x+y).collect()
total_hours_per_meter=[[i,j/4] for i,j in total_hours_per_meter]
total_hours_per_meter

[['475bd821-23e6-4634-83c3-d85aa0c548c9', 143.0],
 ['108081ae-a03d-4513-b118-b98cbe84a766', 0.5],
 ['a55b578f-5bbf-4a46-a83c-1d911ed1ff95', 125.0]]

# Total number of Missing Entries per meter

In [21]:
total_Missing_Entries_Per_Meter=sample.rdd.filter(lambda x:x[3] == 0).map(lambda x:(x[1],1)).reduceByKey(lambda a,b:round(a+b,2)).collect()
# map(lambda x:(x[1],1)).reduceByKey(lambda a,b:round(a+b,2)).collect()
total_Missing_Entries_Per_Meter=[[i,j] for i,j in total_Missing_Entries_Per_Meter]
total_Missing_Entries_Per_Meter

[['a55b578f-5bbf-4a46-a83c-1d911ed1ff95', 172]]

Merging lists based on key

In [22]:
#Combined  list with number of reading hours , total comsumption and missing values
Merged_List=[]
for i in  total_hours_per_meter:
    for j in total_consumption_per_meter:
        for k in total_Missing_Entries_Per_Meter:
            if j[0]==i[0]:
                if k[0]==j[0]:
                    Merged_List.append([j[0],i[1],j[1],k[1]])
                else:
                    Merged_List.append([j[0],i[1],j[1],0])                    
   
Merged_List

[['475bd821-23e6-4634-83c3-d85aa0c548c9', 143.0, 192638.7, 0],
 ['108081ae-a03d-4513-b118-b98cbe84a766', 0.5, 86.0, 0],
 ['a55b578f-5bbf-4a46-a83c-1d911ed1ff95', 125.0, 196805.2, 172]]

In [23]:
#not used
# def nullCount(meterId):
#     tmp=0
#     for i in total_Missing_Entries_Per_Meter:
#         if i[0][1] in meterId:
#             tmp=i[0][1]
#     return lit(tmp)

In [24]:
final_df=sess.createDataFrame(Merged_List).toDF("MeterId","Hours","TotalConsumption","MissingIntervals")
final_df.show()

+--------------------+-----+----------------+----------------+
|             MeterId|Hours|TotalConsumption|MissingIntervals|
+--------------------+-----+----------------+----------------+
|475bd821-23e6-463...|143.0|        192638.7|               0|
|108081ae-a03d-451...|  0.5|            86.0|               0|
|a55b578f-5bbf-4a4...|125.0|        196805.2|             172|
+--------------------+-----+----------------+----------------+



# Final Table displaying Meter ID, with avg hourly consumption, avg 15 min consumption and missing interval for each meter ID

In [25]:
final_df.select(final_df[0],(final_df[2]/final_df[1]).cast(DoubleType()).alias("Average_HourlyConsumption"),((final_df[2]/final_df[1])/4).cast(DoubleType()).alias("Average_15minConsumption"), final_df[3]).show()

+--------------------+-------------------------+------------------------+----------------+
|             MeterId|Average_HourlyConsumption|Average_15minConsumption|MissingIntervals|
+--------------------+-------------------------+------------------------+----------------+
|475bd821-23e6-463...|       1347.1237762237763|       336.7809440559441|               0|
|108081ae-a03d-451...|                    172.0|                    43.0|               0|
|a55b578f-5bbf-4a4...|                1574.4416|                393.6104|             172|
+--------------------+-------------------------+------------------------+----------------+

