In [1]:
import pandas as pd
import random
from faker import Faker
import string
from datetime import timedelta

fake = Faker()

# Function to generate a random 10-character alphanumeric string
def generate_id(length=10):
    """Return a random identifier made of uppercase letters and digits.

    Args:
        length: Number of characters to draw (defaults to 10).

    Returns:
        A string of ``length`` characters sampled with replacement from
        ``A-Z`` and ``0-9``. Uniqueness is not checked.
    """
    alphabet = string.ascii_uppercase + string.digits
    drawn = random.choices(alphabet, k=length)
    return ''.join(drawn)

# Country to currency and city mapping
# Each supported country maps to its currency code and the cities that
# generated customers and transactions may be placed in.
country_currency_mapping = {
    country: {'currency': currency_code, 'cities': city_names}
    for country, currency_code, city_names in (
        ('United States', 'USD', ['New York', 'Los Angeles', 'Chicago', 'Houston', 'Phoenix']),
        ('India', 'INR', ['Delhi', 'Mumbai', 'Hyderabad', 'Chennai', 'Bangalore']),
        ('United Kingdom', 'GBP', ['London', 'Birmingham', 'Manchester', 'Glasgow', 'Liverpool']),
        ('Japan', 'JPY', ['Tokyo', 'Osaka', 'Nagoya', 'Sapporo', 'Fukuoka']),
        ('Germany', 'EUR', ['Berlin', 'Hamburg', 'Munich', 'Cologne', 'Frankfurt']),
    )
}

# Generate random country not in the list
def generate_anomalous_country():
    """Return a random 7-character code used as a deliberately invalid country.

    The value is drawn from uppercase letters and digits; it is not looked up
    against the known countries.
    """
    symbols = string.ascii_uppercase + string.digits
    code_chars = random.choices(symbols, k=7)
    return ''.join(code_chars)

# Customer data generation
def generate_customers(num_customers, anomaly_percentage):
    """Build a DataFrame of fake customers, with dirty rows placed first.

    The anomaly budget is ``int(num_customers * anomaly_percentage / 100)``.
    Two independent kinds of dirt are applied by row position, and because
    both start at row 0 the same row can carry both:

    * rows below ``budget // 2`` get a random code as country and the city
      ``"Unknown"``;
    * with ``q = budget // 4``, rows ``[0, q)`` get an impossible date of
      birth, ``[q, 2q)`` a first name with a ``#$!`` suffix, ``[2q, 3q)`` a
      null last name and ``[3q, 4q)`` a null date of birth.

    Args:
        num_customers: Number of customer rows to generate.
        anomaly_percentage: Percentage used to size the anomaly budget.

    Returns:
        A ``pandas.DataFrame`` with one row per customer and the columns
        customer_id, first_name, last_name, date_of_birth, gender, email,
        phone_number, address, city, country, occupation, income_bracket and
        customer_since.
    """
    valid_countries = list(country_currency_mapping.keys())
    anomaly_budget = int(num_customers * anomaly_percentage / 100)
    country_cutoff = anomaly_budget // 2
    quarter = anomaly_budget // 4

    rows = []
    for idx in range(num_customers):
        # Dict values are evaluated top to bottom, which keeps the order of
        # the random draws stable.
        row = {
            'customer_id': generate_id(),
            'first_name': fake.first_name(),
            'last_name': fake.last_name(),
            'date_of_birth': fake.date_of_birth(minimum_age=18, maximum_age=90),
            'gender': random.choice(['Male', 'Female']),
            'email': fake.email(),
            'phone_number': fake.phone_number(),
            'address': fake.street_address(),
        }

        # Country anomaly: the leading rows get a made-up country code.
        if idx >= country_cutoff:
            chosen_country = random.choice(valid_countries)
            row['city'] = random.choice(country_currency_mapping[chosen_country]['cities'])
            row['country'] = chosen_country
        else:
            row['city'] = "Unknown"
            row['country'] = generate_anomalous_country()

        # Uncleansed data: exactly one kind of dirt per quarter of the budget.
        if idx < quarter:
            row['date_of_birth'] = fake.date_of_birth(minimum_age=150, maximum_age=200)  # Invalid age
        elif idx < quarter * 2:
            row['first_name'] = f"{row['first_name']}#$!"
        elif idx < quarter * 3:
            row['last_name'] = None  # Null last_name
        elif idx < quarter * 4:
            row['date_of_birth'] = None  # Null date_of_birth

        row['occupation'] = fake.job()
        row['income_bracket'] = random.choice(['Low', 'Medium', 'High'])
        row['customer_since'] = fake.date_between(start_date='-2y', end_date='now')

        rows.append(row)

    return pd.DataFrame(rows)

# Account data generation
def generate_accounts(customers_df):
    """Create between one and three random accounts for every customer.

    Args:
        customers_df: DataFrame providing ``customer_id``, ``customer_since``
            and ``country`` for each customer.

    Returns:
        A ``pandas.DataFrame`` with one row per account and the columns
        account_id, customer_id, account_type, account_status, open_date,
        current_balance, currency and credit_limit. The currency comes from
        the customer's country and falls back to ``'USD'`` for countries that
        are not in ``country_currency_mapping``. Only ``'credit card'``
        accounts get a non-zero credit limit.
    """
    account_types = ['checking', 'savings', 'credit card', 'loan']
    account_statuses = ['active', 'dormant', 'closed']
    usd_fallback = {'currency': 'USD'}

    rows = []
    for _, owner in customers_df.iterrows():
        how_many = random.randint(1, 3)  # Each customer can have 1 to 3 accounts
        for _n in range(how_many):
            new_account_id = generate_id()
            kind = random.choice(account_types)
            status = random.choice(account_statuses)
            # An account is never opened before the customer relationship began.
            opened_on = fake.date_between(start_date=owner['customer_since'])
            balance = round(random.uniform(0.0, 100000.0), 2)
            account_currency = country_currency_mapping.get(owner['country'], usd_fallback)['currency']
            if kind == 'credit card':
                limit = round(random.uniform(1000.0, 50000.0), 2)
            else:
                limit = 0.0

            rows.append(dict(
                account_id=new_account_id,
                customer_id=owner['customer_id'],
                account_type=kind,
                account_status=status,
                open_date=opened_on,
                current_balance=balance,
                currency=account_currency,
                credit_limit=limit,
            ))

    return pd.DataFrame(rows)

# Transaction data generation with anomalies
def generate_transaction_data(num_transactions, customers_df, accounts_df, anomaly_percentage):
    """Generate random transactions and flag a bounded number of anomalies.

    Every transaction belongs to a customer sampled from ``customers_df``.
    The anomaly budget is ``int(num_transactions * anomaly_percentage / 100)``
    and is spent in three phases, in row order:

    1. the first ``budget // 3`` anomalies are purchases with an amount
       between 200,000 and 500,000;
    2. the next ``budget // 3`` are transfers with an amount between 100,000
       and 300,000;
    3. the remainder are withdrawals whose up-to-five preceding generated
       transactions are all withdrawals by the same customer within ten
       minutes of the new one.

    A row only becomes an anomaly when its randomly drawn transaction type
    matches the current phase. The budget is consumed only by rows that are
    actually flagged, so the requested share is not used up by ordinary rows.

    Args:
        num_transactions: Number of transaction rows to generate.
        customers_df: DataFrame providing ``customer_id`` and ``country``.
        accounts_df: DataFrame providing ``customer_id`` for each account.
        anomaly_percentage: Percentage used to size the anomaly budget.

    Returns:
        A ``pandas.DataFrame`` with one row per transaction and the columns
        transaction_id, customer_id, transaction_date, amount, currency,
        transaction_type, channel, merchant_name, merchant_category,
        location_country, location_city and is_flagged.
    """
    transactions = []
    channels = ['online', 'mobile', 'ATM', 'in-branch']
    transaction_types = ['purchase', 'transfer', 'withdrawal', 'deposit']
    merchants = ['Amazon', 'Walmart', 'Target', 'Best Buy', 'Costco']
    categories = ['Retail', 'Grocery', 'Electronics', 'Clothing', 'Miscellaneous']

    num_anomalies = int(num_transactions * anomaly_percentage / 100)
    anomaly_counter = 0

    for _ in range(num_transactions):
        customer = customers_df.sample(1).iloc[0]
        # NOTE(review): the sampled account is never written to the record.
        # The lookup is kept unchanged so the output schema and the failure
        # for a customer without accounts stay as they were; presumably an
        # account_id column was intended -- confirm before adding or removing.
        account = accounts_df[accounts_df['customer_id'] == customer['customer_id']].sample(1).iloc[0]
        transaction_id = generate_id()
        customer_id = customer['customer_id']
        transaction_date = fake.date_time_between(start_date='-2y', end_date='now')
        amount = round(random.uniform(1.0, 10000.0), 2)
        # Unknown (anomalous) countries fall back to USD / New York, while
        # location_country below still records the customer's raw country.
        country_info = country_currency_mapping.get(customer['country'], {'currency': 'USD', 'cities': ['New York']})
        currency = country_info['currency']
        transaction_type = random.choice(transaction_types)
        channel = random.choice(channels)
        merchant_name = random.choice(merchants)
        merchant_category = random.choice(categories)
        location_country = customer['country']
        location_city = random.choice(country_info['cities'])
        is_flagged = False

        # Introduce transaction anomalies
        if anomaly_counter < num_anomalies:
            if anomaly_counter < num_anomalies // 3:
                if transaction_type == 'purchase':
                    amount = round(random.uniform(200000.0, 500000.0), 2)  # High-value purchase anomaly
                    is_flagged = True
            elif anomaly_counter < num_anomalies // 3 * 2:
                if transaction_type == 'transfer':
                    amount = round(random.uniform(100000.0, 300000.0), 2)  # High-value transfer anomaly
                    is_flagged = True
            elif transaction_type == 'withdrawal':
                recent_transactions = transactions[-5:]  # Get the last 5 transactions
                # `all()` is True for an empty iterable, so require at least
                # one earlier transaction. Dates are drawn at random rather
                # than chronologically, so compare the absolute gap; a signed
                # difference would accept any transaction dated later.
                if recent_transactions and all(
                    t['transaction_type'] == 'withdrawal'
                    and t['customer_id'] == customer_id
                    and abs((transaction_date - t['transaction_date']).total_seconds()) <= 600
                    for t in recent_transactions
                ):
                    is_flagged = True  # Flagged due to frequent withdrawals

            # Count only anomalies that were actually injected; counting
            # every candidate row spent most of the budget on normal rows.
            if is_flagged:
                anomaly_counter += 1

        transactions.append({
            'transaction_id': transaction_id,
            'customer_id': customer_id,
            'transaction_date': transaction_date,
            'amount': amount,
            'currency': currency,
            'transaction_type': transaction_type,
            'channel': channel,
            'merchant_name': merchant_name,
            'merchant_category': merchant_category,
            'location_country': location_country,
            'location_city': location_city,
            'is_flagged': is_flagged
        })

    return pd.DataFrame(transactions)

# Credit data generation
def generate_credit_data(customers_df):
    """Draw one random credit profile per customer.

    Args:
        customers_df: DataFrame with a ``customer_id`` column.

    Returns:
        A ``pandas.DataFrame`` with one row per input row and the columns
        customer_id, credit_score (300-850), number_of_credit_accounts (1-10),
        total_credit_limit (1000-50000), total_credit_used (between 0 and the
        drawn limit), number_of_late_payments (0-5) and bankruptcies (0 or 1).
    """
    profiles = []
    for _, row in customers_df.iterrows():
        score = random.randint(300, 850)
        open_accounts = random.randint(1, 10)
        limit = round(random.uniform(1000.0, 50000.0), 2)
        # The remaining values are drawn in key order; used credit is capped
        # by the limit drawn just above.
        profiles.append({
            'customer_id': row['customer_id'],
            'credit_score': score,
            'number_of_credit_accounts': open_accounts,
            'total_credit_limit': limit,
            'total_credit_used': round(random.uniform(0.0, limit), 2),
            'number_of_late_payments': random.randint(0, 5),
            'bankruptcies': random.randint(0, 1),
        })

    return pd.DataFrame(profiles)

# Generate all data
def generate_data(num_customers, num_transactions, anomaly_percentage):
    """Generate the four related datasets in dependency order.

    Customers are created first; accounts, transactions and credit data are
    then derived from them.

    Args:
        num_customers: Number of customers to generate.
        num_transactions: Number of transactions to generate.
        anomaly_percentage: Percentage passed to the customer and transaction
            generators to size their anomalies.

    Returns:
        A tuple ``(customers, accounts, transactions, credit_data)`` of
        DataFrames.
    """
    customers = generate_customers(num_customers, anomaly_percentage)
    accounts = generate_accounts(customers)
    transactions = generate_transaction_data(
        num_transactions, customers, accounts, anomaly_percentage
    )
    credit = generate_credit_data(customers)

    return customers, accounts, transactions, credit

# Save data to CSV
def save_data_to_csv(customers_df, accounts_df, transactions_df, credit_data_df,
                     output_dir='C:/snowflake1/data'):
    """Write the four datasets as CSV files into one directory.

    Args:
        customers_df: Written to ``customers.csv``.
        accounts_df: Written to ``accounts.csv``.
        transactions_df: Written to ``transactions.csv``.
        credit_data_df: Written to ``credit_data.csv``.
        output_dir: Target directory. Defaults to the location that was
            previously hard-coded, so existing callers are unaffected. The
            directory is created if it does not exist.

    Returns:
        None. Files are written without the DataFrame index.
    """
    # Local import keeps this block self-contained; `os` is not imported at
    # the top of the notebook.
    import os

    os.makedirs(output_dir, exist_ok=True)

    frames_by_filename = {
        'customers.csv': customers_df,
        'accounts.csv': accounts_df,
        'transactions.csv': transactions_df,
        'credit_data.csv': credit_data_df,
    }
    for filename, frame in frames_by_filename.items():
        frame.to_csv(os.path.join(output_dir, filename), index=False)

# Main function
if __name__ == "__main__":
    # Run settings: dataset sizes and the percentage of anomalies and
    # uncleansed data to inject.
    num_customers, num_transactions = 100, 1000
    anomaly_percentage = 5

    # Generate the four datasets, then write them out as CSV files.
    customers_df, accounts_df, transactions_df, credit_data_df = generate_data(
        num_customers=num_customers,
        num_transactions=num_transactions,
        anomaly_percentage=anomaly_percentage,
    )
    save_data_to_csv(
        customers_df=customers_df,
        accounts_df=accounts_df,
        transactions_df=transactions_df,
        credit_data_df=credit_data_df,
    )


In [2]:
pip install apache-airflow




In [9]:
pip install dbt-core

Collecting dbt-coreNote: you may need to restart the kernel to use updated packages.

  Downloading dbt_core-1.8.4-py3-none-any.whl.metadata (3.9 kB)
Collecting agate<1.10,>=1.7.0 (from dbt-core)
  Downloading agate-1.9.1-py2.py3-none-any.whl.metadata (3.2 kB)
Collecting mashumaro<4.0,>=3.9 (from mashumaro[msgpack]<4.0,>=3.9->dbt-core)
  Downloading mashumaro-3.13.1-py3-none-any.whl.metadata (114 kB)
     ---------------------------------------- 0.0/114.3 kB ? eta -:--:--
     --------- --------------------------- 30.7/114.3 kB 660.6 kB/s eta 0:00:01
     -------------------------------------- 114.3/114.3 kB 1.3 MB/s eta 0:00:00
Collecting logbook<1.6,>=1.5 (from dbt-core)
  Downloading Logbook-1.5.3.tar.gz (85 kB)
     ---------------------------------------- 0.0/85.8 kB ? eta -:--:--
     ---------------------------------------- 85.8/85.8 kB ? eta 0:00:00
  Preparing metadata (setup.py): started
  Preparing metadata (setup.py): finished with status 'done'
Collecting protobuf<5,>=4.0.