Skip to content

Commit

Permalink
Merge 8835e2e into 6973a76
Browse files Browse the repository at this point in the history
  • Loading branch information
flyflyinit committed Mar 10, 2023
2 parents 6973a76 + 8835e2e commit 21310e1
Show file tree
Hide file tree
Showing 26 changed files with 3,249 additions and 0 deletions.
1 change: 1 addition & 0 deletions CODEOWNERS
Validating CODEOWNERS rules …
Expand Up @@ -35,6 +35,7 @@ lexicon/providers/easydns.py @analogj
lexicon/providers/easyname.py @astzweig @rqelibari
lexicon/providers/euserv.py @mschoettle
lexicon/providers/exoscale.py @greut @brutasse
lexicon/providers/flexibleengine.py @flyflyinit
lexicon/providers/gandi.py @hrunting @adferrand @tristan-weil
lexicon/providers/gehirn.py @chibiegg
lexicon/providers/glesys.py @hecd
Expand Down
4 changes: 4 additions & 0 deletions docs/providers/flexibleengine.rst
@@ -0,0 +1,4 @@
flexibleengine
* ``auth_token`` Specify token for authentication (global api key or api token)

* ``zone_id`` Specify the zone id (it's mandatory, by either providing 'domain name' and it will be resolved by lexicon to 'zone id', or providing straight 'zone id')
3 changes: 3 additions & 0 deletions docs/providers_options.rst
Expand Up @@ -77,6 +77,9 @@ List of options
.. _exoscale:
.. include:: providers/exoscale.rst

.. _flexibleengine:
.. include:: providers/flexibleengine.rst

.. _gandi:
.. include:: providers/gandi.rst

Expand Down
226 changes: 226 additions & 0 deletions lexicon/providers/flexibleengine.py
@@ -0,0 +1,226 @@
"""Module provider for Flexible Engine Cloud"""
import json
import logging

import requests
from lexicon.exceptions import AuthenticationError
from lexicon.providers.base import Provider as BaseProvider

LOGGER = logging.getLogger(__name__)
NAMESERVER_DOMAINS = ["orange-business.com"]


def provider_parser(subparser):
"""Configure provider parser for Flexible Engine Cloud"""
subparser.add_argument(
"--auth-token", help="specify token for authentication")
subparser.add_argument(
"--zone-id",
help="specify the zone id",
)


class Provider(BaseProvider):
"""Provider class for Flexible Engine Cloud"""

def __init__(self, config):
super(Provider, self).__init__(config)
self.api_endpoint = "https://dns.prod-cloud-ocb.orange-business.com/v2"
self.domain_id = None

def _authenticate(self):
zone_id = self._get_provider_option("zone_id")
payload = self._get("/zones", {"name": self.domain})

if not zone_id:
if not payload["zones"]:
raise AuthenticationError("No domain found")
if len(payload["zones"]) > 1:
raise AuthenticationError(
"Too many domains found. This should not happen"
)
self.domain_id = payload["zones"][0]["id"]
self.domain = payload["zones"][0]["name"].rstrip('.')
else:
self.domain_id = zone_id
self.domain = payload["zones"][0]["name"].rstrip('.')

def _create_record(self, rtype, name, content):
# put string in array
tmp = content
content = []
content.append(tmp)

record = {
"type": rtype,
"name": self._full_name(name),
"records": content
}

if self._get_lexicon_option("ttl"):
record["ttl"] = self._get_lexicon_option("ttl")

if rtype == "TXT":
# Convert "String" to "\"STRING\""
tmp = []
tmp.append('\"' + record["records"][0] + '\"')
record["records"] = tmp
try:
self._post(f"/zones/{self.domain_id}/recordsets", record)
except requests.exceptions.HTTPError as err:
already_exists = next(
(
True
for error in err.response.json()
if err.response.json()['code'] == 'DNS.0312'
),
False,
)
if not already_exists:
raise

LOGGER.debug("create_record: %s", True)
return True

# List all records. Return an empty list if no records found
# type, name and content are used to filter records.
# If possible filter during the query, otherwise filter after response is received.

def _list_records(self, rtype=None, name=None, content=None):
url = f"/zones/{self.domain_id}/recordsets"
records = []
payload = {}

# Convert it to Array if it is not converted yet.
if isinstance(content, str):
tmp = content
content = []
content.append(tmp)

# Iterating recordsets
next_url = url
while next_url is not None:
payload = self._get(next_url)
if (
"links" in payload
and "next" in payload["links"]
):
next_url = payload["links"]["next"]
else:
next_url = None

for record in payload["recordsets"]:
processed_record = {
"type": record["type"],
"name": f"{record['name']}",
"ttl": record["ttl"],
"content": record["records"],
"id": record["id"],
}
records.append(processed_record)

if rtype:
records = [record for record in records if record["type"] == rtype]

if name:
records = [
record for record in records if record["name"].rstrip('.') == self._full_name(name)
]

if content:
if len(content) > 1:
records = [
record
for record in records
if record["content"] == content
]

LOGGER.debug("list_records: %s", records)
return records

# update a record.

def _update_record(self, identifier, rtype=None, name=None, content=None):
if identifier is None:
records = self._list_records(rtype, name)
if len(records) == 1:
identifier = records[0]["id"]
elif len(records) < 1:
raise Exception(
"No records found matching type and name - won't update"
)
else:
raise Exception(
"Multiple records found matching type and name - won't update"
)

data = {}

if name:
data["name"] = name

if rtype:
data["type"] = rtype

if self._get_lexicon_option("ttl"):
data["ttl"] = self._get_lexicon_option("ttl")

if content:
if rtype == "TXT":
content = '\"' + content + '\"'
tmp = content
content = []
content.append(tmp)
data["records"] = content

self._put(f"/zones/{self.domain_id}/recordsets/{identifier}", data)
LOGGER.debug("update_record: %s", True)
return True

# Delete an existing record.
# If record does not exist, do nothing.
def _delete_record(self, identifier=None, rtype=None, name=None, content=None):
delete_record_id = []

tmp = content
content = []
content.append(tmp)

if not identifier:
records = self._list_records(rtype, name, content)
delete_record_id = [record["id"] for record in records]
else:
delete_record_id.append(identifier)

LOGGER.debug("delete_records: %s", delete_record_id)
for record_id in delete_record_id:
self._delete(f"/zones/{self.domain_id}/recordsets/{record_id}")

LOGGER.debug("delete_record: %s", True)
return True

# API requests
def _request(self, action="GET", url="/", data=None, query_params=None):
if data is None:
data = {}
if query_params is None:
query_params = {}
default_headers = {
"Accept": "application/json",
"Content-Type": "application/json",
"X-Auth-Token": f"{self._get_provider_option('auth_token')}",
}
if not url.startswith(self.api_endpoint):
url = self.api_endpoint + url

response = requests.request(
action,
url,
params=query_params,
data=json.dumps(data),
headers=default_headers,
)
response.raise_for_status()
if action == "DELETE":
return ""
return response.json()
57 changes: 57 additions & 0 deletions lexicon/tests/providers/test_flexibleengine.py
@@ -0,0 +1,57 @@
"""Integration tests for FlexibleEngine Cloud"""
from unittest import TestCase

import pytest

from lexicon.tests.providers.integration_tests import IntegrationTestsV2

# Hook into testing framework by inheriting unittest.TestCase and reuse
# the tests which *each and every* implementation of the interface must
# pass, by inheritance from define_tests.TheTests


class FlexibleEngineProviderTests(TestCase, IntegrationTestsV2):
"""TestCase for FlexibleEngine"""

provider_name = "flexibleengine"
domain = "flexibleengine.test"

def _filter_headers(self):
return ["X-Auth-Token"]

def _test_fallback_fn(self):
return (
lambda x: "placeholder_" + x
if x not in ("zone_id")
else ""
)

@pytest.mark.skip(reason="Content returned is an Array not a String")
def test_provider_when_calling_list_records_with_fqdn_name_filter_should_return_record(
self,
):
return

@pytest.mark.skip(reason="Content returned is an Array not a String")
def test_provider_when_calling_list_records_with_full_name_filter_should_return_record(
self,
):
return

@pytest.mark.skip(reason="Content returned is an Array not a String")
def test_provider_when_calling_list_records_with_name_filter_should_return_record(
self,
):
return

@pytest.mark.skip(reason="Creating Multiple records matching type and name is not accepted by FlexibleEngine DNS Provider")
def test_provider_when_calling_list_records_should_handle_record_sets(
self,
):
return

@pytest.mark.skip(reason="Creating Multiple records matching type and name is not accepted by FlexibleEngine DNS Provider")
def test_provider_when_calling_delete_record_with_record_set_by_content_should_leave_others_untouched(
self,
):
return
@@ -0,0 +1,49 @@
interactions:
- request:
body: '{}'
headers:
Accept:
- application/json
Accept-Encoding:
- gzip, deflate
Connection:
- keep-alive
Content-Length:
- '2'
Content-Type:
- application/json
User-Agent:
- python-requests/2.28.2
method: GET
uri: https://dns.prod-cloud-ocb.orange-business.com/v2/zones?name=flexibleengine.test
response:
body:
string: '{"zones":[{"id":"ff8080827274f00e018563dcecb527d3","name":"flexibleengine.test.","description":"This
zone is for test purposes only.","email":"hostmaster@example.com","ttl":300,"serial":1,"masters":[],"status":"ACTIVE","pool_id":"ff80808261568dfe016156c446410001","project_id":"5374cb5cb9a1450f947a9ff0b60b0b5f","zone_type":"public","created_at":"2022-12-30T16:28:21.021","updated_at":"2022-12-30T16:28:42.022","record_num":14,"links":{"self":"https://dns.prod-cloud-ocb.orange-business.com/v2/zones/ff8080827274f00e018563dcecb527d3"}}],"links":{"self":"https://dns.prod-cloud-ocb.orange-business.com/v2/zones?name=flexibleengine.test"},"metadata":{"total_count":1}}'
headers:
Connection:
- keep-alive
Content-Length:
- '666'
Content-Type:
- application/json
Date:
- Fri, 10 Mar 2023 11:09:34 GMT
Server:
- api-gateway
Strict-Transport-Security:
- max-age=31536000; includeSubdomains;
X-Content-Type-Options:
- nosniff
X-Download-Options:
- noopen
X-Frame-Options:
- SAMEORIGIN
X-Request-Id:
- 34c045812d34ef0e9961ba105f690080
X-XSS-Protection:
- 1; mode=block;
status:
code: 200
message: OK
version: 1

0 comments on commit 21310e1

Please sign in to comment.