In [1]:
# 讀取環境變數
import sys
import os
from pathlib import Path

PROJECT_ROOT = Path().resolve().parent
# Make the project's `src` package importable; the guard keeps re-runs of this
# cell from appending the same entry to sys.path again and again.
if str(PROJECT_ROOT) not in sys.path:
    sys.path.append(str(PROJECT_ROOT))

from dotenv import load_dotenv
# Same file as "../.env" relative to the notebook's working directory, but
# derived from PROJECT_ROOT so there is a single source of truth for the path.
load_dotenv(PROJECT_ROOT / ".env")


True

In [None]:
# 測試 arxiv api 取資料
import arxiv

# Fetch exactly one paper by its arXiv id and dump every attribute of the result.
client = arxiv.Client(page_size=1)

search = arxiv.Search(
    query="",
    id_list=["2301.12345v1"],
    max_results=1,
    sort_by=arxiv.SortCriterion.SubmittedDate,
    sort_order=arxiv.SortOrder.Descending,
)

result = next(client.results(search))
for attr, value in vars(result).items():
    print(attr, ":", value)


In [None]:
# 確認 db 中有哪些資料需要處理

from src.core.db import get_pg
from src.core.pg_engine import PsqlEngine
# Database engine handle; passed to get_pending_gz below.
pg = get_pg()

def get_pending_gz(pg: "PsqlEngine", num: int) -> list:
    """Return up to ``num`` rows (category, s3_path) from etl.raw_batches still in 'pending' state.

    Rows are ordered by batch_id ascending.

    Args:
        pg: Database engine exposing ``execute_query(stmt)``.
        num: Maximum number of rows; any value ``int()`` accepts (e.g. 10 or "10").

    Returns:
        Whatever ``pg.execute_query`` returns for the SELECT.

    Raises:
        ValueError / TypeError: if ``num`` is not an integer value.
    """
    # LIMIT is interpolated into the SQL text, so force it to an int first; this
    # rules out SQL injection through ``num``.
    limit = int(num)
    stmt = f"""
        select category,s3_path from etl.raw_batches where etl_status = 'pending' order by batch_id limit {limit};
    """
    result = pg.execute_query(stmt)
    return result


# Fetch 10 pending batches and turn each returned row into a plain dict for display.
pending_gz = [
    row.__dict__ if hasattr(row, "__dict__") else dict(row._asdict())
    for row in get_pending_gz(pg, 10)
]
print(pending_gz)

In [None]:
import os
import json
from psycopg2.extras import Json
import gzip
import io
import boto3
import uuid
import yaml
import logging
from datetime import datetime, timezone
from src.core.db import get_pg
from src.core.pg_engine import PsqlEngine

BUCKET_NAME = os.getenv("BUCKET_NAME")

# S3 client built from the AWS settings found in environment variables.
s3 = boto3.client(
    service_name="s3",
    region_name=os.getenv("AWS_REGION"),
    aws_access_key_id=os.getenv("AWS_ACCESS_KEY_ID"),
    aws_secret_access_key=os.getenv("AWS_SECRET_ACCESS_KEY"),
)

def load_config(BUCKET_NAME, key="config/config.yaml"):
    """Read and parse the ETL YAML config stored in S3.

    Args:
        BUCKET_NAME: S3 bucket that holds the config object.
        key: Object key of the YAML file; defaults to "config/config.yaml".

    Returns:
        The parsed YAML document as returned by ``yaml.safe_load``.

    Raises:
        Re-raises any S3 or YAML error after logging it.
    """
    try:
        # Parse straight from the response body rather than staging a copy in
        # /tmp: no platform-specific path and no leftover file on disk.
        response = s3.get_object(Bucket=BUCKET_NAME, Key=key)
        return yaml.safe_load(response["Body"].read())
    except Exception as e:
        logging.error(f"Failed to load config from S3: {e}")
        raise

# Configure logging first. If load_config fails it calls logging.error(), which
# on an unconfigured root logger installs a default (WARNING-level) handler; a
# later basicConfig() is then a no-op and every logger.info below is hidden.
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
logger = logging.getLogger(__name__)

cfg = load_config(BUCKET_NAME)

# Number of pending .gz files processed per ETL run
PENDING_GZ_BATCH = cfg['etl']['pending_gz_batch']

# Number of records buffered before each write to the DB
ETL_BATCH_SIZE = cfg['etl']['etl_batch_size']


pg = get_pg()

def get_pending_gz(pg: "PsqlEngine", num: int) -> list:
    """Return up to ``num`` rows (category, s3_path) from etl.raw_batches still in 'pending' state.

    Rows are ordered by batch_id ascending.

    Args:
        pg: Database engine exposing ``execute_query(stmt)``.
        num: Maximum number of rows; any value ``int()`` accepts (e.g. 10 or "10").

    Returns:
        Whatever ``pg.execute_query`` returns for the SELECT.

    Raises:
        ValueError / TypeError: if ``num`` is not an integer value.
    """
    # LIMIT is interpolated into the SQL text, so force it to an int first; this
    # rules out SQL injection through ``num`` (e.g. a malformed config value).
    limit = int(num)
    stmt = f"""
        select category, s3_path from etl.raw_batches where etl_status = 'pending' order by batch_id limit {limit};
    """
    result = pg.execute_query(stmt)
    return result

def _to_date(value):
    """Return the calendar date of an ISO-8601 timestamp string; falsy input is returned unchanged."""
    if not value:
        return value
    # datetime.fromisoformat() only accepts a trailing "Z" from Python 3.11 on.
    return datetime.fromisoformat(value.replace("Z", "+00:00")).date()


def parse_record(record: dict, s3_key) -> tuple:
    """Map one raw arXiv JSON record to a row tuple for the ``arxiv_papers`` table.

    Args:
        record: One decoded JSON line; missing keys become None (or [] for
            authors/categories).
        s3_key: S3 key of the source .gz file, stored as the last column.

    Returns:
        A 19-element tuple in the column order expected by the insert. It contains
        the raw published/updated strings, their parsed dates, and the UTC time
        the row was built.
    """
    published = record.get("published")
    updated = record.get("updated")

    # PostgreSQL text values cannot contain NUL bytes; strip them here just as
    # parse_history_record does, otherwise the bulk insert fails.
    summary = record.get("summary")
    if summary:
        summary = summary.replace('\x00', '')

    return (
        record.get("entry_id"),
        record.get("title"),
        record.get("authors", []),
        json.dumps({}),
        summary,
        record.get("primary_category"),
        record.get("categories", []),
        published,
        updated,
        record.get("journal_ref"),
        record.get("doi"),
        json.dumps({}),
        _to_date(published),
        _to_date(updated),
        datetime.now(timezone.utc),
        1,
        [],
        None,
        s3_key
    )

def parse_history_record(record: dict, s3_key: str, operation: str, etl_stage: str) -> tuple:
    """Map one raw arXiv JSON record to a row tuple for ``arxiv_papers_history``.

    Args:
        record: One decoded JSON line.
        s3_key: S3 key of the source .gz file.
        operation: Operation label stored with the row (e.g. "insert").
        etl_stage: ETL stage label stored with the row.

    Returns:
        A 20-element tuple starting with a fresh UUID4 string, the entry_id, the
        epoch-seconds float and the matching UTC datetime.
    """
    summary = record.get("summary") or ""
    # NUL bytes are rejected by PostgreSQL text columns; CR/LF are flattened to spaces.
    summary = summary.replace('\x00', '').replace('\n', ' ').replace('\r', ' ')
    # Take one timestamp so the numeric and datetime columns describe the same instant.
    now = datetime.now(timezone.utc)
    return (
        str(uuid.uuid4()),
        record.get("entry_id"),
        now.timestamp(),
        now,
        etl_stage,
        record.get("title"),
        record.get("authors", []),
        Json({}),
        summary,
        record.get("primary_category"),
        record.get("categories", []),
        record.get("published"),
        record.get("updated"),
        record.get("journal_ref"),
        record.get("doi"),
        Json({}),
        [],
        None,
        s3_key,
        operation
    )

def safe_insert(table, batch, pg_engine=None):
    """Insert ``batch`` into ``table``, falling back to row-by-row inserts on failure.

    Args:
        table: Target table name passed to ``insert_mogrify``.
        batch: Sequence of row tuples; ``row[0]`` identifies a row in log messages.
        pg_engine: Engine exposing ``insert_mogrify(table, rows)``. Defaults to the
            module-level ``pg`` so existing two-argument calls keep working.

    Returns:
        True if the bulk insert succeeded. False if it failed and the per-row
        fallback was used, even when every row then inserted successfully.
        Individual row failures are logged, not raised.
    """
    engine = pg if pg_engine is None else pg_engine
    try:
        engine.insert_mogrify(table, batch)
        return True
    except Exception as e:
        logger.error(f"Batch insert failed: {e}")

    # Retry one row at a time so a single bad row does not sink the whole batch.
    failed = 0
    for row in batch:
        try:
            engine.insert_mogrify(table, [row])
        except Exception as e2:
            failed += 1
            logger.error(f"Single insert failed for row {row[0]}: {e2}")
    if failed:
        logger.error(f"{failed}/{len(batch)} rows could not be inserted into {table}")
    return False

def update_etl_status(pg: PsqlEngine, s3_path: str, status: str, started_at=None, finished_at=None, error_msg=None):
    """Set the ETL status of the etl.raw_batches row whose s3_path matches.

    The started/finished timestamps are only overwritten when a non-None value is
    given (COALESCE keeps the stored value otherwise). ``error_msg`` is always
    written, so omitting it clears any previous error.
    """
    query = """
        update etl.raw_batches
        set etl_status = %s,
            etl_started_at = coalesce(%s, etl_started_at),
            etl_finished_at = coalesce(%s, etl_finished_at),
            error_msg = %s
        where s3_path = %s;
    """
    pg.execute_cmd(query, (status, started_at, finished_at, error_msg, s3_path))

def load_s3_gzip_to_pg(bucket: str, s3_key: str, etl_stage: str = "initial_load"):
    """Load one gzipped JSON-lines object from S3 into arxiv_papers and arxiv_papers_history.

    Each non-blank line is decoded as UTF-8 JSON. It becomes one row per table via
    parse_record / parse_history_record. Rows are written through safe_insert every
    ``ETL_BATCH_SIZE`` records, plus once more for any remainder.

    Args:
        bucket: S3 bucket name.
        s3_key: Object key of the .gz file; also stored on every row.
        etl_stage: Stage label recorded on the history rows.

    Returns:
        The UTC datetime taken after the last batch was written.

    Raises:
        Errors from ``s3.get_object``, gzip decompression or ``json.loads``.
        Insert failures are caught and logged inside safe_insert instead.
    """
    obj = s3.get_object(Bucket=bucket, Key=s3_key)
    batch, batch_history = [], []

    def _flush():
        # Write both buffers (same records, two tables) and start fresh ones.
        nonlocal batch, batch_history
        safe_insert("arxiv_papers", batch)
        safe_insert("arxiv_papers_history", batch_history)
        batch, batch_history = [], []

    # NOTE(review): the whole object is read into memory before decompression.
    with gzip.GzipFile(fileobj=io.BytesIO(obj["Body"].read()), mode="rb") as f:
        for line in f:
            # A blank line would make json.loads fail and abort the entire file.
            if not line.strip():
                continue
            record = json.loads(line.decode("utf-8"))
            batch.append(parse_record(record, s3_key))
            batch_history.append(parse_history_record(record, s3_key, operation="insert", etl_stage=etl_stage))
            if len(batch) >= ETL_BATCH_SIZE:
                _flush()

    if batch:
        _flush()
    return datetime.now(timezone.utc)

# Fetch up to PENDING_GZ_BATCH files still in 'pending' state and normalise each row to a dict.
pending_gz = get_pending_gz(pg, PENDING_GZ_BATCH) # number of files to pick up this run
pending_gz = [r.__dict__ if hasattr(r, "__dict__") else dict(r._asdict()) for r in pending_gz]

# Process the files one at a time; each one goes pending -> processing -> finished / failed.
for pending_gz_dict in pending_gz:
    key = pending_gz_dict['s3_path']
    logger.info(f"Processing {key}")
    started_at = datetime.now(timezone.utc)
    # NOTE(review): this status update sits outside the try, so a DB error here
    # aborts the whole loop instead of failing just this file.
    update_etl_status(pg, key, "processing", started_at=started_at)
    try:
        finished_at = load_s3_gzip_to_pg(BUCKET_NAME, key)
        # NOTE(review): row-level insert failures are only logged inside safe_insert,
        # so a file can be marked finished even if some rows were not stored.
        update_etl_status(pg, key, "finished", finished_at=finished_at)
        # logger.info(f"Finished {key}")
    except Exception as e:
        # Mark only this file as failed (keeping the error text) and continue with the next.
        logger.error(f"Error processing {key}: {e}", exc_info=True)
        update_etl_status(pg, key, "failed", finished_at=datetime.now(timezone.utc), error_msg=str(e))
    


In [None]:
import json
from pathlib import Path
from datetime import datetime, timezone

# Raw batch to clean, and the path the cleaned copy would be written to.
input_file = Path("arxiv_data") / "arxiv_batch_1.json"
output_file = Path("arxiv_data") / "arxiv_batch_cleaned.json"

def transform_datetime2date(dt_str):
    """Convert an ISO-8601 timestamp string into a 'YYYY-MM-DD' string.

    'Z' is rewritten to '+00:00' so datetime.fromisoformat accepts it on Python
    versions before 3.11. Returns None when the input is missing or cannot be
    parsed.
    """
    try:
        normalized = dt_str.replace("Z", "+00:00")
        parsed = datetime.fromisoformat(normalized)
        return parsed.strftime("%Y-%m-%d")
    except Exception:
        return None

with open(input_file, "r", encoding="utf-8") as f:
    papers = json.load(f)

# Deduplicate on entry_id; a later record with the same entry_id replaces an earlier one.
unique_papers = {paper["entry_id"]: paper for paper in papers}


required_fields = [
    "entry_id", "title", "summary", "authors",
    "primary_category", "published", "updated"
]

# One UTC timestamp for the whole run, so every cleaned record shares the same ETL time.
etl_datetime = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S")

cleaned_papers = []
for paper in unique_papers.values():
    # Drop records that miss a required field or have a blank / non-string author.
    if not all(paper.get(field) for field in required_fields):
        continue
    if not all(isinstance(a, str) and a.strip() for a in paper["authors"]):
        continue
    # Build a new dict so the raw records loaded into `papers` stay untouched.
    cleaned_papers.append({
        **paper,
        "published_date": transform_datetime2date(paper["published"]),
        "updated_date": transform_datetime2date(paper["updated"]),
        "etl_datetime": etl_datetime,
    })

for paper in cleaned_papers:
    print(paper)

# Set to True to persist the cleaned records to output_file.
WRITE_OUTPUT = False
if WRITE_OUTPUT:
    with open(output_file, "w", encoding="utf-8") as f:
        json.dump(cleaned_papers, f, ensure_ascii=False, indent=2)
    print(f"清理完成，共 {len(cleaned_papers)} 筆，已儲存到 {output_file}")


In [None]:
import boto3
from botocore.exceptions import ClientError

import os  # needed below; otherwise this cell silently relies on an earlier cell's import

# Create the DynamoDB resource from the AWS settings in environment variables.
dynamodb = boto3.resource(
    "dynamodb",
    region_name=os.getenv("AWS_REGION"),
    aws_access_key_id=os.getenv("AWS_ACCESS_KEY_ID"),
    aws_secret_access_key=os.getenv("AWS_SECRET_ACCESS_KEY")
)

# Handle to the 'download_paper_entry_id' table.
table = dynamodb.Table('download_paper_entry_id')


In [None]:
# Add one item to the DynamoDB table (skipped if the entry_id already exists)
from datetime import datetime, timezone

entry_id = "http://arxiv.org/abs/2510.11683v1"
item = dict(
    category="cs.LG",
    entry_id=entry_id,
    status="uploaded",  # or "failed"
    last_attempt=datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S"),
    error_msg="",
)

# The condition makes put_item refuse to overwrite an item whose entry_id already exists.
try:
    table.put_item(Item=item, ConditionExpression='attribute_not_exists(entry_id)')
except ClientError as e:
    if e.response['Error']['Code'] != 'ConditionalCheckFailedException':
        raise
    print("這篇 paper 已存在")
else:
    print("已新增")


In [None]:
import boto3
from dotenv import load_dotenv
import os

def create_s3_bucket_and_prefix(bucket_name: str, domain: str, env_path=None):
    """Create an S3 bucket and an empty ``raw/domain=<domain>/`` prefix in it.

    Args:
        bucket_name: Name of the bucket to create.
        domain: Domain used in the prefix, e.g. "cs.LG".
        env_path: Path to the .env file holding the AWS settings. Defaults to
            "../.env" next to this file when run as a script, or relative to the
            current working directory inside Jupyter (where __file__ is undefined).

    Raises:
        FileNotFoundError: if the .env file does not exist.
    """
    if env_path is None:
        base_dir = os.path.dirname(os.path.abspath(__file__)) if "__file__" in globals() else os.getcwd()
        env_path = os.path.join(base_dir, "../.env")
    if not os.path.exists(env_path):
        raise FileNotFoundError(f".env not found at {env_path}")

    load_dotenv(env_path)

    region = os.getenv("AWS_REGION")
    s3_client = boto3.client(
        "s3",
        region_name=region,
        aws_access_key_id=os.getenv("AWS_ACCESS_KEY_ID"),
        aws_secret_access_key=os.getenv("AWS_SECRET_ACCESS_KEY")
    )

    # Outside us-east-1, CreateBucket must name the region explicitly or S3 rejects it.
    create_kwargs = {"Bucket": bucket_name}
    if region and region != "us-east-1":
        create_kwargs["CreateBucketConfiguration"] = {"LocationConstraint": region}
    s3_client.create_bucket(**create_kwargs)

    # S3 has no real folders; an empty ".keep" object makes the prefix visible.
    prefix = f"raw/domain={domain}/"
    s3_client.put_object(Bucket=bucket_name, Key=(prefix + ".keep"))

# NOTE: inside Jupyter __name__ is "__main__", so this call (and the real
# create_bucket request) runs every time this cell is executed.
if __name__ == "__main__":
    create_s3_bucket_and_prefix(bucket_name="my-test-bucket", domain="cs.LG")


In [None]:
# 查看你有哪個 Bucket

import boto3
from dotenv import load_dotenv
import os

load_dotenv("../.env")

s3 = boto3.client(
    service_name="s3",
    region_name=os.getenv("AWS_REGION"),
    aws_access_key_id=os.getenv("AWS_ACCESS_KEY_ID"),
    aws_secret_access_key=os.getenv("AWS_SECRET_ACCESS_KEY"),
)

# Print the name of every bucket these credentials can list.
response = s3.list_buckets()
for name in (b["Name"] for b in response["Buckets"]):
    print(name)


In [None]:
import boto3
from dotenv import load_dotenv
import os

load_dotenv("../.env")

s3 = boto3.client(
    "s3",
    region_name=os.getenv("AWS_REGION"),
    aws_access_key_id=os.getenv("AWS_ACCESS_KEY_ID"),
    aws_secret_access_key=os.getenv("AWS_SECRET_ACCESS_KEY")
)

bucket_name = "arvix-paper-bucket"

# A single list_objects_v2 call returns at most 1000 keys/prefixes; the paginator
# follows continuation tokens so no prefix is silently dropped.
paginator = s3.get_paginator("list_objects_v2")
prefixes = []
for page in paginator.paginate(Bucket=bucket_name, Prefix="raw/", Delimiter="/"):  # only look under raw/
    prefixes.extend(p["Prefix"] for p in page.get("CommonPrefixes", []))

if prefixes:
    print("Prefixes:")
    for prefix in prefixes:
        print(prefix)
else:
    print("沒有找到任何 prefix")


In [None]:
# 上傳 config.yaml 至 S3

import boto3
from dotenv import load_dotenv
import os

load_dotenv("../.env")

s3 = boto3.client(
    service_name="s3",
    region_name=os.getenv("AWS_REGION"),
    aws_access_key_id=os.getenv("AWS_ACCESS_KEY_ID"),
    aws_secret_access_key=os.getenv("AWS_SECRET_ACCESS_KEY"),
)

bucket_name = "arvix-paper-bucket"
local_file = "../config/config.yaml"
key = "config/config.yaml"

# Send the local YAML file to S3 under the same key the ETL reads its config from.
with open(local_file, "rb") as f:
    s3.put_object(Body=f, Bucket=bucket_name, Key=key, ContentType="application/x-yaml")


In [None]:
# 讀取 S3 上的 config.yaml
import boto3
from dotenv import load_dotenv
import os

load_dotenv("../.env")

s3 = boto3.client(
    service_name="s3",
    region_name=os.getenv("AWS_REGION"),
    aws_access_key_id=os.getenv("AWS_ACCESS_KEY_ID"),
    aws_secret_access_key=os.getenv("AWS_SECRET_ACCESS_KEY"),
)

bucket_name = "arvix-paper-bucket"
local_path = "/tmp/config.yaml"
key = "config/config.yaml"

# Download the config to a local file, then echo its raw text.
s3.download_file(bucket_name, key, local_path)
with open(local_path) as f:
    data = f.read()
print(data)


In [None]:
import boto3
from dotenv import load_dotenv
import os

load_dotenv("../.env")

s3 = boto3.client(
    "s3",
    region_name=os.getenv("AWS_REGION"),
    aws_access_key_id=os.getenv("AWS_ACCESS_KEY_ID"),
    aws_secret_access_key=os.getenv("AWS_SECRET_ACCESS_KEY")
)

bucket_name = "hackmd-paper-bucket"
prefix = "raw/domain=cs.LG/"
# Relative to the notebook's working directory, like the cleaning cell's
# "arxiv_data/arxiv_batch_1.json", instead of a machine-specific absolute path.
local_file = "arxiv_data/arxiv_batch_2.json"
key = prefix + os.path.basename(local_file)

with open(local_file, "rb") as f:
    s3.put_object(
        Bucket=bucket_name,
        Key=key,
        Body=f,
        ContentType="application/json"
    )

print(f"已上傳 {local_file} 到 S3: {key}")


In [None]:
import boto3
from dotenv import load_dotenv
import os

load_dotenv("../.env")

s3 = boto3.client(
    "s3",
    region_name=os.getenv("AWS_REGION"),
    aws_access_key_id=os.getenv("AWS_ACCESS_KEY_ID"),
    aws_secret_access_key=os.getenv("AWS_SECRET_ACCESS_KEY")
)

bucket_name = "hackmd-paper-bucket"
prefix = "raw/domain=cs.LG/"

# A single list_objects_v2 call stops at 1000 keys; paginate to see every file.
paginator = s3.get_paginator("list_objects_v2")
keys = [
    obj["Key"]
    for page in paginator.paginate(Bucket=bucket_name, Prefix=prefix, Delimiter="/")
    for obj in page.get("Contents", [])
]

if keys:
    print("檔案列表：")
    # Hide the ".keep" placeholder objects that exist only to make a prefix visible.
    files = [k for k in keys if not k.endswith(".keep")]
    for f in files:
        print(f)
else:
    print("此 prefix 下沒有檔案")


In [None]:
from src.core.db import get_pg
from src.core.pg_engine import PsqlEngine
# Database engine handle used by the paper_exists check below.
pg = get_pg()

def _sql_literal(value) -> str:
    """Render ``value`` as a single-quoted SQL string literal, doubling embedded single quotes.

    Correct for PostgreSQL with standard_conforming_strings on (the default),
    where backslashes inside '...' have no special meaning.
    """
    return "'" + str(value).replace("'", "''") + "'"


def paper_exists(pg: "PsqlEngine", category: str, entry_id: str) -> bool:
    """Return True if papers.downloaded_papers has a row for (category, entry_id).

    Args:
        pg: Database engine exposing ``execute_query(stmt)``.
        category: Category value to match, e.g. "cs_LG".
        entry_id: Paper entry_id to match.

    Returns:
        True when the query returns any row, False otherwise.
    """
    # Values are escaped into literals instead of pasted raw: an entry_id with a
    # quote would otherwise break the query or allow SQL injection.
    # NOTE(review): switch to bound parameters if execute_query accepts them.
    stmt = f"""
        SELECT 1
        FROM papers.downloaded_papers
        WHERE category = {_sql_literal(category)} AND entry_id = {_sql_literal(entry_id)}
        LIMIT 1;
    """
    result = pg.execute_query(stmt)
    return bool(result)


# Quick check with a placeholder entry_id.
paper_exists(pg=pg, category="cs_LG", entry_id='dsfd')

In [None]:
# 手動觸發 Collector lambda, 會取得完整的執行紀錄

from dotenv import load_dotenv
import os
import boto3
import requests
from requests_aws4auth import AWS4Auth

load_dotenv("../.env")

region_name = os.getenv("AWS_REGION")
aws_access_key_id = os.getenv("AWS_ACCESS_KEY_ID")
aws_secret_access_key = os.getenv("AWS_SECRET_ACCESS_KEY")

session = boto3.Session(
    aws_access_key_id=aws_access_key_id,
    aws_secret_access_key=aws_secret_access_key,
    region_name=region_name
)
# get_credentials() returns None when no credentials can be resolved; fail with a
# clear message instead of an AttributeError on the next line.
raw_credentials = session.get_credentials()
if raw_credentials is None:
    raise RuntimeError("No AWS credentials found; check AWS_ACCESS_KEY_ID / AWS_SECRET_ACCESS_KEY in .env")
credentials = raw_credentials.get_frozen_credentials()

# SigV4 must be signed for the region and service of the endpoint URL below,
# which is why these are fixed rather than taken from AWS_REGION.
region = "ap-northeast-1"
service = "execute-api"

awsauth = AWS4Auth(
    credentials.access_key,
    credentials.secret_key,
    region,
    service,
    session_token=credentials.token
)

url = "https://z1cft4uc6g.execute-api.ap-northeast-1.amazonaws.com/default/Collector"
payload = {"trigger": "manual"}

# requests has no default timeout; without one this call can block forever.
REQUEST_TIMEOUT_SECONDS = 60
response = requests.post(url, json=payload, auth=awsauth, timeout=REQUEST_TIMEOUT_SECONDS)

print("Status Code:", response.status_code)
print("Response Body:", response.text)


In [3]:
# 手動觸發 Collector lambda 
from dotenv import load_dotenv
import os
import boto3
import json

load_dotenv("../.env")

region_name = "ap-northeast-1"
aws_access_key_id = os.getenv("AWS_ACCESS_KEY_ID")
aws_secret_access_key = os.getenv("AWS_SECRET_ACCESS_KEY")

lambda_client = boto3.client(
    service_name="lambda",
    region_name=region_name,
    aws_access_key_id=aws_access_key_id,
    aws_secret_access_key=aws_secret_access_key,
)

# "Event" = asynchronous invocation: the call returns right away (HTTP 202) and
# the response carries no function output.
event_payload = json.dumps({"trigger": "manual"}).encode()
response = lambda_client.invoke(
    FunctionName="Collector",  # other targets: ETL_A, ETL_B
    InvocationType="Event",
    Payload=event_payload,
)


print(response)


{'ResponseMetadata': {'RequestId': 'c0ee223a-6843-4465-813a-ba39a21d4d5a', 'HTTPStatusCode': 202, 'HTTPHeaders': {'date': 'Wed, 22 Oct 2025 07:52:21 GMT', 'content-length': '0', 'connection': 'keep-alive', 'x-amzn-requestid': 'c0ee223a-6843-4465-813a-ba39a21d4d5a', 'x-amzn-remapped-content-length': '0', 'x-amzn-trace-id': 'root=1-68f88d34-177d20e372effc102bef68e2;parent=28ee1fc2b4827594;sampled=0'}, 'RetryAttempts': 0}, 'StatusCode': 202, 'Payload': <botocore.response.StreamingBody object at 0x7f500e5eb730>}
