Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Need help with worker node. Error: No .lua files found! #2629

Closed
niZmosis opened this issue Jul 18, 2023 · 1 comment
Closed

Need help with worker node. Error: No .lua files found! #2629

niZmosis opened this issue Jul 18, 2023 · 1 comment

Comments

@niZmosis
Copy link

niZmosis commented Jul 18, 2023

I am attempting to run bull on my worker node droplet. It works fine on my mac, but once deployed to my linux (ubuntu) droplet, I get this error:

Error: No .lua files found! at loadScripts (/root/myapp/server/dist/worker.cjs:166934:15) at async /root/myapp/server/dist/worker.cjs:166924:19

The error if from this error log:
jobQueue.add(jobName, payload, { repeat: { cron: cronString, tz: 'Etc/UTC' }, jobId: jobName, removeOnComplete: true, removeOnFail: true, }) .catch((error) => console.error(Error scheduling job ${jobName}: ${error.message}) )

Here is my tasks.js file, which I call startTasks() from my workers index.js.
(Sorry about the code block being messed up)

`import Bull from 'bull'
import cronParser from 'cron-parser'
import { updateTopPlayedAssets } from './jobs'
import { updateTokenList } from './tokenList'
import AdminModel from '../../models/admin.model'
import config from '../../config/config'
import { JOB_NAMES } from '../../utils/constants'

let listenersAdded = false

const jobQueue = new Bull(config.REDIS_QUEUE_NAME, {
redis: config.REDIS_CONFIG,
})

function isValidCron(cron) {
try {
cronParser.parseExpression(cron)
return true
} catch (err) {
return false
}
}

function cleanupQueue() {
jobQueue.clean(1_000 * 60 * 60, 'completed') // Cleans jobs completed more than an hour ago
jobQueue.clean(1_000 * 60 * 60, 'failed') // Cleans jobs failed more than an hour ago
}

function calculateEstimatedCompletionTime({ schedule, isInterval }) {
const nowTime = Date.now()

// If the schedule can be converted to a number, treat it as an interval in seconds
if (isInterval && !Number.isNaN(Number(schedule))) {
return {
beginTime: nowTime,
repeatInterval: Number(schedule),
}
}

// Otherwise, treat it as a cron string
const parsed = cronParser.parseExpression(schedule)
const repeatInterval = parsed.next().getTime() - parsed.prev().getTime()

return {
beginTime: parsed.prev().getTime(),
repeatInterval,
}
}

function addListeners() {
if (listenersAdded) return

jobQueue.on('failed', (job, err) => {
console.error(Job ${job.id} failed with error ${err.message})
})

jobQueue.process(JOB_NAMES.RUN_TOKEN_LIST, async () => {
await updateTokenList()
})

jobQueue.process(JOB_NAMES.RUN_TOP_ASSET_STATS, async () => {
await updateTopPlayedAssets()
})

listenersAdded = true
}

function scheduleJob(jobName, cronString, payload = {}, intervalInMs = null) {
if (!cronString && intervalInMs === null) {
throw new Error(
Must provide either a cron string or an interval in milliseconds for job ${jobName}
)
}

if (intervalInMs === 0 || cronString === '0') {
// Remove repeatable job
jobQueue.removeRepeatable(jobName, { jobId: jobName })

// Get all jobs with the same name
jobQueue
  .getJobs()
  .then((jobs) => {
    jobs.forEach((job) => {
      if (job.name === jobName) {
        // Remove each job
        job
          .remove()
          .catch((err) =>
            console.error(`Error removing job ${job.id}: ${err.message}`)
          )
      }
    })
  })
  .catch((err) => console.error(`Error getting jobs: ${err.message}`))

cleanupQueue()

return

}

if (cronString && !isValidCron(cronString)) {
throw new Error(Invalid cron string for job ${jobName})
}

const { beginTime, repeatInterval } = calculateEstimatedCompletionTime({
schedule: cronString ?? intervalInMs,
isInterval: intervalInMs !== null,
})

payload.startTime = beginTime
payload.repeatInterval = repeatInterval

if (intervalInMs !== null && intervalInMs > 0) {
jobQueue
.add(jobName, payload, {
repeat: { every: intervalInMs },
jobId: jobName,
removeOnComplete: true,
removeOnFail: true,
})
.catch((error) =>
console.error(Error scheduling job ${jobName}: ${error.message})
)
} else if (cronString && cronString !== '' && cronString !== '0') {
jobQueue
.add(jobName, payload, {
repeat: { cron: cronString, tz: 'Etc/UTC' },
jobId: jobName,
removeOnComplete: true,
removeOnFail: true,
})
.catch((error) =>
console.error(Error scheduling job ${jobName}: ${error.message})
)
}

cleanupQueue()
}

async function updateRunningJobSchedule(jobName, schedule) {
const activeJobs = await jobQueue.getJobs([
'active',
'completed',
'failed',
'waiting',
'delayed',
'paused',
])

const removePromises = []
const addPromises = []

for (const job of activeJobs) {
if (job && job.name === jobName) {
const existingJobIsInterval =
job.opts.repeat?.every !== undefined && job.opts.repeat?.every !== null

  const jobSchedule = existingJobIsInterval
    ? job.opts.repeat.every
    : job.opts.repeat.cron

  if (schedule !== jobSchedule) {
    const existingSchedule = existingJobIsInterval
      ? job.opts.repeat.every
      : job.opts.repeat.cron

    removePromises.push(job.remove())

    if (
      (existingJobIsInterval && schedule !== 0) ||
      (!existingJobIsInterval && schedule !== '0')
    ) {
      const { beginTime, repeatInterval } =
        calculateEstimatedCompletionTime({
          schedule: existingSchedule,
          isInterval: existingJobIsInterval,
        })

      addPromises.push(
        jobQueue.add(
          job.name,
          {
            ...job.data,
            startTime: beginTime,
            repeatInterval,
          },
          {
            ...job.opts,
            repeat: existingJobIsInterval
              ? { every: schedule, tz: 'Etc/UTC' }
              : { cron: schedule, tz: 'Etc/UTC' },
          }
        )
      )
    }
  }
}

}

try {
await Promise.allSettled(removePromises)
} catch (error) {
console.error(Error removing jobs: ${error.message})
}

await Promise.all(addPromises)
}

async function rescheduleJobsFromSettings(adminSettings) {
const { taskSettings } = adminSettings

const jobs = await jobQueue.getJobs([
'active',
'completed',
'failed',
'waiting',
'delayed',
'paused',
])

const existingJobNames = jobs ? jobs.map((job) => job.name) : []

// runTokenList
if (!existingJobNames.includes(JOB_NAMES.RUN_TOKEN_LIST)) {
scheduleJob(JOB_NAMES.RUN_TOKEN_LIST, taskSettings.tokenListInterval)
}
if (
existingJobNames.includes(JOB_NAMES.RUN_TOKEN_LIST) &&
taskSettings.tokenListInterval !== null &&
taskSettings.tokenListInterval !== undefined
) {
await updateRunningJobSchedule(
JOB_NAMES.RUN_TOKEN_LIST,
taskSettings.tokenListInterval
)
}

// runTopAssetStats
if (!existingJobNames.includes(JOB_NAMES.RUN_TOP_ASSET_STATS)) {
scheduleJob(
JOB_NAMES.RUN_TOP_ASSET_STATS,
taskSettings.topAssetStatsInterval
)
}
if (
existingJobNames.includes(JOB_NAMES.RUN_TOP_ASSET_STATS) &&
taskSettings.topAssetStatsInterval !== null &&
taskSettings.topAssetStatsInterval !== undefined
) {
await updateRunningJobSchedule(
JOB_NAMES.RUN_TOP_ASSET_STATS,
taskSettings.topAssetStatsInterval
)
}
}

const changeStream = AdminModel.watch()
changeStream.on('change', async (change) => {
if (
change.operationType === 'update' &&
change.updateDescription &&
change.updateDescription.updatedFields
) {
const { updatedFields } = change.updateDescription

if (
  updatedFields.taskSettings
) {
  const adminDocument = await AdminModel.findOne({ configId: 1 }).lean()
  rescheduleJobsFromSettings(adminDocument)
}

}
})

export async function startTasks() {
const settings = await AdminModel.findOne({ configId: 1 }).lean()

addListeners()

rescheduleJobsFromSettings(settings)
}
`

Bull Version: 4.10.4
Node: 16.15.0
Ubuntu 20.04 (LTS) x64

Redis is working as it should, both local and my Digital Ocean Redis DB, as my worker and api can talk to each other. This code is working locally, but not on my ubuntu droplet. I have tried to reinstall the node modules, and have checked bulls source file and it all matches my local machine.

@manast
Copy link
Member

manast commented Jul 20, 2023

I recommend you upgrade to BullMQ as there is much better support for loading .lua files that work in most intricate scenarios: https://github.com/taskforcesh/bullmq
I am closing as we will not perform any action for Bull regarding this issue.

@manast manast closed this as completed Jul 20, 2023
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants