Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: Cleanup codebase #78

Merged
merged 5 commits into from
Mar 14, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 7 additions & 7 deletions examples/blog.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,14 @@
"_meta": {
"topic": "mz_datagen_blog_users",
"key": "id",
"relationships": [
{
"relationships": [
{
"topic": "mz_datagen_blog_posts",
"parent_field": "id",
"child_field": "user_id",
"records_per": 2
}
]
]
},
"id": "datatype.number(100)",
"name": "internet.userName",
Expand All @@ -24,14 +24,14 @@
"_meta": {
"topic": "mz_datagen_blog_posts",
"key": "id",
"relationships": [
{
"relationships": [
{
"topic": "mz_datagen_blog_comments",
"parent_field": "id",
"child_field": "post_id",
"records_per": 2
}
]
]
},
"id": "datatype.number(1000)",
"user_id": "datatype.number(100)",
Expand All @@ -58,4 +58,4 @@
"views": "datatype.number({\"min\": 100, \"max\": 1000})",
"status": "datatype.number(1)"
}
]
]
12 changes: 6 additions & 6 deletions examples/ecommerce.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,14 @@
"_meta": {
"topic": "mz_datagen_ecommerce_users",
"key": "id",
"relationships": [
{
"relationships": [
{
"topic": "mz_datagen_ecommerce_purchases",
"parent_field": "id",
"child_field": "user_id",
"records_per": 4
}
]
]
},
"id": "datatype.number(100)",
"name": "internet.userName",
Expand All @@ -24,14 +24,14 @@
"_meta": {
"topic": "mz_datagen_ecommerce_purchases",
"key": "id",
"relationships": [
"relationships": [
{
"topic": "mz_datagen_ecommerce_items",
"parent_field": "item_id",
"child_field": "id",
"records_per": 1
}
]
]
},
"id": "datatype.number(1000)",
"user_id": "datatype.number(100)",
Expand All @@ -49,4 +49,4 @@
"material": "commerce.productMaterial",
"created_at": "date.past(5)"
}
]
]
129 changes: 21 additions & 108 deletions src/dataGenerator.ts
Original file line number Diff line number Diff line change
@@ -1,15 +1,10 @@
import alert from 'cli-alerts';
import crypto from 'crypto';
import createTopic from './kafka/createTopic.js';
import schemaRegistryConfig from './kafka/schemaRegistryConfig.js';
import { kafkaProducer, connectKafkaProducer, disconnectKafkaProducer } from './kafka/producer.js';
import {
getAvroEncodedRecord,
registerSchema,
getAvroSchema
} from './schemas/schemaRegistry.js';

import { KafkaProducer } from './kafka/producer.js';
import { generateMegaRecord } from './schemas/generateMegaRecord.js';
import { OutputFormat } from './formats/outputFormat.js';
import { AvroFormat } from './formats/avroFormat.js';
import { JsonFormat } from './formats/jsonFormat.js';

async function* asyncGenerator(number: number) {
let i = 0;
Expand Down Expand Up @@ -37,56 +32,6 @@ function sleep(s: number) {
return new Promise(resolve => setTimeout(resolve, s));
}

async function prepareTopic(topic: string) {
if (global.dryRun) {
alert({
type: `success`,
name: `Dry run: Skipping topic creation...`,
msg: ``
});
return;
}

alert({
type: `success`,
name: `Creating Kafka topics...`,
msg: ``
});

try {
await createTopic(topic);
alert({
type: `success`,
name: `Created topic ${topic}`,
msg: ``
});
} catch (error) {
alert({
type: `error`,
name: `Error creating Kafka topic, try creating it manually...`,
msg: `\n ${error.message}`
});
process.exit(0);
}
}

async function prepareSchema(megaRecord: any, topic: any, registry: any, avroSchemas: any) {
alert({
type: `success`,
name: `Registering Avro schema...`,
msg: ``
});
let avroSchema = await getAvroSchema(
topic,
megaRecord[topic].records[0]
);
let schemaId = await registerSchema(avroSchema, registry);
avroSchemas[topic] = {};
avroSchemas[topic]['schemaId'] = schemaId;
avroSchemas[topic]['schema'] = avroSchema;
return avroSchemas;
}

export default async function dataGenerator({
format,
schema,
Expand All @@ -103,77 +48,45 @@ export default async function dataGenerator({
payload = crypto.randomBytes(global.recordSize).toString('hex');
}

let registry;
let producer;
let avroSchemas = {};
if(global.dryRun !== true){
producer = await connectKafkaProducer();
let producer: KafkaProducer | null = null;
if (global.dryRun !== true) {
let outputFormat: OutputFormat;
if (format === 'avro') {
outputFormat = await AvroFormat.create();
} else if (format === 'json') {
outputFormat = new JsonFormat();
}

producer = await KafkaProducer.create(outputFormat);
}

for await (const iteration of asyncGenerator(number)) {
global.iterationIndex = iteration;
let megaRecord = await generateMegaRecord(schema);

if (iteration == 0) {
if (format == 'avro') {
if (global.dryRun) {
alert({
type: `success`,
name: `Dry run: Skipping schema registration...`,
msg: ``
});
} else {
registry = await schemaRegistryConfig();
}
}
for (const topic in megaRecord) {
await prepareTopic(topic);
if (format == 'avro' && global.dryRun !== true) {
avroSchemas = await prepareSchema(
megaRecord,
topic,
registry,
avroSchemas
);
}
await producer?.prepare(topic, megaRecord);
}
}

for (const topic in megaRecord) {
for await (const record of megaRecord[topic].records) {
let encodedRecord = null;
let recordKey = null;
let key = null;
if (record[megaRecord[topic].key]) {
recordKey = record[megaRecord[topic].key];
key = record[megaRecord[topic].key];
}

if (global.recordSize) {
record.recordSizePayload = payload;
}

if (global.dryRun) {
alert({
type: `success`,
name: `Dry run: Skipping record production...`,
msg: `\n Topic: ${topic} \n Record key: ${recordKey} \n Payload: ${JSON.stringify(
sjwiesman marked this conversation as resolved.
Show resolved Hide resolved
record
)}`
});
} else {
if (format == 'avro') {
encodedRecord = await getAvroEncodedRecord(
record,
registry,
avroSchemas[topic]['schemaId']
);
}
await kafkaProducer(producer, recordKey, record, encodedRecord, topic);
}
await producer?.send(key, record, topic);
}
}

await sleep(global.wait);
}
if (global.dryRun !== true) {
await disconnectKafkaProducer(producer);
}

await producer?.close();
};
69 changes: 69 additions & 0 deletions src/formats/avroFormat.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
import { SchemaRegistry, SchemaType } from "@kafkajs/confluent-schema-registry";
import { Env } from "../utils/env";
sjwiesman marked this conversation as resolved.
Show resolved Hide resolved
import { OutputFormat } from "./outputFormat";

export class AvroFormat implements OutputFormat {
private schemas: any = {};
private registry: SchemaRegistry;
chuck-alt-delete marked this conversation as resolved.
Show resolved Hide resolved

static async create(): Promise<AvroFormat> {
const url = Env.required("SCHEMA_REGISTRY_URL");
const username = Env.optional("SCHEMA_REGISTRY_USERNAME", null);
const password = Env.optional("SCHEMA_REGISTRY_PASSWORD", null);

const configuration = {
host: url
};

if (password && username) {
configuration["auth"] = {
username: username,
password: password
};
}

const registry = new SchemaRegistry(configuration);
return new AvroFormat(registry);
}

constructor(registry: SchemaRegistry) {
this.registry = registry;
}

async register(schema: any, topic: string): Promise<void> {
const options = { subject: `${schema["name"]}-value` }
try {
const resp = await this.registry.register({
type: SchemaType.AVRO,
schema: JSON.stringify(schema)
},
options
)

alert({
type: `success`,
name: `Schema registered!`,
msg: `Subject: ${options.subject}, ID: ${resp.id}`
});

this.schemas[topic] = {
'schemaId': resp.id,
'schema': schema
};
} catch (error) {
alert({
type: `error`,
name: `Failed to register schema.`,
msg: `${error}`
});

process.exit(1);
}
}

async encode(record: any, topic: string): Promise<Buffer> {
const schemaId = this.schemas[topic]['schemaId']
const encodedRecord = await this.registry.encode(schemaId, record);
return encodedRecord;
}
}
13 changes: 13 additions & 0 deletions src/formats/jsonFormat.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
import { OutputFormat } from "./outputFormat";

export class JsonFormat implements OutputFormat {

register(schema: any, topic: string): Promise<void> {
return Promise.resolve();
}

encode(record: any, _: string): Promise<Buffer> {
const value = JSON.stringify(record);
return Promise.resolve(Buffer.from(value));
}
}
5 changes: 5 additions & 0 deletions src/formats/outputFormat.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
export interface OutputFormat {
register(schema: any, topic: string): Promise<void>;

encode(record: any, topic: string): Promise<Buffer>;
}
13 changes: 4 additions & 9 deletions src/kafka/cleanKafka.ts
Original file line number Diff line number Diff line change
@@ -1,16 +1,13 @@
import kafkaConfig from './kafkaConfig.js';
import axios from 'axios';
import dotenv from 'dotenv';
import alert from 'cli-alerts';
import { Env } from '../utils/env.js';

async function deleteSchemaSubjects(topics: any): Promise<void> {
dotenv.config();
if (!process.env.SCHEMA_REGISTRY_URL) {
console.error("Please set SCHEMA_REGISTRY_URL");
process.exit();
}
const schemaRegistryUrl = Env.required("SCHEMA_REGISTRY_URL");

for await (const topic of topics) {
let url = `${process.env.SCHEMA_REGISTRY_URL}/subjects/${topic}-value?permanent=false`;
let url = `${schemaRegistryUrl}/subjects/${topic}-value?permanent=false`;
await axios.delete(
url,
{
Expand All @@ -33,7 +30,6 @@ async function deleteSchemaSubjects(topics: any): Promise<void> {
}

export default async function cleanKafka(format: string, topics: any): Promise<void> {

if (global.dryRun) {
console.log("This is a dry run, so no resources will be deleted")
return
Expand Down Expand Up @@ -66,5 +62,4 @@ export default async function cleanKafka(format: string, topics: any): Promise<v
} else {
await deleteSchemaSubjects(topics);
}

};
1 change: 0 additions & 1 deletion src/kafka/createTopic.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ export default async function createTopic(topic: string): Promise<void> {
const topics = await admin.listTopics();

if (!topics.includes(topic)) {

let replicationFactor = await getReplicationFactor(admin);

let topicConfigs = [
Expand Down
Loading