APEXMALHAR-2473 Support for global cache meta information in db Cache… #605
APEXMALHAR-2473 Support for global cache meta information in db Cache… #605
Conversation
948fc9d
to
206b78e
Compare
while ((line = bin.readLine()) != null) { | ||
if ( numInitCachedLines > 0 && linesCount >= numInitCachedLines) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can this be added to the while condition
FSDataInputStream in = null; | ||
BufferedReader bin = null; | ||
try { | ||
this.connect(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why open file system for each call?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also shouldn't fs be already initialized?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Okay, I added a function where it tries to open a stream with the initialized file system. If it happens to fail it will try to initialize file system once again.
try { | ||
this.connect(); | ||
in = fs.open(filePath); | ||
bin = new BufferedReader(new InputStreamReader(in)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You can also do... try with close
return getValue(tuple); | ||
} | ||
} catch (Exception parseExp) { | ||
logger.info("Unable to parse line {}", line); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
warn level
protected transient List<FieldInfo> lookupFieldInfo = new ArrayList<>(); | ||
protected transient List<FieldInfo> includeFieldInfo = new ArrayList<>(); | ||
|
||
private CacheManager cacheManager = new NullValuesCacheManager(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Enricher is initializing cacheManager during activate but doesn't seem to be closing it in deactivate.
public static class CacheContext implements Context | ||
{ | ||
public static final transient Attribute<Boolean> READ_ONLY_ATTR = new Attribute<>((false)); | ||
public static final transient Attribute<Integer> NUM_INIT_CACHED_LINES_ATTR = new Attribute<>((-1)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The checks in other places might be straightforward if it is initialized to Integer.MAX_VALUE.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see, it would slightly change the behavior, e.g. in FSLoader it has no upper bound if num_init_cached_lines_attr was not set. With the change Integer.MAX_VALUE would always be the default limit there
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Might be necessary to leave as is to keep functionality of CacheStore (primary store) to not limit maxCacheSize explicitly when numInitCacheLines is not set, but rise maxCacheSize if a greater numInitCacheLines is chosen.
{ | ||
@NotNull | ||
private String fileName; | ||
|
||
private transient Path filePath; | ||
private transient FileSystem fs; | ||
private transient boolean connected; | ||
private transient int numInitCachedLines = -1; | ||
private transient CacheManager.CacheContext cacheContext; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is there a need to save the cache context?
this.numInitCachedLines = numInitCachedLines; | ||
} | ||
|
||
public boolean getReadOnlyAttribute() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How about just readOnlyData instead of Attribute in name
primary.getClass().getMethod("setup", parameters); | ||
attributeMap.put(CacheContext.READ_ONLY_ATTR, readOnlyAttribute); | ||
attributeMap.put(CacheContext.NUM_INIT_CACHED_LINES_ATTR, numInitCachedLines); | ||
((Component)primary).setup(new CacheContext(attributeMap)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How about reusing the CacheContext instance between the primary and backup initialization. Looks like same attributeMap is being re-written.
34e9f3d
to
5d39b4b
Compare
@@ -185,13 +185,18 @@ public void activate(Context context) | |||
try { | |||
cacheManager.initialize(); | |||
} catch (IOException e) { | |||
throw new RuntimeException("Unable to initialize primary cache", e); | |||
throw new RuntimeException("Unable to initialize cache store", e); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why would be just cache store initialization failure, the backup could fail as well, isn't it. Shouldn't this be unable to initialize cache manager.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I meant generally a store of the cache, but I see the confusion with 'CacheStore'. I will change it accordingly.
try { | ||
Map<Object, Object> result; | ||
try ( | ||
FSDataInputStream in = fs.open(filePath); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Where is the input stream closed?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
try-with-resources should close it automatically
try { | ||
Map<String, Object> tuple = extractFields(line); | ||
if (tuple != null && !tuple.isEmpty()) { | ||
result.put(getKey(tuple), getValue(tuple)); | ||
} | ||
} catch (Exception parseExp) { | ||
logger.info("Unable to parse line {}", line); | ||
logger.warn("Unable to parse line {}", line); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pass exception as well
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since this is initialization, you might as well fail with a runtime exception.
@@ -145,9 +136,44 @@ private Object getKey(Map<String, Object> tuple) | |||
@Override | |||
public Object get(Object key) | |||
{ | |||
try ( | |||
FSDataInputStream in = openFsInputStream(filePath); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Close input stream when done
return getValue(tuple); | ||
} | ||
} catch (Exception parseExp) { | ||
logger.warn("Unable to parse line {}", line); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fail with runtime exception if unable to parse line.
return null; | ||
} | ||
|
||
private FSDataInputStream openFsInputStream(Path filePath) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe call it check open or something that indicates it is not a mere open but will try to handle failures and retry
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How about openWithRetryFsInputStream(..)?
if (entryExpiryStrategy == ExpiryType.EXPIRE_AFTER_ACCESS) { | ||
cacheBuilder.expireAfterAccess(entryExpiryDurationInMillis, TimeUnit.MILLISECONDS); | ||
} else if (entryExpiryStrategy == ExpiryType.EXPIRE_AFTER_WRITE) { | ||
cacheBuilder.expireAfterWrite(entryExpiryDurationInMillis, TimeUnit.MILLISECONDS); | ||
} | ||
cache = cacheBuilder.build(); | ||
|
||
if (entryExpiryStrategy == ExpiryType.NO_EVICTION) { | ||
return; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In disconnect, cleanupSchedule is referenced as well, in disconnect check for not null
@Override | ||
public Map<Object, Object> loadInitialData() | ||
{ | ||
Map<Object, Object> result = new HashedMap(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why not use the standard HashMap?
@@ -137,7 +137,7 @@ private Object getKey(Map<String, Object> tuple) | |||
public Object get(Object key) | |||
{ | |||
try ( | |||
FSDataInputStream in = openFsInputStream(filePath); | |||
FSDataInputStream in = openWithRetryFsInputStream(filePath); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks like stream is still not being closed.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Try-with-resources should close it automatically.
squash commits and rebase |
fff7bac
to
36b44d0
Compare
} catch (IOException e) { | ||
try { | ||
this.connect(); | ||
fs.open(filePath); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
in = fs.open(filePath)
36b44d0
to
6be30b7
Compare
…Manager 1. Uses Component interface and newly implemented CacheContext to pass properties to the Stores by calling setup(CacheContext). 2. APEXMALHAR-2474: FSLoader implements Component to get numInitLinesToCache from CacheManager and use it in initial load. Add implementation of get() function so data will also be loaded after initial load. 3. APEXMALHAR-2475: CacheStore implements Component for passing readOnly and numInitLinesToCache. Added NO_EVICTION expire strategy. This strategy will be set in setup(CacheContext) if readOnly is true.
6be30b7
to
118a75f
Compare
…Manager
Add implementation of get() function so data will also be loaded after initial load.
Added NO_EVICTION expire strategy. This strategy will be set in setup(CacheContext) if readOnly is true.
@PramodSSImmaneni: please review