In [25]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import split, col, expr

from dotenv import load_dotenv
import os

# Load environment variables from the local .env file, if there is one.
load_dotenv()

# The MinIO connection settings come from the environment. Fail fast with a
# clear message when one is missing: otherwise the endpoint below would become
# the unusable string "http://None" and the problem would only surface later,
# on the first S3A read or write.
_required_env = (
    "MINIO_ENDPOINT",
    "MINIO_ACCESS_KEY",
    "MINIO_SECRET_KEY",
    "MINIO_BUCKET",
)
_missing_env = [name for name in _required_env if not os.getenv(name)]
if _missing_env:
    raise RuntimeError(
        "Missing required environment variable(s): " + ", ".join(_missing_env)
    )

minio_endpoint = os.getenv("MINIO_ENDPOINT")
minio_access_key = os.getenv("MINIO_ACCESS_KEY")
minio_secret_key = os.getenv("MINIO_SECRET_KEY")
bucket = os.getenv("MINIO_BUCKET")

# Accept MINIO_ENDPOINT either as "host:port" (plain HTTP is assumed, as
# before) or as a full URL that already carries its scheme.
s3a_endpoint = (
    minio_endpoint if "://" in minio_endpoint else f"http://{minio_endpoint}"
)

# Build (or reuse) the SparkSession: S3A pointed at MinIO with path-style
# access, plus the Delta Lake SQL extension and catalog.
# NOTE(review): the Delta jars on the cluster classpath presumably have to be
# built for the running Spark version; a NoClassDefFoundError raised from
# DeltaLog on write suggests a mismatch -- confirm against the Delta Lake
# release compatibility table.
spark = (
    SparkSession.builder
    .appName("TransformEcommerceData")
    .master("spark://spark-master:7077")
    .config("spark.hadoop.fs.s3a.endpoint", s3a_endpoint)
    .config("spark.hadoop.fs.s3a.access.key", minio_access_key)
    .config("spark.hadoop.fs.s3a.secret.key", minio_secret_key)
    .config("spark.hadoop.fs.s3a.path.style.access", "true")
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .getOrCreate()
)

In [26]:
# 1. Raw read: load the CSV as plain text, one row per line in a single
#    "value" column, so that junk lines can be removed before parsing.
raw_df = (
    spark.read
    .option("encoding", "ISO-8859-1")
    .text(f"s3a://{bucket}/raw/ecommerce/ecommerce.csv")
)

# 2. Drop junk lines: nulls, blank lines, and stray lines that START with the
#    keyword SET (optionally preceded by whitespace, BOM bytes or other
#    non-alphanumeric characters). The pattern is anchored to the start of the
#    line on purpose: matching SET anywhere in the line would also discard
#    every genuine row whose text contains the word "SET" (for instance a
#    product description such as "SET OF 3 ...").
filtered_df = (
    raw_df
    .filter(col("value").isNotNull())
    .filter(~col("value").rlike(r"^\s*$"))
    .filter(~col("value").rlike(r"(?i)^[^A-Za-z0-9]*SET\b"))
)

# 3. Split on commas that sit outside double quotes. The lookahead only
#    accepts a comma followed by an even number of quote characters, so a
#    quoted field containing a comma stays in one piece instead of shifting
#    every following column.
split_df = filtered_df.withColumn(
    "splitted",
    split(col("value"), r',(?=(?:[^"]*"[^"]*")*[^"]*$)'),
)

# 4. Build the typed columns from the split array only. Description has its
#    wrapping double quotes removed (doubled quotes inside a field are left
#    untouched). Values that cannot be cast are expected to become null.
df = split_df.select(
    col("splitted")[0].alias("InvoiceNo"),
    col("splitted")[1].alias("StockCode"),
    expr("""regexp_replace(splitted[2], '^"|"$', '')""").alias("Description"),
    col("splitted")[3].cast("int").alias("Quantity"),
    col("splitted")[4].alias("InvoiceDate"),
    col("splitted")[5].cast("float").alias("UnitPrice"),
    col("splitted")[6].alias("CustomerID"),
    col("splitted")[7].alias("Country"),
)

# 5. Preview the schema and the first rows.
df.printSchema()
df.show(5, truncate=False)

root
 |-- InvoiceNo: string (nullable = true)
 |-- StockCode: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- InvoiceDate: string (nullable = true)
 |-- UnitPrice: float (nullable = true)
 |-- CustomerID: string (nullable = true)
 |-- Country: string (nullable = true)

+---------+---------+----------------------------------+--------+--------------+---------+----------+--------------+
|InvoiceNo|StockCode|Description                       |Quantity|InvoiceDate   |UnitPrice|CustomerID|Country       |
+---------+---------+----------------------------------+--------+--------------+---------+----------+--------------+
|565541   |20972    |PINK CREAM FELT CRAFT TRINKET BOX |3       |9/5/2011 12:00|1.25     |14159     |United Kingdom|
|565541   |22620    |4 TRADITIONAL SPINNING TOPS       |2       |9/5/2011 12:00|1.45     |14159     |United Kingdom|
|565541   |22621    |TRADITIONAL KNITTING NANCY        |2       |9/5/2011 12:00

In [27]:
# 1. Simple example transformation: the line total is quantity times unit
#    price, stored as a float column named "TotalPrice".
line_total = (col("Quantity") * col("UnitPrice")).cast("float")
df_transformed = df.withColumn("TotalPrice", line_total)

# 2. Display the resulting schema and a five-row sample.
df_transformed.printSchema()
df_transformed.show(5, truncate=False)

root
 |-- InvoiceNo: string (nullable = true)
 |-- StockCode: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- InvoiceDate: string (nullable = true)
 |-- UnitPrice: float (nullable = true)
 |-- CustomerID: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- TotalPrice: float (nullable = true)

+---------+---------+----------------------------------+--------+--------------+---------+----------+--------------+----------+
|InvoiceNo|StockCode|Description                       |Quantity|InvoiceDate   |UnitPrice|CustomerID|Country       |TotalPrice|
+---------+---------+----------------------------------+--------+--------------+---------+----------+--------------+----------+
|565541   |20972    |PINK CREAM FELT CRAFT TRINKET BOX |3       |9/5/2011 12:00|1.25     |14159     |United Kingdom|3.75      |
|565541   |22620    |4 TRADITIONAL SPINNING TOPS       |2       |9/5/2011 12:00|1.45     |14159     |United Kin

In [28]:
# Persist the transformed dataset as Parquet, replacing any previous run.
# The target bucket comes from MINIO_BUCKET (the same variable used for the
# raw read) instead of a hard-coded bucket name, so input and output follow
# the same configuration.
df_transformed.write \
    .mode("overwrite") \
    .parquet(f"s3a://{bucket}/processed/ecommerce_parquet/")

In [29]:
# Persist the transformed dataset as a Delta table, replacing any previous
# run. The target bucket comes from MINIO_BUCKET (the same variable used for
# the raw read) instead of a hard-coded bucket name.
# NOTE(review): this write needs Delta Lake jars that match the running Spark
# version. A NoClassDefFoundError thrown from DeltaLog.startTransaction
# presumably means the installed Delta build targets a different Spark
# release -- confirm and align the versions before relying on this cell.
df_transformed.write \
    .format("delta") \
    .mode("overwrite") \
    .save(f"s3a://{bucket}/processed/ecommerce_delta/")

Py4JJavaError: An error occurred while calling o513.save.
: java.lang.NoClassDefFoundError: org/apache/spark/sql/execution/datasources/FileFormatWriter$Empty2Null
	at org.apache.spark.sql.delta.DeltaLog.startTransaction(DeltaLog.scala:218)
	at org.apache.spark.sql.delta.DeltaLog.startTransaction(DeltaLog.scala:215)
	at org.apache.spark.sql.delta.DeltaLog.withNewTransaction(DeltaLog.scala:231)
	at org.apache.spark.sql.delta.commands.WriteIntoDelta.run(WriteIntoDelta.scala:92)
	at org.apache.spark.sql.delta.sources.DeltaDataSource.createRelation(DeltaDataSource.scala:171)
	at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:48)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:107)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:125)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:201)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:108)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:107)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:461)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:76)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:461)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:437)
	at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:98)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:85)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:83)
	at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:142)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:859)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:388)
	at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:304)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:240)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:568)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: java.lang.ClassNotFoundException: org.apache.spark.sql.execution.datasources.FileFormatWriter$Empty2Null
	at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:641)
	at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:188)
	at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:525)
	... 46 more
