# Web-to-Graph Pipeline

This Jupyter notebook walks you step by step through building a web-powered knowledge graph and automating it with Prefect.

### Step 1: Extract data with Wikipedia API

Install the required packages (`torch` provides the backend that `transformers` needs to run the NER model):
```bash
pip install requests transformers torch neo4j python-dotenv prefect pyvis openai
```

In [1]:
# Define the CEOs whose Wikipedia summaries will be fetched.

ceo_names = [
    "Elon Musk", "Sundar Pichai", "Tim Cook", "Satya Nadella", "Mark Zuckerberg",
    "Andy Jassy", "Jensen Huang", "Ginni Rometty", "Larry Page", "Susan Wojcicki",
    "Shantanu Narayen", "Reed Hastings", "Michael Dell", "Daniel Ek", "Evan Spiegel",
    "Marc Benioff", "Lisa Su", "Dara Khosrowshahi", "Patrick Collison", "Brian Chesky"
]

In [2]:
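# Define a function that retrieves the Wikipedia summary for a given name.

import requests
from urllib.parse import quote

def get_wikipedia_summary(name):
    # Percent-encode the title so names with special characters form a valid URL
    title = quote(name.replace(' ', '_'), safe='')
    url = f"https://en.wikipedia.org/api/rest_v1/page/summary/{title}"
    try:
        # A timeout keeps one slow request from stalling the whole loop
        response = requests.get(url, timeout=10)
        if response.status_code == 200:
            return response.json().get("extract", "")
        return ""
    except (requests.RequestException, ValueError):
        # Treat network or JSON decoding errors as "no summary available"
        return ""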

The Wikipedia REST API returns structured JSON, so the plain-text summary can be read directly from the `extract` field without manual preprocessing.
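
As a minimal sketch of what working with that structured response looks like, the snippet below builds the summary URL and reads the `extract` field from a hand-written sample payload. The helper names `build_summary_url` and `read_extract` and the sample JSON are illustrative assumptions, not part of the notebook; a real response contains many more fields.

```python
import json
from urllib.parse import quote

BASE_URL = "https://en.wikipedia.org/api/rest_v1/page/summary"

def build_summary_url(name):
    """Build the summary URL for a page title (hypothetical helper)."""
    # Wikipedia titles use underscores; percent-encode everything else
    title = quote(name.replace(" ", "_"), safe="")
    return f"{BASE_URL}/{title}"

def read_extract(payload_text):
    """Return the plain-text summary from a JSON payload, or "" if it is absent."""
    return json.loads(payload_text).get("extract", "")

# Hand-written sample payload, trimmed to two fields for illustration
sample = '{"title": "Example Person", "extract": "Example summary text."}'

print(build_summary_url("Lisa Su"))
print(read_extract(sample))
```

No network call is made here, so the parsing logic can be checked before pointing it at the live API.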

### Step 2: Entity recognition and relationship mapping using the dslim/bert-base-NER model
The model tags person (PER), organization (ORG), location (LOC) and miscellaneous (MISC) entities. This pipeline uses the ORG and LOC tags.

# Initialize the NER pipeline.
from transformers import pipeline
ner_pipeline = pipeline("ner", model="dslim/bert-base-NER", aggregation_strategy="simple")

In [None]:
# Extract the organization (ORG) entities from a summary
def extract_organizations(text):
    ner_results = ner_pipeline(text)
    orgs = set()
    for entity in ner_results:
        if entity["entity_group"] == "ORG":
            orgs.add(entity["word"])
    return list(orgs)
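
The filtering step can be tried without downloading the model. The sketch below is a hedged generalization: `extract_entities` is a hypothetical helper that takes the label as an argument, and `sample_results` is a hand-written stand-in for what `ner_pipeline(text)` returns with `aggregation_strategy="simple"`. It also skips WordPiece fragments (strings starting with `##`), which can occasionally appear in aggregated output.

```python
def extract_entities(ner_results, label):
    """Return the unique entity strings tagged with `label`, sorted for stable output."""
    found = set()
    for entity in ner_results:
        word = entity["word"].strip()
        # Skip empty strings and WordPiece fragments such as "##me"
        if entity["entity_group"] == label and word and not word.startswith("##"):
            found.add(word)
    return sorted(found)

# Hand-written stand-in for the output of ner_pipeline(text)
sample_results = [
    {"entity_group": "PER", "word": "Jane Doe", "score": 0.99},
    {"entity_group": "ORG", "word": "Acme Corp", "score": 0.98},
    {"entity_group": "ORG", "word": "Acme Corp", "score": 0.97},
    {"entity_group": "ORG", "word": "##me", "score": 0.41},
    {"entity_group": "LOC", "word": "Springfield", "score": 0.95},
]

print(extract_entities(sample_results, "ORG"))
print(extract_entities(sample_results, "LOC"))
```

One function parameterized by label would cover both the ORG and LOC cases used in this step.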

In [None]:
# Map each CEO to the organizations mentioned in their summary

ceo_to_companies = {}
for ceo in ceo_names:
    summary = get_wikipedia_summary(ceo)
    organizations = extract_organizations(summary)
    ceo_to_companies[ceo] = organizations

ceo_to_companies

In [None]:
# Collect every company name into a set to remove duplicates (set.update() adds each item of a list).

all_companies = set()
for company_list in ceo_to_companies.values():
    all_companies.update(company_list)

all_companies = list(all_companies)
print("Unique companies found:", all_companies)
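
A set only removes exact duplicates, so spellings that differ in case or spacing stay separate. The sketch below shows one possible normalization; `dedupe_names` is a hypothetical helper and the sample names are made up for illustration.

```python
def dedupe_names(names):
    """Keep the first spelling seen for each case- and whitespace-insensitive key."""
    seen = {}
    for name in names:
        cleaned = " ".join(name.split())  # collapse repeated and surrounding whitespace
        key = cleaned.casefold()          # case-insensitive comparison key
        if key and key not in seen:
            seen[key] = cleaned
    return list(seen.values())

raw = ["Acme Corp", "acme corp", " Acme  Corp ", "Globex"]

print(len(set(raw)))       # exact-match dedup keeps all four strings
print(dedupe_names(raw))   # normalized dedup keeps one spelling per company
```

This does not merge genuinely different names for the same company; that would need an alias table or entity linking.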


In [8]:
# Extract the location (LOC) entities.

def extract_locations(text):
    ner_results = ner_pipeline(text)
    locations = set()
    for entity in ner_results:
        # Check if the entity is labeled as a location
        if entity["entity_group"] == "LOC":
            locations.add(entity["word"])
    return list(locations)

In [None]:
# Map each company to the locations mentioned in its Wikipedia summary.

company_to_locations = {}
for company in all_companies:
    summary = get_wikipedia_summary(company)
    locations = extract_locations(summary)
    company_to_locations[company] = locations

company_to_locations

### Step 3: Store the data in Neo4j
Set up an instance in Neo4j AuraDB and save the connection URI, username (typically `neo4j`) and password as environment variables in a `.env` file.
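
Before connecting, it can help to confirm that all three variables are actually set. The following is a minimal sketch; `missing_env_vars` is a hypothetical helper, and the demonstration uses a plain dictionary with a placeholder URI instead of the real environment.

```python
import os

REQUIRED_VARS = ("NEO4J_URI", "NEO4J_USERNAME", "NEO4J_PASSWORD")

def missing_env_vars(required=REQUIRED_VARS, env=None):
    """Return the names of required variables that are unset or empty."""
    env = os.environ if env is None else env
    return [name for name in required if not env.get(name)]

# A plain dict stands in for os.environ; the URI is a placeholder, not a real instance
fake_env = {
    "NEO4J_URI": "neo4j+s://<your-instance>.databases.neo4j.io",
    "NEO4J_USERNAME": "neo4j",
}

print(missing_env_vars(env=fake_env))
```

Calling `missing_env_vars()` with no arguments after `dotenv.load_dotenv(".env")` checks the real environment.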

In [40]:
import dotenv
dotenv.load_dotenv(".env", override=True)

import os
from neo4j import GraphDatabase
uri = os.environ["NEO4J_URI"]
user = os.environ["NEO4J_USERNAME"]
password = os.environ["NEO4J_PASSWORD"]

driver = GraphDatabase.driver(uri, auth=(user, password))

In [None]:
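# Populate Neo4j with the CEO, company and location data.
from datetime import datetime, timezone

def create_node_with_provenance(session, label, name, timestamp):
    # The label is interpolated into the query text rather than passed as a
    # parameter, so only pass trusted, hard-coded label names.
    session.run(f"""
        MERGE (n:{label} {{name: $name}})
        WITH n
        MATCH (p:Provenance {{run_time: datetime($time)}})
        MERGE (n)-[:EXTRACTED_FROM]->(p)
    """, name=name, time=timestamp)

def create_knowledge_graph(ceo_to_companies, company_to_locations):
    # Timezone-aware UTC timestamp (datetime.utcnow() is deprecated since Python 3.12)
    timestamp = datetime.now(timezone.utc).isoformat()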
    source = "https://en.wikipedia.org/api/rest_v1/page/summary"
    with driver.session() as session:
        #provenance node for the entire data load
        session.run("""
            MERGE (p:Provenance {run_time: datetime($time)})
            SET p.source = $source
        """, time=timestamp, source=source)
        for ceo, companies in ceo_to_companies.items():
            create_node_with_provenance(session, "CEO", ceo, timestamp)
            for company in companies:
                create_node_with_provenance(session, "Company", company, timestamp)
                session.run("""
                    MATCH (c:CEO {name:$ceo}), (comp:Company {name:$company})
                    MERGE (c)-[:OWNS]->(comp)
                """, ceo=ceo, company=company)
                for location in company_to_locations.get(company, []):
                    create_node_with_provenance(session, "Location", location, timestamp)
                    session.run("""
                        MATCH (comp:Company {name:$company}), (l:Location {name:$location})
                        MERGE (comp)-[:LOCATED_IN]->(l)
                    """, company=company, location=location)
create_knowledge_graph(ceo_to_companies, company_to_locations)



This also creates a `Provenance` node with `source` and `run_time` properties and links every extracted entity to it. Provenance is crucial for the reproducibility and traceability of knowledge graphs because it records where the data came from, when it was retrieved and which entities were extracted.
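
Because the node label is placed into the query text with an f-string, one cautious option is to accept only known labels. The sketch below assumes an allow-list of the three labels used in this notebook; `ALLOWED_LABELS` and `node_merge_query` are hypothetical names, and the function only builds the query string without touching the database.

```python
ALLOWED_LABELS = {"CEO", "Company", "Location"}

def node_merge_query(label):
    """Return the MERGE statement for `label`, rejecting labels outside the allow-list."""
    if label not in ALLOWED_LABELS:
        raise ValueError(f"Unexpected label: {label!r}")
    return (
        f"MERGE (n:{label} {{name: $name}}) "
        "WITH n "
        "MATCH (p:Provenance {run_time: datetime($time)}) "
        "MERGE (n)-[:EXTRACTED_FROM]->(p)"
    )

print(node_merge_query("Company"))
```

The node name and timestamp remain query parameters (`$name`, `$time`), so only the label needs this check.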

In [None]:
# View the provenance records and the entities linked to each one

def view_provenance():
    with driver.session() as session:
        query = """
        MATCH (p:Provenance)<-[:EXTRACTED_FROM]-(n)
        RETURN p.source AS source, p.run_time AS timestamp, collect(n.name) AS linked_entities
        """
        result = session.run(query)
        for record in result:
            print(f"Source: {record['source']}")
            print(f"Timestamp: {record['timestamp']}")
            print("Entities:", record['linked_entities'])
view_provenance()

In [None]:
# Define a function that returns the CEO, company and location rows from the graph.


def get_graph_data():
    query = """
    MATCH (ceo:CEO)-[:OWNS]->(company:Company)
    OPTIONAL MATCH (company)-[:LOCATED_IN]->(loc:Location)
    RETURN ceo.name AS ceo, company.name AS company, loc.name AS location
    """
    with driver.session() as session:
        results = session.run(query)
        return [record.data() for record in results]

In [None]:
# Visualize the graph


from pyvis.network import Network
def visualize_graph(data):
    net = Network(height='600px', width='100%', notebook=True)
    for item in data:
        ceo = item['ceo']
        company = item['company']
        location = item['location']
        net.add_node(ceo, label=ceo, color='orange', shape='dot')
        net.add_node(company, label=company, color='lightblue', shape='box')
        net.add_edge(ceo, company, label='OWNS')
        if location:
            net.add_node(location, label=location, color='lightgreen', shape='ellipse')
            net.add_edge(company, location, label='LOCATED_IN')
    net.show('ceo_graph.html')

In [None]:
data = get_graph_data()
visualize_graph(data)
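
The query returns one row per CEO, company and location combination, so the same CEO and company can appear in several rows. As a sanity check before rendering, the sketch below collapses rows into distinct nodes and edges. `rows_to_graph` is a hypothetical helper and `sample_rows` is made-up data in the same shape as the output of `get_graph_data()`.

```python
def rows_to_graph(rows):
    """Collapse query rows into a {node: label} dict and a set of (source, type, target) edges."""
    nodes = {}
    edges = set()
    for row in rows:
        nodes[row["ceo"]] = "CEO"
        nodes[row["company"]] = "Company"
        edges.add((row["ceo"], "OWNS", row["company"]))
        # The location is None when the OPTIONAL MATCH found nothing
        if row.get("location"):
            nodes[row["location"]] = "Location"
            edges.add((row["company"], "LOCATED_IN", row["location"]))
    return nodes, edges

sample_rows = [
    {"ceo": "Jane Doe", "company": "Acme Corp", "location": "Springfield"},
    {"ceo": "Jane Doe", "company": "Acme Corp", "location": "Shelbyville"},
    {"ceo": "John Roe", "company": "Globex", "location": None},
]

nodes, edges = rows_to_graph(sample_rows)
print(len(nodes), "nodes,", len(edges), "edges")
```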

### Step 4: Query and validate the graph 

In [None]:
# Import the OpenAI client

from openai import OpenAI

In [None]:
# Get your API token from OpenAI and set it as an environment variable.

client = OpenAI(api_key=os.getenv('OPENAI_API_KEY'))


In [None]:
# Generate a Cypher query with GPT-4.1 to query Neo4j

def generate_cypher_query(user_input):
    # Define the prompt that guides gpt-4.1 to translate natural language into Cypher
    prompt = f"""
You are an assistant that translates natural language into Cypher queries for a Neo4j graph.
The graph has the following entities:
- CEO (with `name` property)
- Company (with `name` property)
- Location (with `name` property)
Relationships:
- (ceo:CEO)-[:OWNS]->(company:Company)
- (company:Company)-[:LOCATED_IN]->(location:Location)
Now write a Cypher query for the following request:
"{user_input}"
Only return the query, without explanations.
"""
    response = client.chat.completions.create(
        model="gpt-4.1",
        messages=[{"role": "user", "content": prompt}],
        temperature=0,
    )
    return response.choices[0].message.content.strip() 
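
Even when asked to return only the query, a model may wrap its answer in a Markdown code fence, which Neo4j would reject as invalid Cypher. The sketch below shows one way to unwrap such a reply before executing it; `strip_code_fences` is a hypothetical helper, not part of the OpenAI SDK.

```python
import re

FENCE = "`" * 3  # three backticks, built this way so the example stays readable in Markdown

def strip_code_fences(text):
    """Return the query inside a fenced reply, or the stripped text if there is no fence."""
    text = text.strip()
    pattern = rf"^{FENCE}[A-Za-z]*\s*\n(.*?)\n?{FENCE}$"
    match = re.match(pattern, text, flags=re.DOTALL)
    return match.group(1).strip() if match else text

fenced_reply = f"{FENCE}cypher\nMATCH (c:CEO) RETURN c.name\n{FENCE}"

print(strip_code_fences(fenced_reply))
print(strip_code_fences("MATCH (c:CEO) RETURN c.name"))
```

It could be applied to the return value of `generate_cypher_query` before the query is run.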

In [None]:
# Execute the Cypher query

def execute_cypher_query(cypher_query):
    with driver.session() as session:
        result = session.run(cypher_query)
        return [record.data() for record in result]
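
Since the query text comes from a language model, it may be worth rejecting anything that could modify the graph before running it. The following is a rough keyword heuristic, not a Cypher parser; `WRITE_KEYWORDS` and `is_read_only` are hypothetical names, and the check is deliberately conservative (it also blocks `CALL` and can reject a harmless query whose string literal contains one of the keywords).

```python
import re

WRITE_KEYWORDS = (
    "CREATE", "MERGE", "DELETE", "DETACH", "SET", "REMOVE", "DROP", "LOAD CSV", "CALL",
)

def is_read_only(cypher_query):
    """Heuristic: True if no write-style keyword appears as a whole word."""
    upper = cypher_query.upper()
    return not any(
        re.search(rf"\b{re.escape(keyword)}\b", upper) for keyword in WRITE_KEYWORDS
    )

print(is_read_only("MATCH (c:CEO)-[:OWNS]->(x:Company) RETURN x.name"))
print(is_read_only("MATCH (n) DETACH DELETE n"))
```

Connecting with a database user that has read-only permissions is a stronger safeguard; this check only catches the obvious cases early.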

In [None]:

def ask_graph_with_natural_output(user_input):
    print("Step 1: Generating Cypher query...")
    cypher_query = generate_cypher_query(user_input)
    print("Cypher:", cypher_query)
    print("Step 2: Executing on Neo4j...")
    results = execute_cypher_query(cypher_query)
    print("Raw results:", results)
    print("Step 3: Generating human-friendly response...")
    
    response_prompt = f"""
You are an assistant that explains the result of a database query to users in plain English.
User Question:
"{user_input}"
Query Results:
{results}
Write a clear and concise answer to the user's question based on the results.
"""
    response = client.chat.completions.create(
        model="gpt-4.1",
        messages=[{"role": "user", "content": response_prompt}],
        temperature=0.3,
    )
    answer = response.choices[0].message.content.strip()
    return answer

In [None]:
print(ask_graph_with_natural_output("Which company does Satya Nadella own?"))

### Step 5: Automate data ingestion using Prefect

You can run Prefect either on a self-hosted server or in Prefect Cloud. Here we set up Prefect with a cron schedule on a local server so that the graph is refreshed regularly with data from Wikipedia. The flow automates three stages:

1. Data extraction
2. NER processing
3. Data loading into Neo4j
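
The three stages above can be sketched as one plain function with its dependencies passed in, which makes the wiring testable without Wikipedia, the NER model or Neo4j. This is an illustrative assumption about structure, not the Prefect code itself: `run_pipeline` and the stub lambdas are hypothetical, and the sample pages are made up.

```python
def run_pipeline(ceo_names, fetch_summary, find_orgs, find_locs, load_graph):
    """Run extraction, NER and loading with injected callables."""
    # Stages 1 and 2: fetch each CEO summary and extract organizations
    ceo_to_companies = {ceo: find_orgs(fetch_summary(ceo)) for ceo in ceo_names}
    all_companies = {c for companies in ceo_to_companies.values() for c in companies}
    # Stages 1 and 2 again, this time for each company and its locations
    company_to_locations = {
        company: find_locs(fetch_summary(company)) for company in sorted(all_companies)
    }
    # Stage 3: hand both mappings to the loader
    load_graph(ceo_to_companies, company_to_locations)
    return ceo_to_companies, company_to_locations

# Stubs standing in for the Wikipedia call, the NER model and the Neo4j loader
pages = {
    "Jane Doe": "Jane Doe runs Acme Corp.",
    "Acme Corp": "Acme Corp is based in Springfield.",
}
loaded = []

result = run_pipeline(
    ["Jane Doe"],
    fetch_summary=lambda name: pages.get(name, ""),
    find_orgs=lambda text: ["Acme Corp"] if "Acme Corp" in text else [],
    find_locs=lambda text: ["Springfield"] if "Springfield" in text else [],
    load_graph=lambda c2c, c2l: loaded.append((c2c, c2l)),
)
print(result)
```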


In [None]:
# Refactor the existing code with Prefect

from prefect import task, flow, get_run_logger
from datetime import datetime, timezone
import requests
from neo4j import GraphDatabase
import os


def get_wikipedia_summary(name):
    """Fetch summary text from Wikipedia API."""
    url = f"https://en.wikipedia.org/api/rest_v1/page/summary/{name.replace(' ', '_')}"
    try:
        response = requests.get(url)
        if response.status_code == 200:
            return response.json().get("extract", "")
        else:
            return ""
    except Exception:
        return ""

def extract_organizations(text, ner_pipeline):
    """Extract organizations from text using NER model."""
    orgs = set()
    for entity in ner_pipeline(text):
        if entity["entity_group"] == "ORG":
            orgs.add(entity["word"])
    return list(orgs)

def extract_locations(text, ner_pipeline):
    """Extract locations from text using NER model."""
    locations = set()
    for entity in ner_pipeline(text):
        if entity["entity_group"] == "LOC":
            locations.add(entity["word"])
    return list(locations)


@task(retries=3, retry_delay_seconds=10)
def create_knowledge_graph(ceo_to_companies, company_to_locations):
    """Load data into Neo4j with provenance tracking."""
    uri = os.environ["NEO4J_URI"]
    user = os.environ["NEO4J_USERNAME"]
    password = os.environ["NEO4J_PASSWORD"]
    driver = GraphDatabase.driver(uri, auth=(user, password))

    # Timezone-aware UTC timestamp (datetime.utcnow() is deprecated since Python 3.12)
    timestamp = datetime.now(timezone.utc).isoformat()
    source = "https://en.wikipedia.org/api/rest_v1/page/summary"

    with driver.session() as session:
        # Create a provenance node for this entire run
        session.run("""
            MERGE (p:Provenance {run_time: datetime($time)})
            SET p.source = $source
        """, time=timestamp, source=source)

        for ceo, companies in ceo_to_companies.items():
            session.run("""
                MERGE (c:CEO {name: $ceo})
                WITH c
                MATCH (p:Provenance {run_time: datetime($time)})
                MERGE (c)-[:EXTRACTED_FROM]->(p)
            """, ceo=ceo, time=timestamp)

            for company in companies:
                session.run("""
                    MERGE (comp:Company {name: $company})
                    WITH comp
                    MATCH (p:Provenance {run_time: datetime($time)})
                    MERGE (comp)-[:EXTRACTED_FROM]->(p)
                """, company=company, time=timestamp)

                session.run("""
                    MATCH (c:CEO {name:$ceo}), (comp:Company {name:$company})
                    MERGE (c)-[:OWNS]->(comp)
                """, ceo=ceo, company=company)

                for location in company_to_locations.get(company, []):
                    session.run("""
                        MERGE (l:Location {name: $location})
                        WITH l
                        MATCH (p:Provenance {run_time: datetime($time)})
                        MERGE (l)-[:EXTRACTED_FROM]->(p)
                    """, location=location, time=timestamp)

                    session.run("""
                        MATCH (comp:Company {name:$company}), (l:Location {name:$location})
                        MERGE (comp)-[:LOCATED_IN]->(l)
                    """, company=company, location=location)

    driver.close()


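# Prefect Flow

# Default input: the same CEO list as in Step 1
CEO_NAMES = [
    "Elon Musk", "Sundar Pichai", "Tim Cook", "Satya Nadella", "Mark Zuckerberg",
    "Andy Jassy", "Jensen Huang", "Ginni Rometty", "Larry Page", "Susan Wojcicki",
    "Shantanu Narayen", "Reed Hastings", "Michael Dell", "Daniel Ek", "Evan Spiegel",
    "Marc Benioff", "Lisa Su", "Dara Khosrowshahi", "Patrick Collison", "Brian Chesky"
]

@flow(name="knowledge-graph-pipeline")
def knowledge_graph_pipeline(ceo_names=None):
    """Main flow to build and update the Knowledge Graph."""
    logger = get_run_logger()
    ceo_names = ceo_names or CEO_NAMES

    # Build the NER model inside the flow: a scheduled run only receives
    # JSON-serializable parameters, so the model object cannot be passed in.
    from transformers import pipeline
    ner_pipeline = pipeline("ner", model="dslim/bert-base-NER", aggregation_strategy="simple")

    # Extract companies for each CEO
    ceo_to_companies = {}
    for ceo in ceo_names:
        summary = get_wikipedia_summary(ceo)
        companies = extract_organizations(summary, ner_pipeline)
        ceo_to_companies[ceo] = companies

    # Collect unique companies
    all_companies = set()
    for company_list in ceo_to_companies.values():
        all_companies.update(company_list)
    logger.info("Found %d unique companies for %d CEOs", len(all_companies), len(ceo_names))

    # Extract locations for each company
    company_to_locations = {}
    for company in all_companies:
        summary = get_wikipedia_summary(company)
        locations = extract_locations(summary, ner_pipeline)
        company_to_locations[company] = locations

    # Load graph into Neo4j
    create_knowledge_graph(ceo_to_companies, company_to_locations)


if __name__ == "__main__":
    # Run the flow once when the script is executed directly
    knowledge_graph_pipeline()
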


Save the script as a `.py` file (here, `kg_pipeline.py`) and run it locally in your terminal to test it.

Perform the following steps in your terminal.

In [None]:
# Run the script once; adjust the path if the file is saved elsewhere
python kg_pipeline.py

In [None]:
# Create a process-based work pool, meaning Prefect will run your flow on your local machine.

prefect work-pool create "Knowledge graph" --type process

# Create a Prefect deployment with a cron schedule (06:00 UTC daily).

prefect deploy --name "kg_pipeline" --cron "0 6 * * *" --pool "Knowledge graph" kg_pipeline.py:knowledge_graph_pipeline

The deployment configuration will be saved as a `.yaml` file. You can make changes to the configuration by modifying the `.yaml` file.
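
To make the cron expression `0 6 * * *` concrete, the sketch below computes the next matching time for a given moment using only the standard library. `next_daily_run` is a hypothetical helper that handles this one daily pattern; it is not how Prefect evaluates schedules.

```python
from datetime import datetime, timedelta, timezone

def next_daily_run(now, hour=6):
    """Next time matching the cron pattern "0 <hour> * * *" strictly after `now`."""
    candidate = now.replace(hour=hour, minute=0, second=0, microsecond=0)
    # If today's slot has already passed (or is exactly now), use tomorrow's
    if candidate <= now:
        candidate += timedelta(days=1)
    return candidate

now = datetime(2025, 1, 1, 7, 30, tzinfo=timezone.utc)
print(next_daily_run(now))
```

For 07:30 UTC on 1 January the next run falls on 2 January at 06:00 UTC, because that day's 06:00 slot has already passed.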

In [None]:
# Verify CRON schedule.

prefect deployment inspect knowledge-graph-pipeline/kg_pipeline

In [None]:
# Launch a local Prefect server

prefect server start

Keep this server running in a separate terminal window. It acts as the backend where deployments are registered, schedules are stored and Prefect workers (the components that run flows) connect to fetch work. If the server starts successfully, the output includes the API URL:


`PREFECT_API_URL=http://127.0.0.1:4200/api`

In [None]:
# Start the Prefect Worker

prefect worker start -p "Knowledge graph"

In [None]:
# It should connect successfully to your local server. You can confirm the connection using:

prefect work-pool inspect "Knowledge graph"