Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 36 additions & 50 deletions citus_dev/citus_dev
Original file line number Diff line number Diff line change
Expand Up @@ -21,53 +21,57 @@ from docopt import docopt
from subprocess import call
from subprocess import Popen, PIPE
import os
import subprocess
import sys
import getpass
import time


def createNodeCommands(clustername, role, index=None, usessl=False, mx=False):
cs = []
def run(command, *args, **kwargs):
print(command)
result = subprocess.run(command, *args, check=True, shell=True, **kwargs)
print()
return result


def createNodeCommands(clustername, role, index=None, usessl=False, mx=False):
nodename = role
if index != None:
nodename += "%d" % index

dir = "%s/%s" % (clustername, nodename)
cs.append("initdb -D %s" % dir)
cs.append("echo \"shared_preload_libraries = 'citus'\" >> %s/postgresql.conf" % dir)
cs.append('echo "wal_level = logical" >> %s/postgresql.conf' % dir)
run("initdb -D %s" % dir)
run("echo \"shared_preload_libraries = 'citus,pg_stat_statements'\" >> %s/postgresql.conf" % dir)
run('echo "wal_level = logical" >> %s/postgresql.conf' % dir)

if usessl:
cs.append('echo "ssl = on" >> %s/postgresql.conf' % dir)
cs.append(
run('echo "ssl = on" >> %s/postgresql.conf' % dir)
run(
"echo \"citus.node_conninfo = 'sslmode=require'\" >> %s/postgresql.conf"
% dir
)
cs.append(
run(
"openssl req -new -x509 -days 365 -nodes -text -out %s/server.crt -keyout %s/server.key -subj '/CN=%s'"
% (dir, dir, nodename)
)
cs.append("chmod 0600 %s/server.key" % dir)
run("chmod 0600 %s/server.key" % dir)

if mx:
cs.append(
run(
"echo \"citus.replication_model = 'streaming'\" >> %s/postgresql.conf" % dir
)

return cs


def main(arguments):
print(arguments)
if arguments["make"]:
cs = []
if arguments['--destroy']:
name = arguments["<name>"]
for role in getRoles(name):
cs.append("pg_ctl stop -D %s/%s" % (name, role))
cs.append('rm -rf %s' % (name))
run("pg_ctl stop -D %s/%s || true" % (name, role))
run('rm -rf %s' % (name))

cs += createNodeCommands(
createNodeCommands(
arguments["<name>"],
"coordinator",
usessl=arguments["--use-ssl"],
Expand All @@ -77,7 +81,7 @@ def main(arguments):
size = int(arguments["--size"])

for i in range(size):
cs += createNodeCommands(
createNodeCommands(
arguments["<name>"],
"worker",
i,
Expand All @@ -89,15 +93,17 @@ def main(arguments):

cport = port
role = "coordinator"
cs.append(
run(
'pg_ctl -D %s/%s -o "-p %d" -l %s_logfile start'
% (arguments["<name>"], role, cport, role)
)
port += 1

worker_ports = []
for i in range(size):
role = "worker%d" % i
cs.append(
worker_ports.append(port)
run(
'pg_ctl start -D %s/%s -o "-p %d" -l %s_logfile'
% (arguments["<name>"], role, port, role)
)
Expand All @@ -106,93 +112,73 @@ def main(arguments):

if getpass.getuser() != 'postgres' and not os.getenv('PGDATABASE'):
for i in range(size + 1):
cs.append('createdb -p %d' % (port + i))
run('createdb -p %d' % (port + i))

if not arguments["--no-extension"]:
for i in range(size + 1):
cs.append('psql -p %d -c "CREATE EXTENSION citus;"' % (port + i))
run('psql -p %d -c "CREATE EXTENSION citus;"' % (port + i))

# If the cluster size is 0 we add the coordinator as the only node, otherwise we will add all other nodes
if size == 0:
cs.append(
run(
"psql -p %d -c \"SELECT * from master_add_node('localhost', %d);\""
% (port, port)
)
else:
for i in range(size):
cs.append(
run(
"psql -p %d -c \"SELECT * from master_add_node('localhost', %d);\""
% (port, port + 1 + i)
)
if arguments["--mx"]:
cs.append(
run(
"psql -p %d -c \"SELECT start_metadata_sync_to_node('localhost', %d);\""
% (port, port + 1 + i)
)

cs.append(
run(
'psql -p %d -c "SELECT * from master_get_active_worker_nodes();"'
% (port)
)
if arguments['--init-with']:
cs.append('psql -p %d -f %s' % (cport, arguments['--init-with']))

for c in cs:
print(c)
os.system(c)
print("")
run('psql -p %d -f %s -v ON_ERROR_STOP=1' % (cport, arguments['--init-with']))

elif arguments["stop"]:
cs = []
name = arguments["<name>"]
for role in getRoles(name):
cs.append("pg_ctl stop -D %s/%s" % (name, role))
run("pg_ctl stop -D %s/%s" % (name, role))

for c in cs:
print(c)
os.system(c)
print("")

elif arguments["start"]:
cs = []
name = arguments["<name>"]
port = int(arguments["--port"])
cport = port
for role in getRoles(name):
cs.append(
run(
'pg_ctl start -D %s/%s -o "-p %d" -l %s_logfile'
% (name, role, cport, role)
)
cport += 1

for c in cs:
print(c)
os.system(c)
print("")

elif arguments["restart"]:
cs = []
name = arguments["<name>"]
port = int(arguments["--port"])
if arguments["--watch"]:
cs.append(
run(
"fswatch -0 '%s' | xargs -0 -n 1 -I{} citus_dev restart %s --port=%d"
% (citus_so(), name, port)
)

else:
cport = port
for role in getRoles(name):
cs.append(
run(
'pg_ctl restart -D %s/%s -o "-p %d" -l %s_logfile'
% (name, role, cport, role)
)
cport += 1

for c in cs:
print(c)
os.system(c)
print("")

else:
print("unknown command")
Expand Down