# PySpark Connect session without session-level packages

In [None]:
from pyspark.sql import SparkSession
from datetime import datetime, date
from pyspark.sql import Row
import os

# Endpoint of the Spark Connect server (TLS on port 443).
CONNECT_URL = "sc://spark-connect.zerocarbon-1.nzero.net:443/;use_ssl=true"

# Point gRPC at the EDP CA bundle so the TLS connection can be verified.
tls_roots = "../EDP_CA/nzero.pem"
os.environ["GRPC_DEFAULT_SSL_ROOTS_FILE_PATH"] = tls_roots

# Open (or reuse) a remote session, then raise the local-relation cache
# threshold to 64 * 1024 * 1024 (64 MiB, assuming the value is in bytes).
spark = (
    SparkSession.builder
    .remote(CONNECT_URL)
    .getOrCreate()
)
spark.conf.set("spark.sql.session.localRelationCacheThreshold", 64 * 1024 * 1024)

# Three sample rows covering int, float, string, date and timestamp columns.
df = spark.createDataFrame(
    [
        Row(a=1, b=2.0, c="string1", d=date(2000, 1, 1), e=datetime(2000, 1, 1, 12, 0)),
        Row(a=2, b=3.0, c="string2", d=date(2000, 2, 1), e=datetime(2000, 1, 2, 12, 0)),
        Row(a=4, b=5.0, c="string3", d=date(2000, 3, 1), e=datetime(2000, 1, 3, 12, 0)),
    ]
)
df.show()

In [2]:
# Close this Spark Connect session; the next section builds a new one with its own configuration.
spark.stop()

# PySpark Connect session with session-level packages

In [None]:
import conda_pack
import os

# Pack the current conda environment into the archive that the next cell ships
# to the cluster with spark.addArtifact("pyspark.tar.gz#environment", archive=True).
# - output= pins the file name to match that call. Without it, conda-pack names
#   the archive after the active environment.
# - force=True overwrites an archive left over from a previous run. This replaces
#   the former `! rm pyspark.tar.gz`, which complained when no archive existed yet.
conda_pack.pack(output="pyspark.tar.gz", force=True)

In [None]:
from pyspark.sql import SparkSession
from datetime import datetime, date
from pyspark.sql import Row
import conda_pack
import os

# Endpoint of the Spark Connect server (TLS on port 443).
CONNECT_URL = "sc://spark-connect.zerocarbon-1.nzero.net:443/;use_ssl=true"

# Point gRPC at the EDP CA bundle so the TLS connection can be verified.
tls_roots = "../EDP_CA/nzero.pem"
os.environ["GRPC_DEFAULT_SSL_ROOTS_FILE_PATH"] = tls_roots

# Session that requests the Delta Lake SQL extension and catalog.
# NOTE(review): on Spark Connect, builder configs are applied after the session
# exists. Static settings such as spark.sql.extensions may therefore need to be
# configured server-side -- confirm they take effect.
spark = (
    SparkSession.builder
    .remote(CONNECT_URL)
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .appName("SimpleApp")
    .getOrCreate()
)
spark.conf.set("spark.sql.session.localRelationCacheThreshold", 64 * 1024 * 1024)

# Upload the conda-pack archive as an archive artifact. The "#environment"
# suffix names the directory it is unpacked into, and Python on the executors
# is then pointed at the interpreter inside that directory.
spark.addArtifact("pyspark.tar.gz#environment", archive=True)
spark.conf.set("spark.sql.execution.pyspark.python", "environment/bin/python")

# Three sample rows covering int, float, string, date and timestamp columns.
df = spark.createDataFrame(
    [
        Row(a=1, b=2.0, c="string1", d=date(2000, 1, 1), e=datetime(2000, 1, 1, 12, 0)),
        Row(a=2, b=3.0, c="string2", d=date(2000, 2, 1), e=datetime(2000, 1, 2, 12, 0)),
        Row(a=4, b=5.0, c="string3", d=date(2000, 3, 1), e=datetime(2000, 1, 3, 12, 0)),
    ]
)
df.show()

In [5]:
# Save it as a Delta table
# Save the sample DataFrame as a Delta table under /tmp, replacing any previous run.
df.write.mode("overwrite").format("delta").save("/tmp/delta-table")


In [None]:
# Reading this back is not expected to work: the Delta table was written to the
# executor's PVC, not to shared storage. Writing it to S3 instead (see the S3
# section below) makes it readable as a Delta table.
spark.read.load("/tmp/delta-table", format="delta").show()

In [6]:
# Close this Spark Connect session; the S3 section below creates a fresh one.
spark.stop()

# Spark Connect session with S3 (MinIO) integration

In [None]:
# `! export ...` runs in a throw-away subshell, so the variables never reached
# this kernel's environment and os.getenv() in the next cell returned None.
# Set them in the kernel process instead. setdefault() keeps any values that
# were exported before Jupyter was started; otherwise, replace the placeholders.
import os

os.environ.setdefault("MINIO_SECRET_KEY", "<your-secret-key>")
os.environ.setdefault("MINIO_ACCESS_KEY", "<your-access-key>")

In [None]:
from pyspark.sql import SparkSession
from datetime import datetime, date
from pyspark.sql import Row
import os 

# Spark Connect URL
CONNECT_URL = "sc://spark-connect.zerocarbon-1.nzero.net:443/;use_ssl=true"

# add the EDP CA to the environment variable
tls_roots = "../EDP_CA/nzero.pem"
os.environ["GRPC_DEFAULT_SSL_ROOTS_FILE_PATH"] = tls_roots

# Get MinIO credentials from environment variables
MINIO_ACCESS_KEY = os.getenv("MINIO_ACCESS_KEY")
MINIO_SECRET_KEY = os.getenv("MINIO_SECRET_KEY")


spark = SparkSession.builder.remote(CONNECT_URL) \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .config("spark.hadoop.fs.s3a.access.key", MINIO_ACCESS_KEY) \
    .config("spark.hadoop.fs.s3a.secret.key", MINIO_SECRET_KEY) \
    .config("spark.hadoop.fs.s3a.endpoint", "https://minio-c2-api.sxp-1.nzero.net:443") \
    .config("spark.hadoop.fs.s3a.path.style.access", "true") \
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
    .config("spark.hadoop.fs.s3a.connection.ssl.enabled", "true") \
    .appName("SimpleApp").getOrCreate()



df = spark.createDataFrame([
    Row(a=1, b=2., c='string1', d=date(2000, 1, 1), e=datetime(2000, 1, 1, 12, 0)),
    Row(a=2, b=3., c='string2', d=date(2000, 2, 1), e=datetime(2000, 1, 2, 12, 0)),
    Row(a=4, b=5., c='string3', d=date(2000, 3, 1), e=datetime(2000, 1, 3, 12, 0))
])
df.show()

In [9]:
# Save it as a Delta table
# Persist the sample DataFrame as a Delta table in the spark-test bucket, replacing any existing data.
df.write.mode("overwrite").format("delta").save("s3a://spark-test/delta-table")

In [None]:
# This table lives in S3 (MinIO) rather than on an executor's local PVC. Unlike
# the /tmp example earlier in this notebook, it can therefore be read back as a
# Delta table.
spark.read.format("delta").load("s3a://spark-test/delta-table").show()

In [None]:
from pyspark.sql.functions import input_file_name
import json

# Path to the delta log JSON files
log_path = "s3a://spark-test/delta-table/_delta_log"

# Read all JSON commit files. Each commit file is newline-delimited JSON (one
# action per line), which is Spark's default JSON Lines mode. Do NOT set
# multiLine=True here: that mode parses each file as a single JSON document, so
# only the first action of every commit would be read.
log_df = spark.read.json(f"{log_path}/*.json")

# Optional: show raw log structure, tagging each action with its source commit file
log_df.withColumn("file_name", input_file_name()).show(truncate=False)


In [None]:
# Keep only the commitInfo actions and flatten their fields into top-level
# columns, giving one row per commitInfo entry in the log.
history_df = log_df.filter("commitInfo IS NOT NULL").select("commitInfo.*")
# The former select("*") was a no-op projection; show the history directly.
history_df.show(truncate=False)


In [13]:
# Close the S3-backed Spark Connect session.
spark.stop()