Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
321 changes: 268 additions & 53 deletions extensions/puterfs/main.js
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ const {
Actor,
Context,
UserActorType,
TDetachable,
MultiDetachable,
} = extension.import('core');

const {
Expand All @@ -66,6 +68,10 @@ const {
NodeInternalIDSelector,
} = extension.import('core').fs.selectors;

const {
FSNodeContext,
} = extension.import('fs');

const {
// MODE_READ,
MODE_WRITE,
Expand All @@ -79,6 +85,10 @@ const {
RESOURCE_STATUS_PENDING_CREATE,
} = extension.import('fs').resource;

const {
UploadProgressTracker,
} = extension.import('fs').util;

class PuterFSProvider {
/**
* Check if a given node exists.
Expand Down Expand Up @@ -156,66 +166,55 @@ class PuterFSProvider {
async mkdir ({ context, parent, name, immutable }) {
const { actor, thumbnail } = context.values;

const lock_handle = await svc_fsLock.lock_child(await parent.get('path'),
name,
MODE_WRITE);

try {
const ts = Math.round(Date.now() / 1000);
const uid = uuidv4();

const existing = await svc_fs.node(new NodeChildSelector(parent.selector, name));
const ts = Math.round(Date.now() / 1000);
const uid = uuidv4();

if ( await existing.exists() ) {
throw APIError.create('item_with_same_name_exists', null, {
entry_name: name,
});
}

if ( ! await parent.exists() ) {
throw APIError.create('subject_does_not_exist');
}
if ( ! await svc_acl.check(actor, parent, 'write') ) {
throw await svc_acl.get_safe_acl_error(actor, parent, 'write');
}
const existing = await svc_fs.node(new NodeChildSelector(parent.selector, name));

svc_resource.register({
uid,
status: RESOURCE_STATUS_PENDING_CREATE,
if ( await existing.exists() ) {
throw APIError.create('item_with_same_name_exists', null, {
entry_name: name,
});
}

const raw_fsentry = {
is_dir: 1,
uuid: uid,
parent_uid: await parent.get('uid'),
path: path_.join(await parent.get('path'), name),
user_id: actor.type.user.id,
name,
created: ts,
accessed: ts,
modified: ts,
immutable: immutable ?? false,
...(thumbnail ? {
thumbnail: thumbnail,
} : {}),
};

const entryOp = await svc_fsEntry.insert(raw_fsentry);

await entryOp.awaitDone();
svc_resource.free(uid);
if ( ! await parent.exists() ) {
throw APIError.create('subject_does_not_exist');
}

const node = await svc_fs.node(new NodeUIDSelector(uid));
svc_resource.register({
uid,
status: RESOURCE_STATUS_PENDING_CREATE,
});

svc_event.emit('fs.create.directory', {
node,
context: Context.get(),
});
const raw_fsentry = {
is_dir: 1,
uuid: uid,
parent_uid: await parent.get('uid'),
path: path_.join(await parent.get('path'), name),
user_id: actor.type.user.id,
name,
created: ts,
accessed: ts,
modified: ts,
immutable: immutable ?? false,
...(thumbnail ? {
thumbnail: thumbnail,
} : {}),
};

const entryOp = await svc_fsEntry.insert(raw_fsentry);

await entryOp.awaitDone();
svc_resource.free(uid);

const node = await svc_fs.node(new NodeUIDSelector(uid));

svc_event.emit('fs.create.directory', {
node,
context: Context.get(),
});

return node;
} finally {
await lock_handle.unlock();
}
return node;
}

async read ({ context, node, version_id, range }) {
Expand All @@ -234,6 +233,222 @@ class PuterFSProvider {
return stream;
}

async stat ({
selector,
options,
controls,
node,
}) {
// For Puter FS nodes, we assume we will obtain all properties from
// fsEntryService/fsEntryFetcher, except for 'thumbnail' unless it's
// explicitly requested.

if ( options.tracer == null ) {
options.tracer = svc_trace.tracer;
}

if ( options.op ) {
options.trace_options = {
parent: options.op.span,
};
}

let entry;

await new Promise (rslv => {
const detachables = new MultiDetachable();

const callback = (_resolver) => {
detachables.as(TDetachable).detach();
rslv();
};

// either the resource is free
{
// no detachale because waitForResource returns a
// Promise that will be resolved when the resource
// is free no matter what, and then it will be
// garbage collected.
svc_resource.waitForResource(selector).then(callback.bind(null, 'resourceService'));
}

// or pending information about the resource
// becomes available
{
// detachable is needed here because waitForEntry keeps
// a map of listeners in memory, and this event may
// never occur. If this never occurs, waitForResource
// is guaranteed to resolve eventually, and then this
// detachable will be detached by `callback` so the
// listener can be garbage collected.
const det = svc_fsEntry.waitForEntry(node, callback.bind(null, 'fsEntryService'));
if ( det ) detachables.add(det);
}
});

const maybe_uid = node.uid;
if ( svc_resource.getResourceInfo(maybe_uid) ) {
entry = await svc_fsEntry.get(maybe_uid, options);
controls.log.debug('got an entry from the future');
} else {
entry = await svc_fsEntryFetcher.find(selector, options);
}

if ( ! entry ) {
if ( this.log_fsentriesNotFound ) {
controls.log.warn(`entry not found: ${selector.describe(true)}`);
}
}

if ( entry === null || typeof entry !== 'object' ) {
return null;
}

if ( entry.id ) {
controls.provide_selector(new NodeInternalIDSelector('mysql', entry.id, {
source: 'FSNodeContext optimization',
}));
}

return entry;
}

async copy_tree ({ context, source, parent, target_name }) {
// Context
const actor = (context ?? Context).get('actor');
const user = actor.type.user;

const tracer = svc_trace.tracer;
const uuid = uuidv4();
const timestamp = Math.round(Date.now() / 1000);
await parent.fetchEntry();
await source.fetchEntry({ thumbnail: true });

// New filesystem entry
const raw_fsentry = {
uuid,
is_dir: source.entry.is_dir,
...(source.entry.is_shortcut ? {
is_shortcut: source.entry.is_shortcut,
shortcut_to: source.entry.shortcut_to,
} : {}),
parent_uid: parent.uid,
name: target_name,
created: timestamp,
modified: timestamp,

path: path_.join(await parent.get('path'), target_name),

// if property exists but the value is undefined,
// it will still be included in the INSERT, causing
// an error
...(source.entry.thumbnail ?
{ thumbnail: source.entry.thumbnail } : {}),

user_id: user.id,
};

svc_event.emit('fs.pending.file', {
fsentry: FSNodeContext.sanitize_pending_entry_info(raw_fsentry),
context: context,
});

if ( await source.get('has-s3') ) {
Object.assign(raw_fsentry, {
size: source.entry.size,
associated_app_id: source.entry.associated_app_id,
bucket: source.entry.bucket,
bucket_region: source.entry.bucket_region,
});

await tracer.startActiveSpan('fs:cp:storage-copy', async span => {
let progress_tracker = new UploadProgressTracker();

svc_event.emit('fs.storage.progress.copy', {
upload_tracker: progress_tracker,
context,
meta: {
item_uid: uuid,
item_path: raw_fsentry.path,
},
});

// const storage = new PuterS3StorageStrategy({ services: svc });
const storage = context.get('storage');
const state_copy = storage.create_copy();
await state_copy.run({
src_node: source,
dst_storage: {
key: uuid,
bucket: raw_fsentry.bucket,
bucket_region: raw_fsentry.bucket_region,
},
storage_api: { progress_tracker },
});

span.end();
});
}

{
await svc_size.add_node_size(undefined, source, user);
}

svc_resource.register({
uid: uuid,
status: RESOURCE_STATUS_PENDING_CREATE,
});

const entryOp = await svc_fsEntry.insert(raw_fsentry);

let node;

const tasks = new ParallelTasks({ tracer, max: 4 });
await context.arun('fs:cp:parallel-portion', async () => {
// Add child copy tasks if this is a directory
if ( source.entry.is_dir ) {
const children = await svc_fsEntry.fast_get_direct_descendants(source.uid);
for ( const child_uuid of children ) {
tasks.add('fs:cp:copy-child', async () => {
const child_node = await svc_fs.node(new NodeUIDSelector(child_uuid));
const child_name = await child_node.get('name');

await this.copy_tree({
context,
source: await svc_fs.node(new NodeUIDSelector(child_uuid)),
parent: await svc_fs.node(new NodeUIDSelector(uuid)),
target_name: child_name,
});
});
}
}

// Add task to await entry
tasks.add('fs:cp:entry-op', async () => {
await entryOp.awaitDone();
svc_resource.free(uuid);
const copy_fsNode = await svc_fs.node(new NodeUIDSelector(uuid));
copy_fsNode.entry = raw_fsentry;
copy_fsNode.found = true;
copy_fsNode.path = raw_fsentry.path;

node = copy_fsNode;

svc_event.emit('fs.create.file', {
node,
context,
});
}, { force: true });

await tasks.awaitAll();
});

node = node || await svc_fs.node(new NodeUIDSelector(uuid));

// TODO: What event do we emit? How do we know if we're overwriting?
return node;
}

async #rmnode ({ node, options }) {
// Services
if ( !options.override_immutable && await node.get('immutable') ) {
Expand Down
10 changes: 8 additions & 2 deletions src/backend/src/CoreModule.js
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ const { LLOWrite } = require('./filesystem/ll_operations/ll_write');
const { LLRead } = require('./filesystem/ll_operations/ll_read');
const { RuntimeModule } = require('./extension/RuntimeModule.js');
const { TYPE_DIRECTORY, TYPE_FILE } = require('./filesystem/FSNodeContext.js');
const { TDetachable } = require('@heyputer/putility/src/traits/traits.js');
const { MultiDetachable } = require('@heyputer/putility/src/libs/listener.js');

/**
* Core module for the Puter platform that includes essential services including
Expand Down Expand Up @@ -89,6 +91,10 @@ const install = async ({ context, services, app, useapi, modapi }) => {
def('core.Context', Context);

def('core', require('./services/auth/Actor'), { assign: true });
def('core', {
TDetachable,
MultiDetachable,
}, { assign: true });
def('core.config', config);

// Note: this is an incomplete export; it was added for a proprietary
Expand Down Expand Up @@ -275,11 +281,11 @@ const install = async ({ context, services, app, useapi, modapi }) => {
});
services.registerService('__refresh-assocs', RefreshAssociationsService);
services.registerService('__prod-debugging', MakeProdDebuggingLessAwfulService);
if ( config.env == 'dev' && ! config.no_devsocket ) {
if ( config.env == 'dev' && !config.no_devsocket ) {
const { DevSocketService } = require('./services/DevSocketService.js');
services.registerService('dev-socket', DevSocketService);
}
if ( (config.env == 'dev' && ! config.no_devconsole && process.env.DEVCONSOLE) || config.devconsole ) {
if ( (config.env == 'dev' && !config.no_devconsole && process.env.DEVCONSOLE) || config.devconsole ) {
const { DevConsoleService } = require('./services/DevConsoleService');
services.registerService('dev-console', DevConsoleService);
} else {
Expand Down
Loading