-
Notifications
You must be signed in to change notification settings - Fork 768
SOLR-16628: Ensure that InputStreams are closed after Xml parsing #1302
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
974c749
f198643
b648786
9507e1a
cd4923f
f12517d
3a047d4
a14fc44
3ee812e
1653dcd
6216132
39c5620
75fe9a0
ad2bca6
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||||||
---|---|---|---|---|---|---|---|---|---|---|
|
@@ -24,6 +24,9 @@ | |||||||||
import java.nio.file.Files; | ||||||||||
import java.nio.file.Path; | ||||||||||
import java.nio.file.Paths; | ||||||||||
import java.util.AbstractMap.SimpleImmutableEntry; | ||||||||||
import java.util.Map.Entry; | ||||||||||
import net.jcip.annotations.NotThreadSafe; | ||||||||||
import org.apache.commons.io.IOUtils; | ||||||||||
import org.apache.solr.cloud.ZkController; | ||||||||||
import org.apache.solr.cloud.ZkSolrResourceLoader; | ||||||||||
|
@@ -48,6 +51,7 @@ | |||||||||
import org.xml.sax.InputSource; | ||||||||||
|
||||||||||
/** Factory for ManagedIndexSchema */ | ||||||||||
@NotThreadSafe | ||||||||||
public class ManagedIndexSchemaFactory extends IndexSchemaFactory implements SolrCoreAware { | ||||||||||
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); | ||||||||||
public static final String UPGRADED_SCHEMA_EXTENSION = ".bak"; | ||||||||||
|
@@ -76,7 +80,6 @@ public SolrResourceLoader getResourceLoader() { | |||||||||
private SolrCore core; | ||||||||||
private ZkIndexSchemaReader zkIndexSchemaReader; | ||||||||||
|
||||||||||
private String loadedResource; | ||||||||||
private boolean shouldUpgrade = false; | ||||||||||
|
||||||||||
@Override | ||||||||||
|
@@ -197,89 +200,99 @@ public ManagedIndexSchema create( | |||||||||
this.config = config; | ||||||||||
this.loader = config.getResourceLoader(); | ||||||||||
InputStream schemaInputStream = null; | ||||||||||
String loadedResource = null; | ||||||||||
|
||||||||||
if (null == resourceName) { | ||||||||||
resourceName = IndexSchema.DEFAULT_SCHEMA_FILE; | ||||||||||
} | ||||||||||
|
||||||||||
int schemaZkVersion = -1; | ||||||||||
if (!(loader instanceof ZkSolrResourceLoader)) { | ||||||||||
schemaInputStream = readSchemaLocally(); | ||||||||||
} else { // ZooKeeper | ||||||||||
final ZkSolrResourceLoader zkLoader = (ZkSolrResourceLoader) loader; | ||||||||||
final SolrZkClient zkClient = zkLoader.getZkController().getZkClient(); | ||||||||||
final String managedSchemaPath = lookupZKManagedSchemaPath(); | ||||||||||
managedSchemaResourceName = | ||||||||||
managedSchemaPath.substring(managedSchemaPath.lastIndexOf("/") + 1); // not loving this | ||||||||||
Stat stat = new Stat(); | ||||||||||
try { | ||||||||||
// Attempt to load the managed schema | ||||||||||
byte[] data = zkClient.getData(managedSchemaPath, null, stat, true); | ||||||||||
schemaZkVersion = stat.getVersion(); | ||||||||||
schemaInputStream = | ||||||||||
new ZkSolrResourceLoader.ZkByteArrayInputStream(data, managedSchemaPath, stat); | ||||||||||
loadedResource = managedSchemaResourceName; | ||||||||||
warnIfNonManagedSchemaExists(); | ||||||||||
} catch (InterruptedException e) { | ||||||||||
// Restore the interrupted status | ||||||||||
Thread.currentThread().interrupt(); | ||||||||||
log.warn("", e); | ||||||||||
} catch (KeeperException.NoNodeException e) { | ||||||||||
log.info( | ||||||||||
"The schema is configured as managed, but managed schema resource {} not found - loading non-managed schema {} instead", | ||||||||||
managedSchemaResourceName, | ||||||||||
resourceName); | ||||||||||
} catch (KeeperException e) { | ||||||||||
String msg = "Error attempting to access " + managedSchemaPath; | ||||||||||
log.error(msg, e); | ||||||||||
throw new SolrException(ErrorCode.SERVER_ERROR, msg, e); | ||||||||||
try { | ||||||||||
if (null == resourceName) { | ||||||||||
resourceName = IndexSchema.DEFAULT_SCHEMA_FILE; | ||||||||||
} | ||||||||||
if (null == schemaInputStream) { | ||||||||||
// The managed schema file could not be found - load the non-managed schema | ||||||||||
|
||||||||||
int schemaZkVersion = -1; | ||||||||||
if (!(loader instanceof ZkSolrResourceLoader)) { | ||||||||||
Entry<String, InputStream> localSchemaInput = readSchemaLocally(); | ||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. THREAD_SAFETY_VIOLATION: Unprotected write. Non-private method ℹ️ Expand to see all @sonatype-lift commandsYou can reply with the following commands. For example, reply with @sonatype-lift ignoreall to leave out all findings.
Note: When talking to LiftBot, you need to refresh the page to see its response. Help us improve LIFT! (Sonatype LiftBot external survey) Was this a good recommendation for you? Answering this survey will not impact your Lift settings. [ 🙁 Not relevant ] - [ 😕 Won't fix ] - [ 😑 Not critical, will fix ] - [ 🙂 Critical, will fix ] - [ 😊 Critical, fixing now ] There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It sucks to still see this because we have a NotThreadSafe annotation, thus I wasn't anticipating the checks. From reading the Infer/RacerD docs in more detail, it appears this is because this class, despite not being ThreadSafe, and advertising itself as such nonetheless contains the "synchronized" keyword in one spot at the end of create(). The presence of the keyword in the code takes precedence over NotThreadSafe annotation. It seems to me we don't need that synchronized there because the schema is not published yet. Any way, such changes / explorations don't belong here. I'll create a PR to toy with this. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ah, ok, that makes sense! Thanks for following up on this. I expect to merge/backport tomorrow. |
||||||||||
loadedResource = localSchemaInput.getKey(); | ||||||||||
schemaInputStream = localSchemaInput.getValue(); | ||||||||||
} else { // ZooKeeper | ||||||||||
final ZkSolrResourceLoader zkLoader = (ZkSolrResourceLoader) loader; | ||||||||||
final SolrZkClient zkClient = zkLoader.getZkController().getZkClient(); | ||||||||||
final String managedSchemaPath = lookupZKManagedSchemaPath(); | ||||||||||
managedSchemaResourceName = | ||||||||||
managedSchemaPath.substring(managedSchemaPath.lastIndexOf("/") + 1); // not loving this | ||||||||||
Stat stat = new Stat(); | ||||||||||
try { | ||||||||||
schemaInputStream = loader.openResource(resourceName); | ||||||||||
loadedResource = resourceName; | ||||||||||
shouldUpgrade = true; | ||||||||||
} catch (IOException e) { | ||||||||||
// Attempt to load the managed schema | ||||||||||
byte[] data = zkClient.getData(managedSchemaPath, null, stat, true); | ||||||||||
schemaZkVersion = stat.getVersion(); | ||||||||||
schemaInputStream = | ||||||||||
new ZkSolrResourceLoader.ZkByteArrayInputStream(data, managedSchemaPath, stat); | ||||||||||
loadedResource = managedSchemaResourceName; | ||||||||||
warnIfNonManagedSchemaExists(); | ||||||||||
} catch (InterruptedException e) { | ||||||||||
// Restore the interrupted status | ||||||||||
Thread.currentThread().interrupt(); | ||||||||||
log.warn("", e); | ||||||||||
} catch (KeeperException.NoNodeException e) { | ||||||||||
log.info( | ||||||||||
"The schema is configured as managed, but managed schema resource {} not found - loading non-managed schema {} instead", | ||||||||||
managedSchemaResourceName, | ||||||||||
resourceName); | ||||||||||
} catch (KeeperException e) { | ||||||||||
String msg = "Error attempting to access " + managedSchemaPath; | ||||||||||
log.error(msg, e); | ||||||||||
throw new SolrException(ErrorCode.SERVER_ERROR, msg, e); | ||||||||||
} | ||||||||||
if (null == schemaInputStream) { | ||||||||||
// The managed schema file could not be found - load the non-managed schema | ||||||||||
try { | ||||||||||
// Retry to load the managed schema, in case it was created since the first attempt | ||||||||||
byte[] data = zkClient.getData(managedSchemaPath, null, stat, true); | ||||||||||
schemaZkVersion = stat.getVersion(); | ||||||||||
schemaInputStream = new ByteArrayInputStream(data); | ||||||||||
loadedResource = managedSchemaPath; | ||||||||||
warnIfNonManagedSchemaExists(); | ||||||||||
} catch (Exception e1) { | ||||||||||
if (e1 instanceof InterruptedException) { | ||||||||||
Thread.currentThread().interrupt(); // Restore the interrupted status | ||||||||||
schemaInputStream = loader.openResource(resourceName); | ||||||||||
loadedResource = resourceName; | ||||||||||
shouldUpgrade = true; | ||||||||||
} catch (IOException e) { | ||||||||||
try { | ||||||||||
// Retry to load the managed schema, in case it was created since the first attempt | ||||||||||
byte[] data = zkClient.getData(managedSchemaPath, null, stat, true); | ||||||||||
schemaZkVersion = stat.getVersion(); | ||||||||||
schemaInputStream = new ByteArrayInputStream(data); | ||||||||||
loadedResource = managedSchemaPath; | ||||||||||
warnIfNonManagedSchemaExists(); | ||||||||||
} catch (Exception e1) { | ||||||||||
if (e1 instanceof InterruptedException) { | ||||||||||
Thread.currentThread().interrupt(); // Restore the interrupted status | ||||||||||
} | ||||||||||
final String msg = | ||||||||||
"Error loading both non-managed schema '" | ||||||||||
+ resourceName | ||||||||||
+ "' and managed schema '" | ||||||||||
+ managedSchemaResourceName | ||||||||||
+ "'"; | ||||||||||
log.error(msg, e); | ||||||||||
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, msg, e); | ||||||||||
} | ||||||||||
final String msg = | ||||||||||
"Error loading both non-managed schema '" | ||||||||||
+ resourceName | ||||||||||
+ "' and managed schema '" | ||||||||||
+ managedSchemaResourceName | ||||||||||
+ "'"; | ||||||||||
log.error(msg, e); | ||||||||||
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, msg, e); | ||||||||||
} | ||||||||||
} | ||||||||||
} | ||||||||||
assert loadedResource != null; | ||||||||||
InputSource inputSource = new InputSource(schemaInputStream); | ||||||||||
inputSource.setSystemId(SystemIdResolver.createSystemIdFromResourceName(loadedResource)); | ||||||||||
dsmiley marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||||||
try { | ||||||||||
schema = | ||||||||||
new ManagedIndexSchema( | ||||||||||
config, | ||||||||||
loadedResource, | ||||||||||
IndexSchemaFactory.getConfigResource( | ||||||||||
configSetService, schemaInputStream, loader, managedSchemaResourceName), | ||||||||||
isMutable, | ||||||||||
managedSchemaResourceName, | ||||||||||
schemaZkVersion, | ||||||||||
getSchemaUpdateLock()); | ||||||||||
} catch (Exception e) { | ||||||||||
throw new SolrException(ErrorCode.SERVER_ERROR, "Error loading parsing schema", e); | ||||||||||
} | ||||||||||
} finally { | ||||||||||
// XML Parser should close but in exceptional cases might not; so let's be safe | ||||||||||
IOUtils.closeQuietly(schemaInputStream); | ||||||||||
} | ||||||||||
InputSource inputSource = new InputSource(schemaInputStream); | ||||||||||
inputSource.setSystemId(SystemIdResolver.createSystemIdFromResourceName(loadedResource)); | ||||||||||
try { | ||||||||||
schema = | ||||||||||
new ManagedIndexSchema( | ||||||||||
config, | ||||||||||
loadedResource, | ||||||||||
IndexSchemaFactory.getConfigResource( | ||||||||||
configSetService, schemaInputStream, loader, managedSchemaResourceName), | ||||||||||
isMutable, | ||||||||||
managedSchemaResourceName, | ||||||||||
schemaZkVersion, | ||||||||||
getSchemaUpdateLock()); | ||||||||||
} catch (Exception e) { | ||||||||||
throw new SolrException(ErrorCode.SERVER_ERROR, "Error loading parsing schema", e); | ||||||||||
} | ||||||||||
|
||||||||||
if (shouldUpgrade) { | ||||||||||
// Persist the managed schema if it doesn't already exist | ||||||||||
synchronized (schema.getSchemaUpdateLock()) { | ||||||||||
|
@@ -290,8 +303,9 @@ public ManagedIndexSchema create( | |||||||||
return schema; | ||||||||||
} | ||||||||||
|
||||||||||
private InputStream readSchemaLocally() { | ||||||||||
private Entry<String, InputStream> readSchemaLocally() { | ||||||||||
InputStream schemaInputStream = null; | ||||||||||
String loadedResource = null; | ||||||||||
try { | ||||||||||
// Attempt to load the managed schema | ||||||||||
final Path managedSchemaPath = lookupLocalManagedSchemaPath(); | ||||||||||
|
@@ -323,7 +337,8 @@ private InputStream readSchemaLocally() { | |||||||||
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, msg, e); | ||||||||||
} | ||||||||||
} | ||||||||||
return schemaInputStream; | ||||||||||
assert loadedResource != null; | ||||||||||
return new SimpleImmutableEntry<>(loadedResource, schemaInputStream); | ||||||||||
} | ||||||||||
|
||||||||||
/** Return whether a non-managed schema exists, either in local storage or on ZooKeeper. */ | ||||||||||
|
Uh oh!
There was an error while loading. Please reload this page.