In [1]:
import pandas as pd


csv_file_path = 'fact_sales_data.csv' 
df = pd.read_csv(csv_file_path)

insert_statements = []

for index, row in df.iterrows():
    insert_statement = f"""
    INSERT INTO fact_sales (transaction_id, customer_key, address_key, contact_key, payment_method_key, interaction_key, interaction_type_key, transaction_date_key, amount, created_at)
    VALUES ({row['transaction_id']}, {row['customer_key']}, {row['address_key']}, {row['contact_key']}, 
    {row['payment_method_key']}, {row['interaction_key']}, {row['interaction_type_key']}, 
    {row['transaction_date_key']}, {row['amount']}, '{row['created_at']}');
    """
    insert_statements.append(insert_statement)
with open("insert_statements.sql", "w") as file:
    for statement in insert_statements:
        file.write(statement)

print("Insert statements have been written to 'insert_statements.sql'")


Insert statements have been written to 'insert_statements.sql'


In [6]:
import pyodbc

csv_file_path = 'fact_sales_data.csvا
df = pd.read_csv(csv_file_path)


conn = pyodbc.connect('DRIVER={SQL Server};'
                      'SERVER=DESKTOP-67E98DV;'
                      'DATABASE=SalesDWH;'
                      'Trusted_Connection=yes;') 
cursor = conn.cursor()


cursor.execute("SELECT payment_method_key FROM dim_payment_method")
available_payment_keys = [row[0] for row in cursor.fetchall()]


invalid_keys = df[~df['payment_method_key'].isin(available_payment_keys)]

if not invalid_keys.empty:
    print("Found invalid payment_method_key values:")
    print(invalid_keys)
else:
    print("All payment_method_key values are valid.")


Found invalid payment_method_key values:
     transaction_id  customer_key  address_key  contact_key  \
0                 1            46           98           78   
1                 2            34          171          197   
2                 3           108           80           87   
3                 4            85           60          160   
4                 5           138          187           10   
..              ...           ...          ...          ...   
995             996           140           71           72   
996             997            85          178           83   
997             998            12          103           16   
998             999            85          119           63   
999            1000            51            3          130   

     payment_method_key  interaction_key  interaction_type_key  \
0                     8               92                     1   
1                     3               57                     4   
2   

In [16]:
import pyodbc

def connect_to_server(server_name, database_name):
    try:
        connection = pyodbc.connect(f'DRIVER={{SQL Server}};'
                                    f'SERVER={server_name};'
                                    f'DATABASE={database_name};'
                                    'Trusted_Connection=yes;')
        print(f"Connected to {database_name} on {server_name} successfully!")
        return connection
    except Exception as e:
        print(f"Failed to connect to {database_name} on {server_name}: {e}")
        return None


def fetch_data_from_db(connection, query):
    cursor = connection.cursor()
    cursor.execute(query)
    return cursor.fetchall()


def get_date_key(dwh_conn, transaction_date):
    query = f"SELECT date_key FROM dim_date WHERE full_date = '{transaction_date}'"
    cursor = dwh_conn.cursor()
    cursor.execute(query)
    result = cursor.fetchone()
    return result[0] if result else None


def customer_key_exists(dwh_conn, customer_key):
    query = f"SELECT 1 FROM dim_customer WHERE customer_key = ?"
    cursor = dwh_conn.cursor()
    cursor.execute(query, customer_key)
    result = cursor.fetchone()
    return result is not None


def transform_sales_data_with_check(dwh_conn, data):
    transformed_data = []
    for row in data:
        
        date_key = get_date_key(dwh_conn, row[2])  
        
       
        if customer_key_exists(dwh_conn, row[1]):  
            transformed_row = (
                row[0],  
                row[1], 
                None,    
                None,    
                row[4], 
                None, 
                None,  
                date_key,  
                row[3]   
            )
            transformed_data.append(transformed_row)
        else:
            print(f"CustomerID {row[1]} not found in dim_customer. Skipping this record.")
    return transformed_data

def load_data_to_dwh(connection, insert_query, data):
    cursor = connection.cursor()
    cursor.executemany(insert_query, data)
    connection.commit()
    print(f"{len(data)} rows inserted successfully!")


database_conn = connect_to_server('DESKTOP-67E98DV\\SQLEXPRESS', 'CustomerDataManagement') 
dwh_conn = connect_to_server('DESKTOP-67E98DV', 'SalesDWH')  

if database_conn and dwh_conn:

    sales_query = "SELECT TransactionID, CustomerID, TransactionDate, Amount, PaymentMethodID FROM Transactions"
    sales_data = fetch_data_from_db(database_conn, sales_query)

   
    transformed_sales_data = transform_sales_data_with_check(dwh_conn, sales_data)

  
    sales_insert_query = """
 x    INSERT INTO fact_sales (transaction_id, customer_key, address_key, contact_key, payment_method_key, interaction_key, interaction_type_key, transaction_date_key, amount)
    VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
    """
    load_data_to_dwh(dwh_conn, sales_insert_query, transformed_sales_data)


    database_conn.close()
    dwh_conn.close()

else:
    print("Error in database connections. Please check your server details.")


Connected to CustomerDataManagement on DESKTOP-67E98DV\SQLEXPRESS successfully!
Connected to SalesDWH on DESKTOP-67E98DV successfully!
CustomerID 1 not found in dim_customer. Skipping this record.
CustomerID 2 not found in dim_customer. Skipping this record.
CustomerID 3 not found in dim_customer. Skipping this record.
CustomerID 4 not found in dim_customer. Skipping this record.
CustomerID 5 not found in dim_customer. Skipping this record.
CustomerID 6 not found in dim_customer. Skipping this record.
CustomerID 7 not found in dim_customer. Skipping this record.
CustomerID 8 not found in dim_customer. Skipping this record.
CustomerID 9 not found in dim_customer. Skipping this record.
CustomerID 10 not found in dim_customer. Skipping this record.
CustomerID 11 not found in dim_customer. Skipping this record.
CustomerID 12 not found in dim_customer. Skipping this record.
CustomerID 13 not found in dim_customer. Skipping this record.
CustomerID 14 not found in dim_customer. Skipping this 

ProgrammingError: The second parameter to executemany must not be empty.

In [27]:
import pyodbc
from datetime import datetime, timedelta


def create_connection(server, database):
    connection_string = f"""
    DRIVER={{ODBC Driver 17 for SQL Server}};
    SERVER={server};
    DATABASE={database};
    Trusted_Connection=yes;
    """
    try:
        connection = pyodbc.connect(connection_string)
        print(f"Connected to {database} on {server} successfully!")
        return connection
    except pyodbc.Error as e:
        print(f"Error while connecting to database: {e}")
        return None


def generate_date_data(start_date, end_date):
    date_data = []
    current_date = start_date
    while current_date <= end_date:
        date_key = int(current_date.strftime("%Y%m%d"))  
        date_data.append([
            date_key,
            current_date.strftime("%Y-%m-%d"),  
            current_date.year, 
            (current_date.month - 1) // 3 + 1,  
            current_date.month, 
            current_date.strftime("%B"),  
        ])
        current_date += timedelta(days=1)
    return date_data


def load_date_data_to_dwh(connection, date_data):
    cursor = connection.cursor()
    

    cursor.execute("SET IDENTITY_INSERT dim_date ON")
    
    date_insert_query = """
    INSERT INTO dim_date (date_key, full_date, year, quarter, month_num, month_name)
    VALUES (?, ?, ?, ?, ?, ?)
    """
    
    cursor.executemany(date_insert_query, date_data)
    connection.commit()  
    
    
    cursor.execute("SET IDENTITY_INSERT dim_date OFF")
    connection.commit()  
    
    print(f"{len(date_data)} rows inserted successfully into dim_date!")


def check_data_in_dim_date(connection):
    cursor = connection.cursor()
    cursor.execute("SELECT * FROM dim_date")
    rows = cursor.fetchall()
    for row in rows:
        print(row)


if __name__ == "__main__":
    
    dwh_conn = create_connection('DESKTOP-67E98DV', 'SalesDWH')
    
    start_date = datetime(2020, 1, 1)
    end_date = datetime(2023, 12, 31)
    
    
    date_data = generate_date_data(start_date, end_date)
    
    load_date_data_to_dwh(dwh_conn, date_data)
    
    check_data_in_dim_date(dwh_conn)
    
    
    dwh_conn.close()


Connected to SalesDWH on DESKTOP-67E98DV successfully!
1461 rows inserted successfully into dim_date!
(20200101, datetime.date(2020, 1, 1), 2020, 1, 1, 'January')
(20200102, datetime.date(2020, 1, 2), 2020, 1, 1, 'January')
(20200103, datetime.date(2020, 1, 3), 2020, 1, 1, 'January')
(20200104, datetime.date(2020, 1, 4), 2020, 1, 1, 'January')
(20200105, datetime.date(2020, 1, 5), 2020, 1, 1, 'January')
(20200106, datetime.date(2020, 1, 6), 2020, 1, 1, 'January')
(20200107, datetime.date(2020, 1, 7), 2020, 1, 1, 'January')
(20200108, datetime.date(2020, 1, 8), 2020, 1, 1, 'January')
(20200109, datetime.date(2020, 1, 9), 2020, 1, 1, 'January')
(20200110, datetime.date(2020, 1, 10), 2020, 1, 1, 'January')
(20200111, datetime.date(2020, 1, 11), 2020, 1, 1, 'January')
(20200112, datetime.date(2020, 1, 12), 2020, 1, 1, 'January')
(20200113, datetime.date(2020, 1, 13), 2020, 1, 1, 'January')
(20200114, datetime.date(2020, 1, 14), 2020, 1, 1, 'January')
(20200115, datetime.date(2020, 1, 15), 

In [32]:
import pyodbc
from datetime import datetime, timedelta
import random

# إنشاء اتصال بقاعدة البيانات
def create_connection(server, database):
    connection_string = f"""
    DRIVER={{ODBC Driver 17 for SQL Server}};
    SERVER={server};
    DATABASE={database};
    Trusted_Connection=yes;
    """
    try:
        connection = pyodbc.connect(connection_string)
        print(f"Connected to {database} on {server} successfully!")
        return connection
    except pyodbc.Error as e:
        print(f"Error while connecting to database: {e}")
        return None


def generate_interaction_data(num_records):
    interaction_data = []
    start_date = datetime(2020, 1, 1)
    
    for i in range(1, num_records + 1):
        interaction_key = i
        interaction_id = i  
        interaction_date = start_date + timedelta(days=i)  
        
       
        customer_id = random.randint(1, 100)  
        
        interaction_type_id = random.randint(1, 5) 
        notes = f"Interaction note {i}"
        
        interaction_data.append([
            interaction_key,
            interaction_id,  
            customer_id,
            interaction_date.strftime("%Y-%m-%d"),
            interaction_type_id,
            notes
        ])
    
    return interaction_data


def load_interaction_data_to_dwh(connection, interaction_data):
    cursor = connection.cursor()
    
   
    cursor.execute("SET IDENTITY_INSERT dim_interaction ON")
    
    interaction_insert_query = """
    INSERT INTO dim_interaction (interaction_key, interaction_id, customer_id, interaction_date, interaction_type_id, notes)
    VALUES (?, ?, ?, ?, ?, ?)
    """
    
    cursor.executemany(interaction_insert_query, interaction_data)
    connection.commit()  
    
  
    cursor.execute("SET IDENTITY_INSERT dim_interaction OFF")
    connection.commit()  
    
    print(f"{len(interaction_data)} rows inserted successfully into dim_interaction!")

# التحقق من البيانات في جدول dim_interaction
def check_data_in_dim_interaction(connection):
    cursor = connection.cursor()
    cursor.execute("SELECT * FROM dim_interaction")
    rows = cursor.fetchall()
    for row in rows:
        print(row)

# تنفيذ العملية
if __name__ == "__main__":
    # اتصال بقاعدة البيانات
    dwh_conn = create_connection('DESKTOP-67E98DV', 'SalesDWH')
    
    # توليد بيانات التفاعل
    interaction_data = generate_interaction_data(1000)  # على سبيل المثال، إنشاء 1000 سجل
    
    # تحميل البيانات إلى جدول dim_interaction
    load_interaction_data_to_dwh(dwh_conn, interaction_data)
    
    # التحقق من البيانات في الجدول
    check_data_in_dim_interaction(dwh_conn)
    
    # إغلاق الاتصال
    dwh_conn.close()


Connected to SalesDWH on DESKTOP-67E98DV successfully!
1000 rows inserted successfully into dim_interaction!
(1, 1, 9, datetime.date(2020, 1, 2), 4, 'Interaction note 1')
(2, 2, 66, datetime.date(2020, 1, 3), 2, 'Interaction note 2')
(3, 3, 52, datetime.date(2020, 1, 4), 3, 'Interaction note 3')
(4, 4, 49, datetime.date(2020, 1, 5), 2, 'Interaction note 4')
(5, 5, 25, datetime.date(2020, 1, 6), 1, 'Interaction note 5')
(6, 6, 39, datetime.date(2020, 1, 7), 3, 'Interaction note 6')
(7, 7, 22, datetime.date(2020, 1, 8), 3, 'Interaction note 7')
(8, 8, 89, datetime.date(2020, 1, 9), 3, 'Interaction note 8')
(9, 9, 65, datetime.date(2020, 1, 10), 4, 'Interaction note 9')
(10, 10, 70, datetime.date(2020, 1, 11), 4, 'Interaction note 10')
(11, 11, 81, datetime.date(2020, 1, 12), 2, 'Interaction note 11')
(12, 12, 3, datetime.date(2020, 1, 13), 1, 'Interaction note 12')
(13, 13, 75, datetime.date(2020, 1, 14), 3, 'Interaction note 13')
(14, 14, 70, datetime.date(2020, 1, 15), 4, 'Interaction 

In [52]:
import pyodbc

# الاتصال بقاعدة البيانات المصدر
database_conn = pyodbc.connect(
    'DRIVER={ODBC Driver 17 for SQL Server};'
    'SERVER=DESKTOP-67E98DV\\SQLEXPRESS;'
    'DATABASE=CustomerDataManagement;'
    'Trusted_Connection=yes;'
)

# الاتصال بقاعدة البيانات الوجهة
dwh_conn = pyodbc.connect(
    'DRIVER={ODBC Driver 17 for SQL Server};'
    'SERVER=DESKTOP-67E98DV;'
    'DATABASE=SalesDWH;'
    'Trusted_Connection=yes;'
)

def fetch_payment_method_data(connection):
    query = """
    SELECT PaymentMethodID, MethodName  -- العمود الصحيح هو MethodName
    FROM PaymentMethods
    """
    cursor = connection.cursor()
    cursor.execute(query)
    return cursor.fetchall()


def load_payment_method_data_to_dwh(connection, payment_method_data):
    cursor = connection.cursor()
    
    payment_method_insert_query = """
    INSERT INTO dim_payment_method (payment_method_id, method_name)  -- العمود الصحيح هو method_name
    VALUES (?, ?)
    """
    
    cursor.executemany(payment_method_insert_query, payment_method_data)
    connection.commit()
    print(f"{len(payment_method_data)} rows inserted successfully into dim_payment_method!")

# 3. تنفيذ الوظائف
payment_method_data = fetch_payment_method_data(database_conn)
load_payment_method_data_to_dwh(dwh_conn, payment_method_data)

# 4. إغلاق الاتصالات
database_conn.close()
dwh_conn.close()


1000 rows inserted successfully into dim_payment_method!


In [53]:
import pyodbc

database_conn = pyodbc.connect(
    'DRIVER={ODBC Driver 17 for SQL Server};'
    'SERVER=DESKTOP-67E98DV\\SQLEXPRESS;'
    'DATABASE=CustomerDataManagement;'
    'Trusted_Connection=yes;'
)

dwh_conn = pyodbc.connect(
    'DRIVER={ODBC Driver 17 for SQL Server};'
    'SERVER=DESKTOP-67E98DV;'
    'DATABASE=SalesDWH;'
    'Trusted_Connection=yes;'
)


def fetch_sales_data(connection):
    query = """
    SELECT 
        TransactionID, 
        CustomerID, 
        TransactionDate, 
        Amount, 
        PaymentMethodID 
    FROM Transactions
    """
    cursor = connection.cursor()
    cursor.execute(query)
    return cursor.fetchall()


def map_keys_to_dwh(connection, sales_data):
    cursor = connection.cursor()
    mapped_data = []

    for row in sales_data:
        transaction_id = row[0]
        customer_id = row[1]
        transaction_date = row[2]
        amount = row[3]
        payment_method_id = row[4]

        
        cursor.execute("SELECT customer_key FROM dim_customer WHERE customer_id = ?", customer_id)
        customer_key = cursor.fetchone()[0]

        
        cursor.execute("SELECT payment_method_key FROM dim_payment_method WHERE payment_method_id = ?", payment_method_id)
        payment_method_key = cursor.fetchone()[0]

        
        cursor.execute("SELECT date_key FROM dim_date WHERE full_date = ?", transaction_date)
        transaction_date_key = cursor.fetchone()[0]

        
        mapped_data.append((
            transaction_id,
            customer_key,
            payment_method_key,
            transaction_date_key,
            amount
        ))

    return mapped_data


def load_sales_data_to_dwh(connection, sales_data):
    cursor = connection.cursor()

    sales_insert_query = """
    INSERT INTO fact_sales (transaction_id, customer_key, payment_method_key, transaction_date_key, amount)
    VALUES (?, ?, ?, ?, ?)
    """

    cursor.executemany(sales_insert_query, sales_data)
    connection.commit()
    print(f"{len(sales_data)} rows inserted successfully into fact_sales!")


sales_data = fetch_sales_data(database_conn)
mapped_sales_data = map_keys_to_dwh(dwh_conn, sales_data)
load_sales_data_to_dwh(dwh_conn, mapped_sales_data)

database_conn.close()
dwh_conn.close()


1000 rows inserted successfully into fact_sales!


In [1]:
import pyodbc

# الاتصال بقاعدة البيانات
database_conn = pyodbc.connect(
    'DRIVER={ODBC Driver 17 for SQL Server};'
    'SERVER=DESKTOP-67E98DV\\SQLEXPRESS;'
    'DATABASE=CustomerDataManagement;'
    'Trusted_Connection=yes;'
)

dwh_conn = pyodbc.connect(
    'DRIVER={ODBC Driver 17 for SQL Server};'
    'SERVER=DESKTOP-67E98DV;'
    'DATABASE=SalesDWH;'
    'Trusted_Connection=yes;'
)

# 1. جلب بيانات Transactions من قاعدة البيانات
def fetch_sales_data(connection):
    query = """
    SELECT 
        TransactionID, 
        CustomerID, 
        TransactionDate, 
        Amount, 
        PaymentMethodID 
    FROM Transactions
    """
    cursor = connection.cursor()
    cursor.execute(query)
    return cursor.fetchall()

# 2. ربط المفاتيح من جداول الـ DWH
def map_keys_to_dwh(connection, sales_data):
    cursor = connection.cursor()
    mapped_data = []

    for row in sales_data:
        transaction_id = row[0]
        customer_id = row[1]
        transaction_date = row[2]
        amount = row[3]
        payment_method_id = row[4]

        # جلب customer_key من dim_customer
        cursor.execute("SELECT customer_key FROM dim_customer WHERE customer_id = ?", customer_id)
        customer_key = cursor.fetchone()[0]

        # جلب payment_method_key من dim_payment_method
        cursor.execute("SELECT payment_method_key FROM dim_payment_method WHERE payment_method_id = ?", payment_method_id)
        payment_method_key = cursor.fetchone()[0]

        # جلب transaction_date_key من dim_date
        cursor.execute("SELECT date_key FROM dim_date WHERE full_date = ?", transaction_date)
        transaction_date_key = cursor.fetchone()[0]

        # جلب address_key من dim_customer_address
        cursor.execute("SELECT address_key FROM dim_customer_address WHERE customer_id = ?", customer_id)
        address_key = cursor.fetchone()
        address_key = address_key[0] if address_key else None  # معالجة القيم الفارغة

        # جلب contact_key من dim_customer_contact
        cursor.execute("SELECT contact_key FROM dim_customer_contact WHERE customer_id = ?", customer_id)
        contact_key = cursor.fetchone()
        contact_key = contact_key[0] if contact_key else None  # معالجة القيم الفارغة

        # جلب interaction_key و interaction_type_key من dim_interaction
        cursor.execute("SELECT interaction_key, interaction_type_id FROM dim_interaction WHERE customer_id = ?", customer_id)
        interaction_row = cursor.fetchone()
        interaction_key = interaction_row[0] if interaction_row else None
        interaction_type_key = interaction_row[1] if interaction_row else None

        # التحقق من أن interaction_type_key موجودة في dim_interaction_type
        if interaction_type_key:
            cursor.execute("SELECT interaction_type_key FROM dim_interaction_type WHERE interaction_type_key = ?", interaction_type_key)
            interaction_type_exists = cursor.fetchone()
            if not interaction_type_exists:
                interaction_type_key = None  # التعامل مع المفاتيح غير الموجودة

        # إضافة البيانات المرتبطة إلى القائمة
        mapped_data.append((
            transaction_id,
            customer_key,
            address_key,
            contact_key,
            payment_method_key,
            interaction_key,
            interaction_type_key,
            transaction_date_key,
            amount
        ))

    return mapped_data

# 3. تحميل البيانات إلى fact_sales في الـ DWH
def load_sales_data_to_dwh(connection, sales_data):
    cursor = connection.cursor()

    sales_insert_query = """
    INSERT INTO fact_sales (transaction_id, customer_key, address_key, contact_key, payment_method_key, interaction_key, interaction_type_key, transaction_date_key, amount)
    VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
    """

    cursor.executemany(sales_insert_query, sales_data)
    connection.commit()
    print(f"{len(sales_data)} rows inserted successfully into fact_sales!")

# 4. تنفيذ الوظائف
sales_data = fetch_sales_data(database_conn)
mapped_sales_data = map_keys_to_dwh(dwh_conn, sales_data)
load_sales_data_to_dwh(dwh_conn, mapped_sales_data)

# 5. إغلاق الاتصالات
database_conn.close()
dwh_conn.close()


1000 rows inserted successfully into fact_sales!


In [2]:
import pyodbc

# الاتصال بقاعدة البيانات الأصلية
database_conn = pyodbc.connect(
    'DRIVER={ODBC Driver 17 for SQL Server};'
    'SERVER=DESKTOP-67E98DV\\SQLEXPRESS;'
    'DATABASE=CustomerDataManagement;'
    'Trusted_Connection=yes;'
)

# الاتصال بـ DWH
dwh_conn = pyodbc.connect(
    'DRIVER={ODBC Driver 17 for SQL Server};'
    'SERVER=DESKTOP-67E98DV;'
    'DATABASE=SalesDWH;'
    'Trusted_Connection=yes;'
)

# 1. جلب بيانات Transactions من قاعدة البيانات الأصلية
def fetch_sales_data(connection):
    query = """
    SELECT 
        t.TransactionID, 
        t.CustomerID, 
        t.TransactionDate, 
        t.Amount, 
        t.PaymentMethodID 
    FROM Transactions t
    LEFT JOIN dim_customer_address dca ON t.CustomerID = dca.customer_id
    LEFT JOIN dim_interaction di ON t.CustomerID = di.customer_id
    """
    cursor = connection.cursor()
    cursor.execute(query)
    return cursor.fetchall()

# 2. ربط المفاتيح من جداول الـ DWH
def map_keys_to_dwh(connection, sales_data):
    cursor = connection.cursor()
    mapped_data = []

    for row in sales_data:
        transaction_id = row[0]
        customer_id = row[1]
        transaction_date = row[2]
        amount = row[3]
        payment_method_id = row[4]

        # جلب customer_key من dim_customer
        cursor.execute("SELECT customer_key FROM dim_customer WHERE customer_id = ?", customer_id)
        customer_key = cursor.fetchone()[0]

        # جلب payment_method_key من dim_payment_method
        cursor.execute("SELECT payment_method_key FROM dim_payment_method WHERE payment_method_id = ?", payment_method_id)
        payment_method_key = cursor.fetchone()[0]

        # جلب transaction_date_key من dim_date
        cursor.execute("SELECT date_key FROM dim_date WHERE full_date = ?", transaction_date)
        transaction_date_key = cursor.fetchone()[0]

        # جلب address_key من dim_customer_address
        cursor.execute("SELECT address_key FROM dim_customer_address WHERE customer_id = ?", customer_id)
        address_key = cursor.fetchone()
        address_key = address_key[0] if address_key else None  # معالجة القيم الفارغة

        # جلب contact_key من dim_customer_contact
        cursor.execute("SELECT contact_key FROM dim_customer_contact WHERE customer_id = ?", customer_id)
        contact_key = cursor.fetchone()
        contact_key = contact_key[0] if contact_key else None  # معالجة القيم الفارغة

        # جلب interaction_key و interaction_type_key من dim_interaction
        cursor.execute("SELECT interaction_key, interaction_type_id FROM dim_interaction WHERE customer_id = ?", customer_id)
        interaction_row = cursor.fetchone()
        interaction_key = interaction_row[0] if interaction_row else None
        interaction_type_key = interaction_row[1] if interaction_row else None

        # إضافة البيانات المرتبطة إلى القائمة
        mapped_data.append((
            transaction_id,
            customer_key,
            address_key,
            contact_key,
            payment_method_key,
            interaction_key,
            interaction_type_key,
            transaction_date_key,
            amount
        ))

    return mapped_data

# 3. تحميل البيانات إلى fact_sales في الـ DWH
def load_sales_data_to_dwh(connection, sales_data):
    cursor = connection.cursor()

    sales_insert_query = """
    INSERT INTO fact_sales (transaction_id, customer_key, address_key, contact_key, payment_method_key, interaction_key, interaction_type_key, transaction_date_key, amount)
    VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
    """

    cursor.executemany(sales_insert_query, sales_data)
    connection.commit()
    print(f"{len(sales_data)} rows inserted successfully into fact_sales!")

# 4. تنفيذ الوظائف
sales_data = fetch_sales_data(database_conn)
mapped_sales_data = map_keys_to_dwh(dwh_conn, sales_data)
load_sales_data_to_dwh(dwh_conn, mapped_sales_data)

# 5. إغلاق الاتصالات
database_conn.close()
dwh_conn.close()


ProgrammingError: ('42S02', "[42S02] [Microsoft][ODBC Driver 17 for SQL Server][SQL Server]Invalid object name 'dim_customer_address'. (208) (SQLExecDirectW)")

In [3]:
import pyodbc

# الاتصال بقاعدة البيانات الأصلية
database_conn = pyodbc.connect(
    'DRIVER={ODBC Driver 17 for SQL Server};'
    'SERVER=DESKTOP-67E98DV\\SQLEXPRESS;'
    'DATABASE=CustomerDataManagement;'
    'Trusted_Connection=yes;'
)

# الاتصال بـ DWH
dwh_conn = pyodbc.connect(
    'DRIVER={ODBC Driver 17 for SQL Server};'
    'SERVER=DESKTOP-67E98DV;'
    'DATABASE=SalesDWH;'
    'Trusted_Connection=yes;'
)

# 1. جلب بيانات Transactions من قاعدة البيانات الأصلية
def fetch_sales_data(connection):
    query = """
    SELECT 
        TransactionID, 
        CustomerID, 
        TransactionDate, 
        Amount, 
        PaymentMethodID 
    FROM Transactions
    """
    cursor = connection.cursor()
    cursor.execute(query)
    return cursor.fetchall()

# 2. ربط المفاتيح من جداول الـ DWH
def map_keys_to_dwh(connection, sales_data):
    cursor = connection.cursor()
    mapped_data = []

    for row in sales_data:
        transaction_id = row[0]
        customer_id = row[1]
        transaction_date = row[2]
        amount = row[3]
        payment_method_id = row[4]

        # جلب customer_key من dim_customer
        cursor.execute("SELECT customer_key FROM dim_customer WHERE customer_id = ?", customer_id)
        customer_key = cursor.fetchone()
        customer_key = customer_key[0] if customer_key else None

        # جلب payment_method_key من dim_payment_method
        cursor.execute("SELECT payment_method_key FROM dim_payment_method WHERE payment_method_id = ?", payment_method_id)
        payment_method_key = cursor.fetchone()
        payment_method_key = payment_method_key[0] if payment_method_key else None

        # جلب transaction_date_key من dim_date
        cursor.execute("SELECT date_key FROM dim_date WHERE full_date = ?", transaction_date)
        transaction_date_key = cursor.fetchone()
        transaction_date_key = transaction_date_key[0] if transaction_date_key else None

        # جلب address_key من dim_customer_address
        cursor.execute("SELECT address_key FROM dim_customer_address WHERE customer_id = ?", customer_id)
        address_key = cursor.fetchone()
        address_key = address_key[0] if address_key else None

        # جلب contact_key من dim_customer_contact
        cursor.execute("SELECT contact_key FROM dim_customer_contact WHERE customer_id = ?", customer_id)
        contact_key = cursor.fetchone()
        contact_key = contact_key[0] if contact_key else None

        # جلب interaction_key و interaction_type_key من dim_interaction
        cursor.execute("SELECT interaction_key, interaction_type_id FROM dim_interaction WHERE customer_id = ?", customer_id)
        interaction_row = cursor.fetchone()
        interaction_key = interaction_row[0] if interaction_row else None
        interaction_type_key = interaction_row[1] if interaction_row else None

        # إضافة البيانات المرتبطة إلى القائمة
        mapped_data.append((
            transaction_id,
            customer_key,
            address_key,
            contact_key,
            payment_method_key,
            interaction_key,
            interaction_type_key,
            transaction_date_key,
            amount
        ))

    return mapped_data

# 3. تحميل البيانات إلى fact_sales في الـ DWH
def load_sales_data_to_dwh(connection, sales_data):
    cursor = connection.cursor()

    sales_insert_query = """
    INSERT INTO fact_sales (transaction_id, customer_key, address_key, contact_key, payment_method_key, interaction_key, interaction_type_key, transaction_date_key, amount)
    VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
    """

    cursor.executemany(sales_insert_query, sales_data)
    connection.commit()
    print(f"{len(sales_data)} rows inserted successfully into fact_sales!")

# 4. تنفيذ الوظائف
sales_data = fetch_sales_data(database_conn)
mapped_sales_data = map_keys_to_dwh(dwh_conn, sales_data)
load_sales_data_to_dwh(dwh_conn, mapped_sales_data)

# 5. إغلاق الاتصالات
database_conn.close()
dwh_conn.close()


IntegrityError: ('23000', '[23000] [Microsoft][ODBC Driver 17 for SQL Server][SQL Server]The INSERT statement conflicted with the FOREIGN KEY constraint "fk_fact_sales_dim_interaction_type". The conflict occurred in database "SalesDWH", table "dbo.dim_interaction_type", column \'interaction_type_key\'. (547) (SQLExecDirectW); [23000] [Microsoft][ODBC Driver 17 for SQL Server][SQL Server]The statement has been terminated. (3621)')

In [4]:
import pyodbc

# الاتصال بقاعدة البيانات الأصلية (CustomerDataManagement)
database_conn = pyodbc.connect(
    'DRIVER={ODBC Driver 17 for SQL Server};'
    'SERVER=DESKTOP-67E98DV\\SQLEXPRESS;'
    'DATABASE=CustomerDataManagement;'
    'Trusted_Connection=yes;'
)

# الاتصال بقاعدة بيانات الـ DWH (SalesDWH)
dwh_conn = pyodbc.connect(
    'DRIVER={ODBC Driver 17 for SQL Server};'
    'SERVER=DESKTOP-67E98DV;'
    'DATABASE=SalesDWH;'
    'Trusted_Connection=yes;'
)

# 1. جلب بيانات `InteractionTypes` من قاعدة البيانات الأصلية
def fetch_interaction_type_data(connection):
    query = """
    SELECT InteractionTypeID, TypeName
    FROM InteractionTypes
    """
    cursor = connection.cursor()
    cursor.execute(query)
    return cursor.fetchall()

# 2. تحميل البيانات إلى `dim_interaction_type` في الـ DWH
def load_interaction_type_data_to_dwh(connection, interaction_type_data):
    cursor = connection.cursor()

    interaction_type_insert_query = """
    INSERT INTO dim_interaction_type (interaction_type_id, type_name)
    VALUES (?, ?)
    """

    cursor.executemany(interaction_type_insert_query, interaction_type_data)
    connection.commit()
    print(f"{len(interaction_type_data)} rows inserted successfully into dim_interaction_type!")

# 3. تنفيذ الوظائف
interaction_type_data = fetch_interaction_type_data(database_conn)
load_interaction_type_data_to_dwh(dwh_conn, interaction_type_data)

# 4. إغلاق الاتصالات
database_conn.close()
dwh_conn.close()


1000 rows inserted successfully into dim_interaction_type!


In [6]:
import pyodbc

# الاتصال بقاعدة البيانات
database_conn = pyodbc.connect(
    'DRIVER={ODBC Driver 17 for SQL Server};'
    'SERVER=DESKTOP-67E98DV\\SQLEXPRESS;'
    'DATABASE=CustomerDataManagement;'
    'Trusted_Connection=yes;'
)

dwh_conn = pyodbc.connect(
    'DRIVER={ODBC Driver 17 for SQL Server};'
    'SERVER=DESKTOP-67E98DV;'
    'DATABASE=SalesDWH;'
    'Trusted_Connection=yes;'
)

# 1. جلب بيانات Transactions من قاعدة البيانات
def fetch_sales_data(connection):
    query = """
    SELECT 
        t.TransactionID, 
        t.CustomerID, 
        t.TransactionDate, 
        t.Amount, 
        t.PaymentMethodID
    FROM Transactions t
    """
    cursor = connection.cursor()
    cursor.execute(query)
    return cursor.fetchall()

# 2. ربط المفاتيح من جداول الـ DWH ومعالجة القيم المفقودة
def map_keys_to_dwh(connection, sales_data):
    cursor = connection.cursor()
    mapped_data = []

    for row in sales_data:
        transaction_id = row[0]
        customer_id = row[1]
        transaction_date = row[2]
        amount = row[3]
        payment_method_id = row[4]

        # جلب customer_key من dim_customer
        cursor.execute("SELECT customer_key FROM dim_customer WHERE customer_id = ?", customer_id)
        customer_key_row = cursor.fetchone()
        customer_key = customer_key_row[0] if customer_key_row else None

        # جلب payment_method_key من dim_payment_method
        cursor.execute("SELECT payment_method_key FROM dim_payment_method WHERE payment_method_id = ?", payment_method_id)
        payment_method_key_row = cursor.fetchone()
        payment_method_key = payment_method_key_row[0] if payment_method_key_row else None

        # جلب transaction_date_key من dim_date
        cursor.execute("SELECT date_key FROM dim_date WHERE full_date = ?", transaction_date)
        transaction_date_key_row = cursor.fetchone()
        transaction_date_key = transaction_date_key_row[0] if transaction_date_key_row else None

        # جلب address_key من dim_customer_address
        cursor.execute("SELECT address_key FROM dim_customer_address WHERE customer_id = ?", customer_id)
        address_key_row = cursor.fetchone()
        address_key = address_key_row[0] if address_key_row else None

        # جلب contact_key من dim_customer_contact
        cursor.execute("SELECT contact_key FROM dim_customer_contact WHERE customer_id = ?", customer_id)
        contact_key_row = cursor.fetchone()
        contact_key = contact_key_row[0] if contact_key_row else None

        # جلب interaction_key و interaction_type_key من dim_interaction
        cursor.execute("SELECT interaction_key, interaction_type_id FROM dim_interaction WHERE customer_id = ?", customer_id)
        interaction_row = cursor.fetchone()
        interaction_key = interaction_row[0] if interaction_row else None
        interaction_type_key = interaction_row[1] if interaction_row else None

        # التحقق من وجود interaction_type_key في dim_interaction_type
        if interaction_type_key:
            cursor.execute("SELECT interaction_type_key FROM dim_interaction_type WHERE interaction_type_key = ?", interaction_type_key)
            interaction_type_row = cursor.fetchone()
            if not interaction_type_row:
                interaction_type_key = None  # تعيين None إذا لم يتم العثور على نوع التفاعل

        # إضافة البيانات المرتبطة إلى القائمة
        mapped_data.append((
            transaction_id,
            customer_key,
            address_key,
            contact_key,
            payment_method_key,
            interaction_key,
            interaction_type_key,
            transaction_date_key,
            amount
        ))

    return mapped_data

# 3. تحميل البيانات إلى fact_sales في الـ DWH
def load_sales_data_to_dwh(connection, sales_data):
    cursor = connection.cursor()

    sales_insert_query = """
    INSERT INTO fact_sales (transaction_id, customer_key, address_key, contact_key, payment_method_key, interaction_key, interaction_type_key, transaction_date_key, amount)
    VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
    """

    cursor.executemany(sales_insert_query, sales_data)
    connection.commit()
    print(f"{len(sales_data)} rows inserted successfully into fact_sales!")

# 4. تنفيذ الوظائف
sales_data = fetch_sales_data(database_conn)
mapped_sales_data = map_keys_to_dwh(dwh_conn, sales_data)
load_sales_data_to_dwh(dwh_conn, mapped_sales_data)

# 5. إغلاق الاتصالات
database_conn.close()
dwh_conn.close()


1000 rows inserted successfully into fact_sales!
