Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.apache.texera.web.resource.dashboard.hub

import com.typesafe.scalalogging.Logger
import io.dropwizard.auth.Auth
import org.apache.texera.amber.core.storage.util.LakeFSStorageClient
import org.apache.texera.auth.SessionUser
Expand All @@ -41,6 +42,7 @@ import org.apache.texera.web.resource.dashboard.user.workflow.WorkflowResource.{
}
import org.jooq.Table
import org.jooq.impl.DSL
import org.slf4j.LoggerFactory

import java.util.regex.Pattern
import javax.servlet.http.HttpServletRequest
Expand All @@ -51,6 +53,8 @@ import scala.jdk.CollectionConverters._
import scala.language.existentials

object HubResource {
// Logger for this object, named after its runtime class name; `lazy`, so it is only created on first use.
private lazy val logger: Logger = Logger(LoggerFactory.getLogger(getClass.getName))

/**
  * Represents an entity reference for general-purpose batch APIs.
  *
  * Used by: isLikedHelper, recordLikeAction, getCounts, userAccess.
  *
  * @param entityId   identifier of the referenced entity
  * @param entityType type of the referenced entity
  */
case class UserRequest(entityId: Integer, entityType: EntityType)
Expand Down Expand Up @@ -306,17 +310,28 @@ object HubResource {
.fetch()

records.asScala
.map { record =>
.flatMap { record =>
val dataset = record.into(DATASET).into(classOf[Dataset])
val datasetAccess = record.into(DATASET_USER_ACCESS).into(classOf[DatasetUserAccess])
val ownerEmail = record.into(USER).getEmail
DashboardDataset(
isOwner = if (uid == null) false else dataset.getOwnerUid == uid,
dataset = dataset,
accessPrivilege = datasetAccess.getPrivilege,
ownerEmail = ownerEmail,
size = LakeFSStorageClient.retrieveRepositorySize(dataset.getRepositoryName)
)
try {
Some(
DashboardDataset(
isOwner = if (uid == null) false else dataset.getOwnerUid == uid,
dataset = dataset,
accessPrivilege = datasetAccess.getPrivilege,
ownerEmail = ownerEmail,
size = LakeFSStorageClient.retrieveRepositorySize(dataset.getRepositoryName)
)
)
} catch {
case e: io.lakefs.clients.sdk.ApiException =>
logger.error(
s"LakeFS ApiException for dataset repository '${dataset.getRepositoryName}': ${e.getMessage}",
e
)
None
}
}
.toList
.distinctBy(_.dataset.getDid)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.apache.texera.service.resource

import com.typesafe.scalalogging.LazyLogging
import io.dropwizard.auth.Auth
import jakarta.annotation.security.RolesAllowed
import jakarta.ws.rs._
Expand Down Expand Up @@ -215,7 +216,7 @@ object DatasetResource {

@Produces(Array(MediaType.APPLICATION_JSON, "image/jpeg", "application/pdf"))
@Path("/dataset")
class DatasetResource {
class DatasetResource extends LazyLogging {
private val ERR_USER_HAS_NO_ACCESS_TO_DATASET_MESSAGE = "User has no access to this dataset"
private val ERR_DATASET_VERSION_NOT_FOUND_MESSAGE = "The version of the dataset not found"
private val EXPIRATION_MINUTES = 5
Expand Down Expand Up @@ -1105,28 +1106,32 @@ class DatasetResource {
)
.where(DATASET.IS_PUBLIC.eq(true))
.fetch()
.map(record => {
.asScala
.flatMap { record =>
val dataset = record.into(DATASET).into(classOf[Dataset])
val ownerEmail = record.into(USER).getEmail
DashboardDataset(
isOwner = false,
dataset = dataset,
accessPrivilege = PrivilegeEnum.READ,
ownerEmail = ownerEmail,
size = LakeFSStorageClient.retrieveRepositorySize(dataset.getRepositoryName)
)
})
publicDatasets.forEach { publicDataset =>
try {
Some(
DashboardDataset(
isOwner = false,
dataset = dataset,
accessPrivilege = PrivilegeEnum.READ,
ownerEmail = ownerEmail,
size = LakeFSStorageClient.retrieveRepositorySize(dataset.getRepositoryName)
)
)
} catch {
case e: io.lakefs.clients.sdk.ApiException =>
logger.error(
s"LakeFS ApiException for dataset repository '${dataset.getRepositoryName}': ${e.getMessage}",
e
)
None
}
}
publicDatasets.foreach { publicDataset =>
if (!accessibleDatasets.exists(_.dataset.getDid == publicDataset.dataset.getDid)) {
val dashboardDataset = DashboardDataset(
isOwner = false,
dataset = publicDataset.dataset,
ownerEmail = publicDataset.ownerEmail,
accessPrivilege = PrivilegeEnum.READ,
size =
LakeFSStorageClient.retrieveRepositorySize(publicDataset.dataset.getRepositoryName)
)
accessibleDatasets = accessibleDatasets :+ dashboardDataset
accessibleDatasets = accessibleDatasets :+ publicDataset
}
}
accessibleDatasets.toList
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -370,6 +370,42 @@ class DatasetResourceSpec
datasetDao.fetchOneByDid(dataset.getDid) should not be null
}

// Healthy case: a public dataset row whose LakeFS repository has been initialized
// must show up in the listing.
"listDatasets" should "include a dataset whose LakeFS repo exists" in {
  // Nano-time suffix gives each run its own repository / dataset name.
  val healthyRepo = s"list-ok-${System.nanoTime()}"

  val healthyDataset = new Dataset
  healthyDataset.setRepositoryName(healthyRepo)
  healthyDataset.setName(healthyRepo)
  healthyDataset.setOwnerUid(ownerUser.getUid)
  healthyDataset.setDescription("list endpoint - healthy dataset")
  healthyDataset.setIsDownloadable(true)
  healthyDataset.setIsPublic(true)

  // Persist the DB row first, then create the backing repository.
  datasetDao.insert(healthyDataset)
  LakeFSStorageClient.initRepo(healthyRepo)

  val listedDids = datasetResource.listDatasets(sessionUser).map(_.dataset.getDid)

  listedDids should contain(healthyDataset.getDid)
}

// Orphan case: the DB row survives but its LakeFS repository is gone, so the
// dataset must be absent from the listing.
it should "exclude a dataset whose LakeFS repo has been deleted (orphan DB row)" in {
  // Nano-time suffix gives each run its own repository / dataset name.
  val orphanRepo = s"list-orphan-${System.nanoTime()}"

  val orphanDataset = new Dataset
  orphanDataset.setRepositoryName(orphanRepo)
  orphanDataset.setName(orphanRepo)
  orphanDataset.setOwnerUid(ownerUser.getUid)
  orphanDataset.setDescription("list endpoint - orphan DB row")
  orphanDataset.setIsDownloadable(true)
  orphanDataset.setIsPublic(true)

  datasetDao.insert(orphanDataset)
  LakeFSStorageClient.initRepo(orphanRepo)
  // Reproduce the DB/LakeFS mismatch: remove the repository directly while the
  // DB row stays in place.
  LakeFSStorageClient.deleteRepo(orphanRepo)

  val listedDids = datasetResource.listDatasets(sessionUser).map(_.dataset.getDid)

  listedDids should not contain orphanDataset.getDid
}

"updateDatasetName" should "rename dataset successfully if user has write access" in {
val dataset = new Dataset
dataset.setName("rename-before")
Expand Down
Loading