importeren 

In [2]:
import pyodbc
import pandas as pd
import sqlite3 as sql
import re
import numpy as np

leegmaken van tabellen

In [3]:
# Empty every base table in the DWH database.
# Foreign keys are switched off first so the DELETE order does not matter,
# and switched back on (with validation) once the tables are empty.

# Connect to the database
conn = pyodbc.connect(
    'DRIVER={ODBC Driver 17 for SQL Server};'
    'SERVER=MSI\\SQLEXPRESS;'
    'DATABASE=DWH;'
    'Trusted_Connection=yes;'
)

cursor = conn.cursor()

# Get all tables with constraints
tables_query = """
SELECT DISTINCT OBJECT_NAME(parent_object_id) AS TableName
FROM sys.foreign_keys
"""

# Bound up front so the re-enable step never depends on a value left behind
# by an earlier run of this cell when the lookup below fails.
tables_with_constraints = []
# Tables whose DELETE failed; decides which summary line is printed.
failed_tables = []

try:
    try:
        cursor.execute(tables_query)
        tables_with_constraints = [row[0] for row in cursor.fetchall()]

        # Disable constraints for each table individually
        for table in tables_with_constraints:
            print(f"  Disabling constraints for {table}...")
            cursor.execute(f"ALTER TABLE [{table}] NOCHECK CONSTRAINT ALL")
            conn.commit()

        print("All constraints disabled")
    except Exception as e:
        # Best effort: carry on and let the individual DELETEs report problems
        print(f"Error disabling constraints: {str(e)}")

    # Get all base tables
    cursor.execute("SELECT TABLE_NAME FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_TYPE = 'BASE TABLE'")
    all_tables = [row[0] for row in cursor.fetchall()]

    # Empty each table
    for table_name in all_tables:
        print(f" Legen van tabel: {table_name}")
        try:
            cursor.execute(f"DELETE FROM [{table_name}];")
            conn.commit()
        except Exception as e:
            failed_tables.append(table_name)
            print(f"Error deleting from {table_name}: {str(e)}")

    # Re-enable constraints; WITH CHECK makes SQL Server validate existing rows
    try:
        for table in tables_with_constraints:
            print(f"  Re-enabling constraints for {table}...")
            cursor.execute(f"ALTER TABLE [{table}] WITH CHECK CHECK CONSTRAINT ALL")
            conn.commit()

        print("All constraints re-enabled successfully")
    except Exception as e:
        print(f"Error re-enabling constraints: {str(e)}")

    # Only claim success when every DELETE went through
    if failed_tables:
        print(f" Niet alle tabellen zijn geleegd; mislukt: {', '.join(failed_tables)}")
    else:
        print(" Alle tabellen in de database 'DWH' zijn geleegd.")
finally:
    # Close connections at the very end, also when something above raised
    cursor.close()
    conn.close()

  Disabling constraints for CRM_RETAILER_SITE...
  Disabling constraints for INVENTORY_LEVELS...
  Disabling constraints for ORDER_DETAILS...
  Disabling constraints for ORDER_HEADER...
  Disabling constraints for PRODUCT_FORECAST...
  Disabling constraints for RETAILER...
  Disabling constraints for RETAILER_CONTACT...
  Disabling constraints for RETURNED_ITEM...
  Disabling constraints for SALES_BRANCH...
  Disabling constraints for SALES_DEMOGRAPHIC...
  Disabling constraints for SALES_STAFF...
  Disabling constraints for SATISFACTION...
  Disabling constraints for TRAINING...
All constraints disabled
 Legen van tabel: AGE_GROUP
 Legen van tabel: COUNTRY
 Legen van tabel: COURSE
 Legen van tabel: CRM_RETAILER_SITE
 Legen van tabel: INVENTORY_LEVELS
 Legen van tabel: ORDER_DETAILS
 Legen van tabel: ORDER_HEADER
 Legen van tabel: PRODUCT
 Legen van tabel: PRODUCT_FORECAST
 Legen van tabel: RETAILER
 Legen van tabel: RETAILER_CONTACT
 Legen van tabel: RETAILER_SEGMENT
 Legen van tabel:

Deds-data omzetten van SQLite en CSV naar het datawarehouse

In [4]:

# Convert pandas/numpy scalars to plain Python values
def convert_numpy_type(val):
    """Return ``val`` as a plain Python object.

    Values that ``pd.isnull`` reports as missing become ``None``. Objects that
    expose an ``item`` attribute (numpy scalars such as int64/float64) are
    unwrapped by calling it. Everything else is handed back unchanged.
    """
    if pd.isnull(val):
        return None
    return val.item() if hasattr(val, 'item') else val

# Helper function to handle type conversions
def convert_value_for_column(value, column_name, column_type, table_name):
    """Convert ``value`` to a Python type that fits the target column.

    Parameters:
        value: the source value (``None`` means "missing").
        column_name: name of the target column; used for the NOT NULL
            defaults and in diagnostic messages.
        column_type: SQL type name of the target column. It is matched by
            substring, case-insensitively (e.g. ``'bigint'`` counts as INT).
        table_name: name of the target table; same uses as ``column_name``.

    Returns:
        The converted value. ``None`` stays ``None`` except for
        TRAINING.TRAINING_CODE and SATISFACTION.SATISFACTION_CODE, which get 0.
        A string that cannot be parsed for an INT / floating-point column
        yields 0 / 0.0 after printing a message. Any unexpected conversion
        error is printed and ``None`` is returned.
    """
    if value is None:
        # Handle NOT NULL constraints by providing default values
        if table_name == 'TRAINING' and column_name == 'TRAINING_CODE':
            return 0
        if table_name == 'SATISFACTION' and column_name == 'SATISFACTION_CODE':
            return 0
        return None

    try:
        # Inside the try so that a non-string column_type is reported like any
        # other conversion problem instead of raising.
        type_name = column_type.upper()

        if 'INT' in type_name:
            if isinstance(value, str):
                try:
                    return int(value)
                except ValueError:
                    pass
                # Accept text such as "12.0": it is a whole number even though
                # int() rejects it. Fractional or non-numeric text still gets
                # the default below.
                try:
                    as_float = float(value)
                except ValueError:
                    as_float = None
                if as_float is not None and as_float.is_integer():
                    return int(as_float)
                print(f"Cannot convert '{value}' to INT for {table_name}.{column_name}, using default")
                return 0
            return int(value)

        if 'REAL' in type_name or 'FLOAT' in type_name or 'DECIMAL' in type_name:
            if isinstance(value, str):
                try:
                    return float(value)
                except ValueError:
                    print(f"Cannot convert '{value}' to REAL for {table_name}.{column_name}, using default")
                    return 0.0
            return float(value)

        # 'CHAR' also matches VARCHAR / NCHAR / NVARCHAR
        if 'TEXT' in type_name or 'CHAR' in type_name:
            return str(value)

        if 'DATE' in type_name:
            # Date values are passed through untouched
            return value

        if 'BIT' in type_name:
            if isinstance(value, bool):
                return 1 if value else 0
            if isinstance(value, (int, float)):
                return 1 if value > 0 else 0
            if isinstance(value, str):
                return 1 if value.lower() in ('true', 'yes', 'y', '1') else 0
            return 0

        # Any other column type: hand the value over unchanged
        return value
    except Exception as e:
        print(f"Error converting value '{value}' for {table_name}.{column_name}: {e}")
        return None

# Function to get column types from a table
def get_column_types(cursor, table_name):
    """Return a ``{column name: data type}`` dict for ``table_name``.

    Parameters:
        cursor: an open DB-API cursor that accepts ``?`` placeholders
            (pyodbc in this notebook).
        table_name: table to look up in INFORMATION_SCHEMA.COLUMNS.

    The table name is bound as a query parameter rather than spliced into the
    SQL text, so names containing quotes cannot break or alter the statement.
    """
    cursor.execute(
        "SELECT COLUMN_NAME, DATA_TYPE FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_NAME = ?",
        (table_name,),
    )
    return {row[0]: row[1] for row in cursor.fetchall()}

# Function to check if a column has a NOT NULL constraint
def get_nullable_columns(cursor, table_name):
    """Return a ``{column name: bool}`` dict telling which columns accept NULL.

    Parameters:
        cursor: an open DB-API cursor that accepts ``?`` placeholders
            (pyodbc in this notebook).
        table_name: table to look up in INFORMATION_SCHEMA.COLUMNS.

    A column maps to ``True`` exactly when IS_NULLABLE equals ``'YES'``.
    The table name is bound as a query parameter rather than spliced into the
    SQL text, so names containing quotes cannot break or alter the statement.
    """
    cursor.execute(
        "SELECT COLUMN_NAME, IS_NULLABLE FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_NAME = ?",
        (table_name,),
    )
    return {row[0]: row[1] == 'YES' for row in cursor.fetchall()}

# Convert one source row into the value list for a parameterised INSERT
def _convert_row(row, column_pairs, column_types, table_name):
    """Return the DB-ready values of ``row`` in ``column_pairs`` order.

    ``column_pairs`` holds ``(source column, target column)`` tuples. A target
    column missing from ``column_types`` is treated as ``'TEXT'``.
    """
    return [
        convert_value_for_column(
            convert_numpy_type(row[source_col]),
            target_col,
            column_types.get(target_col, 'TEXT'),
            table_name,
        )
        for source_col, target_col in column_pairs
    ]


# Insert a whole DataFrame row by row, skipping (and reporting) bad rows
def _insert_rows(cursor, df, table, column_pairs, column_types, convert_table,
                 generated_key=None, key_start=1):
    """Insert every row of ``df`` into ``table``.

    Parameters:
        cursor: open SQL Server cursor.
        df: DataFrame holding the source rows.
        table: target table name, used in the INSERT and in error messages.
        column_pairs: ``(source column, target column)`` tuples to copy.
        column_types: ``{target column: SQL type}`` for value conversion.
        convert_table: table name handed to ``convert_value_for_column``.
        generated_key: optional extra leading column that is filled with a
            running counter instead of source data.
        key_start: first counter value for ``generated_key``.

    A row that fails to convert or insert is reported and skipped. The counter
    only advances after a successful insert.
    """
    target_cols = ", ".join(f"[{target_col}]" for _, target_col in column_pairs)
    placeholders = ", ".join(["?"] * len(column_pairs))
    if generated_key is not None:
        target_cols = f"{generated_key}, {target_cols}"
        placeholders = f"?, {placeholders}"
    # The statement is identical for every row, so build it once
    insert_sql = f"INSERT INTO [{table}] ({target_cols}) VALUES ({placeholders})"

    next_key = key_start
    for _, row in df.iterrows():
        try:
            values = _convert_row(row, column_pairs, column_types, convert_table)
            if generated_key is not None:
                values.insert(0, next_key)
            cursor.execute(insert_sql, tuple(values))
            next_key += 1
        except Exception as e:
            print(f"Error inserting row in {table}: {e}")


# Copy a single SQLite table into the SQL Server table of the same name
def _import_sqlite_table(conn, cursor, conn_sqlite, table, known_tables):
    """Import ``table`` from ``conn_sqlite`` into SQL Server and commit.

    ``known_tables`` is the set of upper-cased SQL Server table names; tables
    not in it are skipped. Source and target columns are matched by name,
    ignoring case. Exceptions other than per-row insert failures propagate to
    the caller.
    """
    print(f"Importing table: {table}")

    if table.upper() not in known_tables:
        print(f"Table {table} not found in SQL Server, skipping...")
        return

    column_types = get_column_types(cursor, table)

    df = pd.read_sql(f"SELECT * FROM {table}", conn_sqlite)
    if df.empty:
        print(f"No data found in {table}, skipping...")
        return

    # column_types already lists the target columns, so no second lookup is
    # needed; the first target wins if two names differ only by case.
    target_by_upper = {}
    for target_col in column_types:
        target_by_upper.setdefault(target_col.upper(), target_col)
    column_pairs = [
        (col, target_by_upper[col.upper()])
        for col in df.columns
        if col.upper() in target_by_upper
    ]

    if not column_pairs:
        print(f"No matching columns found for {table}, skipping...")
        return

    if table.upper() == 'SATISFACTION':
        # SATISFACTION_CODE is generated here: continue after the highest
        # existing code, or start at 1 when that cannot be determined.
        try:
            cursor.execute("SELECT MAX(SATISFACTION_CODE) FROM SATISFACTION")
            max_code = cursor.fetchone()[0]
            first_code = (max_code or 0) + 1
        except Exception:
            first_code = 1

        print(f"Starting SATISFACTION_CODE counter at {first_code}")
        _insert_rows(cursor, df, table, column_pairs, column_types, table.upper(),
                     generated_key="SATISFACTION_CODE", key_start=first_code)
    else:
        _insert_rows(cursor, df, table, column_pairs, column_types, table.upper())

    # Commit after each table
    conn.commit()
    print(f"Successfully imported table: {table}")


# Main function to import data
def import_data():
    """Load the SQLite and CSV source files into the DWH SQL Server database.

    Steps:
        1. Connect and list the base tables.
        2. Disable all constraints on every table that owns a foreign key.
        3. Copy the listed tables from each SQLite file.
        4. Copy the two CSV files using their column mappings.
        5. Re-enable the constraints WITH NOCHECK, i.e. without validating the
           rows that were just loaded.

    Problems with a single row, table or file are printed and the import
    carries on. Any other exception is printed as a fatal error and re-raised.
    The SQL Server and SQLite connections are closed on every path.
    """
    conn = None
    try:
        # Connect to SQL Server
        conn = pyodbc.connect('DRIVER={ODBC Driver 17 for SQL Server};SERVER=MSI\\SQLEXPRESS;DATABASE=DWH;Trusted_Connection=yes;')
        cursor = conn.cursor()

        print("Connected to SQL Server database")

        # Get list of tables in SQL Server
        cursor.execute("SELECT TABLE_NAME FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_TYPE = 'BASE TABLE'")
        sql_tables = [row[0] for row in cursor.fetchall()]
        print(f"Found {len(sql_tables)} tables in SQL Server database")
        # Upper-cased once so the per-table existence check is a set lookup
        known_tables = {name.upper() for name in sql_tables}

        # Disable constraints
        print("Disabling constraints...")
        cursor.execute("SELECT DISTINCT OBJECT_NAME(parent_object_id) FROM sys.foreign_keys")
        fk_tables = [row[0] for row in cursor.fetchall()]
        for fk_table in fk_tables:
            cursor.execute(f"ALTER TABLE [{fk_table}] NOCHECK CONSTRAINT ALL")
        conn.commit()

        # SQLite imports
        sqlite_files = [
            {"db": "go_sales_train.sqlite", "tables": ["product", "order_details", "sales_branch", "returned_item", "sales_staff"]},
            {"db": "go_staff_train.sqlite", "tables": ["training", "course", "satisfaction", "satisfaction_type"]},
            {"db": "go_crm_train.sqlite", "tables": ["country"]}
        ]

        for source in sqlite_files:
            print(f"Processing SQLite database: {source['db']}")

            conn_sqlite = None
            try:
                conn_sqlite = sql.connect(source["db"])
                for table in source["tables"]:
                    _import_sqlite_table(conn, cursor, conn_sqlite, table, known_tables)
            except Exception as e:
                # A failure here abandons the remaining tables of this file only
                print(f"Error processing SQLite database {source['db']}: {e}")
            finally:
                if conn_sqlite is not None:
                    conn_sqlite.close()

        # CSV imports with specific column handling
        csv_files = {
            "product_forecast_train.csv": {
                "table": "PRODUCT_FORECAST",
                "mapping": {
                    # NOTE(review): the leading backtick presumably mirrors the
                    # header in the CSV file - confirm against the data.
                    "`PRODUCT_NUMBER": "PRODUCT_NUMBER",
                    "YEAR": "YEAR",
                    "MONTH": "MONTH",
                    "EXPECTED_VOLUME": "EXPECTED_VOLUME"
                }
            },
            "inventory_levels_train.csv": {
                "table": "INVENTORY_LEVELS",
                "mapping": {
                    "Unnamed: 0": "ID",
                    "INVENTORY_YEAR": "INVENTORY_YEAR",
                    "INVENTORY_MONTH": "INVENTORY_MONTH",
                    "PRODUCT_NUMBER": "PRODUCT_NUMBER",
                    "INVENTORY_COUNT": "INVENTORY_COUNT"
                }
            }
        }

        for csv_file, info in csv_files.items():
            table_name = info["table"]

            print(f"Importing {csv_file} to {table_name}")

            try:
                column_types = get_column_types(cursor, table_name)

                df = pd.read_csv(csv_file)

                # Rename only the mapped columns that are present in the file
                rename_dict = {
                    csv_col: sql_col
                    for csv_col, sql_col in info["mapping"].items()
                    if csv_col in df.columns
                }
                if rename_dict:
                    df = df.rename(columns=rename_dict)

                # After renaming, CSV and target names must match exactly
                common_cols = [col for col in df.columns if col in column_types]

                if not common_cols:
                    print(f"No matching columns found for {table_name}, skipping...")
                    continue

                _insert_rows(cursor, df, table_name,
                             [(col, col) for col in common_cols],
                             column_types, table_name)

                # Commit after each file
                conn.commit()
                print(f"Successfully imported {csv_file}")
            except Exception as e:
                print(f"Error importing CSV file {csv_file}: {e}")

        # Re-enable constraints with NOCHECK
        print("Re-enabling constraints...")
        for fk_table in fk_tables:
            cursor.execute(f"ALTER TABLE [{fk_table}] WITH NOCHECK CHECK CONSTRAINT ALL")
        conn.commit()

        cursor.close()
        conn.close()
        conn = None  # already closed; stops the finally block closing it twice
        print("Database import completed successfully")

    except Exception as e:
        print(f"Fatal error during import: {e}")
        raise
    finally:
        if conn is not None:
            conn.close()

# Run the import when this cell/script is executed directly.
# import_data() has already printed and re-raised any fatal error; it is caught
# here so execution ends with a one-line status instead of a traceback.
if __name__ == "__main__":
    try:
        import_data()
    except Exception as exc:
        print(f"Import failed: {exc}")
    else:
        print("Import completed successfully.")

Connected to SQL Server database
Found 19 tables in SQL Server database
Disabling constraints...
Processing SQLite database: go_sales_train.sqlite
Importing table: product
Successfully imported table: product
Importing table: order_details
Successfully imported table: order_details
Importing table: sales_branch
Successfully imported table: sales_branch
Importing table: returned_item
Successfully imported table: returned_item
Importing table: sales_staff
Successfully imported table: sales_staff
Processing SQLite database: go_staff_train.sqlite
Importing table: training
Successfully imported table: training
Importing table: course
Successfully imported table: course
Importing table: satisfaction
Starting SATISFACTION_CODE counter at 1
Successfully imported table: satisfaction
Importing table: satisfaction_type
Table satisfaction_type not found in SQL Server, skipping...
Processing SQLite database: go_crm_train.sqlite
Importing table: country
Successfully imported table: country
Importing