## ingest_energy_estimations_Kenya
### Ingesting energy estimates into the database based on the building matching created by building_matching

### Initial configuration
#### To start working with this notebook, you need to provide the necessary credentials and settings
#### Below is a configuration template. Prepare it outside of this notebook, then copy and paste everything between the triple quotes into the next cell's input field
    """
    {
    "COS_ENDPOINT_URL": "s3.private.eu-de.cloud-object-storage.appdomain.cloud",
    "COS_AUTH_ENDPOINT_URL": "https://iam.cloud.ibm.com/oidc/token",
    "COS_APIKEY": "xxx",
    "DB2_CONNECTION_STRING": "jdbc:db2://65beb513-5d3d-4101-9001-f42e9dc954b3.brt9d04f0cmqeb8u7740.databases.appdomain.cloud:30371/BLUDB:sslConnection=true;useJDBC4ColumnNameAndLabelSemantics=false;db2.jcc.charsetDecoderEncoder=3;",
    "DB2_USERNAME": "xxx",
    "DB2_PASSWORD": "xxx",
    "UTILS_BUCKET": "notebook-utils-bucket",
    "ENERGY_ESTIMATION_BUCKET": "kenya-energy-estimation-matching",
    "JOB_STATUS_BUCKET": "notebook-job-status",
    "COUNTRY_TABLE": "FEATURES_DB_VIDA_EXTENDED"
    }
    """


In [None]:
# Read notebook configuration
import getpass
import json

# Prompt with getpass and parse the pasted text as JSON
config_str = getpass.getpass(prompt='Enter your prepared config: ')
config = json.loads(s=config_str)

In [None]:
# import necessary libraries
import ibm_boto3
from botocore.client import Config
import pandas as pd
import geopandas as gpd
from tqdm import tqdm
import os
import sys
import traceback
import io

import jaydebeapi as jdbc
import jpype
import threading
import time

In [None]:
# Cloud Object Storage client instance (API key + IAM auth endpoint, OAuth signature)
cos_client = ibm_boto3.client(
    service_name='s3',
    ibm_api_key_id=config["COS_APIKEY"],
    ibm_auth_endpoint=config["COS_AUTH_ENDPOINT_URL"],
    config=Config(signature_version='oauth'),
    endpoint_url=config["COS_ENDPOINT_URL"],
)

# Listing of the utilities bucket; it is only consulted when the import below fails
response = cos_client.list_objects_v2(Bucket=config["UTILS_BUCKET"])

# download utils module
try:
    from utils import *
    print('External utils succesfully imported')

except Exception as e:
    print('Desired packages is missing in local env, downloading it...', e)
    # The listing carries no 'Contents' entry when the bucket is empty
    for obj in response.get('Contents', []):
        name = obj['Key']

        # Only the DB2 JDBC driver jar is fetched here
        if name == 'db2jcc4.jar':
            streaming_body_1 = cos_client.get_object(Bucket=config["UTILS_BUCKET"], Key=name)['Body']
            print("Downloading to localStorage :  " + name)
            # Buffered binary writer: one write() call stores the whole payload,
            # which a raw io.FileIO write does not guarantee
            with open(name, 'wb') as file:
                file.write(streaming_body_1.read())


In [None]:
def connect_to_db():
    '''
        Open a JDBC connection to the IBM DB2 database.

        Puts the DB2 driver jar on the class path, tries to start the JVM
        (a failure to start is only printed), attaches the calling thread to
        the JVM when it is not attached yet, and connects with the URL and
        credentials taken from ``config``.

        Returns the jaydebeapi connection object.
    '''

    driver_jar = 'db2jcc4.jar'
    os.environ['CLASSPATH'] = driver_jar

    jvm_option = '-Djava.class.path={}'.format(driver_jar)
    jvm_path = jpype.getDefaultJVMPath()
    try:
        jpype.startJVM(jvm_path, jvm_option)
    except Exception as e:
        print('startJVM exception: ', e)

    jvm_running = jpype.isJVMStarted()
    if jvm_running and not jpype.isThreadAttachedToJVM():
        jpype.attachThreadToJVM()
        # Make the system class loader the context class loader of this thread
        current_java_thread = jpype.java.lang.Thread.currentThread()
        current_java_thread.setContextClassLoader(jpype.java.lang.ClassLoader.getSystemClassLoader())

    connection_url = config['DB2_CONNECTION_STRING']
    credentials = [config["DB2_USERNAME"], config["DB2_PASSWORD"]]

    return jdbc.connect('com.ibm.db2.jcc.DB2Driver', connection_url, credentials, driver_jar)


conn = connect_to_db()
cursor = conn.cursor()

In [None]:
# make sure the local folder for matched parquets exists
parquets_folder = 'matched_buildings'
os.makedirs(parquets_folder, exist_ok=True)

files = os.listdir(parquets_folder)

# list matched parquets from bucket
# A single list_objects_v2 call returns one page only (at most 1000 keys), so
# every page is walked; a page of an empty bucket has no 'Contents' entry.
objects = []
paginator = cos_client.get_paginator('list_objects_v2')
for response in paginator.paginate(Bucket=config["ENERGY_ESTIMATION_BUCKET"]):
    objects.extend(i['Key'] for i in response.get('Contents', []))

In [None]:
# select parquets that need to be downloaded
# (set built once so each membership test is O(1) instead of scanning the list)
already_downloaded = set(files)
objects_to_download = [i for i in objects if i not in already_downloaded]
objects_to_download

In [None]:
# fetch every missing object into the local parquet folder, with a progress bar
download_progress = tqdm(objects_to_download, desc='Downloading', total=len(objects_to_download))
for object_key in download_progress:
    bucket_name = config["ENERGY_ESTIMATION_BUCKET"]
    local_path = os.path.join(parquets_folder, object_key)
    cos_client.download_file(bucket_name, object_key, local_path)

In [None]:
# Serializes status reports: the worker threads all write and upload the same local file
_state_log_lock = threading.Lock()


def log_state_to_bucket(processing_state: dict):
    '''
        Upload the current processing state to the job-status bucket.

        The state is dumped as JSON into a local file, which is then uploaded
        to the bucket named by config["JOB_STATUS_BUCKET"] under the same
        name, replacing the previous report.

        processing_state: dict shared by the worker threads; it is copied
                          before serialization.
    '''

    filename = 'ingesting energy_estimates_Kenya_status.json'

    # Work on a snapshot: other threads may add keys to the shared dict while
    # it is being serialized, which makes the dict iteration fail.
    snapshot = dict(processing_state)

    # Write and upload as one unit so concurrent callers cannot interleave
    # their writes to the shared local file.
    with _state_log_lock:
        with open(filename, "w") as outfile:
            json.dump(snapshot, outfile)

        cos_client.upload_file(
            Filename=filename,
            Bucket=config["JOB_STATUS_BUCKET"],
            Key=filename,
        )

In [None]:
# Inventory of the parquet files available locally
files = os.listdir(parquets_folder)
print(f'Amount of datasets in {parquets_folder} directory is: {len(files)}')

# files.remove(files[0])

# Pair every file name with its size in MiB and order the pairs from smallest to largest
files = sorted(
    ([name, os.path.getsize(os.path.join(parquets_folder, name)) / 1024**2] for name in files),
    key=lambda entry: entry[1],
)

threads_amount = 4

# Ceiling of len(files) / threads_amount
files_in_batch = -(-len(files) // threads_amount)

thread_dfs = dict((thread_idx, []) for thread_idx in range(threads_amount))

# Deal consecutive chunks of `threads_amount` files to the threads in round-robin order
current_thread_id = 0
for file_idx in range(0, len(files), threads_amount):
    # wrap around after the last thread
    current_thread_id %= threads_amount

    chunk = files[file_idx: file_idx + threads_amount]
    thread_dfs[current_thread_id].extend(chunk)
    current_thread_id += 1



In [None]:
# Display the size-sorted [file name, size in MiB] pairs for a visual check
files

In [None]:
# Total size of the first thread's batch (the value is computed but not shown)
sum(size for _, size in thread_dfs[0])

# Per-thread total size of the assigned files, to judge how balanced the split is
a = {thread_idx: sum(size for _, size in batch) for thread_idx, batch in thread_dfs.items()}

# Names of all files that were assigned to some thread, in assignment order
all_parquets = [name for batch in thread_dfs.values() for name, _ in batch]
a

In [None]:
# The sorted assigned names must equal the sorted names of all local files
all_files = sorted(entry[0] for entry in files)
all_parquets.sort()
all_parquets == all_files

In [None]:
# overlap check: print every local file that was not assigned to any thread

assigned_names = set(all_parquets)
for entry in files:
    if entry[0] in assigned_names:
        continue
    print(entry)

In [None]:
def update_DB2_row(cursor, row: dict, table=None):
    '''
    Write the electricity estimates of one building into the country table.

    The target rows are selected by an exact match on LATITUDE and LONGITUDE.
    All values are passed to the driver as bound parameters ("?" markers)
    instead of being pasted into the SQL text, so their content can never be
    interpreted as SQL.

    cursor: DB-API cursor supporting "?" parameter markers
    row: dict = {
                "LATITUDE": "",
                "LONGITUDE": "",
                "ELEC_ACCESS_PERCENT": "",
                "ELEC_CONSUMPTION_KWH_MONTH": "",
                "ELEC_CONSUMPTION_STD_KWH_MONTH": ""
                }
    table: name of the table inside the "USER1" schema; when omitted,
           config["COUNTRY_TABLE"] is used

    Raises KeyError when a required key is missing from ``row``; errors
    raised by ``cursor.execute`` are propagated unchanged.
    '''

    if table is None:
        table = config["COUNTRY_TABLE"]

    # Identifiers cannot be bound as parameters; doubling embedded quotes
    # keeps the name a single quoted identifier.
    quoted_table = '"' + str(table).replace('"', '""') + '"'

    sql = f"""
        UPDATE "USER1".{quoted_table}
            SET
              "ELEC_ACCESS_PERCENT" = ?,
              "ELEC_CONSUMPTION_KWH_MONTH" = ?,
              "ELEC_CONSUMPTION_STD_KWH_MONTH" = ?
            WHERE
                (LATITUDE = ?) AND
                (LONGITUDE = ?)
        """

    # Order must follow the order of the "?" markers above
    params = (
        row['ELEC_ACCESS_PERCENT'],
        row['ELEC_CONSUMPTION_KWH_MONTH'],
        row['ELEC_CONSUMPTION_STD_KWH_MONTH'],
        row['LATITUDE'],
        row['LONGITUDE'],
    )

    cursor.execute(sql, params)

In [None]:
def process_thread_dataframes(df_names, start_from, processing_state, cursor, thread_id):
    '''
        Worker body: push the estimates of every parquet in ``df_names`` to DB2.

        df_names: names of the parquet files (inside ``parquets_folder``)
                  assigned to this thread
        start_from: index of the first dataframe to process; earlier ones are
                    skipped (presumably to resume an interrupted run)
        processing_state: shared dict; this thread reports its progress under
                          the key 'processed_dfs_in_thread_<thread_id>'
        cursor: cursor used for the updates; after a failed update it is
                replaced by a fresh cursor from the global ``conn`` and the
                update is retried once
        thread_id: number of this thread; it also sets the start-up delay, and
                   only thread 0 uploads the state after each dataframe

        An empty ``df_names`` is valid: the thread then reports
        'Processed 0 of 0' and finishes.
    '''

    state_key = f'processed_dfs_in_thread_{thread_id}'
    total_dfs = len(df_names)

    print(f'Starting thread_id: {thread_id}')
    time.sleep(thread_id+1)

    updated_buildings_in_thread = 0

    for df_idx, df_name in enumerate(df_names):

        if df_idx < start_from:
            continue

        print(f'thread_id: {thread_id} processing df: {df_idx}')
        time.sleep(thread_id+1)

        df = pd.read_parquet(os.path.join(parquets_folder, df_name))
        # Column names with spaces and brackets cannot be read as tuple attributes
        df = df.rename(
                columns={
                    'elec access (%)': 'elec_access',
                    'cons (kWh/month)': 'elec_cons',
                    'std cons (kWh/month)': 'elec_std_cons',
                })

        for row in df.itertuples():

            item = {
                    "LATITUDE": row.latitude,
                    "LONGITUDE": row.longitude,
                    "ELEC_ACCESS_PERCENT": round(row.elec_access, 5),
                    "ELEC_CONSUMPTION_KWH_MONTH": round(row.elec_cons, 5),
                    "ELEC_CONSUMPTION_STD_KWH_MONTH": round(row.elec_std_cons, 5),
                    }

            try:
                update_DB2_row(cursor, item)

            except Exception as e:
                print(f'Thread id {thread_id} exception occured: {e}')
                print(traceback.format_exc())
                # One retry on a fresh cursor; a second failure ends the thread
                cursor = conn.cursor()
                update_DB2_row(cursor, item)

            updated_buildings_in_thread += 1

        processing_state[state_key] = f'Processed {df_idx + 1} of {total_dfs} | {round(100*(df_idx + 1)/total_dfs, 2)}% | Updated: {updated_buildings_in_thread}'
        if thread_id == 0:

            # Status upload is best effort and must not stop the ingestion
            try:
                time.sleep(1)
                log_state_to_bucket(processing_state)
            except Exception as le:
                print(le)

    # The loop variable does not exist when df_names is empty, and the
    # percentage would divide by zero, so the final report uses the list length.
    finished_percent = round(100*total_dfs/total_dfs, 2) if total_dfs else 0.0
    processing_state[state_key] = f'Processed {total_dfs} of {total_dfs} | {finished_percent}% | Updated: {updated_buildings_in_thread} | THREAD FINISHED'

    try:
        log_state_to_bucket(processing_state)
    except Exception as le:
        print(le)

In [None]:

# Progress report shared by all worker threads
processing_state = {}

threads = []

# Index of the first dataframe each thread should process
start_dfs = [
    0, # 0 thread
    0, # 1 thread
    0, # 2 thread
    0  # 3 thread
]

for position, (thread_id, df_names) in enumerate(thread_dfs.items()):

    # A thread without an entry in start_dfs starts from its first dataframe
    # instead of being silently left out.
    start_df_id = start_dfs[position] if position < len(start_dfs) else 0

    df_names = [i[0] for i in df_names]

    # One cursor per thread: a DB-API cursor carries per-statement state and
    # is not meant to be driven by several threads at once.
    thread_cursor = conn.cursor()

    thread = threading.Thread(target=process_thread_dataframes, args=(df_names, start_df_id, processing_state, thread_cursor, thread_id, ))
    threads.append(thread)

for thread in threads:
    thread.start()

# Wait until every worker has finished
for thread in threads:
    thread.join()