Improve debug logging #1150

Merged · 1 commit · Apr 25, 2023

DataSourceAccessor.scala

@@ -6,6 +6,7 @@ import com.linkedin.feathr.offline.source.dataloader.DataLoaderHandler
 import com.linkedin.feathr.offline.source.{DataSource, SourceFormatType}
 import com.linkedin.feathr.offline.util.PartitionLimiter
 import com.linkedin.feathr.offline.util.datetime.DateTimeInterval
+import org.apache.logging.log4j.LogManager
 import org.apache.spark.sql.{DataFrame, SparkSession}
 
 import java.io.File
@@ -27,6 +28,7 @@ private[offline] abstract class DataSourceAccessor(val source: DataSource) {
 
 private[offline] object DataSourceAccessor {
 
+  private val log = LogManager.getLogger(getClass)
   /**
    * create time series/composite source that contains multiple day/hour data
    *
@@ -49,7 +51,8 @@ private[offline] object DataSourceAccessor {
       addTimestampColumn: Boolean = false,
       isStreaming: Boolean = false,
       dataPathHandlers: List[DataPathHandler]): DataSourceAccessor = { //TODO: Add tests
-
+    val info = s"DataSourceAccessor handling ${source}, with interval ${dateIntervalOpt.getOrElse("None")}"
+    log.info(info)
     val dataAccessorHandlers: List[DataAccessorHandler] = dataPathHandlers.map(_.dataAccessorHandler)
     val dataLoaderHandlers: List[DataLoaderHandler] = dataPathHandlers.map(_.dataLoaderHandler)
 
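
For readers skimming the diff: the new logging uses the stock Log4j 2 pattern of one logger per companion object. A minimal, self-contained sketch of that pattern (the object and method names here are hypothetical; only the logger field and the message format match the diff):

```scala
import org.apache.logging.log4j.LogManager

// Hypothetical stand-in for the real DataSourceAccessor companion object.
object DataSourceAccessorSketch {
  // getClass inside a Scala object resolves to the object's own class,
  // so each file gets a distinctly named logger.
  private val log = LogManager.getLogger(getClass)

  def announce(source: String, dateIntervalOpt: Option[String]): Unit = {
    // Mirrors the added lines: record the source and the (optional) time
    // interval before any accessor is constructed.
    val info = s"DataSourceAccessor handling ${source}, with interval ${dateIntervalOpt.getOrElse("None")}"
    log.info(info)
  }
}
```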

PathPartitionedTimeSeriesSourceAccessor.scala

@@ -150,6 +150,7 @@ private[offline] object PathPartitionedTimeSeriesSourceAccessor {
       val df = fileLoaderFactory.create(path).loadDataFrame()
       (df, interval)
     })
+    log.info(s"Reading datasets for interval ${timeInterval} from paths: ${pathList.mkString(", ")}")
 
     if (dataframes.isEmpty) {
       val errMsg = s"Input data is empty for creating TimeSeriesSource. No available " +
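
The added line yields one INFO message per time-series source, tying the requested interval to the concrete partition paths that were read. A sketch of the message shape, with hypothetical interval and paths (the real code uses a DateTimeInterval and HDFS paths):

```scala
import org.apache.logging.log4j.LogManager

object ReadLogSketch {
  private val log = LogManager.getLogger(getClass)

  def main(args: Array[String]): Unit = {
    // Hypothetical values; the real pathList comes from the partition scanner.
    val timeInterval = "[2023-04-01, 2023-04-03)"
    val pathList = Seq("/data/daily/2023/04/01", "/data/daily/2023/04/02")
    // Same format as the added line in the diff.
    log.info(s"Reading datasets for interval ${timeInterval} from paths: ${pathList.mkString(", ")}")
  }
}
```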

FeathrUtils.scala

@@ -198,19 +198,20 @@ private[feathr] object FeathrUtils {
     if (isDebugMode(ss)) {
       val basePath = FeathrUtils.getFeathrJobParam(ss.sparkContext.getConf, FeathrUtils.DEBUG_OUTPUT_PATH)
       val debugFeatureNames = FeathrUtils.getFeathrJobParam(ss.sparkContext.getConf, FeathrUtils.DEBUG_FEATURE_NAMES)
-        .split(FeathrUtils.STRING_PARAMETER_DELIMITER).toSet
+        .split(FeathrUtils.STRING_PARAMETER_DELIMITER).filter(_.nonEmpty).toSet
       val outputNumParts = FeathrUtils.getFeathrJobParam(ss.sparkContext.getConf, FeathrUtils.DEBUG_OUTPUT_PART_NUM)
-      val featureNames = "features_" + features.mkString("_") + "_"
-      if (debugFeatureNames.isEmpty || features.intersect(debugFeatureNames).nonEmpty) {
-        val savePath = SimplePath(basePath + "/" + featureNames + pathSuffix)
-        log.info(s"${tag}, Start dumping data ${featureNames} to ${savePath}")
+      val featureNames = "_for_features_" + features.mkString("_") + "_"
+      if (features.nonEmpty && (debugFeatureNames.isEmpty || features.intersect(debugFeatureNames).nonEmpty)) {
+        val savePath = SimplePath(basePath + "/" + tag.replaceAll("\\W", "_") +
+          featureNames.slice(0, 20) + pathSuffix)
+        log.info(s"${tag}, Start dumping data ${features.mkString(",")} to ${savePath}")
         if (!df.isEmpty) {
           SparkIOUtils.writeDataFrame(df, savePath, Map(FeathrUtils.DEBUG_OUTPUT_PART_NUM -> outputNumParts), List())
         }
-        log.info(s"{tag}. Finish dumping data ${featureNames} to ${savePath}")
+        log.info(s"{tag}. Finish dumping data ${features.mkString(",")} to ${savePath}")
       } else {
         log.info(s"{tag}. Skipping dumping data as feature names to debug are ${debugFeatureNames}, " +
-          s"and current dataframe has feature ${featureNames}")
+          s"and current dataframe has feature ${features.mkString(",")}")
       }
     }
   }
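
The reworked guards in dumpDebugInfo are easier to see in isolation. Below is a self-contained sketch of just the decision and path-building logic from this hunk (all inputs hypothetical; the real method also takes a SparkSession and DataFrame and writes via SparkIOUtils):

```scala
object DumpDebugInfoSketch {
  /** Returns Some(savePath) when a debug dump should be written, else None. */
  def debugSavePath(basePath: String,
                    tag: String,
                    features: Set[String],
                    debugFeatureNames: Set[String],
                    pathSuffix: String): Option[String] = {
    // In the real code debugFeatureNames comes from
    // split(...).filter(_.nonEmpty).toSet; the new filter matters because
    // "".split(delim) yields Array(""), and a set containing only "" would
    // keep the isEmpty short-circuit below from ever firing.
    val featureNames = "_for_features_" + features.mkString("_") + "_"
    if (features.nonEmpty && (debugFeatureNames.isEmpty || features.intersect(debugFeatureNames).nonEmpty)) {
      // replaceAll("\\W", "_") maps every non-word character in the tag
      // (spaces, slashes, ...) to '_'; slice(0, 20) caps the feature-name
      // segment so the resulting path stays short and filesystem-safe.
      Some(basePath + "/" + tag.replaceAll("\\W", "_") + featureNames.slice(0, 20) + pathSuffix)
    } else None
  }
}
```

For example, debugSavePath("/debug", "observation data", Set("f1", "f2"), Set.empty, "swa") returns Some("/debug/observation_data_for_features_f1_f2_swa").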

SlidingWindowJoin.scala

@@ -154,7 +154,7 @@ object SlidingWindowJoin {
       val pathBaseSuffix = "features_" + featureNames.mkString("_")
       val leftJoinColumns = Seq(JOIN_KEY_COL_NAME)
       val leftKeyDf = labelDFPartitioned.select(leftJoinColumns.head, leftJoinColumns.tail: _*)
-      FeathrUtils.dumpDebugInfo(spark, leftKeyDf, featureNames, "observation data", pathBaseSuffix + "for SWA before join" )
+      FeathrUtils.dumpDebugInfo(spark, leftKeyDf, featureNames, "observation data", pathBaseSuffix + "for_SWA_before_join" )
       FeathrUtils.dumpDebugInfo(spark, factTransformedDf, featureNames, "SWA feature data", pathBaseSuffix + "_swa_feature")
     }
   }
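
The only change here is the suffix literal: the dump path previously contained spaces. A quick check of the resulting suffixes under hypothetical feature names:

```scala
val featureNames = Seq("f1", "f2")
val pathBaseSuffix = "features_" + featureNames.mkString("_")  // "features_f1_f2"
// Before: pathBaseSuffix + "for SWA before join" -> "features_f1_f2for SWA before join"
// After:  pathBaseSuffix + "for_SWA_before_join" -> "features_f1_f2for_SWA_before_join"
```

Note that the FeathrUtils change above sanitizes only the tag, not the pathSuffix, so the suffix itself still needs to be space-free.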

gradle.properties (1 addition, 1 deletion)

@@ -1,3 +1,3 @@
-version=1.0.2-rc6
+version=1.0.2-rc7
 SONATYPE_AUTOMATIC_RELEASE=true
 POM_ARTIFACT_ID=feathr_2.12