# WebScraping on ImmoWelt.de

## Imports

In [17]:
import time

import pandas as pd
import bs4

from datetime import datetime

from bs4 import BeautifulSoup
from bs4 import ResultSet
from bs4.element import Tag as HtmlTag
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.common.by import By
from webdriver_manager.chrome import ChromeDriverManager

## Script

In [18]:
# Number of result entries the site shows on one list page.
HITS_PER_PAGE = 20

# Offer types to scrape; the alternative set is ["kaufen", "mieten"].
BUY_CATEGORIES = [
    "kaufen",
]

# Property types to scrape.
PROPERTY_CATEGORIES = [
    "wohnungen",
    "haeuser",
]

# URL template of a result list; the ${...} placeholders are substituted
# with the state slug, property category, buy category and page number.
BASE_URL = (
    "https://www.immowelt.de/liste/${state}/${property_category}/${buy_category}"
    "?d=true&efs=NEW_BUILDING_PROJECT&sd=DESC&sf=RELEVANCE&sp=${page}"
)

In [19]:
# German federal states, normalised to lower case with "ü" written as "ue".
states = [
    name.replace("ü", "ue").lower()
    for name in (
        "Baden-Württemberg",
        "Bayern",
        "Berlin",
        "Brandenburg",
        "Bremen",
        "Hamburg",
        "Hessen",
        "Mecklenburg-Vorpommern",
        "Niedersachsen",
        "Nordrhein-Westfalen",
        "Rheinland-Pfalz",
        "Saarland",
        "Sachsen",
        "Sachsen-Anhalt",
        "Schleswig-Holstein",
        "Thüringen",
    )
]
# State slugs as used in the list URL: the normalised name prefixed with "bl-".
STATES = ["bl-" + name for name in states]
STATES

['bl-baden-wuerttemberg',
 'bl-bayern',
 'bl-berlin',
 'bl-brandenburg',
 'bl-bremen',
 'bl-hamburg',
 'bl-hessen',
 'bl-mecklenburg-vorpommern',
 'bl-niedersachsen',
 'bl-nordrhein-westfalen',
 'bl-rheinland-pfalz',
 'bl-saarland',
 'bl-sachsen',
 'bl-sachsen-anhalt',
 'bl-schleswig-holstein',
 'bl-thueringen']

### General Plan

<ol>
    <li>Get the list of properties</li>
    <li>Calculate the total number of property list pages</li>
    <li>Get the list of exposé URLs on the current page</li>
    <li>Navigate to each exposé page and grab the information you want</li>
    <li>Repeat (loop) for the next page</li>
</ol>

In [20]:
class ImmoWeltScraper:
    """Scrape property exposés from immowelt.de with a headless Chrome driver.

    For every combination of state, property category and buy category the
    scraper opens the result list, walks its pages, visits every exposé that
    is linked there and extracts a flat dict of text fields. The collected
    rows are exported as ``|``-separated CSV files: one per state and one
    containing all rows.
    """

    # Upper bound of list pages visited per state/category combination.
    # NOTE(review): presumably the site's pagination limit - confirm.
    MAX_PAGES = 35
    # Number of 2-second polls for the consent button before an attempt
    # counts as failed (keeps the former endless polling loop bounded).
    PERMISSION_MAX_POLLS = 30
    # Number of failed attempts after which retried operations give up.
    MAX_RETRIES = 5

    def __init__(
        self,
        base_url: str,
        states: list[str],
        hits_per_page: int,
        property_categories: list[str],
        buy_categories: list[str],
    ) -> None:
        """Store the scrape configuration; no browser is started here.

        Args:
            base_url: URL template containing the placeholders ``${state}``,
                ``${property_category}``, ``${buy_category}`` and ``${page}``.
            states: State slugs substituted for ``${state}``.
            hits_per_page: Number of hits shown on one list page; used to
                derive the page count from the total hit count.
            property_categories: Values substituted for
                ``${property_category}``.
            buy_categories: Values substituted for ``${buy_category}``.
        """
        # Pause in seconds after each property category and each state
        # (the original author noted 60 * 2.5 as an alternative value).
        self.TIME_DELAY = 5
        self.driver: webdriver.Chrome = None

        self.base_url = base_url
        self.states = states
        self.hits_per_page = hits_per_page
        self.property_categories = property_categories
        self.buy_categories = buy_categories

    def scrape(self) -> "pd.DataFrame | None":
        """Run the complete scrape.

        Errors are logged instead of raised, and the driver is shut down in
        every case.

        Returns:
            The DataFrame holding all scraped rows, or ``None`` if the run
            failed.
        """
        try:
            self._create_driver()
            df = self._scrape()
        except Exception as exp:
            self._log("Script failed. Try again!", "ERR")
            self._log(str(exp), "ERR_MSG")
            return None
        finally:
            # Always release the browser, not only on failure.
            self._destroy_driver()
        self._log("Script successful finished!", "SUCCESS")
        return df

    def _scrape(self) -> pd.DataFrame:
        """Iterate over all configured combinations and export the results.

        Writes one CSV per state and a final CSV with all rows, sleeping
        ``TIME_DELAY`` seconds after every property category and every state.

        Returns:
            The DataFrame with the rows of all states.
        """
        total_loops = (
            len(self.states)
            * len(self.property_categories)
            * len(self.buy_categories)
        )
        curr_loop = 0
        total_property_data: list[dict] = []
        for state in self.states:
            state_property_data: list[dict] = []
            for property_category in self.property_categories:
                for buy_category in self.buy_categories:
                    curr_loop += 1
                    self._log(f"Current-Loop: {curr_loop} of {total_loops}")
                    state_property_data.extend(
                        self._scrape_listing(state, property_category, buy_category)
                    )
                self._log(f"Category delay of {self.TIME_DELAY}s", "SLEEP")
                time.sleep(self.TIME_DELAY)
            total_property_data.extend(state_property_data)
            filepath = f"./data2/property_immowelt_{state}.csv"
            self._export(state_property_data, filepath)
            self._log(f"State delay of {self.TIME_DELAY}s", "SLEEP")
            time.sleep(self.TIME_DELAY)
        filepath = "./data2/property_immowelt_data.csv"
        return self._export(total_property_data, filepath)

    def _scrape_listing(
        self,
        state: str,
        property_category: str,
        buy_category: str,
    ) -> list[dict]:
        """Scrape every exposé of one state/category/buy-type result list.

        A failing list page or exposé is logged and skipped so that a single
        broken page does not discard the remaining ones.

        Returns:
            One dict per successfully scraped exposé.
        """
        url = self._get_prep_url(state, property_category, buy_category)
        soup = self._navigate_to(url)
        if not self._is_page_found(soup):
            return []

        page_count = min(
            self._calc_total_relevant_page_count(soup), self.MAX_PAGES
        )
        listing_data: list[dict] = []
        for page in range(1, page_count + 1):
            self._log(f"State: {state}, Category: {property_category}, Type: {buy_category}, Page: {page} of {page_count}")
            page_url = self._get_prep_url_page(url, page)
            try:
                # Load the requested page, not the first page again.
                soup = self._navigate_to(page_url)
                expose_urls = self._get_expose_urls(soup)
            except Exception as exp:
                self._log(f"Err URL: {page_url}")
                self._log(str(exp), "WARN")
                continue

            for expose_url in expose_urls:
                try:
                    self.driver.get(expose_url)
                    listing_data.append(
                        self._get_property_data(
                            property_category,
                            buy_category,
                            state,
                        )
                    )
                except Exception as exp:
                    self._log(f"Err URL: {expose_url}")
                    self._log(str(exp), "WARN")
        return listing_data

    def scrape_obj_test(
        self,
        expose_url: str,
    ) -> dict:
        """Scrape a single exposé URL (manual test helper).

        The category, buy type and state fields of the result are filled
        with placeholder strings. The driver is shut down even on failure.
        """
        try:
            self._create_driver()
            self._navigate_to(expose_url)
            return self._get_property_data(
                "property_category",
                "buy_category",
                "state"
            )
        finally:
            self._destroy_driver()

    def _create_driver(self) -> None:
        """Start a headless Chrome driver and accept the consent dialog."""
        if self.driver:
            self._destroy_driver()
        options = Options()
        options.add_argument("--headless")
        self.driver = webdriver.Chrome(
            service=Service(
                ChromeDriverManager().install()
            ),
            options=options
        )
        self._log("Driver created")
        self._check_permission_requirements()
        self._log("Permissions accepted")

    def _destroy_driver(self) -> None:
        """Shut the driver down (if any) and forget it."""
        self._log("Destroy driver")
        if not self.driver:
            return
        try:
            # quit() ends the whole session, whereas close() only closes
            # the current window.
            self.driver.quit()
        except Exception as exp:
            self._log(f"Driver shutdown failed: {exp}", "WARN")
        finally:
            self.driver = None

    def _check_permission_requirements(self, err_loop: int = 0) -> None:
        """Open the site and click the "accept all" button of the consent UI.

        The button is polled every 2 seconds. An attempt that raises, or in
        which the button does not show up within ``PERMISSION_MAX_POLLS``
        polls, counts as an error and the page is reloaded.

        Args:
            err_loop: Number of errors that already happened.

        Raises:
            Exception: The last error once ``MAX_RETRIES`` errors occurred.
        """
        find_accept_button = (
            """return document.querySelector('#usercentrics-root').shadowRoot.querySelector("button[data-testid='uc-accept-all-button']")"""
        )
        while True:
            try:
                self.driver.get("https://www.immowelt.de/immobilienpreise")
                for _ in range(self.PERMISSION_MAX_POLLS):
                    accept_button = self.driver.execute_script(find_accept_button)
                    if accept_button:
                        accept_button.click()
                        return
                    self._log("Permission loop. Wait 2 seconds.")
                    time.sleep(2)
                raise TimeoutError("Consent button did not appear")
            except Exception:
                err_loop += 1
                if err_loop >= self.MAX_RETRIES:
                    raise
                self._log(f"Permission ERR#{err_loop}-loop", "WARN")

    def _export(self, data: list[dict], filepath: str) -> pd.DataFrame:
        """Write *data* as a ``|``-separated CSV file and return it as DataFrame.

        Every ``|`` inside a text cell is replaced by ``_`` so that cell
        contents cannot be mistaken for the column delimiter. Missing parent
        directories of *filepath* are created.
        """
        from pathlib import Path

        df = pd.DataFrame(data)
        # regex=True makes the replacement work on substrings; without it
        # only cells that are exactly "|" would be replaced.
        df = df.replace(r"\|", "_", regex=True)
        Path(filepath).parent.mkdir(parents=True, exist_ok=True)
        df.to_csv(filepath, sep="|", index=False)
        self._log(f"CSV-Export to: {filepath}")
        return df

    def _navigate_to(self, url: str) -> "BeautifulSoup":
        """Load *url* in the driver and return the parsed page source."""
        self.driver.get(url)
        # Name the parser explicitly so the result does not depend on which
        # optional parsers happen to be installed.
        return BeautifulSoup(self.driver.page_source, "html.parser")

    def _is_page_found(self, soup: "BeautifulSoup") -> bool:
        """Return False (and log the URL) if the page shows the not-found box."""
        is_page_found = not soup.find("div", {"class": "NotFound-d39a0"})
        if not is_page_found:
            url = self.driver.current_url
            self._log(f"Page not Found: {url}", "ERROR")
        return is_page_found

    def _log(self, msg: str, tag: str = "INFO") -> None:
        """Print *msg* prefixed with a timestamp and the upper-cased *tag*."""
        tag = tag.upper()
        ts = datetime.now().strftime("%Y-%m-%d, %H:%M:%S")
        print(f"[{ts}] {tag}:\t{msg}")

    def _get_prep_url(
        self,
        state: str,
        property_category: str,
        buy_category: str,
        page: int = 1
    ) -> str:
        """Fill the placeholders of ``base_url`` with the given values."""
        return (
            self.base_url
                .replace("${state}", state)
                .replace("${property_category}", property_category)
                .replace("${buy_category}", buy_category)
                .replace("${page}", str(page))
        )

    def _get_prep_url_page(self, url: str, page: int) -> str:
        """Return *url* with its ``sp`` (page) query parameter set to *page*.

        Everything following ``sp=`` is replaced. If *url* has no ``sp``
        parameter, one is appended.
        """
        for marker in ("?sp=", "&sp="):
            idx = url.find(marker)
            if idx != -1:
                return f"{url[:idx + len(marker)]}{page}"
        separator = "&" if "?" in url else "?"
        return f"{url}{separator}sp={page}"

    def _calc_total_relevant_page_count(self, soup: "BeautifulSoup") -> int:
        """Derive the number of list pages from the hit count headline.

        The first space-separated token of the headline is read as the hit
        count (dots are removed as thousands separators). The result is
        rounded up so that a final, partially filled page is included.

        Returns:
            The page count, or 0 if the headline is missing or does not start
            with a number.
        """
        hits_emt = soup.find("h1", {"class": "MatchNumber-a225f"})
        if not hits_emt:
            return 0
        first_token = hits_emt.text.strip().split(" ", 1)[0]
        digits = first_token.replace(".", "")
        if not digits.isdecimal():
            return 0
        # Ceiling division without floats.
        return -(-int(digits) // self.hits_per_page)

    def _get_expose_urls(self, soup: "BeautifulSoup") -> list[str]:
        """Collect the ``href`` of every exposé link on a list page."""
        link_emts = soup.find_all("a", {"class": "mainSection-b22fb"})
        # Anchors without an href are skipped instead of raising KeyError.
        return [link["href"] for link in link_emts if link.get("href")]

    def _expand_read_more_areas(self, err_loop=0) -> "BeautifulSoup":
        """Click all "read more" links of the current page and parse it.

        Args:
            err_loop: Number of failed attempts that already happened.

        Returns:
            The parsed page source after expanding.

        Raises:
            Exception: The last error once ``MAX_RETRIES`` retries failed.
        """
        script_expand_all_read_more = """
            let links = document.getElementsByClassName("link--read-more");
            while(links.length > 0) {
                links[0].click();
                links = document.getElementsByClassName("link--read-more");
            }
            """
        while True:
            try:
                self.driver.execute_script(script_expand_all_read_more)
                return BeautifulSoup(self.driver.page_source, "html.parser")
            except Exception:
                if err_loop >= self.MAX_RETRIES:
                    raise
                err_loop += 1
                self._log(f"Can't expand read-more. Try: {err_loop} of 5", "WARN")

    def _get_overview_container(self, soup: "BeautifulSoup") -> "HtmlTag":
        """Return the overview element of an exposé (None if absent)."""
        return soup.find("app-objectmeta", {"id": "aUebersicht"})

    def _get_data_title(self, overview_container: "HtmlTag") -> str:
        """Return the exposé title or "Na" if no headline is found."""
        emt = overview_container.find("h1", {"class": "ng-star-inserted"})
        return emt.text if emt else "Na"

    def _get_data_hardfacts(self, overview_container: "HtmlTag") -> tuple[str, str, str]:
        """Extract price, living space and room count from the hard facts.

        For every hard-fact element the text up to the first space is taken
        as the value; the label decides which field it belongs to. If no
        price label matched, the dedicated price element is used instead.

        Returns:
            ``(price, living_space, rooms)``; missing values are "Na".
        """
        price = "Na"
        living_space = "Na"
        rooms = "Na"
        hardfact_emts = overview_container.find_all(
            "div", {"class": "hardfact ng-star-inserted"}
        )
        for hardfact_emt in hardfact_emts:
            label_emt = hardfact_emt.find("div", {"class": "hardfact__label"})
            if not label_emt:
                continue
            hardfact_label = label_emt.text.strip()
            # split() keeps the whole text when it contains no space, where
            # slicing with find() == -1 would drop the last character.
            value = hardfact_emt.text.strip().split(" ", 1)[0]
            if hardfact_label in ("Kaufpreis", "Mindestpreis"):
                price = value
            elif hardfact_label == "Wohnfläche ca.":
                living_space = value
            elif hardfact_label == "Zimmer":
                rooms = value

        if price == "Na":
            emt = overview_container.find(
                "div", {"class": "hardfact hardfact__price ng-star-inserted"}
            )
            child = emt.findChild() if emt else None
            if child:
                price = child.text.strip()

        return price, living_space, rooms

    def _get_data_badges(self, overview_container: "HtmlTag") -> str:
        """Return the badge texts joined by ";" or "Na" if there are none."""
        badge_emts = overview_container.find_all("sd-badge")
        if not badge_emts:
            return "Na"
        return ";".join(badge_emt.text for badge_emt in badge_emts)

    def _get_data_ratings(self, soup: "BeautifulSoup") -> tuple[str, str]:
        """Return ``(location_rating, public_transport_rating)``.

        Both values are "Na" unless exactly two rating elements exist.
        """
        rating_emts = soup.find_all("div", {"class": "rating-meter__value"})
        if len(rating_emts) != 2:
            return "Na", "Na"
        return rating_emts[0].text, rating_emts[1].text

    def _collect_cell_pairs(self, cells) -> list[str]:
        """Turn cells holding at least two ``<p>`` into "title: value" strings."""
        pairs = []
        for cell in cells or []:
            paragraphs = cell.find_all("p")
            # Cells without a title/value pair are ignored.
            if len(paragraphs) >= 2:
                pairs.append(f"{paragraphs[0].text}: {paragraphs[1].text}")
        return pairs

    def _get_data_equipments(self, soup: "BeautifulSoup") -> str:
        """Return the equipment rows as "title: value" joined by ";".

        Returns "Na" if the equipment card or usable rows are missing.
        """
        equipment_container = soup.find(
            "div", {"class": "equipment card-content ng-star-inserted"}
        )
        if not equipment_container:
            return "Na"

        equipment_cells = equipment_container.find_all(
            "sd-cell-row", {"class": "cell__row"}
        )
        equipments = self._collect_cell_pairs(equipment_cells)
        return ";".join(equipments) if equipments else "Na"

    def _get_data_features(self, soup: "BeautifulSoup") -> str:
        """Return all list items of the text lists joined by ";".

        Returns "Na" if there is no text list or none contains an item.
        """
        features = [
            feature_emt.text.strip()
            for feature_list_emt in soup.find_all("div", {"class": "textlist"})
            for feature_emt in feature_list_emt.find_all("li")
        ]
        return ";".join(features) if features else "Na"

    def _get_data_energy_data(self, soup: "BeautifulSoup") -> str:
        """Return the energy details as "title: value" joined by ";".

        Cells are read from the ``app-energy-equipment`` element and from
        the energy information box. Returns "Na" if nothing is found.
        """
        energy_data: list[str] = []

        energy_container1 = soup.find("app-energy-equipment")
        if energy_container1:
            energy_cells = energy_container1.find_all(
                "sd-cell-col", {"data-cy": "energy-equipment"}
            )
            energy_data.extend(self._collect_cell_pairs(energy_cells))

        energy_container2 = soup.find(
            "div", {"class": "energy_information ng-star-inserted"}
        )
        if energy_container2:
            energy_cells = energy_container2.find_all(
                "sd-cell-col", {"class": "cell__col"}
            )
            energy_data.extend(self._collect_cell_pairs(energy_cells))

        return ";".join(energy_data) if energy_data else "Na"

    def _get_data_keywords(self, soup: "BeautifulSoup") -> str:
        """Return the text following "Stichworte" in the last read-more area.

        Returns "Na" if the page has no read-more area. If the last area
        does not contain "Stichworte", its complete text is returned.
        """
        read_more_emts = soup.find_all("sd-read-more")
        if not read_more_emts:
            return "Na"

        text: str = read_more_emts[-1].text
        KEYWORDS_NAME = "Stichworte"
        if KEYWORDS_NAME not in text:
            return text
        return text.partition(KEYWORDS_NAME)[2].strip()

    def _get_property_data(
        self,
        category: str,
        buy_rent: str,
        state: str,
    ) -> dict:
        """Extract all fields from the exposé currently loaded in the driver.

        Args:
            category: Property category stored in the result.
            buy_rent: Buy category stored in the result.
            state: State slug stored in the result.

        Returns:
            A flat dict of strings describing the property.

        Raises:
            ValueError: If the page has no overview container.
        """
        soup = self._expand_read_more_areas()
        overview_container = self._get_overview_container(soup)
        if overview_container is None:
            url = self.driver.current_url
            raise ValueError(f"No overview container found: {url}")

        title = self._get_data_title(overview_container)
        price, living_space, rooms = self._get_data_hardfacts(overview_container)
        badges = self._get_data_badges(overview_container)
        rating_location, rating_public_transport = self._get_data_ratings(soup)
        equipments = self._get_data_equipments(soup)
        features = self._get_data_features(soup)
        energy_data = self._get_data_energy_data(soup)
        keywords = self._get_data_keywords(soup)

        return {
            "category": category,
            "buy_rent": buy_rent,
            "state": state,
            "title": title,
            "price": price,
            "living_space": living_space,
            "rooms": rooms,
            "badges": badges,
            "rating_location": rating_location,
            "rating_public_transport": rating_public_transport,
            "equipments": equipments,
            "features": features,
            "energy_data": energy_data,
            "keywords": keywords,
        }


## Run Script

In [21]:
# Configure the scraper with the notebook-level settings and start the run.
immo_scraper = ImmoWeltScraper(
    base_url=BASE_URL,
    states=STATES,
    hits_per_page=HITS_PER_PAGE,
    property_categories=PROPERTY_CATEGORIES,
    buy_categories=BUY_CATEGORIES,
)
immo_scraper.scrape()

[2022-12-24, 11:15:45] INFO:	Driver created
[2022-12-24, 11:15:46] INFO:	Permission loop. Wait 2 seconds.
[2022-12-24, 11:15:48] INFO:	Permissions accepted
[2022-12-24, 11:15:48] INFO:	Current-Loop: 1 of 32
[2022-12-24, 11:15:53] INFO:	State: bl-baden-wuerttemberg, Category: wohnungen, Type: kaufen, Page: 1 of 35
[2022-12-24, 11:17:05] INFO:	State: bl-baden-wuerttemberg, Category: wohnungen, Type: kaufen, Page: 2 of 35
[2022-12-24, 11:18:16] INFO:	State: bl-baden-wuerttemberg, Category: wohnungen, Type: kaufen, Page: 3 of 35
[2022-12-24, 11:19:26] INFO:	State: bl-baden-wuerttemberg, Category: wohnungen, Type: kaufen, Page: 4 of 35
[2022-12-24, 11:20:37] INFO:	State: bl-baden-wuerttemberg, Category: wohnungen, Type: kaufen, Page: 5 of 35
[2022-12-24, 11:21:47] INFO:	State: bl-baden-wuerttemberg, Category: wohnungen, Type: kaufen, Page: 6 of 35
[2022-12-24, 11:22:58] INFO:	State: bl-baden-wuerttemberg, Category: wohnungen, Type: kaufen, Page: 7 of 35
[2022-12-24, 11:24:08] INFO:	State: b