NIFI-3724 - Add Put/Fetch Parquet Processors #1712
…hParquet
- Creating nifi-records-utils to share utility code from record services
- Refactoring Parquet tests to use MockRecordParser and MockRecordWriter
- Refactoring AbstractPutHDFSRecord to use schema access strategy
- Adding custom validate to AbstractPutHDFSRecord and adding handling of UNION types when writing Records as Avro
- Refactoring project structure to get CS API references out of nifi-commons, introducing nifi-extension-utils under nifi-nar-bundles
- Updating abstract put/fetch processors to obtain the WriteResult and update flow file attributes
    // properties
    public static final PropertyDescriptor HADOOP_CONFIGURATION_RESOURCES = new PropertyDescriptor.Builder()
            .name("Hadoop Configuration Resources")
            .description("A file or comma separated list of files which contains the Hadoop file system configuration. Without this, Hadoop "
                + "will search the classpath for a 'core-site.xml' and 'hdfs-site.xml' file or will revert to a default configuration.")
            .required(false)
-           .addValidator(createMultipleFilesExistValidator())
+           .addValidator(HadoopValidators.MULTIPLE_FILE_EXISTS_VALIDATOR)
Minor comment -- until I read the source code for this, my interpretation was that this validator ensured that multiple files existed, i.e. one file provided would fail. Perhaps we can rename this to ONE_OR_MORE_FILES_EXIST_VALIDATOR? Not a giant issue, but potentially confusing.
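To make the naming question concrete, here is a stdlib-only sketch of the check the validator performs (the helper name oneOrMoreFilesExist is hypothetical, not the NiFi API): every entry in a comma-separated list must exist, and a single entry is perfectly valid, which is why the ONE_OR_MORE name reads less ambiguously.

```java
import java.io.File;
import java.io.IOException;

public class Main {
    // Hypothetical helper mirroring the validator's logic: split the property
    // value on commas and require each entry to point to an existing file.
    static boolean oneOrMoreFilesExist(String input) {
        if (input == null || input.trim().isEmpty()) {
            return false;
        }
        for (String entry : input.split(",")) {
            if (!new File(entry.trim()).isFile()) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) throws IOException {
        File f = File.createTempFile("core-site", ".xml");
        f.deleteOnExit();
        // A single existing file passes; a list containing a missing file fails.
        System.out.println(oneOrMoreFilesExist(f.getAbsolutePath()));
        System.out.println(oneOrMoreFilesExist(f.getAbsolutePath() + ",/no/such/file"));
    }
}
```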
    public static final String AVRO_SCHEMA_FORMAT = "avro";
    public static Schema extractAvroSchema(final RecordSchema recordSchema) throws SchemaNotFoundException {
        final Optional<String> schemaFormatOption = recordSchema.getSchemaFormat();
Should we guard against null here?
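The suggested guard could look like the following stdlib-only sketch (the method and message are illustrative, not the actual NiFi code): fail fast with a descriptive exception when the schema argument is null rather than letting a bare NullPointerException surface later.

```java
public class Main {
    // Hypothetical stand-in for the null check being suggested: reject a null
    // schema up front with a clear message.
    static String schemaFormatOf(String recordSchemaFormat) {
        if (recordSchemaFormat == null) {
            throw new IllegalArgumentException("RecordSchema cannot be null");
        }
        return recordSchemaFormat.toLowerCase();
    }

    public static void main(String[] args) {
        System.out.println(schemaFormatOf("AVRO"));
        try {
            schemaFormatOf(null);
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```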
    }

    public static DataType determineDataType(final Schema avroSchema) {
        final Type avroType = avroSchema.getType();
Same comment for the null check.
            "The FlowFile contains 3 Attributes that will be used to lookup a Schema from the configured Schema Registry: 'schema.identifier', 'schema.version', and 'schema.protocol.version'");

    public static final PropertyDescriptor SCHEMA_REGISTRY = new PropertyDescriptor.Builder()
            .name("Schema Registry")
Please use a name value without a space and provide a displayName field with a human-facing value (i.e. "Schema Registry").
            .build();

    public static final PropertyDescriptor SCHEMA_ACCESS_STRATEGY = new PropertyDescriptor.Builder()
            .name("Schema Access Strategy")
Please use a name value without a space and provide a displayName field with a human-facing value (i.e. "Schema Access Strategy").
    public static final PropertyDescriptor SCHEMA_NAME = new PropertyDescriptor.Builder()
            .name("Schema Name")
            .description("Specifies the name of the schema to lookup in the Schema Registry property")
Please use a name value without a space and provide a displayName field with a human-facing value (i.e. "Schema Name").
Just to be clear, the only thing that needs to be truly human-readable is displayName. However, the 'name' doesn't have to look like something only a computer would like :)
I don't mean it has to be difficult to read, but as this is used as the value identifier when the flow is serialized to XML, some formatters/etc. could break lines on the space or otherwise manipulate it. I think it's safer to avoid spaces (and most of the other examples are formatted-like-this).
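The convention being discussed can be illustrated with a minimal stand-in for NiFi's PropertyDescriptor.Builder (this is not the real API, just a sketch of the idea): a stable, space-free name that survives flow serialization, paired with a human-facing displayName for the UI.

```java
public class Main {
    // Hypothetical stand-in for a property descriptor: "name" is the stable
    // identifier written into the serialized flow XML, "displayName" is what
    // the user sees in the UI.
    static final class Descriptor {
        final String name;
        final String displayName;
        Descriptor(String name, String displayName) {
            this.name = name;
            this.displayName = displayName;
        }
    }

    public static void main(String[] args) {
        Descriptor schemaRegistry = new Descriptor("schema-registry", "Schema Registry");
        System.out.println(schemaRegistry.name);
        System.out.println(schemaRegistry.displayName);
    }
}
```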
    recordWriter = createHDFSRecordWriter(context, flowFile, configuration, tempFile, destRecordSchema);

    // if we fail to create the RecordReader then we want to route to failure, so we need to
    // handle this separately from the other IOExceptions which normally rout to retry
rout -> route.
    successFlowFile = session.putAllAttributes(successFlowFile, attributes);

    final URI uri = path.toUri();
    getLogger().info("Successfully received content from {} for {} in {}", new Object[] {uri, successFlowFile, stopWatch.getDuration()});
Add a unit of milliseconds to the log output of the duration.
        }
        Thread.sleep(200L); // try waiting to let whatever might cause rename failure to resolve
    }
    if (!renamed) {
My read of this is that if the rename operation fails 10x, the source file is deleted. Is that captured anywhere in the docs/Javadocs, etc.? Would be a little confusing for a user unless the only context for this method is renaming the temporary file to the persistent one.
The behavior is that it will try up to 10 times to rename the dot file to the final name, and then if it still hasn't been renamed it will delete the dot file and route the flow file to failure. This behavior came from the existing PutHDFS so I kept the same behavior for consistency, but I can add something to the capability description of PutParquet describing this behavior.
I think a method Javadoc comment explaining that is fine.
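The retry-then-delete behavior described above can be sketched in plain Java as follows (illustrative names and stdlib calls only; this is not the actual PutHDFS/PutParquet code): attempt the rename up to 10 times with a short sleep between attempts, and if it never succeeds, delete the dot file and report failure so the flow file can be routed accordingly.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

public class Main {
    // Sketch of the behavior: try renaming the temporary dot file to its final
    // name up to 10 times; on persistent failure, delete the dot file and
    // return false so the caller can route the flow file to failure.
    static boolean renameWithRetries(Path dotFile, Path destFile)
            throws IOException, InterruptedException {
        for (int i = 0; i < 10; i++) {
            try {
                Files.move(dotFile, destFile);
                return true;
            } catch (IOException e) {
                Thread.sleep(200L); // let whatever caused the failure resolve
            }
        }
        Files.deleteIfExists(dotFile); // give up and remove the temp file
        return false;
    }

    public static void main(String[] args) throws Exception {
        Path dir = Files.createTempDirectory("rename-demo");
        Path dot = Files.createFile(dir.resolve(".part.parquet"));
        Path dest = dir.resolve("part.parquet");
        System.out.println(renameWithRetries(dot, dest));
    }
}
```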
I loaded a template provided by Bryan which generated flowfiles, merged them, and wrote them to Parquet format (on local disk using the …). One minor issue:
Thanks Andy, I just pushed a commit that addresses your comments, we should be good to go. I am going to look into the template issue, but I agree that it is not caused by the changes in this PR.
Thanks. Merging.
Hi, does this enable writing the Parquet files to an S3 bucket? If not, is there any way I can achieve the same?
Has anybody been able to use the FetchParquet processor successfully? I am getting a SchemaNotFound exception. I created the file with PutParquet, and Spark can read this Parquet file.
@nellashapiro123 it would probably be best to ask this on the mailing lists. If you send an email, please provide more info about your flow: which reader and writer is FetchParquet using? What schema access strategy is each reader and writer using? And if using schema access by name, what is the value of the schema.name attribute coming into FetchParquet?
This PR adds a new nifi-parquet-bundle with PutParquet and FetchParquet processors. These work similarly to PutHDFS and FetchHDFS, but read and write Records instead.
While working on this I needed to reuse portions of the record reader/writer code, and thus refactored some of the project structure, which caused many files to move around.
Summary of changes:
To test the Parquet processors you can create a core-site.xml with a local file system and read/write Parquet to local directories:
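The exact file used for testing is not shown above; a minimal core-site.xml along these lines should work, assuming the standard Hadoop fs.defaultFS setting pointed at the local filesystem:

```xml
<?xml version="1.0"?>
<configuration>
    <!-- Point Hadoop's default filesystem at local disk so the Parquet
         processors read and write local directories. -->
    <property>
        <name>fs.defaultFS</name>
        <value>file:///</value>
    </property>
</configuration>
```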