Skip to content
Merged
1 change: 1 addition & 0 deletions lang/c/tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ add_avro_executable(test_interop_data)

add_avro_test_checkmem(test_data_structures)
add_avro_test_checkmem(test_avro_schema)
add_avro_test_checkmem(test_avro_commons_schema)
add_avro_test_checkmem(test_avro_schema_names)
add_avro_test_checkmem(test_avro_values)
add_avro_test_checkmem(test_avro_766)
Expand Down
147 changes: 147 additions & 0 deletions lang/c/tests/test_avro_commons_schema.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to you under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
* implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

#include "avro.h"
#include "avro_private.h"
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/types.h>
#ifdef _WIN32
#include "msdirent.h"
#else
#include <dirent.h>
#endif

/* Avro writer wrapping stderr; main() creates it at startup and frees it before returning. */
avro_writer_t avro_stderr;

/*
 * Reads "<dirpath>/schema.json" and parses it into an Avro schema.
 *
 * dirpath: directory containing the schema.json file.
 * Returns the parsed schema; the caller owns the returned reference.
 *
 * Any failure (path too long, file missing, read error, file larger than the
 * local buffer, invalid schema) prints a message on stderr and terminates the
 * process with EXIT_FAILURE.
 */
static avro_schema_t read_common_schema_test(const char *dirpath) {
	char schemafilepath[1024];
	char jsontext[4096];
	avro_schema_t schema;

	int n = snprintf(schemafilepath, sizeof(schemafilepath), "%s/schema.json", dirpath);
	/* snprintf signals truncation by returning the length it needed, not a negative value. */
	if (n < 0 || (size_t) n >= sizeof(schemafilepath)) {
		fprintf(stderr, "Size of dir path is too long %s !\n", dirpath);
		exit(EXIT_FAILURE);
	}

	FILE* fp = fopen(schemafilepath, "r");
	if (!fp) {
		fprintf(stderr, "can't open file %s !\n", schemafilepath);
		exit(EXIT_FAILURE);
	}

	/* Keep one byte free for the terminating NUL. */
	size_t length = fread(jsontext, 1, sizeof(jsontext) - 1, fp);
	int read_failed = ferror(fp);
	/* A full buffer followed by more data means the schema would be parsed truncated. */
	int too_large = (length == sizeof(jsontext) - 1) && (fgetc(fp) != EOF);
	fclose(fp);

	if (read_failed) {
		fprintf(stderr, "can't read file %s !\n", schemafilepath);
		exit(EXIT_FAILURE);
	}
	if (too_large) {
		fprintf(stderr, "schema file %s is larger than %lu bytes !\n",
			schemafilepath, (unsigned long) (sizeof(jsontext) - 1));
		exit(EXIT_FAILURE);
	}
	jsontext[length] = '\0';

	/* A length of 0 is passed exactly as in the original call; the text is NUL-terminated above. */
	int test_rval = avro_schema_from_json(jsontext, 0, &schema, NULL);
	if (test_rval != 0) {
		fprintf(stderr, "fail! Can' read schema from file %s\n", schemafilepath);
		exit(EXIT_FAILURE);
	}
	return schema;
}

/*
 * Opens an Avro file writer on "./copy.avro" for the given schema.
 *
 * schema: schema of the records that will be appended.
 * writer: out parameter receiving the created writer.
 *
 * The file is first created/truncated with a plain fopen, then handed to
 * avro_file_writer_create. Any failure terminates the process with
 * EXIT_FAILURE.
 */
static void create_writer(avro_schema_t schema, avro_file_writer_t* writer)
{
	/* Step 1: make sure copy.avro exists and is empty. */
	FILE* blank = fopen("./copy.avro", "w");
	if (blank == NULL) {
		fprintf(stderr, "can't create file copy.avro !\n");
		exit(EXIT_FAILURE);
	}
	fclose(blank);

	/* Step 2: attach the Avro writer to that file. */
	int status = avro_file_writer_create("./copy.avro", schema, writer);
	if (status != 0) {
		fprintf(stdout, "\nThere was an error creating db: %s", avro_strerror());
		exit(EXIT_FAILURE);
	}
}

/*
 * Reads every record of "<dirpath>/data.avro" with the given schema and
 * appends each one to a scratch file "./copy.avro", which is removed at the end.
 *
 * dirpath: directory containing the data.avro file.
 * schema:  reader schema, also used to create the scratch writer.
 *
 * Prints the number of records read on stdout. Any failure (path too long,
 * unreadable data file, append or close error) terminates the process with
 * EXIT_FAILURE.
 */
static void read_data(const char *dirpath, avro_schema_t schema) {
	char datafilepath[1024];
	int n = snprintf(datafilepath, sizeof(datafilepath), "%s/data.avro", dirpath);
	/* snprintf signals truncation by returning the length it needed, not a negative value. */
	if (n < 0 || (size_t) n >= sizeof(datafilepath)) {
		fprintf(stderr, "Size of dir path is too long %s/data.avro !\n", dirpath);
		exit(EXIT_FAILURE);
	}

	avro_file_reader_t reader;
	avro_datum_t datum;
	int rval = avro_file_reader(datafilepath, &reader);
	if (rval) {
		fprintf(stderr, "can't open data file %s: %s\n", datafilepath, avro_strerror());
		exit(EXIT_FAILURE);
	}

	avro_file_writer_t writer;
	create_writer(schema, &writer);

	int records_read = 0;
	/* The loop stops on the first non-zero status returned by the reader. */
	while ((rval = avro_file_reader_read(reader, schema, &datum)) == 0) {
		records_read++;
		if (avro_file_writer_append(writer, datum)) {
			fprintf(stdout, "\nCan't write record: %s\n", avro_strerror());
			exit(EXIT_FAILURE);
		}

		avro_datum_decref(datum);
	}

	/* Close both handles so pending data is written out and nothing leaks under the memory checker. */
	if (avro_file_writer_close(writer)) {
		fprintf(stdout, "\nCan't close writer: %s\n", avro_strerror());
		exit(EXIT_FAILURE);
	}
	avro_file_reader_close(reader);

	fprintf(stdout, "\nExit run test OK => %d records", records_read);
	remove("./copy.avro");
	fflush(stdout);
}

/*
 * Runs the shared-schema test for one directory: parses its schema.json,
 * then reads its data.avro with that schema.
 *
 * dirpath: directory holding the schema.json / data.avro pair.
 */
static void run_tests(const char *dirpath)
{
	fprintf(stdout, "\nRun test for path '%s'", dirpath);
	avro_schema_t schema = read_common_schema_test(dirpath);
	read_data(dirpath, schema);
	/* Drop the reference obtained from read_common_schema_test so the schema is not leaked. */
	avro_schema_decref(schema);
}



/*
 * Test entry point: runs run_tests() on every sub-directory of the shared
 * schema test data directory.
 *
 * Entries whose name starts with '.' and entries not reported as DT_DIR are
 * skipped. Returns EXIT_SUCCESS when every directory was processed; any
 * failure terminates the process with EXIT_FAILURE.
 */
int main(int argc, char *argv[])
{
	const char *srcdir = "../../../share/test/data/schemas";
	AVRO_UNUSED(argc);
	AVRO_UNUSED(argv);

	avro_stderr = avro_writer_file(stderr);

	DIR* dir = opendir(srcdir);
	if (dir == NULL) {
		fprintf(stdout, "Unable to open '%s'\n", srcdir);
		fflush(stdout);
		exit(EXIT_FAILURE);
	}

	struct dirent *dent;
	while ((dent = readdir(dir)) != NULL) {
		if (dent->d_name[0] == '.' || dent->d_type != DT_DIR) {
			continue;
		}

		char filepath[1024];
		int n = snprintf(filepath, sizeof(filepath), "%s/%s", srcdir, dent->d_name);
		/* Refuse to run a test against a silently truncated path. */
		if (n < 0 || (size_t) n >= sizeof(filepath)) {
			fprintf(stderr, "Size of dir path is too long %s/%s !\n", srcdir, dent->d_name);
			exit(EXIT_FAILURE);
		}
		run_tests(filepath);
	}
	closedir(dir);

	avro_writer_free(avro_stderr);
	return EXIT_SUCCESS;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.avro;

import java.io.File;
import java.io.IOException;
import java.util.Arrays;
import java.util.stream.Stream;

import org.apache.avro.file.DataFileReader;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumWriter;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Runs the shared schema test cases: every sub-folder of the shared test data
 * directory is expected to hold a {@code schema.json} and a {@code data.avro}
 * file. The data file is read with the parsed schema and each record is
 * written to a copy file.
 */
public class TestSchemaCommons {
  private static final Logger LOG = LoggerFactory.getLogger(TestSchemaCommons.class);

  /**
   * Parses the folder's schema, then copies every record of its data file into
   * {@code copy.avro}.
   *
   * @param folder one shared test-case folder
   * @throws IOException if the schema or the data file cannot be read or the
   *                     copy cannot be written
   */
  @ParameterizedTest
  @MethodSource("sharedFolders")
  void runFolder(final File folder) throws IOException {
    final File schemaSource = new File(folder, "schema.json");
    final File data = new File(folder, "data.avro");

    // A folder without a schema is only reported, not failed.
    if (!schemaSource.exists()) {
      LOG.warn("No 'schema.json' file on folder {}", folder.getPath());
      return;
    }
    final Schema schema = new Schema.Parser().parse(schemaSource);
    Assertions.assertNotNull(schema);

    // A folder without data still validates that its schema parses.
    if (!data.exists()) {
      LOG.warn("No 'data.avro' file on folder {}", folder.getPath());
      return;
    }

    // output file
    final String rootTest = Thread.currentThread().getContextClassLoader().getResource(".").getPath();
    final File copyData = new File(rootTest, "copy.avro");

    // Deserialize from disk
    DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(schema);
    GenericDatumReader<GenericRecord> datumReader = new GenericDatumReader<>(schema);
    try (DataFileReader<GenericRecord> dataFileReader = new DataFileReader<>(data, datumReader);
        DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<>(datumWriter)) {
      dataFileWriter.create(schema, copyData);
      int counter = 0;
      while (dataFileReader.hasNext()) {
        final GenericRecord record = dataFileReader.next();
        counter++;
        Assertions.assertNotNull(record);
        dataFileWriter.append(record);
      }
      Assertions.assertTrue(counter > 0, "no data in file");
    }
  }

  /**
   * Lists the sub-folders of the shared test data directory, one argument set
   * per folder.
   *
   * @return a stream with one {@link Arguments} per sub-folder
   * @throws IllegalStateException if the shared directory cannot be listed
   */
  public static Stream<Arguments> sharedFolders() {
    File root = new File("../../../share/test/data/schemas");
    // listFiles returns null (not an empty array) when root is missing or unreadable.
    final File[] folders = root.listFiles(File::isDirectory);
    if (folders == null) {
      throw new IllegalStateException("Unable to list shared test folders in " + root.getAbsolutePath());
    }
    return Arrays.stream(folders).map(Arguments::of);
  }

}
147 changes: 147 additions & 0 deletions lang/rust/avro/tests/shared.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use apache_avro::{types::Value, Codec, Reader, Schema, Writer};
use std::{
fmt,
fs::{DirEntry, File, ReadDir},
io::BufReader,
path::Path,
slice::Iter,
};

/// Relative path of the directory scanned by `test_schema`; each of its
/// sub-directories is run as one test case.
const ROOT_DIRECTORY: &str = "../../../share/test/data/schemas";

/// Runs `test_folder` on every sub-directory of `ROOT_DIRECTORY` and panics at
/// the end with all collected error messages if any folder failed.
///
/// Entries that are not directories, or whose file type cannot be determined,
/// are skipped.
#[test]
fn test_schema() {
    let mut failures: Option<ErrorsDesc> = None;
    let listing: ReadDir = scan_shared_folder();

    for item in listing {
        let dir_entry: DirEntry =
            item.unwrap_or_else(|e| core::panic!("Can't get file {}", e));
        let name = dir_entry.file_name();
        log::debug!("{:?}", name);

        let is_directory = matches!(dir_entry.file_type(), Ok(kind) if kind.is_dir());
        if !is_directory {
            continue;
        }

        let sub_folder = format!("{}/{}", ROOT_DIRECTORY, name.to_str().unwrap());
        if let Err(folder_errors) = test_folder(&sub_folder) {
            // Keep the messages of earlier folders in front of the new ones.
            failures = Some(match failures {
                None => folder_errors,
                Some(previous) => previous.merge(&folder_errors),
            });
        }
    }

    if let Some(all_errors) = failures {
        core::panic!("{}", all_errors)
    }
}

/// Ordered list of human-readable error messages.
#[derive(Debug)]
struct ErrorsDesc {
    details: Vec<String>,
}

impl ErrorsDesc {
    /// Creates a description holding the single message `msg`.
    fn new(msg: &str) -> ErrorsDesc {
        ErrorsDesc {
            details: vec![String::from(msg)],
        }
    }

    /// Returns a new description with the messages of `self` followed by
    /// `msg`; `self` is not modified.
    fn add(&self, msg: &str) -> Self {
        let details = self
            .details
            .iter()
            .cloned()
            .chain(std::iter::once(msg.to_owned()))
            .collect();
        Self { details }
    }

    /// Returns a new description with the messages of `self` followed by those
    /// of `err`; neither input is modified.
    fn merge(&self, err: &ErrorsDesc) -> Self {
        let details = self
            .details
            .iter()
            .chain(err.details.iter())
            .cloned()
            .collect();
        Self { details }
    }
}

/// Renders the messages one per line, in insertion order.
impl fmt::Display for ErrorsDesc {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let joined = self.details.join("\n");
        f.write_str(&joined)
    }
}

fn test_folder(folder: &str) -> Result<(), ErrorsDesc> {
let file_name = folder.to_owned() + "/schema.json";
let content = std::fs::read_to_string(file_name).expect("Unable to find schema.jon file");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it would be better if test_folder() returns a Result<(), Error>. This way all expect() calls could be simplified to ? and we could see the real error when it happens.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done


let schema: Schema = Schema::parse_str(content.as_str()).expect("Can't read schema");

let data_file_name = folder.to_owned() + "/data.avro";
let data_path: &Path = Path::new(data_file_name.as_str());
let mut result = Result::Ok(());
if !data_path.exists() {
log::error!("{}", format!("folder {folder} does not exist"));
return Result::Err(ErrorsDesc::new(
format!("folder {folder} does not exist").as_str(),
));
} else {
let file: File = File::open(data_path).expect("Can't open data.avro");
let reader =
Reader::with_schema(&schema, BufReader::new(&file)).expect("Can't read data.avro");

let mut writer = Writer::with_codec(&schema, Vec::new(), Codec::Null);

let mut records: Vec<Value> = vec![];

for r in reader {
let record: Value = r.expect("Error on reading");
writer.append(record.clone()).expect("Error on write item");
records.push(record);
}

writer.flush().expect("Error on flush");
let bytes: Vec<u8> = writer.into_inner().unwrap();
let reader_bis =
Reader::with_schema(&schema, &bytes[..]).expect("Can't read flushed vector");

let mut records_iter: Iter<Value> = records.iter();
for r2 in reader_bis {
let record: Value = r2.expect("Error on reading");
let original = records_iter.next().expect("Error, no next");
if original != &record {
result = match result {
Ok(_) => Result::Err(ErrorsDesc::new(
format!("Records are not equals for folder : {folder}").as_str(),
)),
Err(e) => {
Err(e.add(format!("Records are not equals for folder : {folder}").as_str()))
}
}
}
}
}
result
}

/// Opens `ROOT_DIRECTORY` for iteration.
///
/// # Panics
///
/// Panics if the directory cannot be read.
fn scan_shared_folder() -> ReadDir {
    let listing = std::fs::read_dir(ROOT_DIRECTORY);
    listing.expect("Can't read root folder")
}
Loading