In [None]:
import os

# --- Runtime configuration ---------------------------------------------------
# Cluster, driver and volume settings are read from this process's environment.
# All of them are required: a missing value would otherwise only surface later
# as a Spark master URL like "k8s://https://None:None" or a config key that
# contains "None", which is hard to trace back to its cause.
REQUIRED_ENV_VARS = (
    "KUBERNETES_SERVICE_HOST",
    "KUBERNETES_SERVICE_PORT",
    "SPARK_DRIVER_BIND_ADDRESS",
    "VOLUME_TYPE",
    "VOLUME_NAME",
    "MOUNT_PATH",
)
missing_env_vars = [name for name in REQUIRED_ENV_VARS if not os.getenv(name)]
if missing_env_vars:
    raise RuntimeError(
        "Missing required environment variables: " + ", ".join(missing_env_vars)
    )

kubernetes_service_host = os.getenv("KUBERNETES_SERVICE_HOST")
kubernetes_service_port = os.getenv("KUBERNETES_SERVICE_PORT")
spark_driver_bind_address = os.getenv("SPARK_DRIVER_BIND_ADDRESS")
volume_type = os.getenv("VOLUME_TYPE")
volume_name = os.getenv("VOLUME_NAME")
mount_path = os.getenv("MOUNT_PATH")

print(f"{kubernetes_service_host=} {kubernetes_service_port=}")
print(f"{spark_driver_bind_address=}")
print(f"{volume_type=} {volume_name=}")
print(f"{mount_path=}")

# Hive warehouse directory, placed under the mounted volume path.
warehouse_location = f"{mount_path}/spark-warehouse"

# Hive metastore connection (JDBC to MySQL). Each value can be overridden via
# the environment; the literal fallbacks preserve the original demo settings.
# NOTE(review): drop the hardcoded password fallback once METASTORE_PASSWORD is
# injected from a secret store (e.g. a Kubernetes Secret).
url = os.getenv("METASTORE_URL", "jdbc:mysql://metastore-db/metastore")
driver = "com.mysql.cj.jdbc.Driver"
username = os.getenv("METASTORE_USER", "root")
password = os.getenv("METASTORE_PASSWORD", "my-secret-pw")

In [None]:
from pyspark.sql import SparkSession

# Create the SparkSession. The driver runs inside this notebook process and
# talks to the Kubernetes API server given by the k8s:// master URL.
k8s_master_url = f"k8s://https://{kubernetes_service_host}:{kubernetes_service_port}"

# Mount the same volume at the same path on the driver and on every executor.
# Key order matches: driver mount, driver option, executor mount, executor option.
volume_options = {
    f"spark.kubernetes.{role}.volumes.{volume_type}.{volume_name}.{suffix}": mount_path
    for role in ("driver", "executor")
    for suffix in ("mount.path", "options.path")
}

# Every builder option, in the order it is applied.
spark_options = {
    "spark.driver.host": spark_driver_bind_address,
    "spark.kubernetes.container.image": "apache/spark-py:v3.3.1",
    "spark.kubernetes.context": "minikube",
    "spark.kubernetes.namespace": "spark-demo",
    **volume_options,
    "spark.sql.warehouse.dir": warehouse_location,
    # Hive metastore JDBC connection settings.
    "javax.jdo.option.ConnectionURL": url,
    "javax.jdo.option.ConnectionDriverName": driver,
    "javax.jdo.option.ConnectionUserName": username,
    "javax.jdo.option.ConnectionPassword": password,
}

builder = SparkSession.builder.master(k8s_master_url)
for option_key, option_value in spark_options.items():
    builder = builder.config(option_key, option_value)

spark = builder.appName("Test notebook").enableHiveSupport().getOrCreate()

sc = spark.sparkContext

In [None]:
# Text file to analyse, read as an RDD with one element per line.
file_name = "/shared-folder/load_data_write_to_server.py"

lines = sc.textFile(file_name)

# Split on any run of whitespace. Unlike split(" "), this never yields empty
# tokens (so no extra filter pass is needed) and does not leave tabs or other
# non-space whitespace glued inside "words".
words = lines.flatMap(lambda line: line.split())

# MapReduce: emit (word, 1) per occurrence, then sum the counts per word.
wordCount = (
    words
    .map(lambda word: (word, 1))
    .reduceByKey(lambda left, right: left + right)
)

# Sort (word, count) pairs by count, most frequent first.
result = wordCount.sortBy(lambda pair: pair[1], ascending=False)


In [None]:
# Number of most frequent words to show.
TOP_N = 10

# Bring only the first TOP_N pairs of the already-sorted RDD back to the
# driver, instead of collecting every word and slicing the list locally.
local_result = result.take(TOP_N)

for word, count in local_result:
    print(word, count)


In [None]:
# Stop the SparkSession (and its underlying SparkContext) created above so the
# resources it holds are released when the notebook is done.
spark.stop()