### Login with account

In [1]:
from selenium import webdriver
from time import sleep
from selenium.webdriver.common.by import By
from dotenv import load_dotenv
import os

# Pull the Facebook credentials from the local .env file into the environment.
load_dotenv()

# Fail fast, before a browser window is opened, when a credential is missing:
# otherwise send_keys() below would be handed None halfway through the login.
user_email = os.getenv("user_email")
user_password = os.getenv("user_password")
if not user_email or not user_password:
    raise RuntimeError(
        "Set 'user_email' and 'user_password' in the environment or .env file")

driver = webdriver.Chrome()

driver.get("https://www.facebook.com/")

# Fixed pause to give the login form time to render.
sleep(5)
txtUser = driver.find_element(By.ID, "email")
txtUser.send_keys(user_email)

txtPass = driver.find_element(By.ID, "pass")
txtPass.send_keys(user_password)

login = driver.find_element(By.NAME, "login")
login.click()

# Fixed pause so the post-login navigation can settle.
sleep(3)

### Get login cookie

In [None]:
import json

# Persist the logged-in session's cookies so later cells can authenticate
# without typing the credentials again.
try:
    cookies = driver.get_cookies()

    with open("facebook_cookies.json", "w") as file:
        json.dump(cookies, file)
finally:
    # Always release the browser, even if reading or saving the cookies fails.
    driver.quit()

### Login with cookie

In [1]:
from selenium import webdriver
import json
import time
from selenium.webdriver.common.by import By

from selenium.common.exceptions import NoSuchElementException

# Cookies saved by the "Get login cookie" cell.
with open("facebook_cookies.json", "r") as file:
    cookies = json.load(file)

driver = webdriver.Chrome()


# Open a Facebook page first so the cookies are added while on that domain.
driver.get("https://www.facebook.com/groups/281184089051767")

time.sleep(2)

for cookie in cookies:
    driver.add_cookie(cookie)

time.sleep(2)
# Reload so the page is requested again with the session cookies attached.
driver.refresh()
time.sleep(5)

# The search box is used as the marker of an authenticated page. Only a missing
# element means "not logged in"; a bare except would also swallow
# KeyboardInterrupt and unrelated driver failures.
try:
    driver.find_element(By.CSS_SELECTOR, "div[aria-label='Search']")
    print("Đăng nhập thành công!")
except NoSuchElementException:
    print("Đăng nhập thất bại!")

# time.sleep(3)
# driver.quit()

Đăng nhập thành công!


### Remove old log

In [6]:
import os
import logging

log_file = 'facebook_scraper.log'

# Detach and close every root handler unconditionally. basicConfig() does
# nothing while the root logger still has handlers, so doing this only when the
# log file exists would silently skip the reconfiguration below.
for handler in logging.root.handlers[:]:
    handler.close()
    logging.root.removeHandler(handler)

if os.path.exists(log_file):
    # Handlers are closed at this point, so the file should be removable.
    try:
        os.remove(log_file)
        print(f"Successfully removed {log_file} log")
    except PermissionError:
        print(
            f"Could not remove {log_file} - file is still in use. Will create a new log file.")

# Reconfigure logging after removing handlers; filemode='w' starts the file
# empty even when the removal above was not possible.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    filemode='w',
    filename=log_file
)
logger = logging.getLogger(__name__)

Successfully removed facebook_scraper.log log


In [5]:
import logging
# Flush and close all logging handlers so the log file is no longer held open.
logging.shutdown()

## FB CSV

In [None]:
import re
import json
import os
import time
import random
import logging
import hashlib
import csv
from dotenv import load_dotenv
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.common.action_chains import ActionChains
from selenium.common.exceptions import NoSuchElementException, TimeoutException
from webdriver_manager.chrome import ChromeDriverManager

# Load variables from a local .env file into the process environment.
load_dotenv()

class FacebookScraperLogger:
    """Logging bootstrap for the scraper (log file plus console output)."""

    @staticmethod
    def setup():
        """Configure root logging once and return the scraper's logger.

        When the root logger has no handlers yet, INFO-level logging is set up
        with a UTF-8 ``facebook_scraper.log`` file handler and a console
        handler. When handlers already exist the configuration is left
        untouched.

        Returns:
            logging.Logger: the logger named ``"FacebookGroupScraper"``.
        """
        # basicConfig() ignores its arguments once the root logger has
        # handlers. Building the FileHandler anyway would open the log file and
        # then drop the handler without closing it, leaving the file locked.
        if not logging.root.handlers:
            logging.basicConfig(
                level=logging.INFO,
                format="%(asctime)s - %(levelname)s - %(message)s",
                handlers=[
                    logging.FileHandler("facebook_scraper.log", encoding='utf-8'),
                    logging.StreamHandler()
                ]
            )
        return logging.getLogger("FacebookGroupScraper")

class BrowserManager:
    """Helpers that build the Chrome WebDriver used for scraping."""

    @staticmethod
    def get_random_user_agent():
        """Return one of three fixed desktop Chrome user-agent strings at random."""
        candidates = [
            "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/96.0 Safari/537.36",
            "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/95.0 Safari/537.36",
            "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/92.0 Safari/537.36"
        ]
        picked = random.choice(candidates)
        return picked

    @staticmethod
    def create_browser(headless=False):
        """Start Chrome with the scraper's command-line flags.

        Args:
            headless: when true, ``--headless`` is passed as the first flag.

        Returns:
            The ``webdriver.Chrome`` instance created with those options.
        """
        chrome_opts = Options()
        # Flags are applied in this order; the headless switch, when requested,
        # goes first.
        flags = ["--headless"] if headless else []
        flags += [
            "--disable-notifications",
            "--no-sandbox",
            "--disable-dev-shm-usage",
            "--disable-blink-features=AutomationControlled",
            "start-maximized",
        ]
        for flag in flags:
            chrome_opts.add_argument(flag)
        agent = BrowserManager.get_random_user_agent()
        chrome_opts.add_argument(f"user-agent={agent}")
        return webdriver.Chrome(options=chrome_opts)

class FacebookGroupScraper:
    """Scrape posts from a Facebook group and store parsed property details as CSV.

    The scraper drives a Chrome browser, authenticates with previously saved
    cookies, walks the group feed and extracts price, area, location,
    amenities, address and contact number from each post's text.
    """

    # Selector for one post container in the group feed. The initial wait, the
    # collection pass and the post-scroll wait must all count the same elements.
    POST_SELECTOR = "div.x1yztbdb.xh8yej3"

    def __init__(self, headless, cookies_file, config_file):
        """Create the logger and browser, then load the location config.

        Args:
            headless: passed to ``BrowserManager.create_browser``.
            cookies_file: path of the JSON file holding saved login cookies.
            config_file: path of the JSON file with districts, wards, streets
                and amenity patterns.
        """
        self.logger = FacebookScraperLogger.setup()
        self.driver = BrowserManager.create_browser(headless)
        self.cookies_file = cookies_file
        self.districts = []
        self.wards = {}
        self.streets = {}
        self.amenity_patterns = {}
        self.load_location_config(config_file)
        self.logger.info("Facebook group scraper initialized")

    def load_location_config(self, config_file):
        """Load the lookup tables used by the parsers from a JSON file.

        Reads the keys ``districts``, ``wards``, ``streets`` and
        ``amenity_patterns``; a missing key falls back to an empty container.
        Any error while reading or parsing is logged and every table is reset
        to empty.
        """
        try:
            with open(config_file, 'r', encoding='utf-8') as f:
                config = json.load(f)
            self.districts = config.get('districts', [])
            self.wards = config.get('wards', {})
            # _parse_address() iterates self.streets, so it has to be populated
            # here; otherwise street matching can never find anything.
            self.streets = config.get('streets', {})
            self.amenity_patterns = config.get('amenity_patterns', {})
            self.logger.info(f"Loaded {len(self.districts)} districts, {len(self.wards)} ward mappings")
        except Exception as e:
            self.logger.error(f"Error loading config file: {e}")
            self.districts, self.wards, self.amenity_patterns = [], {}, {}
            self.streets = {}

    def generate_content_hash(self, content):
        """Return the MD5 hex digest of the whitespace-normalised, lowercased text.

        Used as the post identifier for duplicate detection. Empty or missing
        content yields ``""``.
        """
        if not content:
            return ""
        normalized_content = ' '.join(content.lower().split())
        return hashlib.md5(normalized_content.encode('utf-8')).hexdigest()

    def load_cookies(self):
        """Add every cookie from ``self.cookies_file`` to the browser session.

        A missing or malformed file is logged and otherwise ignored.
        """
        try:
            with open(self.cookies_file, "r") as file:
                cookies = json.load(file)
            for cookie in cookies:
                self.driver.add_cookie(cookie)
            self.logger.info(f"Cookies loaded from {self.cookies_file}")
        except (FileNotFoundError, json.JSONDecodeError) as e:
            self.logger.error(f"Cookie file error: {e}")

    def login(self):
        """Open Facebook, apply the saved cookies and report whether login worked.

        Returns:
            bool: the result of ``verify_login_status``.
        """
        self.logger.info("Logging into Facebook...")
        self.driver.get("https://www.facebook.com/")
        WebDriverWait(self.driver, 5).until(EC.presence_of_element_located((By.TAG_NAME, "body")))
        if self.cookies_file:
            self.load_cookies()
            # Reload so the page is requested again with the cookies attached.
            self.driver.refresh()
        return self.verify_login_status()

    def verify_login_status(self):
        """Return True when the Facebook search box appears within 5 seconds."""
        try:
            WebDriverWait(self.driver, 5).until(
                EC.presence_of_element_located((By.CSS_SELECTOR, "input[placeholder='Search Facebook']"))
            )
            self.logger.info("Login successful")
            return True
        except (TimeoutException, NoSuchElementException):
            self.logger.error("Login failed - search bar not found")
            return False

    def expand_post_content(self, post_element):
        """Click every "See more" / "Xem thêm" button inside the given post.

        Failures are logged as warnings; nothing is returned.
        """
        try:
            # The parentheses matter: XPath's `and` binds tighter than `or`, so
            # without them any div containing 'Xem thêm' would match regardless
            # of its role.
            see_more_buttons = post_element.find_elements(
                By.XPATH,
                ".//div[@role='button' and (contains(text(), 'See more') or contains(text(), 'Xem thêm'))]")
            for btn in see_more_buttons:
                # Clicked through JavaScript rather than a native click.
                self.driver.execute_script("arguments[0].click();", btn)
                time.sleep(0.5)
        except Exception as e:
            self.logger.warning(f"Failed to expand post: {e}")

    def extract_post_date(self, post_element):
        """Hover the post's timestamp and return the tooltip date, formatted.

        Returns:
            str: the output of ``format_date`` for the tooltip text, or ``""``
            when the timestamp or tooltip cannot be found.
        """
        tooltip_selector = "div.x11i5rnm.x1mh8g0r.xexx8yu.x4uap5.x18d9i69.xkhd6sd.x78zum5.xjpr12u.xr9ek0c.x3ieub6.x6s0dn4"
        try:
            span_elem = post_element.find_element(By.CSS_SELECTOR,
                "span.html-span.xdj266r.x11i5rnm.xat24cr.x1mh8g0r.xexx8yu.x4uap5.x18d9i69.xkhd6sd.x1hl2dhg.x16tdsg8.x1vvkbs.x4k7w5x.x1h91t0o.x1h9r5lt.x1jfb8zj.xv2umb2.x1beo9mf.xaigb6o.x12ejxvf.x3igimt.xarpa2k.xedcshv.x1lytzrv.x1t2pt76.x7ja8zs.x1qrby5j"
            )
            self.driver.execute_script("arguments[0].scrollIntoView({block: 'center'});", span_elem)
            # Hovering the timestamp is what makes the tooltip element appear.
            ActionChains(self.driver).move_to_element(span_elem).perform()
            date_tooltip = WebDriverWait(self.driver, 5).until(
                EC.presence_of_element_located((By.CSS_SELECTOR, tooltip_selector)))
            return self.format_date(date_tooltip.text.strip())
        except Exception as e:
            self.logger.warning(f"Failed to extract date: {e}")
            return ""

    def format_date(self, date_string):
        """Convert ``"<weekday> <day> <Month> <year> at <time>"`` to ``"YYYY-MM-DD <time>:00"``.

        Strings without the ``" at "`` separator, or that cannot be parsed,
        are returned unchanged.
        """
        try:
            # Test for the separator itself: a bare 'at' also occurs inside
            # words such as "Saturday".
            if ' at ' not in date_string:
                return date_string
            date_part, time_part = date_string.split(' at ')
            date_words = date_part.split()
            day = date_words[-3].zfill(2)
            month_map = {
                'January': '01', 'February': '02', 'March': '03', 'April': '04',
                'May': '05', 'June': '06', 'July': '07', 'August': '08',
                'September': '09', 'October': '10', 'November': '11', 'December': '12'
            }
            # Unknown month names fall back to '01'.
            month = month_map.get(date_words[-2], '01')
            year = date_words[-1]
            return f"{year}-{month}-{day} {time_part}:00"
        except Exception as e:
            self.logger.warning(f"Failed to format date '{date_string}': {e}")
            return date_string

    def extract_post_content(self, post_element):
        """Extract post content using multiple fallback methods.

        The XPath selectors are tried in order and the text of the first
        element found is returned; ``""`` when none matches.
        """
        selectors = [
            ".//div[@data-ad-rendering-role='story_message']",
            ".//div[contains(@class, 'x6s0dn4') and contains(@class, 'xh8yej3')]",
            ".//div[@class='xdj266r x11i5rnm xat24cr x1mh8g0r x1vvkbs x126k92a']"
        ]

        for selector in selectors:
            try:
                content_element = post_element.find_element(By.XPATH, selector)
                return content_element.text
            except NoSuchElementException:
                continue

        self.logger.error("No content extracted with any selector")
        return ""

    def _parse_price(self, content: str) -> int:
        """Return the first price found in the text as an integer amount, or 0.

        Recognises ``<number> triệu|tr|trieu`` (millions), ``<number> tỷ|ty``
        (billions) and the shorthand ``<n>tr<d>`` meaning n.d million.
        """
        try:
            content_lower = content.lower()

            match = re.search(r'(\d+[.,]?\d*)\s*(triệu|tr|tỷ|ty|trieu)\b', content_lower)
            if match:
                value = float(match.group(1).replace(',', '.'))
                unit = match.group(2)
                # round() instead of int(): a product that lands just below a
                # whole number in floating point must not lose one unit.
                return round(value * (1_000_000_000 if unit in ['tỷ', 'ty'] else 1_000_000))

            match = re.search(r'(\d+)tr(\d)\b', content_lower)
            if match:
                value = float(f"{match.group(1)}.{match.group(2)}")
                return round(value * 1_000_000)
        except (ValueError, AttributeError):
            self.logger.warning("Could not parse price")
            return 0
        return 0

    def _parse_location(self, content: str) -> tuple[str, str]:
        """Return ``(district, ward)`` detected in the text, ``""`` for a part not found.

        The first configured district that occurs as a whole word wins; the
        ward is then looked up only among that district's wards.
        """
        if not content or not self.districts:
            return "", ""
        content_lower = content.lower()
        detected_district = next((d for d in self.districts if re.search(r"\b" + re.escape(d.lower()) + r"\b", content_lower)), "")
        detected_ward = ""
        if detected_district and detected_district in self.wards:
            detected_ward = next((w for w in self.wards[detected_district] if re.search(r"\b" + re.escape(w.lower()) + r"\b", content_lower)), "")
        return detected_district, detected_ward

    def _parse_amenities(self, content: str) -> str:
        """Return the sorted, comma-separated labels whose pattern matches the text."""
        if not content or not self.amenity_patterns:
            return ""
        content_lower = content.lower()
        amenities = {label for label, pattern in self.amenity_patterns.items() if re.search(pattern, content_lower, re.IGNORECASE)}
        return ", ".join(sorted(amenities))

    def _parse_area(self, content: str) -> "float | str":
        """Return the first area in the text as a float, or ``""`` when absent.

        Accepts ``m2``, ``m²`` and ``met vuong`` units and both ``.`` and ``,``
        as the decimal separator.
        """
        if not content:
            return ""
        # Allow a comma decimal too: with a dot-only pattern "25,5m2" would be
        # read as 5.
        matches = re.findall(r'(\d+(?:[.,]\d+)?)\s*(?:m2|m²|met vuong)\b', content, re.IGNORECASE)
        return float(matches[0].replace(',', '.')) if matches else ""

    def _parse_address(self, content: str, found_district: str, found_ward: str) -> str:
        """Return the address mentioned in the text, or ``""``.

        The text following an address keyword is preferred, narrowed to a
        configured street (with adjacent house number) when one occurs in it.
        Without a keyword, the first configured street found anywhere in the
        text is returned. ``found_district`` and ``found_ward`` are accepted
        but not used.
        """
        if not content:
            return ""

        # Check explicit address patterns first. The lookbehind keeps the
        # keywords from matching at the end of a longer word (for example the
        # final "ở" of "mở").
        patterns = [r'(?<!\w)(?:địa chỉ|đc|dc|tại|ở)[\s:]+(\d*\s*[^\n;]+)']
        for pattern in patterns:
            matches = re.findall(pattern, content, re.IGNORECASE)
            for match in matches:
                address = match.strip()
                if address.lower().startswith("là "):
                    address = address[3:].strip()

                for district, streets in self.streets.items():
                    for street in streets:
                        if re.search(r'\b' + re.escape(street) + r'\b', address, re.IGNORECASE):

                            street_match = re.search(r'(\d*\s*' + re.escape(street) + r'\s*\d*)', address, re.IGNORECASE)
                            return street_match.group(0).strip() if street_match else street
                # Only the first keyword match is considered.
                return address

        for district, streets in self.streets.items():
            for street in streets:
                street_match = re.search(r'(\d*\s*' + re.escape(street) + r'\s*\d*)', content, re.IGNORECASE)
                if street_match:
                    return street_match.group(0).strip()

        return ""

    def _parse_contact(self, content: str) -> str:
        """Return the first phone number in the text with separators removed, or ``""``.

        Numbers start with ``0`` or ``+84``. A ``0`` number must have 9-11
        digits in total; a ``+84`` number must have 9-11 digits after the
        prefix and keeps the prefix in the result.
        """
        if not content:
            return ""
        # A lookbehind replaces the leading \b: \b can never match between a
        # space and '+', so "+84..." numbers were not detected.
        pattern = r'(?<!\w)(?:0|\+84[\s.-]?)\d{1,2}[\s.-]?\d{3,4}[\s.-]?\d{3,4}\b'
        matches = re.findall(pattern, content)
        if not matches:
            return ""
        contact = re.sub(r'[^\d+]', '', matches[0])
        if contact.startswith('+84'):
            return contact if 9 <= len(contact) - 3 <= 11 else ""
        return contact if 9 <= len(contact) <= 11 else ""

    def parse_property_details(self, content):
        """Run every field parser over the post text.

        Returns:
            dict: keys ``area``, ``district``, ``ward``, ``address``,
            ``amenities``, ``price`` and ``contact``. For empty content all
            values are ``""`` except ``price``, which is ``0``.
        """
        if not content:
            return {
                "area": "", "district": "", "ward": "", "address": "",
                "amenities": "", "price": 0, "contact": ""
            }
        price = self._parse_price(content)
        district, ward = self._parse_location(content)
        amenities = self._parse_amenities(content)
        area = self._parse_area(content)
        address = self._parse_address(content, district, ward)
        contact = self._parse_contact(content)
        return {
            "area": area, "district": district, "ward": ward, "address": address,
            "amenities": amenities, "price": price, "contact": contact
        }

    def load_existing_csv_data(self, csv_file_path):
        """Read previously saved rows so that reruns append instead of duplicating.

        Returns:
            tuple: ``(rows, post_ids)`` where ``rows`` is the list of row dicts
            and ``post_ids`` the set of non-empty ``postID`` values. Both are
            empty when the file does not exist; a read error is logged.
        """
        existing_posts = []
        content_hashes = set()
        if os.path.exists(csv_file_path):
            try:
                with open(csv_file_path, 'r', encoding='utf-8', newline='') as f:
                    csv_reader = csv.DictReader(f)
                    for row in csv_reader:
                        existing_posts.append(row)
                        post_id = row.get("postID", "")
                        # An empty id identifies nothing; do not mark it seen.
                        if post_id:
                            content_hashes.add(post_id)
            except Exception as e:
                self.logger.error(f"Error loading CSV: {e}")
        return existing_posts, content_hashes

    def scrape_group_posts(self, group_url, max_posts, csv_file_path):
        """Scrape up to ``max_posts`` new posts from a group and rewrite the CSV.

        Posts whose content hash is already known (from the CSV or from this
        run) are skipped. Scrolling continues until the limit is reached, a
        pass yields no new post, or no further posts load within 5 seconds.

        Returns:
            int: number of posts scraped in this call (0 if the feed never loaded).
        """
        self.logger.info(f"Scraping group: {group_url}")
        self.driver.get(group_url)
        try:
            WebDriverWait(self.driver, 10).until(
                EC.presence_of_element_located((By.CSS_SELECTOR, self.POST_SELECTOR)))
        except TimeoutException:
            self.logger.error("Posts did not load")
            return 0

        csv_columns = ["postID", "postDate", "content", "area", "district", "ward", "address", "amenities", "price", "contact"]
        all_posts, content_hashes = self.load_existing_csv_data(csv_file_path)
        posts_scraped = 0

        while posts_scraped < max_posts:
            post_elements = self.driver.find_elements(By.CSS_SELECTOR, self.POST_SELECTOR)
            new_posts = 0

            for post in post_elements:
                if posts_scraped >= max_posts:
                    break
                try:
                    self.driver.execute_script("arguments[0].scrollIntoView(true);", post)
                    self.expand_post_content(post)
                    content = self.extract_post_content(post)
                    content_hash = self.generate_content_hash(content)
                    if content_hash in content_hashes:
                        continue
                    content_hashes.add(content_hash)
                    post_date = self.extract_post_date(post)
                    property_details = self.parse_property_details(content)
                    all_posts.append({
                        "postID": content_hash, "postDate": post_date, "content": content,
                        **property_details
                    })
                    posts_scraped += 1
                    new_posts += 1
                    self.logger.info(f"Scraped post {posts_scraped}/{max_posts}")
                    # Randomised pause between posts.
                    time.sleep(random.uniform(1, 2))
                except Exception as e:
                    self.logger.warning(f"Error scraping post: {e}")
                    continue

            if not new_posts:
                break
            self.driver.execute_script("window.scrollTo(0, document.body.scrollHeight);")
            seen_count = len(post_elements)
            try:
                # Count with the same selector used to collect post_elements;
                # comparing against a different selector makes the counts
                # incomparable and can end the loop too early.
                WebDriverWait(self.driver, 5).until(
                    lambda d: len(d.find_elements(By.CSS_SELECTOR, self.POST_SELECTOR)) > seen_count)
            except TimeoutException:
                break

        try:
            with open(csv_file_path, 'w', encoding='utf-8', newline='') as f:
                writer = csv.DictWriter(f, fieldnames=csv_columns)
                writer.writeheader()
                writer.writerows(all_posts)
            self.logger.info(f"Saved {len(all_posts)} posts to {csv_file_path}")
        except Exception as e:
            self.logger.error(f"Error saving CSV: {e}")

        return posts_scraped

    def close(self):
        """Quit the browser; a failure to do so is only logged."""
        try:
            self.driver.quit()
            self.logger.info("Browser closed")
        except Exception:
            self.logger.info("No browser instance to close")

def main():
    """Log in with the saved cookies and scrape each configured group into the CSV."""
    target_groups = ["https://www.facebook.com/groups/281184089051767"]
    output_csv = 'scrapData.csv'
    post_limit = 10

    # Positional arguments: headless flag, cookies file, location config file.
    scraper = FacebookGroupScraper(False, "facebook_cookies.json", "config.json")
    try:
        if scraper.login():
            for group_url in target_groups:
                count = scraper.scrape_group_posts(group_url, post_limit, output_csv)
                scraper.logger.info(f"Scraped {count} posts from {group_url}")
        else:
            logging.error("Login failed")
    except Exception as err:
        logging.error(f"Script error: {err}")
    finally:
        # The browser is closed on every path, including a failed login.
        scraper.close()


if __name__ == "__main__":
    main()

## Import Facebook data into the database

In [None]:
import re
import json
import os
import time
import random
import logging
import hashlib
import csv
import pandas as pd
import mysql.connector
from mysql.connector import Error
from dotenv import load_dotenv
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import NoSuchElementException, TimeoutException

# Load environment variables from a local .env file into the process environment
load_dotenv()

# Send INFO-and-above records to facebook_scraper.log, opening the file in
# write mode so each run starts with an empty log.
logging.basicConfig(
    filename='facebook_scraper.log',
    filemode='w',
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
# Module-level logger used by the functions in this cell.
logger = logging.getLogger(__name__)


def load_location_config(config_file):
    """Load district, ward, and amenities data from config file.

    Args:
        config_file: path of a UTF-8 JSON file with the optional keys
            ``districts``, ``wards`` and ``amenities_keywords``.

    Returns:
        tuple: ``(districts, wards, amenities_keywords)``. ``wards`` is always
        a flat list of ward names: when the file stores a mapping of district
        to wards, the per-district lists are concatenated in file order.

    Raises:
        OSError, json.JSONDecodeError: when the file cannot be read or parsed.
    """
    with open(config_file, 'r', encoding='utf-8') as f:
        config = json.load(f)

    districts = config.get('districts', [])
    wards = config.get('wards', [])
    if isinstance(wards, dict):
        # Callers iterate `wards` as ward names. Iterating a district->wards
        # mapping directly would yield the district names instead.
        wards = [ward for district_wards in wards.values() for ward in district_wards]
    amenities_keywords = config.get('amenities_keywords', [])

    logger.info(
        f"Loaded {len(districts)} districts, {len(wards)} wards, and {len(amenities_keywords)} amenities from config")

    return districts, wards, amenities_keywords


# Read the location lookups once for the whole cell; on any failure log the
# error and continue with empty lists.
try:
    districts, wards, amenities_keywords = load_location_config("config.json")
except Exception as exc:
    logger.error(f"Failed to load config: {str(exc)}")
    districts = []
    wards = []
    amenities_keywords = []


def generate_content_hash(content):
    """Return an MD5 hex digest that identifies a post's text.

    The text is lowercased and runs of whitespace are collapsed to single
    spaces before hashing. Empty or missing content yields ``""``.
    """
    if content:
        words = content.lower().split()
        digest = hashlib.md5()
        digest.update(' '.join(words).encode('utf-8'))
        return digest.hexdigest()
    return ""


def setup_driver():
    """Create a Chrome WebDriver with the scraper's command-line flags.

    Returns:
        The new ``webdriver.Chrome`` instance.

    Raises:
        Exception: any error raised while creating the driver is logged and
        re-raised unchanged.
    """
    try:
        opts = Options()
        # Add '--headless' to this tuple to run without a visible window.
        for flag in (
            '--disable-notifications',
            '--disable-gpu',
            '--no-sandbox',
            '--disable-dev-shm-usage',
        ):
            opts.add_argument(flag)
        return webdriver.Chrome(options=opts)
    except Exception as err:
        logger.error(f"Failed to set up WebDriver: {str(err)}")
        raise


def login_facebook(driver, cookies_file):
    """Log in to Facebook by replaying cookies saved in a JSON file.

    Args:
        driver: the WebDriver to authenticate.
        cookies_file: path of the JSON cookie file.

    Returns:
        bool: the result of ``verify_login_status``; False when the cookie
        file is missing or unreadable, or when any other error occurs.
    """
    try:
        # The site has to be open before its cookies are added.
        driver.get("https://www.facebook.com")
        time.sleep(3)

        if os.path.exists(cookies_file):
            try:
                with open(cookies_file, "r") as fh:
                    saved_cookies = json.load(fh)

                for entry in saved_cookies:
                    driver.add_cookie(entry)

                # Reload so the page is requested with the cookies attached.
                driver.refresh()
                time.sleep(5)
            except json.JSONDecodeError:
                logger.error("Invalid JSON in cookies file")
                return False
            except Exception as err:
                logger.error(f"Error loading cookies: {str(err)}")
                return False
            else:
                return verify_login_status(driver)

        logger.error(f"Cookies file not found: {cookies_file}")
        return False
    except Exception as err:
        logger.error(f"Login error: {str(err)}")
        return False


def verify_login_status(driver):
    """Decide whether the browser session is logged in.

    Each known logged-in marker is awaited for up to 5 seconds, in order; the
    first one found means success. When none appears, the presence of a
    "Log In"/"Sign In" button means failure and its absence is treated as
    success.

    Returns:
        bool: True when logged in (or assumed so), False otherwise or on any
        unexpected error.
    """
    logged_in_markers = [
        (By.XPATH,
         "//div[@aria-label='Search Facebook' or @aria-label='Search']"),
        (By.CSS_SELECTOR, "input[placeholder='Search Facebook']"),
        (By.CSS_SELECTOR, "div[aria-label='Search']"),
        (By.XPATH,
         "//div[contains(@class, 'x1iorvi4') and contains(@class, 'x1pi30zi')]"),
        (By.XPATH, "//span[text()='Home' or text()='Feed']"),
        (By.XPATH, "//div[@role='navigation']")
    ]
    try:
        for how, locator in logged_in_markers:
            try:
                WebDriverWait(driver, 5).until(
                    EC.presence_of_element_located((how, locator))
                )
            except (TimeoutException, NoSuchElementException):
                continue
            logger.info(
                f"Login successful - found element with selector: {locator}")
            return True

        # No marker showed up: fall back to looking for a login button.
        try:
            driver.find_element(
                By.XPATH, "//button[contains(text(), 'Log In') or contains(text(), 'Sign In')]")
        except NoSuchElementException:
            logger.info("No login button found, assuming login successful")
            return True
        logger.error("Login failed - login button still present")
        return False
    except Exception as err:
        logger.error(f"Error during login verification: {str(err)}")
        return False


def expand_post_content(driver, post_element):
    """Expand 'See more' buttons in post content.

    Args:
        driver: WebDriver used to run the JavaScript click.
        post_element: the post's root element; only buttons inside it are
            clicked.

    Returns:
        bool: True when the buttons were looked up (individual click failures
        are only logged), False when the lookup itself failed.
    """
    try:
        # The leading '.' anchors the XPath at post_element. A path starting
        # with '//' is evaluated against the whole document and would click
        # the buttons of every post on the page. The Vietnamese label is
        # included for a Vietnamese-language UI.
        see_more_buttons = post_element.find_elements(
            By.XPATH,
            ".//div[@role='button' and (text()='See more' or text()='See More' or text()='Xem thêm')]")
        for btn in see_more_buttons:
            try:
                driver.execute_script("arguments[0].click();", btn)
                time.sleep(0.5)
            except Exception as e:
                logger.warning(f"Failed to click 'See more': {e}")
        return True
    except Exception as e:
        logger.warning(f"Error expanding post content: {e}")
        return False


def extract_post_url(post_element):
    """Return the post's permalink without its query string.

    Returns ``""`` (and logs a warning) when no ``/posts/`` link is found or
    the link cannot be read.
    """
    try:
        anchor = post_element.find_element(
            By.XPATH, ".//div[contains(@class, 'xdj266r')]//a[@role='link' and contains(@href, '/posts/')]"
        )
        href = anchor.get_attribute("href")
        # Everything from the first '?' onward is dropped.
        return href.split("?")[0] if "?" in href else href
    except Exception as err:
        logger.warning(f"Could not extract post URL: {str(err)}")
        return ""


def extract_post_date(post_element):
    """Return the visible text of the post's timestamp span.

    Returns ``""`` (and logs a warning) when the span cannot be found or read.
    """
    timestamp_xpath = ".//span[contains(@class,'x4k7w5x')]"
    try:
        return post_element.find_element(By.XPATH, timestamp_xpath).text
    except Exception as err:
        logger.warning(f"Failed to extract date: {str(err)}")
        return ""


def extract_post_content(post_element):
    """Return the post's text, trying progressively broader locators.

    The first two locators are skipped only when their element is missing; any
    failure of the last one is logged as an error and ``""`` is returned.
    """
    preferred_xpaths = (
        ".//div[@data-ad-rendering-role='story_message']",
        ".//div[contains(@class, 'x6s0dn4') and contains(@class, 'xh8yej3')]",
    )
    for xpath in preferred_xpaths:
        try:
            return post_element.find_element(By.XPATH, xpath).text
        except NoSuchElementException:
            pass

    # Last resort: the broad container locator.
    try:
        return post_element.find_element(
            By.XPATH, ".//div[@class='x1yztbdb x1n2onr6 xh8yej3 x1ja2u2z']").text
    except Exception as err:
        logger.error(f"No content extracted: {str(err)}")
        return ""


def parse_property_details(content):
    """Parse property details from post content.

    Reads the module-level ``districts``, ``wards`` and ``amenities_keywords``
    collections.

    Args:
        content: the post's text.

    Returns:
        dict: keys ``area``, ``district``, ``ward``, ``address``,
        ``amenities``, ``price`` and ``contact``; every value is a string and
        is ``""`` when the field was not found. ``area`` and ``price`` hold
        only the number as written in the text, without the unit.
    """
    if not content:
        return {
            "area": "",
            "district": "",
            "ward": "",
            "address": "",
            "amenities": "",
            "price": "",
            "contact": ""
        }

    # Extract area
    area_matches = re.findall(
        r'(\d+(?:\.\d+)?)\s*(?:m2|m²|meter square|square meter)', content, re.IGNORECASE)
    area = area_matches[0] if area_matches else ""

    # Extract price. The unit must end at a word boundary; otherwise the "m"
    # of an area such as "30m2" (or of any word starting with m/b/tr) is read
    # as a price unit. The shorthand "3tr5" is kept via the lookahead branch.
    price_matches = re.findall(
        r'(\d+(?:\.\d+)?)\s*(?:(?:million|billion|triệu|trieu|tr|tỷ|m|b)\b|tr(?=\d\b))'
        r'\s*(?:vnd|đ|vnđ)?',
        content, re.IGNORECASE)
    price = price_matches[0] if price_matches else ""

    # Extract district
    found_district = ""
    for d in districts:
        if re.search(r'\b' + re.escape(d) + r'\b', content, re.IGNORECASE):
            found_district = d
            break

    # If no district found from the list, try regex pattern as fallback
    if not found_district:
        district_matches = re.findall(
            r'(?:district|quận|quan|huyen|huyện)\s*(\d+|[A-Za-z]+)', content, re.IGNORECASE)
        found_district = district_matches[0] if district_matches else ""

    # Extract ward
    found_ward = ""
    for w in wards:
        if re.search(r'\b' + re.escape(w) + r'\b', content, re.IGNORECASE):
            found_ward = w
            break

    # If no ward found from the list, try regex pattern as fallback
    if not found_ward:
        ward_matches = re.findall(
            r'(?:ward|phường|phuong|phương|xã|xa)\s*(\d+|[A-Za-z]+)', content, re.IGNORECASE)
        found_ward = ward_matches[0] if ward_matches else ""

    # Extract address. The short keywords need word boundaries: without them
    # "in"/"at" also match inside words (for example "Xin" or "that").
    address = ""
    address_patterns = [
        r'(?:address|địa chỉ|dia chi)[\s:]+([^\n]+)',
        r'\b(?:located at|located in|at|in)\b[\s:]+([^\n]+)'
    ]

    for pattern in address_patterns:
        address_matches = re.findall(pattern, content, re.IGNORECASE)
        if address_matches:
            address = address_matches[0].strip()
            break

    # If no structured address found, use the first line that mentions the
    # detected district or ward
    if not address and (found_district or found_ward):
        for line in content.split('\n'):
            lowered = line.lower()
            if (found_district and found_district.lower() in lowered) or \
               (found_ward and found_ward.lower() in lowered):
                address = line.strip()
                break

    # Extract amenities
    found_amenities = [
        keyword for keyword in amenities_keywords
        if re.search(r'\b' + re.escape(keyword) + r'\b', content, re.IGNORECASE)
    ]
    amenities = ", ".join(found_amenities)

    # Extract contact info
    phone_patterns = [
        r'(?:\+84|0)\d{9,10}',  # Vietnamese phone numbers
        r'(?:phone|tel|telephone|contact|số|sdt|số điện thoại)[:\s]+(\d[\d\s.-]{8,})'  # Labeled phone numbers
    ]

    contact = ""
    for pattern in phone_patterns:
        matches = re.findall(pattern, content, re.IGNORECASE)
        if matches:
            # Each pattern has at most one capture group, so findall yields
            # plain strings.
            contact = re.sub(r'\s+', '', matches[0])
            break

    return {
        "area": area,
        "district": found_district,
        "ward": found_ward,
        "address": address,
        "amenities": amenities,
        "price": price,
        "contact": contact
    }


def load_existing_csv_data(csv_file_path):
    """Read previously saved posts from the CSV file, if there is one.

    Returns:
        tuple: ``(rows, processed_urls, post_ids)`` where ``rows`` is the list
        of row dicts, ``processed_urls`` is always an empty set, and
        ``post_ids`` holds every non-empty ``postID`` value. A read error is
        logged and whatever was collected so far is returned.
    """
    rows = []
    seen_urls = set()
    seen_hashes = set()

    if not os.path.exists(csv_file_path):
        return rows, seen_urls, seen_hashes

    try:
        with open(csv_file_path, 'r', encoding='utf-8', newline='') as handle:
            for record in csv.DictReader(handle):
                rows.append(record)
                identifier = record.get("postID", "")
                if identifier:
                    seen_hashes.add(identifier)
    except Exception as err:
        logger.error(f"Error loading existing CSV file: {str(err)}")

    return rows, seen_urls, seen_hashes


def scrape_facebook_group(driver, group_url, max_posts, csv_file_path):
    """Scrape posts from a Facebook group and merge them into a CSV file.

    Rows already stored in ``csv_file_path`` are loaded first and keyed by
    ``postID`` (the content hash), so a post that is scraped again replaces
    its previous row instead of being duplicated. The merged set is written
    back to the same file once scraping stops.

    Args:
        driver: Selenium WebDriver used to load and query the page.
        group_url: URL of the Facebook group to open.
        max_posts: Stop once this many posts have been scraped in this call.
        csv_file_path: CSV file to read existing rows from and to write to.

    Returns:
        ``(posts_scraped, updated_posts)``: how many posts were scraped in
        this call and the complete list of rows handed to the CSV writer.
    """
    driver.get(group_url)
    time.sleep(3)  # Allow page to load

    # Define CSV columns
    csv_columns = [
        "postID", "postDate", "content",
        "area", "district", "ward", "address", "amenities",
        "price", "contact"
    ]

    # Load existing data if file exists
    all_posts, processed_urls, content_hashes = load_existing_csv_data(csv_file_path)
    # Key the existing rows by postID so re-scraped posts overwrite them
    posts_dict = {post.get("postID", ""): post for post in all_posts if post.get("postID", "")}

    def scroll_for_more(previous_height):
        """Scroll to the bottom; return the new page height, or None if it did not change."""
        driver.execute_script("window.scrollTo(0, document.body.scrollHeight);")
        time.sleep(3)  # Wait for new content to load
        current_height = driver.execute_script("return document.body.scrollHeight")
        if current_height == previous_height:
            logger.warning("No more posts to load")
            return None
        return current_height

    posts_scraped = 0
    last_height = driver.execute_script("return document.body.scrollHeight")

    while posts_scraped < max_posts:
        # Re-query after every scroll: the feed adds elements as it grows
        post_elements = driver.find_elements(
            By.XPATH, ".//div[@class='x1yztbdb x1n2onr6 xh8yej3 x1ja2u2z']"
        )

        logger.info(f"Found {len(post_elements)} post elements")

        # Keep track of how many new posts we processed in this batch
        new_posts_in_batch = 0

        for post in post_elements:
            try:
                # Scroll to the post to ensure it's in view
                driver.execute_script("arguments[0].scrollIntoView(true);", post)
                time.sleep(1)  # Allow content to load

                expand_post_content(driver, post)

                # Elements seen in an earlier pass are recognised by their URL
                post_url = extract_post_url(post)
                if post_url in processed_urls:
                    logger.info(f"Skipping already processed post: {post_url}")
                    continue

                content = extract_post_content(post)
                content_hash = generate_content_hash(content)

                processed_urls.add(post_url)

                post_date = extract_post_date(post)
                property_details = parse_property_details(content)

                post_data = {
                    "postID": content_hash,
                    "postDate": post_date,
                    "content": content,
                    "area": property_details["area"],
                    "district": property_details["district"],
                    "ward": property_details["ward"],
                    "address": property_details["address"],
                    "amenities": property_details["amenities"],
                    "price": property_details["price"],
                    "contact": property_details["contact"]
                }

                # Same assignment either way; only the log line and the hash set differ
                if content_hash in posts_dict:
                    logger.info(f"Updating existing post with ID: {content_hash}")
                else:
                    logger.info(f"Adding new post with ID: {content_hash}")
                    content_hashes.add(content_hash)
                posts_dict[content_hash] = post_data

                posts_scraped += 1
                new_posts_in_batch += 1
                logger.info(f"Scraped post {posts_scraped}/{max_posts}")

                # Exit the loop if we've scraped enough posts
                if posts_scraped >= max_posts:
                    break

                # Add random delay to avoid detection
                time.sleep(random.uniform(1.5, 3.0))

            except Exception as e:
                logger.error(f"Error scraping post: {str(e)}")
                continue

        if posts_scraped >= max_posts:
            break

        if new_posts_in_batch == 0:
            logger.info("No new posts found in current view, scrolling down...")
        else:
            logger.info(f"Found {new_posts_in_batch} new posts, scrolling down to get more...")

        new_height = scroll_for_more(last_height)
        if new_height is None:
            break
        last_height = new_height

    # Convert dictionary back to list for CSV export
    updated_posts = list(posts_dict.values())

    # Save all posts to CSV file
    try:
        with open(csv_file_path, 'w', encoding='utf-8', newline='') as f:
            # extrasaction='ignore': rows loaded from an older CSV may carry
            # columns that are not in csv_columns; the default ('raise') would
            # abort after the file has already been truncated.
            writer = csv.DictWriter(f, fieldnames=csv_columns, extrasaction='ignore')
            writer.writeheader()
            writer.writerows(updated_posts)
        logger.info(f"Successfully saved {len(updated_posts)} posts to {csv_file_path}")
    except Exception as e:
        logger.error(f"Error saving to CSV file: {str(e)}")

    return posts_scraped, updated_posts


def export_to_database(csv_file_path):
    """Export rows from the CSV file into the MySQL ``post`` table.

    Every CSV row with a ``postID`` is inserted, or updated when that ID is
    already present in the table. Work is committed in batches; a statement
    that hits a lock wait timeout is retried a limited number of times.
    Connection settings come from the ``db_host``, ``db_user``,
    ``db_password`` and ``db_name`` environment variables.

    Args:
        csv_file_path: Path of the CSV file to export.

    Returns:
        ``(inserts, updates)`` counts, or ``(0, 0)`` when the export aborts
        (the open transaction is rolled back in that case).
    """
    logger.info(f"Starting database export from {csv_file_path}")

    def nullable(value):
        """Map a pandas missing value to None (SQL NULL); pass anything else through."""
        return None if pd.isna(value) else value

    def area_to_float(value):
        """Return the area as a float, or None when it is missing or not a plain number."""
        if pd.isna(value):
            return None
        if isinstance(value, str) and not value.replace('.', '').isdigit():
            return None
        try:
            return float(value)
        except (TypeError, ValueError):
            return None

    conn = None
    cursor = None

    try:
        # Read every column as text. With inferred dtypes pandas turns phone
        # numbers into numbers (dropping the leading zero) and the area column
        # into floats, which the string checks below cannot handle.
        df = pd.read_csv(csv_file_path, dtype=str)
        logger.info(f"Loaded {len(df)} rows from CSV")

        # Connect to database
        conn = mysql.connector.connect(
            host=os.getenv('db_host'),
            user=os.getenv('db_user'),
            password=os.getenv('db_password'),
            database=os.getenv('db_name'),
            connection_timeout=10
        )

        conn.autocommit = False  # Disable autocommit for batch processing
        cursor = conn.cursor()

        # Prepare SQL statements for insert and update
        insert_sql = """
            INSERT INTO post (
                postID, p_date, content, district, ward,
                street_address, price, area, amenities, contact_info
            ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
        """

        update_sql = """
            UPDATE post SET
                p_date = %s,
                content = %s,
                district = %s,
                ward = %s,
                street_address = %s,
                price = %s,
                area = %s,
                amenities = %s,
                contact_info = %s
            WHERE postID = %s
        """

        # Check which records already exist in the database
        cursor.execute("SELECT postID FROM post")
        existing_ids = {row[0] for row in cursor.fetchall()}

        BATCH_SIZE = 100
        retry_limit = 3
        inserts = 0
        updates = 0
        processed = 0  # rows attempted; drives the batch commit

        for i, row in df.iterrows():
            post_id = None if pd.isna(row["postID"]) else str(row["postID"])

            if not post_id:
                continue  # Skip rows without a valid post ID

            try:
                # Values in the column order shared by both statements
                price = nullable(row["price"])
                values = (
                    nullable(row["postDate"]),
                    nullable(row["content"]),
                    nullable(row["district"]),
                    nullable(row["ward"]),
                    nullable(row["address"]),
                    None if price is None else float(price),
                    area_to_float(row["area"]),
                    nullable(row["amenities"]),
                    nullable(row["contact"])
                )

                # Retry mechanism for database operations
                for attempt in range(retry_limit):
                    try:
                        if post_id in existing_ids:
                            # Update existing record (post_id goes last, for the WHERE clause)
                            cursor.execute(update_sql, values + (post_id,))
                            updates += 1
                        else:
                            # Insert new record (post_id goes first)
                            cursor.execute(insert_sql, (post_id,) + values)
                            existing_ids.add(post_id)  # later duplicates become updates
                            inserts += 1
                        break  # success
                    except mysql.connector.errors.DatabaseError as e:
                        if "Lock wait timeout exceeded" in str(e):
                            logger.warning(f"Lock timeout on row {i}, retrying ({attempt + 1}/{retry_limit})...")
                            time.sleep(2)
                        else:
                            raise
                else:
                    # Every attempt timed out: make the dropped row visible in the log
                    logger.error(f"Giving up on row {i} after {retry_limit} lock timeouts")

                # Commit batch
                processed += 1
                if processed % BATCH_SIZE == 0:
                    conn.commit()
                    logger.info(f"Committed batch, processed {processed} rows (inserts: {inserts}, updates: {updates})")

            except Exception as e:
                logger.error(f"Error processing row {i}: {e}")

        # Final commit
        conn.commit()
        logger.info(f"Database export complete. Total: {len(df)} rows, {inserts} inserts, {updates} updates")

        return inserts, updates

    except Error as e:
        logger.error(f"Database error: {e}")
        if conn is not None and conn.is_connected():
            conn.rollback()
        return 0, 0
    except Exception as e:
        logger.error(f"General error during database export: {e}")
        if conn is not None and conn.is_connected():
            conn.rollback()
        return 0, 0
    finally:
        if conn is not None and conn.is_connected():
            if cursor is not None:
                cursor.close()
            conn.close()
            logger.info("Database connection closed")


def main():
    """Scrape the configured Facebook groups, then export the CSV to the database.

    Any exception is logged rather than propagated, and the browser is closed
    on the way out if it was started.
    """
    csv_file_path = 'phongtro_data.csv'
    driver = None  # stays None when the WebDriver could not be created

    try:
        # Set up WebDriver
        driver = setup_driver()

        # Login to Facebook
        if not login_facebook(driver, "facebook_cookies.json"):
            logger.error("Failed to login to Facebook")
            return

        logger.info("Login successful")

        # Define Facebook groups to scrape
        groups = ["https://www.facebook.com/groups/281184089051767"]

        total_scraped = 0

        # Scrape each group
        for group_url in groups:
            logger.info(f"Scraping group: {group_url}")
            posts_scraped, _ = scrape_facebook_group(
                driver, group_url, max_posts=10, csv_file_path=csv_file_path)
            total_scraped += posts_scraped
            logger.info(f"Scraped {posts_scraped} posts from group {group_url}")

        # Export even when nothing new was scraped, as long as a CSV exists
        if total_scraped > 0 or os.path.exists(csv_file_path):
            logger.info("Starting database export...")
            inserts, updates = export_to_database(csv_file_path)
            logger.info(f"Database export completed: {inserts} new records, {updates} updated records")
        else:
            logger.warning("No data to export to database")

    except Exception as e:
        logger.error(f"Script error: {str(e)}")
    finally:
        if driver is not None:
            driver.quit()
            logger.info("Browser closed.")


# Run the scrape-and-export pipeline only when executed directly, not on import.
if __name__ == "__main__":
    main()

## Web CSV

In [None]:
import re
import csv
import json
import time
import random
import hashlib
import logging
from typing import List, Dict, Any, Optional, Tuple
from datetime import datetime

from selenium import webdriver
from selenium.webdriver.edge.options import Options
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import (
    NoSuchElementException, TimeoutException, StaleElementReferenceException
)


# ======== CONFIGURATION ========
# Settings WebScraper falls back to when it is constructed without a config dict.
DEFAULT_CONFIG = {
    "city": "da-nang",           # City to scrape data from (URL path)
    "post_limit": 0,             # Number of posts to scrape (0 = all)
    "output_file": "phongtro_test.csv",  # Output filename
    "headless": True,            # Run browser in headless mode (True) or visible (False)
    "random_delay": True,        # Add random delay between operations (to avoid blocking)
    "min_delay": 1,              # Minimum delay (seconds)
    "max_delay": 3,              # Maximum delay (seconds)
}

# Configure logging: INFO and above go to phongtro_scraper.log; filemode='w'
# truncates that file each time this configuration is applied.
# NOTE: logging.basicConfig() does nothing when the root logger already has
# handlers (for example, set up by an earlier cell in the same session).
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    filemode='w',
    filename='phongtro_scraper.log'
)
# Module-level logger used by the scraper class below.
logger = logging.getLogger(__name__)


class WebScraper:
    def __init__(self, config: Dict[str, Any] = None):
        """Initialize scraper with configuration."""
        self.config = config or DEFAULT_CONFIG
        self.driver = None
        self.patterns = self._load_config()
        
    def _load_config(self) -> Dict:
        """Load patterns and location data from config.json file."""
        try:
            with open('config.json', 'r', encoding='utf-8') as config_file:
                return json.load(config_file)
        except Exception as e:
            logger.error(f"Error loading config.json: {str(e)}")
            return {}
    
    def setup_driver(self) -> webdriver.Edge:
        """Create the Edge WebDriver, store it on ``self.driver`` and return it.

        Headless mode (plus ``--disable-gpu``) is enabled when the ``headless``
        setting is truthy.
        """
        options = Options()

        arguments = []
        if self.config["headless"]:
            arguments += ["--headless", "--disable-gpu"]
        arguments += [
            "--window-size=720,1080",
            "--disable-notifications",
            "--no-sandbox",
            "--disable-dev-shm-usage",
            # Add user-agent to avoid detection as bot
            "user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/90.0.4430.212 Safari/537.36",
        ]
        for argument in arguments:
            options.add_argument(argument)

        self.driver = webdriver.Edge(options=options)
        return self.driver

    def random_delay(self) -> float:
        """Create random delay if configured."""
        if self.config["random_delay"]:
            delay = random.uniform(self.config["min_delay"], self.config["max_delay"])
            time.sleep(delay)
            return delay
        return 0

    def check_and_move_to_next_page(self) -> bool:
        """Click the "Trang sau »" (next page) link when it is present and enabled.

        Returns:
            True after the link was clicked (followed by the configured random
            delay); False when the link is missing, disabled, or an error
            occurred (errors are logged).
        """
        try:
            next_link = self.driver.find_element(By.XPATH, "//a[text()='Trang sau »']")
            if not next_link.is_enabled():
                logger.info("'Next' button not available or not found.")
                return False

            next_link.click()
            waited = self.random_delay()
            logger.info(f"Moved to next page (waited {waited:.2f}s)")
            return True
        except NoSuchElementException:
            logger.info("'Next' button not found on this page.")
            return False
        except Exception as e:
            logger.error(f"Error moving to next page: {str(e)}")
            return False

    def get_all_urls(self, max_posts: int = 0) -> List[str]:
        """Collect post links from the listing, following pagination.

        Args:
            max_posts: Stop as soon as this many links were collected; 0 (or a
                negative value) means no limit.

        Returns:
            The ``href`` values in page order. Collection ends when there is no
            further page or when an error occurs (logged); links gathered up to
            that point are still returned.
        """
        link_xpath = "//a[contains(@class,'line-clamp-2')]"
        collected = []
        page_number = 1

        while True:
            try:
                logger.info(f"Getting URLs from page {page_number}")
                # Block until the listing links are present in the DOM
                WebDriverWait(self.driver, 10).until(
                    EC.presence_of_all_elements_located((By.XPATH, link_xpath))
                )

                for link in self.driver.find_elements(By.XPATH, link_xpath):
                    href = link.get_attribute('href')
                    collected.append(href)
                    logger.debug(f"Added URL: {href}")

                    # Check limit
                    if 0 < max_posts <= len(collected):
                        logger.info(f"Reached limit of {max_posts} posts.")
                        return collected[:max_posts]

                logger.info(f"Collected {len(collected)} URLs")

                # Go to next page if available
                if not self.check_and_move_to_next_page():
                    break

                page_number += 1

            except Exception as e:
                logger.error(f"Error getting URLs: {str(e)}")
                break

        return collected

    def extract_datetime(self, date_time_str: str) -> str:
        """Extract date and time from string and format it as 'YYYY-MM-DD HH:MM:SS'."""
        try:
            parts = date_time_str.split(', ')
            if len(parts) < 2:
                return ""

            raw_datetime = parts[1] 
            dt = datetime.strptime(raw_datetime, "%H:%M %d/%m/%Y")
            return dt.strftime("%Y-%m-%d %H:%M:%S")

        except Exception as e:
            logger.error(f"Error extracting date and time: {str(e)}")
            return ""

    def get_post_content(self) -> str:
        """Return the post description: its direct ``<p>`` children joined by newlines.

        Returns:
            The stripped description text, or "" when the description block
            does not appear within 10 seconds or any other error occurs (both
            cases are logged).
        """
        description_xpath = "//div[@class='border-bottom pb-3 mb-4']"
        try:
            # Wait for element to load
            WebDriverWait(self.driver, 10).until(
                EC.presence_of_element_located((By.XPATH, description_xpath))
            )

            container = self.driver.find_element(By.XPATH, description_xpath)
            lines = []
            for paragraph in container.find_elements(By.XPATH, "./p"):
                lines.append(paragraph.text.strip())
            return "\n".join(lines).strip()

        except TimeoutException:
            logger.warning("Timeout waiting for description element")
            return ""
        except Exception as e:
            logger.error(f"Error getting post content: {str(e)}")
            return ""

    def generate_post_id(self, content: str) -> str:
        """Generate unique ID from post content."""
        return hashlib.md5(content.encode('utf-8')).hexdigest()

    def get_district_and_ward(self, address: str) -> Tuple[Optional[str], Optional[str]]:
        """Extract district and ward from address string using keyword matching."""
        if not address or not self.patterns:
            return None, None

        try:
            # Get district list from config
            districts = self.patterns.get("districts", [])
            wards = self.patterns.get("wards", {})

            detected_district = None
            for district in districts:
                if re.search(r"\b" + re.escape(district) + r"\b", address, re.IGNORECASE):
                    detected_district = district
                    break

            detected_ward = None
            if detected_district and detected_district in wards:
                for ward in wards[detected_district]:
                    if re.search(r"\b" + re.escape(ward) + r"\b", address, re.IGNORECASE):
                        detected_ward = ward
                        break

            # Fall back to simple substring match if regex fails
            if not detected_ward and detected_district:
                address_lower = address.lower()
                for ward in wards[detected_district]:
                    if ward.lower() in address_lower:
                        detected_ward = ward
                        break

            return detected_district, detected_ward

        except Exception as e:
            logger.error(f"Error parsing address '{address}': {str(e)}")
            return None, None
        
    def get_amenities(self, content: str) -> List[str]:
        """Get amenities list from post."""
        if not self.patterns:
            return []

        # Get amenity patterns from config
        amenity_patterns = self.patterns.get("amenity_patterns", {})
        detected_amenities = set()

        try:
            WebDriverWait(self.driver, 10).until(
                EC.presence_of_all_elements_located((
                    By.XPATH,
                    "//div[@class='text-body d-flex pt-1 pb-1' and not(contains(@style, '--bs-text-opacity: 0.1;'))]"
                ))
            )

            amenity_elements = self.driver.find_elements(
                By.XPATH,
                "//div[@class='text-body d-flex pt-1 pb-1' and not(contains(@style, '--bs-text-opacity: 0.1;'))]"
            )

            for element in amenity_elements:
                text = element.text.strip()
                if text:
                    matched = False
                    for label, pattern in amenity_patterns.items():
                        if re.search(pattern, text, re.IGNORECASE):
                            detected_amenities.add(label)
                            matched = True
                            break
                    if not matched:
                        detected_amenities.add(text) 

            # Get from content
            for label, pattern in amenity_patterns.items():
                if re.search(pattern, content, re.IGNORECASE):
                    detected_amenities.add(label)

            return list(detected_amenities)

        except TimeoutException:
            logger.warning("Timeout waiting for amenity elements")
            return list(detected_amenities)
        except Exception as e:
            logger.error(f"Error getting amenities: {str(e)}")
            return list(detected_amenities)

    def extract_price_value(self, price_str: str) -> Optional[int]:
        """Extract numeric value from price string and return as integer (VND)."""
        try:
            s = price_str.lower().replace('đồng', '').replace('vnd', '').replace('/tháng', '').strip()

            if m := re.search(r'(\d+)[.,](\d+)\s*triệu', s):
                return int(m.group(1)) * 1_000_000 + int(m.group(2).ljust(2, '0')) * 10_000
            elif m := re.search(r'(\d+)\s*triệu', s):
                return int(m.group(1)) * 1_000_000
            elif m := re.search(r'(\d{3,}(?:[.,]\d{3})*)', s):
                return int(m.group(1).replace('.', '').replace(',', ''))

            return None
        except Exception as e:
            logger.error(f"Error processing price: {e}")
            return None
    
    def extract_area_value(self, area_str: str) -> Optional[float | int]:
        """Extract numeric value from area string. Return int if whole number, else float."""
        try:
            match = re.search(r'([\d.,]+)', area_str)
            if not match:
                return None

            number = float(match.group(1).replace(',', '.'))
            return int(number) if number.is_integer() else number
        except Exception as e:
            logger.error(f"Error processing area: {str(e)}")
            return None

        
    def get_element_text_safely(self, xpath: str, default: str = "") -> str:
        """Return the stripped text of the first element matching ``xpath``.

        Args:
            xpath: XPath expression evaluated against the current page.
            default: Value returned when the element is missing or stale.
        """
        try:
            text = self.driver.find_element(By.XPATH, xpath).text
        except (NoSuchElementException, StaleElementReferenceException):
            return default
        return text.strip()
        
    def _extract_metadata(self) -> Dict[str, Any]:
        raw = self.get_element_text_safely(
            "(//td[@class='border-0 pb-0'])[2]",
            self.get_element_text_safely("(//table[@class='table table-borderless align-middle m-0'])/tbody//tr[5]")
        )
        return {
            "time": self.extract_datetime(raw)
        }

    def _extract_address_and_location(self) -> Dict[str, Any]:
        address = self.get_element_text_safely(
            "(//td[@colspan='3'])[3]",
            self.get_element_text_safely("(//table[@class='table table-borderless align-middle m-0'])/tbody//tr[3]/td[2]")
        )
        district, ward = self.get_district_and_ward(address)
        return {
            "address": address,
            "district": district,
            "ward": ward
        }

    def _extract_price_area(self, content: str) -> Dict[str, Any]:
        price_str = self.get_element_text_safely(
            "//span[@class='text-price fs-5 fw-bold']",
            self.get_element_text_safely("//span[@class='text-green fs-5 fw-bold']")
        )
        area_str = self.get_element_text_safely("//div[@class='d-flex justify-content-between']/div/span[3]")
        return {
            "price": self.extract_price_value(price_str),
            "area": self.extract_area_value(area_str),
            "amenities": self.get_amenities(content)
        }

    def _extract_contact(self) -> str:
        return self.get_element_text_safely(
            "//div[@class='mb-4']//i[@class='icon telephone-fill white me-2']/.."
        ).strip()

    def get_post_data(self, url: str) -> Optional[Dict[str, Any]]:
        """Load a post page and assemble its data from the individual extractors.

        Args:
            url: Address of the post to load.

        Returns:
            A dict with the keys ``postID``, ``time``, ``content``, ``address``,
            ``ward``, ``district``, ``area``, ``price``, ``amenities`` (joined
            with ", ") and ``contact``; None when the page title signals an
            error page or an exception occurs (logged).
        """
        try:
            self.driver.get(url)
            delay = self.random_delay()
            logger.info(f"Loading page {url} (waited {delay:.2f}s)")

            if "Page not found" in self.driver.title or "Error" in self.driver.title:
                logger.warning(f"Page doesn't exist or has error: {url}")
                return None

            content = self.get_post_content()
            # The ID is a hash of the description. When the description could
            # not be read, every such post would get the same ID, so hash the
            # URL instead.
            post_id = self.generate_post_id(content or url)

            metadata = self._extract_metadata()
            address_data = self._extract_address_and_location()
            pricing = self._extract_price_area(content)
            contact = self._extract_contact()

            return {
                "postID": post_id,
                "time": metadata["time"],
                "content": content,
                "address": address_data["address"],
                "ward": address_data["ward"],
                "district": address_data["district"],
                "area": pricing["area"],
                "price": pricing["price"],
                "amenities": ", ".join(pricing["amenities"]),  # Join list for CSV
                "contact": contact,
            }

        except Exception as e:
            logger.error(f"Error getting data from URL {url}: {str(e)}")
            return None

    def save_to_csv(self, data: List[Dict], filename: str) -> bool:
        """Write the rows to a UTF-8 CSV file, overwriting any existing file.

        The header is taken from the keys of the first row.

        Args:
            data: Rows to write.
            filename: Destination path.

        Returns:
            True on success; False when ``data`` is empty or writing fails
            (both cases are logged).
        """
        try:
            if not data:
                logger.warning("No data to save to CSV.")
                return False

            # Column order follows the first row
            columns = list(data[0])

            with open(filename, 'w', encoding='utf-8', newline='') as out:
                writer = csv.DictWriter(out, fieldnames=columns)
                writer.writeheader()
                writer.writerows(data)

            logger.info(f"Saved data to {filename}")
            return True
        except Exception as e:
            logger.error(f"Error saving data to CSV file: {str(e)}")
            return False

    def print_summary(self, post_data_list: List[Dict]):
        """Print summary of collected data."""
        if not post_data_list:
            print("No data collected!")
            return
            
        print("\n" + "="*50)
        print(f"🏠 PHONGTRO DATA COLLECTION SUMMARY 🏠")
        print("="*50)
        print(f"✅ Number of posts collected: {len(post_data_list)}")
        
        # District stats
        districts = {}
        for post in post_data_list:
            district = post.get("district", "")
            if district:
                districts[district] = districts.get(district, 0) + 1
        
        if districts:
            print("\n📍 Distribution by district:")
            for district, count in sorted(districts.items(), key=lambda x: x[1], reverse=True):
                print(f"  • District {district}: {count} posts")
        
        # Price stats
        prices = [post.get("price", "") for post in post_data_list if post.get("price")]
        if prices:
            print("\n💰 Price information:")
            print(f"  • Number of posts with price info: {len(prices)}")
        
        print("\n💾 Data saved to: " + self.config["output_file"])
        print("="*50 + "\n")

    def collect_posts(self, urls: List[str]) -> List[Dict[str, Any]]:
        posts = []
        for i, url in enumerate(urls):
            print(f"Processing post {i+1}/{len(urls)}", end='\r')
            logger.info(f"Processing {i+1}/{len(urls)}: {url}")
            data = self.get_post_data(url)
            if data:
                posts.append(data)
        return posts

    def run(self):
        self._print_header()
        if not self.patterns:
            print("Error: Could not load config.json.")
            return

        start_time = time.time()
        self.setup_driver()

        try:
            self.driver.get(f"https://phongtro123.com/tinh-thanh/{self.config['city']}?orderby=moi-nhat")
            self.random_delay()

            urls = self.get_all_urls(self.config["post_limit"])
            posts = self.collect_posts(urls)

            if posts:
                self.save_to_csv(posts, self.config["output_file"])
                self.print_summary(posts)
            else:
                print("No data collected.")
        finally:
            self.driver.quit()
            print(f"⏱️ Execution time: {time.time() - start_time:.2f} seconds")

    
    def _print_header(self):
        """Print program header."""
        print("\n" + "="*50)
        print("🏠 PHONGTRO DATA SCRAPER 🏠")
        print("="*50)
        print(f"• City: {self.config['city']}")
        print(f"• Post limit: {self.config['post_limit'] if self.config['post_limit'] > 0 else 'No limit'}")
        print(f"• Output file: {self.config['output_file']}")
        print(f"• Headless mode: {'On' if self.config['headless'] else 'Off'}")
        print("="*50 + "\n")


# Run a scrape with the default settings when executed directly, not on import.
if __name__ == "__main__":
    scraper = WebScraper(DEFAULT_CONFIG)
    scraper.run()

## Import web data to database

In [None]:
import re
import csv
import json
import time
import random
import hashlib
import logging
import os

import mysql.connector
from mysql.connector import Error
from typing import List, Dict, Any, Optional, Tuple
from datetime import datetime
from dotenv import load_dotenv

from selenium import webdriver
from selenium.webdriver.edge.options import Options
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import (
    NoSuchElementException, TimeoutException, StaleElementReferenceException
)


# ======== CONFIGURATION ========
# Default settings for the scraping run and the database import step.
DEFAULT_CONFIG = {
    # City to scrape data from (URL path)
    "city": "da-nang",
    # Number of posts to scrape (0 = all)
    "post_limit": 0,
    # Output filename
    "output_file": "Scraped_data.csv",
    # Run browser in headless mode (True) or visible (False)
    "headless": True,
    # Add random delay between operations (to avoid blocking)
    "random_delay": True,
    # Minimum delay (seconds)
    "min_delay": 1,
    # Maximum delay (seconds)
    "max_delay": 3,
    # Import data to database after scraping
    "import_to_db": True,
    # Number of records to commit in each batch
    "db_batch_size": 100,
    # Number of retries for database operations
    "db_retry_limit": 3,
}

# Configure logging
# force=True removes handlers that are already attached to the root logger
# (for example by an earlier notebook cell). Without it basicConfig() is a
# silent no-op in that situation and nothing is written to data_scraper.log.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    filemode='w',
    filename='data_scraper.log',
    force=True,
)
logger = logging.getLogger(__name__)


class DateScraper:
    def __init__(self, config: Optional[Dict[str, Any]] = None):
        """Initialize scraper with configuration.

        Args:
            config: Optional settings. Keys that are not supplied fall back
                to the values in DEFAULT_CONFIG, so a partial dict is enough.
        """
        # Merge into a fresh dict: a partial config no longer fails with
        # KeyError on the first missing key, and the instance never shares
        # (and so can never mutate) the module-level DEFAULT_CONFIG.
        self.config = {**DEFAULT_CONFIG, **(config or {})}
        self.driver = None
        self.patterns = self._load_config()
        self.db_connection = None
        self.db_cursor = None
        
    def _load_config(self) -> Dict:
        """Load patterns and location data from config.json file."""
        try:
            with open('config.json', 'r', encoding='utf-8') as config_file:
                return json.load(config_file)
        except Exception as e:
            logger.error(f"Error loading config.json: {str(e)}")
            return {}
    
    def setup_driver(self) -> webdriver.Edge:
        """Create the Edge WebDriver, keep it on ``self.driver`` and return it.

        Headless mode (plus ``--disable-gpu``) is only requested when the
        ``headless`` setting is truthy; the remaining arguments are always
        passed.
        """
        options = Options()

        launch_args = []
        if self.config["headless"]:
            launch_args.extend(["--headless", "--disable-gpu"])
        launch_args.extend([
            "--window-size=720,1080",
            "--disable-notifications",
            "--no-sandbox",
            "--disable-dev-shm-usage",
            # Add user-agent to avoid detection as bot
            "user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/90.0.4430.212 Safari/537.36",
        ])
        for argument in launch_args:
            options.add_argument(argument)

        self.driver = webdriver.Edge(options=options)
        return self.driver

    def random_delay(self) -> float:
        """Create random delay if configured."""
        if self.config["random_delay"]:
            delay = random.uniform(self.config["min_delay"], self.config["max_delay"])
            time.sleep(delay)
            return delay
        return 0

    def check_and_move_to_next_page(self) -> bool:
        """Click the "Trang sau »" (next page) link when it is present.

        Returns:
            True after the link was clicked and the configured delay has
            passed; False when the link is missing, not enabled, or any
            other error occurred (the reason is logged).
        """
        try:
            next_link = self.driver.find_element(By.XPATH, "//a[text()='Trang sau »']")
            if not next_link.is_enabled():
                logger.info("'Next' button not available or not found.")
                return False

            next_link.click()
            waited = self.random_delay()
            logger.info(f"Moved to next page (waited {waited:.2f}s)")
            return True
        except NoSuchElementException:
            logger.info("'Next' button not found on this page.")
            return False
        except Exception as e:
            logger.error(f"Error moving to next page: {str(e)}")
            return False

    def get_all_urls(self, max_posts: int = 0) -> List[str]:
        """Get all post URLs from the website, limit if specified.

        Walks the listing pages one after another until no next page can be
        opened, an error occurs, or the limit is reached.

        Args:
            max_posts: Stop once this many URLs were collected; 0 (or less)
                means no limit.

        Returns:
            Unique post URLs in the order they were first seen. URLs gathered
            before an error are still returned.
        """
        post_link_xpath = "//a[contains(@class,'line-clamp-2')]"
        all_post_url = []
        seen_urls = set()
        current_page = 1

        while True:
            try:
                logger.info(f"Getting URLs from page {current_page}")

                WebDriverWait(self.driver, 10).until(
                    EC.presence_of_all_elements_located((By.XPATH, post_link_xpath))
                )

                for element in self.driver.find_elements(By.XPATH, post_link_xpath):
                    url = element.get_attribute('href')
                    # get_attribute() yields None for an anchor without href,
                    # and the same post may be linked more than once; neither
                    # should be queued for scraping.
                    if not url or url in seen_urls:
                        continue
                    seen_urls.add(url)
                    all_post_url.append(url)
                    logger.debug(f"Added URL: {url}")

                    if max_posts > 0 and len(all_post_url) >= max_posts:
                        logger.info(f"Reached limit of {max_posts} posts.")
                        return all_post_url

                logger.info(f"Collected {len(all_post_url)} URLs")

                if not self.check_and_move_to_next_page():
                    break

                current_page += 1

            except Exception as e:
                logger.error(f"Error getting URLs: {str(e)}")
                break

        return all_post_url

    def extract_datetime(self, date_time_str: str) -> str:
        """Extract date and time from string and format it as 'YYYY-MM-DD HH:MM:SS'."""
        try:
            parts = date_time_str.split(', ')
            if len(parts) < 2:
                return ""

            raw_datetime = parts[1] 
            dt = datetime.strptime(raw_datetime, "%H:%M %d/%m/%Y")
            return dt.strftime("%Y-%m-%d %H:%M:%S")

        except Exception as e:
            logger.error(f"Error extracting date and time: {str(e)}")
            return ""

    def get_post_content(self) -> str:
        """Return the post description as newline-joined paragraph text.

        Waits up to 10 seconds for the description container, then joins the
        stripped text of its direct ``<p>`` children.

        Returns:
            The description text, or "" on timeout or any other error
            (both are logged).
        """
        try:
            # Block until the description container is present in the DOM.
            WebDriverWait(self.driver, 10).until(
                EC.presence_of_element_located((By.XPATH, "//div[@class='border-bottom pb-3 mb-4']"))
            )

            paragraph_nodes = self.driver.find_elements(By.XPATH, "//div[@class='border-bottom pb-3 mb-4']/p")
            joined = "\n".join(node.text.strip() for node in paragraph_nodes)
            return joined.strip()

        except TimeoutException:
            logger.warning("Timeout waiting for description element")
            return ""
        except Exception as e:
            logger.error(f"Error getting post content: {str(e)}")
            return ""

    def generate_post_id(self, content: str) -> str:
        """Generate unique ID from post content."""
        return hashlib.md5(content.encode('utf-8')).hexdigest()

    def get_district_and_ward(self, address: str) -> Tuple[Optional[str], Optional[str]]:
        """Extract district and ward from address string using keyword matching."""
        if not address or not self.patterns:
            return None, None

        try:
            # Get district list from config
            districts = self.patterns.get("districts", [])
            wards = self.patterns.get("wards", {})

            detected_district = None
            for district in districts:
                if re.search(r"\b" + re.escape(district) + r"\b", address, re.IGNORECASE):
                    detected_district = district
                    break

            detected_ward = None
            if detected_district and detected_district in wards:
                for ward in wards[detected_district]:
                    if re.search(r"\b" + re.escape(ward) + r"\b", address, re.IGNORECASE):
                        detected_ward = ward
                        break

            # Fall back to simple substring match if regex fails
            if not detected_ward and detected_district:
                address_lower = address.lower()
                for ward in wards[detected_district]:
                    if ward.lower() in address_lower:
                        detected_ward = ward
                        break

            return detected_district, detected_ward

        except Exception as e:
            logger.error(f"Error parsing address '{address}': {str(e)}")
            return None, None
        
    def get_amenities(self, content: str) -> List[str]:
        """Get amenities list from post."""
        if not self.patterns:
            return []
        # Get amenity patterns from config
        amenity_patterns = self.patterns.get("amenity_patterns", {})
        detected_amenities = set()
        try:
            WebDriverWait(self.driver, 10).until(
                EC.presence_of_all_elements_located((
                    By.XPATH,
                    "//div[@class='text-body d-flex pt-1 pb-1' and not(contains(@style, '--bs-text-opacity: 0.1;'))]")))
            amenity_elements = self.driver.find_elements(
                By.XPATH,
                "//div[@class='text-body d-flex pt-1 pb-1' and not(contains(@style, '--bs-text-opacity: 0.1;'))]")
            for element in amenity_elements:
                text = element.text.strip()
                if text:
                    matched = False
                    for label, pattern in amenity_patterns.items():
                        if re.search(pattern, text, re.IGNORECASE):
                            detected_amenities.add(label)
                            matched = True
                            break
                    if not matched:
                        detected_amenities.add(text) 
            # Get from content
            for label, pattern in amenity_patterns.items():
                if re.search(pattern, content, re.IGNORECASE):
                    detected_amenities.add(label)
            return list(detected_amenities)
        except TimeoutException:
            logger.warning("Timeout waiting for amenity elements")
            return list(detected_amenities)
        except Exception as e:
            logger.error(f"Error getting amenities: {str(e)}")
            return list(detected_amenities)

    def extract_price_value(self, price_str: str) -> Optional[int]:
        """Extract numeric value from price string and return as integer (VND)."""
        try:
            s = price_str.lower().replace('đồng', '').replace('vnd', '').replace('/tháng', '').strip()

            if m := re.search(r'(\d+)[.,](\d+)\s*triệu', s):
                return int(m.group(1)) * 1_000_000 + int(m.group(2).ljust(2, '0')) * 10_000
            elif m := re.search(r'(\d+)\s*triệu', s):
                return int(m.group(1)) * 1_000_000
            elif m := re.search(r'(\d{3,}(?:[.,]\d{3})*)', s):
                return int(m.group(1).replace('.', '').replace(',', ''))

            return None
        except Exception as e:
            logger.error(f"Error processing price: {e}")
            return None
    
    def extract_area_value(self, area_str: str) -> Optional[float | int]:
        """Extract numeric value from area string."""
        try:
            match = re.search(r'([\d.,]+)', area_str)
            if not match:
                return None

            number = float(match.group(1).replace(',', '.'))
            return number
        except Exception as e:
            logger.error(f"Error processing area: {str(e)}")
            return None

        
    def get_element_text_safely(self, xpath: str, default: str = "") -> str:
        """Return the stripped text of the first element matching ``xpath``.

        Args:
            xpath: XPath expression to look up.
            default: Value returned when the element is missing or stale.

        Returns:
            The element text without surrounding whitespace, or ``default``.
        """
        try:
            node = self.driver.find_element(By.XPATH, xpath)
            text = node.text.strip()
        except (NoSuchElementException, StaleElementReferenceException):
            text = default
        return text
        
    def extract_metadata(self) -> Dict[str, Any]:
        """Read the post's timestamp text and normalise it.

        The fifth row of the borderless table is looked up first and serves
        as the fallback for the second ``border-0 pb-0`` cell.

        Returns:
            ``{"time": ...}`` with the value produced by extract_datetime.
        """
        fallback_text = self.get_element_text_safely(
            "(//table[@class='table table-borderless align-middle m-0'])/tbody//tr[5]"
        )
        raw_text = self.get_element_text_safely("(//td[@class='border-0 pb-0'])[2]", fallback_text)
        posted_at = self.extract_datetime(raw_text)
        return {"time": posted_at}

    def extract_address_and_location(self) -> Dict[str, Any]:
        """Read the address text and derive district and ward from it.

        The second cell of the table's third row is looked up first and
        serves as the fallback for the third ``colspan='3'`` cell.

        Returns:
            A dict with the keys ``address``, ``district`` and ``ward``.
        """
        fallback_text = self.get_element_text_safely(
            "(//table[@class='table table-borderless align-middle m-0'])/tbody//tr[3]/td[2]"
        )
        address = self.get_element_text_safely("(//td[@colspan='3'])[3]", fallback_text)
        district, ward = self.get_district_and_ward(address)

        location = {"address": address}
        location["district"] = district
        location["ward"] = ward
        return location

    def extract_info_area(self, content: str) -> Dict[str, Any]:
        """Collect price, area and amenities for the current post page.

        The ``text-green`` price span is looked up first and serves as the
        fallback for the ``text-price`` span.

        Args:
            content: Post description text, passed on to get_amenities.

        Returns:
            A dict with the keys ``price``, ``area`` and ``amenities``.
        """
        fallback_price = self.get_element_text_safely("//span[@class='text-green fs-5 fw-bold']")
        price_str = self.get_element_text_safely("//span[@class='text-price fs-5 fw-bold']", fallback_price)
        area_str = self.get_element_text_safely("//div[@class='d-flex justify-content-between']/div/span[3]")

        price = self.extract_price_value(price_str)
        area = self.extract_area_value(area_str)
        amenities = self.get_amenities(content)
        return {"price": price, "area": area, "amenities": amenities}

    def extract_contact(self) -> str:
        """Return the text of the element that wraps the telephone icon.

        Returns:
            The stripped text, or "" when the element is not found.
        """
        contact_xpath = "//div[@class='mb-4']//i[@class='icon telephone-fill white me-2']/.."
        contact_text = self.get_element_text_safely(contact_xpath)
        return contact_text.strip()

    def get_post_data(self, url: str) -> Optional[Dict[str, Any]]:
        """Get post data from URL by extracting parts separately."""
        try:
            self.driver.get(url)
            delay = self.random_delay()
            logger.info(f"Loading page {url} (waited {delay:.2f}s)")

            if "Page not found" in self.driver.title or "Error" in self.driver.title:
                logger.warning(f"Page doesn't exist or has error: {url}")
                return None

            content = self.get_post_content()
            post_id = self.generate_post_id(content)

            metadata = self.extract_metadata()
            address_data = self.extract_address_and_location()
            pricing = self.extract_info_area(content)
            contact = self.extract_contact()

            return {
                "postID": post_id,
                "time": metadata["time"],
                "content": content,
                "address": address_data["address"],
                "ward": address_data["ward"],
                "district": address_data["district"],
                "area": pricing["area"],
                "price": pricing["price"],
                "amenities": pricing["amenities"],
                "contact": contact,
            }

        except Exception as e:
            logger.error(f"Error getting data from URL {url}: {str(e)}")
            return None

    def save_to_csv(self, data: List[Dict], filename: str) -> bool:
        """Save data to CSV file."""
        try:
            if not data:
                logger.warning("No data to save to CSV.")
                return False
                
            # Get field names from the first item
            fieldnames = list(data[0].keys())
            
            with open(filename, 'w', encoding='utf-8', newline='') as csvfile:
                writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
                writer.writeheader()
                for row in data:
                    row_copy = row.copy()
                    if isinstance(row_copy["amenities"], list):
                        row_copy["amenities"] = json.dumps(row_copy["amenities"], ensure_ascii=False)
                    writer.writerow(row)
            
            logger.info(f"Saved data to {filename}")
            return True
        except Exception as e:
            logger.error(f"Error saving data to CSV file: {str(e)}")
            return False

    def print_summary(self, post_data_list: List[Dict]):
        """Print summary of collected data."""
        if not post_data_list:
            print("No data collected!")
            return
            
        print("\n" + "="*50)
        print(f"🏠 PHONGTRO DATA COLLECTION SUMMARY 🏠")
        print("="*50)
        print(f"✅ Number of posts collected: {len(post_data_list)}")
        
        # District stats
        districts = {}
        for post in post_data_list:
            district = post.get("district", "")
            if district:
                districts[district] = districts.get(district, 0) + 1
        
        if districts:
            print("\n📍 Distribution by district:")
            for district, count in sorted(districts.items(), key=lambda x: x[1], reverse=True):
                print(f"  • District {district}: {count} posts")
        
        # Price stats
        prices = [post.get("price", "") for post in post_data_list if post.get("price")]
        if prices:
            print("\n💰 Price information:")
            print(f"  • Number of posts with price info: {len(prices)}")
        
        print("\n💾 Data saved to: " + self.config["output_file"])
        
        if self.config["import_to_db"]:
            print("📊 Data imported to database")
            
        print("="*50 + "\n")

    def collect_posts(self, urls: List[str]) -> List[Dict[str, Any]]:
        posts = []
        for i, url in enumerate(urls):
            print(f"Processing post {i+1}/{len(urls)}", end='\r')
            logger.info(f"Processing {i+1}/{len(urls)}: {url}")
            data = self.get_post_data(url)
            if data:
                posts.append(data)
        return posts

    def connect_to_db(self):
        """Open the MySQL connection and cursor used for the import.

        Connection settings are read from the environment variables
        ``db_host``, ``db_user``, ``db_password`` and ``db_name`` after
        ``load_dotenv()`` has run.

        Returns:
            True when the connection and cursor are ready, False when the
            connector raised an error (which is logged and printed).
        """
        try:
            load_dotenv()

            credentials = {
                "host": os.getenv('db_host'),
                "user": os.getenv('db_user'),
                "password": os.getenv('db_password'),
                "database": os.getenv('db_name'),
            }
            self.db_connection = mysql.connector.connect(connection_timeout=10, **credentials)
            # Disable autocommit for batch processing
            self.db_connection.autocommit = False
            self.db_cursor = self.db_connection.cursor()
            logger.info("Connected to database")
            return True
        except Error as e:
            message = f"Database connection error: {str(e)}"
            logger.error(message)
            print(message)
            return False

    def close_db_connection(self):
        """Close the database connection if it's open."""
        if self.db_connection and self.db_connection.is_connected():
            if self.db_cursor:
                self.db_cursor.close()
            self.db_connection.close()
            logger.info("Database connection closed")
            print("Database connection closed")

    def import_to_database(self, data: List[Dict[str, Any]]) -> bool:
        """Import data to MySQL database with upsert (replace if exists)."""
        if not data:
            logger.warning("No data to import to database")
            return False
            
        # Connect to database
        if not self.connect_to_db():
            return False
            
        try:
            # Check if table exists, if not create it
            try:
                self.db_cursor.execute("""
                    CREATE TABLE IF NOT EXISTS post (
                        postID VARCHAR(32) PRIMARY KEY,
                        p_date DATETIME,
                        content LONGTEXT,
                        district VARCHAR(255),
                        ward VARCHAR(255),
                        street_address TEXT,
                        price INT,
                        area FLOAT,
                        amenities JSON,
                        contact_info VARCHAR(255),
                        updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
                    )
                """)
                self.db_connection.commit()
                logger.info("Table 'post' checked/created")
            except Error as e:
                logger.error(f"Error creating table: {str(e)}")
                return False
                
            # Prepare SQL for inserting or updating
            upsert_sql = """
                INSERT INTO post (
                    postID, p_date, content, district, ward,
                    street_address, price, area, amenities, contact_info
                ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
                ON DUPLICATE KEY UPDATE
                    p_date = VALUES(p_date),
                    content = VALUES(content),
                    district = VALUES(district),
                    ward = VALUES(ward),
                    street_address = VALUES(street_address),
                    price = VALUES(price),
                    area = VALUES(area),
                    amenities = VALUES(amenities),
                    contact_info = VALUES(contact_info)
            """
            
            records_processed = 0
            records_updated = 0
            records_inserted = 0
            
            # Process data in batches
            for i, row in enumerate(data):
                try:
                    # Check if post already exists
                    check_sql = "SELECT COUNT(*) FROM post WHERE postID = %s"
                    self.db_cursor.execute(check_sql, (row["postID"],))
                    exists = self.db_cursor.fetchone()[0] > 0
                    
                    amenities_json = json.dumps(row["amenities"], ensure_ascii=False) if isinstance(row["amenities"], list) else row["amenities"]
                    
                    # Prepare values tuple
                    values = (
                        row["postID"],
                        row["time"],
                        row["content"],
                        row["district"],
                        row["ward"],
                        row["address"],
                        row["price"],
                        row["area"],
                        amenities_json,
                        row["contact"]
                    )
                    
                    # Try to insert/update with retries
                    for attempt in range(self.config["db_retry_limit"]):
                        try:
                            self.db_cursor.execute(upsert_sql, values)
                            records_processed += 1
                            
                            if exists:
                                records_updated += 1
                            else:
                                records_inserted += 1
                                
                            break 
                        except mysql.connector.errors.DatabaseError as e:
                            if "Lock wait timeout exceeded" in str(e) and attempt < self.config["db_retry_limit"] - 1:
                                logger.warning(f"Lock timeout on row {i}, retrying ({attempt + 1}/{self.config['db_retry_limit']})...")
                                time.sleep(2)
                            else:
                                raise
                    
                    # Commit every batch_size records
                    if i % self.config["db_batch_size"] == 0 and i > 0:
                        self.db_connection.commit()
                        logger.info(f"Committed batch of {self.config['db_batch_size']} records (total: {records_processed})")
                        print(f"Processed {records_processed} records ({records_inserted} new, {records_updated} updated)")
                
                except Exception as e:
                    logger.error(f"Error processing row {i}: {str(e)}")
                    print(f"Error processing row {i}: {str(e)}")
            
            # Final commit for remaining records
            self.db_connection.commit()
            logger.info(f"Database import complete. Total: {records_processed} records ({records_inserted} new, {records_updated} updated)")
            print(f"Database import complete. Total: {records_processed} records ({records_inserted} new, {records_updated} updated)")
            return True
            
        except Error as e:
            logger.error(f"Database error: {str(e)}")
            print(f"Database error: {str(e)}")
            return False
            
        finally:
            self.close_db_connection()

    def run(self):
        """Run the complete workflow: scrape data, save to CSV, and import to database."""
        self.print_header()
        if not self.patterns:
            print("Error: Could not load config.json.")
            return

        start_time = time.time()
        self.setup_driver()

        try:
            self.driver.get(f"https://phongtro123.com/tinh-thanh/{self.config['city']}?orderby=moi-nhat")
            self.random_delay()

            urls = self.get_all_urls(self.config["post_limit"])
            posts = self.collect_posts(urls)

            if posts:
                # Save to CSV
                self.save_to_csv(posts, self.config["output_file"])
                
                # Import to database if configured
                if self.config["import_to_db"]:
                    self.import_to_database(posts)
                    
                self.print_summary(posts)
            else:
                print("No data collected.")
        finally:
            if self.driver:
                self.driver.quit()
            print(f"⏱️ Execution time: {time.time() - start_time:.2f} seconds")

    
    def print_header(self):
        """Print program header."""
        print("\n" + "="*50)
        print("🏠 PHONGTRO DATA SCRAPER & DATABASE IMPORTER 🏠")
        print("="*50)
        print(f"• City: {self.config['city']}")
        print(f"• Post limit: {self.config['post_limit'] if self.config['post_limit'] > 0 else 'No limit'}")
        print(f"• Output file: {self.config['output_file']}")
        print(f"• Headless mode: {'On' if self.config['headless'] else 'Off'}")
        print(f"• Import to database: {'Yes' if self.config['import_to_db'] else 'No'}")
        print("="*50 + "\n")


# Entry point: build a DateScraper from DEFAULT_CONFIG and run the full
# scrape / CSV / database workflow when this code is executed directly.
if __name__ == "__main__":
    scraper = DateScraper(DEFAULT_CONFIG)
    scraper.run()