In [1]:
!pip install py2neo

Looking in indexes: http://mirrors.aliyun.com/pypi/simple
Collecting py2neo
  Downloading http://mirrors.aliyun.com/pypi/packages/b6/e5/d97c8adbda3b9a6957c572f05a8427661194832d4709b90c108b7e868268/py2neo-2021.2.4-py2.py3-none-any.whl (177 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m177.2/177.2 kB[0m [31m6.2 MB/s[0m eta [36m0:00:00[0m
Collecting interchange~=2021.0.4 (from py2neo)
  Downloading http://mirrors.aliyun.com/pypi/packages/88/bd/abc58e5a36a28e0e55501f4bc15df74e430f399375b14b83f4ce22a257b4/interchange-2021.0.4-py2.py3-none-any.whl (28 kB)
Collecting monotonic (from py2neo)
  Downloading http://mirrors.aliyun.com/pypi/packages/9a/67/7e8406a29b6c45be7af7740456f7f37025f0506ae2e05fb9009a53946860/monotonic-1.6-py2.py3-none-any.whl (8.2 kB)
Collecting pansi>=2020.7.3 (from py2neo)
  Downloading http://mirrors.aliyun.com/pypi/packages/0b/81/8d19773a1da754e6c9342ca7801d0d75775425ea729e127e4851de63d8da/pansi-2024.11.0-py2.py3-none-any.whl (26 kB)
Installing 

In [3]:
import os
import csv
from py2neo import Graph, Node, Relationship

# 配置 Neo4j 连接
NEO4J_URI = "neo4j+s://33daa25d.databases.neo4j.io"
NEO4J_USER = "neo4j"
NEO4J_PASSWORD = "oykvwpgg99SOby9BLlbWsjQ14QWMQh_VrLkY-bMdCMU"

# 创建图实例
graph = Graph(NEO4J_URI, auth=(NEO4J_USER, NEO4J_PASSWORD))

# 清空图数据（可选）
graph.delete_all()

# 映射：文件名 → 主键字段
NODE_KEYS = {
    'supplier_nodes.csv': 'supplierId:ID(Supplier)',
    'shipper_nodes.csv': 'shipperId:ID(Shipper)',
    'review_nodes.csv': 'reviewId:ID(Review)',
    'product_nodes.csv': 'productId:ID(Product)',
    'employee_nodes.csv': 'employeeId:ID(Employee)',
    'customer_nodes.csv': 'customerId:ID(Customer)',
    'category_nodes.csv': 'categoryId:ID(Category)'
}

# 映射：文件名 → 标签字段
LABEL_KEYS = {
    'supplier_nodes.csv': 'labels:LABEL',
    'shipper_nodes.csv': 'labels:LABEL',
    'review_nodes.csv': 'labels:LABEL',
    'product_nodes.csv': 'labels:LABEL',
    'employee_nodes.csv': 'labels:LABEL',
    'customer_nodes.csv': 'labels:LABEL',
    'category_nodes.csv': 'labels:LABEL'
}

# 映射：边文件 → 边类型
EDGE_TYPES = {
    'review_product_edges.csv': ('Review', 'Product', 'ABOUT'),
    'product_supplier_edges.csv': ('Product', 'Supplier', 'SUPPLIED_BY'),
    'customer_order_edges.csv': ('Customer', 'Order', 'PLACED'),
    'order_shipper_edges.csv': ('Order', 'Shipper', 'SHIPPED_VIA'),
    'order_product_edges.csv': ('Order', 'Product', 'CONTAINS'),
    'product_category_edges.csv': ('Product', 'Category', 'BELONGS_TO'),
    'customer_review_edges.csv': ('Customer', 'Review', 'WROTE'),
    'employee_reports_to_edges.csv': ('Employee', 'Employee', 'REPORTS_TO'),
    'employee_order_edges.csv': ('Employee', 'Order', 'PROCESSED')
}

# 创建节点
def create_nodes_from_csv(filepath, primary_key, label_key):
    filename = os.path.basename(filepath)
    print(f"导入节点：{filename}")
    with open(filepath, encoding='utf-8') as f:
        reader = csv.DictReader(f)
        for row in reader:
            node_id = row.pop(primary_key)
            labels = row.pop(label_key).split(',')
            node = Node(*labels, id=node_id, **{k: v for k, v in row.items() if v})
            graph.create(node)

# 创建边
def create_edges_from_csv(filepath, start_label, end_label, rel_type):
    filename = os.path.basename(filepath)
    print(f"导入边：{filename}")
    with open(filepath, encoding='utf-8') as f:
        reader = csv.DictReader(f)
        for row in reader:
            start_id = row[':START_ID(' + start_label + ')']
            end_id = row[':END_ID(' + end_label + ')']
            start_node = graph.nodes.match(start_label, id=start_id).first()
            end_node = graph.nodes.match(end_label, id=end_id).first()
            if start_node and end_node:
                rel = Relationship(start_node, rel_type, end_node)
                graph.create(rel)

# 遍历当前目录下的所有 CSV 文件
for file in os.listdir('.'):
    if not file.endswith('.csv'):
        continue
    if 'nodes' in file:
        key = NODE_KEYS.get(file)
        label_key = LABEL_KEYS.get(file)
        if key:
            create_nodes_from_csv(file, key, label_key)
    elif 'edges' in file:
        info = EDGE_TYPES.get(file)
        if info:
            create_edges_from_csv(file, *info)

print("✅ 所有数据已成功导入 Neo4j！")

导入节点：category_nodes.csv
导入节点：customer_nodes.csv
导入边：customer_order_edges.csv
导入边：customer_review_edges.csv
导入节点：employee_nodes.csv
导入边：employee_order_edges.csv
导入边：employee_reports_to_edges.csv
导入边：order_product_edges.csv
导入边：order_shipper_edges.csv
导入边：product_category_edges.csv
导入节点：product_nodes.csv
导入边：product_supplier_edges.csv
导入节点：review_nodes.csv
导入边：review_product_edges.csv
导入节点：shipper_nodes.csv
导入节点：supplier_nodes.csv
✅ 所有数据已成功导入 Neo4j！


In [2]:
import os
BASE_PATH = os.path.join(os.getcwd(), "ceshi")
print(BASE_PATH)

/root/autodl-tmp/kefu_2/graphrag_2.1.0/graphrag/origin_data/data/ceshi


In [6]:
from neo4j import GraphDatabase, unit_of_work
import os
import time

# Neo4j数据库连接配置
URI = "neo4j+s://981d8a45.databases.neo4j.io"
AUTH = ("neo4j", "7bhGR2TUaVYg3N72GVGSf2g8DP16R-tmOXa4Vid0PFw")  # 替换为实际用户名和密码

# 文件路径配置（假设CSV文件在当前目录的import文件夹中）
BASE_PATH = os.path.join(os.getcwd(), "ceshi")
print("------------")
print(BASE_PATH)

# 节点文件列表
NODE_FILES = [
    "supplier_nodes.csv",
    "shipper_nodes.csv",
    "review_nodes.csv",
    "product_nodes.csv",
    "customer_nodes.csv",
    "employee_nodes.csv",
    "category_nodes.csv",
    "order_nodes.csv"
]

# 关系文件列表
RELATIONSHIP_FILES = [
    "review_product_edges.csv",
    "product_supplier_edges.csv",
    "customer_order_edges.csv",
    "order_shipper_edges.csv",
    "order_product_edges.csv",
    "employee_reports_to_edges.csv",
    "employee_order_edges.csv",
    "customer_review_edges.csv",
    "product_category_edges.csv"
]

class Neo4jImporter:
    def __init__(self, uri, auth):
        self.driver = GraphDatabase.driver(uri, auth=auth)
    
    def close(self):
        self.driver.close()
    
    @unit_of_work(timeout=300)
    def import_nodes(self, file_name, tx, batch_size=1000):
        print("=================")
        print(file_name)
        file_path = os.path.join(BASE_PATH, file_name)
        label = file_name.split("_")[0].capitalize()
        
        # 根据文件名确定节点标签和属性映射
        if file_name == "supplier_nodes.csv":
            query = """
            LOAD CSV WITH HEADERS FROM $file_path AS row
            CREATE (n:Supplier {
                supplierId: row.`supplierId:ID(Supplier)`,
                CompanyName: row.CompanyName,
                ContactName: row.ContactName,
                ContactTitle: row.ContactTitle,
                Address: row.Address,
                City: row.City,
                Region: row.Region,
                PostalCode: row.PostalCode,
                Country: row.Country,
                Phone: row.Phone,
                Fax: row.Fax,
                HomePage: row.HomePage
            })
            """
        
        elif file_name == "shipper_nodes.csv":
            query = """
            LOAD CSV WITH HEADERS FROM $file_path AS row
            CREATE (n:Shipper {
                shipperId: row.`shipperId:ID(Shipper)`,
                CompanyName: row.CompanyName,
                Phone: row.Phone
            })
            """
        
        elif file_name == "review_nodes.csv":
            query = """
            LOAD CSV WITH HEADERS FROM $file_path AS row
            CREATE (n:Review {
                reviewId: row.`reviewId:ID(Review)`,
                ProductName: row.ProductName,
                CustomerName: row.CustomerName,
                Rating: toFloat(row.Rating),
                ReviewText: row.ReviewText,
                ReviewDate: date(row.ReviewDate),
                CategoryName: row.CategoryName
            })
            """
        
        elif file_name == "product_nodes.csv":
            query = """
            LOAD CSV WITH HEADERS FROM $file_path AS row
            CREATE (n:Product {
                productId: row.`productId:ID(Product)`,
                ProductName: row.ProductName,
                SupplierID: row.SupplierID,
                CategoryID: row.CategoryID,
                QuantityPerUnit: row.QuantityPerUnit,
                UnitPrice: toFloat(row.UnitPrice),
                UnitsInStock: toInteger(row.UnitsInStock),
                UnitsOnOrder: toInteger(row.UnitsOnOrder),
                ReorderLevel: toInteger(row.ReorderLevel),
                Discontinued: toInteger(row.Discontinued),
                CategoryName: row.CategoryName,
                SupplierName: row.SupplierName
            })
            """
        
        elif file_name == "customer_nodes.csv":
            query = """
            LOAD CSV WITH HEADERS FROM $file_path AS row
            CREATE (n:Customer {
                customerId: row.`customerId:ID(Customer)`,
                CompanyName: row.CompanyName,
                ContactName: row.ContactName,
                ContactTitle: row.ContactTitle,
                Address: row.Address,
                City: row.City,
                Region: row.Region,
                PostalCode: row.PostalCode,
                Country: row.Country,
                Phone: row.Phone,
                Fax: row.Fax
            })
            """
        
        elif file_name == "employee_nodes.csv":
            query = """
            LOAD CSV WITH HEADERS FROM $file_path AS row
            CREATE (n:Employee {
                employeeId: row.`employeeId:ID(Employee)`,
                LastName: row.LastName,
                FirstName: row.FirstName,
                Title: row.Title,
                TitleOfCourtesy: row.TitleOfCourtesy,
                BirthDate: date(row.BirthDate),
                HireDate: date(row.HireDate),
                Address: row.Address,
                City: row.City,
                Region: row.Region,
                PostalCode: row.PostalCode,
                Country: row.Country,
                HomePhone: row.HomePhone,
                Extension: row.Extension,
                Notes: row.Notes,
                ReportsTo: row.ReportsTo
            })
            """
        
        elif file_name == "category_nodes.csv":
            query = """
            LOAD CSV WITH HEADERS FROM $file_path AS row
            CREATE (n:Category {
                categoryId: row.`categoryId:ID(Category)`,
                CategoryName: row.CategoryName,
                Description: row.Description
            })
            """
        
        elif file_name == "order_nodes.csv":
            query = """
            LOAD CSV WITH HEADERS FROM $file_path AS row
            CREATE (n:Order {
                orderId: row.`orderId:ID(Order)`,
                OrderDate: datetime(replace(row.OrderDate, ' ', 'T')),
                RequiredDate: datetime(replace(row.RequiredDate, ' ', 'T')),
                ShippedDate: datetime(replace(row.ShippedDate, ' ', 'T')),
                Freight: toFloat(row.Freight),
                ShipName: row.ShipName,
                ShipAddress: row.ShipAddress,
                ShipCity: row.ShipCity,
                ShipRegion: row.ShipRegion,
                ShipPostalCode: row.ShipPostalCode,
                ShipCountry: row.ShipCountry,
                CustomerName: row.CustomerName,
                LastName: row.LastName,
                FirstName: row.FirstName,
                ShipperName: row.ShipperName
            })
            """
        
        else:
            print(f"Unsupported node file: {file_name}")
            return
        
        tx.run(query, file_path=f"file:///{file_path}")
        print(f"Imported {label} nodes from {file_name}")
    
    @unit_of_work(timeout=300)
    def import_relationships(self, file_name, tx):
        file_path = os.path.join(BASE_PATH, file_name)
        
        # 根据文件名确定关系类型和属性
        if file_name == "review_product_edges.csv":
            query = """
            LOAD CSV WITH HEADERS FROM $file_path AS row
            MATCH (review:Review {reviewId: row.`:START_ID(Review)`})
            MATCH (product:Product {productId: row.`:END_ID(Product)`})
            CREATE (review)-[:ABOUT]->(product)
            """
        
        elif file_name == "product_supplier_edges.csv":
            query = """
            LOAD CSV WITH HEADERS FROM $file_path AS row
            MATCH (product:Product {productId: row.`:START_ID(Product)`})
            MATCH (supplier:Supplier {supplierId: row.`:END_ID(Supplier)`})
            CREATE (product)-[:SUPPLIED_BY]->(supplier)
            """
        
        elif file_name == "customer_order_edges.csv":
            query = """
            LOAD CSV WITH HEADERS FROM $file_path AS row
            MATCH (customer:Customer {customerId: row.`:START_ID(Customer)`})
            MATCH (order:Order {orderId: row.`:END_ID(Order)`})
            CREATE (customer)-[:PLACED]->(order)
            """
        
        elif file_name == "order_shipper_edges.csv":
            query = """
            LOAD CSV WITH HEADERS FROM $file_path AS row
            MATCH (order:Order {orderId: row.`:START_ID(Order)`})
            MATCH (shipper:Shipper {shipperId: row.`:END_ID(Shipper)`})
            CREATE (order)-[:SHIPPED_VIA]->(shipper)
            """
        
        elif file_name == "order_product_edges.csv":
            query = """
            LOAD CSV WITH HEADERS FROM $file_path AS row
            MATCH (order:Order {orderId: row.`:START_ID(Order)`})
            MATCH (product:Product {productId: row.`:END_ID(Product)`})
            CREATE (order)-[r:CONTAINS {
                UnitPrice: toFloat(row.UnitPrice),
                Quantity: toInteger(row.Quantity),
                Discount: toFloat(row.Discount)
            }]->(product)
            """
        
        elif file_name == "employee_reports_to_edges.csv":
            query = """
            LOAD CSV WITH HEADERS FROM $file_path AS row
            MATCH (employee:Employee {employeeId: row.`:START_ID(Employee)`})
            MATCH (manager:Employee {employeeId: row.`:END_ID(Employee)`})
            CREATE (employee)-[:REPORTS_TO]->(manager)
            """
        
        elif file_name == "employee_order_edges.csv":
            query = """
            LOAD CSV WITH HEADERS FROM $file_path AS row
            MATCH (employee:Employee {employeeId: row.`:START_ID(Employee)`})
            MATCH (order:Order {orderId: row.`:END_ID(Order)`})
            CREATE (employee)-[:PROCESSED]->(order)
            """
        
        elif file_name == "customer_review_edges.csv":
            query = """
            LOAD CSV WITH HEADERS FROM $file_path AS row
            MATCH (customer:Customer {customerId: row.`:START_ID(Customer)`})
            MATCH (review:Review {reviewId: row.`:END_ID(Review)`})
            CREATE (customer)-[:WROTE]->(review)
            """
        
        elif file_name == "product_category_edges.csv":
            query = """
            LOAD CSV WITH HEADERS FROM $file_path AS row
            MATCH (product:Product {productId: row.`:START_ID(Product)`})
            MATCH (category:Category {categoryId: row.`:END_ID(Category)`})
            CREATE (product)-[:BELONGS_TO]->(category)
            """
        
        else:
            print(f"Unsupported relationship file: {file_name}")
            return
        
        tx.run(query, file_path=f"file:///{file_path}")
        print(f"Imported relationships from {file_name}")
    
    def import_all(self):
        with self.driver.session() as session:
            print("Starting node import...")
            for node_file in NODE_FILES:
                session.execute_write(self.import_nodes, node_file)
                time.sleep(0.5)  # 避免资源争用
            
            print("\nStarting relationship import...")
            for rel_file in RELATIONSHIP_FILES:
                session.execute_write(self.import_relationships, rel_file)
                time.sleep(0.5)
        
        print("\nImport completed successfully!")

if __name__ == "__main__":
    # 确保import目录存在
    if not os.path.exists(BASE_PATH):
        os.makedirs(BASE_PATH)
        print(f"Created directory: {BASE_PATH}")
    
    importer = Neo4jImporter(URI, AUTH)
    try:
        importer.import_all()
    finally:
        importer.close()

------------
/root/autodl-tmp/kefu_2/graphrag_2.1.0/graphrag/origin_data/data/ceshi
Starting node import...
<neo4j._sync.work.transaction.ManagedTransaction object at 0x7fc2d7d43110>


TypeError: join() argument must be str, bytes, or os.PathLike object, not 'ManagedTransaction'