In [1]:
# import psycopg2
# import pandas as pd
# from sqlalchemy import create_engine, text
# # 连接数据库取数
# engine = create_engine('postgresql+psycopg2://postgres:123456@127.0.0.1:5432/sql_advanced') 
# df = pd.read_sql_query('SELECT * FROM test_hl', engine) 



In [2]:
import psycopg2
from psycopg2 import sql
from psycopg2.extras import DictCursor
from contextlib import contextmanager
from typing import Optional, List, Dict, Any, Union

class PostgreSQLManager:
    """Convenience wrapper around a single psycopg2 connection.

    Every helper commits on success. On ``psycopg2.Error`` it rolls back,
    reports the failure on stdout and returns ``None`` (or ``False``) instead
    of raising.

    Table and column names are quoted with ``sql.Identifier``. A name given as
    one string is quoted as a single identifier, so ``"schema.table"`` refers
    to a table whose name contains a dot, not to ``table`` inside ``schema``.
    """

    def __init__(self, dbname: str, user: str, password: str, host: str = 'localhost', port: int = 5432):
        """
        Store the connection parameters; no connection is opened yet.
        :param dbname: database name
        :param user: user name
        :param password: password
        :param host: host address
        :param port: port number
        """
        self.dbname = dbname
        self.user = user
        self.password = password
        self.host = host
        self.port = port
        self.connection = None

    def connect(self):
        """
        Open a connection using the stored parameters.

        A connection failure is printed, not raised; ``self.connection`` is
        then left at its previous value (``None`` on a fresh manager).
        """
        try:
            self.connection = psycopg2.connect(
                dbname=self.dbname,
                user=self.user,
                password=self.password,
                host=self.host,
                port=self.port
            )
            print("成功连接到 PostgreSQL 数据库！")
        except psycopg2.Error as e:
            print(f"连接数据库失败: {e}")

    def disconnect(self):
        """Close the connection if one was ever opened."""
        if self.connection:
            self.connection.close()
            print("数据库连接已关闭。")

    @contextmanager
    def get_cursor(self):
        """
        Context manager yielding a ``DictCursor``; the cursor is always closed on exit.

        Connects lazily when there is no open connection.
        :raises psycopg2.OperationalError: if no connection could be established
        """
        if self.connection is None or self.connection.closed:
            self.connect()
        # connect() only prints on failure, so check explicitly instead of
        # failing later with an opaque AttributeError on None.
        if self.connection is None or self.connection.closed:
            raise psycopg2.OperationalError("数据库连接不可用")
        cursor = self.connection.cursor(cursor_factory=DictCursor)
        try:
            yield cursor
        finally:
            cursor.close()

    def execute_query(self, query: str, params: Optional[tuple] = None, fetch: bool = True) -> Optional[List[Dict[str, Any]]]:
        """
        Execute a SQL statement and commit it.
        :param query: SQL text, using ``%s`` placeholders for values
        :param params: values bound to the placeholders
        :param fetch: whether to return the result rows
        :return: list of row dicts when ``fetch`` is true and the statement
                 produced a result set; otherwise ``None`` (also on failure)
        """
        with self.get_cursor() as cursor:
            try:
                cursor.execute(query, params)
                rows = None
                # description is None when the statement produced no result
                # set (DDL, plain DML); fetchall() would raise there and the
                # statement would be rolled back by the handler below.
                if fetch and cursor.description is not None:
                    rows = [dict(row) for row in cursor.fetchall()]
                # Commit in every case so "... RETURNING" statements persist
                # and reads do not leave a transaction open.
                self.connection.commit()
                return rows
            except psycopg2.Error as e:
                self.connection.rollback()
                print(f"查询执行失败: {e}")
                return None

    def insert(self, table: str, data: Dict[str, Any], returning: str = 'id') -> Optional[Any]:
        """
        Insert a single row.
        :param table: table name
        :param data: mapping of column name to value
        :param returning: column whose value is returned for the new row
        :return: value of the ``returning`` column, or ``None`` on failure
        """
        columns = list(data)
        values = list(data.values())
        query = sql.SQL("INSERT INTO {} ({}) VALUES ({}) RETURNING {}").format(
            sql.Identifier(table),
            sql.SQL(', ').join(map(sql.Identifier, columns)),
            sql.SQL(', ').join(sql.Placeholder() * len(values)),
            sql.Identifier(returning)
        )
        with self.get_cursor() as cursor:
            try:
                cursor.execute(query, values)
                # Read the returned row before committing.
                new_value = cursor.fetchone()[0]
                self.connection.commit()
                return new_value
            except psycopg2.Error as e:
                self.connection.rollback()
                print(f"插入数据失败: {e}")
                return None

    def bulk_insert(self, table: str, data: List[Dict[str, Any]]) -> Optional[int]:
        """
        Insert many rows with one multi-row ``INSERT`` statement.

        The column list is taken from the first row; every row must provide
        those keys.
        :param table: table name
        :param data: list of row dicts
        :return: number of inserted rows (0 for empty input), or ``None`` on failure
        """
        if not data:
            return 0
        columns = list(data[0])
        # One "(%s, %s, ...)" group per row, repeated len(data) times.
        row_group = sql.SQL('({})').format(sql.SQL(', ').join(sql.Placeholder() * len(columns)))
        query = sql.SQL("INSERT INTO {} ({}) VALUES {}").format(
            sql.Identifier(table),
            sql.SQL(', ').join(map(sql.Identifier, columns)),
            sql.SQL(', ').join([row_group] * len(data))
        )
        # Flatten row by row, in column order, to line up with the placeholders.
        flat_values = [row[col] for row in data for col in columns]
        with self.get_cursor() as cursor:
            try:
                cursor.execute(query, flat_values)
                self.connection.commit()
                return cursor.rowcount
            except psycopg2.Error as e:
                self.connection.rollback()
                print(f"批量插入失败: {e}")
                return None

    def update(self, table: str, data: Dict[str, Any], condition: str, condition_params: Optional[Union[tuple, list]] = None) -> Optional[int]:
        """
        Update rows matching a condition.
        :param table: table name
        :param data: mapping of column name to new value
        :param condition: raw SQL for the WHERE clause; it is inserted verbatim,
                          so pass values through ``condition_params``, never by
                          string formatting
        :param condition_params: values for the placeholders in ``condition``
        :return: number of updated rows, or ``None`` on failure
        """
        set_clause = sql.SQL(', ').join(
            [sql.SQL("{} = {}").format(sql.Identifier(k), sql.Placeholder()) for k in data]
        )
        query = sql.SQL("UPDATE {} SET {} WHERE {}").format(
            sql.Identifier(table),
            set_clause,
            sql.SQL(condition)
        )
        # tuple(...) lets callers pass a list as well; tuple + list would raise.
        params = tuple(data.values()) + tuple(condition_params or ())
        with self.get_cursor() as cursor:
            try:
                cursor.execute(query, params)
                self.connection.commit()
                return cursor.rowcount
            except psycopg2.Error as e:
                self.connection.rollback()
                print(f"更新数据失败: {e}")
                return None

    def delete(self, table: str, condition: str, condition_params: Optional[tuple] = None) -> Optional[int]:
        """
        Delete rows matching a condition.
        :param table: table name
        :param condition: raw SQL for the WHERE clause; it is inserted verbatim,
                          so pass values through ``condition_params``
        :param condition_params: values for the placeholders in ``condition``
        :return: number of deleted rows, or ``None`` on failure
        """
        query = sql.SQL("DELETE FROM {} WHERE {}").format(
            sql.Identifier(table),
            sql.SQL(condition)
        )
        with self.get_cursor() as cursor:
            try:
                cursor.execute(query, condition_params)
                self.connection.commit()
                return cursor.rowcount
            except psycopg2.Error as e:
                self.connection.rollback()
                print(f"删除数据失败: {e}")
                return None

    def table_exists(self, table: str) -> bool:
        """
        Check whether a table with this name is listed in ``information_schema.tables``.

        The lookup is by table name only; no schema filter is applied.
        :param table: table name
        :return: ``True`` if found, ``False`` otherwise (also when the query fails)
        """
        query = """
            SELECT EXISTS (
                SELECT 1
                FROM information_schema.tables
                WHERE table_name = %s
            )
        """
        result = self.execute_query(query, (table,))
        return result[0]['exists'] if result else False

    def create_table(self, table: str, columns: Dict[str, str], primary_key: Optional[str] = None) -> bool:
        """
        Create a table if it does not exist yet.
        :param table: table name
        :param columns: mapping of column name to SQL type; the type text is
                        inserted verbatim, so it must come from trusted code
        :param primary_key: name of the primary-key column, if any
        :return: ``True`` on success, ``False`` on failure
        """
        column_defs = [sql.SQL("{} {}").format(sql.Identifier(col), sql.SQL(dtype)) for col, dtype in columns.items()]
        if primary_key:
            column_defs.append(sql.SQL("PRIMARY KEY ({})").format(sql.Identifier(primary_key)))
        query = sql.SQL("CREATE TABLE IF NOT EXISTS {} ({})").format(
            sql.Identifier(table),
            sql.SQL(', ').join(column_defs)
        )
        with self.get_cursor() as cursor:
            try:
                cursor.execute(query)
                self.connection.commit()
                return True
            except psycopg2.Error as e:
                self.connection.rollback()
                print(f"创建表失败: {e}")
                return False

In [9]:
import psycopg2
import pandas as pd
from psycopg2 import sql
from psycopg2.extras import DictCursor
from contextlib import contextmanager
from typing import Optional, List, Dict, Any, Union
from io import StringIO

def copy_expert_dataframe(df: pd.DataFrame, table: str, sep: str = ',', null: str = '\\N',
                          conn_params: Optional[Dict[str, Any]] = None):
    """
    Load a DataFrame into a table with ``COPY ... FROM STDIN`` via copy_expert.

    The frame is serialised to CSV in memory (no header, no index) and
    streamed to the server. On ``psycopg2.Error`` the transaction is rolled
    back and the error is printed. The connection is closed in every case.
    :param df: DataFrame to import; its column names are used as the target columns
    :param table: target table name (quoted as a single identifier)
    :param sep: field delimiter used both for the CSV and for COPY
    :param null: text that represents NULL in the CSV
    :param conn_params: keyword arguments for ``psycopg2.connect``; defaults to
                        the local ``sql_advanced`` database
    """
    if conn_params is None:
        conn_params = {
            'dbname': 'sql_advanced',
            'user': 'postgres',
            'password': '123456',
            'host': '127.0.0.1',
            'port': 5432,
        }

    # Serialise the DataFrame to CSV text in memory.
    buffer = StringIO()
    df.to_csv(buffer, sep=sep, header=False, index=False, na_rep=null)
    buffer.seek(0)

    # Build the COPY command. sep/null go through sql.Literal so that quote
    # characters in them are escaped instead of breaking the statement.
    columns = df.columns.tolist()
    copy_query = sql.SQL("COPY {} ({}) FROM STDIN WITH (FORMAT CSV, DELIMITER {}, NULL {})").format(
        sql.Identifier(table),
        sql.SQL(', ').join(map(sql.Identifier, columns)),
        sql.Literal(sep),
        sql.Literal(null)
    )

    connection = psycopg2.connect(**conn_params)
    try:
        with connection.cursor() as cursor:
            try:
                cursor.copy_expert(copy_query, buffer)
                connection.commit()
                print(f"成功将数据导入表 {table}，行数: {len(df)}")
            except psycopg2.Error as e:
                connection.rollback()
                print(f"copy_expert 导入失败: {e}")
    finally:
        # Close even when a non-psycopg2 exception escapes.
        connection.close()

# Sample DataFrame: three rows with id, name, age and a timestamp column.
data = dict(
    id=[1, 2, 3],
    name=['Alice', 'Bob', 'Charlie'],
    age=[25, 30, 35],
    ctm=pd.to_datetime(['2023-01-01', '2023-01-02', '2023-01-03']),
)
df = pd.DataFrame(data=data)


# Alternative (disabled): import the data with copy_from.
# print("使用 copy_from 导入数据:")
# db.copy_from_dataframe(df, table="users")


# Import the sample rows with copy_expert.
print("使用 copy_expert 导入数据:")
copy_expert_dataframe(df=df, table="app_user.student_test")


使用 copy_expert 导入数据:
成功将数据导入表 app_user.student_test，行数: 3


In [4]:
# Build the manager for the local sql_advanced database and open the connection.
db = PostgreSQLManager(
    "sql_advanced",
    "postgres",
    "123456",
    host="127.0.0.1",
    port=5432,
)
db.connect()

# PostgreSQL_HOST = '127.0.0.1'
# PostgreSQL_PORT = '5432'
# PostgreSQL_USER = 'postgres'
# PostgreSQL_PASSWORD = '123456'
# PostgreSQL_DB = 'sql_advanced'



# 1. Create a table (disabled)
# columns = {
#     "id": "SERIAL",
#     "name": "VARCHAR(100)",
#     "age": "INTEGER",
#     "email": "VARCHAR(100)"
# }

# table_create_res = db.create_table("app_user.users", columns, primary_key="id")
# print(table_create_res)

# 2. Check that the table exists (disabled)
# table_exist = db.table_exists("app_user.users")
# print(table_exist)


# columns = {
#     "id": "INTEGER",
#     "name": "VARCHAR(100)",
#     "age": "INTEGER",
#     "ctm": "date"
# }

# table_create_res = db.create_table("app_user.student_test", columns, primary_key="id")
# print(table_create_res)



# Close the database connection.
db.disconnect()

# Create the table only when it is missing (disabled)
# if not db.table_exists("users"):
#     # create the table
#     columns = {
#         "id": "SERIAL",
#         "name": "VARCHAR(100)",
#         "age": "INTEGER",
#         "email": "VARCHAR(100)"
#     }
#     db.create_table("users", columns, primary_key="id")

成功连接到 PostgreSQL 数据库！
True
数据库连接已关闭。


In [None]:
import psycopg2
import pandas as pd
from psycopg2 import sql
from psycopg2.extras import DictCursor
from contextlib import contextmanager
from typing import Optional, List, Dict, Any, Union
from io import StringIO

def copy_expert_dataframe(df: pd.DataFrame, table: str, sep: str = ',', null: str = '\\N',
                          conn_params: Optional[Dict[str, Any]] = None):
    """
    Load a DataFrame into a table with ``COPY ... FROM STDIN`` via copy_expert.

    The frame is serialised to CSV in memory (no header, no index) and
    streamed to the server. On ``psycopg2.Error`` the transaction is rolled
    back and the error is printed. The connection is closed in every case.
    :param df: DataFrame to import; its column names are used as the target columns
    :param table: target table name (quoted as a single identifier)
    :param sep: field delimiter used both for the CSV and for COPY
    :param null: text that represents NULL in the CSV
    :param conn_params: keyword arguments for ``psycopg2.connect``; defaults to
                        the local ``sql_advanced`` database
    """
    if conn_params is None:
        conn_params = {
            'dbname': 'sql_advanced',
            'user': 'postgres',
            'password': '123456',
            'host': '127.0.0.1',
            'port': 5432,
        }

    # Serialise the DataFrame to CSV text in memory.
    buffer = StringIO()
    df.to_csv(buffer, sep=sep, header=False, index=False, na_rep=null)
    buffer.seek(0)

    # Build the COPY command. sep/null go through sql.Literal so that quote
    # characters in them are escaped instead of breaking the statement.
    columns = df.columns.tolist()
    copy_query = sql.SQL("COPY {} ({}) FROM STDIN WITH (FORMAT CSV, DELIMITER {}, NULL {})").format(
        sql.Identifier(table),
        sql.SQL(', ').join(map(sql.Identifier, columns)),
        sql.Literal(sep),
        sql.Literal(null)
    )

    connection = psycopg2.connect(**conn_params)
    try:
        with connection.cursor() as cursor:
            try:
                cursor.copy_expert(copy_query, buffer)
                connection.commit()
                print(f"成功将数据导入表 {table}，行数: {len(df)}")
            except psycopg2.Error as e:
                connection.rollback()
                print(f"copy_expert 导入失败: {e}")
    finally:
        # Close even when a non-psycopg2 exception escapes.
        connection.close()

# Sample DataFrame: three rows with id, name, age and a timestamp column.
data = dict(
    id=[1, 2, 3],
    name=['Alice', 'Bob', 'Charlie'],
    age=[25, 30, 35],
    ctm=pd.to_datetime(['2023-01-01', '2023-01-02', '2023-01-03']),
)
df = pd.DataFrame(data=data)


# Alternative (disabled): import the data with copy_from.
# print("使用 copy_from 导入数据:")
# db.copy_from_dataframe(df, table="users")


# Import the sample rows with copy_expert.
print("使用 copy_expert 导入数据:")
copy_expert_dataframe(df=df, table="app_user.student_test")
