Skip to content

Commit

Permalink
Merge pull request #179 from dekart-xyz/snowflake-storage
Browse files Browse the repository at this point in the history
SNOWFLAKE storage backend support
  • Loading branch information
delfrrr committed Apr 25, 2024
2 parents 2d19e43 + c6460ca commit b72ab34
Show file tree
Hide file tree
Showing 33 changed files with 731 additions and 202 deletions.
1 change: 1 addition & 0 deletions .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ DEKART_UX_NOT_FOUND_ERROR_INFO_HTML=
DEKART_UX_SAMPLE_QUERY_SQL=

#dev
DEKART_STREAM_TIMEOUT=
DEKART_LOG_DEBUG=1
DEKART_LOG_PRETTY=1
DEKART_STATIC_FILES=
Expand Down
50 changes: 49 additions & 1 deletion .github/workflows/e2e.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ jobs:
with:
name: athena
path: /tmp/cypress
snowflake:
snowflake-s3:
runs-on: ubuntu-latest
needs: build
services:
Expand Down Expand Up @@ -229,6 +229,54 @@ jobs:
-e AWS_SECRET_ACCESS_KEY=${{ secrets.AWS_SECRET_ACCESS_KEY }} \
-e DEKART_ALLOW_FILE_UPLOAD=1 \
-e DEKART_CORS_ORIGIN=http://localhost:3000 \
-e TEST_SPEC=cypress/e2e/snowflake-s3 \
-e CYPRESS_CI=1 \
-e DEKART_SNOWFLAKE_ACCOUNT_ID=${{ secrets.SNOWFLAKE_ACCOUNT_ID }} \
-e DEKART_SNOWFLAKE_USER=${{ secrets.SNOWFLAKE_USER }} \
-e DEKART_SNOWFLAKE_PASSWORD=${{ secrets.SNOWFLAKE_PASSWORD }} \
dekartxyz/dekart:${{ env.IMAGE_CACHE_KEY }}
- name: Upload cypress artifacts
if: failure()
uses: actions/upload-artifact@v3
with:
name: snowflake-s3
path: /tmp/cypress
snowflake:
runs-on: ubuntu-latest
needs: build
services:
postgres:
image: postgres
env:
POSTGRES_PASSWORD: ${{ env.DEKART_POSTGRES_PASSWORD }}
POSTGRES_USER: ${{ env.DEKART_POSTGRES_USER }}
POSTGRES_DB: ${{ env.DEKART_POSTGRES_DB }}
options: >-
--health-cmd pg_isready
--health-interval 10s
--health-timeout 5s
--health-retries 5
ports:
- 5432:5432
steps:
- name: Set up QEMU
uses: docker/setup-qemu-action@v2
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@v2
- name: E2E test Snowflake
run: |
docker run -i \
--network="host" \
-v /tmp/cypress/videos:/dekart/cypress/videos/ \
-v /tmp/cypress/screenshots:/dekart/cypress/screenshots/ \
-e DEKART_LOG_DEBUG=1 \
-e DEKART_POSTGRES_USER=$DEKART_POSTGRES_USER \
-e DEKART_POSTGRES_PASSWORD=$DEKART_POSTGRES_PASSWORD \
-e DEKART_POSTGRES_PORT=$DEKART_POSTGRES_PORT \
-e DEKART_POSTGRES_HOST=localhost \
-e DEKART_STORAGE=SNOWFLAKE \
-e DEKART_DATASOURCE=SNOWFLAKE \
-e DEKART_CORS_ORIGIN=null \
-e TEST_SPEC=cypress/e2e/snowflake \
-e CYPRESS_CI=1 \
-e DEKART_SNOWFLAKE_ACCOUNT_ID=${{ secrets.SNOWFLAKE_ACCOUNT_ID }} \
Expand Down
25 changes: 23 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -108,10 +108,9 @@ athena:
${DEKART_DOCKER_E2E_TAG}


snowflake:
snowflake-s3:
docker buildx build --tag ${DEKART_DOCKER_E2E_TAG} -o type=image -f ./Dockerfile --target e2etest .
docker run -it --rm \
-v ${GOOGLE_APPLICATION_CREDENTIALS}:${GOOGLE_APPLICATION_CREDENTIALS} \
-v $$(pwd)/cypress/videos:/dekart/cypress/videos/ \
-v $$(pwd)/cypress/screenshots:/dekart/cypress/screenshots/ \
-e DEKART_POSTGRES_DB=${DEKART_POSTGRES_DB} \
Expand All @@ -131,6 +130,28 @@ snowflake:
-e AWS_SECRET_ACCESS_KEY=${AWS_SECRET_ACCESS_KEY} \
-e DEKART_ALLOW_FILE_UPLOAD=1 \
-e DEKART_CORS_ORIGIN=http://localhost:3000 \
-e TEST_SPEC=cypress/e2e/snowflake-s3 \
${DEKART_DOCKER_E2E_TAG}

snowflake:
docker buildx build --tag ${DEKART_DOCKER_E2E_TAG} -o type=image -f ./Dockerfile --target e2etest .
docker run -it --rm \
-v ${GOOGLE_APPLICATION_CREDENTIALS}:${GOOGLE_APPLICATION_CREDENTIALS} \
-v $$(pwd)/cypress/videos:/dekart/cypress/videos/ \
-v $$(pwd)/cypress/screenshots:/dekart/cypress/screenshots/ \
-e DEKART_POSTGRES_DB=${DEKART_POSTGRES_DB} \
-e DEKART_POSTGRES_USER=${DEKART_POSTGRES_USER} \
-e DEKART_POSTGRES_PASSWORD=${DEKART_POSTGRES_PASSWORD} \
-e DEKART_POSTGRES_PORT=${DEKART_POSTGRES_PORT} \
-e DEKART_POSTGRES_HOST=host.docker.internal \
-e DEKART_MAPBOX_TOKEN=${DEKART_MAPBOX_TOKEN} \
-e DEKART_STORAGE=SNOWFLAKE \
-e DEKART_DATASOURCE=SNOWFLAKE \
-e DEKART_SNOWFLAKE_ACCOUNT_ID=${DEKART_SNOWFLAKE_ACCOUNT_ID} \
-e DEKART_SNOWFLAKE_USER=${DEKART_SNOWFLAKE_USER} \
-e DEKART_SNOWFLAKE_PASSWORD=${DEKART_SNOWFLAKE_PASSWORD} \
-e DEKART_CLOUD_STORAGE_BUCKET=${DEKART_CLOUD_STORAGE_BUCKET} \
-e DEKART_CORS_ORIGIN=http://localhost:3000 \
-e TEST_SPEC=cypress/e2e/snowflake \
${DEKART_DOCKER_E2E_TAG}

Expand Down
File renamed without changes.
File renamed without changes.
File renamed without changes.
16 changes: 16 additions & 0 deletions cypress/e2e/snowflake-s3/happyPath.cy.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
/* eslint-disable no-undef */
import copy from '../../fixtures/copy.json'

// E2E happy path for the Snowflake datasource with S3 result storage:
// create a report, run a canned query, and verify the result is
// downloaded and rendered.
describe('happy path', () => {
  it('should make simple snowflake query and get ready status', () => {
    // Build a jQuery-style ":contains" selector for a tag + visible text.
    const byText = (tag, text) => `${tag}:contains("${text}")`
    // Query results can take a while to land in storage.
    const resultWait = { timeout: 20000 }

    cy.visit('/')
    cy.get(byText('button', copy.create_report)).click()
    cy.get(byText('button', 'Add data from...')).click()
    cy.get(byText('span', 'SQL query')).click()
    cy.get('textarea').type(copy.simple_snowflake_query, { force: true })
    cy.get(byText('button', copy.execute)).click()
    cy.get(byText('span', copy.ready), resultWait).should('be.visible')
    cy.get(byText('span', copy.downloading)).should('contain', 'kB') // size of result shown
    cy.get(byText('div', '100 rows'), resultWait).should('be.visible')
  })
})
File renamed without changes.
3 changes: 0 additions & 3 deletions cypress/e2e/snowflake/happyPath.cy.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,9 @@ describe('happy path', () => {
it('should make simple snowflake query and get ready status', () => {
cy.visit('/')
cy.get(`button:contains("${copy.create_report}")`).click()
cy.get('button:contains("Add data from...")').click()
cy.get('span:contains("SQL query")').click()
cy.get('textarea').type(copy.simple_snowflake_query, { force: true })
cy.get(`button:contains("${copy.execute}")`).click()
cy.get(`span:contains("${copy.ready}")`, { timeout: 20000 }).should('be.visible')
cy.get(`span:contains("${copy.downloading}")`).should('contain', 'kB') // size of result shown
cy.get('div:contains("100 rows")', { timeout: 20000 }).should('be.visible')
})
})
18 changes: 18 additions & 0 deletions cypress/e2e/snowflake/runAllQueries.cy.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
/* eslint-disable no-undef */
import copy from '../../fixtures/copy.json'

// Snowflake query generating 1000 random lat/lon points server-side,
// so the test needs no pre-existing table.
const RANDOM_POINTS_SQL = 'SELECT ROUND(uniform(-90::float, 90::float, random()), 6) AS lat, ROUND(uniform(-180::float, 180::float, random()), 6) AS lon FROM TABLE(GENERATOR(ROWCOUNT => 1000))'

// Verifies the "re-run all queries" (refresh) button: run a query once,
// refresh, and confirm the query cycles through Running -> Ready again.
describe('run all queries', () => {
  it('should run all queries', () => {
    const resultWait = { timeout: 20000 }

    cy.visit('/')
    cy.get(`button:contains("${copy.create_report}")`).click()
    cy.get('textarea').type(RANDOM_POINTS_SQL, { force: true })
    cy.get(`button:contains("${copy.execute}")`).click()
    cy.get(`span:contains("${copy.ready}")`, resultWait).should('be.visible')
    cy.get(`div:contains("1,000 rows")`, resultWait).should('be.visible')

    // Trigger a re-run of every query and watch the status transition.
    cy.get('button#dekart-refresh-button').click()
    cy.get('#dekart-query-status-message').should('contain', 'Running')
    cy.get('#dekart-query-status-message').should('contain', 'Ready')
    cy.get(`div:contains("1,000 rows")`, resultWait).should('be.visible')
  })
})
8 changes: 4 additions & 4 deletions src/client/DatasetSettingsModal.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import { closeDatasetSettingsModal, removeDataset, updateDatasetName } from './a

function ModalFooter ({ saving, setSaving, name, datasetId }) {
const dispatch = useDispatch()
const numDatasets = useSelector(state => state.datasets.length)
const numDatasets = useSelector(state => state.dataset.list.length)

return (
<div className={styles.modalFooter}>
Expand Down Expand Up @@ -45,11 +45,11 @@ function ModalFooter ({ saving, setSaving, name, datasetId }) {
}

export default function DatasetSettingsModal () {
const datasetId = useSelector(state => state.datasetSettings.datasetId)
const dataset = useSelector(state => state.datasets.find(d => d.id === datasetId))
const datasetId = useSelector(state => state.dataset.settings.datasetId)
const dataset = useSelector(state => state.dataset.list.find(d => d.id === datasetId))
const queries = useSelector(state => state.queries)
const files = useSelector(state => state.files)
const visible = useSelector(state => state.datasetSettings.visible)
const visible = useSelector(state => state.dataset.settings.visible)

const dispatch = useDispatch()

Expand Down
4 changes: 2 additions & 2 deletions src/client/Downloading.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import message from 'antd/es/message'
import { useEffect } from 'react'

function DownloadingMessage () {
const downloadingDatasets = useSelector(state => state.downloadingDatasets)
const downloadingDatasets = useSelector(state => state.dataset.downloading)
const files = useSelector(state => state.files)
const queries = useSelector(state => state.queries)
const size = downloadingDatasets.reduce((size, { queryId, fileId }) => {
Expand All @@ -26,7 +26,7 @@ function DownloadingMessage () {

let hideDownloading = null
export default function Downloading () {
const downloadingDatasets = useSelector(state => state.downloadingDatasets)
const downloadingDatasets = useSelector(state => state.dataset.downloading)
const show = downloadingDatasets.length > 0
const [api, contextHolder] = message.useMessage()
useEffect(() => {
Expand Down
1 change: 1 addition & 0 deletions src/client/ReportHeaderButtons.js
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ function RefreshButton () {
}
return (
<Button
id='dekart-refresh-button'
type='text'
icon={loadingNumber ? <LoadingOutlined /> : <ReloadOutlined />}
title='Re-run all queries'
Expand Down
4 changes: 2 additions & 2 deletions src/client/ReportPage.js
Original file line number Diff line number Diff line change
Expand Up @@ -111,10 +111,10 @@ function getTabPane (dataset, queries, files, status) {
}

function DatasetSection ({ reportId }) {
const datasets = useSelector(state => state.datasets)
const datasets = useSelector(state => state.dataset.list)
const queries = useSelector(state => state.queries)
const files = useSelector(state => state.files)
const activeDataset = useSelector(state => state.activeDataset)
const activeDataset = useSelector(state => state.dataset.active)
const report = useSelector(state => state.report)
const queryStatus = useSelector(state => state.queryStatus)
const { canWrite } = report
Expand Down
28 changes: 20 additions & 8 deletions src/client/actions/dataset.js
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
import { CreateDatasetRequest, RemoveDatasetRequest, UpdateDatasetConnectionRequest, UpdateDatasetNameRequest } from '../../proto/dekart_pb'
import { Dekart } from '../../proto/dekart_pb_service'
import { grpcCall } from './grpc'
import { downloading, setError, finishDownloading, success } from './message'
import { downloading, setError, finishDownloading, success, info, warn } from './message'
import { addDataToMap, toggleSidePanel, removeDataset as removeDatasetFromKepler, reorderLayer } from '@dekart-xyz/kepler.gl/dist/actions'
import { processCsvData, processGeojson } from '@dekart-xyz/kepler.gl/dist/processors'
import { get } from '../lib/api'
import { KeplerGlSchema } from '@dekart-xyz/kepler.gl/dist/schemas'
import getDatasetName from '../lib/getDatasetName'
import { runQuery } from './query'

export function createDataset (reportId) {
return (dispatch, getState) => {
Expand All @@ -19,7 +20,7 @@ export function createDataset (reportId) {

export function setActiveDataset (datasetId) {
return (dispatch, getState) => {
const { datasets } = getState()
const { list: datasets } = getState().dataset
const dataset = datasets.find(d => d.id === datasetId) || datasets[0]
if (dataset) {
dispatch({ type: setActiveDataset.name, dataset })
Expand All @@ -29,7 +30,7 @@ export function setActiveDataset (datasetId) {

export function updateDatasetName (datasetId, name) {
return async (dispatch, getState) => {
const { datasets } = getState()
const { list: datasets } = getState().dataset
const dataset = datasets.find(d => d.id === datasetId)
if (!dataset) {
return
Expand All @@ -44,7 +45,7 @@ export function updateDatasetName (datasetId, name) {

export function updateDatasetConnection (datasetId, connectionId) {
return async (dispatch, getState) => {
const { datasets } = getState()
const { list: datasets } = getState().dataset
const dataset = datasets.find(d => d.id === datasetId)
if (!dataset) {
return
Expand All @@ -59,7 +60,7 @@ export function updateDatasetConnection (datasetId, connectionId) {

export function removeDataset (datasetId) {
return async (dispatch, getState) => {
const { datasets, activeDataset } = getState()
const { list: datasets, active: activeDataset } = getState().dataset
if (activeDataset.id === datasetId) {
// removed active query
const datasetsLeft = datasets.filter(q => q.id !== datasetId)
Expand All @@ -81,6 +82,8 @@ export function removeDataset (datasetId) {

export function downloadDataset (dataset, sourceId, extension, prevDatasetsList) {
return async (dispatch, getState) => {
const { dataset: { list: datasets }, files, queries, keplerGl } = getState()
const label = getDatasetName(dataset, queries, files)
dispatch({ type: downloadDataset.name, dataset })
dispatch(downloading(dataset))
const { token } = getState()
Expand All @@ -95,11 +98,20 @@ export function downloadDataset (dataset, sourceId, extension, prevDatasetsList)
data = processGeojson(json)
}
} catch (err) {
dispatch(setError(err))
dispatch(finishDownloading(dataset)) // remove downloading message
if (err.status === 410 && dataset.queryId) { // gone from dw query temporary storage
const { canRun, queryText } = getState().queryStatus[dataset.queryId]
if (!canRun) {
dispatch(warn(<><i>{label}</i> result expired</>, false))
return
}
dispatch(info(<><i>{label}</i> result expired, re-running</>))
dispatch(runQuery(dataset.queryId, queryText))
} else {
dispatch(setError(err))
}
return
}
const { datasets, files, queries, keplerGl } = getState()
const label = getDatasetName(dataset, queries, files)
// check if dataset was already added to kepler
const addedDatasets = getState().keplerGl.kepler.visState.datasets
const prevDataset = prevDatasetsList.find(d => d.id in addedDatasets)
Expand Down
19 changes: 17 additions & 2 deletions src/client/actions/message.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,25 @@ export function downloading (dataset) {
return { type: downloading.name, dataset }
}

export function finishDownloading (dataset) {
return { type: finishDownloading.name, dataset }
export function finishDownloading (dataset, gone = false) {
return { type: finishDownloading.name, dataset, gone }
}

/**
 * Action creator that shows an antd warning toast and returns a
 * `warn`-typed Redux action (no payload).
 *
 * @param {React.ReactNode} content - message body to display
 * @param {boolean} [transitive=true] - when false the toast is sticky
 *   (explicit `duration: 10000` — NOTE(review): antd durations are in
 *   seconds, so 10000 looks like "effectively forever"; confirm intent)
 * @returns {{type: string}} plain action consumed by the message reducer
 */
export function warn (content, transitive = true) {
  // Shared `style` comes from this module's scope; only non-transitive
  // warnings carry the long explicit duration.
  const toast = transitive
    ? { content, style }
    : { content, duration: 10000, style }
  message.warn(toast)
  return { type: warn.name }
}
export function success (content) {
message.success({
content,
Expand Down
4 changes: 2 additions & 2 deletions src/client/actions/report.js
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ function shouldDownloadQueryText (query, prevQueriesList, queriesList) {
export function reportUpdate (reportStreamResponse) {
const { report, queriesList, datasetsList, filesList } = reportStreamResponse
return async (dispatch, getState) => {
const { queries: prevQueriesList, datasets: prevDatasetsList, report: prevReport, files: prevFileList, env, connection } = getState()
const { queries: prevQueriesList, dataset: { list: prevDatasetsList }, report: prevReport, files: prevFileList, env, connection } = getState()
dispatch({
type: reportUpdate.name,
report,
Expand Down Expand Up @@ -126,7 +126,7 @@ export function reportUpdate (reportStreamResponse) {
let extension = 'csv'
if (dataset.queryId) {
const query = queriesList.find(q => q.id === dataset.queryId)
if (shouldAddQuery(query, prevQueriesList, queriesList) || shouldUpdateDataset(dataset, prevDatasetsList)) {
if (shouldAddQuery(query, prevQueriesList) || shouldUpdateDataset(dataset, prevDatasetsList)) {
dispatch(downloadDataset(
dataset,
query.jobResultId,
Expand Down
2 changes: 1 addition & 1 deletion src/client/lib/shouldAddQuery.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
export function shouldAddQuery (query, prevQueriesList, queriesList) {
export function shouldAddQuery (query, prevQueriesList) {
if (!query.jobResultId) {
return false
}
Expand Down
2 changes: 1 addition & 1 deletion src/client/reducers/connectionReducer.js
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ function userDefined (state = false, action) {
switch (action.type) {
case setEnv.name: {
const { BIGQUERY_PROJECT_ID, CLOUD_STORAGE_BUCKET, DATASOURCE } = action.variables
return (BIGQUERY_PROJECT_ID === '' && DATASOURCE === 'BQ') || CLOUD_STORAGE_BUCKET === ''
return (BIGQUERY_PROJECT_ID === '' && DATASOURCE === 'BQ') || (CLOUD_STORAGE_BUCKET === '' && DATASOURCE !== 'SNOWFLAKE')
}
default:
return state
Expand Down

0 comments on commit b72ab34

Please sign in to comment.