## Implementeer elk gemaakt ETL-schema in Python

**imports & connection**

In [1]:
import pandas as pd
import pyodbc


In [2]:
DB = {'servername': 'LAPTOP-LPE28RPE\SQLEXPRESS', 
    'database': 'United_outdoors'}

export_conn = pyodbc.connect('DRIVER={SQL Server};SERVER=' + DB['servername'] + 
                              ';DATABASE=' + DB['database'])

export_cursor = export_conn.cursor()
export_cursor   

<pyodbc.Cursor at 0x281edc0a9b0>

**Database connection**

In [3]:

Adventure = {
    'servername' : 'LAPTOP-LPE28RPE\SQLEXPRESS',
    'database' : 'AdventureWorks2019'
}


Northwind = {
    'servername' : 'LAPTOP-LPE28RPE\SQLEXPRESS',
    'database' : 'Northwind'
}

access_db_path = r'C:\Users\Humberto de Castro\OneDrive\Desktop\SEM4\AenC\aenc.accdb'

#Connect to AdventureWorks
Adventure_conn = pyodbc.connect(f"DRIVER={{SQL Server}};SERVER={Adventure['servername']};DATABASE={Adventure['database']};Trusted_Connection=yes;")
Adventure_cursor = Adventure_conn.cursor()

#Connect to Northwind
Northwind_conn = pyodbc.connect(f"DRIVER={{SQL Server}};SERVER={Northwind['servername']};DATABASE={Northwind['database']};Trusted_Connection=yes;")
Northwind_cursor = Northwind_conn.cursor()

#Connect to AenC
AenC_conn = pyodbc.connect(f"DRIVER={{Microsoft Access Driver (*.mdb, *.accdb)}};DBQ={access_db_path};")



In [4]:
AdventureWorks_queries = {
    'SalesOrderHeader' : 'SELECT * FROM Sales.SalesOrderHeader',
    'SalesOrderDetail' : 'SELECT * FROM Sales.SalesOrderDetail'
}

Northwind_queries = {
    'Orders' : 'SELECT * FROM Orders',
    'OrderDetails' : 'SELECT * FROM [Order Details]'
}

AenC_queries = {
    'sales_order' : 'SELECT * FROM sales_order',
    'sales_order_item' : 'SELECT * FROM sales_order_item'
}

dataframes = {}

# Lees elke tabel in een DataFrame

for table_name, query in AdventureWorks_queries.items():
    dataframes[table_name] = pd.read_sql_query(query, Adventure_conn)

for table_name, query in Northwind_queries.items():
    dataframes[table_name] = pd.read_sql_query(query, Northwind_conn)

for table_name, query in AenC_queries.items():
    dataframes[table_name] = pd.read_sql_query(query, AenC_conn)

#als je ik elk tabel als een dataframe/ variabele wil behandelen of aanroepen moet ik dit uitvoeren.
for table_name, df in dataframes.items():
    globals()[table_name] = df


  dataframes[table_name] = pd.read_sql_query(query, Adventure_conn)
  dataframes[table_name] = pd.read_sql_query(query, Northwind_conn)
  dataframes[table_name] = pd.read_sql_query(query, AenC_conn)


### Order

**Extract**

In [5]:
#eerst horizontaal samenvoegen per database:
AdventureWorks_Order = pd.merge(SalesOrderHeader, SalesOrderDetail, on = 'SalesOrderID')
Northwind_Order = pd.merge(Orders, OrderDetails, on = 'OrderID')
AenC_Order = pd.merge(sales_order, sales_order_item, on = 'id')

**Transform**

**Check of orderID's conflicten**

In [6]:
AdventureWorks_Order.rename(columns={
    'SalesOrderID': 'OrderID',
    'SalesOrderDetailID': 'OrderDetailID',
    'OrderQty': 'Quantity',
    'UnitPriceDiscount': 'Discount',
    'Order_Date': 'OrderDate',
    'SalesPersonID' : 'EmployeeID'
}, inplace=True)

# change shipvia to string 
#Northwind_Order['ShipVia'] = Northwind_Order['ShipVia'].astype(str)
Northwind_Order.rename(columns={
    'ShipVia': 'ShipMethodID'
}, inplace=True)

AenC_Order.rename(columns={
    'id': 'OrderID',
    'item_id': 'OrderDetailID',
    'quantity': 'Quantity',
    'order_date': 'OrderDate',
    'cust_id': 'CustomerID',
    'sales_rep': 'EmployeeID',
    'prod_id': 'ProductID'
}, inplace=True)

order_order_ids = set(AdventureWorks_Order['OrderID'])
order_details_order_ids = set(Northwind_Order['OrderID'])
Order_AenC_ids = set(AenC_Order['OrderID'])

# Zoek conflicten tussen AdventureWorks en Northwind
conflicting_order_ids_1 = order_order_ids.intersection(order_details_order_ids)

# Zoek conflicten tussen AdventureWorks en AenC
conflicting_order_ids_2 = order_order_ids.intersection(Order_AenC_ids)

# Zoek conflicten tussen Northwind en AenC
conflicting_order_ids_3 = order_details_order_ids.intersection(Order_AenC_ids)

# Combineer alle conflicterende ID's
all_conflicting_order_ids = conflicting_order_ids_1.union(conflicting_order_ids_2).union(conflicting_order_ids_3)

if all_conflicting_order_ids:
    print("Conflicting Order IDs found:", all_conflicting_order_ids)
else:
    print("No conflicts found in Order IDs.")

# Display the conflicting Order IDs (if any)
conflicts_df = pd.DataFrame(list(all_conflicting_order_ids), columns=['Conflicting Order IDs'])
print(conflicts_df)

No conflicts found in Order IDs.
Empty DataFrame
Columns: [Conflicting Order IDs]
Index: []


**Alle dubbele ID's met een suffix zetten per database**

In [7]:
# Prefixes toevoegen
AdventureWorks_Order['EmployeeID'] = AdventureWorks_Order['EmployeeID'].apply(lambda x: f'AW_{x}')
Northwind_Order['EmployeeID'] = Northwind_Order['EmployeeID'].apply(lambda x: f'NW_{x}')
AenC_Order['EmployeeID'] = AenC_Order['EmployeeID'].apply(lambda x: f'AC_{x}')

AdventureWorks_Order['ShipMethodID'] = AdventureWorks_Order['ShipMethodID'].apply(lambda x: f'AW_{x}')
Northwind_Order['ShipMethodID'] = Northwind_Order['ShipMethodID'].apply(lambda x: f'NW_{x}')
# AenC_Order heeft geen ShipMethodID

AdventureWorks_Order['ProductID'] = AdventureWorks_Order['ProductID'].apply(lambda x: f'AW_{x}')
Northwind_Order['ProductID'] = Northwind_Order['ProductID'].apply(lambda x: f'NW_{x}')
AenC_Order['ProductID'] = AenC_Order['ProductID'].apply(lambda x: f'AC_{x}')

AdventureWorks_Order['CustomerID'] = AdventureWorks_Order['CustomerID'].apply(lambda x: f'AW_{x}')
Northwind_Order['CustomerID'] = Northwind_Order['CustomerID'].apply(lambda x: f'NW_{x}')
AenC_Order['CustomerID'] = AenC_Order['CustomerID'].apply(lambda x: f'AC_{x}')

In [8]:


# Zorg ervoor dat alle vereiste kolommen bestaan en selecteer ze
required_columns = ['OrderID', 'OrderDetailID', 'Quantity', 'UnitPrice', 'Discount', 'OrderDate', 'CustomerID', 'EmployeeID', 'ShipMethodID', 'ProductID']

for col in required_columns:
    if col not in AdventureWorks_Order.columns:
        AdventureWorks_Order[col] = None
    if col not in Northwind_Order.columns:
        Northwind_Order[col] = None
    if col not in AenC_Order.columns:
        AenC_Order[col] = None

# Selecteer de kolommen in de juiste volgorde
AdventureWorks_Order = AdventureWorks_Order[required_columns]
Northwind_Order = Northwind_Order[required_columns]
AenC_Order = AenC_Order[required_columns]

# Combineer de dataframes
Order = pd.concat([AdventureWorks_Order, Northwind_Order, AenC_Order], ignore_index=True)

# Toon het resultaat
#print(combined_df)
print(Order.dtypes)

OrderID           object
OrderDetailID     object
Quantity          object
UnitPrice        float64
Discount         float64
OrderDate         object
CustomerID        object
EmployeeID        object
ShipMethodID      object
ProductID         object
dtype: object


  Order = pd.concat([AdventureWorks_Order, Northwind_Order, AenC_Order], ignore_index=True)


In [10]:
# Ensure correct data types and formats
Order['OrderDate'] = pd.to_datetime(Order['OrderDate'], errors='coerce')

# Insert data into the table
for index, row in Order.iterrows():
    try:
        query = """
        INSERT INTO [Order] (OrderID, OrderDetailID, Quantity, UnitPrice, Discount, OrderDate, CustomerID, EmployeeID, ShipMethodID, ProductID)
        VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
        """
        # Ensure values are in the correct format
        params = (
            row['OrderID'],
            row['OrderDetailID'] if pd.notnull(row['OrderDetailID']) else None,
            float(row['Quantity']) if pd.notnull(row['Quantity']) else None,
            float(row['UnitPrice']) if pd.notnull(row['UnitPrice']) else None,
            float(row['Discount']) if pd.notnull(row['Discount']) else None,
            row['OrderDate'].strftime('%Y-%m-%d %H:%M:%S') if pd.notnull(row['OrderDate']) else None,
            row['CustomerID'],
            row['EmployeeID'],
            row['ShipMethodID'] if pd.notnull(row['ShipMethodID']) else None,
            row['ProductID']
        )
        export_cursor.execute(query, params)
    except pyodbc.Error as e:
        print(f"An error occurred: {e}")
        print(query)
        print("Parameters:", params)


export_conn.commit()

#test