In [1]:
import pandas as pd, numpy as np
import os, io
from hdfs import InsecureClient
import getpass
import fsspec

In [None]:
# client = InsecureClient('http://<namenode-host>:9870', user=getpass.getuser())
# client = InsecureClient('http://localhost:9870', user='user')

In [None]:
# save_dir = r"C:\Users\user\Documents\python\Samsung\Data"
# os.makedirs(save_dir, exist_ok=True)

In [2]:
fs = fsspec.filesystem(
    'webhdfs',
    host='localhost',
    port=9870,
    user='user'
)


In [3]:
# Danh sách file cần đọc
file_names = [
    'all_company.csv',
    'balance_sheet_all_company.csv',
    'income_statement_all_company.csv',
    'ratio_summary_all_company.csv',
    'cash_flow_all_company.csv',
    'ratio_all_company.csv',
    'stock_price_all_company.csv',
    'info_all_company.csv'
]

# Đọc file vào dict tên -> DataFrame
dataframes = {}

for file_name in file_names:
    file_path = f'/datalake/raw/{file_name}'
    with fs.open(file_path, 'rb') as f:
        if file_name == 'ratio_all_company.csv':
            df = pd.read_csv(f, engine='pyarrow', header=1)
            dataframes[file_name.replace('.csv', '')] = df 
            continue 
        df = pd.read_csv(f, engine='pyarrow')
        dataframes[file_name.replace('.csv', '')] = df 



In [4]:
all_company = dataframes['all_company']
balance_sheet= dataframes['balance_sheet_all_company']
income_statement = dataframes['income_statement_all_company']
ratio_summary= dataframes['ratio_summary_all_company']
cash_flow = dataframes['cash_flow_all_company']
ratio = dataframes['ratio_all_company']
stock_price = dataframes['stock_price_all_company']
info_all_company = dataframes['info_all_company']

In [5]:
def transform_stock_price(df):
    """
    Chuyển đổi định dạng cột 'symbol' thành 'stock_name' trong DataFrame.
    """
    if 'symbol' not in stock_price.columns:
        raise ValueError("DataFrame không chứa cột 'symbol'.")
    df = df.rename(columns={'symbol': "stock_name"})
    df['time'] = pd.to_datetime(df['time'])
    return df

transform_stock_price(stock_price)

Unnamed: 0,time,open,high,low,close,volume,stock_name
0,2015-01-05,3.44,3.51,3.41,3.44,207020,HPG
1,2015-01-06,3.38,3.47,3.38,3.47,260380,HPG
2,2015-01-07,3.47,3.51,3.44,3.44,232770,HPG
3,2015-01-08,3.44,3.47,3.41,3.44,107380,HPG
4,2015-01-09,3.44,3.51,3.41,3.51,409230,HPG
...,...,...,...,...,...,...,...
35761,2025-07-08,19.80,20.40,19.60,20.10,205600,GDA
35762,2025-07-09,20.00,20.30,19.90,20.00,76100,GDA
35763,2025-07-10,20.00,20.00,19.60,19.70,156800,GDA
35764,2025-07-11,19.90,19.90,19.40,19.70,156600,GDA


In [6]:
def transform_description_company(df):
    required_columns = ['symbol', 'organ_name']
    for col in required_columns:
        if col not in df.columns:
            raise ValueError(f"DataFrame không chứa cột '{col}'.")
    df = df.rename(columns={'symbol': 'stock_name', 'organ_name': 'company_name'})
    return df
transform_description_company(all_company)

Unnamed: 0,stock_name,company_name
0,YTC,Công ty Cổ phần Xuất nhập khẩu Y tế Thành phố ...
1,YEG,Công ty Cổ phần Tập đoàn Yeah1
2,YBM,Công ty Cổ phần Khoáng sản Công nghiệp Yên Bái
3,YBC,Công ty Cổ phần Xi măng và Khoáng sản Yên Bái
4,XPH,Công ty Cổ phần Xà phòng Hà Nội
...,...,...
1711,AAS,Công ty Cổ phần Chứng khoán SmartInvest
1712,AAM,Công ty Cổ Phần Thủy Sản MeKong
1713,AAH,Công ty Cổ phần Hợp Nhất
1714,AAA,Công ty Cổ phần Nhựa An Phát Xanh


In [7]:
def transform_info_company(df):
    required_columns = ['symbol', 'id', 'issue_share', 'history', 'company_profile',
                        'icb_name3', 'icb_name2', 'icb_name4', 'financial_ratio_issue_share',
                        'charter_capital']
    for col in required_columns:
        if col not in df.columns:
            raise ValueError(f"DataFrame không chứa cột '{col}'.")
    df = df.drop(columns=['history', 'company_profile', 'id'])
    df = df.rename(columns={'symbol': 'stock_name'})
    return df
transform_info_company(info_all_company)

Unnamed: 0,stock_name,issue_share,icb_name3,icb_name2,icb_name4,financial_ratio_issue_share,charter_capital
0,HPG,6396250200,Kim loại,Tài nguyên Cơ bản,Thép và sản phẩm thép,7675465855,63962502000000
1,HSG,620982309,Kim loại,Tài nguyên Cơ bản,Thép và sản phẩm thép,620982309,6209823090000
2,DGC,379779286,Hóa chất,Hóa chất,"Sản phẩm hóa dầu, Nông dược & Hóa chất khác",379778413,3797792860000
3,GVR,4000000000,Hóa chất,Hóa chất,"Nhựa, cao su & sợi",4000000000,40000000000000
4,VNM,2089955445,Sản xuất thực phẩm,Thực phẩm và đồ uống,Thực phẩm,2089955445,20899554450000
5,FPT,1481330122,Phần mềm & Dịch vụ Máy tính,Công nghệ Thông tin,Phần mềm,1481330122,14813301220000
6,MWG,1479693177,Bán lẻ,Bán lẻ,Phân phối hàng chuyên dụng,1478609048,14796931770000
7,SSI,1973863918,Dịch vụ tài chính,Dịch vụ tài chính,Môi giới chứng khoán,1971872450,19738639180000
8,CTG,5369991748,Ngân hàng,Ngân hàng,Ngân hàng,5369991748,53699917480000
9,BID,7021361917,Ngân hàng,Ngân hàng,Ngân hàng,7021361917,70213619170000


In [8]:
def transform_ratio(df):
    required_columns = ['CP', 'Năm', 'Kỳ', '(Vay NH+DH)/VCSH', 'Nợ/VCSH', 'TSCĐ / Vốn CSH',
                        'Vốn CSH/Vốn điều lệ', 'Vòng quay tài sản', 'Vòng quay TSCĐ',
                        'Số ngày thu tiền bình quân', 'Số ngày tồn kho bình quân',
                        'Số ngày thanh toán bình quân', 'Chu kỳ tiền', 'Vòng quay hàng tồn kho',
                        'Biên EBIT (%)', 'Biên lợi nhuận gộp (%)', 'Biên lợi nhuận ròng (%)',
                        'ROE (%)', 'ROIC (%)', 'ROA (%)', 'EBITDA (Tỷ đồng)', 'EBIT (Tỷ đồng)',
                        'Tỷ suất cổ tức (%)', 'Chỉ số thanh toán hiện thời',
                        'Chỉ số thanh toán tiền mặt', 'Chỉ số thanh toán nhanh',
                        'Khả năng chi trả lãi vay', 'Đòn bẩy tài chính', 'Vốn hóa (Tỷ đồng)',
                        'Số CP lưu hành (Triệu CP)', 'P/E', 'P/B', 'P/S', 'P/Cash Flow',
                        'EPS (VND)', 'BVPS (VND)', 'EV/EBITDA']
    for col in required_columns:
        if col not in df.columns:
            raise ValueError(f"DataFrame không chứa cột '{col}'.")
    df = df.rename(columns={'CP': 'stock_name'})
    return df
transform_ratio(ratio)

Unnamed: 0,stock_name,Năm,Kỳ,(Vay NH+DH)/VCSH,Nợ/VCSH,TSCĐ / Vốn CSH,Vốn CSH/Vốn điều lệ,Vòng quay tài sản,Vòng quay TSCĐ,Số ngày thu tiền bình quân,...,Đòn bẩy tài chính,Vốn hóa (Tỷ đồng),Số CP lưu hành (Triệu CP),P/E,P/B,P/S,P/Cash Flow,EPS (VND),BVPS (VND),EV/EBITDA
0,HPG,2025,1,0.985606,0.939556,0.727461,1.844782,0.668503,1.996491,14.720558,...,1.939556,1.976432e+14,7.675466e+09,15.817610,1.679199,1.357212,64.133449,435.710973,15334.695560,12.285072
1,HPG,2024,4,0.959843,0.958087,0.588137,1.792417,0.673609,1.991800,13.605410,...,1.958087,1.653431e+14,6.396250e+09,13.754011,1.445857,1.190760,25.020437,439.108115,17878.673250,10.948688
2,HPG,2024,3,0.923392,0.891108,0.612598,1.747572,0.687111,1.971378,16.210015,...,1.891108,1.653431e+14,6.396250e+09,13.568751,1.481703,1.191679,18.528919,472.612318,17446.146994,10.877249
3,HPG,2024,2,0.852670,0.901137,0.644285,1.699067,0.692487,1.865096,16.215280,...,1.901137,1.730186e+14,6.396250e+09,15.493216,1.593555,1.298200,18.620329,518.938915,16974.630099,11.287747
4,HPG,2024,1,0.886144,0.911868,0.672344,1.651352,0.666510,1.730895,16.295421,...,1.911868,1.822931e+14,6.396250e+09,19.584237,1.727562,1.479452,16.099007,448.792434,16497.238964,13.168670
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
624,GDA,2023,1,1.715177,2.194384,0.696067,3.077691,1.588017,7.482421,32.197062,...,3.194384,0.000000e+00,1.146915e+08,0.000000,0.000000,0.000000,0.000000,711.662477,30776.915000,0.000000
625,GDA,2022,4,1.547320,1.988379,0.736662,3.007631,1.883315,8.041710,30.007802,...,2.985395,0.000000e+00,1.146915e+08,0.000000,0.000000,0.000000,0.000000,-3480.641779,30067.976717,0.000000
626,GDA,2022,3,0.000000,0.000000,0.678135,3.355696,0.000000,0.000000,0.000000,...,3.097984,0.000000e+00,1.146915e+08,0.000000,0.000000,0.000000,0.000000,-1802.905412,30097.418382,0.000000
627,GDA,2022,2,0.000000,0.000000,0.660150,3.536022,0.000000,0.000000,0.000000,...,3.690120,0.000000e+00,1.146915e+08,0.000000,0.000000,0.000000,0.000000,1097.436134,30097.418382,0.000000


In [9]:
stock_price = transform_stock_price(stock_price)
all_company = transform_description_company(all_company)
info_all_company = transform_info_company(info_all_company)
ratio = transform_ratio(ratio)

In [10]:
print(all_company.columns)
print(ratio.columns)
print(stock_price.columns)
print(info_all_company.columns)

Index(['stock_name', 'company_name'], dtype='object')
Index(['stock_name', 'Năm', 'Kỳ', '(Vay NH+DH)/VCSH', 'Nợ/VCSH',
       'TSCĐ / Vốn CSH', 'Vốn CSH/Vốn điều lệ', 'Vòng quay tài sản',
       'Vòng quay TSCĐ', 'Số ngày thu tiền bình quân',
       'Số ngày tồn kho bình quân', 'Số ngày thanh toán bình quân',
       'Chu kỳ tiền', 'Vòng quay hàng tồn kho', 'Biên EBIT (%)',
       'Biên lợi nhuận gộp (%)', 'Biên lợi nhuận ròng (%)', 'ROE (%)',
       'ROIC (%)', 'ROA (%)', 'EBITDA (Tỷ đồng)', 'EBIT (Tỷ đồng)',
       'Tỷ suất cổ tức (%)', 'Chỉ số thanh toán hiện thời',
       'Chỉ số thanh toán tiền mặt', 'Chỉ số thanh toán nhanh',
       'Khả năng chi trả lãi vay', 'Đòn bẩy tài chính', 'Vốn hóa (Tỷ đồng)',
       'Số CP lưu hành (Triệu CP)', 'P/E', 'P/B', 'P/S', 'P/Cash Flow',
       'EPS (VND)', 'BVPS (VND)', 'EV/EBITDA'],
      dtype='object')
Index(['time', 'open', 'high', 'low', 'close', 'volume', 'stock_name'], dtype='object')
Index(['stock_name', 'issue_share', 'icb_name3', 'icb_

In [11]:
dim_company = all_company.merge(info_all_company, on='stock_name', how='inner')[['stock_name', 'company_name', 'icb_name2', 'icb_name3', 'icb_name4',
      'issue_share', 'charter_capital', 'financial_ratio_issue_share']].drop_duplicates().reset_index(drop=True)
dim_company['company_key'] = dim_company.index + 1

In [54]:
with fs.open('/datalake/cleaned/dim_company.parquet', 'wb') as f:
    dim_company.to_parquet(f, engine='pyarrow', index=False)

In [32]:
stock_price['time'] = pd.to_datetime(stock_price['time'])
start_date = pd.to_datetime('2005-01-01', format='%Y-%m-%d')
end_date   = stock_price['time'].max()

# 2) Sinh dãy ngày liên tục (ở đây là business days, nếu bạn chỉ quan tâm ngày giao dịch)
all_dates = pd.date_range(start=start_date, end=end_date, freq='D')

In [33]:
dim_date = (
    pd.DataFrame({'full_date': all_dates})
    .assign(
        date_key=lambda x: x['full_date'].dt.strftime('%Y%m%d').astype(int),
        year=lambda x: x['full_date'].dt.year,
        quarter=lambda x: x['full_date'].dt.quarter,
        month=lambda x: x['full_date'].dt.month,
        day=lambda x: x['full_date'].dt.day,
        is_business_day=lambda x: ~x['full_date'].dt.weekday.isin([5,6])
    )
    [['date_key','full_date','year','quarter','month','day','is_business_day']]
    .drop_duplicates()
    .reset_index(drop=True)
)

In [34]:
with fs.open('/datalake/cleaned/dim_date.parquet', 'wb') as f:
    dim_date.to_parquet(f, engine='pyarrow', index=False)

In [35]:
fact_stock_price = (
    stock_price
    .merge(dim_company[['stock_name','company_key']], on='stock_name')
    .merge(dim_date[['full_date','date_key']], left_on='time', right_on='full_date')
    [['company_key','date_key','open','high','low','close','volume']]
)

In [36]:
with fs.open('/datalake/cleaned/fact_stock_price.parquet', 'wb') as f:
    fact_stock_price.to_parquet(f, engine='pyarrow', index=False)

In [37]:
def make_date_key_from_year_period(row):
    # row['Năm'], row['Kỳ'] -> month end: Q1->03, Q2->06, Q3->09, Q4->12
    year = int(row['Năm'])
    period_month = {1:3,2:6,3:9,4:12}.get(int(row['Kỳ']), 12)
    return int(f"{year:04d}{period_month:02d}01")  # use first day of quarter

fact_ratio_temp = ratio.copy()

column_rename_dict = {
    "(Vay NH+DH)/VCSH": "debt_equity_shortlong_ratio",
    "Nợ/VCSH": "debt_equity_ratio",
    "TSCĐ / Vốn CSH": "fixed_assets_equity_ratio",
    "Vốn CSH/Vốn điều lệ": "equity_charter_capital_ratio",
    "Vòng quay tài sản": "asset_turnover",
    "Vòng quay TSCĐ": "fixed_asset_turnover",
    "Số ngày thu tiền bình quân": "avg_receivables_days",
    "Số ngày tồn kho bình quân": "avg_inventory_days",
    "Số ngày thanh toán bình quân": "avg_payable_days",
    "Chu kỳ tiền": "cash_conversion_cycle",
    "Vòng quay hàng tồn kho": "inventory_turnover",
    "Biên EBIT (%)": "ebit_margin_pct",
    "Biên lợi nhuận gộp (%)": "gross_profit_margin_pct",
    "Biên lợi nhuận ròng (%)": "net_profit_margin_pct",
    "ROE (%)": "roe_pct",
    "ROIC (%)": "roic_pct",
    "ROA (%)": "roa_pct",
    "EBITDA (Tỷ đồng)": "ebitda_bil_vnd",
    "EBIT (Tỷ đồng)": "ebit_bil_vnd",
    "Tỷ suất cổ tức (%)": "dividend_yield_pct",
    "Chỉ số thanh toán hiện thời": "current_ratio",
    "Chỉ số thanh toán tiền mặt": "cash_ratio",
    "Chỉ số thanh toán nhanh": "quick_ratio",
    "Khả năng chi trả lãi vay": "interest_coverage_ratio",
    "Đòn bẩy tài chính": "financial_leverage",
    "Vốn hóa (Tỷ đồng)": "market_cap_bil_vnd",
    "Số CP lưu hành (Triệu CP)": "shares_outstanding_mil",
    "P/E": "pe_ratio",
    "P/B": "pb_ratio",
    "P/S": "ps_ratio",
    "P/Cash Flow": "pcashflow_ratio",
    "EPS (VND)": "eps_vnd",
    "BVPS (VND)": "bvps_vnd",
    "EV/EBITDA": "ev_ebitda"
}

fact_ratio_temp.rename(columns=column_rename_dict, inplace=True)

# tạo full_date từ Năm và Kỳ
fact_ratio_temp['date_key'] = fact_ratio_temp.apply(make_date_key_from_year_period, axis=1)
# merge company_key
fact_ratio_temp = (
    fact_ratio_temp
    .merge(dim_company[['stock_name','company_key']], on='stock_name')
    .merge(dim_date[['date_key','full_date']], on='date_key', how='left')
)
# select measures
measures = ['debt_equity_shortlong_ratio',
       'debt_equity_ratio', 'fixed_assets_equity_ratio',
       'equity_charter_capital_ratio', 'asset_turnover',
       'fixed_asset_turnover', 'avg_receivables_days', 'avg_inventory_days',
       'avg_payable_days', 'cash_conversion_cycle', 'inventory_turnover',
       'ebit_margin_pct', 'gross_profit_margin_pct', 'net_profit_margin_pct',
       'roe_pct', 'roic_pct', 'roa_pct', 'ebitda_bil_vnd', 'ebit_bil_vnd',
       'dividend_yield_pct', 'current_ratio', 'cash_ratio', 'quick_ratio',
       'interest_coverage_ratio', 'financial_leverage', 'market_cap_bil_vnd',
       'shares_outstanding_mil', 'pe_ratio', 'pb_ratio', 'ps_ratio',
       'pcashflow_ratio', 'eps_vnd', 'bvps_vnd', 'ev_ebitda']
fact_ratio = fact_ratio_temp[['company_key','date_key'] + measures]


In [38]:
with fs.open('/datalake/cleaned/fact_ratio.parquet', 'wb') as f:
    fact_ratio.to_parquet(f, engine='pyarrow', index=False)