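# Init

This notebook builds a business-relation graph for one CRM opportunity. It loads the opportunity's events from the staging database and keeps the business-relevant ones. It then extracts relations with an LLM and clusters them into business relation/entity types. Finally, it uploads the resulting graph to Neo4j.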

In [1]:
# Notebook specific
from shared.utils.import_utils import import_or_reload

# Load environment variables (staging configuration used throughout the notebook)
from dotenv import load_dotenv
load_dotenv('./.env_staging')

# Configure database connection
import os
from sqlalchemy.engine import URL

# No password is given here; presumably authentication is supplied out of band
# (e.g. ~/.pgpass or PGPASSWORD) — TODO confirm.
db_url = URL.create(
    drivername="postgresql",
    username=os.getenv("DB_USERNAME"),
    password=None,
    host=os.getenv("DB_HOST"),
    port=5432,
    database=os.getenv("DB_DATABASE"),
)

# shared.db.session appears to read SQL_DATABASE_URL when imported (it prints the
# URL on import), so the variable must be set before the import below.
# The URL must be rendered with the real password: hide_password=True would
# substitute "***" for any password and the engine could not authenticate.
# (Output is identical today because password is None.)
os.environ["SQL_DATABASE_URL"] = db_url.render_as_string(hide_password=False)
from shared.db.session import engine


session.py: successful connexion to PostgreSQL !
session.py: database URL: postgresql://cedric@easygy-aurora-cluster.cluster-cywqf0ulc6ar.eu-west-3.rds.amazonaws.com:5432/easy_crm_uat


# Initial Relation Extraction

In [2]:
# Imports
import json
from typing import cast

import_or_reload("graph.utils.event_retriever")
from graph.utils.event_retriever import get_opportunity_events

import_or_reload("shared.models.event_base")
from shared.models.event_base import EventBaseModel

import_or_reload("shared.semantics.model_configs.semantic_model")
import_or_reload("shared.semantics.semantic_function")
from shared.semantics.semantic_function import run_semantic_pipeline

import_or_reload("graph.utils.event_filter")
from graph.utils.event_filter import create_business_relevance_config

import_or_reload("shared.models.graph_base")
from shared.models.graph_base import EventRelations, Relation, Entity

# Get the records from the table event for an opportunity
# with the concatenated column fomatted (string)
print("\n*****************\nRetrieving events for the opportunity...")
events_df = get_opportunity_events(
    engine,
    opportunity_id="006IV00000f2fykYAA", 
    limit=20)
print(f"Number of events for the opportunity: {len(events_df)}")

# Convert to a dictionary of EventBaseModel with event_id as key
event_dict: dict[str, EventBaseModel] = {}
for record in events_df.to_dict(orient="records"):
    record['formatted'] = json.loads(record['formatted'])
    event_dict[record["event_id"]] = EventBaseModel(**record) # type: ignore

# Filter the relevant events
print("\n*****************\nFiltering relevant events...")
semantic_payload_config = {
    "item_table": "event",
    "table_schema":  "crm_assistant",
    "id_column": "event_id",
    "type_column": "field",
}

results, _ = await run_semantic_pipeline(
    item_ids = list(event_dict.keys()),
    semantic_function_str = "filter",
    semantic_function_config = create_business_relevance_config,
    semantic_payload_config = semantic_payload_config,
    llm_model = "gpt-4o-mini",
    batch_size = 500,
    overwrite = False,
    client = None
)

is_relevant = [item.result_value_.get("is_relevant", False)
               if item.result_value_ is not None
               else False
               for item in results.get("processed", [])]

event_dict = {
    event_id: event_dict[event_id]
    for event_id, is_relevant in zip(event_dict.keys(), is_relevant)
    if is_relevant
}
print(f"Number of relevant events after filtering: {len(event_dict)}")

# Extract relations from the relevant events
print("\n*****************\nExtract relations...")

results, _ = await run_semantic_pipeline(
    item_ids = list(event_dict.keys()),
    semantic_function_str = "extract_relations",
    semantic_function_config = {},
    semantic_payload_config = semantic_payload_config,
    llm_model = "gpt-4o-mini",
    batch_size = 100,
    overwrite = True,
    client = None
)




*****************
Retrieving events for the opportunity...


Reading from S3: 100%|██████████| 20/20 [00:00<00:00, 35.26it/s]


Number of events for the opportunity: 20

*****************
Filtering relevant events...
Processing batch 1/1 events

[process_pipeline] - Processed items: 20

[process_pipeline] - Pass-through items: 0
[process_pipeline] - Missing items: 0

[process_pipeline] - No S3 upload needed.

[process_pipeline] - No rows to upload to DB.
[from_semantic_table_to_response_model] - Reading 20 cached items from the semantic result table.


Reading from S3: 100%|██████████| 20/20 [00:00<00:00, 99.48it/s]


[from_semantic_table_to_response_model] - Reading 20 S3 payloads.


Reading from S3: 100%|██████████| 20/20 [00:00<00:00, 92.00it/s]


Number of relevant events after filtering: 12

*****************
Extract relations...
Processing batch 1/1 events

[process_pipeline] - Overwriting results - - Considering processed items: 0.

[process_pipeline] - Pass-through items: 0
[process_pipeline] - Missing items: 12


Reading from S3: 100%|██████████| 12/12 [00:00<00:00, 74.86it/s]
Processing extract_relations events:  50%|█████     | 6/12 [00:36<00:34,  5.70s/it]
[extract_relations] Exception for event '{"type": "transcription", "from": "Thibault Boyer"...':<
1 validation error for ExtractRelationsModel
relations.1.roles.themes.0
  Value error, Le label 'Dossier' est trop générique. Utilisez un label métier plus spécifique. N'utilise pas les termes suivants: agent, theme, trigger, purpose, reason, instrument, beneficiary, context, origin, destination, co_agent, location, time, personne, organisation, lieu, objet, concept, événement, document, information, donnée, dossier. [type=value_error, input_value={'label': 'Dossier', 'nam...ande.', 'role': 'theme'}, input_type=dict]
    For further information visit https://errors.pydantic.dev/2.10/v/value_error.
Returning default response.
Processing extract_relations events: 100%|██████████| 12/12 [00:54<00:00,  4.53s/it]



[process_pipeline] - Processed missing items: 12

[process_pipeline] - Uploading to S3...


Uploading to S3: 100%|██████████| 11/11 [00:02<00:00,  3.97it/s]



[process_pipeline] - Uploading to DB...


Uploading data: 100%|██████████| 12/12 [00:01<00:00,  9.71it/s]

total_processed 12
inserted 12
rejected 0





In [3]:
from shared.models.graph_base import EventRelations, Relation, Entity

# NOTE: reads `results` produced by the extract_relations call in the previous
# cell; a later cell rebinds `results` to a list, so re-run that cell first.
print("\n*****************\nCollect relations and fill metadata...")
# Event-level fields copied onto every relation/entity so each artefact can be
# traced back to the pipeline result it came from.
metadata_keys = ['id_', 'status_', 'type_', 'date_', 's3_path_', 'result_value_']

relations: list[Relation] = []
entities: list[Entity] = []

for event_relations in results['processed']:
    # Fresh metadata per event (no state carried over from the previous event).
    metadata = {key: getattr(event_relations, key) for key in metadata_keys}
    for relation in event_relations.relations:
        relation.metadata = metadata.copy()
        relations.append(relation)
        # relation.roles iterates as (role_name, entities) pairs; an entity list
        # may be empty (and is guarded against None, as in the display loop).
        for _role_name, role_entities in relation.roles:
            for entity in role_entities or []:
                entity.metadata = metadata.copy()
                entities.append(entity)
print(f"Number of relations extracted: {len(relations)}")

for relation in relations:
    print("\n*****************")
    print(f"\nEvent n°{relation.metadata['id_']}")
    print(f"\nContexte: {relation.description}")
    print(f"-> {relation.relation_type.verb}")
    print(f"-> {relation.relation_type.target_category}")
    print(f"-> {relation.relation_type.label}")
    print(f"-> {relation.relation_type.definition}")
    print("\n")
    for role_name, role_entities in relation.roles:
        print(f"{role_name:<21} |")
        for entity in role_entities or []:
            print(f"  --- {entity.role:<15} | {entity.label:<25} | {entity.name:<20}")


*****************
Collect relations and fill metadata...
Number of relations extracted: 34

*****************

Event n°148262172

Contexte: Thibault Boyer informe Raphaël Cohen qu'il a pu faire une offre pour l'hôtel Astres, mais que celle-ci a été invalidée en interne car le client était déjà chez un autre fournisseur.
-> INFORMER
-> offre
-> INFORMER_OFFRE
-> Action de communiquer des informations concernant une proposition commerciale.


agents                |
  --- agent           | Commercial                | Thibault Boyer      
  --- agent           | Commercial                | Raphaël Cohen       
themes                |
  --- theme           | Offre                     | Offre Pour L'Hôtel Astres
circumstances         |
  --- reason          | Raison                    | Client Déjà Chez Un Autre Fournisseur
context               |
  --- context         | Communication             | Appel Téléphonique  
origin_destinations   |
time_locations        |

*****************

Eve

# Cluster and Generate Models

In [4]:
import openai
import instructor
from instructor import AsyncInstructor, Mode

# Reload project modules so edits are picked up without restarting the kernel.
import_or_reload("shared.models.graph_base")
from shared.models.graph_base import RelationTypeClusters

import_or_reload("graph.utils.clustering")
from graph.utils.clustering import get_artefacts_clusters, sub_cluster, synth_clusters

import_or_reload("graph.graph_converter")
from graph.graph_converter import generate_py

#Get clusters from the relations
# (`relations` is the list collected in the previous cell.)
print("\n*****************\nGet relation clusters...")
embds_clusters  = get_artefacts_clusters(relations)

# Sub-cluster the clusters by relation types
print("\n*****************\nSub-cluster relations by types...")
llm_model = 'gpt-4o-mini'
# `client` and `llm_model` are notebook globals reused by later cells.
# instructor.patch returns a patched OpenAI client rather than an AsyncInstructor,
# hence the type: ignore. TODO(review): instructor.from_openai returns a real
# AsyncInstructor; confirm sub_cluster/synth_clusters accept it before switching.
client: AsyncInstructor = instructor.patch(
                openai.AsyncOpenAI(api_key=os.getenv("OPENAI_API_KEY")),
                mode=Mode.TOOLS_STRICT,
            ) # type: ignore

relation_type_clusters_list: list[RelationTypeClusters] = await sub_cluster(
    clusters=embds_clusters,
    client=client,
    llm_model=llm_model
) # Each element of the list is a cluster of relation types

# Synthesize the clusters
# TODO: make synth_clusters() faster
# Synthesis deliberately uses a different, larger model than sub-clustering.
print("\n*****************\nSynthetize clusters...")
relation_type_clusters = await synth_clusters(
    relation_type_clusters=relation_type_clusters_list,
    client=client,
    llm_model='o3',
    max_examples_for_synthesis=200
)
print("Done. Number of clusters synthesized: ", len(relation_type_clusters.clusters))

# Generate business relation type models from the clusters and save to file
# The generated module is imported later as shared.models.biz_relation_types
# (UNION_CLUSTERS, DICT_CLUSTERS), so this cell must run before those cells.
print("\n*****************\nGenerate business relation type models...")
file_str = generate_py(
    relation_type_clusters,
    save_to_file=True,
    file_path="shared/models/biz_relation_types.py"
)


*****************
Get relation clusters...
Computing embeddings using model text-embedding-3-small
Calling OpenAI API for 34 embeddings
Processing batch #1 with 34 items...
Attaching embeddings to items
Number of relations: 34
Number of items to embed: 34
Number of computed embeddings: 34
Cumulative explained variance:  0.7743118112483961
Number of PCA components:  12
UMAP embedding shape:  (34, 12)
Number of clusters found: 2
Number of data points: 34
Number of noise points: 2

*****************
Sub-cluster relations by types...
Preparing task for cluster 0 (1/2), which has 7 examples.
Preparing task for cluster 1 (2/2), which has 25 examples.

Executing 2 sub-clustering tasks in parallel...

*****************
Synthetize clusters...
Done. Number of clusters synthesized:  5

*****************
Generate business relation type models...


In [None]:
import openai
import instructor
from instructor import AsyncInstructor, Mode

import_or_reload("shared.models.graph_base")
from shared.models.graph_base import RelationTypeClusters, EntityTypeClusters

import_or_reload("graph.utils.clustering")
from graph.utils.clustering import get_artefacts_clusters, sub_cluster, synth_clusters

import_or_reload("graph.graph_converter")
from graph.graph_converter import generate_py
type = 'entity'

#Get clusters from the artefacts
print("\n*****************\nGet artefact clusters...")
embds_clusters  = get_artefacts_clusters(entities)


# Sub-cluster the clusters by artefact types
print("\n*****************\nSub-cluster artefacts by types...")
llm_model = 'gpt-4o-mini'
client: AsyncInstructor = instructor.patch(
                openai.AsyncOpenAI(api_key=os.getenv("OPENAI_API_KEY")),
                mode=Mode.TOOLS_STRICT,
            ) # type: ignore

artefact_type_clusters_list: list[EntityTypeClusters] = await sub_cluster(
    clusters=embds_clusters,
    client=client,
    llm_model=llm_model,
    type=type
) # Each element of the list is a cluster of entity types


*****************
Get artefact clusters...
Computing embeddings using model text-embedding-3-small
Calling OpenAI API for 166 embeddings
Processing batch #1 with 166 items...
Attaching embeddings to items
Number of relations: 166
Number of items to embed: 166
Number of computed embeddings: 166
Cumulative explained variance:  0.7595825792795968
Number of PCA components:  21
UMAP embedding shape:  (166, 15)
Number of clusters found: 24
Number of data points: 166
Number of noise points: 15

*****************
Sub-cluster artefacts by types...
Preparing task for cluster 0 (1/24), which has 6 examples.
Preparing task for cluster 1 (2/24), which has 8 examples.
Preparing task for cluster 2 (3/24), which has 5 examples.
Preparing task for cluster 3 (4/24), which has 4 examples.
Preparing task for cluster 4 (5/24), which has 5 examples.
Preparing task for cluster 5 (6/24), which has 8 examples.
Preparing task for cluster 6 (7/24), which has 5 examples.
Preparing task for cluster 7 (8/24), whic

In [11]:
# List the names of the entity sub-clusters produced for each embedding cluster.
for cl in artefact_type_clusters_list:
    for cluster in cl.clusters:
        print(f"Cluster label: {cluster.name}")

Cluster label: COLLABORATEUR
Cluster label: COLLABORATEUR_ADMINISTRATIF
Cluster label: COLLABORATEUR_TECHNIQUE
Cluster label: COLLABORATEUR_COMMERCIAL
Cluster label: INTERLOCUTEUR
Cluster label: COMMUNICATION_VERBALE_PROFESSIONNELLE
Cluster label: APPELS_TELEPHONIQUES_ADMINISTRATIFS
Cluster label: APPEL
Cluster label: COMMERCIAUX
Cluster label: NEGOCIATEURS
Cluster label: COMMERCIAL
Cluster label: COMMERCIAUX
Cluster label: RESPONSABLE_COMMERCIAL
Cluster label: RESPONSABLE_DE_LA_FACTURATION
Cluster label: COMMERCIAUX
Cluster label: RESPONSABLES_DE_LA_SIGNATURE
Cluster label: GESTION_DES_SIGNATURES_ELECTRONIQUES
Cluster label: ASSISTANCE_CLIENT_ET_ACTIONS_DE_SIGNATURE
Cluster label: DOCUMENTS_ET_ACCORDS_FORMELS
Cluster label: INTERACTIONS_ET_NEGOCIATIONS_COMMERCIALES
Cluster label: CONTRATS_FORMELS
Cluster label: CONTRAT_UNIQUE
Cluster label: CLIENTS
Cluster label: FOURNISSEURS
Cluster label: OPERATIONS
Cluster label: ROLES_COMMERCIAUX
Cluster label: ROLES_DE_GESTION_ET_COMMUNICATION
Cl

In [9]:
# Inspect one EntityTypeClusters result explicitly rather than relying on the
# loop variable `cl` leaked from the previous cell (same object: the last element).
artefact_type_clusters_list[-1]

EntityTypeClusters(thinking="Le cluster fourni contient plusieurs occurrences de la même définition pour le terme 'Collaborateur'. Cela indique qu'il n'y a pas de diversité dans les entités présentées, toutes se référant à un même concept. Cependant, pour une analyse plus approfondie, il est possible d'explorer des sous-clusters basés sur des caractéristiques ou des rôles spécifiques que les collaborateurs peuvent avoir au sein d'une entreprise. Par exemple, on pourrait envisager des sous-clusters tels que 'Collaborateur Administratif', 'Collaborateur Technique', 'Collaborateur Commercial', etc. Chacun de ces sous-clusters pourrait avoir des définitions spécifiques qui les distinguent les uns des autres.", clusters=[EntityTypeCluster(name='COLLABORATEUR', definition="Un individu engagé dans des interactions professionnelles au sein d'une entreprise, sans distinction de rôle ou de fonction.", entity_types=[EntityType(label='Collaborateur', definition="Un individu engagé dans des interac

# Relation Extraction

In [None]:
import asyncio
from typing import Coroutine, Any, Union

import_or_reload("shared.models.biz_relation_types")
from shared.models.biz_relation_types import UNION_CLUSTERS, DICT_CLUSTERS

# Create a list of all tasks (coroutines) to be executed.
tasks: list[Coroutine[Any, Any, list[UNION_CLUSTERS]]] = []
for v in event_dict.values():
    prompt = f"""
    Classify the following event into the appropriate business clusters. To be classified, the event must provide relevant information about that cluster as it will be used downstream to extract that information:
    ##########################
    {json.dumps(v.formatted)}
    ##########################
    """

    llm_model = 'gpt-4o-mini'
    task = client.chat.completions.create(
        model=llm_model,
        messages=[
            {
                "role": "system",
                "content": prompt,
            },
        ],
        temperature=0.0 if llm_model != 'o3' else 1.0, # type: ignore
        max_retries=2,
        response_model=list[UNION_CLUSTERS], # type: ignore
        timeout=30,
    )
    tasks.append(task)

# Execute all tasks concurrently and wait for all of them to complete.
# The 'await' is here, on the entire set of tasks.
all_clusters_results = await asyncio.gather(*tasks)

# Associate the results with their corresponding events.
for event, clusters_for_event in zip(event_dict.values(), all_clusters_results):
    event.metadata = {}
    if 'clusters' not in event.metadata:
        event.metadata['clusters'] = {}
    for cluster in clusters_for_event:
        event.metadata['clusters'][cluster.name] = {
            "cluster": cluster,
        }

# Create a cluster to events mapping.
from collections import defaultdict
cluster_to_events: dict[str, list[str]] = defaultdict(list)

#---Iterate through each event in the event_dict.
for event_id, event in event_dict.items():
    if event.metadata and 'clusters' in event.metadata:
        clusters_in_event = event.metadata['clusters']
        for cluster_name in clusters_in_event.keys():
            cluster_to_events[cluster_name].append(event_id)

#--- Convert the defaultdict back to a regular dictionary for cleaner output (optional).
cluster_to_events = dict(cluster_to_events)

#--- Display the resulting dictionary.
n_events = sum([1 for cluster_list in cluster_to_events.values() for _ in cluster_list])
print(f"Found {len(cluster_to_events)} unique clusters referencing {n_events} events in total")


In [None]:
import_or_reload("shared.models.graph_base")
import_or_reload("shared.semantics.model_configs.extract_biz_relations_model")
import_or_reload("shared.semantics.semantic_function")
from shared.semantics.semantic_function import SemanticFunction

tasks: list[Coroutine[Any, Any, tuple[dict[str, Any], SemanticFunction]]] = []

#shortened = {k: cluster_to_events[k] for k in list(cluster_to_events.keys())[0:1]}
shortened = cluster_to_events
for cluster_name, event_ids in shortened.items():

    print("Processing cluster: ", cluster_name)
    BizRelationType = Union[tuple(DICT_CLUSTERS[cluster_name])]
    print("\nThis cluster has the following biz relation types: ", BizRelationType)

    semantic_payload_config: dict[str, str|bool] = {
        "item_table": "event",
        "table_schema":  "crm_assistant",
        "id_column": "event_id",
        "type_column": "field",
        "include_source": False,
    }
    
    task = run_semantic_pipeline(
    item_ids = event_ids,
    semantic_function_str = "extract_biz_relations",
    semantic_function_config = {"BizRelationType": BizRelationType},
    semantic_payload_config = semantic_payload_config,
    llm_model = "gpt-4o-mini",
    batch_size = 100,
    overwrite = True,
    client = None
    )
    
    tasks.append(task)

print(f"\nExecuting {len(tasks)} tasks concurrently...")
results = await asyncio.gather(*tasks)
print("All tasks completed.")


In [None]:
from datetime import datetime as dt

# An event that belongs to several clusters appears in several results. Its
# metadata/processed dicts are reset only the first time it is seen in this
# run, so relations from every cluster are accumulated instead of each cluster
# overwriting the previous one.
initialised_event_ids: set = set()

for result in results:
    # Each result is the (payload, SemanticFunction) tuple from run_semantic_pipeline.
    payload = result[0]
    # strict=True: a length mismatch would otherwise pair ids with the wrong models.
    for event_id, relation_model in zip(
        payload['processed_ids'], payload['processed'], strict=True
    ):
        event = event_dict[event_id]

        # Cross-check the pipeline output against the source event before mutating it.
        if relation_model.id_ != event.event_id:
            raise ValueError("Ids in event and relation model should be equal.")
        if relation_model.type_ != event.field:
            raise ValueError("Types in event and relation model should be equal.")
        date = dt.strptime(relation_model.date_, "%Y-%m-%dT%H:%M:%S%z")
        if date != event.event_date:
            raise ValueError("Dates in event and relation model should be equal.")

        if event_id not in initialised_event_ids:
            initialised_event_ids.add(event_id)
            event.metadata = {}
            event.processed = {'_xrel_relations': []}

        event.metadata['_event_id'] = relation_model.id_
        event.metadata['_event_type'] = relation_model.type_
        event.metadata['_event_date'] = date
        event.metadata['_event_s3_path'] = event.s3_path
        # Per-result fields below keep the last cluster's value.
        event.metadata['_xrel_s3_path'] = relation_model.s3_path_
        event.metadata['_xrel_status'] = relation_model.status_

        event.processed['_xrel_thinking'] = relation_model.thinking
        event.processed['_xrel_relations'].extend(relation_model.relations or [])
        event.processed['_xrel_result_value'] = relation_model.result_value_


In [None]:
# Inspect the biz relations attached to one sample event.
SAMPLE_EVENT_INDEX = 5
keys = list(event_dict.keys())
# Clamp so the cell still works when fewer events survived relevance filtering.
event = event_dict[keys[min(SAMPLE_EVENT_INDEX, len(keys) - 1)]]
event.processed

# Build the graph

In [None]:
# Reload the converter so edits to graph/graph_converter.py are picked up live.
import_or_reload("graph.graph_converter")
from graph.graph_converter import generate_graph_elements

# Convert the annotated events (metadata/processed filled by the cells above)
# into the node and edge collections passed to upload_graph_to_neo4j below.
nodes, edges = generate_graph_elements(event_dict)

In [None]:
from neo4j import GraphDatabase

# URI examples: "neo4j://localhost", "neo4j+s://xxx.databases.neo4j.io"
NEO4J_URI = os.getenv("NEO4J_URI", "")
NEO4J_USERNAME = os.getenv("NEO4J_USERNAME", "")
NEO4J_PASSWORD = os.getenv("NEO4J_PASSWORD", "")

# Fail fast with an explicit message instead of an obscure driver error on an empty URI.
if not NEO4J_URI:
    raise RuntimeError("NEO4J_URI is not set; add it to the environment file loaded in the Init cell.")

AUTH = (NEO4J_USERNAME, NEO4J_PASSWORD)

with GraphDatabase.driver(NEO4J_URI, auth=AUTH) as driver:
    driver.verify_connectivity()

# Import the new neo4j_uploader module
import_or_reload("graph.utils.neo4j_uploader")
from graph.utils.neo4j_uploader import upload_graph_to_neo4j

# WARNING: True clears the target database before uploading, so every run rebuilds
# the graph from scratch. Set to False before pointing NEO4J_URI at a shared or
# production database.
CLEAR_NEO4J_DB = True

# Upload the graph data to Neo4j
upload_graph_to_neo4j(NEO4J_URI, AUTH, nodes, edges, clear_db=CLEAR_NEO4J_DB)