Skip to content

Commit

Permalink
Merge 7359f63 into 8280bec
Browse files Browse the repository at this point in the history
  • Loading branch information
abhimanyugupta07 committed Sep 16, 2020
2 parents 8280bec + 7359f63 commit 8e7ac42
Show file tree
Hide file tree
Showing 11 changed files with 557 additions and 4 deletions.
1 change: 1 addition & 0 deletions .github/CODEOWNERS
Validating CODEOWNERS rules …
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
* @ExpediaGroup/apiary-committers
9 changes: 9 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,15 @@ All notable changes to this project will be documented in this file.

The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/) and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.html).


## [0.0.2] - TBD
### Added
* Integration tests.

### Changed
* Updated `springframework.boot.version` to `2.3.3.RELEASE` (was `2.1.3.RELEASE`).
* Updated `springframework.version` to `5.2.8.RELEASE` (was `5.1.5.RELEASE`).

## [0.0.1] - 2020-08-03
### Added
* First Release.
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public class DroneFly implements ApplicationContextAware {
public static void main(String[] args) {
TimeZone.setDefault(TimeZone.getTimeZone("UTC"));
new SpringApplicationBuilder(DroneFly.class)
.properties("spring.config.additional-location:classpath:/drone-fly-app.yml,${config:null}")
.properties("spring.config.additional-location:classpath:/drone-fly-app.yml")
.properties("server.port:${endpoint.port:8008}")
.build()
.run(args);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ public void createTableEvent() throws MetaException, NoSuchObjectException {
assertThat(result.getTable().getDbName()).isEqualTo(DB_NAME);
assertThat(result.getTable().getTableName()).isEqualTo(TABLE_NAME);
assertThat(result.getTable().getSd().getLocation()).isEqualTo(TABLE_LOCATION);
assertThat(result.getTable().getParameters().get("EXTERNAL")).isEqualTo("TRUE");
}

@Test
Expand Down
78 changes: 78 additions & 0 deletions drone-fly-integration-tests/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<parent>
<groupId>com.expediagroup</groupId>
<artifactId>drone-fly-parent</artifactId>
<version>0.0.2-SNAPSHOT</version>
</parent>

<artifactId>drone-fly-integration-tests</artifactId>

<dependencies>
<dependency>
<groupId>com.expediagroup</groupId>
<artifactId>drone-fly-app</artifactId>
<version>${project.version}</version>
<exclusions>
<exclusion>
<groupId>com.tdunning</groupId>
<artifactId>json</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.expediagroup.apiary</groupId>
<artifactId>kafka-metastore-listener</artifactId>
<version>6.0.2</version>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
<exclusions>
<exclusion>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-logging</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.junit.vintage</groupId>
<artifactId>junit-vintage-engine</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-params</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,197 @@
/**
* Copyright (C) 2020 Expedia, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.expediagroup.dataplatform.dronefly.core.integration;

import static org.apache.hadoop.hive.metastore.messaging.EventMessage.EventType.ADD_PARTITION;
import static org.apache.hadoop.hive.metastore.messaging.EventMessage.EventType.CREATE_TABLE;
import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;
import static org.junit.jupiter.api.Assertions.fail;

import static com.expediagroup.apiary.extensions.events.metastore.kafka.messaging.KafkaProducerProperty.BOOTSTRAP_SERVERS;
import static com.expediagroup.apiary.extensions.events.metastore.kafka.messaging.KafkaProducerProperty.CLIENT_ID;
import static com.expediagroup.apiary.extensions.events.metastore.kafka.messaging.KafkaProducerProperty.TOPIC_NAME;
import static com.expediagroup.dataplatform.dronefly.core.integration.DroneFlyIntegrationTestUtils.DATABASE;
import static com.expediagroup.dataplatform.dronefly.core.integration.DroneFlyIntegrationTestUtils.TABLE;
import static com.expediagroup.dataplatform.dronefly.core.integration.DroneFlyIntegrationTestUtils.TOPIC;
import static com.expediagroup.dataplatform.dronefly.core.integration.DroneFlyIntegrationTestUtils.buildPartition;
import static com.expediagroup.dataplatform.dronefly.core.integration.DroneFlyIntegrationTestUtils.buildTable;
import static com.expediagroup.dataplatform.dronefly.core.integration.DroneFlyIntegrationTestUtils.buildTableParameters;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.metastore.HiveMetaStore.HMSHandler;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.events.AddPartitionEvent;
import org.apache.hadoop.hive.metastore.events.CreateTableEvent;
import org.apache.hadoop.hive.metastore.events.ListenerEvent;
import org.apache.hadoop.hive.metastore.messaging.EventMessage.EventType;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.awaitility.Duration;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.KafkaMessageListenerContainer;
import org.springframework.kafka.listener.MessageListener;
import org.springframework.kafka.test.EmbeddedKafkaBroker;
import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.kafka.test.utils.ContainerTestUtils;
import org.springframework.kafka.test.utils.KafkaTestUtils;
import org.springframework.test.context.junit.jupiter.SpringExtension;

import com.google.common.collect.Lists;

import com.expediagroup.apiary.extensions.events.metastore.kafka.listener.KafkaMetaStoreEventListener;
import com.expediagroup.dataplatform.dronefly.app.DroneFly;

@EmbeddedKafka(count = 1, controlledShutdown = true, topics = { TOPIC }, partitions = 1)
@ExtendWith(SpringExtension.class)
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
public class DroneFlyIntegrationTest {

private @Mock HMSHandler hmsHandler;

protected final ExecutorService executorService = Executors.newFixedThreadPool(1);
private static Configuration CONF = new Configuration();

private KafkaMetaStoreEventListener kafkaMetaStoreEventListener;

@Autowired
private EmbeddedKafkaBroker embeddedKafkaBroker;

private BlockingQueue<ConsumerRecord<String, String>> records;

private KafkaMessageListenerContainer<String, String> container;

@BeforeAll
void setUp() throws InterruptedException {
/**
* The code upto line 110 is required so that EmbeddedKafka waits for the consumer group assignment to complete.
* https://stackoverflow.com/questions/47312373/embeddedkafka-sending-messages-to-consumer-after-delay-in-subsequent-test
*/

Map<String, Object> configs = new HashMap<>(KafkaTestUtils.consumerProps("consumer", "false", embeddedKafkaBroker));
DefaultKafkaConsumerFactory<String, String> consumerFactory = new DefaultKafkaConsumerFactory<>(configs,
new StringDeserializer(), new StringDeserializer());
ContainerProperties containerProperties = new ContainerProperties(TOPIC);
container = new KafkaMessageListenerContainer<>(consumerFactory, containerProperties);
records = new LinkedBlockingQueue<>();
container.setupMessageListener((MessageListener<String, String>) records::add);
container.start();
ContainerTestUtils.waitForAssignment(container, embeddedKafkaBroker.getPartitionsPerTopic());

System.setProperty("instance.name", "test");
System.setProperty("apiary.bootstrap.servers", embeddedKafkaBroker.getBrokersAsString());
System.setProperty("apiary.kafka.topic.name", TOPIC);
System.setProperty("apiary.listener.list", "com.expediagroup.dataplatform.dronefly.core.integration.DummyListener");
initKafkaListener();

executorService.execute(() -> DroneFly.main(new String[] {}));
await().atMost(Duration.TEN_MINUTES).until(DroneFly::isRunning);
}

private void initKafkaListener() {
CONF.set(BOOTSTRAP_SERVERS.key(), embeddedKafkaBroker.getBrokersAsString());
CONF.set(CLIENT_ID.key(), "apiary-kafka-listener");
CONF.set(TOPIC_NAME.key(), TOPIC);

kafkaMetaStoreEventListener = new KafkaMetaStoreEventListener(CONF);
}

@BeforeEach
public void setup() {

}

@AfterEach
public void reset() {
DummyListener.reset();
}

@AfterAll
public void stop() throws InterruptedException {
DroneFly.stop();
executorService.awaitTermination(5, TimeUnit.SECONDS);
}

@Test
public void typical() {
AddPartitionEvent addPartitionEvent = new AddPartitionEvent(buildTable(), buildPartition(), true, hmsHandler);
kafkaMetaStoreEventListener.onAddPartition(addPartitionEvent);

CreateTableEvent createTableEvent = new CreateTableEvent(buildTable(), true, hmsHandler);
kafkaMetaStoreEventListener.onCreateTable(createTableEvent);

await().atMost(5, TimeUnit.SECONDS).until(() -> DummyListener.getNumEvents() > 1);

assertThat(DummyListener.getNumEvents()).isEqualTo(2);

ListenerEvent receivedEventOne = DummyListener.get(0);
ListenerEvent receivedEventTwo = DummyListener.get(1);

assertEvent(receivedEventOne, ADD_PARTITION);
assertEvent(receivedEventTwo, CREATE_TABLE);
}

private void assertEvent(ListenerEvent event, EventType eventType) {
assertThat(event.getStatus()).isTrue();

switch (eventType) {
case ADD_PARTITION:
assertThat(event).isInstanceOf(AddPartitionEvent.class);
AddPartitionEvent addPartitionEvent = (AddPartitionEvent) event;
assertThat(addPartitionEvent.getTable().getDbName()).isEqualTo(DATABASE);
assertThat(addPartitionEvent.getTable().getTableName()).isEqualTo(TABLE);
Iterator<Partition> iterator = addPartitionEvent.getPartitionIterator();
List<Partition> partitions = new ArrayList<>();
while (iterator.hasNext()) {
partitions.add(iterator.next());
}
assertThat(partitions).isEqualTo(Lists.newArrayList(buildPartition()));
assertThat(addPartitionEvent.getTable().getParameters()).isEqualTo(buildTableParameters());
break;
case CREATE_TABLE:
assertThat(event).isInstanceOf(CreateTableEvent.class);
CreateTableEvent createTableEvent = (CreateTableEvent) event;
assertThat(createTableEvent.getTable().getDbName()).isEqualTo(DATABASE);
assertThat(createTableEvent.getTable().getTableName()).isEqualTo(TABLE);
break;
default:
fail("Received an event type other than ADD_PARTITION or CREATE_TABLE.");
break;
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/**
* Copyright (C) 2020 Expedia, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.expediagroup.dataplatform.dronefly.core.integration;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.metastore.api.Table;

import com.google.common.collect.Lists;

public class DroneFlyIntegrationTestUtils {

static final String TOPIC = "apiary-events";
static final String DATABASE = "database";
static final String TABLE = "table";

public static Table buildTable() {
return buildTable(null);
}

public static Table buildTable(String tableName) {
List<FieldSchema> partitions = Lists.newArrayList();
partitions.add(new FieldSchema("a", "string", "comment"));
partitions.add(new FieldSchema("b", "string", "comment"));
partitions.add(new FieldSchema("c", "string", "comment"));
return new Table(tableName == null ? TABLE : tableName, DATABASE, "me", 1, 1, 1, new StorageDescriptor(),
partitions, buildTableParameters(), "originalText", "expandedText", "tableType");
}

public static Partition buildPartition() {
return buildPartition(null);
}

public static Partition buildPartition(String partitionName) {
List<String> values = Lists.newArrayList();
values.add(partitionName + "1");
values.add(partitionName + "2");
StorageDescriptor sd = new StorageDescriptor();
sd.setStoredAsSubDirectories(false);
return new Partition(values, DATABASE, TABLE, 1, 1, sd, buildTableParameters());
}

public static Map<String, String> buildTableParameters() {
Map<String, String> parameters = new HashMap<>();
parameters.put("key1", "value1");
parameters.put("key2", "value2");
return parameters;
}

public static String buildQualifiedTableName() {
return DATABASE + "." + TABLE;
}

}
Loading

0 comments on commit 8e7ac42

Please sign in to comment.