Skip to content

Commit

Permalink
Using new ddlog version
Browse files: browse the repository at this point in the history
  • Loading branch information
bryanhe committed Oct 21, 2016
1 parent 40aa9af commit c40d55a
Showing 1 changed file with 46 additions and 27 deletions.
73 changes: 46 additions & 27 deletions salt/src/numbskull_master.py
Original file line number Diff line number Diff line change
Expand Up @@ -220,18 +220,13 @@ def prep_numbskull(self):
"""TODO."""
# Setup local instance
self.prep_local_numbskull()
# Setup minion instnances
# Setup minion instances
success = self.prep_minions_numbskull()
if not success:
print('ERROR: Numbksull not loaded')

def prepare_db(self):
"""TODO."""
# hard-coded application directory
# application_dir = "/dfs/scratch0/bryanhe/genomics/"
# application_dir = "/dfs/scratch0/bryanhe/census/"
# application_dir = "/dfs/scratch0/bryanhe/voting/"
# application_dir = "/dfs/scratch0/bryanhe/congress6/"

# obtain database url from file
with open(self.application_dir + "/db.url", "r") as f:
Expand All @@ -244,11 +239,16 @@ def prepare_db(self):

# Obtain partition information
cmd = ["ddlog", "semantic-partition", "app.ddlog",
"--ppa", "-w", str(self.num_minions)]
"--ppa",
# "-u",
"--workers", str(self.num_minions),
"--cost-model", "simple.costmodel.txt"]
print(" ".join(cmd))
partition_json = subprocess.check_output(cmd, cwd=self.application_dir)
partition = json.loads(partition_json)

begin = time.time()

# Connect to an existing database
# http://stackoverflow.com/questions/15634092/connect-to-an-uri-in-postgres
url = urlparse.urlparse(db_url)
Expand All @@ -264,24 +264,38 @@ def prepare_db(self):
host=hostname,
port=port
)

# Open a cursor to perform database operations
cur = conn.cursor()

# Select which partitioning
# TODO: actually check costs
print(len(partition))
print(partition[0].keys())
print(80 * "*")
for p in partition:
print(p["partition_types"])
cur.execute(p["sql_to_cost"])
cost = cur.fetchone()[0]
print(cost)
# if p["partition_types"] == "":
# if p["partition_types"] == "(0)":
if p["partition_types"] == self.partition_type:
p0 = p
print(80 * "*")

# Define functions that sql needs
for op in partition[0]["sql_prefix"]:
cur.execute(op)
# Make the changes to the database persistent
conn.commit()

# Select which partitioning scheme to use
if self.partition_type is not None:
# Type was prespecified
for p in partition:
if p["partition_types"] == self.partition_type:
p0 = p
else:
# Evaluating costs
print(80 * "*")
optimal_cost = None
for p in partition:
cur.execute(p["sql_to_cost"])
cost = cur.fetchone()[0]
print('Partition scheme "' + p["partition_types"] +
'" has cost ' + str(cost))
if optimal_cost is None or cost < optimal_cost:
optimal_cost = cost
p0 = p
print(80 * "*")

# p0 is partition to use
for k in p0.keys():
Expand Down Expand Up @@ -514,13 +528,18 @@ def main(application_dir, machines, threads_per_machine,
"inference_time": infer_time}

if __name__ == "__main__":
if len(sys.argv) == 7:
main(sys.argv[1],
int(sys.argv[2]),
int(sys.argv[3]),
int(sys.argv[4]),
int(sys.argv[5]),
sys.argv[6])
if len(sys.argv) == 6 or len(sys.argv) == 7:
application_dir = sys.argv[1]
machines = int(sys.argv[2])
threads_per_machine = int(sys.argv[3])
learning_epochs = int(sys.argv[4])
inference_epochs = int(sys.argv[5])
partition_type = None
if len(sys.argv) == 7:
partition_type = sys.argv[6]

main(application_dir, machines, threads_per_machine,
learning_epochs, inference_epochs, partition_type)
else:
print("Usage: " + sys.argv[0] +
" application_dir" +
Expand Down

0 comments on commit c40d55a

Please sign in to comment.