Skip to content

Commit

Permalink
updated handlers to refernce python ipaddress instead of str IP repre…
Browse files Browse the repository at this point in the history
…sentation
  • Loading branch information
AdrianFretwell committed Apr 13, 2024
1 parent 4d35d93 commit 5eb6649
Show file tree
Hide file tree
Showing 8 changed files with 109 additions and 79 deletions.
23 changes: 8 additions & 15 deletions httapihandler/httapihandler.py
Expand Up @@ -32,6 +32,7 @@
from lxml import etree
from .models import HttApiSession
from tenants.pbxsettings import PbxSettings
from pbx.pbxipaddresscheck import loopback_default


class HttApiHandler():
Expand Down Expand Up @@ -160,22 +161,14 @@ def get_allowed_addresses(self):
cache_key = 'httapihandler:allowed_addresses'
aa = cache.get(cache_key)
if aa:
allowed_addresses = aa.split(',')
else:
allowed_addresses = PbxSettings().default_settings('httapihandler', 'allowed_address', 'array')
if allowed_addresses:
aa = ','.join(allowed_addresses)
else:
aa = '127.0.0.1'
cache.set(cache_key, aa)
return allowed_addresses

def address_allowed(self, ip_address):
allowed_addresses = self.get_allowed_addresses()
if ip_address in allowed_addresses:
return True
return aa
aa = PbxSettings().default_settings('httapihandler', 'allowed_address', 'array')
if aa:
aa = list(aa)
else:
return False
aa = loopback_default
cache.set(cache_key, aa)
return aa

def get_first_call(self):
if self.exiting:
Expand Down
4 changes: 3 additions & 1 deletion httapihandler/views.py
Expand Up @@ -32,6 +32,7 @@
from rest_framework import permissions
from django_filters.rest_framework import DjangoFilterBackend
from django.views.decorators.csrf import csrf_exempt
from pbx.pbxipaddresscheck import pbx_ip_address_check

from pbx.restpermissions import (
AdminApiAccessPermission
Expand Down Expand Up @@ -79,7 +80,8 @@ class HttApiSessionViewSet(viewsets.ModelViewSet):
def processhttapi(request, httapihf):
if not request.method == 'POST':
return HttpResponseNotFound()
if not httapihf.address_allowed(request.META['REMOTE_ADDR']):
allowed_addresses = httapihf.get_allowed_addresses()
if not pbx_ip_address_check(request, allowed_addresses):
return HttpResponseNotFound()
return HttpResponse(httapihf.htt_get_data(), content_type='text/xml')

Expand Down
46 changes: 46 additions & 0 deletions pbx/pbxipaddresscheck.py
@@ -0,0 +1,46 @@
#
# DjangoPBX
#
# MIT License
#
# Copyright (c) 2016 - 2024 Adrian Fretwell <adrian@djangopbx.com>
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
#
# Contributor(s):
# Adrian Fretwell <adrian@djangopbx.com>
#

import ipaddress
from python_ipware import IpWare

loopback_default = ['127.0.0.1/32', '::1/128']

def pbx_ip_address_check(request, allowed_addresses):
ipw = IpWare()
ip, trusted_route = ipw.get_client_ip(request.META)
for ip_net in allowed_addresses:
try:
ipa = ipaddress.ip_network(ip_net)
except:
ipa = ()
continue
if ip in ipa:
return True
return False
21 changes: 11 additions & 10 deletions recordings/views.py
Expand Up @@ -35,6 +35,7 @@
from django.core.cache import cache
from tenants.pbxsettings import PbxSettings
from pbx.commonfunctions import DomainUtils
from pbx.pbxipaddresscheck import pbx_ip_address_check, loopback_default

from pbx.restpermissions import (
AdminApiAccessPermission
Expand Down Expand Up @@ -96,24 +97,24 @@ def perform_destroy(self, instance):
instance.delete()


def rec_check_address(addr):
def rec_check_address(request):
aa_cache_key = 'recordings:allowed_addresses'
aa = cache.get(aa_cache_key)
if aa:
allowed_addresses = aa.split(',')
else:
allowed_addresses = PbxSettings().default_settings('recordings', 'allowed_address', 'array')
aa = ','.join(allowed_addresses)
if not aa:
aa = PbxSettings().default_settings('recordings', 'allowed_address', 'array')
if aa:
aa = list(aa)
else:
aa = loopback_default
cache.set(aa_cache_key, aa)

if addr not in allowed_addresses:
if not pbx_ip_address_check(request, aa):
return False
return True


@csrf_exempt
def rec_import(request, domain, recfile):
if not rec_check_address(request.META['REMOTE_ADDR']):
if not rec_check_address(request):
return HttpResponseNotFound()
try:
d = Domain.objects.get(name=domain)
Expand All @@ -134,7 +135,7 @@ def rec_import(request, domain, recfile):

@csrf_exempt
def call_rec_import(request, domain, year, month, day, recfile):
if not rec_check_address(request.META['REMOTE_ADDR']):
if not rec_check_address(request):
return HttpResponseNotFound()
try:
d = Domain.objects.get(name=domain)
Expand Down
16 changes: 8 additions & 8 deletions tenants/fixtures/defaultsetting.json
Expand Up @@ -7,7 +7,7 @@
"category": "httapihandler",
"subcategory": "allowed_address",
"value_type": "array",
"value": "127.0.0.1",
"value": "127.0.0.1/32",
"sequence": "10",
"enabled": "true",
"description": null,
Expand Down Expand Up @@ -1033,7 +1033,7 @@
"category": "cdr",
"subcategory": "allowed_address",
"value_type": "array",
"value": "::1",
"value": "::1/128",
"sequence": "20",
"enabled": "true",
"description": null,
Expand Down Expand Up @@ -1357,7 +1357,7 @@
"category": "httapihandler",
"subcategory": "allowed_address",
"value_type": "array",
"value": "::1",
"value": "::1/128",
"sequence": "20",
"enabled": "true",
"description": null,
Expand Down Expand Up @@ -1807,7 +1807,7 @@
"category": "xmlhandler",
"subcategory": "allowed_address",
"value_type": "array",
"value": "127.0.0.1",
"value": "127.0.0.1/32",
"sequence": "10",
"enabled": "true",
"description": null,
Expand Down Expand Up @@ -3229,7 +3229,7 @@
"category": "xmlhandler",
"subcategory": "allowed_address",
"value_type": "array",
"value": "::1",
"value": "::1/128",
"sequence": "20",
"enabled": "true",
"description": null,
Expand Down Expand Up @@ -3733,7 +3733,7 @@
"category": "cdr",
"subcategory": "allowed_address",
"value_type": "array",
"value": "127.0.0.1",
"value": "127.0.0.1/32",
"sequence": "10",
"enabled": "true",
"description": null,
Expand Down Expand Up @@ -4633,7 +4633,7 @@
"category": "recordings",
"subcategory": "allowed_address",
"value_type": "array",
"value": "127.0.0.1",
"value": "127.0.0.1/32",
"sequence": "10",
"enabled": "true",
"description": null,
Expand All @@ -4651,7 +4651,7 @@
"category": "recordings",
"subcategory": "allowed_address",
"value_type": "array",
"value": "::1",
"value": "::1/128",
"sequence": "20",
"enabled": "true",
"description": null,
Expand Down
14 changes: 8 additions & 6 deletions xmlcdr/views.py
Expand Up @@ -53,6 +53,7 @@
from pbx.restpermissions import (
AdminApiAccessPermission
)
from pbx.pbxipaddresscheck import pbx_ip_address_check, loopback_default
from .models import (
XmlCdr,
)
Expand All @@ -70,14 +71,15 @@ def xml_cdr_import(request):
debug = False
aa_cache_key = 'xmlcdr:allowed_addresses'
aa = cache.get(aa_cache_key)
if aa:
allowed_addresses = aa.split(',')
else:
allowed_addresses = PbxSettings().default_settings('cdr', 'allowed_address', 'array')
aa = ','.join(allowed_addresses)
if not aa:
aa = PbxSettings().default_settings('cdr', 'allowed_address', 'array')
if aa:
aa = list(aa)
else:
aa = loopback_default
cache.set(aa_cache_key, aa)

if request.META['REMOTE_ADDR'] not in allowed_addresses:
if not pbx_ip_address_check(request, aa):
return HttpResponseNotFound()

if request.method == 'POST':
Expand Down
46 changes: 16 additions & 30 deletions xmlhandler/views.py
Expand Up @@ -30,21 +30,24 @@
import logging
from django.http import HttpResponse, HttpResponseNotFound
from django.views.decorators.csrf import csrf_exempt
from pbx.pbxipaddresscheck import pbx_ip_address_check
from .xmlhandlerclasses import DirectoryHandler, DialplanHandler, LanguagesHandler, ConfigHandler

logger = logging.getLogger(__name__)

def check_ok_to_process(request, allowed_addresses, method='POST'):
if not pbx_ip_address_check(request, allowed_addresses):
return False
if not request.method == method:
return False
return True

@csrf_exempt
def dialplan(request):
debug = False
xmlhf = DialplanHandler()
allowed_addresses = xmlhf.get_allowed_addresses()

if request.META['REMOTE_ADDR'] not in allowed_addresses:
return HttpResponseNotFound()

if not request.method == 'POST':
if not check_ok_to_process(request, allowed_addresses):
return HttpResponseNotFound()

if debug:
Expand Down Expand Up @@ -72,15 +75,11 @@ def dialplan(request):
def staticdialplan(request):
xmlhf = DialplanHandler()
allowed_addresses = xmlhf.get_allowed_addresses()

if request.META['REMOTE_ADDR'] not in allowed_addresses:
if not check_ok_to_process(request, allowed_addresses, 'GET'):
return HttpResponseNotFound()

if request.method == 'GET':
hostname = request.GET.get('hostname', 'None')
xml = xmlhf.GetDialplanStatic(hostname)
else:
return HttpResponseNotFound()
hostname = request.GET.get('hostname', 'None')
xml = xmlhf.GetDialplanStatic(hostname)

return HttpResponse(xml, content_type='application/xml')

Expand All @@ -90,11 +89,7 @@ def directory(request):
debug = False
xmlhf = DirectoryHandler()
allowed_addresses = xmlhf.get_allowed_addresses()

if request.META['REMOTE_ADDR'] not in allowed_addresses:
return HttpResponseNotFound()

if not request.method == 'POST':
if not check_ok_to_process(request, allowed_addresses):
return HttpResponseNotFound()

if debug:
Expand Down Expand Up @@ -134,13 +129,9 @@ def staticdirectory(request):
xmlhf = DirectoryHandler()
allowed_addresses = xmlhf.get_allowed_addresses()

if request.META['REMOTE_ADDR'] not in allowed_addresses:
return HttpResponseNotFound()

if request.method == 'GET':
xml = xmlhf.GetDirectoryStatic()
else:
if not check_ok_to_process(request, allowed_addresses, 'GET'):
return HttpResponseNotFound()
xml = xmlhf.GetDirectoryStatic()

return HttpResponse(xml, content_type='application/xml')

Expand All @@ -150,10 +141,7 @@ def languages(request):
xmlhf = LanguagesHandler()
allowed_addresses = xmlhf.get_allowed_addresses()

if request.META['REMOTE_ADDR'] not in allowed_addresses:
return HttpResponseNotFound()

if not request.method == 'POST':
if not check_ok_to_process(request, allowed_addresses):
return HttpResponseNotFound()

if debug:
Expand All @@ -172,9 +160,7 @@ def configuration(request):

xmlhf = ConfigHandler()
allowed_addresses = xmlhf.get_allowed_addresses()
if request.META['REMOTE_ADDR'] not in allowed_addresses:
return HttpResponseNotFound()
if not request.method == 'POST':
if not check_ok_to_process(request, allowed_addresses):
return HttpResponseNotFound()
if debug:
logger.info('XML Handler request: {}'.format(request.POST))
Expand Down
18 changes: 9 additions & 9 deletions xmlhandler/xmlhandler.py
Expand Up @@ -31,7 +31,7 @@
from lxml import etree
from tenants.pbxsettings import PbxSettings
from switch.models import SwitchVariable

from pbx.pbxipaddresscheck import loopback_default

class XmlHandler():

Expand Down Expand Up @@ -84,15 +84,15 @@ def get_allowed_addresses(self):
cache_key = 'xmlhandler:allowed_addresses'
aa = cache.get(cache_key)
if aa:
allowed_addresses = aa.split(',')
else:
allowed_addresses = PbxSettings().default_settings('xmlhandler', 'allowed_address', 'array')
if allowed_addresses:
aa = ','.join(allowed_addresses)
else:
aa = '127.0.0.1'
return aa
aa = PbxSettings().default_settings('xmlhandler', 'allowed_address', 'array')
if aa:
aa = list(aa)
cache.set(cache_key, aa)
return allowed_addresses
return aa
aa = loopback_default
cache.set(cache_key, aa)
return aa

def get_default_language(self):
cache_key = 'xmlhandler:lang:default_language'
Expand Down

0 comments on commit 5eb6649

Please sign in to comment.