# ftp_experiment_outputs.py (forked from spotify/luigi)
# 56 lines (43 loc) · 1.6 KB
import luigi
from luigi.contrib.ftp import RemoteTarget
# FTP connection settings. HOST, USER and PWD are passed to RemoteTarget in
# ExperimentTask.output().
# NOTE(review): these look like placeholder values -- replace them before
# running; a real password presumably should not be hard-coded in source.
HOST = "some_host"
USER = "user"
PWD = "some_password"
# NOTE(review): DIR is not referenced anywhere in the visible part of this file.
DIR = "/"
class ExperimentTask(luigi.ExternalTask):
    """Experiment measurements stored as a text file on the FTP server.

    The task's output is ``/experiment/output1.txt`` on ``HOST``, accessed
    with the ``USER``/``PWD`` credentials defined at module level.

    NOTE(review): the task is declared as an ``ExternalTask`` (data produced
    by an outside process), yet it also defines ``run()``, which writes four
    sample rows. Confirm whether ``run()`` is only meant to seed the example.
    """

    def output(self):
        """Return the remote FTP target holding the experiment data."""
        return RemoteTarget('/experiment/output1.txt', HOST, username=USER, password=PWD)

    def run(self):
        """Write four sample data rows, one per line, to the remote file."""
        sample_rows = (
            "data 0 200 10 50 60",
            "data 1 190 9 52 60",
            "data 2 200 10 52 60",
            "data 3 195 1 52 60",
        )
        with self.output().open('w') as outfile:
            # Use write() rather than the Python 2-only ``print >> outfile``
            # statement so the task behaves the same on Python 2 and 3.
            for row in sample_rows:
                outfile.write(row + "\n")
class ProcessingTask(luigi.Task):
    """Summarise the experiment data produced by ``ExperimentTask``.

    Each input line is split on single spaces. The field at index 2 is
    averaged over all rows and the field at index 3 is summed. The result is
    written as ``"<average> <sum>"`` on one line to
    ``/tmp/processeddata.txt``.
    """

    def requires(self):
        """Depend on the experiment data file."""
        return ExperimentTask()

    def output(self):
        """Return the local target that receives the computed summary."""
        return luigi.LocalTarget('/tmp/processeddata.txt')

    def run(self):
        """Compute the average/sum summary and write it to the output target.

        Raises:
            ValueError: if the input contains no data rows (an average
                cannot be computed).
        """
        running_total = 0.0
        sumval = 0.0
        elements = 0

        # Target objects are a file system/format abstraction; open() returns
        # a file-like stream. The ``with`` block ensures it is closed once
        # all lines have been read.
        with self.input().open('r') as infile:
            for line in infile:
                if not line.strip():
                    # Skip blank lines (e.g. a trailing newline at EOF)
                    # instead of failing with an IndexError below.
                    continue
                values = line.split(" ")
                running_total += float(values[2])
                sumval += float(values[3])
                elements += 1

        if elements == 0:
            raise ValueError("experiment output contains no data rows")

        # average
        avg = running_total / elements

        # save calculated values; write() works on both Python 2 and 3 and
        # "%s" formats the floats the same way the print statement did.
        with self.output().open('w') as outfile:
            outfile.write("%s %s\n" % (avg, sumval))
# Script entry point: hand control to luigi only when this file is executed
# directly, not when it is imported as a module.
if __name__ == "__main__":
    luigi.run()