Crawler implementation (#8)
* Crawler implementation

* Fixing lint

* Storing variables
brahle committed Sep 8, 2023
1 parent ec2b42c commit 7fd6d20
Showing 13 changed files with 567 additions and 6 deletions.
2 changes: 2 additions & 0 deletions .gitignore
@@ -130,3 +130,5 @@ dmypy.json

# templates
.github/templates/*

notebooks
2 changes: 1 addition & 1 deletion Makefile
@@ -32,7 +32,7 @@ fmt: ## Format code using black & isort.

.PHONY: lint
lint: ## Run pep8, black, mypy linters.
$(ENV_PREFIX)flake8 --max-line-length 120 --ignore=E203 brds/
$(ENV_PREFIX)flake8 --max-line-length 120 --ignore=E203,W503 brds/
$(ENV_PREFIX)black -l 119 --check brds/
$(ENV_PREFIX)black -l 119 --check tests/
$(ENV_PREFIX)mypy --ignore-missing-imports brds/
Empty file added brds/core/crawler/__init__.py
Empty file.
40 changes: 40 additions & 0 deletions brds/core/crawler/browser_emulator.py
@@ -0,0 +1,40 @@
from typing import Optional

from requests import Response, Session

from brds.core.crawler.domain_rate_limiter import DomainRateLimiter


class BrowserEmulator:
    def __init__(self: "BrowserEmulator", rate_limiter: Optional[DomainRateLimiter] = None) -> None:
if rate_limiter is None:
rate_limiter = DomainRateLimiter()
self.rate_limiter = rate_limiter
self.session = Session()
self.session.headers.update(
{
"User-Agent": self.user_agent(),
"Accept": self.accept_header(),
"Accept-Language": "en-US,en;q=0.5",
"DNT": "1",
"Connection": "keep-alive",
"Upgrade-Insecure-Requests": "1",
}
)

    def get(self: "BrowserEmulator", url: str, **kwargs) -> Response:
self.rate_limiter.limit(url)
return self.session.get(url, **kwargs)

    def post(self: "BrowserEmulator", url: str, data=None, json=None, **kwargs) -> Response:
self.rate_limiter.limit(url)
return self.session.post(url, data=data, json=json, **kwargs)

def accept_header(self: "BrowserEmulator") -> str:
return "text/html"

def user_agent(self: "BrowserEmulator") -> str:
return (
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/116.0.0.0 "
+ "Safari/537.36 Edg/116.0.1938.69"
)
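
For reference, a minimal usage sketch of the new BrowserEmulator; the URL and the two-second delay are illustrative, not part of the commit:

from brds.core.crawler.browser_emulator import BrowserEmulator
from brds.core.crawler.domain_rate_limiter import DomainRateLimiter

# Space requests to the same domain at least 2 seconds apart.
emulator = BrowserEmulator(rate_limiter=DomainRateLimiter(delay=2))
response = emulator.get("https://example.com/")  # waits if the domain was hit too recently
print(response.status_code, len(response.text))
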
73 changes: 73 additions & 0 deletions brds/core/crawler/config.py
@@ -0,0 +1,73 @@
import os
from copy import deepcopy
from typing import Any, Dict, Iterable, List, Tuple

import yaml


def find_md_files(directory: str) -> Iterable[Tuple[str, str]]:
    for dirpath, _dirnames, filenames in os.walk(directory):
        for file in filenames:
            if file.endswith(".md"):
                yield (file, os.path.join(dirpath, file))


def get_configs(file_name: str) -> Iterable[List[str]]:
config: List[str] = []
in_config = False

with open(file_name, "r") as file:
for line in file:
clean = line.strip()
if clean == "```yaml":
in_config = True
continue
if in_config:
if clean == "```":
in_config = False
yield config
config = []
continue
config.append(line)


def get_all_configs(root: str) -> Iterable[Dict[str, Any]]:
for file_name, full_path in find_md_files(root):
for config in get_configs(full_path):
try:
res = yaml.safe_load("".join(config))
except yaml.scanner.ScannerError as err:
raise RuntimeError(f"Error processing {full_path}") from err
res["_filepath"] = full_path
yield res


def remove_default_params(params: Dict[str, Any]) -> Dict[str, Any]:
ret = deepcopy(params)
for key in ["type"]:
if key in ret:
del ret[key]
return ret


class ConfigStore:
def __init__(self: "ConfigStore", root: str) -> None:
self.root = root
self.configs = list(get_all_configs(root))

self.name_index: Dict[str, Any] = {}
for config in self.configs:
assert config["name"] not in self.name_index, f"Duplicate config with name '{config['name']}'"
self.name_index[config["name"]] = config

self.type_index: Dict[str, Any] = {}
for config in self.configs:
if config["type"] not in self.type_index:
self.type_index[config["type"]] = []
self.type_index[config["type"]].append(config)

def __getitem__(self: "ConfigStore", name: str) -> Any:
return self.name_index[name]

def get_by_type(self: "ConfigStore", type: str) -> Any:
return self.type_index[type]
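
As a sketch of what ConfigStore consumes: get_configs scans markdown files under the root for fenced yaml blocks, so a file such as configs/hackernews.md (hypothetical path, names, and values) could contain:

```yaml
name: hackernews
type: website
base_url: https://news.ycombinator.com
```

Loading and looking up configs would then work roughly like this, assuming that root directory:

from brds.core.crawler.config import ConfigStore

store = ConfigStore("configs")          # walks configs/ for *.md files with yaml blocks
print(store["hackernews"]["base_url"])  # lookup by unique name
print(store.get_by_type("website"))     # all configs sharing a type
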
34 changes: 34 additions & 0 deletions brds/core/crawler/domain_rate_limiter.py
@@ -0,0 +1,34 @@
from collections import defaultdict
from time import sleep, time
from typing import Callable, Dict, Union
from urllib.parse import urlparse

Number = Union[int, float]
CallableOrNumber = Union[Number, Callable[[], Number]]


class DomainRateLimiter:
def __init__(self: "DomainRateLimiter", delay: CallableOrNumber = 5) -> None:
self.last_request_time: Dict[str, float] = defaultdict(float)
self._delay = delay

def get_domain(self: "DomainRateLimiter", url: str) -> str:
return urlparse(url).netloc

def wait_if_needed(self: "DomainRateLimiter", domain: str) -> None:
elapsed_time = time() - self.last_request_time[domain]
delay = self.delay
if elapsed_time < delay:
time_to_wait = delay - elapsed_time
sleep(time_to_wait)

def limit(self: "DomainRateLimiter", url: str) -> None:
domain = self.get_domain(url)
self.wait_if_needed(domain)
self.last_request_time[domain] = time()

@property
def delay(self: "DomainRateLimiter") -> Number:
if callable(self._delay):
return self._delay()
return self._delay
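
The delay can also be a callable, which allows jittered politeness delays; a small sketch with arbitrary values:

from random import uniform

from brds.core.crawler.domain_rate_limiter import DomainRateLimiter

# Each wait for a repeated domain is sampled from [3, 8) seconds.
limiter = DomainRateLimiter(delay=lambda: uniform(3, 8))
limiter.limit("https://example.com/a")  # first request to this domain: no wait
limiter.limit("https://example.com/b")  # sleeps until the sampled delay has passed
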
156 changes: 156 additions & 0 deletions brds/core/crawler/root_crawler.py
@@ -0,0 +1,156 @@
from copy import deepcopy
from itertools import product
from os.path import join
from typing import Any, Dict, Iterable, List, Tuple
from urllib.parse import urlparse

from brds.core.crawler.browser_emulator import BrowserEmulator
from brds.core.crawler.config import ConfigStore, remove_default_params
from brds.core.crawler.variables import VariableHolder
from brds.core.fs.writer import FileWriter
from brds.db.init_db import Database


class Crawler:
def __init__(
self: "Crawler",
configs: ConfigStore,
database: Database,
browser_emulator: BrowserEmulator,
file_writer: FileWriter,
name: str,
variables: List[str],
inputs: List[str],
urls: List[Dict[str, Any]],
loop_variables: List[str],
_filepath: str,
) -> None:
self.configs = configs
self.database = database
self.browser_emulator = browser_emulator
self.file_writer = file_writer

self.name = name
self.variables = variables
self.inputs = inputs
self.urls = urls
self.loop_variables = loop_variables
self._filepath = _filepath

def execute(self: "Crawler") -> None:
for input_variables in self.iterate_inputs():
vars = self.merge_variables(input_variables)
orig_vars = deepcopy(vars)
print("ORIG:", orig_vars.variables)
for loop_vars in self.iterate_loop_variables(orig_vars):
for key, value in zip(self.loop_variables, loop_vars):
if key not in ["name", "_filepath"]:
vars[key] = value
print("CALLED:", vars.variables)
self._process(vars)

def merge_variables(self: "Crawler", input_variables: Tuple[Dict[str, Any]]) -> VariableHolder:
variables = VariableHolder()
variables.extend(
{
"name": self.name,
"_filepath": self._filepath,
}
)
for input in input_variables:
variables.extend(remove_variable_parameters(input))
for variable in self.variables:
variables.extend(remove_variable_parameters(self.configs[variable]))
return variables

def iterate_inputs(self: "Crawler") -> Iterable[Tuple[Dict[str, Any]]]:
return product(*[self.configs.get_by_type(input) for input in self.inputs])

def iterate_loop_variables(self: "Crawler", variables: VariableHolder) -> Iterable[Tuple[str]]:
return product(*[variables[loop_variable] for loop_variable in self.loop_variables])

def _process(self: "Crawler", variables: VariableHolder) -> None:
self.process(variables)

def process(self: "Crawler", variables: VariableHolder) -> None:
raise NotImplementedError("You need to override this function")

def url(self: "Crawler", variables: VariableHolder) -> str:
return variables["url"] + self.urls[0]["url"].format(**variables.variables)


def remove_variable_parameters(params: Dict[str, Any]) -> Dict[str, Any]:
copy = deepcopy(params)
for key in ["name", "_filepath"]:
if key in copy:
del copy[key]
return copy


class RootCrawler(Crawler):
TYPE_NAME = "root-crawl"

def __init__(self: "RootCrawler", *args, **kwargs) -> None:
super(RootCrawler, self).__init__(*args, **kwargs)
self.templated_urls = [TemplatedUrl(database=self.database, **remove_default_params(url)) for url in self.urls]

def process(self: "RootCrawler", variables: VariableHolder) -> None:
for templated_url in self.templated_urls:
url = templated_url.resolve(variables)
url_id = self.database.get_url_id(url)
assert url_id is not None
self.database.set_vriables(url_id, variables.variables)
if self.should_load(url, templated_url.cache):
self.download(url)
else:
print(f"Will not download '{url}', as I've already downloaded it")

def should_load(self: "RootCrawler", url: str, cache: bool) -> bool:
if not cache:
return True
url_id = self.database.register_web_page(url)
last_crawl = self.database.latest_download(url_id)
return not last_crawl

def download(self: "RootCrawler", url: str) -> None:
url_id = self.database.get_url_id(url)
assert url_id is not None
file_path = get_path_from_url(url)
print(f"Downloading '{url}' to '{file_path}'")

response = self.browser_emulator.get(url)
full_path = self.file_writer.write(file_path, response)
self.database.register_download(
url_id,
self.name,
self._filepath,
file_path,
str(full_path),
response.status_code,
)


class TemplatedUrl:
def __init__(self: "TemplatedUrl", database: Database, name: str, url: str, cache: bool) -> None:
self.name = name
self.url = url
self.cache = cache

def resolve(self: "TemplatedUrl", variables: VariableHolder) -> str:
return variables["base_url"] + self.url.format(**variables.variables)


def sanitize_component(component: str) -> str:
return "".join(c if c.isalnum() or c in "-_." else "_" for c in component)


def get_path_from_url(url: str) -> str:
parsed = urlparse(url)

domain_path = join(*sanitize_component(parsed.netloc).split("."))

path = parsed.path if parsed.path else "/"
path_components = [sanitize_component(component) for component in path.strip("/").split("/")]

base_path = join(domain_path, *path_components)
return base_path
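
For illustration, get_path_from_url turns a URL into a filesystem-friendly relative path; the expected outputs below assume POSIX separators:

from brds.core.crawler.root_crawler import get_path_from_url

print(get_path_from_url("https://news.ycombinator.com/item?id=1"))
# news/ycombinator/com/item  -- dots in the domain become directories, the query string is dropped
print(get_path_from_url("https://example.com/data/report.html"))
# example/com/data/report.html
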
20 changes: 20 additions & 0 deletions brds/core/crawler/variables.py
@@ -0,0 +1,20 @@
from typing import Any, Dict, Optional, Union


class VariableHolder:
def __init__(self: "VariableHolder", variables: Optional[Dict[str, Any]] = None) -> None:
if variables is None:
variables = {}
self.variables = variables

def __getitem__(self: "VariableHolder", key: str) -> Any:
return self.variables[key]

def __setitem__(self: "VariableHolder", key: str, value: Any) -> None:
self.variables[key] = value

    def extend(self: "VariableHolder", other: Union["VariableHolder", Dict[str, Any]]) -> None:
if isinstance(other, VariableHolder):
self.variables.update(other.variables)
else:
self.variables.update(other)
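
VariableHolder is a thin dict wrapper whose extend merges in either another holder or a plain dict, with later values winning on key collisions; a quick sketch with made-up values:

from brds.core.crawler.variables import VariableHolder

holder = VariableHolder({"name": "root"})
holder.extend({"base_url": "https://example.com", "name": "override"})  # "name" is overwritten
holder["page"] = 1
print(holder["base_url"], holder["name"], holder["page"])
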
6 changes: 6 additions & 0 deletions brds/core/fs/reader.py
@@ -7,6 +7,7 @@
from typing import Type as _Type
from typing import TypeVar as _TypeVar

from pandas import read_html as _read_html
from pandas import read_parquet as _read_parquet

from ..environment import reader_folder_path as _reader_folder_path
@@ -66,6 +67,11 @@ def load(self: "FileReader", filename: _Optional[str] = None) -> _Any:
return _load(input_file)
if new_file_name.endswith(".parquet"):
return _read_parquet(new_file_name)
if new_file_name.endswith(".html"):
try:
return _read_html(new_file_name)
except ValueError as ve:
raise ValueError(f"Error parsing HTML from '{new_file_name}'") from ve
raise NotImplementedError(f"Do not know how to load the file `{filename}`: `{new_file_name}`")

