In [None]:
import os
import numpy as np
import pandas as pd
from newsapi import NewsApiClient
import yfinance as yf
from database.db_connect import get_engine
from datetime import datetime, timedelta
from dotenv import load_dotenv
from newspaper import Article
from pathlib import Path
from textblob import TextBlob
from sentence_transformers import SentenceTransformer
from concurrent.futures import ThreadPoolExecutor
from sqlalchemy import text
from database.db_connect import get_engine
import logging

NEWS_API_KEY = os.getenv('NEWS_API_KEY')
newsapi = NewsApiClient(api_key=NEWS_API_KEY)

logger = logging.getLogger(__name__)

In [None]:
# extract.py

def get_asset_id():
    engine = get_engine()
    query = 'SELECT asset_id FROM dim_assets'

    with engine.connect() as conn:
        df = pd.read_sql(query, conn)
    
    return df['asset_id'].to_list()


def get_asset_name():
    engine = get_engine()
    query = 'SELECT asset_name FROM dim_assets'

    with engine.connect() as conn:
        df = pd.read_sql(query, conn)
    
    return df['asset_name'].to_list()


def fetch_price(execution_date):

    start_dt = execution_date
    end_dt = (pd.to_datetime(execution_date) + timedelta(days=1)).strftime('%Y-%m-%d')

    assets = get_asset_id()
    # print(assets)
    logging.info('fetch stock list successfully')

    all_df = []

    for asset in assets:
        try:
            ticker = yf.Ticker(asset)
            data = ticker.history(start=start_dt, end=end_dt, interval="1h")

            if data.empty:
                logger.info(f"Cannot fetch {asset} -- No Data Found from {start_dt} to {end_dt}")
                continue
            else:
                data = data.reset_index()
                data['asset_id'] = asset
                data = data[['asset_id', 'Datetime', 'Close', 'Volume']]

                all_df.append(data)
                logger.info(f"Fetch {len(data)} rows successfully for {asset}")

        except Exception as e:
            logger.info(f"Cannot fetch {asset} from YF -- Error happened")
            continue
    
    if all_df:
        final_df = pd.concat(all_df, ignore_index=True)
        return final_df
    else:
        return pd.DataFrame()    
         
    
def fetch_raw_news(execution_date):

    start_dt = execution_date
    end_dt = (pd.to_datetime(execution_date) + timedelta(days=1)).strftime('%Y-%m-%d')
    assets_name = get_asset_name()
    # print(assets)

    query_cmd = '"XAG/USD" OR '

    for name in assets_name:
        query_cmd = query_cmd + f'"{name}"'

        if name != assets_name[-1]:
            query_cmd = query_cmd + " OR "
    
    query = f'({query_cmd})'
    # print(query)

    raw_news = newsapi.get_everything(
        q=query,
        language='en',
        sort_by='publishedAt', 
        page_size=100,
        from_param=start_dt, 
        to=end_dt
    )

    articles = raw_news.get('articles', [])

    df_news = pd.json_normalize(articles)
    if df_news.empty:
        return None
    
    df_news = df_news.rename(columns={'source.id':'source_id', 'source.name':'source_name'})

    def extract_full_content(url):
        try:
            article = Article(url)
            article.download()
            article.parse()
            return article.text
        except:
            return np.nan
    
    with ThreadPoolExecutor(max_workers=10) as executor:
        df_news['full_content'] = list(executor.map(extract_full_content, df_news['url']))

    # df_news['title'] = df_news['title'].fillna('')
    # df_news['description'] = df_news['description'].fillna('')
    # df_news['content'] = df_news['content'].fillna('')

    # df_news.loc[df_news[df_news['full_content'].isna()].index, 'full_content'] = df_news['title'].fillna(" ") + df_news['description'].fillna(" ") + df_news['content'].fillna(" ")

    df_news = df_news.replace({pd.NA: None, np.nan: None})

    if os.path.exists('/opt/airflow'):
        base_path = Path('/opt/airflow/data')
    else:
        base_path = Path(os.getcwd()) / '../data'

    base_path = f"{base_path}/raw_news"
    os.makedirs(base_path, exist_ok=True)
    parquet_path = f"{base_path}/raw_news_{execution_date}.parquet" 

    df_news.to_parquet(parquet_path, index=False)    
    
    return parquet_path

In [162]:
fetch_raw_news('2026-01-08')

'/Users/taninudomthanakij/Documents/my-de-portfolio/scripts/../data/raw_news/raw_news_2026-01-08.parquet'

In [None]:
# load.py

def load_raw_news(parquet_path):
    
    df_news = pd.read_parquet(parquet_path)

    query = text("""
        INSERT INTO stg_news (author, source_id, source_name, title, url, raw_content, published_at)
        VALUES (:author, :source_id, :source_name, :title, :url, :full_content, :publishedAt)
        ON CONFLICT (url)
        DO NOTHING;
    """)

    engine = get_engine()

    with engine.begin() as conn:
        conn.execute(query, df_news.to_dict('records'))




In [288]:
load_raw_news("/Users/taninudomthanakij/Documents/my-de-portfolio/data/raw_news/raw_news_2026-01-08.parquet")

In [286]:
fetch_raw_news('2026-01-12')

'/Users/taninudomthanakij/Documents/my-de-portfolio/scripts/../data/raw_news/raw_news_2026-01-12.parquet'

In [274]:
def vectorize_news(execution_date):

    engine = get_engine()

    query = text("""
        SELECT id, author, source_id, source_name, title, url, raw_content, is_vectorized, published_at
        FROM stg_news
        WHERE is_vectorized=FALSE;
    """)

    df_raw = pd.read_sql(query, con=engine)
    # print(df_raw)
    df_raw['raw_content'] = df_raw['title'] + df_raw['raw_content'].fillna("")

    model = SentenceTransformer('all-MiniLM-L6-v2')
    embeddings = model.encode(df_raw['raw_content'].tolist())
    
    df_raw['embedded_content'] = embeddings.tolist()
    df_raw = df_raw[['id', 'published_at', 'title', 'embedded_content', 'url']]

    # df_raw['embedded_content'] =  df_raw['embedded_content'].apply(lambda x: x.tolist() if hasattr(x, "tolist") else x)
    df_raw['embedded_content'] = df_raw['embedded_content'].tolist()

    if os.path.exists('/opt/airflow'):
        base_path = Path('/opt/airflow/data')
    else:
        base_path = Path(os.getcwd()) / '../data'

    base_path = f"{base_path}/embedded_news"
    os.makedirs(base_path, exist_ok=True)
    parquet_path = f"{base_path}/embedded_news_{execution_date}.parquet" 

    df_raw.to_parquet(parquet_path, index=False) 

    return parquet_path

vectorize_news('2026-01-12')

'/Users/taninudomthanakij/Documents/my-de-portfolio/scripts/../data/embedded_news/embedded_news_2026-01-12.parquet'

In [275]:
a = pd.read_parquet('/Users/taninudomthanakij/Documents/my-de-portfolio/data/embedded_news/embedded_news_2026-01-12.parquet')
a.embedded_content.iloc[0]

array([-2.29319353e-02, -7.83101469e-02,  5.16077392e-02,  9.87057686e-02,
       -1.02972779e-02,  1.81232132e-02,  6.21238053e-02,  5.13026863e-02,
       -1.53876515e-02, -1.37066599e-02, -6.27769576e-03,  2.86273658e-02,
        4.66485089e-03,  3.35200578e-02, -3.60155255e-02,  6.67894538e-03,
        1.07337713e-01, -6.55775666e-02, -7.18824863e-02,  5.32007962e-02,
       -3.75504903e-02, -1.26222208e-01, -2.04248428e-02,  2.31762007e-02,
        8.65184441e-02,  4.69277985e-02, -6.94098473e-02,  5.40264063e-02,
       -3.31671699e-03, -9.29238796e-02, -1.82850547e-02, -4.53238972e-02,
        8.41724500e-02,  3.02147549e-02, -8.81858543e-02, -7.79937953e-02,
       -1.54826147e-02,  1.15647335e-02, -3.45937349e-02, -4.76112217e-02,
        4.99173664e-02,  3.07624005e-02,  3.39095946e-03, -8.54899064e-02,
       -4.92604971e-02, -1.54874502e-02, -9.12956614e-03, -2.67885961e-02,
        2.05751769e-02, -2.61087390e-03, -1.55200036e-02, -2.63462379e-03,
        3.49243544e-02, -

In [283]:
def load_embedded_news(ti):
    try:
        vector_path = ti.xcom_pull(task_ids='vectorize_step')
    except:
        vector_path = ti

    df_news = pd.read_parquet(vector_path)

    df_news['embedded_content'] = df_news['embedded_content'].apply(
        lambda x: x.tolist() if isinstance(x, np.ndarray) else x
    )

    query = text("""
        INSERT INTO fact_news_vector (stg_id, datetime, title, embedding, url)
        VALUES (:id, :published_at, :title, :embedded_content, :url)
        ON CONFLICT (stg_id)
        DO NOTHING;
    """)
    
    engine = get_engine()

    with engine.begin() as conn:
        conn.execute(query, df_news.to_dict('records'))

In [289]:
load_embedded_news('/Users/taninudomthanakij/Documents/my-de-portfolio/data/embedded_news/embedded_news_2026-01-12.parquet')

In [217]:
import yfinance as yf

def get_company_description(ticker_symbol):
    try:
        # สร้าง Ticker Object
        ticker = yf.Ticker(ticker_symbol)
        
        # ดึงข้อมูล info (จะคืนค่าเป็น Dictionary ขนาดใหญ่)
        info = ticker.info
        
        # ดึงเฉพาะคำอธิบายธุรกิจ
        description = info.get('longBusinessSummary')
        
        # ข้อมูลอื่นๆ ที่น่าสนใจสำหรับ stock_dim
        company_name = info.get('longName')
        sector = info.get('sector')
        industry = info.get('industry')
        
        return {
            "name": company_name,
            "description": description,
            "sector": sector,
            "industry": industry
        }
    except Exception as e:
        print(f"Error fetching {ticker_symbol}: {e}")
        return None

# ตัวอย่างการใช้งานกับหุ้น AG (First Majestic Silver)
data = get_company_description("HL")
if data:
    print(f"Company: {data['name']}")
    print(f"Description: {data['description']}...") # ตัดมาโชว์แค่ 200 ตัวอักษร

Company: Hecla Mining Company
Description: Hecla Mining Company, together with its subsidiaries, provides precious and base metals in the United States, Canada, Japan, Korea, and China. The company mines for silver, gold, lead, and zinc concentrates, as well as carbon material containing silver and gold for custom smelters, metal traders, and third-party processors; and doré containing silver and gold. Its flagship project is the Greens Creek mine located on Admiralty Island in southeast Alaska. The company was incorporated in 1891 and is headquartered in Coeur d'Alene, Idaho....


In [218]:
# TO-DO 
# Add company' detail to db and process sentence embedding
# process by AI and insert to table. check
# Try to use semantic search to query information about stock from news.

"Hecla Mining Company, together with its subsidiaries, provides precious and base metals in the United States, Canada, Japan, Korea, and China. The company mines for silver, gold, lead, and zinc concentrates, as well as carbon material containing silver and gold for custom smelters, metal traders, and third-party processors; and doré containing silver and gold. Its flagship project is the Greens Creek mine located on Admiralty Island in southeast Alaska. The company was incorporated in 1891 and is headquartered in Coeur d'Alene, Idaho."

In [212]:
execution_date = '2026-01-12'
fetch_raw_news(execution_date)
b = transform_news_to_vectors(execution_date)

In [None]:
eb = get_engine()

pd.read_sql()


In [300]:
engine = get_engine()
query = 'SELECT * FROM dim_assets'

with engine.connect() as conn:
    df = pd.read_sql(query, conn)

tickers = df['asset_id'].to_list()

description_lst = []
for ticker in tickers:
    ticker = yf.Ticker(ticker)

    info = ticker.info
    description = info.get('longBusinessSummary')

    description_lst.append(description)
    
df['description'] = description_lst

model = SentenceTransformer('all-MiniLM-L6-v2')
embeddings = model.encode(df['description'].tolist())
    
df['embedded_description'] = embeddings.tolist()
df['embedded_description'] = df['embedded_description'].apply(
        lambda x: x.tolist() if isinstance(x, np.ndarray) else x
    )

query = text("""
    INSERT INTO dim_assets (asset_id, asset_name, asset_type, country, description, embedded_description)
    VALUES (:asset_id, :asset_name, :asset_type, :country, :description, :embedded_description)
    ON CONFLICT (asset_id) 
    DO UPDATE SET 
        embedded_description = EXCLUDED.embedded_description;
    """)

engine = get_engine()

with engine.begin() as conn:
    conn.execute(query, df.to_dict('records'))

In [301]:
df

Unnamed: 0,asset_id,asset_name,asset_type,country,description,embedded_description
0,HL,Hecla Mining,Stock,USA,"Hecla Mining Company, together with its subsid...","[-0.028417279943823814, 0.006844936404377222, ..."
1,CDE,Coeur Mining,Stock,USA,"Coeur Mining, Inc. operates as a gold and silv...","[-0.06286340951919556, -0.0464494414627552, 0...."
2,PAAS,Pan American Silver,Stock,USA,Pan American Silver Corp. engages in the explo...,"[-0.008198989555239677, -0.08683352917432785, ..."
3,AG,First Majestic Silver,Stock,USA,First Majestic Silver Corp. engages in the acq...,"[0.0224342942237854, -0.016171323135495186, 0...."
4,EXK,Endeavour Silver,Stock,USA,"Endeavour Silver Corp., a silver mining compan...","[0.006876664701849222, -0.10067670047283173, 0..."
5,FRES.L,Fresnillo plc,Stock,MEX,"Fresnillo plc mines, develops, and produces no...","[-0.05300211161375046, 0.014197456650435925, -..."
6,KGH.WA,KGHM Polska Miedź,Stock,POL,KGHM Polska Miedz S.A. engages in the explorat...,"[-0.08661152422428131, 0.05402582138776779, -0..."
