In [None]:
!pip install 'datafog==3.0.1'

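# Example: Annotating PII in text with Spark and spaCy

This notebook loads the text of the 2023 State of the Union address into a Spark DataFrame (one row per line). It then tags each line with entities found by the `en_spacy_pii_fast` spaCy model, grouped under the labels `DATE_TIME`, `LOC`, `NRP`, `ORG` and `PER`.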


## Setup

In [None]:
from pyspark.sql import SparkSession
# Create (or reuse) the Spark session for this demo. Driver and executor memory
# are raised to 8g — presumably to leave room for the broadcast spaCy model.
spark = (
    SparkSession.builder
    .appName("DataFog")
    .config("spark.driver.memory", "8g")
    .config("spark.executor.memory", "8g")
    .getOrCreate()
)

## Spark helpers: broadcast the spaCy model and apply it as a UDF

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, StringType
from pyspark.sql.types import StructType, StructField, StringType
import spacy
import requests

# Entity labels kept in the output, in the order the result lists are returned.
PII_ANNOTATION_LABELS = ["DATE_TIME", "LOC", "NRP", "ORG", "PER"]
# Texts longer than this are truncated before annotation. Presumably chosen to
# match spaCy's default `nlp.max_length` (10**6), above which spaCy refuses input.
MAXIMAL_STRING_SIZE = 1000000

def pii_annotator(text: str, broadcasted_nlp) -> list[list[str]]:
    """Extract PII entities from ``text`` with a broadcast spaCy pipeline.

    Args:
        text: Input string. ``None`` or an empty string yields empty results.
        broadcasted_nlp: Spark broadcast variable whose ``.value`` is a loaded
            spaCy pipeline (e.g. ``en_spacy_pii_fast``).

    Returns:
        list[list[str]]: One list of entity texts per label, in the order
        defined by ``PII_ANNOTATION_LABELS``. Entities whose label is not in
        ``PII_ANNOTATION_LABELS`` are ignored.
    """
    if not text:
        return [[] for _ in PII_ANNOTATION_LABELS]

    # Slicing is a no-op for short strings and truncates oversized ones.
    text = text[:MAXIMAL_STRING_SIZE]
    nlp = broadcasted_nlp.value
    doc = nlp(text)

    # Pre-create one bucket per expected label.
    classified_entities: dict[str, list[str]] = {
        label: [] for label in PII_ANNOTATION_LABELS
    }
    for ent in doc.ents:
        # Skip labels outside the output schema instead of raising KeyError,
        # which would fail the whole Spark task.
        bucket = classified_entities.get(ent.label_)
        if bucket is not None:
            bucket.append(ent.text)

    # Build the result from the label list so the column order is explicit.
    return [classified_entities[label] for label in PII_ANNOTATION_LABELS]

def broadcast_pii_annotator_udf(spark_session: SparkSession, spacy_model: str = "en_spacy_pii_fast"):
    """Load a spaCy pipeline once on the driver, broadcast it, and wrap ``pii_annotator`` as a UDF.

    Args:
        spark_session: Active session whose SparkContext performs the broadcast.
        spacy_model: Pipeline name passed to ``spacy.load``.

    Returns:
        A PySpark UDF mapping a string column to ``array<array<string>>``.
    """
    nlp_model = spacy.load(spacy_model)
    shared_nlp = spark_session.sparkContext.broadcast(nlp_model)

    # Executors read the model through the broadcast handle captured here.
    return udf(
        lambda text: pii_annotator(text, shared_nlp),
        ArrayType(ArrayType(StringType())),
    )

In [None]:
# Text of the 2023 State of the Union address (gist URL pinned to a specific revision).
sotu_url = 'https://gist.githubusercontent.com/sidmohan0/1aa3ec38b4e6594d3c34b113f2e0962d/raw/42e57146197be0f85a5901cd1dcdd9ad15b31bab/sotu_2023.txt'

# Fetch the content of the text file. The timeout prevents the cell from hanging
# forever, and raise_for_status() stops an HTTP error page from silently
# becoming the dataset.
response = requests.get(sotu_url, timeout=30)
response.raise_for_status()
sotu_text = response.text

# One row per non-blank line; splitlines() also handles '\r\n' line endings.
df = spark.createDataFrame(
    [(line,) for line in sotu_text.splitlines() if line.strip()],
    ["text"],
)
df.show()


+--------------------+
|                text|
+--------------------+
|Mr. Speaker, Mada...|
|And, by the way, ...|
|Members of the Ca...|
|You know, I start...|
|Speaker, I don’t ...|
|And I want to con...|
|He won despite th...|
|Congratulations t...|
|And congratulatio...|
|Well, I tell you ...|
|Folks, the story ...|
|We’re the only co...|
|Look, folks, that...|
|Two years ago, th...|
|Two years ago — a...|
|And two years ago...|
|As we gather here...|
|When world leader...|
|You know, we’re o...|
|Yes, we disagreed...|
+--------------------+
only showing top 20 rows



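## Feature Extraction

Apply the PII UDF to every line. The new `en_spacy_pii_fast` column holds one list of matched entity strings per label, in the order of `PII_ANNOTATION_LABELS`.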

In [None]:
# The spaCy model name doubles as the output column name; define it once so the two stay in sync.
PII_MODEL = "en_spacy_pii_fast"
extract_features_udf = broadcast_pii_annotator_udf(spark, spacy_model=PII_MODEL)

# Each row gets an array of arrays: one entity list per label in PII_ANNOTATION_LABELS.
df = df.withColumn(PII_MODEL, extract_features_udf(df.text))

# Vertical layout keeps long speech lines and nested entity arrays readable
# (a padded table of 20 full paragraphs is not); a few rows suffice as a preview.
df.show(n=5, truncate=False, vertical=True)

+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------------------------------------------------------------------+
|text                                                                                                                                                                                                                                                                                 |en_spacy_pii_fast                                                        |
+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------

#