Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix issue 79 #80

Merged
merged 2 commits into from
Mar 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions src/dataGenerator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,7 @@ export default async function dataGenerator({
let megaRecord = await generateMegaRecord(schema);

if (iteration == 0) {
for (const topic in megaRecord) {
await producer?.prepare(topic, megaRecord);
}
await producer?.prepare(megaRecord);
}

for (const topic in megaRecord) {
Expand Down
97 changes: 70 additions & 27 deletions src/formats/avroFormat.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
import { SchemaRegistry, SchemaType } from "@kafkajs/confluent-schema-registry";
import { Env } from "../utils/env.js";
import { OutputFormat } from "./outputFormat";
import alert from 'cli-alerts';
import avroTypes from '@avro/types';
const { Type } = avroTypes;

export class AvroFormat implements OutputFormat {
private schemas: any = {};
Expand Down Expand Up @@ -29,35 +32,75 @@ export class AvroFormat implements OutputFormat {
constructor(registry: SchemaRegistry) {
this.registry = registry;
}

private nameHook() {
let index = 0;
return function (schema, opts) {
switch (schema.type) {
case 'enum':
case 'fixed':
case 'record':
schema.name = `Auto${index++}`;
break;
default:
}
};
}

// @ts-ignore
async getAvroSchemas(megaRecord: any) {
let avroSchemas = {};
for (let topic in megaRecord) {
// @ts-ignore
let avroSchema = Type.forValue(megaRecord[topic].records[0], { typeHook: this.nameHook() }).schema();
avroSchema["name"] = topic
avroSchema["namespace"] = "com.materialize"

if (global.debug) {
alert({
type: `success`,
name: `Avro Schema:`,
msg: `\n ${JSON.stringify(avroSchema, null, 2)}`
});
}

async register(schema: any, topic: string): Promise<void> {
const options = { subject: `${schema["name"]}-value` }
try {
const resp = await this.registry.register({
type: SchemaType.AVRO,
schema: JSON.stringify(schema)
},
options
)

alert({
type: `success`,
name: `Schema registered!`,
msg: `Subject: ${options.subject}, ID: ${resp.id}`
});

this.schemas[topic] = {
'schemaId': resp.id,
'schema': schema
};
} catch (error) {
alert({
type: `error`,
name: `Failed to register schema.`,
msg: `${error}`
});
avroSchemas[topic] = avroSchema;
}
return avroSchemas;
}

process.exit(1);
async register(megaRecord: any): Promise<void> {
const avroSchemas = await this.getAvroSchemas(megaRecord);
for (const topic in avroSchemas) {
let options = { subject: `${topic}-value` }
let avroSchema = avroSchemas[topic]
try {
const resp = await this.registry.register({
type: SchemaType.AVRO,
schema: JSON.stringify(avroSchema)
},
options
)

alert({
type: `success`,
name: `Schema registered!`,
msg: `Subject: ${options.subject}, ID: ${resp.id}`
});

this.schemas[topic] = {
'schemaId': resp.id,
'schema': avroSchema
};
} catch (error) {
alert({
type: `error`,
name: `Failed to register schema.`,
msg: `${error}`
});

process.exit(1);
}
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/formats/jsonFormat.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { OutputFormat } from "./outputFormat";

export class JsonFormat implements OutputFormat {

register(schema: any, topic: string): Promise<void> {
register(megaRecord: any): Promise<void> {
return Promise.resolve();
}

Expand Down
2 changes: 1 addition & 1 deletion src/formats/outputFormat.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
export interface OutputFormat {
register(schema: any, topic: string): Promise<void>;
register(megaRecord: any): Promise<void>;

encode(record: any, topic: string): Promise<Buffer>;
}
69 changes: 38 additions & 31 deletions src/kafka/createTopic.ts → src/kafka/createTopics.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,49 +3,56 @@ const { ConfigResourceTypes } = pkg;
import kafkaConfig from './kafkaConfig.js';
import alert from 'cli-alerts';

export default async function createTopic(topic: string): Promise<void> {
export default async function createTopics(megaRecord: any): Promise<void> {
const kafka = await kafkaConfig();

if (global.debug) {
console.log(`Trying to create topic: ${topic}`);
}

if (global.prefix) {
topic = `${global.prefix}_${topic}`;
alert({
type: `success`,
name: `Using topic with prefix: ${topic}`,
msg: ``
});
}

// Check if the topic exists in the Kafka cluster if not create it
const admin = kafka.admin();
await admin.connect();
const topics = await admin.listTopics();
let topicConfigs = [];
let replicationFactor = await getReplicationFactor(admin);

if (!topics.includes(topic)) {
let replicationFactor = await getReplicationFactor(admin);
for (const topic in megaRecord) {

let topicConfigs = [
{
topic: topic,
numPartitions: 1,
replicationFactor: replicationFactor,
configEntries: [
{
name: 'cleanup.policy',
value: 'delete'
}
]
}
];
if (!topics.includes(topic)) {
alert({
type: `success`,
name: `Attempting to create topic ${topic}`,
msg: ``
});
topicConfigs.push(
{
topic: topic,
numPartitions: 1,
replicationFactor: replicationFactor,
configEntries: [
{
name: 'cleanup.policy',
value: 'delete'
}
]
});
}
}

try {
await admin
.createTopics({ validateOnly: false, topics: topicConfigs })
.finally(() => admin.disconnect());
alert({
type: `success`,
name: `Created topics!`,
msg: ``
});
} catch (error) {
alert({
type: `error`,
name: `Error creating Kafka topic, try creating it manually...`,
msg: `\n ${error.message}`
});
process.exit(1);
}

await admin.disconnect();
};

async function getReplicationFactor(admin: any) {
Expand Down
29 changes: 4 additions & 25 deletions src/kafka/producer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { Partitioners, Producer } from 'kafkajs';
import kafkaConfig from './kafkaConfig.js';
import alert from 'cli-alerts';
import { OutputFormat } from '../formats/outputFormat.js';
import createTopic from './createTopic.js';
import createTopics from './createTopics.js';

export class KafkaProducer {
private producer: Producer;
Expand All @@ -26,30 +26,9 @@ export class KafkaProducer {
this.format = format;
}

async prepare(topic: string, schema: any): Promise<void> {
alert({
type: `success`,
name: `Creating Kafka topics...`,
msg: ``
});

try {
await createTopic(topic);
alert({
type: `success`,
name: `Created topic ${topic}`,
msg: ``
});

await this.format.register(schema, topic);
} catch (error) {
alert({
type: `error`,
name: `Error creating Kafka topic, try creating it manually...`,
msg: `\n ${error.message}`
});
process.exit(0);
}
async prepare(megaRecord: any): Promise<void> {
await createTopics(megaRecord);
await this.format.register(megaRecord);
}

async send(key: any, value: any, topic: string) {
Expand Down
43 changes: 24 additions & 19 deletions src/schemas/generateMegaRecord.ts
Original file line number Diff line number Diff line change
Expand Up @@ -60,19 +60,23 @@ export async function generateMegaRecord(schema: any) {
let megaRecord = {} as any;
for (const table of schema) {
const { _meta, ...fakerRecord } = table;
let topic = _meta.topic;
if (global.prefix) {
topic = `${global.prefix}_${topic}`
}

// populate the initial record for the topic
if (!megaRecord[_meta.topic]) {
megaRecord[_meta.topic] = { "key": null, "records": [] };
if (!megaRecord[topic]) {
megaRecord[topic] = { "key": null, "records": [] };
let newRecord = await generateRandomRecord(fakerRecord);
megaRecord[_meta.topic].records.push(newRecord);
megaRecord[topic].records.push(newRecord);
}

// specify the key field for the topic
if ("key" in _meta) {
megaRecord[_meta.topic]["key"] = _meta.key;
megaRecord[topic]["key"] = _meta.key;
} else {
megaRecord[_meta.topic]["key"] = null;
megaRecord[topic]["key"] = null;
alert({
type: `warn`,
name: `No key specified. Using null key`,
Expand All @@ -82,26 +86,30 @@ export async function generateMegaRecord(schema: any) {

// for records that already exist, generate values
// for every field that doesn't already have a value.
megaRecord[_meta.topic]["key"] = _meta.key
for (let existingRecord of megaRecord[_meta.topic]["records"]){
megaRecord[topic]["key"] = _meta.key
for (let existingRecord of megaRecord[topic]["records"]){
existingRecord = await generateRandomRecord(fakerRecord, existingRecord);
}


if (_meta.relationships) {
for (const relationship of _meta.relationships) {
let relatedTopic = relationship.topic;
if (global.prefix) {
relatedTopic = `${global.prefix}_${relatedTopic}`
}
// for every existing record, generate "records_per"
// number of new records for the dependent topic
for (const existingRecord of megaRecord[_meta.topic].records) {
for (const existingRecord of megaRecord[topic].records) {
for (let i = 1; i <= relationship.records_per; i++) {
let newRecord = {}
// ensure the new record obeys the foriegn key constraint
// specified in the relationship
newRecord[relationship.child_field] = existingRecord[relationship.parent_field]
if (!megaRecord[relationship.topic]) {
megaRecord[relationship.topic] = { "key": _meta.key, "records": [] }
if (!megaRecord[relatedTopic]) {
megaRecord[relatedTopic] = { "key": _meta.key, "records": [] }
}
megaRecord[relationship.topic].records.push(newRecord);
megaRecord[relatedTopic].records.push(newRecord);
}
}
}
Expand All @@ -113,16 +121,13 @@ export async function generateMegaRecord(schema: any) {
// overriding existing fields that have been populated already.
for (const table of schema) {
const {_meta, ...fakerRecord} = table;
for (let existingRecord of megaRecord[_meta.topic].records){
let topic = _meta.topic;
if (global.prefix) {
topic = `${global.prefix}_${topic}`;
}
for (let existingRecord of megaRecord[topic].records){
existingRecord = await generateRandomRecord(fakerRecord, existingRecord);
}
}
return megaRecord;
}


// const fs = require('fs');
// const string = fs.readFileSync('./tests/schema.json', 'utf-8');
// let schema = JSON.parse(string);
// let megaRecord = generateMegaRecord(schema);
// console.log(JSON.stringify(megaRecord, null, 2))
39 changes: 0 additions & 39 deletions src/schemas/schemaRegistry.ts

This file was deleted.