# Wasabi S3 Storage with PySpark

This notebook demonstrates how to write data to and read data from Wasabi S3-compatible storage using PySpark.

## Prerequisites
- Wasabi credentials configured in a `.env` file
- Hadoop AWS JARs installed (already included in the Docker image)
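
Before building the session, it can help to confirm that the credentials actually reached the notebook process. The following is a minimal sketch, assuming the `.env` values are exported as environment variables (for example by Docker Compose); the helper name `missing_credentials` is hypothetical and not part of the notebook.

```python
import os
from typing import Mapping

# Variables the first cell reads without a default value
REQUIRED_VARS = ("WASABI_ACCESS_KEY", "WASABI_SECRET_KEY")


def missing_credentials(env: Mapping[str, str] = os.environ) -> list[str]:
    """Return the names of required variables that are unset or empty."""
    return [name for name in REQUIRED_VARS if not env.get(name)]


missing = missing_credentials()
if missing:
    print(f"Missing Wasabi credentials: {', '.join(missing)}")
```

Printing a warning keeps the sketch harmless to run; in a real notebook, raising an error at this point may be preferable to starting a session with empty keys.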

In [1]:
from pyspark.sql import SparkSession
import os

# Get Wasabi credentials from environment
wasabi_endpoint = os.getenv('WASABI_ENDPOINT', 's3.wasabisys.com')
wasabi_access_key = os.getenv('WASABI_ACCESS_KEY')
wasabi_secret_key = os.getenv('WASABI_SECRET_KEY')

# Create a Spark session with S3A configuration
# There appears to be a Hadoop version compatibility problem here: some
# time-based defaults are stored as duration strings such as '60s', but the
# client tries to read them as plain numbers of milliseconds, cannot convert
# '60s' to a number, and the Spark nodes go down.
# Setting these options explicitly as plain numbers avoids the failure.
spark = SparkSession.builder \
    .appName("Wasabi S3 Example") \
    .master("spark://spark-master:7077") \
    .config("spark.hadoop.fs.s3a.endpoint", wasabi_endpoint) \
    .config("spark.hadoop.fs.s3a.access.key", wasabi_access_key) \
    .config("spark.hadoop.fs.s3a.secret.key", wasabi_secret_key) \
    .config("spark.hadoop.fs.s3a.path.style.access", "true") \
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
    .config("spark.hadoop.fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider") \
    .config("spark.hadoop.fs.s3a.connection.establish.timeout", "30000") \
    .config("spark.hadoop.fs.s3a.connection.timeout", "200000") \
    .config("spark.hadoop.fs.s3a.connection.request.timeout", "0") \
    .config("spark.hadoop.fs.s3a.connection.ttl", "300000") \
    .config("spark.hadoop.fs.s3a.retry.throttle.interval", "1000") \
    .config("spark.hadoop.fs.s3a.threads.keepalivetime", "60") \
    .config("spark.hadoop.fs.s3a.assumed.role.session.duration", "1800000") \
    .config("spark.hadoop.fs.s3a.multipart.purge.age", "86400000") \
    .getOrCreate()

print(f"Spark version: {spark.version}")
print(f"S3A endpoint: {wasabi_endpoint}")
print("S3A configured: ✓")

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/10/05 20:20:07 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Spark version: 4.0.0
S3A endpoint: s3.us-east-2.wasabisys.com
S3A configured: ✓
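
The session configuration above passes time-based S3A options as plain numbers rather than duration strings such as `60s`. As an illustration of that conversion, here is a small sketch of a helper that turns a duration string into milliseconds. The name `duration_to_millis` and the set of accepted suffixes are assumptions made for this example, not part of Hadoop or Spark.

```python
import re

# Multipliers for the suffixes this sketch accepts (an assumed, minimal set)
_UNIT_TO_MILLIS = {"ms": 1, "s": 1_000, "m": 60_000, "h": 3_600_000, "d": 86_400_000}


def duration_to_millis(value: str) -> int:
    """Convert strings such as '60s' or '24h' to milliseconds.

    A value without a suffix is assumed to be in milliseconds already.
    """
    match = re.fullmatch(r"\s*(\d+)\s*(ms|s|m|h|d)?\s*", value)
    if match is None:
        raise ValueError(f"Unrecognized duration: {value!r}")
    number, unit = match.groups()
    return int(number) * _UNIT_TO_MILLIS[unit or "ms"]


print(duration_to_millis("60s"))  # 60000
print(duration_to_millis("24h"))  # 86400000
```

Note that not every option is expressed in milliseconds: `fs.s3a.threads.keepalivetime` is set to `60` (seconds) in the cell above, so the result of this helper would need adjusting for such settings.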


## Create Sample Data

In [2]:
# Create a sample DataFrame
data = [
    (1, "Alice", 25, "Engineering"),
    (2, "Bob", 30, "Marketing"),
    (3, "Charlie", 35, "Sales"),
    (4, "Diana", 28, "Engineering"),
    (5, "Eve", 32, "HR")
]

columns = ["id", "name", "age", "department"]
df = spark.createDataFrame(data, columns)

df.show()
df.printSchema()



+---+-------+---+-----------+
| id|   name|age| department|
+---+-------+---+-----------+
|  1|  Alice| 25|Engineering|
|  2|    Bob| 30|  Marketing|
|  3|Charlie| 35|      Sales|
|  4|  Diana| 28|Engineering|
|  5|    Eve| 32|         HR|
+---+-------+---+-----------+

root
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- age: long (nullable = true)
 |-- department: string (nullable = true)




## Write Data to Wasabi

Set `bucket_name` in the cell below to your own Wasabi bucket name; the example uses `dfscrunch-data-lake`.
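
To avoid editing the bucket name in several cells, the path could be built from a single setting. This is only a sketch: the environment variable `WASABI_BUCKET` and the helper `s3a_uri` are hypothetical names introduced for illustration, and the fallback value is the bucket used in this notebook.

```python
import os


def s3a_uri(bucket: str, *parts: str) -> str:
    """Join a bucket name and key segments into an s3a:// URI."""
    if not bucket or "/" in bucket:
        raise ValueError(f"Invalid bucket name: {bucket!r}")
    # Drop empty segments and stray slashes so the key has single separators
    key = "/".join(p.strip("/") for p in parts if p.strip("/"))
    return f"s3a://{bucket}/{key}" if key else f"s3a://{bucket}"


# WASABI_BUCKET is an assumed variable name, not one the notebook defines
bucket_name = os.getenv("WASABI_BUCKET", "dfscrunch-data-lake")
output_path = s3a_uri(bucket_name, "examples", "employees")
print(output_path)
```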

In [3]:
# Write to Wasabi in Parquet format
# NOTE: Replace the value of bucket_name with your own bucket name
bucket_name = "dfscrunch-data-lake"
output_path = f"s3a://{bucket_name}/examples/employees"

print(f"Writing data to: {output_path}")

df.write \
    .mode("overwrite") \
    .parquet(output_path)

print("Data written successfully!")

Writing data to: s3a://dfscrunch-data-lake/examples/employees


25/10/05 20:20:13 WARN MetricsConfig: Cannot locate configuration: tried hadoop-metrics2-s3a-file-system.properties,hadoop-metrics2.properties
25/10/05 20:20:15 WARN Base64: JAXB is unavailable. Will fallback to SDK implementation which may be less performant.If you are using Java 9+, you will need to include javax.xml.bind:jaxb-api as a dependency.

Data written successfully!


## Read Data from Wasabi

In [4]:
# Read the data back from Wasabi
print(f"Reading data from: {output_path}")

df_read = spark.read.parquet(output_path)

print("\nData read from Wasabi:")
df_read.show()

print(f"\nTotal records: {df_read.count()}")

Reading data from: s3a://dfscrunch-data-lake/examples/employees

Data read from Wasabi:



+---+-------+---+-----------+
| id|   name|age| department|
+---+-------+---+-----------+
|  1|  Alice| 25|Engineering|
|  4|  Diana| 28|Engineering|
|  3|Charlie| 35|      Sales|
|  2|    Bob| 30|  Marketing|
|  5|    Eve| 32|         HR|
+---+-------+---+-----------+






Total records: 5



## Perform Transformations and Write Back

In [5]:
# Filter and transform data
from pyspark.sql.functions import col, upper

df_engineers = df_read \
    .filter(col("department") == "Engineering") \
    .withColumn("name_upper", upper(col("name")))

print("Filtered Engineers:")
df_engineers.show()

# Write filtered data to a different path
engineers_path = f"s3a://{bucket_name}/examples/engineers"
print(f"\nWriting engineers data to: {engineers_path}")

df_engineers.write \
    .mode("overwrite") \
    .parquet(engineers_path)

print("Engineers data written successfully!")

Filtered Engineers:



+---+-----+---+-----------+----------+
| id| name|age| department|name_upper|
+---+-----+---+-----------+----------+
|  1|Alice| 25|Engineering|     ALICE|
|  4|Diana| 28|Engineering|     DIANA|
+---+-----+---+-----------+----------+


Writing engineers data to: s3a://dfscrunch-data-lake/examples/engineers



Engineers data written successfully!


## Write in Different Formats

In [None]:
# Write as CSV
csv_path = f"s3a://{bucket_name}/examples/employees_csv"
df.write \
    .mode("overwrite") \
    .option("header", "true") \
    .csv(csv_path)
print(f"CSV written to: {csv_path}")

# Write as JSON
json_path = f"s3a://{bucket_name}/examples/employees_json"
df.write \
    .mode("overwrite") \
    .json(json_path)
print(f"JSON written to: {json_path}")

25/10/05 20:21:37 WARN AbstractS3ACommitterFactory: Using standard FileOutputCommitter to commit work. This is slow and potentially unsafe.
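
The `AbstractS3ACommitterFactory` warning above reports that the job commits its output through the standard `FileOutputCommitter`, which finalizes work by renaming files, an operation that object stores implement as copy and delete. One possible way to address it is to switch to an S3A committer. The settings below are a sketch based on the general Spark and Hadoop S3A committer documentation; they assume the `spark-hadoop-cloud` module is on the classpath and have not been verified against Wasabi. The helper `apply_conf` is a hypothetical convenience, not a Spark API.

```python
# Assumed settings; verify against your Spark/Hadoop versions and storage provider
s3a_committer_conf = {
    # "magic" is one of the S3A committers; "directory" is another option
    "spark.hadoop.fs.s3a.committer.name": "magic",
    "spark.sql.sources.commitProtocolClass":
        "org.apache.spark.internal.io.cloud.PathOutputCommitProtocol",
    "spark.sql.parquet.output.committer.class":
        "org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter",
}


def apply_conf(builder, conf):
    """Apply each key/value pair to a SparkSession builder and return it."""
    for key, value in conf.items():
        builder = builder.config(key, value)
    return builder
```

With this in place, the session could be created as `apply_conf(SparkSession.builder, s3a_committer_conf)` followed by the existing `.config(...)` calls, since the options must be set before the session starts.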

## Error Handling Example

In [None]:
# Example of handling a missing path
from pyspark.errors import AnalysisException

non_existent_path = f"s3a://{bucket_name}/non-existent-path"
try:
    df_test = spark.read.parquet(non_existent_path)
except AnalysisException as e:
    # Spark reports a path that does not exist as an AnalysisException
    print(f"Expected error: {type(e).__name__}")
    print(f"Message: {str(e)[:100]}...")

## Cleanup

In [None]:
# Stop Spark session
spark.stop()
print("Spark session stopped")