In [None]:
from bs4 import BeautifulSoup
import requests
import re
import pandas as pd
from datetime import datetime
from selenium import webdriver
import csv
import os
import shutil
import json
import time
from sklearn.pipeline import Pipeline
from sklearn.base import BaseEstimator, TransformerMixin
from requests.exceptions import ConnectionError
import glob

In [None]:
class ValueExtractor(BaseEstimator, TransformerMixin):
    """Extract summary fields for every advertisement on a search-results page.

    ``transform`` takes the raw HTML of one results page and returns a
    DataFrame with one row per advertisement. Elements are located by tag name
    and text content instead of generated CSS class names.
    """

    # Column order of the returned DataFrame; also applied when no ad is found
    # so that the result always carries the same columns.
    OUTPUT_COLUMNS = [
        "Title",
        "Url",
        "Property_Type",
        "Size",
        "Price",
        "Price_per_m²",
        "Address",
    ]

    def fit(self, X, y=None):
        """Do nothing; present only to satisfy the scikit-learn transformer API."""
        return self

    # --- text predicates handed to BeautifulSoup's ``string=`` filter -------
    # Each one receives the tag's ``.string`` (possibly None) and returns a bool.

    @staticmethod
    def _is_negotiable(text):
        """Return True for the "Cena dohodou" label, regardless of letter case."""
        return bool(text) and "cena dohodou" in text.lower()

    @staticmethod
    def _mentions_price(text):
        """Return True for any text with a euro sign or the "Cena dohodou" label."""
        return bool(text) and ("€" in text or "cena dohodou" in text.lower())

    @staticmethod
    def _is_total_price(text):
        """Return True for a price text that is not a price-per-square-metre text."""
        if not text:
            return False
        if "cena dohodou" in text.lower():
            return True
        return "€" in text and "€/m²" not in text

    @staticmethod
    def _is_price_per_m2(text):
        """Return True for a text containing "€/m²"."""
        return bool(text) and "€/m²" in text

    @staticmethod
    def _is_size(text):
        """Return True for an area text ("m²") that is not a price ("€")."""
        return bool(text) and "m²" in text and "€" not in text

    @staticmethod
    def _is_address(text):
        """Return True for a comma-separated text that is neither price nor area."""
        return bool(text) and "," in text and "€" not in text and "m²" not in text

    @staticmethod
    def _is_property_type(text):
        """Return True for a text without "€", "m²" or a comma."""
        return bool(text) and not any(marker in text for marker in ("€", "m²", ","))

    @staticmethod
    def _text_of(element):
        """Return the stripped text of *element*, or None when it is missing."""
        return element.text.strip() if element is not None else None

    def transform(self, html):
        """Parse one results page and return its advertisements as a DataFrame.

        Parameters
        ----------
        html : str
            Raw HTML of a search-results page.

        Returns
        -------
        pandas.DataFrame
            One row per advertisement with the columns in ``OUTPUT_COLUMNS``.
            Rows appear in document order and each URL appears at most once.
            An empty page yields an empty DataFrame with the same columns.
        """
        soup = BeautifulSoup(html, "html.parser")

        # An advertisement container is the closest <div> around a link that
        # opens in a new tab. Containers are de-duplicated by identity while
        # keeping document order (a plain set would shuffle the rows).
        ad_containers = []
        seen_container_ids = set()
        for link in soup.find_all("a", href=True, target="_blank"):
            container = link.find_parent("div")
            if container is None or id(container) in seen_container_ids:
                continue
            seen_container_ids.add(id(container))
            ad_containers.append(container)

        rows = []
        seen_urls = set()

        for container in ad_containers:
            # URL: first new-tab link inside the container
            a_tag = container.find("a", href=True, target="_blank")
            url = a_tag["href"] if a_tag else None

            # Title: first <h2> inside the container
            title = self._text_of(container.find("h2"))

            # Only containers with both a title and a URL are advertisements;
            # a URL already emitted for this page is skipped so the detail
            # page is not processed twice downstream.
            if not title or not url or url in seen_urls:
                continue
            seen_urls.add(url)

            # Price: prefer a <p> that is not the per-m² figure, but fall back
            # to any <p> with a euro sign so a combined text is still captured.
            price_elem = (
                container.find("p", string=self._is_total_price)
                or container.find("p", string=self._mentions_price)
            )
            price = self._text_of(price_elem)

            # Price per m²: only looked up when a concrete price exists
            price_per_m = None
            if price and not self._is_negotiable(price):
                price_per_m = self._text_of(
                    container.find("p", string=self._is_price_per_m2)
                )

            # Size: <p> with "m²" that is not the "€/m²" price line
            size = self._text_of(container.find("p", string=self._is_size))

            # Address: data-test-id="text" paragraph with a comma
            address = self._text_of(
                container.find("p", {"data-test-id": "text"}, string=self._is_address)
            )

            # Property type: data-test-id="text" paragraph without "€", "m²" or ","
            property_type = self._text_of(
                container.find(
                    "p", {"data-test-id": "text"}, string=self._is_property_type
                )
            )

            rows.append({
                "Title": title,
                "Url": url,
                "Property_Type": property_type,
                "Size": size,
                "Price": price,
                "Price_per_m²": price_per_m,
                "Address": address,
            })

        return pd.DataFrame(rows, columns=self.OUTPUT_COLUMNS)


In [None]:
class InfoExtractor(BaseEstimator, TransformerMixin):
    """Enrich listing rows with details scraped from each advertisement page.

    ``transform`` reads the columns ``Url``, ``Title``, ``Address``, ``Size``
    and ``Property_Type`` of the incoming DataFrame, downloads every
    advertisement page once and returns one row of extracted details per
    advertisement.
    """

    # Headers sent with every detail-page request.
    REQUEST_HEADERS = {'User-Agent': 'Mozilla/5.0'}
    # Seconds to wait for the server before giving up on a request.
    REQUEST_TIMEOUT = 30
    # Pause after every request, to be respectful of server rate limits.
    REQUEST_DELAY = 0.5

    # (label, keywords searched in the title, keywords searched in the
    # description), evaluated in this order. Title keywords carry no trailing
    # space: titles arrive stripped, so a keyword ending in a space could
    # never match a title that ends with that word.
    _TRANSACTION_KEYWORDS = (
        ("kúpa", ("kúpa", "kúpim"), ("kúpa ", "kúpim ")),
        ("predaj", ("predaj", "predám"), ("predaj ", "predám")),
        ("prenájom", ("prenájom", "prenajmem"), ("prenájom ", "prenajmem")),
    )

    def __init__(self):
        pass

    def fit(self, X, y=None):
        """Do nothing; present only to satisfy the scikit-learn transformer API."""
        return self

    def transform(self, X):
        """Scrape the detail page of every row in *X*.

        Parameters
        ----------
        X : pandas.DataFrame
            Listing rows; ``Url``, ``Title``, ``Address``, ``Size`` and
            ``Property_Type`` are read.

        Returns
        -------
        pandas.DataFrame
            One row per input row with the keys produced by ``extract_info``
            plus ``url``. ``size_of_property`` and ``type_of_property`` are
            taken from *X*, and the collected metadata texts are appended to
            ``description_text``.
        """
        extracted_data = []

        for _, row in X.iterrows():
            url = row['Url']

            # Download and parse the page once; both extractors share the result.
            soup = self._fetch_soup(url)
            data = self.extract_info(url, row, soup=soup)
            semantic_metadata = self.extract_semantic_metadata(url, soup=soup)

            # Values from the listing page take precedence over the detail page.
            data['size_of_property'] = self.clean_size(row['Size'])
            data['type_of_property'] = row['Property_Type']
            data['url'] = url
            data['description_text'] = data['description_text'] + " " + str(semantic_metadata)

            extracted_data.append(data)

        return pd.DataFrame(extracted_data)

    def _fetch_soup(self, url):
        """Download *url* and return it parsed with BeautifulSoup.

        Raises
        ------
        requests.exceptions.ConnectionError
            On connection problems, and also when the request times out, so
            that callers which retry on connection errors cover timeouts too.
        """
        try:
            response = requests.get(
                url,
                headers=self.REQUEST_HEADERS,
                allow_redirects=True,
                timeout=self.REQUEST_TIMEOUT,
            )
        except requests.exceptions.Timeout as err:
            raise requests.exceptions.ConnectionError(
                f"Timed out fetching {url}"
            ) from err
        time.sleep(self.REQUEST_DELAY)
        return BeautifulSoup(response.text, 'html.parser')

    def extract_info(self, url, data, soup=None):
        """Extract the detail fields of one advertisement.

        Parameters
        ----------
        url : str
            Address of the advertisement page.
        data : mapping
            Listing row; ``Address``, ``Title`` and ``Property_Type`` are read.
        soup : BeautifulSoup, optional
            Already parsed page. When omitted the page is downloaded here.

        Returns
        -------
        dict
            Keys: title, address, type_of_property, size_of_property, price,
            price_per_m, price_per_month, price_per_m_per_month,
            description_text, transaction_type, clean_phone_number, image.
        """
        if soup is None:
            soup = self._fetch_soup(url)

        title = data['Title']
        description_text = self.extract_description(soup)
        transaction_type = self.extract_transaction_type(soup, title, description_text)
        price, price_per_m, price_per_month, price_per_m_per_month = self.extract_price_info(
            soup, transaction_type
        )

        return {
            "title": title,
            "address": data['Address'],
            "type_of_property": data["Property_Type"],
            "size_of_property": self.extract_property_size(soup),
            "price": price,
            "price_per_m": price_per_m,
            "price_per_month": price_per_month,
            "price_per_m_per_month": price_per_m_per_month,
            "description_text": description_text,
            "transaction_type": transaction_type,
            "clean_phone_number": self.extract_phone_number(soup),
            "image": self.extract_image(soup),
        }

    def extract_semantic_metadata(self, url, soup=None):
        """Collect data-test-id="text" texts from the detail grid until "MAKLÉR".

        Parameters
        ----------
        url : str
            Address of the advertisement page.
        soup : BeautifulSoup, optional
            Already parsed page. When omitted the page is downloaded here.

        Returns
        -------
        list of str
            Texts found inside ``div.MuiGrid-container`` elements of
            ``#main-detail-content``, in document order, stopping before the
            first text that contains "MAKLÉR". Empty when the main container
            is missing.
        """
        if soup is None:
            soup = self._fetch_soup(url)

        main_detail_content = soup.find(id="main-detail-content")
        if not main_detail_content:
            return []

        texts = []
        for container in main_detail_content.find_all('div', class_='MuiGrid-container'):
            for element in container.find_all(attrs={"data-test-id": "text"}):
                text = element.get_text(strip=True)
                if "MAKLÉR" in text:
                    # Everything from here on is excluded from the result.
                    return texts
                texts.append(text)
        return texts

    def clean_size(self, size):
        """Reduce *size* to its digits, commas and periods (e.g. "65 m²" -> "65").

        Missing values (None / NaN) are returned unchanged; non-string values
        are converted with ``str`` first.
        """
        if pd.isna(size):
            return size

        if not isinstance(size, str):
            size = str(size)

        return re.sub(r'[^\d.,]', '', size)

    def extract_phone_number(self, soup):
        """Return the phone number of a ``tel:`` link without whitespace.

        The second ``tel:`` link on the page is preferred, as before
        (NOTE(review): presumably the first one is not the advertiser's
        number; confirm). When the page has only one such link that one is
        used instead of failing. Returns the string 'None' when the page has
        no ``tel:`` link.
        """
        tel_links = soup.find_all('a', href=lambda href: href and href.startswith('tel:'))
        if not tel_links:
            return 'None'

        link = tel_links[1] if len(tel_links) > 1 else tel_links[0]
        phone_number = link['href'].replace('tel:', '')
        # The parser has already decoded "&nbsp;" into a real non-breaking
        # space, so strip every kind of whitespace, then any literal leftover.
        return "".join(phone_number.split()).replace('&nbsp;', '')

    def extract_image(self, soup):
        """Return the ``src`` of the first <img> that has one, else None."""
        img_tag = soup.find('img', src=True)
        return img_tag['src'] if img_tag else None

    def extract_title_and_address(self, soup):
        """Return the address text from the first box of the detail content.

        Despite its name the method returns only the address: the text of the
        first ``p.MuiTypography-body2`` inside the first ``div.MuiBox-root`` of
        ``#main-detail-content``, or "Not found" when any of them is missing.
        """
        main_detail_content = soup.find('div', id='main-detail-content')
        first_box = main_detail_content.find('div', class_='MuiBox-root') if main_detail_content else None
        if not first_box:
            return "Not found"
        return self.extract_text(first_box.find('p', class_='MuiTypography-body2'))

    @staticmethod
    def _value_after_label(soup, label, default="Not found"):
        """Return the text of the <span> following the <span> whose text is *label*."""
        label_tag = soup.find('span', string=label)
        if label_tag:
            value_tag = label_tag.find_next('span')
            if value_tag:
                return value_tag.get_text(strip=True)
        return default

    def extract_property_type(self, soup):
        """Return the value shown next to the "Druh" label, or "Not found"."""
        return self._value_after_label(soup, "Druh")

    def extract_property_size(self, soup):
        """Return the value shown next to the "Úžitková plocha" label, or "Not found"."""
        return self._value_after_label(soup, "Úžitková plocha")

    def extract_description(self, soup):
        """Return the first <p> after the "Popis nehnuteľnosti" heading, or 'None'."""
        h3_tag = soup.find('h3', string="Popis nehnuteľnosti")
        if h3_tag:
            p_tag = h3_tag.find_next('p')
            if p_tag:
                return p_tag.get_text(strip=True)
        return "None"

    def extract_transaction_type(self, soup, title, description_text):
        """Classify the advertisement as "kúpa", "predaj" or "prenájom".

        The title and description are searched first, in the order given by
        ``_TRANSACTION_KEYWORDS``. When neither matches, the text of
        ``#main-detail-content`` is searched for the three labels in the same
        order. Returns "Not found" when nothing matches.
        """
        title_lower = title.lower() if isinstance(title, str) else ""
        description_lower = description_text.lower() if isinstance(description_text, str) else ""

        for label, title_keywords, description_keywords in self._TRANSACTION_KEYWORDS:
            if any(keyword in title_lower for keyword in title_keywords):
                return label
            if any(keyword in description_lower for keyword in description_keywords):
                return label

        main_detail_content = soup.find(id="main-detail-content")
        if main_detail_content:
            page_text = main_detail_content.get_text().lower()
            for label, _, _ in self._TRANSACTION_KEYWORDS:
                if label in page_text:
                    return label

        return "Not found"

    @staticmethod
    def _is_negotiable(text):
        """Return True for the "Cena dohodou" label, regardless of letter case."""
        return "cena dohodou" in text.lower()

    def _read_price_pair(self, price_box):
        """Return (headline price as int, price per m² as float) from *price_box*.

        Either value is None when it is missing, is the "Cena dohodou" label
        or contains no digits.
        """
        headline_raw = self.extract_text(price_box.find('p', class_='MuiTypography-h3'))
        if not headline_raw or self._is_negotiable(headline_raw):
            return None, None

        # NOTE(review): every non-digit is dropped, so a decimal comma in the
        # headline price would be folded into the integer — confirm whether
        # the site ever shows cents there.
        headline_digits = re.sub(r'[^\d]', '', headline_raw)
        headline = int(headline_digits) if headline_digits else None

        per_m2 = None
        per_m2_raw = self.extract_text(price_box.find('p', class_='MuiTypography-label2'))
        if per_m2_raw and not self._is_negotiable(per_m2_raw):
            # Keep digits and the decimal comma, then switch to a decimal point.
            per_m2_cleaned = re.sub(r'[^\d,]', '', per_m2_raw).replace(',', '.')
            try:
                per_m2 = float(per_m2_cleaned) if per_m2_cleaned else None
            except ValueError:
                per_m2 = None

        return headline, per_m2

    def extract_price_info(self, soup, transaction_type):
        """Return (price, price_per_m, price_per_month, price_per_m_per_month).

        For "predaj" the first two slots are filled, for "prenájom" the last
        two; every other transaction type yields four ``None`` values. The
        figures are read from the ``div.mui-19idom`` box inside
        ``#main-detail-content``.
        """
        main_detail_content = soup.find('div', id='main-detail-content')
        first_box = main_detail_content.find('div', class_='mui-19idom') if main_detail_content else None

        if first_box is None or transaction_type not in ("predaj", "prenájom"):
            return None, None, None, None

        headline, per_m2 = self._read_price_pair(first_box)
        if transaction_type == "predaj":
            return headline, per_m2, None, None
        return None, None, headline, per_m2

    def extract_text(self, tag):
        """Return the stripped text of *tag*, or "Not found" when it is missing."""
        return tag.get_text(strip=True) if tag else "Not found"


# Main Pipeline

In [None]:
# Two-stage scraping pipeline: the first step turns a results page into one
# row per advertisement, the second visits every advertisement and adds details.
nehnutelnosti_sk_pipeline = Pipeline(
    steps=[
        ("value_extractor", ValueExtractor()),
        ("info_extractor", InfoExtractor()),
    ]
)

# Startup function

In [None]:
# Results-page URL of every scraped category; the page number is appended to it
_RESULTS_ROOT = "https://www.nehnutelnosti.sk/vysledky/"
categories = {
    name: f"{_RESULTS_ROOT}{path}?page="
    for name, path in (
        ("byty_predaj", "byty/predaj"),
        ("domy_predaj", "domy/predaj"),
        ("pozemky_predaj", "pozemky/predaj"),
        ("rekreacne_predaj", "rekreacne-nehnutelnosti/predaj"),
        ("priestory_predaj", "priestory-a-objekty/predaj"),
        ("prenajom", "prenajom"),
        ("kupa", "kupa"),
    )
}

# Browser-like request headers used when downloading results pages
headers = {
    "Accept-Encoding": "gzip, deflate, sdch",
    "Accept-Language": "en-US,en;q=0.8",
    "Upgrade-Insecure-Requests": "1",
    "User-Agent": (
        "Mozilla/5.0 (X11; Linux x86_64) "
        "AppleWebKit/537.36 (KHTML, like Gecko) "
        "Chrome/56.0.2924.87 Safari/537.36"
    ),
    "Accept": ",".join([
        "text/html",
        "application/xhtml+xml",
        "application/xml;q=0.9",
        "image/webp",
        "*/*;q=0.8",
    ]),
    "Cache-Control": "max-age=0",
    "Connection": "keep-alive",
}

# Outer pipeline used by the scraping functions; its single step is the
# nehnutelnosti_sk_pipeline object, which must already exist when this runs.
new_pipeline = Pipeline([("real_estate_pipeline", nehnutelnosti_sk_pipeline)])

def fetch_page_html(base_url, page_number, timeout=30):
    """Download one results page and return its HTML.

    Parameters
    ----------
    base_url : str
        Category URL ending in ``?page=``; *page_number* is appended to it.
    page_number : int
        Page to download.
    timeout : float, optional
        Seconds to wait for the server. Without a timeout a stalled
        connection would block the whole scrape indefinitely.

    Returns
    -------
    str or None
        The page HTML when the server answers with status 200, otherwise
        ``None``. Redirects are not followed, so a redirecting page also
        yields ``None``.

    Raises
    ------
    requests.exceptions.ConnectionError
        On connection problems, and also when the request times out, so that
        callers which retry on connection errors cover timeouts too.
    """
    url = f"{base_url}{page_number}"
    try:
        response = requests.get(
            url, headers=headers, allow_redirects=False, timeout=timeout
        )
    except requests.exceptions.Timeout as err:
        raise requests.exceptions.ConnectionError(f"Timed out fetching {url}") from err

    if response.status_code == 200:
        return response.text
    return None

def scrape_with_retries(base_url, start_page, end_page, max_retries=20, delay=5):
    """Run ``scrape_multiple_pages`` and start over after a connection error.

    Up to *max_retries* attempts are made, waiting *delay* seconds after every
    failed one. The result of the first successful attempt is returned; when
    every attempt fails an empty DataFrame is returned instead.
    """
    for attempt in range(1, max_retries + 1):
        try:
            scraped = scrape_multiple_pages(base_url, start_page, end_page)
        except ConnectionError as e:
            print(f"Connection error: {e}. Retrying {attempt}/{max_retries}...")
            time.sleep(delay)
        else:
            return scraped

    print(f"Failed after {max_retries} attempts.")
    return pd.DataFrame()

def scrape_multiple_pages(base_url, start_page, end_page):
    """Scrape the pages *start_page*..*end_page* (inclusive) of one category.

    Every page is downloaded with ``fetch_page_html`` and passed through
    ``new_pipeline``. Rows whose ``url`` was already collected, on an earlier
    page or earlier on the same page, are dropped.

    Returns
    -------
    pandas.DataFrame
        All collected rows with a leading ``id`` column numbered from 1.
        Empty when no page produced any row.
    """
    page_frames = []
    seen_urls = set()

    for page in range(start_page, end_page + 1):
        print(f"Scraping page {page}")
        raw_html = fetch_page_html(base_url, page)
        if not raw_html:
            continue

        result_df = new_pipeline.fit_transform(raw_html)
        if not isinstance(result_df, pd.DataFrame):
            print(f"Pipeline did not return DataFrame on page {page}. Skipping.")
            continue
        if result_df.empty:
            # Nothing to add; an empty frame has no 'url' column to check.
            continue

        if 'url' in result_df.columns:
            urls = result_df['url']
            # Repeats inside this page (rows without a URL are left alone)
            repeated_on_page = urls.notna() & urls.duplicated()
            result_df = result_df[~repeated_on_page & ~urls.isin(seen_urls)]
            seen_urls.update(result_df['url'].dropna().tolist())
        else:
            print("Warning: 'url' column not found; deduplication skipped.")

        if not result_df.empty:
            page_frames.append(result_df)

    # Concatenate once at the end instead of growing a frame inside the loop.
    if page_frames:
        all_data = pd.concat(page_frames, ignore_index=True)
    else:
        all_data = pd.DataFrame()

    all_data.insert(0, 'id', range(1, len(all_data) + 1))
    return all_data

# Walk through every category in batches of pages and store each batch that
# produced rows as its own pretty-printed JSON file.
for category, base_url in categories.items():
    total_pages = 40
    pages_per_file = 1
    batch_starts = range(1, total_pages + 1, pages_per_file)
    for pack_number, start_page in enumerate(batch_starts, start=1):
        end_page = min(start_page + pages_per_file - 1, total_pages)
        result_df = scrape_with_retries(base_url, start_page, end_page)
        if not result_df.empty:
            file_name = f'{category}_Pack{pack_number}.json'
            # Round-trip through pandas' JSON export so the records are plain
            # Python objects before they are pretty-printed.
            result_json = result_df.to_json(orient="records", lines=False, force_ascii=False)
            records = json.loads(result_json)
            with open(file_name, 'w', encoding='utf-8') as f:
                json.dump(records, f, indent=4, ensure_ascii=False)
        else:
            print(f"Skipping {category} pages {start_page}-{end_page} due to repeated failures.")

# Merge Listings (Optional)

In [None]:
def natural_sort_key(path):
    """Return a sort key that orders embedded numbers by value.

    The name is split into alternating text and digit chunks, and the digit
    chunks are compared as integers, so "x_Pack2.json" sorts before
    "x_Pack10.json".
    """
    # re.split with a capturing group puts the digit runs at the odd indices.
    return [
        int(chunk) if index % 2 else chunk
        for index, chunk in enumerate(re.split(r'(\d+)', path))
    ]


# Automatically find all per-category JSON files (like byty_predaj_Pack1.json, etc.),
# in page order, so the ids assigned below follow the order of scraping.
file_names = sorted(glob.glob("*_Pack*.json"), key=natural_sort_key)

merged_data = []
seen_urls = set()
current_id = 1

for file_name in file_names:
    with open(file_name, "r", encoding="utf-8") as file:
        data = json.load(file)

    for entry in data:
        url = entry.get("url")
        # Keep only the first occurrence of every URL; entries without one are dropped.
        if url and url not in seen_urls:
            entry["id"] = current_id
            current_id += 1
            seen_urls.add(url)
            merged_data.append(entry)

# Save to one merged JSON file
with open("Merged_Listings.json", "w", encoding="utf-8") as outfile:
    json.dump(merged_data, outfile, indent=4, ensure_ascii=False)

print(f"Merged {len(file_names)} files with {len(merged_data)} unique listings into 'Merged_Listings.json'")
