Skip to content

Commit

Permalink
Add JpaCursorItemReader implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
fmbenhassine authored and mminella committed Sep 16, 2020
1 parent d87588c commit 4185400
Show file tree
Hide file tree
Showing 4 changed files with 654 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
/*
* Copyright 2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.batch.item.database;

import java.util.Iterator;
import java.util.Map;

import javax.persistence.EntityManager;
import javax.persistence.EntityManagerFactory;
import javax.persistence.Query;

import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.ItemStreamException;
import org.springframework.batch.item.database.orm.JpaQueryProvider;
import org.springframework.batch.item.support.AbstractItemCountingItemStreamItemReader;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.dao.DataAccessResourceFailureException;
import org.springframework.util.Assert;
import org.springframework.util.ClassUtils;

/**
* {@link org.springframework.batch.item.ItemStreamReader} implementation based
* on JPA {@link Query#getResultStream()}. It executes the JPQL query when
* initialized and iterates over the result set as {@link #read()} method is called,
* returning an object corresponding to the current row. The query can be set
* directly using {@link #setQueryString(String)}, or using a query provider via
* {@link #setQueryProvider(JpaQueryProvider)}.
*
* The implementation is <b>not</b> thread-safe.
*
* @author Mahmoud Ben Hassine
* @param <T> type of items to read
* @since 4.3
*/
public class JpaCursorItemReader<T> extends AbstractItemCountingItemStreamItemReader<T>
implements InitializingBean {

private EntityManagerFactory entityManagerFactory;
private EntityManager entityManager;
private String queryString;
private JpaQueryProvider queryProvider;
private Map<String, Object> parameterValues;
private Iterator<T> iterator;

/**
* Create a new {@link JpaCursorItemReader}.
*/
public JpaCursorItemReader() {
setName(ClassUtils.getShortName(JpaCursorItemReader.class));
}

/**
* Set the JPA entity manager factory.
*
* @param entityManagerFactory JPA entity manager factory
*/
public void setEntityManagerFactory(EntityManagerFactory entityManagerFactory) {
this.entityManagerFactory = entityManagerFactory;
}

/**
* Set the JPA query provider.
*
* @param queryProvider JPA query provider
*/
public void setQueryProvider(JpaQueryProvider queryProvider) {
this.queryProvider = queryProvider;
}

/**
* Set the JPQL query string.
*
* @param queryString JPQL query string
*/
public void setQueryString(String queryString) {
this.queryString = queryString;
}

/**
* Set the parameter values to be used for the query execution.
*
* @param parameterValues the values keyed by parameter names used in
* the query string.
*/
public void setParameterValues(Map<String, Object> parameterValues) {
this.parameterValues = parameterValues;
}

@Override
public void afterPropertiesSet() throws Exception {
Assert.notNull(this.entityManagerFactory, "EntityManagerFactory is required");
if (this.queryProvider == null) {
Assert.hasLength(this.queryString, "Query string is required when queryProvider is null");
}
}

@Override
@SuppressWarnings("unchecked")
protected void doOpen() throws Exception {
this.entityManager = this.entityManagerFactory.createEntityManager();
if (this.entityManager == null) {
throw new DataAccessResourceFailureException("Unable to create an EntityManager");
}
if (this.queryProvider != null) {
this.queryProvider.setEntityManager(this.entityManager);
}
Query query = createQuery();
if (this.parameterValues != null) {
this.parameterValues.forEach(query::setParameter);
}
this.iterator = query.getResultStream().iterator();
}

private Query createQuery() {
if (this.queryProvider == null) {
return this.entityManager.createQuery(this.queryString);
}
else {
return this.queryProvider.createQuery();
}
}

@Override
protected T doRead() {
return this.iterator.hasNext() ? this.iterator.next() : null;
}

@Override
public void update(ExecutionContext executionContext) throws ItemStreamException {
super.update(executionContext);
this.entityManager.clear();
}

@Override
protected void doClose() {
if (this.entityManager != null) {
this.entityManager.close();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,182 @@
/*
* Copyright 2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.batch.item.database.builder;

import java.util.Map;

import javax.persistence.EntityManagerFactory;

import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.ItemStreamSupport;
import org.springframework.batch.item.database.JpaCursorItemReader;
import org.springframework.batch.item.database.orm.JpaQueryProvider;
import org.springframework.batch.item.support.AbstractItemCountingItemStreamItemReader;
import org.springframework.util.Assert;

/**
* Builder for {@link JpaCursorItemReader}.
*
* @author Mahmoud Ben Hassine
*
* @since 4.3
*/
public class JpaCursorItemReaderBuilder<T> {

private EntityManagerFactory entityManagerFactory;
private String queryString;
private JpaQueryProvider queryProvider;
private Map<String, Object> parameterValues;
private boolean saveState = true;
private String name;
private int maxItemCount = Integer.MAX_VALUE;
private int currentItemCount;

/**
* Configure if the state of the {@link ItemStreamSupport}
* should be persisted within the {@link ExecutionContext}
* for restart purposes.
*
* @param saveState defaults to true
* @return The current instance of the builder.
*/
public JpaCursorItemReaderBuilder<T> saveState(boolean saveState) {
this.saveState = saveState;

return this;
}

/**
* The name used to calculate the key within the {@link ExecutionContext}.
* Required if {@link #saveState(boolean)} is set to true.
*
* @param name name of the reader instance
* @return The current instance of the builder.
* @see ItemStreamSupport#setName(String)
*/
public JpaCursorItemReaderBuilder<T> name(String name) {
this.name = name;

return this;
}

/**
* Configure the max number of items to be read.
*
* @param maxItemCount the max items to be read
* @return The current instance of the builder.
* @see AbstractItemCountingItemStreamItemReader#setMaxItemCount(int)
*/
public JpaCursorItemReaderBuilder<T> maxItemCount(int maxItemCount) {
this.maxItemCount = maxItemCount;

return this;
}

/**
* Index for the current item. Used on restarts to indicate where to start from.
*
* @param currentItemCount current index
* @return this instance for method chaining
* @see AbstractItemCountingItemStreamItemReader#setCurrentItemCount(int)
*/
public JpaCursorItemReaderBuilder<T> currentItemCount(int currentItemCount) {
this.currentItemCount = currentItemCount;

return this;
}

/**
* A map of parameter values to be set on the query. The key of the map is
* the name of the parameter to be set with the value being the value to be set.
*
* @param parameterValues map of values
* @return this instance for method chaining
* @see JpaCursorItemReader#setParameterValues(Map)
*/
public JpaCursorItemReaderBuilder<T> parameterValues(Map<String, Object> parameterValues) {
this.parameterValues = parameterValues;

return this;
}

/**
* A query provider. This should be set only if {@link #queryString(String)}
* have not been set.
*
* @param queryProvider the query provider
* @return this instance for method chaining
* @see JpaCursorItemReader#setQueryProvider(JpaQueryProvider)
*/
public JpaCursorItemReaderBuilder<T> queryProvider(JpaQueryProvider queryProvider) {
this.queryProvider = queryProvider;

return this;
}

/**
* The JPQL query string to execute. This should only be set if
* {@link #queryProvider(JpaQueryProvider)} has not been set.
*
* @param queryString the JPQL query
* @return this instance for method chaining
* @see JpaCursorItemReader#setQueryString(String)
*/
public JpaCursorItemReaderBuilder<T> queryString(String queryString) {
this.queryString = queryString;

return this;
}

/**
* The {@link EntityManagerFactory} to be used for executing the configured
* {@link #queryString}.
*
* @param entityManagerFactory {@link EntityManagerFactory} used to create
* {@link javax.persistence.EntityManager}
* @return this instance for method chaining
*/
public JpaCursorItemReaderBuilder<T> entityManagerFactory(EntityManagerFactory entityManagerFactory) {
this.entityManagerFactory = entityManagerFactory;

return this;
}

/**
* Returns a fully constructed {@link JpaCursorItemReader}.
*
* @return a new {@link JpaCursorItemReader}
*/
public JpaCursorItemReader<T> build() {
Assert.notNull(this.entityManagerFactory, "An EntityManagerFactory is required");
if (this.saveState) {
Assert.hasText(this.name, "A name is required when saveState is set to true");
}
if (this.queryProvider == null) {
Assert.hasLength(this.queryString, "Query string is required when queryProvider is null");
}

JpaCursorItemReader<T> reader = new JpaCursorItemReader<>();
reader.setEntityManagerFactory(this.entityManagerFactory);
reader.setQueryProvider(this.queryProvider);
reader.setQueryString(this.queryString);
reader.setParameterValues(this.parameterValues);
reader.setCurrentItemCount(this.currentItemCount);
reader.setMaxItemCount(this.maxItemCount);
reader.setSaveState(this.saveState);
reader.setName(this.name);
return reader;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Copyright 2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.batch.item.database;

import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.sample.Foo;
import org.springframework.orm.jpa.LocalContainerEntityManagerFactoryBean;
import org.springframework.orm.jpa.vendor.HibernateJpaVendorAdapter;

/**
* @author Mahmoud Ben Hassine
*/
public class JpaCursorItemReaderCommonTests extends
AbstractDatabaseItemStreamItemReaderTests {

@Override
protected ItemReader<Foo> getItemReader() throws Exception {
LocalContainerEntityManagerFactoryBean factoryBean =
new LocalContainerEntityManagerFactoryBean();
factoryBean.setDataSource(getDataSource());
factoryBean.setPersistenceUnitName("bar");
factoryBean.setJpaVendorAdapter(new HibernateJpaVendorAdapter());
factoryBean.afterPropertiesSet();

String jpqlQuery = "from Foo";
JpaCursorItemReader<Foo> itemReader = new JpaCursorItemReader<>();
itemReader.setQueryString(jpqlQuery);
itemReader.setEntityManagerFactory(factoryBean.getObject());
itemReader.afterPropertiesSet();
itemReader.setSaveState(true);
return itemReader;
}

@Override
protected void pointToEmptyInput(ItemReader<Foo> tested) throws Exception {
JpaCursorItemReader<Foo> reader = (JpaCursorItemReader<Foo>) tested;
reader.close();
reader.setQueryString("from Foo foo where foo.id = -1");
reader.afterPropertiesSet();
reader.open(new ExecutionContext());
}
}
Loading

0 comments on commit 4185400

Please sign in to comment.