Improve debug logging #1129

Open · wants to merge 1 commit into base: main
In DataSourceAccessor.scala:

```diff
@@ -6,6 +6,7 @@ import com.linkedin.feathr.offline.source.dataloader.DataLoaderHandler
 import com.linkedin.feathr.offline.source.{DataSource, SourceFormatType}
 import com.linkedin.feathr.offline.util.PartitionLimiter
 import com.linkedin.feathr.offline.util.datetime.DateTimeInterval
+import org.apache.logging.log4j.LogManager
 import org.apache.spark.sql.{DataFrame, SparkSession}
 
 import java.io.File
```
```diff
@@ -27,6 +28,7 @@ private[offline] abstract class DataSourceAccessor(val source: DataSource) {
 
 private[offline] object DataSourceAccessor {
 
+  private val log = LogManager.getLogger(getClass)
   /**
    * create time series/composite source that contains multiple day/hour data
    *
```
```diff
@@ -49,7 +51,8 @@
       addTimestampColumn: Boolean = false,
       isStreaming: Boolean = false,
       dataPathHandlers: List[DataPathHandler]): DataSourceAccessor = { //TODO: Add tests
-
+    val info = s"DataSourceAccessor handling ${source}, with interval ${dateIntervalOpt.getOrElse("None")}"
+    log.info(info)
     val dataAccessorHandlers: List[DataAccessorHandler] = dataPathHandlers.map(_.dataAccessorHandler)
     val dataLoaderHandlers: List[DataLoaderHandler] = dataPathHandlers.map(_.dataLoaderHandler)
 
```
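A design note on the logging added above: the s-interpolated message is built eagerly even when INFO is disabled for this logger. log4j2 also offers a parameterized form that defers formatting until the level check passes. A minimal, self-contained sketch with hypothetical stand-in values (nothing below is part of this PR):

```scala
import org.apache.logging.log4j.LogManager

object LoggingSketch {
  private val log = LogManager.getLogger(getClass)

  def main(args: Array[String]): Unit = {
    // Stand-ins for the patch's `source` and `dateIntervalOpt`.
    val source = "HdfsSource(path=/data/features)"
    val dateIntervalOpt: Option[String] = None

    // The "{}" placeholders are rendered only when INFO is enabled,
    // so no message string is built under a WARN-level configuration.
    log.info("DataSourceAccessor handling {}, with interval {}",
      source, dateIntervalOpt.getOrElse("None"))
  }
}
```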
In PathPartitionedTimeSeriesSourceAccessor.scala:

```diff
@@ -150,6 +150,7 @@ private[offline] object PathPartitionedTimeSeriesSourceAccessor {
       val df = fileLoaderFactory.create(path).loadDataFrame()
       (df, interval)
     })
+    log.info(s"Reading datasets for interval ${timeInterval} from paths: ${pathList.mkString(", ")}")
 
     if (dataframes.isEmpty) {
       val errMsg = s"Input data is empty for creating TimeSeriesSource. No available " +
```
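For context, this sketch shows the shape of the message the new line produces; the paths and interval below are hypothetical stand-ins for the method's pathList and timeInterval:

```scala
object PathLogSketch {
  def main(args: Array[String]): Unit = {
    val timeInterval = "[2023-05-01, 2023-05-03)" // stand-in for DateTimeInterval
    val pathList = Seq("/data/daily/2023/05/01", "/data/daily/2023/05/02")
    // Same interpolation pattern as the added log line:
    println(s"Reading datasets for interval ${timeInterval} from paths: ${pathList.mkString(", ")}")
    // => Reading datasets for interval [2023-05-01, 2023-05-03) from paths: /data/daily/2023/05/01, /data/daily/2023/05/02
  }
}
```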
In FeathrUtils.scala (the `{tag}` interpolations below were missing their `$` and would have logged the literal text "{tag}"; corrected to `${tag}` here):

```diff
@@ -196,19 +196,20 @@ private[feathr] object FeathrUtils {
     if (isDebugMode(ss)) {
       val basePath = FeathrUtils.getFeathrJobParam(ss.sparkContext.getConf, FeathrUtils.DEBUG_OUTPUT_PATH)
       val debugFeatureNames = FeathrUtils.getFeathrJobParam(ss.sparkContext.getConf, FeathrUtils.DEBUG_FEATURE_NAMES)
-        .split(FeathrUtils.STRING_PARAMETER_DELIMITER).toSet
+        .split(FeathrUtils.STRING_PARAMETER_DELIMITER).filter(_.nonEmpty).toSet
       val outputNumParts = FeathrUtils.getFeathrJobParam(ss.sparkContext.getConf, FeathrUtils.DEBUG_OUTPUT_PART_NUM)
-      val featureNames = "features_" + features.mkString("_") + "_"
-      if (debugFeatureNames.isEmpty || features.intersect(debugFeatureNames).nonEmpty) {
-        val savePath = SimplePath(basePath + "/" + featureNames + pathSuffix)
-        log.info(s"${tag}, Start dumping data ${featureNames} to ${savePath}")
+      val featureNames = "_for_features_" + features.mkString("_") + "_"
+      if (features.nonEmpty && (debugFeatureNames.isEmpty || features.intersect(debugFeatureNames).nonEmpty)) {
+        val savePath = SimplePath(basePath + "/" + tag.replaceAll("\\W", "_") +
+          featureNames.slice(0, 20) + pathSuffix)
+        log.info(s"${tag}, Start dumping data ${features.mkString(",")} to ${savePath}")
         if (!df.isEmpty) {
           SparkIOUtils.writeDataFrame(df, savePath, Map(FeathrUtils.DEBUG_OUTPUT_PART_NUM -> outputNumParts), List())
         }
-        log.info(s"${tag}. Finish dumping data ${featureNames} to ${savePath}")
+        log.info(s"${tag}. Finish dumping data ${features.mkString(",")} to ${savePath}")
       } else {
         log.info(s"${tag}. Skipping dumping data as feature names to debug are ${debugFeatureNames}, " +
-          s"and current dataframe has feature ${featureNames}")
+          s"and current dataframe has feature ${features.mkString(",")}")
       }
     }
   }
```
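Why the added .filter(_.nonEmpty) matters: when the DEBUG_FEATURE_NAMES job parameter is unset, splitting the empty string still yields a one-element array containing "", so the resulting set was never empty and the "no feature filter configured, dump everything" branch could never fire. A self-contained sketch of the difference, using "," as a stand-in for FeathrUtils.STRING_PARAMETER_DELIMITER:

```scala
object SplitSketch {
  def main(args: Array[String]): Unit = {
    val unset = "" // DEBUG_FEATURE_NAMES was never configured

    val withoutFilter = unset.split(",").toSet
    println(withoutFilter.isEmpty) // false: Set("") holds one element, the empty string

    val withFilter = unset.split(",").filter(_.nonEmpty).toSet
    println(withFilter.isEmpty)    // true: empty set, as the debugFeatureNames.isEmpty gate expects
  }
}
```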
In SlidingWindowJoin.scala:

```diff
@@ -154,7 +154,7 @@ object SlidingWindowJoin {
       val pathBaseSuffix = "features_" + featureNames.mkString("_")
       val leftJoinColumns = Seq(JOIN_KEY_COL_NAME)
       val leftKeyDf = labelDFPartitioned.select(leftJoinColumns.head, leftJoinColumns.tail: _*)
-      FeathrUtils.dumpDebugInfo(spark, leftKeyDf, featureNames, "observation data", pathBaseSuffix + "for SWA before join")
+      FeathrUtils.dumpDebugInfo(spark, leftKeyDf, featureNames, "observation data", pathBaseSuffix + "for_SWA_before_join")
       FeathrUtils.dumpDebugInfo(spark, factTransformedDf, featureNames, "SWA feature data", pathBaseSuffix + "_swa_feature")
     }
   }
```
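Relatedly, dumpDebugInfo now sanitizes only the tag (via tag.replaceAll("\\W", "_")); the pathSuffix argument is concatenated verbatim, which is why this call site drops the spaces from "for SWA before join". A small sketch with hypothetical feature names; note that the change still leaves no "_" between the base suffix and "for_SWA_before_join":

```scala
object DebugPathSketch {
  def main(args: Array[String]): Unit = {
    // The tag is sanitized inside dumpDebugInfo:
    println("observation data".replaceAll("\\W", "_")) // => observation_data

    // The suffix is used as-is, so callers must keep it path-safe:
    val featureNames = Seq("f1", "f2") // hypothetical feature names
    val pathBaseSuffix = "features_" + featureNames.mkString("_")
    println(pathBaseSuffix + "for_SWA_before_join")
    // => features_f1_f2for_SWA_before_join (fused, since no separator is added)
  }
}
```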