diff --git a/infra/feast-operator/test/e2e_rhoai/e2e_suite_test.go b/infra/feast-operator/test/e2e_rhoai/e2e_suite_test.go new file mode 100644 index 00000000000..86750f36e4f --- /dev/null +++ b/infra/feast-operator/test/e2e_rhoai/e2e_suite_test.go @@ -0,0 +1,32 @@ +/* +Copyright 2025 Feast Community. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package e2erhoai + +import ( + "fmt" + "testing" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" +) + +// Run e2e feast Notebook tests using the Ginkgo runner. +func TestNotebookRunE2E(t *testing.T) { + RegisterFailHandler(Fail) + _, _ = fmt.Fprintf(GinkgoWriter, "Feast Jupyter Notebook Test suite\n") + RunSpecs(t, "e2erhoai Feast Notebook test suite") +} diff --git a/infra/feast-operator/test/e2e_rhoai/feast_wb_test.go b/infra/feast-operator/test/e2e_rhoai/feast_wb_test.go new file mode 100644 index 00000000000..64bb1f2dea4 --- /dev/null +++ b/infra/feast-operator/test/e2e_rhoai/feast_wb_test.go @@ -0,0 +1,151 @@ +/* +Copyright 2025 Feast Community. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package e2erhoai provides end-to-end (E2E) test coverage for Feast integration with +// Red Hat OpenShift AI (RHOAI) environments. This specific test validates the functionality +// of executing a Feast Jupyter notebook within a fully configured OpenShift namespace +package e2erhoai + +import ( + "fmt" + "os" + "os/exec" + "strings" + + utils "github.com/feast-dev/feast/infra/feast-operator/test/utils" + . "github.com/onsi/ginkgo/v2" + . 
"github.com/onsi/gomega" +) + +var _ = Describe("Feast Jupyter Notebook Testing", Ordered, func() { + const ( + namespace = "test-ns-feast-wb" + configMapName = "feast-wb-cm" + rolebindingName = "rb-feast-test" + notebookFile = "test/e2e_rhoai/resources/feast-test.ipynb" + pvcFile = "test/e2e_rhoai/resources/pvc.yaml" + notebookPVC = "jupyterhub-nb-kube-3aadmin-pvc" + testDir = "/test/e2e_rhoai" + notebookName = "feast-test.ipynb" + feastMilvusTest = "TestFeastMilvusNotebook" + ) + + BeforeAll(func() { + By(fmt.Sprintf("Creating test namespace: %s", namespace)) + Expect(utils.CreateNamespace(namespace, testDir)).To(Succeed()) + fmt.Printf("Namespace %s created successfully\n", namespace) + }) + + AfterAll(func() { + By(fmt.Sprintf("Deleting test namespace: %s", namespace)) + Expect(utils.DeleteNamespace(namespace, testDir)).To(Succeed()) + fmt.Printf("Namespace %s deleted successfully\n", namespace) + }) + + runNotebookTest := func() { + env := func(key string) string { + val, _ := os.LookupEnv(key) + return val + } + + username := utils.GetOCUser(testDir) + + // set namespace context + By(fmt.Sprintf("Setting namespace context to : %s", namespace)) + cmd := exec.Command("kubectl", "config", "set-context", "--current", "--namespace", namespace) + output, err := utils.Run(cmd, "/test/e2e_rhoai") + Expect(err).ToNot(HaveOccurred(), fmt.Sprintf( + "Failed to set namespace context to %s.\nError: %v\nOutput: %s\n", + namespace, err, output, + )) + fmt.Printf("Successfully set namespace context to: %s\n", namespace) + + // create config map + By(fmt.Sprintf("Creating Config map: %s", configMapName)) + cmd = exec.Command("kubectl", "create", "configmap", configMapName, "--from-file="+notebookFile, "--from-file=test/e2e_rhoai/resources/feature_repo") + output, err = utils.Run(cmd, "/test/e2e_rhoai") + Expect(err).ToNot(HaveOccurred(), fmt.Sprintf( + "Failed to create ConfigMap %s.\nError: %v\nOutput: %s\n", + configMapName, err, output, + )) + fmt.Printf("ConfigMap %s created successfully\n", configMapName) + + // create pvc + By(fmt.Sprintf("Creating Persistent volume claim: %s", notebookPVC)) + cmd = exec.Command("kubectl", "apply", "-f", "test/e2e_rhoai/resources/pvc.yaml") + _, err = utils.Run(cmd, "/test/e2e_rhoai") + ExpectWithOffset(1, err).NotTo(HaveOccurred()) + fmt.Printf("Persistent Volume Claim %s created successfully", notebookPVC) + + // create rolebinding + By(fmt.Sprintf("Creating rolebinding %s for the user", rolebindingName)) + cmd = exec.Command("kubectl", "create", "rolebinding", rolebindingName, "-n", namespace, "--role=admin", "--user="+username) + _, err = utils.Run(cmd, "/test/e2e_rhoai") + ExpectWithOffset(1, err).NotTo(HaveOccurred()) + fmt.Printf("Created rolebinding %s successfully\n", rolebindingName) + + // configure papermill notebook command execution + command := []string{ + "/bin/sh", + "-c", + fmt.Sprintf( + "pip install papermill && "+ + "mkdir -p /opt/app-root/src/feature_repo && "+ + "cp -rL /opt/app-root/notebooks/* /opt/app-root/src/feature_repo/ && "+ + "oc login --token=%s --server=%s --insecure-skip-tls-verify=true && "+ + "(papermill /opt/app-root/notebooks/%s /opt/app-root/src/output.ipynb --kernel python3 && "+ + "echo '✅ Notebook executed successfully' || "+ + "(echo '❌ Notebook execution failed' && "+ + "cp /opt/app-root/src/output.ipynb /opt/app-root/src/failed_output.ipynb && "+ + "echo '📄 Copied failed notebook to failed_output.ipynb')) && "+ + "jupyter nbconvert --to notebook --stdout /opt/app-root/src/output.ipynb || echo '⚠️ nbconvert failed' 
&& "+ + "sleep 100; exit 0", + utils.GetOCToken("test/e2e_rhoai"), + utils.GetOCServer("test/e2e_rhoai"), + "feast-test.ipynb", + ), + } + + // Defining notebook parameters + nbParams := utils.NotebookTemplateParams{ + Namespace: namespace, + IngressDomain: utils.GetIngressDomain(testDir), + OpenDataHubNamespace: env("APPLICATIONS_NAMESPACE"), + NotebookImage: env("NOTEBOOK_IMAGE"), + NotebookConfigMapName: configMapName, + NotebookPVC: notebookPVC, + Username: username, + OC_TOKEN: utils.GetOCToken(testDir), + OC_SERVER: utils.GetOCServer(testDir), + NotebookFile: notebookName, + Command: "[\"" + strings.Join(command, "\",\"") + "\"]", + PipIndexUrl: env("PIP_INDEX_URL"), + PipTrustedHost: env("PIP_TRUSTED_HOST"), + FeastVerison: env("FEAST_VERSION"), + OpenAIAPIKey: env("OPENAI_API_KEY"), + } + + By("Creating Jupyter Notebook") + Expect(utils.CreateNotebook(nbParams)).To(Succeed(), "Failed to create notebook") + + By("Monitoring notebook logs") + Expect(utils.MonitorNotebookPod(namespace, "jupyter-nb-", notebookName)).To(Succeed(), "Notebook execution failed") + } + + Context("Feast Jupyter Notebook Test", func() { + It("Should create and run a "+feastMilvusTest+" successfully", runNotebookTest) + }) +}) diff --git a/infra/feast-operator/test/e2e_rhoai/resources/custom-nb.yaml b/infra/feast-operator/test/e2e_rhoai/resources/custom-nb.yaml new file mode 100644 index 00000000000..8c91cdc5f34 --- /dev/null +++ b/infra/feast-operator/test/e2e_rhoai/resources/custom-nb.yaml @@ -0,0 +1,157 @@ +# This template maybe used to spin up a custom notebook image +# i.e.: sed s/{{.IngressDomain}}/$(oc get ingresses.config/cluster -o jsonpath={.spec.domain})/g tests/resources/custom-nb.template | oc apply -f - +# resources generated: +# pod/jupyter-nb-kube-3aadmin-0 +# service/jupyter-nb-kube-3aadmin +# route.route.openshift.io/jupyter-nb-kube-3aadmin (jupyter-nb-kube-3aadmin-opendatahub.apps.tedbig412.cp.fyre.ibm.com) +# service/jupyter-nb-kube-3aadmin-tls +apiVersion: kubeflow.org/v1 +kind: Notebook +metadata: + annotations: + notebooks.opendatahub.io/inject-oauth: "true" + notebooks.opendatahub.io/last-size-selection: Small + notebooks.opendatahub.io/oauth-logout-url: https://odh-dashboard-{{.OpenDataHubNamespace}}.{{.IngressDomain}}/notebookController/kube-3aadmin/home + opendatahub.io/link: https://jupyter-nb-kube-3aadmin-{{.Namespace}}.{{.IngressDomain}}/notebook/{{.Namespace}}/jupyter-nb-kube-3aadmin + opendatahub.io/username: {{.Username}} + generation: 1 + labels: + app: jupyter-nb-kube-3aadmin + opendatahub.io/dashboard: "true" + opendatahub.io/odh-managed: "true" + opendatahub.io/user: {{.Username}} + name: jupyter-nb-kube-3aadmin + namespace: {{.Namespace}} +spec: + template: + spec: + affinity: + nodeAffinity: + preferredDuringSchedulingIgnoredDuringExecution: + - preference: + matchExpressions: + - key: nvidia.com/gpu.present + operator: NotIn + values: + - "true" + weight: 1 + containers: + - env: + - name: NOTEBOOK_ARGS + value: |- + --ServerApp.port=8888 + --ServerApp.token='' + --ServerApp.password='' + --ServerApp.base_url=/notebook/test-feast-wb/jupyter-nb-kube-3aadmin + --ServerApp.quit_button=False + --ServerApp.tornado_settings={"user":"{{.Username}}","hub_host":"https://odh-dashboard-{{.OpenDataHubNamespace}}.{{.IngressDomain}}","hub_prefix":"/notebookController/{{.Username}}"} + - name: JUPYTER_IMAGE + value: {{.NotebookImage}} + - name: JUPYTER_NOTEBOOK_PORT + value: "8888" + - name: PIP_INDEX_URL + value: {{.PipIndexUrl}} + - name: PIP_TRUSTED_HOST + value: {{.PipTrustedHost}} 
+        - name: FEAST_VERSION
+          value: {{.FeastVersion}}
+        - name: OPENAI_API_KEY
+          value: {{.OpenAIAPIKey}}
+        image: {{.NotebookImage}}
+        command: {{.Command}}
+        imagePullPolicy: Always
+        name: jupyter-nb-kube-3aadmin
+        ports:
+        - containerPort: 8888
+          name: notebook-port
+          protocol: TCP
+        resources:
+          limits:
+            cpu: "2"
+            memory: 3Gi
+          requests:
+            cpu: "1"
+            memory: 3Gi
+        volumeMounts:
+        - mountPath: /opt/app-root/src
+          name: jupyterhub-nb-kube-3aadmin-pvc
+        - mountPath: /opt/app-root/notebooks
+          name: {{.NotebookConfigMapName}}
+        workingDir: /opt/app-root/src
+      - args:
+        - --provider=openshift
+        - --https-address=:8443
+        - --http-address=
+        - --openshift-service-account=jupyter-nb-kube-3aadmin
+        - --cookie-secret-file=/etc/oauth/config/cookie_secret
+        - --cookie-expire=24h0m0s
+        - --tls-cert=/etc/tls/private/tls.crt
+        - --tls-key=/etc/tls/private/tls.key
+        - --upstream=http://localhost:8888
+        - --upstream-ca=/var/run/secrets/kubernetes.io/serviceaccount/ca.crt
+        - --skip-auth-regex=^(?:/notebook/{{.Namespace}}/jupyter-nb-kube-3aadmin)?/api$
+        - --email-domain=*
+        - --skip-provider-button
+        - --openshift-sar={"verb":"get","resource":"notebooks","resourceAPIGroup":"kubeflow.org","resourceName":"jupyter-nb-kube-3aadmin","namespace":$(NAMESPACE)}
+        - --logout-url=https://odh-dashboard-{{.OpenDataHubNamespace}}.{{.IngressDomain}}/notebookController/kube-3aadmin/home
+        env:
+        - name: NAMESPACE
+          valueFrom:
+            fieldRef:
+              fieldPath: metadata.namespace
+        image: registry.redhat.io/openshift4/ose-oauth-proxy:v4.10
+        imagePullPolicy: Always
+        livenessProbe:
+          failureThreshold: 3
+          httpGet:
+            path: /oauth/healthz
+            port: oauth-proxy
+            scheme: HTTPS
+          initialDelaySeconds: 30
+          periodSeconds: 5
+          successThreshold: 1
+          timeoutSeconds: 1
+        name: oauth-proxy
+        ports:
+        - containerPort: 8443
+          name: oauth-proxy
+          protocol: TCP
+        readinessProbe:
+          failureThreshold: 3
+          httpGet:
+            path: /oauth/healthz
+            port: oauth-proxy
+            scheme: HTTPS
+          initialDelaySeconds: 5
+          periodSeconds: 5
+          successThreshold: 1
+          timeoutSeconds: 1
+        resources:
+          limits:
+            cpu: 100m
+            memory: 64Mi
+          requests:
+            cpu: 100m
+            memory: 64Mi
+        volumeMounts:
+        - mountPath: /etc/oauth/config
+          name: oauth-config
+        - mountPath: /etc/tls/private
+          name: tls-certificates
+      enableServiceLinks: false
+      serviceAccountName: jupyter-nb-kube-3aadmin
+      volumes:
+      - name: jupyterhub-nb-kube-3aadmin-pvc
+        persistentVolumeClaim:
+          claimName: {{.NotebookPVC}}
+      - name: oauth-config
+        secret:
+          defaultMode: 420
+          secretName: jupyter-nb-kube-3aadmin-oauth-config
+      - name: tls-certificates
+        secret:
+          defaultMode: 420
+          secretName: jupyter-nb-kube-3aadmin-tls
+      - name: {{.NotebookConfigMapName}}
+        configMap:
+          name: {{.NotebookConfigMapName}}
diff --git a/infra/feast-operator/test/e2e_rhoai/resources/feast-test.ipynb b/infra/feast-operator/test/e2e_rhoai/resources/feast-test.ipynb
new file mode 100755
index 00000000000..d3fb72eb57b
--- /dev/null
+++ b/infra/feast-operator/test/e2e_rhoai/resources/feast-test.ipynb
@@ -0,0 +1,494 @@
+{
+ "cells": [
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "import os\n",
+    "import subprocess\n",
+    "\n",
+    "feast_version = os.environ.get(\"FEAST_VERSION\")\n",
+    "subprocess.run([\"pip\", \"install\", f\"feast=={feast_version}\"])"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "import os\n",
+    "import feast\n",
+    "\n",
+    "actual_version = feast.__version__\n",
+    "assert actual_version == 
os.environ.get(\"FEAST_VERSION\"), (\n", + " f\"❌ Feast version mismatch. Expected: {os.environ.get('FEAST_VERSION')}, Found: {actual_version}\"\n", + ")\n", + "print(f\"✅ Successfully installed Feast version: {actual_version}\")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "%cd /opt/app-root/src/feature_repo\n", + "!ls -l" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "!cat /opt/app-root/src/feature_repo/feature_store.yaml" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "!mkdir -p data\n", + "!wget -O data/city_wikipedia_summaries_with_embeddings.parquet https://raw.githubusercontent.com/opendatahub-io/feast/master/examples/rag/feature_repo/data/city_wikipedia_summaries_with_embeddings.parquet" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "import pandas as pd \n", + "\n", + "df = pd.read_parquet(\"./data/city_wikipedia_summaries_with_embeddings.parquet\")\n", + "df['vector'] = df['vector'].apply(lambda x: x.tolist())\n", + "embedding_length = len(df['vector'][0])\n", + "assert embedding_length == 384, f\"❌ Expected vector length 384, but got {embedding_length}\"\n", + "print(f'embedding length = {embedding_length}')" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "from IPython.display import display\n", + "\n", + "display(df.head())" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "!pip install -q pymilvus transformers torch" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "import subprocess\n", + "\n", + "# Run `feast apply` and capture output\n", + "result = subprocess.run([\"feast\", \"apply\"], capture_output=True, text=True)\n", + "\n", + "# Combine stdout and stderr in case important info is in either\n", + "output = result.stdout + result.stderr\n", + "\n", + "# Print full output for debugging (optional)\n", + "print(output)\n", + "\n", + "# Expected substrings to validate\n", + "expected_messages = [\n", + " \"Applying changes for project rag\",\n", + " \"Connecting to Milvus in local mode\",\n", + " \"Deploying infrastructure for city_embeddings\"\n", + "]\n", + "\n", + "# Validate all expected messages are in output\n", + "for msg in expected_messages:\n", + " assert msg in output, f\"❌ Expected message not found: '{msg}'\"\n", + "\n", + "print(\"✅ All expected messages were found in the output.\")\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "from datetime import datetime\n", + "from feast import FeatureStore\n", + "\n", + "store = FeatureStore(repo_path=\".\")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "import io\n", + "import sys\n", + "\n", + "# Capture stdout\n", + "captured_output = io.StringIO()\n", + "sys_stdout_backup = sys.stdout\n", + "sys.stdout = captured_output\n", + "\n", + "# Call the function\n", + "store.write_to_online_store(feature_view_name='city_embeddings', df=df)\n", + "\n", + "# Restore stdout\n", + "sys.stdout = sys_stdout_backup\n", + "\n", + "# Get the output\n", + "output_str = captured_output.getvalue()\n", + "\n", + "# Expected message\n", + 
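+    "# (Feast prints this when it opens the local Milvus store at data/online_store.db, per feature_store.yaml)\n",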
"expected_msg = \"Connecting to Milvus in local mode using data/online_store.db\"\n", + "\n", + "# Validate\n", + "assert expected_msg in output_str, f\"❌ Expected message not found.\\nExpected: {expected_msg}\\nActual Output:\\n{output_str}\"\n", + "\n", + "print(\"✅ Output message validated successfully.\")\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# List batch feature views\n", + "batch_fvs = store.list_batch_feature_views()\n", + "\n", + "# Print the number of batch feature views\n", + "print(\"Number of batch feature views:\", len(batch_fvs))\n", + "\n", + "# Assert that the result is an integer and non-negative\n", + "assert isinstance(len(batch_fvs), int), \"Result is not an integer\"\n", + "assert len(batch_fvs) >= 0, \"Feature view count is negative\"\n", + "\n", + "print(\"Feature views listed correctly ✅\")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "from feast import FeatureStore\n", + "\n", + "# Initialize store (if not already)\n", + "store = FeatureStore(repo_path=\".\") # Adjust path if necessary\n", + "\n", + "# Retrieve the feature view\n", + "fv = store.get_feature_view(\"city_embeddings\")\n", + "\n", + "# Assert name\n", + "assert fv.name == \"city_embeddings\", \"Feature view name mismatch\"\n", + "\n", + "# Assert entities\n", + "assert fv.entities == [\"item_id\"], f\"Expected entities ['item_id'], got {fv.entities}\"\n", + "\n", + "# Assert feature names and vector index settings\n", + "feature_names = [f.name for f in fv.features]\n", + "assert \"vector\" in feature_names, \"Missing 'vector' feature\"\n", + "assert \"state\" in feature_names, \"Missing 'state' feature\"\n", + "assert \"sentence_chunks\" in feature_names, \"Missing 'sentence_chunks' feature\"\n", + "assert \"wiki_summary\" in feature_names, \"Missing 'wiki_summary' feature\"\n", + "\n", + "# Assert 'vector' feature is a vector index with COSINE metric\n", + "vector_feature = next(f for f in fv.features if f.name == \"vector\")\n", + "assert vector_feature.vector_index, \"'vector' feature is not indexed\"\n", + "assert vector_feature.vector_search_metric == \"COSINE\", \"Expected COSINE search metric for 'vector'\"\n", + "\n", + "print(\"All assertions passed ✅\")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "from feast.entity import Entity\n", + "from feast.types import ValueType\n", + "entity = Entity(\n", + " name=\"item_id1\",\n", + " value_type=ValueType.INT64,\n", + " description=\"test id\",\n", + " tags={\"team\": \"feast\"},\n", + ")\n", + "store.apply(entity)\n", + "assert any(e.name == \"item_id1\" for e in store.list_entities())\n", + "print(\"Entity added ✅\")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "entity_to_delete = store.get_entity(\"item_id1\")\n", + "\n", + "store.apply(\n", + " objects=[],\n", + " objects_to_delete=[entity_to_delete],\n", + " partial=False\n", + ")\n", + "\n", + "# Validation after deletion\n", + "assert not any(e.name == \"item_id1\" for e in store.list_entities())\n", + "print(\"Entity deleted ✅\")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# List batch feature views\n", + "batch_fvs = store.list_batch_feature_views()\n", + "assert len(batch_fvs) == 1\n", + "\n", + "# Print count\n", + "print(f\"Found 
{len(batch_fvs)} batch feature view(s) ✅\")\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "pymilvus_client = store._provider._online_store._connect(store.config)\n", + "COLLECTION_NAME = pymilvus_client.list_collections()[0]\n", + "\n", + "milvus_query_result = pymilvus_client.query(\n", + " collection_name=COLLECTION_NAME,\n", + " filter=\"item_id == '0'\",\n", + ")\n", + "pd.DataFrame(milvus_query_result[0]).head()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "import torch\n", + "import torch.nn.functional as F\n", + "from feast import FeatureStore\n", + "from pymilvus import MilvusClient, DataType, FieldSchema\n", + "from transformers import AutoTokenizer, AutoModel\n", + "from example_repo import city_embeddings_feature_view, item\n", + "\n", + "TOKENIZER = \"sentence-transformers/all-MiniLM-L6-v2\"\n", + "MODEL = \"sentence-transformers/all-MiniLM-L6-v2\"\n", + "\n", + "def mean_pooling(model_output, attention_mask):\n", + " token_embeddings = model_output[\n", + " 0\n", + " ] # First element of model_output contains all token embeddings\n", + " input_mask_expanded = (\n", + " attention_mask.unsqueeze(-1).expand(token_embeddings.size()).float()\n", + " )\n", + " return torch.sum(token_embeddings * input_mask_expanded, 1) / torch.clamp(\n", + " input_mask_expanded.sum(1), min=1e-9\n", + " )\n", + "\n", + "def run_model(sentences, tokenizer, model):\n", + " encoded_input = tokenizer(\n", + " sentences, padding=True, truncation=True, return_tensors=\"pt\"\n", + " )\n", + " # Compute token embeddings\n", + " with torch.no_grad():\n", + " model_output = model(**encoded_input)\n", + "\n", + " sentence_embeddings = mean_pooling(model_output, encoded_input[\"attention_mask\"])\n", + " sentence_embeddings = F.normalize(sentence_embeddings, p=2, dim=1)\n", + " return sentence_embeddings" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "question = \"Which city has the largest population in New York?\"\n", + "\n", + "tokenizer = AutoTokenizer.from_pretrained(TOKENIZER)\n", + "model = AutoModel.from_pretrained(MODEL)\n", + "query_embedding = run_model(question, tokenizer, model)\n", + "query = query_embedding.detach().cpu().numpy().tolist()[0]" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "from IPython.display import display\n", + "\n", + "# Retrieve top k documents\n", + "context_data = store.retrieve_online_documents_v2(\n", + " features=[\n", + " \"city_embeddings:vector\",\n", + " \"city_embeddings:item_id\",\n", + " \"city_embeddings:state\",\n", + " \"city_embeddings:sentence_chunks\",\n", + " \"city_embeddings:wiki_summary\",\n", + " ],\n", + " query=query,\n", + " top_k=3,\n", + " distance_metric='COSINE',\n", + ").to_df()\n", + "display(context_data)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "def format_documents(context_df):\n", + " output_context = \"\"\n", + " unique_documents = context_df.drop_duplicates().apply(\n", + " lambda x: \"City & State = {\" + x['state'] +\"}\\nSummary = {\" + x['wiki_summary'].strip()+\"}\",\n", + " axis=1,\n", + " )\n", + " for i, document_text in enumerate(unique_documents):\n", + " output_context+= f\"****START DOCUMENT {i}****\\n{document_text.strip()}\\n****END DOCUMENT {i}****\"\n", + " return output_context" 
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "RAG_CONTEXT = format_documents(context_data[['state', 'wiki_summary']])\n",
+    "print(RAG_CONTEXT)"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "FULL_PROMPT = f\"\"\"\n",
+    "You are an assistant for answering questions about states. You will be provided documentation from Wikipedia. Provide a conversational answer.\n",
+    "If you don't know the answer, just say \"I do not know.\" Don't make up an answer.\n",
+    "\n",
+    "Here are the document(s) you should use when answering the user's question:\n",
+    "{RAG_CONTEXT}\n",
+    "\"\"\""
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "!pip install openai"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "import os\n",
+    "from openai import OpenAI\n",
+    "\n",
+    "client = OpenAI(\n",
+    "    api_key=os.environ.get(\"OPENAI_API_KEY\"),\n",
+    ")"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "response = client.chat.completions.create(\n",
+    "    model=\"gpt-4o-mini\",\n",
+    "    messages=[\n",
+    "        {\"role\": \"system\", \"content\": FULL_PROMPT},\n",
+    "        {\"role\": \"user\", \"content\": question}\n",
+    "    ],\n",
+    ")"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "# The expected output\n",
+    "expected_output = (\n",
+    "    \"New York City\"\n",
+    ")\n",
+    "\n",
+    "# Actual output from response\n",
+    "actual_output = '\\n'.join([c.message.content.strip() for c in response.choices])\n",
+    "\n",
+    "# Validate\n",
+    "assert expected_output in actual_output, f\"❌ Output mismatch:\\nExpected: {expected_output}\\nActual: {actual_output}\"\n",
+    "\n",
+    "print(\"✅ Output matches expected response.\")"
+   ]
+  }
+ ],
+ "metadata": {
+  "kernelspec": {
+   "display_name": "Python 3",
+   "language": "python",
+   "name": "python3"
+  },
+  "language_info": {
+   "codemirror_mode": {
+    "name": "ipython",
+    "version": 3
+   },
+   "file_extension": ".py",
+   "mimetype": "text/x-python",
+   "name": "python",
+   "nbconvert_exporter": "python",
+   "pygments_lexer": "ipython3",
+   "version": "3.11.5"
+  },
+  "orig_nbformat": 4
+ },
+ "nbformat": 4,
+ "nbformat_minor": 2
+}
diff --git a/infra/feast-operator/test/e2e_rhoai/resources/feature_repo/__init__.py b/infra/feast-operator/test/e2e_rhoai/resources/feature_repo/__init__.py
new file mode 100644
index 00000000000..e69de29bb2d
diff --git a/infra/feast-operator/test/e2e_rhoai/resources/feature_repo/example_repo.py b/infra/feast-operator/test/e2e_rhoai/resources/feature_repo/example_repo.py
new file mode 100755
index 00000000000..7a37d99d495
--- /dev/null
+++ b/infra/feast-operator/test/e2e_rhoai/resources/feature_repo/example_repo.py
@@ -0,0 +1,42 @@
+from datetime import timedelta
+
+from feast import (
+    FeatureView,
+    Field,
+    FileSource,
+)
+from feast.data_format import ParquetFormat
+from feast.types import Float32, Array, String, ValueType
+from feast import Entity
+
+item = Entity(
+    name="item_id",
+    description="Item ID",
+    value_type=ValueType.INT64,
+)
+
+parquet_file_path = "./data/city_wikipedia_summaries_with_embeddings.parquet"
+
+source = FileSource(
+    file_format=ParquetFormat(),
+    path=parquet_file_path,
+    timestamp_field="event_timestamp",
+)
+
+city_embeddings_feature_view = FeatureView(
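+    # "vector" is declared as a searchable vector field (COSINE); feature_store.yaml
+    # configures the matching Milvus index (embedding_dim: 384, metric_type: COSINE).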
name="city_embeddings", + entities=[item], + schema=[ + Field( + name="vector", + dtype=Array(Float32), + vector_index=True, + vector_search_metric="COSINE", + ), + Field(name="state", dtype=String), + Field(name="sentence_chunks", dtype=String), + Field(name="wiki_summary", dtype=String), + ], + source=source, + ttl=timedelta(hours=2), +) diff --git a/infra/feast-operator/test/e2e_rhoai/resources/feature_repo/feature_store.yaml b/infra/feast-operator/test/e2e_rhoai/resources/feature_repo/feature_store.yaml new file mode 100755 index 00000000000..f8f9cc293dc --- /dev/null +++ b/infra/feast-operator/test/e2e_rhoai/resources/feature_repo/feature_store.yaml @@ -0,0 +1,16 @@ +project: rag +provider: local +registry: data/registry.db +online_store: + type: milvus + path: data/online_store.db + vector_enabled: true + embedding_dim: 384 + index_type: "FLAT" + metric_type: "COSINE" +offline_store: + type: file +entity_key_serialization_version: 3 +auth: + type: no_auth + diff --git a/infra/feast-operator/test/e2e_rhoai/resources/pvc.yaml b/infra/feast-operator/test/e2e_rhoai/resources/pvc.yaml new file mode 100644 index 00000000000..a9e8c1be299 --- /dev/null +++ b/infra/feast-operator/test/e2e_rhoai/resources/pvc.yaml @@ -0,0 +1,10 @@ +apiVersion: v1 +kind: PersistentVolumeClaim +metadata: + name: jupyterhub-nb-kube-3aadmin-pvc +spec: + accessModes: + - ReadWriteOnce + resources: + requests: + storage: 10Gi diff --git a/infra/feast-operator/test/utils/notebook_util.go b/infra/feast-operator/test/utils/notebook_util.go new file mode 100644 index 00000000000..28bf64a67eb --- /dev/null +++ b/infra/feast-operator/test/utils/notebook_util.go @@ -0,0 +1,218 @@ +package utils + +import ( + "bytes" + "fmt" + "os" + "os/exec" + "strings" + "text/template" + "time" + + . "github.com/onsi/gomega" +) + +type NotebookTemplateParams struct { + Namespace string + IngressDomain string + OpenDataHubNamespace string + NotebookImage string + NotebookConfigMapName string + NotebookPVC string + Username string + OC_TOKEN string + OC_SERVER string + NotebookFile string + Command string + PipIndexUrl string + PipTrustedHost string + FeastVerison string + OpenAIAPIKey string +} + +// CreateNotebook renders a notebook manifest from a template and applies it using kubectl. 
+func CreateNotebook(params NotebookTemplateParams) error {
+	content, err := os.ReadFile("test/e2e_rhoai/resources/custom-nb.yaml")
+	if err != nil {
+		return fmt.Errorf("failed to read template file: %w", err)
+	}
+
+	tmpl, err := template.New("notebook").Parse(string(content))
+	if err != nil {
+		return fmt.Errorf("failed to parse template: %w", err)
+	}
+
+	var rendered bytes.Buffer
+	if err := tmpl.Execute(&rendered, params); err != nil {
+		return fmt.Errorf("failed to substitute template: %w", err)
+	}
+
+	tmpFile, err := os.CreateTemp("", "notebook-*.yaml")
+	if err != nil {
+		return fmt.Errorf("failed to create temp file: %w", err)
+	}
+
+	// Defer cleanup of temp file
+	defer func() {
+		if err := os.Remove(tmpFile.Name()); err != nil {
+			fmt.Printf("warning: failed to remove temp file %s: %v\n", tmpFile.Name(), err)
+		}
+	}()
+
+	if _, err := tmpFile.Write(rendered.Bytes()); err != nil {
+		return fmt.Errorf("failed to write to temp file: %w", err)
+	}
+
+	if err := tmpFile.Close(); err != nil {
+		return fmt.Errorf("failed to close temp file: %w", err)
+	}
+
+	cmd := exec.Command("kubectl", "apply", "-f", tmpFile.Name(), "-n", params.Namespace)
+	output, err := Run(cmd, "/test/e2e_rhoai")
+	if err != nil {
+		return fmt.Errorf("failed to apply notebook manifest %s: %w\nOutput: %s", tmpFile.Name(), err, output)
+	}
+	fmt.Printf("Notebook %s created successfully\n", tmpFile.Name())
+	return nil
+}
+
+// MonitorNotebookPod waits for a notebook pod to reach the Running phase and verifies execution logs.
+func MonitorNotebookPod(namespace, podPrefix string, notebookName string) error {
+	const successMarker = "Notebook executed successfully"
+	const failureMarker = "Notebook execution failed"
+	const pollInterval = 5 * time.Second
+	var pod *PodInfo
+
+	fmt.Println("🔄 Waiting for notebook pod to reach Running state...")
+
+	foundRunning := false
+	for i := 0; i < 36; i++ {
+		var err error
+		pod, err = getPodByPrefix(namespace, podPrefix)
+		if err != nil {
+			fmt.Printf("⏳ Pod not created yet: %v\n", err)
+			time.Sleep(pollInterval)
+			continue
+		}
+		if pod.Status == "Running" {
+			fmt.Printf("✅ Pod %s is Running.\n", pod.Name)
+			foundRunning = true
+			break
+		}
+		fmt.Printf("⏳ Pod %s not running yet. Phase: %s\n", pod.Name, pod.Status)
+		time.Sleep(pollInterval)
+	}
+
+	if !foundRunning {
+		return fmt.Errorf("❌ pod %s did not reach Running phase within 3 minutes", podPrefix)
+	}
+
+	// Start monitoring notebook logs
+	fmt.Printf("⏳ Monitoring Notebook pod %s Logs for Jupyter Notebook %s execution status\n", pod.Name, notebookName)
+
+	for i := 0; i < 60; i++ {
+		logs, err := getPodLogs(namespace, pod.Name)
+		if err != nil {
+			fmt.Printf("⏳ Failed to get logs for pod %s: %v\n", pod.Name, err)
+			time.Sleep(pollInterval)
+			continue
+		}
+
+		if strings.Contains(logs, successMarker) {
+			fmt.Printf("✅ Jupyter Notebook pod %s executed successfully.\n", pod.Name)
+			return nil
+		}
+
+		if strings.Contains(logs, failureMarker) {
+			fmt.Printf("❌ Notebook pod %s failed: failure marker found.\n", pod.Name)
+			return fmt.Errorf("notebook failed in execution. Logs:\n%s", logs)
+		}
+
+		time.Sleep(pollInterval)
+	}
+
+	return fmt.Errorf("❌ timed out waiting for notebook pod %s to complete", podPrefix)
+}
+
+type PodInfo struct {
+	Name   string
+	Status string
+}
+
+// getPodByPrefix returns the first pod matching a name prefix in the given namespace.
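+// Note: only .status.phase is read below, so a pod reported as Running may not have
+// passed its readiness probes yet.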
+func getPodByPrefix(namespace, prefix string) (*PodInfo, error) {
+	cmd := exec.Command(
+		"kubectl", "get", "pods", "-n", namespace,
+		"-o", "jsonpath={range .items[*]}{.metadata.name} {.status.phase}{\"\\n\"}{end}",
+	)
+	output, err := Run(cmd, "/test/e2e_rhoai")
+	if err != nil {
+		return nil, fmt.Errorf("failed to get pods: %w", err)
+	}
+
+	lines := strings.Split(strings.TrimSpace(string(output)), "\n")
+	for _, line := range lines {
+		parts := strings.Fields(line)
+		if len(parts) < 2 {
+			continue
+		}
+		name := parts[0]
+		status := parts[1]
+
+		if strings.HasPrefix(name, prefix) {
+			return &PodInfo{
+				Name:   name,
+				Status: status,
+			}, nil
+		}
+	}
+
+	return nil, fmt.Errorf("no pod found with prefix %q in namespace %q", prefix, namespace)
+}
+
+// getPodLogs retrieves the logs of the specified pod in the given namespace.
+func getPodLogs(namespace, podName string) (string, error) {
+	cmd := exec.Command("kubectl", "logs", "-n", namespace, podName)
+	var out bytes.Buffer
+	var stderr bytes.Buffer
+	cmd.Stdout = &out
+	cmd.Stderr = &stderr
+
+	err := cmd.Run()
+	if err != nil {
+		return "", fmt.Errorf("error getting pod logs: %v - %s", err, stderr.String())
+	}
+
+	return out.String(), nil
+}
+
+// GetIngressDomain returns the OpenShift cluster ingress domain.
+func GetIngressDomain(testDir string) string {
+	cmd := exec.Command("oc", "get", "ingresses.config.openshift.io", "cluster", "-o", "jsonpath={.spec.domain}")
+	output, _ := Run(cmd, testDir)
+	return strings.TrimSpace(string(output))
+}
+
+// GetOCToken returns the current OpenShift user authentication token.
+func GetOCToken(testDir string) string {
+	cmd := exec.Command("oc", "whoami", "--show-token")
+	output, _ := Run(cmd, testDir)
+	return strings.TrimSpace(string(output))
+}
+
+// GetOCServer returns the OpenShift API server URL for the current user.
+func GetOCServer(testDir string) string {
+	cmd := exec.Command("oc", "whoami", "--show-server")
+	output, _ := Run(cmd, testDir)
+	return strings.TrimSpace(string(output))
+}
+
+// GetOCUser returns the username of the currently logged-in OpenShift user.
+func GetOCUser(testDir string) string {
+	cmd := exec.Command("oc", "whoami")
+	output, _ := Run(cmd, testDir)
+	return strings.TrimSpace(string(output))
+}