Skip to content

Commit

Permalink
Clean implementation, compatible with GSSAPI scheme.
Browse files Browse the repository at this point in the history
  • Loading branch information
manuco committed Mar 19, 2021
1 parent fbf0e85 commit e8cf19e
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 60 deletions.
112 changes: 52 additions & 60 deletions kafka/conn.py
Expand Up @@ -79,15 +79,14 @@ class SSLWantWriteError(Exception):
import gssapi
from gssapi.raw.misc import GSSError
except ImportError:
#no gssapi available, will disable gssapi mechanism
gssapi = None
GSSError = None


try:
import kerberos_sspi as kerberos
import kerberos_sspi
except ImportError:
kerberos = None
kerberos_sspi = None

AFI_NAMES = {
socket.AF_UNSPEC: "unspecified",
Expand Down Expand Up @@ -185,8 +184,7 @@ class BrokerConnection(object):
metric_group_prefix (str): Prefix for metric names. Default: ''
sasl_mechanism (str): Authentication mechanism when security_protocol
is configured for SASL_PLAINTEXT or SASL_SSL. Valid values are:
PLAIN, GSSAPI, OAUTHBEARER, SCRAM-SHA-256, SCRAM-SHA-512, KERBEROS_V5
(Windows Only, via win32's SSPI).
PLAIN, GSSAPI, OAUTHBEARER, SCRAM-SHA-256, SCRAM-SHA-512.
sasl_plain_username (str): username for sasl PLAIN and SCRAM authentication.
Required if sasl_mechanism is PLAIN or one of the SCRAM mechanisms.
sasl_plain_password (str): password for sasl PLAIN and SCRAM authentication.
Expand Down Expand Up @@ -233,7 +231,7 @@ class BrokerConnection(object):
'sasl_oauth_token_provider': None
}
SECURITY_PROTOCOLS = ('PLAINTEXT', 'SSL', 'SASL_PLAINTEXT', 'SASL_SSL')
SASL_MECHANISMS = ('PLAIN', 'GSSAPI', 'OAUTHBEARER', "SCRAM-SHA-256", "SCRAM-SHA-512", "KERBEROS_V5")
SASL_MECHANISMS = ('PLAIN', 'GSSAPI', 'OAUTHBEARER', "SCRAM-SHA-256", "SCRAM-SHA-512")

def __init__(self, host, port, afi, **configs):
self.host = host
Expand Down Expand Up @@ -276,10 +274,9 @@ def __init__(self, host, port, afi, **configs):
'sasl_plain_password required for PLAIN or SCRAM sasl'
)
if self.config['sasl_mechanism'] == 'GSSAPI':
assert gssapi is not None, 'GSSAPI lib not available'
if gssapi is None and kerberos_sspi is None:
raise AssertionError('No GSSAPI lib available')
assert self.config['sasl_kerberos_service_name'] is not None, 'sasl_kerberos_service_name required for GSSAPI sasl'
if self.config['sasl_mechanism'] == 'KERBEROS_V5':
assert kerberos is not None, 'KERBEROS_V5 / kerberos-sspi lib not available'
if self.config['sasl_mechanism'] == 'OAUTHBEARER':
token_provider = self.config['sasl_oauth_token_provider']
assert token_provider is not None, 'sasl_oauth_token_provider required for OAUTHBEARER sasl'
Expand Down Expand Up @@ -530,13 +527,11 @@ def _try_authenticate(self):

if self._sasl_auth_future is None:
# Build a SaslHandShakeRequest message
sasl_mechanism = self.config['sasl_mechanism'] if self.config['sasl_mechanism'] != "KERBEROS_V5" else "GSSAPI"
log.debug(f'Using {sasl_mechanism} sasl_mechanism')
request = SaslHandShakeRequest[0](sasl_mechanism)
request = SaslHandShakeRequest[0](self.config['sasl_mechanism'])
future = Future()
sasl_response = self._send(request)
sasl_response.add_callback(self._handle_sasl_handshake_response, future)
sasl_response.add_errback(self._handle_err, future)
sasl_response.add_errback(lambda f, e: f.failure(e), future)
self._sasl_auth_future = future

for r, f in self.recv():
Expand All @@ -551,21 +546,14 @@ def _try_authenticate(self):
raise ex # pylint: disable-msg=raising-bad-type
return self._sasl_auth_future.succeeded()

def _handle_err(self, f, e):
print(f, e)
f.failure(e)




def _handle_sasl_handshake_response(self, future, response):
error_type = Errors.for_code(response.error_code)
if error_type is not Errors.NoError:
error = error_type(self)
self.close(error=error)
return future.failure(error_type(self))
sasl_mechanism = self.config['sasl_mechanism'] if self.config['sasl_mechanism'] != "KERBEROS_V5" else "GSSAPI"
if sasl_mechanism not in response.enabled_mechanisms:

if self.config['sasl_mechanism'] not in response.enabled_mechanisms:
return future.failure(
Errors.UnsupportedSaslMechanismError(
'Kafka broker does not support %s sasl mechanism. Enabled mechanisms are: %s'
Expand All @@ -574,8 +562,6 @@ def _handle_sasl_handshake_response(self, future, response):
return self._try_authenticate_plain(future)
elif self.config['sasl_mechanism'] == 'GSSAPI':
return self._try_authenticate_gssapi(future)
elif self.config['sasl_mechanism'] == 'KERBEROS_V5':
return self._try_authenticate_kerberos_v5(future)
elif self.config['sasl_mechanism'] == 'OAUTHBEARER':
return self._try_authenticate_oauth(future)
elif self.config['sasl_mechanism'].startswith("SCRAM-SHA-"):
Expand Down Expand Up @@ -728,13 +714,20 @@ def _try_authenticate_scram(self, future):
return future.success(True)

def _try_authenticate_gssapi(self, future):
kerberos_damin_name = self.config['sasl_kerberos_domain_name'] or self.host
auth_id = self.config['sasl_kerberos_service_name'] + '@' + kerberos_damin_name
if gssapi is not None:
return self._try_authenticate_gssapi_gss_implementation(future)

if kerberos_sspi is not None:
return self._try_authenticate_gssapi_sspi_implementation(future)

def _try_authenticate_gssapi_gss_implementation(self, future):
kerberos_host_name = self.config['sasl_kerberos_domain_name'] or self.host
auth_id = self.config['sasl_kerberos_service_name'] + '@' + kerberos_host_name
gssapi_name = gssapi.Name(
auth_id,
name_type=gssapi.NameType.hostbased_service
).canonicalize(gssapi.MechType.kerberos)
log.debug('%s: GSSAPI name: %s', self, gssapi_name)
log.debug('%s: GSSAPI Service Principal Name: %s', self, gssapi_name)

err = None
close = False
Expand Down Expand Up @@ -797,12 +790,17 @@ def _try_authenticate_gssapi(self, future):
self.close(error=err)
return future.failure(err)

log.info('%s: Authenticated as %s via GSSAPI', self, gssapi_name)
log.info(
'%s: Authenticated as %s to %s via GSSAPI',
self,
client_ctx.initiator_name,
client_ctx.target_name
)
return future.success(True)

def _try_authenticate_kerberos_v5(self, future):
target_host = self.config['sasl_kerberos_domain_name'] or self.host
service_principal_name = self.config['sasl_kerberos_service_name'] + '@' + target_host
def _try_authenticate_gssapi_sspi_implementation(self, future):
kerberos_host_name = self.config['sasl_kerberos_domain_name'] or self.host
service_principal_name = self.config['sasl_kerberos_service_name'] + '@' + kerberos_host_name

err = None
close = False
Expand All @@ -816,26 +814,32 @@ def _try_authenticate_kerberos_v5(self, future):

# Establish security context and negotiate protection level
# For reference RFC 2222, section 7.2.1
flags = kerberos.GSS_C_CONF_FLAG|kerberos.GSS_C_INTEG_FLAG|kerberos.GSS_C_MUTUAL_FLAG|kerberos.GSS_C_SEQUENCE_FLAG
res, client_ctx = kerberos.authGSSClientInit(service_principal_name, gssflags=flags)
assert res == kerberos.AUTH_GSS_COMPLETE
flags = \
kerberos_sspi.GSS_C_CONF_FLAG | \
kerberos_sspi.GSS_C_INTEG_FLAG | \
kerberos_sspi.GSS_C_MUTUAL_FLAG | \
kerberos_sspi.GSS_C_SEQUENCE_FLAG

# Create a security context.
res, client_ctx = kerberos_sspi.authGSSClientInit(service_principal_name, gssflags=flags)
assert res == kerberos_sspi.AUTH_GSS_COMPLETE

res = kerberos.AUTH_GSS_CONTINUE
res = kerberos_sspi.AUTH_GSS_CONTINUE
received_token = b""
# Exchange tokens until authentication either succeeds or fails
krb_round = 0
while res == kerberos.AUTH_GSS_CONTINUE:
while res == kerberos_sspi.AUTH_GSS_CONTINUE:
krb_round += 1
log.debug(f"Round {krb_round}")
res = kerberos.authGSSClientStep(client_ctx, kerberos.encodestring(received_token))
res = kerberos_sspi.authGSSClientStep(client_ctx, kerberos_sspi.encodestring(received_token))
if res == -1:
raise RuntimeError("Client Step Error", res)

output_token = client_ctx["response"] # get the binary data, not a base64 encoded version

# pass output token to kafka, or send empty response if the security
# context is complete (output token is None in that case)
if res != kerberos.AUTH_GSS_CONTINUE:
if res != kerberos_sspi.AUTH_GSS_CONTINUE:
self._send_bytes_blocking(Int32.encode(0))
else:
msg = output_token
Expand All @@ -844,7 +848,7 @@ def _try_authenticate_kerberos_v5(self, future):

# The server will send a token back. Processing of this token either
# establishes a security context, or it needs further token exchange.
# The gssapi will be able to identify the needed next step.
# The remote gssapi will be able to identify the needed next step.
# The connection is closed on failure.
header = self._recv_bytes_blocking(4)
(token_size,) = struct.unpack('>i', header)
Expand All @@ -854,7 +858,7 @@ def _try_authenticate_kerberos_v5(self, future):
# once the security context is established.

# unwraps message containing supported protection levels and msg size
kerberos.authGSSClientUnwrap(client_ctx, kerberos.encodestring(received_token))
kerberos_sspi.authGSSClientUnwrap(client_ctx, kerberos_sspi.encodestring(received_token))
msg = client_ctx["response"]

# Kafka currently doesn't support integrity or confidentiality security layers, so we
Expand All @@ -864,24 +868,7 @@ def _try_authenticate_kerberos_v5(self, future):
msg += service_principal_name.encode("utf-8")
# add authorization identity to the response, GSS-wrap and send it



# import sspicon, win32security
# pkg_size_info = client_ctx["csa"].ctxt.QueryContextAttributes(sspicon.SECPKG_ATTR_SIZES)
# trailersize=pkg_size_info['SecurityTrailer']
#
# encbuf=win32security.PySecBufferDescType()
# encbuf.append(win32security.PySecBufferType(len(msg), sspicon.SECBUFFER_DATA))
# encbuf.append(win32security.PySecBufferType(trailersize, sspicon.SECBUFFER_TOKEN))
# encbuf[0].Buffer=msg
# client_ctx["csa"].ctxt.EncryptMessage(0,encbuf,client_ctx["csa"]._get_next_seq_num())
#
# msg = encbuf[0].Buffer #+ encbuf[1].Buffer
# # return encbuf[0].Buffer, encbuf[1].Buffer



kerberos.authGSSClientWrap(client_ctx, kerberos.encodestring(msg), service_principal_name)
kerberos_sspi.authGSSClientWrap(client_ctx, kerberos_sspi.encodestring(msg), service_principal_name)
msg = client_ctx["response"]

size = Int32.encode(len(msg))
Expand All @@ -892,7 +879,6 @@ def _try_authenticate_kerberos_v5(self, future):
err = Errors.KafkaConnectionError("%s: %s" % (self, e))
close = True
except Exception as e:
# raise
err = e
close = True

Expand All @@ -901,7 +887,13 @@ def _try_authenticate_kerberos_v5(self, future):
self.close(error=err)
return future.failure(err)

log.info('%s: Authenticated as %s via GSSAPI', self, service_principal_name)
log.info(
'%s: Authenticated as %s to %s via Windows SSPI',
self,
kerberos_sspi.authGSSClientUserName(client_ctx),
kerberos_sspi.authGSSServerTargetName(client_ctx), # incomplete API...
)

return future.success(True)


Expand Down
2 changes: 2 additions & 0 deletions setup.py
Expand Up @@ -41,6 +41,8 @@ def run(cls):
"lz4": ["lz4"],
"snappy": ["python-snappy"],
"zstd": ["python-zstandard"],
"gssapi": ["gssapi"],
"gssapi_sspi": ["kerberos-sspi"],
},
cmdclass={"test": Tox},
packages=find_packages(exclude=['test']),
Expand Down

0 comments on commit e8cf19e

Please sign in to comment.