In [1]:
from dotenv import load_dotenv
import polars as pl
from tqdm import tqdm
from neo4j import GraphDatabase
from neo4j.exceptions import DatabaseError 
import os
import uuid

In [2]:
# Load environment variables
load_dotenv()

# Variables
NEO4J_USERNAME = os.getenv("NEO4J_USERNAME")
NEO4J_PASSWORD = os.getenv("NEO4J_PASSWORD")
NEO4J_URI = os.getenv("NEO4J_URI")

In [3]:
# Connect to Neo4j
driver = GraphDatabase.driver(uri = NEO4J_URI, auth=(NEO4J_USERNAME, NEO4J_PASSWORD))

In [4]:
# Import the dataframe form the processing pipeline
chunks_df = pl.read_parquet("/Users/borja/Documents/Somniumrema/projects/genai/grag/pipeline_outcomes/chunks_df.parquet")

# Select only the columns needed
chunks_df = chunks_df[['post_url', 'post_title', 'series_number', 'blog_date', 'blog_title', 'chunk_text', 'entities']]

# Add a 'chunk_id' column to the DataFrame with UUIDs (if not already present)
if "chunk_id" not in chunks_df.columns:
    chunks_df = chunks_df.with_columns([
        pl.Series("chunk_id", [str(uuid.uuid4()) for _ in range(len(chunks_df))])
    ])
# Show the first five
chunks_df

post_url,post_title,series_number,blog_date,blog_title,chunk_text,entities,chunk_id
str,str,str,date,str,str,list[list[str]],str
"""https://www.acquired.fm/episod…","""Costco""","""Season 13, Episode 2""",2023-08-20,"""The Complete History & Strateg…","""I don't think I have ever been…","[[""CharlieMunger ?"", ""PER""], [""Ben"", ""PER""], … [""WarrenBuffett"", ""PER""]]","""9a2f203c-f18d-4f1b-965d-1bed0e…"
"""https://www.acquired.fm/episod…","""Costco""","""Season 13, Episode 2""",2023-08-20,"""The Complete History & Strateg…","""But unlike Costco today, they …","[[""costco"", ""ORG""], [""Fedco"", ""ORG""], … [""Fedco"", ""ORG""]]","""87c6fd73-b811-423f-a25b-9a0a53…"
"""https://www.acquired.fm/episod…","""Costco""","""Season 13, Episode 2""",2023-08-20,"""The Complete History & Strateg…","""Jim started as a grocery bagge…","[[""Jim"", ""PER""], [""CraigJelinek"", ""PER""], … [""SamWalton"", ""PER""]]","""ba860315-b8c4-4ee2-9bdc-95d2a1…"
"""https://www.acquired.fm/episod…","""Costco""","""Season 13, Episode 2""",2023-08-20,"""The Complete History & Strateg…","""And then they have the greates…","[[""SanDiego"", ""LOC""], [""SanDiegoCityCredit"", ""ORG""], … [""costco"", ""ORG""]]","""19b6d3ed-55a6-4424-bdb6-781004…"
"""https://www.acquired.fm/episod…","""Costco""","""Season 13, Episode 2""",2023-08-20,"""The Complete History & Strateg…","""In 1982, they do ultimately li…","[[""NASDAQ"", ""MISC""], [""sol"", ""PER""], … [""PriceClub"", ""ORG""]]","""46581ec4-ff10-42e9-b104-ee092c…"
…,…,…,…,…,…,…,…
"""https://www.acquired.fm/episod…","""Special‚ An Acquirer's View in…","""Season 1, Episode 18""",2016-08-22,"""Related Episodes""","""This person who's been their r…","[[""ProfitFromThe"", ""MISC""], [""NBA"", ""MISC""]]","""ae6a1ceb-bc15-4219-a11d-44699b…"
"""https://www.acquired.fm/episod…","""Jet""","""Season 1, Episode 19""",2016-08-29,"""Related Episodes""","""I'm here at Adobe so feel free…","[[""Adobe"", ""ORG""], [""Barada@Adobe.com"", ""ORG""], … [""jet.com"", ""ORG""]]","""5852936f-2872-4abd-81a6-9780c6…"
"""https://www.acquired.fm/episod…","""Jet""","""Season 1, Episode 19""",2016-08-29,"""Related Episodes""","""Well, that was what I was goin…","[[""jet"", ""ORG""], [""jet"", ""ORG""], … [""american"", ""MISC""]]","""7391fc5e-47b5-4a41-b462-6e8532…"
"""https://www.acquired.fm/episod…","""Jet""","""Season 1, Episode 19""",2016-08-29,"""Related Episodes""","""Boy, that doesn't sound like W…","[[""jet"", ""ORG""], [""Walmart"", ""ORG""], … [""Microsoft"", ""ORG""]]","""8b6c4771-790f-44c8-9921-b9b387…"


In [None]:
chunks_df.filter(pl.col("post_title") == "Airbnb")

post_url,post_title,series_number,blog_date,blog_title,chunk_text,entities,chunk_id
str,str,str,date,str,str,list[list[str]],str
"""https://www.acquired.fm/episod…","""Airbnb""","""Season 7, Episode 8""",2020-12-10,"""‍""","""Similarly for gifts, it's a li…","[[""lpshow"", ""ORG""], [""BenGilbert"", ""PER""], … [""DoorDash"", ""ORG""]]","""1f55e38f-468a-457e-b539-d84aa7…"
"""https://www.acquired.fm/episod…","""Airbnb""","""Season 7, Episode 8""",2020-12-10,"""‍""","""But I remember looking at hote…","[[""WWDC"", ""MISC""], ["""", ""MISC""], … [""SanFrancisco"", ""LOC""]]","""fff320b7-171e-414a-b4b6-fa0c64…"
"""https://www.acquired.fm/episod…","""Airbnb""","""Season 7, Episode 8""",2020-12-10,"""‍""","""They're still trying to basica…","[[""cheerio"", ""MISC""], [""y"", ""ORG""], … [""JeffBezos"", ""PER""]]","""c72d72b5-4194-4102-a893-c08f9f…"
"""https://www.acquired.fm/episod…","""Airbnb""","""Season 7, Episode 8""",2020-12-10,"""‍""","""For something that seemed like…","[[""Sequoia"", ""ORG""], [""Airbnb"", ""ORG""], … [""Facebook"", ""MISC""]]","""5cc7bdb9-6783-41c9-8b81-128d2c…"
"""https://www.acquired.fm/episod…","""Airbnb""","""Season 7, Episode 8""",2020-12-10,"""‍""","""Never heard of a startup winni…","[[""NobelPeacePrize ,"", ""MISC""], [""UpstartstoBrad"", ""MISC""], … [""Samwer"", ""PER""]]","""eb1a4710-3049-4d5e-be5b-47b62c…"
"""https://www.acquired.fm/episod…","""Airbnb""","""Season 7, Episode 8""",2020-12-10,"""‍""","""Which we've alluded to in our …","[[""ipo"", ""MISC""]]","""7c97989d-97b1-4a26-b8c6-4e03e5…"
"""https://www.acquired.fm/episod…","""Airbnb""","""Season 7, Episode 8""",2020-12-10,"""‍""","""We do have some more nuggets h…","[[""s1"", ""MISC""], [""bamboo"", ""ORG""], … [""BrianChesky"", ""PER""]]","""fad272ff-c099-4b06-a999-7fb067…"
"""https://www.acquired.fm/episod…","""Airbnb""","""Season 7, Episode 8""",2020-12-10,"""‍""","""All markets are supply and dem…","[[""Airbnb"", ""ORG""], [""Airbnb"", ""ORG""], … [""Airbnb"", ""ORG""]]","""4248dafc-a16d-45e2-8bce-e8b2c2…"
"""https://www.acquired.fm/episod…","""Airbnb""","""Season 7, Episode 8""",2020-12-10,"""‍""","""For consumers, it's very easy …","[[""uber"", ""MISC""], [""Airbnb"", ""MISC""], … [""Airbnb"", ""ORG""]]","""bc907952-0dcb-40f2-9cc9-672b33…"


In [None]:
# # Create the full-text index
# with driver.session() as session:
#     session.run(
#         "CREATE FULLTEXT INDEX chunk_fulltext_index FOR (c:Chunk) ON EACH [c.text] OPTIONS { indexConfig: { `fulltext.analyzer`: 'standard' } };"
#     )

In [None]:
# Prepare data for bulk upload with DISTINCT podcast nodes
podcast_nodes = []
chunk_nodes = []
belongs_to_rels = []
entity_nodes = []
mentions_rels = []

# Create sets to keep track of unique nodes and relationships
unique_podcast_nodes = set()
unique_chunk_nodes = set()
unique_belongs_to_rels = set()
unique_entity_nodes = set()
unique_mentions_rels = set()

for row in chunks_df.iter_rows(named=True):
    # --- Podcast Nodes ---
    
    # Create a key for the podcast based on its identifying attributes
    podcast_key = (row['post_url'], row['post_title'])  # Use a tuple of relevant attributes

    if podcast_key not in unique_podcast_nodes:
        podcast_node = {
            "podcast_id": str(uuid.uuid4()),
            "post_url": row['post_url'],
            "post_title": row['post_title'],
            "blog_date": row['blog_date'],
            "blog_title": row['blog_title'],
            "series_number": row['series_number']
        }
        podcast_nodes.append(podcast_node)
        unique_podcast_nodes.add(podcast_key)

    # --- Chunk Nodes ---
    chunk_node = {
        "chunk_id": row['chunk_id'],
        "text": row['chunk_text']
    }
    if tuple(chunk_node.items()) not in unique_chunk_nodes:
        chunk_nodes.append(chunk_node)
        unique_chunk_nodes.add(tuple(chunk_node.items()))

    # --- BELONGS_TO Relationships ---
    belongs_to_rel = {
        "chunk_id": row['chunk_id'],
        "podcast_id": podcast_node['podcast_id']  # Use the podcast_id from the podcast_node
    }
    if tuple(belongs_to_rel.items()) not in unique_belongs_to_rels:
        belongs_to_rels.append(belongs_to_rel)
        unique_belongs_to_rels.add(tuple(belongs_to_rel.items()))

    # --- Entity Nodes and MENTIONS Relationships ---
    for entity, label in row['entities']:
        entity_node = {
            "name": entity,
            "label": label
        }
        if tuple(entity_node.items()) not in unique_entity_nodes:
            entity_nodes.append(entity_node)
            unique_entity_nodes.add(tuple(entity_node.items()))

        mentions_rel = {
            "chunk_id": row['chunk_id'],
            "entity_name": entity,
            "entity_label": label
        }
        if tuple(mentions_rel.items()) not in unique_mentions_rels:
            mentions_rels.append(mentions_rel)
            unique_mentions_rels.add(tuple(mentions_rel.items()))

In [None]:
# Check the number of nodes and relationships
len(podcast_nodes), len(chunk_nodes), len(belongs_to_rels), len(entity_nodes), len(mentions_rels)

(199, 1304, 1304, 4579, 9365)

In [12]:
import re
from rapidfuzz import fuzz
from collections import defaultdict
import polars as pl

def clean_node_name(name):
    """
    Cleans the node name by:
    - Removing extraneous punctuation
    - Adding spaces before uppercase letters for better readability
    - Standardizing to title case
    """
    if not isinstance(name, str):
        return name  # Return as is if not a string
    
    name = re.sub(r'[^\w\s]', '', name)  # Remove punctuation
    name = re.sub(r'(?<!^)(?=[A-Z])', ' ', name)  # Space before uppercase
    name = name.title()  # Title case
    return ' '.join(name.split())  # Remove extra spaces

def standardize_nodes(nodes, name_column="name"):
    """
    Applies cleaning to all node names and adds standardized name fields.
    """
    df = pl.DataFrame(nodes)
    
    # Clean and standardize node names
    df = df.with_columns([
        pl.col(name_column).map_elements(clean_node_name, return_dtype=pl.Utf8).alias("cleaned_name"),
        pl.col(name_column).map_elements(lambda x: clean_node_name(x).lower() if isinstance(x, str) else x, return_dtype=pl.Utf8).alias("cleaned_name_lower")
    ])
    return df

def identify_duplicates(df, similarity_threshold=90):
    """
    Identifies duplicates based on name similarity.
    """
    duplicates = defaultdict(list)
    unique_names = df["cleaned_name"].unique().to_list()
    
    for i, name in enumerate(unique_names):
        for other_name in unique_names[i + 1:]:
            similarity = fuzz.ratio(name.lower(), other_name.lower())
            if similarity >= similarity_threshold:
                duplicates[name].append(other_name)
    return duplicates

def merge_duplicates(df, duplicates):
    """
    Merges duplicate entries by selecting a canonical name and updating labels.
    """
    merged_entries = []
    for canonical, dup_list in duplicates.items():
        labels = df.filter(pl.col("cleaned_name") == canonical)["label"].to_list()
        most_common_label = max(set(labels), key=labels.count) if labels else 'MISC'
        merged_entries.append({'name': canonical, 'label': most_common_label})
        
        for dup in dup_list:
            dup_labels = df.filter(pl.col("cleaned_name") == dup)["label"].to_list()
            dup_most_common_label = max(set(dup_labels), key=dup_labels.count) if dup_labels else 'MISC'
            merged_entries.append({'name': dup, 'label': dup_most_common_label})
    
    merged_df = pl.DataFrame(merged_entries)
    return merged_df

def correct_labels(merged_df):
    """
    Corrects misclassified labels based on business rules.
    """
    label_corrections = {
        'Sol': 'PER',
    }
    
    merged_df = merged_df.with_columns(
        pl.col("label").map_elements(
            lambda label, name: label_corrections.get(name.lower(), label) 
            if name.lower() in label_corrections else label, 
            return_dtype=pl.Utf8
        )
    )
    return merged_df

def clean_and_deduplicate(nodes, similarity_threshold=90):
    df = standardize_nodes(nodes)
    duplicates = identify_duplicates(df, similarity_threshold=similarity_threshold)
    merged_df = merge_duplicates(df, duplicates)
    
    # Ensure merged_df has the necessary columns
    if "name" not in merged_df.columns:
        merged_df = merged_df.with_columns(pl.col("cleaned_name").alias("name"))
    if "label" not in merged_df.columns:
        merged_df = merged_df.with_columns(pl.lit("MISC").alias("label"))
    
    corrected_df = correct_labels(merged_df)
    return corrected_df.select(["name", "label"]).to_dicts(), duplicates

# Execution
cleaned_nodes, duplicates_info = clean_and_deduplicate(entity_nodes, similarity_threshold=95)

# Output cleaned nodes and duplicates
print("Cleaned and Deduplicated Nodes:")
for node in cleaned_nodes:
    print(node)

if duplicates_info:
    print("\nIdentified Duplicates:")
    for canonical, dup_list in duplicates_info.items():
        print(f"Canonical Name: {canonical}")
        for dup in dup_list:
            print(f"  - Duplicate: {dup}")


Cleaned and Deduplicated Nodes:
{'name': 'North America', 'label': None}
{'name': 'North American', 'label': None}
{'name': 'Invest Like The', 'label': None}
{'name': 'Invest Likethe', 'label': None}
{'name': 'Charlie Munger', 'label': None}
{'name': 'Charliemunger', 'label': None}
{'name': 'Virgin Galactic', 'label': None}
{'name': 'Virgingalactic', 'label': None}
{'name': 'Pitch Book', 'label': None}
{'name': 'Pitch Books', 'label': None}
{'name': 'Foursquare', 'label': None}
{'name': 'Four Square', 'label': None}
{'name': 'David Rosenthal', 'label': None}
{'name': 'Davidrosenthal', 'label': None}
{'name': 'Squarespace', 'label': None}
{'name': 'Square Space', 'label': None}
{'name': 'Siliconvalley', 'label': None}
{'name': 'Silicon Valley', 'label': None}
{'name': 'Blizzardentertainment', 'label': None}
{'name': 'Blizzard Entertainment', 'label': None}
{'name': 'Donvalentine', 'label': None}
{'name': 'Don Valentine', 'label': None}
{'name': 'Gearsof Wars', 'label': None}
{'name': 'G

In [None]:
# # Bulk upload nodes and relationships
# with driver.session() as session:
#     # --- Podcast Nodes ---
#     for i in tqdm(range(0, len(podcast_nodes), 1000), desc="Creating Podcast Nodes"):
#         batch = podcast_nodes[i:i + 1000]
#         session.run(
#             """
#             UNWIND $podcast_nodes AS podcast
#             MERGE (p:Podcast {podcast_id: podcast.podcast_id})
#             SET p.post_url = podcast.post_url, 
#                 p.post_title = podcast.post_title,
#                 p.blog_date = podcast.blog_date, 
#                 p.blog_title = podcast.blog_title,
#                 p.series_number = podcast.series_number
#             """,
#             podcast_nodes=batch
#         )

#     # --- Chunk Nodes ---
#     for i in tqdm(range(0, len(chunk_nodes), 1000), desc="Creating Chunk Nodes"):
#         batch = chunk_nodes[i:i + 1000]
#         session.run(
#             """
#             UNWIND $chunk_nodes AS chunk
#             CREATE (c:Chunk {chunk_id: chunk.chunk_id, text: chunk.text})
#             """,
#             chunk_nodes=batch
#         )

#     # --- BELONGS_TO Relationships ---
#     for i in tqdm(range(0, len(belongs_to_rels), 1000), desc="Creating BELONGS_TO Relationships"):
#         batch = belongs_to_rels[i:i + 1000]
#         session.run(
#             """
#             UNWIND $belongs_to_rels AS rel
#             MATCH (c:Chunk {chunk_id: rel.chunk_id})
#             MATCH (p:Podcast {podcast_id: rel.podcast_id})
#             CREATE (c)-[:BELONGS_TO]->(p)
#             """,
#             belongs_to_rels=batch
#         )

#     # --- Entity Nodes and MENTIONS Relationships ---
#     unique_entity_nodes = []
#     for entity in entity_nodes:
#         if entity not in unique_entity_nodes:
#             unique_entity_nodes.append(entity)

#     for i in tqdm(range(0, len(unique_entity_nodes), 1000), desc="Creating Entity Nodes"):
#         batch = unique_entity_nodes[i:i + 1000]
#         session.run(
#             """
#             UNWIND $entity_nodes AS entity
#             MERGE (e:Entity {name: entity.name, label: entity.label})  
#             """,
#             entity_nodes=batch
#         )

#     # Create a list to store unique mentions relationships
#     unique_mentions_rels = []
#     for rel in mentions_rels:
#         if rel not in unique_mentions_rels:
#             unique_mentions_rels.append(rel)
#     for i in tqdm(range(0, len(unique_mentions_rels), 1000), desc="Creating MENTIONS Relationships"):
#         batch = unique_mentions_rels[i:i + 1000]
#         try:
#             session.run(
#                 """
#                 UNWIND $mentions_rels AS rel
#                 MATCH (c:Chunk {chunk_id: rel.chunk_id})
#                 MATCH (e:Entity {name: rel.entity_name, label: rel.entity_label})  
#                 CREATE (c)-[:MENTIONS]->(e)
#                 """,
#                 mentions_rels=batch
#             )
#         except DatabaseError as e:
#             if e.code == DatabaseError.Transaction.TransactionCommitFailed:
#                 print(f"Error creating MENTIONS relationships (batch {i // 1000 + 1}): {e.message}")
#                 # Handle the error (e.g., log the error, skip the batch, retry with smaller batches)
#             else:
#                 raise e  # Raise other types of errors