# Dataset Clustering


### Dataset Description

Name : [Energy consumption of the Netherlands](https://www.kaggle.com/lucabasa/dutch-energy)

Enexis, Liander, and Stedin are the three major network administrators of the Netherlands and, together, they provide energy to nearly the entire country. Every year, they release on their websites a table with the energy consumption of the areas under their administration.

The data are anonymized by aggregating the Zipcodes so that every entry describes at least 10 connections.

This market is not competitive, meaning that the zones are assigned. This means that every year they roughly provide energy to the same zipcodes. Small changes can happen from year to year either for a change of management or for a different aggregation of zipcodes.


Every file is from a network administrator from a specific year.

### The columns in each file are:

- net_manager: code of the regional network manager
- purchase_area: code of the area where the energy is purchased
- street: Name of the street
- zipcode_from and zipcode_to: 2 columns for the range of zipcodes covered, 4 numbers and 2 letters
- city: Name of the city
- num_connections: Number of connections in the range of zipcodes
- delivery_perc: percentage of the net consumption of electricity or gas. The lower, the more energy was given back to the grid (for example if you have solar panels)
- perc_of_active_connections: Percentage of active connections in the zipcode range
- type_of_connection: principal type of connection in the zipcode range. For electricity is # fuses X # ampère. For gas is G4, G6, G10, G16, G25
- type_conn_perc: percentage of presence of the principal type of connection in the zipcode range
- annual_consume: Annual consume. Kwh for electricity, m3 for gas
- annual_consume_lowtarif_perc: Percentage of consume during the low tarif hours. From 10 p.m. to 7 a.m. and during weekends.
- smartmeter_perc: percentage of smartmeters in the zipcode ranges

In [105]:
# Spark Initialization

from pyspark.sql import SparkSession

# Create Spark Session
spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .getOrCreate()

In [106]:
print(spark)

<pyspark.sql.session.SparkSession object at 0x1197626a0>


## Dataset Preprocess

### Combine all the dataset

Append Dataset from specific provider

In [107]:
import numpy as np
import pandas as pd

#visualization
import seaborn as sns
import matplotlib.pyplot as plt
%matplotlib inline

from os.path import join, isfile
from os import path, scandir, listdir

import gc

In [108]:
def list_all_files(location='../dataset/', pattern=None, recursive=True):
    """
    This function returns a list of files at a given location (including subfolders)
    
    - location: path to the directory to be searched
    - pattern: part of the file name to be searched (ex. pattern='.csv' would return all the csv files)
    - recursive: boolean, if True the function calls itself for every subdirectory it finds
    """
    subdirectories= [f.path for f in scandir(location) if f.is_dir()]
    files = [join(location, f) for f in listdir(location) if isfile(join(location, f))]
    if recursive:
        for directory in subdirectories:
            files.extend(list_all_files(directory))
    if pattern:
        files = [f for f in files if pattern in f]
    return files

In [109]:
list_all_files('../dataset/Electricity/', pattern='stedin')

['../dataset/Electricity/stedin_electricity_2019.csv',
 '../dataset/Electricity/stedin_electricity_2018.csv',
 '../dataset/Electricity/stedin_electricity_2009.csv',
 '../dataset/Electricity/stedin_electricity_2010.csv',
 '../dataset/Electricity/stedin_electricity_2011.csv',
 '../dataset/Electricity/stedin_electricity_2013.csv',
 '../dataset/Electricity/stedin_electricity_2012.csv',
 '../dataset/Electricity/stedin_electricity_2016.csv',
 '../dataset/Electricity/stedin_electricity_2017.csv',
 '../dataset/Electricity/stedin_electricity_2015.csv',
 '../dataset/Electricity/stedin_electricity_2014.csv']

In [110]:
import pandas as pd
import glob

interesting_files = glob.glob("../dataset/Electricity/stedin*.csv") 
print (sorted(interesting_files))

df_list = []
for filename in sorted(interesting_files):
    df_list.append(pd.read_csv(filename))
full_df = pd.concat(df_list)

full_df.to_csv('output.csv')

['../dataset/Electricity/stedin_electricity_2009.csv', '../dataset/Electricity/stedin_electricity_2010.csv', '../dataset/Electricity/stedin_electricity_2011.csv', '../dataset/Electricity/stedin_electricity_2012.csv', '../dataset/Electricity/stedin_electricity_2013.csv', '../dataset/Electricity/stedin_electricity_2014.csv', '../dataset/Electricity/stedin_electricity_2015.csv', '../dataset/Electricity/stedin_electricity_2016.csv', '../dataset/Electricity/stedin_electricity_2017.csv', '../dataset/Electricity/stedin_electricity_2018.csv', '../dataset/Electricity/stedin_electricity_2019.csv']


## Clustering Session

In [111]:
## reload dataset
df = spark.read.csv("./consumer/output.csv", header=True, inferSchema=True)

In [112]:
df.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- net_manager: long (nullable = true)
 |-- purchase_area: string (nullable = true)
 |-- street: string (nullable = true)
 |-- zipcode_from: string (nullable = true)
 |-- zipcode_to: string (nullable = true)
 |-- city: string (nullable = true)
 |-- num_connections: integer (nullable = true)
 |-- delivery_perc: double (nullable = true)
 |-- perc_of_active_connections: double (nullable = true)
 |-- type_conn_perc: double (nullable = true)
 |-- type_of_connection: string (nullable = true)
 |-- annual_consume: integer (nullable = true)
 |-- annual_consume_lowtarif_perc: double (nullable = true)
 |-- smartmeter_perc: double (nullable = true)



In [113]:
# assembling vector
from pyspark.ml.feature import VectorAssembler

assembler = VectorAssembler(
    inputCols=["annual_consume"],
    outputCol='features')

data = assembler.transform(df)
data.show()

+---+-------------+--------------+--------------------+------------+----------+----------+---------------+-------------+--------------------------+--------------+------------------+--------------+----------------------------+---------------+--------+
|_c0|  net_manager| purchase_area|              street|zipcode_from|zipcode_to|      city|num_connections|delivery_perc|perc_of_active_connections|type_conn_perc|type_of_connection|annual_consume|annual_consume_lowtarif_perc|smartmeter_perc|features|
+---+-------------+--------------+--------------------+------------+----------+----------+---------------+-------------+--------------------------+--------------+------------------+--------------+----------------------------+---------------+--------+
|  0|8716874000009|Stedin Utrecht|Gijsbrecht van Am...|      1181EJ|    1231AB|AMSTELVEEN|             32|        100.0|                     93.75|          78.0|              3x25|          5948|                       81.25|            0.0|[5948.

In [114]:
# Train model
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator

kmeans = KMeans().setK(10).setSeed(1)
model = kmeans.fit(data)

In [115]:
# Make a prediction
predictions = model.transform(data)
predictions.show(5)

+---+-------------+--------------+--------------------+------------+----------+----------+---------------+-------------+--------------------------+--------------+------------------+--------------+----------------------------+---------------+--------+----------+
|_c0|  net_manager| purchase_area|              street|zipcode_from|zipcode_to|      city|num_connections|delivery_perc|perc_of_active_connections|type_conn_perc|type_of_connection|annual_consume|annual_consume_lowtarif_perc|smartmeter_perc|features|prediction|
+---+-------------+--------------+--------------------+------------+----------+----------+---------------+-------------+--------------------------+--------------+------------------+--------------+----------------------------+---------------+--------+----------+
|  0|8716874000009|Stedin Utrecht|Gijsbrecht van Am...|      1181EJ|    1231AB|AMSTELVEEN|             32|        100.0|                     93.75|          78.0|              3x25|          5948|                  

In [116]:
evaluator = ClusteringEvaluator()

silhouette = evaluator.evaluate(predictions)
print("Silhouette with squared euclidean distance = " + str(silhouette))

Silhouette with squared euclidean distance = 0.6943417399022496


In [117]:
# Shows the result
centers = model.clusterCenters()
print("Cluster Centers: ")
for center in centers:
    print(center)

Cluster Centers: 
[4401.83472171]
[13633.85643048]
[3222.14101387]
[19486.50643288]
[9376.52710749]
[40276.51734104]
[27509.94333333]
[6346.39143773]
[2116.36194418]
[84505.96153846]


In [103]:
import pandas
import glob
dataset_path = "consumer"

set = '2'

interesting_files = glob.glob('consumer/output[0-'+set+'].csv') 

print (interesting_files)

df_list = []
for filename in sorted(interesting_files):
    df_list.append(pandas.read_csv(filename,header=None))
full_df = pandas.concat(df_list,)

full_df.head(20)

['consumer/output1.csv', 'consumer/output0.csv', 'consumer/output2.csv']


Unnamed: 0,0,1,2,3,4,5,6,7,8,9,10,11,12,13
0,net_manager,purchase_area,street,zipcode_from,zipcode_to,city,delivery_perc,num_connections,perc_of_active_connections,type_conn_perc,type_of_connection,annual_consume,annual_consume_lowtarif_perc,smartmeter_perc
1,Enexis B.V.,ENEXIS,Sasdijk,4251AB,4251AB,WERKENDAM,100,16,100,,,4282,25,0.0
2,Enexis B.V.,ENEXIS,Sasdijk,4251AC,4251AC,WERKENDAM,100,11,100,,,5113,10,0.0
3,Enexis B.V.,ENEXIS,Sasdijk,4251AD,4251AD,WERKENDAM,100,30,100,,,4809,34,0.0
4,Enexis B.V.,ENEXIS,Nieuweweg,4251AE,4251AG,WERKENDAM,100,21,100,,,5015,44,0.0
0,net_manager,purchase_area,street,zipcode_from,zipcode_to,city,delivery_perc,num_connections,perc_of_active_connections,type_conn_perc,type_of_connection,annual_consume,annual_consume_lowtarif_perc,smartmeter_perc
1,Enexis B.V.,ENEXIS,Sasdijk,4251AB,4251AB,WERKENDAM,100,16,100,,,4282,25,0.0
2,Enexis B.V.,ENEXIS,Sasdijk,4251AC,4251AC,WERKENDAM,100,11,100,,,5113,10,0.0
3,Enexis B.V.,ENEXIS,Sasdijk,4251AD,4251AD,WERKENDAM,100,30,100,,,4809,34,0.0
4,Enexis B.V.,ENEXIS,Nieuweweg,4251AE,4251AG,WERKENDAM,100,21,100,,,5015,44,0.0


In [119]:
dfspark = spark.createDataFrame(full_df)

In [120]:
dfspark.printSchema()

root
 |-- net_manager: long (nullable = true)
 |-- purchase_area: string (nullable = true)
 |-- street: string (nullable = true)
 |-- zipcode_from: string (nullable = true)
 |-- zipcode_to: string (nullable = true)
 |-- city: string (nullable = true)
 |-- num_connections: long (nullable = true)
 |-- delivery_perc: double (nullable = true)
 |-- perc_of_active_connections: double (nullable = true)
 |-- type_conn_perc: double (nullable = true)
 |-- type_of_connection: string (nullable = true)
 |-- annual_consume: long (nullable = true)
 |-- annual_consume_lowtarif_perc: double (nullable = true)
 |-- smartmeter_perc: double (nullable = true)



In [82]:
from pyspark.ml.feature import VectorAssembler

assembler = VectorAssembler(
    inputCols=['annual_consume'],
    outputCol='features')

data = assembler.transform(full_df)
data.show()

AttributeError: 'DataFrame' object has no attribute '_jdf'

In [94]:
val = 1234
type(val)

input = spark.createDataFrame([("1","2","3","4","5","6","7","8","9","10","11",5123,"13","14",[val])],["net_manager","purchase_area","street","zipcode_from","zipcode_to","city","delivery_perc","num_connections","perc_of_active_connections","type_conn_perc","type_of_connection","annual_consume","annual_consume_lowtarif_perc","smartmeter_perc","features"])
model.transform(input)

IllegalArgumentException: 'requirement failed: Column features must be of type equal to one of the following types: [struct<type:tinyint,size:int,indices:array<int>,values:array<double>>, array<double>, array<float>] but was actually of type array<bigint>.'

In [93]:
type(val)

int

In [80]:
model.transform(input)

IllegalArgumentException: 'requirement failed: Column features must be of type equal to one of the following types: [struct<type:tinyint,size:int,indices:array<int>,values:array<double>>, array<double>, array<float>] but was actually of type array<bigint>.'

In [69]:
import numpy as np
import pandas as pd
columns = ["net_manager","purchase_area","street","zipcode_from","zipcode_to","city","delivery_perc","num_connections","perc_of_active_connections","type_conn_perc","type_of_connection","annual_consume","annual_consume_lowtarif_perc","smartmeter_perc","features"]
rows = ["D", "E", "F"]
data = np.array(["1","2","3","4","5","6","7","8","9","10","11",5123,"13","14",[4123]])
df = pd.DataFrame(data=data, columns=columns)

ValueError: Shape of passed values is (15, 1), indices imply (15, 15)

In [66]:
df.head()

Unnamed: 0,A,B,C
D,1,2,2
E,3,3,3
F,4,4,4


In [128]:
Data = {'name':  ['ok'],
        'features': 1234,
      }


fuad = pd.DataFrame (Data, columns = ['name','features'])

In [129]:
fuad = spark.createDataFrame(fuad)
fuad.printSchema()

root
 |-- name: string (nullable = true)
 |-- features: long (nullable = true)



In [130]:
model.transform(fuad)

IllegalArgumentException: 'requirement failed: Column features must be of type equal to one of the following types: [struct<type:tinyint,size:int,indices:array<int>,values:array<double>>, array<double>, array<float>] but was actually of type bigint.'