Skip to content

Commit

Permalink
Add 'ServerAddress' Compactor Config (#3636)
Browse files Browse the repository at this point in the history
Make compactor take a ServerAddress config
from YML file whose value can be replaced
by corfu-server init script
  • Loading branch information
chetangudisagar committed Jun 7, 2023
1 parent 2e87474 commit d16178d
Show file tree
Hide file tree
Showing 2 changed files with 137 additions and 56 deletions.
7 changes: 7 additions & 0 deletions infrastructure/src/main/resources/corfu-compactor-config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -25,3 +25,10 @@ Security:
Truststore: /certs/truststore.jks
TruststorePassword: /password/password

# The below param values will be replaced during runtime by corfu-server init script
ServerAddress:
Hostname:
Port: 9000
# The below params are used only for self ip discovery
NetworkInterface: eth0
NetworkInterfaceVersion: IPv4
186 changes: 130 additions & 56 deletions scripts/compactor_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ def __init__(self):
Fields are public to keep it simple.
"""
self.network_interface = None
self.network_interface_version = None
self.hostname = None
self.corfu_port = None
self.configPath = None
Expand Down Expand Up @@ -121,47 +122,6 @@ def append_compactor_config(self, compactor_config):

return " ".join(cmd)

def _resolve_ip_address(self, ifname):
"""
If environment variable is set, resolve using it. Else, get an
address of from the network interfaces. IPv6 is preferred over IPv4.
"""

pod_name = os.environ.get(POD_NAME, "default_name")
pod_namespace = os.environ.get(POD_NAMESPACE, "default_namespace")
if pod_name != "default_name" and pod_namespace != "default_namespace":
return pod_name + ".corfu-headless." + pod_namespace + ".svc.cluster.local"

network_interfaces = netifaces.interfaces()
ip_version = netifaces.AF_INET6 # Default is IPV6
generic_interfaces = ['eth0', 'en0', 'eth', 'en', 'lo']

for iteration in range(2):
try:
return netifaces.ifaddresses(ifname)[ip_version][0]['addr']
except (KeyError, ValueError) as e:
print('Unable to find valid IP in the interface %s at iteration %s, looking for other options.'
% (ifname, iteration), e)

for generic_interface in generic_interfaces:
for network_interface in network_interfaces:
if network_interface.startswith(generic_interface):
try:
return netifaces.ifaddresses(network_interface)[ip_version][0]['addr']
except (KeyError, ValueError) as e:
print('Unable to find valid IP in the interface %s at iteration %s,'
' looking for other options.'
% (network_interface, iteration), e)

if iteration == 0:
if ip_version == netifaces.AF_INET6:
ip_version = netifaces.AF_INET
else:
ip_version = netifaces.AF_INET6

raise RuntimeError("Could not find any IP addresses. "
"Please check the network interfaces and the program arguments.")

def get_corfu_compactor_cmd(self, compactor_config, class_to_invoke):
diskBacked = False
if "DiskBacked" in compactor_config["MemoryOptions"] and compactor_config["MemoryOptions"]['DiskBacked'] is True:
Expand All @@ -182,8 +142,7 @@ def get_corfu_compactor_cmd(self, compactor_config, class_to_invoke):
cmd.append("-Dio.netty.recycler.maxCapacityPerThread=0")
cmd.append(self.append_compactor_config(compactor_config))
cmd.append(class_to_invoke)

cmd.append("--hostname=" + self._resolve_ip_address(self._config.network_interface))
cmd.append("--hostname=" + self._config.hostname)
cmd.append("--port=" + self._config.corfu_port)
cmd.append("--tlsEnabled=true")
cmd.append("--bulkReadSize=" + str(COMPACTOR_BULK_READ_SIZE))
Expand Down Expand Up @@ -226,7 +185,7 @@ def run(self):
"""
Run compactor.
"""
self._print_and_log("Invoked compactor_runner...");
self._print_and_log("Invoked compactor_runner...")
if self._config.freezeCompaction and self._config.unfreezeCompaction:
self._print_and_log("ERROR: Both freeze and unfreeze compaction parameters cannot be passed together")
return
Expand All @@ -251,7 +210,7 @@ def _rsync_log(self, src_file_prefix, dst_dir):
check_output("mkdir " + dst_dir, shell=True)
self._print_and_log("create dst " + dst_dir)

flist = glob.glob(src_file_prefix + "*");
flist = glob.glob(src_file_prefix + "*")
for file in flist:
try:
if file.find("current") == -1:
Expand Down Expand Up @@ -282,6 +241,12 @@ def _run_corfu_compactor(self):
except Exception as ex:
self._print_and_log("Failed to run rsync_log " + " error: " + str(ex))

self._configure_server_address(compactor_config)
self._print_and_log("_run_corfu_compactor: Configured compacter hostname as "
+ self._config.hostname
+ ", port as "
+ self._config.corfu_port)

if self._config.startCheckpointing:
grep_running_tool = "ps aux | grep 'python3 /usr/share/corfu/scripts/compactor_runner.py' | grep 'startCheckpointing' | grep -v 'grep' | grep " + self._config.corfu_port

Expand Down Expand Up @@ -323,7 +288,7 @@ def _run_corfu_compactor(self):
cmd = self._command_builder.get_corfu_compactor_cmd(compactor_config, class_to_invoke)
self._print_and_log("Start compacting. Command %s" % cmd)
check_call(cmd, shell=True)
self._print_and_log("Finished running corfu compactor.")
self._print_and_log("Finished running corfu compactor tool.")

except Exception as ex:
self._print_and_log("Failed to run compactor tool: %s" % str(ex))
Expand All @@ -336,11 +301,15 @@ def _complete_config(self, args):
Return a Config object.
"""
config = Config()
config.network_interface = args.ifname
config.corfu_port = args.port
config.configPath = args.compactorConfig
if 'hostname' in args and args.hostname:
config.hostname = args.hostname
if 'port' in args and args.port:
config.corfu_port = args.port
if 'network_interface' in args and args.network_interface:
config.network_interface = args.network_interface
if 'network_interface_version' in args and args.network_interface_version:
config.network_interface_version = args.network_interface_version
if 'instantTriggerCompaction' in args:
config.instantTriggerCompaction = args.instantTriggerCompaction
if 'trimAfterCheckpoint' in args:
Expand All @@ -357,22 +326,128 @@ def _complete_config(self, args):
config.startCheckpointing = args.startCheckpointing
return config

def _configure_server_address(self, compactor_config):
"""
Configure Server hostname and port from args and config file.
This method invokes resolve ip method to configure from network interfaces
when the hostname is not present in args or config file.
:param compactor_config: values parsed from config file
:return:
"""
server_address = None
if 'ServerAddress' in compactor_config:
server_address = compactor_config["ServerAddress"]
else:
self._print_and_log("WARNING: _configure_server_address: ServerAddress is not present in config file.")

if not self._config.corfu_port:
if server_address and "Port" in server_address and server_address["Port"]:
self._config.corfu_port = str(server_address["Port"])
else:
# When port is not present in the server args or port param is not there in the config file
raise RuntimeError("_configure_server_address: Port is not present in args or config file."
" Please check the configurations.")

# configuring hostname and port
pod_name = os.environ.get(POD_NAME, "default_name")
pod_namespace = os.environ.get(POD_NAMESPACE, "default_namespace")
if pod_name != "default_name" and pod_namespace != "default_namespace":
self._config.hostname = pod_name + ".corfu-headless." + pod_namespace + ".svc.cluster.local"
return

if not self._config.hostname:
if self._config.network_interface and self._config.network_interface_version:
self._print_and_log("WARNING: _configure_server_address: Hostname is not present in args."
" Continuing self IP discovery with {}"
" interface and {} version from args.".format(self._config.network_interface,
self._config.network_interface_version))
self._config.hostname = self._resolve_ip_address(
network_interface=self._config.network_interface,
network_interface_version=self._config.network_interface_version
)
return
elif server_address:
self._print_and_log("_configure_server_address: Found the server_address from config file: {}"
.format(server_address))
if "Hostname" in server_address and server_address["Hostname"]:
self._config.hostname = server_address["Hostname"]
return
# Do self ip discovery since hostname is not configured
elif "NetworkInterfaceVersion" in server_address and server_address["NetworkInterfaceVersion"] \
and "NetworkInterface" in server_address and server_address["NetworkInterface"]:
self._print_and_log("WARNING: _configure_server_address: Hostname is not present in args or"
" config file. Continuing self IP discovery with {} interface and {} version"
" from config file.".format(server_address["NetworkInterface"],
server_address["NetworkInterfaceVersion"]))
self._config.hostname = self._resolve_ip_address(
network_interface=server_address["NetworkInterface"],
network_interface_version=server_address["NetworkInterfaceVersion"]
)
return
else:
raise RuntimeError("_configure_server_address: Hostname or network interface info"
" is not present in args or config file. Please check the configurations.")
else:
# When server address param is not there in the config file or hostname is not present in the args
raise RuntimeError("_configure_server_address: Couldn't load the server_address from args or config"
" file. Please check the configurations.")


def _resolve_ip_address(self, network_interface=None, network_interface_version=None):
"""
If environment variable or hostname config is set, resolve using it. Else, get an
address of from the network interfaces. IPv6 is preferred over IPv4.
"""
network_interface_version = network_interface_version.upper()
network_interfaces = netifaces.interfaces()
network_interface_versions_dict = {'IPV4': netifaces.AF_INET, 'IPV6': netifaces.AF_INET6}
generic_interfaces = ['eth0', 'en0', 'eth', 'en', 'lo']

for iteration in range(2):
try:
return netifaces.ifaddresses(network_interface)[network_interface_versions_dict[network_interface_version]][0]['addr']
except (KeyError, ValueError) as e:
print('Unable to find valid IP in the interface %s at iteration %s, looking for other options.'
% (network_interface, iteration), e)

for generic_interface in generic_interfaces:
for network_interface in network_interfaces:
if network_interface.startswith(generic_interface):
try:
return netifaces.ifaddresses(network_interface)[network_interface_versions_dict[network_interface_version]][0]['addr']
except (KeyError, ValueError) as e:
self._print_and_log('Unable to find valid IP in the interface %s at iteration %s,'
' looking for other options.'
.format(network_interface, iteration, e))

if iteration == 0:
if network_interface_version == 'IPV6':
network_interface_version = 'IPV4'
else:
network_interface_version = 'IPV6'

raise RuntimeError("Could not find any IP addresses. "
"Please check the network interfaces and the program arguments.")

if __name__ == "__main__":
arg_parser = ArgumentParser()

arg_parser.add_argument("--ifname", type=str,
arg_parser.add_argument("--network_interface", type=str,
help="The network interface that corfu server is listening to. "
"Default value is eth0.",
required=False,
default="eth0")
"Default value from config file is eth0.",
required=False)
arg_parser.add_argument("--network_interface_version", type=str,
help="The network interface version that corfu server is listening to. "
"Default value from config file is IPv4.",
required=False)
arg_parser.add_argument("--hostname", type=str,
help="The corfu server hostname",
required=False)
arg_parser.add_argument("--port", type=str,
help="The corfu server port number. "
"Default value is 9000.",
required=False,
default="9000")
"Default value from config file is 9000.",
required=False)
arg_parser.add_argument("--compactorConfig", type=str,
help="The file containing config for compactor",
default="/usr/share/corfu/conf/corfu-compactor-config.yml",
Expand Down Expand Up @@ -401,4 +476,3 @@ def _complete_config(self, args):
args = arg_parser.parse_args()
compactor_runner = CompactorRunner(args)
compactor_runner.run()

0 comments on commit d16178d

Please sign in to comment.