In [None]:
import os
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType
from pyspark.sql.functions import rand

In [None]:
conf = SparkConf()

#url par défaut d'une api kubernetes accédé depuis l'intérieur du cluster (ici le notebook tourne lui même dans kubernetes)
conf.setMaster("k8s://https://kubernetes.default.svc:443")

#image des executors spark: pour des raisons de simplicité on réutilise l'image du notebook
conf.set("spark.kubernetes.container.image", os.environ['IMAGE_NAME'])

# Nom du compte de service pour contacter l'api kubernetes : attention le package du datalab crée lui même cette variable d'enviromment.
# Dans un pod du cluster kubernetes il faut lire le fichier /var/run/secrets/kubernetes.io/serviceaccount/token
# Néanmoins ce paramètre est inutile car le contexte kubernetes local de ce notebook est préconfiguré
# conf.set("spark.kubernetes.authenticate.driver.serviceAccountName", os.environ['KUBERNETES_SERVICE_ACCOUNT']) 

# Nom du namespace kubernetes
conf.set("spark.kubernetes.namespace", os.environ['KUBERNETES_NAMESPACE'])

# Nombre d'executeur spark, il se lancera autant de pods kubernetes que le nombre indiqué.
conf.set("spark.executor.instances", "5")

# Mémoire alloué à la JVM
# Attention par défaut le pod kubernetes aura une limite supérieur qui dépend d'autres paramètres.
# On manipulera plus bas pour vérifier la limite de mémoire totale d'un executeur
conf.set("spark.executor.memory", "4g")

conf.set("spark.kubernetes.driver.pod.name", os.environ['KUBERNETES_POD_NAME'])

#ici pour gérer le dateTimeFormatter dépendant de la verion de java...
conf.set("spark.sql.legacy.timeParserPolicy","LEGACY")

# pour spark-history
conf.set("spark.history.fs.logDirectory", "s3a://auredisanto/spark-history")
conf.set("spark.eventLog.enabled", "true")
conf.set("spark.eventLog.dir", "s3a://auredisanto/spark-history")

spark = SparkSession.builder.appName("SparkHistoryUsage").config(conf = conf).getOrCreate()

In [None]:
parquet_df1 = spark.read.parquet("s3a://auredisanto/data-exemples-avantages-spark/fichier1.parquet")
parquet_df1.createOrReplaceTempView("fichier1")

In [None]:
spark.stop()