# Automatic Scraping with GPT and Google Search

This notebook shows an example of how to perform automatic scraping with Google Search and GPT.

In this notebook we scrape data for publishing companies in Switzerland, but the approach can easily be extended to other use cases.

In [None]:
import json
from urllib.parse import urlparse
from urllib.robotparser import RobotFileParser

import pandas as pd
import requests
from bs4 import BeautifulSoup
from googlesearch import search
from openai import BadRequestError, OpenAI
from requests.exceptions import ConnectionError


## Input Parameters

- `google_query`: The query used to get a list of websites from Google Search. The resulting websites are used to identify a list of `target` items.
- `target`: A short description of the items we are looking for.
- `fields`: A list of fields which we would like to fill for each item.

In [None]:
# You can adapt the inputs to your needs.
# Note: you might need to add exception handling if you encounter an error.

# Query sent to Google; the returned websites are searched for `target` items.
google_query = "Switzerland list of book publishing companies"

# Short description of the items we are looking for.
target = "publishing companies"

# Fields we would like to fill for each item.
fields = ["genres", "year founded", "location", "official website", "short description"]

## Helper functions

Let's define some helper functions to use later on.

In [None]:
def can_scrape(url):
    """
    Check whether the site's robots.txt allows fetching ``url``.

    The ``/robots.txt`` file of the URL's host is downloaded and evaluated
    for the generic ``"*"`` user agent.

    Args:
        url: Absolute URL of the page we intend to download.

    Returns:
        True if the rules in robots.txt allow the fetch. False if they
        forbid it, if ``url`` is not an absolute http(s) URL, or if
        robots.txt could not be retrieved or decoded.
    """
    parsed_url = urlparse(url)
    # Without an http(s) scheme and a host there is no robots.txt to query.
    if parsed_url.scheme not in ("http", "https") or not parsed_url.netloc:
        return False

    robots_url = f"{parsed_url.scheme}://{parsed_url.netloc}/robots.txt"
    rp = RobotFileParser(robots_url)
    try:
        rp.read()
    except (OSError, ValueError):
        # Network failure or undecodable robots.txt: err on the side of not scraping.
        return False
    return rp.can_fetch("*", url)


def fetch_html_content(search_results, timeout=10):
    """
    Get html for websites not forbidding scraping.

    Args:
        search_results: Iterable of website URLs.
        timeout: Seconds to wait for each HTTP request before giving up.

    Returns:
        Dict mapping each URL that answered with HTTP 200 to its HTML text.
        URLs that are disallowed by robots.txt, that fail to download, or
        that answer with another status code are reported on stdout and
        left out of the result.
    """
    html_content = {}
    for result in search_results:
        print(f"Parsing {result}")

        # Check if web scraping is allowed by robots.txt
        if not can_scrape(result):
            print("Web scraping is not allowed by robots.txt.")
            continue

        # Send an HTTP GET request to the website
        try:
            response = requests.get(result, timeout=timeout)
        except requests.exceptions.RequestException:
            # Skip this URL: falling through would read an unbound `response`
            # or the stale one left over from the previous iteration.
            print("Failed to retrieve the webpage.")
            continue

        # Check if the request was successful
        if response.status_code == 200:
            html_content[result] = response.text
        else:
            print("Failed to retrieve the webpage.")
    return html_content


def extract_html_text(html_content):
    """
    Turn raw HTML into plain text with Beautiful Soup.

    Takes a dict mapping website -> HTML and returns a dict mapping the
    same websites to the output of ``get_text()`` on the parsed document.
    """
    return {
        url: BeautifulSoup(markup, "html.parser").get_text()
        for url, markup in html_content.items()
    }


def search_fetch_text(google_query, num_results=3):
    """
    Google ``google_query`` and return the text of the result pages.

    At most ``num_results`` search results are requested. The returned dict
    maps website -> text and only contains the pages that
    ``fetch_html_content`` managed to download.
    """
    urls = list(search(google_query, num_results=num_results))
    return extract_html_text(fetch_html_content(urls))


def get_prompt_list_targets(target, website, website_text):
    """Build the user prompt asking for a json list of `target` items found in a website's text."""
    sentences = [
        f"Extract a list of {target} from the text content of {website}.",
        f"Use `{target}` as key in the json.",
        f"If the content is not about {target}, return an empty list.",
        f"Here's the text content extracted from html: ```{website_text}```",
    ]
    # The sentences are separated by exactly one space.
    return " ".join(sentences)


def get_prompt_parsing_assistant():
    """Return the system prompt instructing the model to answer in json."""
    system_prompt = "You are a parsing assistant, returning answers in json format."
    return system_prompt


def get_prompt_extract_fields(fields, website, website_text):
    """
    Build the user prompt asking the model to fill `fields` from a website's text.

    Args:
        fields: Field names to extract; interpolated into the prompt as-is.
        website: URL of the page, mentioned in the prompt.
        website_text: Text content of the page, appended inside a ``` fence.

    Returns:
        The prompt as a single string.
    """
    return (
        f"Extract the following fields from the text content of {website}: {fields}. "
        # Trailing space keeps this sentence from fusing with the next one.
        "Use the fields as key in the output json. If a field is not available, return null for that field. "
        f"Here's the text content extracted from html: ```{website_text}```"
    )


def run_gpt(prompt, client, model="gpt-3.5-turbo"):
    """
    Send `prompt` to a chat model together with the parsing-assistant system prompt.

    Args:
        prompt: Content of the user message.
        client: Object exposing ``chat.completions.create`` (the OpenAI client).
        model: Name of the chat model to call.

    Returns:
        Whatever ``client.chat.completions.create`` returns.
    """
    return client.chat.completions.create(
        model=model,
        messages=[
            {"role": "system", "content": get_prompt_parsing_assistant()},
            {"role": "user", "content": prompt},
        ],
    )


def completion_to_json(completion):
    """
    Convert the first choice of a completion to a Python object.

    The message content is parsed as json. A reply wrapped in a Markdown
    code fence (``` or ```json) is unwrapped before parsing.

    Args:
        completion: Object whose ``choices[0].message.content`` holds the reply text.

    Returns:
        The de-serialized json value.

    Raises:
        ValueError: If the message has no content, or the content is not
            valid json (``json.JSONDecodeError`` is a ``ValueError``).
    """
    content = completion.choices[0].message.content
    if content is None:
        raise ValueError("Completion message has no text content to parse as json.")

    text = content.strip()
    if len(text) >= 6 and text.startswith("```") and text.endswith("```"):
        text = text[3:-3]
        # Drop a language tag such as "json" sitting on the opening fence line.
        first_line, newline, remainder = text.partition("\n")
        if newline and first_line.strip().lower() == "json":
            text = remainder
    return json.loads(text)


## Scraping

We perform the following steps:
1. Fetch a website using the google query
2. Create a list of `target` using GPT on the website above
3. For each target, use the google search to fetch a corresponding website.
4. Use GPT to look for `fields` in the website content.

In [None]:
# Let's instantiate an OpenAI client.
# No key is passed here, so it has to be configured beforehand.
# Instructions to set up the OpenAI key are available at: https://platform.openai.com/docs/quickstart/step-2-setup-your-api-key
client = OpenAI()

In [None]:
# Fetch websites and their content using google_query (3 search results requested).
# The result maps each scraped website to its extracted text.
html_text = search_fetch_text(google_query, 3)

In [None]:
# Let's use only the first result for this example.
# Stop with an explicit message when no page could be scraped,
# instead of a bare StopIteration coming out of next().
if not html_text:
    raise RuntimeError(
        f"No scrapable website found for query {google_query!r}; "
        "try another query or request more results."
    )
website = next(iter(html_text))
website_text = html_text[website]

In [None]:
%%time
# Fetch a list of targets
# GPT is asked to list every `target` found in the text of the selected website.
# The prompt requests `target` as the json key, and later cells read json_list[target].
prompt = get_prompt_list_targets(target,website,website_text)
completion = run_gpt(prompt,client)
json_list = completion_to_json(completion)

The following cell should output something like:
```
{'publishing companies': ['Betty Bossi',
  'BirkhäuserD',
  'Diogenes VerlagE',
  'Editions Librisme',
  'Edizioni Casagrande',
  'JRP-Ringier',
  'Manesse Verlag',
  'NZZ Mediengruppe',
  'Orell Füssli',
  'RCL Benziger',
  'Schwabe (publisher)',
  'Skira (publisher)',
  'Société typographique de Neuchâtel',
  'Stämpfli (publisher)']}
```

In [None]:
# Display the parsed list of targets returned by GPT.
json_list

In [None]:
%%time
# For each target in the list, fetch fields

json_data = {}
for target_name in json_list[target]:
    print(f"*** Processing {target_name} ***")

    # Fetch websites and their content
    try:
        html_text = search_fetch_text(f"{target} {target_name}", 3)
    except OSError:
        print("Fetching websites content failed.")
        continue
        
    if len(html_text) == 0:
        print(f"Skipping {target_name}, no websites.")
        continue
        
    # Let's use only the first website available
    website = next(iter(html_text))
    website_text = html_text[website]
    
    # Fetch fields
    prompt = get_prompt_extract_fields(fields,website,website_text)
    try:
        completion = run_gpt(prompt,client)
    except BadRequestError:
        print("Bad request from openai")
        continue
    output = completion_to_json(completion)

    # Store fields
    output["source"] = website
    json_data[target_name] = output

    print("output:", output)

The following cell should output something like:
```
{'Betty Bossi': {'genres': None,
  'year founded': None,
  'location': None,
  'official website': None,
  'short description': 'Swiss cookbook publisher',
  'source': 'https://en.wikipedia.org/wiki/Betty_Bossi'},
 'Diogenes VerlagE': {'genres': None,
  'year founded': '1952',
  'location': 'Zurich',
  'official website': 'https://www.diogenes.ch/',
  'short description': "Diogenes is the largest independent fiction publisher in Europe, with international bestselling authors and a comprehensive collection of classics, art and cartoon volumes, and children's books.",
  'source': 'https://www.diogenes.ch/foreign-rights/about.html'},
 'Editions Librisme': {'genres': None,
  'year founded': '2005',
  'location': None,
  'official website': 'editions-librisme.com',
  'short description': 'publishing association',
  'source': 'https://en.wikipedia.org/wiki/Editions_Librisme'},
 'Edizioni CasagrandeJ': {'genres': None,
  'year founded': '1949',
  'location': 'Bellinzona in Switzerland',
  'official website': 'http://www.edizionicasagrande.com/',
  'short description': 'Italian-language publisher, focused on the art and history of Italian Switzerland.',
  'source': 'https://en.wikipedia.org/wiki/Edizioni_Casagrande'},
 'JRP-RingierM': {'genres': 'Contemporary art magazines',
  'year founded': '2004',
  'location': 'Zurich',
  'official website': 'jrp-editions.com',
  'short description': 'Swiss publisher of high-quality books on contemporary art',
  'source': 'https://en.wikipedia.org/wiki/JRP-Ringier'},
 'NZZ MediengruppeO': {'genres': 'Media',
  'year founded': '1780',
  'location': 'Zurich',
  'official website': 'www.nzzmediengruppe.ch',
  'short description': 'Swiss media company',
  'source': 'https://en.wikipedia.org/wiki/NZZ_Mediengruppe'},
 'RCL BenzigerS': {'genres': None,
  'year founded': '1792',
  'location': 'Cincinnati, Ohio',
  'official website': 'www.rclbenziger.com',
  'short description': 'Catholic publishing house',
  'source': 'https://en.wikipedia.org/wiki/RCL_Benziger'},
 'Schwabe (publisher)': {'genres': None,
  'year founded': '1488',
  'location': 'Basel',
  'official website': 'www.schwabe.ch',
  'short description': 'Swiss printer and publisher',
  'source': 'https://en.wikipedia.org/wiki/Schwabe_(publisher)'},
 'Skira (publisher)': {'genres': None,
  'year founded': None,
  'location': None,
  'official website': None,
  'short description': 'A boundless brand',
  'source': 'https://www.skira.net/en/skira-around-the-world/'},
 'Stämpfli (publisher)': {'genres': None,
  'year founded': '1599',
  'location': 'Wölflistrasse 1, 3001 Bern, Switzerland',
  'official website': 'www.staempfli.com/en',
  'short description': 'Swiss printing and publishing house',
  'source': 'https://en.wikipedia.org/wiki/St%C3%A4mpfli_(publisher)'}}
```

In [None]:
# Display the fields collected for each target.
json_data

In [None]:
# We can e.g. convert the output to a dataframe and store it for later use.
# After the transpose there is one row per target; the target name is moved
# from the index into a column named after `target`.
data = (
    pd.DataFrame(json_data)
    .transpose()
    .reset_index()
    .rename(columns={"index": target})
)
# The csv file name is derived from `target`, with spaces replaced by underscores.
data.to_csv(f"scraped_{target.replace(' ','_')}.csv", index=False)
data