In [8]:
!pip install faker


Defaulting to user installation because normal site-packages is not writeable


In [9]:
!pip install pyodbc


Defaulting to user installation because normal site-packages is not writeable


In [10]:
import pyodbc
print(pyodbc.drivers())


['SQL Server', 'PostgreSQL ODBC Driver(ANSI)', 'PostgreSQL ODBC Driver(UNICODE)', 'ODBC Driver 17 for SQL Server', 'Microsoft Access Driver (*.mdb, *.accdb)', 'Microsoft Excel Driver (*.xls, *.xlsx, *.xlsm, *.xlsb)', 'Microsoft Access Text Driver (*.txt, *.csv)', 'Microsoft Access dBASE Driver (*.dbf, *.ndx, *.mdx)']


In [11]:
!pip install sqlalchemy eralchemy graphviz


Defaulting to user installation because normal site-packages is not writeable


In [None]:
import pyodbc

def execute_sql_script(cursor, script):
    """
    Executes a SQL script, splitting on 'GO' batch separators.
    """
    # Split the script into individual statements
    statements = script.strip().split('GO')
    
    for statement in statements:
        # Remove any leading/trailing whitespace
        statement = statement.strip()
        if statement:
            try:
                cursor.execute(statement)
                cursor.commit()
            except pyodbc.Error as e:
                print(f"Error executing statement:\n{statement}\nError: {e}")
                cursor.rollback()
                raise

def main():
    # Database connection parameters
    server = 'YourServerName'  # Replace with your server name
    database = 'master'  # Connect to the master database to execute DROP/CREATE DATABASE commands
    trusted_connection = 'yes'  # Use 'yes' for Windows Authentication, 'no' for SQL Server Authentication

    # If using SQL Server Authentication, uncomment and set UID and PWD
    # uid = 'your_username'
    # pwd = 'your_password'

    # Connection string
    connection_string = (
        'DRIVER={ODBC Driver 17 for SQL Server};'
        f'SERVER={server};'
        f'DATABASE={database};'
        f'Trusted_Connection={trusted_connection};'
    )

    # If using SQL Server Authentication, include UID and PWD
    # connection_string += f'UID={uid};PWD={pwd};'

    try:
        # Connect to the SQL Server
        conn = pyodbc.connect(connection_string, autocommit=True)
        cursor = conn.cursor()

        print("Connected to SQL Server.")

        # SQL script to create the Chinook database
        sql_script = """
        /*******************************************************************************
           Chinook Database - Version 1.4.5
           Script: Chinook_SqlServer.sql
        ********************************************************************************/

        /*******************************************************************************
        ********************************************************************************/

        /*******************************************************************************
           Drop database if it exists
        ********************************************************************************/
        IF EXISTS (SELECT name FROM master.dbo.sysdatabases WHERE name = N'Chinook')
        BEGIN
            ALTER DATABASE [Chinook] SET SINGLE_USER WITH ROLLBACK IMMEDIATE;
            DROP DATABASE [Chinook];
        END

        /*******************************************************************************
           Create database
        ********************************************************************************/
        CREATE DATABASE [Chinook];

        USE [Chinook];

        /*******************************************************************************
           Create Tables
        ********************************************************************************/
        CREATE TABLE [dbo].[Album]
        (
            [AlbumId] INT NOT NULL,
            [Title] NVARCHAR(160) NOT NULL,
            [ArtistId] INT NOT NULL,
            CONSTRAINT [PK_Album] PRIMARY KEY CLUSTERED ([AlbumId])
        );

        CREATE TABLE [dbo].[Artist]
        (
            [ArtistId] INT NOT NULL,
            [Name] NVARCHAR(120),
            CONSTRAINT [PK_Artist] PRIMARY KEY CLUSTERED ([ArtistId])
        );

        CREATE TABLE [dbo].[Customer]
        (
            [CustomerId] INT NOT NULL,
            [FirstName] NVARCHAR(40) NOT NULL,
            [LastName] NVARCHAR(20) NOT NULL,
            [Company] NVARCHAR(80),
            [Address] NVARCHAR(70),
            [City] NVARCHAR(40),
            [State] NVARCHAR(40),
            [Country] NVARCHAR(40),
            [PostalCode] NVARCHAR(10),
            [Phone] NVARCHAR(24),
            [Fax] NVARCHAR(24),
            [Email] NVARCHAR(60) NOT NULL,
            [SupportRepId] INT,
            CONSTRAINT [PK_Customer] PRIMARY KEY CLUSTERED ([CustomerId])
        );

        CREATE TABLE [dbo].[Employee]
        (
            [EmployeeId] INT NOT NULL,
            [LastName] NVARCHAR(20) NOT NULL,
            [FirstName] NVARCHAR(20) NOT NULL,
            [Title] NVARCHAR(30),
            [ReportsTo] INT,
            [BirthDate] DATETIME,
            [HireDate] DATETIME,
            [Address] NVARCHAR(70),
            [City] NVARCHAR(40),
            [State] NVARCHAR(40),
            [Country] NVARCHAR(40),
            [PostalCode] NVARCHAR(10),
            [Phone] NVARCHAR(24),
            [Fax] NVARCHAR(24),
            [Email] NVARCHAR(60),
            CONSTRAINT [PK_Employee] PRIMARY KEY CLUSTERED ([EmployeeId])
        );

        CREATE TABLE [dbo].[Genre]
        (
            [GenreId] INT NOT NULL,
            [Name] NVARCHAR(120),
            CONSTRAINT [PK_Genre] PRIMARY KEY CLUSTERED ([GenreId])
        );

        CREATE TABLE [dbo].[Invoice]
        (
            [InvoiceId] INT NOT NULL,
            [CustomerId] INT NOT NULL,
            [InvoiceDate] DATETIME NOT NULL,
            [BillingAddress] NVARCHAR(70),
            [BillingCity] NVARCHAR(40),
            [BillingState] NVARCHAR(40),
            [BillingCountry] NVARCHAR(40),
            [BillingPostalCode] NVARCHAR(10),
            [Total] NUMERIC(10,2) NOT NULL,
            CONSTRAINT [PK_Invoice] PRIMARY KEY CLUSTERED ([InvoiceId])
        );

        CREATE TABLE [dbo].[InvoiceLine]
        (
            [InvoiceLineId] INT NOT NULL,
            [InvoiceId] INT NOT NULL,
            [TrackId] INT NOT NULL,
            [UnitPrice] NUMERIC(10,2) NOT NULL,
            [Quantity] INT NOT NULL,
            CONSTRAINT [PK_InvoiceLine] PRIMARY KEY CLUSTERED ([InvoiceLineId])
        );

        CREATE TABLE [dbo].[MediaType]
        (
            [MediaTypeId] INT NOT NULL,
            [Name] NVARCHAR(120),
            CONSTRAINT [PK_MediaType] PRIMARY KEY CLUSTERED ([MediaTypeId])
        );

        CREATE TABLE [dbo].[Playlist]
        (
            [PlaylistId] INT NOT NULL,
            [Name] NVARCHAR(120),
            CONSTRAINT [PK_Playlist] PRIMARY KEY CLUSTERED ([PlaylistId])
        );

        CREATE TABLE [dbo].[PlaylistTrack]
        (
            [PlaylistId] INT NOT NULL,
            [TrackId] INT NOT NULL,
            CONSTRAINT [PK_PlaylistTrack] PRIMARY KEY NONCLUSTERED ([PlaylistId], [TrackId])
        );

        CREATE TABLE [dbo].[Track]
        (
            [TrackId] INT NOT NULL,
            [Name] NVARCHAR(200) NOT NULL,
            [AlbumId] INT,
            [MediaTypeId] INT NOT NULL,
            [GenreId] INT,
            [Composer] NVARCHAR(220),
            [Milliseconds] INT NOT NULL,
            [Bytes] INT,
            [UnitPrice] NUMERIC(10,2) NOT NULL,
            CONSTRAINT [PK_Track] PRIMARY KEY CLUSTERED ([TrackId])
        );

        /*******************************************************************************
           Create Foreign Keys
        ********************************************************************************/
        ALTER TABLE [dbo].[Album] ADD CONSTRAINT [FK_AlbumArtistId]
            FOREIGN KEY ([ArtistId]) REFERENCES [dbo].[Artist] ([ArtistId]) ON DELETE NO ACTION ON UPDATE NO ACTION;

        CREATE INDEX [IFK_AlbumArtistId] ON [dbo].[Album] ([ArtistId]);

        ALTER TABLE [dbo].[Customer] ADD CONSTRAINT [FK_CustomerSupportRepId]
            FOREIGN KEY ([SupportRepId]) REFERENCES [dbo].[Employee] ([EmployeeId]) ON DELETE NO ACTION ON UPDATE NO ACTION;

        CREATE INDEX [IFK_CustomerSupportRepId] ON [dbo].[Customer] ([SupportRepId]);

        ALTER TABLE [dbo].[Employee] ADD CONSTRAINT [FK_EmployeeReportsTo]
            FOREIGN KEY ([ReportsTo]) REFERENCES [dbo].[Employee] ([EmployeeId]) ON DELETE NO ACTION ON UPDATE NO ACTION;

        CREATE INDEX [IFK_EmployeeReportsTo] ON [dbo].[Employee] ([ReportsTo]);

        ALTER TABLE [dbo].[Invoice] ADD CONSTRAINT [FK_InvoiceCustomerId]
            FOREIGN KEY ([CustomerId]) REFERENCES [dbo].[Customer] ([CustomerId]) ON DELETE NO ACTION ON UPDATE NO ACTION;

        CREATE INDEX [IFK_InvoiceCustomerId] ON [dbo].[Invoice] ([CustomerId]);

        ALTER TABLE [dbo].[InvoiceLine] ADD CONSTRAINT [FK_InvoiceLineInvoiceId]
            FOREIGN KEY ([InvoiceId]) REFERENCES [dbo].[Invoice] ([InvoiceId]) ON DELETE NO ACTION ON UPDATE NO ACTION;

        CREATE INDEX [IFK_InvoiceLineInvoiceId] ON [dbo].[InvoiceLine] ([InvoiceId]);

        ALTER TABLE [dbo].[InvoiceLine] ADD CONSTRAINT [FK_InvoiceLineTrackId]
            FOREIGN KEY ([TrackId]) REFERENCES [dbo].[Track] ([TrackId]) ON DELETE NO ACTION ON UPDATE NO ACTION;

        CREATE INDEX [IFK_InvoiceLineTrackId] ON [dbo].[InvoiceLine] ([TrackId]);

        ALTER TABLE [dbo].[PlaylistTrack] ADD CONSTRAINT [FK_PlaylistTrackPlaylistId]
            FOREIGN KEY ([PlaylistId]) REFERENCES [dbo].[Playlist] ([PlaylistId]) ON DELETE NO ACTION ON UPDATE NO ACTION;

        CREATE INDEX [IFK_PlaylistTrackPlaylistId] ON [dbo].[PlaylistTrack] ([PlaylistId]);

        ALTER TABLE [dbo].[PlaylistTrack] ADD CONSTRAINT [FK_PlaylistTrackTrackId]
            FOREIGN KEY ([TrackId]) REFERENCES [dbo].[Track] ([TrackId]) ON DELETE NO ACTION ON UPDATE NO ACTION;

        CREATE INDEX [IFK_PlaylistTrackTrackId] ON [dbo].[PlaylistTrack] ([TrackId]);

        ALTER TABLE [dbo].[Track] ADD CONSTRAINT [FK_TrackAlbumId]
            FOREIGN KEY ([AlbumId]) REFERENCES [dbo].[Album] ([AlbumId]) ON DELETE NO ACTION ON UPDATE NO ACTION;

        CREATE INDEX [IFK_TrackAlbumId] ON [dbo].[Track] ([AlbumId]);

        ALTER TABLE [dbo].[Track] ADD CONSTRAINT [FK_TrackGenreId]
            FOREIGN KEY ([GenreId]) REFERENCES [dbo].[Genre] ([GenreId]) ON DELETE NO ACTION ON UPDATE NO ACTION;

        CREATE INDEX [IFK_TrackGenreId] ON [dbo].[Track] ([GenreId]);

        ALTER TABLE [dbo].[Track] ADD CONSTRAINT [FK_TrackMediaTypeId]
            FOREIGN KEY ([MediaTypeId]) REFERENCES [dbo].[MediaType] ([MediaTypeId]) ON DELETE NO ACTION ON UPDATE NO ACTION;

        CREATE INDEX [IFK_TrackMediaTypeId] ON [dbo].[Track] ([MediaTypeId]);
        """

        # Execute the SQL script
        execute_sql_script(cursor, sql_script)
        print("Chinook database created successfully.")

    except pyodbc.Error as e:
        print("An error occurred:", e)

    finally:
        # Close the connection
        cursor.close()
        conn.close()
        print("Connection closed.")

if __name__ == '__main__':
    main()


In [19]:
import os
from sqlalchemy import create_engine
from eralchemy import render_er
import os
os.environ["PATH"] += os.pathsep + r"C:\Program Files\Graphviz\bin"

def main():
    # Database connection parameters
    server = 'DPC2023'  # Replace with your server name
    database = 'Chinook'
    trusted_connection = 'yes'  # Use 'yes' for Windows Authentication, 'no' for SQL Server Authentication


    # Create the connection string for SQLAlchemy
    if trusted_connection == 'yes':
        connection_string = f'mssql+pyodbc://@{server}/{database}?driver=ODBC+Driver+17+for+SQL+Server&trusted_connection=yes'
    else:
        connection_string = f'mssql+pyodbc://{uid}:{pwd}@{server}/{database}?driver=ODBC+Driver+17+for+SQL+Server'

    # Create the SQLAlchemy engine
    try:
        engine = create_engine(connection_string)
        print("Connected to the Chinook database.")
    except Exception as e:
        print("Error connecting to the database:", e)
        return

    # Output file name
    output_file = 'chinook_erd.png'

    # Generate the ERD
    try:
        print("Generating the ERD...")
        render_er(connection_string, output_file)
        print(f"ERD generated and saved to {output_file}")
    except Exception as e:
        print("Error generating the ERD:", e)
        return

    # Open the ERD image
    try:
        if os.name == 'nt':  # For Windows
            os.startfile(output_file)
        elif os.name == 'posix':  # For Unix/Linux/Mac
            os.system(f'open "{output_file}"')
        else:
            print("Please open the ERD image manually:", output_file)
    except Exception as e:
        print("Error opening the ERD image:", e)

if __name__ == '__main__':
    main()


Connected to the Chinook database.
Generating the ERD...
ERD generated and saved to chinook_erd.png


In [21]:
import pyodbc
import pandas as pd
import sys

def connect_to_db(server, database):
    """Establishes a connection to the SQL Server database."""
    try:
        conn = pyodbc.connect(
            'DRIVER={ODBC Driver 17 for SQL Server};'
            f'SERVER={server};'
            f'DATABASE={database};'
            'Trusted_Connection=yes;'
        )
        return conn
    except pyodbc.Error as e:
        print("Error connecting to database:", e)
        sys.exit(1)

def get_tables(cursor):
    """Retrieves a list of tables in the database."""
    cursor.execute("""
    SELECT TABLE_NAME
    FROM INFORMATION_SCHEMA.TABLES
    WHERE TABLE_TYPE = 'BASE TABLE' AND TABLE_CATALOG = ?
    """, cursor.connection.getinfo(pyodbc.SQL_DATABASE_NAME))
    tables = [row.TABLE_NAME for row in cursor.fetchall()]
    return tables

def get_non_nullable_columns(cursor, table_name):
    """Retrieves a list of non-nullable columns for a table."""
    cursor.execute("""
    SELECT COLUMN_NAME
    FROM INFORMATION_SCHEMA.COLUMNS
    WHERE TABLE_NAME = ? AND IS_NULLABLE = 'NO'
    """, table_name)
    columns = [row.COLUMN_NAME for row in cursor.fetchall()]
    return columns

def get_primary_keys(cursor, table_name):
    """Retrieves the primary key columns for a table."""
    cursor.execute("""
    SELECT KU.COLUMN_NAME
    FROM INFORMATION_SCHEMA.TABLE_CONSTRAINTS AS TC
    JOIN INFORMATION_SCHEMA.KEY_COLUMN_USAGE AS KU
        ON TC.CONSTRAINT_NAME = KU.CONSTRAINT_NAME
    WHERE TC.TABLE_NAME = ? AND TC.CONSTRAINT_TYPE = 'PRIMARY KEY'
    """, table_name)
    pk_columns = [row.COLUMN_NAME for row in cursor.fetchall()]
    return pk_columns

def get_foreign_keys(cursor, table_name):
    """Retrieves foreign key relationships for a table."""
    cursor.execute("""
    SELECT
        FK.COLUMN_NAME,
        PK.TABLE_NAME AS REFERENCED_TABLE_NAME,
        PK.COLUMN_NAME AS REFERENCED_COLUMN_NAME
    FROM INFORMATION_SCHEMA.REFERENTIAL_CONSTRAINTS C
    INNER JOIN INFORMATION_SCHEMA.KEY_COLUMN_USAGE FK
        ON C.CONSTRAINT_NAME = FK.CONSTRAINT_NAME
    INNER JOIN (
        SELECT
            i1.TABLE_NAME,
            i2.COLUMN_NAME
        FROM INFORMATION_SCHEMA.TABLE_CONSTRAINTS i1
        INNER JOIN INFORMATION_SCHEMA.KEY_COLUMN_USAGE i2
            ON i1.CONSTRAINT_NAME = i2.CONSTRAINT_NAME
        WHERE i1.CONSTRAINT_TYPE = 'PRIMARY KEY'
    ) PK
        ON C.UNIQUE_CONSTRAINT_NAME = PK.TABLE_NAME
    WHERE FK.TABLE_NAME = ?
    """, table_name)
    foreign_keys = cursor.fetchall()
    return foreign_keys

def check_nulls_in_non_nullable_columns(cursor, table_name):
    """Checks for NULLs in non-nullable columns."""
    non_nullable_columns = get_non_nullable_columns(cursor, table_name)
    issues = []
    for column in non_nullable_columns:
        cursor.execute(f"""
        SELECT COUNT(*) AS NullCount
        FROM {table_name}
        WHERE {column} IS NULL
        """)
        null_count = cursor.fetchone().NullCount
        if null_count > 0:
            issues.append(f"Column {column} in table {table_name} has {null_count} NULL values.")
    return issues

def check_duplicate_primary_keys(cursor, table_name):
    """Checks for duplicate primary keys."""
    pk_columns = get_primary_keys(cursor, table_name)
    if not pk_columns:
        return []
    columns_joined = ', '.join(pk_columns)
    cursor.execute(f"""
    SELECT {columns_joined}, COUNT(*) AS Count
    FROM {table_name}
    GROUP BY {columns_joined}
    HAVING COUNT(*) > 1
    """)
    duplicates = cursor.fetchall()
    issues = []
    if duplicates:
        for dup in duplicates:
            pk_values = ', '.join([f"{col}={val}" for col, val in zip(pk_columns, dup[:-1])])
            issues.append(f"Duplicate primary key in table {table_name}: {pk_values}, Count: {dup.Count}")
    return issues

def check_foreign_key_violations(cursor, table_name):
    """Checks for foreign key violations."""
    cursor.execute("""
    SELECT
        FK.COLUMN_NAME,
        FK.TABLE_NAME AS FK_TABLE_NAME,
        PK.TABLE_NAME AS REFERENCED_TABLE_NAME,
        PK.COLUMN_NAME AS REFERENCED_COLUMN_NAME
    FROM INFORMATION_SCHEMA.REFERENTIAL_CONSTRAINTS C
    INNER JOIN INFORMATION_SCHEMA.KEY_COLUMN_USAGE FK
        ON C.CONSTRAINT_NAME = FK.CONSTRAINT_NAME
    INNER JOIN (
        SELECT
            i1.CONSTRAINT_NAME,
            i2.TABLE_NAME,
            i2.COLUMN_NAME
        FROM INFORMATION_SCHEMA.TABLE_CONSTRAINTS i1
        INNER JOIN INFORMATION_SCHEMA.KEY_COLUMN_USAGE i2
            ON i1.CONSTRAINT_NAME = i2.CONSTRAINT_NAME
        WHERE i1.CONSTRAINT_TYPE = 'PRIMARY KEY'
    ) PK
        ON C.UNIQUE_CONSTRAINT_NAME = PK.CONSTRAINT_NAME
    WHERE FK.TABLE_NAME = ?
    """, table_name)
    foreign_keys = cursor.fetchall()
    issues = []
    for fk in foreign_keys:
        fk_column = fk.COLUMN_NAME
        fk_table = fk.FK_TABLE_NAME
        pk_table = fk.REFERENCED_TABLE_NAME
        pk_column = fk.REFERENCED_COLUMN_NAME
        cursor.execute(f"""
        SELECT COUNT(*) AS ViolationCount
        FROM {fk_table} f
        LEFT JOIN {pk_table} p ON f.{fk_column} = p.{pk_column}
        WHERE p.{pk_column} IS NULL AND f.{fk_column} IS NOT NULL
        """)
        violation_count = cursor.fetchone().ViolationCount
        if violation_count > 0:
            issues.append(f"Foreign key violation in table {fk_table}.{fk_column} referencing {pk_table}.{pk_column}. Violations: {violation_count}")
    return issues

def check_negative_values(cursor, table_name, columns):
    """Checks for negative values in specified columns."""
    issues = []
    for column in columns:
        cursor.execute(f"""
        SELECT COUNT(*) AS NegativeCount
        FROM {table_name}
        WHERE {column} < 0
        """)
        negative_count = cursor.fetchone().NegativeCount
        if negative_count > 0:
            issues.append(f"Column {column} in table {table_name} has {negative_count} negative values.")
    return issues

def check_future_dates(cursor, table_name, date_columns):
    """Checks for dates in the future."""
    issues = []
    for column in date_columns:
        cursor.execute(f"""
        SELECT COUNT(*) AS FutureDateCount
        FROM {table_name}
        WHERE {column} > GETDATE()
        """)
        future_date_count = cursor.fetchone().FutureDateCount
        if future_date_count > 0:
            issues.append(f"Column {column} in table {table_name} has {future_date_count} dates in the future.")
    return issues

def check_redundant_data(cursor, table_name):
    """Checks for redundant data (duplicate rows)."""
    cursor.execute(f"SELECT COLUMN_NAME FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_NAME = ?", table_name)
    columns = [row.COLUMN_NAME for row in cursor.fetchall()]
    pk_columns = get_primary_keys(cursor, table_name)
    non_pk_columns = [col for col in columns if col not in pk_columns]
    if not non_pk_columns:
        return []
    columns_joined = ', '.join(non_pk_columns)
    cursor.execute(f"""
    SELECT {columns_joined}, COUNT(*) AS Count
    FROM {table_name}
    GROUP BY {columns_joined}
    HAVING COUNT(*) > 1
    """)
    duplicates = cursor.fetchall()
    issues = []
    if duplicates:
        issues.append(f"Table {table_name} has {len(duplicates)} sets of duplicate records based on non-primary key columns.")
    return issues

def main():
    # Database connection parameters
    server = 'DPC2023'  # Replace with your server name
    database = 'Chinook'

    # Connect to the database
    conn = connect_to_db(server, database)
    cursor = conn.cursor()

    # Retrieve list of tables
    tables = get_tables(cursor)

    # Initialize an issues list
    all_issues = []

    # Analyze each table
    for table in tables:
        print(f"\nAnalyzing table: {table}")
        # Check for nulls in non-nullable columns
        null_issues = check_nulls_in_non_nullable_columns(cursor, table)
        # Check for duplicate primary keys
        dup_pk_issues = check_duplicate_primary_keys(cursor, table)
        # Check for foreign key violations
        fk_issues = check_foreign_key_violations(cursor, table)
        # Check for redundant data
        redundant_issues = check_redundant_data(cursor, table)
        # Check for negative values in specific columns
        negative_issues = []
        if table == 'InvoiceLine':
            negative_issues += check_negative_values(cursor, table, ['UnitPrice', 'Quantity'])
        elif table == 'Invoice':
            negative_issues += check_negative_values(cursor, table, ['Total'])
        elif table == 'Track':
            negative_issues += check_negative_values(cursor, table, ['UnitPrice', 'Milliseconds', 'Bytes'])

        # Check for future dates in date columns
        date_issues = []
        if table == 'Invoice':
            date_issues += check_future_dates(cursor, table, ['InvoiceDate'])
        elif table == 'Employee':
            date_issues += check_future_dates(cursor, table, ['BirthDate', 'HireDate'])

        # Collect all issues for the table
        table_issues = null_issues + dup_pk_issues + fk_issues + redundant_issues + negative_issues + date_issues

        if table_issues:
            print(f"Issues found in table {table}:")
            for issue in table_issues:
                print(f"- {issue}")
            all_issues.extend(table_issues)
        else:
            print(f"No issues found in table {table}.")

    # Close connections
    cursor.close()
    conn.close()

    # Summary
    print("\n--- Data Quality Analysis Summary ---")
    if all_issues:
        print(f"Total issues found: {len(all_issues)}")
        for issue in all_issues:
            print(f"- {issue}")
    else:
        print("No data quality issues found in the database.")

if __name__ == '__main__':
    main()



Analyzing table: Album
No issues found in table Album.

Analyzing table: Artist
Issues found in table Artist:
- Table Artist has 2 sets of duplicate records based on non-primary key columns.

Analyzing table: Customer
No issues found in table Customer.

Analyzing table: Employee
No issues found in table Employee.

Analyzing table: Genre
Issues found in table Genre:
- Table Genre has 38 sets of duplicate records based on non-primary key columns.

Analyzing table: Invoice
Issues found in table Invoice:
- Column InvoiceDate in table Invoice has 92 dates in the future.

Analyzing table: InvoiceLine
No issues found in table InvoiceLine.

Analyzing table: MediaType
Issues found in table MediaType:
- Table MediaType has 19 sets of duplicate records based on non-primary key columns.

Analyzing table: Playlist
Issues found in table Playlist:
- Table Playlist has 16 sets of duplicate records based on non-primary key columns.

Analyzing table: PlaylistTrack
No issues found in table PlaylistTrack

In [None]:
import pyodbc
import sys
from datetime import datetime

def connect_to_db(server, database):
    """Establishes a connection to the SQL Server database."""
    try:
        conn = pyodbc.connect(
            'DRIVER={ODBC Driver 17 for SQL Server};'
            f'SERVER={server};'
            f'DATABASE={database};'
            'Trusted_Connection=yes;'
        )
        return conn
    except pyodbc.Error as e:
        print("Error connecting to database:", e)
        sys.exit(1)

def fix_duplicate_records(conn, cursor, table_name):
    """
    Identifies and removes duplicate records based on non-primary key columns.
    Updates foreign key references to point to the kept record.
    """
    print(f"\nProcessing duplicates in table: {table_name}")
    # Get all columns
    cursor.execute(f"SELECT COLUMN_NAME FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_NAME = ?", table_name)
    columns = [row.COLUMN_NAME for row in cursor.fetchall()]
    # Get primary key columns
    cursor.execute("""
        SELECT KU.COLUMN_NAME
        FROM INFORMATION_SCHEMA.TABLE_CONSTRAINTS AS TC
        JOIN INFORMATION_SCHEMA.KEY_COLUMN_USAGE AS KU
            ON TC.CONSTRAINT_NAME = KU.CONSTRAINT_NAME
        WHERE TC.TABLE_NAME = ? AND TC.CONSTRAINT_TYPE = 'PRIMARY KEY'
        """, table_name)
    pk_columns = [row.COLUMN_NAME for row in cursor.fetchall()]
    if not pk_columns:
        print(f"No primary key defined for table {table_name}. Skipping.")
        return
    # Non-primary key columns
    non_pk_columns = [col for col in columns if col not in pk_columns]
    if not non_pk_columns:
        print(f"No non-primary key columns to check for duplicates in table {table_name}.")
        return
    # Find duplicates
    columns_joined = ', '.join(non_pk_columns)
    pk_column = pk_columns[0]  # Assuming single-column primary key
    cursor.execute(f"""
        SELECT {columns_joined}, COUNT(*) AS Count, MIN({pk_column}) AS KeepId
        FROM {table_name}
        GROUP BY {columns_joined}
        HAVING COUNT(*) > 1
        """)
    duplicates = cursor.fetchall()
    if not duplicates:
        print(f"No duplicates found in table {table_name}.")
        return
    print(f"Found {len(duplicates)} sets of duplicate records in table {table_name}.")
    for dup in duplicates:
        # Extract values
        dup_values = dup[:-2]
        count = dup.Count
        keep_id = dup.KeepId
        # Find all duplicate IDs except the one to keep
        where_clause = ' AND '.join([f"{col} = ?" if dup_values[idx] is not None else f"{col} IS NULL"
                                     for idx, col in enumerate(non_pk_columns)])
        params = [val for val in dup_values if val is not None]
        cursor.execute(f"""
            SELECT {pk_column}
            FROM {table_name}
            WHERE {where_clause} AND {pk_column} != ?
            """, *params, keep_id)
        duplicate_ids = [row[0] for row in cursor.fetchall()]
        # Update foreign key references
        update_foreign_keys(conn, cursor, table_name, pk_column, keep_id, duplicate_ids)
        # Delete duplicate records
        delete_ids(conn, cursor, table_name, pk_column, duplicate_ids)
    print(f"Duplicates resolved in table {table_name}.")

def update_foreign_keys(conn, cursor, table_name, pk_column, keep_id, duplicate_ids):
    """
    Updates foreign key references from duplicate IDs to the kept ID.
    """
    # Find tables that reference this table
    cursor.execute("""
        SELECT FK.TABLE_NAME, FK.COLUMN_NAME
        FROM INFORMATION_SCHEMA.REFERENTIAL_CONSTRAINTS C
        INNER JOIN INFORMATION_SCHEMA.KEY_COLUMN_USAGE FK
            ON C.CONSTRAINT_NAME = FK.CONSTRAINT_NAME
        WHERE C.UNIQUE_CONSTRAINT_NAME IN (
            SELECT CONSTRAINT_NAME
            FROM INFORMATION_SCHEMA.TABLE_CONSTRAINTS
            WHERE TABLE_NAME = ? AND CONSTRAINT_TYPE = 'PRIMARY KEY'
        )
        """, table_name)
    referencing_tables = cursor.fetchall()
    for ref in referencing_tables:
        ref_table = ref.TABLE_NAME
        ref_column = ref.COLUMN_NAME
        print(f"Updating foreign keys in {ref_table}.{ref_column}")
        # Update statements
        for dup_id in duplicate_ids:
            cursor.execute(f"""
                UPDATE {ref_table}
                SET {ref_column} = ?
                WHERE {ref_column} = ?
                """, keep_id, dup_id)
            conn.commit()

def delete_ids(conn, cursor, table_name, pk_column, ids_to_delete):
    """
    Deletes records from a table based on a list of primary key IDs.
    """
    if not ids_to_delete:
        return
    print(f"Deleting {len(ids_to_delete)} records from {table_name}")
    placeholders = ', '.join(['?' for _ in ids_to_delete])
    cursor.execute(f"""
        DELETE FROM {table_name}
        WHERE {pk_column} IN ({placeholders})
        """, ids_to_delete)
    conn.commit()

def fix_future_dates(conn, cursor, table_name, date_column):
    """
    Updates date values in the future to the current date or a logical date.
    """
    print(f"\nProcessing future dates in table: {table_name}.{date_column}")
    cursor.execute(f"""
        SELECT COUNT(*) AS FutureDateCount
        FROM {table_name}
        WHERE {date_column} > GETDATE()
        """)
    future_date_count = cursor.fetchone().FutureDateCount
    if future_date_count == 0:
        print(f"No future dates found in {table_name}.{date_column}.")
        return
    print(f"Found {future_date_count} future dates in {table_name}.{date_column}.")
    # Decide how to correct dates
    # Option 1: Set to current date
    # Option 2: Subtract a certain period
    # Here, we'll set to current date
    cursor.execute(f"""
        UPDATE {table_name}
        SET {date_column} = GETDATE()
        WHERE {date_column} > GETDATE()
        """)
    conn.commit()
    print(f"Future dates in {table_name}.{date_column} updated to current date.")

def main():
    # Database connection parameters
    server = 'DPC2023'  # Replace with your server name
    database = 'Chinook'

    # Connect to the database
    conn = connect_to_db(server, database)
    cursor = conn.cursor()

    # Fix duplicate records in specified tables
    tables_with_duplicates = ['Artist', 'Genre', 'MediaType', 'Playlist']
    for table in tables_with_duplicates:
        fix_duplicate_records(conn, cursor, table)

    # Fix future dates in Invoice table
    fix_future_dates(conn, cursor, 'Invoice', 'InvoiceDate')

    # Close connections
    cursor.close()
    conn.close()
    print("\nData quality issues corrected successfully.")

if __name__ == '__main__':
    main()


In [13]:
import pyodbc
import pandas as pd

def connect_to_db(server, database):
    """Establishes a connection to the SQL Server database."""
    conn = pyodbc.connect(
        'DRIVER={ODBC Driver 17 for SQL Server};'
        f'SERVER={server};'
        f'DATABASE={database};'
        'Trusted_Connection=yes;'
    )
    return conn

def get_table_list(cursor):
    """Retrieves a list of tables in the database."""
    cursor.execute("""
    SELECT TABLE_NAME
    FROM INFORMATION_SCHEMA.TABLES
    WHERE TABLE_TYPE = 'BASE TABLE'
    """)
    tables = [row.TABLE_NAME for row in cursor.fetchall()]
    print("Tables in the database:")
    for table in tables:
        print(f"- {table}")
    return tables

def get_table_schema(cursor, table_name):
    """Retrieves the schema of a table."""
    cursor.execute(f"""
    SELECT 
        COLUMN_NAME, 
        DATA_TYPE, 
        CHARACTER_MAXIMUM_LENGTH, 
        IS_NULLABLE
    FROM INFORMATION_SCHEMA.COLUMNS
    WHERE TABLE_NAME = '{table_name}'
    """)
    schema = cursor.fetchall()
    print(f"\nSchema of {table_name}:")
    for col in schema:
        print(f"- {col.COLUMN_NAME}: {col.DATA_TYPE}({col.CHARACTER_MAXIMUM_LENGTH}), Nullable: {col.IS_NULLABLE}")
    return schema

def get_row_count(cursor, table_name):
    """Retrieves the number of rows in a table."""
    cursor.execute(f"SELECT COUNT(*) AS [RowCount] FROM {table_name}")
    row_count = cursor.fetchone().RowCount
    print(f"{table_name} has {row_count} rows.")
    return row_count

def get_primary_keys(cursor, table_name):
    """Retrieves primary keys of a table."""
    cursor.execute(f"""
    SELECT KU.COLUMN_NAME
    FROM INFORMATION_SCHEMA.TABLE_CONSTRAINTS AS TC
    JOIN INFORMATION_SCHEMA.KEY_COLUMN_USAGE AS KU
        ON TC.CONSTRAINT_NAME = KU.CONSTRAINT_NAME
    WHERE TC.TABLE_NAME = '{table_name}' AND TC.CONSTRAINT_TYPE = 'PRIMARY KEY'
    """)
    pk = [row.COLUMN_NAME for row in cursor.fetchall()]
    print(f"Primary Key(s) of {table_name}: {', '.join(pk)}")
    return pk

def get_foreign_keys(cursor, table_name):
    """Retrieves foreign keys of a table."""
    cursor.execute(f"""
    SELECT
        KCU.COLUMN_NAME,
        KCU.CONSTRAINT_NAME,
        RC.UPDATE_RULE,
        RC.DELETE_RULE,
        KCU2.TABLE_NAME AS REFERENCED_TABLE_NAME,
        KCU2.COLUMN_NAME AS REFERENCED_COLUMN_NAME
    FROM INFORMATION_SCHEMA.REFERENTIAL_CONSTRAINTS AS RC
    JOIN INFORMATION_SCHEMA.KEY_COLUMN_USAGE AS KCU
        ON KCU.CONSTRAINT_NAME = RC.CONSTRAINT_NAME
    JOIN INFORMATION_SCHEMA.KEY_COLUMN_USAGE AS KCU2
        ON KCU2.CONSTRAINT_NAME = RC.UNIQUE_CONSTRAINT_NAME
    WHERE KCU.TABLE_NAME = '{table_name}'
    """)
    fks = cursor.fetchall()
    if fks:
        print(f"Foreign Keys of {table_name}:")
        for fk in fks:
            print(f"- {fk.COLUMN_NAME} references {fk.REFERENCED_TABLE_NAME}({fk.REFERENCED_COLUMN_NAME})")
    else:
        print(f"No foreign keys found for {table_name}.")
    return fks

def explore_table_data(conn, table_name, limit=5):
    """Displays sample data from a table."""
    print(f"\nSample data from {table_name}:")
    df = pd.read_sql(f"SELECT TOP {limit} * FROM {table_name}", conn)
    print(df)
    return df

def analyze_table(conn, cursor, table_name):
    """Performs analysis on a table."""
    print(f"\nAnalyzing table: {table_name}")
    schema = get_table_schema(cursor, table_name)
    row_count = get_row_count(cursor, table_name)
    pk = get_primary_keys(cursor, table_name)
    fks = get_foreign_keys(cursor, table_name)
    sample_data = explore_table_data(conn, table_name)
    return {
        'schema': schema,
        'row_count': row_count,
        'primary_keys': pk,
        'foreign_keys': fks,
        'sample_data': sample_data
    }

def main():
    # Database connection parameters
    server = 'DPC2023'  # Replace with your server name
    database = 'Chinook'
    
    # Connect to the database
    conn = connect_to_db(server, database)
    cursor = conn.cursor()
    
    # Retrieve list of tables
    tables = get_table_list(cursor)
    
    # Analyze each table
    table_analysis = {}
    for table in tables:
        table_analysis[table] = analyze_table(conn, cursor, table)
    
    # Close connections
    cursor.close()
    conn.close()
    
    # Begin Dimensional Modeling Steps
    print("\n--- Dimensional Modeling Steps ---")
    # Step 1: Select the Business Process
    print("\nStep 1: Selecting the Business Process")
    print("The main business process is 'Sales Transactions'. We focus on customer purchases and sales of tracks.")
    
    # Step 2: Declare the Grain
    print("\nStep 2: Declaring the Grain")
    print("We define the grain as 'One row per invoice line item (InvoiceLine)', representing each sold track.")
    
    # Step 3: Identify the Dimensions
    print("\nStep 3: Identifying Dimensions")
    print("Based on the analysis, potential dimensions include:")
    print("- Date (from Invoice.InvoiceDate)")
    print("- Customer")
    print("- Employee (Support Rep)")
    print("- Track")
    print("- Album")
    print("- Artist")
    print("- Genre")
    print("- MediaType")
    
    # Step 4: Identify the Facts
    print("\nStep 4: Identifying Facts")
    print("The measurable quantities (facts) include:")
    print("- Quantity (InvoiceLine.Quantity)")
    print("- Unit Price (InvoiceLine.UnitPrice)")
    print("- Total Amount (calculated as Quantity * Unit Price)")
    
    # Summarize the proposed data warehouse schema
    print("\n--- Proposed Data Warehouse Schema ---")
    print("Fact Table:")
    print("- FactSales")
    print("  - Grain: One row per invoice line item")
    print("  - Foreign Keys: DateKey, CustomerKey, EmployeeKey, TrackKey, AlbumKey, ArtistKey, GenreKey, MediaTypeKey")
    print("  - Measures: Quantity, Unit Price, Total Amount")
    
    print("\nDimension Tables:")
    print("- DimDate")
    print("- DimCustomer")
    print("- DimEmployee")
    print("- DimTrack")
    print("- DimAlbum")
    print("- DimArtist")
    print("- DimGenre")
    print("- DimMediaType")
    
    print("\nThis schema will allow us to answer complex analytical queries such as:")
    print("- Grouping customers by geographic region and calculating total spending and average order value.")
    print("- Identifying the top-selling tracks, albums, and artists over a specified period, sorted by revenue or quantity sold.")
    
if __name__ == '__main__':
    main()


Tables in the database:
- Album
- Artist
- Customer
- Employee
- Genre
- Invoice
- InvoiceLine
- MediaType
- Playlist
- PlaylistTrack
- Track
- sysdiagrams

Analyzing table: Album

Schema of Album:
- AlbumId: int(None), Nullable: NO
- Title: nvarchar(160), Nullable: NO
- ArtistId: int(None), Nullable: NO
Album has 1168 rows.
Primary Key(s) of Album: AlbumId
Foreign Keys of Album:
- ArtistId references Artist(ArtistId)

Sample data from Album:
   AlbumId                                  Title  ArtistId
0        1  For Those About To Rock We Salute You         1
1        2                      Balls to the Wall         2
2        3                      Restless and Wild         2
3        4                      Let There Be Rock         1
4        5                               Big Ones         3

Analyzing table: Artist



pandas only supports SQLAlchemy connectable (engine/connection) or database string URI or sqlite3 DBAPI2 connection. Other DBAPI2 objects are not tested. Please consider using SQLAlchemy.




Schema of Artist:
- ArtistId: int(None), Nullable: NO
- Name: nvarchar(120), Nullable: YES
Artist has 686 rows.
Primary Key(s) of Artist: ArtistId
No foreign keys found for Artist.

Sample data from Artist:
   ArtistId               Name
0         1              AC/DC
1         2             Accept
2         3          Aerosmith
3         4  Alanis Morissette
4         5    Alice In Chains

Analyzing table: Customer

Schema of Customer:
- CustomerId: int(None), Nullable: NO
- FirstName: nvarchar(40), Nullable: NO
- LastName: nvarchar(20), Nullable: NO
- Company: nvarchar(80), Nullable: YES
- Address: nvarchar(70), Nullable: YES
- City: nvarchar(40), Nullable: YES
- State: nvarchar(40), Nullable: YES
- Country: nvarchar(40), Nullable: YES
- PostalCode: nvarchar(10), Nullable: YES
- Phone: nvarchar(24), Nullable: YES
- Fax: nvarchar(24), Nullable: YES
- Email: nvarchar(60), Nullable: NO
- SupportRepId: int(None), Nullable: YES
Customer has 874 rows.



pandas only supports SQLAlchemy connectable (engine/connection) or database string URI or sqlite3 DBAPI2 connection. Other DBAPI2 objects are not tested. Please consider using SQLAlchemy.



Primary Key(s) of Customer: CustomerId
Foreign Keys of Customer:
- SupportRepId references Employee(EmployeeId)

Sample data from Customer:
   CustomerId  FirstName     LastName  \
0           1       Luís    Gonçalves   
1           2     Leonie       Köhler   
2           3   François     Tremblay   
3           4      Bjørn       Hansen   
4           5  František  Wichterlová   

                                            Company  \
0  Embraer - Empresa Brasileira de Aeronáutica S.A.   
1                                              None   
2                                              None   
3                                              None   
4                                  JetBrains s.r.o.   

                           Address                 City State         Country  \
0  Av. Brigadeiro Faria Lima, 2170  São José dos Campos    SP          Brazil   
1          Theodor-Heuss-Straße 34            Stuttgart  None         Germany   
2                1498 rue Bélanger     


pandas only supports SQLAlchemy connectable (engine/connection) or database string URI or sqlite3 DBAPI2 connection. Other DBAPI2 objects are not tested. Please consider using SQLAlchemy.



Foreign Keys of Employee:
- ReportsTo references Employee(EmployeeId)

Sample data from Employee:
   EmployeeId LastName FirstName                Title  ReportsTo  BirthDate  \
0           1    Adams    Andrew      General Manager        NaN 1962-02-18   
1           2  Edwards     Nancy        Sales Manager        1.0 1958-12-08   
2           3  Peacock      Jane  Sales Support Agent        2.0 1973-08-29   
3           4     Park  Margaret  Sales Support Agent        2.0 1947-09-19   
4           5  Johnson     Steve  Sales Support Agent        2.0 1965-03-03   

    HireDate              Address      City State Country PostalCode  \
0 2002-08-14  11120 Jasper Ave NW  Edmonton    AB  Canada    T5K 2N1   
1 2002-05-01         825 8 Ave SW   Calgary    AB  Canada    T2P 2T3   
2 2002-04-01        1111 6 Ave SW   Calgary    AB  Canada    T2P 5M5   
3 2003-05-03     683 10 Street SW   Calgary    AB  Canada    T2P 5G3   
4 2003-10-17         7727B 41 Ave   Calgary    AB  Canada    T3B 1Y


pandas only supports SQLAlchemy connectable (engine/connection) or database string URI or sqlite3 DBAPI2 connection. Other DBAPI2 objects are not tested. Please consider using SQLAlchemy.



Primary Key(s) of Genre: GenreId
No foreign keys found for Genre.

Sample data from Genre:
   GenreId                Name
0        1                Rock
1        2                Jazz
2        3               Metal
3        4  Alternative & Punk
4        5       Rock And Roll

Analyzing table: Invoice

Schema of Invoice:
- InvoiceId: int(None), Nullable: NO
- CustomerId: int(None), Nullable: NO
- InvoiceDate: datetime(None), Nullable: NO
- BillingAddress: nvarchar(70), Nullable: YES
- BillingCity: nvarchar(40), Nullable: YES
- BillingState: nvarchar(40), Nullable: YES
- BillingCountry: nvarchar(40), Nullable: YES
- BillingPostalCode: nvarchar(10), Nullable: YES
- Total: numeric(None), Nullable: NO
Invoice has 1642 rows.
Primary Key(s) of Invoice: InvoiceId



pandas only supports SQLAlchemy connectable (engine/connection) or database string URI or sqlite3 DBAPI2 connection. Other DBAPI2 objects are not tested. Please consider using SQLAlchemy.



Foreign Keys of Invoice:
- CustomerId references Customer(CustomerId)

Sample data from Invoice:
   InvoiceId  CustomerId InvoiceDate           BillingAddress BillingCity  \
0          1           2  2021-01-01  Theodor-Heuss-Straße 34   Stuttgart   
1          2           4  2021-01-02         Ullevålsveien 14        Oslo   
2          3           8  2021-01-03          Grétrystraat 63    Brussels   
3          4          14  2021-01-06           8210 111 ST NW    Edmonton   
4          5          23  2021-01-11          69 Salem Street      Boston   

  BillingState BillingCountry BillingPostalCode  Total  
0         None        Germany             70174   1.98  
1         None         Norway              0171   3.96  
2         None        Belgium              1000   5.94  
3           AB         Canada           T6G 2C7   8.91  
4           MA            USA              2113  13.86  

Analyzing table: InvoiceLine

Schema of InvoiceLine:
- InvoiceLineId: int(None), Nullable: NO
- I


pandas only supports SQLAlchemy connectable (engine/connection) or database string URI or sqlite3 DBAPI2 connection. Other DBAPI2 objects are not tested. Please consider using SQLAlchemy.



Foreign Keys of InvoiceLine:
- InvoiceId references Invoice(InvoiceId)
- TrackId references Track(TrackId)

Sample data from InvoiceLine:
   InvoiceLineId  InvoiceId  TrackId  UnitPrice  Quantity
0              1          1        2       0.99         1
1              2          1        4       0.99         1
2              3          2        6       0.99         1
3              4          2        8       0.99         1
4              5          2       10       0.99         1

Analyzing table: MediaType

Schema of MediaType:
- MediaTypeId: int(None), Nullable: NO
- Name: nvarchar(120), Nullable: YES
MediaType has 128 rows.
Primary Key(s) of MediaType: MediaTypeId



pandas only supports SQLAlchemy connectable (engine/connection) or database string URI or sqlite3 DBAPI2 connection. Other DBAPI2 objects are not tested. Please consider using SQLAlchemy.



No foreign keys found for MediaType.

Sample data from MediaType:
   MediaTypeId                         Name
0            1              MPEG audio file
1            2     Protected AAC audio file
2            3  Protected MPEG-4 video file
3            4     Purchased AAC audio file
4            5               AAC audio file

Analyzing table: Playlist

Schema of Playlist:
- PlaylistId: int(None), Nullable: NO
- Name: nvarchar(120), Nullable: YES
Playlist has 223 rows.
Primary Key(s) of Playlist: PlaylistId



pandas only supports SQLAlchemy connectable (engine/connection) or database string URI or sqlite3 DBAPI2 connection. Other DBAPI2 objects are not tested. Please consider using SQLAlchemy.



No foreign keys found for Playlist.

Sample data from Playlist:
   PlaylistId        Name
0           1       Music
1           2      Movies
2           3    TV Shows
3           4  Audiobooks
4           5  90’s Music

Analyzing table: PlaylistTrack

Schema of PlaylistTrack:
- PlaylistId: int(None), Nullable: NO
- TrackId: int(None), Nullable: NO
PlaylistTrack has 10581 rows.



pandas only supports SQLAlchemy connectable (engine/connection) or database string URI or sqlite3 DBAPI2 connection. Other DBAPI2 objects are not tested. Please consider using SQLAlchemy.



Primary Key(s) of PlaylistTrack: PlaylistId, TrackId
Foreign Keys of PlaylistTrack:
- PlaylistId references Playlist(PlaylistId)
- TrackId references Track(TrackId)

Sample data from PlaylistTrack:
   PlaylistId  TrackId
0           1     3402
1           1     3389
2           1     3390
3           1     3391
4           1     3392

Analyzing table: Track

Schema of Track:
- TrackId: int(None), Nullable: NO
- Name: nvarchar(200), Nullable: NO
- AlbumId: int(None), Nullable: YES
- MediaTypeId: int(None), Nullable: NO
- GenreId: int(None), Nullable: YES
- Composer: nvarchar(220), Nullable: YES
- Milliseconds: int(None), Nullable: NO
- Bytes: int(None), Nullable: YES
- UnitPrice: numeric(None), Nullable: NO
Track has 5554 rows.



pandas only supports SQLAlchemy connectable (engine/connection) or database string URI or sqlite3 DBAPI2 connection. Other DBAPI2 objects are not tested. Please consider using SQLAlchemy.



Primary Key(s) of Track: TrackId
Foreign Keys of Track:
- AlbumId references Album(AlbumId)
- GenreId references Genre(GenreId)
- MediaTypeId references MediaType(MediaTypeId)

Sample data from Track:
   TrackId                                     Name  AlbumId  MediaTypeId  \
0        1  For Those About To Rock (We Salute You)        1            1   
1        2                        Balls to the Wall        2            2   
2        3                          Fast As a Shark        3            2   
3        4                        Restless and Wild        3            2   
4        5                     Princess of the Dawn        3            2   

   GenreId                                           Composer  Milliseconds  \
0        1          Angus Young, Malcolm Young, Brian Johnson        343719   
1        1  U. Dirkschneider, W. Hoffmann, H. Frank, P. Ba...        342562   
2        1  F. Baltes, S. Kaufman, U. Dirkscneider & W. Ho...        230619   
3        1  F. Balte


pandas only supports SQLAlchemy connectable (engine/connection) or database string URI or sqlite3 DBAPI2 connection. Other DBAPI2 objects are not tested. Please consider using SQLAlchemy.



No foreign keys found for sysdiagrams.

Sample data from sysdiagrams:
        name  principal_id  diagram_id  version  \
0  Diagram_0             1           1        1   

                                          definition  
0  b'\xd0\xcf\x11\xe0\xa1\xb1\x1a\xe1\x00\x00\x00...  

--- Dimensional Modeling Steps ---

Step 1: Selecting the Business Process
The main business process is 'Sales Transactions'. We focus on customer purchases and sales of tracks.

Step 2: Declaring the Grain
We define the grain as 'One row per invoice line item (InvoiceLine)', representing each sold track.

Step 3: Identifying Dimensions
Based on the analysis, potential dimensions include:
- Date (from Invoice.InvoiceDate)
- Customer
- Employee (Support Rep)
- Track
- Album
- Artist
- Genre
- MediaType

Step 4: Identifying Facts
The measurable quantities (facts) include:
- Quantity (InvoiceLine.Quantity)
- Unit Price (InvoiceLine.UnitPrice)
- Total Amount (calculated as Quantity * Unit Price)

--- Proposed D


pandas only supports SQLAlchemy connectable (engine/connection) or database string URI or sqlite3 DBAPI2 connection. Other DBAPI2 objects are not tested. Please consider using SQLAlchemy.



In [53]:
import subprocess
import sys
import os
import tempfile

def schedule_task(task_name, notebook_path, time='02:00'):
    python_executable = sys.executable  # Path to the current Python interpreter

    # Ensure the notebook path is absolute
    notebook_path = os.path.abspath(notebook_path)

    # Construct the command to execute the notebook
    command_to_run = f'"{python_executable}" -m jupyter nbconvert --ExecutePreprocessor.timeout=-1 --to notebook --execute "{notebook_path}" --output "{notebook_path}"'

    # Create a batch file with the command
    batch_file_path = os.path.join(tempfile.gettempdir(), f'{task_name}.bat')

    with open(batch_file_path, 'w') as batch_file:
        batch_file.write(f'@echo off\n{command_to_run}\n')

    # Construct the command to create a scheduled task
    command = [
        'schtasks',
        '/Create',
        '/SC', 'DAILY',              # Schedule type: Daily
        '/TN', task_name,            # Task name
        '/TR', f'"{batch_file_path}"',  # Task to run
        '/ST', time,                 # Start time
        '/F'                         # Forcefully create the task and overwrite if it exists
    ]

    # Run the command
    try:
        output = subprocess.check_output(command, stderr=subprocess.STDOUT)
        print(f"Task '{task_name}' scheduled successfully to run daily at {time}.")
        print(f"Batch file created at {batch_file_path}")
    except subprocess.CalledProcessError as e:
        print(f"Failed to schedule task '{task_name}'.")
        print(e.output.decode())

def main():
    # Specify the Jupyter notebook filename
    notebook_filename = 'ChinookDW4.ipynb'  # Replace with your notebook filename
    notebook_path = os.path.abspath(notebook_filename)
    task_name = 'ChinookDW_ETL_Task'        # Name for the scheduled task
    run_time = '02:00'                      # Time to run the task daily (24-hour format)

    # Check if the notebook exists
    if not os.path.exists(notebook_path):
        print(f"Notebook not found at {notebook_path}. Please check the path and try again.")
        return

    schedule_task(task_name, notebook_path, run_time)

if __name__ == '__main__':
    main()


Task 'ChinookDW_ETL_Task' scheduled successfully to run daily at 02:00.
Batch file created at C:\Users\LENOVO\AppData\Local\Temp\ChinookDW_ETL_Task.bat


In [None]:
import subprocess
import os

def run_batch_file(batch_file_path):
    # Ensure the batch file path is absolute
    batch_file_path = os.path.abspath(batch_file_path)

    if not os.path.exists(batch_file_path):
        print(f"Batch file not found at {batch_file_path}")
        return

    print(f"Running batch file: {batch_file_path}")

    # Run the batch file and capture output
    try:
        output = subprocess.check_output(batch_file_path, stderr=subprocess.STDOUT, shell=True, universal_newlines=True)
        print("Batch file executed successfully.\n")
        print("Output from batch file execution:\n")
        print(output)
    except subprocess.CalledProcessError as e:
        print("Error during batch file execution:")
        print(e.output)

def main():
    # Specify the batch file path
    batch_file_path = os.path.join(os.environ['TEMP'], 'C:\\Users\\LENOVO\\AppData\\Local\\Temp\\ChinookDW_ETL_Task.bat')  # Adjust if stored elsewhere

    run_batch_file(batch_file_path)

if __name__ == '__main__':
    main()


Running batch file: C:\Users\LENOVO\AppData\Local\Temp\ChinookDW_ETL_Task.bat


In [None]:
import nbformat
from nbclient import NotebookClient
from nbclient.exceptions import CellExecutionError
import os

def run_notebook(notebook_path):
    # Ensure the notebook path is absolute
    notebook_path = os.path.abspath(notebook_path)

    if not os.path.exists(notebook_path):
        print(f"Notebook not found at {notebook_path}")
        return

    print(f"Running notebook: {notebook_path}")

    # Load the notebook
    with open(notebook_path, 'r', encoding='utf-8') as f:
        nb = nbformat.read(f, as_version=4)

    # Create a notebook client
    client = NotebookClient(nb, timeout=-1, kernel_name='python3')

    try:
        # Execute the notebook
        client.execute()
        print("Notebook executed successfully.\n")
    except CellExecutionError as e:
        print("Error during notebook execution:")
        print(str(e))
        return

    # List all outputs
    print("Listing all outputs generated during notebook execution:\n")
    for cell in nb.cells:
        if 'outputs' in cell:
            for output in cell.outputs:
                if output.output_type == 'stream':
                    print(output.text)
                elif output.output_type == 'execute_result':
                    print(output.data.get('text/plain', ''))
                elif output.output_type == 'error':
                    print('Error:', ''.join(output.traceback))
    print("Notebook output listing completed.")

def main():
    # Specify the Jupyter notebook filename
    notebook_filename = 'ChinookDW4.ipynb'  # Replace with your notebook filename
    notebook_path = os.path.abspath(notebook_filename)

    run_notebook(notebook_path)

if __name__ == '__main__':
    main()


In [5]:
import pyodbc
from faker import Faker
import random
from datetime import datetime, timedelta

def connect_to_db(server, database):
    conn = pyodbc.connect(
        'DRIVER={ODBC Driver 17 for SQL Server};'
        f'SERVER={server};'
        f'DATABASE={database};'
        'Trusted_Connection=yes;'
    )
    return conn

def truncate_string(value, max_length):
    if value is None:
        return None
    return value[:max_length]

def insert_artists(cursor, num_records):
    print(f"Inserting {num_records} artists...")
    faker = Faker()
    artist_ids = []
    for _ in range(num_records):
        name = truncate_string(faker.name(), 120)  # NVARCHAR(120)
        cursor.execute("SELECT MAX(ArtistId) FROM Artist")
        max_id_row = cursor.fetchone()
        artist_id = (max_id_row[0] or 0) + 1
        cursor.execute("INSERT INTO Artist (ArtistId, Name) VALUES (?, ?)", artist_id, name)
        artist_ids.append(artist_id)
    return artist_ids

def insert_albums(cursor, artist_ids, num_records):
    print(f"Inserting {num_records} albums...")
    faker = Faker()
    album_ids = []
    for _ in range(num_records):
        title = truncate_string(faker.sentence(nb_words=3), 160)  # NVARCHAR(160)
        artist_id = random.choice(artist_ids)
        cursor.execute("SELECT MAX(AlbumId) FROM Album")
        max_id_row = cursor.fetchone()
        album_id = (max_id_row[0] or 0) + 1
        cursor.execute("INSERT INTO Album (AlbumId, Title, ArtistId) VALUES (?, ?, ?)", album_id, title, artist_id)
        album_ids.append(album_id)
    return album_ids

def insert_genres(cursor, num_records):
    print(f"Inserting {num_records} genres...")
    faker = Faker()
    genre_ids = []
    for _ in range(num_records):
        name = truncate_string(faker.word().capitalize(), 120)  # NVARCHAR(120)
        cursor.execute("SELECT MAX(GenreId) FROM Genre")
        max_id_row = cursor.fetchone()
        genre_id = (max_id_row[0] or 0) + 1
        cursor.execute("INSERT INTO Genre (GenreId, Name) VALUES (?, ?)", genre_id, name)
        genre_ids.append(genre_id)
    return genre_ids

def insert_mediatypes(cursor, num_records):
    print(f"Inserting {num_records} media types...")
    faker = Faker()
    mediatype_ids = []
    for _ in range(num_records):
        name = truncate_string(faker.word().capitalize(), 120)  # NVARCHAR(120)
        cursor.execute("SELECT MAX(MediaTypeId) FROM MediaType")
        max_id_row = cursor.fetchone()
        mediatype_id = (max_id_row[0] or 0) + 1
        cursor.execute("INSERT INTO MediaType (MediaTypeId, Name) VALUES (?, ?)", mediatype_id, name)
        mediatype_ids.append(mediatype_id)
    return mediatype_ids

def insert_tracks(cursor, album_ids, genre_ids, mediatype_ids, num_records):
    print(f"Inserting {num_records} tracks...")
    faker = Faker()
    track_ids = []
    for _ in range(num_records):
        name = truncate_string(faker.sentence(nb_words=4), 200)  # NVARCHAR(200)
        album_id = random.choice(album_ids)
        mediatype_id = random.choice(mediatype_ids)
        genre_id = random.choice(genre_ids)
        composer = truncate_string(faker.name(), 220)  # NVARCHAR(220)
        milliseconds = random.randint(60000, 300000)
        bytes_size = milliseconds * random.randint(50, 150)
        unit_price = round(random.uniform(0.99, 1.99), 2)
        cursor.execute("SELECT MAX(TrackId) FROM Track")
        max_id_row = cursor.fetchone()
        track_id = (max_id_row[0] or 0) + 1
        cursor.execute("""
            INSERT INTO Track (TrackId, Name, AlbumId, MediaTypeId, GenreId, Composer, Milliseconds, Bytes, UnitPrice)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
        """, track_id, name, album_id, mediatype_id, genre_id, composer, milliseconds, bytes_size, unit_price)
        track_ids.append(track_id)
    return track_ids

def insert_employees(cursor, num_records):
    print(f"Inserting {num_records} employees...")
    faker = Faker()
    employee_ids = []
    for _ in range(num_records):
        first_name = truncate_string(faker.first_name(), 20)  # NVARCHAR(20)
        last_name = truncate_string(faker.last_name(), 20)    # NVARCHAR(20)
        title = truncate_string(random.choice(['Sales Manager', 'Sales Support Agent', 'IT Manager', 'IT Staff']), 30)  # NVARCHAR(30)
        reports_to = None  # For simplicity
        birth_date = faker.date_between(start_date='-60y', end_date='-25y')
        hire_date = faker.date_between(start_date='-20y', end_date='today')
        address = truncate_string(faker.street_address(), 70)  # NVARCHAR(70)
        city = truncate_string(faker.city(), 40)               # NVARCHAR(40)
        state = truncate_string(faker.state(), 40)             # NVARCHAR(40)
        country = truncate_string(faker.country(), 40)         # NVARCHAR(40)
        postal_code = truncate_string(faker.postcode(), 10)    # NVARCHAR(10)
        phone = truncate_string(faker.phone_number(), 24)      # NVARCHAR(24)
        fax = truncate_string(faker.phone_number(), 24)        # NVARCHAR(24)
        email = truncate_string(faker.email(), 60)             # NVARCHAR(60)
        cursor.execute("SELECT MAX(EmployeeId) FROM Employee")
        max_id_row = cursor.fetchone()
        employee_id = (max_id_row[0] or 0) + 1
        cursor.execute("""
            INSERT INTO Employee (EmployeeId, LastName, FirstName, Title, ReportsTo, BirthDate, HireDate,
                                  Address, City, State, Country, PostalCode, Phone, Fax, Email)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
        """, employee_id, last_name, first_name, title, reports_to, birth_date, hire_date,
             address, city, state, country, postal_code, phone, fax, email)
        employee_ids.append(employee_id)
    return employee_ids

def insert_customers(cursor, employee_ids, num_records):
    print(f"Inserting {num_records} customers...")
    faker = Faker()
    customer_ids = []
    for _ in range(num_records):
        first_name = truncate_string(faker.first_name(), 40)  # NVARCHAR(40)
        last_name = truncate_string(faker.last_name(), 20)    # NVARCHAR(20)
        company = truncate_string(faker.company(), 80)        # NVARCHAR(80)
        address = truncate_string(faker.street_address(), 70) # NVARCHAR(70)
        city = truncate_string(faker.city(), 40)              # NVARCHAR(40)
        state = truncate_string(faker.state(), 40)            # NVARCHAR(40)
        country = truncate_string(faker.country(), 40)        # NVARCHAR(40)
        postal_code = truncate_string(faker.postcode(), 10)   # NVARCHAR(10)
        phone = truncate_string(faker.phone_number(), 24)     # NVARCHAR(24)
        fax = truncate_string(faker.phone_number(), 24)       # NVARCHAR(24)
        email = truncate_string(faker.email(), 60)            # NVARCHAR(60)
        support_rep_id = random.choice(employee_ids) if employee_ids else None
        cursor.execute("SELECT MAX(CustomerId) FROM Customer")
        max_id_row = cursor.fetchone()
        customer_id = (max_id_row[0] or 0) + 1
        cursor.execute("""
            INSERT INTO Customer (CustomerId, FirstName, LastName, Company, Address, City, State,
                                  Country, PostalCode, Phone, Fax, Email, SupportRepId)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
        """, customer_id, first_name, last_name, company, address, city, state,
             country, postal_code, phone, fax, email, support_rep_id)
        customer_ids.append(customer_id)
    return customer_ids

def insert_invoices(cursor, customer_ids, num_records):
    print(f"Inserting {num_records} invoices...")
    faker = Faker()
    invoice_ids = []
    for _ in range(num_records):
        customer_id = random.choice(customer_ids)
        invoice_date = faker.date_between(start_date='-5y', end_date='today')
        billing_address = truncate_string(faker.street_address(), 70)  # NVARCHAR(70)
        billing_city = truncate_string(faker.city(), 40)               # NVARCHAR(40)
        billing_state = truncate_string(faker.state(), 40)             # NVARCHAR(40)
        billing_country = truncate_string(faker.country(), 40)         # NVARCHAR(40)
        billing_postal_code = truncate_string(faker.postcode(), 10)    # NVARCHAR(10)
        total = round(random.uniform(10.00, 500.00), 2)
        cursor.execute("SELECT MAX(InvoiceId) FROM Invoice")
        max_id_row = cursor.fetchone()
        invoice_id = (max_id_row[0] or 0) + 1
        cursor.execute("""
            INSERT INTO Invoice (InvoiceId, CustomerId, InvoiceDate, BillingAddress, BillingCity,
                                 BillingState, BillingCountry, BillingPostalCode, Total)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
        """, invoice_id, customer_id, invoice_date, billing_address, billing_city,
             billing_state, billing_country, billing_postal_code, total)
        invoice_ids.append(invoice_id)
    return invoice_ids

def insert_invoicelines(cursor, invoice_ids, track_ids, num_records):
    print(f"Inserting {num_records} invoice lines...")
    invoice_line_ids = []
    for _ in range(num_records):
        invoice_id = random.choice(invoice_ids)
        track_id = random.choice(track_ids)
        unit_price = round(random.uniform(0.99, 1.99), 2)
        quantity = random.randint(1, 5)
        cursor.execute("SELECT MAX(InvoiceLineId) FROM InvoiceLine")
        max_id_row = cursor.fetchone()
        invoice_line_id = (max_id_row[0] or 0) + 1
        cursor.execute("""
            INSERT INTO InvoiceLine (InvoiceLineId, InvoiceId, TrackId, UnitPrice, Quantity)
            VALUES (?, ?, ?, ?, ?)
        """, invoice_line_id, invoice_id, track_id, unit_price, quantity)
        invoice_line_ids.append(invoice_line_id)
    return invoice_line_ids

def insert_playlists(cursor, num_records):
    print(f"Inserting {num_records} playlists...")
    faker = Faker()
    playlist_ids = []
    for _ in range(num_records):
        name = truncate_string(faker.sentence(nb_words=2), 120)  # NVARCHAR(120)
        cursor.execute("SELECT MAX(PlaylistId) FROM Playlist")
        max_id_row = cursor.fetchone()
        playlist_id = (max_id_row[0] or 0) + 1
        cursor.execute("INSERT INTO Playlist (PlaylistId, Name) VALUES (?, ?)", playlist_id, name)
        playlist_ids.append(playlist_id)
    return playlist_ids

def insert_playlisttracks(cursor, playlist_ids, track_ids, num_records):
    print(f"Inserting {num_records} playlist tracks...")
    for _ in range(num_records):
        playlist_id = random.choice(playlist_ids)
        track_id = random.choice(track_ids)
        cursor.execute("""
            SELECT COUNT(*) FROM PlaylistTrack WHERE PlaylistId = ? AND TrackId = ?
        """, playlist_id, track_id)
        exists = cursor.fetchone()[0]
        if not exists:
            cursor.execute("""
                INSERT INTO PlaylistTrack (PlaylistId, TrackId)
                VALUES (?, ?)
            """, playlist_id, track_id)

def main():
    # Database connection parameters
    source_server = 'DPC2023'  # Replace with your server name
    source_database = 'Chinook'
    
    # Connect to source database
    source_conn = connect_to_db(source_server, source_database)
    source_cursor = source_conn.cursor()
    
    # Start transaction
    source_conn.autocommit = False
    try:
        # Insert data
        artist_ids = insert_artists(source_cursor, num_records=10)
        genre_ids = insert_genres(source_cursor, num_records=5)
        mediatype_ids = insert_mediatypes(source_cursor, num_records=3)
        album_ids = insert_albums(source_cursor, artist_ids, num_records=20)
        track_ids = insert_tracks(source_cursor, album_ids, genre_ids, mediatype_ids, num_records=50)
        employee_ids = insert_employees(source_cursor, num_records=5)
        customer_ids = insert_customers(source_cursor, employee_ids, num_records=15)
        invoice_ids = insert_invoices(source_cursor, customer_ids, num_records=30)
        insert_invoicelines(source_cursor, invoice_ids, track_ids, num_records=100)
        playlist_ids = insert_playlists(source_cursor, num_records=5)
        insert_playlisttracks(source_cursor, playlist_ids, track_ids, num_records=50)
        
        # Commit transaction
        source_conn.commit()
        print("Data insertion completed successfully.")
    except Exception as e:
        # Rollback transaction on error
        source_conn.rollback()
        print(f"An error occurred: {e}")
    finally:
        # Close connections
        source_cursor.close()
        source_conn.close()

if __name__ == "__main__":
    main()


Inserting 10 artists...
Inserting 5 genres...
Inserting 3 media types...
Inserting 20 albums...
Inserting 50 tracks...
Inserting 5 employees...
Inserting 15 customers...
Inserting 30 invoices...
Inserting 100 invoice lines...
Inserting 5 playlists...
Inserting 50 playlist tracks...
Data insertion completed successfully.


In [6]:
import pyodbc
import re
from datetime import datetime

def connect_to_db(server, database):
    conn = pyodbc.connect(
        'DRIVER={ODBC Driver 17 for SQL Server};'
        f'SERVER={server};'
        f'DATABASE={database};'
        'Trusted_Connection=yes;'
    )
    return conn

def create_staging_tables(target_cursor, target_conn):
    print("Creating staging tables if they do not exist...")
    staging_tables_sql = [
        """
        IF OBJECT_ID('stg_Artist', 'U') IS NULL
        CREATE TABLE stg_Artist (
            ArtistId INT PRIMARY KEY,
            Name NVARCHAR(120)
        )
        """,
        """
        IF OBJECT_ID('stg_Album', 'U') IS NULL
        CREATE TABLE stg_Album (
            AlbumId INT PRIMARY KEY,
            Title NVARCHAR(160),
            ArtistId INT
        )
        """,
        """
        IF OBJECT_ID('stg_Genre', 'U') IS NULL
        CREATE TABLE stg_Genre (
            GenreId INT PRIMARY KEY,
            Name NVARCHAR(120)
        )
        """,
        """
        IF OBJECT_ID('stg_MediaType', 'U') IS NULL
        CREATE TABLE stg_MediaType (
            MediaTypeId INT PRIMARY KEY,
            Name NVARCHAR(120)
        )
        """,
        """
        IF OBJECT_ID('stg_Track', 'U') IS NULL
        CREATE TABLE stg_Track (
            TrackId INT PRIMARY KEY,
            Name NVARCHAR(200),
            AlbumId INT,
            MediaTypeId INT,
            GenreId INT,
            Composer NVARCHAR(220),
            Milliseconds INT,
            Bytes INT,
            UnitPrice NUMERIC(10,2)
        )
        """,
        """
        IF OBJECT_ID('stg_Employee', 'U') IS NULL
        CREATE TABLE stg_Employee (
            EmployeeId INT PRIMARY KEY,
            LastName NVARCHAR(20),
            FirstName NVARCHAR(20),
            Title NVARCHAR(30),
            ReportsTo INT,
            BirthDate DATETIME,
            HireDate DATETIME,
            Address NVARCHAR(70),
            City NVARCHAR(40),
            State NVARCHAR(40),
            Country NVARCHAR(40),
            PostalCode NVARCHAR(10),
            Phone NVARCHAR(24),
            Fax NVARCHAR(24),
            Email NVARCHAR(60)
        )
        """,
        """
        IF OBJECT_ID('stg_Customer', 'U') IS NULL
        CREATE TABLE stg_Customer (
            CustomerId INT PRIMARY KEY,
            FirstName NVARCHAR(40),
            LastName NVARCHAR(20),
            Company NVARCHAR(80),
            Address NVARCHAR(70),
            City NVARCHAR(40),
            State NVARCHAR(40),
            Country NVARCHAR(40),
            PostalCode NVARCHAR(10),
            Phone NVARCHAR(24),
            Fax NVARCHAR(24),
            Email NVARCHAR(60),
            SupportRepId INT
        )
        """,
        """
        IF OBJECT_ID('stg_Invoice', 'U') IS NULL
        CREATE TABLE stg_Invoice (
            InvoiceId INT PRIMARY KEY,
            CustomerId INT,
            InvoiceDate DATETIME,
            BillingAddress NVARCHAR(70),
            BillingCity NVARCHAR(40),
            BillingState NVARCHAR(40),
            BillingCountry NVARCHAR(40),
            BillingPostalCode NVARCHAR(10),
            Total NUMERIC(10,2)
        )
        """,
        """
        IF OBJECT_ID('stg_InvoiceLine', 'U') IS NULL
        CREATE TABLE stg_InvoiceLine (
            InvoiceLineId INT PRIMARY KEY,
            InvoiceId INT,
            TrackId INT,
            UnitPrice NUMERIC(10,2),
            Quantity INT
        )
        """
    ]
    for sql in staging_tables_sql:
        target_cursor.execute(sql)
    target_conn.commit()
    print("Staging tables created or verified.")

def truncate_staging_tables(target_cursor, target_conn):
    print("Truncating staging tables...")
    tables = ['stg_Artist', 'stg_Album', 'stg_Genre', 'stg_MediaType', 'stg_Track', 'stg_Employee', 'stg_Customer', 'stg_Invoice', 'stg_InvoiceLine']
    for table in tables:
        target_cursor.execute(f"TRUNCATE TABLE {table}")
    target_conn.commit()
    print("Staging tables truncated.")

def preprocess_artist(source_cursor, target_cursor):
    print("Preprocessing Artist data...")
    source_cursor.execute("SELECT ArtistId, Name FROM Artist")
    rows = source_cursor.fetchall()
    for row in rows:
        # Data Cleaning: Standardize names
        name = row.Name.strip().title() if row.Name else None
        target_cursor.execute("INSERT INTO stg_Artist (ArtistId, Name) VALUES (?, ?)", row.ArtistId, name)
    print("Artist data preprocessed and loaded into staging.")

def preprocess_album(source_cursor, target_cursor):
    print("Preprocessing Album data...")
    source_cursor.execute("SELECT AlbumId, Title, ArtistId FROM Album")
    rows = source_cursor.fetchall()
    for row in rows:
        title = row.Title.strip().title() if row.Title else None
        target_cursor.execute("INSERT INTO stg_Album (AlbumId, Title, ArtistId) VALUES (?, ?, ?)", row.AlbumId, title, row.ArtistId)
    print("Album data preprocessed and loaded into staging.")

def preprocess_genre(source_cursor, target_cursor):
    print("Preprocessing Genre data...")
    source_cursor.execute("SELECT GenreId, Name FROM Genre")
    rows = source_cursor.fetchall()
    for row in rows:
        name = row.Name.strip().title() if row.Name else None
        target_cursor.execute("INSERT INTO stg_Genre (GenreId, Name) VALUES (?, ?)", row.GenreId, name)
    print("Genre data preprocessed and loaded into staging.")

def preprocess_mediatype(source_cursor, target_cursor):
    print("Preprocessing MediaType data...")
    source_cursor.execute("SELECT MediaTypeId, Name FROM MediaType")
    rows = source_cursor.fetchall()
    for row in rows:
        name = row.Name.strip().title() if row.Name else None
        target_cursor.execute("INSERT INTO stg_MediaType (MediaTypeId, Name) VALUES (?, ?)", row.MediaTypeId, name)
    print("MediaType data preprocessed and loaded into staging.")

def preprocess_track(source_cursor, target_cursor):
    print("Preprocessing Track data...")
    source_cursor.execute("SELECT TrackId, Name, AlbumId, MediaTypeId, GenreId, Composer, Milliseconds, Bytes, UnitPrice FROM Track")
    rows = source_cursor.fetchall()
    for row in rows:
        name = row.Name.strip().title() if row.Name else None
        composer = row.Composer.strip().title() if row.Composer else None
        target_cursor.execute("""
            INSERT INTO stg_Track (TrackId, Name, AlbumId, MediaTypeId, GenreId, Composer, Milliseconds, Bytes, UnitPrice)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
        """, row.TrackId, name, row.AlbumId, row.MediaTypeId, row.GenreId, composer, row.Milliseconds, row.Bytes, row.UnitPrice)
    print("Track data preprocessed and loaded into staging.")

def preprocess_employee(source_cursor, target_cursor):
    print("Preprocessing Employee data...")
    source_cursor.execute("""
        SELECT EmployeeId, LastName, FirstName, Title, ReportsTo, BirthDate, HireDate,
               Address, City, State, Country, PostalCode, Phone, Fax, Email
        FROM Employee
    """)
    rows = source_cursor.fetchall()
    for row in rows:
        # Data Cleaning: Standardize names and addresses
        last_name = row.LastName.strip().title() if row.LastName else None
        first_name = row.FirstName.strip().title() if row.FirstName else None
        title = row.Title.strip().title() if row.Title else None
        address = row.Address.strip().title() if row.Address else None
        city = row.City.strip().title() if row.City else None
        state = row.State.strip().title() if row.State else None
        country = row.Country.strip().title() if row.Country else None
        email = row.Email.strip().lower() if row.Email else None
        # Data Transformation: Validate email format
        email = email if re.match(r"[^@]+@[^@]+\.[^@]+", email) else None
        target_cursor.execute("""
            INSERT INTO stg_Employee (EmployeeId, LastName, FirstName, Title, ReportsTo, BirthDate, HireDate,
                                      Address, City, State, Country, PostalCode, Phone, Fax, Email)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
        """, row.EmployeeId, last_name, first_name, title, row.ReportsTo, row.BirthDate, row.HireDate,
             address, city, state, country, row.PostalCode, row.Phone, row.Fax, email)
    print("Employee data preprocessed and loaded into staging.")

def preprocess_customer(source_cursor, target_cursor):
    print("Preprocessing Customer data...")
    source_cursor.execute("""
        SELECT CustomerId, FirstName, LastName, Company, Address, City, State, Country, PostalCode,
               Phone, Fax, Email, SupportRepId
        FROM Customer
    """)
    rows = source_cursor.fetchall()
    for row in rows:
        first_name = row.FirstName.strip().title() if row.FirstName else None
        last_name = row.LastName.strip().title() if row.LastName else None
        company = row.Company.strip().title() if row.Company else None
        address = row.Address.strip().title() if row.Address else None
        city = row.City.strip().title() if row.City else None
        state = row.State.strip().title() if row.State else None
        country = row.Country.strip().title() if row.Country else None
        email = row.Email.strip().lower() if row.Email else None
        # Data Transformation: Validate email format
        email = email if re.match(r"[^@]+@[^@]+\.[^@]+", email) else None
        target_cursor.execute("""
            INSERT INTO stg_Customer (CustomerId, FirstName, LastName, Company, Address, City, State,
                                      Country, PostalCode, Phone, Fax, Email, SupportRepId)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
        """, row.CustomerId, first_name, last_name, company, address, city, state,
             country, row.PostalCode, row.Phone, row.Fax, email, row.SupportRepId)
    print("Customer data preprocessed and loaded into staging.")

def preprocess_invoice(source_cursor, target_cursor):
    print("Preprocessing Invoice data...")
    source_cursor.execute("""
        SELECT InvoiceId, CustomerId, InvoiceDate, BillingAddress, BillingCity, BillingState,
               BillingCountry, BillingPostalCode, Total
        FROM Invoice
    """)
    rows = source_cursor.fetchall()
    for row in rows:
        billing_address = row.BillingAddress.strip().title() if row.BillingAddress else None
        billing_city = row.BillingCity.strip().title() if row.BillingCity else None
        billing_state = row.BillingState.strip().title() if row.BillingState else None
        billing_country = row.BillingCountry.strip().title() if row.BillingCountry else None
        target_cursor.execute("""
            INSERT INTO stg_Invoice (InvoiceId, CustomerId, InvoiceDate, BillingAddress, BillingCity,
                                     BillingState, BillingCountry, BillingPostalCode, Total)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
        """, row.InvoiceId, row.CustomerId, row.InvoiceDate, billing_address, billing_city,
             billing_state, billing_country, row.BillingPostalCode, row.Total)
    print("Invoice data preprocessed and loaded into staging.")

def preprocess_invoiceline(source_cursor, target_cursor):
    print("Preprocessing InvoiceLine data...")
    source_cursor.execute("""
        SELECT InvoiceLineId, InvoiceId, TrackId, UnitPrice, Quantity
        FROM InvoiceLine
    """)
    rows = source_cursor.fetchall()
    for row in rows:
        target_cursor.execute("""
            INSERT INTO stg_InvoiceLine (InvoiceLineId, InvoiceId, TrackId, UnitPrice, Quantity)
            VALUES (?, ?, ?, ?, ?)
        """, row.InvoiceLineId, row.InvoiceId, row.TrackId, row.UnitPrice, row.Quantity)
    print("InvoiceLine data preprocessed and loaded into staging.")

def main():
    # Database connection parameters
    source_server = 'DPC2023'   # Replace with your source server name
    source_database = 'Chinook'
    target_server = 'DPC2023'   # Replace with your target server name
    target_database = 'ChinookDW4'

    # Connect to source and target databases
    source_conn = connect_to_db(source_server, source_database)
    target_conn = connect_to_db(target_server, target_database)

    source_cursor = source_conn.cursor()
    target_cursor = target_conn.cursor()

    # Create staging tables if they do not exist
    create_staging_tables(target_cursor, target_conn)

    # Truncate staging tables
    truncate_staging_tables(target_cursor, target_conn)

    # Preprocess and load data into staging tables
    preprocess_artist(source_cursor, target_cursor)
    preprocess_album(source_cursor, target_cursor)
    preprocess_genre(source_cursor, target_cursor)
    preprocess_mediatype(source_cursor, target_cursor)
    preprocess_track(source_cursor, target_cursor)
    preprocess_employee(source_cursor, target_cursor)
    preprocess_customer(source_cursor, target_cursor)
    preprocess_invoice(source_cursor, target_cursor)
    preprocess_invoiceline(source_cursor, target_cursor)

    # Commit changes
    target_conn.commit()

    # Close connections
    source_cursor.close()
    target_cursor.close()
    source_conn.close()
    target_conn.close()
    print("Data preprocessing completed successfully.")

if __name__ == "__main__":
    main()


Creating staging tables if they do not exist...
Staging tables created or verified.
Truncating staging tables...
Staging tables truncated.
Preprocessing Artist data...
Artist data preprocessed and loaded into staging.
Preprocessing Album data...
Album data preprocessed and loaded into staging.
Preprocessing Genre data...
Genre data preprocessed and loaded into staging.
Preprocessing MediaType data...
MediaType data preprocessed and loaded into staging.
Preprocessing Track data...
Track data preprocessed and loaded into staging.
Preprocessing Employee data...
Employee data preprocessed and loaded into staging.
Preprocessing Customer data...
Customer data preprocessed and loaded into staging.
Preprocessing Invoice data...
Invoice data preprocessed and loaded into staging.
Preprocessing InvoiceLine data...
InvoiceLine data preprocessed and loaded into staging.
Data preprocessing completed successfully.


In [7]:
import pyodbc
from datetime import datetime


import logging

logging.basicConfig(filename='etl_log.log', level=logging.INFO, 
                    format='%(asctime)s %(levelname)s:%(message)s')

logging.info('ETL process started.')

def truncate_tables(target_cursor, target_conn):
    print("Deleting data from Dimension and Fact Tables...")
    tables = ['FactSales', 'DimCustomer', 'DimEmployee', 'DimDate', 'DimTrack', 'DimMediaType', 'DimGenre', 'DimAlbum', 'DimArtist']
    for table in tables:
        target_cursor.execute(f"DELETE FROM {table}")
        print(f"Data deleted from table {table}.")
    target_conn.commit()

def load_dim_artist(source_cursor, target_cursor, target_conn):
    print("Loading DimArtist...")
    # Fetch existing ArtistIds
    target_cursor.execute("SELECT ArtistId FROM DimArtist")
    existing_artist_ids = set(row.ArtistId for row in target_cursor.fetchall())

    source_cursor.execute("SELECT ArtistId, Name FROM Artist")
    rows = source_cursor.fetchall()
    new_rows = [row for row in rows if row.ArtistId not in existing_artist_ids]

    for row in new_rows:
        target_cursor.execute("INSERT INTO DimArtist (ArtistId, Name) VALUES (?, ?)", row.ArtistId, row.Name)
    target_conn.commit()
    print(f"DimArtist loaded. {len(new_rows)} new records inserted.")

def load_dim_album(source_cursor, target_cursor, target_conn):
    print("Loading DimAlbum...")
    # Fetch existing AlbumIds
    target_cursor.execute("SELECT AlbumId FROM DimAlbum")
    existing_album_ids = set(row.AlbumId for row in target_cursor.fetchall())

    source_cursor.execute("SELECT AlbumId, Title, ArtistId FROM Album")
    rows = source_cursor.fetchall()
    new_rows = [row for row in rows if row.AlbumId not in existing_album_ids]

    for row in new_rows:
        target_cursor.execute("SELECT ArtistKey FROM DimArtist WHERE ArtistId = ?", row.ArtistId)
        artist_key_row = target_cursor.fetchone()
        artist_key = artist_key_row.ArtistKey if artist_key_row else None
        target_cursor.execute("INSERT INTO DimAlbum (AlbumId, Title, ArtistKey) VALUES (?, ?, ?)", row.AlbumId, row.Title, artist_key)
    target_conn.commit()
    print(f"DimAlbum loaded. {len(new_rows)} new records inserted.")

def load_dim_genre(source_cursor, target_cursor, target_conn):
    print("Loading DimGenre...")
    # Fetch existing GenreIds
    target_cursor.execute("SELECT GenreId FROM DimGenre")
    existing_genre_ids = set(row.GenreId for row in target_cursor.fetchall())

    source_cursor.execute("SELECT GenreId, Name FROM Genre")
    rows = source_cursor.fetchall()
    new_rows = [row for row in rows if row.GenreId not in existing_genre_ids]

    for row in new_rows:
        target_cursor.execute("INSERT INTO DimGenre (GenreId, Name) VALUES (?, ?)", row.GenreId, row.Name)
    target_conn.commit()
    print(f"DimGenre loaded. {len(new_rows)} new records inserted.")

def load_dim_mediatype(source_cursor, target_cursor, target_conn):
    print("Loading DimMediaType...")
    # Fetch existing MediaTypeIds
    target_cursor.execute("SELECT MediaTypeId FROM DimMediaType")
    existing_mediatype_ids = set(row.MediaTypeId for row in target_cursor.fetchall())

    source_cursor.execute("SELECT MediaTypeId, Name FROM MediaType")
    rows = source_cursor.fetchall()
    new_rows = [row for row in rows if row.MediaTypeId not in existing_mediatype_ids]

    for row in new_rows:
        target_cursor.execute("INSERT INTO DimMediaType (MediaTypeId, Name) VALUES (?, ?)", row.MediaTypeId, row.Name)
    target_conn.commit()
    print(f"DimMediaType loaded. {len(new_rows)} new records inserted.")

def load_dim_track(source_cursor, target_cursor, target_conn):
    print("Loading DimTrack...")
    # Fetch existing TrackIds
    target_cursor.execute("SELECT TrackId FROM DimTrack")
    existing_track_ids = set(row.TrackId for row in target_cursor.fetchall())

    source_cursor.execute("SELECT TrackId, Name, AlbumId, MediaTypeId, GenreId, Composer, Milliseconds, Bytes FROM Track")
    rows = source_cursor.fetchall()
    new_rows = [row for row in rows if row.TrackId not in existing_track_ids]

    for row in new_rows:
        # Get AlbumKey
        target_cursor.execute("SELECT AlbumKey FROM DimAlbum WHERE AlbumId = ?", row.AlbumId)
        album_key_row = target_cursor.fetchone()
        album_key = album_key_row.AlbumKey if album_key_row else None

        # Get MediaTypeKey
        target_cursor.execute("SELECT MediaTypeKey FROM DimMediaType WHERE MediaTypeId = ?", row.MediaTypeId)
        mediatype_key_row = target_cursor.fetchone()
        mediatype_key = mediatype_key_row.MediaTypeKey if mediatype_key_row else None

        # Get GenreKey
        target_cursor.execute("SELECT GenreKey FROM DimGenre WHERE GenreId = ?", row.GenreId)
        genre_key_row = target_cursor.fetchone()
        genre_key = genre_key_row.GenreKey if genre_key_row else None

        target_cursor.execute("""
            INSERT INTO DimTrack (TrackId, Name, AlbumKey, MediaTypeKey, GenreKey, Composer, Milliseconds, Bytes)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?)
            """, row.TrackId, row.Name, album_key, mediatype_key, genre_key, row.Composer, row.Milliseconds, row.Bytes)
    target_conn.commit()
    print(f"DimTrack loaded. {len(new_rows)} new records inserted.")

def load_dim_employee(source_cursor, target_cursor, target_conn):
    print("Loading DimEmployee...")
    # Fetch existing EmployeeIds
    target_cursor.execute("SELECT EmployeeId FROM DimEmployee")
    existing_employee_ids = set(row.EmployeeId for row in target_cursor.fetchall())

    source_cursor.execute("SELECT EmployeeId, FirstName, LastName, Title, ReportsTo, HireDate FROM Employee")
    rows = source_cursor.fetchall()
    new_rows = [row for row in rows if row.EmployeeId not in existing_employee_ids]

    for row in new_rows:
        target_cursor.execute("""
            INSERT INTO DimEmployee (EmployeeId, FirstName, LastName, Title, ReportsTo, HireDate)
            VALUES (?, ?, ?, ?, ?, ?)
        """, row.EmployeeId, row.FirstName, row.LastName, row.Title, row.ReportsTo, row.HireDate)
    target_conn.commit()
    print(f"DimEmployee loaded. {len(new_rows)} new records inserted.")

def load_dim_customer(source_cursor, target_cursor, target_conn):
    print("Loading DimCustomer...")
    # Fetch existing CustomerIds
    target_cursor.execute("SELECT CustomerId FROM DimCustomer")
    existing_customer_ids = set(row.CustomerId for row in target_cursor.fetchall())

    source_cursor.execute("""
        SELECT CustomerId, FirstName, LastName, Company, Address, City, State, Country, PostalCode
        FROM Customer
    """)
    rows = source_cursor.fetchall()
    new_rows = [row for row in rows if row.CustomerId not in existing_customer_ids]

    for row in new_rows:
        target_cursor.execute("""
            INSERT INTO DimCustomer (CustomerId, FirstName, LastName, Company, Address, City, State, Country, PostalCode)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
        """, row.CustomerId, row.FirstName, row.LastName, row.Company, row.Address, row.City, row.State, row.Country, row.PostalCode)
    target_conn.commit()
    print(f"DimCustomer loaded. {len(new_rows)} new records inserted.")

def load_dim_date(source_cursor, target_cursor, target_conn):
    print("Loading DimDate...")
    # Fetch existing Dates
    target_cursor.execute("SELECT Date FROM DimDate")
    existing_dates = set(row.Date for row in target_cursor.fetchall())

    source_cursor.execute("SELECT DISTINCT CAST(InvoiceDate AS DATE) AS Date FROM Invoice")
    rows = source_cursor.fetchall()
    new_rows = [row for row in rows if row.Date not in existing_dates]

    for row in new_rows:
        date_value = row.Date
        day = date_value.day
        month = date_value.month
        year = date_value.year
        quarter = (month - 1) // 3 + 1
        target_cursor.execute("""
            INSERT INTO DimDate (Date, Day, Month, Year, Quarter)
            VALUES (?, ?, ?, ?, ?)
        """, date_value, day, month, year, quarter)
    target_conn.commit()
    print(f"DimDate loaded. {len(new_rows)} new records inserted.")

def build_mappings(target_cursor):
    print("Building mappings from natural keys to surrogate keys...")
    mappings = {}

    # ArtistId to ArtistKey
    target_cursor.execute("SELECT ArtistId, ArtistKey FROM DimArtist")
    mappings['Artist'] = {row.ArtistId: row.ArtistKey for row in target_cursor.fetchall()}

    # AlbumId to AlbumKey
    target_cursor.execute("SELECT AlbumId, AlbumKey FROM DimAlbum")
    mappings['Album'] = {row.AlbumId: row.AlbumKey for row in target_cursor.fetchall()}

    # GenreId to GenreKey
    target_cursor.execute("SELECT GenreId, GenreKey FROM DimGenre")
    mappings['Genre'] = {row.GenreId: row.GenreKey for row in target_cursor.fetchall()}

    # MediaTypeId to MediaTypeKey
    target_cursor.execute("SELECT MediaTypeId, MediaTypeKey FROM DimMediaType")
    mappings['MediaType'] = {row.MediaTypeId: row.MediaTypeKey for row in target_cursor.fetchall()}

    # TrackId to TrackKey
    target_cursor.execute("SELECT TrackId, TrackKey FROM DimTrack")
    mappings['Track'] = {row.TrackId: row.TrackKey for row in target_cursor.fetchall()}

    # Date to DateKey
    target_cursor.execute("SELECT Date, DateKey FROM DimDate")
    mappings['Date'] = {row.Date: row.DateKey for row in target_cursor.fetchall()}

    # CustomerId to CustomerKey
    target_cursor.execute("SELECT CustomerId, CustomerKey FROM DimCustomer")
    mappings['Customer'] = {row.CustomerId: row.CustomerKey for row in target_cursor.fetchall()}

    # EmployeeId to EmployeeKey
    target_cursor.execute("SELECT EmployeeId, EmployeeKey FROM DimEmployee")
    mappings['Employee'] = {row.EmployeeId: row.EmployeeKey for row in target_cursor.fetchall()}

    print("Mappings built.")
    return mappings

def load_fact_sales(source_cursor, target_cursor, target_conn, mappings):
    print("Loading FactSales...")
    # Fetch existing InvoiceLineIds
    target_cursor.execute("SELECT InvoiceLineId FROM FactSales")
    existing_invoice_line_ids = set(row.InvoiceLineId for row in target_cursor.fetchall())

    source_cursor.execute("""
    SELECT il.InvoiceLineId, il.InvoiceId, il.TrackId, il.Quantity, il.UnitPrice,
           i.InvoiceDate, i.CustomerId,
           t.AlbumId, t.GenreId, t.MediaTypeId,
           c.SupportRepId
    FROM InvoiceLine il
    JOIN Invoice i ON il.InvoiceId = i.InvoiceId
    JOIN Track t ON il.TrackId = t.TrackId
    JOIN Customer c ON i.CustomerId = c.CustomerId
    """)
    rows = source_cursor.fetchall()
    new_rows = [row for row in rows if row.InvoiceLineId not in existing_invoice_line_ids]

    for row in new_rows:
        InvoiceLineId = row.InvoiceLineId
        InvoiceDate = row.InvoiceDate.date()
        DateKey = mappings['Date'].get(InvoiceDate, None)
        CustomerKey = mappings['Customer'].get(row.CustomerId, None)
        TrackKey = mappings['Track'].get(row.TrackId, None)
        AlbumKey = mappings['Album'].get(row.AlbumId, None)
        GenreKey = mappings['Genre'].get(row.GenreId, None)
        MediaTypeKey = mappings['MediaType'].get(row.MediaTypeId, None)
        EmployeeKey = mappings['Employee'].get(row.SupportRepId, None)
        Quantity = row.Quantity
        UnitPrice = row.UnitPrice
        TotalAmount = Quantity * UnitPrice

        target_cursor.execute("""
            INSERT INTO FactSales (InvoiceLineId, DateKey, CustomerKey, TrackKey, AlbumKey, GenreKey, MediaTypeKey, EmployeeKey, Quantity, UnitPrice, TotalAmount)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
        """, InvoiceLineId, DateKey, CustomerKey, TrackKey, AlbumKey, GenreKey, MediaTypeKey, EmployeeKey, Quantity, UnitPrice, TotalAmount)
    target_conn.commit()
    print(f"FactSales loaded. {len(new_rows)} new records inserted.")

def main():
    # Database connection parameters
    source_server = 'DPC2023'
    source_database = 'Chinook'
    target_server = 'DPC2023'
    target_database = 'ChinookDW4'

    # Connect to source and target databases
    source_conn = pyodbc.connect('DRIVER={ODBC Driver 17 for SQL Server};SERVER='+source_server+';DATABASE='+source_database+';Trusted_Connection=yes;')
    target_conn = pyodbc.connect('DRIVER={ODBC Driver 17 for SQL Server};SERVER='+target_server+';DATABASE='+target_database+';Trusted_Connection=yes;')

    source_cursor = source_conn.cursor()
    target_cursor = target_conn.cursor()

    # Prompt user to reset DW
    reset_dw = input("Do you want to reset the Data Warehouse? (yes/no): ").lower()
    if reset_dw == 'yes':
        truncate_tables(target_cursor, target_conn)

    # Load dimension tables
    load_dim_artist(source_cursor, target_cursor, target_conn)
    load_dim_album(source_cursor, target_cursor, target_conn)
    load_dim_genre(source_cursor, target_cursor, target_conn)
    load_dim_mediatype(source_cursor, target_cursor, target_conn)
    load_dim_track(source_cursor, target_cursor, target_conn)
    load_dim_employee(source_cursor, target_cursor, target_conn)
    load_dim_customer(source_cursor, target_cursor, target_conn)
    load_dim_date(source_cursor, target_cursor, target_conn)

    # Build mappings
    mappings = build_mappings(target_cursor)

    # Load FactSales
    load_fact_sales(source_cursor, target_cursor, target_conn, mappings)

    # Close connections
    source_cursor.close()
    target_cursor.close()
    source_conn.close()
    target_conn.close()
    print("ETL process completed.")

if __name__ == "__main__":
    main()
    logging.info('ETL process completed successfully.')


Loading DimArtist...
DimArtist loaded. 600 new records inserted.
Loading DimAlbum...
DimAlbum loaded. 1200 new records inserted.
Loading DimGenre...
DimGenre loaded. 300 new records inserted.
Loading DimMediaType...
DimMediaType loaded. 180 new records inserted.
Loading DimTrack...
DimTrack loaded. 3000 new records inserted.
Loading DimEmployee...
DimEmployee loaded. 300 new records inserted.
Loading DimCustomer...
DimCustomer loaded. 900 new records inserted.
Loading DimDate...
DimDate loaded. 857 new records inserted.
Building mappings from natural keys to surrogate keys...
Mappings built.
Loading FactSales...
FactSales loaded. 6000 new records inserted.
ETL process completed.


In [7]:
!pip install pandas plotly

Defaulting to user installation because normal site-packages is not writeable


In [8]:
import pyodbc
import pandas as pd
import plotly.express as px

def main():
    # Database connection parameters
    server = 'DPC2023'  # Replace with your server name
    database = 'ChinookDW4'
    
    # Connect to the database
    conn = pyodbc.connect(
        'DRIVER={ODBC Driver 17 for SQL Server};'
        f'SERVER={server};'
        f'DATABASE={database};'
        'Trusted_Connection=yes;'
    )
    
    # Define the SQL query
    sql_query = '''
    SELECT 
        DimAlbum.Title AS AlbumTitle,
        SUM(FactSales.Quantity) AS QuantitySold,
        SUM(FactSales.TotalAmount) AS Revenue
    FROM 
        FactSales
    JOIN 
        DimAlbum ON FactSales.AlbumKey = DimAlbum.AlbumKey
    JOIN 
        DimDate ON FactSales.DateKey = DimDate.DateKey
    WHERE 
        DimDate.Year = 2023  -- Specify the desired year or date range here
    GROUP BY 
        DimAlbum.Title
    ORDER BY 
        Revenue DESC;  -- Or use `QuantitySold DESC` to sort by quantity sold
    '''
    
    # Fetch data into a pandas DataFrame
    df = pd.read_sql(sql_query, conn)
    
    # Close the connection
    conn.close()
    
    # Check if data is available
    if df.empty:
        print("No data available for the specified year.")
        return
    
    # Data visualization using Plotly
    fig = px.bar(
        df, 
        x='Revenue', 
        y='AlbumTitle', 
        orientation='h',
        title='Album Revenue in 2023',
        hover_data=['QuantitySold'],
        labels={'AlbumTitle': 'Album Title', 'Revenue': 'Revenue (USD)'},
        template='plotly_dark'
    )
    fig.update_layout(
        yaxis={'categoryorder':'total ascending'},
        height=600,
    )
    fig.show()

if __name__ == '__main__':
    main()


  df = pd.read_sql(sql_query, conn)


In [9]:
import pyodbc
import pandas as pd
import plotly.express as px

def visualize_customer_spending():
    # Database connection parameters
    server = 'DPC2023'  # Replace with your server name
    database = 'ChinookDW4'
    
    # Connect to the database
    conn = pyodbc.connect(
        'DRIVER={ODBC Driver 17 for SQL Server};'
        f'SERVER={server};'
        f'DATABASE={database};'
        'Trusted_Connection=yes;'
    )
    
    # Define the SQL query
    sql_query = '''
    SELECT 
        DimCustomer.Country AS Region,
        COUNT(DISTINCT FactSales.CustomerKey) AS NumberOfCustomers,
        SUM(FactSales.TotalAmount) AS TotalSpending,
        AVG(FactSales.TotalAmount) AS AverageOrderValue
    FROM 
        FactSales
    JOIN 
        DimCustomer ON FactSales.CustomerKey = DimCustomer.CustomerKey
    GROUP BY 
        DimCustomer.Country
    ORDER BY 
        TotalSpending DESC;
    '''
    
    # Fetch data into a pandas DataFrame
    df = pd.read_sql(sql_query, conn)
    
    # Close the connection
    conn.close()
    
    # Check if data is available
    if df.empty:
        print("No data available.")
        return
    
    # Data visualization using Plotly
    # Bar chart of Total Spending by Region
    fig = px.bar(
        df, 
        x='Region', 
        y='TotalSpending', 
        title='Total Customer Spending by Region',
        hover_data=['NumberOfCustomers', 'AverageOrderValue'],
        labels={'TotalSpending': 'Total Spending (USD)', 'Region': 'Region'},
        template='plotly_white'
    )
    fig.update_layout(
        xaxis_tickangle=-45,
        height=600,
    )
    fig.show()
    
    # Optional: Pie chart of Number of Customers by Region
    fig_pie = px.pie(
        df, 
        names='Region', 
        values='NumberOfCustomers',
        title='Number of Customers by Region',
        hover_data=['TotalSpending', 'AverageOrderValue'],
        template='plotly_white'
    )
    fig_pie.update_traces(textposition='inside', textinfo='percent+label')
    fig_pie.show()

if __name__ == '__main__':
    visualize_customer_spending()



pandas only supports SQLAlchemy connectable (engine/connection) or database string URI or sqlite3 DBAPI2 connection. Other DBAPI2 objects are not tested. Please consider using SQLAlchemy.



In [10]:
import pyodbc
import pandas as pd
import plotly.express as px

def visualize_top_tracks():
    # Database connection parameters
    server = 'DPC2023'  # Replace with your server name
    database = 'ChinookDW4'
    
    # Connect to the database
    conn = pyodbc.connect(
        'DRIVER={ODBC Driver 17 for SQL Server};'
        f'SERVER={server};'
        f'DATABASE={database};'
        'Trusted_Connection=yes;'
    )
    
    # Define the SQL query
    sql_query = '''
    SELECT 
        DimTrack.Name AS TrackName,
        SUM(FactSales.Quantity) AS QuantitySold,
        SUM(FactSales.TotalAmount) AS Revenue
    FROM 
        FactSales
    JOIN 
        DimTrack ON FactSales.TrackKey = DimTrack.TrackKey
    JOIN 
        DimDate ON FactSales.DateKey = DimDate.DateKey
    WHERE 
        DimDate.Year = 2023  -- Specify the desired year or range
    GROUP BY 
        DimTrack.Name
    ORDER BY 
        Revenue DESC;  -- Or use `QuantitySold DESC` for top quantity
    '''
    
    # Fetch data into a pandas DataFrame
    df = pd.read_sql(sql_query, conn)
    
    # Close the connection
    conn.close()
    
    # Check if data is available
    if df.empty:
        print("No data available for the specified year.")
        return
    
    # Limit to Top 20 Tracks for better visualization
    df_top = df.head(20)
    
    # Data visualization using Plotly
    fig = px.bar(
        df_top, 
        x='Revenue', 
        y='TrackName', 
        orientation='h',
        title='Top 20 Tracks by Revenue in 2023',
        hover_data=['QuantitySold'],
        labels={'TrackName': 'Track Name', 'Revenue': 'Revenue (USD)'},
        template='plotly_dark'
    )
    fig.update_layout(
        yaxis={'categoryorder':'total ascending'},
        height=700,
    )
    fig.show()

if __name__ == '__main__':
    visualize_top_tracks()



pandas only supports SQLAlchemy connectable (engine/connection) or database string URI or sqlite3 DBAPI2 connection. Other DBAPI2 objects are not tested. Please consider using SQLAlchemy.



In [None]:
import pyodbc
import sys

def connect_to_db(server, database):
    """Establishes a connection to the SQL Server database."""
    try:
        conn = pyodbc.connect(
            'DRIVER={ODBC Driver 17 for SQL Server};'
            f'SERVER={server};'
            f'DATABASE={database};'
            'Trusted_Connection=yes;'
        )
        return conn
    except pyodbc.Error as e:
        print("Error connecting to database:", e)
        sys.exit(1)

def get_dimensions(cursor):
    """Retrieves a list of dimension tables in the database."""
    cursor.execute("""
    SELECT TABLE_NAME
    FROM INFORMATION_SCHEMA.TABLES
    WHERE TABLE_TYPE = 'BASE TABLE' AND TABLE_NAME LIKE 'Dim%'
    """)
    dimensions = [row.TABLE_NAME for row in cursor.fetchall()]
    return dimensions

def prompt_user_action(dimensions):
    """Prompts the user to choose normalization or denormalization."""
    print("Available dimensions:")
    for idx, dim in enumerate(dimensions, start=1):
        print(f"{idx}. {dim}")

    action = input("\nDo you want to (1) Normalize or (2) Denormalize dimensions? Enter 1 or 2: ")
    while action not in ['1', '2']:
        action = input("Invalid input. Enter 1 for Normalize or 2 for Denormalize: ")

    if action == '1':
        # Normalization
        print("\nNormalization selected.")
        dim_to_normalize = input("Enter the number of the dimension you want to normalize: ")
        try:
            dim_index = int(dim_to_normalize) - 1
            if dim_index < 0 or dim_index >= len(dimensions):
                raise ValueError
            selected_dim = dimensions[dim_index]
        except ValueError:
            print("Invalid selection.")
            sys.exit(1)
        new_dim_name = input("Enter the name of the new dimension to create from normalization: ").strip()
        return 'normalize', selected_dim, new_dim_name
    else:
        # Denormalization
        print("\nDenormalization selected.")
        dims_to_denormalize = input("Enter the numbers of the dimensions you want to denormalize (separated by commas): ")
        try:
            indices = [int(i.strip()) - 1 for i in dims_to_denormalize.split(',')]
            if any(idx < 0 or idx >= len(dimensions) for idx in indices):
                raise ValueError
            selected_dims = [dimensions[idx] for idx in indices]
        except ValueError:
            print("Invalid selection.")
            sys.exit(1)
        new_dim_name = input("Enter the name of the new dimension to create from denormalization: ").strip()
        return 'denormalize', selected_dims, new_dim_name

def validate_action(action, dims, cursor):
    """Validates if the normalization or denormalization makes sense."""
    if action == 'normalize':
        # Check if the dimension has attributes that can be moved to a new dimension
        dim = dims
        cursor.execute(f"SELECT COLUMN_NAME FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_NAME = '{dim}'")
        columns = [row.COLUMN_NAME for row in cursor.fetchall()]
        if len(columns) <= 2:
            print(f"Dimension {dim} cannot be normalized further.")
            return False
        return True
    else:
        # Check if dimensions can be combined
        dims_to_combine = dims
        # Ensure dimensions have no conflicting columns
        all_columns = {}
        for dim in dims_to_combine:
            cursor.execute(f"SELECT COLUMN_NAME FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_NAME = '{dim}'")
            columns = [row.COLUMN_NAME for row in cursor.fetchall()]
            for col in columns:
                if col in all_columns:
                    print(f"Conflicting column '{col}' found in dimensions. Cannot denormalize.")
                    return False
                all_columns[col] = dim
        return True

def perform_normalization(cursor, conn, dim_to_normalize, new_dim_name):
    """Performs normalization by creating a new dimension and updating relationships."""
    # Example: Split DimTrack into DimTrack and DimGenre
    # For simplicity, we'll assume the last column is the one to normalize out
    cursor.execute(f"SELECT COLUMN_NAME FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_NAME = '{dim_to_normalize}'")
    columns = [row.COLUMN_NAME for row in cursor.fetchall()]
    print(f"Columns in {dim_to_normalize}: {columns}")

    # Prompt user to select columns to move to the new dimension
    print("\nSelect the columns to move to the new dimension:")
    for idx, col in enumerate(columns, start=1):
        print(f"{idx}. {col}")
    cols_to_move_input = input("Enter the numbers of the columns to move (separated by commas): ")
    try:
        indices = [int(i.strip()) - 1 for i in cols_to_move_input.split(',')]
        if any(idx < 0 or idx >= len(columns) for idx in indices):
            raise ValueError
        cols_to_move = [columns[idx] for idx in indices]
    except ValueError:
        print("Invalid selection.")
        sys.exit(1)

    # Proceed with normalization
    cols_to_keep = [col for col in columns if col not in cols_to_move]
    print(f"Columns to keep in {dim_to_normalize}: {cols_to_keep}")
    print(f"Columns to move to {new_dim_name}: {cols_to_move}")

    # Create the new dimension table
    try:
        # Create new dimension table
        create_table_sql = f"SELECT DISTINCT {', '.join(cols_to_move)} INTO {new_dim_name} FROM {dim_to_normalize}"
        cursor.execute(f"IF OBJECT_ID('{new_dim_name}', 'U') IS NOT NULL DROP TABLE {new_dim_name}")
        cursor.execute(create_table_sql)
        conn.commit()
        print(f"Dimension {new_dim_name} created successfully.")
    except pyodbc.Error as e:
        print("Error creating new dimension:", e)
        conn.rollback()
        sys.exit(1)

    # Update the original dimension table
    try:
        # Add foreign key column
        fk_column = f"{new_dim_name}ID"
        cursor.execute(f"ALTER TABLE {dim_to_normalize} ADD {fk_column} INT")
        conn.commit()
        print(f"Foreign key column {fk_column} added to {dim_to_normalize}.")

        # Update foreign key values
        update_fk_sql = f"""
        UPDATE {dim_to_normalize}
        SET {fk_column} = nd.ID
        FROM {new_dim_name} nd
        WHERE {" AND ".join([f"{dim_to_normalize}.{col} = nd.{col}" for col in cols_to_move])}
        """
        cursor.execute(update_fk_sql)
        conn.commit()
        print(f"Foreign key values in {dim_to_normalize} updated.")

        # Remove moved columns from original dimension
        for col in cols_to_move:
            cursor.execute(f"ALTER TABLE {dim_to_normalize} DROP COLUMN {col}")
        conn.commit()
        print(f"Columns {', '.join(cols_to_move)} removed from {dim_to_normalize}.")
    except pyodbc.Error as e:
        print("Error updating original dimension:", e)
        conn.rollback()
        sys.exit(1)

    # Update Fact table foreign keys if necessary (not implemented here)
    # Additional steps would be required to update related fact tables and constraints.

def perform_denormalization(cursor, conn, dims_to_denormalize, new_dim_name):
    """Performs denormalization by combining dimensions into a new dimension."""
    # Get all columns from the dimensions
    all_columns = []
    for dim in dims_to_denormalize:
        cursor.execute(f"SELECT COLUMN_NAME FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_NAME = '{dim}'")
        cols = [f"{dim}.{row.COLUMN_NAME}" for row in cursor.fetchall()]
        all_columns.extend(cols)

    # Create the new dimension table
    try:
        join_conditions = []
        for i in range(len(dims_to_denormalize) - 1):
            dim1 = dims_to_denormalize[i]
            dim2 = dims_to_denormalize[i + 1]
            # Assuming the dimensions have a common key for joining
            join_condition = f"{dim1}.CommonKey = {dim2}.CommonKey"
            join_conditions.append(join_condition)

        from_clause = ' JOIN '.join([dim for dim in dims_to_denormalize])
        if join_conditions:
            from_clause += ' ON ' + ' AND '.join(join_conditions)

        create_table_sql = f"SELECT DISTINCT {', '.join(all_columns)} INTO {new_dim_name} FROM {from_clause}"
        cursor.execute(f"IF OBJECT_ID('{new_dim_name}', 'U') IS NOT NULL DROP TABLE {new_dim_name}")
        cursor.execute(create_table_sql)
        conn.commit()
        print(f"Dimension {new_dim_name} created successfully by denormalizing {', '.join(dims_to_denormalize)}.")
    except pyodbc.Error as e:
        print("Error creating new dimension:", e)
        conn.rollback()
        sys.exit(1)

    # Update Fact table foreign keys if necessary (not implemented here)
    # Additional steps would be required to update related fact tables and constraints.

def main():
    # Database connection parameters
    server = 'DPC2023'  # Replace with your server name
    database = 'ChinookDW4'

    # Connect to the database
    conn = connect_to_db(server, database)
    cursor = conn.cursor()

    # Retrieve dimensions
    dimensions = get_dimensions(cursor)
    if not dimensions:
        print("No dimension tables found in the database.")
        sys.exit(1)

    # Prompt user for action
    action, dims, new_dim_name = prompt_user_action(dimensions)

    # Validate action
    is_valid = validate_action(action, dims, cursor)
    if not is_valid:
        print("The requested operation is not valid. Exiting.")
        sys.exit(1)

    # Perform action
    if action == 'normalize':
        perform_normalization(cursor, conn, dims, new_dim_name)
    else:
        perform_denormalization(cursor, conn, dims, new_dim_name)

    # Close connections
    cursor.close()
    conn.close()
    print("\nSchema modification completed successfully.")

if __name__ == '__main__':
    main()


In [24]:
!pip install dash plotly

Defaulting to user installation because normal site-packages is not writeable
Collecting dash
  Downloading dash-2.18.2-py3-none-any.whl (7.8 MB)
     ---------------------------------------- 7.8/7.8 MB 2.8 MB/s eta 0:00:00
Collecting dash-table==5.0.0
  Downloading dash_table-5.0.0-py3-none-any.whl (3.9 kB)
Collecting dash-html-components==2.0.0
  Downloading dash_html_components-2.0.0-py3-none-any.whl (4.1 kB)
Collecting dash-core-components==2.0.0
  Downloading dash_core_components-2.0.0-py3-none-any.whl (3.8 kB)
Collecting retrying
  Downloading retrying-1.3.4-py3-none-any.whl (11 kB)
Installing collected packages: dash-table, dash-html-components, dash-core-components, retrying, dash
Successfully installed dash-2.18.2 dash-core-components-2.0.0 dash-html-components-2.0.0 dash-table-5.0.0 retrying-1.3.4




In [2]:
import pandas as pd
import numpy as np
from sqlalchemy import create_engine
from sqlalchemy.engine import URL

def create_db_engine(server, database):
    """
    Creates a SQLAlchemy engine for connecting to the SQL Server database.
    """
    try:
        # Define the connection string using URL.create for better handling
        connection_url = URL.create(
            "mssql+pyodbc",
            username="",  # If using Windows Authentication, leave blank
            password="",  # If using Windows Authentication, leave blank
            host=server,
            database=database,
            query={
                "driver": "ODBC Driver 17 for SQL Server",
                "trusted_connection": "yes"
            }
        )
        engine = create_engine(connection_url, fast_executemany=True)
        # Test the connection
        with engine.connect() as conn:
            conn.execute("SELECT 1")
        print(f"Successfully connected to '{database}' on server '{server}'.")
        return engine
    except Exception as e:
        print(f"Error connecting to '{database}' on server '{server}': {e}")
        raise

def fetch_table(engine, query, table_name):
    """
    Fetches data from the database using the provided SQL query.
    """
    try:
        df = pd.read_sql(query, engine)
        print(f"Fetched data for table '{table_name}' using query: {query[:50]}...")
        print(f"Columns in '{table_name}': {list(df.columns)}\n")
        print(f"Sample data from '{table_name}':")
        print(df.head())
        print("\n")
        return df
    except Exception as e:
        print(f"Error fetching data for table '{table_name}': {e}")
        raise

def build_olap_cube(fact_sales, dim_date, dim_customer, dim_track, dim_album, dim_artist, dim_genre, dim_mediatype, dim_employee):
    """
    Builds an OLAP cube using pandas pivot tables.
    """
    try:
        # List of required keys and their corresponding DataFrames
        required_keys = {
            'DateKey': dim_date,
            'CustomerKey': dim_customer,
            'TrackKey': dim_track,
            'AlbumKey': dim_album,
            'ArtistKey': dim_artist,
            'GenreKey': dim_genre,
            'MediaTypeKey': dim_mediatype,
            'EmployeeKey': dim_employee
        }

        # Verify that all required keys exist in the respective DataFrames
        missing_columns = []
        for key, df in required_keys.items():
            if key not in df.columns:
                missing_columns.append((key, df))

        if missing_columns:
            for key, df in missing_columns:
                print(f"Error: Column '{key}' is missing in the corresponding DataFrame.")
            raise KeyError("One or more required columns are missing.")

        # Ensure 'AlbumKey' is of the same data type in both DataFrames
        fact_sales['AlbumKey'] = pd.to_numeric(fact_sales['AlbumKey'], errors='coerce')
        dim_album['AlbumKey'] = pd.to_numeric(dim_album['AlbumKey'], errors='coerce')

        # Check for nulls after type conversion
        if fact_sales['AlbumKey'].isnull().any():
            num_nulls = fact_sales['AlbumKey'].isnull().sum()
            print(f"Warning: 'AlbumKey' in FactSales contains {num_nulls} null values. These rows will be excluded.")
            fact_sales = fact_sales.dropna(subset=['AlbumKey'])

        if dim_album['AlbumKey'].isnull().any():
            num_nulls = dim_album['AlbumKey'].isnull().sum()
            print(f"Warning: 'AlbumKey' in DimAlbum contains {num_nulls} null values. These rows will be excluded.")
            dim_album = dim_album.dropna(subset=['AlbumKey'])

        # Convert 'AlbumKey' to integer type
        fact_sales['AlbumKey'] = fact_sales['AlbumKey'].astype(int)
        dim_album['AlbumKey'] = dim_album['AlbumKey'].astype(int)

        # Verify 'AlbumKey' is present after handling
        if 'AlbumKey' not in fact_sales.columns:
            print("Error: 'AlbumKey' is missing in FactSales after processing.")
            raise KeyError("'AlbumKey' is missing in FactSales after processing.")

        if 'AlbumKey' not in dim_album.columns:
            print("Error: 'AlbumKey' is missing in DimAlbum.")
            raise KeyError("'AlbumKey' is missing in DimAlbum.")

        # Merge fact table with DimDate
        df = fact_sales.merge(dim_date, how='left', on='DateKey')
        print("Merged with DimDate.")
        print(f"Columns after merging with DimDate: {list(df.columns)}\n")

        # Merge with DimCustomer
        df = df.merge(dim_customer, how='left', on='CustomerKey')
        print("Merged with DimCustomer.")
        print(f"Columns after merging with DimCustomer: {list(df.columns)}\n")

        # Merge with DimTrack
        df = df.merge(dim_track, how='left', on='TrackKey', suffixes=('', '_dim_track'))
        print("Merged with DimTrack.")
        print(f"Columns after merging with DimTrack: {list(df.columns)}\n")

        # Merge with DimAlbum
        df = df.merge(dim_album, how='left', on='AlbumKey', suffixes=('', '_dim_album'))
        print("Merged with DimAlbum.")
        print(f"Columns after merging with DimAlbum: {list(df.columns)}\n")

        # Merge with DimArtist
        df = df.merge(dim_artist, how='left', on='ArtistKey')
        print("Merged with DimArtist.")
        print(f"Columns after merging with DimArtist: {list(df.columns)}\n")

        # Merge with DimGenre
        df = df.merge(dim_genre, how='left', on='GenreKey')
        print("Merged with DimGenre.")
        print(f"Columns after merging with DimGenre: {list(df.columns)}\n")

        # Merge with DimMediaType
        df = df.merge(dim_mediatype, how='left', on='MediaTypeKey')
        print("Merged with DimMediaType.")
        print(f"Columns after merging with DimMediaType: {list(df.columns)}\n")

        # Merge with DimEmployee
        df = df.merge(dim_employee, how='left', on='EmployeeKey')
        print("Merged with DimEmployee.")
        print(f"Columns after merging with DimEmployee: {list(df.columns)}\n")

        print("Successfully merged fact and dimension tables.")

        # Check if 'TotalAmount' exists
        if 'TotalAmount' not in df.columns:
            print("Error: Column 'TotalAmount' is missing in the merged DataFrame.")
            raise KeyError("Column 'TotalAmount' is missing.")

        # Create a pivot table (OLAP cube)
        cube = pd.pivot_table(
            df,
            index=['Year', 'Quarter', 'Month', 'Day'],
            columns=['GenreName', 'MediaTypeName', 'ArtistName'],
            values='TotalAmount',
            aggfunc=np.sum,
            fill_value=0
        )
        print("OLAP cube created successfully.")
        return cube, df
    except KeyError as ke:
        print(f"Key Error: {ke}")
        raise
    except Exception as e:
        print(f"Error building OLAP cube: {e}")
        raise

def perform_olap_operations(cube, df):
    """
    Demonstrates various OLAP operations on the cube.
    """
    try:
        # Example 1: Slice - Total sales for Genre 'Rock' across all Media Types and Artists for Year 2023
        print("\nPerforming Slice Operation: Total Rock Sales in 2023")
        if 2023 not in cube.index.get_level_values('Year'):
            print("Year 2023 not found in the data.")
        else:
            try:
                rock_sales_2023 = cube.loc[2023].xs('Rock', level='GenreName', axis=1).sum(axis=1)
                print("Total Rock Sales in 2023:")
                print(rock_sales_2023)
            except KeyError:
                print("Genre 'Rock' not found in the data.")

        # Example 2: Dice - Total sales for Genre 'Rock' and 'Jazz' in Q1 and Q2 of 2023
        print("\nPerforming Dice Operation: Total Rock and Jazz Sales in Q1 and Q2 of 2023")
        genres = ['Rock', 'Jazz']
        quarters = [1, 2]
        dice_sales = df[
            (df['GenreName'].isin(genres)) &
            (df['Year'] == 2023) &
            (df['Quarter'].isin(quarters))
        ].groupby(['GenreName', 'Quarter'])['TotalAmount'].sum()
        if dice_sales.empty:
            print("No sales data found for the specified genres and quarters.")
        else:
            print("Total Rock and Jazz Sales in Q1 and Q2 of 2023:")
            print(dice_sales)

        # Example 3: Drill Down - Monthly sales for Genre 'Rock' in 2023
        print("\nPerforming Drill Down Operation: Monthly Rock Sales in 2023")
        rock_monthly_sales = df[
            (df['GenreName'] == 'Rock') &
            (df['Year'] == 2023)
        ].groupby(['Month'])['TotalAmount'].sum()
        if rock_monthly_sales.empty:
            print("No Rock sales data found for 2023.")
        else:
            print("Monthly Rock Sales in 2023:")
            print(rock_monthly_sales)

        # Example 4: Roll Up - Quarterly sales for all genres in 2023
        print("\nPerforming Roll Up Operation: Quarterly Sales in 2023")
        quarterly_sales = df[
            (df['Year'] == 2023)
        ].groupby(['Quarter'])['TotalAmount'].sum()
        if quarterly_sales.empty:
            print("No sales data found for 2023.")
        else:
            print("Quarterly Sales in 2023:")
            print(quarterly_sales)

    except Exception as e:
        print(f"Error performing OLAP operations: {e}")
        raise

def main():
    # Database connection parameters
    source_server = 'DPC2023'  # Replace with your server name
    target_database = 'ChinookDW4'  # Replace with your target database name

    try:
        # Create SQLAlchemy engine
        engine = create_db_engine(source_server, target_database)

        # Define SQL queries to fetch dimension and fact tables
        queries = {
            'FactSales': "SELECT * FROM FactSales",
            'DimDate': "SELECT * FROM DimDate",
            'DimCustomer': "SELECT * FROM DimCustomer",
            'DimTrack': "SELECT * FROM DimTrack",
            'DimAlbum': "SELECT * FROM DimAlbum",
            'DimArtist': "SELECT ArtistKey, Name AS ArtistName FROM DimArtist",
            'DimGenre': "SELECT GenreKey, Name AS GenreName FROM DimGenre",
            'DimMediaType': "SELECT MediaTypeKey, Name AS MediaTypeName FROM DimMediaType",
            'DimEmployee': "SELECT EmployeeKey, FirstName, LastName FROM DimEmployee"
        }

        # Fetch all tables with diagnostic print statements
        print("\nFetching FactSales Table...")
        fact_sales = fetch_table(engine, queries['FactSales'], 'FactSales')

        print("Fetching DimDate Table...")
        dim_date = fetch_table(engine, queries['DimDate'], 'DimDate')

        print("Fetching DimCustomer Table...")
        dim_customer = fetch_table(engine, queries['DimCustomer'], 'DimCustomer')

        print("Fetching DimTrack Table...")
        dim_track = fetch_table(engine, queries['DimTrack'], 'DimTrack')

        print("Fetching DimAlbum Table...")
        dim_album = fetch_table(engine, queries['DimAlbum'], 'DimAlbum')

        print("Fetching DimArtist Table...")
        dim_artist = fetch_table(engine, queries['DimArtist'], 'DimArtist')

        print("Fetching DimGenre Table...")
        dim_genre = fetch_table(engine, queries['DimGenre'], 'DimGenre')

        print("Fetching DimMediaType Table...")
        dim_mediatype = fetch_table(engine, queries['DimMediaType'], 'DimMediaType')

        print("Fetching DimEmployee Table...")
        dim_employee = fetch_table(engine, queries['DimEmployee'], 'DimEmployee')

        print("\nAll tables fetched successfully.")

        # Build OLAP cube
        cube, merged_df = build_olap_cube(
            fact_sales,
            dim_date,
            dim_customer,
            dim_track,
            dim_album,
            dim_artist,
            dim_genre,
            dim_mediatype,
            dim_employee
        )

        # Save the cube to a CSV file (optional)
        try:
            cube.to_csv('olap_cube.csv')
            print("OLAP cube saved to 'olap_cube.csv'.")
        except Exception as e:
            print(f"Error saving OLAP cube to CSV: {e}")

        # Perform OLAP operations
        perform_olap_operations(cube, merged_df)

        print("\nOLAP process completed successfully.")

    except Exception as e:
        print(f"An error occurred in the OLAP process: {e}")
        print("Please check the connection parameters and ensure that the database is accessible.")

if __name__ == "__main__":
    main()


Successfully connected to 'ChinookDW4' on server 'DPC2023'.

Fetching FactSales Table...
Fetched data for table 'FactSales' using query: SELECT * FROM FactSales...
Columns in 'FactSales': ['SalesKey', 'InvoiceLineId', 'DateKey', 'CustomerKey', 'TrackKey', 'AlbumKey', 'GenreKey', 'MediaTypeKey', 'EmployeeKey', 'Quantity', 'UnitPrice', 'TotalAmount']

Sample data from 'FactSales':
   SalesKey  InvoiceLineId  DateKey  CustomerKey  TrackKey  AlbumKey  \
0     20261            579     3322         2393     31580      3146   
1     20262              1     3224         2348     31581      3147   
2     20263           1154     3420         2379     31581      3147   
3     20264           1728     3516         2359     31582      3148   
4     20265              2     3224         2348     31583      3148   

   GenreKey  MediaTypeKey  EmployeeKey  Quantity  UnitPrice  TotalAmount  
0       231            49           82         1       0.99         0.99  
1       231            50          