In [None]:
import mysql.connector
import random
import string
from faker import Faker
import pycountry
import phonenumbers
import csv
from tqdm import tqdm
from dotenv import load_dotenv
import os

# Set up Faker and country data
fake = Faker()
countries = [country.name for country in pycountry.countries]

# Seed every random source the generators use. Faker.seed() only seeds Faker's
# own generator; the helpers in this notebook also draw from the stdlib
# `random` module (IDs, phone digits, amounts, enum picks), so it is seeded
# as well to make a Restart & Run All reproducible.
Faker.seed(42)
random.seed(42)

In [2]:
# create connection
# Load settings from a .env file into the process environment so that
# create_connection() can read the db_* variables through os.getenv.
load_dotenv()

def create_connection():
    """Open a MySQL connection using credentials taken from the environment.

    Reads the ``db_host``, ``db_user``, ``db_password`` and ``db_name``
    environment variables and passes them to ``mysql.connector.connect``.

    Returns:
        The connection object on success, or ``None`` when the connector
        raises ``mysql.connector.Error`` (the error is printed).
    """
    # Credentials are looked up at call time, never hardcoded
    settings = {
        "host": os.getenv("db_host"),
        "user": os.getenv("db_user"),
        "password": os.getenv("db_password"),
        "database": os.getenv("db_name"),
    }

    try:
        return mysql.connector.connect(**settings)
    except mysql.connector.Error as err:
        print(f"Error: {err}")

    return None

In [13]:
def check_if_tables_exist():
    """Print how many tables exist in the currently selected database.

    Opens a connection via ``create_connection()``, counts the rows of
    ``information_schema.tables`` for the current schema and prints the result.
    MySQL errors are printed rather than raised.

    Returns:
        None. All results are reported with ``print``.
    """
    connection = create_connection()
    if connection is None:
        print("Connection failed. Exiting.")
        return

    cursor = None
    try:
        cursor = connection.cursor()

        # Check if any tables exist in the current database
        cursor.execute("SELECT COUNT(*) FROM information_schema.tables WHERE table_schema = DATABASE()")
        table_count = cursor.fetchone()[0]

        if table_count > 0:
            print(f"There are {table_count} tables in the database.")
        else:
            print("No tables exist in the database.")

    except mysql.connector.Error as err:
        print(f"Error: {err}")

    finally:
        # Release the cursor and connection even when a query fails, so a
        # failed check does not leave a dangling server session behind.
        if cursor is not None:
            cursor.close()
        connection.close()

# Check if tables exist: prints the number of tables in the current database
# (the function reports via print and returns nothing).
check_if_tables_exist()

There are 8 tables in the database.


In [14]:
def _quote_identifier(name):
    """Wrap a MySQL identifier in backticks, doubling any embedded backtick."""
    return "`" + str(name).replace("`", "``") + "`"


def remove_all_constraints_and_drop_tables():
    """Drop every foreign key and every table in the currently selected database.

    DESTRUCTIVE: all tables returned by ``SHOW TABLES`` are dropped.

    Steps: disable foreign key checks, drop each foreign key constraint listed
    in ``information_schema.KEY_COLUMN_USAGE``, drop each table, re-enable
    foreign key checks, commit. Progress is printed per constraint and table.
    MySQL errors are printed rather than raised.

    Returns:
        None.
    """
    connection = create_connection()
    if connection is None:
        print("Connection failed. Exiting.")
        return

    cursor = None
    try:
        cursor = connection.cursor()

        # Disable foreign key checks
        cursor.execute("SET foreign_key_checks = 0")

        # Get all the foreign key constraints.
        # KEY_COLUMN_USAGE has one row per constrained column, so a composite
        # foreign key appears several times; DISTINCT keeps one row per
        # constraint so it is not dropped twice (the second drop would fail).
        cursor.execute("""
        SELECT DISTINCT CONSTRAINT_NAME, TABLE_NAME
        FROM information_schema.KEY_COLUMN_USAGE
        WHERE CONSTRAINT_SCHEMA = DATABASE() AND REFERENCED_TABLE_NAME IS NOT NULL
        """)
        constraints = cursor.fetchall()

        # Drop all foreign key constraints
        for constraint_name, table_name in constraints:
            # Identifiers cannot be bound as parameters, so quote them instead
            # of interpolating them raw (reserved words, unusual characters).
            cursor.execute(
                f"ALTER TABLE {_quote_identifier(table_name)} "
                f"DROP FOREIGN KEY {_quote_identifier(constraint_name)}"
            )
            print(f"Dropped foreign key constraint: {constraint_name} on table {table_name}")

        # Get all the table names in the current database
        cursor.execute("SHOW TABLES")
        tables = cursor.fetchall()

        # Drop all tables
        for table in tables:
            table_name = table[0]
            cursor.execute(f"DROP TABLE IF EXISTS {_quote_identifier(table_name)}")
            print(f"Dropped table: {table_name}")

        # Enable foreign key checks back
        cursor.execute("SET foreign_key_checks = 1")

        connection.commit()
        print("All constraints removed and tables dropped successfully.")

    except mysql.connector.Error as err:
        print(f"Error: {err}")

    finally:
        # Always release the cursor and connection, including on failure.
        if cursor is not None:
            cursor.close()
        connection.close()

# Remove all constraints and drop all tables
# WARNING: destructive. Running this cell drops every foreign key and every
# table in the database selected by the db_name environment variable.
remove_all_constraints_and_drop_tables()

Dropped foreign key constraint: order_items_ibfk_1 on table order_items
Dropped foreign key constraint: order_items_ibfk_2 on table order_items
Dropped foreign key constraint: orders_ibfk_1 on table orders
Dropped foreign key constraint: orders_ibfk_2 on table orders
Dropped foreign key constraint: payments_ibfk_1 on table payments
Dropped foreign key constraint: products_ibfk_1 on table products
Dropped foreign key constraint: products_ibfk_2 on table products
Dropped table: categories
Dropped table: customers
Dropped table: order_items
Dropped table: orders
Dropped table: payments
Dropped table: products
Dropped table: shippers
Dropped table: suppliers
All constraints removed and tables dropped successfully.


In [15]:
import phonenumbers
import pycountry

# Build a dictionary of country name to dialing code
def get_all_country_dialing_codes():
    """Map each pycountry country name to its international dialing code.

    A country is included only when ``phonenumbers`` can produce an example
    mobile number for its ISO alpha-2 region; others are skipped.

    Returns:
        dict[str, str]: country name -> dialing code as a string (no "+").
    """
    country_code_map = {}
    for country in pycountry.countries:
        try:
            country_alpha2 = country.alpha_2
            example_number = phonenumbers.example_number_for_type(country_alpha2, phonenumbers.PhoneNumberType.MOBILE)
            if example_number:
                dialing_code = str(phonenumbers.country_code_for_region(country_alpha2))
                country_code_map[country.name] = dialing_code
        except Exception:
            # Best effort: skip countries the lookup cannot handle. Catching
            # Exception (not a bare except) lets KeyboardInterrupt/SystemExit
            # still stop the notebook.
            continue
    return country_code_map

# Generate the full dictionary (country name -> dialing code string);
# the row generator reads this module-level name when building phone numbers.
country_codes = get_all_country_dialing_codes()

In [None]:
# Dialing codes used when generating phone numbers
country_codes = get_all_country_dialing_codes()


# Open the MySQL session; `connection` and `cursor` stay open for the insert cell
mysql_settings = {
    "host": os.getenv("db_host"),
    "user": os.getenv("db_user"),
    "password": os.getenv("db_password"),
    "database": os.getenv("db_name"),
}
connection = mysql.connector.connect(**mysql_settings)
cursor = connection.cursor()

# One flat (denormalised) table: customer, order, product, order item and
# payment attributes all live on the same row.
create_sales_data_sql = """
CREATE TABLE sales_data (
    customer_id VARCHAR(10),
    name VARCHAR(100),
    email VARCHAR(100),
    phone VARCHAR(20),
    address VARCHAR(255),
    date_of_birth DATE,
    gender ENUM('Male', 'Female', 'Non-binary'),
    country VARCHAR(100),
    city VARCHAR(100),
    registration_date DATE,
    loyalty_points INT,
    order_id VARCHAR(10),
    order_date DATE,
    order_status ENUM('Pending', 'Shipped', 'Delivered', 'Cancelled', 'Returned'),
    shipper_name VARCHAR(100),
    shipping_address VARCHAR(255),
    order_amount DECIMAL(10,2),
    shipping_cost DECIMAL(10,2),
    product_id VARCHAR(10),
    product_name VARCHAR(100),
    category VARCHAR(50),
    price DECIMAL(10,2),
    stock_quantity INT,
    supplier VARCHAR(100),
    warranty_period VARCHAR(50),
    country_of_origin VARCHAR(100),
    order_item_id VARCHAR(10),
    quantity INT,
    unit_price DECIMAL(10,2),
    discount DECIMAL(5,2),
    payment_id VARCHAR(10),
    payment_date DATE,
    amount DECIMAL(10,2),
    payment_method ENUM('Credit Card', 'PayPal', 'Bank Transfer', 'Mobile Money'),
    payment_status ENUM('Completed', 'Pending', 'Failed'),
    payment_description VARCHAR(255)
)
"""

# Start from a clean slate: drop any previous sales_data, then recreate it
cursor.execute("DROP TABLE IF EXISTS sales_data")
cursor.execute(create_sales_data_sql)

In [17]:
# Data generators
def random_id(prefix, length=6):
    """Return ``prefix`` followed by ``length`` random decimal digits.

    Args:
        prefix: String placed in front of the digits (e.g. "ORD").
        length: Number of random digits to append (default 6).
    """
    digits = random.choices(string.digits, k=length)
    suffix = ''.join(digits)
    return prefix + suffix

def generate_email(first_name, last_name):
    """Build ``first.last@domain`` with lower-cased names and a random provider.

    Args:
        first_name: Given name; lower-cased for the local part.
        last_name: Family name; lower-cased for the local part.

    Returns:
        The email address string; the domain is picked at random from a
        fixed list of six providers.
    """
    providers = ['gmail.com', 'yahoo.com', 'outlook.com', 'hotmail.com', 'aol.com', 'zoho.com']
    return "{}.{}@{}".format(first_name.lower(), last_name.lower(), random.choice(providers))


def generate_phone_number(dialing_code):
    """Return ``+<dialing_code>`` followed by nine random digits.

    Args:
        dialing_code: Country calling code without the leading "+".
    """
    # Subscriber part: nine random decimal digits
    subscriber_digits = random.choices(string.digits, k=9)
    return "+{}{}".format(dialing_code, ''.join(subscriber_digits))



def generate_sales_data_row():
    """Generate one synthetic row for the flat ``sales_data`` table.

    Uses the module-level ``fake``, ``countries`` and ``country_codes`` objects
    plus the ``random_id`` / ``generate_email`` / ``generate_phone_number``
    helpers.

    Returns:
        tuple: 36 values in ``sales_data`` column order (customer, order,
        product, order-item and payment fields).
    """
    # --- customer ---
    first_name = fake.first_name()
    last_name = fake.last_name()
    email = generate_email(first_name, last_name)
    country = random.choice(countries)
    # Use the dialing code that belongs to the chosen country so phone and
    # country agree. Countries without a known code fall back to a random one.
    dialing_code = country_codes.get(country)
    if dialing_code is None:
        dialing_code = random.choice(list(country_codes.values()))
    phone = generate_phone_number(dialing_code)
    address = fake.address().replace("\n", ", ")[:255]
    dob = fake.date_of_birth(minimum_age=18, maximum_age=70)
    gender = random.choice(['Male', 'Female', 'Non-binary'])
    city = fake.city()
    registration_date = fake.date_between(start_date='-5y', end_date='today')
    loyalty_points = random.randint(0, 1000)

    # --- order (dated on or after registration) ---
    order_id = random_id("ORD")
    order_date = fake.date_between(start_date=registration_date, end_date='today')
    order_status = random.choice(['Pending', 'Shipped', 'Delivered', 'Cancelled', 'Returned'])
    shipper_name = fake.company()[:100]
    shipping_address = fake.address().replace("\n", ", ")[:255]
    order_amount = round(random.uniform(20, 1000), 2)
    shipping_cost = round(random.uniform(5, 50), 2)

    # --- product ---
    product_id = random_id("PRO")
    product_name = fake.word().title() + " " + fake.word().title()
    category = random.choice(['Electronics', 'Clothing', 'Home', 'Sports', 'Books', 'Beauty', 'Toys', 'Furniture', 'Automotive'])[:50]
    price = round(random.uniform(10, 500), 2)
    stock_quantity = random.randint(0, 100)
    supplier = fake.company()[:100]
    warranty_period = random.choice(['6 months', '1 year', '2 years', '3 years', 'Lifetime'])[:50]
    country_of_origin = random.choice(countries)[:100]

    # --- order item ---
    order_item_id = random_id("ITM")
    quantity = random.randint(1, 5)
    unit_price = round(random.uniform(10, 500), 2)
    discount = round(random.uniform(0, 50), 2)

    # --- payment (dated on or after the order) ---
    payment_id = random_id("PAY")
    payment_date = fake.date_between(start_date=order_date, end_date='today')
    # The discount (up to 50) can exceed a cheap single-item line (from 10),
    # so clamp at zero to avoid negative payment amounts.
    # NOTE(review): assumes `discount` is an absolute amount, as the original
    # subtraction implies - confirm it is not meant as a percentage.
    amount = max(round(unit_price * quantity - discount, 2), 0.0)
    payment_method = random.choice(['Credit Card', 'PayPal', 'Bank Transfer', 'Mobile Money'])
    payment_status = random.choice(['Completed', 'Pending', 'Failed'])
    payment_description = fake.sentence(nb_words=6)[:255]

    # Cap the joined name at the VARCHAR(100) column width (50 + 1 + 50 = 101).
    full_name = (first_name[:50] + ' ' + last_name[:50])[:100]

    return (
        random_id("CST"), full_name, email[:100], phone, address,
        dob, gender, country[:100], city[:100], registration_date, loyalty_points,
        order_id, order_date, order_status, shipper_name, shipping_address,
        order_amount, shipping_cost, product_id, product_name[:100], category,
        price, stock_quantity, supplier, warranty_period, country_of_origin,
        order_item_id, quantity, unit_price, discount,
        payment_id, payment_date, amount, payment_method, payment_status,
        payment_description
    )
    

# Insert multiple rows
# Positional INSERT: the 36 placeholders follow the sales_data column order,
# which is also the order of the tuple returned by generate_sales_data_row().
insert_query = """
INSERT INTO sales_data VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
"""

TOTAL_ROWS = 200000   # Generate 200,000 rows
BATCH_SIZE = 1000     # rows sent to the server per executemany() call

# `cursor` and `connection` were opened by the table-creation cell.
try:
    pending_rows = []
    for _ in tqdm(range(TOTAL_ROWS), desc="Generating rows for MySQL", ncols=100):
        pending_rows.append(generate_sales_data_row())
        # Send rows in batches instead of one statement per row, which
        # avoids 200,000 individual round-trips to the server.
        if len(pending_rows) >= BATCH_SIZE:
            cursor.executemany(insert_query, pending_rows)
            pending_rows.clear()

    # Flush the final partial batch, if any
    if pending_rows:
        cursor.executemany(insert_query, pending_rows)

    connection.commit()
    print("✅ Data inserted into `sales_data` successfully!")
finally:
    # Close the session even if generation or an insert fails part-way;
    # nothing is committed in that case.
    cursor.close()
    connection.close()

Generating rows for MySQL: 100%|███████████████████████████| 200000/200000 [06:36<00:00, 503.85it/s]


✅ Data inserted into `sales_data` successfully!


In [18]:
# Saving the data to CSV
# NOTE(review): the rows written here are freshly generated by
# generate_sales_data_row(), so they are NOT the same rows that were inserted
# into MySQL - confirm whether the CSV is meant to mirror the table.
csv_filename = "sales_data.csv"
# Explicit UTF-8: country names from pycountry contain non-ASCII characters,
# and the default encoding of open() depends on the platform locale.
with open(csv_filename, mode="w", newline="", encoding="utf-8") as file:
    writer = csv.writer(file)
    # Header names mirror the sales_data columns, in generator tuple order
    csv_columns = ['customer_id', 'name', 'email', 'phone', 'address', 'date_of_birth', 'gender', 'country', 'city', 
                   'registration_date', 'loyalty_points', 'order_id', 'order_date', 'order_status', 'shipper_name', 
                   'shipping_address', 'order_amount', 'shipping_cost', 'product_id', 'product_name', 'category', 
                   'price', 'stock_quantity', 'supplier', 'warranty_period', 'country_of_origin', 'order_item_id', 
                   'quantity', 'unit_price', 'discount', 'payment_id', 'payment_date', 'amount', 'payment_method', 
                   'payment_status', 'payment_description']
    writer.writerow(csv_columns)  # Write header
    for _ in tqdm(range(200000), desc="Saving to CSV", ncols=100):  # Generate 200,000 rows
        row = generate_sales_data_row()
        writer.writerow(row)

print(f"✅ Data saved to {csv_filename} successfully!")

Saving to CSV: 100%|███████████████████████████████████████| 200000/200000 [07:13<00:00, 461.06it/s]

✅ Data saved to sales_data.csv successfully!



