# Pipeline Functions

This script defines the functions that perform the actual data retrieval.

## Preamble

Import the necessary libraries.

In [4]:
import numpy as np
import pandas as pd
import requests
import re
import random
import time
from datetime import datetime
from bs4 import BeautifulSoup
from nordvpn_switcher import initialize_VPN, rotate_VPN

# Webscraping Functions

## Parameter Explanation

Function to scrape data from a website in a loop with error handling and logging.

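Parameters:
- `user`: str, the username or identifier of the user performing the scraping.
- `sleep_interval`: tuple `(min, max)`, range in seconds from which a random sleep time is drawn between requests to avoid being blocked by the website.
- `attribute_exception_list`: list, a list of attributes to exclude when scraping car information.
- `instructions_vpn`: str, instructions for rotating the VPN connection.
- `in_out_path`: str, path of the CSV file from which existing results are loaded and to which results are saved.
- `n_tries`: int, number of attempts to scrape data.
- `max_pages`: int, maximum number of pages to scrape for each car model.
- `do_backup`: bool, if `True` an additional timestamped backup CSV is written to `scraped_data/{user}/backups/`.
- `adage`: int, value of the `adage` query parameter of the search URL; only used when `use_recency` is `True`.
- `use_recency`: bool, if `True` the `adage` query parameter is added to the search URL.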

<br>
Returns:

- `result_df`: pandas DataFrame, scraped data.

## Base Function

This is the first iteration of the data-retrieving loop. More information on how the loop operates can be found in the comments inside the loop.

In [3]:
# Function to perform web scraping (base version of the loop)
def _save_loop_state(logging_df, result_df, user, in_out_path):
    """Write the logging and result frames to disk once both are loaded.

    Returns True when both files were written and False when the save was
    skipped because at least one of the frames is still ``None``.
    """
    if logging_df is None or result_df is None:
        print("Nothing loaded yet, skipping save.")
        return False
    logging_df.to_csv(f"logging/logging_data/logging_df_{user}.csv", index=False)
    result_df.to_csv(in_out_path, index=False)
    return True


def webscrape_loop(user, sleep_interval, attribute_exception_list,
                             instructions_vpn, in_out_path,
                             n_tries = 10, max_pages = 20, do_backup = False,
                             adage = 7, use_recency = False):
    """Scrape car offers for every brand/model row assigned to ``user``.

    For each attempt the existing results (``in_out_path``) and the user's
    logging file are loaded, every pending year of every row is scraped page
    by page, and progress is written back after each year. If an attempt
    fails, the data is saved, the VPN is rotated and the next attempt starts.

    Parameters:
        user: identifier used to select rows of the logging file and to
            build the logging/backup file names.
        sleep_interval: ``(min, max)`` bounds passed to ``random.uniform``
            to draw the pause (in seconds) between requests.
        attribute_exception_list: ``<dt>`` labels that are not stored.
        instructions_vpn: object handed to ``rotate_VPN``.
        in_out_path: CSV path the results are loaded from and saved to.
        n_tries: maximum number of attempts.
        max_pages: maximum number of result pages read per year.
        do_backup: if True, write a timestamped backup after each model.
        adage: value of the ``adage`` query parameter (only with
            ``use_recency``).
        use_recency: if True, add the ``adage`` query parameter to the URL.

    Returns:
        The result DataFrame after a complete run; ``None`` if the run was
        interrupted or all attempts failed.
    """
    logging_path = f"logging/logging_data/logging_df_{user}.csv"

    # Bound up front so that every handler below can tell "not loaded yet"
    # apart from real data instead of failing with UnboundLocalError.
    logging_df = None
    result_df = None

    try:

        for _ in range(n_tries):

            try:
                # Load previously scraped offers (if any) and remember their URLs
                try:
                    result_df = pd.read_csv(in_out_path, low_memory=False)
                    all_used_urls = set(result_df["url"])
                    print("Loaded", len(result_df), "entries.")
                    print("")
                except FileNotFoundError:
                    result_df = pd.DataFrame()
                    all_used_urls = set()

                n_duplicates = 0        # duplicates within the current year
                n_duplicates_total = 0  # duplicates within the current attempt
                logging_df = pd.read_csv(logging_path)
                logging_df_short = logging_df[logging_df["user"] == user].copy()

                # Looping through the logging dataframe
                for _, row in logging_df_short.iterrows():
                    curr_brand = str(row["brand"])
                    curr_model = str(row["model"])

                    # NOTE(review): hard-coded cut-off kept from the original;
                    # rows whose curr_year is 2025 or later are skipped entirely.
                    if int(row["curr_year"]) >= 2025:
                        continue

                    rotate_VPN(instructions_vpn)

                    # All updates below address the logging row of this brand/model
                    row_mask = ((logging_df["user"] == user) &
                                (logging_df["brand"] == curr_brand) &
                                (logging_df["model"] == curr_model))

                    # Iterating over years and pages
                    for curr_year in range(int(row["curr_year"]), int(row["end_year"]) + 1):

                        n_duplicates = 0  # reset number of duplicates for the upcoming scraped year
                        no_of_offers = 0  # stays 0 if no page is read at all (max_pages == 0)

                        for curr_page in range(1, max_pages + 1):

                            print(f"Current Brand: {curr_brand}\nCurrent Model: {curr_model}\nCurr Year: {curr_year}\nCurr Page: {curr_page}")
                            print("Current Time:", datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
                            print(f"Reading Page {curr_page}.")

                            # The two URL variants only differ in the leading adage parameter
                            recency_part = f"adage={adage}&" if use_recency else ""
                            model_url = f"https://www.autoscout24.de/lst/{curr_brand}/{curr_model}/re_{curr_year}?{recency_part}atype=C&cy=D&damaged_listing=exclude&desc=0&ocs_listing=include&page={curr_page}&powertype=kw&search_id=ehnjs4dnm6&sort=standard&source=listpage_pagination"

                            response = requests.get(model_url)
                            doc = BeautifulSoup(response.text, "html.parser")

                            # Extracting number of offers (a missing header raises and
                            # is handled by the attempt-level handler below)
                            no_of_offers = int(doc.find('div', class_='ListHeader_title_with_sort__Pf4Zw').find('span').text.strip().split()[0].replace(".", ""))
                            print("Number of total offers:", no_of_offers)

                            # Handling no offers case
                            if no_of_offers == 0:
                                time.sleep(random.uniform(*sleep_interval))
                                print("No offers for this year!")
                                break

                            time.sleep(random.uniform(*sleep_interval))

                            # Extracting offer URLs (no leasing or recommendation links)
                            offer_list = []
                            for link in doc.find_all("a"):
                                href = link.get("href")
                                href_text = str(href)
                                if ('/angebote/' in href_text
                                        and '/leasing/' not in href_text
                                        and '/recommendation/' not in href_text):
                                    offer_list.append(href)

                            print(f"Accessing a total of {len(offer_list)} offers on this page!")

                            # Handling last page condition
                            if len(offer_list) == 0:
                                print("Last page reached!")
                                break

                            print("Estimated total sleeping time until next page:", len(offer_list) * np.mean(sleep_interval), "seconds.")

                            # Looping through offer URLs
                            for item in offer_list:
                                try:
                                    curr_item_url = "https://www.autoscout24.de" + item
                                    if curr_item_url in all_used_urls:
                                        n_duplicates += 1
                                        n_duplicates_total += 1
                                        print("Duplicate found:", curr_item_url)
                                        continue

                                    time.sleep(random.uniform(*sleep_interval))

                                    offer_response = requests.get(curr_item_url)
                                    offer_doc = BeautifulSoup(offer_response.text, "html.parser")

                                    # <dt>/<dd> elements are paired by position
                                    curr_car_dict = {}
                                    for key, value in zip(offer_doc.find_all("dt"), offer_doc.find_all("dd")):
                                        if key.text not in attribute_exception_list:
                                            curr_car_dict[key.text.replace("\n", "")] = value.text.replace("\n", "")

                                    curr_car_dict["url"] = curr_item_url
                                    curr_car_dict["date"] = datetime.now().strftime("%Y-%m-%d")
                                    curr_car_dict["time"] = datetime.now().strftime("%H-%M-%S")
                                    curr_car_dict["model"] = offer_doc.find("span", class_="StageTitle_model__EbfjC StageTitle_boldClassifiedInfo__sQb0l").get_text()
                                    curr_car_dict["brand"] = offer_doc.find("span", class_="StageTitle_boldClassifiedInfo__sQb0l").get_text()

                                    curr_df = pd.DataFrame.from_dict(curr_car_dict, orient="index").T

                                    # If the attribute table had no price, take it from the
                                    # price box and put it into the first column
                                    if 'Barzahlungspreis' not in curr_df.columns:
                                        price_text = offer_doc.find('div', class_='PriceInfo_wrapper__hreB_').find('span', class_='PriceInfo_price__XU0aF').text.strip()
                                        curr_df.insert(0, 'Barzahlungspreis', re.split(r'(?<=-)', price_text)[0])

                                    result_df = pd.concat([result_df, curr_df])
                                    all_used_urls.add(curr_item_url)

                                except Exception as e:
                                    print("Error occurred in accessing car url:", str(e))
                                    print("")

                                    print("CURRENTLY SAVING ALL, DONT STOP!")
                                    _save_loop_state(logging_df, result_df, user, in_out_path)
                                    print("Saved Result and Logging DF and rotating VPN!")
                                    rotate_VPN(instructions_vpn)

                                    # Skip the remaining offers of this page
                                    break

                            if len(offer_list) <= 5:
                                print("Last page reached!")
                                break

                            print("")

                        # Record progress: next run continues with the following year
                        logging_df.loc[row_mask, "curr_year"] = curr_year + 1
                        logging_df.loc[row_mask, "last_scraped"] = datetime.now().strftime("%Y-%m-%d %H %M")

                        print("")

                        try:
                            print("CURRENTLY SAVING LOGGING_DF, DONT STOP!")
                            logging_df.to_csv(logging_path, index=False)
                            print("SAVING DONE!")

                        except KeyboardInterrupt:
                            # Interrupted mid-write: write both files again in full
                            print("")
                            print("SAVING ALL DUE TO KEYBOARD INTERRUPT!")
                            _save_loop_state(logging_df, result_df, user, in_out_path)
                            print("SAVING DONE!")

                        print("")
                        print(f"Amount of offers for this year: {no_of_offers}\nAmount of Duplicatesfor this year: {n_duplicates}")
                        print("")

                        if no_of_offers - n_duplicates > 0:
                            print("")

                            print("CURRENTLY SAVING RESULTS, DONT STOP!")
                            result_df.to_csv(in_out_path, index=False)
                            print("SAVING DONE!")

                        print("")
                        print("########################################")
                        print("")

                        print("")
                        print("Current amount of entries in the Result Dataframe:", len(result_df))

                    # Optional backup after all years of this model are done
                    try:
                        if do_backup:
                            formatted_date_hour = datetime.now().strftime("%Y-%m-%d %H %M")
                            result_df.to_csv(f"scraped_data/{user}/backups/{user}_data_{formatted_date_hour}.csv")
                            print(f"Saved Dataframe successfully after finishing model {curr_model}!")

                    except Exception as e2:
                        print("Exception while saving occurred:", e2)

                print("Duplicates found during Scraping:", n_duplicates_total)
                return result_df

            except PermissionError as e:

                print("Main Loop Permission Error:", e)
                sleeping_time_after_permission_error = 180

                print("\n")
                print(f"Sleeping for {sleeping_time_after_permission_error} seconds, to wait for cloud update!")
                time.sleep(sleeping_time_after_permission_error)
                print("\n")

                print("CURRENTLY SAVING ALL, DONT STOP!")
                _save_loop_state(logging_df, result_df, user, in_out_path)
                print("SAVING DONE!")
                print("Saved Result and Logging DF and rotating VPN!")
                print("\n")
                rotate_VPN(instructions_vpn)

            except Exception as e:

                print("Main Loop Exception:", e)
                print("")

                print("CURRENTLY SAVING ALL, DONT STOP!")
                _save_loop_state(logging_df, result_df, user, in_out_path)
                print("SAVING DONE!")
                print("Saved Result and Logging DF and rotating VPN!")
                rotate_VPN(instructions_vpn)

    except KeyboardInterrupt:

        print("")
        print("SAVING ALL DUE TO KEYBOARD INTERRUPT!")
        _save_loop_state(logging_df, result_df, user, in_out_path)
        print("SAVING DONE!")

    except PermissionError:

        print("")
        print("SAVING ALL DUE PERMISSION ERROR!")
        _save_loop_state(logging_df, result_df, user, in_out_path)
        print("SAVING DONE!")

    except Exception as e:

        print("")
        print("SAVING ALL DUE TO UNSEEN EXCEPTION!")
        print("Exception:", e)
        _save_loop_state(logging_df, result_df, user, in_out_path)
        print("SAVING DONE!")

## Optimized Function

### Base Functions

In [None]:
def save_data(logging_df, result_df, user, in_out_path):
    """Write the logging frame and the result frame to their CSV files.

    The logging frame goes to ``logging/logging_data/logging_df_{user}.csv``
    and the result frame to ``in_out_path``; both are written without index.
    """
    targets = (
        (logging_df, f"logging/logging_data/logging_df_{user}.csv"),
        (result_df, in_out_path),
    )
    for frame, destination in targets:
        frame.to_csv(destination, index=False)
    # Confirmation followed by one empty line
    print("Data saved successfully!\n")

def scrape_offers(doc):
    """Return the absolute URLs of all offer links found in ``doc``.

    A link counts as an offer when its ``href`` contains ``/angebote/`` and
    neither ``/leasing/`` nor ``/recommendation/``.
    """
    base_url = "https://www.autoscout24.de"
    unwanted_parts = ('/leasing/', '/recommendation/')

    offer_urls = []
    for anchor in doc.find_all("a"):
        href = anchor.get("href")
        href_text = str(href)  # anchors without href yield "None" and are filtered out
        if '/angebote/' not in href_text:
            continue
        if any(part in href_text for part in unwanted_parts):
            continue
        offer_urls.append(base_url + href)
    return offer_urls

def extract_car_data(doc, url, attribute_exception_list=None):
    """Build the data record of one offer from its parsed HTML page.

    Parameters:
        doc: parsed offer page; must provide ``find_all`` and ``find``.
        url: URL of the offer, stored under the key ``"url"``.
        attribute_exception_list: ``<dt>`` labels (raw text) that are not
            stored. When omitted, a module-level variable of the same name is
            used if it exists (the behaviour the function relied on before);
            otherwise nothing is excluded.

    Returns:
        dict with one entry per kept ``<dt>``/``<dd>`` pair plus the keys
        ``url``, ``date``, ``time``, ``model`` and ``brand``.

    Raises:
        AttributeError: if the model or brand ``<span>`` is not found.
    """
    if attribute_exception_list is None:
        # Backward compatibility: previously the list was read from a global.
        attribute_exception_list = globals().get("attribute_exception_list", ())

    car_data = {}
    # <dt>/<dd> elements are paired by position
    for key, value in zip(doc.find_all("dt"), doc.find_all("dd")):
        if key.text not in attribute_exception_list:
            car_data[key.text.strip()] = value.text.strip()

    # One timestamp for both fields so date and time cannot straddle midnight
    now = datetime.now()
    car_data.update({
        "url": url,
        "date": now.strftime("%Y-%m-%d"),
        "time": now.strftime("%H-%M-%S"),
        "model": doc.find("span", class_="StageTitle_model__EbfjC").get_text(),
        "brand": doc.find("span", class_="StageTitle_boldClassifiedInfo__sQb0l").get_text()
    })
    return car_data

### Main Scraping Function

In [None]:
# Define a function to perform web scraping with optimization
def _save_if_loaded(logging_df, result_df, user, in_out_path):
    """Call ``save_data`` only when both frames have actually been loaded."""
    if logging_df is None or result_df is None:
        print("Nothing loaded yet, skipping save.")
        return
    save_data(logging_df, result_df, user, in_out_path)


def _extract_offer_row(doc, url, attribute_exception_list):
    """Build the record of one offer, honouring the given exclusion list."""
    car_data = {}
    # <dt>/<dd> elements are paired by position
    for key, value in zip(doc.find_all("dt"), doc.find_all("dd")):
        if key.text not in attribute_exception_list:
            car_data[key.text.strip()] = value.text.strip()

    now = datetime.now()
    car_data.update({
        "url": url,
        "date": now.strftime("%Y-%m-%d"),
        "time": now.strftime("%H-%M-%S"),
        "model": doc.find("span", class_="StageTitle_model__EbfjC").get_text(),
        "brand": doc.find("span", class_="StageTitle_boldClassifiedInfo__sQb0l").get_text()
    })

    # If the attribute table had no price, take it from the price box and
    # make it the first key
    if 'Barzahlungspreis' not in car_data:
        price_text = doc.find('div', class_='PriceInfo_wrapper__hreB_').find('span', class_='PriceInfo_price__XU0aF').text.strip()
        car_data = {'Barzahlungspreis': re.split(r'(?<=-)', price_text)[0], **car_data}

    return car_data


def webscrape_loop_optimized(user, sleep_interval, attribute_exception_list, instructions_vpn, in_out_path,
                   n_tries=10, max_pages = 20, do_backup = False, adage = 7, use_recency = False, print_duplicate_url = False):
    """Scrape car offers for every brand/model row assigned to ``user``.

    For each attempt the existing results (``in_out_path``) and the user's
    logging file are loaded, every pending year of every row is scraped page
    by page, and progress is written back after each year. If an attempt
    fails, the data is saved, the VPN is rotated and the next attempt starts.

    Parameters:
        user: identifier used to select rows of the logging file and to
            build the logging/backup file names.
        sleep_interval: ``(min, max)`` bounds passed to ``random.uniform``
            to draw the pause (in seconds) between requests.
        attribute_exception_list: ``<dt>`` labels that are not stored.
        instructions_vpn: object handed to ``rotate_VPN``.
        in_out_path: CSV path the results are loaded from and saved to.
        n_tries: maximum number of attempts.
        max_pages: maximum number of result pages read per year.
        do_backup: if True, write a timestamped backup after each year.
        adage: value of the ``adage`` query parameter (only with
            ``use_recency``).
        use_recency: if True, add the ``adage`` query parameter to the URL.
        print_duplicate_url: if True, print every already-known offer URL.

    Returns:
        The result DataFrame after a complete run; ``None`` if the run was
        interrupted or all attempts failed.
    """
    logging_path = f"logging/logging_data/logging_df_{user}.csv"

    # Bound up front so that every handler below can tell "not loaded yet"
    # apart from real data instead of failing with UnboundLocalError.
    logging_df = None
    result_df = None

    n_duplicates = 0
    try:
        for _ in range(n_tries):
            try:
                # Load existing data
                try:
                    result_df = pd.read_csv(in_out_path, low_memory=False)
                    all_used_urls = set(result_df["url"])
                    print(f"Loaded {len(result_df)} entries.\n")
                except FileNotFoundError:
                    result_df = pd.DataFrame()
                    all_used_urls = set()

                # Load logging data
                logging_df = pd.read_csv(logging_path)
                logging_df_short = logging_df[logging_df["user"] == user].copy()

                n_duplicates_total = 0  # duplicates over all years of this attempt

                for _, row in logging_df_short.iterrows():
                    curr_brand = str(row["brand"])
                    curr_model = str(row["model"])
                    first_year = int(row["curr_year"])
                    last_year = int(row["end_year"])

                    # Rotate VPN if necessary
                    if first_year < last_year:
                        rotate_VPN(instructions_vpn)

                    # All updates below address the logging row of this brand/model
                    row_mask = ((logging_df["user"] == user) &
                                (logging_df["brand"] == curr_brand) &
                                (logging_df["model"] == curr_model))

                    for curr_year in range(first_year, last_year + 1):
                        n_duplicates = 0  # Reset duplicate count for each year
                        no_of_offers = 0  # stays 0 if no page is read at all (max_pages == 0)

                        for curr_page in range(1, max_pages + 1):

                            print(f"\nBrand: {curr_brand}\nModel: {curr_model}\nYear: {curr_year}\nPage: {curr_page}\n")
                            print(f"Current Amount of total Entries: {len(result_df)}!")
                            print("Current Time:", datetime.now().strftime("%Y-%m-%d %H:%M:%S"))

                            # The two URL variants only differ in the leading adage parameter
                            recency_part = f"adage={adage}&" if use_recency else ""
                            model_url = f"https://www.autoscout24.de/lst/{curr_brand}/{curr_model}/re_{curr_year}?{recency_part}atype=C&cy=D&damaged_listing=exclude&desc=0&ocs_listing=include&page={curr_page}&powertype=kw&search_id=ehnjs4dnm6&sort=standard&source=listpage_pagination"

                            response = requests.get(model_url)
                            doc = BeautifulSoup(response.text, "html.parser")

                            # Extract number of offers
                            try:
                                no_of_offers = int(doc.find('div', class_='ListHeader_title_with_sort__Pf4Zw').find('span').text.strip().split()[0].replace(".", ""))
                                print("Number of total offers:", no_of_offers)
                            except AttributeError:
                                no_of_offers = 0
                                print("Failed to extract number of offers.")

                            # If no offers, break the loop
                            if no_of_offers == 0:
                                time.sleep(random.uniform(*sleep_interval))
                                print("No offers for this year, continuing with next year!")
                                break

                            time.sleep(random.uniform(*sleep_interval))

                            # Scrape offer URLs
                            offer_list = scrape_offers(doc)
                            print(f"Found {len(offer_list)} offers on this page.")
                            print("")

                            if len(offer_list) == 0:
                                print("Last page reached!")
                                print("")
                                break

                            # Process each offer
                            for item in offer_list:
                                try:
                                    if item in all_used_urls:
                                        n_duplicates += 1
                                        n_duplicates_total += 1
                                        if print_duplicate_url:
                                            print("Duplicate found:", item)
                                        continue

                                    time.sleep(random.uniform(*sleep_interval))
                                    offer_response = requests.get(item)
                                    offer_doc = BeautifulSoup(offer_response.text, "html.parser")

                                    # Extract car data (using the exclusion list passed
                                    # to this function) and append it to the results
                                    car_data = _extract_offer_row(offer_doc, item, attribute_exception_list)
                                    result_df = pd.concat([result_df, pd.DataFrame([car_data])], ignore_index=True)
                                    all_used_urls.add(item)

                                except Exception as e:
                                    print("Error accessing car URL:", e)
                                    _save_if_loaded(logging_df, result_df, user, in_out_path)
                                    rotate_VPN(instructions_vpn)
                                    # Skip the remaining offers of this page
                                    break

                            if len(offer_list) <= 5:
                                print("Last page reached!")
                                break

                        # Update logging data: next run continues with the following year
                        logging_df.loc[row_mask, "curr_year"] = curr_year + 1
                        logging_df.loc[row_mask, "last_scraped"] = datetime.now().strftime("%Y-%m-%d %H %M")

                        print(f"Offers for {curr_year}: {no_of_offers}, Duplicates: {n_duplicates}")

                        if no_of_offers - n_duplicates > 0:

                            try:
                                print("")
                                print("CURRENTLY SAVING RESULTS, DONT STOP!")
                                result_df.to_csv(in_out_path, index=False)
                                print("SAVING DONE!")

                            except KeyboardInterrupt:
                                # Interrupted mid-write: write both files again in full
                                print("Interrupted by user. Saving data...")
                                _save_if_loaded(logging_df, result_df, user, in_out_path)

                        # Save progress
                        try:
                            print("")
                            print("CURRENTLY SAVING LOGGING_DF, DONT STOP!")
                            logging_df.to_csv(logging_path, index=False)
                            print("SAVING DONE!")
                            print("")

                        except KeyboardInterrupt:
                            # Interrupted mid-write: write both files again in full
                            print("Interrupted by user. Saving data...")
                            _save_if_loaded(logging_df, result_df, user, in_out_path)

                        if do_backup:
                            backup_path = f"scraped_data/{user}/backups/{user}_data_{datetime.now().strftime('%Y-%m-%d %H-%M')}.csv"
                            result_df.to_csv(backup_path)
                            print(f"Backup saved to {backup_path}")

                        print("##################################################################")
                        print("")

                print("Scraping completed. Duplicates found:", n_duplicates_total)
                return result_df

            except PermissionError as e:
                print("Permission Error:", e)
                print("Waiting for cloud update...")
                time.sleep(180)
                _save_if_loaded(logging_df, result_df, user, in_out_path)
                rotate_VPN(instructions_vpn)

            except Exception as e:
                print("Exception:", e)
                _save_if_loaded(logging_df, result_df, user, in_out_path)
                rotate_VPN(instructions_vpn)

    except KeyboardInterrupt:
        print("")
        print("Interrupted by user. Saving data...")
        _save_if_loaded(logging_df, result_df, user, in_out_path)

    except PermissionError:
        print("")
        print("Permission Error. Saving data...")
        _save_if_loaded(logging_df, result_df, user, in_out_path)

    except Exception as e:
        print("")
        print("Unexpected Exception:", e)
        _save_if_loaded(logging_df, result_df, user, in_out_path)

# Concat Scraped Data

In [None]:
def get_scraped_data(user_list, project_location):
    """Load the scraped CSV of every user and stack them into one DataFrame.

    Parameters:
        user_list: iterable of user names; for each one the file
            ``<project_location>/scraped_data/<user>/<user>_data.csv`` is read.
        project_location: root directory of the project (with or without a
            trailing path separator).

    Returns:
        DataFrame with the rows of all files that were found, in the order of
        ``user_list`` and with each file's original index; an empty DataFrame
        when no file was found.
    """
    import os  # local import: not part of the file's import cell

    loaded_frames = []

    for user in user_list:
        # os.path.join keeps this working on every platform (the previous
        # hard-coded backslashes only formed a valid path on Windows)
        input_path = os.path.join(project_location, "scraped_data", user, f"{user}_data.csv")

        try:
            user_df = pd.read_csv(input_path, low_memory = False)
        except FileNotFoundError:
            print("No data file found for", user, "at", input_path)
            continue

        print("Data from", user, "succesfully loaded with", len(user_df), "entries!")
        loaded_frames.append(user_df)

    # Concatenate once; pd.concat rejects an empty list, hence the guard
    scraped_data = pd.concat(loaded_frames) if loaded_frames else pd.DataFrame()

    print("")
    print("Concatenated DF created with", len(scraped_data), "entries!")

    return scraped_data

In [None]:
def drop_unnamed_columns(df):
    """Return a copy of ``df`` without the columns whose name contains "Unnamed".

    Only string column labels are inspected, so frames with non-string
    labels (e.g. integers) no longer raise a ``TypeError``. The input frame
    is left unchanged.
    """
    unnamed_columns = [col for col in df.columns
                       if isinstance(col, str) and 'Unnamed' in col]

    print(f"Dropping {len(unnamed_columns)} columns:\n{unnamed_columns}")

    # drop() already returns a new DataFrame, so no explicit copy is needed
    return df.drop(columns=unnamed_columns, errors = 'ignore')