Skip to content

Commit

Permalink
[Fix] [Connectors-v2-file-ftp] ftp source|sink add connection mode pa…
Browse files Browse the repository at this point in the history
…rameter test class (apache#6077)
  • Loading branch information
xumingbei committed Dec 26, 2023
1 parent 77633bf commit 39c0ff0
Show file tree
Hide file tree
Showing 5 changed files with 639 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.e2e.connector.file.ftp;

import org.apache.seatunnel.e2e.common.TestResource;
import org.apache.seatunnel.e2e.common.TestSuiteBase;
import org.apache.seatunnel.e2e.common.container.EngineType;
import org.apache.seatunnel.e2e.common.container.TestContainer;
import org.apache.seatunnel.e2e.common.container.TestHelper;
import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
import org.apache.seatunnel.e2e.common.util.ContainerUtil;

import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.TestTemplate;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.lifecycle.Startables;

import com.google.common.collect.Lists;
import lombok.extern.slf4j.Slf4j;

import java.io.IOException;
import java.util.stream.Stream;

@DisabledOnContainer(
value = {},
type = {EngineType.SPARK},
disabledReason =
"1.The apache-compress version is not compatible with apache-poi. 2.Spark Engine is not compatible with commons-net")
@Slf4j
public class FtpWithPassiveModeToFileIT extends TestSuiteBase implements TestResource {

private static final String FTP_IMAGE = "fauria/vsftpd:latest";

private static final String ftp_CONTAINER_HOST = "ftp";

private static final int FTP_PORT = 21;

private static final String USERNAME = "seatunnel";

private static final String PASSWORD = "pass";

private GenericContainer<?> ftpContainer;

@BeforeAll
@Override
public void startUp() throws Exception {
ftpContainer =
new GenericContainer<>(FTP_IMAGE)
.withExposedPorts(FTP_PORT)
.withNetwork(NETWORK)
.withExposedPorts(FTP_PORT)
.withNetworkAliases(ftp_CONTAINER_HOST)
.withEnv("FILE_OPEN_MODE", "0666")
.withEnv("WRITE_ENABLE", "YES")
.withEnv("ALLOW_WRITEABLE_CHROOT", "YES")
.withEnv("ANONYMOUS_ENABLE", "YES")
.withEnv("LOCAL_ENABLE", "YES")
.withEnv("LOCAL_UMASK", "000")
.withEnv("FTP_USER", USERNAME)
.withEnv("FTP_PASS", PASSWORD)
.withEnv("PASV_ADDRESS", "0.0.0.0")
.withEnv("PASV_ENABLE", "YES")
.withEnv("PASV_MIN_PORT", "21110")
.withEnv("PASV_MAX_PORT", "21115")
.withLogConsumer(new Slf4jLogConsumer(log))
.withPrivilegedMode(true);

ftpContainer.setPortBindings(
Lists.newArrayList(
"21:21",
"21110:21110",
"21111:21111",
"21112:21112",
"21113:21113",
"21114:21114",
"21115:21115"));
ftpContainer.start();
Startables.deepStart(Stream.of(ftpContainer)).join();
log.info("ftp container started");

ContainerUtil.copyFileIntoContainers(
"/json/e2e.json",
"/home/vsftpd/seatunnel/tmp/seatunnel/read/json/name=tyrantlucifer/hobby=coding/e2e.json",
ftpContainer);

ContainerUtil.copyFileIntoContainers(
"/text/e2e.txt",
"/home/vsftpd/seatunnel/tmp/seatunnel/read/text/name=tyrantlucifer/hobby=coding/e2e.txt",
ftpContainer);

ContainerUtil.copyFileIntoContainers(
"/excel/e2e.xlsx",
"/home/vsftpd/seatunnel/tmp/seatunnel/read/excel/name=tyrantlucifer/hobby=coding/e2e.xlsx",
ftpContainer);

ContainerUtil.copyFileIntoContainers(
"/excel/e2e.xlsx",
"/home/vsftpd/seatunnel/tmp/seatunnel/read/excel_filter/name=tyrantlucifer/hobby=coding/e2e_filter.xlsx",
ftpContainer);

ftpContainer.execInContainer("sh", "-c", "chmod -R 777 /home/vsftpd/seatunnel/");
ftpContainer.execInContainer("sh", "-c", "chown -R ftp:ftp /home/vsftpd/seatunnel/");
}

@TestTemplate
public void testFtpFileReadAndWrite(TestContainer container)
throws IOException, InterruptedException {
TestHelper helper = new TestHelper(container);
// test write ftp excel file
helper.execute("/excel/fake_source_to_ftp_passive_excel.conf");
// test read ftp excel file
helper.execute("/excel/ftp_passive_excel_to_assert.conf");
// test read ftp excel file with projection
helper.execute("/excel/ftp_passive_excel_projection_to_assert.conf");
// test read ftp excel file with filter
helper.execute("/excel/ftp_passive_filter_excel_to_assert.conf");
// // test write ftp text file
// helper.execute("/text/fake_to_ftp_file_text.conf");
// // test read skip header
// helper.execute("/text/ftp_file_text_skip_headers.conf");
// // test read ftp text file
// helper.execute("/text/ftp_file_text_to_assert.conf");
// // test read ftp text file with projection
// helper.execute("/text/ftp_file_text_projection_to_assert.conf");
// // test write ftp json file
// helper.execute("/json/fake_to_ftp_file_json.conf");
// // test read ftp json file
// helper.execute("/json/ftp_file_json_to_assert.conf");
// // test write ftp parquet file
// helper.execute("/parquet/fake_to_ftp_file_parquet.conf");
// // test write ftp orc file
// helper.execute("/orc/fake_to_ftp_file_orc.conf");
}

@AfterAll
@Override
public void tearDown() {
if (ftpContainer != null) {
ftpContainer.close();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

env {
parallelism = 1
job.mode = "BATCH"

# You can set spark configuration here
spark.app.name = "SeaTunnel"
spark.executor.instances = 1
spark.executor.cores = 1
spark.executor.memory = "1g"
spark.master = local
}

source {
FakeSource {
result_table_name = "ftp"
schema = {
fields {
c_map = "map<string, string>"
c_array = "array<int>"
c_string = string
c_boolean = boolean
c_tinyint = tinyint
c_smallint = smallint
c_int = int
c_bigint = bigint
c_float = float
c_double = double
c_bytes = bytes
c_date = date
c_decimal = "decimal(38, 18)"
c_timestamp = timestamp
c_row = {
c_map = "map<string, string>"
c_array = "array<int>"
c_string = string
c_boolean = boolean
c_tinyint = tinyint
c_smallint = smallint
c_int = int
c_bigint = bigint
c_float = float
c_double = double
c_bytes = bytes
c_date = date
c_decimal = "decimal(38, 18)"
c_timestamp = timestamp
}
}
}
}
}

sink {
FtpFile {
host = "ftp"
port = 21
user = seatunnel
password = pass
path = "/tmp/seatunnel/excel"
source_table_name = "ftp"
partition_dir_expression = "${k0}=${v0}"
is_partition_field_write_in_file = true
file_name_expression = "${transactionId}_${now}"
file_format_type = "excel"
filename_time_format = "yyyy.MM.dd"
is_enable_transaction = true
active_mode_status = false
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

env {
parallelism = 1
job.mode = "BATCH"

# You can set spark configuration here
spark.app.name = "SeaTunnel"
spark.executor.instances = 1
spark.executor.cores = 1
spark.executor.memory = "1g"
spark.master = local
}

source {
FtpFile {
host = "ftp"
port = 21
user = seatunnel
password = pass
path = "/tmp/seatunnel/read/excel"
result_table_name = "ftp"
file_format_type = excel
field_delimiter = ;
read_columns = [c_string, c_boolean]
skip_header_row_number = 1
active_mode_status = false
schema = {
fields {
c_map = "map<string, string>"
c_array = "array<int>"
c_string = string
c_boolean = boolean
c_tinyint = tinyint
c_smallint = smallint
c_int = int
c_bigint = bigint
c_float = float
c_double = double
c_bytes = bytes
c_date = date
c_decimal = "decimal(38, 18)"
c_timestamp = timestamp
c_row = {
c_map = "map<string, string>"
c_array = "array<int>"
c_string = string
c_boolean = boolean
c_tinyint = tinyint
c_smallint = smallint
c_int = int
c_bigint = bigint
c_float = float
c_double = double
c_bytes = bytes
c_date = date
c_decimal = "decimal(38, 18)"
c_timestamp = timestamp
}
}
}
}
}


sink {
Assert {
source_table_name = "ftp"
rules {
row_rules = [
{
rule_type = MAX_ROW
rule_value = 5
}
],
field_rules = [
{
field_name = c_string
field_type = string
field_value = [
{
rule_type = NOT_NULL
}
]
},
{
field_name = c_boolean
field_type = boolean
field_value = [
{
rule_type = NOT_NULL
}
]
}
]
}
}
}

0 comments on commit 39c0ff0

Please sign in to comment.