Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@ public GooglePubsubHeaderFilterStrategy() {
public GooglePubsubHeaderFilterStrategy(boolean includeAllGoogleProperties) {
setOutFilterStartsWith(DefaultHeaderFilterStrategy.CAMEL_FILTER_STARTS_WITH);
setInFilterStartsWith(DefaultHeaderFilterStrategy.CAMEL_FILTER_STARTS_WITH);
// Filter authorization on both directions for security
getOutFilter().add("authorization");
getInFilter().add("authorization");
if (!includeAllGoogleProperties) {
ignoreGoogProperties();
}
Expand All @@ -42,7 +44,9 @@ protected void ignoreGoogProperties() {
filterStartWith[DefaultHeaderFilterStrategy.CAMEL_FILTER_STARTS_WITH.length + 2] = "goog";
setOutFilterStartsWith(filterStartWith);
setInFilterStartsWith(filterStartWith);
// Filter grpc-timeout on both directions
getOutFilter().add("grpc-timeout");
getInFilter().add("grpc-timeout");
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ private void send(Exchange exchange) throws Exception {
for (String camelHeader : exchange.getIn().getHeaders().keySet()) {
String value = exchange.getIn().getHeader(camelHeader, String.class);
if (headerFilterStrategy != null
&& headerFilterStrategy.applyFilterToExternalHeaders(camelHeader, value, exchange)) {
&& headerFilterStrategy.applyFilterToCamelHeaders(camelHeader, value, exchange)) {
continue;
}
messageBuilder.putAttributes(camelHeader, value);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.camel.component.google.pubsub.integration;

import java.util.List;

import org.apache.camel.CamelContext;
import org.apache.camel.EndpointInject;
import org.apache.camel.Exchange;
import org.apache.camel.Produce;
import org.apache.camel.ProducerTemplate;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.google.pubsub.GooglePubsubComponent;
import org.apache.camel.component.google.pubsub.PubsubTestSupport;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.support.DefaultExchange;
import org.apache.camel.support.DefaultHeaderFilterStrategy;
import org.junit.jupiter.api.Test;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;

/**
* Integration test that verifies component-level HeaderFilterStrategy configuration works correctly to filter sensitive
* headers like Authorization and Cookie before sending to Google Pub/Sub.
*/
public class ComponentLevelHeaderFilterStrategyIT extends PubsubTestSupport {

private static final String TOPIC_NAME = "headerFilterTopic";
private static final String SUBSCRIPTION_NAME = "headerFilterSubscription";

@EndpointInject("mock:receiveResult")
private MockEndpoint receiveResult;

@Produce("direct:from")
private ProducerTemplate producer;

@Override
public void createTopicSubscription() {
createTopicSubscriptionPair(TOPIC_NAME, SUBSCRIPTION_NAME);
}

@Override
protected void addPubsubComponent(CamelContext context) {
// Configure a custom HeaderFilterStrategy at component level
// to filter sensitive headers like Authorization and Cookie
DefaultHeaderFilterStrategy strategy = new DefaultHeaderFilterStrategy();
strategy.setFilterOnMatch(true);
strategy.getOutFilter().add("Authorization");
strategy.getOutFilter().add("Cookie");

GooglePubsubComponent component = new GooglePubsubComponent();
component.setEndpoint(service.getServiceAddress());
component.setAuthenticate(false);
component.setHeaderFilterStrategy(strategy);

context.addComponent("google-pubsub", component);
context.getPropertiesComponent().setLocation("ref:prop");
}

@Override
protected RouteBuilder createRouteBuilder() {
return new RouteBuilder() {
public void configure() {
from("direct:from")
.routeId("HeaderFilter_Send")
.to("google-pubsub:{{project.id}}:" + TOPIC_NAME);

from("google-pubsub:{{project.id}}:" + SUBSCRIPTION_NAME + "?synchronousPull=true")
.routeId("HeaderFilter_Receive")
.to("mock:receiveResult");
}
};
}

@Test
public void testSensitiveHeadersAreFiltered() throws Exception {
Exchange exchange = new DefaultExchange(context);

// Set sensitive headers that should be filtered (with different case variations)
exchange.getIn().setBody("Test message with sensitive headers");
exchange.getIn().setHeader("Authorization", "Bearer SensitiveAuthToken");
exchange.getIn().setHeader("Cookie", "session=abc123");
// Also test case insensitivity
exchange.getIn().setHeader("AUTHORIZATION", "Bearer Token2");
exchange.getIn().setHeader("cookie", "session=xyz");

// Set a custom header that should NOT be filtered
String customHeaderKey = "X-Custom-Header";
String customHeaderValue = "CustomValue";
exchange.getIn().setHeader(customHeaderKey, customHeaderValue);

receiveResult.expectedMessageCount(1);
receiveResult.expectedBodiesReceivedInAnyOrder(exchange.getIn().getBody());

producer.send(exchange);

receiveResult.assertIsSatisfied(5000);

List<Exchange> receivedExchanges = receiveResult.getExchanges();
assertNotNull(receivedExchanges, "Received exchanges should not be null");
assertEquals(1, receivedExchanges.size(), "Should receive exactly one message");

Exchange receivedExchange = receivedExchanges.get(0);

// Verify sensitive headers are filtered out (all case variations)
assertNull(receivedExchange.getIn().getHeader("Authorization"),
"Authorization header should be filtered and not received");
assertNull(receivedExchange.getIn().getHeader("AUTHORIZATION"),
"AUTHORIZATION header should be filtered and not received");
assertNull(receivedExchange.getIn().getHeader("Cookie"),
"Cookie header should be filtered and not received");
assertNull(receivedExchange.getIn().getHeader("cookie"),
"cookie header should be filtered and not received");

// Verify non-sensitive custom header is preserved
assertEquals(customHeaderValue, receivedExchange.getIn().getHeader(customHeaderKey),
"Custom header should be preserved");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,11 @@
import org.apache.camel.component.google.pubsub.GooglePubsubHeaderFilterStrategy;
import org.apache.camel.component.google.pubsub.PubsubTestSupport;
import org.apache.camel.support.DefaultExchange;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;

public class PubsubHeaderFilterTest extends PubsubTestSupport {
Expand All @@ -34,4 +36,46 @@ public void testPubsubHeaderFilter(String headerName) {
assertTrue(googlePubsubHeaderFilterStrategy.applyFilterToExternalHeaders(headerName, "value", exchange),
headerName + " not filtered");
}

@ParameterizedTest
@ValueSource(strings = { "authorization", "Authorization", "AUTHORIZATION" })
public void testAuthorizationHeaderFilteredOnOutbound(String headerName) {
// Tests that sensitive headers are filtered when sending FROM Camel TO Pub/Sub (producer)
GooglePubsubHeaderFilterStrategy strategy = new GooglePubsubHeaderFilterStrategy(false);
Exchange exchange = new DefaultExchange(context);
assertTrue(strategy.applyFilterToCamelHeaders(headerName, "Bearer token", exchange),
headerName + " should be filtered on outbound (Camel to Pub/Sub)");
}

@ParameterizedTest
@ValueSource(strings = { "authorization", "Authorization", "AUTHORIZATION" })
public void testAuthorizationHeaderFilteredOnInbound(String headerName) {
// Tests that sensitive headers are filtered when receiving FROM Pub/Sub TO Camel (consumer)
GooglePubsubHeaderFilterStrategy strategy = new GooglePubsubHeaderFilterStrategy(false);
Exchange exchange = new DefaultExchange(context);
assertTrue(strategy.applyFilterToExternalHeaders(headerName, "Bearer token", exchange),
headerName + " should be filtered on inbound (Pub/Sub to Camel)");
}

@ParameterizedTest
@ValueSource(strings = { "grpc-timeout", "Grpc-Timeout", "GRPC-TIMEOUT" })
public void testGrpcTimeoutHeaderFilteredOnBothDirections(String headerName) {
GooglePubsubHeaderFilterStrategy strategy = new GooglePubsubHeaderFilterStrategy(false);
Exchange exchange = new DefaultExchange(context);
assertTrue(strategy.applyFilterToCamelHeaders(headerName, "30s", exchange),
headerName + " should be filtered on outbound");
assertTrue(strategy.applyFilterToExternalHeaders(headerName, "30s", exchange),
headerName + " should be filtered on inbound");
}

@Test
public void testNonSensitiveHeadersNotFiltered() {
GooglePubsubHeaderFilterStrategy strategy = new GooglePubsubHeaderFilterStrategy(false);
Exchange exchange = new DefaultExchange(context);
// Regular headers should NOT be filtered
assertFalse(strategy.applyFilterToCamelHeaders("X-Custom-Header", "value", exchange),
"Custom headers should not be filtered on outbound");
assertFalse(strategy.applyFilterToExternalHeaders("X-Custom-Header", "value", exchange),
"Custom headers should not be filtered on inbound");
}
}
Loading