In [1]:
from dotenv import load_dotenv
import os

# Tải các biến từ tệp .env
load_dotenv()

# Lấy email và password từ tệp .env
email = os.getenv("EMAIL")
password = os.getenv("PASSWORD")




In [15]:
from concurrent.futures import ThreadPoolExecutor
import undetected_chromedriver as uc
from selenium.webdriver.support.ui import WebDriverWait
from webdriver_manager.chrome import ChromeDriverManager
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import Select
import time
import pandas as pd
from typing import Optional, List, Dict

class Ratio:
    
    ratio_target = [
        "Nhóm chỉ số Định giá",
        "Nhóm chỉ số Sinh lợi",
        "Nhóm chỉ số Tăng trưởng",
        "Nhóm chỉ số thanh khoản",
        "Nhóm chỉ số Hiệu quả hoạt động",
        "Nhóm chỉ số Đòn bẩy tài chính",
        "Nhóm chỉ số Dòng tiền",
        "Cơ cấu Chi phí",
        "Cơ cấu Tài sản ngắn hạn",
        "Cơ cấu Tài sản dài hạn"
    ]

    def __init__(self):
        # Không còn cần ticker_url trong init
        chrome_options = uc.ChromeOptions()
        self.driver = uc.Chrome(driver_executable_path=ChromeDriverManager().install(), options=chrome_options)
        # Điều hướng đến trang chủ Vietstock
        self.driver.get("https://vietstock.vn")

    def load_page(self, ticker: str):
        self.url = f"https://finance.vietstock.vn/{ticker}/tai-chinh.htm?tab=CSTC"
        self.driver.get(self.url)

    def select_period(self, period: str = "5 Kỳ"):
        try:
            period_dropdown = Select(self.driver.find_element(By.NAME, "period"))
            period_dropdown.select_by_visible_text(period)
            WebDriverWait(self.driver, 10).until(
                EC.presence_of_element_located((By.CSS_SELECTOR, "tr.CDKT-row-white-color"))
            )
        except Exception as e:
            print(f"Error selecting period: {e}")

    def period_data(self) -> List[str]:
        try:
            # Đảm bảo rằng bạn tìm phần tử lại trước khi lấy dữ liệu
            WebDriverWait(self.driver, 10).until(
                EC.presence_of_element_located((By.CSS_SELECTOR, "table#tbl-data-CSTC thead th.text-center"))
            )
            
            # Tìm lại các phần tử sau mỗi lần thay đổi DOM
            periods = self.driver.find_elements(By.CSS_SELECTOR, "table#tbl-data-CSTC thead th.text-center")
            periods_list = [period.text for period in periods if period.text]
            return periods_list
        except Exception as e:
            print(f"Error occurred while fetching periods: {e}")
            return []

    def financial_data(self) -> Dict:
        financial_data = {}
        rows = self.driver.find_elements(By.CSS_SELECTOR, "tr.CDKT-row-white-color, tr.CDKT-header-blue-color.p1")
        current_group = None
        for row in rows:
            try:
                group_elements = row.find_elements(By.CSS_SELECTOR, "tr.CDKT-header-blue-color.p1 td.td-stockcode div.report-norm-name")
                if len(group_elements) > 0:
                    current_group = group_elements[0].text
                    continue

                indicator_name = row.find_element(By.CSS_SELECTOR, "div.report-norm-name span").text
                values = row.find_elements(By.CSS_SELECTOR, "td.text-right")
                values_list = [value.text for value in values]
                financial_data[(current_group, indicator_name)] = values_list
            except Exception as e:
                print(f"Error occurred: {e}")
        return financial_data

    def create_dataframe(self, periods_list: List[str], financial_data: Dict) -> pd.DataFrame:
        multi_index = pd.MultiIndex.from_tuples(list(financial_data.keys()), names=['Group', 'Indicator'])
        df = pd.DataFrame(list(financial_data.values()), index=multi_index, columns=periods_list)
        return df

    # Hàm này thực hiện crawling cho từng mã ticker, lấy dữ liệu và trả về ticker cùng dataframe
    def crawl_ticker_thread(self, ticker: str, period: str):
        # Bước 1: Tải trang
        self.load_page(ticker)

        # Chờ trang tải xong bằng cách đợi một phần tử cụ thể xuất hiện, ví dụ: tiêu đề bảng dữ liệu
        WebDriverWait(self.driver, 10).until(
            EC.presence_of_element_located((By.CSS_SELECTOR, "table#tbl-data-CSTC"))
        )
        
        # Bước 2: Chọn kỳ (period)
        self.select_period(period)
        
        # Chờ 3 giây để đảm bảo kỳ đã được chọn và dữ liệu đã được tải lại
        time.sleep(3)  # Nếu bạn muốn chờ tĩnh
        
        # Bước 3: Lấy danh sách các kỳ
        periods_list = self.period_data()
        
        # Chờ thêm 2 giây để đảm bảo dữ liệu đã sẵn sàng
        time.sleep(2)

        # Bước 4: Lấy dữ liệu tài chính
        financial_data = self.financial_data()

        # Trả về ticker và dataframe đã tạo
        return ticker, self.create_dataframe(periods_list, financial_data)

    def crawl_tickers(self, tickers: List[str], period: str = "5 Kỳ"):
        """
        Crawl dữ liệu tuần tự cho nhiều mã ticker.
        """
        all_data = {}
        
        # Thực hiện crawl lần lượt từng mã ticker
        for ticker in tickers:
            print(f"Crawling data for ticker: {ticker}")
            ticker, df = self.crawl_ticker_thread(ticker, period)  # Lấy dữ liệu cho từng mã ticker
            all_data[ticker] = df  # Lưu kết quả vào dictionary
        
        return all_data

    def export_to_excel(self, data_dict: Dict[str, pd.DataFrame], file_name: str):
        with pd.ExcelWriter(file_name, engine='xlsxwriter') as writer:
            for ticker, df in data_dict.items():
                df.to_excel(writer, sheet_name=ticker)
        print(f"Data exported to {file_name}")

    def login(self,
            email: Optional[str] = None,
            password: Optional[str] = None
        ):
 
        # Step 1: 
        # Attempt to click on the first login button
        try:
            self.find_and_interact_button(By.XPATH, locator="//a[contains(@class, 'title-link btnlogin')]", button_name="login_first")
        except Exception:
            # If the first login button is not found, try clicking on the second login button
            self.find_and_interact_button(By.ID, locator="btn-request-call-login", button_name="login_second")

        # Click on the Gmail login button
        self.find_and_interact_button(By.XPATH, locator="//a[contains(@href, 'LoginGooglePlus')]", wait_time=10, button_name="gmail_login")

        # Enter email address in the email input field
        self.find_and_interact_button(By.XPATH, locator="//input[@type='email']", wait_time=10, button_name="email_input", value=email)

        # Click on the "Next" button
        self.find_and_interact_button(By.XPATH, locator="//span[text()='Tiếp theo']", wait_time=10, button_name="next_button")

        # Enter password in the password input field
        self.find_and_interact_button(By.XPATH, locator="//input[@type='password']", button_name="password_input", value=password)

        # Click on the "Next" button again
        self.find_and_interact_button(By.XPATH, locator="//span[text()='Tiếp theo']", wait_time=10, button_name="next_button")

        time.sleep(2)

    def find_and_interact_button(
        self,
        query_method: By,
        locator: str = None,
        value: str = None,
        wait_time: int = 30,
        button_name: str = None,
    ):
        if locator is not None: 
            try:
                WebDriverWait(self.driver, wait_time).until(
                    EC.element_to_be_clickable((query_method, locator))
                )
                button = self.driver.find_element(query_method, locator)  
                if button.is_displayed() and button.is_enabled() and value is None:
                    button.click()
                    print(f"{button_name} is clicked")
                elif button.is_displayed() and button.is_enabled() and value is not None:
                    button.send_keys(value)
                    print(f"{button_name} is clicked")
                else:
                    print(f"{button_name} not found, assuming it's already clicked and continuing")
                
            except TimeoutError as e:
                raise TimeoutError(f"Element not found or not clickable: {locator}") from e
        else: 
            raise ValueError(
                "locator must be passed in "
            )
        

In [3]:
from ratio import Ratio

In [16]:
ratio = Ratio()


In [17]:
ratio.login(email=email, password=password)

login_second is clicked
gmail_login is clicked
email_input is clicked
next_button is clicked
password_input is clicked
next_button is clicked


In [5]:
tickers = ['HDG', 'FPT', 'MSN', "VCB", "MSB", "FTS", "CTG", "VPB", "HPG", "DGC"]  # Ví dụ danh sách mã cổ phiếu
data = ratio.crawl_tickers(tickers, period="20 Kỳ")

Crawling completed for MSB
Crawling completed for FPT
Crawling completed for VCB
Crawling completed for HDG
Crawling completed for MSN
Crawling completed for FTS
Crawling completed for HPG
Crawling completed for VPB
Crawling completed for CTG
Crawling completed for DGC


In [7]:
ratio.export_to_excel(data, "financial_data(1).xlsx")

Data exported to financial_data(1).xlsx


In [53]:
pip install xlsxwriter

Collecting xlsxwriter
  Downloading XlsxWriter-3.2.0-py3-none-any.whl.metadata (2.6 kB)
Downloading XlsxWriter-3.2.0-py3-none-any.whl (159 kB)
   ---------------------------------------- 0.0/159.9 kB ? eta -:--:--
   -- ------------------------------------- 10.2/159.9 kB ? eta -:--:--
   -- ------------------------------------- 10.2/159.9 kB ? eta -:--:--
   -------------- ------------------------ 61.4/159.9 kB 465.5 kB/s eta 0:00:01
   ---------------------- ---------------- 92.2/159.9 kB 476.3 kB/s eta 0:00:01
   -------------------------- ----------- 112.6/159.9 kB 544.7 kB/s eta 0:00:01
   ---------------------------------- --- 143.4/159.9 kB 532.5 kB/s eta 0:00:01
   -------------------------------------- 159.9/159.9 kB 530.8 kB/s eta 0:00:00
Installing collected packages: xlsxwriter
Successfully installed xlsxwriter-3.2.0
Note: you may need to restart the kernel to use updated packages.



[notice] A new release of pip is available: 24.0 -> 24.2
[notice] To update, run: python.exe -m pip install --upgrade pip
