# DynamoDB 数据上传工具

这个Notebook用于将CSV文件上传到DynamoDB表中。目前支持以下CSV文件：
- fund_basic_info.csv - 基金基本信息
- fund_manager_em.csv - 基金经理信息
- fund_performance_info.csv - 基金业绩信息

In [14]:
import boto3
import pandas as pd
import os
import yaml
from decimal import Decimal
import json
from tqdm import tqdm
from boto3.dynamodb.conditions import Key, Attr

## 读取配置文件

In [15]:
def read_yaml_file(file_path):
    with open(file_path, "r") as file:
        try:
            return yaml.safe_load(file)
        except yaml.YAMLError as e:
            print(f"Error reading YAML file: {e}")
            return None

# 获取当前目录的父目录路径
current_dir = os.path.dirname(os.path.abspath("__file__"))
parent_dir = os.path.dirname(current_dir)
config_path = os.path.join(parent_dir, "prereqs", "prereqs_config.yaml")

# 读取配置文件
config = read_yaml_file(config_path)
print("配置文件内容:")
print(config)

配置文件内容:
{'knowledge_base_name': 'fsi-fund-knowledge', 'knowledge_base_description': 'bedrock-allow', 'kb_files_path': 'data', 'tables': [{'table_name': 'fund_basic_info', 'pk_item': 'fund_code', 'sk_item': 'fund_name'}, {'table_name': 'fund_manager_info', 'pk_item': 'fund_code', 'sk_item': 'fund_name'}, {'table_name': 'fund_fee_structure', 'pk_item': 'fund_code', 'sk_item': 'fee_type'}, {'table_name': 'fund_performance', 'pk_item': 'fund_code', 'sk_item': 'report_date'}]}


## 连接到DynamoDB

In [16]:
# 创建DynamoDB资源
dynamodb = boto3.resource('dynamodb')
smm_client = boto3.client("ssm")

# 获取表名
def get_table(table_name):
    kb_name = config["knowledge_base_name"]
    param_name = f"{kb_name}-{table_name}-table-name"
    
    try:
        table_param = smm_client.get_parameter(
            Name=param_name, WithDecryption=False
        )
        return dynamodb.Table(table_param["Parameter"]["Value"])
    except Exception as e:
        # 尝试旧格式的参数名称（兼容性）
        if table_name == "fund_basic_info":
            try:
                table_param = smm_client.get_parameter(
                    Name=f"{kb_name}-table-name", WithDecryption=False
                )
                return dynamodb.Table(table_param["Parameter"]["Value"])
            except Exception as e:
                print(f"无法获取表 {table_name}: {str(e)}")
                return None
        else:
            print(f"无法获取表 {table_name}: {str(e)}")
            return None

## 辅助函数

In [17]:
# 将DataFrame转换为DynamoDB项目格式
def convert_to_dynamodb_item(row):
    item = {}
    for column, value in row.items():
        if pd.isna(value):
            continue
        elif isinstance(value, float):
            item[column] = Decimal(str(value))
        else:
            item[column] = str(value)
    return item

# 批量写入DynamoDB
def batch_write_to_dynamodb(table, items, batch_size=25):
    with table.batch_writer() as batch:
        for i, item in enumerate(tqdm(items)):
            try:
                batch.put_item(Item=item)
            except Exception as e:
                print(f"Error writing item {i}: {str(e)}")
                continue

In [18]:
file_path = os.path.join(current_dir, "fund_basic_info.csv")
file_path = os.path.join(current_dir, "fund_manager_em.csv")
file_path = os.path.join(current_dir, "fund_performance_info.csv")
df = pd.read_csv(file_path, dtype=str, keep_default_na=False)
df

Unnamed: 0,序号,基金代码,基金简称,日期,单位净值,累计净值,日增长率,近1周,近1月,近3月,近6月,近1年,近2年,近3年,今年来,成立来,自定义,手续费
0,1,014938,同泰产业升级混合A,2025-04-15,1.6249,1.6249,0.2,12.39,57.03,88.46,113.55,113.33,74.23,62.49,101.25,62.49,115.8475,0.15%
1,2,014939,同泰产业升级混合C,2025-04-15,1.6052,1.6052,0.2,12.39,56.96,88.27,113.12,112.44,72.84,60.55,101.0,60.52,114.9725,0.00%
2,3,018124,永赢先进制造智选混合发起A,2025-04-15,1.7565,1.7565,1.04,15.18,-15.6,13.07,94.43,94.93,,,38.88,75.65,105.2226,0.15%
3,4,018125,永赢先进制造智选混合发起C,2025-04-15,1.7433,1.7433,1.03,15.17,-15.63,12.94,94.04,94.15,,,38.71,74.33,104.4207,0.00%
4,5,016295,新华利率债债券E,2025-04-15,1.7196,2.0006,-0.02,0.12,0.47,0.07,2.18,94.07,99.09,,0.21,100.16,94.0863,0.00%
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
17226,17227,023484,中金中证A500ETF联接C,2025-04-15,0.9561,0.9561,-0.11,3.34,,,,,,,,-4.39,-4.39,0.00%
17227,17228,023181,华泰柏瑞上证180ETF联接I,2025-04-15,0.9731,0.9731,-0.08,2.73,-4.42,,,,,,,-2.69,-2.69,0.00%
17228,17229,022887,华宝标普港股通低波红利指数A,2025-04-15,1.009,1.009,0.62,4.77,-1.93,,,,,,,0.9,0.9,0.10%
17229,17230,022493,金元顺安鑫怡混合发起式C,2025-04-15,0.9754,0.9754,-0.06,1.76,-2.22,,,,,,,-2.46,-2.46,0.00%


## 上传基金基本信息

In [None]:
def upload_fund_basic_info():
    # 读取CSV文件
    file_path = os.path.join(current_dir, "fund_basic_info.csv")
    df = pd.read_csv(file_path, dtype=str, keep_default_na=False)
    
    # 重命名列名（中文转英文）
    df.columns = ['fund_code', 'pinyin_abbr', 'fund_name', 'fund_type', 'pinyin_full']

    df = df[['fund_code', 'fund_name', 'fund_type']].drop_duplicates()
    
    # 获取表
    table = get_table("fund_basic_info")
    if table is None:
        print("无法获取fund_basic_info表")
        return
    
    # 转换为DynamoDB项目格式
    items = [convert_to_dynamodb_item(row) for _, row in df.iterrows()]
    
    # 批量写入DynamoDB
    print(f"正在上传{len(items)}条基金基本信息...")
    batch_write_to_dynamodb(table, items)
    print("基金基本信息上传完成！")

# 执行上传
upload_fund_basic_info()

## 上传基金经理信息

In [10]:
def upload_fund_manager_info():
    # 读取CSV文件
    file_path = os.path.join(current_dir, "fund_manager.csv")
    df = pd.read_csv(file_path, dtype=str, keep_default_na=False)
    
    # 获取表
    table = get_table("fund_manager_info")
    if table is None:
        print("无法获取fund_manager_info表")
        return
    
    # 转换为DynamoDB项目格式
    items = [convert_to_dynamodb_item(row) for _, row in df.iterrows()]
    
    # 批量写入DynamoDB
    print(f"正在上传{len(items)}条基金经理信息...")
    batch_write_to_dynamodb(table, items)
    print("基金经理信息上传完成！")

# 执行上传
upload_fund_manager_info()

正在上传23271条基金经理信息...


100%|██████████| 23271/23271 [05:38<00:00, 68.78it/s]


基金经理信息上传完成！


## 上传基金业绩信息

In [8]:
def upload_fund_performance_info():
    # 读取CSV文件
    file_path = os.path.join(current_dir, "fund_performance_info.csv")
    df = pd.read_csv(file_path, dtype=str, keep_default_na=False)

    
    # 重命名列名（中文转英文）
    df.columns = ['index', 'fund_code', 'fund_name', 'date', 'nav', 'acc_nav', 
                 'daily_return', 'weekly_return', 'monthly_return', 'quarterly_return', 
                 'half_year_return', 'yearly_return', 'two_year_return', 'three_year_return', 
                 'ytd_return', 'since_inception_return', 'custom_return', 'fee']
    
    # 获取表
    table = get_table("fund_performance")
    if table is None:
        print("无法获取fund_performance表")
        return
    
    # 使用date作为排序键
    df['report_date'] = df['date']
    
    # 转换为DynamoDB项目格式
    items = [convert_to_dynamodb_item(row) for _, row in df.iterrows()]
    
    # 批量写入DynamoDB
    print(f"正在上传{len(items)}条基金业绩信息...")
    batch_write_to_dynamodb(table, items)
    print("基金业绩信息上传完成！")

# 执行上传
upload_fund_performance_info()

正在上传17231条基金业绩信息...


 74%|███████▍  | 12725/17231 [03:11<01:06, 67.84it/s]

Error writing item 12724: An error occurred (ValidationException) when calling the BatchWriteItem operation: One or more parameter values are not valid. The AttributeValue for a key attribute cannot contain an empty string value. Key: report_date


 89%|████████▊ | 15250/17231 [03:50<00:29, 67.94it/s]

Error writing item 15249: An error occurred (ValidationException) when calling the BatchWriteItem operation: One or more parameter values are not valid. The AttributeValue for a key attribute cannot contain an empty string value. Key: report_date


100%|██████████| 17231/17231 [04:19<00:00, 66.32it/s]


基金业绩信息上传完成！


## 上传基金费率信息

注意：这里我们从fund_performance_info.csv中提取费率信息，创建一个单独的表

In [9]:
def upload_fund_fee_info():
    # 读取CSV文件
    file_path = os.path.join(current_dir, "fund_performance_info.csv")
    df = pd.read_csv(file_path, dtype=str, keep_default_na=False)
    
    # 重命名列名（中文转英文）
    df.columns = ['index', 'fund_code', 'fund_name', 'date', 'nav', 'acc_nav', 
                 'daily_return', 'weekly_return', 'monthly_return', 'quarterly_return', 
                 'half_year_return', 'yearly_return', 'two_year_return', 'three_year_return', 
                 'ytd_return', 'since_inception_return', 'custom_return', 'fee']
    
    # 提取费率信息
    fee_df = df[['fund_code', 'fund_name', 'fee']].drop_duplicates()
    
    # 添加fee_type列作为排序键
    fee_df['fee_type'] = 'subscription_fee'  # 认购费
    
    # 获取表
    table = get_table("fund_fee_structure")
    if table is None:
        print("无法获取fund_fee_structure表")
        return
    
    # 转换为DynamoDB项目格式
    items = [convert_to_dynamodb_item(row) for _, row in fee_df.iterrows()]
    
    # 批量写入DynamoDB
    print(f"正在上传{len(items)}条基金费率信息...")
    batch_write_to_dynamodb(table, items)
    print("基金费率信息上传完成！")

# 执行上传
upload_fund_fee_info()

正在上传17231条基金费率信息...


100%|██████████| 17231/17231 [04:20<00:00, 66.15it/s]


基金费率信息上传完成！


## 查询示例

In [21]:
def query_fund_by_code(fund_code):
    # 获取基金基本信息
    basic_info_table = get_table("fund_basic_info")
    if basic_info_table is None:
        print("无法获取fund_basic_info表")
        return
    
    response = basic_info_table.query(
        KeyConditionExpression=Key("fund_code").eq(fund_code)
    )
    
    if not response["Items"]:
        print(f"未找到基金代码为{fund_code}的基金")
        return
    
    basic_info = response["Items"][0]
    print("基金基本信息:")
    print(json.dumps(basic_info, indent=2, default=str))
    
    # 获取基金经理信息
    manager_table = get_table("fund_manager_info")
    if manager_table is not None:
        response = manager_table.scan(
            FilterExpression=Attr("fund_code").eq(fund_code)
        )
        if response["Items"]:
            print("\n基金经理信息:")
            print(json.dumps(response["Items"][0], indent=2, default=str))
    
    # 获取基金费率信息
    fee_table = get_table("fund_fee_structure")
    if fee_table is not None:
        response = fee_table.query(
            KeyConditionExpression=Key("fund_code").eq(fund_code)
        )
        if response["Items"]:
            print("\n基金费率信息:")
            print(json.dumps(response["Items"][0], indent=2, default=str))
    
    # 获取基金业绩信息
    holdings_table = get_table("fund_performance")
    if holdings_table is not None:
        response = holdings_table.query(
            KeyConditionExpression=Key("fund_code").eq(fund_code),
            Limit=1  # 只获取最新的一条记录
        )
        if response["Items"]:
            print("\n基金业绩信息:")
            print(json.dumps(response["Items"][0], indent=2, default=str))

# 查询示例
query_fund_by_code("000001")  # 华夏成长混合

基金基本信息:
{
  "fund_code": "000001",
  "fund_type": "\u6df7\u5408\u578b-\u7075\u6d3b",
  "fund_name": "\u534e\u590f\u6210\u957f\u6df7\u5408"
}

基金费率信息:
{
  "fee_type": "subscription_fee",
  "fund_name": "\u534e\u590f\u6210\u957f\u6df7\u5408",
  "fund_code": "000001",
  "fee": "0.15%"
}

基金业绩信息:
{
  "nav": "0.831",
  "report_date": "2025-04-15",
  "index": "2526",
  "half_year_return": "0.85",
  "fee": "0.15%",
  "since_inception_return": "415.96",
  "acc_nav": "3.394",
  "two_year_return": "-12.06",
  "daily_return": "-0.84",
  "yearly_return": "13.52",
  "date": "2025-04-15",
  "monthly_return": "-7.46",
  "fund_name": "\u534e\u590f\u6210\u957f\u6df7\u5408",
  "three_year_return": "-13.64",
  "fund_code": "000001",
  "ytd_return": "1.09",
  "quarterly_return": "2.85",
  "weekly_return": "4.79",
  "custom_return": "15.5772"
}


In [22]:
def query_fund_by_name(fund_name):
    # 获取基金基本信息
    basic_info_table = get_table("fund_basic_info")
    if basic_info_table is None:
        print("无法获取fund_basic_info表")
        return
    
    response = basic_info_table.scan(
        FilterExpression=Attr("fund_name").eq(fund_name)
    )
    
    if not response["Items"]:
        print(f"未找到基金名称为{fund_name}的基金")
        return
    
    basic_info = response["Items"][0]
    fund_code = basic_info["fund_code"]
    
    print("基金基本信息:")
    print(json.dumps(basic_info, indent=2, default=str))
    
    # 使用fund_code查询其他信息
    # 获取基金经理信息
    manager_table = get_table("fund_manager_info")
    if manager_table is not None:
        response = manager_table.scan(
            FilterExpression=Attr("fund_code").eq(fund_code)
        )
        if response["Items"]:
            print("\n基金经理信息:")
            print(json.dumps(response["Items"][0], indent=2, default=str))
    
    # 获取基金费率信息
    fee_table = get_table("fund_fee_structure")
    if fee_table is not None:
        response = fee_table.query(
            KeyConditionExpression=Key("fund_code").eq(fund_code)
        )
        if response["Items"]:
            print("\n基金费率信息:")
            print(json.dumps(response["Items"][0], indent=2, default=str))
    
    # 获取基金业绩信息
    holdings_table = get_table("fund_performance")
    if holdings_table is not None:
        response = holdings_table.query(
            KeyConditionExpression=Key("fund_code").eq(fund_code),
            Limit=1  # 只获取最新的一条记录
        )
        if response["Items"]:
            print("\n基金业绩信息:")
            print(json.dumps(response["Items"][0], indent=2, default=str))

# 查询示例
query_fund_by_name("华夏成长混合")

基金基本信息:
{
  "fund_code": "000001",
  "fund_type": "\u6df7\u5408\u578b-\u7075\u6d3b",
  "fund_name": "\u534e\u590f\u6210\u957f\u6df7\u5408"
}

基金费率信息:
{
  "fee_type": "subscription_fee",
  "fund_name": "\u534e\u590f\u6210\u957f\u6df7\u5408",
  "fund_code": "000001",
  "fee": "0.15%"
}

基金业绩信息:
{
  "nav": "0.831",
  "report_date": "2025-04-15",
  "index": "2526",
  "half_year_return": "0.85",
  "fee": "0.15%",
  "since_inception_return": "415.96",
  "acc_nav": "3.394",
  "two_year_return": "-12.06",
  "daily_return": "-0.84",
  "yearly_return": "13.52",
  "date": "2025-04-15",
  "monthly_return": "-7.46",
  "fund_name": "\u534e\u590f\u6210\u957f\u6df7\u5408",
  "three_year_return": "-13.64",
  "fund_code": "000001",
  "ytd_return": "1.09",
  "quarterly_return": "2.85",
  "weekly_return": "4.79",
  "custom_return": "15.5772"
}


In [26]:
fund_name = "华夏成长混合"
manager_table = get_table("fund_manager_info")
if manager_table is not None:
    response = manager_table.scan(
        FilterExpression=Attr("fund_name").eq(fund_name)
    )
    if response["Items"]:
        print("\n基金经理信息:")
        print(json.dumps(response["Items"][0], indent=2, default=str))

In [None]:
manager_table

In [11]:
dynamodb = boto3.client('dynamodb', region_name='us-east-1')
response = dynamodb.scan(TableName="fund_manager_info",Select='COUNT')
count = response['Count']
count

4713