Skip to content

Commit

Permalink
Fix multiple pipeline executors inside pipeline causes error apache#3503
Browse files Browse the repository at this point in the history


- Reinforcement of Transform and Data class creation code with
reflection to better signal errors
- Removed reflection to create RecordsFromStreamData
- Deprecated transform RecordsFromStream
  • Loading branch information
nadment authored and bamaer committed Feb 24, 2024
1 parent 0d778ab commit e035dd2
Show file tree
Hide file tree
Showing 4 changed files with 70 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ The records were passed to this transform using either the Copy rows to result t

You can enter the metadata of the fields you are expecting from the previous pipeline in a workflow.

NOTE: Deprecated use RowsFromResult instead

|
== Supported Engines
[%noheader,cols="2,1a",frame=none, role="table-supported-engines"]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@
package org.apache.hop.pipeline.transform;

import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
Expand Down Expand Up @@ -95,10 +95,9 @@ public ITransform createTransform(
PipelineMeta pipelineMeta,
Pipeline pipeline) {
try {
ParameterizedType parameterizedType =
(ParameterizedType) this.getClass().getGenericSuperclass();
Class<Main> mainClass = (Class<Main>) parameterizedType.getActualTypeArguments()[0];
Class<Data> dataClass = (Class<Data>) parameterizedType.getActualTypeArguments()[1];
Type[] parameterizedTypes = getParameterizedTypes(getClass());
Class<Main> mainClass = (Class<Main>) parameterizedTypes[0];
Class<Data> dataClass = (Class<Data>) parameterizedTypes[1];

// Some tests class use BaseTransformMeta<ITransform,ITransformData>
if (mainClass.isInterface()) return null;
Expand All @@ -115,30 +114,49 @@ public ITransform createTransform(
});
return constructor.newInstance(
new Object[] {transformMeta, this, data, copyNr, pipelineMeta, pipeline});
} catch (InstantiationException
| IllegalAccessException
| IllegalArgumentException
| InvocationTargetException
| NoSuchMethodException e) {
throw new RuntimeException("Error create instance of transform: " + this.getName(), e);
} catch (RuntimeException | ReflectiveOperationException e) {
throw new RuntimeException("Error create instance of transform: " + getClass().getCanonicalName(), e);
}
}

@Override
@SuppressWarnings({"unchecked"})
public ITransformData createTransformData() {
try {
ParameterizedType parameterizedType =
(ParameterizedType) this.getClass().getGenericSuperclass();
@SuppressWarnings({"unchecked"})
Class<Data> dataClass = (Class<Data>) parameterizedType.getActualTypeArguments()[1];

Type[] parameterizedTypes = getParameterizedTypes(getClass());

Class<Data> dataClass = (Class<Data>) parameterizedTypes[1];

// Some tests class use BaseTransformMeta<ITransform,ITransformData>
if (dataClass.isInterface()) return null;

return dataClass.newInstance();
} catch (InstantiationException | IllegalAccessException e) {
throw new RuntimeException("Error create instance of transform data: " + this.getName(), e);
return dataClass.getDeclaredConstructor().newInstance();
} catch (RuntimeException | ReflectiveOperationException e) {
throw new RuntimeException("Error create instance of transform data: " + getClass().getCanonicalName(), e);
}
}

protected Type[] getParameterizedTypes(Class<?> clazz) {
Type[] types = getGenericType(clazz);
if (types.length > 0 && types[0] instanceof ParameterizedType) {
return ((ParameterizedType) types[0]).getActualTypeArguments();
}
return null;
}

protected Type[] getGenericType(Class<?> clazz) {
do {
Type type = clazz.getGenericSuperclass();
if (type != null) {
if (type instanceof ParameterizedType) {
return new Type[] {type};
}
}
// If the class is not parameterized, try parent class
clazz = clazz.getSuperclass();
} while (clazz != null);

return new Type[0];
}

/*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,17 @@
package org.apache.hop.pipeline.transforms.recordsfromstream;

import org.apache.hop.core.annotations.Transform;
import org.apache.hop.pipeline.Pipeline;
import org.apache.hop.pipeline.PipelineMeta;
import org.apache.hop.pipeline.transform.ITransform;
import org.apache.hop.pipeline.transform.ITransformData;
import org.apache.hop.pipeline.transform.TransformMeta;
import org.apache.hop.pipeline.transforms.rowsfromresult.RowsFromResultMeta;

/**
* @Deprecated Use RowsFromResultMeta
*/
@Deprecated
@Transform(
id = "RecordsFromStream",
image = "recordsfromstream.svg",
Expand All @@ -28,4 +37,24 @@
categoryDescription = "i18n:org.apache.hop.pipeline.transform:BaseTransform.Category.Streaming",
keywords = "i18n::RecordsFromStreamMeta.keyword",
documentationUrl = "/pipeline/transforms/getrecordsfromstream.html")
public class RecordsFromStreamMeta extends RowsFromResultMeta {}
public class RecordsFromStreamMeta extends RowsFromResultMeta {

public RecordsFromStreamMeta() {
super();
}

@Override
public ITransform createTransform(
TransformMeta transformMeta,
ITransformData data,
int copyNr,
PipelineMeta pipelineMeta,
Pipeline pipeline) {
return new RecordsFromStream(transformMeta, this, (RecordsFromStreamData) data, copyNr, pipelineMeta, pipeline);
}

@Override
public ITransformData createTransformData() {
return new RecordsFromStreamData();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
import org.apache.hop.pipeline.transform.BaseTransform;
import org.apache.hop.pipeline.transform.TransformMeta;

/** Reads results from a previous pipeline in a Job */
/** Reads results from a previous pipeline in a Workflow */
public class RowsFromResult extends BaseTransform<RowsFromResultMeta, RowsFromResultData> {

private static final Class<?> PKG = RowsFromResult.class; // For Translator
Expand Down

0 comments on commit e035dd2

Please sign in to comment.