In [46]:
import pyodbc
import sqlite3
import pandas as pd

export_conn = pyodbc.connect(
    'DRIVER={ODBC Driver 17 for SQL Server};'
    'SERVER=(localdb)\\MSSQLLocalDB;'
    'DATABASE=projectdatawarehouse;'
    'Trusted_Connection=yes;'
)

export_cursor = export_conn.cursor()

import_conn = pyodbc.connect(
    'DRIVER={ODBC Driver 17 for SQL Server};'
    'SERVER=(localdb)\\MSSQLLocalDB;'
    'DATABASE=projectsourcedatamodel;'
    'Trusted_Connection=yes;'
)

import_cursor = import_conn.cursor()

In [34]:
def clear_tables():
    tables = [
        'Fact_EmployeePerformance',
        'Fact_Sales',
        'Fact_Purchase',
        'Dim_Employee',
        'Dim_Location',
        'Dim_DateTime',
        'Dim_Product',
        'Dim_Customer'
    ]

    for table in tables:
        export_cursor.execute(f"DELETE FROM {table}")
        export_conn.commit()

    print("Alle tabellen zijn geleegd.")

clear_tables()

Alle tabellen zijn geleegd.


In [35]:
def move_dimcustomer(import_conn, export_conn):
    customer = pd.read_sql_query("SELECT CustomerID, Fname, Lname, Phone, Address, City, Region, State, Country FROM Customer", import_conn)
    customer_customer_demo = pd.read_sql_query("SELECT CustomerID, CustomerTypeID FROM CustomerCustomerDemo", import_conn)
    customer_demographics = pd.read_sql_query("SELECT CustomerTypeID, CustomerDesc FROM CustomerDemographics", import_conn)

    merged_df = customer.merge(customer_customer_demo, on='CustomerID', how='left')
    merged_df = merged_df.merge(customer_demographics, on='CustomerTypeID', how='left')

    merged_df['FullName'] = merged_df['Fname'] + ' ' + merged_df['Lname']
    merged_df['Region'] = merged_df['Region'].combine_first(merged_df['State']) 
    
    final_df = merged_df[[
        'CustomerID', 
        'FullName', 
        'CustomerDesc', 
        'Phone', 
        'Address', 
        'City', 
        'Region', 
        'Country'
    ]].rename(columns={'CustomerDesc': 'CustomerType'})

    cursor = export_conn.cursor()
    
    for index, row in final_df.iterrows():
        try:
            query = '''
                INSERT INTO Dim_Customer 
                (CustomerID, FullName, CustomerType, PhoneNumber, Address, City, Region, Country) 
                VALUES (?, ?, ?, ?, ?, ?, ?, ?)
            '''
            cursor.execute(query, tuple(row.fillna('')))
        except Exception as e:
            print(f"Fout bij invoegen customer {row['CustomerID']}: {e}")
    
    export_conn.commit()
    print(f"{len(final_df)} customers verwerkt in Dim_Customer")

move_dimcustomer(import_conn, export_conn)

  customer = pd.read_sql_query("SELECT CustomerID, Fname, Lname, Phone, Address, City, Region, State, Country FROM Customer", import_conn)
  customer_customer_demo = pd.read_sql_query("SELECT CustomerID, CustomerTypeID FROM CustomerCustomerDemo", import_conn)
  customer_demographics = pd.read_sql_query("SELECT CustomerTypeID, CustomerDesc FROM CustomerDemographics", import_conn)


217 customers verwerkt in Dim_Customer


In [36]:
def add_sales_customer_ids(import_conn, export_conn):
    
    sales_customers = pd.read_sql_query("SELECT CustomerID FROM Sales_Customer", import_conn)
    
    existing_customers = pd.read_sql_query("SELECT CustomerID FROM Dim_Customer", export_conn)

    new_customers = sales_customers[~sales_customers['CustomerID'].isin(existing_customers['CustomerID'])]
    
    cursor = export_conn.cursor()
    success_count = 0
    
    for customer_id in new_customers['CustomerID']:
        try:
            
            query = '''
                INSERT INTO Dim_Customer 
                (CustomerID) 
                VALUES (?)
            '''
            cursor.execute(query, (str(customer_id),))  
            success_count += 1
        except Exception as e:
            
            pass  
    
    export_conn.commit()
    print(f"{success_count} nieuwe CustomerID's toegevoegd aan Dim_Customer")
    print(f"{len(sales_customers) - success_count} CustomerID's bestonden al en zijn overgeslagen")

add_sales_customer_ids(import_conn, export_conn)

  sales_customers = pd.read_sql_query("SELECT CustomerID FROM Sales_Customer", import_conn)
  existing_customers = pd.read_sql_query("SELECT CustomerID FROM Dim_Customer", export_conn)


19694 nieuwe CustomerID's toegevoegd aan Dim_Customer
126 CustomerID's bestonden al en zijn overgeslagen


In [37]:
def move_dimemployee():
    import_cursor.execute("""
        SELECT 
            e.EmployeeID,
            e.FirstName + ' ' + e.LastName AS FullName,
            COALESCE(e.JobTitle, e.Title, 'Onbekend') AS JobTitle,
            d.dept_name AS Department,
            e.HireDate,
            e.BirthDate,
            NULL AS EmailAddress,
            NULL AS ManagerID  
        FROM Employee e
        LEFT JOIN Department d ON e.DepartmentID = d.dept_id
    """)
    
    employees = import_cursor.fetchall()
    
    for employee in employees:
        export_cursor.execute("""
            INSERT INTO Dim_Employee (
                EmployeeID,
                FullName,
                JobTitle,
                Department,
                HireDate,
                BirthDate,
                EmailAddress,
                ManagerID
            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?)
        """, employee)
    
    import_cursor.execute("""
        SELECT e.EmployeeID, e.manager_id
        FROM Employee e
        WHERE e.manager_id IS NOT NULL
    """)
    
    manager_pairs = import_cursor.fetchall()
    
    for emp_id, manager_id in manager_pairs:
        export_cursor.execute("""
            UPDATE Dim_Employee
            SET ManagerID = ?
            WHERE EmployeeID = ?
            AND EXISTS (SELECT 1 FROM Dim_Employee WHERE EmployeeID = ?)  
        """, (manager_id, emp_id, manager_id))
    
    export_conn.commit()
    print(f"{len(employees)} employees imported into Dim_Employee")

move_dimemployee()

351 employees imported into Dim_Employee


In [38]:
def move_dimproduct(import_conn, export_conn):
    product = pd.read_sql_query("""
        SELECT 
            ProductID, 
            Name AS ProductName,
            Color,
            StandardCost,
            ListPrice,
            ProductLine,
            Discontinued,
            CategoryID
        FROM Product
    """, import_conn)
    
    category = pd.read_sql_query("""
        SELECT 
            CategoryID, 
            CategoryName AS ProductCategoryName
        FROM Category
    """, import_conn)
    
    merged_df = product.merge(category, on='CategoryID', how='left')
    
    merged_df['DiscontinuedDate'] = merged_df['Discontinued'].apply(
        lambda x: pd.Timestamp.today().date() if x == 1 else None
    )

    final_df = merged_df[[
        'ProductID', 
        'ProductName', 
        'ProductCategoryName', 
        'Color', 
        'StandardCost', 
        'ListPrice', 
        'ProductLine', 
        'DiscontinuedDate'
    ]]
 
    cursor = export_conn.cursor()
    
    for index, row in final_df.iterrows():
        try:
            query = '''
                INSERT INTO Dim_Product 
                (ProductID, ProductName, ProductCategoryName, Color, 
                 StandardCost, ListPrice, ProductLine, DiscontinuedDate) 
                SELECT ?, ?, ?, ?, ?, ?, ?, ?
                WHERE NOT EXISTS (
                    SELECT 1 FROM Dim_Product WHERE ProductID = ?
                )
            '''
            values = tuple(row.fillna('') if isinstance(row, pd.Series) else row) + (row['ProductID'],)
            cursor.execute(query, values)
        except Exception as e:
            print(f"Fout bij invoegen product {row['ProductID']}: {e}")
    
    export_conn.commit()
    print(f"{len(final_df)} products verwerkt in Dim_Product")

# Roep de functie aan
move_dimproduct(import_conn, export_conn)

  product = pd.read_sql_query("""
  category = pd.read_sql_query("""


585 products verwerkt in Dim_Product


In [39]:

def move_dimlocation(import_conn, export_conn):
    customer = pd.read_sql_query("SELECT CustomerID, Address AS StreetAddress, City, State FROM Customer", import_conn)
    state = pd.read_sql_query("SELECT State_id, State_name AS StateProvince, Country FROM State", import_conn)
    order = pd.read_sql_query("SELECT OrderID, CustomerID, EmployeeID FROM [Order]", import_conn)
    employee = pd.read_sql_query("SELECT EmployeeID, PostalCode FROM Employee", import_conn)
    emp_terr = pd.read_sql_query("SELECT EmployeeID, TerritoryID FROM EmployeeTerritories", import_conn)
    territories = pd.read_sql_query("SELECT TerritoryID, RegionID FROM Territories", import_conn)
    region = pd.read_sql_query("SELECT RegionID, RegionDescription AS Region FROM Region", import_conn)

    df = customer.merge(state, left_on='State', right_on='State_id', how='left') \
                 .merge(order, on='CustomerID', how='inner') \
                 .merge(employee, on='EmployeeID', how='inner') \
                 .merge(emp_terr, on='EmployeeID', how='inner') \
                 .merge(territories, on='TerritoryID', how='inner') \
                 .merge(region, on='RegionID', how='inner')

    final_df = df[[
        'StreetAddress', 
        'City', 
        'StateProvince', 
        'Region', 
        'Country', 
        'PostalCode'
    ]].drop_duplicates()

    cursor = export_conn.cursor()

    for index, row in final_df.iterrows():
        try:
            query = '''
                INSERT INTO Dim_Location 
                (StreetAddress, City, StateProvince, Region, Country, PostalCode)
                VALUES (?, ?, ?, ?, ?, ?)
            '''
            cursor.execute(query, tuple(row.fillna('')))
        except Exception as e:
            print(f"Fout bij invoegen customer {row['LocationID']}: {e}")

    export_conn.commit()
    print(f"{len(final_df)} locaties verwerkt in Dim_Location")

move_dimlocation(import_conn, export_conn)


  customer = pd.read_sql_query("SELECT CustomerID, Address AS StreetAddress, City, State FROM Customer", import_conn)
  state = pd.read_sql_query("SELECT State_id, State_name AS StateProvince, Country FROM State", import_conn)
  order = pd.read_sql_query("SELECT OrderID, CustomerID, EmployeeID FROM [Order]", import_conn)
  employee = pd.read_sql_query("SELECT EmployeeID, PostalCode FROM Employee", import_conn)
  emp_terr = pd.read_sql_query("SELECT EmployeeID, TerritoryID FROM EmployeeTerritories", import_conn)
  territories = pd.read_sql_query("SELECT TerritoryID, RegionID FROM Territories", import_conn)
  region = pd.read_sql_query("SELECT RegionID, RegionDescription AS Region FROM Region", import_conn)


464 locaties verwerkt in Dim_Location


In [40]:
def move_datetime(import_conn, export_conn):
    # Haal datumdata op uit alle relevante bronnen
    df_salesorderdate = pd.read_sql_query("""
        SELECT 
            OrderDate AS Date,
            YEAR(OrderDate) AS Year,
            DATEPART(QUARTER, OrderDate) AS Quarter,
            MONTH(OrderDate) AS Month,
            DAY(OrderDate) AS Day
        FROM Sales_SalesOrderHeader
    """, import_conn)
    
    df_purchaseorderdate = pd.read_sql_query("""
        SELECT 
            OrderDate AS Date,
            YEAR(OrderDate) AS Year,
            DATEPART(QUARTER, OrderDate) AS Quarter,
            MONTH(OrderDate) AS Month,
            DAY(OrderDate) AS Day
        FROM Purchasing_PurchaseOrderHeader
    """, import_conn)
    
    df_bonusdate = pd.read_sql_query("""
        SELECT 
            Bonus_date AS Date,
            YEAR(Bonus_date) AS Year,
            DATEPART(QUARTER, Bonus_date) AS Quarter,
            MONTH(Bonus_date) AS Month,
            DAY(Bonus_date) AS Day
        FROM Bonus
    """, import_conn)

    # Combineer alle datums en verwijder duplicaten
    unique_dates = pd.concat([df_salesorderdate, df_purchaseorderdate, df_bonusdate]).drop_duplicates(subset=['Date'])
    
    cursor = export_conn.cursor()

    for index, row in unique_dates.iterrows():
        try:
            query = '''
            MERGE INTO Dim_DateTime AS target
            USING (SELECT ? AS Date, ? AS Year, ? AS Quarter, ? AS Month, ? AS Day) AS source
            ON target.Date = source.Date
            WHEN NOT MATCHED THEN
                INSERT (Date, Year, Quarter, Month, Day)
                VALUES (source.Date, source.Year, source.Quarter, source.Month, source.Day);
            '''
            cursor.execute(query, (row['Date'], row['Year'], row['Quarter'], row['Month'], row['Day']))
        except Exception as e:
            print(f"Fout bij invoegen datum {row['Date']}: {e}")
    
    export_conn.commit()
    print(f"{len(unique_dates)} unieke datums verwerkt in Dim_DateTime")

# Roep de functie aan
move_datetime(import_conn, export_conn)

  df_salesorderdate = pd.read_sql_query("""
  df_purchaseorderdate = pd.read_sql_query("""
  df_bonusdate = pd.read_sql_query("""


1424 unieke datums verwerkt in Dim_DateTime


In [41]:
def move_feitsales(import_conn, export_conn):
    header = pd.read_sql_query("""
        SELECT 
            SalesOrderID,
            OrderDate AS SaleDateID,
            CustomerID,
            NULL AS LocationID,
            Freight,
            TaxAmt,
            TotalDue
        FROM Sales_SalesOrderHeader
    """, import_conn)

    detail = pd.read_sql_query("""
        SELECT 
            SalesOrderID,
            SalesOrderDetailID AS SalesID,
            ProductID,
            OrderQty AS Quantity,
            UnitPrice,
            UnitPriceDiscount,
            LineTotal
        FROM Sales_SalesOrderDetail
    """, import_conn)

    merged_df = detail.merge(header, on='SalesOrderID', how='inner')

    merged_df['DiscountAmount'] = merged_df['UnitPrice'] * merged_df['UnitPriceDiscount'] * merged_df['Quantity']
    merged_df['SalesAmount'] = merged_df['LineTotal'] - merged_df['DiscountAmount']
    merged_df['Profit'] = merged_df['TotalDue'] - (merged_df['LineTotal'] / merged_df['TotalDue'])

    final_df = merged_df[[
        'SalesID',
        'SaleDateID',
        'ProductID',
        'CustomerID',
        'LocationID',
        'Quantity',
        'UnitPrice',
        'DiscountAmount',
        'SalesAmount',
        'TaxAmt',
        'TotalDue',
        'Profit'
    ]]

    cursor = export_conn.cursor()
    for index, row in final_df.iterrows():
        try:
            query = '''
                INSERT INTO Fact_Sales 
                (SalesID, SaleDateID, ProductID, CustomerID, LocationID,
                 Quantity, UnitPrice, DiscountAmount, SalesAmount, TaxAmount, TotalAmount, Profit)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            '''
            values = [None if pd.isna(row[col]) else row[col] for col in final_df.columns]
            cursor.execute(query, values)
        except Exception as e:
            print(f"Fout bij invoegen verkoop (SalesID {row['SalesID']}): {e}")

    export_conn.commit()
    print(f"{len(final_df)} verkopen verwerkt in Fact_Sales")
move_feitsales(import_conn, export_conn)

  header = pd.read_sql_query("""
  detail = pd.read_sql_query("""


121317 verkopen verwerkt in Fact_Sales


In [None]:
def move_feitemployeeperformance(import_conn, export_conn):
    # Data ophalen uit de verschillende tabellen
    sales_header = pd.read_sql_query("""
        SELECT 
            SalesOrderID,
            OrderDate,
            SalesPersonID AS EmployeeID,
            TerritoryID,
            SubTotal,
            TotalDue
        FROM Sales_SalesOrderHeader
        WHERE SalesPersonID IS NOT NULL
    """, import_conn)
    
    sales_detail = pd.read_sql_query("""
        SELECT 
            SalesOrderID,
            OrderQty,
            ProductID,
            UnitPrice,
            LineTotal
        FROM Sales_SalesOrderDetail
    """, import_conn)
    
    bonus = pd.read_sql_query("""
        SELECT 
            Emp_id AS EmployeeID,
            Bonus_date,
            Bonus_amount AS BonusAmount
        FROM Bonus
    """, import_conn)
    
    sales_merged = sales_header.merge(sales_detail, on='SalesOrderID', how='left')
    
    sales_merged['OrderDate'] = pd.to_datetime(sales_merged['OrderDate']).dt.date
    bonus['Bonus_date'] = pd.to_datetime(bonus['Bonus_date']).dt.date
    
    # Groepeer sales per werknemer per datum om prestaties te berekenen
    performance = sales_merged.groupby(['EmployeeID', 'OrderDate']).agg(
        OrderCount=('SalesOrderID', 'nunique'),
        TotalSales=('TotalDue', 'sum'),
        AvgOrderValue=('TotalDue', 'mean')
    ).reset_index()
    
    # Bereken bonus statistieken per werknemer per datum
    bonus_agg = bonus.groupby(['EmployeeID', 'Bonus_date']).agg(
        TotalBonus=('BonusAmount', 'sum'),
        BonusAmount=('BonusAmount', 'first')
    ).reset_index()
    
    # Maak een complete set van alle unieke datums (zowel OrderDate als Bonus_date)
    all_dates = pd.concat([
        performance[['EmployeeID', 'OrderDate']].rename(columns={'OrderDate': 'PerformanceDate'}),
        bonus_agg[['EmployeeID', 'Bonus_date']].rename(columns={'Bonus_date': 'PerformanceDate'})
    ]).drop_duplicates()
    
    final_df = all_dates.merge(
        performance,
        left_on=['EmployeeID', 'PerformanceDate'],
        right_on=['EmployeeID', 'OrderDate'],
        how='left'
    ).merge(
        bonus_agg,
        left_on=['EmployeeID', 'PerformanceDate'],
        right_on=['EmployeeID', 'Bonus_date'],
        how='left'
    )
    
    final_df['OrderCount'] = final_df['OrderCount'].fillna(0)
    final_df['TotalSales'] = final_df['TotalSales'].fillna(0)
    final_df['AvgOrderValue'] = final_df['AvgOrderValue'].fillna(0)
    final_df['TotalBonus'] = final_df['TotalBonus'].fillna(0)
    final_df['BonusAmount'] = final_df['BonusAmount'].fillna(0)

    final_df = final_df[[
        'PerformanceDate',
        'EmployeeID',
        'OrderCount',
        'TotalBonus',
        'BonusAmount',
        'AvgOrderValue'
    ]].rename(columns={
        'PerformanceDate': 'PerformanceDateID'
    })
    cursor = export_conn.cursor()
    
    for index, row in final_df.iterrows():
        try:
            query = '''
                INSERT INTO Fact_EmployeePerformance 
                (PerformanceDateID, EmployeeID, OrderCount, TotalBonus, BonusAmount, AvgOrderValue) 
                SELECT ?, ?, ?, ?, ?, ?
                WHERE NOT EXISTS (
                    SELECT 1 FROM Fact_EmployeePerformance 
                    WHERE PerformanceDateID = ? AND EmployeeID = ?
                )
            '''
            values = (
                row['PerformanceDateID'],
                row['EmployeeID'],
                row['OrderCount'],
                row['TotalBonus'],
                row['BonusAmount'],
                row['AvgOrderValue'],
                row['PerformanceDateID'],
                row['EmployeeID']
            )
            cursor.execute(query, values)
        except Exception as e:
            print(f"Fout bij invoegen performance data voor werknemer {row['EmployeeID']} op {row['PerformanceDateID']}: {e}")
    
    export_conn.commit()
    print(f"{len(final_df)} performance records verwerkt in Fact_EmployeePerformance")
move_feitemployeeperformance(import_conn, export_conn)

  sales_header = pd.read_sql_query("""
  sales_detail = pd.read_sql_query("""
  bonus = pd.read_sql_query("""


5860 performance records verwerkt in Fact_EmployeePerformance


In [43]:
def move_feitpurchase(import_conn, export_conn):
    purchase_header = pd.read_sql_query("""
        SELECT 
            PurchaseOrderID AS PurchaseID,
            EmployeeID,
            VendorID,
            OrderDate,
            SubTotal,
            TaxAmt,
            Freight,
            TotalDue
        FROM Purchasing_PurchaseOrderHeader
    """, import_conn)
    
    purchase_detail = pd.read_sql_query("""
        SELECT 
            PurchaseOrderID,
            PurchaseOrderDetailID,
            OrderQty,
            ProductID,
            UnitPrice,
            LineTotal
        FROM Purchasing_PurchaseOrderDetail
    """, import_conn)
   
    purchase_merged = purchase_header.merge(purchase_detail, left_on='PurchaseID', right_on='PurchaseOrderID', how='inner')
 
    purchase_merged['InventoryValue'] = purchase_merged['LineTotal'] * 1.1

    final_df = purchase_merged[[
        'PurchaseID',
        'OrderDate',
        'ProductID',
        'EmployeeID',
        'VendorID',
        'OrderQty',
        'UnitPrice',
        'Freight',
        'TaxAmt',
        'TotalDue',
        'LineTotal',
        'InventoryValue'
    ]].rename(columns={
        'OrderDate': 'PurchaseDateID',
        'OrderQty': 'OrderQuantity'
    })
    
    # Voeg LocationID toe als NULL
    final_df['LocationID'] = None
    
    existing_ids = pd.read_sql_query("SELECT PurchaseID FROM Fact_Purchase", export_conn)['PurchaseID'].tolist()
 
    new_purchases = final_df[~final_df['PurchaseID'].isin(existing_ids)]

    cursor = export_conn.cursor()
    success_count = 0
    
    for index, row in new_purchases.iterrows():
        try:
            query = '''
                INSERT INTO Fact_Purchase 
                (PurchaseID, PurchaseDateID, ProductID, EmployeeID, LocationID, VendorID,
                 OrderQuantity, UnitPrice, Freight, TaxAmt, TotalDue, 
                 LineTotal, InventoryValue) 
                VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            '''
            values = (
                row['PurchaseID'],
                row['PurchaseDateID'],
                row['ProductID'],
                row['EmployeeID'],
                None,  # LocationID
                row['VendorID'],
                row['OrderQuantity'],
                row['UnitPrice'],
                row['Freight'],
                row['TaxAmt'],
                row['TotalDue'],
                row['LineTotal'],
                row['InventoryValue']
            )
            cursor.execute(query, values)
            success_count += 1
        except Exception as e:
            pass 
    
    export_conn.commit()
    print(f"{success_count} nieuwe purchase records toegevoegd aan Fact_Purchase")

move_feitpurchase(import_conn, export_conn)

  purchase_header = pd.read_sql_query("""
  purchase_detail = pd.read_sql_query("""
  existing_ids = pd.read_sql_query("SELECT PurchaseID FROM Fact_Purchase", export_conn)['PurchaseID'].tolist()


4012 nieuwe purchase records toegevoegd aan Fact_Purchase


In [44]:
export_conn.close()
import_conn.close()