### Use on the BASF Workspace or a Databricks-managed Workspace

In [0]:
%pip install faker
dbutils.library.restartPython()

In [0]:
# Workspace switch: when True, the notebook derives the catalog name from the
# full login prefix and creates the catalog with an explicit MANAGED LOCATION;
# when False, it uses "<first initial>_<last name>" and a plain CREATE CATALOG.
is_basf_workspace = True
# Name of the schema that is created and selected inside the per-user catalog.
schema = 'chem_manufacturing'

In [0]:
import re

from databricks.sdk import WorkspaceClient

# Look up the login of the user running the notebook; the catalog is per-user.
ws = WorkspaceClient()
current_user = ws.current_user.me().user_name

# Part of the login in front of the "@".
user_local_part = current_user.split('@')[0]

if is_basf_workspace:
    # On the BASF workspace the whole login prefix identifies the user.
    first_name = user_local_part
    formatted_name = first_name
else:
    # Expected shape is "first.last", but logins with no dot or with extra
    # dots ("first.middle.last") must not crash: use the first and last parts.
    name_parts = [part for part in user_local_part.split('.') if part]
    first_name = name_parts[0] if name_parts else user_local_part
    last_name = name_parts[-1] if len(name_parts) > 1 else ''
    if last_name:
        formatted_name = f"{first_name[0]}_{last_name}"
    else:
        formatted_name = first_name

# The name is interpolated unquoted into SQL statements later on, so anything
# other than letters, digits and underscores is replaced by an underscore.
formatted_name = re.sub(r'[^0-9A-Za-z_]', '_', formatted_name)

catalog = f'dbdemos_{formatted_name}'
print(f"Catalog name: {catalog}")



In [0]:

# Storage location passed as MANAGED LOCATION when creating the catalog on the
# BASF workspace.
BASF_MANAGED_LOCATION = 'abfss://lab@edlxopsml.dfs.core.windows.net/xops-ml'


def _build_create_catalog_sql(catalog, managed_location=None):
    """Return the CREATE CATALOG statement for ``catalog``.

    Args:
        catalog: Name of the catalog to create.
        managed_location: Optional storage URL; when given, a
            ``MANAGED LOCATION`` clause is appended.

    Returns:
        The SQL statement as a string.
    """
    statement = f"CREATE CATALOG IF NOT EXISTS {catalog}"
    if managed_location:
        statement += f" MANAGED LOCATION '{managed_location}'"
    return statement


def setup_catalog_and_schema(catalog, schema, managed_location=None):
    """Create (if needed) and select the given catalog and schema.

    The CREATE CATALOG statement is built from the ``catalog`` argument so the
    catalog that is created is always the one that is subsequently selected.

    Args:
        catalog: Name of the catalog to create and use.
        schema: Name of the schema to create and use inside that catalog.
        managed_location: Optional storage URL for the catalog; omit it to
            issue a plain ``CREATE CATALOG IF NOT EXISTS``.
    """
    spark.sql(_build_create_catalog_sql(catalog, managed_location))
    spark.sql(f"USE CATALOG {catalog}")
    spark.sql(f"CREATE SCHEMA IF NOT EXISTS {schema}")
    spark.sql(f"USE SCHEMA {schema}")


# Only the BASF workspace needs an explicit managed location.
managed_location = BASF_MANAGED_LOCATION if is_basf_workspace else None

# Kept as a module-level name and printed so the executed statement is visible.
create_catalog_sql = _build_create_catalog_sql(catalog, managed_location)
print(create_catalog_sql)

setup_catalog_and_schema(catalog, schema, managed_location)

In [0]:
# Databricks notebook for generating synthetic chemistry manufacturing data
import pandas as pd
import numpy as np
import json
from datetime import datetime, timedelta
import random
from faker import Faker

# Initialize faker for generating realistic text.
# Faker draws from its own random generator, so it has to be seeded separately
# from the `random` module and NumPy for its output to be repeatable.
Faker.seed(42)
fake = Faker()
random.seed(42)
np.random.seed(42)

# Helper functions
def random_date(start_date, end_date):
    """Return ``start_date`` moved forward by a random whole number of days.

    The offset is drawn with ``random.randint`` from 0 up to the number of
    complete days between the two arguments (both bounds inclusive), so any
    time-of-day carried by ``start_date`` is kept unchanged.
    """
    max_offset_days = (end_date - start_date).days
    offset = timedelta(days=random.randint(0, max_offset_days))
    return start_date + offset

def generate_chemical_formula():
    """Generate a plausible-looking chemical formula.

    Picks 2-5 distinct element symbols and gives each a count between 1 and
    12; a count of 1 is written without a number.

    Returns:
        The formula as a string, e.g. ``"C6H12O"``.
    """
    elements = ["C", "H", "O", "N", "Cl", "S", "P", "F", "Si", "Al", "Ti", "Zn", "Cu", "Mg", "K", "Na", "Fe"]

    num_elements = random.randint(2, 5)
    # sample() draws without replacement, so a symbol can never appear twice
    # in the same formula (which choice() in a loop would allow).
    chosen_elements = random.sample(elements, num_elements)

    formula_parts = []
    for element in chosen_elements:
        count = random.randint(1, 12)
        formula_parts.append(element if count == 1 else f"{element}{count}")

    return "".join(formula_parts)

def generate_product_description(product_name, category, formula, applications):
    """Generate a detailed product description.

    A randomly chosen marketing template is filled in with the arguments and
    followed by one sentence naming two different technical properties.

    Args:
        product_name: Display name of the product.
        category: Product category; inserted in lower case.
        formula: Chemical formula shown in the text.
        applications: Text describing the application areas.

    Returns:
        The description as a single string.
    """
    kind = category.lower()
    templates = [
        f"{product_name} is a high-quality {kind} with the chemical formula {formula}. It is primarily used for {applications}. This product offers exceptional stability and reactivity for industrial applications.",
        f"Our premium {product_name} ({formula}) is a specialized {kind} designed for {applications}. It provides consistent results in both laboratory and industrial settings.",
        f"{product_name} is an advanced {kind} compound ({formula}) that delivers superior performance in {applications}. It's manufactured using our proprietary process to ensure the highest quality standards.",
        f"A versatile {kind} with formula {formula}, {product_name} is ideal for {applications}. It features excellent thermal stability and batch-to-batch consistency.",
        f"{product_name} ({formula}) is our flagship {kind} product developed specifically for {applications}. Its unique properties make it the preferred choice for demanding chemical processes."
    ]

    base_description = random.choice(templates)

    # Technical properties appended to the base text
    properties = [
        "exhibits excellent thermal stability",
        "features high selectivity",
        "provides exceptional yield improvement",
        "demonstrates superior chemical resistance",
        "offers enhanced reactivity",
        "maintains stability under extreme conditions",
        "shows minimal side reactions",
        "enables accelerated reaction rates",
        "ensures consistent quality across batches",
        "reduces energy requirements in processing"
    ]
    # Leading verbs that are dropped from the second property so it reads as
    # a second object of the first verb ("exhibits X and Y").
    droppable_verbs = ("exhibits ", "features ", "provides ", "demonstrates ", "offers ")

    # Draw two *different* properties; two independent choices could repeat
    # the same one ("... thermal stability and excellent thermal stability").
    first_property, second_property = random.sample(properties, 2)
    for verb in droppable_verbs:
        if second_property.startswith(verb):
            second_property = second_property[len(verb):]
            break

    additional_info = f" The product {first_property} and {second_property}."

    return base_description + additional_info

def generate_process_description(product_name, category):
    """Build a one-paragraph description of how a product is manufactured.

    A process technique is picked at random from the techniques listed for
    the category (or from a generic list for categories without their own
    entry) and inserted, together with the product name, into a randomly
    chosen sentence pattern.
    """
    techniques_by_category = {
        "Catalyst": ["continuous flow", "batch precipitation", "sol-gel", "impregnation", "coprecipitation"],
        "Polymer": ["emulsion polymerization", "solution polymerization", "suspension polymerization", "bulk polymerization", "interfacial polymerization"],
        "Solvent": ["continuous distillation", "extractive distillation", "azeotropic distillation", "reactive distillation", "batch purification"]
    }
    generic_techniques = ["chemical synthesis", "continuous production", "batch reaction"]

    if category in techniques_by_category:
        candidates = techniques_by_category[category]
    else:
        candidates = generic_techniques
    technique = random.choice(candidates)

    # Sentence patterns; placeholders are filled in after one is selected.
    patterns = [
        "The manufacturing process for {name} involves a carefully controlled {process} procedure. The process begins with purified raw materials that undergo reaction under precise temperature and pressure conditions.",
        "{name} is produced using an optimized {process} technique. The production requires strict monitoring of reaction parameters to achieve the desired product specifications.",
        "Our proprietary {process} method for manufacturing {name} ensures consistent quality and high yield. The process involves multiple stages of reaction and purification.",
        "Production of {name} follows a standardized {process} protocol. Critical process parameters are continuously monitored to maintain product consistency.",
        "The synthesis of {name} utilizes advanced {process} technology. Each batch undergoes rigorous in-process testing to verify conformance to specifications."
    ]

    selected = random.choice(patterns)
    return selected.format(name=product_name, process=technique)

def generate_research_note(product_name, formula):
    """Compose a short research note about improving a product.

    One improvement topic and one note layout are picked at random. Each
    layout consists of a title line, a blank line and a body paragraph that
    mention the topic, the product name and/or its formula.
    """
    topics = ["yield optimization", "catalyst efficiency", "reaction selectivity", "energy efficiency", "impurity reduction", "stability enhancement", "scale-up challenges", "alternative synthesis route", "green chemistry approaches", "cost reduction"]
    topic = random.choice(topics)

    # Layouts with named placeholders; filled in once a layout is selected.
    layouts = [
        "Research Note: {topic_title} for {name}\n\nOur team is investigating methods to improve the {topic} of {name} ({formula}). Preliminary experiments suggest that modifying reaction temperature and residence time could lead to significant improvements.",
        "Project Summary: {topic_title} in {name} Production\n\nThis research aims to address {topic} in the manufacturing of {name}. We've identified several key variables that affect product quality and are designing experiments to optimize these parameters.",
        "Laboratory Study: {topic_title} for {formula}\n\nA systematic study of {topic} for {name} is underway. Initial findings show promise in adjusting catalyst loading and reaction pH to achieve better results.",
        "Process Development: Enhancing {topic_title} of {name}\n\nOur development work focuses on enhancing {topic} for {name}. Recent trials with modified process conditions have shown a 15% improvement in target metrics.",
        "R&D Initiative: {topic_title} Study for {formula}\n\nThis ongoing study examines various approaches to {topic} for {name}. Computational modeling combined with experimental verification is being used to identify optimal process conditions."
    ]

    layout = random.choice(layouts)
    return layout.format(
        topic=topic,
        topic_title=topic.title(),
        name=product_name,
        formula=formula,
    )

# 1. Generate Products Table
def create_products_table(num_products=50):
    """Generate synthetic products data.

    Args:
        num_products: Number of product rows to generate.

    Returns:
        A pandas DataFrame with one row per product and the columns
        product_id, product_name, category, chemical_formula,
        molecular_weight, density, melting_point, boiling_point, description,
        application_areas, storage_conditions, full_description,
        creation_date (``YYYY-MM-DD`` string) and price_per_unit.
    """
    categories = ["Catalyst", "Polymer", "Solvent", "Additive", "Intermediate"]
    application_areas = [
        "pharmaceutical synthesis", "petrochemical processing", "fine chemicals production",
        "polymer manufacturing", "food processing", "electronic materials", "agricultural chemicals",
        "water treatment", "cosmetics production", "adhesives and sealants", "textile processing",
        "metal treatment", "paint and coatings", "paper manufacturing", "fuel production"
    ]

    storage_conditions = [
        "Store at room temperature in a dry place",
        "Keep in tightly closed container in a cool, well-ventilated area",
        "Store under inert gas atmosphere below 30°C",
        "Refrigerate at 2-8°C",
        "Store away from light and moisture at 15-25°C",
        "Keep frozen below -18°C",
        "Store in a flammables cabinet below 25°C"
    ]

    # Product-name prefixes per category; categories without an entry use the
    # generic list. Only the prefix differs between categories, the rest of
    # the name is built the same way for all of them.
    name_prefixes = {
        "Catalyst": ["Cat", "Cata", "Catalex", "ReactCat", "SynthCat", "ProCat", "MetalCat"],
        "Polymer": ["Poly", "Polyflex", "DuraPol", "ElastoPol", "ThermoFlex", "PolyForm", "SynthPol"],
        "Solvent": ["Sol", "SolPure", "CleanSol", "EcoSolv", "PureSolv", "SolTech", "SynthaSolv"],
    }
    generic_prefixes = ["Chem", "SynthChem", "ProChem", "ReactChem", "ChemPure", "ChemSynth", "ChemTech"]
    name_separators = ["-", " "]
    name_grades = ["A", "B", "C", "X", "Z", "Pro", "Ultra", "Max"]

    # Creation dates fall between five years ago and 30 days ago; the window
    # is fixed once so every row is measured against the same "now".
    now = datetime.now()
    earliest_creation = now - timedelta(days=5*365)
    latest_creation = now - timedelta(days=30)

    products = []

    for i in range(1, num_products + 1):
        product_id = f"P{i:04d}"
        category = random.choice(categories)

        name_prefix = random.choice(name_prefixes.get(category, generic_prefixes))
        name_suffix = random.choice(name_separators) + random.choice(name_grades) + str(random.randint(10, 999))
        product_name = name_prefix + name_suffix

        chemical_formula = generate_chemical_formula()
        molecular_weight = round(random.uniform(100, 500), 2)
        density = round(random.uniform(0.8, 2.5), 3)
        melting_point = round(random.uniform(-50, 250), 1)
        # Boiling point lies 50-300 degrees above the melting point. The sum
        # is rounded again because adding two one-decimal floats can leave
        # binary noise such as 87.80000000000001.
        boiling_point = round(melting_point + round(random.uniform(50, 300), 1), 1)

        # Generate random applications (2-4 applications)
        num_applications = random.randint(2, 4)
        product_applications = ", ".join(random.sample(application_areas, num_applications))
        description = generate_product_description(product_name, category, chemical_formula, product_applications)

        # Concat descriptions for similarity search
        storage = random.choice(storage_conditions)
        full_description = description + " " + product_applications + " " + storage

        creation_date = random_date(earliest_creation, latest_creation)

        price_per_unit = round(random.uniform(50, 5000), 2)

        products.append({
            "product_id": product_id,
            "product_name": product_name,
            "category": category,
            "chemical_formula": chemical_formula,
            "molecular_weight": molecular_weight,
            "density": density,
            "melting_point": melting_point,
            "boiling_point": boiling_point,
            "description": description,
            "application_areas": product_applications,
            "storage_conditions": storage,
            "full_description": full_description,
            "creation_date": creation_date.strftime("%Y-%m-%d"),
            "price_per_unit": price_per_unit
        })

    return pd.DataFrame(products)

# 2. Generate Batches Table
def create_batches_table(products_df, num_batches=200):
    """Build a DataFrame of synthetic production batches.

    Every batch is assigned to a randomly picked row of ``products_df`` (its
    ``product_id``, ``category`` and ``price_per_unit`` columns are read) and
    receives a size, manufacture/expiration dates, reactor and operator ids,
    a production cost, a status and a yield percentage.
    """
    status_options = ["Completed", "In Progress", "Failed", "On Hold", "Pending QC"]
    status_weights = [0.7, 0.1, 0.05, 0.05, 0.1]  # probability of each status

    # (low, high) batch size in kg by category; other categories use 50-500.
    size_bounds = {"Catalyst": (10, 100), "Polymer": (100, 1000)}
    # (low, high) yield in percent by status; other statuses use 60-95.
    yield_bounds = {"Failed": (0, 60), "Completed": (75, 98)}

    last_product_pos = len(products_df) - 1
    rows = []

    for seq in range(1, num_batches + 1):
        # Pick the product this batch belongs to
        product = products_df.iloc[random.randint(0, last_product_pos)]

        size_low, size_high = size_bounds.get(product["category"], (50, 500))
        batch_size = round(random.uniform(size_low, size_high), 1)

        # Manufactured some time within the last two years
        made_on = random_date(datetime.now() - timedelta(days=2*365), datetime.now())

        # Shelf life of one to three years
        expires_on = made_on + timedelta(days=random.randint(365, 3*365))

        reactor_number = random.randint(1, 10)
        operator_number = random.randint(1, 20)

        # Cost: 40% of the selling price per kg, varied by +/- 15%
        base_cost_per_kg = product["price_per_unit"] * 0.4
        cost_variation = random.uniform(0.85, 1.15)
        production_cost = round(batch_size * base_cost_per_kg * cost_variation, 2)

        # Status first, because the yield range depends on it
        status = np.random.choice(status_options, p=status_weights)
        yield_low, yield_high = yield_bounds.get(str(status), (60, 95))
        yield_percentage = round(random.uniform(yield_low, yield_high), 1)

        rows.append({
            "batch_id": f"B{seq:06d}",
            "product_id": product["product_id"],
            "batch_size": batch_size,
            "manufacture_date": made_on.strftime("%Y-%m-%d"),
            "expiration_date": expires_on.strftime("%Y-%m-%d"),
            "reactor_id": f"R{reactor_number:02d}",
            "operator_id": f"OP{operator_number:03d}",
            "production_cost": production_cost,
            "yield_percentage": yield_percentage,
            "status": status
        })

    return pd.DataFrame(rows)

# 3. Generate Quality Control Table
def create_qc_table(batches_df, num_tests=800):
    """Generate synthetic quality control data.

    Each test is attached to a randomly chosen batch and gets a test type, a
    measured value, an acceptable range, a Pass/Fail result derived from
    those, an analyst id and optional notes.

    Args:
        batches_df: DataFrame with at least the columns ``batch_id``,
            ``manufacture_date`` (``YYYY-MM-DD`` string) and ``status``.
        num_tests: Number of test rows to generate.

    Returns:
        A pandas DataFrame with one row per test.
    """
    test_types = [
        "Purity", "pH", "Viscosity", "Density", "Color", "Particle Size",
        "Water Content", "Solubility", "Melting Point", "Boiling Point",
        "Flash Point", "Refractive Index", "Molecular Weight", "Activity"
    ]

    # Test types with a fixed specification:
    # (parameter label, acceptable min, acceptable max,
    #  sampling low, sampling high, decimals kept in the measured value).
    # The sampling interval is wider than the acceptable one so that some
    # tests fail. Types not listed here get a randomly centred specification.
    fixed_specs = {
        "Purity": ("% Purity", 98.0, 100.0, 96.5, 100.0, 2),
        "pH": ("pH Value", 6.5, 8.5, 6.0, 9.0, 1),
        "Viscosity": ("Viscosity (cP)", 400, 600, 350, 650, 0),
        "Density": ("Density (g/mL)", 0.90, 1.10, 0.85, 1.15, 3),
        "Color": ("Color (APHA)", 0, 50, 0, 70, 0),
        "Water Content": ("Water Content (%)", 0.0, 0.5, 0.0, 0.8, 2),
    }

    tests = []

    batch_ids = batches_df["batch_id"].tolist()

    # Index the two batch fields needed below by batch id once, instead of
    # filtering the whole DataFrame for every generated test. setdefault keeps
    # the first row for an id, matching a filter followed by .iloc[0].
    batch_info = {}
    for known_id, made_on, batch_status in zip(
        batches_df["batch_id"], batches_df["manufacture_date"], batches_df["status"]
    ):
        batch_info.setdefault(known_id, (made_on, batch_status))

    now = datetime.now()

    for i in range(1, num_tests + 1):
        test_id = f"QC{i:06d}"

        # Randomly select a batch
        batch_id = random.choice(batch_ids)
        manufacture_date_text, batch_status = batch_info[batch_id]

        # Test date should be on or after manufacture date
        manufacture_date = datetime.strptime(manufacture_date_text, "%Y-%m-%d")
        if batch_status in ("Completed", "Failed"):
            # Finished batches (completed or failed): any date up to now
            test_date = random_date(manufacture_date, now)
        else:
            # Batches still being processed: at most 14 days after manufacture
            test_end_date = min(now, manufacture_date + timedelta(days=14))
            test_date = random_date(manufacture_date, test_end_date)

        # Randomly select a test type
        test_type = random.choice(test_types)

        # Generate test parameters and acceptable ranges based on test type
        if test_type in fixed_specs:
            (test_parameter, acceptable_range_min, acceptable_range_max,
             sample_low, sample_high, decimals) = fixed_specs[test_type]
            test_value = round(random.uniform(sample_low, sample_high), decimals)
        else:
            # Generic specification: +/-10% around a random centre, with the
            # measured value sampled from +/-15% around the same centre.
            test_parameter = f"{test_type} Value"
            mid_value = random.uniform(50, 500)
            acceptable_range_min = round(mid_value * 0.9, 1)
            acceptable_range_max = round(mid_value * 1.1, 1)
            test_value = round(random.uniform(mid_value * 0.85, mid_value * 1.15), 1)

        # Determine test result
        if acceptable_range_min <= test_value <= acceptable_range_max:
            test_result = "Pass"
        else:
            test_result = "Fail"

        # Generate analyst ID
        analyst_id = f"AN{random.randint(1, 15):03d}"

        # Failed tests always get a note; passed tests only sometimes
        if test_result == "Fail":
            notes_options = [
                "Value outside acceptable range. Retesting recommended.",
                f"Investigating cause of deviation in {test_parameter}.",
                f"Possible contamination affecting {test_type} result.",
                "Test to be repeated after calibration of equipment.",
                "Deviation may be due to sampling error."
            ]
            notes = random.choice(notes_options)
        else:
            notes = "" if random.random() < 0.7 else "Standard test procedure followed."

        tests.append({
            "test_id": test_id,
            "batch_id": batch_id,
            "test_date": test_date.strftime("%Y-%m-%d"),
            "test_type": test_type,
            "test_parameter": test_parameter,
            "test_value": test_value,
            "acceptable_range_min": acceptable_range_min,
            "acceptable_range_max": acceptable_range_max,
            "test_result": test_result,
            "analyst_id": analyst_id,
            "notes": notes
        })

    return pd.DataFrame(tests)

# 4. Generate Inventory Table
def create_inventory_table(products_df, num_inventory=100):
    """Build a DataFrame of synthetic inventory records.

    The first ``num_inventory // 2`` records are raw materials picked from a
    fixed list of chemicals; the remaining records are finished products
    picked at random from ``products_df`` (its ``product_id``,
    ``product_name`` and ``category`` columns are read).
    """
    item_types = ["Raw Material", "Finished Product"]
    raw_type, finished_type = item_types
    locations = ["Main Warehouse", "Production Floor", "R&D Lab", "Cold Storage", "External Warehouse"]

    raw_materials = [
        "Acetic Acid", "Acetone", "Benzene", "Chloroform", "Ethanol", "Hexane", "Methanol",
        "Toluene", "Sulfuric Acid", "Nitric Acid", "Sodium Hydroxide", "Hydrogen Peroxide",
        "Phosphoric Acid", "Ammonia", "Carbon Tetrachloride", "Diethyl Ether", "Formaldehyde",
        "Glycerol", "Isopropyl Alcohol", "Potassium Hydroxide"
    ]

    # (low, high) stock quantity by product category; others use 200-2000.
    finished_quantity_bounds = {"Catalyst": (50, 500), "Polymer": (500, 5000)}

    raw_count = num_inventory // 2
    records = []

    # Raw-material half
    for seq in range(1, raw_count + 1):
        material_name = random.choice(raw_materials)
        quantity = round(random.uniform(100, 10000), 1)
        unit = random.choice(["kg", "L"])
        location = random.choice(locations)
        updated_on = random_date(datetime.now() - timedelta(days=30), datetime.now())
        supplier_number = random.randint(1, 10)

        records.append({
            "inventory_id": f"INV{seq:06d}",
            "item_type": raw_type,
            "item_id": f"RM{seq:04d}",
            "item_name": material_name,
            "quantity": quantity,
            "unit": unit,
            "location": location,
            "last_updated": updated_on.strftime("%Y-%m-%d"),
            "reorder_level": round(quantity * 0.2, 1),  # 20% of current stock
            "supplier_id": f"SUP{supplier_number:03d}"
        })

    # Finished-product half; numbering continues after the raw materials
    for seq in range(raw_count + 1, num_inventory + 1):
        product = products_df.iloc[random.randint(0, len(products_df) - 1)]

        low, high = finished_quantity_bounds.get(product["category"], (200, 2000))
        quantity = round(random.uniform(low, high), 1)

        location = random.choice(locations)
        updated_on = random_date(datetime.now() - timedelta(days=30), datetime.now())

        records.append({
            "inventory_id": f"INV{seq:06d}",
            "item_type": finished_type,
            "item_id": product["product_id"],
            "item_name": product["product_name"],
            "quantity": quantity,
            "unit": "kg",
            "location": location,
            "last_updated": updated_on.strftime("%Y-%m-%d"),
            "reorder_level": round(quantity * 0.15, 1),  # 15% of current stock
            "supplier_id": None
        })

    return pd.DataFrame(records)

# 5. Generate Reactions Table
def create_reactions_table(products_df, num_reactions=80):
    """Build a DataFrame of synthetic chemical reactions.

    Every reaction is tied to a randomly picked row of ``products_df`` (its
    ``product_id`` and ``product_name`` columns are read). Reactants and
    reaction products are stored as JSON strings in ``reactants_json`` and
    ``products_json``.
    """
    reaction_types = ["Addition", "Elimination", "Substitution", "Polymerization", "Reduction", "Oxidation", "Condensation", "Hydrolysis"]
    catalysts = ["Palladium on Carbon", "Raney Nickel", "Platinum Oxide", "Aluminum Chloride", "Sulfuric Acid", "Sodium Methoxide", "Titanium Tetrachloride", "Zinc Chloride", "None"]
    solvents = ["Water", "Ethanol", "Methanol", "Acetone", "Tetrahydrofuran", "Dichloromethane", "Toluene", "Dimethylformamide", "Dimethyl Sulfoxide", "None"]
    reactant_names = ["Acetic Acid", "Acetone", "Benzene", "Chloroform", "Ethanol", "Hexane", "Methanol", "Toluene", "Sulfuric Acid", "Nitric Acid", "Sodium Hydroxide", "Hydrogen Peroxide", "Phosphoric Acid", "Ammonia", "Carbon Tetrachloride", "Diethyl Ether", "Formaldehyde", "Glycerol", "Isopropyl Alcohol", "Potassium Hydroxide"]
    hazard_options = ["Flammable", "Corrosive", "Toxic", "Oxidizing", "Explosive", "Harmful to environment"]

    rows = []

    for seq in range(1, num_reactions + 1):
        # Product this reaction synthesizes
        product = products_df.iloc[random.randint(0, len(products_df) - 1)]
        product_id = product["product_id"]
        product_name = product["product_name"]

        reaction_type = random.choice(reaction_types)

        # Two to four reactants
        reactant_count = random.randint(2, 4)
        reactants = [
            {
                "name": random.choice(reactant_names),
                "quantity": round(random.uniform(1, 10), 2),
                "unit": random.choice(["kg", "L", "mol"]),
            }
            for _ in range(reactant_count)
        ]

        # Main product first, followed by zero to two side products
        outputs = [{
            "name": product_name,
            "quantity": round(random.uniform(1, 8), 2),
            "unit": "kg",
            "is_main_product": True,
        }]
        side_count = random.randint(0, 2)
        outputs.extend(
            {
                "name": f"Side Product {number}",
                "quantity": round(random.uniform(0.1, 1), 2),
                "unit": "kg",
                "is_main_product": False,
            }
            for number in range(1, side_count + 1)
        )

        catalyst = random.choice(catalysts)
        solvent = random.choice(solvents)

        temperature = round(random.uniform(0, 200), 1)
        pressure = round(random.uniform(1, 10), 1)
        reaction_time = round(random.uniform(1, 24), 1)
        energy_consumption = round(random.uniform(5, 50), 1)

        # Polymerizations of "Poly..." products get a dedicated wording
        is_polymer_synthesis = reaction_type == "Polymerization" and "Poly" in product_name
        if is_polymer_synthesis:
            description = f"A {reaction_type.lower()} reaction to produce {product_name} from monomers. The reaction is carried out at {temperature}°C for {reaction_time} hours."
        else:
            step_kind = random.choice(['one-step', 'two-step', 'multi-step'])
            description = f"A {reaction_type.lower()} reaction to synthesize {product_name}. The reaction proceeds via a {step_kind} process under controlled conditions."

        # Up to three hazards; none at all is reported as "Low hazard"
        hazard_count = random.randint(0, 3)
        if hazard_count > 0:
            hazards = ", ".join(random.sample(hazard_options, hazard_count))
        else:
            hazards = "Low hazard"

        if random.random() < 0.7:
            notes = "Standard operating procedure should be followed."
        else:
            notes = fake.sentence()

        rows.append({
            "reaction_id": f"R{seq:04d}",
            "reaction_name": f"{product_name} Synthesis",
            "product_id": product_id,
            "reaction_type": reaction_type,
            "reactants_json": json.dumps(reactants),
            "products_json": json.dumps(outputs),
            "catalyst": catalyst,
            "solvent": solvent,
            "temperature": temperature,
            "pressure": pressure,
            "reaction_time": reaction_time,
            "energy_consumption": energy_consumption,
            "description": description,
            "hazards": hazards,
            "notes": notes
        })

    return pd.DataFrame(rows)

# 6. Generate Text Descriptions for Similarity Search
def create_descriptions_table(products_df, num_descriptions=300):
    """Generate text descriptions for products, processes, and research notes.

    Four kinds of free-text records are produced:

    * one "Process" description for every row of ``products_df``,
    * a "Research" note for a product with 60% probability,
    * 20 "Safety" protocols for randomly picked products,
    * 20 "QC" procedures for randomly picked products.

    Args:
        products_df: pandas DataFrame of products. The columns read here are
            ``product_id``, ``product_name``, ``category``,
            ``chemical_formula`` and ``storage_conditions``.
        num_descriptions: Accepted for backward compatibility but not used;
            the number of rows is determined by ``products_df`` and the fixed
            safety/QC counts.

    Returns:
        pandas DataFrame with the columns ``description_id``,
        ``description_type``, ``product_id``, ``title``, ``content`` and
        ``created_date`` (``YYYY-MM-DD`` string). When ``products_df`` has no
        rows, an empty DataFrame with those columns is returned.
    """
    columns = [
        "description_id",
        "description_type",
        "product_id",
        "title",
        "content",
        "created_date",
    ]

    # Without products there is nothing to describe, and the random product
    # picks below would fail on an empty range (random.randint(0, -1)).
    if products_df.empty:
        return pd.DataFrame(columns=columns)

    num_safety_protocols = 20
    num_qc_procedures = 20

    descriptions = []
    now = datetime.now()  # single reference point for all created_date windows

    def add_description(description_type, product_id, title, content, max_age_days):
        """Append one record; its created_date falls within the last max_age_days days."""
        descriptions.append({
            "description_id": f"PD{len(descriptions)+1:04d}",
            "description_type": description_type,
            "product_id": product_id,
            "title": title,
            "content": content,
            "created_date": random_date(now - timedelta(days=max_age_days), now).strftime("%Y-%m-%d")
        })

    # Process descriptions
    for _, product in products_df.iterrows():
        process_description = generate_process_description(product["product_name"], product["category"])
        add_description(
            "Process",
            product["product_id"],
            f"Manufacturing Process for {product['product_name']}",
            process_description,
            3 * 365,
        )

        # Add research notes (not for all products)
        if random.random() < 0.6:  # 60% chance of having research notes
            research_note = generate_research_note(product["product_name"], product["chemical_formula"])
            add_description(
                "Research",
                product["product_id"],
                f"Research Notes on {product['product_name']}",
                research_note,
                2 * 365,
            )

    # Safety protocols
    safety_protocol_templates = [
        "Safety Protocol: Handling of {product_name}\n\nThis protocol outlines the safe handling procedures for {product_name}, which {hazard_description}. Always wear appropriate PPE including {ppe_list}. Store in {storage_conditions}. In case of spill, {spill_procedure}.",
        "Material Safety Guidelines: {product_name}\n\nSafety precautions for working with {product_name}: This material {hazard_description}. Required PPE: {ppe_list}. Emergency procedures: {emergency_procedures}.",
        "Safe Handling Instructions: {product_name}\n\n{product_name} requires careful handling due to its {hazard_description}. Always work in a {ventilation_requirement} area and use {ppe_list}. Dispose of waste according to local regulations for chemical waste."
    ]

    hazard_descriptions = [
        "is highly flammable and may cause skin irritation",
        "is corrosive and may cause severe burns",
        "may release toxic fumes when heated",
        "is harmful if swallowed or inhaled",
        "is an oxidizing agent and may intensify fire",
        "presents low hazard under normal handling conditions",
        "may cause allergic skin reaction or eye irritation",
        "is harmful to aquatic life with long-lasting effects"
    ]

    ppe_lists = [
        "safety glasses, chemical-resistant gloves, and lab coat",
        "face shield, chemically resistant gloves, and protective clothing",
        "safety goggles, nitrile gloves, and standard lab attire",
        "full face respirator, butyl rubber gloves, and chemical splash suit",
        "safety glasses with side shields, latex gloves, and lab coat"
    ]

    spill_procedures = [
        "absorb with inert material and dispose as hazardous waste",
        "use appropriate absorbent and store in closed container for disposal",
        "neutralize with sodium bicarbonate and collect for proper disposal",
        "contain spill and collect in suitable container, avoid dust formation",
        "ventilate area and wash site of spillage thoroughly with water"
    ]

    ventilation_requirements = [
        "well-ventilated", "fume hood", "adequate ventilation", "local exhaust ventilation", "closed system"
    ]

    emergency_procedures = [
        "In case of eye contact, rinse cautiously with water for at least 15 minutes. Remove contact lenses if present and easy to do.",
        "If inhaled, remove person to fresh air and keep comfortable for breathing. Call poison center if you feel unwell.",
        "In case of skin contact, wash with plenty of soap and water. If skin irritation occurs, get medical advice.",
        "If swallowed, rinse mouth with water. Do NOT induce vomiting. Seek immediate medical attention.",
        "In case of fire, use dry chemical, CO2, water spray, or foam for extinction."
    ]

    # Add some safety protocols
    for _ in range(num_safety_protocols):
        product_row = products_df.iloc[random.randint(0, len(products_df) - 1)]

        template = random.choice(safety_protocol_templates)
        # Every placeholder is supplied even though each template uses only a
        # subset; str.format ignores unused keyword arguments.
        safety_content = template.format(
            product_name=product_row["product_name"],
            hazard_description=random.choice(hazard_descriptions),
            ppe_list=random.choice(ppe_lists),
            storage_conditions=product_row["storage_conditions"].lower(),
            spill_procedure=random.choice(spill_procedures),
            ventilation_requirement=random.choice(ventilation_requirements),
            emergency_procedures=random.choice(emergency_procedures)
        )

        add_description(
            "Safety",
            product_row["product_id"],
            f"Safety Protocol for {product_row['product_name']}",
            safety_content,
            2 * 365,
        )

    # Quality control procedures
    qc_procedure_templates = [
        "Quality Control Procedure: {product_name}\n\nThis document outlines the quality control testing for {product_name}. Required tests include {test_list}. Each batch must meet specifications before release. Sampling plan: {sampling_plan}. Acceptance criteria: {acceptance_criteria}.",
        "QC Protocol: Testing of {product_name}\n\nStandard quality control procedures for {product_name}. Test methods: {test_list}. Sampling frequency: {sampling_frequency}. Documentation requirements: {documentation}.",
        "Batch Release Testing: {product_name}\n\nThe following tests must be performed on each batch of {product_name}: {test_list}. Results must be within specification limits for batch release. Out-of-specification results require investigation according to SOP-QC-023."
    ]

    test_lists = [
        "purity by HPLC, water content by Karl Fischer, and appearance",
        "assay by titration, pH, viscosity, and density",
        "identity by IR, related substances by GC, residual solvents, and physical properties",
        "particle size distribution, bulk density, and chemical composition",
        "melting point, color, solubility, and chemical purity"
    ]

    sampling_plans = [
        "Sample 3 containers per batch according to statistical sampling plan",
        "Take samples from top, middle, and bottom of reactor vessel",
        "Collect composite sample from each batch according to SOP-QC-015",
        "Sample according to √n+1 rule, where n is the number of containers",
        "100% sampling for critical parameters, reduced sampling for non-critical attributes"
    ]

    acceptance_criteria = [
        "All results must meet specification. Any deviations require QA approval.",
        "Results within specification limits. Borderline results require additional testing.",
        "Conformance to all specification parameters as listed in product specification document.",
        "All critical parameters must meet specifications. Minor parameters allow for ±5% tolerance.",
        "Full compliance with registered specifications. No OOS results permitted."
    ]

    sampling_frequencies = [
        "Every batch", "First and last batch of campaign", "According to skip-lot testing protocol",
        "25% of batches, but minimum one batch per month", "Based on control chart performance"
    ]

    documentation_requirements = [
        "Results to be recorded in LIMS and batch record",
        "Electronic and paper documentation with dual signature verification",
        "Full audit trail with electronic signatures in compliance system",
        "Raw data, calculations, and final results to be reviewed by QC supervisor",
        "Test worksheets must be completed for each analysis with all raw data attached"
    ]

    # Add some QC procedures
    for _ in range(num_qc_procedures):
        product_row = products_df.iloc[random.randint(0, len(products_df) - 1)]

        template = random.choice(qc_procedure_templates)
        qc_content = template.format(
            product_name=product_row["product_name"],
            test_list=random.choice(test_lists),
            sampling_plan=random.choice(sampling_plans),
            acceptance_criteria=random.choice(acceptance_criteria),
            sampling_frequency=random.choice(sampling_frequencies),
            documentation=random.choice(documentation_requirements)
        )

        add_description(
            "QC",
            product_row["product_id"],
            f"Quality Control Procedure for {product_row['product_name']}",
            qc_content,
            365,
        )

    return pd.DataFrame(descriptions, columns=columns)

# Main function to generate and save all datasets
def generate_all_data():
    """Generate all synthetic tables and save them as Delta tables.

    Each table is built as a pandas DataFrame, converted to a Spark DataFrame
    and written in ``overwrite`` mode to ``{catalog}.{schema}.<table_name>``.
    Relies on the notebook globals ``spark``, ``catalog`` and ``schema``.

    Returns:
        dict mapping table name (``products``, ``batches``,
        ``quality_control``, ``inventory``, ``reactions``, ``descriptions``)
        to the generated pandas DataFrame.
    """
    # 1. Generate Products table
    products_df = create_products_table(num_products=50)

    # 2. Generate Batches table
    batches_df = create_batches_table(products_df, num_batches=200)

    # 3. Generate Quality Control table
    qc_df = create_qc_table(batches_df, num_tests=800)

    # 4. Generate Inventory table
    inventory_df = create_inventory_table(products_df, num_inventory=100)

    # 5. Generate Reactions table
    reactions_df = create_reactions_table(products_df, num_reactions=80)

    # 6. Generate Descriptions table
    descriptions_df = create_descriptions_table(products_df, num_descriptions=300)

    # Keys double as the Delta table names.
    tables = {
        "products": products_df,
        "batches": batches_df,
        "quality_control": qc_df,
        "inventory": inventory_df,
        "reactions": reactions_df,
        "descriptions": descriptions_df
    }

    # Convert every pandas DataFrame to a Spark DataFrame before writing
    # anything, so a failed conversion does not leave a partial set of tables.
    spark_tables = {
        table_name: spark.createDataFrame(pandas_df)
        for table_name, pandas_df in tables.items()
    }

    # Save all dataframes to Delta tables. The name is fully qualified with the
    # catalog so the write does not depend on the session's current catalog and
    # matches the {catalog}.{schema}.<table> names used when altering the tables.
    for table_name, spark_df in spark_tables.items():
        spark_df.write.format("delta").mode("overwrite").saveAsTable(f"{catalog}.{schema}.{table_name}")

    return tables

# Build every table, persist it, and keep the pandas copies for inspection
data_dict = generate_all_data()

# Preview the first three rows of each generated table
for table_name, df in data_dict.items():
    banner = f"\n=== Sample of {table_name} table ==="
    print(banner)
    preview = df.head(3)
    display(preview)

print("All data generated successfully!")


## Enable change data feed
For tables that we want to use with a vector index, we need to enable the change data feed.

In [0]:
# Build the ALTER TABLE statements first, then run them in order.
enable_cdf_statements = [
    f"ALTER TABLE {catalog}.{schema}.products SET TBLPROPERTIES (delta.enableChangeDataFeed = true)",
    f"ALTER TABLE {catalog}.{schema}.descriptions SET TBLPROPERTIES (delta.enableChangeDataFeed = true)",
]
for enable_cdf_statement in enable_cdf_statements:
    spark.sql(enable_cdf_statement)