In [2]:
import csv
import re

# Types whose values are written bare (unquoted) in the generated SQL.
_NUMERIC_TYPES = frozenset({
    "TINYINT", "SMALLINT", "INT", "INTEGER", "BIGINT",
    "FLOAT", "REAL", "DOUBLE", "DECIMAL", "NUMERIC", "BOOLEAN",
})

# Types written as quoted literals, but for which an empty value means "no value".
_TEMPORAL_TYPES = frozenset({"DATE", "TIME", "TIMESTAMP"})

# Leading "<name> <type>" of a single column definition.
_COLUMN_DEF_RE = re.compile(r'\s*([A-Za-z_][A-Za-z0-9_]*)\s+([A-Za-z_][A-Za-z0-9_]*)')


def _parse_column_types(create_statement):
    """Return ``{column_name: UPPERCASE_TYPE}`` from the first parenthesised list of a CREATE statement."""
    start = create_statement.find('(')
    if start == -1:
        raise ValueError("create_stream_statement contains no parenthesised column list")

    # Walk the text tracking nesting depth so that type arguments such as
    # DECIMAL(10, 2) neither end the column list nor split a definition.
    depth = 0
    definitions = []
    current = []
    for char in create_statement[start:]:
        if char == '(':
            depth += 1
            if depth == 1:
                continue  # the opening parenthesis of the column list itself
        elif char == ')':
            depth -= 1
            if depth == 0:
                break  # end of the column list; anything after it is ignored
        elif char == ',' and depth == 1:
            definitions.append(''.join(current))
            current = []
            continue
        current.append(char)
    else:
        raise ValueError("create_stream_statement has unbalanced parentheses")
    definitions.append(''.join(current))

    column_types = {}
    for definition in definitions:
        match = _COLUMN_DEF_RE.match(definition)
        if match:
            column_types[match.group(1)] = match.group(2).upper()

    if not column_types:
        raise ValueError("no column definitions found in create_stream_statement")
    return column_types


def _sql_literal(value, data_type):
    """Render one CSV value as a SQL literal for a column of ``data_type`` (upper-case)."""
    if value is None:
        # csv.DictReader fills in None for fields missing from a short row.
        return "NULL"

    text = str(value)
    if data_type in _NUMERIC_TYPES or data_type in _TEMPORAL_TYPES:
        text = text.strip()
        if not text:
            # An empty token (", ,") is not valid SQL; NULL is the explicit "no value".
            return "NULL"
        if data_type in _NUMERIC_TYPES:
            return text

    # Text, temporal and any unrecognised type: quote, doubling embedded single quotes.
    return "'" + text.replace("'", "''") + "'"


def generate_insert_statements(
    csv_file,
    create_stream_statement,
    output_file,
    table_name):
    """Write one SQL ``INSERT INTO`` statement per CSV row to ``output_file``.

    Column names and types are taken from the first parenthesised column list
    of ``create_stream_statement``. Only CSV fields whose header exactly
    matches a declared column name are emitted; other fields are ignored.

    Value rendering, by declared type (matched case-insensitively):
      * numeric types (INTEGER, BIGINT, FLOAT, DOUBLE, DECIMAL, ...) are
        written unquoted; a blank or missing value becomes ``NULL``;
      * DATE / TIME / TIMESTAMP are written single-quoted; a blank or missing
        value becomes ``NULL``;
      * every other type (VARCHAR, STRING, ...) is written single-quoted with
        embedded single quotes doubled; a missing value becomes ``NULL`` and
        an empty string stays ``''``.

    Args:
        csv_file: Path of the input CSV; the first row is the header. The file
            is read as UTF-8, and a leading byte-order mark is dropped.
        create_stream_statement: Text of a CREATE TABLE / CREATE STREAM
            statement describing the target columns.
        output_file: Path of the SQL file to create or overwrite (UTF-8).
        table_name: Name placed after ``INSERT INTO``; inserted verbatim.

    Raises:
        ValueError: If no column list can be parsed from
            ``create_stream_statement``.
    """
    column_types = _parse_column_types(create_stream_statement)

    insert_statements = []
    # 'utf-8-sig' strips a BOM so the first header name matches its column.
    with open(csv_file, newline='', encoding='utf-8-sig') as csvfile:
        for row in csv.DictReader(csvfile):
            insert_columns = []
            insert_values = []
            for field, value in row.items():
                data_type = column_types.get(field)
                if data_type is None:
                    continue  # CSV field is not a declared column
                insert_columns.append(field)
                insert_values.append(_sql_literal(value, data_type))

            insert_statements.append(
                f"INSERT INTO {table_name} ({', '.join(insert_columns)}) "
                f"VALUES ({', '.join(insert_values)});"
            )

    # Write with an explicit encoding so non-ASCII data round-trips on any platform.
    with open(output_file, 'w', encoding='utf-8') as f:
        f.writelines(statement + '\n' for statement in insert_statements)

In [2]:
# Input CSV and output SQL script for the account data.
input_data_file = '/Users/briandunn/Desktop/Apache Kafka- Kafka Connect/Mock Marketing Schema/data/Account.csv'
output_file_location = '/Users/briandunn/Desktop/Apache Kafka- Kafka Connect/Mock Marketing Schema/statements/inserts/account_inserts.sql'

# Column definitions that generate_insert_statements reads to pick each
# column's data type.
create_stream_statement = """
CREATE TABLE account (
    id INTEGER,
    cust_id VARCHAR KEY,
    acquisition_cost FLOAT,
    internet_banking_indicator VARCHAR,
    date_first_account_opened VARCHAR,
    date_last_account_opened VARCHAR,
    pursuit VARCHAR,
    primary_advisor_organization_id INTEGER,
    primary_branch_proximity INTEGER,
    primary_spoken_language VARCHAR,
    primary_written_language VARCHAR,
    satisfaction_rating_from_survey VARCHAR,
    secondary_advisor_id INTEGER,
    secondary_advisor_organization_id INTEGER,
    special_te VARCHAR);
"""

# Write one INSERT statement per CSV row into the account script.
generate_insert_statements(
    csv_file=input_data_file,
    create_stream_statement=create_stream_statement,
    output_file=output_file_location,
    table_name="account",
)

In [3]:
# Input CSV and output SQL script for the customer data.
input_data_file = '/Users/briandunn/Desktop/Apache Kafka- Kafka Connect/Mock Marketing Schema/data/Customer.csv'
output_file_location = '/Users/briandunn/Desktop/Apache Kafka- Kafka Connect/Mock Marketing Schema/statements/inserts/customer_inserts.sql'

# Column definitions that generate_insert_statements reads to pick each
# column's data type.
create_stream_statement = """
CREATE TABLE customer (
    id INTEGER,
    cust_id VARCHAR,
    gender VARCHAR,
    first_name VARCHAR,
    last_name VARCHAR,
    email VARCHAR,
    ssn VARCHAR,
    age_range VARCHAR, 
    annual_income VARCHAR,
    birth_year INTEGER,
    current_employment_start_date VARCHAR,
    customer_behavior VARCHAR,
    education_level VARCHAR,
    employment_status VARCHAR,
    marital_status VARCHAR,
    monthly_net_income INTEGER,
    profession VARCHAR,
    retirement_age INTEGER,
    customer_status VARCHAR,
    wallet_share_percentage INTEGER);
"""

# Write one INSERT statement per CSV row into the customer script.
generate_insert_statements(
    csv_file=input_data_file,
    create_stream_statement=create_stream_statement,
    output_file=output_file_location,
    table_name="customer",
)

In [7]:
# Input CSV and output SQL script for the financials data.
input_data_file = '/Users/briandunn/Desktop/Apache Kafka- Kafka Connect/Mock Marketing Schema/data/Financials.csv'
output_file_location = '/Users/briandunn/Desktop/Apache Kafka- Kafka Connect/Mock Marketing Schema/statements/inserts/financials_inserts.sql'

# Column definitions that generate_insert_statements reads to pick each
# column's data type.
create_stream_statement = """
CREATE STREAM financials (
    id INTEGER,
    cust_id VARCHAR KEY,
    monthly_housing_cost INTEGER,
    contact_preference VARCHAR,
    credit_authority_level VARCHAR,
    credit_score INTEGER,
    credit_utilization DOUBLE,
    debt_service_coverage_ratio INTEGER
)
"""

# Write one INSERT statement per CSV row into the financials script.
generate_insert_statements(
    csv_file=input_data_file,
    create_stream_statement=create_stream_statement,
    output_file=output_file_location,
    table_name="financials",
)

In [3]:
# Input CSV and output SQL script for the household data.
input_data_file = '/Users/briandunn/Desktop/Apache Kafka- Kafka Connect/Mock Marketing Schema/data/Household.csv'
output_file_location = '/Users/briandunn/Desktop/Apache Kafka- Kafka Connect/Mock Marketing Schema/statements/inserts/household_inserts.sql'

# Column definitions that generate_insert_statements reads to pick each
# column's data type.
create_stream_statement = """
CREATE STREAM household (
    id INTEGER,
    cust_id VARCHAR KEY,
    household_id VARCHAR,
    household_address VARCHAR,
    household_city VARCHAR,
    household_country VARCHAR,
    household_state VARCHAR,
    household_zip_code VARCHAR,
    address_last_changed_date VARCHAR,
    number_of_dependent_adults INTEGER,
    number_of_dependent_children INTEGER,
    family_size INTEGER,
    head_of_household_indicator VARCHAR,
    home_owner_indicator VARCHAR,
    urban_code VARCHAR,
    primary_advisor_id INTEGER
)
"""

# Write one INSERT statement per CSV row into the household script.
generate_insert_statements(
    csv_file=input_data_file,
    create_stream_statement=create_stream_statement,
    output_file=output_file_location,
    table_name="household",
)

In [4]:
# Input CSV and output SQL script for the marketing data.
input_data_file = '/Users/briandunn/Desktop/Apache Kafka- Kafka Connect/Mock Marketing Schema/data/Marketing.csv'
output_file_location = '/Users/briandunn/Desktop/Apache Kafka- Kafka Connect/Mock Marketing Schema/statements/inserts/marketing_inserts.sql'

# Column definitions that generate_insert_statements reads to pick each
# column's data type.
create_stream_statement = """
CREATE TABLE marketing (
    id INTEGER, 
    cust_id VARCHAR KEY,
    advertising_indicator VARCHAR,
    attachment_allowed_indicator VARCHAR,
    preferred_communication_form VARCHAR,
    importance_level_code VARCHAR,
    influence_score BIGINT,
    market_group VARCHAR,
    loyalty_rating_code BIGINT,
    recorded_voice_sample_id BIGINT,
    referrals_value_code VARCHAR,
    relationship_start_date VARCHAR);
"""

# Write one INSERT statement per CSV row into the marketing script.
generate_insert_statements(
    csv_file=input_data_file,
    create_stream_statement=create_stream_statement,
    output_file=output_file_location,
    table_name="marketing",
)