Imports & Env

In [None]:
import datetime
import os
from pymongo import MongoClient, collection
import mysql.connector
from mysql.connector import Error
from mysql.connector.connection import MySQLConnection
from mysql.connector.cursor import MySQLCursor
from mysql.connector.cursor_cext import CMySQLCursor
import pandas as pd
%load_ext dotenv
%dotenv

Load ENV

In [None]:
# Connection settings are read from environment variables; each name is
# None when its variable is not set.
host = os.environ.get("DBHost")            # database server host name
mongoPort = os.environ.get("mongoPort")    # MongoDB port (kept as a string)
mySQLPort = os.environ.get("mySQLPort")    # MySQL port (kept as a string)
user = os.environ.get("DBUser")            # login shared by both databases
password = os.environ.get("DBPass")        # password shared by both databases

Define methods for DB connections

In [None]:
def DBTGetMongoConnection(database, collection):
    """Return a collection from the configured MongoDB server.

    The connection is built from the module-level ``user``, ``password``,
    ``host`` and ``mongoPort`` settings.

    Args:
        database: Name of the MongoDB database to open.
        collection: Name of the collection inside that database.

    Returns:
        The pymongo collection object for ``collection``.

    Raises:
        LookupError: If ``collection`` does not exist in ``database``.
    """
    from urllib.parse import quote_plus

    # The same raw credentials are handed to MySQL unescaped, so they are
    # stored unescaped; inside a MongoDB URI they must be percent-escaped or
    # characters such as "@", ":" and "/" corrupt the connection string.
    uri = f"mongodb://{quote_plus(user)}:{quote_plus(password)}@{host}:{mongoPort}"
    client = MongoClient(uri)

    db = client[database]
    if collection not in db.list_collection_names():
        # Raise (instead of returning the exception class) so callers fail
        # here with a clear message rather than later on an unrelated
        # attribute error; release the client since nothing will use it.
        client.close()
        raise LookupError(
            f"Collection '{collection}' not found in database '{database}'"
        )
    return db[collection]

def DBTGetMySQLConnection(database):
    """Open a MySQL connection to ``database`` using the module-level settings.

    Args:
        database: Name of the MySQL schema to connect to.

    Returns:
        The connection when it reports itself as connected. ``None`` when the
        connector raises ``Error`` (the error is printed) or when the new
        connection is not connected.
    """
    settings = {
        "host": host,
        "port": mySQLPort,
        "database": database,
        "user": user,
        "password": password,
    }
    try:
        mysqlConnection = mysql.connector.connect(**settings)
        if mysqlConnection.is_connected():
            return mysqlConnection
    except Error as connectError:
        # Best effort: report the failure and fall through to return None.
        print("Error while connecting to MySQL", connectError)
    return None

Define SQL helpers

In [None]:
# Gets all items from the selected table
def DBTGetAllItemsInSQLTable(cursor, table):
    """Load every row of ``table`` into a DataFrame.

    Args:
        cursor: An open database cursor used to run the query.
        table: Name of the table to read; it is interpolated into the SQL
            text, so it must be a trusted identifier.

    Returns:
        A DataFrame holding all fetched rows, with columns named after the
        first field of each entry in ``cursor.description``.
    """
    cursor.execute(f"SELECT * FROM {table}")
    rows = cursor.fetchall()
    # The first field of every description entry is the column name.
    header = [name for name, *_ in cursor.description]
    return pd.DataFrame(rows, columns=header)

# ReEncoder to fix some encoding errors in the database causing mismatches with the kadaster dataset
def DBTReEncodeAddresses(dataframe: pd.DataFrame, columnName):
    """Repair UTF-8 text that was mis-decoded as Latin-1 in one column.

    Each value is converted to ``str``, encoded as Latin-1 (characters that
    Latin-1 cannot represent are dropped) and decoded again as UTF-8. Values
    whose bytes are not valid UTF-8 are left unchanged instead of aborting
    the whole column; this is the case for text that already contains
    correctly decoded accented characters.

    Args:
        dataframe: Frame to fix; it is modified in place.
        columnName: Name of the column to re-encode.

    Returns:
        The same ``dataframe`` object, with ``columnName`` replaced.
    """
    def _repair(text: str) -> str:
        try:
            return text.encode("latin-1", "ignore").decode("utf8")
        except UnicodeDecodeError:
            # Not a mis-decoded value: keep the text as it is.
            return text

    dataframe[columnName] = dataframe[columnName].astype(str).apply(_repair)
    return dataframe

# Formatter to make addresses in the SQL database compatible with the kadaster dataset
def DBTConcatMySQLAddresses(dataframe: pd.DataFrame):
    """Add an ``adres`` column built from the separate address parts.

    The value is ``"<straat> <huisnummer><HUISLETTER>[ <huisnummertoevoeging>]"``:
    the house letter is upper-cased and appended directly after the number,
    and the addition is appended after a single space. Missing letters or
    additions (``None`` or NaN) contribute nothing.

    Args:
        dataframe: Frame with ``straat``, ``huisnummer``, ``huisletter`` and
            ``huisnummertoevoeging`` columns; it is modified in place.

    Returns:
        The same ``dataframe`` object, with the ``adres`` column set.
    """
    def _letter(value):
        # pd.isna covers both None and NaN; str() guards non-string values.
        return "" if pd.isna(value) else str(value).upper()

    def _addition(value):
        return "" if pd.isna(value) else " " + str(value)

    dataframe["adres"] = (
        dataframe["straat"].astype(str)
        + " "
        + dataframe["huisnummer"].astype(str)
        + dataframe["huisletter"].apply(_letter)
        + dataframe["huisnummertoevoeging"].apply(_addition)
    )
    return dataframe


def DBTSaveDFToSQL(dataframe: pd.DataFrame, connection: "MySQLConnection", table_name: str, primary_key_column: str) -> None:
    """Insert new DataFrame rows into a MySQL table and fill empty fields.

    Steps:
      1. Create ``table_name`` (all columns ``TEXT``) when it does not exist.
      2. Add ``primary_key_column`` as an auto-increment key when the table
         lacks that column.
      3. For every DataFrame row, look the row up by its primary key value.
         Unknown keys are inserted; for known keys only the columns that are
         ``NULL`` in the table are filled from the DataFrame.

    Only columns present in both the DataFrame and the table are written.
    All changes are committed together at the end; on any error the pending
    changes are rolled back and the exception is re-raised.

    NOTE: ``table_name``, ``primary_key_column`` and the DataFrame column
    names are interpolated into the SQL text as identifiers, so they must
    come from a trusted source.

    Args:
        dataframe: Rows to store; must contain ``primary_key_column``.
        connection: Open MySQL connection.
        table_name: Target table.
        primary_key_column: Column used to match DataFrame rows to table rows.
    """
    cursor = connection.cursor()
    try:
        # fetchall() (rather than fetchone()) drains each result set, so a
        # query matching several rows cannot leave unread results behind.
        cursor.execute("SHOW TABLES LIKE %s", (table_name,))
        if not cursor.fetchall():
            column_defs = ', '.join(f'{column} TEXT' for column in dataframe.columns)
            cursor.execute(
                f"CREATE TABLE {table_name} ({column_defs}, PRIMARY KEY ({primary_key_column}(255)));"
            )

        cursor.execute(f"SHOW COLUMNS FROM {table_name} WHERE Field = %s", (primary_key_column,))
        if not cursor.fetchall():
            cursor.execute(
                f"ALTER TABLE {table_name} ADD COLUMN {primary_key_column} INT PRIMARY KEY AUTO_INCREMENT;"
            )

        # Column order of the table; "SELECT *" returns values in this order.
        cursor.execute(f"SHOW COLUMNS FROM {table_name}")
        db_columns = [column[0] for column in cursor.fetchall()]

        # Loop-invariant pieces are built once.
        shared_columns = [column for column in dataframe.columns if column in db_columns]
        placeholders = ', '.join(['%s'] * len(shared_columns))
        insert_query = f"INSERT INTO {table_name} ({', '.join(shared_columns)}) VALUES ({placeholders})"
        select_query = f"SELECT * FROM {table_name} WHERE {primary_key_column} = %s"

        for _, row in dataframe.iterrows():
            primary_key_value = _to_sql_value(row[primary_key_column])

            cursor.execute(select_query, (primary_key_value,))
            matches = cursor.fetchall()

            if not matches:
                cursor.execute(
                    insert_query,
                    tuple(_to_sql_value(row[column]) for column in shared_columns),
                )
                continue

            # Pair stored values with the table's own column names; the
            # DataFrame's column positions say nothing about table order.
            stored = dict(zip(db_columns, matches[0]))
            updates = []
            for column in shared_columns:
                if column == primary_key_column or stored[column] is not None:
                    continue
                new_value = _to_sql_value(row[column])
                if new_value is not None:  # nothing to fill in otherwise
                    updates.append((column, new_value))

            if updates:
                set_values = ', '.join(f"{column} = %s" for column, _ in updates)
                update_query = f"UPDATE {table_name} SET {set_values} WHERE {primary_key_column} = %s"
                cursor.execute(
                    update_query,
                    tuple([value for _, value in updates] + [primary_key_value]),
                )

        connection.commit()
    except Exception:
        # Do not leave a half-written batch pending on the connection.
        connection.rollback()
        raise
    finally:
        cursor.close()


def _to_sql_value(value):
    """Convert a pandas/numpy scalar into a value the MySQL driver can bind.

    Missing values (None, NaN, NaT) become ``None`` so they are stored as
    NULL; numpy scalars are unwrapped to plain Python objects.
    """
    if pd.api.types.is_scalar(value):
        if pd.isna(value):
            return None
        if hasattr(value, "item"):
            return value.item()
    return value



Get initial data from the data warehouse

In [None]:
# Fetch every document of the kadaster collection and load it into a DataFrame.
mongoConnection = DBTGetMongoConnection(
    database="kadasterdata_nl",
    collection="kadasterdata",
)
kadasterDocuments = mongoConnection.find({})  # empty filter: all documents
kadasterData = pd.DataFrame(kadasterDocuments)
# Bare expression so the notebook displays the frame.
kadasterData



Get initial data from MySQL

In [None]:
SQLConnection = DBTGetMySQLConnection("dbDEDSv2")
# Use a named cursor so it can be closed once the table has been read.
locatieCursor = SQLConnection.cursor()
try:
    locatieData = DBTGetAllItemsInSQLTable(locatieCursor, "Locatie")
finally:
    locatieCursor.close()
# Use the reEncoder to fix some encoding errors in the "straat" column and format the loose address parts into one column called "adres"
locatieData = DBTConcatMySQLAddresses(DBTReEncodeAddresses(locatieData, "straat"))
# Both helpers modify the frame in place and return it, so the two spellings
# refer to the same object; the capitalised name is kept for any cell that
# already uses it.
LocatieData = locatieData

# SQLConnection = DBTGetMySQLConnection("Test")

# DBTSaveDFToSQL(locatieData,SQLConnection,"Locatie","id")



Merge the dataset and discard unused rows

In [None]:
# Remove the rows for this one address from the kadaster data, in place.
isUnusedAddress = kadasterData["address"] == "Van Beverningkstraat 103"
unusedRowLabels = kadasterData.index[isUnusedAddress]
kadasterData.drop(unusedRowLabels, inplace=True)
