In [3]:
import os
import pandas as pd
from neo4j import GraphDatabase
import time


In [4]:
# new instance from my edu account
NEO4J_URI      = os.getenv("NEO4J_URI", "neo4j+s://a0183311.databases.neo4j.io")
NEO4J_USERNAME = os.getenv("NEO4J_USERNAME", "neo4j")
NEO4J_PASSWORD = os.getenv("NEO4J_PASSWORD", "g9nN2A4Pp_ExSlJtescRkeZBI9BhhZulnwawbZla2oA")
NEO4J_CLEAR    = os.getenv("NEO4J_CLEAR", "false").lower() == "true"


In [5]:

# --- 1. 配置您的Aura数据库连接信息 ---
# 建议使用环境变量来保护您的凭证，但也可以直接在此处硬编码字符串。
# 例如: NEO4J_URI = "neo4j+s://xxxx.databases.neo4j.io"
CSV_FILEPATH = "MAS/MAS_FID_2025-06-19.csv"
BATCH_SIZE = 1000 # 每次向数据库发送的数据量

class AuraUploader:
    def __init__(self, uri, user, password):
        # 初始化Neo4j驱动程序
        self.driver = GraphDatabase.driver(uri, auth=(user, password))
        print("成功初始化Neo4j驱动程序。")

    def close(self):
        # 关闭驱动程序连接
        self.driver.close()
        print("Neo4j驱动程序连接已关闭。")

    def process_csv_to_graph_data(self, filepath):
        """从CSV文件读取数据并转换为节点和关系的列表。"""
        if not os.path.exists(filepath):
            print(f"错误: 文件 '{filepath}' 未找到。")
            return None, None

        df = pd.read_csv(filepath)
        for col in df.select_dtypes(include=['object']).columns:
            df[col] = df[col].str.strip()

        nodes = {}
        relationships = []

        def add_node(node_id, label, properties=None):
            if isinstance(node_id, str) and node_id:
                if node_id not in nodes:
                    nodes[node_id] = {"id": node_id, "label": label, "properties": properties or {}}
        
        def add_relationship(source_id, target_id, rel_type):
            if isinstance(source_id, str) and source_id and isinstance(target_id, str) and target_id:
                relationships.append({"source": source_id, "target": target_id, "type": rel_type})

        for _, row in df.iterrows():
            org_name = row['Organisation Name']
            sector = row['Sector']
            licence = row['Licence Type/Status']
            activity = row['Activity/Business Type']
            sub_activity = row['Sub-Activity/Product']

            add_node(org_name, 'Organisation', {'name': org_name, 'address': row['Address'], 'phone': row['Phone Number'] if pd.notna(row['Phone Number']) else None, 'website': row['Website'] if pd.notna(row['Website']) else None})
            add_node(sector, 'Sector', {'name': sector})
            add_node(licence, 'Licence', {'name': licence})
            add_node(activity, 'Activity', {'name': activity})
            add_node(sub_activity, 'SubActivity', {'name': sub_activity})

            add_relationship(org_name, sector, 'IN_SECTOR')
            add_relationship(org_name, licence, 'HAS_LICENCE')
            add_relationship(org_name, activity, 'PERFORMS_ACTIVITY')
            if pd.notna(activity) and pd.notna(sub_activity):
                add_relationship(activity, sub_activity, 'HAS_SUB_ACTIVITY')
        
        print(f"从CSV中处理了 {len(nodes)} 个节点和 {len(relationships)} 条关系。")
        return list(nodes.values()), relationships

    def create_constraints(self):
        """在数据库中创建唯一性约束，以保证数据一致性并提高性能。"""
        print("正在创建唯一性约束...")
        with self.driver.session() as session:
            for label in ["Organisation", "Sector", "Licence", "Activity", "SubActivity"]:
                try:
                    session.run(f"CREATE CONSTRAINT unique_{label.lower()}_id IF NOT EXISTS FOR (n:{label}) REQUIRE n.id IS UNIQUE")
                    print(f"  - 约束 'unique_{label.lower()}_id' 已创建或已存在。")
                except Exception as e:
                    print(f"创建约束 {label} 时出错: {e}")
        print("约束创建完成。")

    def upload_nodes_in_batches(self, nodes):
        """将节点数据分批上传到Neo4j。"""
        print(f"开始上传 {len(nodes)} 个节点...")
        query = """
        UNWIND $batch AS node_data
        MERGE (n:%s {id: node_data.id})
        SET n += node_data.properties
        """
        # 按标签对节点进行分组，以便高效批量上传
        nodes_by_label = {}
        for node in nodes:
            label = node['label']
            if label not in nodes_by_label:
                nodes_by_label[label] = []
            # 我们只需要id和properties
            nodes_by_label[label].append({'id': node['id'], 'properties': node['properties']})

        with self.driver.session() as session:
            for label, data in nodes_by_label.items():
                print(f"  - 正在上传 {len(data)} 个 '{label}' 节点...")
                for i in range(0, len(data), BATCH_SIZE):
                    batch = data[i:i + BATCH_SIZE]
                    session.run(query % label, batch=batch)
        print("节点上传完成。")


    def upload_relationships_in_batches(self, relationships):
        """将关系数据分批上传到Neo4j。"""
        print(f"开始上传 {len(relationships)} 条关系...")
        query = """
        UNWIND $batch AS rel_data
        MATCH (source {id: rel_data.source})
        MATCH (target {id: rel_data.target})
        MERGE (source)-[:%s]->(target)
        """
        # 按类型对关系进行分组
        rels_by_type = {}
        for rel in relationships:
            rel_type = rel['type']
            if rel_type not in rels_by_type:
                rels_by_type[rel_type] = []
            rels_by_type[rel_type].append(rel)

        with self.driver.session() as session:
            for rel_type, data in rels_by_type.items():
                print(f"  - 正在上传 {len(data)} 条 '{rel_type}' 关系...")
                for i in range(0, len(data), BATCH_SIZE):
                    batch = data[i:i + BATCH_SIZE]
                    session.run(query % rel_type, batch=batch)
        print("关系上传完成。")

    def run_full_process(self, filepath):
        """执行完整的ETL流程：处理CSV -> 创建约束 -> 上传节点 -> 上传关系。"""
        start_time = time.time()
        
        nodes, relationships = self.process_csv_to_graph_data(filepath)
        if nodes is None:
            return

        self.create_constraints()
        self.upload_nodes_in_batches(nodes)
        self.upload_relationships_in_batches(relationships)
        
        end_time = time.time()
        print(f"\n✅ 全部数据成功上传到 Neo4j Aura！")
        print(f"   总耗时: {end_time - start_time:.2f} 秒。")


if __name__ == "__main__":
    # 检查凭证是否已填写
    if "在此处填入您" in NEO4J_URI or "在此处填入您" in NEO4J_PASSWORD:
        print("错误：请先在脚本中填写您的 Neo4j Aura 连接凭证 (NEO4J_URI, NEO4J_PASSWORD)。")
    else:
        uploader = AuraUploader(NEO4J_URI, NEO4J_USERNAME, NEO4J_PASSWORD)
        uploader.run_full_process(CSV_FILEPATH)
        uploader.close()

成功初始化Neo4j驱动程序。
从CSV中处理了 2866 个节点和 28362 条关系。
正在创建唯一性约束...
  - 约束 'unique_organisation_id' 已创建或已存在。
  - 约束 'unique_sector_id' 已创建或已存在。
  - 约束 'unique_licence_id' 已创建或已存在。
  - 约束 'unique_activity_id' 已创建或已存在。
  - 约束 'unique_subactivity_id' 已创建或已存在。
约束创建完成。
开始上传 2866 个节点...
  - 正在上传 2765 个 'Organisation' 节点...
  - 正在上传 5 个 'Sector' 节点...
  - 正在上传 50 个 'Licence' 节点...
  - 正在上传 38 个 'Activity' 节点...
  - 正在上传 8 个 'SubActivity' 节点...
节点上传完成。
开始上传 28362 条关系...
  - 正在上传 8234 条 'IN_SECTOR' 关系...
  - 正在上传 8234 条 'HAS_LICENCE' 关系...
  - 正在上传 7359 条 'PERFORMS_ACTIVITY' 关系...
  - 正在上传 4535 条 'HAS_SUB_ACTIVITY' 关系...
关系上传完成。

✅ 全部数据成功上传到 Neo4j Aura！
   总耗时: 123.22 秒。
Neo4j驱动程序连接已关闭。


In [6]:
!pip install thefuzz python-Levenshtein

Collecting thefuzz
  Downloading thefuzz-0.22.1-py3-none-any.whl.metadata (3.9 kB)
Collecting python-Levenshtein
  Using cached python_levenshtein-0.27.1-py3-none-any.whl.metadata (3.7 kB)
Collecting rapidfuzz<4.0.0,>=3.0.0 (from thefuzz)
  Downloading rapidfuzz-3.13.0-cp39-cp39-win_amd64.whl.metadata (12 kB)
Collecting Levenshtein==0.27.1 (from python-Levenshtein)
  Downloading levenshtein-0.27.1-cp39-cp39-win_amd64.whl.metadata (3.6 kB)
Downloading thefuzz-0.22.1-py3-none-any.whl (8.2 kB)
Downloading rapidfuzz-3.13.0-cp39-cp39-win_amd64.whl (1.6 MB)
   ---------------------------------------- 0.0/1.6 MB ? eta -:--:--
   ------------------------- -------------- 1.0/1.6 MB 10.1 MB/s eta 0:00:01
   ---------------------------------------- 1.6/1.6 MB 8.7 MB/s eta 0:00:00
Using cached python_levenshtein-0.27.1-py3-none-any.whl (9.4 kB)
Downloading levenshtein-0.27.1-cp39-cp39-win_amd64.whl (100 kB)
Installing collected packages: rapidfuzz, thefuzz, Levenshtein, python-Levenshtein

   ----

In [7]:
import os
import pandas as pd
from neo4j import GraphDatabase
from datetime import datetime # 导入datetime模块
from thefuzz import process as fuzzy_process

# --- 1. 配置 ---
PERSONNEL_CSV_PATH = "MAS/MAS_Personnel.csv"
BATCH_SIZE = 500
FUZZY_MATCH_THRESHOLD = 90

class DateTimeVersionedUploader:
    def __init__(self, uri, user, password):
        self.driver = GraphDatabase.driver(uri, auth=(user, password))
        # --- 核心改动在这里 ---
        # 为本次运行生成一个datetime对象作为版本时间戳
        self.run_timestamp = datetime.now() 
        print(f"✅ 成功初始化Neo4j驱动程序。本次运行的版本时间戳: {self.run_timestamp.isoformat()}")

    def close(self):
        self.driver.close()
        print("✅ Neo4j驱动程序连接已关闭。")

    def run_cypher_query(self, query, params=None):
        with self.driver.session() as session:
            try:
                result = session.run(query, params)
                return [record for record in result]
            except Exception as e:
                print(f"❌ Cypher查询执行出错: {e}")
                return []

    def create_constraints(self):
        print("\n--- 步骤 1: 确保约束存在 ---")
        constraints = [
            "CREATE CONSTRAINT unique_organisation_id IF NOT EXISTS FOR (n:Organisation) REQUIRE n.id IS UNIQUE",
            "CREATE CONSTRAINT unique_person_name IF NOT EXISTS FOR (p:Person) REQUIRE p.name IS UNIQUE"
        ]
        for constraint in constraints:
            self.run_cypher_query(constraint)
        print("✅ 所有约束已创建或已存在。")

    def upload_personnel_data(self, filepath):
        print(f"\n--- 步骤 2: 处理并上传人员数据 ({filepath}) ---")
        
        # 数据处理部分与之前完全相同
        print("  - 正在从Neo4j获取现有机构名称...")
        existing_org_names = [r["name"] for r in self.run_cypher_query("MATCH (o:Organisation) RETURN o.name AS name")]
        if not os.path.exists(filepath):
            print(f"❌ 错误: 文件 '{filepath}' 未找到。")
            return
        personnel_df = pd.read_csv(filepath)
        unique_csv_org_names = personnel_df['company_name'].unique()
        name_mapping = {csv_name: fuzzy_process.extractOne(csv_name, existing_org_names)[0] 
                        for csv_name in unique_csv_org_names 
                        if fuzzy_process.extractOne(csv_name, existing_org_names)[1] >= FUZZY_MATCH_THRESHOLD}

        persons_to_upload = [{'name': name} for name in personnel_df['person_name'].unique()]
        relationships_to_upload = []
        for _, row in personnel_df.iterrows():
            if row['company_name'] in name_mapping:
                relationships_to_upload.append({
                    "org_name": name_mapping[row['company_name']],
                    "person_name": row['person_name'],
                    "role": str(row['role']).lower()})

        # 上传Person节点
        if persons_to_upload:
            query_persons = "UNWIND $batch AS p MERGE (:Person {name: p.name})"
            for i in range(0, len(persons_to_upload), BATCH_SIZE):
                self.run_cypher_query(query_persons, params={"batch": persons_to_upload[i:i + BATCH_SIZE]})
            print("  - ✅ Person节点上传成功。")

        # --- 上传关系，并打上datetime版本时间戳 ---
        if relationships_to_upload:
            query_rels = """
            UNWIND $batch AS rel_data
            MATCH (o:Organisation {name: rel_data.org_name})
            MATCH (p:Person {name: rel_data.person_name})
            MERGE (o)-[r:HAS_OFFICER]->(p)
            SET r.role = rel_data.role, r.last_updated_run = $timestamp
            """
            for i in range(0, len(relationships_to_upload), BATCH_SIZE):
                batch = relationships_to_upload[i:i + BATCH_SIZE]
                self.run_cypher_query(query_rels, params={"batch": batch, "timestamp": self.run_timestamp})
            print("  - ✅ HAS_OFFICER关系及datetime版本戳上传成功。")


In [None]:
if "在此处填入您" in NEO4J_URI or "在此处填入您" in NEO4J_PASSWORD:
    print("❌ 错误：请先在脚本中填写您的 Neo4j Aura 连接凭证。")
else:
    uploader = DateTimeVersionedUploader(NEO4J_URI, NEO4J_USERNAME, NEO4J_PASSWORD)
    uploader.create_constraints()
    uploader.upload_personnel_data(PERSONNEL_CSV_PATH)
    uploader.close()
    print(f"\n🎉 流程执行完毕！")