forked from spotify/luigi
-
Notifications
You must be signed in to change notification settings - Fork 0
/
hadoop_jar.py
120 lines (95 loc) · 3.44 KB
/
hadoop_jar.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
# -*- coding: utf-8 -*-
#
# Copyright 2012-2015 Spotify AB
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import logging
import os
import random
import luigi.hadoop
import luigi.hdfs
logger = logging.getLogger('luigi-interface')
def fix_paths(job):
    """
    Coerce input arguments to use temporary files when used for output.

    Return a tuple ``(tmp_files, args)`` where ``tmp_files`` is a list of
    ``(temporary HdfsTarget, final destination path)`` pairs and ``args``
    is the list of string arguments to pass to the job.

    Converts each HdfsTarget to a string for the path.
    """
    tmp_files = []
    args = []
    for x in job.args():
        if isinstance(x, luigi.hdfs.HdfsTarget):  # input/output
            if x.exists() or not job.atomic_output():  # input
                args.append(x.path)
            else:  # output
                # endswith() is safe on an empty path, unlike x.path[-1].
                x_path_no_slash = x.path[:-1] if x.path.endswith('/') else x.path
                # randrange requires integer bounds: 1e10 (a float) is
                # deprecated since Python 3.10 and a TypeError on 3.12+.
                y = luigi.hdfs.HdfsTarget(x_path_no_slash + '-luigi-tmp-%09d' % random.randrange(0, 10 ** 10))
                tmp_files.append((y, x_path_no_slash))
                logger.info('Using temp path: %s for path %s', y.path, x.path)
                args.append(y.path)
        else:
            args.append(str(x))
    return (tmp_files, args)
class HadoopJarJobRunner(luigi.hadoop.JobRunner):
    """
    JobRunner for `hadoop jar` commands. Used to run a HadoopJarJobTask.
    """

    def __init__(self):
        pass

    def run_job(self, job):
        """
        Run ``hadoop jar`` for *job*, then atomically move any temporary
        output paths into their final destinations.

        :param job: a HadoopJarJobTask providing ``jar()``, ``main()``,
                    ``jobconfs()`` and ``args()``.
        :raises Exception: if the job's jar is unset or does not exist.
        """
        # TODO(jcrobak): libjars, files, etc. Can refactor out of
        # hadoop.HadoopJobRunner
        jar = job.jar()
        if not jar or not os.path.exists(jar):
            # Guard the abspath call: os.path.abspath(None) raises TypeError,
            # which would mask the real "jar missing" error below.
            full_path = os.path.abspath(jar) if jar else None
            logger.error("Can't find jar: %s, full path %s", jar, full_path)
            raise Exception("job jar does not exist")
        arglist = luigi.hdfs.load_hadoop_cmd() + ['jar', jar]
        if job.main():
            arglist.append(job.main())

        # Each jobconf entry becomes a -D<key>=<value> argument.
        for jc in job.jobconfs():
            arglist += ['-D' + jc]

        (tmp_files, job_args) = fix_paths(job)
        arglist += job_args

        luigi.hadoop.run_and_track_hadoop_job(arglist)

        # Promote temporary outputs to their final destinations only after
        # the job has finished successfully.
        for a, b in tmp_files:
            a.move(b)
class HadoopJarJobTask(luigi.hadoop.BaseHadoopJobTask):
    """
    A job task for `hadoop jar` commands that define a jar and (optional) main method.
    """

    def jar(self):
        """
        Path to the jar for this Hadoop Job.
        """
        return None

    def main(self):
        """
        optional main method for this Hadoop Job.
        """
        return None

    def atomic_output(self):
        """
        If True, then rewrite output arguments to be temp locations and
        atomically move them into place after the job finishes.
        """
        return True

    def args(self):
        """
        Returns an array of args to pass to the job (after hadoop jar <jar> <main>).
        """
        return list()

    def job_runner(self):
        """
        Build the runner that executes this task.

        We recommend that you define a subclass, override this method and
        set up your own config.
        """
        runner = HadoopJarJobRunner()
        return runner