Skip to content

Commit

Permalink
feat: allow custom ipfs node usage
Browse files Browse the repository at this point in the history
  • Loading branch information
geolffreym committed Sep 8, 2022
1 parent 1d90d7e commit 93f46ac
Show file tree
Hide file tree
Showing 8 changed files with 4,292 additions and 2,879 deletions.
4 changes: 3 additions & 1 deletion .vscode/settings.json
@@ -1,7 +1,9 @@
{
"cSpell.words": [
"IPFS",
"ipns",
"linvodb",
"logplease"
"logplease",
"WATCHIT"
]
}
6,957 changes: 4,171 additions & 2,786 deletions package-lock.json

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion package.json
@@ -1,7 +1,7 @@
{
"name": "watchit",
"author": "ZorrillosDev",
"version": "0.3.5-alpha.1",
"version": "0.3.6-alpha.1",
"private": true,
"main": "./src/main/index.js",
"description": "Open movies everywhere",
Expand Down
9 changes: 6 additions & 3 deletions src/main/core/index.js
Expand Up @@ -5,17 +5,20 @@
const log = require('logplease').create('CORE')
const Node = require('./node')
const Ingest = require('./ingest')
// const BroadCast = require('./broadcast')
const key = require('./key')

module.exports = (ipcMain, runtime = 'node') => {
let nodeConf = { }
let nodeConf = {}
const keyFile = key.readFromStorage()
// Check if user added local node port
const node = keyFile && 'node' in keyFile ? keyFile.node : null

if (!Object.is(runtime, 'web')) {
const { ROOT_ORBIT_DIR } = require('./settings')
nodeConf = Object.assign({ directory: ROOT_ORBIT_DIR }, nodeConf)
}

const orbit = Node.getInstance({ orbit: nodeConf })
const orbit = Node.getInstance({ orbit: nodeConf, ipfs: { node } })
const ingest = Ingest.getInstance(orbit)

/**
Expand Down
37 changes: 27 additions & 10 deletions src/main/core/ipfs/node/index.js
Expand Up @@ -4,26 +4,30 @@ const getPort = require('get-port')
const Ctl = require('ipfsd-ctl')
const defaultConf = require('./settings')
const log = require('logplease').create('IPFS')
const IPFSClient = require('ipfs-http-client')

// Path settings and util helper lib
const { removeFiles } = require('../../utils')
const { ROOT_IPFS_DIR } = require('../../settings')

const DEFAULT_LOOPBACK = '127.0.0.1'
const RETRY_GRACE = 20 * 1000
const DEFAULT_API_PORT = 5001
const DEFAULT_GATEWAY_PORT = 8080
const KILL_TIMEOUT = 15 * 1000
const DEFAULT_API_PORT = 6002
const DEFAULT_GATEWAY_PORT = 9090
const DEFAULT_SWARM_TCP = 4010
const DEFAULT_SWARM_WS = 4011
const resolveIpfsPaths = () => require('go-ipfs').path()
.replace('app.asar', 'app.asar.unpacked')
.replace('app.asar', 'app.asar.unpacked');

const forceKill = async (isInstance) => {
log.info('Forcing stop')
await isInstance.stop()
log.warn('Cleaning bad repo')
await removeFiles(ROOT_IPFS_DIR)
}; const initIpfsNode = async (isInstance) => {
};

const initIpfsNode = async (isInstance) => {
// Check if running time dir exists
log.warn('Starting node')
try {
Expand All @@ -34,7 +38,20 @@ const forceKill = async (isInstance) => {
}

return isInstance.start()
}; const ipfsFactory = async (conf = {}) => {
};

const ipfsFactory = async (conf = {}) => {

// Link to current local node
if ('node' in conf) {
log.info('Using provided node on port:', conf.node)
return IPFSClient.create({
host: DEFAULT_LOOPBACK,
port: conf.node,
protocol: 'http'
})
}

// Find available ports to avoid conflict
const [api, gateway, swarmTCP, swarmWS] = await Promise.all([
getPort({ port: DEFAULT_API_PORT }),
Expand All @@ -57,13 +74,13 @@ const forceKill = async (isInstance) => {
},
...conf
},
ipfsHttpModule: require('ipfs-http-client'),
ipfsHttpModule: IPFSClient,
ipfsBin: resolveIpfsPaths(),
forceKill: true,
disposable: false,
forceKillTimeout: KILL_TIMEOUT,
args: ['--enable-pubsub-experiment'],
forceKill: true,
remote: false,
forceKillTimeout: KILL_TIMEOUT,
type: 'go'
})

Expand All @@ -87,15 +104,15 @@ const forceKill = async (isInstance) => {
} catch (e) {
// Avoid throw default error
log.error('Fail on start')
await forceKill(isInstance)
return false
}
await forceKill(isInstance)

const ipfsApi = isInstance?.api
const id = ipfsApi.peerId
ipfsApi.kill = async () => isInstance.stop()
log.info(`Started ${isInstance.started}`)
log.info('Running ipfs id', id.id)
ipfsApi.kill = async () => isInstance.stop()

return ipfsApi
}
Expand Down
50 changes: 26 additions & 24 deletions src/main/core/node.js
Expand Up @@ -13,7 +13,7 @@ const log = require('logplease')
const DEFAULT_HOLD = 10 * 1000

module.exports = class Node extends EventEmitter {
constructor (conf = {}) {
constructor(conf = {}) {
super()
this.conf = conf
this.seedMode = false
Expand All @@ -25,11 +25,11 @@ module.exports = class Node extends EventEmitter {
this.db = null // Current opened db
}

get [Symbol.toStringTag] () {
get [Symbol.toStringTag]() {
return 'Node'
}

static getInstance (conf) {
static getInstance(conf) {
return new Node(conf)
}

Expand All @@ -40,7 +40,7 @@ module.exports = class Node extends EventEmitter {
* @param {*} [settings={}]
* @return {*}
*/
open (address, settings = {}) {
open(address, settings = {}) {
return this.orbit.open(address, {
...{
replicate: true,
Expand All @@ -58,30 +58,30 @@ module.exports = class Node extends EventEmitter {
*
* @param {boolean} [isRunningSeed=false]
*/
setInSeedMode (isRunningSeed = false) {
setInSeedMode(isRunningSeed = false) {
this.seedMode = isRunningSeed
}

async getIngestKey () {
return await this.resolveKey(
async getIngestKey() {
return this.resolveKey(
this.rawIngestKey
)
}

sanitizeKey (address) {
sanitizeKey(address) {
return key.sanitizedKey(address)
}

get rawIngestKey () {
get rawIngestKey() {
return key.getIngestKey()
}

get hasValidCache () {
get hasValidCache() {
const [validCache] = this.cache
return validCache
}

get cache () {
get cache() {
const cache = key.readFromStorage()
const validCache = cache && 'tmp' in cache
return [validCache, cache]
Expand All @@ -104,7 +104,7 @@ module.exports = class Node extends EventEmitter {
* @param ipns {string} IPNS hash
* @return {string} Orbit address resolver key from ipns
*/
async resolveKey (ipns) {
async resolveKey(ipns) {
if (!ipns) return false
if (~ipns.indexOf('zd')) return ipns

Expand All @@ -125,11 +125,11 @@ module.exports = class Node extends EventEmitter {
}

/**
* Opem orbit address and set events listeners
* Open orbit address and set events listeners
* @param key: orbit address
* @param res: callback
*/
async run (key, res) {
async run(key, res) {
log.info('Starting movies db:', key)
this.db = await this.open(key).catch(async (e) => {
console.log(e)
Expand Down Expand Up @@ -165,13 +165,13 @@ module.exports = class Node extends EventEmitter {
/**
* Kill all - party all
*/
async party (msg = 'Invalid Key') {
async party(msg = 'Invalid Key') {
log.warn('Party rock')
this.emit('node-chaos', msg)
await this.close(true)
}

get pubsub () {
get pubsub() {
if (!this.orbit) return {}
return this.orbit._pubsub
}
Expand All @@ -182,16 +182,18 @@ module.exports = class Node extends EventEmitter {
* and get providers for db
* @param res: callback
*/
async nodeReady (res) {
async nodeReady(res) {
log.info('Node ready')
log.info('Loading db..')
const raw = await this.getIngestKey()
if (!raw) return false // Avoid move forward
const address = this.sanitizeKey(raw)

// Get orbit instance and next line connect providers
this.orbit = await this.instanceOB()
log.warn('Sanitized key:', address)
this.emit('node-step', 'Connecting')

await this.run(address, res) // Wait for orbit to open db
this.emit('node-raised') // When all ready to handle orbit
await provider.findProv(this.node, raw)
Expand All @@ -200,17 +202,17 @@ module.exports = class Node extends EventEmitter {
/**
* Orbit db factory
*/
instanceOB () {
instanceOB() {
return (this.orbit && Promise.resolve(this.orbit)) ||
OrbitDB.createInstance(this.node, this.conf.orbit)
OrbitDB.createInstance(this.node, this.conf.orbit)
}

/**
* IPFS factory handler
* try keep node alive if cannot do it after MAX_RETRIES
* app get killed :(
*/
instanceNode () {
instanceNode() {
return new Promise(async (resolve) => {
// If fail to much.. get fuck out
log.info('Setting up node..')
Expand All @@ -224,7 +226,7 @@ module.exports = class Node extends EventEmitter {
})
}

start () {
start() {
this.closed = false // Restore closed state
if (this.ready) return Promise.resolve(this.db)
return new Promise(async (resolve) => {
Expand All @@ -235,7 +237,7 @@ module.exports = class Node extends EventEmitter {
})
}

async close (forceDrop = false) {
async close(forceDrop = false) {
// Keep this states on top to avoid
// run any event while nodes get closed
this.closed = true // Closed already
Expand All @@ -256,7 +258,7 @@ module.exports = class Node extends EventEmitter {
}
}

if (this.node) {
if (this.node && this.node?.kill) {
log.warn('Killing Nodes')
await this.node.kill().catch(
() => log.error('Fail trying to stop node')
Expand All @@ -274,7 +276,7 @@ module.exports = class Node extends EventEmitter {
this.peers.clear()
}

async get (hash) {
async get(hash) {
const oplog = (this.db.oplog || this.db._oplog)
const result = oplog.values.find(v => v.hash === hash)
if (!result || !hash) return false
Expand Down

0 comments on commit 93f46ac

Please sign in to comment.