Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 49 additions & 31 deletions domaintools/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,15 @@

import re
import ssl
import yaml

from domaintools.constants import (
Endpoint,
OutputFormat,
ENDPOINT_TO_SOURCE_MAP,
RTTF_PRODUCTS_LIST,
RTTF_PRODUCTS_CMD_MAPPING,
SPECS_MAPPING,
)
from domaintools._version import current as version
from domaintools.results import (
Expand All @@ -22,6 +24,7 @@
Results,
FeedsResults,
)
from domaintools.decorators import api_endpoint, auto_patch_docstrings
from domaintools.filters import (
filter_by_riskscore,
filter_by_expire_date,
Expand All @@ -40,6 +43,7 @@ def delimited(items, character="|"):
return character.join(items) if type(items) in (list, tuple, set) else items


@auto_patch_docstrings
class API(object):
"""Enables interacting with the DomainTools API via Python:

Expand Down Expand Up @@ -94,8 +98,10 @@ def __init__(
self.key_sign_hash = key_sign_hash
self.default_parameters["app_name"] = app_name
self.default_parameters["app_version"] = app_version
self.specs = {}

self._build_api_url(api_url, api_port)
self._initialize_specs()

if not https:
raise Exception(
Expand All @@ -104,8 +110,25 @@ def __init__(
if proxy_url and not isinstance(proxy_url, str):
raise Exception("Proxy URL must be a string. For example: '127.0.0.1:8888'")

def _initialize_specs(self):
for spec_name, file_path in SPECS_MAPPING.items():
try:
with open(file_path, "r", encoding="utf-8") as f:
spec_content = yaml.safe_load(f)
if not spec_content:
raise ValueError("Spec file is empty or invalid.")

self.specs[spec_name] = spec_content

except Exception as e:
print(f"Error loading {file_path}: {e}")

def _get_ssl_default_context(self, verify_ssl: Union[str, bool]):
return ssl.create_default_context(cafile=verify_ssl) if isinstance(verify_ssl, str) else verify_ssl
return (
ssl.create_default_context(cafile=verify_ssl)
if isinstance(verify_ssl, str)
else verify_ssl
)

def _build_api_url(self, api_url=None, api_port=None):
"""Build the API url based on the given url and port. Defaults to `https://api.domaintools.com`"""
Expand Down Expand Up @@ -133,11 +156,18 @@ def _rate_limit(self, product):
hours = limit_hours and 3600 / float(limit_hours)
minutes = limit_minutes and 60 / float(limit_minutes)

self.limits[product["id"]] = {"interval": timedelta(seconds=minutes or hours or default)}
self.limits[product["id"]] = {
"interval": timedelta(seconds=minutes or hours or default)
}

def _results(self, product, path, cls=Results, **kwargs):
"""Returns _results for the specified API path with the specified **kwargs parameters"""
if product != "account-information" and self.rate_limit and not self.limits_set and not self.limits:
if (
product != "account-information"
and self.rate_limit
and not self.limits_set
and not self.limits
):
always_sign_api_key_previous_value = self.always_sign_api_key
header_authentication_previous_value = self.header_authentication
self._rate_limit(product)
Expand Down Expand Up @@ -181,7 +211,9 @@ def handle_api_key(self, is_rttf_product, path, parameters):
else:
raise ValueError(
"Invalid value '{0}' for 'key_sign_hash'. "
"Values available are {1}".format(self.key_sign_hash, ",".join(AVAILABLE_KEY_SIGN_HASHES))
"Values available are {1}".format(
self.key_sign_hash, ",".join(AVAILABLE_KEY_SIGN_HASHES)
)
)

parameters["timestamp"] = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
Expand All @@ -193,7 +225,9 @@ def handle_api_key(self, is_rttf_product, path, parameters):

def account_information(self, **kwargs):
"""Provides a snapshot of your accounts current API usage"""
return self._results("account-information", "/v1/account", items_path=("products",), **kwargs)
return self._results(
"account-information", "/v1/account", items_path=("products",), **kwargs
)

def available_api_calls(self):
"""Provides a list of api calls that you can use based on your account information."""
Expand Down Expand Up @@ -396,7 +430,9 @@ def reputation(self, query, include_reasons=False, **kwargs):

def reverse_ip(self, domain=None, limit=None, **kwargs):
"""Pass in a domain name."""
return self._results("reverse-ip", "/v1/{0}/reverse-ip".format(domain), limit=limit, **kwargs)
return self._results(
"reverse-ip", "/v1/{0}/reverse-ip".format(domain), limit=limit, **kwargs
)

def host_domains(self, ip=None, limit=None, **kwargs):
"""Pass in an IP address."""
Expand Down Expand Up @@ -570,8 +606,12 @@ def iris_enrich(self, *domains, **kwargs):
younger_than_date = kwargs.pop("younger_than_date", {}) or None
older_than_date = kwargs.pop("older_than_date", {}) or None
updated_after = kwargs.pop("updated_after", {}) or None
include_domains_with_missing_field = kwargs.pop("include_domains_with_missing_field", {}) or None
exclude_domains_with_missing_field = kwargs.pop("exclude_domains_with_missing_field", {}) or None
include_domains_with_missing_field = (
kwargs.pop("include_domains_with_missing_field", {}) or None
)
exclude_domains_with_missing_field = (
kwargs.pop("exclude_domains_with_missing_field", {}) or None
)

filtered_results = DTResultFilter(result_set=results).by(
[
Expand Down Expand Up @@ -624,6 +664,7 @@ def iris_enrich_cli(self, domains=None, **kwargs):
**kwargs,
)

@api_endpoint(spec_name="iris", path="/v1/iris-investigate/", methods="post")
def iris_investigate(
self,
domains=None,
Expand All @@ -641,29 +682,6 @@ def iris_investigate(
**kwargs,
):
"""Returns back a list of domains based on the provided filters.
The following filters are available beyond what is parameterized as kwargs:

- ip: Search for domains having this IP.
- email: Search for domains with this email in their data.
- email_domain: Search for domains where the email address uses this domain.
- nameserver_host: Search for domains with this nameserver.
- nameserver_domain: Search for domains with a nameserver that has this domain.
- nameserver_ip: Search for domains with a nameserver on this IP.
- registrar: Search for domains with this registrar.
- registrant: Search for domains with this registrant name.
- registrant_org: Search for domains with this registrant organization.
- mailserver_host: Search for domains with this mailserver.
- mailserver_domain: Search for domains with a mailserver that has this domain.
- mailserver_ip: Search for domains with a mailserver on this IP.
- redirect_domain: Search for domains which redirect to this domain.
- ssl_hash: Search for domains which have an SSL certificate with this hash.
- ssl_subject: Search for domains which have an SSL certificate with this subject string.
- ssl_email: Search for domains which have an SSL certificate with this email in it.
- ssl_org: Search for domains which have an SSL certificate with this organization in it.
- google_analytics: Search for domains which have this Google Analytics code.
- adsense: Search for domains which have this AdSense code.
- tld: Filter by TLD. Must be combined with another parameter.
- search_hash: Use search hash from Iris to bring back domains.

You can loop over results of your investigation as if it was a native Python list:

Expand Down
7 changes: 4 additions & 3 deletions domaintools/base_results.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,7 @@ def _get_session_params_and_headers(self):
headers["accept"] = HEADER_ACCEPT_KEY_CSV_FORMAT

if self.api.header_authentication:
header_key_for_api_key = "X-Api-Key" if is_rttf_product else "X-API-Key"
headers[header_key_for_api_key] = self.api.key
headers["X-Api-Key"] = self.api.key

session_param_and_headers = {"parameters": parameters, "headers": headers}
return session_param_and_headers
Expand Down Expand Up @@ -342,7 +341,9 @@ def html(self):
)

def as_list(self):
return "\n".join([json.dumps(item, indent=4, separators=(",", ": ")) for item in self._items()])
return "\n".join(
[json.dumps(item, indent=4, separators=(",", ": ")) for item in self._items()]
)

def __str__(self):
return str(
Expand Down
5 changes: 5 additions & 0 deletions domaintools/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,3 +56,8 @@ class OutputFormat(Enum):
"real-time-domain-discovery-feed-(api)": "domaindiscovery",
"real-time-domain-discovery-feed-(s3)": "domaindiscovery",
}

SPECS_MAPPING = {
"iris": "domaintools/specs/iris-openapi.yaml",
# "rttf": "domaintools/specs/feeds-openapi.yaml",
}
57 changes: 57 additions & 0 deletions domaintools/decorators.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
import functools

from typing import List, Union

from domaintools.docstring_patcher import DocstringPatcher


def api_endpoint(spec_name: str, path: str, methods: Union[str, List[str]]):
"""
Decorator to tag a method as an API endpoint.

Args:
spec_name: The key for the spec in api_instance.specs
path: The API path (e.g., "/users")
methods: A single method ("get") or list of methods (["get", "post"])
that this function handles.
"""

def decorator(func):
func._api_spec_name = spec_name
func._api_path = path

# Always store the methods as a list
if isinstance(methods, str):
func._api_methods = [methods]
else:
func._api_methods = methods

@functools.wraps(func)
def wrapper(self, *args, **kwargs):
return func(*args, **kwargs)

# Copy all tags to the wrapper
wrapper._api_spec_name = func._api_spec_name
wrapper._api_path = func._api_path
wrapper._api_methods = func._api_methods
return wrapper

return decorator


def auto_patch_docstrings(cls):
original_init = cls.__init__

@functools.wraps(original_init)
def new_init(self, *args, **kwargs):
original_init(self, *args, **kwargs)
try:
# We instantiate our patcher and run it
patcher = DocstringPatcher()
patcher.patch(self)
except Exception as e:
print(f"Auto-patching failed: {e}")

cls.__init__ = new_init

return cls
Loading