In [1]:
# azure-data-lake-sentinel/src/data_processing_scripts/synapse_notebooks/text_extraction_chunking.ipynb

# This is a conceptual Synapse Spark Notebook.
# It would be triggered by the 'classification-orchestrator' Azure Function
# for larger/complex files that require distributed processing for text extraction.

# --- Parameters ---
# These parameters would be passed when triggering the Synapse Notebook via API/SDK
dbutils.widgets.text("filePath", "", "File Path to Process (ADLS Gen2)")
dbutils.widgets.text("outputFolder", "/processed_chunks/", "Output Folder for Chunks")
file_path = dbutils.widgets.get("filePath")
output_folder = dbutils.widgets.get("outputFolder")

print(f"Processing file: {file_path}")

# --- 1. Read Data from ADLS Gen2 ---
# Spark can read directly from ADLS Gen2
# Example: reading a CSV (adjust for PDF, Docx, etc.)
# For binary files like PDF/Docx, you'd integrate with libraries like Apache Tika (if Spark supports it)
# or external OCR services before this step.
try:
    if file_path.endswith(".csv"):
        df_raw = spark.read.csv(file_path, header=True, inferSchema=True)
        # Combine relevant columns into a single text string for classification
        df_text = df_raw.withColumn("full_text", F.concat_ws(" ", *df_raw.columns)).select("full_text")
    elif file_path.endswith(".txt"):
        df_text = spark.read.text(file_path).withColumnRenamed("value", "full_text")
    elif file_path.endswith(".json"):
        df_raw = spark.read.json(file_path)
        # Flatten JSON and convert to text
        df_text = df_raw.select(F.to_json(F.struct(F.col("*"))).alias("full_text"))
    # Add more file types (PDF, Docx - require external libraries/services for content extraction)
    else:
        print(f"Unsupported file type for direct text extraction: {file_path}")
        # Here, you might log an error and/or trigger an OCR service if it's an image.
        # For this example, we'll just use the file path as text if content cannot be read.
        df_text = spark.createDataFrame([(f"File content could not be extracted directly from {file_path}.")]).toDF("full_text")

    print(f"Extracted {df_text.count()} rows of text.")
    df_text.show(truncate=False)

except Exception as e:
    print(f"Error reading file {file_path}: {e}")
    # Handle error: log, potentially send to a dead-letter queue.
    df_text = spark.createDataFrame([(f"Error processing {file_path}: {e}")]).toDF("full_text")


# --- 2. Text Chunking (for large documents) ---
# If a document is too large for a single API call to Azure OpenAI/Cognitive Services, chunk it.
# This is a simplified example; real chunking requires more advanced NLP techniques (sentence tokenization etc.)
from pyspark.sql import functions as F
from pyspark.sql.types import StringType

MAX_CHUNK_SIZE = 4000 # Max characters for OpenAI/Cognitive Services input

@F.udf(returnType=F.ArrayType(StringType()))
def chunk_text_udf(text):
    if not text:
        return []
    chunks = []
    for i in range(0, len(text), MAX_CHUNK_SIZE):
        chunks.append(text[i:i + MAX_CHUNK_SIZE])
    return chunks

df_chunks = df_text.withColumn("chunks", chunk_text_udf(F.col("full_text"))) \
                   .withColumn("chunk", F.explode("chunks")) \
                   .withColumn("original_file_path", F.lit(file_path)) \
                   .select("original_file_path", "chunk")

print(f"Generated {df_chunks.count()} chunks.")
df_chunks.show(truncate=False)

# --- 3. Save Chunks to ADLS Gen2 ---
# These chunks can then be processed in parallel by Azure Functions calling AI services.
output_path = f"abfss://{output_folder.strip('/')}@${data.azurerm_storage_account.adls_gen2_account.name}.dfs.core.windows.net/processed_chunks/"
df_chunks.write.mode("overwrite").parquet(output_path) # Save as Parquet
print(f"Processed chunks saved to: {output_path}")

# Note: The Synapse Notebook would then signal completion (e.g., by writing a success file,
# or the orchestrating Azure Function would poll for its status).


NameError: name 'dbutils' is not defined