14 changes: 13 additions & 1 deletion .env.example
@@ -16,7 +16,7 @@ S3_REGION=eu-west-1
 # PostgreSQL
 # ==========

-PGPASSWORD=datalabtech
+PSQL_ROOT_PASSWORD=datalabtech

 # ========
 # Data Lab
@@ -38,11 +38,23 @@ S3_BACKUPS_PREFIX=backups
 # - All paths are relative to local/

 ENGINE_DB=engine.duckdb
+
+# Deprecated: only use to migrate from SQLite to PostgreSQL
 STAGE_DB=stage.sqlite
 SECURE_STAGE_DB=secure_stage.sqlite
 GRAPHS_MART_DB=marts/graphs.sqlite
 ANALYTICS_MART_DB=marts/analytics.sqlite

+PSQL_CATALOG_HOST=docker-shared
+PSQL_CATALOG_PORT=5432
+PSQL_CATALOG_DB=lakehouse
+PSQL_CATALOG_USER=lakehouse
+PSQL_CATALOG_PASSWORD=lakehouse
+PSQL_CATALOG_STAGE_SCHEMA=stage
+PSQL_CATALOG_SECURE_STAGE_SCHEMA=secure_stage
+PSQL_CATALOG_GRAPHS_MART_SCHEMA=graphs
+PSQL_CATALOG_ANALYTICS_MART_SCHEMA=analytics
+
 # ====
 # Kuzu
 # ====
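The new `PSQL_CATALOG_*` block replaces the per-file SQLite paths with a single PostgreSQL connection plus one schema per former database. As a quick sanity check, something like the following lists those schemas once provisioning has run (a sketch, assuming the defaults above are exported in your shell):

```bash
PGPASSWORD="$PSQL_CATALOG_PASSWORD" psql \
  -h "$PSQL_CATALOG_HOST" -p "$PSQL_CATALOG_PORT" \
  -U "$PSQL_CATALOG_USER" -d "$PSQL_CATALOG_DB" \
  -c '\dn'  # expect stage, secure_stage, graphs, analytics
```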
10 changes: 3 additions & 7 deletions README.md
@@ -181,9 +181,7 @@ s3://lakehouse/
 │   └── catalog/
 │       ├── YYYY_MM_DD/
 │       │   └── HH_mm_SS_sss/
-│       │       ├── engine.duckdb
-│       │       ├── stage.sqlite
-│       │       └── marts/*.sqlite
+│       │       └── lakehouse.dump
 │       └── manifest.json
 ├── raw/
 │   └── <dataset-name>/
@@ -238,7 +236,7 @@ S3_REGION=eu-west-1
 #### PostgreSQL

 ```bash
-PGPASSWORD=datalabtech
+PSQL_ROOT_PASSWORD=datalabtech
 ```

 Set this to the `root` user password of your PostgreSQL database—only used when deploying your on-premise infrastructure, so that databases and credentials can be provisioned at a later stage. Otherwise not accessed.
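Provisioning itself is out of scope here, but with the root password above it might look roughly like this (a hypothetical sketch, not a script in this repo; the role and database names mirror the `.env.example` defaults):

```bash
# Hypothetical: create the lakehouse role and database as root.
# The maintenance database name may differ depending on your image setup.
PGPASSWORD="$PSQL_ROOT_PASSWORD" psql -h docker-shared -U root -d postgres <<'SQL'
CREATE ROLE lakehouse LOGIN PASSWORD 'lakehouse';
CREATE DATABASE lakehouse OWNER lakehouse;
SQL
```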
@@ -440,9 +438,7 @@ dlctl backup create
 To restore a backup, run:

 ```bash
-dlctl backup restore \
-    --source "<YYYY-mm-ddTHH:MM:SS.sss>" \
-    --target "<target-dir>"
+dlctl backup restore --source "<YYYY-mm-ddTHH:MM:SS.sss>"
 ```

 Omitting `--source` will restore the latest backup.
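A usable `--source` value can be taken from the `ls` subcommand defined later in `dlctl/cli.py`:

```bash
# List available catalog backups, then restore a specific one.
dlctl backup ls
dlctl backup restore --source "<YYYY-mm-ddTHH:MM:SS.sss>"
```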
59 changes: 40 additions & 19 deletions dlctl/cli.py
@@ -1,5 +1,7 @@
 import os
+import subprocess
 import sys
+import tempfile
 from datetime import datetime
 from importlib.metadata import version
 from pathlib import Path
@@ -15,7 +17,7 @@
 from ingest.cli import ingest
 from ml.cli import ml
 from shared.cache import cache_usage, expunge_cache
-from shared.settings import LOCAL_DIR, MART_DB_VARS, env
+from shared.settings import LOCAL_DIR, MART_SCHEMA_VARS, env
 from shared.storage import Storage, StoragePrefix

 LOG_FILE = Path(__file__).resolve().parents[1] / "logs/datalab.log"
@@ -86,22 +88,25 @@ def backup():
 def backup_create():
     log.info("Creating a catalog backup")

-    source_files = [env.str("ENGINE_DB"), env.str("STAGE_DB")]
-    source_files += (env.str(varname) for varname in MART_DB_VARS)
+    os.environ["PGHOST"] = env.str("PSQL_CATALOG_HOST")
+    os.environ["PGPORT"] = env.str("PSQL_CATALOG_PORT")
+    os.environ["PGDATABASE"] = env.str("PSQL_CATALOG_DB")
+    os.environ["PGUSER"] = env.str("PSQL_CATALOG_USER")
+    os.environ["PGPASSWORD"] = env.str("PSQL_CATALOG_PASSWORD")

-    for source_file in source_files:
-        source_path = os.path.join(LOCAL_DIR, source_file)
+    with tempfile.NamedTemporaryFile(
+        prefix="datalab-lakehouse-",
+        suffix=".dump",
+    ) as tmp:
+        log.debug("Dumping to temporary file: {}", tmp.name)
+        subprocess.run(["pg_dump", "-Fc", "-f", tmp.name], check=True)

-        if not os.path.exists(source_path):
-            log.error("source path doesn't exist: {}", source_path)
-            return
-
-    s = Storage(prefix=StoragePrefix.BACKUPS)
-    s3_backup_path = s.get_dir("catalog", dated=True)
-    s.upload_files(LOCAL_DIR, source_files, s3_backup_path)
-    s.upload_manifest("catalog", latest=s3_backup_path)
+        s = Storage(prefix=StoragePrefix.BACKUPS)
+        s3_backup_path = s.get_dir("catalog", dated=True)
+        s.upload_file(tmp.name, f"{s3_backup_path}/lakehouse.dump")
+        s.upload_manifest("catalog", latest=s3_backup_path)

-    log.info("Catalog backup created: {}", s3_backup_path)
+        log.info("Catalog backup created: {}", s3_backup_path)


 @backup.command(name="restore", help="Restore engine and catalog into a directory")
@@ -111,9 +116,13 @@ def backup_create():
     type=click.DateTime(formats=["%Y-%m-%dT%H:%M:%S.%f"]),
     help="Timestamp for backup source (YYYY-mm-ddTHH:MM:SS.sss)",
 )
-@click.option("--target", default=LOCAL_DIR, type=click.STRING, help="Target directory")
-def backup_restore(source_date: Optional[datetime], target: str):
-    os.makedirs(target, exist_ok=True)
+def backup_restore(source_date: Optional[datetime]):
+    os.environ["PGHOST"] = env.str("PSQL_CATALOG_HOST")
+    os.environ["PGPORT"] = env.str("PSQL_CATALOG_PORT")
+    os.environ["PGUSER"] = env.str("PSQL_CATALOG_USER")
+    os.environ["PGPASSWORD"] = env.str("PSQL_CATALOG_PASSWORD")
+
+    db = env.str("PSQL_CATALOG_DB")

     s = Storage(prefix=StoragePrefix.BACKUPS)

@@ -130,8 +139,20 @@ def backup_restore(source_date: Optional[datetime], target: str):
     time = source_date.strftime("%H_%M_%S_%f")[:-3]
     s3_path = s.to_s3_path(f"{env.str('S3_BACKUPS_PREFIX')}/catalog/{date}/{time}")

-    log.info("Restoring backup from {} into {}", s3_path, target)
-    s.download_dir(s3_path, target)
+    log.info("Restoring backup from {}", s3_path)
+
+    with tempfile.NamedTemporaryFile(
+        prefix="datalab-lakehouse-",
+        suffix=".dump",
+    ) as tmp:
+        log.debug("Downloading dump to temporary file: {}", tmp.name)
+        s.download_file(f"{s3_path}/lakehouse.dump", tmp.name)
+
+        log.debug("Restoring dump from temporary file: {}", tmp.name)
+        subprocess.run(
+            ["pg_restore", "-c", "--if-exists", "-d", db, tmp.name],
+            check=True,
+        )


 @backup.command(name="ls", help="List catalog backups")
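Taken together, the two commands now round-trip a single `pg_dump` archive instead of copying database files. Outside of the S3 upload and download, the manual equivalent is roughly this (a sketch, assuming the `PG*` variables are exported as in the code above):

```bash
# What backup_create runs: a custom-format (-Fc) dump of the catalog.
pg_dump -Fc -f lakehouse.dump

# What backup_restore runs: drop existing objects (-c --if-exists), then restore.
pg_restore -c --if-exists -d "$PSQL_CATALOG_DB" lakehouse.dump
```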
8 changes: 0 additions & 8 deletions dlctl/dbt_handler.py
@@ -35,14 +35,6 @@ def mkdirs(self):
         engine_db_dir = os.path.dirname(os.path.join(LOCAL_DIR, env.str("ENGINE_DB")))
         os.makedirs(engine_db_dir, exist_ok=True)

-        stage_db_dir = os.path.dirname(os.path.join(LOCAL_DIR, env.str("STAGE_DB")))
-        os.makedirs(stage_db_dir, exist_ok=True)
-
-        for name, value in os.environ.items():
-            if name.endswith("_MART_DB"):
-                mart_db_dir = os.path.dirname(os.path.join(LOCAL_DIR, value))
-                os.makedirs(mart_db_dir, exist_ok=True)
-
     def deps(self):
         self.dbt.invoke(["deps"] + self.PROJECT_ARGS)

25 changes: 25 additions & 0 deletions infra/apps/docker/compose.yml
@@ -8,6 +8,31 @@ services:
     environment:
       MLFLOW_TRACKING_URI: ${MLFLOW_TRACKING_URI}
       KAFKA_BROKER_ENDPOINT: ${KAFKA_BROKER_ENDPOINT}
+
+      S3_ENDPOINT: ${S3_ENDPOINT}
+      S3_USE_SSL: ${S3_USE_SSL}
+      S3_URL_STYLE: ${S3_URL_STYLE}
+      S3_ACCESS_KEY_ID: ${S3_ACCESS_KEY_ID}
+      S3_SECRET_ACCESS_KEY: ${S3_SECRET_ACCESS_KEY}
+      S3_REGION: ${S3_REGION}
+
+      S3_BUCKET: ${S3_BUCKET}
+      S3_STAGE_PREFIX: ${S3_STAGE_PREFIX}
+      S3_SECURE_STAGE_PREFIX: ${S3_SECURE_STAGE_PREFIX}
+      S3_GRAPHS_MART_PREFIX: ${S3_GRAPHS_MART_PREFIX}
+      S3_ANALYTICS_MART_PREFIX: ${S3_ANALYTICS_MART_PREFIX}
+      S3_EXPORTS_PREFIX: ${S3_EXPORTS_PREFIX}
+      S3_BACKUPS_PREFIX: ${S3_BACKUPS_PREFIX}
+
+      PSQL_CATALOG_HOST: ${PSQL_CATALOG_HOST}
+      PSQL_CATALOG_PORT: ${PSQL_CATALOG_PORT}
+      PSQL_CATALOG_DB: ${PSQL_CATALOG_DB}
+      PSQL_CATALOG_USER: ${PSQL_CATALOG_USER}
+      PSQL_CATALOG_PASSWORD: ${PSQL_CATALOG_PASSWORD}
+      PSQL_CATALOG_STAGE_SCHEMA: ${PSQL_CATALOG_STAGE_SCHEMA}
+      PSQL_CATALOG_SECURE_STAGE_SCHEMA: ${PSQL_CATALOG_SECURE_STAGE_SCHEMA}
+      PSQL_CATALOG_GRAPHS_MART_SCHEMA: ${PSQL_CATALOG_GRAPHS_MART_SCHEMA}
+      PSQL_CATALOG_ANALYTICS_MART_SCHEMA: ${PSQL_CATALOG_ANALYTICS_MART_SCHEMA}
     healthcheck:
       test: ["CMD", "curl", "-f", "http://localhost:8000/health"]
       interval: 10s
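Since the app container now expects all of these variables, it can be worth verifying the passthrough after a deploy (a sketch; `<service>` stands in for the app service name, which is outside the visible diff):

```bash
docker compose exec <service> env | grep -E '^(S3_|PSQL_CATALOG_)'
```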
2 changes: 1 addition & 1 deletion infra/services/docker/compose.yml
@@ -42,7 +42,7 @@ services:
- "5432:5432"
environment:
POSTGRES_USER: root
POSTGRES_PASSWORD: ${PGPASSWORD}
POSTGRES_PASSWORD: ${PSQL_ROOT_PASSWORD}
volumes:
- postgres:/var/lib/postgresql/data
networks: