In [7]:
import pandas as pd, rdflib as rdf, yaml, pathlib, datetime

In [41]:
from urllib.parse import quote_plus

BASE = "https://example.org/retail/"

def uri(entity_type: str, key) -> str:
    """
    Mint a deterministic URI like
    https://example.org/retail/product/123 or
    https://example.org/retail/household/H001

    entity_type : short slug ('product', 'household', 'store', 'txn', …)
    key         : primary-key value from the CSV (str or int)
    """
    return f"{BASE}{entity_type}/{quote_plus(str(key))}"

from neo4j import GraphDatabase, basic_auth
import glob, pathlib, time

URI, AUTH = "bolt://localhost:7687", basic_auth("neo4j", "sebastienM19")
driver = GraphDatabase.driver(URI, auth=AUTH)

In [42]:
CFG = yaml.safe_load(open("config.yaml"))
g = rdf.Graph()
NS = {pfx: rdf.Namespace(uri) for pfx, uri in CFG["prefixes"].items()}
for pfx, ns in NS.items():
    g.bind(pfx, ns)
    
def new_graph():
    g = rdf.Graph()
    for pfx, uri in CFG["prefixes"].items():
        g.bind(pfx, rdf.Namespace(uri))
    return g

def day_time_to_iso(day, trans_time):
    # DAY is offset in dataset (1-711); Jan-01-2017 baseline
    date = datetime.date(2017, 1, 1) + datetime.timedelta(int(day))
    hh, mm = divmod(int(trans_time), 100)
    return datetime.datetime.combine(date, datetime.time(hh, mm)).isoformat()

chunksize = 1000
for chunk in pd.read_csv("../../../data/transaction_data.csv", chunksize=chunksize):
    g = new_graph()
    for row in chunk.itertuples():
        txn = rdf.URIRef(uri("txn", row.BASKET_ID))
        g.add((txn, rdf.RDF.type, NS["rio"].Transaction))
        g.add((txn, NS["rio"].hasCustomer, rdf.URIRef(uri("household", row.household_key))))
        g.add((txn, NS["time"].inXSDDateTime, rdf.Literal(day_time_to_iso(row.DAY, row.TRANS_TIME), datatype=rdf.XSD.dateTime)))
        g.add((txn, NS["rio"].occurredAt, rdf.URIRef(uri("store", row.STORE_ID))))
        # line item
        line = rdf.BNode()
        g.add((line, rdf.RDF.type, NS["rio"].TransactionLine))
        g.add((line, NS["rio"].lineProduct, rdf.URIRef(uri("product", row.PRODUCT_ID))))
        g.add((txn, NS["rio"].containsLine, line))
        g.add((line, NS["rio"].lineProduct, rdf.URIRef(uri("product", row.PRODUCT_ID))))
        g.add((line, NS["rio"].quantity, rdf.Literal(int(row.QUANTITY))))
        g.add((line, NS["rio"].lineTotal, rdf.Literal(float(row.SALES_VALUE))))
    ttl_file = pathlib.Path("export") / f"transactions_{chunk.index.start}.ttl"
    g.serialize(destination=ttl_file, format="turtle")
    g.remove((None, None, None)) 

KeyboardInterrupt: 

In [33]:
df = pd.read_csv("../../../data/product.csv", dtype=str).fillna("")
for row in df.itertuples(index=False):
    prd = rdf.URIRef(uri("product", row.PRODUCT_ID))
    g.add((prd, rdf.RDF.type,           NS["gr"].ProductOrServiceModel))
    g.add((prd, NS["gr"].name,          rdf.Literal(row.SUB_COMMODITY_DESC)))
    g.add((prd, NS["schema"].brand,     rdf.Literal(row.BRAND)))

    # manufacturer as an Organization node
    if row.MANUFACTURER:
        m_uri = rdf.URIRef(uri("manufacturer", row.MANUFACTURER))
        g.add((m_uri, rdf.RDF.type,      NS["schema"].Organization))
        g.add((prd,  NS["schema"].manufacturer, m_uri))

    # size-of-product literal (keep text, e.g. "12 OZ")
    if row.CURR_SIZE_OF_PRODUCT:
        g.add((prd, NS["schema"].packageQuantity,
               rdf.Literal(row.CURR_SIZE_OF_PRODUCT)))

    # taxonomy: Department ⭢ Commodity ⭢ Sub-commodity
    dept_uri      = rdf.URIRef(uri("dept", row.DEPARTMENT))
    commod_uri    = rdf.URIRef(uri("commodity", row.COMMODITY_DESC))
    subcommod_uri = rdf.URIRef(uri("subcomm", row.SUB_COMMODITY_DESC))

    g.add((dept_uri,      rdf.RDF.type, NS["skos"].Concept))
    g.add((commod_uri,    rdf.RDF.type, NS["skos"].Concept))
    g.add((subcommod_uri, rdf.RDF.type, NS["skos"].Concept))

    g.add((commod_uri,    NS["skos"].broader, dept_uri))
    g.add((subcommod_uri, NS["skos"].broader, commod_uri))
    g.add((prd,           NS["skos"].broader, subcommod_uri))

# ---  write -------------------------------------------------------------------
export = pathlib.Path("export")
export.mkdir(exist_ok=True)
g.serialize(destination=export / "products.ttl", format="turtle")
print("✓ products.ttl written (", len(df), "products )")

✓ products.ttl written ( 92353 products )


In [34]:
df = pd.read_csv("../../../data/hh_demographic.csv", dtype=str).fillna("")
def parse_household_size(size_text: str) -> int | None:
    """Convert '1-2' → 2, '3' → 3, '5+' → 5 ."""
    if not size_text:
        return None
    if "-" in size_text:
        return int(size_text.split("-")[-1])
    if size_text.endswith("+"):
        return int(size_text[:-1])
    return int(size_text)

for row in df.itertuples(index=False):
    hh = rdf.URIRef(uri("household", row.household_key))
    g.add((hh, rdf.RDF.type,         NS["foaf"].Group))

    # simple literals as RUPO predicates
    if row.AGE_DESC:
        g.add((hh, NS["rupo"].ageBand, rdf.Literal(row.AGE_DESC)))
    if row.INCOME_DESC:
        g.add((hh, NS["rupo"].incomeBand, rdf.Literal(row.INCOME_DESC)))
    if row.HOMEOWNER_DESC:
        g.add((hh, NS["rupo"].homeownerStatus, rdf.Literal(row.HOMEOWNER_DESC)))
    if row.MARITAL_STATUS_CODE:
        g.add((hh, NS["rupo"].maritalStatus, rdf.Literal(row.MARITAL_STATUS_CODE)))

    # numeric size (optional)
    size_val = parse_household_size(row.HOUSEHOLD_SIZE_DESC)
    if size_val:
        g.add((hh, NS["rupo"].householdSize, rdf.Literal(size_val)))

    if row.KID_CATEGORY_DESC:
        g.add((hh, NS["rupo"].kidCategory, rdf.Literal(row.KID_CATEGORY_DESC)))

# ---  write -------------------------------------------------------------------
export = pathlib.Path("export")
export.mkdir(exist_ok=True)
g.serialize(destination=export / "households.ttl", format="turtle")
print("✓ households.ttl written (", len(df), "households )")

✓ households.ttl written ( 801 households )


In [None]:
def register_prefixes():
    with driver.session() as session:
        for prefix, uri in CFG["prefixes"].items():
            session.run("""
                CALL n10s.nsprefixes.add($prefix, $namespace)
            """, prefix=prefix, namespace=uri)

register_prefixes()
print("✓ Registered RDF namespace prefixes.")

In [35]:


def import_ttl(path):
    with driver.session() as s:
        s.run("""
            CALL n10s.rdf.import.fetch($url,'Turtle',{ handleVocabUris: "SHORTEN" })
        """, url=f"file:///{path}")

for ttl in glob.glob("export/*.ttl"):
    print("loading", pathlib.Path(ttl).name)
    import_ttl(ttl)
    time.sleep(0.1)

loading transactions_250000.ttl
loading transactions_233000.ttl
loading transactions_184000.ttl
loading transactions_242000.ttl
loading transactions_188000.ttl
loading transactions_196000.ttl
loading transactions_221000.ttl
loading transactions_274000.ttl
loading transactions_81000.ttl
loading transactions_209000.ttl
loading transactions_217000.ttl
loading transactions_266000.ttl
loading transactions_93000.ttl
loading transactions_278000.ttl
loading transactions_205000.ttl
loading transactions_27000.ttl
loading transactions_165000.ttl
loading transactions_118000.ttl
loading transactions_39000.ttl
loading transactions_106000.ttl
loading transactions_44000.ttl
loading transactions_35000.ttl
loading transactions_177000.ttl
loading transactions_48000.ttl
loading transactions_169000.ttl
loading transactions_114000.ttl
loading transactions_56000.ttl
loading transactions_141000.ttl
loading transactions_60000.ttl
loading transactions_122000.ttl
loading transactions_153000.ttl
loading transacti

In [36]:
def import_ttl(filename: pathlib.Path):
    url = f"file:///import/{filename.name}"
    with driver.session() as s:
        s.run("""
            CALL n10s.rdf.import.fetch(
              $url, 'Turtle', { handleVocabUris: "SHORTEN" })
        """, url=url)

In [37]:
export_dir = pathlib.Path("export")

In [38]:
def import_schema(path):
    with driver.session() as session:
        session.run("""
            CALL n10s.rdf.import.fetch($url, 'Turtle', { handleVocabUris: "SHORTEN", typesToLabels: true })
        """, url=f"file:///{path}")

import_schema("export/schema.ttl")
print("✓ Imported schema.ttl into Neo4j")

loading households.ttl
loading products.ttl
loading transactions_0.ttl
loading transactions_1000.ttl
loading transactions_10000.ttl
loading transactions_100000.ttl
loading transactions_101000.ttl
loading transactions_102000.ttl
loading transactions_103000.ttl
loading transactions_104000.ttl
loading transactions_105000.ttl
loading transactions_106000.ttl
loading transactions_107000.ttl
loading transactions_108000.ttl
loading transactions_109000.ttl
loading transactions_11000.ttl
loading transactions_110000.ttl
loading transactions_111000.ttl
loading transactions_112000.ttl
loading transactions_113000.ttl
loading transactions_114000.ttl
loading transactions_115000.ttl
loading transactions_116000.ttl
loading transactions_117000.ttl
loading transactions_118000.ttl
loading transactions_119000.ttl
loading transactions_12000.ttl
loading transactions_120000.ttl
loading transactions_121000.ttl
loading transactions_122000.ttl
loading transactions_123000.ttl
loading transactions_124000.ttl
loadin

In [39]:
with driver.session() as s:
    res = s.run("""
        CALL n10s.rdf.import.fetch(
          "file:///import/transactions_88000.ttl", "Turtle",
          { handleVocabUris:"SHORTEN" })
    """)
    print(res.data())

[{'terminationStatus': 'KO', 'triplesLoaded': 0, 'triplesParsed': 0, 'namespaces': None, 'extraInfo': "The following constraint is required for importing RDF. Please run 'CREATE CONSTRAINT n10s_unique_uri FOR (r:Resource) REQUIRE r.uri IS UNIQUE' and try again.", 'callParams': None}]


In [43]:
for fp in sorted(export_dir.glob("*.ttl")):
    url = f"file:///import/{fp.name}"
    with driver.session(database="neo4j") as s:
        summary = s.run("""
            CALL n10s.rdf.import.fetch(
              $url,'Turtle',{ handleVocabUris:'SHORTEN', batchSize:10000 })
        """, url=url).single()
        print(fp.name, summary["triplesLoaded"])

households.ttl 5607
products.ttl 567562
schema.ttl 10
transactions_0.ttl 53656
transactions_10000.ttl 54172
transactions_20000.ttl 53984
transactions_30000.ttl 54264
transactions_40000.ttl 54304
transactions_50000.ttl 54432
transactions_60000.ttl 54764
transactions_70000.ttl 54288
transactions_80000.ttl 54460
transactions_90000.ttl 54356
