# Assessment 2: MongoDB and Spark

Subject: EEET2574 | Big Data for Engineering

Author: Nguyen Quoc Hoang

Student ID: s3697305


# Task 1: MongoDB

#### Credentials to connect to MongoDB


In [1]:
import pymongo

dbUsername = 'admin'
dbPassword = 'admin'
database = 'dataset'
uri = f"mongodb+srv://{dbUsername}:{dbPassword}@cluster0.enorw.mongodb.net/{database}?retryWrites=true&w=majority"
client = pymongo.MongoClient(uri)
db = client[database]
# db = client.test
# db.command('serverStatus')
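
Hard-coding the username and password in the notebook exposes them to anyone who can read it. A minimal sketch of an alternative, assuming the credentials are supplied through environment variables; the variable names `MONGO_USER` and `MONGO_PASSWORD` and the helper `build_mongo_uri` are hypothetical and not part of the original notebook:

```python
import os
from urllib.parse import quote_plus


def build_mongo_uri(username, password, host, database):
    """Build a mongodb+srv URI, percent-encoding the credentials."""
    user = quote_plus(username)
    pwd = quote_plus(password)
    return (
        f"mongodb+srv://{user}:{pwd}@{host}/{database}"
        "?retryWrites=true&w=majority"
    )


# Hypothetical environment variable names; the 'admin' defaults mirror the cell above.
dbUsername = os.environ.get("MONGO_USER", "admin")
dbPassword = os.environ.get("MONGO_PASSWORD", "admin")
uri = build_mongo_uri(dbUsername, dbPassword, "cluster0.enorw.mongodb.net", "dataset")
```

Percent-encoding matters as soon as a password contains characters such as `@` or `:`, which would otherwise be read as URI delimiters.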

#### Creating a local Spark session

In [2]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
import pyspark
import warnings
import json
import pandas as pd
import os

conf = (pyspark.SparkConf()
        .setAppName('Assignment2')
        .setMaster('local')
        .set("spark.jars.packages", "org.mongodb.spark:mongo-spark-connector_2.12:3.0.1")
        .set("spark.ui.showConsoleProgress", "true"))
sc = pyspark.SparkContext(conf=conf)
spark = SparkSession(sc)

warnings.filterwarnings("ignore")

#### Insert CSVs to MongoDB

In [43]:


electricityPath = '../EEET2574_Assignment2_data/Electricity'
gasPath = '../EEET2574_Assignment2_data/Gas'

electricityCSVs = os.listdir(electricityPath)
gasCSVs = os.listdir(gasPath)

for csv in electricityCSVs:
    fileName = csv[:csv.find('.')]
    filePath = os.path.join(electricityPath, csv)
    data = pd.read_csv(filePath)
    data_json = json.loads(data.to_json(orient='records'))
    # Collection.remove() and Collection.insert() were removed in PyMongo 4:
    # delete_many({}) empties the collection, insert_many() bulk-inserts the records
    db[fileName].delete_many({})
    db[fileName].insert_many(data_json)
    print(f"Inserted {fileName}")

for csv in gasCSVs:
    fileName = csv[:csv.find('.')]
    filePath = os.path.join(gasPath, csv)
    data = pd.read_csv(filePath)
    data_json = json.loads(data.to_json(orient='records'))
    db[fileName].delete_many({})
    db[fileName].insert_many(data_json)
    print(f"Inserted {fileName}")

print(f"Inserted {len(electricityCSVs) + len(gasCSVs)} collections")


Inserted stedin_electricity_2019
Inserted stedin_electricity_2018
Inserted stedin_electricity_2020
Inserted westland-infra_electricity_2020
Inserted coteq_electricity_2020
Inserted coteq_electricity_2019
Inserted coteq_electricity_2018
Inserted westland-infra_electricity_2018
Inserted westland-infra_electricity_2019
Inserted coteq_gas_2020
Inserted stedin_gas_2018
Inserted stedin_gas_2019
Inserted stedin_gas_2020
Inserted coteq_gas_2019
Inserted coteq_gas_2018
Inserted westland-infra_gas_2018
Inserted westland-infra_gas_2019
Inserted westland-infra_gas_2020
Inserted 18 collections
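
The loops above derive the collection name with `csv[:csv.find('.')]`, which cuts at the first dot and would also pick up any non-CSV file sitting in the folder. A small sketch of a stricter variant using `pathlib`; the helper name `csv_collection_names` and the sample file list are assumptions for illustration:

```python
from pathlib import Path


def csv_collection_names(file_names):
    """Map CSV file names to collection names, ignoring other files."""
    names = {}
    for file_name in sorted(file_names):
        path = Path(file_name)
        if path.suffix.lower() != ".csv":
            continue  # skip e.g. .DS_Store or notebook checkpoints
        names[file_name] = path.stem  # file name without the final extension
    return names


example = csv_collection_names(
    ["stedin_gas_2018.csv", "coteq_electricity_2020.csv", ".DS_Store"]
)
print(example)
```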


#### Checking document counts in each collection

In [4]:
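# Database.collection_names() was removed in PyMongo 4; list the names once and reuse them
collection_names = db.list_collection_names()
for i, col in enumerate(collection_names):
    counts = db[col].count_documents({})
    print(f'{i+1}.\t{col:<35}: {counts} documents')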

1.	stedin_gas_2020                    : 88597 documents
2.	coteq_gas_2020                     : 6891 documents
3.	stedin_gas_2019                    : 83511 documents
4.	coteq_electricity_2018             : 2503 documents
5.	stedin_gas_2018                    : 85841 documents
6.	westland-infra_gas_2020            : 2400 documents
7.	coteq_gas_2019                     : 6868 documents
8.	stedin_electricity_2020            : 86338 documents
9.	stedin_electricity_2018            : 84695 documents
10.	stedin_electricity_2019            : 82204 documents
11.	coteq_electricity_2019             : 2563 documents
12.	coteq_electricity_2020             : 2575 documents
13.	westland-infra_electricity_2020    : 2557 documents
14.	coteq_gas_2018                     : 6774 documents
15.	westland-infra_gas_2019            : 2237 documents
16.	westland-infra_electricity_2019    : 2324 documents
17.	westland-infra_electricity_2018    : 2518 documents
18.	westland-infra_gas_2018            : 2375 documents

## Question 1A - How many collections do you have? Why?

I uploaded all 18 collections provided, covering both the gas and electricity datasets: one collection per company, per year, per energy type (3 companies × 3 years × 2 types). Using all the available data gives the analysis and the model training as much information as possible, which should help reduce error and improve accuracy.
![Image of Yaktocat](./images/Q1.jpg)

## Question 1B - Is there any data model pattern in your schema? If yes, why?

Schema design has a large impact on an application's performance. Analyzing the data model pattern therefore lets us understand the database in advance, so that the data analysis can be done appropriately and with a suitable model. In our dataset we can observe a **Pre-Allocation Pattern**: the structure of each document is known beforehand, so data can be filled in or grown more simply. The trade-off is that bigger documents consume more RAM. This pattern provides a good balance between the desired outcome and the resources the solution may consume.

# Task 2

### Reading collections from MongoDB

In [3]:
df_list = []
collection_names = db.list_collection_names()  # query the server once, not on every iteration
for i, collection in enumerate(collection_names):
    df = (spark.read
             .format("com.mongodb.spark.sql.DefaultSource")
             .option("uri", uri)
             .option("database", database)
             .option("collection", collection)
             .load()
            )
    df_list.append(df)
    print(i, collection)

0 stedin_gas_2020
1 coteq_gas_2020
2 stedin_gas_2019
3 coteq_electricity_2018
4 stedin_gas_2018
5 westland-infra_gas_2020
6 coteq_gas_2019
7 stedin_electricity_2020
8 stedin_electricity_2018
9 stedin_electricity_2019
10 coteq_electricity_2019
11 coteq_electricity_2020
12 westland-infra_electricity_2020
13 coteq_gas_2018
14 westland-infra_gas_2019
15 westland-infra_electricity_2019
16 westland-infra_electricity_2018
17 westland-infra_gas_2018


### Data cleaning: Missing values check

In [4]:
def check_missing_values():
    print("Missing values check:")
    # Fetch the names once; this assumes the order matches df_list
    collection_names = db.list_collection_names()
    for i in range(len(df_list)):
        missing = False
        collection = df_list[i]
        collectionName = collection_names[i]
        # Skip the first two columns
        for column in collection.columns[2:]:
            # count() avoids collecting the matching rows to the driver
            missingCount = collection.filter(collection[column].isNull()).count()
            if missingCount != 0:
                print(f'> Collection "{collectionName}", Column "{column}" missing {missingCount} value')
                missing = True
        if not missing:
            print(f"{collectionName}: 0")

check_missing_values()

Missing values check:
stedin_gas_2020: 0
> Collection "coteq_gas_2020", Column "zipcode_from" missing 1 value
stedin_gas_2019: 0
coteq_electricity_2018: 0
> Collection "stedin_gas_2018", Column "purchase_area" missing 1 value
westland-infra_gas_2020: 0
coteq_gas_2019: 0
stedin_electricity_2020: 0
stedin_electricity_2018: 0
stedin_electricity_2019: 0
coteq_electricity_2019: 0
coteq_electricity_2020: 0
westland-infra_electricity_2020: 0
coteq_gas_2018: 0
westland-infra_gas_2019: 0
westland-infra_electricity_2019: 0
westland-infra_electricity_2018: 0
westland-infra_gas_2018: 0
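
The same kind of check can be sketched on plain documents (lists of dictionaries, the shape PyMongo returns) without Spark. The sample records and the helper `count_missing` below are made up for illustration and are not taken from the dataset:

```python
from collections import Counter


def count_missing(records, fields):
    """Count, per field, how many records have the field absent or set to None."""
    missing = Counter()
    for record in records:
        for field in fields:
            if record.get(field) is None:
                missing[field] += 1
    return dict(missing)


# Made-up sample documents, not taken from the dataset
sample = [
    {"zipcode_from": "2291AA", "purchase_area": "A"},
    {"zipcode_from": None, "purchase_area": "A"},
    {"zipcode_from": "2291AK"},
]
print(count_missing(sample, ["zipcode_from", "purchase_area"]))
```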


### Data Transformation:

As the result above shows, the dataset has a few missing values. Because there are only two of them, and they sit in less important columns (`zipcode_from` and `purchase_area`), I chose to drop the affected rows; this has a negligible impact on the cleaned data.

In [24]:
for i in range(len(df_list)):
    df_list[i] = df_list[i].na.drop()
print(f"Dropped missing values in {i+1} collections")

Dropped missing values in 18 collections


We check again to ensure all missing values have been dropped

In [23]:
check_missing_values()

Missing values check:
stedin_gas_2020: 0
coteq_gas_2020: 0
stedin_gas_2019: 0
coteq_electricity_2018: 0
stedin_gas_2018: 0
westland-infra_gas_2020: 0
coteq_gas_2019: 0
stedin_electricity_2020: 0
stedin_electricity_2018: 0
stedin_electricity_2019: 0
coteq_electricity_2019: 0
coteq_electricity_2020: 0
westland-infra_electricity_2020: 0
coteq_gas_2018: 0
westland-infra_gas_2019: 0
westland-infra_electricity_2019: 0
westland-infra_electricity_2018: 0
westland-infra_gas_2018: 0


#### We take Westland Infra's electricity data for 2018 and 2019 and combine them for further analysis (2020 is loaded separately as test data). We also convert to Pandas DataFrames for faster processing and more intuitive functions

In [98]:
westland_elec_2018_df = df_list[16].toPandas()
westland_elec_2019_df = df_list[15].toPandas()
westland_elec_2020_df = df_list[12].toPandas()

westland_elec_18_19_df = pd.concat([westland_elec_2018_df, westland_elec_2019_df], ignore_index=True)

In [99]:
westland_elec_18_19_df

Unnamed: 0,%Defintieve aansl (NRM),_id,annual_consume,annual_consume_lowtarif_perc,city,delivery_perc,net_manager,num_connections,perc_of_active_connections,purchase_area,smartmeter_perc,street,type_conn_perc,type_of_connection,zipcode_from,zipcode_to
0,100,"(612b9a252b356483f99c6e38,)",7029,96.55,WATERINGEN,89.66,westland-infra,29,100.00,871687800090000015,72.41,OOSTEINDE,59,3x25,2291AA,2291AE
1,100,"(612b9a252b356483f99c6e39,)",5411,100.00,WATERINGEN,87.50,westland-infra,40,100.00,871687800090000015,82.50,WATERPARK,63,1x35,2291AK,2291AK
2,100,"(612b9a252b356483f99c6e3a,)",3239,100.00,WATERINGEN,100.00,westland-infra,29,100.00,871687800090000015,96.55,AZALEAPARK,86,1x35,2291AL,2291AL
3,100,"(612b9a252b356483f99c6e3b,)",5364,100.00,WATERINGEN,69.05,westland-infra,42,97.62,871687800090000015,88.10,DRUIVENLAAN,71,3x25,2291AN,2291AT
4,100,"(612b9a252b356483f99c6e3c,)",2949,100.00,WATERINGEN,100.00,westland-infra,16,100.00,871687800090000015,87.50,KWAKLAAN,81,1x35,2291AV,2291AV
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
4837,100,"(612b9a262b356483f99c811d,)",6065,100.00,MAASLAND,100.00,westland-infra,14,100.00,871687800090000015,0.00,IN DE BALIJE,100,3x25,3155XA,3155XA
4838,100,"(612b9a262b356483f99c811e,)",4692,92.86,MAASLAND,100.00,westland-infra,14,100.00,871687800090000015,0.00,KONINGIN JULIANAWEG,86,3x25,3155XB,3155XB
4839,100,"(612b9a262b356483f99c811f,)",6189,86.21,MAASLAND,89.66,westland-infra,29,100.00,871687800090000015,0.00,KONINGIN JULIANAWEG,86,3x25,3155XC,3155XD
4840,100,"(612b9a262b356483f99c8120,)",4309,100.00,MAASLAND,100.00,westland-infra,28,100.00,871687800090000015,0.00,KONINGIN JULIANAWEG,100,3x25,3155XE,3155XH


### Checking anomalies / impossible values:

This check looks for abnormal or impossible values in the numeric fields, such as unwanted negative values or percentages above 100.

In [100]:
for row in range(len(westland_elec_18_19_df)):
    if (westland_elec_18_19_df.num_connections[row] < 0 or
        westland_elec_18_19_df.delivery_perc[row] > 100 or
        westland_elec_18_19_df.perc_of_active_connections[row] > 100 or
        westland_elec_18_19_df.type_conn_perc[row] > 100 or
        westland_elec_18_19_df.annual_consume[row] < 0 or
        westland_elec_18_19_df.smartmeter_perc[row] > 100):
            print("Anomalies / impossible values exist")
            # break (rather than quit(), which stops the notebook kernel) skips the else branch
            break
else:
    # Runs only when the loop finished without hitting break
    print("Passed!")

Passed!
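
The loop above only tests the upper bound of the percentage columns. A hedged sketch of a stricter check that also rejects negative percentages, written against plain dictionaries so it runs without the dataset; the helper `find_anomalies` and the sample rows are assumptions for illustration:

```python
PERCENT_FIELDS = ["delivery_perc", "perc_of_active_connections",
                  "type_conn_perc", "smartmeter_perc"]
NON_NEGATIVE_FIELDS = ["num_connections", "annual_consume"]


def find_anomalies(rows):
    """Return (row_index, field, value) for every impossible value."""
    anomalies = []
    for index, row in enumerate(rows):
        for field in PERCENT_FIELDS:
            if not 0 <= row[field] <= 100:
                anomalies.append((index, field, row[field]))
        for field in NON_NEGATIVE_FIELDS:
            if row[field] < 0:
                anomalies.append((index, field, row[field]))
    return anomalies


# Made-up rows: the first is valid, the second has two impossible values
rows = [
    {"delivery_perc": 89.66, "perc_of_active_connections": 100.0,
     "type_conn_perc": 59, "smartmeter_perc": 72.41,
     "num_connections": 29, "annual_consume": 7029},
    {"delivery_perc": 120.0, "perc_of_active_connections": 100.0,
     "type_conn_perc": 59, "smartmeter_perc": 72.41,
     "num_connections": -1, "annual_consume": 7029},
]
print(find_anomalies(rows) or "Passed!")
```

Returning the offending rows, rather than stopping at the first one, makes it easier to decide whether to drop or correct them.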


### One hot encoding for categorical data

In [118]:
oneHot_data = pd.get_dummies(westland_elec_18_19_df["type_of_connection"])
trainDF = pd.concat([westland_elec_18_19_df, oneHot_data], axis=1).drop(["type_of_connection"],axis=1)
trainDF

Unnamed: 0,%Defintieve aansl (NRM),_id,annual_consume,annual_consume_lowtarif_perc,city,delivery_perc,net_manager,num_connections,perc_of_active_connections,purchase_area,smartmeter_perc,street,type_conn_perc,zipcode_from,zipcode_to,1x35,3x25,3x35,3x50,3x80
0,100,"(612b9a252b356483f99c6e38,)",7029,96.55,WATERINGEN,89.66,westland-infra,29,100.00,871687800090000015,72.41,OOSTEINDE,59,2291AA,2291AE,0,1,0,0,0
1,100,"(612b9a252b356483f99c6e39,)",5411,100.00,WATERINGEN,87.50,westland-infra,40,100.00,871687800090000015,82.50,WATERPARK,63,2291AK,2291AK,1,0,0,0,0
2,100,"(612b9a252b356483f99c6e3a,)",3239,100.00,WATERINGEN,100.00,westland-infra,29,100.00,871687800090000015,96.55,AZALEAPARK,86,2291AL,2291AL,1,0,0,0,0
3,100,"(612b9a252b356483f99c6e3b,)",5364,100.00,WATERINGEN,69.05,westland-infra,42,97.62,871687800090000015,88.10,DRUIVENLAAN,71,2291AN,2291AT,0,1,0,0,0
4,100,"(612b9a252b356483f99c6e3c,)",2949,100.00,WATERINGEN,100.00,westland-infra,16,100.00,871687800090000015,87.50,KWAKLAAN,81,2291AV,2291AV,1,0,0,0,0
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
4837,100,"(612b9a262b356483f99c811d,)",6065,100.00,MAASLAND,100.00,westland-infra,14,100.00,871687800090000015,0.00,IN DE BALIJE,100,3155XA,3155XA,0,1,0,0,0
4838,100,"(612b9a262b356483f99c811e,)",4692,92.86,MAASLAND,100.00,westland-infra,14,100.00,871687800090000015,0.00,KONINGIN JULIANAWEG,86,3155XB,3155XB,0,1,0,0,0
4839,100,"(612b9a262b356483f99c811f,)",6189,86.21,MAASLAND,89.66,westland-infra,29,100.00,871687800090000015,0.00,KONINGIN JULIANAWEG,86,3155XC,3155XD,0,1,0,0,0
4840,100,"(612b9a262b356483f99c8120,)",4309,100.00,MAASLAND,100.00,westland-infra,28,100.00,871687800090000015,0.00,KONINGIN JULIANAWEG,100,3155XE,3155XH,0,1,0,0,0


### Drop unnecessary data. Keep numerical and one hot encoded columns

In [119]:
trainDF.drop(["%Defintieve aansl (NRM)", "_id", "city", "street", "zipcode_from", "zipcode_to", "net_manager", "purchase_area"], axis=1,inplace=True)

In [120]:
trainDF

Unnamed: 0,annual_consume,annual_consume_lowtarif_perc,delivery_perc,num_connections,perc_of_active_connections,smartmeter_perc,type_conn_perc,1x35,3x25,3x35,3x50,3x80
0,7029,96.55,89.66,29,100.00,72.41,59,0,1,0,0,0
1,5411,100.00,87.50,40,100.00,82.50,63,1,0,0,0,0
2,3239,100.00,100.00,29,100.00,96.55,86,1,0,0,0,0
3,5364,100.00,69.05,42,97.62,88.10,71,0,1,0,0,0
4,2949,100.00,100.00,16,100.00,87.50,81,1,0,0,0,0
...,...,...,...,...,...,...,...,...,...,...,...,...
4837,6065,100.00,100.00,14,100.00,0.00,100,0,1,0,0,0
4838,4692,92.86,100.00,14,100.00,0.00,86,0,1,0,0,0
4839,6189,86.21,89.66,29,100.00,0.00,86,0,1,0,0,0
4840,4309,100.00,100.00,28,100.00,0.00,100,0,1,0,0,0


### We repeat the same steps above to construct the test data

In [127]:
oneHot2020 = pd.get_dummies(westland_elec_2020_df["type_of_connection"])
testDF = pd.concat([westland_elec_2020_df, oneHot2020], axis=1).drop(["type_of_connection"],axis=1)
testDF.drop(["%Defintieve aansl (NRM)", "_id", "city", "street", "zipcode_from", "zipcode_to", "net_manager", "purchase_area"], axis=1,inplace=True)
testDF

Unnamed: 0,annual_consume,annual_consume_lowtarif_perc,delivery_perc,num_connections,perc_of_active_connections,smartmeter_perc,type_conn_perc,1x35,3x25,3x35,3x50,3x63,3x80
0,5851,96.43,89.29,28,100.0,71.43,57,0,1,0,0,0,0
1,5077,97.50,62.50,40,100.0,87.50,60,1,0,0,0,0,0
2,3078,100.00,100.00,29,100.0,96.55,86,1,0,0,0,0,0
3,3956,100.00,62.26,53,100.0,96.23,81,0,1,0,0,0,0
4,2574,100.00,100.00,18,100.0,94.44,78,1,0,0,0,0,0
...,...,...,...,...,...,...,...,...,...,...,...,...,...
2552,6313,100.00,92.86,14,100.0,85.71,86,0,1,0,0,0,0
2553,4966,100.00,92.86,14,100.0,100.00,64,0,1,0,0,0,0
2554,7293,100.00,75.86,29,100.0,82.76,69,0,1,0,0,0,0
2555,3194,96.43,75.00,28,100.0,60.71,71,0,1,0,0,0,0
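
Note that the 2020 data contains a connection type (`3x63`) that does not occur in 2018 and 2019, so the train and test frames end up with different one-hot columns. One way to keep the columns aligned is to encode both sets against the categories seen in training. A minimal pure-Python sketch; the helper `one_hot` is hypothetical, and mapping an unseen category to an all-zero row is a design assumption, not something the original notebook does:

```python
def one_hot(values, categories):
    """Encode each value as a 0/1 row over a fixed, ordered category list.

    Values outside `categories` become an all-zero row.
    """
    return [[1 if value == category else 0 for category in categories]
            for value in values]


# Categories seen in the 2018-2019 training data
train_categories = ["1x35", "3x25", "3x35", "3x50", "3x80"]

train_rows = one_hot(["3x25", "1x35"], train_categories)
test_rows = one_hot(["3x25", "3x63"], train_categories)  # 3x63 is unseen

print(train_rows)
print(test_rows)
```

With pandas, a comparable effect can be obtained by reindexing the test frame to the training columns, e.g. `testDF.reindex(columns=trainDF.columns, fill_value=0)`.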


## Q2: What are the chosen data cleaning and data transformation steps? Why?

The data cleaning and data transformation steps I went with are:
- **Check for missing values**: I drop the rows that contain nulls. Since the missing values lie in less important columns such as zipcode, and the number of rows removed is very small, this should not affect the algorithm later
- **Checking anomalies / impossible values**: among the numerical fields, I look for impossible values based on common sense and domain knowledge, such as negative values or percentages greater than 100. If all fields pass the check, we are ready to move on
- **One hot encoding for categorical data**: the type of connection column indicates the principal type of connection in the zipcode range; for electricity it is given as (fuses × ampère), e.g. `3x25`. Since the types are stored as strings, repeat across rows, and each row holds exactly one type, we can one-hot encode the column and build a new DataFrame that stores this information for further analysis

# Task 3: Model training and tracking with data pipeline and MLflow

In [136]:
# Required Machine Learning libraries
import numpy as np
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score
from sklearn.linear_model import ElasticNet
from sklearn.linear_model import LinearRegression
import matplotlib.pyplot as plt

# MLflow
import mlflow

np.random.seed(69)

## ElasticNet algorithm

In [132]:
def train_model(trainDF, testDF, alpha_in, l1_ratio_in):
  
  # Evaluate metrics
  def eval_metrics(actual, pred):
      rmse = np.sqrt(mean_squared_error(actual, pred))
      mae = mean_absolute_error(actual, pred)
      r2 = r2_score(actual, pred)
      return rmse, mae, r2

  train_x = trainDF.drop(["annual_consume"], axis=1)
  test_x = testDF.drop(["annual_consume"], axis=1)
  # The 2020 data has a one-hot column (3x63) that the training data lacks;
  # align the test features to the training columns so predict() accepts them
  test_x = test_x.reindex(columns=train_x.columns, fill_value=0)
  train_y = trainDF[["annual_consume"]]
  test_y = testDF[["annual_consume"]]

  # Fall back to a default when no value is given
  if alpha_in is None:
    alpha = 0.05
  else:
    alpha = float(alpha_in)
    
  if l1_ratio_in is None:
    l1_ratio = 0.05
  else:
    l1_ratio = float(l1_ratio_in)
  
  # Start an MLflow pipeline to keep track of all our runs
  with mlflow.start_run():
    lr = ElasticNet(alpha=alpha, l1_ratio=l1_ratio, random_state=42)
    lr.fit(train_x, train_y)

    predicted_qualities = lr.predict(test_x)

    (rmse, mae, r2) = eval_metrics(test_y, predicted_qualities)

    print(f"Elasticnet model (alpha={alpha}, l1_ratio={l1_ratio}):")
    print(f"  RMSE: {rmse}")
    print(f"  MAE: {mae}")
    print(f"  R2: {r2}")

    # Log MLflow
    mlflow.log_param("alpha", alpha)
    mlflow.log_param("l1_ratio", l1_ratio)
    mlflow.log_metric("rmse", rmse)
    mlflow.log_metric("r2", r2)
    mlflow.log_metric("mae", mae)
    mlflow.sklearn.log_model(lr, "model")
    modelpath = "/dbfs/mlflow/task_3/model-%f-%f" % (alpha, l1_ratio)
    mlflow.sklearn.save_model(lr, modelpath)
    
    # Combined features and target over train + test (not used further in this function)
    temp = pd.concat([trainDF ,testDF], axis = 0, ignore_index = True)
    X = temp.drop(["annual_consume"], axis = 1)
    X = X.to_numpy()
    y = temp["annual_consume"].to_numpy()


In [None]:
train_model(trainDF, testDF, 0.02, 0.03)
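
For reference, the three metrics logged to MLflow can be written out directly. The sketch below is a plain-Python illustration of the formulas, not the scikit-learn implementation, and the sample numbers are made up:

```python
import math


def regression_metrics(actual, predicted):
    """Return (rmse, mae, r2) for two equal-length sequences of numbers."""
    n = len(actual)
    errors = [a - p for a, p in zip(actual, predicted)]
    mse = sum(e * e for e in errors) / n          # mean squared error
    mae = sum(abs(e) for e in errors) / n         # mean absolute error
    mean_actual = sum(actual) / n
    ss_res = sum(e * e for e in errors)           # residual sum of squares
    ss_tot = sum((a - mean_actual) ** 2 for a in actual)  # total sum of squares
    r2 = 1 - ss_res / ss_tot
    return math.sqrt(mse), mae, r2


rmse, mae, r2 = regression_metrics([1.0, 2.0, 3.0, 4.0], [1.0, 2.0, 3.0, 6.0])
print(rmse, mae, r2)
```

RMSE and MAE are in the units of `annual_consume`, with RMSE penalising large errors more heavily, while R2 compares the model against always predicting the mean.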

## Linear Regression Algorithm

In [None]:
# Take X and y from the same frame so they have the same number of rows
# (by position, column 0 is annual_consume and column 1 is annual_consume_lowtarif_perc)
trainX = trainDF.iloc[:, 0].values.reshape(-1, 1)
trainY = trainDF.iloc[:, 1].values.reshape(-1, 1)
linear_regressor = LinearRegression()
linear_regressor.fit(trainX, trainY)
predicted_Y = linear_regressor.predict(trainX)
predicted_Y
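
With a single feature, ordinary least squares has a closed-form solution for the slope and intercept. A small sketch with made-up points; `fit_line` is a hypothetical helper and is not how scikit-learn is implemented internally:

```python
def fit_line(xs, ys):
    """Least-squares slope and intercept for y = slope * x + intercept."""
    n = len(xs)
    mean_x = sum(xs) / n
    mean_y = sum(ys) / n
    # Covariance-like and variance-like sums around the means
    sxy = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
    sxx = sum((x - mean_x) ** 2 for x in xs)
    slope = sxy / sxx
    intercept = mean_y - slope * mean_x
    return slope, intercept


# Made-up points lying exactly on y = 2x + 1
slope, intercept = fit_line([0.0, 1.0, 2.0, 3.0], [1.0, 3.0, 5.0, 7.0])
print(slope, intercept)
```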

## Q3-A: What is/are your final model(s) based on the evaluation metrics?

I tried the ElasticNet algorithm and the Linear Regression algorithm for my model.

## Q3-B: Did you build one model for both electricity and gas or separate models? Why?

Since both datasets share the same schema, I built one model for both electricity and gas to keep the training simple.

## Q3-C: Should we build a separate model for each company or not? Why?

Even though they all share the same schema, given enough time we should build a separate model for each company, because each company has a different dataset for each zipcode. For instance, Stedin has drastically more data than the other two, so modelling the companies separately and then considering the three results together should give a higher accuracy.

# Task 4: Visualisation

Link to my Dashboard: https://charts.mongodb.com/charts-assignment2_mongodb_spark-qmusg/public/dashboards/612bb8e5-2749-4603-8530-521b809bca89

Thank you for reading ♥️