From e83cadc220863d78b2c256717abee88b18d33793 Mon Sep 17 00:00:00 2001
From: Xin Wang <data.xinwang@gmail.com>
Date: Sun, 13 Nov 2016 18:25:27 +0800
Subject: [PATCH] [STORM-2082][SQL] add sql external module storm-sql-hdfs

---
 docs/storm-sql-reference.md                        |  22 ++-
 external/sql/pom.xml                               |   1 +
 .../storm-sql-external/storm-sql-hdfs/pom.xml      | 104 ++++++++++++++
 .../sql/hdfs/HdfsDataSourcesProvider.java          | 135 ++++++++++++++++++
 ...ache.storm.sql.runtime.DataSourcesProvider      |  16 +++
 .../sql/hdfs/TestHdfsDataSourcesProvider.java      | 129 +++++++++++++++++
 .../src/main/assembly/binary.xml                   |   7 +
 7 files changed, 413 insertions(+), 1 deletion(-)
 create mode 100644 external/sql/storm-sql-external/storm-sql-hdfs/pom.xml
 create mode 100644 external/sql/storm-sql-external/storm-sql-hdfs/src/jvm/org/apache/storm/sql/hdfs/HdfsDataSourcesProvider.java
 create mode 100644 external/sql/storm-sql-external/storm-sql-hdfs/src/resources/META-INF/services/org.apache.storm.sql.runtime.DataSourcesProvider
 create mode 100644 external/sql/storm-sql-external/storm-sql-hdfs/src/test/org/apache/storm/sql/hdfs/TestHdfsDataSourcesProvider.java

diff --git a/docs/storm-sql-reference.md b/docs/storm-sql-reference.md
index 1accf558052..c14af1f197b 100644
--- a/docs/storm-sql-reference.md
+++ b/docs/storm-sql-reference.md
@@ -1272,6 +1272,7 @@ Please note that it supports only one letter for delimiter.
 | Kafka | org.apache.storm:storm-sql-kafka | `kafka://zkhost:port/broker_path?topic=topic` | Yes | Yes | Yes
 | Redis | org.apache.storm:storm-sql-redis | `redis://:[password]@host:port/[dbIdx]` | No | Yes | Yes
 | MongoDB | org.apache.storm:storm-sql-mongodb | `mongodb://[username:password@]host1[:port1][,host2[:port2],...[,hostN[:portN]]][/[database][?options]]` | No | Yes | Yes
+| HDFS | org.apache.storm:storm-sql-hdfs | `hdfs://host:port/path-to-file` | No | Yes | Yes
 
 #### Socket
 
@@ -1321,4 +1322,23 @@ You can use below as working reference for `--artifacts` option, and change depe
 
 `org.apache.storm:storm-sql-mongodb:2.0.0-SNAPSHOT,org.apache.storm:storm-mongodb:2.0.0-SNAPSHOT`
 
-Storing record with preserving fields are not supported for now.
\ No newline at end of file
+Storing records while preserving fields is not supported for now.
+
+#### HDFS
+
+The HDFS data source requires the following properties to be set:
+
+* `hdfs.file.path`: HDFS file path
+* `hdfs.file.name`: HDFS file name - please refer to [SimpleFileNameFormat]({{page.git-blob-base}}/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/format/SimpleFileNameFormat.java)
+* `hdfs.rotation.size.kb`: HDFS FileSizeRotationPolicy in KB
+* `hdfs.rotation.time.seconds`: HDFS TimedRotationPolicy in seconds
+
+Please note that only one of `hdfs.rotation.size.kb` and `hdfs.rotation.time.seconds` can be used for HDFS file rotation.
+
+Also note that `storm-sql-hdfs` requires users to provide `storm-hdfs`.
+You can use the line below as a working reference for the `--artifacts` option, changing the dependency versions if needed:
+
+`org.apache.storm:storm-sql-hdfs:2.0.0-SNAPSHOT,org.apache.storm:storm-hdfs:2.0.0-SNAPSHOT`
+
+The HDFS configuration files should also be provided: you can put `core-site.xml` and `hdfs-site.xml` into the `conf` directory under the Storm installation directory.
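+
+As a minimal sketch of how these properties fit together (the table name, column list, `SOURCE_TABLE`, and property values below are illustrative only, not part of the module):
+
+```sql
+CREATE EXTERNAL TABLE HDFS_OUT (ID INT PRIMARY KEY, VAL VARCHAR)
+    LOCATION 'hdfs://localhost:8020'
+    TBLPROPERTIES '{"hdfs.file.path": "/storm", "hdfs.file.name": "data-$TIME.txt", "hdfs.rotation.time.seconds": "120"}'
+
+INSERT INTO HDFS_OUT SELECT ID, VAL FROM SOURCE_TABLE
+```
+
+Here the `LOCATION` URI selects the HDFS filesystem, while the JSON string passed to `TBLPROPERTIES` carries the properties listed above; this example uses time-based rotation, so `hdfs.rotation.size.kb` is omitted.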
diff --git a/external/sql/pom.xml b/external/sql/pom.xml
index 99d0c5038cb..02264f7463e 100644
--- a/external/sql/pom.xml
+++ b/external/sql/pom.xml
@@ -42,5 +42,6 @@
         <module>storm-sql-external/storm-sql-kafka</module>
         <module>storm-sql-external/storm-sql-redis</module>
         <module>storm-sql-external/storm-sql-mongodb</module>
+        <module>storm-sql-external/storm-sql-hdfs</module>
     </modules>
 </project>
diff --git a/external/sql/storm-sql-external/storm-sql-hdfs/pom.xml b/external/sql/storm-sql-external/storm-sql-hdfs/pom.xml
new file mode 100644
index 00000000000..9e0a5998cfa
--- /dev/null
+++ b/external/sql/storm-sql-external/storm-sql-hdfs/pom.xml
@@ -0,0 +1,104 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <artifactId>storm</artifactId>
+        <groupId>org.apache.storm</groupId>
+        <version>2.0.0-SNAPSHOT</version>
+        <relativePath>../../../../pom.xml</relativePath>
+    </parent>
+
+    <artifactId>storm-sql-hdfs</artifactId>
+
+    <developers>
+        <developer>
+            <id>vesense</id>
+            <name>Xin Wang</name>
+            <email>data.xinwang@gmail.com</email>
+        </developer>
+    </developers>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.storm</groupId>
+            <artifactId>storm-core</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>log4j-over-slf4j</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.storm</groupId>
+            <artifactId>storm-sql-runtime</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.storm</groupId>
+            <artifactId>storm-sql-runtime</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+            <type>test-jar</type>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.storm</groupId>
+            <artifactId>storm-hdfs</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-minicluster</artifactId>
+            <version>${hadoop.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-log4j12</artifactId>
+                </exclusion>
+            </exclusions>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-all</artifactId>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <sourceDirectory>src/jvm</sourceDirectory>
+        <testSourceDirectory>src/test</testSourceDirectory>
+        <resources>
+            <resource>
+                <directory>${basedir}/src/resources</directory>
+            </resource>
+        </resources>
+    </build>
+</project>
diff --git a/external/sql/storm-sql-external/storm-sql-hdfs/src/jvm/org/apache/storm/sql/hdfs/HdfsDataSourcesProvider.java b/external/sql/storm-sql-external/storm-sql-hdfs/src/jvm/org/apache/storm/sql/hdfs/HdfsDataSourcesProvider.java
new file mode 100644
index 00000000000..38c3fcb7464
--- /dev/null
+++ b/external/sql/storm-sql-external/storm-sql-hdfs/src/jvm/org/apache/storm/sql/hdfs/HdfsDataSourcesProvider.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.sql.hdfs;
+
+import com.google.common.base.Preconditions;
+import org.apache.storm.hdfs.trident.HdfsState;
+import org.apache.storm.hdfs.trident.HdfsStateFactory;
+import org.apache.storm.hdfs.trident.HdfsUpdater;
+import org.apache.storm.hdfs.trident.format.FileNameFormat;
+import org.apache.storm.hdfs.trident.format.RecordFormat;
+import org.apache.storm.hdfs.trident.format.SimpleFileNameFormat;
+import org.apache.storm.hdfs.trident.rotation.FileRotationPolicy;
+import org.apache.storm.hdfs.trident.rotation.FileSizeRotationPolicy;
+import org.apache.storm.hdfs.trident.rotation.TimedRotationPolicy;
+import org.apache.storm.sql.runtime.DataSource;
+import org.apache.storm.sql.runtime.DataSourcesProvider;
+import org.apache.storm.sql.runtime.FieldInfo;
+import org.apache.storm.sql.runtime.IOutputSerializer;
+import org.apache.storm.sql.runtime.ISqlTridentDataSource;
+import org.apache.storm.sql.runtime.SimpleSqlTridentConsumer;
+import org.apache.storm.sql.runtime.utils.FieldInfoUtils;
+import org.apache.storm.sql.runtime.utils.SerdeUtils;
+import org.apache.storm.trident.spout.ITridentDataSource;
+import org.apache.storm.trident.state.StateFactory;
+import org.apache.storm.trident.state.StateUpdater;
+import org.apache.storm.trident.tuple.TridentTuple;
+
+import java.net.URI;
+import java.util.List;
+import java.util.Properties;
+
+/**
+ * Creates an HDFS sink based on the URI and properties. The URI has the format
+ * hdfs://host:port/path-to-file. The properties are in JSON format and specify,
+ * among other things, the name and path of the HDFS file.
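+ * <p>For example (values are illustrative only): given the URI hdfs://localhost:8020 and the
+ * properties {"hdfs.file.path": "/storm", "hdfs.file.name": "data-$TIME.txt",
+ * "hdfs.rotation.time.seconds": "120"}, the consumer writes serialized tuples to files
+ * under /storm and rotates the current file every 120 seconds.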
+ */
+public class HdfsDataSourcesProvider implements DataSourcesProvider {
+
+  private static class HdfsTridentDataSource implements ISqlTridentDataSource {
+    private final String url;
+    private final Properties props;
+    private final IOutputSerializer serializer;
+
+    private HdfsTridentDataSource(String url, Properties props, IOutputSerializer serializer) {
+      this.url = url;
+      this.props = props;
+      this.serializer = serializer;
+    }
+
+    @Override
+    public ITridentDataSource getProducer() {
+      throw new UnsupportedOperationException(this.getClass().getName() + " doesn't provide Producer");
+    }
+
+    @Override
+    public SqlTridentConsumer getConsumer() {
+      FileNameFormat fileNameFormat = new SimpleFileNameFormat()
+          .withPath(props.getProperty("hdfs.file.path", "/storm"))
+          .withName(props.getProperty("hdfs.file.name", "$TIME.$NUM.txt"));
+
+      RecordFormat recordFormat = new TridentRecordFormat(serializer);
+
+      // Exactly one rotation policy is configured: size-based if given, otherwise time-based.
+      FileRotationPolicy rotationPolicy;
+      String size = props.getProperty("hdfs.rotation.size.kb");
+      String interval = props.getProperty("hdfs.rotation.time.seconds");
+      Preconditions.checkArgument(size != null || interval != null, "Hdfs data source must contain file rotation config");
+
+      if (size != null) {
+        rotationPolicy = new FileSizeRotationPolicy(Float.parseFloat(size), FileSizeRotationPolicy.Units.KB);
+      } else {
+        rotationPolicy = new TimedRotationPolicy(Float.parseFloat(interval), TimedRotationPolicy.TimeUnit.SECONDS);
+      }
+
+      HdfsState.Options options = new HdfsState.HdfsFileOptions()
+          .withFileNameFormat(fileNameFormat)
+          .withRecordFormat(recordFormat)
+          .withRotationPolicy(rotationPolicy)
+          .withFsUrl(url);
+
+      StateFactory stateFactory = new HdfsStateFactory().withOptions(options);
+      StateUpdater stateUpdater = new HdfsUpdater();
+
+      return new SimpleSqlTridentConsumer(stateFactory, stateUpdater);
+    }
+  }
+
+  private static class TridentRecordFormat implements RecordFormat {
+    private final IOutputSerializer serializer;
+
+    private TridentRecordFormat(IOutputSerializer serializer) {
+      this.serializer = serializer;
+    }
+
+    @Override
+    public byte[] format(TridentTuple tuple) {
+      // TODO: handle '\n' in serialized values; see DelimitedRecordFormat for reference
+      return serializer.write(tuple.getValues(), null).array();
+    }
+
+  }
+
+  @Override
+  public String scheme() {
+    return "hdfs";
+  }
+
+  @Override
+  public DataSource construct(URI uri, String inputFormatClass, String outputFormatClass,
+                              List<FieldInfo> fields) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public ISqlTridentDataSource constructTrident(URI uri, String inputFormatClass, String outputFormatClass,
+                                                Properties properties, List<FieldInfo> fields) {
+    List<String> fieldNames = FieldInfoUtils.getFieldNames(fields);
+    IOutputSerializer serializer = SerdeUtils.getSerializer(outputFormatClass, properties, fieldNames);
+    return new HdfsTridentDataSource(uri.toString(), properties, serializer);
+  }
+
+}
diff --git a/external/sql/storm-sql-external/storm-sql-hdfs/src/resources/META-INF/services/org.apache.storm.sql.runtime.DataSourcesProvider b/external/sql/storm-sql-external/storm-sql-hdfs/src/resources/META-INF/services/org.apache.storm.sql.runtime.DataSourcesProvider
new file mode 100644
index 00000000000..5fac84fc8f2
--- /dev/null
+++ b/external/sql/storm-sql-external/storm-sql-hdfs/src/resources/META-INF/services/org.apache.storm.sql.runtime.DataSourcesProvider
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.storm.sql.hdfs.HdfsDataSourcesProvider
\ No newline at end of file
diff --git a/external/sql/storm-sql-external/storm-sql-hdfs/src/test/org/apache/storm/sql/hdfs/TestHdfsDataSourcesProvider.java b/external/sql/storm-sql-external/storm-sql-hdfs/src/test/org/apache/storm/sql/hdfs/TestHdfsDataSourcesProvider.java
new file mode 100644
index 00000000000..14734386bad
--- /dev/null
+++ b/external/sql/storm-sql-external/storm-sql-hdfs/src/test/org/apache/storm/sql/hdfs/TestHdfsDataSourcesProvider.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.sql.hdfs;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.storm.hdfs.trident.HdfsState;
+import org.apache.storm.hdfs.trident.HdfsStateFactory;
+import org.apache.storm.hdfs.trident.HdfsUpdater;
+import org.apache.storm.sql.runtime.DataSourcesRegistry;
+import org.apache.storm.sql.runtime.FieldInfo;
+import org.apache.storm.sql.runtime.ISqlTridentDataSource;
+import org.apache.storm.trident.state.StateUpdater;
+import org.apache.storm.trident.tuple.TridentTuple;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.internal.util.reflection.Whitebox;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+
+import static org.apache.storm.hdfs.trident.HdfsState.HdfsFileOptions;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+
+public class TestHdfsDataSourcesProvider {
+  private static final List<FieldInfo> FIELDS = ImmutableList.of(
+      new FieldInfo("ID", int.class, true),
+      new FieldInfo("val", String.class, false));
+  private static final Properties TBL_PROPERTIES = new Properties();
+
+  private static String hdfsURI;
+  private static MiniDFSCluster hdfsCluster;
+
+  static {
+    TBL_PROPERTIES.put("hdfs.file.path", "/unittest");
+    TBL_PROPERTIES.put("hdfs.file.name", "test1.txt");
+    TBL_PROPERTIES.put("hdfs.rotation.time.seconds", "120");
+  }
+
+  @Before
+  public void setup() throws Exception {
+    Configuration conf = new Configuration();
+    conf.set("fs.trash.interval", "10");
+    conf.setBoolean("dfs.permissions", true);
+    File baseDir = new File("./target/hdfs/").getAbsoluteFile();
+    FileUtil.fullyDelete(baseDir);
+    conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, baseDir.getAbsolutePath());
+
+    MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(conf);
+    hdfsCluster = builder.build();
+    hdfsURI = "hdfs://localhost:" + hdfsCluster.getNameNodePort() + "/";
+  }
+
+  @After
+  public void shutDown() throws IOException {
+    hdfsCluster.shutdown();
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testHdfsSink() {
+    ISqlTridentDataSource ds = DataSourcesRegistry.constructTridentDataSource(
+        URI.create(hdfsURI), null, null, TBL_PROPERTIES, FIELDS);
+    Assert.assertNotNull(ds);
+
+    ISqlTridentDataSource.SqlTridentConsumer consumer = ds.getConsumer();
+
+    Assert.assertEquals(HdfsStateFactory.class, consumer.getStateFactory().getClass());
+    Assert.assertEquals(HdfsUpdater.class, consumer.getStateUpdater().getClass());
+
+    HdfsState state = (HdfsState) consumer.getStateFactory().makeState(Collections.emptyMap(), null, 0, 1);
+    StateUpdater stateUpdater = consumer.getStateUpdater();
+
+    // Replace the real file options with a mock so the write path can be verified without I/O.
+    HdfsFileOptions options = mock(HdfsFileOptions.class);
+    Whitebox.setInternalState(state, "options", options);
+
+    List<TridentTuple> tupleList = mockTupleList();
+
+    for (TridentTuple t : tupleList) {
+      stateUpdater.updateState(state, Collections.singletonList(t), null);
+      try {
+        verify(options).execute(Collections.singletonList(t));
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+    }
+  }
+
+  private static List<TridentTuple> mockTupleList() {
+    List<TridentTuple> tupleList = new ArrayList<>();
+    TridentTuple t0 = mock(TridentTuple.class);
+    TridentTuple t1 = mock(TridentTuple.class);
+    doReturn(1).when(t0).get(0);
+    doReturn(2).when(t1).get(0);
+    doReturn(Lists.newArrayList(1, "2")).when(t0).getValues();
+    doReturn(Lists.newArrayList(2, "3")).when(t1).getValues();
+    tupleList.add(t0);
+    tupleList.add(t1);
+    return tupleList;
+  }
+
+}
diff --git a/storm-dist/binary/final-package/src/main/assembly/binary.xml b/storm-dist/binary/final-package/src/main/assembly/binary.xml
index b099b1c6654..b58752efcb7 100644
--- a/storm-dist/binary/final-package/src/main/assembly/binary.xml
+++ b/storm-dist/binary/final-package/src/main/assembly/binary.xml
@@ -325,6 +325,13 @@
                 <include>storm*jar</include>
             </includes>
         </fileSet>
+        <fileSet>
+            <directory>${project.basedir}/../../../external/sql/storm-sql-external/storm-sql-hdfs/target</directory>
+            <outputDirectory>external/sql/storm-sql-external/storm-sql-hdfs</outputDirectory>
+            <includes>
+                <include>storm*jar</include>
+            </includes>
+        </fileSet>
         <fileSet>
             <directory>${project.basedir}/../../../external/sql/storm-sql-runtime/target/app-assembler/repo</directory>
             <outputDirectory>external/sql/storm-sql-runtime</outputDirectory>