## Database

In [22]:
import pymongo
import boto3
from urllib.parse import quote_plus
from botocore.client import Config
from pymongo.collection import Collection

In [23]:
mongoUser = "mongotestaccess"
mongoPassword = "mongotestsecret"
mongoHost = "localhost"
mongoPort = "27017"
mongoDB = "fairscape"
mongoUserCollection = "users"
mongoIdentifierCollection = "mds"
mongoROCrateCollection = "rocrate"
mongoAsyncCollection = "async"

minioAccessKey = "miniotestadmin"
minioSecretKey = "miniotestsecret"
minioEndpoint = "http://localhost:9000"
minioDefaultBucket = "fairscape"
minioDefaultPath = "fairscape"

In [24]:
connection_string = f"mongodb://{quote_plus(mongoUser)}:{quote_plus(mongoPassword)}@{mongoHost}:{mongoPort}"
mongoClient = pymongo.MongoClient(connection_string)

mongoDB = mongoClient[mongoDB]

userCollection = mongoDB[mongoUserCollection]
identifierCollection = mongoDB[mongoIdentifierCollection]
rocrateCollection = mongoDB[mongoROCrateCollection]
asyncCollection = mongoDB[mongoAsyncCollection]

In [25]:
s3 = boto3.client('s3',
        endpoint_url=minioEndpoint,
        aws_access_key_id=minioAccessKey,
        aws_secret_access_key=minioSecretKey,
        config=Config(signature_version='s3v4'),
        region_name='us-east-1'
    )

try:
    s3.create_bucket(Bucket=minioDefaultBucket)
except:
    pass

In [26]:
# set up support for compression headers
def _add_header(request, **kwargs):
    request.headers.add_header('x-minio-extract', 'true')

event_system = s3.meta.events
event_system.register_first('before-sign.s3.*', _add_header)

## Setting up Mongo Indices

In [40]:
# create plain text search on description name author for identifier collection
identifierCollection.create_index(
    [( "description", "text" )]
)

'description_text'

In [39]:
identifierCollection.create_index("@id", unique=True)

'@id_1'

In [38]:
# clear indices
identifierCollection.drop_indexes()

In [42]:
# text query example
query = { "$text": { "$search": "a time-traveling DeLorean" } }
cursor = identifierCollection.find(query)

list(cursor)

[]

### Clearing Mongo Minio for Tests

# Models

In [28]:
def clearMongo():
    userCollection.delete_many({})
    identifierCollection.delete_many({})
    asyncCollection.delete_many({})
    rocrateCollection.delete_many({})

In [40]:
def clearMinio():
    paginator = s3.get_paginator('list_objects')
    page_iterator = paginator.paginate(Bucket=minioDefaultBucket)
    
    for page in page_iterator:
        try:
            for obj in page['Contents']:
                s3.delete_object(Bucket=minioDefaultBucket, Key=obj["Key"])
        except KeyError:
            pass

In [41]:
clearMongo()

In [42]:
clearMinio()

### User/Auth


In [43]:
from pydantic import (
    BaseModel,
    Field
)
from typing import (
    Optional,
    List,
    Literal
)
import datetime
import jwt

In [44]:
# permissions
class Permissions(BaseModel):
    owner: str
    group: Optional[str] = Field(default=None)

In [45]:
class UserCreateModel(BaseModel):
    email: str
    firstName: str
    lastName: str
    password: str



class UserWriteModel(UserCreateModel):
    metadataType: Literal['Person'] = Field(alias="@type", default="Person")
    session: Optional[str] = Field(default=None)
    groups: Optional[List[str]] = Field(default=[])
    datasets: Optional[List[str]] = Field(default=[])
    software: Optional[List[str]] = Field(default=[])
    computations: Optional[List[str]] = Field(default=[])
    rocrates: Optional[List[str]] = Field(default=[])

    def getPermissions(self)->Permissions:
        permissionsDict = {
            "owner": self.email,
        }
        
        if len(self.groups)>0:
            permissionsDict['group'] = self.groups[0]
        else:
            permissionsDict['group'] = None

            
        return Permissions.model_validate(permissionsDict)



In [46]:
def createUser(userCollection: Collection, userInstance: UserCreateModel)->pymongo.results.InsertOneResult:
    userWriteInstance = UserWriteModel.model_validate({**userInstance.model_dump()})
    
    insertResult = userCollection.insert_one(
        userWriteInstance.model_dump(by_alias=True)
    )

    return insertResult

### Fairscape Request/Response

In [390]:
class FairscapeResponse():
    def __init__(self, success: bool, statusCode: int, model=None, fileResponse=None, error: dict=None):
        self.model = model
        self.success = success
        self.statusCode = statusCode
        self.error = error
        self.fileResponse = fileResponse
        

In [332]:
class FairscapeRequest():
    def __init__(
        self, 
        minioClient, 
        minioBucket, 
        identifierCollection, 
        userCollection, 
        asyncCollection,
        rocrateCollection=None,v
        jwtSecret: str ="",
    ):
        self.minioClient=minioClient
        self.minioBucket=minioBucket
        self.minioDefaultPath="fairscape"
        self.identifierCollection=identifierCollection
        self.userCollection=userCollection
        self.rocrateCollection=rocrateCollection
        self.asyncCollection=asyncCollection
        self.jwtSecret = jwtSecret

    def getMetadata(self, guid: str):
        return self.identifierCollection.find_one({"@id": guid})

    def getUserBySession(self, session: str):
        foundUser = userCollection.find_one({
            "session": session
        })

        if foundUser is None:
            return None
        else:
            return UserWriteModel.model_validate(foundUser)

    
    def loginUser(self, userEmail: str, userPassword: str):
        """ Get a user record, create a session for the 
        """
    
        foundUser = self.userCollection.find_one({
            "email": userEmail,
            "password": userPassword
        })
    
        if foundUser is None:
            return None
    
        # create a token for the user
        userEmail = foundUser['email']
        fullname = ' '.join([foundUser['firstName'], foundUser['lastName']])
        now = datetime.datetime.now(datetime.timezone.utc)
        exp = datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta(hours=24)
    
        nowTimestamp = datetime.datetime.timestamp(now)
        expTimestamp = datetime.datetime.timestamp(exp)
    
        tokenMessage = {
            'iss': 'https://fairscape.net/',
            'sub':  userEmail,
            'name': fullname,
            'email': userEmail,
            'iat': int(nowTimestamp),
            'exp': int(expTimestamp)
        }
    
        compactJWS = jwt.encode(
            tokenMessage, 
            self.jwtSecret, 
            algorithm="HS256"
        )
        
        # set session in userCollection
        updateTokenResult = userCollection.update_one({
            "email": userEmail,
            "password": userPassword
            },
            {
            "$set": {"session": compactJWS}
            }
        )
    
        #TODO check that update is correct
    
        return compactJWS
        

#### User/Auth Tests

In [333]:
testUserInstance = UserCreateModel(
    email="mal8ch@virginia.edu",
    firstName="Max",
    lastName="Levinson",
    password="roaringtwenties"
)

testJWTSecret = "test-secret"

In [334]:
createUserResult = createUser(userCollection, testUserInstance)
createUserResult

InsertOneResult(ObjectId('68187227a9f7d5c166f0c72d'), acknowledged=True)

In [335]:
# login user
loginUserResult = loginUser(userCollection, testJWTSecret, testUserInstance.email, testUserInstance.password)
loginUserResult

'eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJodHRwczovL2ZhaXJzY2FwZS5uZXQvIiwic3ViIjoibWFsOGNoQHZpcmdpbmlhLmVkdSIsIm5hbWUiOiJNYXggTGV2aW5zb24iLCJlbWFpbCI6Im1hbDhjaEB2aXJnaW5pYS5lZHUiLCJpYXQiOjE3NDY0MzI1NTEsImV4cCI6MTc0NjUxODk1MX0.qJk1TDwlgc9FT7jJBXZ6htiR8aK6YFkrcWt0cRd5yHA'

In [336]:
# get user by session
foundUser = getUserBySession(userCollection, loginUserResult)
foundUser

UserWriteModel(email='mal8ch@virginia.edu', firstName='Max', lastName='Levinson', password='roaringtwenties', metadataType='Person', session='eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJodHRwczovL2ZhaXJzY2FwZS5uZXQvIiwic3ViIjoibWFsOGNoQHZpcmdpbmlhLmVkdSIsIm5hbWUiOiJNYXggTGV2aW5zb24iLCJlbWFpbCI6Im1hbDhjaEB2aXJnaW5pYS5lZHUiLCJpYXQiOjE3NDY0MzI1NTEsImV4cCI6MTc0NjUxODk1MX0.qJk1TDwlgc9FT7jJBXZ6htiR8aK6YFkrcWt0cRd5yHA', groups=[], datasets=[], software=[], computations=[], rocrates=[])

## Groups

In [161]:
class Group(BaseModel):
    name: str
    metadataType: Literal['Group'] = Field(default=Literal['Group'])
    members: List[str] = Field(default=[])

class GroupWriteModel(Group):
    owner: str

In [162]:
class FairscapeGroupRequest(FairscapeRequest):

    def createGroup(
        self, 
        user: UserWriteInstance, 
        groupInstance: Group
    ):
        pass

    def getGroup(self):
        pass

    def updateGroup(self):
        pass

In [163]:

def createGroup():
    pass

def getGroup():
    pass

## Dataset

In [140]:
import fastapi
from fairscape_models.dataset import Dataset
from enum import Enum
from typing import Union, Optional

In [141]:
class DatasetCreateModel(Dataset):
    guid: Optional[str] = Field(
        title="guid",
        alias="@id",
        default=None
    )
    metadataType: Optional[str] = Field(alias="@type")
    dateRegistered: Optional[datetime.datetime] = Field(default_factory=datetime.datetime.now)

class DistributionTypeEnum(str, Enum):
    MINIO = 'minio'
    URL = 'url'
    GLOBUS = 'globus'

class MinioDistribution(BaseModel):
    path: str

class URLDistribution(BaseModel):
    uri: str

class DatasetDistribution(BaseModel):
    distributionType: DistributionTypeEnum
    location: Union[MinioDistribution, URLDistribution]


class DatasetWriteModel(DatasetCreateModel):
    distribution: Optional[DatasetDistribution] = Field(default=None)
    published: bool = Field(default=True)
    permissions: Permissions

In [51]:
def setDatasetObjectKey(datasetFilename: str, userInstance: UserWriteModel, basePath: str = None):
    if basePath is None:
        contentName = pathlib.Path(datasetFile).name
        return f"{userInstance.email}/datasets/{contentName}"
    else:
        return f"{basePath}/{userInstance.email}/datasets/{contentName}"

In [52]:
def uploadObjectMinio(
    minioClient,
    minioBucket: str,
    minioKey: str,
    datasetFile: fastapi.UploadFile,
)-> DatasetDistribution:
    """ Upload a 
    """

    uploadResult = minioClient.upload_fileobj(
        Bucket=minioBucket,
        Key=minioKey,
        Fileobj=datasetFile
    )

    # create distribution for metadata
    distribution = DatasetDistribution.model_validate({
        "distributionType": DistributionTypeEnum.Minio,
            "location": {"path": minioKey}
        })

    return distribution

In [53]:
class FairscapeDatasetRequest(FairscapeRequest):
    
    def __init__(self, minioClient, minioBucket, identifierCollection, userCollection):
        super().__init__(minioClient, minioBucket, identifierCollection, userCollection)
         
    def getDatasetMetadata(
        self,
        datasetGUID: str
    ):
        foundMetadata = self.getMetadata(datasetGUID)
        if foundMetadata is None:
            raise Exception

        else:
            return DatasetWriteModel.model_validate({**foundMetadata})

    def getDatasetContent(
        self, 
        userInstance: UserWriteModel, 
        datasetGUID: str,
    ):
        datasetInstance = self.getDatasetMetadata(datasetGUID)

        # check that datasetInstance has minio distribtuion
        if datasetInstance.distribution.distributionType != DistributionTypeEnum.MINIO:
            raise Exception

        # get the distribution location from metadata
        objectKey = datasetInstance.distributionType.location.path

        response = self.minioClient.get_object(
            Bucket=self.minioBucket,
            Key=objectKey
        )
        return response
        
    def createDataset(
        self, 
        userInstance: UserWriteModel,
        inputDataset: Dataset,
        datasetContent: Optional[fastapi.UploadFile]=None
    ):
        # check if guid already exists
        foundMetadata = self.getMetadata(inputDataset.guid)

        if foundMetadata is not None:
            raise Exception('GUID Already assigned')
        
        # if no content is passed
        if datasetContent is None:
            
            # process dataset
            if 'http' in inputDataset.contentUrl:
                distribution = DatasetDistribution.model_validate({
                    "distributionType": "url",
                    "location": {"uri": inputDataset.contentUrl}
                    })
            if inputDataset.contentUrl is None:
               distribution = None 

        # upload dataset content to minio
        else:
            
            # determine object key
            uploadKey = setDatasetObjectKey(
                datasetContent.filename, 
                userInstance, 
                basePath= self.minioDefaultPath
            )
            
            # upload content and return a dataset distribution
            distribution = uploadObjectMinio(self.minioClient, self.minioBucket, uploadKey, datasetContent)

        # set remainder of metadata for storage
        permissionsSet = userInstance.getPermissions()
    
        outputDataset = DatasetWriteModel.model_validate({
            "permissions": permissionsSet, 
            "distribution": distribution, 
            **datasetModelInstance.model_dump(by_alias=True)})
    
        # insert identifier metadata into mongo
        insertResult= identifierCollection.insert_one(
            outputDataset.model_dump(by_alias=True)
        )
    
        
        # add identifier to users'dataset
        updateResult = userCollection.update_one(
            {"email": userInstance.email}, 
            {"$push": {"identifiers": inputDataset.guid}}
        )
        
            

## Dataset Tests

#### Metadata Only

In [27]:
fairscapeDatasetRequest = FairscapeDatasetRequest(
    minioClient=s3,
    minioBucket="default",
    identifierCollection=identifierCollection,
    userCollection=userCollection
)

In [28]:
# create a instance of fairscape_models.Dataset
datasetModelInstance = Dataset.model_validate(
    {
        "@id": "ark:59852/test-guid",
        "name": "test dataset",
        "@type": "https://w3id.org/EVI#Dataset",
        "author": "John Doe",
        "datePublished": "04-08-2025",
        "version": "0.1.0",
        "file": "csv",
        "description": "An example dataset",
        "keywords": [],
        "format": "csv",
        "dataSchema": None,
        "generatedBy": [],
        "derivedFrom": [],
        "usedByComputation": [],
        "contentUrl": "https://example.org/"
    }
)

#### Metadata and Data

In [34]:
# create a dataset
createDatasetResult = fairscapeDatasetRequest.createDataset(
    userInstance=foundUser,
    inputDataset=datasetModelInstance
)

Exception: GUID Already assigned

In [35]:
createDatasetResult

In [36]:
# get a dataset metadata record
getDatasetResult = fairscapeDatasetRequest.getDatasetMetadata(
    datasetGUID=datasetModelInstance.guid
)

In [37]:
getDatasetResult

DatasetWriteModel(guid='ark:59852/test-guid', name='test dataset', metadataType='https://w3id.org/EVI#Dataset', additionalType='Dataset', author='John Doe', datePublished='04-08-2025', version='0.1.0', description='An example dataset', keywords=[], associatedPublication=None, additionalDocumentation=None, fileFormat='csv', dataSchema=None, generatedBy=[], derivedFrom=[], usedByComputation=[], contentUrl='https://example.org/', dateRegistered=datetime.datetime(2025, 4, 24, 4, 53, 8, 914000), distribution=DatasetDistribution(distributionType=<DistributionTypeEnum.URL: 'url'>, location=URLDistribution(uri='https://example.org/')), published=True, permissions=Permissions(owner='mal8ch@virginia.edu', group=None), _id=ObjectId('6809fbf4493117aa0ac6505c'), metadataType='https://w3id.org/EVI#Dataset', file='csv', dataSchema=None)

In [38]:
# get dataset content
getDatasetContentResult = fairscapeDatasetRequest.getDatasetContent(
    datasetGUID=datasetModelInstance.guid,
    userInstance=foundUser
)

Exception: 

## Software

In [None]:
from fairscape_models.software import Software

class SoftwareWriteModel(Software):
    published: bool = Field(default=True)
    dateRegistered: Optional[datetime.datetime] = Field(default_factory=datetime.datetime.now)
    permissions: Permissions

In [None]:
class FairscapeSoftwareRequest(FairscapeRequest):

    def __init__(self, minioClient, minioBucket, identifierCollection, userCollection):
        super().__init__(minioClient, minioBucket, identifierCollection, userCollection)

    def createSoftware(self):
        pass

    def getSoftware(self):
        pass

    def deleteSoftware(self):
        pass

    def updateSoftware(self):
        pass

In [24]:
def createSoftware(
    identifierCollection: pymongo.collection.Collection,
    userCollection: pymongo.collection.Collection,
    softwareInstance: Software,
    userInstance: UserWriteModel
)->bool:

    # check if identifier exists
    foundMetadata = identifierCollection.find_one({"@id": softwareInstance.guid})
    if foundMetadata is not None:
        raise Exception
    
    # create write model
    permissionsInstance = userInstance.getPermissions()

    softwareWriteInstance = SoftwareWriteModel.model_validate(  
        {
        "permissions": permissionsInstance,
        "published": True,
        **softwareInstance.model_dump(by_alias=True)
        }
     )

    # update user collection
    updateUserResult = userCollection.update_one(
        {"email": userInstance.email}, 
        {"$push": {"software": softwareInstance.guid}}
    )

    # update identifier collection
    insertSoftwareResult = identifierCollection.insert_one(
        softwareWriteInstance.model_dump(by_alias=True)
    )

    return True

In [None]:
def getSoftware()->SoftwareWriteModel:
    pass

In [None]:
def updateSoftware():
    pass

## Computation

In [None]:
from fairscape_models.computation import Computation

class ComputationWriteModel(Computation):
    published: bool = Field(default=True)
    dateRegistered: Optional[datetime.datetime] = Field(default_factory=datetime.datetime.now)
    permissions: Permissions

In [None]:
def createComputation():
    pass

In [None]:
def getComputation():
    pass

In [None]:
def updateComputation():
    pass

In [None]:
def deleteComputation():
    pass

## Schema

In [None]:
from fairscape_models.schema import schema

## ROCrate

#### Old Implementation

In [337]:
# old implementation
class ROCrateUploadJob(BaseModel):
    userCN: str
    transactionFolder: str
    zippedCratePath: str
    timeStarted: datetime.datetime | None = Field(default=None)
    timeFinished: datetime.datetime | None = Field(default=None)
    progress: float = Field(default=0)
    stage: Optional[str] = Field(default='started')
    status: Optional[str] = Field(default='in progress')
    completed: Optional[bool] = Field(default=False)
    success: Optional[bool] = Field(default=False)
    processedFiles: List[str] = Field(default=[])
    identifiersMinted: List[str] = Field(default=[])
    error: str | None = Field(default=None)

In [338]:
def ProcessMetadata(roCrateMetadata, userCN: str, zipname: str):
    """ Function for processing metadata
    """ 
    crateArk = parseArk(roCrateMetadata["@id"])

    # Add distribution information if not present
    if 'distribution' not in roCrateMetadata:
        crate_name = Path(zipname).stem  # Get filename without extension
        object_path = f"{transactionFolder}/{crate_name}"

        roCrateMetadata['distribution'] = {
            "archivedROCrateBucket": fairscapeConfig.minio.rocrate_bucket,
            "archivedObjectPath": pathlib.Path(fairscapeConfig.minio.rocrate_bucket_path) / userCN / filePath / zipname
        }
        # set download link to https download link
        roCrateMetadata['contentURL'] = f"{fairscapeConfig.url}/rocrate/download/{rocrateGUID}" 

    roCrateMetadata['uploadDate'] = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")

    if crateArk is None:
        # TODO assign new identifiers
        pass
    else:
        roCrateMetadata["@id"] = crateArk

    # TODO reassign identifiers if there is conflict
    for crateElement in roCrateMetadata["@graph"]:
        elementArk = parseArk(crateElement["@id"])
        if elementArk is None:
            pass
        else:
            crateElement["@id"] = elementArk


In [339]:
def extractMetadata(transactionFolder, zippedMetadataKey):
    try:
        metadataResponse = s3.get_object(
            Bucket=fairscapeConfig.minio.default_bucket,
            Key=str(zippedMetadataKey),
        )

    # handle no such key error
    except s3.exceptions.NoSuchKey as err:
        backgroundTaskLogger.error(
            f"transaction: {str(transactionFolder)}" +
            "\tmessage: failed to read ro-crate-metadata.json from zipped ROCrate" + f"\terror: {str(minioException)}"
        )

        asyncCollection.update_one(
            {"transactionFolder": str(transactionFolder)},
            {"$set": 
                {
                    "completed": True,
                    "success": False,
                    "error": f"failed to read ro-crate-metadata.json from path {str(zippedMetadataKey)}",
                    "status": "Failed"
                }
            }
        )

    try:
        metadataContent = metadataResponse['Body'].read()
        metadataJSON = json.loads(metadataContent)

    # TODO more specific exceptions
    except:
        backgroundTaskLogger.error(
            f"transaction: {str(transactionFolder)}" +
            "\tmessage: failed to parse ro-crate-metadata.json from zipped ROCrate" + f"\terror: {str(minioException)}"
        )
        
        asyncCollection.update_one(
            {"transactionFolder": str(transactionFolder)},
            {"$set": 
                {
                    "completed": True,
                    "success": False,
                    "error": f"failed to parse ro-crate json",
                    "status": "Failed"
                }
            }
        )

        raise Exception

    asyncCollection.update_one(
        {"transactionFolder": str(transactionFolder)},
        {"$set": 
            {
                "stage": "extracting ro crate"
            }
        }
    )

    return metadataJSON

In [340]:
def AsyncRegisterROCrate(userCN: str, transactionFolder: str, filePath: str):
    """
    Background task for processing Zipped ROCrates.
    :param str userCN: Current User's CN uploading the ROCrate
    :param str transactionFolder: UUID folder representing the unique path in minio
    :param str filePath: The filename of the zipped crate contents
    """

    # connect to ldap and get user
    ldapConnection = fairscapeConfig.ldap.connectAdmin()
    currentUserLDAP = getUserByCN(ldapConnection, userCN)
    ldapConnection.unbind()


    # extract metadata from the zipped ROCrate 
    # construct the upload path
    uploadPath = pathlib.PurePosixPath(filePath) 
    
    zippedMetadataKey = uploadPath / uploadPath.stem / 'ro-crate-metadata.json'

    crateMetadata = extractMetadata(
        transactionFolder,
        zippedMetadataKey
    )

    # ------------------------------------
    #         Process Metadata 
    # -----------------------------------
    
    asyncCollection.update_one(
        {"transactionFolder": str(transactionFolder)},
        {"$set": 
            {
                "stage": "processing metadata"
            }
        }
    )

    metadata = ROCrateV1_2.model_validate(crateMetadata)


    #TODO: handle pydantic validation errors
    #except pydantic.ValidationError as err:
    #TODO: update transaction with metadata validation failure
        #validationErrors = err.errors() 
    
    # TODO reassign identifiers if there is conflict

    # TODO add default project for a user

    # TODO if no ROCrate ARK is assigned 
    # if crateMetadata.get("@id") is None:
    #    pass
    
    # clean identifiers
    metadata.cleanIdentifiers()
    crateElem = metadata.getCrateMetadata()
    crateGUID = crateElem.guid

    # Add distribution information if not present
    if crateMetadata.get('distribution') is None:
        zipUploadPath = uploadPath 
        crateMetadata['distribution'] = {
            "archivedROCrateBucket": fairscapeConfig.minio.rocrate_bucket,
            "archivedObjectPath": str(zipUploadPath)
        }

        # TODO set content URL
        # set download link to https download link
        crateMetadata['contentUrl'] = f"{fairscapeConfig.url}/rocrate/download/{crateGUID}" 

    crateMetadata['uploadDate'] = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")


    for crateElement in crateMetadata["@graph"]:
        #if elementArk is None:
        #    pass
        elementArk = parseArk(crateElement["@id"])
        crateElement['@id'] = elementArk
        crateElement['isPartOf'] = {"@id": crateGUID}


    # upload extracted files to datasets 
    # filter out all datasets
    crateDatasets = filter(
        lambda crateElem: crateElem.get("@type") == "EVI:Dataset" and  crateElem.get("contentUrl") is not None,
        crateMetadata['@graph']
        )

    for datasetElem in crateDatasets:

        # file to read from within the zipfile 
        contentURL = datasetElem['contentUrl']
        sourcePath = pathlib.Path(contentURL.lstrip('file:///'))


        # compute relative crate path
        # this will give elements a path in minio preserving the structure of the rocrate
        # files will have relative path similar to
        #  {ro_crate_name} / *** / {filename}
        # starting from the deepest element breaking at the level of the rocrate
        folderNames= str(sourcePath).split('/')
        folderNames.reverse()

        relativePath = pathlib.Path(folderNames[0])

        for nested in folderNames[1::]:
            relativePath = nested / relativePath
            if nested == uploadPath.stem:
                break

        zippedFilePath = uploadPath / relativePath


        #backgroundTaskLogger.info(                
        #    f"transaction: {str(transactionFolder)}" +
        #    f"\tuploadPath: {str(uploadPath)}" +
        #    f"\ttmpFilePath: {str(tmpFilePath)}" +
        #    "\tmessage: uploading dataset" 
        #    )

        # add distribution to metadata to enable download enpoints
        datasetElem['distribution'] = {
            'distributionType': 'minio',
            'location': {
                'path': str(zippedFilePath)
            }
        }

        # TODO check that dataset enpoint works and is accurate
        # set download link relative to fairscape
        datasetElem['contentUrl']=f"{fairscapeConfig.url}/dataset/download/{datasetElem['@id']}"


    # --------------------------- 
    #    PUBLISH METADATA
    # --------------------------

    asyncCollection.update_one(
        {"transactionFolder": transactionFolder},
        {"$set": 
            {
                "stage": "publishing metadata"
            }
        }
    )

    # Check if @id already exsists
    #rocrateFound = rocrateCollection.find_one(
    #        {"@id": crateMetadata['@id']}
    #        )

    #if rocrateFound:
    #    raise ROCrateException(
    #        f"ROCrate with @id == {crateMetadata['@id']} found", None)
    

    # set default permissions for uploaded crate
    crateMetadata['permissions'] = {
            "owner": currentUserLDAP.dn,
            "group": currentUserLDAP.memberOf[0]
            }

    # set default permissions for all datasets
    for crateElem in crateMetadata['@graph']:
        # set permissions on all rocrate identifiers
        crateElem['permissions'] = {
            "owner": currentUserLDAP.dn,
            "group": currentUserLDAP.memberOf[0]
            }

    # for every element in the rocrate model dump json
    insertMetadata = [ elem for elem in crateMetadata.get("@graph", [])]
    # insert rocrate json into identifier collection
    insertMetadata.append(crateMetadata)

    insertedIdentifiers = [ elem.get("@id") for elem in insertMetadata]

    # TODO check for already existing identifiers and mark as conflicts
    #     - should reassign identifiers at metadata processing stage

    # insert all identifiers into the identifier collection
    insertResult = identifierCollection.insert_many(insertMetadata)

    if len(insertResult.inserted_ids) != len(insertMetadata):
        # raise an exception
        backgroundTaskLogger.error(
            f"transaction: {str(transactionFolder)}" +
            "\tmessage: error uploading provenance identifiers" 
        )
        raise Exception(f"Error Minting Provenance Identifiers")

    else:
        # log success
        backgroundTaskLogger.info(
            f"transaction: {str(transactionFolder)}" +
            "\tmessage: minted provenance identifiers" +
            f"\tcrateGUID: {crateGUID}" +
            f"\tnumberOfIdentifiers: {len(insertResult.inserted_ids)}"
        )

    # insert rocrate result 
    insertResult = rocrateCollection.insert_one(crateMetadata)

    if insertResult.inserted_id is None:
        backgroundTaskLogger.error(
            f"transaction: {str(transactionFolder)}" +
            "\tmessage: error uploading rocrate identifier"  +
            f"\tcrateGUID: {crateGUID}"
        )
        raise Exception(f"Error Minting Provenance Identifiers")

    else:
        backgroundTaskLogger.info(
            f"transaction: {str(transactionFolder)}" +
            "\tmessage: minted rocrate identifiers" +
            f"\tcrateGUID: {crateGUID}" 
        )

    # update the job as success 
    # upsert because some jobs can't find their metadata
    asyncCollection.update_one(
        {"transactionFolder": str(transactionFolder)},
        {"$set": 
            {
                "userCN": userCN,
                "transactionFolder": str(transactionFolder),
                "zippedCratePath": filePath,
                "completed": True,
                "success": True,
                "status": "finished",
                "stage": "completed all tasks successfully"
            }
        },
        upsert=True
    )

    # ---------------------
    # Delete Transaction
    # ---------------------
    return True

### Helper Functions For ROCrate Processing

In [376]:
def checkPermissions(permissionsInstance: Permissions, requestingUser: UserWriteModel)-> bool:
    if uploadInstance.permissions.owner == requestingUser.email:
        return True
    elif uploadInstance.permissions.group:
        if uploadInstance.permissions.group in requestingUser.groups:
            return True
    else:
        return False

In [344]:
def getROCrateMetadata(s3Client, bucketName, uploadInstance):
    zippedCratePath = pathlib.Path(uploadInstance.uploadPath)
    zippedCrateMetadataPath = zippedCratePath / zippedCratePath.stem / 'ro-crate-metadata.json'
    
    # get the values for the zipped metadata
    try:
        s3Response = s3Client.get_object(
            Bucket=bucketName,
            Key=str(zippedCrateMetadataPath)
        )

        # get the metadata out of the s3 response
        content= s3Response['Body'].read()
        roCrateJSON = json.loads(content)
        return roCrateJSON
    
    except s3Client.exceptions.NoSuchKey as err:
        # TODO handle error
        print("No Key Found")
        print(err)

In [345]:
def userPath(inputEmail):
    searchResults = re.search("(^[a-zA-Z0-9_.+-]+)@", inputEmail) 

    if searchResults is None:
        raise Exception
    else:
        return searchResults.group(1)

In [346]:
# set remainder of metadata for storage
def writeDatasets(
    identifierCollection, 
    userInstance: UserWriteModel, 
    rocrateInstance, 
    objectList
):
    """ Write ROCrate metadata to identifier collection for all dataset elements.

    Args:
        identifierCollection (pymongo.synchronous.collection.Collection): Collection to insert Identifier metadata
        userInstance (UserWriteModel): User Record for the user inserting the metadata
        rocrateInstance (fairscape_models.rocrate.ROCratev1_2): ROCrate Metadata as a pydantic model
        objectList (List[dict]): Content for the Zipped ROCrate from the s3 object_list_v1 call 

    Returns:
        List[str]: List of All Dataset Identifiers Minted
    """
    
    datasetWriteList = []

    # TODO: set to userInstance for method
    permissionsSet = userInstance.getPermissions()

    for datasetElem in rocrateInstance.getDatasets():

        # TODO: handle when remote content is included in ROCrate
        #if 'http' in datasetElem.contentUrl:
        #    continue
            
        # match the metadata path to content
        datasetCratePath = datasetElem.contentUrl.lstrip('file:///')
        
        # filter function to match content url to key
        matchedElementList = list(
            filter(
            lambda x: datasetCratePath in x.get('Key'),
            objectList
            )
        )

        if len(matchedElementList)>0:
            matchedElement = matchedElementList[0]
        else:
            print(f"ContentNotFound: {datasetElem.guid}\tPath: {datasetElem.contentUrl}")
            continue
    
         # create metadata record to insert 
        objectSize = matchedElement.get('Size')
        objectPath = matchedElement.get('Key').lstrip(minioDefaultBucket + '/')
        
        # create distribution for metadata
        distribution = DatasetDistribution.model_validate({
            "distributionType": 'minio',
            "location": {"path": objectPath}
            })
        
        outputDataset = DatasetWriteModel.model_validate({
            **datasetElem.model_dump(by_alias=True),
            "permissions": permissionsSet, 
            "distribution": distribution,
            "size": objectSize,
            "isPartOf": {
                "@id": crateGUID,
            },
            
        })

        # insert all identifiers for datasets
        insertResult = identifierCollection.insert_one(
            outputDataset.model_dump(by_alias=True, mode='json')
        )

        # TODO: check insertResult for success

        # append guid to dataset list
        datasetWriteList.append(outputDataset.guid)
    
    return datasetWriteList

In [359]:
def writeMetadataElements(
    identifierCollection,
    userInstance,
    rocrateInstance
):
    """ Write ROCrate metadata for all elements excluding datasets

    Args:
        identifierCollection (pymongo.synchronous.collection.Collection): Collection to insert Identifier metadata
        userInstance (UserWriteModel): User Record for the user inserting the metadata
        rocrateInstance (fairscape_models.rocrate.ROCratev1_2): ROCrate Metadata as a pydantic model

    Returns:
        List[str]: List of all ARKs minted
    """
    # mint software and computation and biochem entity
    rocrateMetadataElements = rocrateInstance.getSoftware() + rocrateInstance.getComputations() + rocrateInstance.getSchemas() + \
        rocrateInstance.getBioChemEntities() + rocrateInstance.getMedicalConditions()

    userPermissions = userInstance.getPermissions()
    crateGUID = rocrateInstance.getCrateMetadata().guid
    
    # written identifiers
    guidList = []

    # mint all metadata elements
    for metadataModel in rocrateMetadataElements:

        insertDocument = {
            **metadataModel.model_dump(by_alias=True, mode='json'),
            'permissions': userPermissions.model_dump(mode='json'),
            'isPartOf': {"@id": crateGUID}
        }
        
        insertResult = identifierCollection.insert_one(
           insertDocument
        )

        # TODO check insertResult
        
        guidList.append(metadataModel.guid)
        

    
    return guidList

In [150]:
# helper function to get all contents inside a zipped crate when there are more than 1k objects
def getROCrateContentsMinio(s3Client, bucketName, zipCratePath: str):
    # list entire subdirectory for rocrate upload
    listObjects = s3Client.list_objects_v2(
        Bucket= bucketName,
        Prefix= str(zipCratePath) + '/'
    )

    objectList = listObjects['Contents']

    isTruncated = listObjects.get('IsTruncated')
    nextContinueToken = listObjects.get('NextContinuationToken')

    while isTruncated:

        listObjects = s3Client.list_objects_v2(
            Bucket = bucketName,
            Prefix = str(zippedCratePath) + '/',
            ContinuationToken = nextContinueToken
        )
        nextContinueToken = listObjects.get('NextContinuationToken')
        isTruncated = listObjects.get('IsTruncated')
        objectList= objectList + listObjects['Contents']

    return objectList

### Refactor

In [348]:
import re
import pathlib
import uuid
import json

from fairscape_models.rocrate import ROCrateV1_2, ROCrateMetadataElem
from fairscape_models.dataset import Dataset

In [418]:
class ROCrateUploadRequest(BaseModel):
    guid: str
    permissions: Permissions
    uploadPath: str
    timeStarted: Optional[datetime.datetime] = Field(default_factory=datetime.datetime.now)
    timeFinished: Optional[datetime.datetime] = Field(default=None)
    completed: Optional[bool] = Field(default=False)
    error: Optional[str] = Field(default=None)
    identifiersMinted: Optional[int] = Field(default=None)
    rocrateIdentifier: Optional[str] = Field(default=None)
    

In [350]:
class ROCrateMetadataElemWrite(ROCrateMetadataElem):
    permissions: Permissions
    hasPart: Optional[List[dict]]
    distribution: Optional[DatasetDistribution]

In [79]:
userPath('mal8ch@virginia.edu')

'mal8ch'

In [429]:
class FairscapeROCrateRequest(FairscapeRequest):

    def uploadROCrate(self, userInstance: UserWriteModel, rocrate):
        # TODO replace pathlib with file
        
        # set upload path
        rocrateFilename = rocrateZip.name

        # get email path
        userEmailPath = userPath(foundUser.email)
        
        uploadPath = f"{self.minioDefaultPath}/{userEmailPath}/rocrates/{rocrateFilename}"

        # upload zip to minio
        # TODO switch with fastapi.UploadFile
        with rocrateZip.open('rb') as zippedFileObj:
            uploadOperationResult = self.minioClient.upload_fileobj(
                Bucket = self.minioBucket,
                Key = str(uploadPath),
                Fileobj = zippedFileObj,
                ExtraArgs = {'ContentType': 'application/zip'}
            )

        transactionGUID = uuid.uuid4()
        
        uploadRequestInstance = ROCrateUploadRequest.model_validate({
            "guid": str(transactionGUID),
            "permissions": userInstance.getPermissions(),
            "uploadPath": str(uploadPath)
        })
        # create record in the async collection
        uploadRequestMetadata = uploadRequestInstance.model_dump()
        insertResult = self.asyncCollection.insert_one(uploadRequestMetadata)

        # return a response
        response = FairscapeResponse(success=True, statusCode=200, model=uploadRequestInstance)
        return response
    
    def processROcrate(self, transactionGUID: str):
        # get the current rocrate upload job
        uploadMetadata = asyncCollection.find_one({"guid": transactionGUID})

        if uploadMetadata is None:
            raise Exception

        uploadInstance = ROCrateUploadRequest.model_validate(uploadMetadata)
        
        # find the user from the 
        userMetadata = self.userCollection.find_one(
            {"email": uploadInstance.permissions.owner }
        )

        foundUser = UserWriteModel.model_validate(userMetadata)

        roCrateMetadata = getROCrateMetadata(self.minioClient, self.minioBucket, uploadInstance)
        
        # parse the metadata into the rocrate
        try:
            roCrateModel = ROCrateV1_2.model_validate(roCrateJSON)
        
        except ValidationError as validationErr:
            # TODO return an error
            print("ValidationError")
            return None

        # get rocrate GUID
        crateMetadata = roCrateModel.getCrateMetadata()

        # get list of the objects from minio
        objectList = getROCrateContentsMinio(s3, minioDefaultBucket, zippedCratePath)

        # write dataset records
        datasetGUIDS = writeDatasets(
            self.identifierCollection, 
            foundUser, 
            roCrateModel, 
            objectList
        )

        # write metadata elements
        nonDatasetGUIDS = writeMetadataElements(
            self.identifierCollection,
            foundUser,
            roCrateModel
        )


        # write the metadata elem as an identifier to identifierCollection and ROCrate
        metadataElem = roCrateModel.getCrateMetadata()
        
        roCrateDistribution = DatasetDistribution.model_validate({
            "distributionType": 'minio',
                "location": {"path": uploadInstance.uploadPath}
                })
            
        rocrateMetadataElem = ROCrateMetadataElemWrite.model_validate({
            **metadataElem.model_dump(by_alias=True),
            "permissions": permissionsSet, 
            "distribution": roCrateDistribution,
            "hasPart": [{"@id": elem} for elem in nonDatasetGUIDS + datasetGUIDS],
            })

        rocrateMetadataWrite = rocrateMetadataElem.model_dump(by_alias=True, mode='json')
        
        # dump into identifier collection and rocrate collection
        self.identifierCollection.insert_one(rocrateMetadataWrite)
        self.rocrateCollection.insert_one(rocrateMetadataWrite)
        
        # update process as success
        updateResult = self.asyncCollection.update_one(
            {"guid": uploadInstance.guid},
            {"$set": {
                "completed": True,
                "identifiersMinted": len(datasetGUIDS + nonDatasetGUIDS)+1,
                "rocrateIdentifier": metadataElem.guid,
                "timeFinished": datetime.datetime.now()
            }}
        )

        #TODO: check update result
        return datasetGUIDS + nonDatasetGUIDS

        
    def getUploadMetadata(self, requestingUser: UserWriteModel, transactionGUID: str):
        # get upload metadata
        uploadMetadata = self.asyncCollection.find_one({
            "guid": transactionGUID
        })

        if uploadMetadata is None:
            return FairscapeResponse(
                success=False,
                statusCode=404,
                error={"message": "upload request not found"}
                )
        
        uploadInstance = ROCrateUploadRequest.model_validate(uploadMetadata)

        # check that user has permission to view upload request
        if checkPermissions(uploadInstance.permissions, requestingUser):
            return FairscapeResponse(
                model=uploadInstance,
                success=True,
                statusCode=200
            )
        else:
            return FairscapeResponse(
                success=False,
                statusCode=401,
                error={"message": "user unauthorized to view upload status"}
            )

    def getROCrateMetadata(self, rocrateGUID: str):
        rocrateMetadata = self.rocrateCollection.find_one({
            "@id": rocrateGUID
        })
        
        # if no metadata is found return 404
        if not rocrateMetadata:
            return FairscapeResponse(
                success=False,
                statusCode=404,
                error={"message": "rocrate not found"}
            )
        else:
            rocrateModel = ROCrateMetadataElemWrite.model_validate(rocrateMetadata)
            return FairscapeResponse(
                success=True,
                model=rocrateModel,
                statusCode=200
            )
        
    
    def downloadROCrateArchive(self, requestingUser: UserWriteModel, rocrateGUID: str):

        rocrateMetadata = self.identifierCollection.find_one({
            "@id": rocrateGUID
        })

        # if no metadata is found return 404
        if not rocrateMetadata:
            return FairscapeResponse(
                success=False,
                statusCode=404,
                error={"message": "rocrate not found"}
            )

        # TODO handle metadata failures
        rocrateInstance = ROCrateMetadataElemWrite.model_validate(rocrateMetadata)


        if not checkPermissions(rocrateInstance.permissions, requestingUser):
            return FairscapeResponse(
                success=False,
                statusCode=401,
                error={"message": "user unauthorized to download rocrate archive"}
            )

        # get the object from s3
        # TODO handle key missing error
        objectResponse = self.minioClient.get_object(
            Bucket=self.minioBucket,
            Key=rocrateInstance.distribution.location.path
        )

        # create a FairscapeResponse with a fileResponse item
        return FairscapeResponse(
            success=True,
            statusCode=200,
            fileResponse=objectResponse.get('Body')
        )

        

### ROCrate Tests

#### ROCrate Upload

In [430]:
fairscapeCrateUpload = FairscapeROCrateRequest(
    minioClient=s3,
    minioBucket=minioDefaultBucket,
    identifierCollection=identifierCollection,
    userCollection=userCollection,
    asyncCollection=asyncCollection,
    rocrateCollection=rocrateCollection
)

In [407]:
# choose ro-crate upload
rocrateZip = pathlib.Path("paclitaxel/paclitaxel.zip")

upload = fairscapeCrateUpload.uploadROCrate(
    userInstance=foundUser,
    rocrate=rocrateZip
)


In [408]:
upload.model

ROCrateUploadRequest(guid='48feebf3-ca08-4612-9b7a-d78318f5cc8c', permissions=Permissions(owner='mal8ch@virginia.edu', group=None), uploadPath='fairscape/mal8ch/rocrates/paclitaxel.zip', timeStarted=datetime.datetime(2025, 5, 6, 6, 16, 27, 944532), timeFinished=None, completed=False, error=None, identifiersMinted=[])

#### Test Processing

In [409]:
identifierCollection.delete_many({})
rocrateCollection.delete_many({})

DeleteResult({'n': 1, 'ok': 1.0}, acknowledged=True)

In [410]:
minted_identifiers = fairscapeCrateUpload.processROcrate(upload.model.guid)

In [412]:
upload.model.guid

'48feebf3-ca08-4612-9b7a-d78318f5cc8c'

In [421]:
# get the updated status
getUploadStatusResponse = fairscapeCrateUpload.getUploadMetadata(
    foundUser,
    upload.model.guid
)

uploadResult = getUploadStatusResponse.model

assert getUploadStatusResponse.success
assert not getUploadStatusResponse.error
assert isinstance(getUploadStatusResponse.model, ROCrateUploadRequest)

AttributeError: 'ROCrateUploadRequest' object has no attribute 'getDatasets'

In [422]:
assert uploadResult.completed
uploadResult.timeFinished - uploadResult.timeStarted

datetime.timedelta(seconds=75, microseconds=443000)

In [433]:
# get the ROCrate metadata
getMetadataReponse = fairscapeCrateUpload.getROCrateMetadata(uploadResult.rocrateIdentifier)


assert getMetadataReponse.success
assert isinstance(getMetadataReponse.model, ROCrateMetadataElemWrite)

In [435]:
getMetadataReponse.model

ROCrateMetadataElemWrite(guid='ark:59852/rocrate-paclitaxel-if-data-release', metadataType=['Dataset', 'https://w3id.org/EVI#ROCrate'], name='Paclitaxel IF Images', description='This data set displays the spatial localization of 464 proteins of interest in cells of the breast cancer cell line MDA-MB-468 treated with paclitaxel as imaged by immunofluorescence-based staining (ICC-IF) and confocal microscopy in the Lundberg Lab at Stanford University, as part of the Cell Maps for Artificial Intelligence (CM4AI; CM4AI.org) project. Nuclei were stained with DAPI (blue channel); endoplasmic reticulum with a calreticulin antibody (yellow channel); microtubules with tubulin antibody (red channel); and antibody against protein of interest (green channel). \n\nThis data is Copyright (c) 2025 The Board of Trustees of the Leland Stanford Junior University. It is licensed for reuse under Creative Commons Attribution ShareAlike NonCommercial 4.0 International License (https://creativecommons.org/lic

In [431]:
# download the ROCrate archive
with open("test.zip", "wb") as outputFile:
    getArchiveResponse = fairscapeCrateUpload.downloadROCrateArchive(foundUser, uploadResult.rocrateIdentifier)

    if getArchiveResponse.success:
        outputFile.write(getArchiveResponse.fileResponse.read())

In [None]:
# try to get as unauthorized user

In [None]:
# try to get as user within the same group

In [374]:
getUploadStatusResponse.error

{'message': 'user unauthorized to view upload status'}

#### ROCrate TODO

- outputOfROCrate
- prov Relations between ROCrate

## Resolver

In [64]:
EVIDatasetType = "https://w3id.org/EVI#Dataset"
EVIComputationType = "https://w3id.org/EVI#Computation"
EVISoftwareType = "https://w3id.org/EVI#Software"

def getElemByID(identifierCollection: pymongo.collection.Collection, guid: str):

    foundMetadata = identifierCollection.find_one({"@id": guid})

    if foundMetadata is None:
        return None

    match foundMetadata['@type']:
        case "https://w3id.org/EVI#Dataset":
            return DatasetWriteModel.model_validate(foundMetadata)

        case "https://w3id.org/EVI#Computation":
            return SoftwareWriteModel.model_validate(foundMetadata)

        case "https://w3id.org/EVI#Software":
            return SoftwareWriteModel.model_validate(foundMetadata)

## Evidence Graph

## Reasoninng

### Miscelaneous

In [74]:
# correct format for all image elements
with open('paclitaxel/paclitaxel/ro-crate-metadata.json', 'r') as rocrateMetadataFile:
    crateJSON = json.load(rocrateMetadataFile)

element_list = list(filter(lambda x: x.get('@type') == 'EVI:Dataset' and x.get('format') is None, crateJSON['@graph']))
for elem in element_list:
    elem['format'] = 'image/jpeg'

    
with open('paclitaxel/paclitaxel/ro-crate-metadata.json', 'w') as rocrateMetadataFile:
    json.dump(crateJSON, rocrateMetadataFile)