In [None]:
%pip install pandas faker

In [None]:
pip install sqlalchemy pyodbc 

In [None]:
import random
import shutil, os
from faker import Faker
import pandas as pd
import datetime
from datetime import date, timedelta
from sqlalchemy import create_engine

fake = Faker()

In [None]:
import json
# Product catalogue: ids are assigned 1..7 in listing order, and every entry
# carries the product name ("pname") and its unit price.
products = [
    {"id": product_id, "pname": pname, "price": price}
    for product_id, (pname, price) in enumerate(
        [
            ("Laptop", 12000),
            ("Mouse", 250),
            ("Keyboard", 500),
            ("Monitor", 3000),
            ("Headphones", 800),
            ("Printer", 4500),
            ("USB Flash Drive", 150),
        ],
        start=1,
    )
]

# Persist the catalogue as indented JSON; later cells load it with pd.read_json.
with open("products.json", "w") as file:
    file.write(json.dumps(products, indent=4))


In [43]:
def new_transaction_data_generator(number_of_records, max_id, max_customer_id=2000):
    """Generate random transaction rows with consecutive transaction ids.

    Args:
        number_of_records: Number of rows to generate.
        max_id: Highest transaction id already in use; new ids start at
            ``max_id + 1``.
        max_customer_id: Largest customer id that may be drawn (inclusive).
            Defaults to 2000, matching ``create_customer_records(2000)``.

    Returns:
        A list of rows, each
        ``[transaction_id, customer_id, product_id, quantity, transaction_date, branch]``.
        ``transaction_date`` is always today's date.
    """
    first_id = max_id + 1
    return [
        [
            transaction_id,
            # random.randint includes both bounds. The previous upper bound of
            # 2001 could produce a customer id with no customer record, and
            # such rows are dropped by the inner join in merge_orders.
            random.randint(1, max_customer_id),
            random.randint(1, 7),    # product ids 1..7 (see products.json)
            random.randint(1, 10),   # quantity
            date.today(),
            random.choice(['cairo', 'alex', 'tanta']),
        ]
        for transaction_id in range(first_id, first_id + number_of_records)
    ]

In [None]:
# Column order shared by every transactions CSV in this notebook.
columns_names = ["transaction_id", "customer_id", "product_id", "quantity","transaction_date", "branch"]

def create_transactions_dfs(num_of_files, number_of_records):
    """Write ``num_of_files`` batch files named transactions1.csv, transactions2.csv, ...

    Each file holds ``number_of_records`` random transactions. Transaction ids
    continue from one file to the next, so they are unique across all batches.
    Every group of three consecutive files shares one transaction date; the
    first date is picked at random between 15 and 4 days ago.

    Args:
        num_of_files: Number of CSV files to write.
        number_of_records: Rows per file.
    """
    base_date = fake.date_between("-15d", "-4d")
    current_id = 0

    for i in range(num_of_files):
        # Files 0-2 get base_date, files 3-5 the next day, and so on.
        current_date = base_date + timedelta(days=i // 3)

        transaction_df = pd.DataFrame(
            columns=columns_names,
            data=new_transaction_data_generator(number_of_records, current_id),
        )
        # The generator stamps every row with today's date. Apply the batch
        # date that was computed here (it used to be calculated and discarded).
        transaction_df["transaction_date"] = current_date

        transaction_df.to_csv(f"transactions{i + 1}.csv", index=False)
        current_id += number_of_records

# Generate 12 batch files of 1000 transactions each.
create_transactions_dfs(num_of_files=12, number_of_records=1000)


In [None]:
def get_max_id(file):
    """Return the highest ``transaction_id`` stored in the CSV at ``file``.

    Args:
        file: Path of a transactions CSV.

    Returns:
        0 when the file does not exist or holds no ids; otherwise the largest
        ``transaction_id`` as an int. For a CSV without a ``transaction_id``
        column the row count is returned, which is what this function
        returned for every file before.
    """
    if not os.path.exists(file):
        return 0
    existing_df = pd.read_csv(file)
    if "transaction_id" not in existing_df.columns:
        return len(existing_df)
    # The row count equals the maximum id only while ids are gap-free and
    # start at 1; read the real maximum, as add_transactions_toCSV does.
    ids = existing_df["transaction_id"].dropna()
    if ids.empty:
        return 0
    return int(ids.max())

In [None]:
main_file = "transactions.csv"

def concat_files(sec_file):
    """Append the rows of ``sec_file`` to the module-level ``main_file``.

    If ``main_file`` does not exist yet, ``sec_file`` is copied to it as is
    (header included). Otherwise both files are loaded, stacked with a fresh
    0..n-1 index, and ``main_file`` is rewritten with the combined rows.
    """
    if os.path.exists(main_file):
        combined = pd.concat(
            [
                pd.read_csv(main_file),
                # Skip the secondary file's own header line while reading and
                # label its columns with the shared column names instead.
                pd.read_csv(sec_file, skiprows=1, names=columns_names),
            ],
            ignore_index=True,
        )
        # Transaction ids are already unique across batches, so a plain append is enough.
        combined.to_csv(main_file, index=False)
    else:
        shutil.copy(sec_file, main_file)




In [None]:
# Fold the batch files transactions1.csv .. transactions12.csv into
# transactions.csv, in file order.
# NOTE(review): concat_files appends whenever transactions.csv already exists,
# so re-running this cell appends the same twelve batches a second time.
for i in range(1, 13):
    sec_file = f"transactions{i}.csv"
    concat_files(sec_file)


In [None]:
def create_customer_records(num_of_customers):
    """Generate fake customers with ids 1..num_of_customers and write Customers.csv.

    Args:
        num_of_customers: Number of customer rows to create.

    The file has the columns id, first_name, last_name and Caddress, and is
    written without the DataFrame index.
    """
    Ccolumn_names = ['id', 'first_name', 'last_name', 'Caddress']
    customer_rows = [
        [
            customer_id,
            fake.first_name(),
            fake.last_name(),
            # Flatten the address to one line. Join its lines with ", ";
            # deleting the newline outright glued the end of one line to the
            # start of the next.
            fake.address().replace('\n', ', '),
        ]
        for customer_id in range(1, num_of_customers + 1)
    ]
    customers_df = pd.DataFrame(customer_rows, columns=Ccolumn_names)

    customers_df.to_csv('Customers.csv', index=False)

# 2000 customers, matching the customer ids the transaction generator draws from.
create_customer_records(2000)

In [None]:
# Load the three source datasets produced by the cells above.
transaction_df = pd.read_csv('transactions.csv')
customers_df = pd.read_csv('Customers.csv')
products_df = pd.read_json('products.json')

# Orders.csv is only ever written by merge_orders(), which is defined in a
# later cell. On a fresh run the file does not exist yet, and reading it
# unconditionally stopped the notebook with FileNotFoundError. Start with an
# empty frame that has the orders schema in that case.
if os.path.exists('Orders.csv'):
    orders_df = pd.read_csv('Orders.csv')
else:
    orders_df = pd.DataFrame(
        columns=['transaction_id', 'product_name', 'amount',
                 'customer_full_name', 'transaction_date', 'branch']
    )

In [None]:
def merge_orders(transaction_df, customers_df, products_df):
    """Join transactions with customers and products into the flat orders table.

    Both joins are inner joins, so a transaction whose ``customer_id`` or
    ``product_id`` has no matching ``id`` is left out of the result.

    Args:
        transaction_df: Transactions (customer_id, product_id, quantity, ...).
        customers_df: Customers keyed by ``id`` with first_name / last_name.
        products_df: Products keyed by ``id`` with pname / price.

    Returns:
        DataFrame with the columns transaction_id, product_name, amount,
        customer_full_name, transaction_date, branch. The same frame is also
        written to Orders.csv, overwriting any previous file.
    """
    with_customers = transaction_df.merge(
        customers_df,
        how='inner',
        left_on='customer_id',
        right_on='id',
        suffixes=('', '_cust'),
    )
    enriched = with_customers.merge(
        products_df,
        how='inner',
        left_on='product_id',
        right_on='id',
        suffixes=('', '_prod'),
    )

    # amount = unit price x quantity, stored as a whole number.
    line_total = enriched['price'] * enriched['quantity']
    enriched['amount'] = line_total.round().astype('int64')
    enriched['customer_full_name'] = enriched['first_name'] + ' ' + enriched['last_name']

    output_columns = [
        'transaction_id', 'pname', 'amount',
        'customer_full_name', 'transaction_date', 'branch',
    ]
    orders_df = enriched[output_columns].rename(columns={'pname': 'product_name'})

    orders_df.to_csv('Orders.csv', index=False)
    return orders_df


    

In [None]:
def read_csv_into_dfs(file):
    """Load the CSV at ``file`` into a DataFrame with pandas' default parsing options."""
    loaded_df = pd.read_csv(file)
    return loaded_df
    

In [None]:
# SQLAlchemy engine for the ETLTask database on the SQLEXPRESS instance,
# reached through pyodbc with "ODBC Driver 18 for SQL Server".
# The URL carries no user name or password and sets trusted_connection=yes
# (presumably Windows integrated authentication - confirm); Encrypt=no turns
# connection encryption off.
# NOTE(review): the host name DESKTOP-1UV6SC4 is hard-coded, so the notebook is
# tied to that machine's SQL Server instance.
engine = create_engine(
    "mssql+pyodbc://@DESKTOP-1UV6SC4\\SQLEXPRESS/ETLTask"
    "?driver=ODBC+Driver+18+for+SQL+Server"
    "&trusted_connection=yes"
    "&Encrypt=no"
)


In [None]:
def load_into_ssms(engine):
    """Append the module-level orders and customers frames to their SQL tables.

    ``orders_df`` goes to TOrders and ``customers_df`` to Tcustomers. Rows are
    appended to whatever the tables already hold, and the DataFrame index is
    not written.

    Args:
        engine: SQLAlchemy engine (or connection) to write through.
    """
    append_opts = {"con": engine, "if_exists": "append", "index": False}
    orders_df.to_sql("TOrders", **append_opts)
    customers_df.to_sql("Tcustomers", **append_opts)


 # * Insert additional records in transactions.csv
 # * Insert only records that don't already exist

In [75]:
def add_transactions_toCSV(number_of_records, file_path="transactions.csv"):
    """Append freshly generated transactions to a transactions CSV.

    New transaction ids continue after the highest id already in the file.
    When the file is missing or completely empty, it is created with a header
    row and ids start at 1.

    Args:
        number_of_records: Number of transactions to generate and append.
        file_path: CSV to append to.
    """
    columns_names = ["transaction_id", "customer_id", "product_id", "quantity","transaction_date", "branch"]

    max_id = 0
    write_header = True
    # A zero-byte file is treated like a missing one; pandas cannot parse it.
    if os.path.exists(file_path) and os.path.getsize(file_path) > 0:
        write_header = False
        existing_ids = pd.read_csv(file_path, usecols=["transaction_id"])["transaction_id"].dropna()
        # A header-only file has no ids. Its max() is NaN, which range()
        # rejects, so keep max_id at 0 there.
        if not existing_ids.empty:
            max_id = int(existing_ids.max())

    new_data = new_transaction_data_generator(number_of_records, max_id)
    new_df = pd.DataFrame(new_data, columns=columns_names)

    new_df.to_csv(file_path, index=False, header=write_header, mode='a')

# Append 500 new transactions to transactions.csv.
add_transactions_toCSV(500)


In [76]:
from sqlalchemy import inspect, text
def insert_into_orders_db(file, engine):
    """Load the transactions in ``file`` into TOrders, inserting only what is missing.

    * TOrders does not exist: create it, then insert every transaction.
    * TOrders exists but is empty: insert every transaction.
    * TOrders has rows: insert only the transactions whose ``transaction_id``
      is greater than the largest id already stored, and print how many order
      rows were inserted (or that there was nothing to insert).

    Args:
        file: Path of the transactions CSV.
        engine: SQLAlchemy engine for the target database.

    Uses the module-level ``customers_df`` and ``products_df`` for the joins.
    """
    transaction_df = read_csv_into_dfs(file)

    def _merge_and_append(transactions):
        # Build order rows for these transactions and append them to TOrders.
        orders = merge_orders(transactions, customers_df, products_df)
        orders.to_sql("TOrders", engine, if_exists="append", index=False)
        return orders

    if not inspect(engine).has_table("TOrders"):
        create_table_sql = """
        CREATE TABLE TOrders (
            transaction_id INT PRIMARY KEY,
            product_name NVARCHAR(100),
            amount INT,
            customer_full_name NVARCHAR(200),
            transaction_date DATE,
            branch NVARCHAR(100)
        );
        """
        with engine.begin() as conn:
            conn.execute(text(create_table_sql))
        _merge_and_append(transaction_df)
        return

    with engine.connect() as conn:
        row_count = conn.execute(text("SELECT COUNT(*) FROM TOrders")).scalar()
    if row_count == 0:
        _merge_and_append(transaction_df)
        return

    # Incremental load: anything above the stored maximum id counts as new.
    max_id = pd.read_sql("SELECT MAX(transaction_id) AS max_id FROM TOrders", con=engine).iloc[0, 0]
    transaction_new = transaction_df[transaction_df["transaction_id"] > max_id]
    if transaction_new.empty:
        print("No new transactions to insert.")
        return

    orders_df = _merge_and_append(transaction_new)
    print(f"Inserted {len(orders_df)} new transactions.")

# Sync transactions.csv into the TOrders table.
insert_into_orders_db("transactions.csv", engine)


Inserted 500 new transactions.


In [77]:
#update csv (read, transform, write)
def update_csv(file, column_name, column_value, target_column, new_data):
    """Overwrite one column for the matching rows of a CSV, then sync the change to the DB.

    Args:
        file: Path of the CSV to update in place.
        column_name: Column used to select rows.
        column_value: Rows where ``column_name`` equals this value are updated.
        target_column: Column that receives the new value.
        new_data: Value written into ``target_column``.

    The CSV is rewritten first; afterwards ``merge_transactions`` is called
    with the module-level ``engine`` and the before/after frames.
    """
    original = read_csv_into_dfs(file)

    updated = original.copy()
    row_mask = updated[column_name] == column_value
    updated.loc[row_mask, target_column] = new_data

    updated.to_csv(file, header=True, index=False)

    merge_transactions(engine, original, updated)


# NOTE(review): update_csv calls merge_transactions, which this notebook only
# defines in a later cell; that cell must have been run before this call.
update_csv("transactions.csv","transaction_id",12511,"branch","alex")
   

In [None]:
def merge_transactions(engine, old_df, new_df):
    """Sync the difference between two versions of the transactions data into TOrders.

    * Rows of ``new_df`` whose ``transaction_id`` is absent from ``old_df`` are
      joined with customers/products and appended to TOrders.
    * Rows present in both frames whose customer, product, quantity, date or
      branch differ are re-derived with ``merge_orders`` and written back with
      a parameterized UPDATE, all inside one transaction.
    * Finally Orders.csv is regenerated from the files on disk.

    Args:
        engine: SQLAlchemy engine for the database that holds TOrders.
        old_df: Transactions before the change, with the ``columns_names`` columns.
        new_df: Transactions after the change, with the same columns.

    Uses the module-level ``customers_df`` and ``products_df``. Product name,
    amount and customer name are computed from those frames rather than read
    from Tproducts / Tcustomers, because no visible cell of this notebook
    loads a Tproducts table.
    """
    value_columns = [col for col in columns_names if col != "transaction_id"]
    strip_suffix = {f"{col}_new": col for col in value_columns}

    def _as_transactions(rows):
        # After the merge every shared column carries a "_new"/"_old" suffix.
        # Selecting the bare names raised KeyError, so map "<col>_new" back to
        # the plain transaction schema before handing rows to merge_orders.
        return rows[["transaction_id", *strip_suffix]].rename(columns=strip_suffix)

    # indicator=True marks rows that exist only in new_df as "left_only".
    merged = pd.merge(
        new_df, old_df,
        on="transaction_id", how="left",
        suffixes=("_new", "_old"), indicator=True,
    )
    is_new = merged["_merge"] == "left_only"

    # ---- Insert new transactions ----
    new_records = merged[is_new]
    if not new_records.empty:
        orders_new = merge_orders(_as_transactions(new_records), customers_df, products_df)
        orders_new.to_sql("TOrders", engine, if_exists="append", index=False)

    # ---- Update changed transactions ----
    existing = merged[~is_new]
    changed_mask = pd.Series(False, index=existing.index)
    for col in value_columns:
        new_vals = existing[f"{col}_new"]
        old_vals = existing[f"{col}_old"]
        # NaN != NaN is True, so a value missing on both sides must not count as a change.
        both_missing = new_vals.isna() & old_vals.isna()
        changed_mask |= (new_vals != old_vals) & ~both_missing
    changed = existing[changed_mask]

    if not changed.empty:
        orders_changed = merge_orders(_as_transactions(changed), customers_df, products_df)
        update_sql = text(
            """
            UPDATE TOrders
            SET product_name = :product_name,
                amount = :amount,
                customer_full_name = :customer_full_name,
                transaction_date = :transaction_date,
                branch = :branch
            WHERE transaction_id = :transaction_id
            """
        )
        # Cast to built-in types so the DB driver never receives numpy scalars.
        params = [
            {
                "transaction_id": int(row.transaction_id),
                "product_name": str(row.product_name),
                "amount": int(row.amount),
                "customer_full_name": str(row.customer_full_name),
                "transaction_date": str(row.transaction_date),
                "branch": str(row.branch),
            }
            for row in orders_changed.itertuples(index=False)
        ]
        if params:
            with engine.begin() as conn:
                conn.execute(update_sql, params)

    # merge_orders rewrites Orders.csv on every call, including the partial
    # calls above; rebuild it from the complete files so it ends up whole.
    tdf = read_csv_into_dfs("transactions.csv")
    cdf = read_csv_into_dfs("Customers.csv")
    pdf = pd.read_json("products.json")
    merge_orders(tdf, cdf, pdf)


In [79]:
def update_product_price(engine, product_id, new_price):
    """Recompute ``TOrders.amount`` for every order of one product at a new unit price.

    For each TOrders row whose transaction has ``product_id``, amount becomes
    ``new_price * quantity``, with the quantity taken from the ``transactions``
    table.

    Args:
        engine: SQLAlchemy engine for the database that holds TOrders.
        product_id: Id of the product whose price changed.
        new_price: New unit price.

    NOTE(review): the statement reads a database table named ``transactions``;
    no visible cell of this notebook creates it - confirm it exists.
    """
    # Bound parameters instead of f-string interpolation: the values are never
    # parsed as SQL, and quoting/typing is left to the driver.
    update_sql = text(
        """
    UPDATE TOrders
    SET amount = :new_price * (
        SELECT quantity
        FROM transactions t
        WHERE t.transaction_id = TOrders.transaction_id
          AND t.product_id = :product_id
    )
    WHERE transaction_id IN (
        SELECT transaction_id
        FROM transactions
        WHERE product_id = :product_id
    );
    """
    )
    with engine.begin() as conn:
        conn.execute(update_sql, {"new_price": new_price, "product_id": product_id})

In [None]:
%pip install pandas faker sqlalchemy pyodbc 
from sqlalchemy import inspect, text
import random
import shutil, os
from faker import Faker
import pandas as pd
import datetime
import json
from datetime import date, timedelta
from sqlalchemy import create_engine

fake = Faker()


# Product catalogue: ids are assigned 1..7 in listing order, and every entry
# carries the product name ("pname") and its unit price.
products = [
    {"id": product_id, "pname": pname, "price": price}
    for product_id, (pname, price) in enumerate(
        [
            ("Laptop", 12000),
            ("Mouse", 250),
            ("Keyboard", 500),
            ("Monitor", 3000),
            ("Headphones", 800),
            ("Printer", 4500),
            ("USB Flash Drive", 150),
        ],
        start=1,
    )
]

# Persist the catalogue as indented JSON; later code loads it with pd.read_json.
with open("products.json", "w") as file:
    file.write(json.dumps(products, indent=4))

def new_transaction_data_generator(number_of_records, max_id, max_customer_id=2000):
    """Generate random transaction rows with consecutive transaction ids.

    Args:
        number_of_records: Number of rows to generate.
        max_id: Highest transaction id already in use; new ids start at
            ``max_id + 1``.
        max_customer_id: Largest customer id that may be drawn (inclusive).
            Defaults to 2000, the number of customers this notebook generates.

    Returns:
        A list of rows, each
        ``[transaction_id, customer_id, product_id, quantity, transaction_date, branch]``.
        ``transaction_date`` is always today's date.
    """
    first_id = max_id + 1
    return [
        [
            transaction_id,
            # random.randint includes both bounds. The previous upper bound of
            # 2001 could produce a customer id with no customer record, and
            # such rows are dropped by the inner join in merge_orders.
            random.randint(1, max_customer_id),
            random.randint(1, 7),    # product ids 1..7 (see products.json)
            random.randint(1, 10),   # quantity
            date.today(),
            random.choice(['cairo', 'alex', 'tanta']),
        ]
        for transaction_id in range(first_id, first_id + number_of_records)
    ]

# Column order shared by every transactions CSV in this notebook.
columns_names = ["transaction_id", "customer_id", "product_id", "quantity","transaction_date", "branch"]

def create_transactions_dfs(num_of_files, number_of_records):
    """Write ``num_of_files`` batch files named transactions1.csv, transactions2.csv, ...

    Each file holds ``number_of_records`` random transactions. Transaction ids
    continue from one file to the next, so they are unique across all batches.
    Every group of three consecutive files shares one transaction date; the
    first date is picked at random between 15 and 4 days ago.

    Args:
        num_of_files: Number of CSV files to write.
        number_of_records: Rows per file.
    """
    base_date = fake.date_between("-15d", "-4d")
    current_id = 0

    for i in range(num_of_files):
        # Files 0-2 get base_date, files 3-5 the next day, and so on.
        current_date = base_date + timedelta(days=i // 3)

        transaction_df = pd.DataFrame(
            columns=columns_names,
            data=new_transaction_data_generator(number_of_records, current_id),
        )
        # The generator stamps every row with today's date. Apply the batch
        # date that was computed here (it used to be calculated and discarded).
        transaction_df["transaction_date"] = current_date

        transaction_df.to_csv(f"transactions{i + 1}.csv", index=False)
        current_id += number_of_records

# Generate 12 batch files of 1000 transactions each.
create_transactions_dfs(num_of_files=12, number_of_records=1000)

def get_max_id(file):
    """Return the highest ``transaction_id`` stored in the CSV at ``file``.

    Args:
        file: Path of a transactions CSV.

    Returns:
        0 when the file does not exist or holds no ids; otherwise the largest
        ``transaction_id`` as an int. For a CSV without a ``transaction_id``
        column the row count is returned, which is what this function
        returned for every file before.
    """
    if not os.path.exists(file):
        return 0
    existing_df = pd.read_csv(file)
    if "transaction_id" not in existing_df.columns:
        return len(existing_df)
    # The row count equals the maximum id only while ids are gap-free and
    # start at 1; read the real maximum, as add_transactions_toCSV does.
    ids = existing_df["transaction_id"].dropna()
    if ids.empty:
        return 0
    return int(ids.max())

main_file = "transactions.csv"

def concat_files(sec_file):
    """Append the rows of ``sec_file`` to the module-level ``main_file``.

    If ``main_file`` does not exist yet, ``sec_file`` is copied to it as is
    (header included). Otherwise both files are loaded, stacked with a fresh
    0..n-1 index, and ``main_file`` is rewritten with the combined rows.
    """
    if os.path.exists(main_file):
        combined = pd.concat(
            [
                pd.read_csv(main_file),
                # Skip the secondary file's own header line while reading and
                # label its columns with the shared column names instead.
                pd.read_csv(sec_file, skiprows=1, names=columns_names),
            ],
            ignore_index=True,
        )
        # Transaction ids are already unique across batches, so a plain append is enough.
        combined.to_csv(main_file, index=False)
    else:
        shutil.copy(sec_file, main_file)

# Fold the batch files transactions1.csv .. transactions12.csv into
# transactions.csv, in file order.
# NOTE(review): concat_files appends whenever transactions.csv already exists,
# so re-running this code appends the same twelve batches a second time.
for i in range(1, 13):
    sec_file = f"transactions{i}.csv"
    concat_files(sec_file)

def create_customer_records(num_of_customers):
    """Generate fake customers with ids 1..num_of_customers and write Customers.csv.

    Args:
        num_of_customers: Number of customer rows to create.

    The file has the columns id, first_name, last_name and Caddress, and is
    written without the DataFrame index.
    """
    Ccolumn_names = ['id', 'first_name', 'last_name', 'Caddress']
    customer_rows = [
        [
            customer_id,
            fake.first_name(),
            fake.last_name(),
            # Flatten the address to one line. Join its lines with ", ";
            # deleting the newline outright glued the end of one line to the
            # start of the next.
            fake.address().replace('\n', ', '),
        ]
        for customer_id in range(1, num_of_customers + 1)
    ]
    customers_df = pd.DataFrame(customer_rows, columns=Ccolumn_names)

    customers_df.to_csv('Customers.csv', index=False)

# Load the three source datasets.
# NOTE(review): create_customer_records is defined above in this cell but not
# called here, so Customers.csv must already exist from an earlier run.
transaction_df = pd.read_csv('transactions.csv')
customers_df = pd.read_csv('Customers.csv')
products_df = pd.read_json('products.json')

# Orders.csv is only ever written by merge_orders(), which is defined further
# down. On a fresh run the file does not exist yet, and reading it
# unconditionally stopped execution with FileNotFoundError. Start with an
# empty frame that has the orders schema in that case.
if os.path.exists('Orders.csv'):
    orders_df = pd.read_csv('Orders.csv')
else:
    orders_df = pd.DataFrame(
        columns=['transaction_id', 'product_name', 'amount',
                 'customer_full_name', 'transaction_date', 'branch']
    )

def merge_orders(transaction_df, customers_df, products_df):
    """Join transactions with customers and products into the flat orders table.

    Both joins are inner joins, so a transaction whose ``customer_id`` or
    ``product_id`` has no matching ``id`` is left out of the result.

    Args:
        transaction_df: Transactions (customer_id, product_id, quantity, ...).
        customers_df: Customers keyed by ``id`` with first_name / last_name.
        products_df: Products keyed by ``id`` with pname / price.

    Returns:
        DataFrame with the columns transaction_id, product_name, amount,
        customer_full_name, transaction_date, branch. The same frame is also
        written to Orders.csv, overwriting any previous file.
    """
    with_customers = transaction_df.merge(
        customers_df,
        how='inner',
        left_on='customer_id',
        right_on='id',
        suffixes=('', '_cust'),
    )
    enriched = with_customers.merge(
        products_df,
        how='inner',
        left_on='product_id',
        right_on='id',
        suffixes=('', '_prod'),
    )

    # amount = unit price x quantity, stored as a whole number.
    line_total = enriched['price'] * enriched['quantity']
    enriched['amount'] = line_total.round().astype('int64')
    enriched['customer_full_name'] = enriched['first_name'] + ' ' + enriched['last_name']

    output_columns = [
        'transaction_id', 'pname', 'amount',
        'customer_full_name', 'transaction_date', 'branch',
    ]
    orders_df = enriched[output_columns].rename(columns={'pname': 'product_name'})

    orders_df.to_csv('Orders.csv', index=False)
    return orders_df

def read_csv_into_dfs(file):
    """Load the CSV at ``file`` into a DataFrame with pandas' default parsing options."""
    loaded_df = pd.read_csv(file)
    return loaded_df

# SQLAlchemy engine for the ETLTask database on the SQLEXPRESS instance,
# reached through pyodbc with "ODBC Driver 18 for SQL Server".
# The URL carries no user name or password and sets trusted_connection=yes
# (presumably Windows integrated authentication - confirm); Encrypt=no turns
# connection encryption off.
# NOTE(review): the host name DESKTOP-1UV6SC4 is hard-coded, so the notebook is
# tied to that machine's SQL Server instance.
engine = create_engine(
    "mssql+pyodbc://@DESKTOP-1UV6SC4\\SQLEXPRESS/ETLTask"
    "?driver=ODBC+Driver+18+for+SQL+Server"
    "&trusted_connection=yes"
    "&Encrypt=no"
)

def load_into_ssms(engine):
    """Append the module-level orders and customers frames to their SQL tables.

    ``orders_df`` goes to TOrders and ``customers_df`` to Tcustomers. Rows are
    appended to whatever the tables already hold, and the DataFrame index is
    not written.

    Args:
        engine: SQLAlchemy engine (or connection) to write through.
    """
    append_opts = {"con": engine, "if_exists": "append", "index": False}
    orders_df.to_sql("TOrders", **append_opts)
    customers_df.to_sql("Tcustomers", **append_opts)

def add_transactions_toCSV(number_of_records, file_path="transactions.csv"):
    """Append freshly generated transactions to a transactions CSV.

    New transaction ids continue after the highest id already in the file.
    When the file is missing or completely empty, it is created with a header
    row and ids start at 1.

    Args:
        number_of_records: Number of transactions to generate and append.
        file_path: CSV to append to.
    """
    columns_names = ["transaction_id", "customer_id", "product_id", "quantity","transaction_date", "branch"]

    max_id = 0
    write_header = True
    # A zero-byte file is treated like a missing one; pandas cannot parse it.
    if os.path.exists(file_path) and os.path.getsize(file_path) > 0:
        write_header = False
        existing_ids = pd.read_csv(file_path, usecols=["transaction_id"])["transaction_id"].dropna()
        # A header-only file has no ids. Its max() is NaN, which range()
        # rejects, so keep max_id at 0 there.
        if not existing_ids.empty:
            max_id = int(existing_ids.max())

    new_data = new_transaction_data_generator(number_of_records, max_id)
    new_df = pd.DataFrame(new_data, columns=columns_names)

    new_df.to_csv(file_path, index=False, header=write_header, mode='a')


def insert_into_orders_db(file, engine):
    """Load the transactions in ``file`` into TOrders, inserting only what is missing.

    * TOrders does not exist: create it, then insert every transaction.
    * TOrders exists but is empty: insert every transaction.
    * TOrders has rows: insert only the transactions whose ``transaction_id``
      is greater than the largest id already stored, and print how many order
      rows were inserted (or that there was nothing to insert).

    Args:
        file: Path of the transactions CSV.
        engine: SQLAlchemy engine for the target database.

    Uses the module-level ``customers_df`` and ``products_df`` for the joins.
    """
    transaction_df = read_csv_into_dfs(file)

    def _merge_and_append(transactions):
        # Build order rows for these transactions and append them to TOrders.
        orders = merge_orders(transactions, customers_df, products_df)
        orders.to_sql("TOrders", engine, if_exists="append", index=False)
        return orders

    if not inspect(engine).has_table("TOrders"):
        create_table_sql = """
        CREATE TABLE TOrders (
            transaction_id INT PRIMARY KEY,
            product_name NVARCHAR(100),
            amount INT,
            customer_full_name NVARCHAR(200),
            transaction_date DATE,
            branch NVARCHAR(100)
        );
        """
        with engine.begin() as conn:
            conn.execute(text(create_table_sql))
        _merge_and_append(transaction_df)
        return

    with engine.connect() as conn:
        row_count = conn.execute(text("SELECT COUNT(*) FROM TOrders")).scalar()
    if row_count == 0:
        _merge_and_append(transaction_df)
        return

    # Incremental load: anything above the stored maximum id counts as new.
    max_id = pd.read_sql("SELECT MAX(transaction_id) AS max_id FROM TOrders", con=engine).iloc[0, 0]
    transaction_new = transaction_df[transaction_df["transaction_id"] > max_id]
    if transaction_new.empty:
        print("No new transactions to insert.")
        return

    orders_df = _merge_and_append(transaction_new)
    print(f"Inserted {len(orders_df)} new transactions.")

#update csv (read, transform, write)
def update_csv(file, column_name, column_value, target_column, new_data):
    """Overwrite one column for the matching rows of a CSV, then sync the change to the DB.

    Args:
        file: Path of the CSV to update in place.
        column_name: Column used to select rows.
        column_value: Rows where ``column_name`` equals this value are updated.
        target_column: Column that receives the new value.
        new_data: Value written into ``target_column``.

    The CSV is rewritten first; afterwards ``merge_transactions`` is called
    with the module-level ``engine`` and the before/after frames.
    """
    original = read_csv_into_dfs(file)

    updated = original.copy()
    row_mask = updated[column_name] == column_value
    updated.loc[row_mask, target_column] = new_data

    updated.to_csv(file, header=True, index=False)

    merge_transactions(engine, original, updated)

def merge_transactions(engine, old_df, new_df):
    """Sync the difference between two versions of the transactions data into TOrders.

    * Rows of ``new_df`` whose ``transaction_id`` is absent from ``old_df`` are
      joined with customers/products and appended to TOrders.
    * Rows present in both frames whose customer, product, quantity, date or
      branch differ are re-derived with ``merge_orders`` and written back with
      a parameterized UPDATE, all inside one transaction.
    * Finally Orders.csv is regenerated from the files on disk.

    Args:
        engine: SQLAlchemy engine for the database that holds TOrders.
        old_df: Transactions before the change, with the ``columns_names`` columns.
        new_df: Transactions after the change, with the same columns.

    Uses the module-level ``customers_df`` and ``products_df``. Product name,
    amount and customer name are computed from those frames rather than read
    from Tproducts / Tcustomers, because no visible cell of this notebook
    loads a Tproducts table.
    """
    value_columns = [col for col in columns_names if col != "transaction_id"]
    strip_suffix = {f"{col}_new": col for col in value_columns}

    def _as_transactions(rows):
        # After the merge every shared column carries a "_new"/"_old" suffix.
        # Selecting the bare names raised KeyError, so map "<col>_new" back to
        # the plain transaction schema before handing rows to merge_orders.
        return rows[["transaction_id", *strip_suffix]].rename(columns=strip_suffix)

    # indicator=True marks rows that exist only in new_df as "left_only".
    merged = pd.merge(
        new_df, old_df,
        on="transaction_id", how="left",
        suffixes=("_new", "_old"), indicator=True,
    )
    is_new = merged["_merge"] == "left_only"

    # ---- Insert new transactions ----
    new_records = merged[is_new]
    if not new_records.empty:
        orders_new = merge_orders(_as_transactions(new_records), customers_df, products_df)
        orders_new.to_sql("TOrders", engine, if_exists="append", index=False)

    # ---- Update changed transactions ----
    existing = merged[~is_new]
    changed_mask = pd.Series(False, index=existing.index)
    for col in value_columns:
        new_vals = existing[f"{col}_new"]
        old_vals = existing[f"{col}_old"]
        # NaN != NaN is True, so a value missing on both sides must not count as a change.
        both_missing = new_vals.isna() & old_vals.isna()
        changed_mask |= (new_vals != old_vals) & ~both_missing
    changed = existing[changed_mask]

    if not changed.empty:
        orders_changed = merge_orders(_as_transactions(changed), customers_df, products_df)
        update_sql = text(
            """
            UPDATE TOrders
            SET product_name = :product_name,
                amount = :amount,
                customer_full_name = :customer_full_name,
                transaction_date = :transaction_date,
                branch = :branch
            WHERE transaction_id = :transaction_id
            """
        )
        # Cast to built-in types so the DB driver never receives numpy scalars.
        params = [
            {
                "transaction_id": int(row.transaction_id),
                "product_name": str(row.product_name),
                "amount": int(row.amount),
                "customer_full_name": str(row.customer_full_name),
                "transaction_date": str(row.transaction_date),
                "branch": str(row.branch),
            }
            for row in orders_changed.itertuples(index=False)
        ]
        if params:
            with engine.begin() as conn:
                conn.execute(update_sql, params)

    # merge_orders rewrites Orders.csv on every call, including the partial
    # calls above; rebuild it from the complete files so it ends up whole.
    tdf = read_csv_into_dfs("transactions.csv")
    cdf = read_csv_into_dfs("Customers.csv")
    pdf = pd.read_json("products.json")
    merge_orders(tdf, cdf, pdf)

def update_product_price(engine, product_id, new_price):
    """Recompute ``TOrders.amount`` for every order of one product at a new unit price.

    For each TOrders row whose transaction has ``product_id``, amount becomes
    ``new_price * quantity``, with the quantity taken from the ``transactions``
    table.

    Args:
        engine: SQLAlchemy engine for the database that holds TOrders.
        product_id: Id of the product whose price changed.
        new_price: New unit price.

    NOTE(review): the statement reads a database table named ``transactions``;
    no visible cell of this notebook creates it - confirm it exists.
    """
    # Bound parameters instead of f-string interpolation: the values are never
    # parsed as SQL, and quoting/typing is left to the driver.
    update_sql = text(
        """
    UPDATE TOrders
    SET amount = :new_price * (
        SELECT quantity
        FROM transactions t
        WHERE t.transaction_id = TOrders.transaction_id
          AND t.product_id = :product_id
    )
    WHERE transaction_id IN (
        SELECT transaction_id
        FROM transactions
        WHERE product_id = :product_id
    );
    """
    )
    with engine.begin() as conn:
        conn.execute(update_sql, {"new_price": new_price, "product_id": product_id})
