# - Data Warehouse -

### Het samenvoegen van bestaande tabellen voor het uiteindelijke 'Datawarehouse'

##### Importeren van benodigde dependencies

In [1]:
import pandas as pd
import pyodbc

import warnings
warnings.filterwarnings("ignore")

# Importeren van de create_connection en run_query functies uit de database_utils.py file
from utils.database_utils import create_connection, run_query, create_connectionLocal

#### 1 - Opbouwen van dataframes voor elke database-tabel

In [2]:
# Load the employee table of each source database into its own DataFrame.
# run_query is called with the SQL text followed by the database name.
aenc_employee_df = run_query("SELECT * FROM employee", "AenC")

# NOTE(review): the recorded output of this cell shows this query failing with
# "Invalid object name 'EmployeeData'" -- confirm that the table exists in
# AdventureWorks2019 (and under which schema) before relying on this frame.
aw_employee_df = run_query("SELECT * FROM EmployeeData", "AdventureWorks2019")

nw_employee_df = run_query("SELECT * FROM Employees", "Northwind")

DatabaseError: Execution failed on sql 'SELECT * FROM EmployeeData': ('42S02', "[42S02] [Microsoft][ODBC SQL Server Driver][SQL Server]Invalid object name 'EmployeeData'. (208) (SQLExecDirectW)")

Je kunt vergelijkbare queries uitvoeren voor andere tabellen die je wilt opnemen.

#### 2 - Data transformatie

Na het opbouwen van de dataframes voor elke database-tabel, kunnen we beginnen met het transformeren van de data. Dit omvat het samenvoegen van tabellen, het toepassen van filters, het uitvoeren van berekeningen, enzovoort.

In [None]:
# Give every source frame a 1-based, sequential surrogate key column (IDSK).
for source_frame in (aenc_employee_df, aw_employee_df, nw_employee_df):
    source_frame['IDSK'] = range(1, len(source_frame) + 1)

In [3]:
# Add an auto-incrementing integer column, employee_id, to the EmployeeData table.
# NOTE(review): the statement is not guarded against the column already
# existing, so re-running this cell is expected to fail -- confirm and run once.
alter_table_query = """
ALTER TABLE EmployeeData
ADD employee_id INT Identity(1,1);
"""

db_name = "adventureworks2019"
conn, cursor = create_connectionLocal(db_name)
try:
    cursor.execute(alter_table_query)
    conn.commit()
finally:
    # Release the connection even when the statement or the commit fails.
    conn.close()

TypeError: create_connection() takes 0 positional arguments but 1 was given

In [None]:
# DDL for the warehouse target table that receives the merged employee rows.
# The column list carries no trailing comma after the last column, which keeps
# the statement valid standard SQL.
create_table_query = """
CREATE TABLE EmployeeData (
    IDSK INT PRIMARY KEY,
    employee_id VARCHAR(255),
    fname VARCHAR(255),
    lname VARCHAR(255),
    employee_title VARCHAR(255),
    birthdate DATE,
    address VARCHAR(255),
    city VARCHAR(255),
    phone_number VARCHAR(255),
    hiredate DATE,
    department_id VARCHAR(255)
);
"""

# Create the table in SQL Server
# NOTE(review): a recorded cell output in this notebook reports
# "create_connection() takes 0 positional arguments but 1 was given" --
# confirm that the helper accepts a database name.
db_name = "test"
conn, cursor = create_connection(db_name)
try:
    cursor.execute(create_table_query)
    conn.commit()
finally:
    # Release the connection even when the statement or the commit fails.
    conn.close()

In [None]:
# Select the relevant columns from each table.
# .copy() turns every selection into an independent frame, so adding the IDSK
# column below never writes into a slice of the original query result.
nw_employee_df = nw_employee_df[['EmployeeID', 'LastName', 'FirstName', 'Title', 'BirthDate', 'HireDate', 'Address', 'City', 'HomePhone']].copy()
aenc_employee_df = aenc_employee_df[['emp_id', 'emp_fname', 'emp_lname', 'dept_id', 'street','phone', 'start_date', 'birth_date', 'sex']].copy()
aw_employee_df = aw_employee_df[['BusinessEntityID', 'NationalIDNumber', 'JobTitle', 'BirthDate', 'Gender', 'HireDate', 'DepartmentID', 'employee_id']].copy()

# Add a new surrogate key to each dataframe (the column selection above
# dropped any IDSK column that was added earlier).
nw_employee_df['IDSK'] = range(1, len(nw_employee_df) + 1)
aenc_employee_df['IDSK'] = range(1, len(aenc_employee_df) + 1)
aw_employee_df['IDSK'] = range(1, len(aw_employee_df) + 1)

# Merge the dataframes on the new key.
# NOTE(review): IDSK is only the row position within each source, so these
# outer merges place row k of every source on the same output row; they do not
# match employees by identity -- confirm this is the intended combination.
merged_df = pd.merge(aenc_employee_df, aw_employee_df, on="IDSK", how="outer")
merged_df = pd.merge(merged_df, nw_employee_df, on="IDSK", how="outer")

# Reset the index and start from 1
merged_df.reset_index(drop=True, inplace=True)
merged_df.index = merged_df.index + 1

# Name the index 'IDSK' (the frame also keeps the 'IDSK' merge-key column)
merged_df.index.name = 'IDSK'

merged_df.columns

In [None]:
# Coalesce columns that carry the same attribute in the different sources.
# Every target column takes the first non-null value of its source columns,
# tried in the listed order.
coalesce_plan = {
    'EmployeeID': ('EmployeeID', 'emp_id', 'employee_id'),
    'FirstName': ('FirstName', 'emp_fname'),
    'LastName': ('LastName', 'emp_lname'),
    'Address': ('Address', 'street'),
    'HomePhone': ('HomePhone', 'phone'),
    'BirthDate': ('BirthDate_x', 'BirthDate_y', 'birth_date'),
    'HireDate': ('HireDate_x', 'HireDate_y', 'start_date'),
    'EmployeeTitle': ('JobTitle', 'Title'),
    'DepartmentID': ('DepartmentID', 'dept_id'),
}
for target_column, (primary, *fallbacks) in coalesce_plan.items():
    coalesced = merged_df[primary]
    for fallback in fallbacks:
        coalesced = coalesced.combine_first(merged_df[fallback])
    merged_df[target_column] = coalesced

# Remove the source columns that have been folded into the columns above
columns_to_drop = [
    'emp_id', 'emp_fname', 'emp_lname', 'employee_id',
    'phone', 'street', 'birth_date', 'start_date',
    'BirthDate_x', 'BirthDate_y', 'HireDate_x', 'HireDate_y',
]
merged_df.drop(columns=columns_to_drop, inplace=True)

# Inspect the resulting columns and the first rows
print(merged_df.columns)
print(merged_df.head())

In [None]:
# Replace missing values (NaN/NaT) with None so they can be stored as SQL NULL.
# The cast to object comes first because numeric and datetime columns cannot
# hold None: without it pandas may keep, or coerce back to, NaN/NaT.
merged_df = merged_df.astype(object).where(merged_df.notna(), None)

#### 3 - Data loading

Na het transformeren van de data, kunnen we de resulterende dataframe in de doeltabel van ons datawarehouse laden.

In [None]:
# Load the merged employee rows into the EmployeeData warehouse table.
# The statement uses "?" parameter markers instead of formatting values into
# the SQL text: values containing quotes no longer break the statement, and
# missing values are stored as real NULLs rather than the string 'None'.
insert_query = """
    INSERT INTO EmployeeData (
        IDSK, employee_id, fname, lname, employee_title, birthdate, address, city, phone_number, hiredate, department_id
    )
    VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
"""

# DataFrame columns in the same order as the target columns of insert_query.
# EmployeeTitle and DepartmentID are the coalesced columns built during the
# transformation step; the single-source 'Title' and 'dept_id' columns would
# leave the other sources' values out.
source_columns = [
    'IDSK', 'EmployeeID', 'FirstName', 'LastName', 'EmployeeTitle', 'BirthDate',
    'Address', 'City', 'HomePhone', 'HireDate', 'DepartmentID',
]

# Connect to the database
conn, cursor = create_connection('test')
try:
    for _, row in merged_df.iterrows():
        # Map any remaining NaN/NaT to None so the driver sends NULL.
        params = [
            None if pd.isna(row[column]) else row[column]
            for column in source_columns
        ]
        cursor.execute(insert_query, params)

    # Commit only after every row was inserted successfully
    conn.commit()
finally:
    # Release the connection even when an insert fails.
    conn.close()

**Note:** Voeg zoveel Markdown- of codeblokken toe als nodig is.

#### 4 - Data Quality Checks

Voeg controles toe om de kwaliteit van de gegevens te waarborgen voordat ze worden geladen in het datawarehouse:

In [None]:
# Count the missing values per column
missing_values = merged_df.isna().sum()

# Count the rows that are exact duplicates of an earlier row
duplicate_rows = merged_df.duplicated().sum()

# Report both results
for label, result in (
    ("Aantal ontbrekende waarden:", missing_values),
    ("Aantal duplicaten:", duplicate_rows),
):
    print(label, result)

**Note:** Dit is optioneel, maar het leek mij op zich best handig om te doen.