# Import

In [21]:
import os
import sys
import logging
import multiprocessing
import pandas as pd
import sqlalchemy
from dotenv import load_dotenv
# import exchange_calendars as xcals
from datetime import datetime, timedelta
# import pytz
# import pandas as pd
# from IPython.display import display, HTML
from sqlalchemy import create_engine
from sqlalchemy.dialects.postgresql import insert
from concurrent.futures import ThreadPoolExecutor

# Directory of a local akshare checkout that should shadow any installed copy.
# Set the AKSHARE_PATH environment variable to point somewhere else; when it is
# unset (or empty) the original developer-machine path is used, so existing
# setups keep working unchanged.
module_path = os.environ.get("AKSHARE_PATH") or "/Users/jx/ProgramData/python/akshare"
if module_path not in sys.path:
    # Prepend so this directory is searched before the other sys.path entries.
    sys.path.insert(0, module_path)

# Imported only after the sys.path adjustment above, which is why the
# "module level import not at top of file" lint rule (E402) is silenced.
import akshare as ak  # noqa: E402

# Show which akshare version this kernel actually loaded.
print(ak.__version__)

1.12.92


# Init

In [17]:
# Load variables from a local .env file into the process environment.
load_dotenv()

# Database connection settings; each is None when the variable is not set.
DB_USER = os.getenv('DB_USER')
DB_PASSWORD = os.getenv('DB_PASSWORD')
DB_HOST = os.getenv('DB_HOST')
DB_PORT = os.getenv('DB_PORT')
DB_NAME = os.getenv('DB_NAME')

# Create an engine instance.
# The URL is assembled from its components rather than with an f-string:
# URL.create() escapes every part, so a user name or password containing
# characters such as "@", ":" or "/" no longer yields a malformed URL.
alchemyEngine = create_engine(
    sqlalchemy.engine.URL.create(
        drivername="postgresql+psycopg2",
        username=DB_USER,
        password=DB_PASSWORD,
        host=DB_HOST,
        # URL.create() expects an integer port (or None to use the driver default).
        port=int(DB_PORT) if DB_PORT else None,
        database=DB_NAME,
    ),
    # Recycle pooled connections once they are older than 3600 seconds.
    pool_recycle=3600,
)

# Logger for this notebook: only ERROR and above are recorded.
logger = logging.getLogger(__name__)
logger.setLevel(logging.ERROR)

# getLogger() hands back the same logger object on every call, so re-running
# this cell would otherwise stack another pair of handlers on it and every
# message would be written several times. Drop (and close) whatever a previous
# run attached before adding fresh handlers.
for stale_handler in list(logger.handlers):
    logger.removeHandler(stale_handler)
    stale_handler.close()

# One handler appends to etl.log, the other writes to the console stream.
file_handler = logging.FileHandler('etl.log')
console_handler = logging.StreamHandler()

# Shared message layout: "<logger name> - <level> - <message>".
formatter = logging.Formatter('%(name)s - %(levelname)s - %(message)s')
file_handler.setFormatter(formatter)
console_handler.setFormatter(formatter)

logger.addHandler(file_handler)
logger.addHandler(console_handler)

# Helper functions

In [None]:
def update_on_conflict(table, conn, df, primary_keys):
    """
    Insert new records, update existing records.

    Builds a single PostgreSQL ``INSERT ... ON CONFLICT (...) DO UPDATE``
    statement from the rows of ``df``. When a row collides with an existing
    record on ``primary_keys``, every column that is not part of
    ``primary_keys`` is overwritten with the incoming value.

    Args:
        table: Name of the target table; its definition is reflected from the
            database through ``conn``.
        conn: SQLAlchemy connection used for reflection and execution.
        df: DataFrame holding the rows to write; its column names are used as
            the table column names.
        primary_keys: List of column names used as the conflict target.

    Returns:
        None. The call is a no-op when ``df`` is None or has no rows.
    """
    # Nothing to write. Returning here also avoids reflecting the table and
    # sending a degenerate INSERT built from an empty record list.
    if df is None or df.empty:
        return

    table = sqlalchemy.Table(table, sqlalchemy.MetaData(), autoload_with=conn)
    insert_stmt = insert(table).values(df.to_dict(orient='records'))
    on_conflict_stmt = insert_stmt.on_conflict_do_update(
        index_elements=primary_keys,
        # "excluded" refers to the row that was proposed for insertion.
        set_={c.key: c for c in insert_stmt.excluded if c.key not in primary_keys}
    )
    conn.execute(on_conflict_stmt)

def ignore_on_conflict(table, conn, df, primary_keys):
    """
    Insert new records, ignore existing records.

    Builds a single PostgreSQL ``INSERT ... ON CONFLICT (...) DO NOTHING``
    statement from the rows of ``df``. Rows that collide with an existing
    record on ``primary_keys`` are skipped; the stored record is left as is.

    Args:
        table: Name of the target table; its definition is reflected from the
            database through ``conn``.
        conn: SQLAlchemy connection used for reflection and execution.
        df: DataFrame holding the rows to write; its column names are used as
            the table column names.
        primary_keys: List of column names used as the conflict target.

    Returns:
        None. The call is a no-op when ``df`` is None or has no rows.
    """
    # Nothing to write. Returning here also avoids reflecting the table and
    # sending a degenerate INSERT built from an empty record list.
    if df is None or df.empty:
        return

    table = sqlalchemy.Table(table, sqlalchemy.MetaData(), autoload_with=conn)
    insert_stmt = insert(table).values(df.to_dict(orient='records'))
    on_conflict_stmt = insert_stmt.on_conflict_do_nothing(
        index_elements=primary_keys
    )
    conn.execute(on_conflict_stmt)

def saveAsCsv(file_name_main: str, df):
    """
    Dump a dataframe to a timestamped CSV file.

    The output path is ``<file_name_main>_<YYYYmmdd_HHMMSS>.csv``, where the
    suffix is the local time at the moment of the call. The index is not
    written.
    """
    # Timestamp suffix for the file name.
    current_time = datetime.now().strftime("%Y%m%d_%H%M%S")
    csv_path = f"{file_name_main}_{current_time}.csv"
    # utf_8_sig writes a BOM so that Chinese characters are encoded and
    # displayed properly when the file is opened.
    df.to_csv(csv_path, encoding="utf_8_sig", index=False)
    

# fund_etf_spot_em 

In [2]:
# Get the latest fund / ETF data set for today (or the latest trading date) and
# persist it into the database.

# Source column -> table column. The entries are listed in the order in which
# the source columns are selected, so this one mapping drives both the column
# selection and the rename.
spot_column_mapping = {
    "代码": "code",
    "名称": "name",
    "最新价": "latest_price",
    "IOPV实时估值": "iopv",
    "基金折价率": "fund_discount_rate",
    "涨跌额": "change_amount",
    "涨跌幅": "change_rate",
    "成交量": "volume",
    "成交额": "turnover",
    "开盘价": "opening_price",
    "最高价": "highest_price",
    "最低价": "lowest_price",
    "昨收": "previous_close",
    "换手率": "turnover_rate",
    "量比": "volume_ratio",
    "委比": "order_ratio",
    "外盘": "external_disc",
    "内盘": "internal_disc",
    "主力净流入-净额": "main_force_net_inflow_amount",
    "主力净流入-净占比": "main_force_net_inflow_ratio",
    "超大单净流入-净额": "super_large_net_inflow_amount",
    "超大单净流入-净占比": "super_large_net_inflow_ratio",
    "大单净流入-净额": "large_net_inflow_amount",
    "大单净流入-净占比": "large_net_inflow_ratio",
    "中单净流入-净额": "medium_net_inflow_amount",
    "中单净流入-净占比": "medium_net_inflow_ratio",
    "小单净流入-净额": "small_net_inflow_amount",
    "小单净流入-净占比": "small_net_inflow_ratio",
    "流通市值": "circulating_market_value",
    "总市值": "total_market_value",
    "最新份额": "latest_shares",
    "数据日期": "date",
    "更新时间": "update_time",
}

# Keep only the mapped source columns, in mapping order.
df = ak.fund_etf_spot_em()[list(spot_column_mapping)]

# The CSV snapshot is written before renaming, so it keeps the source headers.
saveAsCsv("fund_etf_spot_em", df)

# Rename the columns of df to match the table's column names.
df = df.rename(columns=spot_column_mapping)

# Upsert keyed on (code, date), inside a single transaction.
with alchemyEngine.begin() as conn:
    update_on_conflict('fund_etf_spot_em', conn, df, ['code', 'date'])

# fund_etf_perf_em

In [14]:
# Source column -> column name used in the fund_etf_perf_em table.
column_mapping = {
    "序号": "id",
    "基金代码": "fundcode",
    "基金简称": "fundname",
    "类型": "type",
    "日期": "date",
    "单位净值": "unitnav",
    "累计净值": "accumulatednav",
    "近1周": "pastweek",
    "近1月": "pastmonth",
    "近3月": "past3months",
    "近6月": "past6months",
    "近1年": "pastyear",
    "近2年": "past2years",
    "近3年": "past3years",
    "今年来": "ytd",
    "成立来": "sinceinception",
    "成立日期": "inceptiondate",
}

fund_exchange_rank_em_df = ak.fund_exchange_rank_em()

# The CSV snapshot is written before renaming, so it keeps the source headers.
saveAsCsv("fund_exchange_rank_em", fund_exchange_rank_em_df)

# Switch to the table's column names.
fund_exchange_rank_em_df = fund_exchange_rank_em_df.rename(columns=column_mapping)

# Upsert keyed on fundcode, inside a single transaction.
with alchemyEngine.begin() as conn:
    update_on_conflict(
        'fund_etf_perf_em',
        conn,
        fund_exchange_rank_em_df,
        ['fundcode'],
    )


# Get a full list of ETF funds

In [11]:
# Retrieve the ETF list from Sina.
fund_etf_category_sina_df = ak.fund_etf_category_sina(symbol="ETF基金")

# `代码` values combine an exchange code and a symbol, for example
# 代码=sz159998 -> exch=sz, symbol=159998. Split them with one regex pass;
# the combined `代码` column itself is not kept.
code_parts = fund_etf_category_sina_df['代码'].str.extract(r'([a-z]+)(\d+)')

# Resulting frame: the fund name plus the two extracted parts.
df = (
    fund_etf_category_sina_df[['名称']]
    .rename(columns={'名称': 'name'})
    .assign(exch=code_parts[0], symbol=code_parts[1])
)

# Insert or update the rows, keyed on (exch, symbol), in a single transaction.
with alchemyEngine.begin() as conn:
    update_on_conflict('fund_etf_list_sina', conn, df, ['exch', 'symbol'])

# Get Daily Trade History for the Full ETF List

In [20]:
# Length of the refresh window, in calendar days.
LOOKBACK_DAYS = 20

# Read the clock once so both bounds derive from the same instant; two separate
# now() calls could straddle midnight and shift the window by a day.
_window_end = datetime.now()
end_date = _window_end.strftime('%Y%m%d')
start_date = (_window_end - timedelta(days=LOOKBACK_DAYS)).strftime('%Y%m%d')
# start_date = '19700101' # For entire history.

# Function to fetch and process ETF data
def fetch_and_process_etf(symbol):
    """
    Fetch the daily trade history of one ETF and store it in the database.

    The history is requested from ``ak.fund_etf_hist_em`` for the window given
    by the module-level ``start_date`` / ``end_date`` (with ``adjust='qfq'``),
    tagged with ``symbol``, and written to ``fund_etf_daily_em``. Rows whose
    (symbol, date) already exist in the table are left untouched.

    Args:
        symbol: ETF symbol passed to akshare and stored in the ``symbol`` column.

    Returns:
        The stored DataFrame, or None when fetching or saving failed. Failures
        are logged through the notebook's ``logger`` and never raised, so one
        bad symbol does not abort a batch run.
    """
    try:
        # Honour the configured window. Previously the start was hard-coded to
        # '19700101', which silently ignored `start_date` and re-downloaded
        # the entire history on every run.
        df = ak.fund_etf_hist_em(symbol=symbol, period='daily', start_date=start_date, end_date=end_date, adjust='qfq')
        df['symbol'] = symbol
        # Positional rename: assumes akshare returns exactly these 11 columns
        # in this order, followed by the `symbol` column added above.
        df.columns = ['date', 'open', 'close', 'high', 'low', 'volume', 'turnover', 'amplitude', 'change_rate', 'change_amount', 'turnover_rate', 'symbol']
        # Move `symbol` to the front.
        df = df[['symbol', 'date', 'open', 'close', 'high', 'low', 'volume', 'turnover', 'amplitude', 'change_rate', 'change_amount', 'turnover_rate']]
        with alchemyEngine.begin() as conn:
            ignore_on_conflict('fund_etf_daily_em', conn, df, ['symbol', 'date'])
    except Exception:
        # Use the configured `logger` (file + console handlers) rather than the
        # root `logging` module, whose output never reached etl.log.
        logger.error(f'failed to get daily trade history data for {symbol}', exc_info=True)
        return None
    return df

# Symbols to process, as currently stored in fund_etf_list_sina.
etf_list_df = pd.read_sql('SELECT symbol FROM fund_etf_list_sina', alchemyEngine)

# Size the thread pool by the number of CPU cores.
num_cores = multiprocessing.cpu_count()

# Submit one task per symbol, then collect the outcomes in submission order.
# Leaving the `with` block waits for all workers to finish.
with ThreadPoolExecutor(max_workers=num_cores) as executor:
    futures = [
        executor.submit(fetch_and_process_etf, etf_symbol)
        for etf_symbol in etf_list_df['symbol']
    ]
    results = [pending.result() for pending in futures]