Skip to content

Commit

Permalink
Removed all synchronized blocks and replace them to read/write lock
Browse files Browse the repository at this point in the history
  • Loading branch information
jongyoul committed Apr 17, 2018
1 parent 0e82a57 commit 24be692
Showing 1 changed file with 124 additions and 110 deletions.
Expand Up @@ -24,6 +24,7 @@
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.reflect.TypeToken;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang.ArrayUtils;
import org.apache.commons.lang.StringUtils;
Expand Down Expand Up @@ -103,6 +104,7 @@ public class InterpreterSettingManager {
*/
private final Map<String, InterpreterSetting> interpreterSettings =
Maps.newConcurrentMap();
private final ReentrantReadWriteLock interpreterSettingsLock = new ReentrantReadWriteLock();

/**
* noteId --> list of InterpreterSettingId
Expand Down Expand Up @@ -199,7 +201,9 @@ private void loadFromFile() throws IOException {
for (InterpreterSetting interpreterSettingTemplate : interpreterSettingTemplates.values()) {
InterpreterSetting interpreterSetting = new InterpreterSetting(interpreterSettingTemplate);
initInterpreterSetting(interpreterSetting);
interpreterSettingsLock.writeLock().lock();
interpreterSettings.put(interpreterSetting.getId(), interpreterSetting);
interpreterSettingsLock.writeLock().unlock();
}
return;
}
Expand All @@ -211,9 +215,11 @@ private void loadFromFile() throws IOException {
List<String> oldSettingIdList = entry.getValue();
List<String> newSettingIdList = new ArrayList<>();
for (String oldId : oldSettingIdList) {
interpreterSettingsLock.readLock().lock();
if (infoSaving.interpreterSettings.containsKey(oldId)) {
newSettingIdList.add(infoSaving.interpreterSettings.get(oldId).getName());
};
}
interpreterSettingsLock.readLock().unlock();
}
newBindingMap.put(noteId, newSettingIdList);
}
Expand Down Expand Up @@ -271,15 +277,19 @@ private void loadFromFile() throws IOException {

// Overwrite the default InterpreterSetting we registered from InterpreterSetting Templates
// remove it first
interpreterSettingsLock.writeLock().lock();
for (InterpreterSetting setting : interpreterSettings.values()) {
if (setting.getName().equals(savedInterpreterSetting.getName())) {
interpreterSettings.remove(setting.getId());
}
}
interpreterSettingsLock.writeLock().unlock();
savedInterpreterSetting.postProcessing();
LOGGER.info("Create Interpreter Setting {} from interpreter.json",
savedInterpreterSetting.getName());
interpreterSettingsLock.writeLock().lock();
interpreterSettings.put(savedInterpreterSetting.getId(), savedInterpreterSetting);
interpreterSettingsLock.writeLock().unlock();
}

if (infoSaving.interpreterRepositories != null) {
Expand All @@ -291,20 +301,20 @@ private void loadFromFile() throws IOException {

// force interpreter dependencies loading once the
// repositories have been loaded.
interpreterSettingsLock.readLock().lock();
for (InterpreterSetting setting : interpreterSettings.values()) {
setting.setDependencies(setting.getDependencies());
}
interpreterSettingsLock.readLock().unlock();
}
}

public void saveToFile() throws IOException {
synchronized (interpreterSettings) {
InterpreterInfoSaving info = new InterpreterInfoSaving();
info.interpreterBindings = interpreterBindings;
info.interpreterSettings = interpreterSettings;
info.interpreterRepositories = interpreterRepositories;
configStorage.save(info);
}
InterpreterInfoSaving info = new InterpreterInfoSaving();
info.interpreterBindings = interpreterBindings;
info.interpreterSettings = Maps.newHashMap(interpreterSettings);
info.interpreterRepositories = interpreterRepositories;
configStorage.save(info);
}

private void init() throws IOException {
Expand Down Expand Up @@ -439,7 +449,7 @@ public InterpreterSetting getDefaultInterpreterSetting(String noteId) {

public List<InterpreterSetting> getInterpreterSettings(String noteId) {
List<InterpreterSetting> settings = new ArrayList<>();
synchronized (interpreterSettings) {
interpreterSettingsLock.readLock().lock();
List<String> interpreterSettingIds = interpreterBindings.get(noteId);
if (interpreterSettingIds != null) {
for (String settingId : interpreterSettingIds) {
Expand All @@ -451,19 +461,22 @@ public List<InterpreterSetting> getInterpreterSettings(String noteId) {
}
}
}
}
interpreterSettingsLock.readLock().unlock();
return settings;
}

public InterpreterSetting getInterpreterSettingByName(String name) {
synchronized (interpreterSettings) {
try {
interpreterSettingsLock.readLock().lock();
for (InterpreterSetting setting : interpreterSettings.values()) {
if (setting.getName().equals(name)) {
return setting;
}
}
throw new RuntimeException("No such interpreter setting: " + name);
} finally {
interpreterSettingsLock.readLock().unlock();
}
throw new RuntimeException("No such interpreter setting: " + name);
}

public ManagedInterpreterGroup getInterpreterGroupById(String groupId) {
Expand Down Expand Up @@ -617,12 +630,11 @@ public void removeResourcesBelongsToNote(String noteId) {
}

/**
* Overwrite dependency jar under local-repo/{interpreterId}
* if jar file in original path is changed
* Overwrite dependency jar under local-repo/{interpreterId} if jar file in original path is
* changed
*/
private void copyDependenciesFromLocalPath(final InterpreterSetting setting) {
setting.setStatus(InterpreterSetting.Status.DOWNLOADING_DEPENDENCIES);
synchronized (interpreterSettings) {
final Thread t = new Thread() {
public void run() {
try {
Expand Down Expand Up @@ -653,7 +665,6 @@ public void run() {
}
};
t.start();
}
}

/**
Expand Down Expand Up @@ -713,28 +724,28 @@ public void setInterpreterBinding(String user, String noteId, List<String> setti
throws IOException {
List<String> unBindedSettingIdList = new LinkedList<>();

synchronized (interpreterSettings) {
List<String> oldSettingIdList = interpreterBindings.get(noteId);
if (oldSettingIdList != null) {
for (String oldSettingId : oldSettingIdList) {
if (!settingIdList.contains(oldSettingId)) {
unBindedSettingIdList.add(oldSettingId);
}
interpreterSettingsLock.readLock().lock();
List<String> oldSettingIdList = interpreterBindings.get(noteId);
if (oldSettingIdList != null) {
for (String oldSettingId : oldSettingIdList) {
if (!settingIdList.contains(oldSettingId)) {
unBindedSettingIdList.add(oldSettingId);
}
}
interpreterBindings.put(noteId, settingIdList);
saveToFile();

for (String settingId : unBindedSettingIdList) {
InterpreterSetting interpreterSetting = interpreterSettings.get(settingId);
//TODO(zjffdu) Add test for this scenario
//only close Interpreters when it is note scoped
if (interpreterSetting.getOption().perNoteIsolated() ||
interpreterSetting.getOption().perNoteScoped()) {
interpreterSetting.closeInterpreters(user, noteId);
}
}
interpreterBindings.put(noteId, settingIdList);
saveToFile();

for (String settingId : unBindedSettingIdList) {
InterpreterSetting interpreterSetting = interpreterSettings.get(settingId);
//TODO(zjffdu) Add test for this scenario
//only close Interpreters when it is note scoped
if (interpreterSetting.getOption().perNoteIsolated() ||
interpreterSetting.getOption().perNoteScoped()) {
interpreterSetting.closeInterpreters(user, noteId);
}
}
interpreterSettingsLock.readLock().unlock();
}

public List<String> getInterpreterBinding(String noteId) {
Expand Down Expand Up @@ -794,49 +805,49 @@ public void removeNoteInterpreterSettingBinding(String user, String noteId) thro
interpreterBindings.remove(noteId);
}

/**
* Change interpreter properties and restart
*/
public void setPropertyAndRestart(String id, InterpreterOption option,
Map<String, InterpreterProperty> properties,
List<Dependency> dependencies)
/** Change interpreter properties and restart */
public void setPropertyAndRestart(
String id,
InterpreterOption option,
Map<String, InterpreterProperty> properties,
List<Dependency> dependencies)
throws InterpreterException, IOException {
synchronized (interpreterSettings) {
InterpreterSetting intpSetting = interpreterSettings.get(id);
if (intpSetting != null) {
try {
intpSetting.close();
intpSetting.setOption(option);
intpSetting.setProperties(properties);
intpSetting.setDependencies(dependencies);
intpSetting.postProcessing();
saveToFile();
} catch (Exception e) {
loadFromFile();
throw new IOException(e);
}
} else {
throw new InterpreterException("Interpreter setting id " + id + " not found");
interpreterSettingsLock.readLock().lock();
InterpreterSetting intpSetting = interpreterSettings.get(id);
interpreterSettingsLock.readLock().unlock();
if (intpSetting != null) {
try {
intpSetting.close();
intpSetting.setOption(option);
intpSetting.setProperties(properties);
intpSetting.setDependencies(dependencies);
intpSetting.postProcessing();
saveToFile();
} catch (Exception e) {
loadFromFile();
throw new IOException(e);
}
} else {
throw new InterpreterException("Interpreter setting id " + id + " not found");
}
}

// restart in note page
public void restart(String settingId, String noteId, String user) throws InterpreterException {
InterpreterSetting intpSetting = interpreterSettings.get(settingId);
Preconditions.checkNotNull(intpSetting);
synchronized (interpreterSettings) {
intpSetting = interpreterSettings.get(settingId);
// Check if dependency in specified path is changed
// If it did, overwrite old dependency jar with new one
if (intpSetting != null) {
//clean up metaInfos
intpSetting.setInfos(null);
copyDependenciesFromLocalPath(intpSetting);
intpSetting.closeInterpreters(user, noteId);
} else {
throw new InterpreterException("Interpreter setting id " + settingId + " not found");
}
interpreterSettingsLock.readLock().lock();
intpSetting = interpreterSettings.get(settingId);
interpreterSettingsLock.readLock().unlock();
// Check if dependency in specified path is changed
// If it did, overwrite old dependency jar with new one
if (intpSetting != null) {
// clean up metaInfos
intpSetting.setInfos(null);
copyDependenciesFromLocalPath(intpSetting);
intpSetting.closeInterpreters(user, noteId);
} else {
throw new InterpreterException("Interpreter setting id " + settingId + " not found");
}
}

Expand All @@ -845,8 +856,11 @@ public void restart(String id) throws InterpreterException {
}

public InterpreterSetting get(String id) {
synchronized (interpreterSettings) {
try {
interpreterSettingsLock.readLock().lock();
return interpreterSettings.get(id);
} finally {
interpreterSettingsLock.readLock().unlock();
}
}

Expand Down Expand Up @@ -893,36 +907,36 @@ public void remove(String id) throws IOException {
* Get interpreter settings
*/
public List<InterpreterSetting> get() {
synchronized (interpreterSettings) {
List<InterpreterSetting> orderedSettings = new ArrayList<>(interpreterSettings.values());
Collections.sort(orderedSettings, new Comparator<InterpreterSetting>() {
@Override
public int compare(InterpreterSetting o1, InterpreterSetting o2) {
int i = interpreterGroupOrderList.indexOf(o1.getGroup());
int j = interpreterGroupOrderList.indexOf(o2.getGroup());
if (i < 0) {
LOGGER.warn("InterpreterGroup " + o1.getGroup()
+ " is not specified in " + ConfVars.ZEPPELIN_INTERPRETER_GROUP_ORDER.getVarName());
// move the unknown interpreter to last
i = Integer.MAX_VALUE;
}
if (j < 0) {
LOGGER.warn("InterpreterGroup " + o2.getGroup()
+ " is not specified in " + ConfVars.ZEPPELIN_INTERPRETER_GROUP_ORDER.getVarName());
// move the unknown interpreter to last
j = Integer.MAX_VALUE;
}
if (i < j) {
return -1;
} else if (i > j) {
return 1;
} else {
return 0;
}
interpreterSettingsLock.readLock().lock();
List<InterpreterSetting> orderedSettings = new ArrayList<>(interpreterSettings.values());
interpreterSettingsLock.readLock().unlock();
Collections.sort(orderedSettings, new Comparator<InterpreterSetting>() {
@Override
public int compare(InterpreterSetting o1, InterpreterSetting o2) {
int i = interpreterGroupOrderList.indexOf(o1.getGroup());
int j = interpreterGroupOrderList.indexOf(o2.getGroup());
if (i < 0) {
LOGGER.warn("InterpreterGroup " + o1.getGroup()
+ " is not specified in " + ConfVars.ZEPPELIN_INTERPRETER_GROUP_ORDER.getVarName());
// move the unknown interpreter to last
i = Integer.MAX_VALUE;
}
});
return orderedSettings;
}
if (j < 0) {
LOGGER.warn("InterpreterGroup " + o2.getGroup()
+ " is not specified in " + ConfVars.ZEPPELIN_INTERPRETER_GROUP_ORDER.getVarName());
// move the unknown interpreter to last
j = Integer.MAX_VALUE;
}
if (i < j) {
return -1;
} else if (i > j) {
return 1;
} else {
return 0;
}
}
});
return orderedSettings;
}

@VisibleForTesting
Expand All @@ -940,17 +954,18 @@ public void close(String settingId) {

public void close() {
List<Thread> closeThreads = new LinkedList<>();
synchronized (interpreterSettings) {
Collection<InterpreterSetting> intpSettings = interpreterSettings.values();
for (final InterpreterSetting intpSetting : intpSettings) {
Thread t = new Thread() {
public void run() {
intpSetting.close();
}
};
t.start();
closeThreads.add(t);
}
interpreterSettingsLock.readLock().lock();
Collection<InterpreterSetting> intpSettings = interpreterSettings.values();
interpreterSettingsLock.readLock().unlock();
for (final InterpreterSetting intpSetting : intpSettings) {
Thread t =
new Thread() {
public void run() {
intpSetting.close();
}
};
t.start();
closeThreads.add(t);
}

for (Thread t : closeThreads) {
Expand All @@ -961,5 +976,4 @@ public void run() {
}
}
}

}

0 comments on commit 24be692

Please sign in to comment.