Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@
import org.apache.nifi.processors.gcp.util.GoogleUtils;
import org.apache.nifi.proxy.ProxyConfiguration;

import static org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.CREATED_TIME;
import static org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.CREATED_TIME_DESC;
import static org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.ERROR_CODE;
import static org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.ERROR_CODE_DESC;
import static org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.ERROR_MESSAGE;
Expand All @@ -64,6 +66,8 @@
import static org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.ID;
import static org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.ID_DESC;
import static org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.MIME_TYPE_DESC;
import static org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.MODIFIED_TIME;
import static org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.MODIFIED_TIME_DESC;
import static org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.SIZE;
import static org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.SIZE_AVAILABLE;
import static org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.SIZE_AVAILABLE_DESC;
Expand All @@ -84,6 +88,8 @@
@WritesAttribute(attribute = SIZE, description = SIZE_DESC),
@WritesAttribute(attribute = SIZE_AVAILABLE, description = SIZE_AVAILABLE_DESC),
@WritesAttribute(attribute = TIMESTAMP, description = TIMESTAMP_DESC),
@WritesAttribute(attribute = CREATED_TIME, description = CREATED_TIME_DESC),
@WritesAttribute(attribute = MODIFIED_TIME, description = MODIFIED_TIME_DESC),
@WritesAttribute(attribute = ERROR_CODE, description = ERROR_CODE_DESC),
@WritesAttribute(attribute = ERROR_MESSAGE, description = ERROR_MESSAGE_DESC)
})
Expand Down Expand Up @@ -285,7 +291,7 @@ public void onTrigger(ProcessContext context, ProcessSession session) throws Pro
final long startNanos = System.nanoTime();
try {
final File fileMetadata = fetchFileMetadata(fileId);
final Map<String, String> attributeMap = createAttributeMap(fileMetadata);
final Map<String, String> attributeMap = createGoogleDriveFileInfoBuilder(fileMetadata).build().toStringMap();

flowFile = fetchFile(fileMetadata, session, context, flowFile, attributeMap);

Expand Down Expand Up @@ -401,7 +407,7 @@ private File fetchFileMetadata(final String fileId) throws IOException {
.files()
.get(fileId)
.setSupportsAllDrives(true)
.setFields("id, name, createdTime, mimeType, size, exportLinks")
.setFields("id, name, createdTime, modifiedTime, mimeType, size, exportLinks")
.execute();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,26 +20,49 @@

public class GoogleDriveAttributes {

public static final String ID = "drive.id";
public static final String DRIVE_ATTR_PREFIX = "drive.";

public static final String ID = DRIVE_ATTR_PREFIX + "id";
public static final String ID_DESC = "The id of the file";

public static final String FILENAME = CoreAttributes.FILENAME.key();
public static final String FILENAME_DESC = "The name of the file";

public static final String SIZE = "drive.size";
public static final String SIZE = DRIVE_ATTR_PREFIX + "size";
public static final String SIZE_DESC = "The size of the file. Set to 0 when the file size is not available (e.g. externally stored files).";

public static final String SIZE_AVAILABLE = "drive.size.available";
public static final String SIZE_AVAILABLE = DRIVE_ATTR_PREFIX + "size.available";
public static final String SIZE_AVAILABLE_DESC = "Indicates if the file size is known / available";

public static final String TIMESTAMP = "drive.timestamp";
public static final String TIMESTAMP = DRIVE_ATTR_PREFIX + "timestamp";
public static final String TIMESTAMP_DESC = "The last modified time or created time (whichever is greater) of the file." +
" The reason for this is that the original modified date of a file is preserved when uploaded to Google Drive." +
" 'Created time' takes the time when the upload occurs. However uploaded files can still be modified later.";

public static final String CREATED_TIME = DRIVE_ATTR_PREFIX + "created.time";
public static final String CREATED_TIME_DESC = "The file's creation time";

public static final String MODIFIED_TIME = DRIVE_ATTR_PREFIX + "modified.time";
public static final String MODIFIED_TIME_DESC = "The file's last modification time";

public static final String MIME_TYPE = CoreAttributes.MIME_TYPE.key();
public static final String MIME_TYPE_DESC = "The MIME type of the file";

public static final String PATH = DRIVE_ATTR_PREFIX + "path";
public static final String PATH_DESC = "The path of the file's directory from the base directory";

public static final String OWNER = DRIVE_ATTR_PREFIX + "owner";
public static final String OWNER_DESC = "The owner of the file";

public static final String LAST_MODIFYING_USER = DRIVE_ATTR_PREFIX + "last.modifying.user";
public static final String LAST_MODIFYING_USER_DESC = "The last modifying user of the file";

public static final String WEB_VIEW_LINK = DRIVE_ATTR_PREFIX + "web.view.link";
public static final String WEB_VIEW_LINK_DESC = "Web view link to the file";

public static final String WEB_CONTENT_LINK = DRIVE_ATTR_PREFIX + "web.content.link";
public static final String WEB_CONTENT_LINK_DESC = "Web content link to the file";

public static final String ERROR_MESSAGE = "error.message";
public static final String ERROR_MESSAGE_DESC = "The error message returned by Google Drive";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,41 +16,18 @@
*/
package org.apache.nifi.processors.gcp.drive;

import static org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.FILENAME;
import static org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.ID;
import static org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.MIME_TYPE;
import static org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.SIZE;
import static org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.SIZE_AVAILABLE;
import static org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.TIMESTAMP;

import org.apache.nifi.processor.util.list.ListableEntity;
import org.apache.nifi.serialization.SimpleRecordSchema;
import org.apache.nifi.serialization.record.MapRecord;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.RecordSchema;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;

public class GoogleDriveFileInfo implements ListableEntity {
private static final RecordSchema SCHEMA;

static {
final List<RecordField> recordFields = new ArrayList<>();

recordFields.add(new RecordField(ID, RecordFieldType.STRING.getDataType(), false));
recordFields.add(new RecordField(FILENAME, RecordFieldType.STRING.getDataType(), false));
recordFields.add(new RecordField(SIZE, RecordFieldType.LONG.getDataType(), false));
recordFields.add(new RecordField(SIZE_AVAILABLE, RecordFieldType.BOOLEAN.getDataType(), false));
recordFields.add(new RecordField(TIMESTAMP, RecordFieldType.LONG.getDataType(), false));
recordFields.add(new RecordField(MIME_TYPE, RecordFieldType.STRING.getDataType()));

SCHEMA = new SimpleRecordSchema(recordFields);
}

private final String id;
private final String fileName;
Expand All @@ -60,6 +37,14 @@ public class GoogleDriveFileInfo implements ListableEntity {
private final long modifiedTime;
private final String mimeType;

private final String path;
private final String owner;
private final String lastModifyingUser;
private final String webViewLink;
private final String webContentLink;

private final RecordSchema recordSchema;

public String getId() {
return id;
}
Expand All @@ -84,22 +69,49 @@ public String getMimeType() {
return mimeType;
}

public String getPath() {
return path;
}

public String getLastModifyingUser() {
return lastModifyingUser;
}

public String getOwner() {
return owner;
}

public String getWebViewLink() {
return webViewLink;
}

public String getWebContentLink() {
return webContentLink;
}

@Override
public Record toRecord() {
final Map<String, Object> values = new HashMap<>();
return new MapRecord(recordSchema, toObjectMap());
}

values.put(ID, getId());
values.put(FILENAME, getName());
values.put(SIZE, getSize());
values.put(SIZE_AVAILABLE, isSizeAvailable());
values.put(TIMESTAMP, getTimestamp());
values.put(MIME_TYPE, getMimeType());
private Map<String, Object> toObjectMap() {
final Map<String, Object> attributes = new HashMap<>();

for (GoogleDriveFlowFileAttribute attribute : GoogleDriveFlowFileAttribute.values()) {
Optional.ofNullable(attribute.getValue(this))
.ifPresent(value -> attributes.put(attribute.getName(), value));
}

return new MapRecord(SCHEMA, values);
return attributes;
}

public static RecordSchema getRecordSchema() {
return SCHEMA;
public Map<String, String> toStringMap() {
return toObjectMap().entrySet().stream()
.filter(e -> e.getValue() != null)
.collect(Collectors.toMap(
Map.Entry::getKey,
e -> e.getValue().toString()
));
}

public static final class Builder {
Expand All @@ -110,6 +122,12 @@ public static final class Builder {
private long createdTime;
private long modifiedTime;
private String mimeType;
private String path;
private String owner;
private String lastModifyingUser;
private String webViewLink;
private String webContentLink;
private RecordSchema recordSchema;

public Builder id(String id) {
this.id = id;
Expand Down Expand Up @@ -146,6 +164,36 @@ public Builder mimeType(String mimeType) {
return this;
}

public Builder path(String path) {
this.path = path;
return this;
}

public Builder owner(String owner) {
this.owner = owner;
return this;
}

public Builder lastModifyingUser(String lastModifyingUser) {
this.lastModifyingUser = lastModifyingUser;
return this;
}

public Builder webViewLink(String webViewLink) {
this.webViewLink = webViewLink;
return this;
}

public Builder webContentLink(String webContentLink) {
this.webContentLink = webContentLink;
return this;
}

public Builder recordSchema(RecordSchema recordSchema) {
this.recordSchema = recordSchema;
return this;
}

public GoogleDriveFileInfo build() {
return new GoogleDriveFileInfo(this);
}
Expand Down Expand Up @@ -190,6 +238,12 @@ private GoogleDriveFileInfo(final Builder builder) {
this.createdTime = builder.createdTime;
this.modifiedTime = builder.modifiedTime;
this.mimeType = builder.mimeType;
this.path = builder.path;
this.owner = builder.owner;
this.lastModifyingUser = builder.lastModifyingUser;
this.webViewLink = builder.webViewLink;
this.webContentLink = builder.webContentLink;
this.recordSchema = builder.recordSchema;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,35 +16,64 @@
*/
package org.apache.nifi.processors.gcp.drive;

import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordFieldType;

import java.time.Instant;
import java.util.function.Function;

public enum GoogleDriveFlowFileAttribute {
ID(GoogleDriveAttributes.ID, GoogleDriveFileInfo::getId),
FILENAME(GoogleDriveAttributes.FILENAME, GoogleDriveFileInfo::getName),
SIZE(GoogleDriveAttributes.SIZE, fileInfo -> String.valueOf(fileInfo.getSize())),
SIZE_AVAILABLE(GoogleDriveAttributes.SIZE_AVAILABLE, fileInfo -> String.valueOf(fileInfo.isSizeAvailable())),
TIMESTAMP(GoogleDriveAttributes.TIMESTAMP, fileInfo -> String.valueOf(fileInfo.getTimestamp())),
MIME_TYPE(GoogleDriveAttributes.MIME_TYPE, GoogleDriveFileInfo::getMimeType);
ID(GoogleDriveAttributes.ID, GoogleDriveFileInfo::getId, RecordFieldType.STRING, false),
FILENAME(GoogleDriveAttributes.FILENAME, GoogleDriveFileInfo::getName, RecordFieldType.STRING, false),
SIZE(GoogleDriveAttributes.SIZE, GoogleDriveFileInfo::getSize, RecordFieldType.LONG, false),
SIZE_AVAILABLE(GoogleDriveAttributes.SIZE_AVAILABLE, GoogleDriveFileInfo::isSizeAvailable, RecordFieldType.BOOLEAN, false),
TIMESTAMP(GoogleDriveAttributes.TIMESTAMP, GoogleDriveFileInfo::getTimestamp, RecordFieldType.LONG, false),
CREATED_TIME(GoogleDriveAttributes.CREATED_TIME, fileInfo -> Instant.ofEpochMilli(fileInfo.getCreatedTime()).toString(), RecordFieldType.STRING, false),
MODIFIED_TIME(GoogleDriveAttributes.MODIFIED_TIME, fileInfo -> Instant.ofEpochMilli(fileInfo.getModifiedTime()).toString(), RecordFieldType.STRING, false),
MIME_TYPE(GoogleDriveAttributes.MIME_TYPE, GoogleDriveFileInfo::getMimeType, RecordFieldType.STRING, true),
PATH(GoogleDriveAttributes.PATH, GoogleDriveFileInfo::getPath, RecordFieldType.STRING, true),
OWNER(GoogleDriveAttributes.OWNER, GoogleDriveFileInfo::getOwner, RecordFieldType.STRING, true),
LAST_MODIFYING_USER(GoogleDriveAttributes.LAST_MODIFYING_USER, GoogleDriveFileInfo::getLastModifyingUser, RecordFieldType.STRING, true),
WEB_VIEW_LINK(GoogleDriveAttributes.WEB_VIEW_LINK, GoogleDriveFileInfo::getWebViewLink, RecordFieldType.STRING, true),
WEB_CONTENT_LINK(GoogleDriveAttributes.WEB_CONTENT_LINK, GoogleDriveFileInfo::getWebContentLink, RecordFieldType.STRING, true);

private final String name;
private final Function<GoogleDriveFileInfo, String> fromFileInfo;
private final Function<GoogleDriveFileInfo, Object> fromFileInfo;
private final RecordField recordField;

GoogleDriveFlowFileAttribute(String attributeName, Function<GoogleDriveFileInfo, String> fromFileInfo) {
this.name = attributeName;
GoogleDriveFlowFileAttribute(String name, Function<GoogleDriveFileInfo, Object> fromFileInfo, RecordFieldType recordFieldType, boolean recordFieldNullable) {
this.name = name;
this.fromFileInfo = fromFileInfo;
this.recordField = new RecordField(name, recordFieldType.getDataType(), recordFieldNullable);
}

public String getName() {
return name;
}

public String getValue(Record record) {
return record.getAsString(name);
public Object getValue(GoogleDriveFileInfo fileInfo) {
return fromFileInfo.apply(fileInfo);
}

public String getValue(GoogleDriveFileInfo fileInfo) {
return fromFileInfo.apply(fileInfo);
public RecordField getRecordField() {
return recordField;
}

public static GoogleDriveFlowFileAttribute getByName(final String name) {
for (GoogleDriveFlowFileAttribute item : values()) {
if (item.getName().equals(name)) {
return item;
}
}
throw new IllegalArgumentException("GoogleDriveFlowFileAttribute with name [" + name + "] does not exist");
}

public static boolean isValidName(final String name) {
try {
getByName(name);
return true;
} catch (IllegalArgumentException e) {
return false;
}
}
}
Loading