Skip to content

Commit

Permalink
Introduction of a subject switch SPECIAL rule.
Browse files Browse the repository at this point in the history
This patch fixes #185.

This new special rule is triggered when a subject starts with:

- www.
- m.

When such a subject is caught by the mechanism, it follows any HTTP
redirects and compares each target domain against the given one. If the
redirect only drops the prefix, the status of the given subject is
switched to INACTIVE.

Examples:

  Tested Subject: www.example.org
  Behavior:
      www.example.org/hello/world -> example.org/hello/world
  Status Changed: yes
  Status before: ACTIVE
  Status after: INACTIVE
------
  Tested Subject: www.example.org
  Behavior:
      www.example.org/hello/world -> example.com/hello/world
  Status Changed: no
  Status before: ACTIVE
  Status after: ACTIVE
------
  Tested Subject: m.example.org
  Behavior:
      m.example.org/hello/world -> example.org/world/hello
  Status Changed: yes
  Status before: ACTIVE
  Status after: INACTIVE

Contributors:
  * @spirillen
  • Loading branch information
funilrys committed Oct 25, 2022
1 parent 391e19f commit c9d6bcd
Show file tree
Hide file tree
Showing 15 changed files with 238 additions and 10 deletions.
11 changes: 9 additions & 2 deletions PyFunceble/checker/availability/base.py
Expand Up @@ -63,14 +63,17 @@
import PyFunceble.factory
import PyFunceble.storage
from PyFunceble.checker.availability.extras.base import ExtraRuleHandlerBase
from PyFunceble.checker.availability.extras.parked import ParkedRulesHandler
from PyFunceble.checker.availability.extras.rules import ExtraRulesHandler
from PyFunceble.checker.availability.extras.subject_switch import (
SubjectSwitchRulesHandler,
)
from PyFunceble.checker.availability.params import AvailabilityCheckerParams
from PyFunceble.checker.availability.status import AvailabilityCheckerStatus
from PyFunceble.checker.base import CheckerBase
from PyFunceble.checker.syntax.domain import DomainSyntaxChecker
from PyFunceble.checker.syntax.ip import IPSyntaxChecker
from PyFunceble.checker.syntax.url import URLSyntaxChecker
from PyFunceble.converter.url2netloc import Url2Netloc
from PyFunceble.helpers.regex import RegexHelper
from PyFunceble.query.dns.query_tool import DNSQueryTool
from PyFunceble.query.http_status_code import HTTPStatusCode
Expand Down Expand Up @@ -129,6 +132,7 @@ class AvailabilityCheckerBase(CheckerBase):
ip_syntax_checker: Optional[IPSyntaxChecker] = None
url_syntax_checker: Optional[URLSyntaxChecker] = None
extra_rules_handlers: Optional[List[ExtraRuleHandlerBase]] = None
url2netloc: Optional[Url2Netloc] = None

_use_extra_rules: bool = False
_use_whois_lookup: bool = False
Expand Down Expand Up @@ -165,7 +169,7 @@ def __init__(
self.ip_syntax_checker = IPSyntaxChecker()
self.url_syntax_checker = URLSyntaxChecker()
# WARNING: Put the aggressive one first!
self.extra_rules_handlers = [ExtraRulesHandler()]
self.extra_rules_handlers = [SubjectSwitchRulesHandler(), ExtraRulesHandler()]
self.db_session = db_session

self.params = AvailabilityCheckerParams()
Expand Down Expand Up @@ -507,6 +511,9 @@ def subject_propagator(self) -> "CheckerBase":

self.status.subject = self.subject
self.status.idna_subject = self.idna_subject
self.status.netloc = self.url2netloc.set_data_to_convert(
self.idna_subject
).get_converted()
self.status.status = None

self.query_syntax_checker()
Expand Down
23 changes: 23 additions & 0 deletions PyFunceble/checker/availability/extras/base.py
Expand Up @@ -53,6 +53,8 @@
import functools
from typing import Callable, Optional

import requests

import PyFunceble.factory
from PyFunceble.checker.availability.status import AvailabilityCheckerStatus

Expand All @@ -68,6 +70,8 @@ class ExtraRuleHandlerBase:
"""

_status: Optional[AvailabilityCheckerStatus] = None
req: Optional[requests.Response] = None
req_url: Optional[str] = None

def __init__(self, status: Optional[AvailabilityCheckerStatus] = None) -> None:
if status is not None:
Expand Down Expand Up @@ -189,6 +193,25 @@ def set_status(self, value: AvailabilityCheckerStatus) -> "ExtraRuleHandlerBase"

return self

def do_request(self, *, allow_redirects: bool = True) -> "ExtraRuleHandlerBase":
    """
    Do a request and store its response into the `req` attribute.

    The requested URL is stored into the `req_url` attribute before the
    request is issued.

    :param bool allow_redirects:
        Whether we should follow the redirection - or not.

    :return:
        The handler itself, so that calls can be chained.
    """

    subject = self.status.idna_subject

    if subject.startswith(("http:", "https:")):
        # The subject already carries a scheme: request it as it is.
        url = subject
    else:
        # No scheme given: fall back to plain HTTP on port 80.
        url = f"http://{subject}:80"

    self.req_url = url
    self.req = PyFunceble.factory.Requester.get(
        url, allow_redirects=allow_redirects
    )

    return self

def start(self) -> "ExtraRuleHandlerBase":
"""
Starts the gathering process.
Expand Down
2 changes: 1 addition & 1 deletion PyFunceble/checker/availability/extras/parked.py
Expand Up @@ -11,7 +11,7 @@
██║ ██║ ██║ ╚██████╔╝██║ ╚████║╚██████╗███████╗██████╔╝███████╗███████╗
╚═╝ ╚═╝ ╚═╝ ╚═════╝ ╚═╝ ╚═══╝ ╚═════╝╚══════╝╚═════╝ ╚══════╝╚══════╝
Provides the domains availability checker.
Provides the extra rules handler based on the "parked status" of a subject.
Author:
Nissar Chababy, @funilrys, contactTATAfunilrysTODTODcom
Expand Down
153 changes: 153 additions & 0 deletions PyFunceble/checker/availability/extras/subject_switch.py
@@ -0,0 +1,153 @@
"""
The tool to check the availability or syntax of domain, IP or URL.
::
██████╗ ██╗ ██╗███████╗██╗ ██╗███╗ ██╗ ██████╗███████╗██████╗ ██╗ ███████╗
██╔══██╗╚██╗ ██╔╝██╔════╝██║ ██║████╗ ██║██╔════╝██╔════╝██╔══██╗██║ ██╔════╝
██████╔╝ ╚████╔╝ █████╗ ██║ ██║██╔██╗ ██║██║ █████╗ ██████╔╝██║ █████╗
██╔═══╝ ╚██╔╝ ██╔══╝ ██║ ██║██║╚██╗██║██║ ██╔══╝ ██╔══██╗██║ ██╔══╝
██║ ██║ ██║ ╚██████╔╝██║ ╚████║╚██████╗███████╗██████╔╝███████╗███████╗
╚═╝ ╚═╝ ╚═╝ ╚═════╝ ╚═╝ ╚═══╝ ╚═════╝╚══════╝╚═════╝ ╚══════╝╚══════╝
Provides the extra rules handler based on the "switching" domain behavior of some
subjects.
Author:
Nissar Chababy, @funilrys, contactTATAfunilrysTODTODcom
Special thanks:
https://pyfunceble.github.io/#/special-thanks
Contributors:
https://pyfunceble.github.io/#/contributors
Project link:
https://github.com/funilrys/PyFunceble
Project documentation:
https://pyfunceble.readthedocs.io/en/dev/
Project homepage:
https://pyfunceble.github.io/
License:
::
Copyright 2017, 2018, 2019, 2020, 2022 Nissar Chababy
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""

from typing import Optional

import PyFunceble.facility
from PyFunceble.checker.availability.extras.base import ExtraRuleHandlerBase
from PyFunceble.checker.availability.status import AvailabilityCheckerStatus
from PyFunceble.converter.url2netloc import Url2Netloc


class SubjectSwitchRulesHandler(ExtraRuleHandlerBase):
    """
    Provides our very own "subject switch" handler.

    This handler is used to detect a subject switch behavior from a server.
    In other words, this handler should be able to detect the following
    scenarios and switch the original subject to :code:`INACTIVE`.

    1. https://www.example.org/hello/world -> https://example.org/hello/world
    2. https://m.example.org/hello/world -> https://example.org/hello/world
    """

    # Leading labels whose removal is considered a "subject switch".
    _SWITCH_PREFIXES = ("www.", "m.")

    url2netloc: Optional[Url2Netloc] = None

    def __init__(self, status: Optional[AvailabilityCheckerStatus] = None) -> None:
        self.url2netloc = Url2Netloc()
        super().__init__(status)

    def _switch_down_by_history(self) -> "SubjectSwitchRulesHandler":
        """
        Tries to switch the status to :code:`INACTIVE` by following the
        redirect history of the response stored in the `req` attribute.

        The status is switched when a redirect targets the current network
        location stripped of its leading :code:`www.` or :code:`m.` while
        keeping the same path.

        :return:
            The handler itself.
        """

        netloc = self.status.netloc

        # Only strip a *leading* prefix. A plain `str.replace` would also hit
        # an inner label (e.g. the "m." of "www.example.com.au") and build a
        # meaningless variation.
        variations = {
            netloc[len(prefix) :]
            for prefix in self._SWITCH_PREFIXES
            if netloc.startswith(prefix)
        }
        # A subject made of the prefix alone has no meaningful variation.
        variations.discard("")

        if not variations or self.req is None:
            # Nothing to compare against, or no response to inspect.
            return self

        start_path = (
            self.url2netloc.set_data_to_convert(self.req_url)
            .parse_url()
            .parsed_url.path
        )

        for response in self.req.history:
            if (
                not str(response.status_code).startswith("3")
                or "location" not in response.headers
            ):
                continue

            redirect_url = response.headers["location"]

            redirect_netloc = self.url2netloc.set_data_to_convert(
                redirect_url
            ).get_converted()
            # `get_converted` parsed the redirect URL, so its path is available.
            local_path = self.url2netloc.parsed_url.path

            if redirect_netloc not in variations:
                # Redirects to the subject itself or to an unrelated host
                # are not a subject switch.
                continue

            # An empty path and "/" designate the same resource.
            if (start_path or "/") != (local_path or "/"):
                continue

            self.switch_to_down()
            break

        return self

    @ExtraRuleHandlerBase.ensure_status_is_given
    @ExtraRuleHandlerBase.setup_status_before
    @ExtraRuleHandlerBase.setup_status_after
    def start(self) -> "SubjectSwitchRulesHandler":
        """
        Process the check and handling of the current subject.

        Only subjects whose network location starts with :code:`www.` or
        :code:`m.` are requested and inspected.

        :return:
            The handler itself.
        """

        PyFunceble.facility.Logger.info(
            "Started to check %r against our subject switcher rules.",
            self.status.idna_subject,
        )

        try:
            if self.status.netloc.startswith(self._SWITCH_PREFIXES):
                self.do_request()

                # The history can only be followed once a response has been
                # fetched, hence the nesting under the prefix check.
                if not self.status.status_after_extra_rules:
                    self._switch_down_by_history()
        except PyFunceble.factory.Requester.exceptions.RequestException:
            # Best effort: an unreachable subject is simply left untouched
            # by this handler.
            PyFunceble.facility.Logger.info(
                "Could not request %r. Skipping our subject switcher rules.",
                self.status.idna_subject,
            )

        PyFunceble.facility.Logger.info(
            "Finished to check %r against our subject switcher rules.",
            self.status.idna_subject,
        )

        return self
4 changes: 4 additions & 0 deletions PyFunceble/checker/availability/url.py
Expand Up @@ -119,6 +119,10 @@ def subject_propagator(self) -> "URLAvailabilityChecker":

self.status.subject = self.subject
self.status.idna_subject = self.idna_subject
self.status.netloc = self.url2netloc.set_data_to_convert(
self.idna_subject
).get_converted()

self.status.status = None

self.query_syntax_checker()
Expand Down
4 changes: 4 additions & 0 deletions PyFunceble/checker/base.py
Expand Up @@ -61,6 +61,7 @@
import PyFunceble.storage
from PyFunceble.checker.params_base import CheckerParamsBase
from PyFunceble.checker.status_base import CheckerStatusBase
from PyFunceble.converter.url2netloc import Url2Netloc
from PyFunceble.query.collection import CollectionQueryTool


Expand All @@ -86,6 +87,8 @@ class CheckerBase:
_subject: Optional[str] = None
_idna_subject: Optional[str] = None

url2netloc: Optional[Url2Netloc] = None

db_session: Optional[Session] = None
collection_query_tool: Optional[CollectionQueryTool] = None

Expand All @@ -101,6 +104,7 @@ def __init__(
use_collection: Optional[bool] = None,
) -> None:
self.collection_query_tool = CollectionQueryTool()
self.url2netloc = Url2Netloc()

if self.params is None:
self.params = CheckerParamsBase()
Expand Down
3 changes: 3 additions & 0 deletions PyFunceble/checker/reputation/base.py
Expand Up @@ -138,6 +138,9 @@ def subject_propagator(self) -> "CheckerBase":

self.status.subject = self.subject
self.status.idna_subject = self.idna_subject
self.status.netloc = self.url2netloc.set_data_to_convert(
self.idna_subject
).get_converted()

self.query_syntax_checker()

Expand Down
1 change: 1 addition & 0 deletions PyFunceble/checker/status_base.py
Expand Up @@ -67,6 +67,7 @@ class CheckerStatusBase:

subject: Optional[str] = None
idna_subject: Optional[str] = None
netloc: Optional[str] = None

status: Optional[str] = None
status_source: Optional[str] = None
Expand Down
3 changes: 3 additions & 0 deletions PyFunceble/checker/syntax/base.py
Expand Up @@ -98,6 +98,9 @@ def subject_propagator(self) -> "CheckerBase":

self.status.subject = self.subject
self.status.idna_subject = self.idna_subject
self.status.netloc = self.url2netloc.set_data_to_convert(
self.idna_subject
).get_converted()

return self

Expand Down
3 changes: 3 additions & 0 deletions PyFunceble/checker/syntax/domain.py
Expand Up @@ -98,6 +98,9 @@ def subject_propagator(self) -> "DomainSyntaxChecker":

self.status.subject = self.subject
self.status.idna_subject = self.idna_subject
self.status.netloc = self.url2netloc.set_data_to_convert(
self.idna_subject
).get_converted()

return self

Expand Down
3 changes: 3 additions & 0 deletions PyFunceble/checker/syntax/ip.py
Expand Up @@ -91,6 +91,9 @@ def subject_propagator(self) -> "IPSyntaxChecker":

self.status.subject = self.subject
self.status.idna_subject = self.idna_subject
self.status.netloc = self.url2netloc.set_data_to_convert(
self.idna_subject
).get_converted()

return self

Expand Down
26 changes: 20 additions & 6 deletions PyFunceble/converter/url2netloc.py
Expand Up @@ -51,7 +51,7 @@
"""

import urllib.parse
from typing import Any
from typing import Any, Optional

from PyFunceble.converter.base import ConverterBase

Expand All @@ -62,6 +62,11 @@ class Url2Netloc(ConverterBase):
of a given URL.
"""

parsed_url: Optional[urllib.parse.ParseResult] = None
"""
Expose the parsed URL.
"""

@ConverterBase.data_to_convert.setter
def data_to_convert(self, value: Any) -> None:
"""
Expand All @@ -82,17 +87,26 @@ def data_to_convert(self, value: Any) -> None:
# pylint: disable=no-member
super(Url2Netloc, self.__class__).data_to_convert.fset(self, value)

def parse_url(self) -> "Url2Netloc":
    """
    Parses the data to convert as a URL and exposes the result through the
    `parsed_url` attribute.

    When there is nothing to convert, `parsed_url` is left untouched.

    :return:
        The converter itself, so that calls can be chained.
    """

    raw_url = self.data_to_convert

    if not raw_url:
        return self

    self.parsed_url = urllib.parse.urlparse(raw_url)

    return self

def get_converted(self) -> str:
"""
Provides the converted data (after conversion)
"""

parsed_url = urllib.parse.urlparse(self.data_to_convert)
self.parse_url()

if not parsed_url.netloc and parsed_url.path:
netloc = parsed_url.path
elif parsed_url.netloc:
netloc = parsed_url.netloc
if not self.parsed_url.netloc and self.parsed_url.path:
netloc = self.parsed_url.path
elif self.parsed_url.netloc:
netloc = self.parsed_url.netloc
else: # pragma: no cover ## Safety
netloc = self.data_to_convert

Expand Down

0 comments on commit c9d6bcd

Please sign in to comment.