AlbatrosDeveloper/nestjs-temporal-example

Temporal Bookstore

NestJS project that uses Temporal as its workflow orchestration engine. The architecture is language-agnostic: workflows and activities can run in TypeScript, Python, Go, Java, or any other language with a Temporal SDK.

Architecture

+-------------------------------------------------------------------+
|  AppModule                                                        |
|  TemporalModule.forRoot()  <-- client + worker manager            |
|                                                                   |
|  +-------------------------+  +--------------------------------+  |
|  |  libs/temporal/          |  |  modules/borrowing/            |  |
|  |  (infrastructure)        |  |  (domain)                      |  |
|  |                          |  |                                |  |
|  |  TemporalService         |  |  BorrowingService              |  |
|  |  TemporalClientFactory   |  |    -> temporal.startWorkflow() |  |
|  |  TemporalWorkerManager   |  |                                |  |
|  |  TaskQueueRegistry       |  |  temporal/                     |  |
|  |                          |  |    borrowing.workflow.ts        |  |
|  |                          |  |    borrowing.activities.ts      |  |
|  |                          |  |    borrowing.external-activit.. |  |
|  |                          |  |    borrowing.task-queue.ts      |  |
|  |                          |  |    borrowing.constants.ts       |  |
|  +-------------------------+  +--------------------------------+  |
+-------------------------------------------------------------------+
         |                              |
         v                              v
   Temporal Server              Workers TS / Python / Go

Design principles:

  1. Infrastructure in libs/, domain in modules/ -- the Temporal lib contains only generic infrastructure. Each module owns its workflows, activities, and task queues.
  2. Language-agnostic -- TemporalService identifies workflows and task queues by string name, so it works the same whether the worker is written in TS, Python, Go, or Java.
  3. Cross-language activities -- a TS workflow can call Python/Go activities by using proxyActivities with a different taskQueue. All you need is a TypeScript interface that describes the contract.
  4. Flat, simple structure -- no unnecessary subdirectories inside libs/temporal/.

File Structure

src/
  libs/
    temporal/                              # Infrastructure (shared)
      index.ts                             # Barrel export
      temporal.module.ts                   # NestJS dynamic module
      temporal.service.ts                  # startWorkflow() / executeWorkflow()
      temporal.client-factory.ts           # Creates the Temporal Client
      temporal.worker-manager.ts           # Starts the registered TS workers
      temporal.registry.ts                 # Task queue registry
      temporal.utils.ts                    # extractActivities() helper
      temporal.interfaces.ts               # Interfaces and types
      temporal.constants.ts                # Injection tokens

  modules/
    borrowing/
      borrowing.module.ts
      borrowing.service.ts
      borrowing.controller.ts
      temporal/                            # Temporal code for the borrowing domain
        index.ts                           # Barrel export
        borrowing.constants.ts             # Task queue and workflow names
        borrowing.activities.ts            # Local activities (TypeScript)
        borrowing.external-activities.ts   # Interfaces for external activities (Python)
        borrowing.workflow.ts              # Workflow mixing TS + Python activities
        borrowing.task-queue.ts            # Task queue registration

Configuration

Environment Variables

TEMPORAL_ADDRESS=localhost:7233
TEMPORAL_NAMESPACE=default

Global Config (src/libs/config/global.config.ts)

export const GlobalConfig = () => ({
  // ...
  temporal: {
    address: process.env.TEMPORAL_ADDRESS || 'localhost:7233',
    namespace: process.env.TEMPORAL_NAMESPACE || 'default',
  },
});
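
As a minimal sketch of how these defaults resolve, the function below takes the environment as a parameter so the fallback can be exercised without touching process.env. resolveTemporalConfig and TemporalConnectionConfig are hypothetical names used for illustration, not part of the project.

```typescript
// Hypothetical helper: mirrors the `temporal` block of GlobalConfig,
// but reads from an explicit env object instead of process.env.
interface TemporalConnectionConfig {
  address: string;
  namespace: string;
}

function resolveTemporalConfig(
  env: Record<string, string | undefined>,
): TemporalConnectionConfig {
  return {
    // `||` (not `??`) means an empty string also falls back to the default.
    address: env['TEMPORAL_ADDRESS'] || 'localhost:7233',
    namespace: env['TEMPORAL_NAMESPACE'] || 'default',
  };
}

// No variables set: both values fall back to the local defaults.
const localConfig = resolveTemporalConfig({});

// Only the address is overridden; the namespace keeps its default.
const remoteConfig = resolveTemporalConfig({ TEMPORAL_ADDRESS: 'temporal:7233' });
```

Because the fallback uses `||`, a variable that is present but empty behaves the same as one that is missing.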

Temporal Module (Infrastructure)

The TemporalModule is a @Global() dynamic module with three modes:

TemporalModule.forRoot()

Full setup: client + workers. Use it in AppModule.

@Module({
  imports: [TemporalModule.forRoot()],
})
export class AppModule {}

Registers: TemporalClientFactory, TemporalService, TaskQueueRegistry, TemporalWorkerManager.

TemporalModule.forClient()

Client only, no workers. Use it when you only need to call external workflows.

@Module({
  imports: [TemporalModule.forClient()],
})
export class AppModule {}

Registers only TemporalClientFactory and TemporalService.

TemporalModule.forFeature()

Registers task queues from domain modules.

@Module({
  imports: [TemporalModule.forFeature([BorrowingTaskQueueRegistration])],
  providers: [BorrowingService, BorrowingActivities],
})
export class BorrowingModule {}

TemporalService API

startWorkflow(options) -- Fire-and-forget

Starts a workflow and returns its workflowId. Does not wait for the result.

const workflowId = await this.temporal.startWorkflow({
  workflow: 'processBorrowing',
  taskQueue: 'TEMPORAL_BOOKSTORE',
  workflowId: 'process-borrowing-123',  // optional
  args: [{ bookId: '...' }],
});

executeWorkflow<T>(options) -- Wait for the result

Starts a workflow and waits for its result.

const result = await this.temporal.executeWorkflow<SyncResult>({
  workflow: 'sync_materials',
  taskQueue: 'ave-etl-py',
  args: [{ env_key: 'production' }],
});
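
The difference between the two calls can be sketched against a minimal client interface. This is an illustration, not the project's TemporalService: MinimalWorkflowClient, TemporalServiceSketch, and the generated fallback workflowId are assumptions made for the example. In the Temporal TypeScript SDK the corresponding calls are client.workflow.start() and client.workflow.execute().

```typescript
interface WorkflowCallOptions {
  workflow: string; // workflow type name, e.g. 'processBorrowing'
  taskQueue: string;
  workflowId?: string;
  args?: unknown[];
}

interface ResolvedStartOptions {
  taskQueue: string;
  workflowId: string;
  args: unknown[];
}

// Minimal stand-in for the part of a Temporal client used here (assumption).
interface MinimalWorkflowClient {
  start(workflowType: string, options: ResolvedStartOptions): Promise<{ workflowId: string }>;
  execute<T>(workflowType: string, options: ResolvedStartOptions): Promise<T>;
}

class TemporalServiceSketch {
  constructor(private readonly client: MinimalWorkflowClient) {}

  // Fire-and-forget: resolves once the start request is accepted.
  async startWorkflow(options: WorkflowCallOptions): Promise<string> {
    const handle = await this.client.start(options.workflow, this.withDefaults(options));
    return handle.workflowId;
  }

  // Resolves only when the workflow completes, with its return value.
  executeWorkflow<T>(options: WorkflowCallOptions): Promise<T> {
    return this.client.execute<T>(options.workflow, this.withDefaults(options));
  }

  private withDefaults(options: WorkflowCallOptions): ResolvedStartOptions {
    return {
      taskQueue: options.taskQueue,
      // Assumption: fall back to a generated ID when none is given.
      workflowId: options.workflowId ?? `${options.workflow}-${Date.now()}`,
      args: options.args ?? [],
    };
  }
}

// In-memory fake that records what would be sent to the server.
const recordedStarts: Array<{ workflowType: string } & ResolvedStartOptions> = [];

const fakeClient: MinimalWorkflowClient = {
  async start(workflowType, options) {
    recordedStarts.push({ workflowType, ...options });
    return { workflowId: options.workflowId };
  },
  async execute<T>(workflowType: string, options: ResolvedStartOptions): Promise<T> {
    recordedStarts.push({ workflowType, ...options });
    return { total: 0 } as unknown as T; // canned result for the example
  },
};
```

Both methods pass the workflow as a plain string, which is what lets the same service start workflows implemented in any language.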

getClient() -- Direct access to the Client

For signals, queries, schedules, cancellations, etc.

const client = this.temporal.getClient();
const handle = client.workflow.getHandle('workflow-id-123');
await handle.signal('approve');
await handle.query('getStatus');
await handle.cancel();

extractActivities(instance) -- Automatic activity registration

Utility function that extracts every public method of an activities class and binds it to the instance automatically. It removes the need to list each method by hand in the task queue registration.

Without extractActivities (manual):

useFactory: (activities: MiActivities): TaskQueueConfig => ({
  taskQueue: MI_TASK_QUEUE,
  workflowsPath: join(__dirname, 'mi.workflow'),
  activities: {
    paso1: activities.paso1.bind(activities),
    paso2: activities.paso2.bind(activities),
    paso3: activities.paso3.bind(activities),
    // ... every new method has to be added by hand
  },
}),

With extractActivities (automatic):

import { extractActivities } from '../../libs/temporal';

useFactory: (activities: MiActivities): TaskQueueConfig => ({
  taskQueue: MI_TASK_QUEUE,
  workflowsPath: join(__dirname, 'mi.workflow'),
  activities: extractActivities(activities),
  // When you add a new method to MiActivities, it is registered automatically
}),

The function walks the instance's prototype, skips the constructor, and binds each method. When you add a new activity to the class, the worker picks it up automatically without any change to the task queue registration.
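
A minimal sketch of such a helper, following the description above (walk the prototype, skip the constructor, bind each method). This is a hypothetical re-implementation for illustration; the project's extractActivities may differ, and extractActivitiesSketch and DemoActivities are invented names.

```typescript
type BoundActivity = (...args: any[]) => unknown;

// Hypothetical re-implementation; only own prototype methods are collected.
function extractActivitiesSketch(instance: object): Record<string, BoundActivity> {
  const activities: Record<string, BoundActivity> = {};
  const prototype = Object.getPrototypeOf(instance);

  for (const name of Object.getOwnPropertyNames(prototype)) {
    if (name === 'constructor') continue;
    // Read the descriptor instead of instance[name] so getters are not invoked.
    const descriptor = Object.getOwnPropertyDescriptor(prototype, name);
    if (descriptor && typeof descriptor.value === 'function') {
      // Binding keeps `this` pointing at the instance when the worker calls it.
      activities[name] = descriptor.value.bind(instance);
    }
  }
  return activities;
}

class DemoActivities {
  readonly calls: string[] = [];

  async verifyInventory(bookId: string): Promise<string> {
    this.calls.push(`verify:${bookId}`); // requires a correctly bound `this`
    return bookId;
  }

  async sendConfirmation(bookId: string): Promise<void> {
    this.calls.push(`confirm:${bookId}`);
  }
}

const demoInstance = new DemoActivities();
const demoActivities = extractActivitiesSketch(demoInstance);
```

One caveat of any prototype walk: TypeScript's private modifier is erased at compile time, so a method marked private is indistinguishable from a public one at runtime and would be collected too unless the helper filters by some naming convention.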


Use Cases

Case 1: Local TypeScript workflow

The most common case. Workflow and activities are both implemented in TypeScript.

1. Constants:

// modules/borrowing/temporal/borrowing.constants.ts
export const BORROWING_TASK_QUEUE = 'TEMPORAL_BOOKSTORE';
export const BORROWING_WORKFLOW = 'processBorrowing';

2. Activities:

// modules/borrowing/temporal/borrowing.activities.ts
@Injectable()
export class BorrowingActivities {
  async verifyInventory(payload: any) { return payload; }
  async createBorrowingEntry(payload: any) { return payload; }
  async sendConfirmation(payload: any) { return payload; }
}

3. Workflow:

// modules/borrowing/temporal/borrowing.workflow.ts
import { proxyActivities } from '@temporalio/workflow';
import type { BorrowingActivities } from './borrowing.activities';

const { verifyInventory, createBorrowingEntry, sendConfirmation } =
  proxyActivities<BorrowingActivities>({
    startToCloseTimeout: '30 minutes',
    retry: { maximumAttempts: 5, initialInterval: '1 minute' },
  });

export async function processBorrowing(payload: any): Promise<void> {
  const verified = await verifyInventory(payload);
  const borrowing = await createBorrowingEntry(verified);
  await sendConfirmation(borrowing);
}
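
To make the retry policy concrete, the sketch below computes the wait before each retry. It assumes Temporal's default backoffCoefficient of 2 and default maximumInterval of 100 times initialInterval, since the policy above sets neither. retryDelaysMs is a hypothetical helper, not an SDK function.

```typescript
// Hypothetical helper: lists the backoff delay before each retry.
function retryDelaysMs(
  initialIntervalMs: number,
  maximumAttempts: number,
  backoffCoefficient = 2, // assumed default
  maximumIntervalMs = 100 * initialIntervalMs, // assumed default
): number[] {
  const delays: number[] = [];
  // maximumAttempts counts the first try, so there are at most N - 1 retries.
  for (let retry = 0; retry < maximumAttempts - 1; retry++) {
    const delay = initialIntervalMs * Math.pow(backoffCoefficient, retry);
    delays.push(Math.min(delay, maximumIntervalMs));
  }
  return delays;
}

const ONE_MINUTE_MS = 60000;

// Policy from the workflow above: maximumAttempts 5, initialInterval 1 minute.
const borrowingRetryDelays = retryDelaysMs(ONE_MINUTE_MS, 5);
// Waits of 1, 2, 4 and 8 minutes between the five attempts.
```

Under these assumptions the four retries add 15 minutes of backoff in the worst case, on top of the attempts themselves, each of which may run up to the 30-minute startToCloseTimeout.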

4. Task queue registration:

extractActivities() extracts and binds every method of the class automatically. You do not need to list them by hand.

// modules/borrowing/temporal/borrowing.task-queue.ts
import { join } from 'path';
import { TaskQueueConfig, TaskQueueRegistration, extractActivities } from '../../../libs/temporal';
import { BorrowingActivities } from './borrowing.activities';
import { BORROWING_TASK_QUEUE } from './borrowing.constants';

export const BorrowingTaskQueueRegistration: TaskQueueRegistration = {
  provide: `TASK_QUEUE_${BORROWING_TASK_QUEUE}`,
  useFactory: (activities: BorrowingActivities): TaskQueueConfig => ({
    taskQueue: BORROWING_TASK_QUEUE,
    workflowsPath: join(__dirname, 'borrowing.workflow'),
    activities: extractActivities(activities),
  }),
  inject: [BorrowingActivities],
};
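
One way to picture how a registration like this could reach the worker manager is a registry keyed by task queue name. This is a hypothetical sketch, not the project's TaskQueueRegistry: the class name, its methods, the duplicate check, and the example path are assumptions. Only the config fields (taskQueue, workflowsPath, activities) come from the registration above.

```typescript
interface TaskQueueConfigSketch {
  taskQueue: string;
  workflowsPath: string;
  activities: Record<string, (...args: any[]) => unknown>;
}

class TaskQueueRegistrySketch {
  private readonly configs = new Map<string, TaskQueueConfigSketch>();

  register(config: TaskQueueConfigSketch): void {
    // Assumption: two modules claiming the same queue is treated as a mistake.
    if (this.configs.has(config.taskQueue)) {
      throw new Error(`Task queue already registered: ${config.taskQueue}`);
    }
    this.configs.set(config.taskQueue, config);
  }

  // A worker manager could iterate this list and start one worker per entry.
  getAll(): TaskQueueConfigSketch[] {
    return Array.from(this.configs.values());
  }
}

const registrySketch = new TaskQueueRegistrySketch();
registrySketch.register({
  taskQueue: 'TEMPORAL_BOOKSTORE',
  // Illustrative path only; the real value comes from join(__dirname, ...).
  workflowsPath: '/app/dist/modules/borrowing/temporal/borrowing.workflow',
  activities: { verifyInventory: async (payload: unknown) => payload },
});
```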

5. Module:

@Module({
  imports: [TemporalModule.forFeature([BorrowingTaskQueueRegistration])],
  providers: [BorrowingService, BorrowingActivities],
})
export class BorrowingModule {}

6. Service:

await this.temporal.startWorkflow({
  workflow: BORROWING_WORKFLOW,
  workflowId: `process-borrowing-${borrowing.id}`,
  taskQueue: BORROWING_TASK_QUEUE,
  args: [borrowing],
});

Case 2: External activities inside a TS workflow

This is the key cross-language pattern. A TypeScript workflow can call activities that run on a worker written in Python, Go, or any other language by using proxyActivities with a different taskQueue.

How it works

TS workflow (worker TEMPORAL_BOOKSTORE)
  |
  |-- verifyInventory()        --> local activity (TS worker)
  |-- createBorrowingEntry()   --> local activity (TS worker)
  |-- getBookRecommendations() --> remote activity (Python worker ml-python-workers)
  |-- sendConfirmation()       --> local activity (TS worker)

Temporal handles the routing: when the workflow schedules an activity with a different taskQueue, Temporal places the activity task on that queue, and a worker polling that queue picks it up.
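
The routing can be pictured with a small in-memory simulation. This is a toy model, not Temporal's implementation: TaskQueueSimulator and its methods are invented for the example. The point it illustrates is that the workflow only names a queue and an activity type, and whichever worker is registered on that queue runs it.

```typescript
type ActivityHandler = (input: unknown) => unknown;

class TaskQueueSimulator {
  // task queue name -> activity name -> handler registered by a worker
  private readonly workers = new Map<string, Map<string, ActivityHandler>>();

  registerWorker(taskQueue: string, activities: Record<string, ActivityHandler>): void {
    this.workers.set(taskQueue, new Map(Object.entries(activities)));
  }

  dispatch(taskQueue: string, activityName: string, input: unknown): unknown {
    const handler = this.workers.get(taskQueue)?.get(activityName);
    if (!handler) {
      // Toy behavior: a real server would keep the task pending instead of failing at once.
      throw new Error(`No worker on "${taskQueue}" handles "${activityName}"`);
    }
    return handler(input);
  }
}

const simulator = new TaskQueueSimulator();

// Stands in for the NestJS (TypeScript) worker.
simulator.registerWorker('TEMPORAL_BOOKSTORE', {
  verifyInventory: (input) => ({ verified: true, input }),
});

// Stands in for the Python worker.
simulator.registerWorker('ml-python-workers', {
  getBookRecommendations: () => [{ bookId: 'b2', title: 'Dune', score: 0.95 }],
});

const localResult = simulator.dispatch('TEMPORAL_BOOKSTORE', 'verifyInventory', { bookId: 'b1' });
const remoteResult = simulator.dispatch('ml-python-workers', 'getBookRecommendations', {
  customerId: 'c1',
  bookId: 'b1',
});
```

The caller never references a worker process or a language, only the queue name and the activity name, which is why the contract between the two sides reduces to those two strings plus the payload shape.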

Step 1: Create the interface for the external activities

This file has no implementation. It only defines the contract (function names, parameters, return value) that the Python worker must fulfill.

// modules/borrowing/temporal/borrowing.external-activities.ts

export interface BookRecommendation {
  bookId: string;
  title: string;
  score: number;
}

export interface ReadingPattern {
  favoriteGenres: string[];
  avgBooksPerMonth: number;
  preferredAuthors: string[];
}

export interface MLPythonActivities {
  getBookRecommendations(params: {
    customerId: string;
    bookId: string;
  }): Promise<BookRecommendation[]>;

  analyzeReadingPattern(params: {
    customerId: string;
  }): Promise<ReadingPattern>;
}
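
Because the interface exists only at compile time, nothing checks at runtime that the Python worker actually returns this shape. A type guard is one possible safeguard. isBookRecommendation and parseRecommendations are hypothetical helpers, not part of the project, and the interface is repeated under a different name only to keep the example self-contained.

```typescript
interface BookRecommendationShape {
  bookId: string;
  title: string;
  score: number;
}

function isBookRecommendation(value: unknown): value is BookRecommendationShape {
  if (typeof value !== 'object' || value === null) return false;
  const candidate = value as Record<string, unknown>;
  return (
    typeof candidate['bookId'] === 'string' &&
    typeof candidate['title'] === 'string' &&
    typeof candidate['score'] === 'number'
  );
}

// Validates a decoded activity result before the rest of the code trusts it.
function parseRecommendations(raw: unknown): BookRecommendationShape[] {
  if (!Array.isArray(raw) || !raw.every((item) => isBookRecommendation(item))) {
    throw new Error('Unexpected payload from getBookRecommendations');
  }
  return raw as BookRecommendationShape[];
}

const validRecommendations = parseRecommendations([
  { bookId: 'b2', title: 'Dune', score: 0.95 },
]);
```

A check like this is a pure function of its input, so it could plausibly be applied to the activity result wherever that result is first consumed.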

Step 2: Add the constant for the external task queue

// modules/borrowing/temporal/borrowing.constants.ts
export const BORROWING_TASK_QUEUE = 'TEMPORAL_BOOKSTORE';
export const BORROWING_WORKFLOW = 'processBorrowing';

export const ML_PYTHON_TASK_QUEUE = 'ml-python-workers';

Step 3: Use proxyActivities with the external task queue in the workflow

// modules/borrowing/temporal/borrowing.workflow.ts
import { proxyActivities } from '@temporalio/workflow';
import type { BorrowingActivities } from './borrowing.activities';
import type { MLPythonActivities } from './borrowing.external-activities';
import { ML_PYTHON_TASK_QUEUE } from './borrowing.constants';

// Local activities -- run on the TypeScript worker (this project)
const { verifyInventory, createBorrowingEntry, sendConfirmation } =
  proxyActivities<BorrowingActivities>({
    startToCloseTimeout: '30 minutes',
    retry: { maximumAttempts: 5, initialInterval: '1 minute' },
  });

// External activities -- run on the Python worker
const { getBookRecommendations } = proxyActivities<MLPythonActivities>({
  taskQueue: ML_PYTHON_TASK_QUEUE,
  startToCloseTimeout: '10 minutes',
  retry: { maximumAttempts: 3, initialInterval: '30 seconds' },
});

export async function processBorrowing(payload: any): Promise<void> {
  const verified = await verifyInventory(payload);
  const borrowing = await createBorrowingEntry(verified);

  // Call the Python worker to get recommendations
  const recommendations = await getBookRecommendations({
    customerId: borrowing.customerEmail,
    bookId: borrowing.bookId,
  });

  await sendConfirmation({ ...borrowing, recommendations });
}

The key difference

|                            | Local activity                      | External activity                                 |
|----------------------------|-------------------------------------|---------------------------------------------------|
| proxyActivities            | No taskQueue (uses the workflow's)  | With taskQueue pointing at the external worker    |
| Implementation             | @Injectable() class in TypeScript   | In Python/Go/Java (another process)               |
| TS file                    | borrowing.activities.ts (class)     | borrowing.external-activities.ts (interface only) |
| Registered in task-queue.ts | Yes, via extractActivities()       | No, Temporal routes it automatically              |

What the Python worker must do

The Python worker must register its activities under exactly the same names as the TypeScript interface:

# worker_python.py (example)
import asyncio

from temporalio import activity
from temporalio.client import Client
from temporalio.worker import Worker

# name= overrides the default (the Python function name) so it matches the TS interface
@activity.defn(name="getBookRecommendations")
async def get_book_recommendations(params: dict) -> list:
    customer_id = params["customerId"]
    book_id = params["bookId"]
    # ... ML logic ...
    return [{"bookId": "...", "title": "...", "score": 0.95}]

@activity.defn(name="analyzeReadingPattern")
async def analyze_reading_pattern(params: dict) -> dict:
    customer_id = params["customerId"]
    # ... analysis ...
    return {"favoriteGenres": ["sci-fi"], "avgBooksPerMonth": 3, "preferredAuthors": []}

async def main():
    client = await Client.connect("localhost:7233")
    worker = Worker(
        client,
        task_queue="ml-python-workers",  # must match ML_PYTHON_TASK_QUEUE
        activities=[get_book_recommendations, analyze_reading_pattern],
    )
    await worker.run()

if __name__ == "__main__":
    asyncio.run(main())

What matters is that:

  • The Python worker's task_queue matches ML_PYTHON_TASK_QUEUE
  • The activity names (name="getBookRecommendations") match the names in the TypeScript interface
  • The parameter and return structures are compatible (JSON-serializable)
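
What "JSON-serializable" means in practice can be seen by round-tripping a value through JSON, which is how plain objects travel between workers under Temporal's default data converter. The payload below is invented for the example, and roundTrip is a hypothetical helper.

```typescript
// Simulates what a value looks like after crossing a worker boundary as JSON.
function roundTrip(value: unknown): unknown {
  return JSON.parse(JSON.stringify(value));
}

const sentPayload = {
  customerId: 'c1',
  dueDate: new Date('2024-01-15T00:00:00.000Z'),
  notes: undefined,
  tags: new Set(['sci-fi']),
};

const receivedPayload = roundTrip(sentPayload) as Record<string, unknown>;
// dueDate arrives as an ISO string, notes is dropped, and the Set becomes {}.
```

Strings, numbers, booleans, arrays, and nested plain objects survive unchanged, so contracts such as MLPythonActivities are safest when limited to those types.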

Case 3: Fully external workflow (Python, Go, Java)

When the entire workflow runs in another language (not just the activities), you do not need to define anything in TypeScript except constants and interfaces. You only use TemporalService to trigger it.

1. Constants and interfaces:

// modules/etl/temporal/etl.constants.ts
export const ETL_TASK_QUEUE = 'ave-etl-py';

export const ETL_WORKFLOWS = {
  SYNC_MATERIALS: 'sync_materials',
  SYNC_MATERIAL_STOCK: 'sync_material_stock',
} as const;

// modules/etl/temporal/etl.interfaces.ts
export interface SyncMaterialsParams {
  env_key: string;
}

export interface SyncResult {
  total: number;
  inserted: number;
  updated: number;
  errors: number;
}
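
Because ETL_WORKFLOWS is declared as const, its values can double as a union type, so a misspelled workflow name fails at compile time rather than at runtime. EtlWorkflowName, isEtlWorkflowName, and describeTrigger are hypothetical additions for illustration, not part of the project.

```typescript
const ETL_WORKFLOWS = {
  SYNC_MATERIALS: 'sync_materials',
  SYNC_MATERIAL_STOCK: 'sync_material_stock',
} as const;

// Resolves to 'sync_materials' | 'sync_material_stock'
type EtlWorkflowName = (typeof ETL_WORKFLOWS)[keyof typeof ETL_WORKFLOWS];

// Runtime counterpart for names that arrive as plain strings.
function isEtlWorkflowName(name: string): name is EtlWorkflowName {
  return (Object.values(ETL_WORKFLOWS) as string[]).includes(name);
}

// Accepts only known workflow names; any other string literal is a type error.
function describeTrigger(workflow: EtlWorkflowName, taskQueue: string): string {
  return `${workflow}@${taskQueue}`;
}

const triggerDescription = describeTrigger(ETL_WORKFLOWS.SYNC_MATERIALS, 'ave-etl-py');
// describeTrigger('sync_materialz', 'ave-etl-py') would not compile.
```

This matters more for external workflows than local ones, since the TypeScript side has no workflow function to import and the string name is the only link to the Python implementation.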

2. Service:

@Injectable()
export class EtlService {
  constructor(private readonly temporal: TemporalService) {}

  // Fire-and-forget
  async triggerMaterialSync(params: SyncMaterialsParams) {
    await this.temporal.startWorkflow({
      workflow: ETL_WORKFLOWS.SYNC_MATERIALS,
      taskQueue: ETL_TASK_QUEUE,
      args: [params],
    });
  }

  // Wait for the result
  async syncMaterialsAndWait(params: SyncMaterialsParams): Promise<SyncResult> {
    return this.temporal.executeWorkflow<SyncResult>({
      workflow: ETL_WORKFLOWS.SYNC_MATERIALS,
      taskQueue: ETL_TASK_QUEUE,
      args: [params],
    });
  }
}

3. Module (no forFeature because there are no TS workers):

@Module({
  providers: [EtlService],
})
export class EtlModule {}
// TemporalService is available globally thanks to forRoot() in AppModule.

Case 4: Client only, no workers

If your NestJS service only needs to call workflows (not run them), use forClient():

@Module({
  imports: [TemporalModule.forClient()],
})
export class AppModule {}

Useful when:

  • All your workflows run on external workers
  • You have a NestJS service that only triggers workflows
  • You do not need to start TypeScript workers

Case 5: Advanced Client usage

@Injectable()
export class OrderService {
  constructor(private readonly temporal: TemporalService) {}

  async approveOrder(workflowId: string) {
    const client = this.temporal.getClient();
    const handle = client.workflow.getHandle(workflowId);
    await handle.signal('approve');
  }

  async getOrderStatus(workflowId: string) {
    const client = this.temporal.getClient();
    const handle = client.workflow.getHandle(workflowId);
    return handle.query('getStatus');
  }

  async cancelOrder(workflowId: string) {
    const client = this.temporal.getClient();
    const handle = client.workflow.getHandle(workflowId);
    await handle.cancel();
  }

  async createDailySync() {
    const client = this.temporal.getClient();
    await client.schedule.create({
      scheduleId: 'daily-sync',
      spec: { intervals: [{ every: '24h' }] },
      action: {
        type: 'startWorkflow',
        workflowType: 'sync_materials',
        taskQueue: 'ave-etl-py',
        args: [{ env_key: 'production' }],
      },
    });
  }
}

Guide: Adding a new TS workflow

1. Create temporal/ inside the module

src/modules/<your-module>/
  temporal/
    index.ts
    <name>.constants.ts
    <name>.activities.ts
    <name>.external-activities.ts   # only if you use external activities
    <name>.workflow.ts
    <name>.task-queue.ts

2. Constants

export const MI_TASK_QUEUE = 'MI_TASK_QUEUE';
export const MI_WORKFLOW = 'miWorkflow';
export const EXTERNAL_TASK_QUEUE = 'external-workers';  // if applicable

3. Local activities

@Injectable()
export class MiActivities {
  async paso1(data: any) { /* ... */ }
  async paso2(data: any) { /* ... */ }
}

4. External activity interfaces (if applicable)

export interface ExternalActivities {
  procesarEnPython(params: { key: string }): Promise<{ result: string }>;
}

5. Workflow

import { proxyActivities } from '@temporalio/workflow';
import type { MiActivities } from './<name>.activities';
import type { ExternalActivities } from './<name>.external-activities';
import { EXTERNAL_TASK_QUEUE } from './<name>.constants';

const { paso1, paso2 } = proxyActivities<MiActivities>({
  startToCloseTimeout: '10 minutes',
});

const { procesarEnPython } = proxyActivities<ExternalActivities>({
  taskQueue: EXTERNAL_TASK_QUEUE,
  startToCloseTimeout: '5 minutes',
});

export async function miWorkflow(data: any) {
  const r1 = await paso1(data);
  const r2 = await procesarEnPython({ key: r1.id });
  return await paso2(r2);
}

6. Task queue registration

Use extractActivities() to register every method of the class automatically. You do not need to list them by hand or update the registration when you add new activities.

import { join } from 'path';
import { TaskQueueConfig, TaskQueueRegistration, extractActivities } from '../../../libs/temporal';
import { MiActivities } from './<name>.activities';
import { MI_TASK_QUEUE } from './<name>.constants';

export const MiTaskQueueRegistration: TaskQueueRegistration = {
  provide: `TASK_QUEUE_${MI_TASK_QUEUE}`,
  useFactory: (activities: MiActivities): TaskQueueConfig => ({
    taskQueue: MI_TASK_QUEUE,
    workflowsPath: join(__dirname, '<name>.workflow'),
    activities: extractActivities(activities),
    // External activities are NOT registered here -- Temporal routes them on its own
  }),
  inject: [MiActivities],
};

7. Module

@Module({
  imports: [TemporalModule.forFeature([MiTaskQueueRegistration])],
  providers: [MiService, MiActivities],
})
export class MiModule {}

8. Service

await this.temporal.startWorkflow({
  workflow: MI_WORKFLOW,
  taskQueue: MI_TASK_QUEUE,
  args: [data],
});

Local Infrastructure

Starting Temporal Server

cd dev/temporal
docker compose up -d

This starts:

  • Temporal Server on localhost:7233
  • Temporal UI on localhost:8080
  • PostgreSQL for Temporal persistence

Environment variables

DATABASE_URL=postgres://nexus:nuMberone@localhost:5432/temporal_library
TEMPORAL_ADDRESS=localhost:7233
TEMPORAL_NAMESPACE=default

Scripts

yarn dev                 # Development with watch mode
yarn build               # Compile
yarn prod                # Run the compiled build
yarn migrate             # Run migrations
yarn migration:generate  # Generate a migration
yarn test                # Unit tests
yarn test:e2e            # e2e tests
yarn lint                # ESLint with auto-fix
yarn format              # Prettier
