Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions backend/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
},
"dependencies": {
"@aws-sdk/client-s3": "^3.962.0",
"amqplib": "^0.10.9",
"bcrypt": "^6.0.0",
"cors": "^2.8.5",
"dotenv": "^17.2.3",
Expand All @@ -28,6 +29,7 @@
"@commitlint/cli": "^20.2.0",
"@commitlint/config-conventional": "^20.2.0",
"@eslint/js": "^9.39.2",
"@types/amqplib": "^0.10.8",
"@types/bcrypt": "^6.0.0",
"@types/cors": "^2.8.19",
"@types/express": "^5.0.6",
Expand Down
4 changes: 2 additions & 2 deletions backend/src/Services/authToken.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { appDataSouce } from '../data-source';
import { appDataSource } from '../data-source';
import { RefreshTokenEntity } from '../entities/refreshToken';
import { User } from '../entities/User';
import { generateRefreshToken, hashRefreshToken } from './refreshToken';
Expand All @@ -15,7 +15,7 @@ export const createRefreshTokenSession = async (user: User) => {

expiresAt.setDate(expiresAt.getDate() + REFRESH_TOKEN_DAYS);

const repo = appDataSouce.getRepository(RefreshTokenEntity);
const repo = appDataSource.getRepository(RefreshTokenEntity);
console.log('Got refresh token repository');

await repo.save({
Expand Down
4 changes: 2 additions & 2 deletions backend/src/data-source.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
import 'dotenv/config';
import { DataSource } from 'typeorm';
import { User } from './entities/User';
import { Otp } from './entities/opt';
import { Otp } from './entities/otp';
import { RefreshTokenEntity } from './entities/refreshToken';
if (!process.env.DATABASE_URL) {
throw new Error('DATABASE_URL is not defined');
}
export const appDataSouce = new DataSource({
export const appDataSource = new DataSource({
type: 'postgres',
url: process.env.DATABASE_URL,
ssl: {
Expand Down
File renamed without changes.
5 changes: 5 additions & 0 deletions backend/src/messaging/jobTypes.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
export interface SendOtpJob {
phone: string;
otp: string;
purpose: 'login' | 'register' | 'reset_password';
}
36 changes: 36 additions & 0 deletions backend/src/messaging/rabbitmq/connect.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
import * as amqp from "amqplib";
import { Channel } from "amqplib";
import { QUEUES } from "./queues";

const RABBITMQ_URL =
process.env.RABBITMQ_URL || "amqp://guest:guest@localhost:5672";

let channel: Channel | null = null;

export const connectRabbitMQ = async (): Promise<void> => {
if (channel) return;

try {
console.log("🔌 Connecting to RabbitMQ...");

const connection = await amqp.connect(RABBITMQ_URL);
channel = await connection.createChannel();

// declare queues ONCE
for (const queue of Object.values(QUEUES)) {
await channel.assertQueue(queue, { durable: true });
}

console.log("RabbitMQ connected");
} catch (err) {
console.error("RabbitMQ connection failed", err);
process.exit(1);
}
};

export const getChannel = (): Channel => {
if (!channel) {
throw new Error("RabbitMQ channel not initialized");
}
return channel;
};
33 changes: 33 additions & 0 deletions backend/src/messaging/rabbitmq/consume/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
import { connectRabbitMQ, getChannel } from "../connect";
import { QUEUES } from "../queues";
import { SendOtpJob } from "../../jobTypes";

const startOtpWorker = async () => {
await connectRabbitMQ();
const channel = getChannel();

channel.prefetch(1); // process one OTP at a time

console.log("OTP Worker running");

channel.consume(QUEUES.SEND_OTP, async (msg) => {
if (!msg) return;

try {
const job: SendOtpJob = JSON.parse(msg.content.toString());

console.log(`Sending OTP ${job.otp} to ${job.phone}`);

// Here is where Twilio / SMS provider goes
await new Promise((r) => setTimeout(r, 1500));

channel.ack(msg);
console.log("OTP sent");
} catch (err) {
console.error("OTP failed", err);
channel.nack(msg, false, false);
}
});
};

startOtpWorker();
17 changes: 17 additions & 0 deletions backend/src/messaging/rabbitmq/publish.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
import { getChannel } from "./connect";
import { QUEUES, QueueKey } from "./queues";

export const publish = async <T>(
queue: QueueKey,
payload: T
): Promise<void> => {
const channel = getChannel();

channel.sendToQueue(
QUEUES[queue],
Buffer.from(JSON.stringify(payload)),
{ persistent: true }
);

console.log("📤 OTP job queued");
};
5 changes: 5 additions & 0 deletions backend/src/messaging/rabbitmq/queues.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
export const QUEUES = {
SEND_OTP: "send_otp_queue",
} as const;

export type QueueKey = keyof typeof QUEUES;
22 changes: 14 additions & 8 deletions backend/src/modules/auth/auth.controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,13 @@ import { logger } from '../../utils/logger';
import { registerSchema, phoneSchema, loginSchema } from './auth.schema';
import { sendOtpSms } from '../../Services/sms.service';
import { generateotp } from '../../utils/otp';
import { appDataSouce } from '../../data-source';
import { Otp } from '../../entities/opt';
import { appDataSource } from '../../data-source';
import { Otp } from '../../entities/otp';
import { User } from '../../entities/User';
import { signAccessToken } from '../../Services/jwt.service';
import { createRefreshTokenSession } from '../../Services/authToken';
import bcrypt from 'bcrypt';
// import { publish } from '../../messaging/rabbitmq/publish';

export const sendOtp = async (
req: Request,
Expand All @@ -28,8 +29,8 @@ export const sendOtp = async (

const phoneNumber = result.data.phoneNumber;

const userRepo = appDataSouce.getRepository(User);
const otpRepo = appDataSouce.getRepository(Otp);
const userRepo = appDataSource.getRepository(User);
const otpRepo = appDataSource.getRepository(Otp);

const existingUser = await userRepo.findOne({
where: { phoneNumber },
Expand All @@ -52,6 +53,11 @@ export const sendOtp = async (

await sendOtpSms(phoneNumber, otpCode.toString());

// await publish('SEND_OTP', {
// phone: phoneNumber,
// otp: otpCode.toString(),
// });

await otpRepo.delete({ phoneNumber });
await otpRepo.save({
phoneNumber,
Expand Down Expand Up @@ -83,7 +89,7 @@ export const verifyotp = async (
.status(400)
.json({ message: ' otp and phoneNumber are required' });
}
const otpRepo = appDataSouce.getRepository(Otp);
const otpRepo = appDataSource.getRepository(Otp);
const otpRecord = await otpRepo.findOne({
where: {
phoneNumber,
Expand Down Expand Up @@ -155,8 +161,8 @@ export const register = async (
});
}

const otpRepo = appDataSouce.getRepository(Otp);
const userRepo = appDataSouce.getRepository(User);
const otpRepo = appDataSource.getRepository(Otp);
const userRepo = appDataSource.getRepository(User);

const otpRecord = await otpRepo.findOne({
where: { id: otpId },
Expand Down Expand Up @@ -231,7 +237,7 @@ export const login = async (

const { phoneNumber, password } = result.data;

const userRepo = appDataSouce.getRepository(User);
const userRepo = appDataSource.getRepository(User);

// 2️⃣ find user
const user = await userRepo.findOne({
Expand Down
6 changes: 3 additions & 3 deletions backend/src/modules/health/health.ts
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
import express from 'express';
import { appDataSouce } from '../../data-source';
import { appDataSource } from '../../data-source';
const Healthrouter = express.Router();
Healthrouter.get('/', async (req, res) => {
try {
if (!appDataSouce.isInitialized) {
if (!appDataSource.isInitialized) {
return res.status(503).json({
status: 'eroor',
service: 'mysocial-code-backend',
db: 'not initialized',
});
}
await appDataSouce.query('SELECT 1');
await appDataSource.query('SELECT 1');
return res.status(200).json({
status: 'ok',
service: 'mysocial-code-backend',
Expand Down
4 changes: 2 additions & 2 deletions backend/src/modules/user/user.controller.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { NextFunction, Request, Response } from 'express';
import { PutObjectCommand } from '@aws-sdk/client-s3';
import { r2 } from '../../utils/r2';
import { appDataSouce } from '../../data-source';
import { appDataSource } from '../../data-source';
import { User } from '../../entities/User';
import { logger } from '../../utils/logger';
export const uploadAvatar = async (
Expand All @@ -25,7 +25,7 @@ export const uploadAvatar = async (
}),
);
const imageUrl = `${process.env.R2_ENDPOINT}/${process.env.R2_BUCKET_NAME}/${key}`;
const userRepo = appDataSouce.getRepository(User);
const userRepo = appDataSource.getRepository(User);
await userRepo.update(userId, { profileImageUrl: imageUrl });
res
.status(200)
Expand Down
4 changes: 2 additions & 2 deletions backend/src/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,14 @@ import dotenv from 'dotenv';
dotenv.config();
import app from './app';
import { logger } from './utils/logger';
import { appDataSouce } from './data-source';
import { appDataSource } from './data-source';
if (!process.env.PORT) {
throw new Error('PORT is not defined in environment variables');
}

(async () => {
try {
await appDataSouce.initialize();
await appDataSource.initialize();
logger.info('database connected success fully');
} catch (error) {
logger.error({ err: error }, 'error connecting the database');
Expand Down
2 changes: 1 addition & 1 deletion frontend/lib/api.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import axios from 'axios';
const api = axios.create({
baseURL: 'http://10.10.1.200:4000',
baseURL: 'http://172.28.32.1:4000',
timeout: 20000,
headers: {
'Content-Type': 'application/json',
Expand Down
Loading