Conversation
5c8c266 to
1941854
Compare
93d5151 to
52f7e9d
Compare
52f7e9d to
03beee6
Compare
23a35de to
3d30d35
Compare
3d30d35 to
18300f7
Compare
| * indexed column, or the type does not match the schema | ||
| * @throws IOException if there is an error scanning the table | ||
| */ | ||
| default CloseableIterator<StructuredRow> scan(Range keyRange, int limit, |
There was a problem hiding this comment.
Currently only implemented in PostgresSql. Will handle nosql in a seperate PR
There was a problem hiding this comment.
this isn't needed anymore right?
|
|
||
| @VisibleForTesting | ||
| // USE ONLY IN TESTS: WILL DELETE ALL OPERATION RUNS | ||
| public void deleteAllTables() throws IOException { |
There was a problem hiding this comment.
methods like this should be package private
There was a problem hiding this comment.
a better name would be clear() or clearData(). This name makes it sound like the table will be dropped.
| ); | ||
| } | ||
|
|
||
| private long getStartTime(OperationRunId runId) |
There was a problem hiding this comment.
Methods like this can lead to unnecessary DB reads over time, as callers may do a normal read and then call this method. We should remove this and force callers to get the start time from the row.
There was a problem hiding this comment.
Done. Moved the logic directly to caller as there is only one usage
| return fields; | ||
| } | ||
|
|
||
| private OperationRunDetail<?> getCurrentRunDetail(OperationRunId runId) |
There was a problem hiding this comment.
this should be getRunDetail. The current makes it sound like there could be a previous run with the same id.
Also, I'm not sure how this would work with the <?>. What does it get deserialized to?
There was a problem hiding this comment.
The generic type T refers to the type of operation request object. It is not needed for most of the cases when we retrieve the OperationRunDetail except when we plan to launch the operation. The current approach I am taking is based on the operation type the request object type will be determined and the OperationRunDetail will be casted accordingly.
There was a problem hiding this comment.
I'm still confused how this will work, doesn't the type need to be passed into gson in order for it to be deserialized properly? If you just deserialize it here as ?, don't you get a Map or a JsonObject or something as the field? I don't think you'd be able to cast it directly.
|
|
||
| private Collection<Field<?>> getFilterIndexes(OperationRunFilter filter) { | ||
| Collection<Field<?>> filterIndexes = new ArrayList<>(); | ||
| if (filter == null) { |
There was a problem hiding this comment.
don't need to handle his case, we should not allow the filter instance to be null
There was a problem hiding this comment.
filter null represents no filter applied. I have added a new method in OperationRunFilter to get a empty filter to avoid a null check
| * @throws OperationRunNotFoundException run with id does not exist in namespace | ||
| */ | ||
| public OperationRunDetail<?> getOperation(OperationRunId runId) | ||
| throws NotFoundException, IOException { |
There was a problem hiding this comment.
throws OperationRunNotFoundException
| Fields.stringField(StoreDefinition.OperationRunsStore.ID_FIELD, runId.getRun())); | ||
| fields.add( | ||
| Fields.longField(StoreDefinition.OperationRunsStore.UPDATE_TIME_FIELD, | ||
| System.currentTimeMillis())); |
There was a problem hiding this comment.
instead of System.currentTimeMillis(), it would be good to take a Clock in the constructor of this class and use that instead. It would make unit testing simpler, as you can expect a concrete time.
| * @param limit number of result | ||
| * @return a select query | ||
| */ | ||
| private PreparedStatement prepareMultiScanQuery(Collection<Range> singletonRanges, |
There was a problem hiding this comment.
are there any logical changes being made here or is it just getting moved? (In the future, keep methods in the same location so it is easier to review the diff)
There was a problem hiding this comment.
This was done to fix a checkstyle warning about keeping all the override methods together. There is no logical change. I will revert it back to reduce the diff
There was a problem hiding this comment.
I see, if it is fixing checkstyle it's ok to move it.
| * License for the specific language governing permissions and limitations under | ||
| * the License. | ||
| */ | ||
| package io.cdap.cdap.proto.id; |
| @@ -0,0 +1,90 @@ | |||
| /* | |||
| * Copyright © 2015-2019 Cask Data, Inc. | |||
73ae585 to
ce08626
Compare
| * @param txBatchSize batch size of transaction | ||
| * @param consumer {@link Consumer} to process each scanned run | ||
| */ | ||
| public boolean scanOperations(ScanOperationRunsRequest request, int txBatchSize, |
There was a problem hiding this comment.
Unit test will be added in a followup PR. Will add a TODO
ce08626 to
cb66ab8
Compare
|
|
||
| AtomicReference<ScanOperationRunsRequest> requestRef = new AtomicReference<>(request); | ||
| AtomicReference<String> lastKey = new AtomicReference<>(request.getScanAfter()); | ||
| AtomicInteger currentLimit = new AtomicInteger(request.getLimit()); |
There was a problem hiding this comment.
this looks like it can be a normal int
| * | ||
| * @param request scan request including filters and limit | ||
| * @param txBatchSize batch size of transaction | ||
| * @param consumer {@link Consumer} to process each scanned run |
There was a problem hiding this comment.
javadoc should have @return as well
There was a problem hiding this comment.
though returning whether the limit was reached is kind of a weird thing to return, not sure how it would be used. I would either make it void or return the total number of operations scanned.
There was a problem hiding this comment.
The boolean would be used in the handler to find if all the operations are scanned and send last operation id accordingly. We can use the total number of operation scanned also but wanted to keep this analogous to the application scan
see:
There was a problem hiding this comment.
hm I see, that's ok then
| requestRef.set(batchRequest); | ||
|
|
||
| TransactionRunners.run(transactionRunner, context -> { | ||
| lastKey.set( |
There was a problem hiding this comment.
you can use the run method with a return value instead of setting lastKey in the closure. Then it can be a normal String instead of an AtomicReference.
|
|
||
| private static final String SMALLEST_POSSIBLE_STRING = ""; | ||
|
|
||
| @Inject |
There was a problem hiding this comment.
don't think we would use injection to create the store
| } | ||
|
|
||
| @VisibleForTesting | ||
| // USE ONLY IN TESTS: WILL DELETE ALL OPERATION RUNS |
| Range range = Range.create(endFields, endBound, startFields, startBound); | ||
| Collection<Field<?>> filterIndexes = getFilterIndexes(request.getFilter()); | ||
| StructuredTable table = getOperationRunsTable(context); | ||
| AtomicReference<String> lastKey = new AtomicReference<>(); |
There was a problem hiding this comment.
this can be a normal String
| /** | ||
| * namespace to return applications for or null for all namespaces. | ||
| */ | ||
| @Nullable |
There was a problem hiding this comment.
this should not be nullable
| } | ||
|
|
||
| /** | ||
| * namespace to return applications for or null for all namespaces. |
There was a problem hiding this comment.
update javadoc to not include null
| public void testScanOperation() throws Exception { | ||
| insertTestRuns(); | ||
|
|
||
| // TODO(samik) verify the actual list |
There was a problem hiding this comment.
should do this now rather than later
| "is not an indexed column or primary key"); | ||
| } | ||
| return scan(keyRange, limit, filterIndexes, Collections.singleton(orderByField), sortOrder, | ||
| true); |
There was a problem hiding this comment.
It is really confusing that this scan method is an 'and' filter while the other ones are an 'or' filter. Can we just make the operation type part of the primary key? That way it can go in the key range and we can use the existing methods without adding more to StructuredTable. Would like to avoid bloating the StructuredTable class, it's already pretty confusing as-is.
FYI, whenever we go to CDAP 7.0, we would like to remove Hadoop support, which would allow us to remove the NoSql implementations which would let us use normal SQL instead of this interface.
There was a problem hiding this comment.
The reason operationtype is not part of the primary key because the operation id and namespace should uniquely identify a operation. This is in line with the API design /namespace/{namespace}/operation/{id}. If we add the operationtype in primary key we have to change the API also.
The API is designed to be compatible with GCP operations api which also does not include types.
I think we do need a option to select how the filters should be joined. I can create two different top level functions for AND and OR filter.
There was a problem hiding this comment.
Removed and filter and added operation type in the range key
af6aa13 to
57a1c6e
Compare
df24aec to
cf0f87b
Compare
| } | ||
| return GSON.fromJson( | ||
| row.get().getString(StoreDefinition.OperationRunsStore.DETAILS_FIELD), | ||
| OperationRunDetail.class |
There was a problem hiding this comment.
I'm still not sure how this will work without a concrete class for the generic. Maybe it will be more clear when there is code around the caller using the result, but I get the feeling it will need to revisited.
| * indexed column, or the type does not match the schema | ||
| * @throws IOException if there is an error scanning the table | ||
| */ | ||
| default CloseableIterator<StructuredRow> scan(Range keyRange, int limit, |
There was a problem hiding this comment.
this isn't needed anymore right?
albertshau
left a comment
There was a problem hiding this comment.
lgtm assuming the new scan method is removed from StructuredTable
cf0f87b to
26ad907
Compare
| private final TransactionRunner transactionRunner; | ||
|
|
||
| @Inject | ||
| public OperationLifecycleService(TransactionRunner transactionRunner) { |
There was a problem hiding this comment.
Remove public for injected constructor to avoid direct instantiation.
| /** | ||
| * Service that manages lifecycle of Operation. | ||
| */ | ||
| public class OperationLifecycleService { |
There was a problem hiding this comment.
Usually we only name a class with Service suffix if it is a guava Service. This class seems more like a Manager?
There was a problem hiding this comment.
We do have classes with suffix service like ProgramLifecycleService which is not a guava service but encapsulates all operations we can do upon an entity . I was following the apparent naming convention.
There was a problem hiding this comment.
Updated to use Manager Suffix
| */ | ||
| public boolean scanOperations(ScanOperationRunsRequest request, int txBatchSize, | ||
| Consumer<OperationRunDetail<?>> consumer) throws OperationRunNotFoundException, IOException { | ||
| AtomicReference<ScanOperationRunsRequest> requestRef = new AtomicReference<>(request); |
There was a problem hiding this comment.
Why we need an atomic reference? Seems like everything is happening in the local thread? A simple local variable would do the job.
| } | ||
| currentLimit -= txBatchSize; | ||
| } | ||
| return currentLimit == 0; |
There was a problem hiding this comment.
If the above loop having lastKey == null, the while loop would break and currentLimit may not be 0. Is this expected? If the caller is also having a while loop until this method return false, would it resulted in infinite loop?
There was a problem hiding this comment.
It is expected to break the loop as lastKey == null will signify no more row left to scan.
The return value here will signify if the page limit was reached. The caller would also need to track the last record scanned as the following example in AppLifeCycleHttpHandler
boolean pageLimitReached = applicationLifecycleService.scanApplications(scanRequest,
appDetail -> {
ApplicationRecord record = new ApplicationRecord(appDetail);
jsonListResponder.send(record);
lastRecord.set(record);
});
ApplicationRecord record = lastRecord.get();
return !pageLimitReached || record == null ? null :
record.getName() + EntityId.IDSTRING_PART_SEPARATOR + record.getAppVersion();
The caller is requierd to update the ScanAfter field in the request for the next call if this method is called from a while loop
| private OperationRunsStore getOperationRunStore(StructuredTableContext context) { | ||
| return new OperationRunsStore(context); | ||
| } | ||
|
|
| /** | ||
| * This class defined various filters that can be applied during operation runs scanning. | ||
| */ | ||
| public class OperationRunFilter { |
There was a problem hiding this comment.
Usually a Filter is kind of a Function instead of just having getters. How is it supposed to be used?
There was a problem hiding this comment.
It is just a container class to encapsulate what filter options are available for operations.
| /** | ||
| * Store for operation runs. | ||
| */ | ||
| public class OperationRunsStore { |
|
|
||
| private final Clock clock; | ||
|
|
||
| private static final String SMALLEST_POSSIBLE_STRING = ""; |
There was a problem hiding this comment.
group static fields together
|
|
||
| private void writeOperationRun(OperationRunId runId, OperationRunDetail<?> detail) | ||
| throws IOException { | ||
| Collection<Field<?>> fields = new ArrayList<>(); |
There was a problem hiding this comment.
Consider using Arrays.asList or ImmutableList.of to improve readability.
| private OperationRunDetail<?> getRunDetail(OperationRunId runId) | ||
| throws IOException, OperationRunNotFoundException { | ||
| Optional<StructuredRow> row = getOperationRunInternal(runId); | ||
| if (!row.isPresent()) { |
There was a problem hiding this comment.
Avoid using isPresent on Optional. In this case, use .map and orElseThrow.
return getOperationRunInternal(runId)
.map(this::rowToRunDetail)
.orElseThrow(() -> new OperationRunNotFoundException(...));
There was a problem hiding this comment.
Please also fix other places that use isPresent and replace with more expressive way.
There was a problem hiding this comment.
Updated. There is one case where we throw error if the row is present. As we can't throw checked error from within ifPresent I have not made any change
26ad907 to
6ef10d8
Compare
|
@chtyim Merging this PR to unblock followup PRs for LRO. Please add comments for any suggested change and I will raise a further PR to solve. |
Added