Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -163,4 +163,4 @@ cython_debug/
# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore
# and can be added to the global gitignore or merged into this file. For a more nuclear
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
#.idea/
.idea/
Empty file modified checks.sh
100755 → 100644
Empty file.
18 changes: 18 additions & 0 deletions fastcrawler/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,24 @@ def __init__(self, model):
super().__init__(self.message)


class TaskNotFound(BaseModelError):
def __init__(self, task_name):
super().__init__(
f"The Task with name={task_name} has not been found",
"\nPlease check your input and be sure the task name is correct",
)


class NoCrawlerFound(BaseModelError):
def __init__(self):
super().__init__(
"No task has been registered in the application."
"\nPlease make sure that you've assigned the crawlers to the application"
"so the application is aware of the crawlers."
"\nThis may also raise if you have overridden the library's startup built in method"
)


class ProcessorNotSupported(BaseModelError):
def __init__(self, model):
self.model = model
Expand Down
2 changes: 0 additions & 2 deletions fastcrawler/parsers/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,9 @@
class ParserProtocol(Protocol):
def __init__(self, scraped_data: Any):
"""Initilize the parser with the given data (html/json/etc)"""
...

def parse(self, model: Any) -> Any:
"""
Parse the saved data, with given model, which should be a pydantic model
imported from fastcrawler library
"""
...
2 changes: 1 addition & 1 deletion fastcrawler/parsers/selectors/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ def __init__(

def __repr__(self):
"""Represents a selector for debugging purposes"""
return f"Field(type={self.__class__.__name__} extract={self.extract}, " f"many={self.many}, query={self.query})"
return f"Field(type={self.__class__.__name__} extract={self.extract}, many={self.many}, query={self.query})"

def resolve(self, scraped_data, model):
"""Must be implemented by outer classes.
Expand Down
3 changes: 2 additions & 1 deletion fastcrawler/parsers/selectors/css.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,10 @@
from fastcrawler.parsers.utils import _UNSET

from ..processors.base import ProcessorInterface
from .base import BaseSelector


class _CSSField:
class _CSSField(BaseSelector):
"""
CSSSelectorField represents a field that can be retrieved from a given HTML
document using CSS selectors.
Expand Down
4 changes: 3 additions & 1 deletion fastcrawler/parsers/selectors/regex.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,10 @@
from fastcrawler.parsers.pydantic import BaseModelType
from fastcrawler.parsers.utils import _UNSET

from .base import BaseSelector

class _RegexField:

class _RegexField(BaseSelector):
"""
RegexField represents a field that can be retrieved from a given HTML
document using Regex.
Expand Down
3 changes: 2 additions & 1 deletion fastcrawler/parsers/selectors/xpath.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,10 @@
from fastcrawler.parsers.utils import _UNSET

from ..processors.base import ProcessorInterface
from .base import BaseSelector


class _XPATHField:
class _XPATHField(BaseSelector):
"""
XPATHField represents a field that can be retrieved from a given HTML
document using XPath.
Expand Down
90 changes: 90 additions & 0 deletions fastcrawler/schedule/adopter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
from typing import Callable

from rocketry import Rocketry
from rocketry.conditions.api import cron

from fastcrawler.exceptions import TaskNotFound

from .schema import Task


class RocketryApplication:
def __init__(self, *args, **kwargs):
self.task_lib: Rocketry = Rocketry(*args, **kwargs)

async def serve(self, *args, **kwargs): # pragma: no cover
"""Proto to serve with uvicorn"""
await self.start_up()
return await self.task_lib.serve(*args, **kwargs)

async def get_all_tasks(self) -> set[Task]:
return self.task_lib.session.tasks

async def add_task(self, task_func: Callable, settings: Task) -> None:
"""
...
"""
self.task_lib.task(**dict(settings))(task_func)
return None

async def start_up(self) -> None:
"""
Run Startup Event
"""

async def shut_down(self) -> None:
self.task_lib.session.shut_down()
return None


class RocketryManager:
def __init__(self, app: RocketryApplication):
self.app = app

async def all(self) -> set[Task]:
"""
Return all tasks from internal
"""
return await self.app.get_all_tasks()

async def add_task(self, task_func: Callable, settings: Task) -> None:
"""
Add tasks within internal python API
"""
await self.app.add_task(task_func, settings)
return None

async def change_task_schedule(
self,
task_name: str,
schedule: str,
) -> None:
"""
Reschedule a task
schedule:
- can be string
`every 2 seconds`
- can be cron
`*/2 * * * *`
"""
for task in await self.app.get_all_tasks():
if task.name == task_name:
if schedule.count(" ") == 4:
task.start_cond = cron(schedule)
else:
task.start_cond = schedule
return None
raise TaskNotFound(task_name)

async def toggle_task(self, task_name: str) -> None:
"""
Disables or enable one task
"""
for task in await self.app.get_all_tasks():
if task.name == task_name:
if task.disabled:
task.disabled = False
else:
task.disabled = True
return None
raise TaskNotFound(task_name)
61 changes: 61 additions & 0 deletions fastcrawler/schedule/proto.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
from typing import Callable, Protocol

from .schema import Task


class TaskApplicationProto(Protocol): # pragma: no cover
task_lib: Callable

def __init__(self, *args, **kwargs):
"""Initialize task application"""

async def serve(self, *args, **kwargs):
"""Proto to serve with Uvicorn"""

async def get_all_tasks(self) -> set[Task]:
"""Returns all tasks that exists in Fast Crawler"""

async def add_task(self, *args, **kwargs) -> None:
"""Dynamically add a task to fast crawler"""

async def start_up(self) -> None:
"""Manage start up actvity"""

async def shut_down(self) -> None:
"""Manage shut down activity"""


class TaskControllerProto(Protocol): # pragma: no cover
app: TaskApplicationProto

def __init__(self, app: TaskApplicationProto):
"""Initialize task application

Args:
app (TaskProcessor): _description_
"""

async def all(self) -> list[Task]:
"""
Return all tasks from internal
"""

async def add_task(self, task_func: Callable, settings: Task) -> None:
"""
Add tasks within internal python API
"""

async def change_task_schedule(self, task_name: str, schedule: str) -> None:
"""
Reschedule a task
schedule:
- can be string
`every 2 seconds`
- can be cron
`*/2 * * * *`
"""

async def toggle_task(self, task_name: str) -> None:
"""
Disables or enable one task
"""
101 changes: 101 additions & 0 deletions fastcrawler/schedule/schema.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
import datetime
from typing import Literal

from pydantic import BaseModel, Field # pylint: disable=no-name-in-module

from .utilties import BaseCondition


class Task(BaseModel):
"""Base class for Tasks.

A task can be a function, command or other procedure that
does a specific thing. A task can be parametrized by supplying


Parameters
----------
name : str, optional
Name of the task. Ideally, all tasks
should have unique name. If None, the
return value of Task.get_default_name()
is used instead.
description : str, optional
Description of the task. This is purely
for task documentation purpose.
start_cond : BaseCondition, optional
Condition that when True the task
is to be started, by default AlwaysFalse()
end_cond : BaseCondition, optional
Condition that when True the task
will be terminated. Only works for for
tasks with execution='process' or 'thread'
if thread termination is implemented in
the task, by default AlwaysFalse()
execution : str, {'main', 'thread', 'process'}, default='process'
How the task is executed. Allowed values
'main' (run on main thread & process),
'thread' (run on another thread) and
'process' (run on another process).
disabled : bool
If True, the task is not allowed to be run
regardless of the start_cond,
by default False
force_run : bool
If True, the task will be run once
regardless of the start_cond,
by default True
priority : int, optional
Priority of the task. Higher priority
tasks are first inspected whether they
can be executed. Can be any numeric value.
Setup tasks are recommended to have priority
>= 40 if they require loaded tasks,
>= 50 if they require loaded extensions.
By default 0
timeout : str, int, timedelta, optional
If the task has not run in given timeout
the task will be terminated. Only applicable
for tasks with execution='process' or
with execution='thread'.

Examples
--------
Minimum example:

>>> from fastcrawler.schedule.schema import Task
>>> class MyTask(Task):
... def execute(self):
... ... # What the task does.
... return ...

"""

name: str | None = Field(description="Name of the task. Must be unique")
description: str | None = Field(description="Description of the task for documentation")
logger_name: str | None = Field(
description="Logger name to be used in logging the task record"
)
execution: Literal["main", "async", "thread", "process"] | None = None
priority: int = 0
disabled: bool = False
force_run: bool = False
status: Literal["run", "fail", "success", "terminate", "inaction", "crash"] | None = Field(
description="Latest status of the task", default=None
)
timeout: datetime.timedelta | None = None
start_cond: BaseCondition | None = None
end_cond: BaseCondition | None = None

_last_run: float | None = None
_last_success: float | None = None
_last_fail: float | None = None
_last_terminate: float | None = None
_last_inaction: float | None = None
_last_crash: float | None = None

class Config:
arbitrary_types_allowed = True
protected_namespaces = ()
underscore_attrs_are_private = True
validate_assignment = True
53 changes: 53 additions & 0 deletions fastcrawler/schedule/utilties.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
from rocketry.core import BaseCondition as _BaseCondition


class BaseCondition(_BaseCondition): # pylint: disable=abstract-method
"""A condition is a thing/occurence that should happen in
order to something happen.

Conditions are used to determine whether a task can be started,
a task should be terminated or the scheduler should shut
down. Conditions are either true or false.

A condition could answer for any of the following questions:
- Current time is as specified (ie. Monday afternoon).
- A given task has already run.
- The machine has at least a given amount of RAM.
- A specific file exists.

Each condition should have the method ``__bool__`` specified
as minimum. This method should return ``True`` or ``False``
depending on whether the condition holds or does not hold.

Examples
--------

Minimum example:

>>> from rocketry.core import BaseCondition
>>> class MyCondition(BaseCondition):
... def __bool__(self):
... ... # Code that defines state either
... return True

Complicated example with parser:

>>> import os, re
>>> class IsFooBar(BaseCondition):
... __parsers__ = {
... re.compile(r"is foo '(?P<outcome>.+)'"): "__init__"
... }
...
... def __init__(self, outcome):
... self.outcome = outcome
...
... def __bool__(self):
... return self.outcome == "bar"
...
... def __repr__(self):
... return f"IsFooBar('{self.outcome}')"
...
>>> from rocketry.parse import parse_condition
>>> parse_condition("is foo 'bar'")
IsFooBar('bar')
"""
Loading