# Synthetic Tabular Data Generation Job

This notebook generates synthetic tabular data using:
- **dbldatagen** for structured data generation
- **ai_query()** for GenAI Text columns
- **Databricks volumes** for storage

## Parameters
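The following parameters are passed from the app as job parameters and read through notebook widgets (Cell 2):

- `table_name` – base name of the output CSV file
- `row_count` – number of rows to generate
- `columns` – JSON array of column configurations
- `company_name`, `company_sector` – company context reported in the job summary
- `timestamp` – suffix appended to the output file name
- `endpoint_name` – model serving endpoint used by `ai_query()`
- `volume_path` – Databricks volume that receives the generated CSV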


In [None]:
# Cell 1: Import required libraries
import json
import os
from datetime import datetime
from pyspark.sql import SparkSession
from pyspark.sql.functions import expr, col, lit
from dbldatagen import DataGenerator, fakerText

# Confirm the imports above resolved and record when this run started.
_import_banner = (
    "üì¶ Libraries imported successfully",
    f"   - Execution time: {datetime.now()}",
)
print(*_import_banner, sep="\n")


In [None]:
# Cell 2: Get job parameters via widgets
# Databricks automatically creates widgets from job parameters. The defaults
# below are used only when the job does not supply a value, or when the
# widgets cannot be read at all (e.g. `dbutils` is not defined).

# widget name -> (default value, widget label)
_widget_defaults = {
    "table_name": ("sample_table", "Table Name"),
    "row_count": ("1000", "Row Count"),
    "columns": ("[]", "Columns JSON"),
    "company_name": ("Sample Company", "Company Name"),
    "company_sector": ("Technology", "Company Sector"),
    "timestamp": (datetime.now().strftime("%Y%m%d_%H%M%S"), "Timestamp"),
    "endpoint_name": ("databricks-gpt-oss-120b", "LLM Endpoint"),
    "volume_path": ("conor_smith.synthetic_data_app.synthetic_data_volume", "Volume Path"),
}

try:
    # Create widgets with default values (these will be overridden by job parameters)
    for _widget_name, (_default, _label) in _widget_defaults.items():
        dbutils.widgets.text(_widget_name, _default, _label)
    _params = {name: dbutils.widgets.get(name) for name in _widget_defaults}
except Exception as e:
    print(f"‚ùå Error getting parameters: {e}")
    # Fallback to default values
    _params = {name: default for name, (default, _unused_label) in _widget_defaults.items()}
    print("‚ö†Ô∏è  Using fallback default values")

table_name = _params["table_name"]
columns_json = _params["columns"]
company_name = _params["company_name"]
company_sector = _params["company_sector"]
timestamp = _params["timestamp"]
endpoint_name = _params["endpoint_name"]
volume_path = _params["volume_path"]

# row_count is validated on its own so that one malformed value only resets
# row_count, instead of discarding every other (valid) job parameter.
_default_row_count = int(_widget_defaults["row_count"][0])
try:
    row_count = int(_params["row_count"])
    if row_count < 1:
        raise ValueError("must be a positive integer")
except (TypeError, ValueError) as e:
    print(f"‚ö†Ô∏è  Invalid row_count {_params['row_count']!r} ({e}); using {_default_row_count}")
    row_count = _default_row_count

print("üéØ Job Parameters Retrieved:")
print(f"   - Table name: {table_name}")
print(f"   - Row count: {row_count}")
print(f"   - Company: {company_name} ({company_sector})")
print(f"   - Timestamp: {timestamp}")
print(f"   - Endpoint: {endpoint_name}")
print(f"   - Volume: {volume_path}")
print(f"   - Columns JSON length: {len(columns_json)} characters")


In [None]:
# Cell 3: Parse and validate column configurations
# `columns_json` is the raw "columns" job parameter read in Cell 2. If parsing
# or inspection fails, a small built-in fallback schema is used instead.
try:
    columns = json.loads(columns_json)
    print(f"‚úÖ Parsed {len(columns)} column configurations")

    # Show column details. The loop variable is `col_cfg`, not `col`, so it does
    # not shadow pyspark.sql.functions.col imported in Cell 1.
    for i, col_cfg in enumerate(columns):
        col_name = col_cfg.get('name', 'unnamed')
        col_type = col_cfg.get('data_type', 'Unknown')
        print(f"   {i+1}. {col_name} ({col_type})")

        # Show additional details for specific types. These lines are display
        # only, so JSON nulls are tolerated rather than allowed to raise (which
        # would discard the whole user configuration via the except below).
        if col_type == 'Integer':
            min_val = col_cfg.get('min_value', 'not set')
            max_val = col_cfg.get('max_value', 'not set')
            print(f"      ‚Üí Range: {min_val} to {max_val}")
        elif col_type == 'GenAI Text':
            prompt = str(col_cfg.get('prompt', 'not set'))
            max_tokens = col_cfg.get('max_tokens', 'not set')
            print(f"      ‚Üí Prompt: {prompt[:50]}{'...' if len(prompt) > 50 else ''}")
            print(f"      ‚Üí Max tokens: {max_tokens}")
        elif col_type == 'Custom Values':
            values = col_cfg.get('custom_values') or []
            weights = col_cfg.get('use_weights', False)
            print(f"      ‚Üí Values: {values[:3]}{'...' if len(values) > 3 else ''}")
            print(f"      ‚Üí Weighted: {weights}")

    # Add sample columns if none provided
    if len(columns) == 0:
        print("‚ö†Ô∏è  No columns configured, adding sample columns for testing")
        columns = [
            {"name": "id", "data_type": "Integer", "min_value": 1, "max_value": 1000},
            {"name": "first_name", "data_type": "First Name"},
            {"name": "last_name", "data_type": "Last Name"},
            {"name": "bio", "data_type": "GenAI Text", "prompt": "Write a short professional bio for <first_name> <last_name>", "max_tokens": 100}
        ]
        print(f"   ‚Üí Added {len(columns)} sample columns")

except Exception as e:
    print(f"‚ùå Error parsing columns: {e}")
    print(f"   Raw columns_json: {columns_json}")
    # Use fallback columns
    columns = [
        {"name": "id", "data_type": "Integer", "min_value": 1, "max_value": 100},
        {"name": "name", "data_type": "First Name"}
    ]
    print(f"   ‚Üí Using {len(columns)} fallback columns")


In [None]:
# Cell 4: Initialize Spark and create DataGenerator
try:
    # Reuse the active session if there is one; otherwise build a new one.
    spark = SparkSession.getActiveSession()
    if spark is None:
        spark = SparkSession.builder.appName("TabularDataGeneration").getOrCreate()

    print("‚ö° Spark session initialized")
    print(f"   - Spark version: {spark.version}")

    # One partition per 1,000 rows, clamped to the range [1, 8]. The same count
    # is used for the generator and for spark.sql.shuffle.partitions, which is a
    # session-level setting and therefore also applies to later cells.
    partitions_requested = min(8, max(1, row_count // 1000))
    spark.conf.set("spark.sql.shuffle.partitions", str(partitions_requested))

    print(f"üîß Spark optimized for {row_count} rows ‚Üí {partitions_requested} partitions")

    # Create DataGenerator
    data_gen = DataGenerator(spark, rows=row_count, partitions=partitions_requested)
    print("üèóÔ∏è  DataGenerator created")

except Exception as e:
    print(f"‚ùå Error initializing Spark/DataGenerator: {e}")
    # Bare `raise` re-raises the active exception with its original traceback.
    raise


In [None]:
# Cell 5: Add columns to DataGenerator
# Config dicts of 'GenAI Text' columns; their text is generated in Cell 6.
genai_columns = []


def _is_blank(value):
    """Return True when *value* is None or its string form is empty/whitespace."""
    return value is None or not str(value).strip()


def _select_custom_values(custom_values, custom_weights, use_weights):
    """Drop blank custom values while keeping weights aligned with the survivors.

    Args:
        custom_values: Values from the column config (may contain blanks and
            non-string JSON values such as numbers).
        custom_weights: Weights from the column config.
        use_weights: Whether weighted selection was requested.

    Returns:
        ``(values, weights)``. ``values`` are strings and never empty (falls
        back to ``['DefaultValue']``). ``weights`` is None when weighting was
        not requested or the weights cannot be matched to the values.
    """
    if use_weights and len(custom_weights) >= len(custom_values):
        # One weight per original position: drop each blank value together
        # with its own weight so the remaining weights stay on their values.
        pairs = [(v, w) for v, w in zip(custom_values, custom_weights) if not _is_blank(v)]
        values = [str(v) for v, _ in pairs]
        weights = [w for _, w in pairs]
    else:
        values = [str(v) for v in custom_values if not _is_blank(v)]
        # Weights that already correspond to the non-blank values only.
        if use_weights and len(custom_weights) >= len(values):
            weights = custom_weights[:len(values)]
        else:
            weights = None

    if not values:
        return ['DefaultValue'], None
    return values, weights


for col_config in columns:
    col_name = col_config.get('name', 'unnamed_column')
    col_type = col_config.get('data_type', 'Integer')

    print(f"üìä Adding column '{col_name}' ({col_type})")

    try:
        if col_type == 'Integer':
            min_val = col_config.get('min_value', 1)
            max_val = col_config.get('max_value', 100)
            data_gen = data_gen.withColumn(col_name, "integer", minValue=min_val, maxValue=max_val)
            print(f"   ‚úÖ Integer: {min_val} to {max_val}")

        elif col_type == 'First Name':
            # NOTE(review): if faker is unavailable the error may only surface at
            # build() time (Cell 6), bypassing this fallback - confirm.
            try:
                data_gen = data_gen.withColumn(col_name, text=fakerText("first_name"))
                print(f"   ‚úÖ First name with faker")
            except Exception:
                first_names = ["James", "Mary", "John", "Patricia", "Robert", "Jennifer", "Michael", "Linda",
                               "William", "Elizabeth", "David", "Barbara", "Richard", "Susan", "Joseph", "Jessica"]
                data_gen = data_gen.withColumn(col_name, values=first_names)
                print(f"   ‚úÖ First name with predefined list ({len(first_names)} names)")

        elif col_type == 'Last Name':
            # NOTE(review): same faker caveat as 'First Name' above.
            try:
                data_gen = data_gen.withColumn(col_name, text=fakerText("last_name"))
                print(f"   ‚úÖ Last name with faker")
            except Exception:
                last_names = ["Smith", "Johnson", "Williams", "Brown", "Jones", "Garcia", "Miller", "Davis",
                              "Rodriguez", "Martinez", "Hernandez", "Lopez", "Gonzalez", "Wilson"]
                data_gen = data_gen.withColumn(col_name, values=last_names)
                print(f"   ‚úÖ Last name with predefined list ({len(last_names)} names)")

        elif col_type == 'GenAI Text':
            # Add placeholder, will process with ai_query later
            data_gen = data_gen.withColumn(col_name, "string", values=[""])
            genai_columns.append(col_config)
            print(f"   ‚úÖ GenAI placeholder (will use ai_query)")

        elif col_type == 'Custom Values':
            custom_values = col_config.get('custom_values') or ['']
            use_weights = col_config.get('use_weights', False)
            custom_weights = col_config.get('custom_weights') or []

            filtered_values, filtered_weights = _select_custom_values(
                custom_values, custom_weights, use_weights
            )

            if filtered_weights is not None:
                data_gen = data_gen.withColumn(col_name, values=filtered_values, weights=filtered_weights)
                print(f"   ‚úÖ Custom values with weights: {len(filtered_values)} values")
            else:
                if use_weights:
                    print(f"   ‚ö†Ô∏è  Weights ignored: {len(custom_weights)} weight(s) could not be matched to the values")
                data_gen = data_gen.withColumn(col_name, values=filtered_values)
                print(f"   ‚úÖ Custom values: {len(filtered_values)} values")

        else:
            print(f"   ‚ö†Ô∏è  Unknown column type '{col_type}', skipping")

    except Exception as col_error:
        print(f"   ‚ùå Error adding column '{col_name}': {col_error}")

print(f"\nüìã Summary: {len(columns)} total columns, {len(genai_columns)} GenAI columns")


In [None]:
# Cell 6: Generate initial DataFrame and process GenAI columns
import re

# Matches <column_name> placeholders in GenAI prompt templates.
_PLACEHOLDER_RE = re.compile(r'<([^<>]+)>')


def _sql_string_literal(text):
    """Return *text* as a quoted Spark SQL string literal.

    Backslashes and single quotes are backslash-escaped, so prompt text such
    as "the company's CEO" cannot terminate the literal early.
    NOTE(review): assumes spark.sql.parser.escapedStringLiterals is false
    (the Spark default), so backslash escapes are honored.
    """
    escaped = str(text).replace("\\", "\\\\").replace("'", "\\'")
    return f"'{escaped}'"


def _sql_identifier(name):
    """Return *name* as a backtick-quoted Spark SQL identifier."""
    return "`" + str(name).replace("`", "``") + "`"


def substitute_column_references_spark(prompt_template, columns):
    """Create a Spark SQL expression that renders *prompt_template* per row.

    Args:
        prompt_template: Prompt text; ``<name>`` marks a column reference.
        columns: Column config dicts; their ``'name'`` values are the
            references that get substituted.

    Returns:
        A Spark SQL expression string. Known references become the row's value
        cast to string (NULL rendered as the text 'NULL'); unknown references
        are kept verbatim as literal ``<name>`` text.
    """
    valid_columns = {c.get('name', 'unnamed_column') for c in columns}
    parts = []
    current_pos = 0

    for match in _PLACEHOLDER_RE.finditer(prompt_template):
        # Literal text before this reference
        if match.start() > current_pos:
            parts.append(_sql_string_literal(prompt_template[current_pos:match.start()]))

        ref_name = match.group(1)
        if ref_name in valid_columns:
            parts.append(f"coalesce(cast({_sql_identifier(ref_name)} as string), 'NULL')")
        else:
            parts.append(_sql_string_literal(f"<{ref_name}>"))
        current_pos = match.end()

    # Remaining literal text (or the whole prompt when it has no references)
    if current_pos < len(prompt_template) or not parts:
        parts.append(_sql_string_literal(prompt_template[current_pos:]))

    return f"concat({', '.join(parts)})" if len(parts) > 1 else parts[0]


# Build the initial DataFrame
print(f"üèóÔ∏è  Building DataFrame with {row_count} rows...")
df = data_gen.build()

print(f"‚úÖ DataFrame created: {df.count()} rows √ó {len(df.columns)} columns")
print(f"   - Columns: {df.columns}")

# Show sample data
print(f"\nüìä Sample Data (first 3 rows):")
df.show(3, truncate=False)

# Process GenAI Text columns with ai_query
if genai_columns:
    print(f"\nü§ñ Processing {len(genai_columns)} GenAI Text columns with ai_query")

    for col_config in genai_columns:
        col_name = col_config.get('name', 'unnamed_column')
        prompt_template = col_config.get('prompt', '')
        # NOTE(review): col_config['max_tokens'] is not passed to ai_query; wiring
        # it through (e.g. via ai_query's model parameters) depends on the runtime
        # version - confirm before adding.

        if prompt_template:
            print(f"\nüéØ Processing GenAI column '{col_name}'")
            print(f"   - Prompt: {prompt_template[:80]}{'...' if len(prompt_template) > 80 else ''}")

            try:
                # Enhanced prompt for table context
                enhanced_prompt = f"{prompt_template} Note: This will be text data in a table so omit all special formatting."

                # Create dynamic prompt with column substitution
                prompt_expression = substitute_column_references_spark(enhanced_prompt, columns)
                print(f"   - Spark expression created")

                print(f"   - Executing ai_query with endpoint: {endpoint_name}")
                # withColumn is lazy: the endpoint is only called when an action
                # runs. Build on a candidate frame and adopt it only after a small
                # sample evaluates, so a failing call keeps the placeholder column
                # instead of breaking every later action on `df`.
                candidate_df = df.withColumn(
                    col_name,
                    expr(f"ai_query(endpoint => {_sql_string_literal(endpoint_name)}, request => {prompt_expression})")
                )
                sample_rows = candidate_df.select(col_name).limit(2).collect()
                df = candidate_df

                print(f"   ‚úÖ ai_query completed for '{col_name}'")

                # Show sample generated text
                for i, row in enumerate(sample_rows):
                    text = str(row[col_name])
                    text_sample = text[:100] + ('...' if len(text) > 100 else '')
                    print(f"   - Sample {i+1}: {text_sample}")

            except Exception as ai_error:
                print(f"   ‚ùå Error processing GenAI column '{col_name}': {ai_error}")

    print(f"\n‚úÖ All GenAI columns processed")
else:
    print(f"\n‚ÑπÔ∏è  No GenAI columns to process")


In [None]:
# Cell 7: Save to Volume and complete job


def _volume_dir(path):
    """Return the ``/Volumes/...`` directory for a volume parameter.

    Accepts the dotted ``catalog.schema.volume`` form used by the widget
    default as well as a slash path, with or without a leading ``/Volumes/``.
    Example: ``"main.sales.files"`` -> ``"/Volumes/main/sales/files"``.
    """
    cleaned = path.strip().strip("/")
    if cleaned.startswith("Volumes/"):
        cleaned = cleaned[len("Volumes/"):]
    if "/" not in cleaned:
        # Dotted three-part name -> path segments
        cleaned = cleaned.replace(".", "/")
    return f"/Volumes/{cleaned}"


saved_to_volume = False
try:
    filename = f"{table_name}_{timestamp}.csv"
    print(f"üíæ Saving data to volume...")
    print(f"   - Filename: {filename}")
    print(f"   - Volume: {volume_path}")

    # Convert to Pandas for clean CSV creation. This action evaluates any
    # ai_query columns for every row, so everything below reuses `pandas_df`
    # rather than running new Spark actions on `df` (which would call the
    # endpoint again and could print text that differs from the saved file).
    print(f"üìä Converting to Pandas...")
    pandas_df = df.toPandas()
    print(f"   ‚úÖ Pandas DataFrame: {len(pandas_df)} rows √ó {len(pandas_df.columns)} columns")

    # Write to temporary location
    temp_path = f"/tmp/{filename}"
    pandas_df.to_csv(temp_path, index=False)
    print(f"   ‚úÖ Temporary file created: {temp_path}")

    # Copy to volume
    volume_file_path = f"{_volume_dir(volume_path)}/{filename}"
    print(f"üì§ Copying to volume: {volume_file_path}")

    try:
        dbutils.fs.cp(f"file://{temp_path}", volume_file_path)
        saved_to_volume = True
        print(f"   ‚úÖ Successfully saved to volume!")
    except Exception as volume_error:
        print(f"   ‚ùå Error copying to volume: {volume_error}")
        print(f"   üìÅ File remains at: {temp_path}")
        volume_file_path = temp_path

    if saved_to_volume:
        # Verify file (best effort)
        try:
            file_info = dbutils.fs.ls(volume_file_path)
            file_size = file_info[0].size if file_info else 0
            print(f"   üìã File size: {file_size:,} bytes")
        except Exception:
            print(f"   ‚ö†Ô∏è  Could not verify file size")

        # Clean up temp file (best effort; the volume copy already succeeded,
        # so a failure here must not be reported as a copy failure)
        try:
            dbutils.fs.rm(f"file://{temp_path}")
            print(f"   üßπ Cleaned up temporary file")
        except Exception as cleanup_error:
            print(f"   ‚ö†Ô∏è  Could not remove temporary file {temp_path}: {cleanup_error}")

    # Final summary
    if saved_to_volume:
        print(f"\nüéâ Job Completed Successfully!")
    else:
        print(f"\n‚ö†Ô∏è  Job completed, but the file was NOT saved to the volume")
    print(f"üìã Final Summary:")
    print(f"   - Table: {table_name}")
    print(f"   - Rows: {len(pandas_df):,}")
    print(f"   - Columns: {len(pandas_df.columns)}")
    print(f"   - GenAI columns: {len(genai_columns)}")
    print(f"   - Company: {company_name} ({company_sector})")
    print(f"   - File: {volume_file_path}")
    print(f"   - Completed: {datetime.now()}")

    # Show final sample (taken from the data that was actually written)
    print(f"\nüìä Final Data Sample:")
    print(pandas_df.head(5).to_string(index=False))

except Exception as save_error:
    print(f"‚ùå Error during save: {save_error}")
    print(f"   Job may have succeeded but file save failed")
    raise
