Skip to content

Commit

Permalink
[#2051] Add instagram source (#2082)
Browse files Browse the repository at this point in the history
  • Loading branch information
chrismatix committed Jul 5, 2021
1 parent 5bb909a commit dca9a94
Show file tree
Hide file tree
Showing 23 changed files with 438 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,12 @@

import co.airy.avro.communication.Channel;
import co.airy.avro.communication.ChannelConnectionState;
import co.airy.avro.communication.Metadata;
import co.airy.core.sources.facebook.api.Api;
import co.airy.core.sources.facebook.api.ApiException;
import co.airy.core.sources.facebook.api.model.PageWithConnectInfo;
import co.airy.core.sources.facebook.payload.ConnectRequestPayload;
import co.airy.core.sources.facebook.payload.ConnectInstagramRequestPayload;
import co.airy.core.sources.facebook.payload.ConnectPageRequestPayload;
import co.airy.core.sources.facebook.payload.DisconnectChannelRequestPayload;
import co.airy.core.sources.facebook.payload.ExploreRequestPayload;
import co.airy.core.sources.facebook.payload.ExploreResponsePayload;
Expand Down Expand Up @@ -78,7 +80,7 @@ ResponseEntity<?> explore(@RequestBody @Valid ExploreRequestPayload requestPaylo
}

@PostMapping("/channels.facebook.connect")
ResponseEntity<?> connect(@RequestBody @Valid ConnectRequestPayload requestPayload) {
ResponseEntity<?> connectFacebook(@RequestBody @Valid ConnectPageRequestPayload requestPayload) {
final String token = requestPayload.getPageToken();
final String pageId = requestPayload.getPageId();

Expand Down Expand Up @@ -115,7 +117,53 @@ ResponseEntity<?> connect(@RequestBody @Valid ConnectRequestPayload requestPaylo
}
}

@PostMapping("/channels.facebook.disconnect")
@PostMapping("/channels.instagram.connect")
ResponseEntity<?> connectInstagram(@RequestBody @Valid ConnectInstagramRequestPayload requestPayload) {
final String token = requestPayload.getPageToken();
final String pageId = requestPayload.getPageId();
final String accountId = requestPayload.getAccountId();

final String channelId = UUIDv5.fromNamespaceAndName("instagram", accountId).toString();

try {
final String longLivingUserToken = api.exchangeToLongLivingUserAccessToken(token);
final PageWithConnectInfo fbPageWithConnectInfo = api.getPageForUser(pageId, longLivingUserToken);

api.connectPageToApp(fbPageWithConnectInfo.getAccessToken());

final MetadataMap metadataMap = MetadataMap.from(List.of(
newChannelMetadata(channelId, MetadataKeys.ChannelKeys.NAME, Optional.ofNullable(requestPayload.getName()).orElse(String.format("%s Instagram account", fbPageWithConnectInfo.getNameWithLocationDescriptor())))
));

Optional.ofNullable(requestPayload.getImageUrl())
.ifPresent((imageUrl) -> {
final Metadata metadata = newChannelMetadata(channelId, MetadataKeys.ChannelKeys.IMAGE_URL, imageUrl);
metadataMap.put(metadata.getKey(), metadata);
});

final ChannelContainer container = ChannelContainer.builder()
.channel(
Channel.newBuilder()
.setId(channelId)
.setConnectionState(ChannelConnectionState.CONNECTED)
.setSource("instagram")
.setSourceChannelId(accountId)
.setToken(longLivingUserToken)
.build()
)
.metadataMap(metadataMap).build();

stores.storeChannelContainer(container);

return ResponseEntity.ok(fromChannelContainer(container));
} catch (ApiException e) {
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(new RequestErrorResponsePayload(e.getMessage()));
} catch (Exception e) {
return ResponseEntity.status(HttpStatus.SERVICE_UNAVAILABLE).build();
}
}

@PostMapping(path = {"/channels.facebook.disconnect", "/channels.instagram.disconnect"})
ResponseEntity<?> disconnect(@RequestBody @Valid DisconnectChannelRequestPayload requestPayload) {
final String channelId = requestPayload.getChannelId().toString();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ public boolean needsMetadataFetched(Conversation conversation) {
}

public List<KeyValue<String, Metadata>> fetchMetadata(String conversationId, Conversation conversation) {
final UserProfile profile = Optional.ofNullable(getProfile(conversation)).orElse(new UserProfile());
final UserProfile profile = Optional.ofNullable(getUserProfile(conversation)).orElse(new UserProfile());

final List<KeyValue<String, Metadata>> recordList = new ArrayList<>();

Expand Down Expand Up @@ -101,7 +101,26 @@ public List<KeyValue<String, Metadata>> fetchMetadata(String conversationId, Con
return recordList;
}

public UserProfile getProfile(Conversation conversation) {
public UserProfile getUserProfile(Conversation conversation) {
if (conversation.getChannel().getSource().equals("instagram")) {
return getInstagramProfile(conversation);
}

return getMessengerProfile(conversation);
}

private UserProfile getInstagramProfile(Conversation conversation) {
final String sourceConversationId = conversation.getSourceConversationId();
final String token = conversation.getChannel().getToken();
try {
return api.getInstagramProfile(sourceConversationId, token);
} catch (Exception profileApiException) {
log.error("Instagram profile api failed", profileApiException);
return new UserProfile();
}
}

private UserProfile getMessengerProfile(Conversation conversation) {
final String sourceConversationId = conversation.getSourceConversationId();
final String token = conversation.getChannel().getToken();
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.springframework.stereotype.Service;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
Expand Down Expand Up @@ -63,14 +64,16 @@ public void onApplicationEvent(ApplicationStartedEvent applicationStartedEvent)

channelStream.toTable(Materialized.as(channelsStore));

final List<String> sources = List.of("facebook", "instagram");

// Channels table
KTable<String, Channel> channelsTable = channelStream
.filter((sourceChannelId, channel) -> "facebook".equalsIgnoreCase(channel.getSource())
.filter((sourceChannelId, channel) -> sources.contains(channel.getSource())
&& channel.getConnectionState().equals(ChannelConnectionState.CONNECTED)).toTable();

// Facebook messaging stream by conversation-id
final KStream<String, Message> messageStream = builder.<String, Message>stream(new ApplicationCommunicationMessages().name())
.filter((messageId, message) -> "facebook".equalsIgnoreCase(message.getSource()))
.filter((messageId, message) -> sources.contains(message.getSource()))
.selectKey((messageId, message) -> message.getConversationId());

// Metadata table
Expand All @@ -85,8 +88,8 @@ public void onApplicationEvent(ApplicationStartedEvent applicationStartedEvent)
return aggregate;
});

// Conversation table
final KTable<String, Conversation> conversationTable = messageStream
// Context table
final KTable<String, Conversation> contextTable = messageStream
.groupByKey()
.aggregate(Conversation::new,
(conversationId, message, conversation) -> {
Expand All @@ -106,12 +109,15 @@ public void onApplicationEvent(ApplicationStartedEvent applicationStartedEvent)

// Send outbound messages
messageStream.filter((messageId, message) -> DeliveryState.PENDING.equals(message.getDeliveryState()))
.join(conversationTable, (message, conversation) -> new SendMessageRequest(conversation, message))
.mapValues(connector::sendMessage)
.join(contextTable, (message, conversation) -> new SendMessageRequest(conversation, message))
.map((conversationId, sendMessageRequest) -> {
final Message message = connector.sendMessage(sendMessageRequest);
return KeyValue.pair(message.getId(), message);
})
.to(new ApplicationCommunicationMessages().name());

// Fetch missing metadata
conversationTable
contextTable
.toStream()
.leftJoin(metadataTable, (conversation, metadataMap) -> conversation
.toBuilder()
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package co.airy.core.sources.facebook.api;

import co.airy.core.sources.facebook.api.model.InstagramProfile;
import co.airy.core.sources.facebook.api.model.LongLivingUserAccessToken;
import co.airy.core.sources.facebook.api.model.PageWithConnectInfo;
import co.airy.core.sources.facebook.api.model.Pages;
Expand All @@ -15,6 +16,7 @@
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.boot.web.client.RestTemplateBuilder;
import org.springframework.context.ApplicationListener;
import org.springframework.core.ParameterizedTypeReference;
import org.springframework.http.HttpEntity;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod;
Expand All @@ -34,6 +36,7 @@
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Optional;

/*
@see https://developers.facebook.com/docs/messenger-platform/reference/send-api/
Expand Down Expand Up @@ -96,6 +99,20 @@ private <T> T apiResponse(String url, HttpMethod method, Class<T> clazz) throws
}



// https://developers.facebook.com/docs/messenger-platform/instagram/features/user-profile
public UserProfile getInstagramProfile(String sourceConversationId, String token) {
ResponseEntity<InstagramProfile> responseEntity = restTemplate.getForEntity(baseUrl + "/{ig-id}?fields=name,profile_pic&access_token={access_token}",
InstagramProfile.class, sourceConversationId, token);
if (responseEntity.getStatusCode() != HttpStatus.OK) {
throw new ApiException("Call unsuccessful, received HTTP status " + responseEntity.getStatusCodeValue());
}
return Optional.ofNullable(responseEntity.getBody())
.map((body) -> UserProfile.builder().firstName(body.getName()).profilePic(body.getProfilePic()).build())
.orElseThrow();
}


// See "Retrieving a Person's Profile" in https://developers.facebook.com/docs/messenger-platform/identity/user-profile
public UserProfile getProfileFromContact(String sourceConversationId, String token) {
String reqUrl = String.format(baseUrl + "/%s?fields=first_name,last_name,profile_pic&access_token=%s",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package co.airy.core.sources.facebook.api.model;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class InstagramProfile {
private String name;
private String profilePic;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package co.airy.core.sources.facebook.payload;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

import javax.validation.constraints.NotNull;

@Data
@NoArgsConstructor
@AllArgsConstructor
public class ConnectInstagramRequestPayload {
@NotNull
private String pageId;
@NotNull
private String accountId;
@NotNull
private String pageToken;
private String name;
private String imageUrl;
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
@Data
@NoArgsConstructor
@AllArgsConstructor
public class ConnectRequestPayload {
public class ConnectPageRequestPayload {
@NotNull
private String pageId;
@NotNull
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,11 +50,13 @@ public class EventsRouter implements DisposableBean, ApplicationListener<Applica
public void startStream() {
final StreamsBuilder builder = new StreamsBuilder();

final List<String> sources = List.of("facebook", "instagram");

// Channels table
KTable<String, Channel> channelsTable = builder.<String, Channel>stream(new ApplicationCommunicationChannels().name())
.groupBy((k, v) -> v.getSourceChannelId())
.reduce((aggValue, newValue) -> newValue)
.filter((sourceChannelId, channel) -> "facebook".equalsIgnoreCase(channel.getSource())
.filter((sourceChannelId, channel) -> sources.contains(channel.getSource())
&& channel.getConnectionState().equals(ChannelConnectionState.CONNECTED));

builder.<String, String>stream(new SourceFacebookEvents().name())
Expand Down Expand Up @@ -106,7 +108,7 @@ public void startStream() {
final String messageId = UUIDv5.fromNamespaceAndName(channel.getId(), payload).toString();

try {
final Message.Builder messageBuilder = messageParser.parse(payload);
final Message.Builder messageBuilder = messageParser.parse(payload, channel.getSource());

return KeyValue.pair(
messageId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ String getSourceConversationId(final JsonNode webhookMessaging) throws NullPoint
: webhookMessaging.get("sender").get("id").asText();
}

public Message.Builder parse(final String payload) throws Exception {
public Message.Builder parse(final String payload, final String source) throws Exception {
final JsonNode webhookMessaging = objectMapper.readTree(payload);

final JsonNode message = webhookMessaging.get("message");
Expand All @@ -52,7 +52,7 @@ public Message.Builder parse(final String payload) throws Exception {
} else if (appId != null && !appId.equals(this.facebookAppId)) {
// Third party app
senderId = appId;
} else if (appId == null) {
} else if (appId == null && !source.equals("instagram")) {
// Sent by Facebook moderator via Facebook inbox
senderId = getSourceConversationId(webhookMessaging);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
Expand Down Expand Up @@ -69,7 +70,10 @@ public void onApplicationEvent(ApplicationReadyEvent applicationReadyEvent) {

messageStream.filter((messageId, message) -> DeliveryState.PENDING.equals(message.getDeliveryState()))
.join(contextTable, (message, sendMessageRequest) -> sendMessageRequest.toBuilder().message(message).build())
.mapValues(connector::sendMessage)
.map((conversationId, sendMessageRequest) -> {
final Message message = connector.sendMessage(sendMessageRequest);
return KeyValue.pair(message.getId(), message);
})
.to(new ApplicationCommunicationMessages().name());

streams.start(builder.build(), appId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
Expand Down Expand Up @@ -84,7 +85,10 @@ public void onApplicationEvent(ApplicationReadyEvent applicationReadyEvent) {

messageStream.filter((messageId, message) -> DeliveryState.PENDING.equals(message.getDeliveryState()))
.join(contextTable, (message, sendMessageRequest) -> sendMessageRequest.toBuilder().message(message).build())
.mapValues(connector::sendMessage)
.map((conversationId, sendMessageRequest) -> {
final Message message = connector.sendMessage(sendMessageRequest);
return KeyValue.pair(message.getId(), message);
})
.to(new ApplicationCommunicationMessages().name());

streams.start(builder.build(), appId);
Expand Down
16 changes: 16 additions & 0 deletions docs/docs/api/endpoints/channels.md
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,12 @@ import ConnectFacebook from './connect-facebook.mdx'

<ConnectFacebook />

### Instagram

import ConnectInstagram from './connect-instagram.mdx'

<ConnectInstagram />

### Google

import ConnectGoogle from './connect-google.mdx'
Expand Down Expand Up @@ -186,6 +192,16 @@ POST /channels.facebook.disconnect

<ChannelDisconnect />

### Instagram

Disconnects an instagram account from Airy Core.

```
POST /channels.instagram.disconnect
```

<ChannelDisconnect />

### Google

```
Expand Down
Loading

0 comments on commit dca9a94

Please sign in to comment.