From 66ccdb7034d4198b9d0d9b7020233e1bd6fd8d07 Mon Sep 17 00:00:00 2001 From: Vladi Bilonenko Date: Wed, 24 Apr 2024 21:29:49 +0200 Subject: [PATCH 1/3] SNOWFLAKE storage --- .env.example | 1 + src/client/DatasetSettingsModal.js | 8 +- src/client/Downloading.js | 4 +- src/client/ReportPage.js | 4 +- src/client/actions/dataset.js | 28 +++- src/client/actions/message.js | 19 ++- src/client/actions/report.js | 4 +- src/client/lib/shouldAddQuery.js | 2 +- src/client/reducers/connectionReducer.js | 2 +- src/client/reducers/datasetReducer.js | 73 +++++++++ src/client/reducers/rootReducer.js | 73 +-------- src/server/app/app.go | 53 +++++-- src/server/dekart/dataset.go | 14 +- src/server/dekart/query.go | 3 + src/server/dekart/querysource.go | 52 ++++--- src/server/dekart/stream.go | 23 ++- src/server/job/job.go | 1 + src/server/main.go | 3 + src/server/snowflakejob/snowflakejob.go | 113 +++++++------- src/server/snowflakeutils/snowflakeutils.go | 156 ++++++++++++++++++++ src/server/storage/snowflakestorage.go | 143 ++++++++++++++++++ src/server/storage/storage.go | 18 +++ 22 files changed, 611 insertions(+), 186 deletions(-) create mode 100644 src/client/reducers/datasetReducer.js create mode 100644 src/server/snowflakeutils/snowflakeutils.go create mode 100644 src/server/storage/snowflakestorage.go diff --git a/.env.example b/.env.example index 1138fd34..56d7dc5a 100644 --- a/.env.example +++ b/.env.example @@ -65,6 +65,7 @@ DEKART_UX_NOT_FOUND_ERROR_INFO_HTML= DEKART_UX_SAMPLE_QUERY_SQL= #dev +DEKART_STREAM_TIMEOUT= DEKART_LOG_DEBUG=1 DEKART_LOG_PRETTY=1 DEKART_STATIC_FILES= diff --git a/src/client/DatasetSettingsModal.js b/src/client/DatasetSettingsModal.js index d6a0e0b0..90f68197 100644 --- a/src/client/DatasetSettingsModal.js +++ b/src/client/DatasetSettingsModal.js @@ -9,7 +9,7 @@ import { closeDatasetSettingsModal, removeDataset, updateDatasetName } from './a function ModalFooter ({ saving, setSaving, name, datasetId }) { const dispatch = useDispatch() - const numDatasets = useSelector(state => state.datasets.length) + const numDatasets = useSelector(state => state.dataset.list.length) return (
@@ -45,11 +45,11 @@ function ModalFooter ({ saving, setSaving, name, datasetId }) { } export default function DatasetSettingsModal () { - const datasetId = useSelector(state => state.datasetSettings.datasetId) - const dataset = useSelector(state => state.datasets.find(d => d.id === datasetId)) + const datasetId = useSelector(state => state.dataset.settings.datasetId) + const dataset = useSelector(state => state.dataset.list.find(d => d.id === datasetId)) const queries = useSelector(state => state.queries) const files = useSelector(state => state.files) - const visible = useSelector(state => state.datasetSettings.visible) + const visible = useSelector(state => state.dataset.settings.visible) const dispatch = useDispatch() diff --git a/src/client/Downloading.js b/src/client/Downloading.js index f3b99ee6..933db5a7 100644 --- a/src/client/Downloading.js +++ b/src/client/Downloading.js @@ -4,7 +4,7 @@ import message from 'antd/es/message' import { useEffect } from 'react' function DownloadingMessage () { - const downloadingDatasets = useSelector(state => state.downloadingDatasets) + const downloadingDatasets = useSelector(state => state.dataset.downloading) const files = useSelector(state => state.files) const queries = useSelector(state => state.queries) const size = downloadingDatasets.reduce((size, { queryId, fileId }) => { @@ -26,7 +26,7 @@ function DownloadingMessage () { let hideDownloading = null export default function Downloading () { - const downloadingDatasets = useSelector(state => state.downloadingDatasets) + const downloadingDatasets = useSelector(state => state.dataset.downloading) const show = downloadingDatasets.length > 0 const [api, contextHolder] = message.useMessage() useEffect(() => { diff --git a/src/client/ReportPage.js b/src/client/ReportPage.js index e92132de..8aff6551 100644 --- a/src/client/ReportPage.js +++ b/src/client/ReportPage.js @@ -111,10 +111,10 @@ function getTabPane (dataset, queries, files, status) { } function DatasetSection ({ reportId }) { - const datasets = useSelector(state => state.datasets) + const datasets = useSelector(state => state.dataset.list) const queries = useSelector(state => state.queries) const files = useSelector(state => state.files) - const activeDataset = useSelector(state => state.activeDataset) + const activeDataset = useSelector(state => state.dataset.active) const report = useSelector(state => state.report) const queryStatus = useSelector(state => state.queryStatus) const { canWrite } = report diff --git a/src/client/actions/dataset.js b/src/client/actions/dataset.js index 29ec830d..a8c13e40 100644 --- a/src/client/actions/dataset.js +++ b/src/client/actions/dataset.js @@ -1,12 +1,13 @@ import { CreateDatasetRequest, RemoveDatasetRequest, UpdateDatasetConnectionRequest, UpdateDatasetNameRequest } from '../../proto/dekart_pb' import { Dekart } from '../../proto/dekart_pb_service' import { grpcCall } from './grpc' -import { downloading, setError, finishDownloading, success } from './message' +import { downloading, setError, finishDownloading, success, info, warn } from './message' import { addDataToMap, toggleSidePanel, removeDataset as removeDatasetFromKepler, reorderLayer } from '@dekart-xyz/kepler.gl/dist/actions' import { processCsvData, processGeojson } from '@dekart-xyz/kepler.gl/dist/processors' import { get } from '../lib/api' import { KeplerGlSchema } from '@dekart-xyz/kepler.gl/dist/schemas' import getDatasetName from '../lib/getDatasetName' +import { runQuery } from './query' export function createDataset (reportId) { 
return (dispatch, getState) => { @@ -19,7 +20,7 @@ export function createDataset (reportId) { export function setActiveDataset (datasetId) { return (dispatch, getState) => { - const { datasets } = getState() + const { list: datasets } = getState().dataset const dataset = datasets.find(d => d.id === datasetId) || datasets[0] if (dataset) { dispatch({ type: setActiveDataset.name, dataset }) @@ -29,7 +30,7 @@ export function setActiveDataset (datasetId) { export function updateDatasetName (datasetId, name) { return async (dispatch, getState) => { - const { datasets } = getState() + const { list: datasets } = getState().dataset const dataset = datasets.find(d => d.id === datasetId) if (!dataset) { return @@ -44,7 +45,7 @@ export function updateDatasetName (datasetId, name) { export function updateDatasetConnection (datasetId, connectionId) { return async (dispatch, getState) => { - const { datasets } = getState() + const { list: datasets } = getState().dataset const dataset = datasets.find(d => d.id === datasetId) if (!dataset) { return @@ -59,7 +60,7 @@ export function updateDatasetConnection (datasetId, connectionId) { export function removeDataset (datasetId) { return async (dispatch, getState) => { - const { datasets, activeDataset } = getState() + const { list: datasets, active: activeDataset } = getState().dataset if (activeDataset.id === datasetId) { // removed active dataset const datasetsLeft = datasets.filter(q => q.id !== datasetId) @@ -81,6 +82,8 @@ export function removeDataset (datasetId) { export function downloadDataset (dataset, sourceId, extension, prevDatasetsList) { return async (dispatch, getState) => { + const { dataset: { list: datasets }, files, queries, keplerGl } = getState() + const label = getDatasetName(dataset, queries, files) dispatch({ type: downloadDataset.name, dataset }) dispatch(downloading(dataset)) const { token } = getState() @@ -95,11 +98,20 @@ export function downloadDataset (dataset, sourceId, extension, prevDatasetsList) data = processGeojson(json) } } catch (err) { - dispatch(setError(err)) + dispatch(finishDownloading(dataset)) // remove downloading message + if (err.status === 410 && dataset.queryId) { // gone from dw query temporary storage + const { canRun, queryText } = getState().queryStatus[dataset.queryId] + if (!canRun) { + dispatch(warn(<>{label} result expired</>, false)) + return + } + dispatch(info(<>{label} result expired, re-running</>)) + dispatch(runQuery(dataset.queryId, queryText)) + } else { + dispatch(setError(err)) + } return } - const { datasets, files, queries, keplerGl } = getState() - const label = getDatasetName(dataset, queries, files) // check if dataset was already added to kepler const addedDatasets = getState().keplerGl.kepler.visState.datasets const prevDataset = prevDatasetsList.find(d => d.id in addedDatasets) diff --git a/src/client/actions/message.js b/src/client/actions/message.js index 4a0bca54..916fff08 100644 --- a/src/client/actions/message.js +++ b/src/client/actions/message.js @@ -10,10 +10,25 @@ export function downloading (dataset) { return { type: downloading.name, dataset } } -export function finishDownloading (dataset) { - return { type: finishDownloading.name, dataset } +export function finishDownloading (dataset, gone = false) { + return { type: finishDownloading.name, dataset, gone } } +export function warn (content, transitive = true) { + if (!transitive) { + message.warn({ + content, + duration: 10000, + style + }) + } else { + message.warn({ + content, + style + }) + } + return { type: warn.name } +}
export function success (content) { message.success({ content, diff --git a/src/client/actions/report.js b/src/client/actions/report.js index c8aace05..ddaad57d 100644 --- a/src/client/actions/report.js +++ b/src/client/actions/report.js @@ -83,7 +83,7 @@ function shouldDownloadQueryText (query, prevQueriesList, queriesList) { export function reportUpdate (reportStreamResponse) { const { report, queriesList, datasetsList, filesList } = reportStreamResponse return async (dispatch, getState) => { - const { queries: prevQueriesList, datasets: prevDatasetsList, report: prevReport, files: prevFileList, env, connection } = getState() + const { queries: prevQueriesList, dataset: { list: prevDatasetsList }, report: prevReport, files: prevFileList, env, connection } = getState() dispatch({ type: reportUpdate.name, report, @@ -126,7 +126,7 @@ export function reportUpdate (reportStreamResponse) { let extension = 'csv' if (dataset.queryId) { const query = queriesList.find(q => q.id === dataset.queryId) - if (shouldAddQuery(query, prevQueriesList, queriesList) || shouldUpdateDataset(dataset, prevDatasetsList)) { + if (shouldAddQuery(query, prevQueriesList) || shouldUpdateDataset(dataset, prevDatasetsList)) { dispatch(downloadDataset( dataset, query.jobResultId, diff --git a/src/client/lib/shouldAddQuery.js b/src/client/lib/shouldAddQuery.js index 215afdb9..2e2fc56b 100644 --- a/src/client/lib/shouldAddQuery.js +++ b/src/client/lib/shouldAddQuery.js @@ -1,4 +1,4 @@ -export function shouldAddQuery (query, prevQueriesList, queriesList) { +export function shouldAddQuery (query, prevQueriesList) { if (!query.jobResultId) { return false } diff --git a/src/client/reducers/connectionReducer.js b/src/client/reducers/connectionReducer.js index fc486558..ae9040bd 100644 --- a/src/client/reducers/connectionReducer.js +++ b/src/client/reducers/connectionReducer.js @@ -107,7 +107,7 @@ function userDefined (state = false, action) { switch (action.type) { case setEnv.name: { const { BIGQUERY_PROJECT_ID, CLOUD_STORAGE_BUCKET, DATASOURCE } = action.variables - return (BIGQUERY_PROJECT_ID === '' && DATASOURCE === 'BQ') || CLOUD_STORAGE_BUCKET === '' + return (BIGQUERY_PROJECT_ID === '' && DATASOURCE === 'BQ') || (CLOUD_STORAGE_BUCKET === '' && DATASOURCE !== 'SNOWFLAKE') } default: return state diff --git a/src/client/reducers/datasetReducer.js b/src/client/reducers/datasetReducer.js new file mode 100644 index 00000000..2323a264 --- /dev/null +++ b/src/client/reducers/datasetReducer.js @@ -0,0 +1,73 @@ +import { combineReducers } from 'redux' +import { closeDatasetSettingsModal, openDatasetSettingsModal, setActiveDataset } from '../actions/dataset' +import { downloading as downloadingAction, finishDownloading } from '../actions/message' +import { openReport, reportUpdate } from '../actions/report' + +function downloading (state = [], action) { + const { dataset } = action + switch (action.type) { + case downloadingAction.name: + return state.concat(dataset) + case finishDownloading.name: + return state.filter(d => d.id !== dataset.id) + default: + return state + } +} + +function active (state = null, action) { + const { datasetsList, prevDatasetsList } = action + switch (action.type) { + case openReport.name: + return null + case setActiveDataset.name: + return action.dataset + case reportUpdate.name: + if (!state) { + return datasetsList[0] || state + } + if (datasetsList.length > prevDatasetsList.length) { + return datasetsList.slice(-1)[0] + } + return { + ...(datasetsList.find(d => d.id === state.id) || 
datasetsList[0]) + } + default: + return state + } +} + +function list (state = [], action) { + switch (action.type) { + case openReport.name: + return [] + case reportUpdate.name: + return action.datasetsList + default: + return state + } +} + +function settings (state = { datasetId: null, visible: false }, action) { + switch (action.type) { + case openDatasetSettingsModal.name: + return { + datasetId: action.datasetId, + visible: true + } + case closeDatasetSettingsModal.name: + return { + datasetId: null, + visible: false + } + default: + return state + } +} + +export default combineReducers({ + downloading, + active, + settings, + list +}) diff --git a/src/client/reducers/rootReducer.js b/src/client/reducers/rootReducer.js index b5ab480e..8fdb5ec9 100644 --- a/src/client/reducers/rootReducer.js +++ b/src/client/reducers/rootReducer.js @@ -2,8 +2,7 @@ import { combineReducers } from 'redux' import { ActionTypes as KeplerActionTypes } from '@dekart-xyz/kepler.gl/dist/actions' import { setUserMapboxAccessTokenUpdater } from '@dekart-xyz/kepler.gl/dist/reducers/ui-state-updaters' import { openReport, reportUpdate, forkReport, saveMap, reportTitleChange, newReport, newForkedReport, unsubscribeReports, reportsListUpdate } from '../actions/report' -import { downloading, finishDownloading, setStreamError } from '../actions/message' -import { closeDatasetSettingsModal, openDatasetSettingsModal, setActiveDataset } from '../actions/dataset' +import { setStreamError } from '../actions/message' import { queries, queryStatus } from './queryReducer' import { setUsage } from '../actions/usage' import { setEnv } from '../actions/env' @@ -15,6 +14,7 @@ import token from './tokenReducer' import connection from './connectionReducer' import user from './userReducer' import httpError from './httpErrorReducer' +import dataset from './datasetReducer' const customKeplerGlReducer = keplerGlReducer.initialState({ uiState: { @@ -59,17 +59,6 @@ function files (state = [], action) { } } -function datasets (state = [], action) { - switch (action.type) { - case openReport.name: - return [] - case reportUpdate.name: - return action.datasetsList - default: - return state - } -} - const defaultReportStatus = { dataAdded: false, canSave: false, @@ -180,40 +169,6 @@ function env (state = defaultEnv, action) { } } -function downloadingDatasets (state = [], action) { - const { dataset } = action - switch (action.type) { - case downloading.name: - return state.concat(dataset) - case finishDownloading.name: - return state.filter(d => d.id !== dataset.id) - default: - return state - } -} - -function activeDataset (state = null, action) { - const { datasetsList, prevDatasetsList } = action - switch (action.type) { - case openReport.name: - return null - case setActiveDataset.name: - return action.dataset - case reportUpdate.name: - if (!state) { - return datasetsList[0] || state - } - if (datasetsList.length > prevDatasetsList.length) { - return datasetsList.slice(-1)[0] - } - return { - ...(datasetsList.find(d => d.id === state.id) || datasetsList[0]) - } - default: - return state - } -} - function release (state = null, action) { switch (action.type) { case newRelease.name: @@ -257,42 +212,22 @@ function fileUploadStatus (state = {}, action) { } } -function datasetSettings (state = { datasetId: null, visible: false }, action) { - switch (action.type) { - case openDatasetSettingsModal.name: - return { - datasetId: action.datasetId, - visible: true - } - case closeDatasetSettingsModal.name: - return { - datasetId: null, - 
visible: false - } - default: - return state - } -} - export default combineReducers({ keplerGl, report, queries, queryStatus, - activeDataset, reportStatus, reportsList, env, httpError, - downloadingDatasets, release, - datasets, files, fileUploadStatus, usage, - datasetSettings, connection, token, stream, - user + user, + dataset }) diff --git a/src/server/app/app.go b/src/server/app/app.go index 571705a1..5a188261 100644 --- a/src/server/app/app.go +++ b/src/server/app/app.go @@ -7,6 +7,8 @@ import ( "dekart/src/server/user" "net/http" "os" + "regexp" + "strings" "time" "github.com/gorilla/mux" @@ -43,32 +45,53 @@ func (m ResponseWriter) WriteHeader(statusCode int) { var allowedOrigin string = os.Getenv("DEKART_CORS_ORIGIN") +func getAllowedOrigin(origin string) string { + if matchOrigin(origin) { + return origin + } + return "null" +} + +func matchOrigin(origin string) bool { + if allowedOrigin == "" || allowedOrigin == "*" { + log.Warn().Msg("DEKART_CORS_ORIGIN is empty or *") + return true + } + //check if allowedOrigin contains wildcard using strings.Contains + if strings.Contains(allowedOrigin, "*") { + regexPattern := strings.ReplaceAll(allowedOrigin, "*", ".*") + match, err := regexp.MatchString(regexPattern, origin) + if err != nil { + log.Error().Err(err).Msg("failed to match origin") + return false + } + return match + } + + result := origin == allowedOrigin + if !result { + log.Warn().Str("origin", origin).Str("allowed origin", allowedOrigin).Msg("Origin is not allowed") + } + return result +} + func configureGRPC(dekartServer *dekart.Server) *grpcweb.WrappedGrpcServer { server := grpc.NewServer() proto.RegisterDekartServer(server, dekartServer) + if allowedOrigin == "" || allowedOrigin == "null" { + log.Info().Msg("CORS is disabled") + return grpcweb.WrapServer(server) + } return grpcweb.WrapServer( server, grpcweb.WithOriginFunc(func(origin string) bool { - if allowedOrigin == "" || allowedOrigin == "*" { - log.Warn().Msg("DEKART_CORS_ORIGIN is empty or *") - return true - } - result := origin == allowedOrigin - if !result { - log.Warn().Str("origin", origin).Str("allowed origin", allowedOrigin).Msg("Origin is not allowed") - } - return result + return matchOrigin(origin) }), ) } func setOriginHeader(w http.ResponseWriter, r *http.Request) { - if allowedOrigin == "" || allowedOrigin == "*" { - log.Warn().Msg("DEKART_CORS_ORIGIN is empty or *") - w.Header().Set("Access-Control-Allow-Origin", "*") - } else { - w.Header().Set("Access-Control-Allow-Origin", allowedOrigin) - } + w.Header().Set("Access-Control-Allow-Origin", getAllowedOrigin(r.Header.Get("Origin"))) w.Header().Set("Access-Control-Allow-Headers", "Authorization") } diff --git a/src/server/dekart/dataset.go b/src/server/dekart/dataset.go index f7fec160..19fef970 100644 --- a/src/server/dekart/dataset.go +++ b/src/server/dekart/dataset.go @@ -4,6 +4,7 @@ import ( "context" "database/sql" "dekart/src/proto" + "dekart/src/server/storage" "dekart/src/server/user" "fmt" "io" @@ -319,6 +320,15 @@ func (s Server) CreateDataset(ctx context.Context, req *proto.CreateDatasetReque return res, nil } +// process storage expired error +func storageError(w http.ResponseWriter, err error) { + if _, ok := err.(*storage.ExpiredError); ok { + http.Error(w, "expired", http.StatusGone) + return + } + HttpError(w, err) +} + func (s Server) ServeDatasetSource(w http.ResponseWriter, r *http.Request) { vars := mux.Vars(r) ctx := r.Context() @@ -342,12 +352,12 @@ func (s Server) ServeDatasetSource(w http.ResponseWriter, r *http.Request) { 
obj := s.storage.GetObject(bucketName, fmt.Sprintf("%s.%s", vars["source"], vars["extension"])) created, err := obj.GetCreatedAt(ctx) if err != nil { - HttpError(w, err) + storageError(w, err) return } objectReader, err := obj.GetReader(ctx) if err != nil { - HttpError(w, err) + storageError(w, err) return } defer objectReader.Close() diff --git a/src/server/dekart/query.go b/src/server/dekart/query.go index 143dda85..39b5c1ed 100644 --- a/src/server/dekart/query.go +++ b/src/server/dekart/query.go @@ -95,6 +95,8 @@ func (s Server) RunAllQueries(ctx context.Context, req *proto.RunAllQueriesReque return nil, Unauthenticated } + log.Debug().Str("report_id", req.ReportId).Msg("RunAllQueries") + // get all queries from report queriesRows, err := s.db.QueryContext(ctx, `select @@ -194,6 +196,7 @@ func (s Server) runQuery(ctx context.Context, i query) error { log.Error().Err(err).Send() return err } + // Result ID should be same as job ID once available obj := s.storage.GetObject(i.bucketName, fmt.Sprintf("%s.csv", job.GetID())) go s.updateJobStatus(job, jobStatus) job.Status() <- int32(proto.Query_JOB_STATUS_PENDING) diff --git a/src/server/dekart/querysource.go b/src/server/dekart/querysource.go index db087742..c137a34a 100644 --- a/src/server/dekart/querysource.go +++ b/src/server/dekart/querysource.go @@ -3,6 +3,7 @@ package dekart import ( "context" "crypto/sha1" + "database/sql" "dekart/src/proto" "dekart/src/server/user" "fmt" @@ -66,26 +67,39 @@ func (s Server) storeQuerySync(ctx context.Context, bucketName, queryID string, queryTextByte := []byte(queryText) h.Write(queryTextByte) newQuerySourceId := fmt.Sprintf("%x", h.Sum(nil)) - storageWriter := s.storage.GetObject(bucketName, fmt.Sprintf("%s.sql", newQuerySourceId)).GetWriter(ctx) - _, err := storageWriter.Write(queryTextByte) - if err != nil { - log.Err(err).Msg("Error writing query_text to storage") - storageWriter.Close() - return err - } - err = storageWriter.Close() - if err != nil { - log.Err(err).Str("bucketName", bucketName).Msg("Error writing query_text to storage") - return err + var result sql.Result + var err error + if s.storage.CanSaveQuery() { + storageWriter := s.storage.GetObject(bucketName, fmt.Sprintf("%s.sql", newQuerySourceId)).GetWriter(ctx) + _, err = storageWriter.Write(queryTextByte) + if err != nil { + log.Err(err).Msg("Error writing query_text to storage") + storageWriter.Close() + return err + } + err = storageWriter.Close() + if err != nil { + log.Err(err).Str("bucketName", bucketName).Msg("Error writing query_text to storage") + return err + } + + result, err = s.db.ExecContext(ctx, + `update queries set query_source_id=$1, query_source=$2, updated_at=now() where id=$3 and query_source_id=$4`, + newQuerySourceId, + proto.Query_QUERY_SOURCE_STORAGE, + queryID, + prevQuerySourceId, + ) + } else { + result, err = s.db.ExecContext(ctx, + `update queries set query_text=$1, query_source_id=$2, query_source=$3, updated_at=now() where id=$4 and query_source_id=$5`, + queryText, + newQuerySourceId, + proto.Query_QUERY_SOURCE_INLINE, + queryID, + prevQuerySourceId, + ) } - - result, err := s.db.ExecContext(ctx, - `update queries set query_source_id=$1, query_source=$2, updated_at=now() where id=$3 and query_source_id=$4`, - newQuerySourceId, - proto.Query_QUERY_SOURCE_STORAGE, - queryID, - prevQuerySourceId, - ) if err != nil { return err } diff --git a/src/server/dekart/stream.go b/src/server/dekart/stream.go index 73a38af2..87b3c6e6 100644 --- a/src/server/dekart/stream.go +++ b/src/server/dekart/stream.go @@ 
-6,6 +6,8 @@ import ( "dekart/src/server/report" "dekart/src/server/user" "fmt" + "os" + "strconv" "time" "github.com/google/uuid" @@ -65,6 +67,21 @@ func (s Server) sendReportMessage(reportID string, srv proto.Dekart_GetReportStr } +const defaultStreamTimeout = 50 * time.Second + +// parse int constant from os env variable DEKART_STREAM_TIMEOUT +func getStreamTimeout() time.Duration { + timeout := os.Getenv("DEKART_STREAM_TIMEOUT") + if timeout == "" { + return defaultStreamTimeout + } + timeoutInt, err := strconv.Atoi(timeout) + if err != nil { + log.Fatal().Err(err).Msg("Failed to parse DEKART_STREAM_TIMEOUT") + } + return time.Duration(timeoutInt) * time.Second +} + // GetReportStream which sends report and queries on every update func (s Server) GetReportStream(req *proto.ReportStreamRequest, srv proto.Dekart_GetReportStreamServer) error { claims := user.GetClaims(srv.Context()) @@ -93,7 +110,7 @@ func (s Server) GetReportStream(req *proto.ReportStreamRequest, srv proto.Dekart ping := s.reportStreams.Register(req.Report.Id, streamID.String(), req.StreamOptions.Sequence) defer s.reportStreams.Deregister(req.Report.Id, streamID.String()) - ctx, cancel := context.WithTimeout(srv.Context(), 55*time.Second) + ctx, cancel := context.WithTimeout(srv.Context(), getStreamTimeout()) defer cancel() for { @@ -228,7 +245,7 @@ func (s Server) GetUserStream(req *proto.GetUserStreamRequest, srv proto.Dekart_ ping, streamID := s.userStreams.Register(*claims, req.StreamOptions.Sequence) defer s.userStreams.Deregister(*claims, streamID) - ctx, cancel := context.WithTimeout(srv.Context(), 55*time.Second) + ctx, cancel := context.WithTimeout(srv.Context(), getStreamTimeout()) defer cancel() for { @@ -261,7 +278,7 @@ func (s Server) GetReportListStream(req *proto.ReportListRequest, srv proto.Deka ping := s.reportStreams.Register(report.All, streamID.String(), req.StreamOptions.Sequence) defer s.reportStreams.Deregister(report.All, streamID.String()) - ctx, cancel := context.WithTimeout(srv.Context(), 55*time.Second) + ctx, cancel := context.WithTimeout(srv.Context(), getStreamTimeout()) defer cancel() for { diff --git a/src/server/job/job.go b/src/server/job/job.go index 24e925ff..39d0d489 100644 --- a/src/server/job/job.go +++ b/src/server/job/job.go @@ -81,6 +81,7 @@ func (j *BasicJob) GetResultSize() int64 { return j.ResultSize } +// nil until result is ready func (j *BasicJob) GetResultID() *string { j.Lock() defer j.Unlock() diff --git a/src/server/main.go b/src/server/main.go index 76d3f180..2379bd2e 100644 --- a/src/server/main.go +++ b/src/server/main.go @@ -104,6 +104,9 @@ func configureBucket() storage.Storage { case "GCS", "": log.Info().Msg("Using GCS storage backend") bucket = storage.NewGoogleCloudStorage() + case "SNOWFLAKE": + log.Info().Msg("Using SNOWFLAKE query result cache") + bucket = storage.NewSnowflakeStorage() default: log.Fatal().Str("DEKART_STORAGE", os.Getenv("DEKART_STORAGE")).Msg("Unknown storage backend") } diff --git a/src/server/snowflakejob/snowflakejob.go b/src/server/snowflakejob/snowflakejob.go index 282f85fe..0fa0673c 100644 --- a/src/server/snowflakejob/snowflakejob.go +++ b/src/server/snowflakejob/snowflakejob.go @@ -5,11 +5,10 @@ import ( "database/sql" "dekart/src/proto" "dekart/src/server/job" + "dekart/src/server/snowflakeutils" "dekart/src/server/storage" "encoding/csv" - "fmt" "io" - "os" "regexp" "sync" @@ -19,9 +18,10 @@ import ( type Job struct { job.BasicJob - snowflakeDb *sql.DB - storageObject storage.StorageObject - dataSourceName string + snowflakeDb 
*sql.DB + storageObject storage.StorageObject + connector sf.Connector + isSnowflakeStorageObject bool } type Store struct { @@ -82,7 +82,7 @@ func (j *Job) close(storageWriter io.WriteCloser, csvWriter *csv.Writer) { j.Lock() j.ResultSize = *resultSize jobID := j.GetID() - j.ResultID = &jobID + j.ResultID = &jobID // results available now j.Unlock() j.Status() <- int32(proto.Query_JOB_STATUS_DONE) j.Cancel() @@ -98,7 +98,7 @@ func (j *Job) fetchQueryMetadata(queryIDChan chan string, resultsReady chan bool j.Logger.Warn().Msg("Context Done before query status received") return case <-resultsReady: - conn, err := j.snowflakeDb.Driver().Open(j.dataSourceName) + conn, err := j.connector.Connect(ctx) if err != nil { j.Logger.Err(err).Send() j.CancelWithError(err) @@ -111,6 +111,10 @@ func (j *Job) fetchQueryMetadata(queryIDChan chan string, resultsReady chan bool return } j.Lock() + if j.isSnowflakeStorageObject { + // when using SNOWFLAKE storage, resultID is same as queryID and available immediately + j.ResultID = &queryID + } j.ProcessedBytes = status.ScanBytes j.Unlock() return @@ -123,81 +127,78 @@ func (j *Job) fetchQueryMetadata(queryIDChan chan string, resultsReady chan bool func (j *Job) Run(storageObject storage.StorageObject, connection *proto.Connection) error { j.storageObject = storageObject + _, isSnowflakeStorageObject := j.storageObject.(storage.SnowflakeStorageObject) + j.isSnowflakeStorageObject = isSnowflakeStorageObject queryIDChan := make(chan string) resultsReady := make(chan bool) metadataWg := &sync.WaitGroup{} metadataWg.Add(1) go j.fetchQueryMetadata(queryIDChan, resultsReady, metadataWg) j.Status() <- int32(proto.Query_JOB_STATUS_RUNNING) + // TODO will this just work: queryID = rows1.(sf.SnowflakeRows).GetQueryID() + // https://pkg.go.dev/github.com/snowflakedb/gosnowflake#hdr-Fetch_Results_by_Query_ID rows, err := j.snowflakeDb.QueryContext( sf.WithQueryIDChan(j.GetCtx(), queryIDChan), j.QueryText, ) if err != nil { + j.Logger.Debug().Err(err).Msg("Error querying snowflake") j.CancelWithError(err) return nil // it's ok, since these are query errors } defer rows.Close() - csvRows := make(chan []string, 10) //j.TotalRows? - go j.write(csvRows) + if isSnowflakeStorageObject { // no need to write to storage, use temp query results storage + resultsReady <- true + defer (func() { + j.Status() <- int32(proto.Query_JOB_STATUS_DONE) + })() + } else { + csvRows := make(chan []string, 10) //j.TotalRows? 
+ + go j.write(csvRows) + + firstRow := true + for rows.Next() { + if firstRow { + firstRow = false + resultsReady <- true + j.Status() <- int32(proto.Query_JOB_STATUS_READING_RESULTS) + columnNames, err := snowflakeutils.GetColumns(rows) + if err != nil { + j.Logger.Error().Err(err).Msg("Error getting column names") + j.CancelWithError(err) + return err + } + csvRows <- columnNames + } - firstRow := true - for rows.Next() { - columnTypes, err := rows.ColumnTypes() - if err != nil { - j.Logger.Error().Err(err).Msg("Error getting column types") - j.CancelWithError(err) - return err - } - if firstRow { - firstRow = false - resultsReady <- true - j.Status() <- int32(proto.Query_JOB_STATUS_READING_RESULTS) - columnNames := make([]string, len(columnTypes)) - for i, columnType := range columnTypes { - columnNames[i] = columnType.Name() + csvRow, err := snowflakeutils.GetRow(rows) + if err != nil { + j.Logger.Error().Err(err).Msg("Error getting column names") + j.CancelWithError(err) + return err } - csvRows <- columnNames + csvRows <- csvRow } - csvRow := make([]string, len(columnTypes)) - values := make([]interface{}, len(columnTypes)) - for i := range columnTypes { - values[i] = new(sql.NullString) - } - rows.Scan(values...) - - for i := range columnTypes { - value := values[i] - switch x := value.(type) { - case *sql.NullString: - csvRow[i] = x.String - default: - return fmt.Errorf("incorrect type of data: %T", x) - } + if firstRow { + // unblock fetchQueryMetadata if no rows + resultsReady <- true } - csvRows <- csvRow - } - if firstRow { - // unblock fetchQueryMetadata if no rows - resultsReady <- true + defer close(csvRows) } + metadataWg.Wait() // do not close context until metadata is fetched - close(csvRows) //better to close in defer? return nil } func (s *Store) Create(reportID string, queryID string, queryText string, userCtx context.Context) (job.Job, chan int32, error) { - dataSourceName := fmt.Sprintf( - "%s:%s@%s", - os.Getenv("DEKART_SNOWFLAKE_USER"), - os.Getenv("DEKART_SNOWFLAKE_PASSWORD"), - os.Getenv("DEKART_SNOWFLAKE_ACCOUNT_ID"), - ) - db, err := sql.Open("snowflake", dataSourceName) + connector := snowflakeutils.GetConnector() + db := sql.OpenDB(connector) + err := db.Ping() if err != nil { - log.Error().Err(err).Msg("failed to connect to snowflake") + log.Error().Err(err).Msg("Failed to ping snowflake") return nil, nil, err } job := &Job{ @@ -207,8 +208,8 @@ func (s *Store) Create(reportID string, queryID string, queryText string, userCt QueryText: queryText, Logger: log.With().Str("reportID", reportID).Str("queryID", queryID).Logger(), }, - snowflakeDb: db, - dataSourceName: dataSourceName, + snowflakeDb: db, + connector: connector, } job.Init(userCtx) s.StoreJob(job) diff --git a/src/server/snowflakeutils/snowflakeutils.go b/src/server/snowflakeutils/snowflakeutils.go new file mode 100644 index 00000000..b5a600fc --- /dev/null +++ b/src/server/snowflakeutils/snowflakeutils.go @@ -0,0 +1,156 @@ +package snowflakeutils + +import ( + "database/sql" + "encoding/csv" + "fmt" + "os" + + "github.com/rs/zerolog/log" + sf "github.com/snowflakedb/gosnowflake" +) + +// https://docs.snowflake.com/en/developer-guide/snowpark-container-services/additional-considerations-services-jobs#connecting-to-snowflake +func readSnowparkToken() string { + _, err := os.Stat("/snowflake/session/token") + if os.IsNotExist(err) { + return "" + } + token, err := os.ReadFile("/snowflake/session/token") + if err != nil { + log.Error().Err(err).Msg("failed to read token") + return "" + } + return 
string(token) +} + +func getConfig() sf.Config { + token := readSnowparkToken() + dekartSnowflakeUser := os.Getenv("DEKART_SNOWFLAKE_USER") + if dekartSnowflakeUser != "" { + log.Debug().Msg("Using snowflake password") + return sf.Config{ + Account: os.Getenv("DEKART_SNOWFLAKE_ACCOUNT_ID"), + User: dekartSnowflakeUser, + Password: os.Getenv("DEKART_SNOWFLAKE_PASSWORD"), + Params: map[string]*string{}, + } + } + if token != "" { + log.Debug().Msg("Using snowpark token") + // https://docs.snowflake.com/en/developer-guide/snowpark-container-services/tutorials/tutorial-2#main-py-file + return sf.Config{ + Account: os.Getenv("SNOWFLAKE_ACCOUNT"), + Host: os.Getenv("SNOWFLAKE_HOST"), + Database: os.Getenv("SNOWFLAKE_DATABASE"), + Schema: os.Getenv("SNOWFLAKE_SCHEMA"), + Authenticator: sf.AuthTypeOAuth, + Token: token, + Params: map[string]*string{}, + InsecureMode: true, + } + } + log.Fatal().Msg("No snowflake credentials found") + return sf.Config{} +} + +// GetConnector returns a snowflake connector +func GetConnector() sf.Connector { + config := getConfig() + driver := sf.SnowflakeDriver{} + return sf.NewConnector(driver, config) +} + +func GetColumns(rows *sql.Rows) ([]string, error) { + columnTypes, err := rows.ColumnTypes() + if err != nil { + log.Error().Err(err).Msg("Error getting column types") + return nil, err + } + columnNames := make([]string, len(columnTypes)) + for i, columnType := range columnTypes { + columnNames[i] = columnType.Name() + } + return columnNames, nil +} + +func GetRow(rows *sql.Rows) ([]string, error) { + columnTypes, err := rows.ColumnTypes() + if err != nil { + log.Error().Err(err).Msg("Error getting column types") + return nil, err + } + csvRow := make([]string, len(columnTypes)) + values := make([]interface{}, len(columnTypes)) + for i := range columnTypes { + values[i] = new(sql.NullString) + } + rows.Scan(values...) + + for i := range columnTypes { + value := values[i] + switch x := value.(type) { + case *sql.NullString: + csvRow[i] = x.String + default: + return nil, fmt.Errorf("incorrect type of data: %T", x) + } + } + return csvRow, nil +} + +// ParseRows reads rows from a sql.Rows object and writes them to a csv.Writer or a channel of csv rows +func ParseRows(rows *sql.Rows, csvWriter *csv.Writer, csvRows chan []string) (bool, error) { + firstRow := true + for rows.Next() { + columnTypes, err := rows.ColumnTypes() + if err != nil { + log.Error().Err(err).Msg("Error getting column types") + return firstRow, err + } + if firstRow { + firstRow = false + columnNames := make([]string, len(columnTypes)) + for i, columnType := range columnTypes { + columnNames[i] = columnType.Name() + } + if csvWriter != nil { + err := csvWriter.Write(columnNames) + if err != nil { + log.Error().Err(err).Msg("Error writing column names") + return firstRow, err + } + } else { + csvRows <- columnNames + } + + } + + csvRow := make([]string, len(columnTypes)) + values := make([]interface{}, len(columnTypes)) + for i := range columnTypes { + values[i] = new(sql.NullString) + } + rows.Scan(values...) 
+ + for i := range columnTypes { + value := values[i] + switch x := value.(type) { + case *sql.NullString: + csvRow[i] = x.String + default: + return firstRow, fmt.Errorf("incorrect type of data: %T", x) + } + } + if csvWriter != nil { + err = csvWriter.Write(csvRow) + if err != nil { + log.Error().Err(err).Msg("Error writing column names") + return firstRow, err + } + } else { + csvRows <- csvRow + } + } + return firstRow, nil +} diff --git a/src/server/storage/snowflakestorage.go b/src/server/storage/snowflakestorage.go new file mode 100644 index 00000000..1c8c181e --- /dev/null +++ b/src/server/storage/snowflakestorage.go @@ -0,0 +1,143 @@ +package storage + +import ( + "context" + "database/sql" + "dekart/src/server/snowflakeutils" + "encoding/csv" + "io" + "strings" + "time" + + "github.com/rs/zerolog/log" + sf "github.com/snowflakedb/gosnowflake" +) + +type SnowflakeStorage struct { + // logger zerolog.Logger +} + +func NewSnowflakeStorage() *SnowflakeStorage { + return &SnowflakeStorage{} +} + +func (s *SnowflakeStorage) CanSaveQuery() bool { + return false +} + +func (s *SnowflakeStorage) GetObject(_ string, queryID string) StorageObject { + return NewSnowflakeStorageObject(queryID) +} + +// NewSnowflakeStorageObject +func NewSnowflakeStorageObject(fileName string) StorageObject { + connector := snowflakeutils.GetConnector() + parts := strings.Split(fileName, ".") + queryID := parts[0] //extract queryID from fileName like 01b3b0ae-0102-9b06-0001-c28e001599fe.csv + + return SnowflakeStorageObject{queryID: queryID, connector: connector} +} + +// SnowflakeStorageObject +type SnowflakeStorageObject struct { + queryID string + connector sf.Connector +} + +func (s SnowflakeStorageObject) CanSaveQuery() bool { + return false +} + +func (s SnowflakeStorageObject) GetReader(ctx context.Context) (io.ReadCloser, error) { + log.Debug().Str("queryID", s.queryID).Msg("GetReader") + fetchResultByIDCtx := sf.WithFetchResultByID(ctx, s.queryID) + db := sql.OpenDB(s.connector) + rows, err := db.QueryContext(fetchResultByIDCtx, "") + if err != nil { + if sfErr, ok := err.(*sf.SnowflakeError); ok { + if sfErr.Number == 612 { + return nil, &ExpiredError{} + } + } + log.Error().Err(err).Msg("failed to query snowflake") + return nil, err + } + pr, pw := io.Pipe() + csvWriter := csv.NewWriter(pw) + go func() { + defer rows.Close() + firstRow := true + for rows.Next() { + if firstRow { + firstRow = false + columnNames, err := snowflakeutils.GetColumns(rows) + if err != nil { + log.Error().Err(err).Msg("Error getting column names") + return + } + err = csvWriter.Write(columnNames) + if err != nil { + log.Error().Err(err).Msg("Error writing column names") + return + } + + } + csvRow, err := snowflakeutils.GetRow(rows) + if err != nil { + log.Error().Err(err).Msg("Error getting row") + return + } + err = csvWriter.Write(csvRow) + if err != nil { + log.Error().Err(err).Msg("Error writing column names") + return + } + } + csvWriter.Flush() + err := pw.Close() + if err != nil { + log.Error().Err(err).Msg("Error closing pipe writer") + } + }() + + return pr, nil +} + +func (s SnowflakeStorageObject) GetWriter(ctx context.Context) io.WriteCloser { + log.Fatal().Msg("not implemented") + return nil +} + +func (s SnowflakeStorageObject) GetCreatedAt(ctx context.Context) (*time.Time, error) { + conn, err := s.connector.Connect(ctx) + if err != nil { + return nil, err + } + defer conn.Close() + status, err := conn.(sf.SnowflakeConnection).GetQueryStatus(ctx, s.queryID) + + if err != nil { + 
log.Warn().Err(err).Msg("failed to get query status") + return nil, &ExpiredError{} + } + createdAt := time.Unix(status.EndTime/1000, 0) + + log.Debug().Str("queryID", s.queryID).Time("createdAt", createdAt).Msg("GetCreatedAt") + + //check if query is older than 1 day (minus 1 hour for safety) + if time.Since(createdAt) > 23*time.Hour { + return nil, &ExpiredError{} + } + return &createdAt, nil +} + +// GetSize(context.Context) (*int64, error) +func (s SnowflakeStorageObject) GetSize(ctx context.Context) (*int64, error) { + return nil, nil +} + +// CopyFromS3(ctx context.Context, source string) error +func (s SnowflakeStorageObject) CopyFromS3(ctx context.Context, source string) error { + log.Fatal().Msg("not implemented") + return nil +} diff --git a/src/server/storage/storage.go b/src/server/storage/storage.go index aef39dc0..5ba9b54e 100644 --- a/src/server/storage/storage.go +++ b/src/server/storage/storage.go @@ -31,6 +31,15 @@ type StorageObject interface { type Storage interface { GetObject(string, string) StorageObject + CanSaveQuery() bool +} + +// Expired Error is returned when temp storage is expired +type ExpiredError struct { +} + +func (e *ExpiredError) Error() string { + return "expired" } func GetBucketName(userBucketName string) string { @@ -54,6 +63,11 @@ type GoogleCloudStorage struct { logger zerolog.Logger } +// CanSaveQuery returns true if the storage can save SQL query text +func (s GoogleCloudStorage) CanSaveQuery() bool { + return true +} + func (s GoogleCloudStorage) GetDefaultBucketName() string { return s.defaultBucketName } @@ -210,6 +224,10 @@ func NewS3Storage() Storage { } } +func (s S3Storage) CanSaveQuery() bool { + return true +} + func (s S3Storage) GetDefaultBucketName() string { return s.bucketName } From f691185c6b4020c9d0cc9fc934789a70bbe9d50d Mon Sep 17 00:00:00 2001 From: Vladi Bilonenko Date: Thu, 25 Apr 2024 07:24:47 +0200 Subject: [PATCH 2/3] Fix query text retrieval for SNOWFLAKE storage --- src/server/dekart/query.go | 20 +++++++++++++------- 1 file changed, 13 insertions(+), 7 deletions(-) diff --git a/src/server/dekart/query.go b/src/server/dekart/query.go index 39b5c1ed..e143a4eb 100644 --- a/src/server/dekart/query.go +++ b/src/server/dekart/query.go @@ -102,7 +102,8 @@ func (s Server) RunAllQueries(ctx context.Context, req *proto.RunAllQueriesReque `select queries.id, queries.query_source_id, - datasets.connection_id + datasets.connection_id, + queries.query_text from queries left join datasets on queries.id = datasets.query_id left join reports on (datasets.report_id = reports.id or queries.report_id = reports.id) @@ -124,8 +125,9 @@ func (s Server) RunAllQueries(ctx context.Context, req *proto.RunAllQueriesReque for queriesRows.Next() { var queryID string var querySourceId string + var queryText string var connectionID sql.NullString - err := queriesRows.Scan(&queryID, &querySourceId, &connectionID) + err := queriesRows.Scan(&queryID, &querySourceId, &connectionID, &queryText) if err != nil { log.Err(err).Send() return nil, status.Error(codes.Internal, err.Error()) @@ -142,6 +144,7 @@ func (s Server) RunAllQueries(ctx context.Context, req *proto.RunAllQueriesReque queryID: queryID, connection: connection, bucketName: bucketName, + queryText: queryText, }) } @@ -155,12 +158,15 @@ func (s Server) RunAllQueries(ctx context.Context, req *proto.RunAllQueriesReque for i := range queries { go func(i int) { - queryText, err := s.getQueryText(ctx, querySourceIds[i], queries[i].bucketName) - if err != nil { - res <- err - return + if 
queries[i].queryText == "" { + // for SNOWFLAKE storage queryText is stored in db + queryText, err := s.getQueryText(ctx, querySourceIds[i], queries[i].bucketName) + if err != nil { + res <- err + return + } + queries[i].queryText = queryText } - queries[i].queryText = queryText err = s.runQuery(ctx, queries[i]) res <- err }(i) From c6460ca4a37bacbfb44fe11634aa25bde1d98cc8 Mon Sep 17 00:00:00 2001 From: Vladi Bilonenko Date: Thu, 25 Apr 2024 08:05:03 +0200 Subject: [PATCH 3/3] Snowflake storage tests --- .github/workflows/e2e.yaml | 50 ++++++++++++++++++- Makefile | 25 +++++++++- .../cancelQuery.cy.js | 0 .../emptyResult.cy.js | 0 .../{snowflake => snowflake-s3}/fork.cy.js | 0 cypress/e2e/snowflake-s3/happyPath.cy.js | 16 ++++++ .../{snowflake => snowflake-s3}/spec.cy.js | 0 cypress/e2e/snowflake/happyPath.cy.js | 3 -- cypress/e2e/snowflake/runAllQueries.cy.js | 18 +++++++ src/client/ReportHeaderButtons.js | 1 + src/server/dekart/connection.go | 3 -- 11 files changed, 107 insertions(+), 9 deletions(-) rename cypress/e2e/{snowflake => snowflake-s3}/cancelQuery.cy.js (100%) rename cypress/e2e/{snowflake => snowflake-s3}/emptyResult.cy.js (100%) rename cypress/e2e/{snowflake => snowflake-s3}/fork.cy.js (100%) create mode 100644 cypress/e2e/snowflake-s3/happyPath.cy.js rename cypress/e2e/{snowflake => snowflake-s3}/spec.cy.js (100%) create mode 100644 cypress/e2e/snowflake/runAllQueries.cy.js diff --git a/.github/workflows/e2e.yaml b/.github/workflows/e2e.yaml index 05b832a6..0533e56b 100644 --- a/.github/workflows/e2e.yaml +++ b/.github/workflows/e2e.yaml @@ -188,7 +188,7 @@ jobs: with: name: athena path: /tmp/cypress - snowflake: + snowflake-s3: runs-on: ubuntu-latest needs: build services: @@ -229,6 +229,54 @@ jobs: -e AWS_SECRET_ACCESS_KEY=${{ secrets.AWS_SECRET_ACCESS_KEY }} \ -e DEKART_ALLOW_FILE_UPLOAD=1 \ -e DEKART_CORS_ORIGIN=http://localhost:3000 \ + -e TEST_SPEC=cypress/e2e/snowflake-s3 \ + -e CYPRESS_CI=1 \ + -e DEKART_SNOWFLAKE_ACCOUNT_ID=${{ secrets.SNOWFLAKE_ACCOUNT_ID }} \ + -e DEKART_SNOWFLAKE_USER=${{ secrets.SNOWFLAKE_USER }} \ + -e DEKART_SNOWFLAKE_PASSWORD=${{ secrets.SNOWFLAKE_PASSWORD }} \ + dekartxyz/dekart:${{ env.IMAGE_CACHE_KEY }} + - name: Upload cypress artifacts + if: failure() + uses: actions/upload-artifact@v3 + with: + name: snowflake-s3 + path: /tmp/cypress + snowflake: + runs-on: ubuntu-latest + needs: build + services: + postgres: + image: postgres + env: + POSTGRES_PASSWORD: ${{ env.DEKART_POSTGRES_PASSWORD }} + POSTGRES_USER: ${{ env.DEKART_POSTGRES_USER }} + POSTGRES_DB: ${{ env.DEKART_POSTGRES_DB }} + options: >- + --health-cmd pg_isready + --health-interval 10s + --health-timeout 5s + --health-retries 5 + ports: + - 5432:5432 + steps: + - name: Set up QEMU + uses: docker/setup-qemu-action@v2 + - name: Set up Docker Buildx + uses: docker/setup-buildx-action@v2 + - name: E2E test Snowflake + run: | + docker run -i \ + --network="host" \ + -v /tmp/cypress/videos:/dekart/cypress/videos/ \ + -v /tmp/cypress/screenshots:/dekart/cypress/screenshots/ \ + -e DEKART_LOG_DEBUG=1 \ + -e DEKART_POSTGRES_USER=$DEKART_POSTGRES_USER \ + -e DEKART_POSTGRES_PASSWORD=$DEKART_POSTGRES_PASSWORD \ + -e DEKART_POSTGRES_PORT=$DEKART_POSTGRES_PORT \ + -e DEKART_POSTGRES_HOST=localhost \ + -e DEKART_STORAGE=SNOWFLAKE \ + -e DEKART_DATASOURCE=SNOWFLAKE \ + -e DEKART_CORS_ORIGIN=null \ -e TEST_SPEC=cypress/e2e/snowflake \ -e CYPRESS_CI=1 \ -e DEKART_SNOWFLAKE_ACCOUNT_ID=${{ secrets.SNOWFLAKE_ACCOUNT_ID }} \ diff --git a/Makefile b/Makefile index e9ea507d..9f7d9031 100644 --- 
a/Makefile +++ b/Makefile @@ -108,10 +108,9 @@ athena: ${DEKART_DOCKER_E2E_TAG} -snowflake: +snowflake-s3: docker buildx build --tag ${DEKART_DOCKER_E2E_TAG} -o type=image -f ./Dockerfile --target e2etest . docker run -it --rm \ - -v ${GOOGLE_APPLICATION_CREDENTIALS}:${GOOGLE_APPLICATION_CREDENTIALS} \ -v $$(pwd)/cypress/videos:/dekart/cypress/videos/ \ -v $$(pwd)/cypress/screenshots:/dekart/cypress/screenshots/ \ -e DEKART_POSTGRES_DB=${DEKART_POSTGRES_DB} \ @@ -131,6 +130,28 @@ snowflake: -e AWS_SECRET_ACCESS_KEY=${AWS_SECRET_ACCESS_KEY} \ -e DEKART_ALLOW_FILE_UPLOAD=1 \ -e DEKART_CORS_ORIGIN=http://localhost:3000 \ + -e TEST_SPEC=cypress/e2e/snowflake-s3 \ + ${DEKART_DOCKER_E2E_TAG} + +snowflake: + docker buildx build --tag ${DEKART_DOCKER_E2E_TAG} -o type=image -f ./Dockerfile --target e2etest . + docker run -it --rm \ + -v ${GOOGLE_APPLICATION_CREDENTIALS}:${GOOGLE_APPLICATION_CREDENTIALS} \ + -v $$(pwd)/cypress/videos:/dekart/cypress/videos/ \ + -v $$(pwd)/cypress/screenshots:/dekart/cypress/screenshots/ \ + -e DEKART_POSTGRES_DB=${DEKART_POSTGRES_DB} \ + -e DEKART_POSTGRES_USER=${DEKART_POSTGRES_USER} \ + -e DEKART_POSTGRES_PASSWORD=${DEKART_POSTGRES_PASSWORD} \ + -e DEKART_POSTGRES_PORT=${DEKART_POSTGRES_PORT} \ + -e DEKART_POSTGRES_HOST=host.docker.internal \ + -e DEKART_MAPBOX_TOKEN=${DEKART_MAPBOX_TOKEN} \ + -e DEKART_STORAGE=SNOWFLAKE \ + -e DEKART_DATASOURCE=SNOWFLAKE \ + -e DEKART_SNOWFLAKE_ACCOUNT_ID=${DEKART_SNOWFLAKE_ACCOUNT_ID} \ + -e DEKART_SNOWFLAKE_USER=${DEKART_SNOWFLAKE_USER} \ + -e DEKART_SNOWFLAKE_PASSWORD=${DEKART_SNOWFLAKE_PASSWORD} \ + -e DEKART_CLOUD_STORAGE_BUCKET=${DEKART_CLOUD_STORAGE_BUCKET} \ + -e DEKART_CORS_ORIGIN=http://localhost:3000 \ -e TEST_SPEC=cypress/e2e/snowflake \ ${DEKART_DOCKER_E2E_TAG} diff --git a/cypress/e2e/snowflake/cancelQuery.cy.js b/cypress/e2e/snowflake-s3/cancelQuery.cy.js similarity index 100% rename from cypress/e2e/snowflake/cancelQuery.cy.js rename to cypress/e2e/snowflake-s3/cancelQuery.cy.js diff --git a/cypress/e2e/snowflake/emptyResult.cy.js b/cypress/e2e/snowflake-s3/emptyResult.cy.js similarity index 100% rename from cypress/e2e/snowflake/emptyResult.cy.js rename to cypress/e2e/snowflake-s3/emptyResult.cy.js diff --git a/cypress/e2e/snowflake/fork.cy.js b/cypress/e2e/snowflake-s3/fork.cy.js similarity index 100% rename from cypress/e2e/snowflake/fork.cy.js rename to cypress/e2e/snowflake-s3/fork.cy.js diff --git a/cypress/e2e/snowflake-s3/happyPath.cy.js b/cypress/e2e/snowflake-s3/happyPath.cy.js new file mode 100644 index 00000000..34739008 --- /dev/null +++ b/cypress/e2e/snowflake-s3/happyPath.cy.js @@ -0,0 +1,16 @@ +/* eslint-disable no-undef */ +import copy from '../../fixtures/copy.json' + +describe('happy path', () => { + it('should make simple snowflake query and get ready status', () => { + cy.visit('/') + cy.get(`button:contains("${copy.create_report}")`).click() + cy.get('button:contains("Add data from...")').click() + cy.get('span:contains("SQL query")').click() + cy.get('textarea').type(copy.simple_snowflake_query, { force: true }) + cy.get(`button:contains("${copy.execute}")`).click() + cy.get(`span:contains("${copy.ready}")`, { timeout: 20000 }).should('be.visible') + cy.get(`span:contains("${copy.downloading}")`).should('contain', 'kB') // size of result shown + cy.get('div:contains("100 rows")', { timeout: 20000 }).should('be.visible') + }) +}) diff --git a/cypress/e2e/snowflake/spec.cy.js b/cypress/e2e/snowflake-s3/spec.cy.js similarity index 100% rename from cypress/e2e/snowflake/spec.cy.js rename to 
cypress/e2e/snowflake-s3/spec.cy.js diff --git a/cypress/e2e/snowflake/happyPath.cy.js b/cypress/e2e/snowflake/happyPath.cy.js index 34739008..ec91be84 100644 --- a/cypress/e2e/snowflake/happyPath.cy.js +++ b/cypress/e2e/snowflake/happyPath.cy.js @@ -5,12 +5,9 @@ describe('happy path', () => { it('should make simple snowflake query and get ready status', () => { cy.visit('/') cy.get(`button:contains("${copy.create_report}")`).click() - cy.get('button:contains("Add data from...")').click() - cy.get('span:contains("SQL query")').click() cy.get('textarea').type(copy.simple_snowflake_query, { force: true }) cy.get(`button:contains("${copy.execute}")`).click() cy.get(`span:contains("${copy.ready}")`, { timeout: 20000 }).should('be.visible') - cy.get(`span:contains("${copy.downloading}")`).should('contain', 'kB') // size of result shown cy.get('div:contains("100 rows")', { timeout: 20000 }).should('be.visible') }) }) diff --git a/cypress/e2e/snowflake/runAllQueries.cy.js b/cypress/e2e/snowflake/runAllQueries.cy.js new file mode 100644 index 00000000..01cc2c43 --- /dev/null +++ b/cypress/e2e/snowflake/runAllQueries.cy.js @@ -0,0 +1,18 @@ +/* eslint-disable no-undef */ +import copy from '../../fixtures/copy.json' + +describe('run all queries', () => { + it('should run all queries', () => { + cy.visit('/') + cy.get(`button:contains("${copy.create_report}")`).click() + cy.get('textarea').type('SELECT ROUND(uniform(-90::float, 90::float, random()), 6) AS lat, ROUND(uniform(-180::float, 180::float, random()), 6) AS lon FROM TABLE(GENERATOR(ROWCOUNT => 1000))', { force: true }) + cy.get(`button:contains("${copy.execute}")`).click() + cy.get(`span:contains("${copy.ready}")`, { timeout: 20000 }).should('be.visible') + cy.get('div:contains("1,000 rows")', { timeout: 20000 }).should('be.visible') + + cy.get('button#dekart-refresh-button').click() + cy.get('#dekart-query-status-message').should('contain', 'Running') + cy.get('#dekart-query-status-message').should('contain', 'Ready') + cy.get('div:contains("1,000 rows")', { timeout: 20000 }).should('be.visible') + }) +}) diff --git a/src/client/ReportHeaderButtons.js b/src/client/ReportHeaderButtons.js index ed063360..12b87f43 100644 --- a/src/client/ReportHeaderButtons.js +++ b/src/client/ReportHeaderButtons.js @@ -66,6 +66,7 @@ function RefreshButton () { } return (