change modification_time to TimestampType
Yaohua628 committed Dec 21, 2021
1 parent 65e79ab commit 3b3d635
Showing 4 changed files with 11 additions and 10 deletions.
@@ -37,7 +37,7 @@ object PartitionedFileUtil {
         val size = if (remaining > maxSplitBytes) maxSplitBytes else remaining
         val hosts = getBlockHosts(getBlockLocations(file), offset, size)
         PartitionedFile(partitionValues, filePath.toUri.toString, offset, size, hosts,
-          file.getModificationTime, file.getLen)
+          file.getModificationTime * 1000L, file.getLen)
       }
     } else {
       Seq(getPartitionedFile(file, filePath, partitionValues))
@@ -50,7 +50,7 @@ object PartitionedFileUtil {
       partitionValues: InternalRow): PartitionedFile = {
     val hosts = getBlockHosts(getBlockLocations(file), 0, file.getLen)
     PartitionedFile(partitionValues, filePath.toUri.toString, 0, file.getLen, hosts,
-      file.getModificationTime, file.getLen)
+      file.getModificationTime * 1000L, file.getLen)
   }
 
   private def getBlockLocations(file: FileStatus): Array[BlockLocation] = file match {
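Note on the * 1000L above (illustration, not part of the commit): Hadoop's FileStatus.getModificationTime returns milliseconds since the epoch, while Spark's TimestampType is stored internally as microseconds since the epoch, so the value handed to PartitionedFile has to be scaled. A minimal sketch of the conversion:

    // Illustration only: why the diff multiplies by 1000L.
    // FileStatus.getModificationTime returns milliseconds since the epoch; Spark's
    // TimestampType (and thus the value PartitionedFile now carries) uses microseconds.
    import org.apache.hadoop.fs.FileStatus

    def modificationTimeMicros(file: FileStatus): Long = {
      val millis = file.getModificationTime  // e.g. 1640044800000L (ms)
      millis * 1000L                         // 1640044800000000L (us)
    }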
@@ -29,7 +29,7 @@ import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
 import org.apache.spark.sql.errors.QueryExecutionErrors
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources.Filter
-import org.apache.spark.sql.types.{DataType, LongType, StringType, StructField, StructType}
+import org.apache.spark.sql.types.{DataType, LongType, StringType, StructField, StructType, TimestampType}
 
 
 /**
@@ -188,7 +188,7 @@ object FileFormat {
     .add(StructField(FILE_PATH, StringType))
     .add(StructField(FILE_NAME, StringType))
     .add(StructField(FILE_SIZE, LongType))
-    .add(StructField(FILE_MODIFICATION_TIME, LongType))
+    .add(StructField(FILE_MODIFICATION_TIME, TimestampType))
 
   // create a file metadata struct col
   def createFileMetadataCol: AttributeReference = MetadataAttribute(METADATA_NAME, METADATA_STRUCT)
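With FILE_MODIFICATION_TIME declared as TimestampType, the hidden _metadata.file_modification_time column now surfaces as a real timestamp instead of a long holding epoch milliseconds. A hypothetical usage sketch (the path, format, and SparkSession named spark are assumptions, not taken from the commit):

    // Hypothetical usage sketch: querying the metadata column after this change.
    import org.apache.spark.sql.functions.{col, lit}

    val input = spark.read.parquet("/tmp/data")  // assumed example input
    input
      .select(col("_metadata.file_path"), col("_metadata.file_modification_time"))
      // the column is a timestamp now, so it can be compared against timestamp
      // literals directly instead of against epoch milliseconds
      .where(col("_metadata.file_modification_time") > lit("2021-12-01").cast("timestamp"))
      .show()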
@@ -45,7 +45,7 @@ import org.apache.spark.util.NextIterator
  * @param filePath URI of the file to read
  * @param start the beginning offset (in bytes) of the block.
  * @param length number of bytes to read.
- * @param modificationTime The modification time of the input file, in milliseconds.
+ * @param modificationTime The modification time of the input file, in microseconds.
  * @param fileSize The length of the input file (not the block), in bytes.
  */
 case class PartitionedFile(
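For orientation, a sketch of the case class this doc comment belongs to, abbreviated and with defaults approximated rather than copied from the commit. The field itself stays a plain Long, so the change is documentation only and every caller must now supply microseconds:

    // Abbreviated sketch (field list and defaults are an approximation).
    import org.apache.spark.sql.catalyst.InternalRow

    case class PartitionedFile(
        partitionValues: InternalRow,
        filePath: String,
        start: Long,
        length: Long,
        @transient locations: Array[String] = Array.empty,
        modificationTime: Long = 0L,  // microseconds since the epoch after this commit
        fileSize: Long = 0L)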
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.execution.datasources
 
 import java.io.File
+import java.sql.Timestamp
 import java.text.SimpleDateFormat
 
 import org.apache.spark.sql.{AnalysisException, Column, DataFrame, QueryTest, Row}
@@ -101,13 +102,13 @@ class FileMetadataStructSuite extends QueryTest with SharedSparkSession {
       METADATA_FILE_PATH -> realF0.toURI.toString,
       METADATA_FILE_NAME -> realF0.getName,
       METADATA_FILE_SIZE -> realF0.length(),
-      METADATA_FILE_MODIFICATION_TIME -> realF0.lastModified()
+      METADATA_FILE_MODIFICATION_TIME -> new Timestamp(realF0.lastModified())
     )
     val f1Metadata = Map(
       METADATA_FILE_PATH -> realF1.toURI.toString,
       METADATA_FILE_NAME -> realF1.getName,
       METADATA_FILE_SIZE -> realF1.length(),
-      METADATA_FILE_MODIFICATION_TIME -> realF1.lastModified()
+      METADATA_FILE_MODIFICATION_TIME -> new Timestamp(realF1.lastModified())
     )
 
     f(df, f0Metadata, f1Metadata)
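The expected values switch from raw epoch milliseconds to java.sql.Timestamp because that is the external JVM type Spark hands back for TimestampType columns under the default datetime settings. A small sketch of what a collected row now contains (assumes the default spark.sql.datetime.java8API.enabled=false):

    // Sketch: collecting the metadata column yields java.sql.Timestamp values,
    // which is why the expected maps above wrap lastModified() in new Timestamp(...).
    import java.sql.Timestamp

    val collectedMtimes: Array[Timestamp] = df
      .select("_metadata.file_modification_time")
      .collect()
      .map(_.getAs[Timestamp](0))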
@@ -158,9 +159,9 @@ class FileMetadataStructSuite extends QueryTest with SharedSparkSession {
       df.select(
         // substring of file name
         substring(col(METADATA_FILE_NAME), 1, 3),
-        // convert timestamp in millis to unixtime and to date format
-        from_unixtime(col(METADATA_FILE_MODIFICATION_TIME).divide(lit(1000)), "yyyy-MM")
-          .as("_file_modification_date"),
+        // format timestamp
+        date_format(col(METADATA_FILE_MODIFICATION_TIME), "yyyy-MM")
+          .as("_file_modification_year_month"),
         // convert to kb
         col(METADATA_FILE_SIZE).divide(lit(1024)).as("_file_size_kb"),
         // get the file format
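The test change above doubles as the migration hint for users: expressions that divided the millisecond value and went through from_unixtime can now work on the timestamp directly. A before/after sketch of the two expressions (illustration, not part of the commit):

    // Before: the column held epoch milliseconds, so it needed dividing and converting.
    // After: the column is already a timestamp and can be formatted directly.
    import org.apache.spark.sql.functions.{col, date_format, from_unixtime, lit}

    val oldExpr = from_unixtime(col("_metadata.file_modification_time").divide(lit(1000)), "yyyy-MM")
    val newExpr = date_format(col("_metadata.file_modification_time"), "yyyy-MM")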
