In [None]:
# --- 1. Kütüphane Kurulumu ---
# TensorFlow ve Horovod'u kuruyoruz. Databricks kısıtlamalarını aşmak için --upgrade kullanıyoruz.
%pip install tensorflow --upgrade
%pip install horovod --upgrade

# Kütüphane kurulumundan sonra Python'ı yeniden başlatma (Zorunludur)
dbutils.library.restartPython()

In [None]:
# --- 2. Dağıtık Eğitim Kodu ---

import tensorflow as tf
from tensorflow import keras
import numpy as np
import horovod.tensorflow.keras as hvd
from horovod.spark import run # Horovod Runner için kritik
from pyspark.sql import SparkSession
from pyspark.sql.functions import rand, udf, col
from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.sql.types import FloatType

# Spark Oturumunu Başlatma
spark = SparkSession.builder.appName("HorovodMinimalExample").getOrCreate()
print(f"Spark Uygulama Adı: {spark.sparkContext.appName}")
print(f"TensorFlow Sürümü: {tf.__version__}")
print(f"Horovod Sürümü: {hvd.__version__}")

# --- 1. Veri Hazırlama (Spark DataFrame) ---

# Horovod'un Spark Runner'ı PySpark VectorDense formatında veri bekler.
def create_minimal_data(num_rows=10000):
    # Sentetik veri oluşturma: Tek özellik
    df = spark.range(num_rows).withColumn("feature_raw", rand(seed=42))

    # PySpark Vector ve Label formatına dönüştürme
    vector_udf = udf(lambda x: Vectors.dense([float(x)]), VectorUDT())

    @udf(returnType=FloatType())
    def create_label(feature_val):
        # Basit sınıflandırma: feature > 0.5 ise 1, değilse 0
        return float(feature_val > 0.5)

    processed_df = df.withColumn("features", vector_udf(col("feature_raw")))\
                     .withColumn("label", create_label(col("feature_raw")))\
                     .select("features", "label")

    # Veriyi Spark çekirdeklerine dağıtma (Shuffle)
    return processed_df.repartition(4) # Örnek olarak 4 bölüme ayırıyoruz

train_df = create_minimal_data()
print("\nSpark DataFrame Hazır.")
train_df.printSchema()

# --- 2. Dağıtık Çalıştırılacak Eğitim Fonksiyonunun Tanımlanması ---

def minimal_train_fn(learning_rate=0.001):
    """
    Horovod Runner tarafından her Spark Executor'da çalıştırılacak fonksiyon.
    """
    # 1. Horovod'u Başlatma (Her Worker'da Çalışır)
    hvd.init()

    # GPU/CPU Yapılandırması (GPU'lu küme varsayılırsa)
    gpus = tf.config.experimental.list_physical_devices('GPU')
    if gpus:
        # Her worker'a farklı bir GPU atama
        tf.config.experimental.set_visible_devices(gpus[hvd.local_rank()], 'GPU')
        tf.config.experimental.set_memory_growth(gpus[hvd.local_rank()], True)

    # 2. Model Tanımlama
    input_dim = 1
    model = keras.Sequential([
        keras.layers.Dense(32, activation='relu', input_shape=(input_dim,)),
        keras.layers.Dense(1, activation='sigmoid')
    ])

    # 3. Dağıtık Optimizatör
    # Öğrenme hızı ölçeklendirilir
    opt = keras.optimizers.Adam(learning_rate=learning_rate * hvd.size())
    opt = hvd.DistributedOptimizer(opt) # Gradien toplama (Allreduce)

    model.compile(
        optimizer=opt,
        loss='binary_crossentropy',
        metrics=['accuracy']
    )

    # 4. Geri Çağrımlar (Callbacks)
    callbacks = [
        hvd.callbacks.BroadcastGlobalVariablesCallback(0),
        hvd.callbacks.MetricAverageCallback(),
    ]

    if hvd.rank() == 0:
        print(f"Toplam Horovod Worker sayısı: {hvd.size()}")

    # Modeli ve geri çağrımları döndürme
    return model, callbacks

# --- 3. Horovod Runner ile Eğitimi Başlatma ---

num_processes = 4 # 4 Spark çekirdeği/worker kullan
print(f"\n--- Horovod Spark Runner {num_processes} Worker ile Başlatılıyor ---")

spark_model = run(
    minimal_train_fn,
    num_proc=num_processes,
    args=(0.001,), # train_fn'e öğrenme hızı parametresini gönderme
    data_frame=train_df,
    verbose=1,
    epochs=5,
    batch_size=128
)

print("\n--- Dağıtık Eğitim Tamamlandı ---")

# Eğitilmiş modeli kullanarak tahmin yapma
if spark_model:
    prediction_df = spark_model.transform(train_df)
    prediction_df.select("label", "prediction").show(5)

spark.stop()