Skip to content

Commit

Permalink
Merge branch 'release/0.28.0'
Browse files Browse the repository at this point in the history
  • Loading branch information
Pascal Holy committed Aug 11, 2021
2 parents 31e1b95 + 09ae1bd commit 2c1d1bf
Show file tree
Hide file tree
Showing 181 changed files with 1,191 additions and 554 deletions.
10 changes: 10 additions & 0 deletions .github/workflows/main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,16 @@ jobs:
AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
GITHUB_BRANCH: ${{ github.ref }}

- name: Publish helm charts to S3
if: startsWith(github.ref, 'refs/heads/release') || startsWith(github.ref, 'refs/heads/main') || startsWith(github.ref, 'refs/heads/develop')
run: |
./scripts/upload-helm-charts.sh
env:
AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }}
AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
AWS_REGION : ${{ secrets.AWS_REGION }}
GITHUB_BRANCH: ${{ github.ref }}

- name: Publish http-client library to npm
if: ${{ startsWith(github.ref, 'refs/heads/main') }}
run: |
Expand Down
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
0.27.1
0.28.0
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,11 @@ public static class Contact {
public static final String FETCH_STATE = "contact.fetch_state";
}

public static class Reaction {
public static final String EMOJI = "reaction.emoji";
public static final String SENT_AT = "reaction.sent_at";
}

public enum ContactFetchState {
ok("ok"),
failed("failed");
Expand All @@ -42,6 +47,7 @@ public static class ChannelKeys {

public static class MessageKeys {
public static final String SUGGESTIONS = "suggestions";
public static final String SOURCE_ID = "source_id";
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,15 @@
import co.airy.core.sources.facebook.api.ApiException;
import co.airy.core.sources.facebook.api.Mapper;
import co.airy.core.sources.facebook.api.model.SendMessagePayload;
import co.airy.core.sources.facebook.api.model.SendMessageResponse;
import co.airy.core.sources.facebook.api.model.UserProfile;
import co.airy.core.sources.facebook.dto.Conversation;
import co.airy.core.sources.facebook.dto.SendMessageRequest;
import co.airy.log.AiryLoggerFactory;
import co.airy.model.metadata.MetadataKeys;
import co.airy.spring.auth.IgnoreAuthPattern;
import co.airy.spring.web.filters.RequestLoggingIgnorePatterns;
import org.apache.avro.specific.SpecificRecordBase;
import org.apache.kafka.streams.KeyValue;
import org.slf4j.Logger;
import org.springframework.context.annotation.Bean;
Expand All @@ -32,6 +35,7 @@
import static co.airy.model.metadata.MetadataKeys.ConversationKeys.ContactFetchState.ok;
import static co.airy.model.metadata.MetadataRepository.getId;
import static co.airy.model.metadata.MetadataRepository.newConversationMetadata;
import static co.airy.model.metadata.MetadataRepository.newMessageMetadata;

@Component
public class Connector {
Expand All @@ -46,31 +50,32 @@ public class Connector {
this.mapper = mapper;
}

public Message sendMessage(SendMessageRequest sendMessageRequest) {
public List<KeyValue<String, SpecificRecordBase>> sendMessage(SendMessageRequest sendMessageRequest) {
final Message message = sendMessageRequest.getMessage();
final Conversation conversation = sendMessageRequest.getConversation();

if (isMessageStale(message)) {
updateDeliveryState(message, DeliveryState.FAILED);
return message;
return List.of(KeyValue.pair(message.getId(), message));
}

try {
final String pageToken = conversation.getChannel().getToken();
final SendMessagePayload fbSendMessagePayload = mapper.fromSendMessageRequest(sendMessageRequest);

api.sendMessage(pageToken, fbSendMessagePayload);

final SendMessageResponse response = api.sendMessage(pageToken, fbSendMessagePayload);
final Metadata metadata = newMessageMetadata(message.getId(), MetadataKeys.MessageKeys.SOURCE_ID, response.getMessageId());
updateDeliveryState(message, DeliveryState.DELIVERED);
return message;

return List.of(KeyValue.pair(message.getId(), message), KeyValue.pair(getId(metadata).toString(), metadata));
} catch (ApiException e) {
log.error(String.format("Failed to send a message to Facebook \n SendMessageRequest: %s \n Error Message: %s \n", sendMessageRequest, e.getMessage()), e);
} catch (Exception e) {
log.error(String.format("Failed to send a message to Facebook \n SendMessageRequest: %s", sendMessageRequest), e);
}

updateDeliveryState(message, DeliveryState.FAILED);
return message;
return List.of(KeyValue.pair(message.getId(), message));
}

private boolean isMessageStale(Message message) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ public class Stores implements ApplicationListener<ApplicationStartedEvent>, Dis
private final String channelsStore = "channels-store";
private final String applicationCommunicationChannels = new ApplicationCommunicationChannels().name();
private final String applicationCommunicationMetadata = new ApplicationCommunicationMetadata().name();
private final String applicationCommunicationMessages = new ApplicationCommunicationMessages().name();
private final KafkaProducer<String, SpecificRecordBase> producer;
private final Connector connector;

Expand All @@ -72,7 +73,7 @@ public void onApplicationEvent(ApplicationStartedEvent applicationStartedEvent)
&& channel.getConnectionState().equals(ChannelConnectionState.CONNECTED)).toTable();

// Facebook messaging stream by conversation-id
final KStream<String, Message> messageStream = builder.<String, Message>stream(new ApplicationCommunicationMessages().name())
final KStream<String, Message> messageStream = builder.<String, Message>stream(applicationCommunicationMessages)
.filter((messageId, message) -> message != null && sources.contains(message.getSource()))
.selectKey((messageId, message) -> message.getConversationId());

Expand Down Expand Up @@ -110,11 +111,17 @@ public void onApplicationEvent(ApplicationStartedEvent applicationStartedEvent)
// Send outbound messages
messageStream.filter((messageId, message) -> DeliveryState.PENDING.equals(message.getDeliveryState()))
.join(contextTable, (message, conversation) -> new SendMessageRequest(conversation, message))
.map((conversationId, sendMessageRequest) -> {
final Message message = connector.sendMessage(sendMessageRequest);
return KeyValue.pair(message.getId(), message);
})
.to(new ApplicationCommunicationMessages().name());
.flatMap((conversationId, sendMessageRequest) -> connector.sendMessage(sendMessageRequest))
.to((recordId, record, context) -> {
if (record instanceof Metadata) {
return applicationCommunicationMetadata;
}
if (record instanceof Message) {
return applicationCommunicationMessages;
}

throw new IllegalStateException("Unknown type for record " + record);
});

// Fetch missing metadata
contextTable
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import co.airy.core.sources.facebook.api.model.Pages;
import co.airy.core.sources.facebook.api.model.Participants;
import co.airy.core.sources.facebook.api.model.SendMessagePayload;
import co.airy.core.sources.facebook.api.model.SendMessageResponse;
import co.airy.core.sources.facebook.api.model.UserProfile;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;
Expand Down Expand Up @@ -48,7 +49,7 @@ public class Api implements ApplicationListener<ApplicationReadyEvent> {

private RestTemplate restTemplate;

private static final String subscribedFields = "messages,messaging_postbacks,messaging_optins,message_deliveries,message_reads,messaging_payments,messaging_pre_checkouts,messaging_checkout_updates,messaging_account_linking,messaging_referrals,message_echoes,messaging_game_plays,standby,messaging_handovers,messaging_policy_enforcement,message_reactions,inbox_labels";
private static final String subscribedFields = "messages,messaging_postbacks,messaging_optins,message_deliveries,message_reads,messaging_payments,messaging_pre_checkouts,messaging_checkout_updates,messaging_account_linking,messaging_referrals,message_echoes,messaging_game_plays,standby,messaging_handovers,messaging_policy_enforcement,message_reactions,inbox_labels,message_reactions";
private static final String baseUrl = "https://graph.facebook.com/v11.0";
private static final String requestTemplate = baseUrl + "/me/messages?access_token=%s";
private final String pageFields = "fields=id,name_with_location_descriptor,access_token,picture,is_webhooks_subscribed";
Expand All @@ -71,9 +72,10 @@ public Api(ObjectMapper objectMapper, RestTemplateBuilder restTemplateBuilder,
this.apiSecret = apiSecret;
}

public void sendMessage(final String pageToken, SendMessagePayload sendMessagePayload) {
public SendMessageResponse sendMessage(final String pageToken, SendMessagePayload sendMessagePayload) {
String fbReqUrl = String.format(requestTemplate, pageToken);
restTemplate.postForEntity(fbReqUrl, new HttpEntity<>(sendMessagePayload, httpHeaders), FbSendMessageResponse.class);
final ResponseEntity<SendMessageResponse> responseEntity = restTemplate.postForEntity(fbReqUrl, new HttpEntity<>(sendMessagePayload, httpHeaders), SendMessageResponse.class);
return responseEntity.getBody();
}

public List<PageWithConnectInfo> getPagesInfo(String accessToken) throws Exception {
Expand All @@ -98,8 +100,6 @@ private <T> T apiResponse(String url, HttpMethod method, Class<T> clazz) throws
return objectMapper.readValue(responseEntity.getBody(), clazz);
}



// https://developers.facebook.com/docs/messenger-platform/instagram/features/user-profile
public UserProfile getInstagramProfile(String sourceConversationId, String token) {
ResponseEntity<InstagramProfile> responseEntity = restTemplate.getForEntity(baseUrl + "/{ig-id}?fields=name,profile_pic&access_token={access_token}",
Expand Down Expand Up @@ -192,15 +192,4 @@ public void handleError(ClientHttpResponse response) throws IOException {
.additionalMessageConverters(new MappingJackson2HttpMessageConverter(objectMapper))
.build();
}

@Data
@NoArgsConstructor
@AllArgsConstructor
private static class FbSendMessageResponse {
@JsonProperty("recipient_id")
private String recipientId;

@JsonProperty("message_id")
private String messageId;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package co.airy.core.sources.facebook.api.model;

import com.fasterxml.jackson.annotation.JsonProperty;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@NoArgsConstructor
@AllArgsConstructor
public class SendMessageResponse {
@JsonProperty("recipient_id")
private String recipientId;
@JsonProperty("message_id")
private String messageId;
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import co.airy.avro.communication.Message;
import co.airy.core.sources.facebook.api.Api;
import co.airy.core.sources.facebook.api.model.SendMessagePayload;
import co.airy.core.sources.facebook.api.model.SendMessageResponse;
import co.airy.kafka.schema.Topic;
import co.airy.kafka.schema.application.ApplicationCommunicationChannels;
import co.airy.kafka.schema.application.ApplicationCommunicationMessages;
Expand Down Expand Up @@ -41,6 +42,7 @@
import static org.hamcrest.core.IsEqual.equalTo;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.when;

@SpringBootTest(classes = AirySpringBootApplication.class)
@TestPropertySource(value = "classpath:test.properties")
Expand Down Expand Up @@ -98,7 +100,7 @@ void canSendMessageViaTheFacebookApi() throws Exception {

ArgumentCaptor<SendMessagePayload> payloadCaptor = ArgumentCaptor.forClass(SendMessagePayload.class);
ArgumentCaptor<String> tokenCaptor = ArgumentCaptor.forClass(String.class);
doNothing().when(api).sendMessage(tokenCaptor.capture(), payloadCaptor.capture());
when(api.sendMessage(tokenCaptor.capture(), payloadCaptor.capture())).thenReturn(new SendMessageResponse("recipient id", "message id"));

kafkaTestHelper.produceRecords(List.of(
new ProducerRecord<>(applicationCommunicationChannels.name(), channelId, Channel.newBuilder()
Expand Down Expand Up @@ -141,7 +143,7 @@ void canSendMessageViaTheFacebookApi() throws Exception {
.setIsFromContact(false)
.build())
);

retryOnException(() -> {
final SendMessagePayload sendMessagePayload = payloadCaptor.getValue();
assertThat(sendMessagePayload.getRecipient().getId(), equalTo(sourceConversationId));
Expand Down
1 change: 1 addition & 0 deletions backend/sources/facebook/events-router/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ app_deps = [
"//backend:base_app",
"//backend/model/channel",
"//backend/model/message",
"//backend/model/metadata",
"//lib/java/uuid",
"//lib/java/log",
"//lib/java/kafka/schema:source-facebook-events",
Expand Down
Loading

0 comments on commit 2c1d1bf

Please sign in to comment.