[WIP] pyDKB: stage fix #151
If Stage is run without any parameters, it fails in `configure()` with this error:
```
  File "./../../pyDKB/dataflow/stage/ProcessorStage.py", line 179, in configure
    self.__input = consumer.ConsumerBuilder(vars(self.ARGS)) \
TypeError: vars() argument must have __dict__ attribute
```
To fix it we need to ensure that ARGS is initialized in any scenario. We could initialize ARGS as `argparse.Namespace()` during object initialization, but it is better to run `parse_args()` (just in case the stage has some required parameters), as in the sketch below.
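A minimal sketch of the intended behaviour (illustrative only; `ARGS`, `parse_args()` and `configure()` follow the PR description, the rest is assumed):

```
import argparse

class ProcessorStage(object):
    """ Simplified illustration; the real class lives in
        pyDKB/dataflow/stage/ProcessorStage.py. """

    def __init__(self):
        self._parser = argparse.ArgumentParser()
        self.ARGS = None

    def parse_args(self, args=None):
        """ Parse command line arguments (an empty list by default). """
        self.ARGS = self._parser.parse_args(args if args is not None else [])

    def configure(self, args=None):
        # Ensure ARGS exists even if the caller never ran parse_args();
        # running the parser (instead of just creating an empty Namespace)
        # also reports missing required parameters.
        if self.ARGS is None:
            self.parse_args(args)
        # ... vars(self.ARGS) can now be used safely ...
```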
When FileConsumer takes input file names from STDIN and is interrupted by `Ctrl+C`, the `close()` method fails because it tries to call `stdin.tell()` when STDIN is already closed:
```
$ ./json2TTL.py input/NDjson.json
(INFO) (FileProducer) Output directory is set to subdirectory 'out' of the one containing input files or of the current one (/home/mgolosova/dkb/Utils/Dataflow/test/pyDKB).
(INFO) (ProcessorStage) Interrupted by user.
(INFO) (ProcessorStage) Stopping stage.
(ERROR) (ProcessorStage) Failed to stop <pyDKB.dataflow.communication.consumer.FileConsumer.FileConsumer object at 0x1bdf090>: I/O operation on closed file
(DEBUG) (ProcessorStage) Traceback (most recent call last):
(==)   File "./../../pyDKB/dataflow/stage/ProcessorStage.py", line 258, in stop
(==)     p.close()
(==)   File "./../../pyDKB/dataflow/communication/consumer/Consumer.py", line 141, in close
(==)     for s in (self.get_stream(), self.get_source()):
(==)   File "./../../pyDKB/dataflow/communication/consumer/Consumer.py", line 84, in get_stream
(==)     if self.reset_stream():
(==)   File "./../../pyDKB/dataflow/communication/consumer/Consumer.py", line 92, in reset_stream
(==)     src = self.get_source()
(==)   File "./../../pyDKB/dataflow/communication/consumer/FileConsumer.py", line 66, in get_source
(==)     if self.source_is_empty() is not False:
(==)   File "./../../pyDKB/dataflow/communication/consumer/FileConsumer.py", line 56, in source_is_empty
(==)     return fd.tell() == f['size']
(==) ValueError: I/O operation on closed file
```
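One possible guard (a sketch only, with a hypothetical signature; the actual fix may differ) is to check whether the descriptor is already closed before calling `tell()`:

```
def source_is_empty(fd, f):
    """ Sketch: check if the current file source is fully read.
        `fd` is the open file object, `f` a dict with the file 'size'
        (hypothetical signature; the real method takes no arguments). """
    if fd.closed:
        # After Ctrl+C STDIN (or a file) may already be closed; treat a
        # closed source as exhausted instead of raising ValueError.
        return True
    return fd.tell() == f['size']
```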
When the stage read input with a custom EOM, it read the first message and then "reset" the input stream, losing the data buffered in the `custom_readline()` generator. Now `InputStream` resets the iterator only when the `force` parameter is set or when the stream (`fd`) has changed, and thus does not create a new `custom_readline()` generator when it is not needed.
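Roughly, the new reset logic could look like this (a sketch; `custom_readline()` below is an illustrative stand-in for the real pyDKB generator, other names are assumptions):

```
def custom_readline(fd, eom):
    """ Illustrative stand-in for pyDKB's custom_readline() generator:
        yields messages delimited by a custom end-of-message marker,
        keeping not-yet-yielded data in a local buffer. """
    buf = ''
    while True:
        chunk = fd.read(4096)
        if not chunk:
            if buf:
                yield buf
            return
        buf += chunk
        while eom in buf:
            msg, buf = buf.split(eom, 1)
            yield msg


class InputStream(object):
    """ Simplified sketch of the new reset behaviour. """

    def __init__(self, eom):
        self.EOM = eom
        self.fd = None
        self._iterator = None

    def reset(self, fd, force=False):
        if not force and fd is self.fd and self._iterator is not None:
            # Same stream and no explicit request: keep the existing
            # generator so the data it has buffered is not lost.
            return
        self.fd = fd
        self._iterator = custom_readline(fd, self.EOM)
```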
`FileConsumer.source_is_empty()`, used to check whether the current (file) data source is fully read, checked the current cursor position against the file size; but when a custom EOM is used, `custom_readline()` reads data into a local buffer, so the file can be fully read while not all the values from the buffer have been processed. To fix this issue a new method `InputStream.is_empty()` is added with different implementations: one for ordinary file reading (used to work with `'\n'` or empty EOM) and one for the `custom_readline()` generator.
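A rough illustration of the two `is_empty()` implementations (class names and the look-ahead mechanism are assumptions, not the actual pyDKB code):

```
class InputStream(object):
    """ Ordinary file reading ('\n' or empty EOM): the cursor position
        against the file size tells whether everything was consumed. """

    def __init__(self, fd, size):
        self.fd = fd
        self.size = size

    def is_empty(self):
        return self.fd.tell() == self.size


class IterableInputStream(InputStream):
    """ Custom EOM: data is buffered inside the custom_readline()
        generator, so the stream is empty only when the generator can
        yield nothing more (hypothetical look-ahead mechanism). """

    def __init__(self, fd, size, iterator):
        super(IterableInputStream, self).__init__(fd, size)
        self._iterator = iterator
        self._lookahead = None

    def is_empty(self):
        if self._lookahead is not None:
            return False
        try:
            self._lookahead = next(self._iterator)
            return False
        except StopIteration:
            return True
```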
It seems that there is a typo in the get_readable_stream description: "nearset" instead of "nearest".
`get_source()` tried to skip empty sources, but as the functionality to detect whether a source is empty was moved to the InputStream object, we got a collision: the current `_stream` was not reset until `get_source()` returned a new non-empty source, yet `get_source()` relied on the InputStream's methods and always saw that the current source (or, actually, the current stream) is empty. Now the logic is simplified and recursion-like method calls are removed:
* `get_source()` returns the current source (or, if the source is not defined yet, the next one);
* `get_stream()` returns the stream "as is" (or, if the stream is not defined yet, initializes it with the current source and returns the new one);
* `next_source()` returns the next source;
* `reset_stream()` resets the stream to the next source.

Yet to read a new message from the InputStream object, we still need to reset it to the "nearest readable source" (or else we cannot say whether only the current source is over or we have run out of sources altogether). So a new method is added:
* `get_readable_stream()` returns the stream, reset to the nearest non-empty source.
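In sketch form, the simplified call structure might look like this (the `InputStream` stub and constructor arguments are assumptions; only the method names and their described roles come from the PR):

```
class InputStream(object):
    """ Minimal stub standing in for pyDKB's InputStream. """
    def __init__(self, source):
        self.source = source
    def reset(self, source):
        self.source = source
    def is_empty(self):
        return not self.source


class Consumer(object):
    """ Rough sketch of the simplified Consumer call structure. """

    def __init__(self, sources):
        self._sources = iter(sources)
        self._source = None
        self._stream = None

    def get_source(self):
        """ Current source (or, if not defined yet, the next one). """
        if self._source is None:
            self._source = self.next_source()
        return self._source

    def get_stream(self):
        """ Stream "as is" (initialized from the current source if needed). """
        if self._stream is None and self.get_source() is not None:
            self._stream = InputStream(self.get_source())
        return self._stream

    def next_source(self):
        """ Next source (None when sources are exhausted). """
        self._source = next(self._sources, None)
        return self._source

    def reset_stream(self):
        """ Reset the stream to the next source. """
        src = self.next_source()
        if src is not None:
            self._stream.reset(src)
        return src

    def get_readable_stream(self):
        """ Stream reset to the nearest non-empty source (None if no
            sources are left). """
        stream = self.get_stream()
        while stream is not None and stream.is_empty():
            if self.reset_stream() is None:
                return None
        return stream
```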
@Evildoor, found out that after all the changes the test stage still does not work as expected. Details are in the comment below.
Checked the test stage before merging and here is what I saw:
I think I'm going to add a checklist to the PR description with all the use cases to be tested before asking for a review, approval or merge. Without it we risk missing something again.
As the '--help' option's description starts with a lower-case letter and ends without a period, I believe it is better to follow this convention in our own descriptions.
Now, for parameters with a default value, it is automatically added to the description as a last line saying "DEFAULT: 'value'". For the `-e` and `-E` parameters the default value depends on `--mode`, so for them this information is added manually.
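For illustration, one possible way to append the default value to help strings automatically (a hypothetical helper, not necessarily how the PR implements it):

```
import argparse

def add_argument_with_default(parser, *args, **kwargs):
    """ Hypothetical helper: append "DEFAULT: 'value'" to the help text
        of parameters that have an explicit default. """
    if kwargs.get('default') is not None:
        kwargs['help'] = (kwargs.get('help', '') +
                          "\nDEFAULT: '%s'" % kwargs['default'])
    return parser.add_argument(*args, **kwargs)

parser = argparse.ArgumentParser(prog='stage.py',
                                 formatter_class=argparse.RawTextHelpFormatter)
add_argument_with_default(parser, '-m', '--mode', default='f',
                          help="stage execution mode")
```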
For parameters with actions like `store_true`, `store_false` or `store_const` the DEFAULT value has no special meaning, so it is excessive in the help message.
An empty string or None makes no difference, but None will not be displayed in the help message.
Most of the parameter values, due to `nargs='?'`, were displayed in the help message as optional while being required. It means that this:
```
./stage.py -m
```
must raise an error, but help said:
```
usage: json2TTL.py [-h] [-m [MODE]] ...
```
as if `MODE` was optional. It happened because `nargs` has no value that would be interpreted as "get exactly one value" AND would not return this value as a list (as `nargs=1` does). Careful reading showed that "exactly one" is the default behaviour, so omitting `nargs` gives the desired result.
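A self-contained illustration of the `argparse` behaviour described above (a hypothetical parser, not the stage's actual one):

```
import argparse

parser = argparse.ArgumentParser(prog='stage.py')

# With nargs='?' the value is optional: usage shows [-m [MODE]],
# and './stage.py -m' silently yields mode=None instead of an error.
parser.add_argument('-m', '--mode', nargs='?')
print(parser.parse_args(['-m']))          # Namespace(mode=None)

# Omitting nargs means "exactly one value, not wrapped in a list":
# usage shows [-m MODE] and '-m' without a value raises an error.
parser2 = argparse.ArgumentParser(prog='stage.py')
parser2.add_argument('-m', '--mode')
print(parser2.parse_args(['-m', 'f']))    # Namespace(mode='f')
# parser2.parse_args(['-m'])              # error: expected one argument
```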
Now the description explains that this parameter is essentially a shortcut for a combination of other parameters and how it interacts with explicitly specified "--source", "--dest", etc.
I'm closing this PR, as most of the fixes are done in a series of other PRs. This PR could be left open as a reminder, but I don't think it is a good idea: we have other tools for task tracking, after all.
There were some issues with Stage:
The PR fixes these issues.
To check the behaviour I suggest using the test stage in `Utils/Dataflow/test/pyDKB`. With which sets of parameters to run this stage and what to expect [3]:
* `-m f` (should claim that no input files are specified);
* `-m f input/NDjson.json` (should produce output file `input/out/NDjson.ttl` [1] or claim that the file exists);
* without parameters;
* `input/NDjson.json` (should produce output file `input/out/NDjson.ttl` or claim that the file exists).

I believe that the first two items show that `-m f` and "without parameters" are the same, so the 3rd and 4th are the same too.

* `-i ./input NDjson.json` (should produce output file `input/out/NDjson.ttl` or claim that the file exists);
* `-i ./input` (should produce output file `input/out/NDjson.ttl` (or claim that the file exists) and claim that it fails to read messages from `jsonArray.json`);
* `-o ./output input/NDjson.json` (should produce output file `output/NDjson.ttl` or claim that the file exists);
* `-o output input/NDjson.json` (should produce output file `input/output/NDjson.ttl` or claim that the file exists);
* `-d s input/NDjson.json` (should write output to STDOUT);
* `-s s` (should take data from STDIN and write output to `out/???.ttl` files [2]);
* `--hdfs` (should claim that no input files are specified);
* `--hdfs /path/to/hdfs/file.json` (should produce output file `/path/to/hdfs/out/file.ttl` in HDFS or claim that the file already exists);
* `-m f --hdfs /path/to/hdfs/file.json` (should produce output file `/path/to/hdfs/out/file.ttl` in HDFS or claim that the file already exists);
* `-d h input/NDjson.json` (should produce output file `/user/DKB/temp/out/NDjson.ttl` in HDFS or claim that the file already exists);
* `-s h /path/to/hdfs/file.json` (should produce output file `out/file.ttl` or claim that the file already exists);
* `--hdfs -o /path/to/hdfs/dir /path/to/hdfs/file.json` (should produce output file `/path/to/hdfs/dir/file.ttl` in HDFS or claim that the file already exists);
* `-s h -i /path/to/hdfs file.json` (should process `hdfs:///path/to/hdfs/file.json` and produce local output file `out/file.ttl` or claim that the file already exists);
* `-s h -i /path/to/hdfs` (should produce local `out/*.ttl` output files in the current directory for each `*.json` file from the HDFS directory or claim that the file already exists);
* (`-m s`) should add EOP marker to STDOUT;
* `-m s` (should run in stream mode: take input data from STDIN and write output data to STDOUT);
* `-m s -d f` (should take input data from STDIN and write output data to `./out/???.ttl` ( == `-s s` == `-m f -s s`));
* `-m s -d h` (should take input data from STDIN and write output data to `/user/DKB/temp/out/???.ttl` ( == `-s s -d h` == `--hdfs -s s`));
* `-m s -s f` (should claim that no input files are specified ( == `-s f -d s` == `-d s` == `-m f -d s`));
* `-m s -s h` (should claim that no input files are specified ( == `-s h -d s` == `--hdfs -d s` == `-m f -s h -d s`));
* `-m m` (should run in MapReduce mode: take input data from STDIN (line by line) and write output data to STDOUT ( ~= `-m s`, except that no EOP marker appears in the output));
* `-m m -s s` ( == `-m m`);
* `-m m -d s` ( == `-m m`);
* `-m m -s h` (should take input HDFS file names from STDIN and write output data to STDOUT);
* `-m m -d h` (should take input from STDIN and write output data to `/user/DKB/temp/out/???.ttl`);
* `-m m --hdfs` (should take input HDFS file names from STDIN and write output data to `/user/DKB/temp/out/???.ttl`);
* `-m m -s f` (should claim that the given source is not supported in (m)ap-reduce mode);
* `-m m -d f` (should claim that the given destination is not supported in (m)ap-reduce mode);
* `-d s input/EOMDjson.json` (should fail to read input data);
* `-d s -e EOM input/NDjson.json` (should fail to read input data);
* `-d s -e EOM input/EOMDjson.json` (should output processed data to STDOUT with the same EOM);
* `-d s -E EOP input/NDjson.json` (should output EOP after every message);
* `-d s -e '' input/jsonArray.json` (should output processed data to STDOUT, also in the form of a JSON array (or hash, if there's only one message)) [4];
* `-d s -e '' input/NDjson.json` (should fail to read input data).

Not sure if this list is complete and that the "expected" behaviour is intuitive, so if, looking at the cmdline parameters, one expects different behaviour, please tell.
[1] `NDjson.ttl` must contain the (Pythonic) string representation of JSON strings from `NDjson.json`.
[2] ??? is the timestamp (seconds) of the moment when the filename is being chosen.
[3] The Stage help message was changed to eliminate disputable points, so everything said above is to be reviewed according to it.
[4] Seems like there's a problem: if EOM == '', it allows reading a JSON array -- but how should output messages be delimited? Maybe they should be collected into a single JSON array, too? (solved in PR #171)

ToDo:
* `'\n'`;
* `*Stage.args_parse()`;
* Waits for: #149 (merged)
* Stage help/usage message changes are copied to a separate PR (#171) (merged)
Some of the errors are going to be fixed in #172 (re-refactoring).