In [1]:
# Print the Python, Java and Spark versions available in this notebook environment.
! python3 -V
! java --version
! pyspark --version

Python 3.11.10
openjdk 17.0.12 2024-07-16
OpenJDK Runtime Environment (build 17.0.12+7-Ubuntu-1ubuntu224.04)
OpenJDK 64-Bit Server VM (build 17.0.12+7-Ubuntu-1ubuntu224.04, mixed mode, sharing)
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 3.5.2
      /_/
                        
Using Scala version 2.12.18, OpenJDK 64-Bit Server VM, 17.0.12
Branch HEAD
Compiled by user ubuntu on 2024-08-06T11:36:15Z
Revision bb7846dd487f259994fdc69e18e03382e3f64f42
Url https://github.com/apache/spark
Type --help for more information.


In [2]:
import os

# Load the MinIO access key and secret key from the mounted credential files,
# dropping surrounding whitespace such as a trailing newline.
with open("/minio-s3-credentials/accessKey", mode="r") as access_key_file:
    minio_user = access_key_file.read().strip()

with open("/minio-s3-credentials/secretKey", mode="r") as secret_key_file:
    minio_pwd = secret_key_file.read().strip()

In [3]:
# Stop a session left over from an earlier run of this cell so the builder
# below starts from a clean state. On a fresh kernel `spark` does not exist
# yet, which raises NameError and is safe to ignore.
try:
    spark.stop()
except NameError:
    pass

# This cell is the first to use SparkSession, so it has to import it itself;
# without the import the builder below fails with NameError on a fresh kernel.
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
        .appName("local-with-s3a")
        .master("local[*]")
        # Fetch the missing S3A driver and AWS SDK
        .config(
            "spark.jars.packages",
            "org.apache.hadoop:hadoop-aws:3.3.4,"
            "com.amazonaws:aws-java-sdk-bundle:1.12.688"
        )
        # MinIO parameters
        .config("spark.hadoop.fs.s3a.impl",
                "org.apache.hadoop.fs.s3a.S3AFileSystem")
        .config("spark.hadoop.fs.s3a.endpoint", "http://minio:9000")
        .config("spark.hadoop.fs.s3a.path.style.access", "true")
        .config("spark.hadoop.fs.s3a.access.key",  minio_user)
        .config("spark.hadoop.fs.s3a.secret.key",  minio_pwd)
        .config("spark.hadoop.fs.s3a.connection.ssl.enabled", "false")  # when MinIO only speaks HTTP
        .getOrCreate()
)
Spark
Spark can be used in client mode (recommended for JupyterHub notebooks, as code is intended to be called in an interactive
fashion), which is the default, or in cluster mode. This notebook uses Spark in client mode, meaning that the notebook itself
acts as the driver. It is important that the versions of Spark and Python match across the driver (running in the JupyterHub image)
and the executor(s) (running in a separate image, specified below with the `spark.kubernetes.container.image` setting).

The jupyterhub image `quay.io/jupyter/pyspark-notebook:spark-3.5.2` uses a base ubuntu image (like the spark images).
The versions of java match exactly. Python versions can differ at patch level, and the image used below `oci.stackable.tech/demos/spark:3.5.2-python311` is built from a `spark:3.5.2-scala2.12-java17-ubuntu` base image with python 3.11 (the same major/minor version as the notebook) installed.

## S3
As we will be reading data from an S3 bucket, we need to add the necessary `hadoop` and `aws` libraries in the same Hadoop version as the
notebook image (see `spark.jars.packages`), and define the endpoint settings (see `spark.hadoop.fs.*`).

In [4]:
from pyspark.sql import SparkSession

# getOrCreate() returns an already running session if there is one; in that
# case launch-time settings such as the extra JARs and the spark.hadoop.*
# options below may not take effect. Stop any earlier session first so this
# cell always builds a session with exactly the configuration listed here.
try:
    spark.stop()
except NameError:
    pass  # fresh kernel: no session has been created yet

spark = (
    SparkSession.builder
        .appName("local-with-s3a")
        .master("local[*]")
        # Drivers / JARs
        .config(
            "spark.jars.packages",
            "org.apache.hadoop:hadoop-aws:3.3.4,"
            "com.amazonaws:aws-java-sdk-bundle:1.12.688"
        )
        # S3A / MinIO basics
        .config("spark.hadoop.fs.s3a.impl",
                "org.apache.hadoop.fs.s3a.S3AFileSystem")
        .config("spark.hadoop.fs.s3a.endpoint", "http://minio:9000")
        .config("spark.hadoop.fs.s3a.path.style.access", "true")
        .config("spark.hadoop.fs.s3a.connection.ssl.enabled", "false")
        .config("spark.hadoop.fs.s3a.access.key",  minio_user)
        .config("spark.hadoop.fs.s3a.secret.key",  minio_pwd)
        # -------------------------------------------------
        # S3-friendly committer (fix for rename errors)
        # -------------------------------------------------
        .config("spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version", "2")
        .config("spark.hadoop.fs.s3a.committer.name", "directory")
        .getOrCreate()
)

# Smoke test: run a trivial job to confirm the session is usable.
print(spark.range(3).collect())


[Row(id=0), Row(id=1), Row(id=2)]


In [5]:
# Build a two-row toy DataFrame to check that DataFrame creation and display work.
sample_rows = [("a", 1), ("b", 2)]
sample_columns = ["col1", "col2"]
df = spark.createDataFrame(sample_rows, schema=sample_columns)
df.show()

+----+----+
|col1|col2|
+----+----+
|   a|   1|
|   b|   2|
+----+----+



In [6]:
# Manual S3 file check via pyarrow.fs
import pyarrow.fs as fs

# Connect to MinIO with the credentials read earlier and list every entry
# below the raw-data prefix, independently of Spark.
s3 = fs.S3FileSystem(
    endpoint_override="http://minio:9000/",
    access_key=minio_user,
    secret_key=minio_pwd,
    scheme="http",
)
raw_selector = fs.FileSelector("demo/gas-sensor/raw/", recursive=True)
files = s3.get_file_info(raw_selector)
for f in files:
    print("Found file:", f.path)

Found file: demo/gas-sensor/raw/20160930_203718.csv


In [7]:
# Load all CSV files under the raw prefix, taking column names from the header line.
# No schema is supplied or inferred here.
df = spark.read.option("header", True).csv("s3a://demo/gas-sensor/raw/")
df.show()

+--------+------+--------+-----------+--------+-------------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+
|timesecs| coppm|humidity|temperature|flowrate|heatervoltage|     r1|     r2|     r3|     r4|     r5|     r6|     r7|     r8|     r9|    r10|    r11|    r12|    r13|    r14|
+--------+------+--------+-----------+--------+-------------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+
|  0.0000|0.0000| 49.7534|    23.7184|233.2737|       0.8993| 0.2231| 0.6365| 1.1493| 0.8483| 1.2534| 1.4449| 1.9906| 1.3303| 1.4480| 1.9148| 3.4651| 5.2144| 6.5806| 8.6385|
|  0.3090|0.0000| 55.8400|    26.6200|241.6323|       0.2112| 2.1314| 5.3552| 9.7569| 6.3188| 9.4472|10.5769|13.6317|21.9829|16.1902|24.2780|31.1014|34.7193|31.7505|41.9167|
|  0.6180|0.0000| 55.8400|    26.6200|241.3888|       0.2070|10.5318|22.5612|37.2635|17.7848|33.0704|36.3160|42.5746|49.7495|31.75

In [8]:
# Write the raw data back to S3 as CSV and as Parquet, replacing earlier output.
# The CSV writer is left at its defaults here, so no header row is requested.
df.write.csv("s3a://demo/gas-sensor/rewritten/", mode="overwrite")
df.write.parquet("s3a://demo/gas-sensor/parquet/", mode="overwrite")

# Read the Parquet copy back. Parquet files carry their own schema, so the
# CSV-only `header` option is not passed to the reader.
df2 = spark.read.parquet("s3a://demo/gas-sensor/parquet/")
df2.count()

295719

In [9]:
from pyspark.sql import functions

# Assign each row to a 1-based bucket of 60 `timesecs` units.
# NOTE(review): if `timesecs` is in seconds, each bucket spans one minute even
# though the column is called "hour" - confirm the intended bucket width.
bucket_index = functions.floor(functions.col("timesecs") / 60) + 1
df2 = df2.withColumn("hour", bucket_index)

# Average the environmental readings per bucket, rounded to two decimals,
# and sort the result by bucket.
dfs = (
    df2.select("hour", "humidity", "temperature", "flowrate")
    .groupby("hour")
    .agg(
        *[
            functions.round(functions.avg(reading), 2).alias(reading)
            for reading in ("humidity", "temperature", "flowrate")
        ]
    )
    .orderBy("hour")
)

dfs.show()

+----+--------+-----------+--------+
|hour|humidity|temperature|flowrate|
+----+--------+-----------+--------+
|   1|    55.4|      26.61|  240.05|
|   2|   54.84|      26.61|  239.94|
|   3|   54.38|      26.58|  239.98|
|   4|   53.95|      26.58|  239.99|
|   5|   53.82|      26.58|  240.02|
|   6|   53.46|      26.58|  239.96|
|   7|   53.25|      26.58|  239.98|
|   8|   52.85|      26.58|  239.95|
|   9|   52.81|      26.58|  240.03|
|  10|   52.81|      26.57|  240.02|
|  11|   52.81|      26.57|  239.98|
|  12|   52.67|      26.58|  240.02|
|  13|    52.3|      26.57|  240.03|
|  14|    52.3|      26.57|  240.03|
|  15|   52.29|      26.56|  239.97|
|  16|   51.22|      26.55|  239.96|
|  17|   49.02|      26.55|  239.98|
|  18|   47.44|      26.55|  240.01|
|  19|   46.51|      26.54|  239.97|
|  20|   45.69|      26.54|  239.97|
+----+--------+-----------+--------+
only showing top 20 rows



In [10]:
# Persist the aggregated result as Parquet, replacing any previous output.
dfs.write.mode("overwrite").parquet("s3a://demo/gas-sensor/agg/")

### Convert between Spark and Pandas DataFrames

In [11]:
# Convert the aggregated Spark DataFrame into a pandas DataFrame
# and preview its first ten rows.
df_pandas = dfs.toPandas()
df_pandas.head(n=10)

Unnamed: 0,hour,humidity,temperature,flowrate
0,1,55.4,26.61,240.05
1,2,54.84,26.61,239.94
2,3,54.38,26.58,239.98
3,4,53.95,26.58,239.99
4,5,53.82,26.58,240.02
5,6,53.46,26.58,239.96
6,7,53.25,26.58,239.98
7,8,52.85,26.58,239.95
8,9,52.81,26.58,240.03
9,10,52.81,26.57,240.02


In [12]:
# Convert the pandas DataFrame back into a Spark DataFrame and display it.
spark_df = spark.createDataFrame(data=df_pandas)
spark_df.show()

+----+--------+-----------+--------+
|hour|humidity|temperature|flowrate|
+----+--------+-----------+--------+
|   1|    55.4|      26.61|  240.05|
|   2|   54.84|      26.61|  239.94|
|   3|   54.38|      26.58|  239.98|
|   4|   53.95|      26.58|  239.99|
|   5|   53.82|      26.58|  240.02|
|   6|   53.46|      26.58|  239.96|
|   7|   53.25|      26.58|  239.98|
|   8|   52.85|      26.58|  239.95|
|   9|   52.81|      26.58|  240.03|
|  10|   52.81|      26.57|  240.02|
|  11|   52.81|      26.57|  239.98|
|  12|   52.67|      26.58|  240.02|
|  13|    52.3|      26.57|  240.03|
|  14|    52.3|      26.57|  240.03|
|  15|   52.29|      26.56|  239.97|
|  16|   51.22|      26.55|  239.96|
|  17|   49.02|      26.55|  239.98|
|  18|   47.44|      26.55|  240.01|
|  19|   46.51|      26.54|  239.97|
|  20|   45.69|      26.54|  239.97|
+----+--------+-----------+--------+
only showing top 20 rows

