Skip to content

Commit

Permalink
Acxiom#345 Added a simple step that handles reading the first line of…
Browse files Browse the repository at this point in the history
… a file and creating a simple Schema.
  • Loading branch information
dafreels committed Mar 20, 2023
1 parent 068cb18 commit 5705db3
Show file tree
Hide file tree
Showing 2 changed files with 65 additions and 20 deletions.
@@ -1,12 +1,13 @@
package com.acxiom.metalus.steps

import com.acxiom.metalus.PipelineContext
import com.acxiom.metalus.annotations.{StepFunction, StepObject, StepParameter, StepParameters, StepResults}
import com.acxiom.metalus.{PipelineContext, PipelineStepResponse}
import com.acxiom.metalus.annotations._
import com.acxiom.metalus.connectors.FileConnector
import com.acxiom.metalus.fs.{FileManager, FileResource}
import com.acxiom.metalus.sql.{Attribute, AttributeType, Schema}
import org.slf4j.LoggerFactory

import java.io.{InputStream, OutputStream}
import java.io.{BufferedReader, InputStream, InputStreamReader, OutputStream}
import java.util.Date

@StepObject
Expand Down Expand Up @@ -189,7 +190,7 @@ object FileManagerSteps {
"Get an InputStream",
"Gets an InputStream using the provided FileManager",
"Pipeline",
"Connectors")
"FileManager")
@StepParameters(Map(
"fileManager" -> StepParameter(None, Some(true), None, None,
None, None, Some("The FileManager to use to get the InputStream")),
Expand All @@ -207,7 +208,7 @@ object FileManagerSteps {
"Get an OutputStream",
"Gets an OutputStream using the provided FileManager",
"Pipeline",
"Connectors")
"FileManager")
@StepParameters(Map(
"fileManager" -> StepParameter(None, Some(true), None, None,
None, None, Some("The FileManager to use to get the OutputStream")),
Expand All @@ -231,7 +232,7 @@ object FileManagerSteps {
"Rename a File",
"Renames a file using the provided FileManager",
"Pipeline",
"Connectors")
"FileManager")
@StepParameters(Map(
"fileManager" -> StepParameter(None, Some(true), None, None,
None, None, Some("The FileManager to use for the rename operation")),
Expand All @@ -247,9 +248,8 @@ object FileManagerSteps {
"Get File Size",
"Gets the size of a file",
"Pipeline",
"Connectors")
@StepParameters(Map(
"fileManager" -> StepParameter(None, Some(true), None, None,
"FileManager")
@StepParameters(Map("fileManager" -> StepParameter(None, Some(true), None, None,
None, None, Some("The FileManager to use for the size operation")),
"path" -> StepParameter(None, Some(true), None, None,
None, None, Some("The path of the file"))
Expand All @@ -260,7 +260,7 @@ object FileManagerSteps {
"Does File Exist",
"Checks whether a file exists",
"Pipeline",
"Connectors")
"FileManager")
@StepParameters(Map(
"fileManager" -> StepParameter(None, Some(true), None, None,
None, None, Some("The FileManager to use for the size operation")),
Expand All @@ -273,7 +273,7 @@ object FileManagerSteps {
"Get a File Listing",
"Gets a file listing using the provided FileManager",
"Pipeline",
"Connectors")
"FileManager")
@StepParameters(Map(
"fileManager" -> StepParameter(None, Some(true), None, None,
None, None, Some("The FileManager to use for the rename operation")),
Expand All @@ -289,14 +289,32 @@ object FileManagerSteps {
"Get a Directory Listing",
"Gets a directory listing using the provided FileManager",
"Pipeline",
"Connectors")
"FileManager")
@StepParameters(Map(
  "fileManager" -> StepParameter(None, Some(true), None, None,
    None, None, Some("The FileManager to use for the directory listing")),
  "path" -> StepParameter(None, Some(true), None, None,
    None, None, Some("The path to get the directory listing for"))
))
// Step wrapper that hands the path to FileManager.getDirectoryListing and returns its result unchanged.
// The parameter descriptions above previously described the rename step (copy/paste leftover).
def getDirectoryListing(fileManager: FileManager, path: String): List[FileResource] = fileManager.getDirectoryListing(path)

/**
  * Reads the first line of the provided file and converts it into a Schema. Each separated value becomes an
  * upper-cased attribute of type "String".
  *
  * An empty file (no first line) produces a Schema without attributes instead of failing with a
  * NullPointerException.
  *
  * @param file      The file resource to read. Its input stream is always closed before this step returns.
  * @param separator The column separator. Only the first character is used; a missing or empty value
  *                  falls back to a comma.
  * @return A PipelineStepResponse whose primary return is the Schema built from the header line.
  */
@StepFunction("100b2c7d-c1fb-5fe2-b9d1-dd9fff103272",
  "Read header from a file",
  "This step will load the first line of a file and parse it into a Schema",
  "Pipeline",
  "FileManager")
@StepParameters(Map("file" -> StepParameter(None, Some(true), None, None, description = Some("The file resource to read")),
  "separator" -> StepParameter(None, Some(false), None, None, description = Some("The column separator. Defaults to ,"))))
@StepResults(primaryType = "com.acxiom.metalus.sql.Schema", secondaryTypes = None)
def readHeader(file: FileResource, separator: Option[String]): PipelineStepResponse = {
  val input = new BufferedReader(new InputStreamReader(file.getInputStream()))
  // readLine returns null at end of stream, so wrap it in an Option; close the reader even if the read fails.
  val head = try {
    Option(input.readLine())
  } finally {
    input.close()
  }
  // Only the first character of the separator is honoured; an empty string falls back to the default comma.
  val delimiter = separator.flatMap(_.headOption).getOrElse(',')
  val columns = head.map(_.split(delimiter).toList).getOrElse(List.empty[String])
    .map(col => Attribute(col.toUpperCase, AttributeType("String"), None, None))
  PipelineStepResponse(Some(Schema(columns)))
}
}

/**
  * Outcome of a copy operation.
  *
  * NOTE(review): the field meanings below are inferred from their names and types; the code that
  * populates this class is not visible here, so confirm against the copy implementation.
  *
  * @param success    whether the copy succeeded
  * @param fileSize   size associated with the copied file (presumably in bytes)
  * @param durationMS how long the copy took (presumably in milliseconds)
  * @param startTime  when the copy started
  * @param endTime    when the copy finished
  */
case class CopyResults(success: Boolean,
                       fileSize: Long,
                       durationMS: Long,
                       startTime: Date,
                       endTime: Date)
Expand Up @@ -3,6 +3,7 @@ package com.acxiom.metalus.steps
import com.acxiom.metalus._
import com.acxiom.metalus.connectors.{LocalFileConnector, SFTPFileConnector}
import com.acxiom.metalus.context.ContextManager
import com.acxiom.metalus.sql.Schema
import org.scalatest.BeforeAndAfterAll
import org.scalatest.funspec.AnyFunSpec
import org.slf4j.event.Level
Expand Down Expand Up @@ -40,7 +41,7 @@ class FileManagerStepsTests extends AnyFunSpec with BeforeAndAfterAll {

describe("FileManagerSteps - Basic") {
val localFileConnector = LocalFileConnector("my-connector", None, None)
it("Should get input and output streams") {
it("should get input and output streams") {
val local = localFileConnector.getFileManager(pipelineContext)
val temp = File.createTempFile("fm-out", ".txt")
temp.deleteOnExit()
Expand All @@ -54,7 +55,7 @@ class FileManagerStepsTests extends AnyFunSpec with BeforeAndAfterAll {
assert(Source.fromInputStream(in).mkString == "chicken")
}

it("Should check if a file exists") {
it("should check if a file exists") {
val local = localFileConnector.getFileManager(pipelineContext)
val tempDir = Files.createTempDirectory("test").toFile
tempDir.deleteOnExit()
Expand All @@ -69,7 +70,7 @@ class FileManagerStepsTests extends AnyFunSpec with BeforeAndAfterAll {
assert(FileManagerSteps.exists(local, temp.getAbsolutePath))
}

it("Should rename a file") {
it("should rename a file") {
val local = localFileConnector.getFileManager(pipelineContext)
val temp = File.createTempFile("bad-name", ".txt")
temp.deleteOnExit()
Expand All @@ -84,7 +85,7 @@ class FileManagerStepsTests extends AnyFunSpec with BeforeAndAfterAll {
assert(FileManagerSteps.exists(local, temp1.getAbsolutePath))
}

it("Should get a file size") {
it("should get a file size") {
val local = localFileConnector.getFileManager(pipelineContext)
val tempDir = Files.createTempDirectory("test").toFile
tempDir.deleteOnExit()
Expand All @@ -100,7 +101,7 @@ class FileManagerStepsTests extends AnyFunSpec with BeforeAndAfterAll {
assert(FileManagerSteps.getSize(local, temp.getAbsolutePath) == size)
}

it("Should delete a file") {
it("should delete a file") {
val local = localFileConnector.getFileManager(pipelineContext)
val tempDir = Files.createTempDirectory("test").toFile
tempDir.deleteOnExit()
Expand All @@ -117,7 +118,7 @@ class FileManagerStepsTests extends AnyFunSpec with BeforeAndAfterAll {
assert(!FileManagerSteps.exists(local, temp.getAbsolutePath))
}

it("Should get a file listing") {
it("should get a file listing") {
val local = localFileConnector.getFileManager(pipelineContext)
val tempDir = Files.createTempDirectory("test").toFile
tempDir.deleteOnExit()
Expand All @@ -134,7 +135,7 @@ class FileManagerStepsTests extends AnyFunSpec with BeforeAndAfterAll {
assert(listing.head.fileName == "file.txt")
}

it("Should get a directory listing") {
it("should get a directory listing") {
val local = localFileConnector.getFileManager(pipelineContext)
val tempDir = Files.createTempDirectory("dlisting").toFile
tempDir.deleteOnExit()
Expand All @@ -153,9 +154,34 @@ class FileManagerStepsTests extends AnyFunSpec with BeforeAndAfterAll {
assert(listing.size == 1)
assert(listing.head.fileName == "sub")
}

it ("should read a header row") {
  val fileManager = FileManagerSteps.getFileManager(localFileConnector, pipelineContext)
  // Work inside a throwaway directory that is removed when the JVM exits.
  val workDir = Files.createTempDirectory("test").toFile
  workDir.deleteOnExit()
  val headerFile = new File(workDir.getAbsolutePath, "header.txt")
  headerFile.deleteOnExit()
  val resource = fileManager.getFileResource(headerFile.getAbsolutePath)
  // Write a pipe-delimited file: one header row followed by two data rows.
  val stream = resource.getOutputStream()
  val contents =
    """COL1|COL2|COL3|COL4
      |1|TEST|Y|TN
      |2|PROD|N|AR
      |""".stripMargin
  stream.write(contents.getBytes)
  stream.flush()
  stream.close()
  // Parse the header using the pipe separator and verify the resulting schema.
  val response = FileManagerSteps.readHeader(resource, Some("|"))
  assert(response.primaryReturn.isDefined)
  val schema = response.primaryReturn.get.asInstanceOf[Schema]
  assert(schema.attributes.size == Constants.FOUR)
  List("COL1", "COL2", "COL3", "COL4").zipWithIndex.foreach { case (expectedName, index) =>
    assert(schema.attributes(index).name == expectedName)
  }
}
}

describe("FileManagerSteps - Copy") {
describe("FileManagerSteps - SFTP") {
it("Should fail when strict host checking is enabled against localhost") {
val sftpConnector = SFTPFileConnector("localhost", "sftp-connector", None,
Some(UserNameCredential(Map("username" -> "tester", "password" -> "testing"))),
Expand Down Expand Up @@ -192,6 +218,7 @@ class FileManagerStepsTests extends AnyFunSpec with BeforeAndAfterAll {
val copiedHdfsFile = local.getFileListing(temp.getParent).find(_.fileName == "COPIED_DATA.csv")
assert(copiedHdfsFile.isDefined)
assert(originalSftpFile.get.size == copiedHdfsFile.get.size)
assert(FileManagerSteps.compareFileSizes(sftp, "/MOCK_DATA.csv", local, s"${temp.getParent}/COPIED_DATA.csv") == 0)

// Copy from HDFS to SFTP
FileManagerSteps.copy(local, temp.getAbsolutePath, sftp, "/HDFS_COPIED_DATA.csv")
Expand Down

0 comments on commit 5705db3

Please sign in to comment.