Skip to content

Commit

Permalink
Add new backend Ketrew_yarn
Browse files Browse the repository at this point in the history
This for now comes without tests since it was tested “live” with Biokepi and our
Hadoop cluster.

Creating an instant one-node virtual hadoop-cluster (like the others in
`src/test/intergration.ml`) is left for future work :)
  • Loading branch information
smondet committed Mar 12, 2015
1 parent 3625830 commit 046a304
Show file tree
Hide file tree
Showing 13 changed files with 464 additions and 11 deletions.
1 change: 1 addition & 0 deletions src/atd/daemonize_v0.atd
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ type created = {
program: program;
using_hack: [ Nohup_setsid | Python_daemon ];
starting_timeout: time;
shell_command: string list;
}
type running = {
pid: int option;
Expand Down
4 changes: 4 additions & 0 deletions src/atd/versioned.atd
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,16 @@ type lsf_run_parameters_v0
type pbs_run_parameters_v0
<ocaml from="Ketrew_gen_pbs_v0.Run_parameters" > = abstract

type yarn_run_parameters_v0
<ocaml from="Ketrew_gen_yarn_v0.Run_parameters" > = abstract

type target = [ V0 of target_v0 ]
type stored_target = [ V0 of stored_target_v0 ]

type daemonize_run_parameters = [ V0 of daemonize_run_parameters_v0 ]
type lsf_run_parameters = [ V0 of lsf_run_parameters_v0 ]
type pbs_run_parameters = [ V0 of pbs_run_parameters_v0 ]
type yarn_run_parameters = [ V0 of yarn_run_parameters_v0 ]

type measurement_collection_v0
<ocaml from="Ketrew_gen_base_v0.Measurement_collection" > = abstract
Expand Down
49 changes: 49 additions & 0 deletions src/atd/yarn_v0.atd
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@

(**************************************************************************)
(* Copyright 2014, Sebastien Mondet <seb@mondet.org> *)
(* *)
(* Licensed under the Apache License, Version 2.0 (the "License"); *)
(* you may not use this file except in compliance with the License. *)
(* You may obtain a copy of the License at *)
(* *)
(* http://www.apache.org/licenses/LICENSE-2.0 *)
(* *)
(* Unless required by applicable law or agreed to in writing, software *)
(* distributed under the License is distributed on an "AS IS" BASIS, *)
(* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or *)
(* implied. See the License for the specific language governing *)
(* permissions and limitations under the License. *)
(**************************************************************************)

type time <ocaml from="Ketrew_gen_base_v0.Time"> = abstract
type host <ocaml from="Ketrew_gen_base_v0.Host"> = abstract
type path <ocaml from="Ketrew_gen_base_v0.Path"> = abstract
type program <ocaml from="Ketrew_gen_base_v0.Program"> = abstract
type monitored_script <ocaml from="Ketrew_gen_base_v0.Monitored_script"> = abstract

type daemonized_run <ocaml from="Ketrew_gen_daemonize_v0.Running"> = abstract

type distributed_shell_parameters = {
hadoop_bin: string;
distributed_shell_shell_jar: string;
container_memory: [ GB of int | Raw of string ];
timeout: [ Seconds of int | Raw of string ];
application_name: string;
}
type created = {
host: host;
program: [
| Distributed_shell of (distributed_shell_parameters * program)
| Yarn_application of program
];
daemonize_using: [ Nohup_setsid | Python_daemon ];
daemon_start_timeout: float;
}
type running = {
created: created;
daemonized_script: daemonized_run;
}
type run_parameters = [
| Created of created
| Running of running
]
41 changes: 31 additions & 10 deletions src/lib/pure/ketrew_daemonize.ml
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,16 @@ let created =

let name = "daemonize"

let default_shell = "bash"
let script_placeholder = "<script>"
let default_shell_command = [default_shell; script_placeholder]

let create
?(starting_timeout=5.)
?(starting_timeout=5.) ?(call_script=fun s -> [default_shell; s])
?(using=`Nohup_setsid) ?(host=Ketrew_host.tmp_on_localhost) program =
let c = {host; program; using_hack = using; starting_timeout } in
let shell_command = call_script script_placeholder in
let c =
{host; program; using_hack = using; starting_timeout; shell_command } in
`Long_running (name, `Created c |> serialize)

let using_hack = function
Expand All @@ -64,7 +70,8 @@ let log =
"Status", s "Created" % sp % parens (s (hack_to_string c.using_hack));
"Host", Ketrew_host.log c.host;
"Program", Ketrew_program.log c.program;
"Starting-timeout", f c.starting_timeout % s " sec."
"Starting-timeout", f c.starting_timeout % s " sec.";
"Call-script", OCaml.list quote c.shell_command;
]
| `Running rp -> [
"Status", s "Running" % sp
Expand Down Expand Up @@ -123,7 +130,7 @@ let query run_parameters item =
| other -> fail Log.(s "Unknown query: " % sf "%S" other)
end

let make_python_script ~out ~err ~pid_file monitored_script_path =
let make_python_script ~out ~err ~pid_file ~call_script monitored_script_path =
fmt "
import os # Miscellaneous OS interfaces.
import sys # System-specific parameters and functions.
Expand Down Expand Up @@ -154,13 +161,16 @@ if __name__ == '__main__':
except OSError, e:
print >>sys.stderr, 'fork #2 failed: %%d (%%s)' %% (e.errno, e.strerror)
sys.exit(1)
p = subprocess.Popen(['bash', '%s'],
p = subprocess.Popen([%s],
cwd='/',
stdout=file('%s', 'w'),
stderr=file('%s', 'w'))
"
(Ketrew_path.to_string pid_file)
(Ketrew_path.to_string monitored_script_path)
(call_script
(Ketrew_path.to_string monitored_script_path)
|> List.map ~f:(fmt "'%s'")
|> String.concat ~sep:", ")
(Ketrew_path.to_string out)
(Ketrew_path.to_string err)

Expand All @@ -185,11 +195,18 @@ let start rp =
>>= fun () ->
let out = out_file_path ~playground in
let err = err_file_path ~playground in
let call_script s =
List.map created.shell_command ~f:(function
| tok when tok = script_placeholder -> s
| other -> other) in
begin match created.using_hack with
| `Nohup_setsid ->
let cmd =
fmt "nohup setsid bash %s > %s 2> %s &"
(Path.to_string_quoted monitored_script_path)
fmt "nohup setsid %s > %s 2> %s &"
(call_script
(Ketrew_path.to_string monitored_script_path)
|> List.map ~f:Filename.quote
|> String.concat ~sep:" ")
(Path.to_string_quoted out) (Path.to_string_quoted err) in
Host.run_shell_command created.host cmd
>>= fun () ->
Expand All @@ -198,7 +215,8 @@ let start rp =
| `Python_daemon ->
let pid_file = Ketrew_monitored_script.pid_file monitored_script in
let content =
make_python_script ~out ~err ~pid_file monitored_script_path in
make_python_script ~out ~err ~pid_file ~call_script
monitored_script_path in
let path = python_hack_path ~playground in
Host.put_file ~content created.host ~path
>>= fun () ->
Expand Down Expand Up @@ -263,7 +281,10 @@ let update run_parameters =
get_log_of_monitored_script ~host:run.created.host ~script:run.script
>>= fun log_opt ->
begin match Option.bind log_opt List.last with
| None -> (* no log at all *)
| None when elapsed <= run.created.starting_timeout ->
(* no log at all *)
return (`Still_running new_run_parameters)
| None ->
return (`Failed (new_run_parameters, "no log file"))
| Some (`Success date) ->
return (`Succeeded new_run_parameters)
Expand Down
6 changes: 6 additions & 0 deletions src/lib/pure/ketrew_daemonize.mli
Original file line number Diff line number Diff line change
Expand Up @@ -49,12 +49,18 @@

(** The “standard” plugin-API. *)
include Ketrew_long_running.LONG_RUNNING
with type run_parameters = Ketrew_gen_daemonize_v0.Run_parameters.t

val create:
?starting_timeout:float ->
?call_script:(string -> string list) ->
?using:[ `Nohup_setsid | `Python_daemon] ->
?host:Ketrew_host.t -> Ketrew_program.t ->
[> `Long_running of string * string ]
(** Create a “long-running” {!Ketrew_target.build_process} (run parameters
are already serialized). *)


val default_shell : string
val script_placeholder : string
val default_shell_command : string list
1 change: 1 addition & 0 deletions src/lib/pure/ketrew_edsl.mli
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,7 @@ val file_target:

val daemonize :
?starting_timeout:float ->
?call_script:(string -> string list) ->
?using:[`Nohup_setsid | `Python_daemon] ->
?host:Host.t ->
Program.t ->
Expand Down
9 changes: 9 additions & 0 deletions src/lib/pure/ketrew_long_running_utilities.ml
Original file line number Diff line number Diff line change
Expand Up @@ -77,3 +77,12 @@ let get_pid_of_monitored_script ~host ~script =
| `Error (`Timeout _ as e) -> fail e
end


let shell_command_output_or_log ~host cmd =
begin Ketrew_host.get_shell_command_output host cmd
>>< function
| `Ok (o, _) -> return o
| `Error e ->
fail Log.(s "Command " % quote cmd % s " on " % Host.log host
% s " failed: " % s (Ketrew_error.to_string e))
end
6 changes: 6 additions & 0 deletions src/lib/pure/ketrew_long_running_utilities.mli
Original file line number Diff line number Diff line change
Expand Up @@ -99,3 +99,9 @@ val get_pid_of_monitored_script :
script:Ketrew_monitored_script.t ->
(int option, [> `Timeout of Time.t ]) Deferred_result.t
(** Fetch and parse the [pid] file of a monitored-script. *)

val shell_command_output_or_log :
host:Ketrew_host.t ->
string -> (string, Ketrew_pervasives.Log.t) Ketrew_pervasives.t
(** Call {!Ketrew_host.get_shell_command_output} and transform errors
into a {!Log.t}. *)
1 change: 1 addition & 0 deletions src/lib/pure/ketrew_plugin.ml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ let default_plugins = [
Daemonize.name, (module Daemonize: LONG_RUNNING);
Ketrew_lsf.name, (module Ketrew_lsf: LONG_RUNNING);
Ketrew_pbs.name, (module Ketrew_pbs: LONG_RUNNING);
Ketrew_yarn.name, (module Ketrew_yarn: LONG_RUNNING);
]
let global_list_of_plugins: (string * (module LONG_RUNNING)) list ref =
ref default_plugins
Expand Down
Loading

0 comments on commit 046a304

Please sign in to comment.