Skip to content
This repository has been archived by the owner on Dec 15, 2022. It is now read-only.

Commit

Permalink
Merge pull request #518 from atom/mkt-ku-parallelize-git-ops
Browse files Browse the repository at this point in the history
Parallelize Git operations
  • Loading branch information
BinaryMuse committed Feb 15, 2017
2 parents 590c760 + 98ea7b1 commit 55fc31f
Show file tree
Hide file tree
Showing 7 changed files with 286 additions and 157 deletions.
92 changes: 68 additions & 24 deletions lib/async-queue.js
@@ -1,28 +1,72 @@
class Task {
constructor(fn, parallel = true) {
this.fn = fn;
this.parallel = parallel;
this.promise = new Promise((resolve, reject) => {
this.resolve = resolve;
this.reject = reject;
});
}

async execute() {
try {
const value = await this.fn.call(undefined);
this.resolve(value);
} catch (err) {
this.reject(err);
}
}

runsInParallel() {
return this.parallel;
}

runsInSerial() {
return !this.parallel;
}

getPromise() {
return this.promise;
}
}

export default class AsyncQueue {
constructor() {
this.commandQueue = [];
this.running = false;
}

push(fn) {
const p = new Promise((resolve, reject) => this.commandQueue.push({fn, resolve, reject}));
if (!this.running) { this.processQueue(); }
return p;
}

async processQueue() {
this.running = true;
let lastPromise = Promise.resolve();
while (this.commandQueue.length) {
const {fn, resolve, reject} = this.commandQueue[0];
const run = () => fn();
lastPromise = lastPromise.then(run, run);
lastPromise.then(resolve, reject);
try {
await lastPromise; // eslint-disable-line babel/no-await-in-loop
} catch (e) { /* nothing */ }
this.commandQueue.shift();
constructor(options = {}) {
this.parallelism = options.parallelism || 1;
this.nonParallelizableOperation = false;
this.tasksInProgress = 0;
this.queue = [];
}

push(fn, {parallel} = {parallel: true}) {
const task = new Task(fn, parallel);
this.queue.push(task);
this.processQueue();
return task.getPromise();
}

processQueue() {
if (!this.queue.length || this.nonParallelizableOperation) { return; }

const task = this.queue[0];
const canRunParallelOp = task.runsInParallel() && this.tasksInProgress < this.parallelism;
const canRunSerialOp = task.runsInSerial() && this.tasksInProgress === 0;
if (canRunSerialOp || canRunParallelOp) {
this.processTask(task, task.runsInParallel());
this.queue.shift();
this.processQueue();
}
this.running = false;
}

async processTask(task, runsInParallel) {
this.tasksInProgress++;
if (!runsInParallel) {
this.nonParallelizableOperation = true;
}

await task.execute();
this.tasksInProgress--;
this.nonParallelizableOperation = false;
this.processQueue();
}
}
26 changes: 18 additions & 8 deletions lib/controllers/git-panel-controller.js
Expand Up @@ -82,18 +82,28 @@ export default class GitPanelController {

@autobind
async fetchRepositoryData(repository) {
const dataPromises = {
mergeConflicts: repository.getMergeConflicts(),
lastCommit: repository.getLastCommit(),
isMerging: repository.isMerging(),
branchName: repository.getCurrentBranch(),
branches: repository.getBranches(),
remoteName: repository.getRemoteForBranch(this.branchName),
unstagedChanges: repository.getUnstagedChanges(),
stagedChanges: this.fetchStagedChanges(repository),
};
const data = {
unstagedChanges: await repository.getUnstagedChanges(),
stagedChanges: await this.fetchStagedChanges(repository),
mergeConflicts: await repository.getMergeConflicts(),
lastCommit: await repository.getLastCommit(),
isMerging: await repository.isMerging(),
branchName: await repository.getCurrentBranch(),
branches: await repository.getBranches(),
unstagedChanges: await dataPromises.unstagedChanges,
stagedChanges: await dataPromises.stagedChanges,
mergeConflicts: await dataPromises.mergeConflicts,
lastCommit: await dataPromises.lastCommit,
isMerging: await dataPromises.isMerging,
branchName: await dataPromises.branchName,
branches: await dataPromises.branches,
mergeMessage: null,
aheadCount: null,
behindCount: null,
remoteName: await repository.getRemoteForBranch(this.branchName),
remoteName: await dataPromises.remoteName,
};

if (data.remoteName) {
Expand Down
99 changes: 54 additions & 45 deletions lib/git-shell-out-strategy.js
@@ -1,4 +1,5 @@
import path from 'path';
import os from 'os';

import {CompositeDisposable} from 'atom';

Expand Down Expand Up @@ -30,13 +31,16 @@ function withGpgScript(args) {
}

export default class GitShellOutStrategy {
constructor(workingDir) {
static defaultExecArgs = {stdin: null, useGitPromptServer: false, writeOperation: false}

constructor(workingDir, options = {}) {
this.workingDir = workingDir;
this.commandQueue = new AsyncQueue();
const parallelism = options.parallelism || Math.max(3, os.cpus().length);
this.commandQueue = new AsyncQueue({parallelism});
}

// Execute a command and read the output using the embedded Git environment
exec(args, stdin = null, useGitPromptServer = false) {
exec(args, {stdin, useGitPromptServer, writeOperation} = GitShellOutStrategy.defaultExecArgs) {
/* eslint-disable no-console */
const subscriptions = new CompositeDisposable();

Expand Down Expand Up @@ -86,43 +90,48 @@ export default class GitShellOutStrategy {
if (process.env.PRINT_GIT_TIMES) {
console.time(`git:${formattedArgs}`);
}
timingMarker.mark('execute');
return GitProcess.exec(args, this.workingDir, options)
.then(({stdout, stderr, exitCode}) => {
timingMarker.finalize();
if (process.env.PRINT_GIT_TIMES) {
console.timeEnd(`git:${formattedArgs}`);
}
if (gitPromptServer) {
gitPromptServer.terminate();
}
subscriptions.dispose();
if (exitCode) {
const err = new GitError(
`${formattedArgs} exited with code ${exitCode}\nstdout: ${stdout}\nstderr: ${stderr}`,
);
err.code = exitCode;
err.stdErr = stderr;
err.stdOut = stdout;
err.command = formattedArgs;
return Promise.reject(err);
}
return stdout;
return new Promise(resolve => {
timingMarker.mark('nexttick');
setImmediate(() => {
timingMarker.mark('execute');
resolve(GitProcess.exec(args, this.workingDir, options)
.then(({stdout, stderr, exitCode}) => {
timingMarker.finalize();
if (process.env.PRINT_GIT_TIMES) {
console.timeEnd(`git:${formattedArgs}`);
}
if (gitPromptServer) {
gitPromptServer.terminate();
}
subscriptions.dispose();
if (exitCode) {
const err = new GitError(
`${formattedArgs} exited with code ${exitCode}\nstdout: ${stdout}\nstderr: ${stderr}`,
);
err.code = exitCode;
err.stdErr = stderr;
err.stdOut = stdout;
err.command = formattedArgs;
return Promise.reject(err);
}
return stdout;
}));
});
});
});
}, {parallel: !writeOperation});
/* eslint-enable no-console */
}

/**
* Execute a git command that may create a commit. If the command fails because the GPG binary was invoked and unable
* to acquire a passphrase (because the pinentry program attempted to use a tty), retry with a `GitPromptServer`.
*/
gpgExec(args, stdin = null) {
gpgExec(args, {stdin} = {stdin: null}) {
const gpgArgs = withGpgScript(args);
return this.exec(gpgArgs, stdin).catch(err => {
return this.exec(gpgArgs, {stdin, writeOperation: true}).catch(err => {
if (err.code === 128 && /gpg failed/.test(err.stdErr)) {
// Retry with a GitPromptServer
return this.exec(gpgArgs, stdin, true);
return this.exec(gpgArgs, {stdin, useGitPromptServer: true, writeOperation: true});
} else {
throw err;
}
Expand All @@ -144,33 +153,33 @@ export default class GitShellOutStrategy {
stageFiles(paths) {
if (paths.length === 0) { return null; }
const args = ['add'].concat(paths);
return this.exec(args);
return this.exec(args, {writeOperation: true});
}

unstageFiles(paths, commit = 'HEAD') {
if (paths.length === 0) { return null; }
const args = ['reset', commit, '--'].concat(paths);
return this.exec(args);
return this.exec(args, {writeOperation: true});
}

applyPatch(patch, {index} = {}) {
const args = ['apply', '-'];
if (index) { args.splice(1, 0, '--cached'); }
return this.exec(args, patch);
return this.exec(args, {stdin: patch, writeOperation: true});
}

commit(message, {allowEmpty, amend} = {}) {
const args = ['commit', '-m', message];
if (amend) { args.push('--amend'); }
if (allowEmpty) { args.push('--allow-empty'); }
return this.gpgExec(args);
return this.gpgExec(args, {writeOperation: true});
}

/**
* File Status and Diffs
*/
async getStatusesForChangedFiles() {
const output = await this.exec(['status', '--untracked-files=all', '-z']);
const output = await this.exec(['status', '--untracked-files=all', '-z'], {writeOperation: true});

const statusMap = {
'A': 'added',
Expand Down Expand Up @@ -280,7 +289,7 @@ export default class GitShellOutStrategy {

async isPartiallyStaged(filePath) {
const args = ['status', '--short', '--', filePath];
const output = await this.exec(args);
const output = await this.exec(args, {writeOperation: true});
const results = output.trim().split(LINE_ENDING_REGEX);
if (results.length === 2) {
return true;
Expand Down Expand Up @@ -325,7 +334,7 @@ export default class GitShellOutStrategy {
}

abortMerge() {
return this.exec(['merge', '--abort']);
return this.exec(['merge', '--abort'], {writeOperation: true});
}

/**
Expand All @@ -334,7 +343,7 @@ export default class GitShellOutStrategy {
clone(remoteUrl, options = {}) {
const args = ['clone', '--no-local', remoteUrl, this.workingDir];
if (options.bare) { args.push('--bare'); }
return this.exec(args);
return this.exec(args, {writeOperation: true});
}

async getRemoteForBranch(branchName) {
Expand All @@ -348,20 +357,20 @@ export default class GitShellOutStrategy {

async fetch(branchName) {
const remote = await this.getRemoteForBranch(branchName);
return this.exec(['fetch', remote, branchName], null, true);
return this.exec(['fetch', remote, branchName], {useGitPromptServer: true, writeOperation: true});
}

async pull(branchName) {
const remote = await this.getRemoteForBranch(branchName);
return this.gpgExec(['pull', remote, branchName], null, true);
return this.gpgExec(['pull', remote, branchName], {useGitPromptServer: true, writeOperation: true});
}

async push(branchName, options = {}) {
const remote = await this.getRemoteForBranch(branchName);
const args = ['push', remote || 'origin', branchName];
if (options.setUpstream) { args.push('--set-upstream'); }
if (options.force) { args.push('--force'); }
return this.exec(args, null, true);
return this.exec(args, {useGitPromptServer: true, writeOperation: true});
}

async getAheadCount(branchName) {
Expand Down Expand Up @@ -395,15 +404,15 @@ export default class GitShellOutStrategy {
checkout(branchName, options = {}) {
const args = ['checkout'];
if (options.createNew) { args.push('-b'); }
return this.exec(args.concat(branchName));
return this.exec(args.concat(branchName), {writeOperation: true});
}

async checkoutFiles(paths, revision) {
if (paths.length === 0) { return null; }
try {
const args = ['checkout'];
if (revision) { args.push(revision); }
return await this.exec(args.concat('--', paths));
return await this.exec(args.concat('--', paths), {writeOperation: true});
} catch (error) {
const matches = error.stdErr.match(/error: pathspec .* did not match any file\(s\) known to git\./g);
if (matches.length) {
Expand Down Expand Up @@ -449,7 +458,7 @@ export default class GitShellOutStrategy {
let args = ['config'];
if (replaceAll) { args.push('--replace-all'); }
args = args.concat(option, value);
return this.exec(args);
return this.exec(args, {writeOperation: true});
}

async getRemotes() {
Expand All @@ -472,9 +481,9 @@ export default class GitShellOutStrategy {
async createBlob({filePath, stdin} = {}) {
let output;
if (filePath) {
output = await this.exec(['hash-object', '-w', filePath]);
output = await this.exec(['hash-object', '-w', filePath], {writeOperation: true});
} else if (stdin) {
output = await this.exec(['hash-object', '-w', '--stdin'], stdin);
output = await this.exec(['hash-object', '-w', '--stdin'], {stdin, writeOperation: true});
} else {
throw new Error('Must supply file path or stdin');
}
Expand Down
1 change: 1 addition & 0 deletions lib/views/git-timings-view.js
Expand Up @@ -94,6 +94,7 @@ class MarkerTooltip extends React.Component {
const COLORS = {
queued: 'red',
prepare: 'cyan',
nexttick: 'yellow',
execute: 'green',
};
class MarkerSpan extends React.Component {
Expand Down

0 comments on commit 55fc31f

Please sign in to comment.