Commit

Merge pull request #156 from Tsovak/feature/postgres
feature: postgres datasource support
delfrrr committed Jan 20, 2024
2 parents 758e2d2 + cc87039 commit cb4c21a
Showing 8 changed files with 243 additions and 1 deletion.
6 changes: 6 additions & 0 deletions .env.example
@@ -49,6 +49,12 @@ DEKART_SNOWFLAKE_ACCOUNT_ID=
DEKART_SNOWFLAKE_USER=
DEKART_SNOWFLAKE_PASSWORD=

# postgres-like data source credentials
DEKART_DATA_POSTGRES_PASSWORD=
DEKART_DATA_POSTGRES_USER=
DEKART_DATA_POSTGRES_LIKE_DB=
# postgres-like data source connection string (e.g. postgres://user:password@host:port/db)
DEKART_POSTGRES_DATA_CONNECTION=

#UX
DEKART_UX_HOMEPAGE=
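For local setup, the value of DEKART_POSTGRES_DATA_CONNECTION can be sanity-checked with the same lib/pq driver the new backend uses. A minimal standalone sketch, assuming only that the variable holds a DSN in the postgres://user:password@host:port/db form shown above; everything else here is illustrative:

package main

import (
	"context"
	"database/sql"
	"log"
	"os"
	"time"

	_ "github.com/lib/pq" // same postgres driver the pgjob package registers
)

func main() {
	// DSN comes from the env variable introduced above.
	db, err := sql.Open("postgres", os.Getenv("DEKART_POSTGRES_DATA_CONNECTION"))
	if err != nil {
		log.Fatalf("invalid connection string: %v", err)
	}
	defer db.Close()

	// sql.Open does not dial; PingContext verifies the server is reachable.
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	if err := db.PingContext(ctx); err != nil {
		log.Fatalf("cannot reach postgres: %v", err)
	}
	log.Println("postgres data source reachable")
}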
26 changes: 25 additions & 1 deletion Makefile
@@ -8,6 +8,8 @@ ifneq (,$(wildcard ./.env))
endif

UNAME := $(shell uname -m)
DEKART_DOCKER_DEV_TAG ?= dekart-dev
DEKART_DOCKER_E2E_TAG ?= dekart-e2e

proto-clean:
rm -rf ./src/proto/*.go
@@ -134,7 +136,29 @@ snowflake:
		-e TEST_SPEC=cypress/e2e/snowflake \
		${DEKART_DOCKER_E2E_TAG}


postgres:
	docker buildx build --tag ${DEKART_DOCKER_E2E_TAG} -o type=image -f ./Dockerfile --target e2etest .
	docker run -it --rm \
		-v ${GOOGLE_APPLICATION_CREDENTIALS}:${GOOGLE_APPLICATION_CREDENTIALS} \
		-e GOOGLE_APPLICATION_CREDENTIALS=${GOOGLE_APPLICATION_CREDENTIALS} \
		-v $$(pwd)/cypress/videos:/dekart/cypress/videos/ \
		-v $$(pwd)/cypress/screenshots:/dekart/cypress/screenshots/ \
		-e DEKART_POSTGRES_DB=${DEKART_POSTGRES_DB} \
		-e DEKART_POSTGRES_USER=${DEKART_POSTGRES_USER} \
		-e DEKART_POSTGRES_PASSWORD=${DEKART_POSTGRES_PASSWORD} \
		-e DEKART_POSTGRES_PORT=${DEKART_POSTGRES_PORT} \
		-e DEKART_POSTGRES_HOST=host.docker.internal \
		-e DEKART_MAPBOX_TOKEN=${DEKART_MAPBOX_TOKEN} \
		-e DEKART_STORAGE=GCS \
		-e DEKART_DATASOURCE=PG \
		-e DEKART_POSTGRES_DATA_CONNECTION=${DEKART_POSTGRES_DATA_CONNECTION} \
		-e DEKART_CLOUD_STORAGE_BUCKET=${DEKART_CLOUD_STORAGE_BUCKET} \
		-e DEKART_ALLOW_FILE_UPLOAD=1 \
		-e DEKART_CORS_ORIGIN=http://localhost:3000 \
		-e TEST_SPEC=cypress/e2e/pg \
		${DEKART_DOCKER_E2E_TAG}

docker: # build docker for local use
docker buildx build --push --tag ${DEKART_DOCKER_DEV_TAG} -o type=image --platform=linux/amd64 -f ./Dockerfile .
Expand Down
1 change: 1 addition & 0 deletions README.md
@@ -19,6 +19,7 @@ Create beautiful data-driven maps and share them with your team:
* Snowflake ❄️
* BigQuery
* AWS Athena
* Postgres
* CSV (file upload)
* GeoJSON (file upload)

17 changes: 17 additions & 0 deletions cypress/e2e/pg/cancelQuery.cy.js
@@ -0,0 +1,17 @@
/* eslint-disable no-undef */
import copy from '../../fixtures/copy.json'

describe('cancelling query', () => {
  it('should cancel query', () => {
    cy.visit('/')
    cy.get(`button:contains("${copy.create_report}")`).click()
    cy.get('button:contains("Add data from...")').click()
    cy.get('span:contains("SQL query")').click()
    cy.get('textarea').type(copy.simple_pg_query_long, { force: true })
    cy.get(`button:contains("${copy.execute}")`).click()
    cy.get(`button:contains("${copy.cancel}")`).should('be.visible')
    cy.get(`button:contains("${copy.cancel}")`).click()
    cy.get(`button:contains("${copy.execute}")`).should('be.enabled')
    cy.get('#dekart-query-status-message').should('be.empty')
  })
})
15 changes: 15 additions & 0 deletions cypress/e2e/pg/happyPath.cy.js
@@ -0,0 +1,15 @@
/* eslint-disable no-undef */
import copy from '../../fixtures/copy.json'

describe('happy path', () => {
  it('should make a simple postgres query and get ready status', () => {
    cy.visit('/')
    cy.get(`button:contains("${copy.create_report}")`).click()
    cy.get('button:contains("Add data from...")').click()
    cy.get('span:contains("SQL query")').click()
    cy.get('textarea').type(copy.simple_pg_query, { force: true })
    cy.get(`button:contains("${copy.execute}")`).click()
    cy.get(`span:contains("${copy.ready}")`, { timeout: 20000 }).should('be.visible')
    cy.get('div:contains("1 rows")', { timeout: 20000 }).should('be.visible')
  })
})
2 changes: 2 additions & 0 deletions cypress/fixtures/copy.json
@@ -8,6 +8,8 @@
"simple_snowflake_query": "SELECT ROUND(uniform(-90::float, 90::float, random()), 6) AS lat, ROUND(uniform(-180::float, 180::float, random()), 6) AS lon FROM TABLE(GENERATOR(ROWCOUNT => 100))",
"simple_sql_query": "select geometry from`bigquery-public-data.geo_openstreetmap.planet_features_multipolygons` limit 10",
"simple_sql_query_limit_100k": "select geometry from`bigquery-public-data.geo_openstreetmap.planet_features_multipolygons` limit 100000",
"simple_pg_query": "SELECT table_name FROM information_schema.tables WHERE table_schema='public' ORDER BY table_name LIMIT 1",
"simple_pg_query_long": "SELECT 1, pg_sleep(60)",
"execute": "Execute",
"ready": "Ready",
"cancel": "Cancel",
4 changes: 4 additions & 0 deletions src/server/main.go
@@ -17,6 +17,7 @@ import (
"dekart/src/server/bqjob"
"dekart/src/server/dekart"
"dekart/src/server/job"
"dekart/src/server/pgjob"
"dekart/src/server/snowflakejob"
"dekart/src/server/storage"

@@ -118,6 +119,9 @@ func configureJobStore(bucket storage.Storage) job.Store {
case "ATHENA":
log.Info().Msg("Using Athena Datasource backend")
jobStore = athenajob.NewStore(bucket)
case "PG":
log.Info().Msg("Using Postgres LIKE Datasource backend")
jobStore = pgjob.NewStore()
case "BQ", "":
log.Info().Msg("Using BigQuery Datasource backend")
jobStore = bqjob.NewStore()
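For orientation: the switch above returns a job.Store. Judging from the Create and Run methods in the new pgjob package below, the contract looks roughly like the following. This is a sketch inferred from this diff only — the actual interface lives in src/server/job and may differ:

package job

import (
	"context"

	"dekart/src/proto"
	"dekart/src/server/storage"
)

// Store and Job as inferred from this diff (sketch; actual definitions in
// src/server/job may differ).
type Store interface {
	// Create registers a query job and returns it with its status channel.
	Create(reportID, queryID, queryText string, userCtx context.Context) (Job, chan int32, error)
}

type Job interface {
	// Run executes the query and streams results into the storage object.
	Run(storageObject storage.StorageObject, connection *proto.Connection) error
}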
173 changes: 173 additions & 0 deletions src/server/pgjob/pgjob.go
@@ -0,0 +1,173 @@
package pgjob

import (
	"context"
	"database/sql"
	"encoding/csv"
	"fmt"
	"io"
	"os"

	"dekart/src/proto"
	"dekart/src/server/job"
	"dekart/src/server/storage"

	_ "github.com/lib/pq" // postgres driver
	"github.com/rs/zerolog/log"
)

// Job is a single Postgres query run; results stream to a storage object as CSV.
type Job struct {
	job.BasicJob
	postgresDB    *sql.DB
	storageObject storage.StorageObject
}

// Store creates Jobs that share one connection pool to the data source.
type Store struct {
	job.BasicStore
	postgresDB *sql.DB
}

func NewStore() *Store {
	dbConnStr := os.Getenv("DEKART_POSTGRES_DATA_CONNECTION")
	db, err := sql.Open("postgres", dbConnStr)
	if err != nil {
		log.Fatal().Err(err).Msg("failed to connect to postgres")
	}

	return &Store{
		postgresDB: db,
	}
}

func (s *Store) Create(reportID string, queryID string, queryText string, userCtx context.Context) (job.Job, chan int32, error) {
	j := &Job{
		BasicJob: job.BasicJob{
			ReportID:  reportID,
			QueryID:   queryID,
			QueryText: queryText,
			Logger:    log.With().Str("reportID", reportID).Str("queryID", queryID).Logger(),
		},
		postgresDB: s.postgresDB,
	}

	j.Init(userCtx)
	s.StoreJob(j)
	go s.RemoveJobWhenDone(j)
	return j, j.Status(), nil
}

func (j *Job) Run(storageObject storage.StorageObject, connection *proto.Connection) error {
	j.Status() <- int32(proto.Query_JOB_STATUS_RUNNING)
	j.storageObject = storageObject

	rows, err := j.postgresDB.QueryContext(j.GetCtx(), j.QueryText)
	if err != nil {
		j.Logger.Error().Err(err).Str("queryText", j.QueryText).Msg("Error executing query")
		j.CancelWithError(err)
		return err
	}
	defer rows.Close()

	// Buffered channel decouples row scanning from the storage writer goroutine.
	csvRows := make(chan []string, 10_000)

	go j.write(csvRows)

	columnTypes, err := rows.ColumnTypes()
	if err != nil {
		j.Logger.Error().Err(err).Msg("Error getting column types")
		j.CancelWithError(err)
		return err
	}

	firstRow := true
	for rows.Next() {
		if firstRow {
			firstRow = false
			j.Status() <- int32(proto.Query_JOB_STATUS_READING_RESULTS)
			// Emit the CSV header before the first data row.
			columnNames := make([]string, len(columnTypes))
			for i, columnType := range columnTypes {
				columnNames[i] = columnType.Name()
			}
			csvRows <- columnNames
		}

		// Scan every column as NullString so NULLs serialize as empty strings.
		csvRow := make([]string, len(columnTypes))
		values := make([]interface{}, len(columnTypes))
		for i := range columnTypes {
			values[i] = new(sql.NullString)
		}

		err = rows.Scan(values...)
		if err != nil {
			j.Logger.Warn().Err(err).Msg("Error scanning row, continuing with next")
			continue
		}

		for i := range columnTypes {
			value := values[i]
			switch x := value.(type) {
			case *sql.NullString:
				csvRow[i] = x.String
			default:
				// Unreachable while every value is *sql.NullString; close the
				// channel so the writer goroutine is not leaked if it ever fires.
				err := fmt.Errorf("incorrect type of data: %T", x)
				j.CancelWithError(err)
				close(csvRows)
				return err
			}
		}
		csvRows <- csvRow
	}
	if err := rows.Err(); err != nil {
		j.Logger.Error().Err(err).Msg("Error iterating rows")
		j.CancelWithError(err)
		close(csvRows)
		return err
	}

	close(csvRows)
	j.Status() <- int32(proto.Query_JOB_STATUS_DONE)
	return nil
}

// write drains csvRows into the storage object until the channel is closed.
func (j *Job) write(csvRows chan []string) {
	storageWriter := j.storageObject.GetWriter(j.GetCtx())
	csvWriter := csv.NewWriter(storageWriter)
	for {
		csvRow, more := <-csvRows
		if !more {
			break
		}
		err := csvWriter.Write(csvRow)
		if err == context.Canceled {
			break
		}
		if err != nil {
			j.Logger.Err(err).Send()
			j.CancelWithError(err)
			break
		}
	}
	j.close(storageWriter, csvWriter)
}

// close flushes and finalizes the result object, then publishes its size and ID.
func (j *Job) close(storageWriter io.WriteCloser, csvWriter *csv.Writer) {
	csvWriter.Flush()
	err := storageWriter.Close()
	if err != nil {
		// A canceled context is expected when the job was canceled; nothing to report.
		if err == context.Canceled {
			return
		}
		j.Logger.Err(err).Send()
		j.CancelWithError(err)
		return
	}
	resultSize, err := j.storageObject.GetSize(j.GetCtx())
	if err != nil {
		j.Logger.Err(err).Send()
		j.CancelWithError(err)
		return
	}

	j.Logger.Debug().Msg("Writing Done")

	j.Lock()
	j.ResultSize = *resultSize
	jobID := j.GetID()
	j.ResultID = &jobID
	j.Unlock()

	j.Status() <- int32(proto.Query_JOB_STATUS_DONE)
	j.Cancel()
}
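The core trick in Run — scanning every column into *sql.NullString so arbitrary result sets serialize to CSV without per-type handling, NULLs becoming empty strings — also works standalone. A minimal sketch; the query echoes the information_schema fixture above, and the DSN env variable is the one from .env.example:

package main

import (
	"database/sql"
	"encoding/csv"
	"log"
	"os"

	_ "github.com/lib/pq"
)

func main() {
	db, err := sql.Open("postgres", os.Getenv("DEKART_POSTGRES_DATA_CONNECTION"))
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	rows, err := db.Query("SELECT table_name, table_type FROM information_schema.tables LIMIT 5")
	if err != nil {
		log.Fatal(err)
	}
	defer rows.Close()

	cols, err := rows.Columns()
	if err != nil {
		log.Fatal(err)
	}

	w := csv.NewWriter(os.Stdout)
	defer w.Flush()
	w.Write(cols) // header row, as in Job.Run

	for rows.Next() {
		// Scan every column as NullString: NULL values become empty strings.
		values := make([]interface{}, len(cols))
		for i := range values {
			values[i] = new(sql.NullString)
		}
		if err := rows.Scan(values...); err != nil {
			log.Fatal(err)
		}
		record := make([]string, len(cols))
		for i, v := range values {
			record[i] = v.(*sql.NullString).String
		}
		w.Write(record)
	}
	if err := rows.Err(); err != nil {
		log.Fatal(err)
	}
}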
