Skip to content

Commit

Permalink
Add sanitization to addSysteUser method, closes #461
Browse files Browse the repository at this point in the history
  • Loading branch information
dlc-ariel committed Jan 12, 2023
1 parent a1d203e commit 9acea7e
Showing 1 changed file with 37 additions and 16 deletions.
53 changes: 37 additions & 16 deletions FreeTAKServer/core/services/RestAPI.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from flask import Flask, request, jsonify, session, send_file
from flask import Flask, request, jsonify, session, send_file, escape
from flask_sqlalchemy import SQLAlchemy
from flask_socketio import SocketIO, emit
from flask_httpauth import HTTPTokenAuth
Expand Down Expand Up @@ -415,7 +415,13 @@ def addSystemUserRest():
return str(e), 500

def addSystemUser(jsondata):
""" method which adds new system user
"""Method that adds new users to the system
:param jsondata: data #TODO create schema for this, also rename arg
:type jsondata: dict
:raises Exception: Malformed data exception
:return: message and response code (message: str, response code: int)
:rtype: tuple
"""
errors = []
for systemuser in jsondata['systemUsers']:
Expand All @@ -426,23 +432,38 @@ def addSystemUser(jsondata):
# if certs are to be generated the certificate generation is called DP is created and CoT is sent to
# client resulting in an automatic download of the certificate

cert_name = systemuser["Name"] + user_id
# sanitize system user name
system_user_name = str(escape(systemuser['Name']))

# sanitize system user group
system_user_group = str(escape(systemuser['Group']))

# sanitize system user token
system_user_token = str(escape(systemuser['Token']))

# sanitize system user pass
system_user_pw = str(escape(systemuser['Password']))

#sanitize system user device type
system_user_device_type = str(escape(systemuser['DeviceType']))

cert_name = system_user_name + user_id

# create certs
certificate_generation.AtakOfTheCerts().bake(common_name=cert_name)
if systemuser["DeviceType"].lower() == "wintak":

if system_user_device_type.lower() == "wintak":
certificate_generation.generate_wintak_zip(user_filename=cert_name + '.p12')
elif systemuser["DeviceType"].lower() == "mobile":
elif system_user_device_type.lower() == "mobile":
certificate_generation.generate_standard_zip(user_filename=cert_name+'.p12')
else:
raise Exception("invalid device type, must be either mobile or wintak")
# add DP
import string
import random
from pathlib import PurePath, Path
import hashlib
from defusedxml import ElementTree as etree
import shutil
import os

dp_directory = str(PurePath(Path(config.DataPackageFilePath)))
openfile = open(str(PurePath(Path(str(config.ClientPackages), cert_name + '.zip'))),
mode='rb')
Expand All @@ -456,10 +477,10 @@ def addSystemUser(jsondata):
dbController.create_datapackage(uid=user_id, Name=cert_name + '.zip', Hash=file_hash,
SubmissionUser='server',
CreatorUid='server-uid', Size=fileSize, Privacy=1)
dbController.create_systemUser(name=systemuser["Name"], group=systemuser["Group"],
token=systemuser["Token"], password=systemuser["Password"],
dbController.create_systemUser(name=system_user_name, group=system_user_group,
token=system_user_token, password=system_user_pw,
uid=user_id,
certificate_package_name=cert_name + '.zip', device_type = systemuser["DeviceType"])
certificate_package_name=cert_name + '.zip', device_type = system_user_device_type)
import datetime as dt
DATETIME_FMT = "%Y-%m-%dT%H:%M:%S.%fZ"
timer = dt.datetime
Expand All @@ -476,19 +497,19 @@ def addSystemUser(jsondata):
from FreeTAKServer.model.RawCoT import RawCoT
cot = RawCoT()
DPIP = getStatus().TCPDataPackageService.TCPDataPackageServiceIP
clientXML = f'<?xml version="1.0"?><event version="2.0" uid="{user_id}" type="b-f-t-r" time="{time}" start="{time}" stale="{stale}" how="h-e"><point lat="43.85570300" lon="-66.10801200" hae="19.55866360" ce="3.21600008" le="nan" /><detail><fileshare filename="{cert_name}" senderUrl="{DPIP}:8080/Marti/api/sync/metadata/{str(file_hash)}/tool" sizeInBytes="{fileSize}" sha256="{str(file_hash)}" senderUid="{"server-uid"}" senderCallsign="{"server"}" name="{cert_name + ".zip"}" /><ackrequest uid="{uuid.uuid4()}" ackrequested="true" tag="{cert_name + ".zip"}" /><marti><dest callsign="{systemuser["Name"]}" /></marti></detail></event>'
clientXML = f'<?xml version="1.0"?><event version="2.0" uid="{user_id}" type="b-f-t-r" time="{time}" start="{time}" stale="{stale}" how="h-e"><point lat="43.85570300" lon="-66.10801200" hae="19.55866360" ce="3.21600008" le="nan" /><detail><fileshare filename="{cert_name}" senderUrl="{DPIP}:8080/Marti/api/sync/metadata/{str(file_hash)}/tool" sizeInBytes="{fileSize}" sha256="{str(file_hash)}" senderUid="{"server-uid"}" senderCallsign="{"server"}" name="{cert_name + ".zip"}" /><ackrequest uid="{uuid.uuid4()}" ackrequested="true" tag="{cert_name + ".zip"}" /><marti><dest callsign="{system_user_name}" /></marti></detail></event>'
cot.xmlString = clientXML.encode()
newCoT = SendOtherController(cot, addToDB=False)
APIPipe.put(newCoT.getObject())

else:
# in the event no certificate is to be generated simply create a system user
dbController.create_systemUser(name=systemuser["Name"], group=systemuser["Group"],
token=systemuser["Token"], password=systemuser["Password"],
uid=user_id, device_type = systemuser["DeviceType"])
dbController.create_systemUser(name=system_user_name, group=system_user_group,
token=system_user_token, password=system_user_pw,
uid=user_id, device_type = system_user_device_type)
except Exception as e:
if isinstance(systemuser, dict) and "Name" in systemuser:
errors.append(f"operation failed for user {systemuser['Name']} with error {str(e)}")
errors.append(f"operation failed for user {system_user_name} with error {str(e)}")
else:
errors.append(f"operation failed for user missing name parameter with error {str(e)}")

Expand Down

0 comments on commit 9acea7e

Please sign in to comment.