In [174]:
schema = {
  "Columns": [
    {
      "column_name": "UserID",
      "datatype": "integer",
      "data_category": "numeric",
      "possible_values": [],
      "conditions": {
        "range": "1-1000",
        "uniqueness": "true"
      }
    },
    {
      "column_name": "Username",
      "datatype": "string",
      "data_category": "categorical",
      "possible_values": [
        "Alice",
        "Bob",
        "Charlie",
        "David",
        "Eve",
        "Frank",
        "Grace",
        "Heidi",
        "Ivan",
        "Judy"
      ],
      "conditions": {
        "uniqueness": "true"
      }
    },
    {
      "column_name": "Email",
      "datatype": "string",
      "data_category": "categorical",
      "possible_values": [],
      "conditions": {
        "pattern": "^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\\.[a-zA-Z]{2,}$"
      }
    },
    {
      "column_name": "Age",
      "datatype": "integer",
      "data_category": "numeric",
      "possible_values": [],
      "conditions": {
        "range": "18-99"
      }
    },
    {
      "column_name": "Country",
      "datatype": "string",
      "data_category": "categorical",
      "possible_values": [
        "USA",
        "Canada",
        "UK",
        "Germany",
        "France",
        "Australia",
        "India",
        "Brazil",
        "Japan",
        "South Africa"
      ],
      "conditions": {}
    },
    {
      "column_name": "SignUpDate",
      "datatype": "date",
      "data_category": "categorical",
      "possible_values": [
        "2022-01-01",
        "2022-03-15",
        "2022-06-30",
        "2022-09-10",
        "2022-12-25",
        "2023-02-14",
        "2023-05-01",
        "2023-07-04",
        "2023-09-30",
        "2023-11-11"
      ],
      "conditions": {}
    },
    {
      "column_name": "SubscriptionType",
      "datatype": "string",
      "data_category": "categorical",
      "possible_values": [
        "Free",
        "Basic",
        "Premium",
        "Enterprise",
        "Student",
        "Family",
        "Lifetime",
        "Annual",
        "Monthly",
        "Trial"
      ],
      "conditions": {}
    },
    {
      "column_name": "Status",
      "datatype": "string",
      "data_category": "categorical",
      "possible_values": [
        "Active",
        "Inactive",
        "Pending",
        "Suspended",
        "Cancelled",
        "Expired",
        "Banned",
        "Verified",
        "Unverified",
        "Deleted"
      ],
      "conditions": {}
    },
    {
      "column_name": "Balance",
      "datatype": "float",
      "data_category": "numeric",
      "possible_values": [],
      "conditions": {
        "range": "0.00-10000.00"
      }
    },
    {
      "column_name": "LastLogin",
      "datatype": "datetime",
      "data_category": "categorical",
      "possible_values": [
        "2023-01-01 10:00:00",
        "2023-02-15 08:30:00",
        "2023-03-20 15:45:00",
        "2023-04-25 12:00:00",
        "2023-05-30 17:30:00",
        "2023-07-04 14:20:00",
        "2023-08-15 09:00:00",
        "2023-09-10 19:10:00",
        "2023-10-31 23:59:59",
        "2023-12-25 06:30:00"
      ],
      "conditions": {}
    }
  ]
}


In [175]:
import random
import re
import string

def generate_email_sample(regex, length):
    # Step 1: Define the characters allowed in different parts of the email
    allowed_chars_local = string.ascii_letters + string.digits + "._%+-"
    allowed_chars_domain = string.ascii_letters + string.digits + ".-"
    
    # Step 2: Ensure at least 10 characters before the first `+`
    local_part_length = 10  # Start with 10 characters
    local_part = ''.join(random.choices(allowed_chars_local, k=local_part_length))
    
    # Step 3: Calculate remaining length for domain and TLD
    remaining_length = length - local_part_length - 1  # Subtract 1 for the `@`
    
    # Step 4: Reserve 4 characters for ".com" (or any TLD)
    tld = ".com"
    tld_length = len(tld)
    domain_part_length = remaining_length - tld_length
    
    # Step 5: Generate the domain part
    domain_part = ''.join(random.choices(allowed_chars_domain, k=domain_part_length))
    
    # Step 6: Combine to form the email
    email = local_part + "@" + domain_part + tld
    
    # Step 7: Validate that the generated email matches the regex
    if re.match(regex, email):
        return email
    else:
        # In case of mismatch, retry or adjust as needed
        return None

def generate_multiple_email_samples(regex, length, n):
    samples = []
    while len(samples) < n:
        email = generate_email_sample(regex, length)
        if email:  # Only add valid emails
            samples.append(email)
    return samples

In [176]:
print([random.uniform(0, 1) for i in range(0,20)])

[0.8085428032516538, 0.4930917708996744, 0.9160790912629925, 0.5094766492274416, 0.6443747802848737, 0.7779664382893824, 0.415069128570539, 0.025845985512135905, 0.609048799898896, 0.9074517814131824, 0.8704496084147008, 0.7913445288786698, 0.3176821399801345, 0.4458677689305579, 0.3743408144347511, 0.07067725137639402, 0.10300175196151484, 0.10361372046244666, 0.04058728452009408, 0.18467011027039326]


In [177]:
def generate_default_possible_values(column, num_samples=20):
    """
    Generates default possible values if none are provided in the schema.
    """
    datatype = column['datatype']
    conditions = column.get('conditions', {})
    
    if 'range' in conditions and datatype == 'integer':
        # Generate default integers within the specified range
        range_start, range_end = map(int, conditions['range'].split('-'))
        return list(range(range_start, range_end + 1))
    elif 'range' in conditions and (datatype == "float" or datatypr == 'double'):
        range_start, range_end = map(float, conditions['range'].split('-'))
        return [random.uniform(range_start, range_end) for i in range(0,num_samples)]
    elif datatype == 'integer':
        # Default to generating a range of integers if no conditions are provided
        return list(range(1, num_samples + 1))
    
    elif 'pattern' in conditions and datatype == 'string' and column['data_category'] == 'categorical':
        pattern = conditions['pattern']
        print("in")
        
        if pattern == "^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\\.[a-zA-Z]{2,}$":
            print("in")
            # Special case for email pattern
            print(pattern)
            return generate_multiple_email_samples(pattern, 20, num_samples)
        
        generated_values = []
        
        # Simple approach to generate strings based on common regex patterns
        for _ in range(num_samples):
            value = ''
            if re.match(r'^[a-zA-Z0-9_]{3,20}$', pattern):
                length = random.randint(3, 20)
                value = ''.join(random.choices(string.ascii_letters + string.digits + '_', k=length))
            elif re.match(r'^[a-zA-Z]{2,10}$', pattern):
                length = random.randint(2, 10)
                value = ''.join(random.choices(string.ascii_letters, k=length))
            elif re.match(r'^\d+$', pattern):
                length = random.randint(1, 5)
                value = ''.join(random.choices(string.digits, k=length))
            else:
                # Fallback for complex patterns - generate random alphanumeric strings
                length = random.randint(3, 20)
                value = ''.join(random.choices(string.ascii_letters + string.digits, k=length))

            generated_values.append(value)
        
        return generated_values
    
    elif datatype == 'string':
        # Default to generating random alphanumeric strings
        return [''.join(random.choices(string.ascii_letters + string.digits, k=10)) for _ in range(num_samples)]
    
    return []

def preprocess_schema(schema, num_samples=20):
    processed_columns = []
    encoders = {}
    max_values = 0
    
    # Determine the maximum number of possible values in the schema
    for column in schema['Columns']:
        possible_values = column['possible_values']

        if not possible_values:  # If possible_values is empty or None, generate defaults
            possible_values = generate_default_possible_values(column, num_samples)
            column['possible_values'] = possible_values  # Assign the generated values back to the schema
        else:
            # Handle the range case in possible_values
            if isinstance(possible_values, str) and possible_values.startswith("range"):
                range_start, range_end = map(int, possible_values[6:-1].split(','))
                possible_values = list(range(range_start, range_end + 1))
                column['possible_values'] = possible_values  # Ensure the list is assigned back to the schema

        if len(possible_values) > max_values:
            max_values = len(possible_values)

    for column in schema['Columns']:
        column_name = column['column_name']
        datatype = column['datatype']
        possible_values = column['possible_values']
        conditions = column.get('conditions', {})
        
#         # Preprocess conditions
#         if 'range' in conditions:
#             if isinstance(conditions['range'], str):
#                 range_start, range_end = map(int, conditions['range'].split('-'))
#                 conditions['range'] = list(range(range_start, range_end + 1))
#             elif isinstance(conditions['range'], list) and len(conditions['range']) == 2:
#                 range_start, range_end = conditions['range']
#                 conditions['range'] = list(range(range_start, range_end + 1))

#         if 'pattern' in conditions:
#             try:
#                 re.compile(conditions['pattern'])
#             except re.error:
#                 raise ValueError(f"Invalid regex pattern in column {column_name}: {conditions['pattern']}")

#         if 'uniqueness' in conditions:
#             conditions['uniqueness'] = bool(conditions['uniqueness'])
        
        # Pad possible values to match the maximum number of values
        if len(possible_values) < max_values:
            possible_values = (possible_values * (max_values // len(possible_values) + 1))[:max_values]
            column['possible_values'] = possible_values  # Ensure the padded list is assigned back to the schema

        if column['data_category'] == 'categorical':
            # Apply label encoding for categorical data
            encoder = LabelEncoder()
            encoded_values = encoder.fit_transform(possible_values)
            processed_columns.append({
                'column_name': column_name,
                'datatype': 'integer',  # Categorical data is now encoded as integers
                'possible_values': encoded_values.tolist(),
                'data_category': 'numeric',  # Encoded data is now numeric
                'conditions': conditions  # Include processed conditions
            })
            encoders[column_name] = encoder
        else:
            # Pad numeric possible values if necessary
            processed_columns.append({
                'column_name': column_name,
                'datatype': datatype,
                'possible_values': possible_values,
                'data_category': column['data_category'],
                'conditions': conditions  # Include processed conditions
            })

    # Update schema with processed columns
    processed_schema = {
        'Columns': processed_columns
    }

    return processed_schema, encoders

In [178]:
import numpy as np
import pandas as pd

def decode_data(encoded_df, encoders, original_schema):
    decoded_df = encoded_df.copy()

    for column in original_schema['Columns']:
        column_name = column['column_name']
        possible_values = column['possible_values']

        # Handle the range case
        if isinstance(possible_values, str) and possible_values.startswith("range"):
            range_start, range_end = map(int, possible_values[6:-1].split(','))
            possible_values = list(range(range_start, range_end + 1))

        if column_name in encoders:
            encoder = encoders[column_name]

            # Decode the encoded values back to the original categorical values
            decoded_column = encoder.inverse_transform(
                np.clip(encoded_df[column_name].astype(int), 0, len(encoder.classes_) - 1)
            )

            # Map the padded values back to the original values
            unique_original_values = list(dict.fromkeys(possible_values))  # Maintain order, remove duplicates
            unique_decoded_values = list(dict.fromkeys(decoded_column))  # Remove duplicates from decoded values
            value_map = {decoded_val: original_val for decoded_val, original_val in zip(unique_decoded_values, unique_original_values)}

            # Apply mapping and handle any unmapped values
            decoded_column_series = pd.Series(decoded_column)
            decoded_df[column_name] = decoded_column_series.map(value_map).fillna(decoded_column_series).values
        else:
            # For numeric columns with a range, ensure values are within the range
            if isinstance(possible_values, list) and len(possible_values) > 0:
                min_value, max_value = min(possible_values), max(possible_values)
                decoded_df[column_name] = np.clip(decoded_df[column_name], min_value, max_value)

    return decoded_df


In [179]:
from ctgan import CTGAN

def ctgan_generate_synthetic_data(schema, num_samples):
    # Preprocess the schema to prepare it for CTGAN
    processed_schema, encoders = preprocess_schema(schema, num_samples)

    # Prepare data for CTGAN
    df = pd.DataFrame()
    
    for column in processed_schema['Columns']:  # Use 'Columns' instead of 'columns'
        column_name = column['column_name']
        possible_values = column['possible_values']
        print(column_name,len(possible_values))

        if len(possible_values) == 0:
            # Generate random data if possible_values is empty
            df[column_name] = generate_default_possible_values(column, num_samples)
        else:
            # Repeat possible values to match num_samples for the initial fit
            if len(possible_values) < num_samples:
                repeated_values = (possible_values * (num_samples // len(possible_values) + 1))[:num_samples]
            else:
                repeated_values = possible_values[:num_samples]

            df[column_name] = repeated_values

    # Fit the CTGAN model
    ctgan = CTGAN()
    ctgan.fit(df, epochs=300)

    # Generate synthetic data
    synthetic_data = ctgan.sample(num_samples)

    # Decode the synthetic data if needed
    decoded_data = decode_data(synthetic_data, encoders, schema)
    
    return decoded_data


In [180]:
generated_data = ctgan_generate_synthetic_data(schema, 10)
generated_data.head()

in
in
^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$
UserID 1000
Username 1000
Email 1000
Age 1000
Country 1000
SignUpDate 1000
SubscriptionType 1000
Status 1000
Balance 1000
LastLogin 1000


Unnamed: 0,UserID,Username,Email,Age,Country,SignUpDate,SubscriptionType,Status,Balance,LastLogin
0,9,Alice,fZDzEvcXKR@iwYRY.com,24,USA,2022-01-01,Free,Active,7944.988059,2023-01-01 10:00:00
1,6,Bob,DkSpYe0WFb@dlyNn.com,21,Canada,2022-03-15,Basic,Inactive,4737.022254,2023-02-15 08:30:00
2,3,Charlie,dauqerRr1r@-6fGe.com,20,UK,2022-01-01,Premium,Pending,6199.765939,2023-03-20 15:45:00
3,8,Alice,fZDzEvcXKR@iwYRY.com,26,Canada,2022-06-30,Enterprise,Suspended,4464.679521,2023-01-01 10:00:00
4,3,Alice,f+vm%V0qpP@k7rFZ.com,27,Germany,2022-03-15,Student,Cancelled,8524.282929,2023-04-25 12:00:00
