diff --git a/.github/workflows/style-check.yaml b/.github/workflows/style-check.yaml index 3591fba..548441e 100644 --- a/.github/workflows/style-check.yaml +++ b/.github/workflows/style-check.yaml @@ -21,7 +21,7 @@ jobs: pip install --upgrade black # Note: black fails when it doesn't have to do anything. git diff --name-only --no-color --diff-filter=ACM $(git merge-base origin/main HEAD) | - grep -v '\(\.json\|\.csv\|\.ipynb\|\.hpp\.in\|\.ref\|\.example\|\.txt\|\.lock\|\.js\)$' | + grep -v '\(\.json\|\.csv\|\.ipynb\|\.hpp\.in\|\.ref\|\.example\|\.txt\|\.lock\|\.js\|\.properties\)$' | 2>/dev/null xargs black || true git diff --exit-code diff --git a/.github/workflows/trino.yaml b/.github/workflows/trino.yaml index c77e4a6..e9af09f 100644 --- a/.github/workflows/trino.yaml +++ b/.github/workflows/trino.yaml @@ -12,23 +12,20 @@ jobs: name: Trino Build runs-on: ubuntu-20.04 steps: - - name: Clone - run: git clone --depth 1 --branch ${{ github.event.inputs.trino-version }} https://github.com/trinodb/trino.git + - uses: actions/checkout@v3 + with: + repository: 'cloudfuse-io/trino' + ref: ${{ github.event.inputs.trino-version }}-patch - uses: actions/setup-java@v3 with: distribution: 'zulu' java-version: 17 cache: 'maven' - - name: Patch - run: | - sed -i '/verifyJvmRequirements()/d' trino/core/trino-main/src/main/java/io/trino/server/Server.java - sed -i '/import static io.trino.server.TrinoSystemRequirements.verifyJvmRequirements;/d' trino/core/trino-main/src/main/java/io/trino/server/Server.java - name: Build - working-directory: ./trino - run: ./mvnw -pl core/trino-server clean install -DskipTests + run: ./mvnw -pl core/trino-main,core/trino-server clean install -DskipTests - name: Release uses: softprops/action-gh-release@v1 with: tag_name: trino-server-${{ github.event.inputs.trino-version }} body: Custom build of Trino Server version ${{ github.event.inputs.trino-version }} disabling file descriptor checks - files: trino/core/trino-server/target/trino-server-${{ github.event.inputs.trino-version }}.tar.gz + files: core/trino-server/target/trino-server-${{ github.event.inputs.trino-version }}.tar.gz diff --git a/cli/core.py b/cli/core.py index 322226c..c8cc61b 100644 --- a/cli/core.py +++ b/cli/core.py @@ -296,4 +296,4 @@ def dockerized(c, engine): compose = f"docker compose -f {RUNTIME_TFDIR}/{engine}/build/docker-compose.yaml" c.run(f"{compose} down -v") c.run(f"{compose} build") - c.run(f"DATA_BUCKET_NAME={bucket_name(c)} {compose} up") + c.run(f"DATA_BUCKET_NAME={bucket_name(c)} {compose} run {engine}") diff --git a/cli/plugins/monitoring.py b/cli/plugins/monitoring.py index 76390ea..ffd6c54 100644 --- a/cli/plugins/monitoring.py +++ b/cli/plugins/monitoring.py @@ -7,6 +7,7 @@ import plugins.spark as spark import plugins.dremio as dremio import plugins.dask as dask +import plugins.trino as trino from datetime import datetime from google.cloud import bigquery from google.oauth2 import service_account @@ -106,4 +107,6 @@ def run_and_send_twice(example): run_and_send_twice(dremio.lambda_example) if "dask" in active_plugins: run_and_send_twice(dask.lambda_example) + if "trino" in active_plugins: + run_and_send_twice(trino.lambda_example) time.sleep(300) diff --git a/cli/plugins/trino.py b/cli/plugins/trino.py new file mode 100644 index 0000000..822640d --- /dev/null +++ b/cli/plugins/trino.py @@ -0,0 +1,23 @@ +"""Trino on AWS Lambda""" + +from invoke import task +import core + + +@task(autoprint=True) +def lambda_example(c, json_output=False, month="01"): + """SUM(trip_distance) GROUP_BY payment_type with preliminary CREATE EXTERNAL TABLE""" + sql = f""" +CREATE TABLE hive.default.taxi2019{month} (trip_distance REAL, payment_type VARCHAR) +WITH ( + external_location = 's3a://{core.bucket_name(c)}/nyc-taxi/2019/{month}/', + format = 'PARQUET' +); + +SELECT payment_type, SUM(trip_distance) +FROM hive.default.taxi2019{month} +GROUP BY payment_type; +""" + if not json_output: + print(sql) + return core.run_lambda(c, "trino", sql, json_output=json_output) diff --git a/infra/runtime/dremio/build/README.md b/infra/runtime/dremio/build/README.md index 5823171..5ae0ebb 100644 --- a/infra/runtime/dremio/build/README.md +++ b/infra/runtime/dremio/build/README.md @@ -4,4 +4,7 @@ - create a Dremio user and use its credentials to: - create a source - start the query - - poll for the resulut + - poll for the result +- By default Dremio tries to discover its private IP and uses that to + communicate. We want to loopback on `localhost` instead, hence the + configuration `registration.publish-host: "localhost"` diff --git a/infra/runtime/trino/.terraform.lock.hcl b/infra/runtime/trino/.terraform.lock.hcl new file mode 100644 index 0000000..c51eb53 --- /dev/null +++ b/infra/runtime/trino/.terraform.lock.hcl @@ -0,0 +1,22 @@ +# This file is maintained automatically by "terraform init". +# Manual edits may be lost in future updates. + +provider "registry.terraform.io/hashicorp/aws" { + version = "3.75.2" + constraints = "~> 3.0" + hashes = [ + "h1:x0gluX9ZKEmz+JJW3Ut5GgWDFOq/lhs2vkqJ+xt57zs=", + "zh:0e75fb14ec42d69bc46461dd54016bb2487d38da324222cec20863918b8954c4", + "zh:30831a1fe29f005d8b809250b43d09522288db45d474c9d238b26f40bdca2388", + "zh:36163d625ab2999c9cd31ef2475d978f9f033a8dfa0d585f1665f2d6492fac4b", + "zh:48ec39685541e4ddd8ddd196e2cfb72516b87f471d86ac3892bc11f83c573199", + "zh:707b9c8775efd6962b6226d914ab25f308013bba1f68953daa77adca99ff6807", + "zh:72bd9f4609a827afa366c6f119c7dec7d73a35d712dad1457c0497d87bf8d160", + "zh:930e3ae3d0cb152e17ee5a8aee5cb47f7613d6421bc7c22e7f50c19da484a100", + "zh:9b12af85486a96aedd8d7984b0ff811a4b42e3d88dad1a3fb4c0b580d04fa425", + "zh:a19bf49b80101a0f0272b994153eeff8f8c206ecc592707bfbce7563355b6882", + "zh:a34b5d2bbaf52285b0c9a8df6258f4789f4d927ff777e126bdc77e7887abbeaa", + "zh:caad6fd5e79eae33e6d74e38c3b15c28a5482f2a1a8ca46cc1ee70089de61adb", + "zh:f2eae988635030de9a088f8058fbcd91e2014a8312a48b16bfd09a9d69d9d6f7", + ] +} diff --git a/infra/runtime/trino/build/Dockerfile b/infra/runtime/trino/build/Dockerfile new file mode 100644 index 0000000..c0d8ed0 --- /dev/null +++ b/infra/runtime/trino/build/Dockerfile @@ -0,0 +1,85 @@ +ARG FUNCTION_DIR="/function" +ARG HADOOP_VERSION=3.2.0 +# The SDK version must be the one in the Hadoop package +ARG AWS_JAVA_SDK_VERSION=1.11.375 +ARG METASTORE_VERSION=3.0.0 +# We use custom builds of trino-server +ARG TRINO_VERSION=378 + + +FROM ubuntu:20.04 as ric-dependency + +ENV DEBIAN_FRONTEND=noninteractive + +RUN apt-get update && \ + apt-get install -y \ + g++ \ + make \ + cmake \ + unzip \ + python3 \ + python3-pip \ + libcurl4-openssl-dev +ARG FUNCTION_DIR +RUN mkdir -p ${FUNCTION_DIR} +RUN pip3 install \ + --target ${FUNCTION_DIR} \ + awslambdaric +COPY lambda-handler.py ${FUNCTION_DIR} + + +FROM ubuntu:20.04 +ARG HADOOP_VERSION +ARG METASTORE_VERSION +ARG TRINO_VERSION +ARG AWS_JAVA_SDK_VERSION + +ENV DEBIAN_FRONTEND=noninteractive + +RUN apt-get update && apt-get install -y \ + curl \ + less \ + openjdk-11-jdk \ + python3 \ + && rm -rf /var/lib/apt/lists/* +ENV JAVA_HOME=/usr/lib/jvm/java-11-openjdk-amd64/ +RUN ln -s /usr/bin/python3 /usr/bin/python + +# HIVE METASTORE + +WORKDIR /opt + +ENV HADOOP_HOME=/opt/hadoop-${HADOOP_VERSION} +ENV HIVE_HOME=/opt/apache-hive-metastore-${METASTORE_VERSION}-bin +# jars used by Trino +ENV HADOOP_CLASSPATH=${HADOOP_HOME}/share/hadoop/tools/lib/aws-java-sdk-bundle-${AWS_JAVA_SDK_VERSION}.jar:${HADOOP_HOME}/share/hadoop/tools/lib/hadoop-aws-${HADOOP_VERSION}.jar + +RUN curl -L https://archive.apache.org/dist/hive/hive-standalone-metastore-${METASTORE_VERSION}/hive-standalone-metastore-${METASTORE_VERSION}-bin.tar.gz | tar zxf - && \ + # Download from mirror and trim some unused libraries + curl -L https://github.com/cloudfuse-io/lambdatization/releases/download/hadoop-${HADOOP_VERSION}/hadoop-${HADOOP_VERSION}.tar.gz | tar zxf - && \ + cd hadoop-${HADOOP_VERSION}/share/hadoop/ && \ + rm -r client/* && \ + rm -r hdfs/* && \ + rm -r mapreduce/* && \ + rm -r yarn/* && \ + find ./tools/lib -type f -not \( -name "aws-java-sdk-bundle-${AWS_JAVA_SDK_VERSION}.jar" -or -name "hadoop-aws-${HADOOP_VERSION}.jar" \) -delete + +ENV PATH="${HIVE_HOME}/bin:${PATH}" +COPY metastore-site.xml ${HIVE_HOME}/conf + +# TRINO + +ENV TRINO_HOME=/opt/trino-server-${TRINO_VERSION} +RUN curl -L https://github.com/cloudfuse-io/lambdatization/releases/download/trino-server-${TRINO_VERSION}/trino-server-${TRINO_VERSION}.tar.gz | tar zxf - && \ + curl -L https://repo1.maven.org/maven2/io/trino/trino-cli/${TRINO_VERSION}/trino-cli-${TRINO_VERSION}-executable.jar -o ${TRINO_HOME}/bin/trino && \ + chmod +x ${TRINO_HOME}/bin/trino +ENV PATH="${TRINO_HOME}/bin:${PATH}" +COPY trino-etc ${TRINO_HOME}/etc + +# LAMBDA ENTRYPOINT + +ARG FUNCTION_DIR +COPY --from=ric-dependency ${FUNCTION_DIR} ${FUNCTION_DIR} +WORKDIR ${FUNCTION_DIR} +ENTRYPOINT [ "python3", "-m", "awslambdaric" ] +CMD [ "lambda-handler.handler" ] diff --git a/infra/runtime/trino/build/README.md b/infra/runtime/trino/build/README.md new file mode 100644 index 0000000..32bd0ed --- /dev/null +++ b/infra/runtime/trino/build/README.md @@ -0,0 +1,41 @@ +# Trino lambdatization tricks + +## List of tricks + +- Trino loads many plugins by default, which implies opening many jar files in + parallel. To make sure this process doesn't exceed the system's maximum number + of file descriptors, it performs a check of the ulimit when starting. The + minimum required is 4096, but unfortunately we have a hard limit on AWS Lambda + at 1024. We had to + [rebuild](https://github.com/cloudfuse-io/lambdatization/actions/workflows/trino.yaml) + Trino with a patch that: + - loads less plugins + - removes the check on fileno +- Trino, like Dremio, automatically detects its private IP and tries to use it + for internal connections. We didn't find a knob to disable this behaviour, so + we had to harcode it in the patch. +- It seems you cannot query S3 without using the Hive metastore, so we had to + install a local version of it running on Derby which adds to the init time. +- The container image is huge (>2GB): + - we are pulling in a full Hadoop distribution, in which most files won't be + used. We started removing some libraries from it but we could probably trim + a few more hundreds of MBs + - we could also use a remote Hive metastore (like Glue) instead of installing + a local one + - obviously, we could use a smaller base image + +## Updating Trino version + +To change the Trino version, the patch needs to be applied to that version (xxx): +```bash +git clone cloudfuse-io/trino +cd trino +git checkout 378-patch +git checkout -b xxx-patch +git rebase xxx +git push +``` + +Then run the build in the [Trino +workflow](https://github.com/cloudfuse-io/lambdatization/actions/workflows/trino.yaml) +with your new Trino version number xxx diff --git a/infra/runtime/trino/build/docker-compose.yaml b/infra/runtime/trino/build/docker-compose.yaml new file mode 100644 index 0000000..fe1a478 --- /dev/null +++ b/infra/runtime/trino/build/docker-compose.yaml @@ -0,0 +1,34 @@ +version: "3.9" +services: + trino: + build: . + image: cloudfuse-io/l12n:trino + cap_drop: + - ALL + read_only: true + volumes: + - trino-tmp:/tmp + user: nobody + entrypoint: + # - bash + - python3 + - lambda-handler.py + environment: + - AWS_ACCESS_KEY_ID=$LAMBDA_ACCESS_KEY_ID + - AWS_SECRET_ACCESS_KEY=$LAMBDA_SECRET_ACCESS_KEY + - AWS_SESSION_TOKEN=$LAMBDA_SESSION_TOKEN + - AWS_REGION=$L12N_AWS_REGION + - DATA_BUCKET_NAME + networks: + - tmpengine + ulimits: + nofile: + soft: 1024 + hard: 1024 + +volumes: + trino-tmp: + + +networks: + tmpengine: diff --git a/infra/runtime/trino/build/lambda-handler.py b/infra/runtime/trino/build/lambda-handler.py new file mode 100644 index 0000000..ef80a59 --- /dev/null +++ b/infra/runtime/trino/build/lambda-handler.py @@ -0,0 +1,161 @@ +import logging +import base64 +import os +import tempfile +import time +import sys +import threading +import selectors +import subprocess +from typing import Tuple + +logging.getLogger().setLevel(logging.INFO) + + +class StdLogger: + """A class that efficiently multiplexes std streams into logs""" + + def _start_logging(self): + while True: + for key, _ in self.selector.select(): + # read1 instead or read to avoid blocking + data = key.fileobj.read1() + if key.fileobj not in self.files: + raise Exception("Unexpected file desc in selector") + with self.lock: + name = self.files[key.fileobj] + if not data: + print(f"{name} - EOS", flush=True, file=sys.stderr) + self.selector.unregister(key.fileobj) + with self.lock: + del self.files[key.fileobj] + else: + lines = data.decode().splitlines() + for line in lines: + print(f"{name} - {line}", flush=True, file=sys.stderr) + with self.lock: + self.logs[name].extend(lines) + + def __init__(self): + self.lock = threading.Lock() + self.files = {} + self.logs = {} + self.selector = selectors.DefaultSelector() + self.thread = threading.Thread(target=self._start_logging, daemon=True) + + def start(self): + """Start consuming registered streams (if any) and logging them""" + self.thread.start() + + def add(self, name: str, file): + """Add a new stream with the given name""" + with self.lock: + self.files[file] = name + self.logs[name] = [] + self.selector.register(file, selectors.EVENT_READ) + + def get(self, name: str) -> str: + """Get the history of the stream for the given name""" + with self.lock: + return "\n".join(self.logs[name]) + + +IS_COLD_START = True +STD_LOGGER: StdLogger = None + + +def init(): + global STD_LOGGER + STD_LOGGER = StdLogger() + STD_LOGGER.start() + + trino_proc = subprocess.Popen( + ["launcher", "run"], + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + ) + STD_LOGGER.add("trino-srv|stdout", trino_proc.stdout) + + schematool_proc = subprocess.Popen( + ["schematool", "-initSchema", "-dbType", "derby"], + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + cwd="/tmp", + ) + STD_LOGGER.add("schematool|stdout", schematool_proc.stdout) + STD_LOGGER.add("schematool|stderr", schematool_proc.stderr) + if schematool_proc.wait() != 0: + raise Exception("Hive schema seeding failed") + hive_proc = subprocess.Popen( + ["start-metastore"], stdout=subprocess.PIPE, stderr=subprocess.PIPE, cwd="/tmp" + ) + STD_LOGGER.add("hive-srv|stdout", hive_proc.stdout) + STD_LOGGER.add("hive-srv|stderr", hive_proc.stderr) + + for line_bytes in trino_proc.stderr: + log_line = line_bytes.decode() + print(f"trino-srv|stderr - {log_line}", flush=True, file=sys.stderr, end="") + if "======== SERVER STARTED ========" in log_line: + return + raise Exception("Trino server didn't start successfully") + + +def query(sql: str) -> Tuple[str, str]: + """Run a single SQL query using Trino cli""" + with tempfile.NamedTemporaryFile(prefix="query", delete=False) as tmp: + query_file = tmp.name + tmp.write(sql.encode()) + + cli_proc = subprocess.Popen( + ["trino", f"--file={query_file}", "--progress"], + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + ) + STD_LOGGER.add("trino-cli|stdout", cli_proc.stdout) + STD_LOGGER.add("trino-cli|stderr", cli_proc.stderr) + if cli_proc.wait() != 0: + raise Exception(f"Query failed: {STD_LOGGER.get('trino-cli|stderr')}") + return (STD_LOGGER.get("trino-cli|stdout"), STD_LOGGER.get("trino-cli|stderr")) + + +def handler(event, context): + """An AWS Lambda handler that runs the provided command with bash and returns the standard output""" + start = time.time() + global IS_COLD_START + is_cold_start = IS_COLD_START + IS_COLD_START = False + if is_cold_start: + init() + src_command = base64.b64decode(event["query"]).decode("utf-8") + + (resp_stdout, resp_stderr) = query(src_command) + + result = { + "resp": resp_stdout, + "logs": resp_stderr, + "parsed_queries": [src_command], + "context": { + "cold_start": is_cold_start, + "handler_duration_sec": time.time() - start, + }, + } + return result + + +if __name__ == "__main__": + query_str = f""" +CREATE TABLE hive.default.taxi201901 (trip_distance REAL, payment_type VARCHAR) +WITH ( + external_location = 's3a://{os.getenv("DATA_BUCKET_NAME")}/nyc-taxi/2019/01/', + format = 'PARQUET' +); + +SELECT payment_type, SUM(trip_distance) +FROM hive.default.taxi201901 +GROUP BY payment_type; +""" + res = handler( + {"query": base64.b64encode(query_str.encode("utf-8"))}, + {}, + ) + print(res) diff --git a/infra/runtime/trino/build/metastore-site.xml b/infra/runtime/trino/build/metastore-site.xml new file mode 100644 index 0000000..a359112 --- /dev/null +++ b/infra/runtime/trino/build/metastore-site.xml @@ -0,0 +1,24 @@ + + + metastore.thrift.uris + thrift://localhost:9083 + Thrift URI for the remote metastore. Used by metastore client to connect to remote metastore. + + + metastore.task.threads.always + org.apache.hadoop.hive.metastore.events.EventCleanerTask,org.apache.hadoop.hive.metastore.MaterializationsCacheCleanerTask + + + metastore.expression.proxy + org.apache.hadoop.hive.metastore.DefaultPartitionExpressionProxy + + + metastore.storage.schema.reader.impl + org.apache.hadoop.hive.metastore.SerDeStorageSchemaReader + + + datanucleus.autoCreateSchema + false + Creates necessary schema on a startup, we use init script instead + + diff --git a/infra/runtime/trino/build/trino-etc/catalog/hive.properties b/infra/runtime/trino/build/trino-etc/catalog/hive.properties new file mode 100644 index 0000000..0a60f08 --- /dev/null +++ b/infra/runtime/trino/build/trino-etc/catalog/hive.properties @@ -0,0 +1,5 @@ +connector.name=hive +hive.metastore.uri=thrift://localhost:9083 +hive.storage-format=ORC + +hive.allow-drop-table=true diff --git a/infra/runtime/trino/build/trino-etc/config.properties b/infra/runtime/trino/build/trino-etc/config.properties new file mode 100644 index 0000000..6f93799 --- /dev/null +++ b/infra/runtime/trino/build/trino-etc/config.properties @@ -0,0 +1,6 @@ +#single node install config +coordinator=true +node-scheduler.include-coordinator=true +http-server.http.port=8080 +discovery.uri=http://localhost:8080 +log.path=/tmp/trino/var/log/server.log diff --git a/infra/runtime/trino/build/trino-etc/jvm.config b/infra/runtime/trino/build/trino-etc/jvm.config new file mode 100644 index 0000000..c675bde --- /dev/null +++ b/infra/runtime/trino/build/trino-etc/jvm.config @@ -0,0 +1,14 @@ +-server +-Xmx1G +-XX:-UseBiasedLocking +-XX:+UseG1GC +-XX:G1HeapRegionSize=32M +-XX:+ExplicitGCInvokesConcurrent +-XX:+HeapDumpOnOutOfMemoryError +-XX:+ExitOnOutOfMemoryError +-XX:-OmitStackTraceInFastThrow +-XX:ReservedCodeCacheSize=256M +-XX:PerMethodRecompilationCutoff=10000 +-XX:PerBytecodeRecompilationCutoff=10000 +-Djdk.attach.allowAttachSelf=true +-Djdk.nio.maxCachedBufferSize=2000000 diff --git a/infra/runtime/trino/build/trino-etc/node.properties b/infra/runtime/trino/build/trino-etc/node.properties new file mode 100644 index 0000000..723662f --- /dev/null +++ b/infra/runtime/trino/build/trino-etc/node.properties @@ -0,0 +1,2 @@ +node.environment=docker +node.data-dir=/tmp/trino diff --git a/infra/runtime/trino/main.tf b/infra/runtime/trino/main.tf new file mode 100644 index 0000000..4ef4ced --- /dev/null +++ b/infra/runtime/trino/main.tf @@ -0,0 +1,58 @@ +variable "region_name" {} + +variable "trino_image" {} + +variable "bucket_arn" {} + +module "env" { + source = "../../common/env" +} + +provider "aws" { + region = var.region_name + default_tags { + tags = module.env.default_tags + } +} + +resource "aws_iam_policy" "s3_access" { + name = "${module.env.module_name}-trino-s3-access-${var.region_name}-${module.env.stage}" + + policy = < images.generated.tfvars +EOT + ] + } + + extra_arguments "image_vars" { + commands = ["apply"] + arguments = ["-var-file=${get_terragrunt_dir()}/images.generated.tfvars"] + } + +} + +inputs = { + region_name = local.region_name + trino_image = "dummy_overriden_by_before_hook" + bucket_arn = dependency.core.outputs.bucket_arn +}