In [None]:
import pandas as pd
import matplotlib as plt
import numpy as np

In [None]:
import os
import json
from pyspark.sql import SparkSession
import pyspark.sql.functions as f

# Use the same virtualenv interpreter for PYSPARK_PYTHON and
# PYSPARK_DRIVER_PYTHON so both settings point at one Python installation.
os.environ.update(
    {
        "PYSPARK_PYTHON": "/srv/spark/.venv/bin/python",
        "PYSPARK_DRIVER_PYTHON": "/srv/spark/.venv/bin/python",
    }
)

def get_spark_session(app_name: str, conf_path: str = "spark.json") -> SparkSession:
    """Create (or reuse) a SparkSession configured for Hudi experiments.

    Args:
        app_name: Application name passed to the session builder.
        conf_path: Path to a JSON file that must contain the keys
            ``"master"`` (Spark master URL) and ``"secret"`` (value used for
            ``spark.authenticate.secret``).

    Returns:
        The session returned by ``getOrCreate()`` on the configured builder.

    Raises:
        OSError: If ``conf_path`` cannot be opened.
        json.JSONDecodeError: If the file is not valid JSON.
        KeyError: If ``"master"`` or ``"secret"`` is missing from the file.
    """
    builder = SparkSession.builder.appName(app_name)

    # Only the JSON parsing needs the file handle; close it before building.
    with open(conf_path, "r") as handle:
        settings = json.load(handle)

    # NOTE(review): the catalog class below may not be shipped in the
    # hudi-spark-bundle 0.7.0 pinned in spark.jars.packages -- confirm that
    # the bundle version and the HoodieCatalog setting are compatible.
    builder = (
        builder.master(settings["master"])
        .config("spark.authenticate", "true")
        .config("spark.authenticate.secret", settings["secret"])
        .config("spark.executor.memory", "8g")
        .config('spark.sql.catalog.spark_catalog', 'org.apache.spark.sql.hudi.catalog.HoodieCatalog')
        .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
        .config("spark.sql.hive.convertMetastoreParquet", "false")
        .config("spark.sql.catalogImplementation", "hive")
        .config("spark.jars.packages", "org.apache.hudi:hudi-spark-bundle_2.12:0.7.0,org.apache.spark:spark-avro_2.12:3.0.2")
        .config('spark.sql.warehouse.dir', '/srv/storage/test_hudi')
    )
    return builder.getOrCreate()

# Disabled (kept for reference only, not applied to the session):
# .config('spark.sql.extensions','org.apache.spark.sql.hudi.HoodieSparkSessionExtension')\

# Build the notebook-wide session; connection settings are read from the
# default "spark.json" file next to the notebook.
spark = get_spark_session(app_name="Hudi_spark_test")

# Simple marker that the session was obtained without raising.
print("Spark Running")

In [None]:
# Build a small DataFrame with the rows to insert into the Hudi table
data = [
    (1, "John"),
    (2, "Alice"),
    (3, "Bob"),
]
df = spark.createDataFrame(data, ["id", "name"])

# Base path of the Hudi table. The Hudi DataFrame writer needs an explicit
# path; calling save() without one fails. The location below sits under the
# warehouse directory configured for the session -- adjust if the table
# should live elsewhere.
hudi_table_path = "/srv/storage/test_hudi/my_hudi_table"

# Hudi write options for the table
hudi_options = {
    "hoodie.table.name": "my_hudi_table",
    "hoodie.datasource.write.operation": "upsert",
    "hoodie.datasource.write.recordkey.field": "id",
    "hoodie.datasource.write.partitionpath.field": "name",
    # Upserts need a precombine field that exists in the schema. Hudi's
    # default is "ts", which this DataFrame does not have, so name a real
    # column explicitly. Keys are unique here, so the choice only matters
    # for tie-breaking between duplicate keys.
    "hoodie.datasource.write.precombine.field": "id",
    "hoodie.datasource.write.table.type": "COPY_ON_WRITE",
}

# Write the DataFrame to the Hudi table ("overwrite" recreates the table)
df.write.format("hudi").options(**hudi_options).mode("overwrite").save(hudi_table_path)


In [None]:
# Source parquet files: every file matching the 2022-09-* pattern
parquet_files = "/srv/parquetevents/mongo_parquet/reunion/2022-09-*-reunion.snappy.parquet"

# Load the events and normalise column names to snake_case
df = spark.read.parquet(parquet_files).select(
    f.col("_id").alias("id"),
    "date",
    f.col("userId").alias("user_id"),
    f.col("profil").alias("profile"),
    f.col("event-type").alias("event_type"),
    "module",
    "ua",
    f.col("resource-type").alias("resource_type"),
)

# A Hudi write cannot succeed without options: the table name is mandatory,
# and the default record key ("uuid"), precombine field ("ts") and partition
# field ("partitionpath") do not exist in this DataFrame.
events_hudi_options = {
    # NOTE(review): table name chosen for this notebook -- rename as needed.
    "hoodie.table.name": "reunion_events",
    "hoodie.datasource.write.operation": "upsert",
    "hoodie.datasource.write.recordkey.field": "id",
    # Assumes "date" is a non-null, orderable event timestamp -- TODO confirm.
    "hoodie.datasource.write.precombine.field": "date",
    # No partition column is chosen here, so write a non-partitioned table.
    "hoodie.datasource.write.partitionpath.field": "",
    "hoodie.datasource.write.keygenerator.class": "org.apache.hudi.keygen.NonpartitionedKeyGenerator",
    "hoodie.datasource.write.table.type": "COPY_ON_WRITE",
}

# The save mode is left at Spark's default (error if the table already
# exists), exactly as before; add .mode("append") to make re-runs upsert.
df.write.format("hudi").options(**events_hudi_options).save("/srv/storage/tests_iceberg/table")


In [None]:
# Print the version of the PySpark package imported by this kernel
# (handy when checking it against the Hudi / spark-avro package versions).
import pyspark
print(pyspark.__version__)


In [None]:
# Stop the notebook's Spark session; re-run the session cell to get a new one.
spark.stop()