Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor: resolve many lints #88

Merged
merged 10 commits into from
Apr 20, 2023
8 changes: 4 additions & 4 deletions datagen.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ program
new Option(
'-n, --number <int>',
'Number of records to generate. For infinite records, use -1'
).default('10').argParser((value) => parseInt(value))
).default('10').argParser((value) => parseInt(value, 10))
)
.option('-c, --clean', 'Clean (delete) Kafka topics and schema subjects previously created')
.option('-dr, --dry-run', 'Dry run (no data will be produced to Kafka)')
Expand Down Expand Up @@ -105,8 +105,8 @@ if (!global.wait) {
}

if (global.clean) {
let topics = []
for (let table of parsedSchema) {
const topics = []
for (const table of parsedSchema) {
topics.push(table._meta.topic)
}
await cleanKafka(options.format, topics)
Expand All @@ -117,7 +117,7 @@ if (!global.wait) {
await dataGenerator({
format: options.format,
schema: parsedSchema,
number: options.number
iterations: options.number
})

await end();
Expand Down
20 changes: 10 additions & 10 deletions src/dataGenerator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,16 @@ import { OutputFormat } from './formats/outputFormat.js';
import { AvroFormat } from './formats/avroFormat.js';
import { JsonFormat } from './formats/jsonFormat.js';

async function* asyncGenerator(number: number) {
async function* asyncGenerator(iterations: number) {
let i = 0;
// If number is -1, generate infinite records
if (number === -1) {
if (iterations === -1) {
while (true) {
yield i;
i++;
}
} else {
for (i; i < number; i++) {
for (i; i < iterations; i++) {
yield i;
}
}
Expand All @@ -35,11 +35,11 @@ function sleep(s: number) {
export default async function dataGenerator({
format,
schema,
number
iterations
}: {
format: string;
schema: string;
number: number;
iterations: number;
}): Promise<void> {

let payload: string;
Expand All @@ -60,14 +60,14 @@ export default async function dataGenerator({
producer = await KafkaProducer.create(outputFormat);
}

for await (const iteration of asyncGenerator(number)) {
for await (const iteration of asyncGenerator(iterations)) {
global.iterationIndex = iteration;
let megaRecord = await generateMegaRecord(schema);
const megaRecord = await generateMegaRecord(schema);

if (iteration == 0) {
if (iteration === 0) {
await producer?.prepare(megaRecord);
if (global.debug && global.dryRun && format == 'avro') {
let avroSchemas = await AvroFormat.getAvroSchemas(megaRecord);
if (global.debug && global.dryRun && format === 'avro') {
await AvroFormat.getAvroSchemas(megaRecord);
}
}

Expand Down
28 changes: 14 additions & 14 deletions src/formats/avroFormat.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ export class AvroFormat implements OutputFormat {

if (password && username) {
configuration["auth"] = {
username: username,
password: password
username,
password
};
}

Expand All @@ -32,10 +32,10 @@ export class AvroFormat implements OutputFormat {
constructor(registry: SchemaRegistry) {
this.registry = registry;
}

private static nameHook() {
let index = 0;
return function (schema, opts) {
return (schema, opts) => {
switch (schema.type) {
case 'enum':
case 'fixed':
Expand All @@ -46,16 +46,16 @@ export class AvroFormat implements OutputFormat {
}
};
}

// @ts-ignore
static async getAvroSchemas(megaRecord: any) {
let avroSchemas = {};
for (let topic in megaRecord) {
const avroSchemas = {};
for (const topic in megaRecord) {
// @ts-ignore
let avroSchema = Type.forValue(megaRecord[topic].records[0], { typeHook: this.nameHook() }).schema();
const avroSchema = Type.forValue(megaRecord[topic].records[0], { typeHook: this.nameHook() }).schema();
avroSchema["name"] = topic
avroSchema["namespace"] = "com.materialize"

if (global.debug) {
alert({
type: `success`,
Expand All @@ -72,22 +72,22 @@ export class AvroFormat implements OutputFormat {
async register(megaRecord: any): Promise<void> {
const avroSchemas = await AvroFormat.getAvroSchemas(megaRecord);
for (const topic in avroSchemas) {
let options = { subject: `${topic}-value` }
let avroSchema = avroSchemas[topic]
const options = { subject: `${topic}-value` }
const avroSchema = avroSchemas[topic]
try {
const resp = await this.registry.register({
type: SchemaType.AVRO,
schema: JSON.stringify(avroSchema)
},
options
)

alert({
type: `success`,
name: `Schema registered!`,
msg: `Subject: ${options.subject}, ID: ${resp.id}`
});

this.schemas[topic] = {
'schemaId': resp.id,
'schema': avroSchema
Expand All @@ -98,7 +98,7 @@ export class AvroFormat implements OutputFormat {
name: `Failed to register schema.`,
msg: `${error}`
});

process.exit(1);
}
}
Expand Down
6 changes: 3 additions & 3 deletions src/kafka/cleanKafka.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ async function deleteSchemaSubjects(topics: any): Promise<void> {
const schemaRegistryUrl = Env.required("SCHEMA_REGISTRY_URL");

for await (const topic of topics) {
let url = `${schemaRegistryUrl}/subjects/${topic}-value?permanent=false`;
const url = `${schemaRegistryUrl}/subjects/${topic}-value?permanent=false`;
await axios.delete(
url,
{
Expand Down Expand Up @@ -50,15 +50,15 @@ export default async function cleanKafka(format: string, topics: any): Promise<v
await admin.connect();
try {
await admin.deleteTopics({
topics: topics
topics
})
console.log(`deleted Kafka topics ${topics}`)
} catch (error) {
console.log(error)
}
await admin.disconnect();

if (format != 'avro') {
if (format !== 'avro') {
console.log("Skipping Schema Registry")
} else {
await deleteSchemaSubjects(topics);
Expand Down
8 changes: 4 additions & 4 deletions src/kafka/createTopics.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ export default async function createTopics(megaRecord: any): Promise<void> {
const admin = kafka.admin();
await admin.connect();
const topics = await admin.listTopics();
let topicConfigs = [];
let replicationFactor = await getReplicationFactor(admin);
const topicConfigs = [];
const replicationFactor = await getReplicationFactor(admin);

for (const topic in megaRecord) {

Expand All @@ -22,9 +22,9 @@ export default async function createTopics(megaRecord: any): Promise<void> {
});
topicConfigs.push(
{
topic: topic,
topic,
replicationFactor,
numPartitions: 1,
replicationFactor: replicationFactor,
configEntries: [
{
name: 'cleanup.policy',
Expand Down
3 changes: 1 addition & 2 deletions src/kafka/kafkaConfig.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,7 @@ export default async function kafkaConfig() {
connectionTimeout: 10_000,
authenticationTimeout: 10_000
};
const kafka = new Kafka(conf);
return kafka;
return new Kafka(conf);
}

if (sslCaLocation && sslCertLocation && sslKeyLocation) {
Expand Down
4 changes: 2 additions & 2 deletions src/kafka/producer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,9 @@ export class KafkaProducer {
}

async send(key: any, value: any, topic: string) {
let encoded = await this.format.encode(value, topic);
const encoded = await this.format.encode(value, topic);
await this.producer.send({
topic: topic,
topic,
messages: [{
key: key?.toString(),
value: encoded
Expand Down
34 changes: 16 additions & 18 deletions src/schemas/generateMegaRecord.ts
Original file line number Diff line number Diff line change
@@ -1,36 +1,34 @@
import { faker } from '@faker-js/faker';
import alert from 'cli-alerts';


export async function generateRandomRecord(fakerRecord: any, generatedRecord: any = {}){
export async function generateRandomRecord(fakerRecord: any, generatedRecord: any = {}) {
// helper function to generate a record from json schema with faker data

for (const field in fakerRecord) {
if (field in generatedRecord) {
continue
}
if (typeof fakerRecord[field] === 'object') {
generatedRecord[field] = await generateRandomRecord(fakerRecord[field])
continue
}
if (fakerRecord[field] === 'iteration.index'){
}

if (fakerRecord[field] === 'iteration.index') {
generatedRecord[field] = global.iterationIndex + 1;
continue;
}

if (fakerRecord[field].match("faker\..*")) {
try {
let generatedValue =
let generatedValue =
(new Function(
'faker',
`return ${fakerRecord[field]};`
))(faker);
))(faker);
if (generatedValue instanceof Date) {
generatedValue = generatedValue.toISOString();
}
generatedRecord[field] = generatedValue;

} catch (error) {
alert({
type: `error`,
Expand All @@ -56,7 +54,7 @@ export async function generateMegaRecord(schema: any) {
// goal is to return a "mega record" with structure
// {topic: {key: the topic key field name, records: [list of records to send to Kafka]}
// where the records obey the relationships specified in the input schema file
let megaRecord = {} as any;
const megaRecord = {} as any;
for (const table of schema) {
const { _meta, ...fakerRecord } = table;
let topic = _meta.topic;
Expand All @@ -67,7 +65,7 @@ export async function generateMegaRecord(schema: any) {
// populate the initial record for the topic
if (!megaRecord[topic]) {
megaRecord[topic] = { "key": null, "records": [] };
let newRecord = await generateRandomRecord(fakerRecord);
const newRecord = await generateRandomRecord(fakerRecord);
megaRecord[topic].records.push(newRecord);
}

Expand All @@ -86,7 +84,7 @@ export async function generateMegaRecord(schema: any) {
// for records that already exist, generate values
// for every field that doesn't already have a value.
megaRecord[topic]["key"] = _meta.key
for (let existingRecord of megaRecord[topic]["records"]){
for (let existingRecord of megaRecord[topic]["records"]) {
existingRecord = await generateRandomRecord(fakerRecord, existingRecord);
}

Expand All @@ -102,17 +100,17 @@ export async function generateMegaRecord(schema: any) {
for (const existingRecord of megaRecord[topic].records) {
// ensure the new record obeys the foreign key constraint
// specified in the relationship
let newRecords = [];
let existingValue = existingRecord[relationship.parent_field];
const newRecords = [];
const existingValue = existingRecord[relationship.parent_field];
if (Array.isArray(existingValue)) {
for (let i = 0; i < existingValue.length; i++) {
let newRecord = {};
const newRecord = {};
newRecord[relationship.child_field] = existingValue[i]
newRecords.push(newRecord);
}
} else {
for (let i = 1; i <= relationship.records_per; i++) {
let newRecord = {};
const newRecord = {};
newRecord[relationship.child_field] = existingValue;
newRecords.push(newRecord);
}
Expand All @@ -130,12 +128,12 @@ export async function generateMegaRecord(schema: any) {
// We sweep through one more time to make sure all the records have all the fields they need without
// overriding existing fields that have been populated already.
for (const table of schema) {
const {_meta, ...fakerRecord} = table;
const { _meta, ...fakerRecord } = table;
let topic = _meta.topic;
if (global.prefix) {
topic = `${global.prefix}_${topic}`;
}
for (let existingRecord of megaRecord[topic].records){
for (let existingRecord of megaRecord[topic].records) {
existingRecord = await generateRandomRecord(fakerRecord, existingRecord);
}
}
Expand Down
24 changes: 12 additions & 12 deletions src/schemas/parseAvroSchema.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,12 @@ export async function parseAvroSchema(schemaFile: any) {
});

if (global.debug) {
const parsed = avro.parse(schemaFile);
console.log(parsed);
const avroSchema = avro.parse(schemaFile);
console.log(avroSchema);
}

let schema = [];
let parsed = JSON.parse(schemaFile);
const parsed = JSON.parse(schemaFile);
schema.push(parsed);

schema = await convertAvroSchemaToJson(schema);
Expand All @@ -24,28 +24,28 @@ export async function parseAvroSchema(schemaFile: any) {


function convertAvroSchemaToJson(schema: any, nonRoot: boolean = false): any {
let jsonSchema = [];
const jsonSchema = [];
schema.forEach(table => {
let schema = {};
if(!nonRoot) {
if (!nonRoot) {
schema = {
_meta: {
topic: table.name
}
};
}
table.fields.forEach(column => {

if ((column.type === 'record')) {

schema[column.name] = convertAvroSchemaToJson(column.type, true)[0];
schema[column.name] = convertAvroSchemaToJson(column.type, true)[0];

} else if (typeof column.type === "object" && !Array.isArray(column.type) && column.type.type === 'record') {

} else if(typeof column.type === "object" && !Array.isArray(column.type) && column.type.type === 'record') {
schema[column.name] = convertAvroSchemaToJson([column.type], true)[0];
schema[column.name] = convertAvroSchemaToJson([column.type], true)[0];

}

}

else {
if (Array.isArray(column.type)) {
if (column.type.length === 2 && column.type[0] === 'null') {
Expand Down
Loading