In [None]:
# Prepare the Python environment that is shipped to the Spark executors:
#   1. install pandas, pyarrow (pinned to 7.0.0) and conda-pack with conda,
#   2. install zstd with pip (imported by the compression UDF later in this notebook),
#   3. pack the environment into base_conda_env.tar.gz; `-f` overwrites an
#      existing archive and `-o` names the output file. The archive is handed
#      to Spark through the `spark.archives` setting in the session cell.
!conda install -y pandas pyarrow==7.0.0 conda-pack
!pip install zstd
!conda pack -f -o base_conda_env.tar.gz # create conda-pack with the environment and its packages for the executors

In [None]:
import os
# Environment consumed by PySpark; this cell sets it before the Spark session is created.
os.environ.update(
    {
        # JVM installation used to launch Spark.
        "JAVA_HOME": "/usr/lib/jvm/default-java",
        # Relative interpreter path; "environment" matches the alias given to
        # the conda-pack archive in the `spark.archives` setting of this notebook.
        "PYSPARK_PYTHON": "./environment/bin/python",
        # Extra JVM packages: Delta Lake core and the Hadoop AWS (S3A) connector.
        "PYSPARK_SUBMIT_ARGS": '--packages "io.delta:delta-core_2.12:1.1.0,org.apache.hadoop:hadoop-aws:3.3.1" pyspark-shell',
    }
)

import pyspark
from delta import configure_spark_with_delta_pip

# Kubernetes namespace the executors are started in; it is also used for the
# application name and the driver host name below.
namespace = "user-name" # usually "firstname-lastname"

# Spark session configuration: executors run on Kubernetes, Delta Lake is enabled
# through the SQL extension and catalog settings, and S3 is accessed through the
# S3A file system.
builder = (
    pyspark.sql.SparkSession.builder.appName(f"{namespace}-spark-app")
    .master("k8s://https://kubernetes.default")
    .config("spark.kubernetes.namespace", namespace)
    .config("spark.archives", "base_conda_env.tar.gz#environment") # pass the conda-pack with the necessary packages so the executors can load them
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    # Either use the built-in authentication for S3 (web identity token) ...
    # The key carries the `spark.hadoop.` prefix, like the other S3A options,
    # so that Spark copies it into the Hadoop configuration.
    .config("spark.hadoop.fs.s3a.aws.credentials.provider", "com.amazonaws.auth.WebIdentityTokenCredentialsProvider")
    # ... or a custom one with specific S3 access and secret keys below
    # .config("spark.hadoop.fs.s3a.access.key", os.environ['AWS_S3_ACCESS_KEY']) # optional
    # .config("spark.hadoop.fs.s3a.secret.key", os.environ['AWS_S3_SECRET_KEY']) # optional
    .config("spark.kubernetes.authenticate.driver.serviceAccountName", "default-editor")
    .config("spark.kubernetes.container.image.pullPolicy", "Always")
    .config("spark.kubernetes.container.image", "public.ecr.aws/atcommons/spark/python:latest")
    # Fixed driver / block-manager addresses and ports; the Istio sidecar
    # annotations below refer to the two block-manager ports.
    .config("spark.driver.bindAddress", "0.0.0.0")
    .config("spark.driver.port", "2222")
    .config("spark.driver.blockManager.port", "7078")
    .config("spark.blockManager.port", "7079")
    .config("spark.kubernetes.executor.annotation.traffic.sidecar.istio.io/excludeOutboundPorts", "7078")
    .config("spark.kubernetes.executor.annotation.traffic.sidecar.istio.io/excludeInboundPorts", "7079")
    # The section with `spark.kubernetes.executor.volumes.persistentVolumeClaim` specifies
    # a local volume that gives the executors more storage space for disk spilling.
    # If it is not needed, remove these properties completely.
    # Otherwise you only need to adjust the size of the volume under `sizeLimit`.
    .config("spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1.options.claimName", "OnDemand") # disk storage for spilling
    .config("spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1.options.storageClass", "ebs-csi") # disk storage for spilling
    .config("spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1.options.sizeLimit", "100Gi") # disk storage for spilling
    .config("spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1.mount.path", "/data") # disk storage for spilling
    .config("spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1.mount.readOnly", "false") # disk storage for spilling
    # The section with `spark.kubernetes.node.selector` specifies which nodes
    # to use for the executors and in which Availability Zone (AZ).
    # They need to be in the same zone.
    .config("spark.kubernetes.node.selector.topology.ebs.csi.aws.com/zone", "eu-central-1a") # node selector
    .config("spark.kubernetes.node.selector.plural.sh/scalingGroup", "xlarge-mem-optimized-on-demand") # node selector
    .config("spark.driver.host", f"sparknotebook-spark.{namespace}.svc.cluster.local")
    .config("spark.executor.instances", "2") # number of Executors
    .config("spark.executor.memory", "3g") # Executor memory
    .config("spark.executor.cores", "1") # Executor cores
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
)

# Let the delta helper finish the builder configuration, then create
# (or reuse) the session used by the rest of the notebook.
spark = configure_spark_with_delta_pip(builder).getOrCreate()

In [None]:
import dbldatagen as dg

# DDL for the synthetic test table: four string columns.
test1_ddl = """
    create table Test1 (
    source string ,
    language string ,
    topic string ,
    license string )
"""
schema = dg.SchemaParser.parseCreateTable(spark, test1_ddl)

# Allowed values per column; every column spec below is created with random=True.
source_values = ["hackernews", "cc", "wikipedia", "academic", "books", "pubmed", "opensubtitiles", "youtubesubtitles"]
language_values = ["en", "de", "fr", "es", "ru"]
topic_values = ["software", "medical", "cultural", "academic", "hardware", "ai", "ml", "random"]
license_values = ["MIT", "GPL-v2", "GPL-v3", "private", "apache", "cc"]

# Number of rows to generate (40), requested across 20 partitions below.
data_rows = 4 * 10

x3 = (
    dg.DataGenerator(sparkSession=spark, name="test_table_query", rows=data_rows, partitions=20)
    .withSchema(schema)
    .withIdOutput()
    .withColumnSpec("source", values=source_values, random=True)
    .withColumnSpec("language", values=language_values, random=True)
    .withColumnSpec("topic", values=topic_values, random=True)
    .withColumnSpec("license", values=license_values, random=True)
)

# Build the generator output; the following cells use it as a DataFrame.
x3_output_full = x3.build()

In [None]:
from typing import Optional

import pyspark.sql.functions as F
import zstd
from pyspark.sql.types import BinaryType


def compress(in_str: Optional[str]) -> Optional[bytes]:
    """Compress a string with zstd.

    Args:
        in_str: Text to compress. ``None`` (a null column value when the
            function is used as a Spark UDF) is passed through unchanged.

    Returns:
        The zstd-compressed UTF-8 encoding of ``in_str`` as bytes, or
        ``None`` when ``in_str`` is ``None``.
    """
    # A null input cannot be encoded; return null instead of raising.
    if in_str is None:
        return None
    return zstd.compress(in_str.encode("utf-8"))
    
# Wrap `compress` as a Spark UDF whose result column is binary.
compress_udf = F.udf(compress, returnType=BinaryType())

# Add the compressed "topic" values as "newcol" and display the result.
compressed_topic = compress_udf(F.col("topic"))
x3_output_full.withColumn("newcol", compressed_topic).show()

In [None]:
import time

# Measure how long writing the generated data as a Delta table takes.
start = time.monotonic_ns()
# Alternative without an explicit storage path:
#x3_output_full.write.format("delta").mode("overwrite").saveAsTable("test_data")
delta_writer = x3_output_full.write.format("delta").mode("overwrite")
delta_writer.saveAsTable("test_data2", path='s3a://tims-delta-lake/delta-table-bench')
# Convert the nanosecond difference to seconds for the report.
elapsed_s = (time.monotonic_ns() - start) / 10**9
print("Time elapsed : ", elapsed_s, "s")