In [None]:
%pip install -U beautifulsoup4
%pip install -U selenium
%pip install -U pandas
%pip install -U matplotlib
%pip install -U psycopg2 
%pip install -U pyyaml


KeyboardInterrupt



In [None]:
import random
from bs4 import BeautifulSoup, Comment
#from selenium.webdriver.firefox.options import Options as FirefoxOptions
from selenium.webdriver.chrome.options import Options as ChromeOptions
from selenium import webdriver
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.common.by import By
from selenium.common.exceptions import TimeoutException
import requests
import re #regex
import datetime
import json
import time
#makes it so all modules are reloaded, allowing...
#...me to not restart the kernel when they are edited!
%load_ext autoreload 
%autoreload 2  
from CallPostgre import Database #custom module for db commands

In [None]:
def getSoupStartingPage(url, chrome_options, current_page_number) -> BeautifulSoup:
    """Load one search-results page in Chrome and return it as parsed HTML.

    Args:
        url: Address of the search-results page to open.
        chrome_options: Selenium ``ChromeOptions`` used to start the browser.
        current_page_number: Page number expected in the ``<link>`` element
            that is used as the "page has loaded" marker.

    Returns:
        A ``BeautifulSoup`` tree built from the driver's page source.
    """
    # Upper bound (seconds) on how long we wait for the marker element.
    max_wait_time = 6
    page_marker = (By.XPATH, f"//link[@href='https://www.zoopla.co.uk/for-sale/property/liverpool/?pn={current_page_number}']")

    driver = webdriver.Chrome(options=chrome_options)
    try:
        driver.get(url)

        # Continue as soon as the marker is present, or once the timeout expires.
        # Helps ensure the driver holds the fully loaded page content, and
        # mimics normal browser behaviour in case a short anti-scraping
        # verification delay exists.
        try:
            WebDriverWait(driver, max_wait_time).until(EC.presence_of_element_located(page_marker))
            print("Page loaded after finding the desired content!")
        except TimeoutException:
            print(f"Page loaded by timeout... ({max_wait_time} seconds)")

        html = driver.page_source
    finally:
        # Always close the driver, even when loading fails; otherwise the
        # browser process keeps running in the background and such processes
        # pile up over repeated runs.
        driver.quit()

    # NOTE(review): no parser is named, so bs4 picks the best one installed;
    # consider pinning one once it is confirmed which parser is in use.
    soup = BeautifulSoup(html)
    return soup
    
    
def getID(url_homedetails_portion) -> int:
    """Extract the numeric listing ID from a home-details URL path.

    Args:
        url_homedetails_portion: Relative URL such as
            ``"/for-sale/details/12345678/"``.

    Returns:
        The digits found between ``details/`` and the following ``/``.

    Raises:
        ValueError: If the value is missing or holds no ``details/<digits>/``
            segment.
    """
    if url_homedetails_portion is None:
        raise ValueError("No home-details URL was provided")

    # Raw string: "\d" in a normal string literal is an invalid escape sequence.
    regex_home_id = re.compile(r"(?:.+details/)(\d+)(?:/)")
    match_home_id = regex_home_id.search(url_homedetails_portion)
    if match_home_id is None:
        raise ValueError(f"No listing ID found in URL: {url_homedetails_portion!r}")
    return int(match_home_id.group(1))


def getPrice() -> int:
    """Return the listing price of the current home as a whole number.

    Reads the module-level ``soup_home`` and looks up the element whose
    ``data-testid`` attribute is ``"listing-price"``.
    """
    price_text = soup_home.find(attrs={"data-testid": "listing-price"}).string
    # Remove the currency sign and thousands separators so int() can parse it.
    for unwanted in ("£", ","):
        price_text = price_text.replace(unwanted, "")
    return int(price_text)



def getRoomCountsAndPropertySize() -> tuple[list[int|None], int | None]: 
    elem_room_counts = soup_home.find_all(class_=re.compile("^_1qv6swd1$")) #elem->html element
    # Able to save room for a possible "square meters" information, 
    # yet it is almost never present in listings, so, for the time being, 
    # that value will be ignored.
    room_counts_values = [None for i in range(3)]
    room_counts_keys = ["Bedrooms", "Bathrooms", "Living rooms"]
    property_size = None
    
    for elem_room_count in elem_room_counts: 
        if elem_room_count.contents[0].string == "Property size":
            property_size = re.search("^[^\d]*(\d+)" , elem_room_count.contents[2].string).group(1)
        else:
            # get the name of the division
            for i, room_counts_key in enumerate(room_counts_keys):
                # when the website matches one of those names, update that index
                if elem_room_count.contents[0].string == room_counts_key:
                    # regular expression retrieves the first number found,
                    # covering the "n sq. ft case" too
                    room_counts_values[i] = elem_room_count.contents[2].string
                    break
    #Convert from string to int, so they can be stored in the db     
    room_counts = [int(n) if n is not None else None for n in room_counts_values]
    
    if property_size is not None:
        property_size = int(property_size)
    return room_counts, property_size


def getTitleAndLocation() -> tuple[str, str]:
    """Return the title and the location text of the current listing.

    Reads the module-level ``soup_home``; the first two children of the
    description element hold the title and the location, in that order.
    """
    description_parts = soup_home.find(class_=re.compile("^_1vvnr3j0$")).contents
    return description_parts[0].string, description_parts[1].string


def getListingDate() -> datetime.datetime:
    """Parse the listing date of the current home.

    Reads the module-level ``soup_home`` and searches the text of the date
    element for a date such as ``"12th Jan 2023"`` or ``"3rd September 2023"``.

    Returns:
        A ``datetime.datetime`` at midnight of the listing day.

    Raises:
        ValueError: If no date is found in the text, or if the day does not
            exist in that month.
    """
    month_abbreviations = ("jan", "feb", "mar", "apr", "may", "jun",
                           "jul", "aug", "sep", "oct", "nov", "dec")
    datestring_unprocessed = soup_home.find(class_=re.compile("^_65yptp1$")).string
    # Only day, month and year are captured; the optional month endings are
    # non-capturing so they cannot leak into the result as extra groups.
    regexdate = re.compile(
        r"(3[01]|[0-2][0-9]|[0-9])(?:st|nd|rd|th) "
        r"(jan(?:uary)?|feb(?:ruary)?|mar(?:ch)?|apr(?:il)?|may|june?|july?|"
        r"aug(?:ust)?|sep(?:tember)?|oct(?:ober)?|nov(?:ember)?|dec(?:ember)?) "
        r"(2[0-9][0-9][0-9])",
        re.IGNORECASE,
    )
    regexmatch = regexdate.search(datestring_unprocessed)
    if regexmatch is None:
        raise ValueError(f"No listing date found in: {datestring_unprocessed!r}")

    day, month_name, year = regexmatch.groups()
    # Resolve the month from its first three letters so that short and full
    # names both work and the result does not depend on the active locale.
    month = month_abbreviations.index(month_name[:3].lower()) + 1
    return datetime.datetime(int(year), month, int(day))


def getHasPriceInfluences() -> tuple[bool, bool]:
    """Report whether the current listing is tagged as auction or shared ownership.

    Reads the module-level ``soup_home``. A listing without a tag container,
    or with tags that carry no readable label, counts as neither.

    Returns:
        A pair ``(is_auction, is_shared_ownership)``.
    """
    is_auction = False
    is_shared_ownership = False

    elem_tags = soup_home.find(class_=re.compile("^_1rnkq5r0$"))
    # If there's no tag on the house we assume it's not an auction or shared ownership.
    if elem_tags is None:
        return is_auction, is_shared_ownership

    # Only direct child *tags* are inspected (text nodes are skipped), and a
    # child without a label is ignored instead of ending the scan early.
    for elem_tag in elem_tags.find_all(True, recursive=False):
        elem_label = elem_tag.find(class_=re.compile("^_1p8nftv0$"))
        if elem_label is None or elem_label.string is None:
            continue
        tag = elem_label.string.casefold()
        if tag == "auction":
            is_auction = True
        # -There is a slight risk in this method, happening if the website
        # creates another tag with the word "shared".
        # -However that seems unlikely, and the wording of "shared ownership"
        # could just as easily change to something else containing "shared",
        # so matching on the single word is the more resilient choice.
        elif "shared" in tag:
            is_shared_ownership = True

    return is_auction, is_shared_ownership

    
def getSoupHomeDetails(url_homedetails_portion, chrome_options) -> BeautifulSoup:
    """Load a home-details page in Chrome and return it as parsed HTML.

    Args:
        url_homedetails_portion: Relative URL of the listing, appended to
            ``https://www.zoopla.co.uk``.
        chrome_options: Selenium ``ChromeOptions`` used to start the browser.

    Returns:
        A ``BeautifulSoup`` tree built from the driver's page source.
    """
    url_homedetails = "https://www.zoopla.co.uk"+ url_homedetails_portion
    print(f"\nurl_homedetails: {url_homedetails}")

    driver = webdriver.Chrome(options=chrome_options)
    try:
        driver.get(url_homedetails)
        # Fixed pause: gives the page time to load and keeps the request rate
        # at a "human" pace so the website is not overloaded.
        time.sleep(5)
        html = driver.page_source
    finally:
        # Always close the driver, even when loading fails, so no browser
        # process is left running in the background.
        driver.quit()

    soup_homedetails = BeautifulSoup(html)
    return soup_homedetails


def getJsonInfo() -> tuple[str, str, str, float, float, list]:
    """Read agent, address, coordinate and photo data from the page's JSON-LD.

    Reads the module-level ``soup_homedetails``. Entry 1 of ``@graph`` is
    read for the agent data and entry 3 for the geo and photo data.

    Returns:
        ``(agent_name, postal_code, agent_telephone, latitude, longitude,
        photo_urls)`` with both coordinates converted to ``float``.
    """
    json_element = soup_homedetails.find(attrs={"type": "application/ld+json"})
    # The element's first child holds the JSON text; turn it into a dict.
    graph = json.loads(json_element.contents[0])['@graph']

    agent = graph[1]
    agent_name = agent['name']
    postcode = agent['address']['postalCode']
    agent_phone = agent['telephone']

    listing = graph[3]
    # Note: website has "latitude" and "longitude" names mixed, I believe.
    latitude = listing['geo']['latitude']
    longitude = listing['geo']['longitude']
    photo_urls = [photo['contentUrl'] for photo in listing['photo']]

    return agent_name, postcode, agent_phone, float(latitude), float(longitude), photo_urls
    

def getEPCRating() -> str | None:
    elem_epc = soup_homedetails.find(class_=re.compile("^_1fuu7p80 _1fuu7p85 _1dgm2fc8 _1jdsy140$"))
    if elem_epc != None: #if house displays epc rating
        elem_epc = elem_epc.find(class_=re.compile("^_1p8nftv0$"))
        elem_epc.div.decompose()
        epc_rating = elem_epc.string
        epc_rating = epc_rating.split(": ")[1] #get only the letter
    else:
        epc_rating = None
    return epc_rating


def getExtras() -> tuple[str|None, int|None, int|None, str|None, int|None]:
    # Tenure, Time remaining on lease, Service charge, Council tax band, Ground rent
    extras_values = [None for i in range(5)] 
    # Susceptible to term changes, but can't dynamically retrieve the names
    # (to prepare for the eventuality of a term change in the website)
    # as some houses don't have all 5 extras terms
    extras_keys = ["Tenure:", "Time remaining on lease:", "Service charge:",
                   "Council tax band:", "Ground rent:"]    
    soup_extras = soup_homedetails.find(class_=re.compile("^_1k66bqh1$"))                                     
    
    for elem_extra in soup_extras.find_all(class_=re.compile("^_1p8nftv1n _1p8nftvk _1p8nftv12$")):       
        # removing comment
        for string in elem_extra(string=True): 
            if isinstance(string, Comment):
                string.extract()
        
        elem_extra_key = elem_extra.contents[0] #(Ex: "Tenure:")
        elem_extra_value = elem_extra.contents[1] #(Ex: "Freehold")

        # REPLACE (word + :) strings with (word:) string #    
        #
        # merge the strings (word + :) separated by the comment into one,
        # so it can be returned by .string
        merged = "".join(string for string in elem_extra_key(string=True)) 
        # leave only one string in the soup (Ex: "Tenure") so that tag.string
        #  can be called for tag.string.replace.
            # Could have simply replaced the value by using
            #  elem_extra_key.string="merged". However it deletes any other content
            #  inside such as tags and although it could have worked in this case,
            #  the solution I used is more general and robust, despite a couple
            #  more computations.
        for i, string in enumerate(elem_extra_key(string=True)): 
            if  i>0:
                string.extract()  
        elem_extra_key.string.replace_with(merged)
        
        for i, extra_key in enumerate(extras_keys):
            # when the extra name matches one of the established extra names,
            #  update its value on the correct extras[] index
            if elem_extra_key.string == extra_key: 
                extras_values[i] = elem_extra_value.string
                break
                
    
    #Since these are UK homes, the monetary values are given in pounds(£)
    tenure = extras_values[0]
    time_remaining_on_lease_years = extras_values[1]
    annual_service_charge = extras_values[2]
    council_tax_band = extras_values[3]
    ground_rent = extras_values[4]
    
    # Council tax band in this website is given in a letter (A/B/C/D,E,F,G,H)
    #  with no more text. Therefore, if there is more text than 1 character,
    #  it's, with a good enough degree of confidence, a "null-equivalent"
    #  message such as: "A band has not yet been confirmed" which can be discarded
    if council_tax_band is not None and len(council_tax_band) > 1:
        council_tax_band = None
    
    # Get the numericals only
    try:
        # Use ("^[^\d]*(\d+(\.*\d+))") if the website ever includes pence (floats)
        time_remaining_on_lease_years = re.search("^[^\d]*(\d+)", time_remaining_on_lease_years).group(1)
        int(time_remaining_on_lease_years)
    except:
        # Any other description on it means it doesn't have a concrete number
        time_remaining_on_lease_years = None
    
    try:
        annual_service_charge = annual_service_charge.replace(",",r"")
        annual_service_charge = re.search("^[^\d]*(\d+)", annual_service_charge).group(1)
        int(annual_service_charge)
    except:
        annual_service_charge = None
    
    try:
        ground_rent = ground_rent.replace(",",r"")
        ground_rent = re.search("^[^\d]*(\d+)", ground_rent).group(1)
        int(ground_rent)
    except:
        ground_rent = None
    
    return (tenure, time_remaining_on_lease_years, annual_service_charge, 
            council_tax_band, ground_rent)
    
    
def getFeatureSet() -> list[str]:
    """Return the text of every feature listed for the current home.

    Reads the module-level ``soup_homedetails``; each matching element
    contributes the text of its first ``<span>``.
    """
    feature_elements = soup_homedetails.find_all(class_=re.compile("^swbww71$"))
    return [feature_element.span.string for feature_element in feature_elements]


def getDescription() -> str:
    """Return the description of the current home as plain text.

    Reads (and edits in place) the module-level ``soup_homedetails``: every
    ``<br>`` inside the description element becomes a newline, then all text
    pieces of the element are concatenated.
    """
    description_element = soup_homedetails.find(class_=re.compile("^ru2q7m3$"))
    # Each <br> turns into exactly one "\n", so a run of two breaks (used
    # between paragraphs) yields a blank line.
    line_breaks = description_element.find_all("br")
    for line_break in line_breaks:
        line_break.replace_with("\n")

    text_pieces = description_element.find_all(string=True)
    return "".join(text_pieces)
        

'''
    DISCLAIMER:

    This project acknowledges Zoopla (ZPG Ltd.) which holds publicly available
    all the data collected for this non-commercial educational project.
    
    After looking into "https://www.zoopla.co.uk/robots.txt" no disallowance 
    was found for the url directories used in this project ("/for-sale/houses",
    "/for-sale/details" and "/new-homes/details") 
    
    No data which requires privileged access (ex: log-in),
    sensitive data belonging to an individual or any other non publicly available
    source was used.
    
    There was a further attempt to avoid any copyright infringements, by
    ensuring the https://www.gov.uk/guidance/exceptions-to-copyright under the 
    "Text and data mining for non-commercial research" section was followed.
    
    This project was done in good-faith for educational and non-commercial reasons.
    It MUST NOT be used in any other context. If so, the user may be liable
    to any actions ZPG Ltd. decide to take.
    
    Should a party belonging to ZPG Ltd. find a disagreement with any of the claims
    made above and consider itself aggrieved, it is welcome and encouraged to 
    contact me so it can be resolved.

'''
    
    
    
    
## No "if __name__ == '__main__'" idiom, as this is meant to be run as a script! ##
'''
Javascript note:

If the content one is looking for is dynamically generated by client-side
javascript as the page loads, then one can only access it using a headless
browser such as selenium.
'''

# Establish connection with the database and instantiate cursor
db = Database()

# Pool of desktop Chrome user agents; one is picked at random for every
# search-results page.
user_agents =[
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/118.0.0.0 Safari/537.36",
    "Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/118.0.0.0 Safari/537.36",
    "Mozilla/5.0 (Windows NT 10.0) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/118.0.0.0 Safari/537.36",
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 14_0) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/118.0.0.0 Safari/537.36",
    "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/118.0.0.0 Safari/537.36"
]

# Checked <<mainwebsite>>/robots.txt
# Start on page 3 due to page 1 and even 2 homes occasionally being uploaded
#  without all info inserted yet
starting_page_number, ending_page_number = 3, 41

# The try/finally guarantees the database connection is released even when a
# page or a listing fails half-way through the run.
try:
    # Scans every page from starting_page_number up to, but not including,
    # ending_page_number.
    for current_page_number in range(starting_page_number, ending_page_number):
        print(f"""
    ########################################
    # Web scraping page: {current_page_number}
    ########################################""")

        random_user_agent = random.choice(user_agents)
        chrome_options = ChromeOptions()
        chrome_options.add_argument('--headless')
        chrome_options.add_argument(f"--user-agent={random_user_agent}")

        url = "https://www.zoopla.co.uk/for-sale/property/liverpool/?q=Liverpool%2C%20Merseyside&search_source=home&pn=" + str(current_page_number)
        soup = getSoupStartingPage(url, chrome_options, current_page_number)
        #Locate html of the listing sections
        soup_homes = soup.find_all(class_=re.compile("^rgd66w1$")) #len25
        # NOTE: soup_home (and soup_homedetails further down) are module-level
        # names that the get*() helpers read directly.
        for i, soup_home in enumerate(soup_homes):
            print(f"\n ----------Home: {i+1} (in this page)------------ \n")
            # Used on ID and to go the home details page
            url_homedetails_portion = soup_home.get('href')

            # Home ID #
            home_id = getID(url_homedetails_portion)

            # Price # (Initial, as it may be slightly changed by the real estate
            #          agents over time)
            price = getPrice()

            # Room Counts and Property size #
            #
            # property_size given in sq. feet
            room_counts, property_size = getRoomCountsAndPropertySize()
            bedroom_count = room_counts[0]
            bathroom_count = room_counts[1]
            living_room_count = room_counts[2]

            #Title & Location #
            title, location = getTitleAndLocation()

            # Listing Date #
            #
            # getListingDate() returns a datetime object, which is what gets
            #  passed on to the db insert below.
            listing_date = getListingDate()

            # Auction and Shared Ownership tags, which will be useful to, for instance,
            # (not) consider their prices, due to bias #
            is_auction, is_shared_ownership = getHasPriceInfluences()

            print(f"\nID: {home_id}")
            print(f"\nPrice: {price}")
            print(f"\nTitle: {title}")
            print(f"Location: {location}")
            print(f"\nBedroom Count: {bedroom_count}")
            print(f"Bathroom Count: {bathroom_count}")
            print(f"Living Room Count: {living_room_count}")
            print(f"Property Size (sq. ft): {property_size}")
            print(f"\nListing Date: {listing_date}")
            print(f"\nIs Auction: {is_auction}")
            print(f"Is Shared Ownership: {is_shared_ownership}")

            #
            # Go deeper into the house's page
            #
            # Get html of the houses' page #
            soup_homedetails = getSoupHomeDetails(url_homedetails_portion, chrome_options)

            # Get json info # (agent, postal c., phone, coords, photos' urls)
            real_estate_agent_name, postal_code, real_estate_agent_telephone, coordinate_x, coordinate_y, photos = getJsonInfo()
            print(f"\nReal Estate Agent Name: {real_estate_agent_name}")
            print(f"Postal code: {postal_code}")
            print(f"Real Estate Agent Telephone: {real_estate_agent_telephone}")
            print(f"Coordinates: {coordinate_x}, {coordinate_y}")

            # EPC Rating #
            epc_rating = getEPCRating()
            print(f"EPC Rating: {epc_rating}")

            # Extras #
            # (Tenure, Time Remaining On Lease, Service Charge,
            #  Council Tax Band, Ground Rent)
            tenure, time_remaining_on_lease_years, annual_service_charge, council_tax_band, ground_rent = getExtras()
            #p#print(f"\nTenure: {tenure}")
            #p#print(f"Time remaining on lease (years): {time_remaining_on_lease_years}")
            #p#print(f"Service charge (p/year): {annual_service_charge}")
            #p#print(f"Council tax band: {council_tax_band}")
            #p#print(f"Ground rent (£): {ground_rent}")

            # Feature Set #
            feature_set = getFeatureSet()
            #p#print(f"\nFeature Set: {feature_set}")

            # Description #
            description = getDescription()
            #p#print(f"\nDescription: {description}")

            # Insert data in the db #
            db.insert(home_id, price, is_auction, is_shared_ownership,
                      title, location, coordinate_x, coordinate_y, postal_code,
                      bedroom_count, bathroom_count, living_room_count,
                      property_size, epc_rating, feature_set, listing_date,
                      photos, description, tenure, time_remaining_on_lease_years,
                      annual_service_charge, council_tax_band, ground_rent,
                      real_estate_agent_name, real_estate_agent_telephone)

    print(f"\n End of the listing pages has been reached!")
finally:
    db.disconnect()