In [None]:
# PySpark ile CSV'den Yalan Haber Tespiti - Tam Pipeline
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.functions import col
from pyspark.sql.types import *
from pyspark.ml.feature import Tokenizer, StopWordsRemover, HashingTF, IDF, StringIndexer
from pyspark.ml.classification import LogisticRegression, NaiveBayes
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.sql.types import DoubleType
from builtins import max

import pandas as pd

In [None]:
# 1. SPARK SESSION BAŞLATMA
# ========================
def initialize_spark():
    """Spark session başlatır - CSV için optimize edilmiş"""

    spark = SparkSession.builder \
        .appName("FakeNewsDetection_CSV") \
        .config("spark.sql.adaptive.enabled", "true") \
        .config("spark.sql.adaptive.coalescePartitions.enabled", "true") \
        .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
        .config("spark.sql.execution.arrow.pyspark.enabled", "false") \
        .config("spark.driver.memory", "4g") \
        .config("spark.executor.memory", "4g") \
        .config("spark.driver.maxResultSize", "2g") \
        .config("spark.network.timeout", "300s") \
        .getOrCreate()

    spark.sparkContext.setLogLevel("WARN")

    print("✅ Spark Session başlatıldı")
    print(f"📋 Spark Version: {spark.version}")

    return spark


In [None]:

# ========================
# 2. CSV VERİ YÜKLEME
# ========================
def load_csv_data(spark, csv_path):
    """
    CSV dosyasından veri yükler

    Args:
        spark: SparkSession
        csv_path: CSV dosyasının yolu

    Returns:
        DataFrame: text ve label kolonları içeren DataFrame
    """

    print(f"📖 CSV dosyası okunuyor: {csv_path}")

    try:
        # CSV'yi Spark DataFrame olarak oku
        df = spark.read \
            .option("header", "true") \
            .option("inferSchema", "true") \
            .option("encoding", "UTF-8") \
            .option("multiline", "true") \
            .option("escape", '"') \
            .csv(csv_path)

        # Kolon isimlerini kontrol et ve standartlaştır
        columns = df.columns
        print(f"📋 Bulunan kolonlar: {columns}")

        # Kolon isimlerini temizle (boşluk, özel karakter kaldır)
        for col_name in columns:
            clean_name = col_name.strip().lower()
            if col_name != clean_name:
                df = df.withColumnRenamed(col_name, clean_name)

        # Kolon isimlerini standart hale getir
        columns = df.columns
        text_col = None
        label_col = None

        # Metin kolonu bulma
        for column in columns:
            if any(keyword in column.lower() for keyword in ['metin', 'text', ...]):
                text_col = column
                break


        # Etiket kolonu bulma
        for column in columns:
            if any(keyword in column.lower() for keyword in ['etiket', 'label']):
                label_col = column
                break

        if not text_col or not label_col:
            print(f"⚠️ Uygun kolonlar bulunamadı. Mevcut kolonlar: {columns}")
            print("💡 İlk kolonu metin, ikinci kolonu etiket olarak kullanacağım.")
            text_col = columns[0]
            label_col = columns[1]

        print(f"📝 Metin kolonu: {text_col}")
        print(f"🏷️ Etiket kolonu: {label_col}")

        # Kolonları yeniden adlandır
        df = df.select(
            col(text_col).alias("text"),
            col(label_col).alias("label_raw")
        )

        # Null değerleri filtrele
        df = df.filter(col("text").isNotNull() & col("label_raw").isNotNull())
        df = df.filter((col("text") != "") & (length(col("text")) > 10))

        # Etiketleri kontrol et ve dönüştür
        unique_labels = df.select("label_raw").distinct().collect()
        print(f"📊 Unique etiketler: {[row.label_raw for row in unique_labels]}")

        # String etiketleri sayısal değerlere çevir
        df = convert_labels_to_numeric(df)

        # Veri istatistikleri
        total_count = df.count()
        label_counts = df.groupBy("label").count().collect()

        print(f"✅ Toplam {total_count} haber yüklendi")
        for row in label_counts:
            label_name = "Doğru Haber" if row.label == 1 else "Yalan Haber"
            print(f"   - {label_name}: {row.count}")

        # DataFrame'i optimize et
        df = df.coalesce(4).cache()

        return df

    except Exception as e:
        print(f"❌ CSV okuma hatası: {e}")
        raise

def convert_labels_to_numeric(df):
    """Etiketleri sayısal değerlere çevirir"""

    # Önce etiket türünü kontrol et
    sample_labels = df.select("label_raw").limit(10).collect()
    sample_values = [row.label_raw for row in sample_labels]

    print(f"🔍 Örnek etiket değerleri: {sample_values}")

    # Eğer zaten sayısal ise
    if all(isinstance(val, (int, float)) for val in sample_values if val is not None):
        print("📊 Etiketler zaten sayısal")
        # 0 ve 1'e normalize et
        df = df.withColumn("label",
                          when(col("label_raw") > 0, 1).otherwise(0))
    else:
        # String etiketleri dönüştür
        print("🔄 String etiketler sayısal değerlere çevriliyor")

        # Yaygın etiket türleri için mapping
        df = df.withColumn("label",
            when(lower(col("label_raw")).isin(["true", "doğru", "gerçek", "real", "1", "dogru"]), 1)
            .when(lower(col("label_raw")).isin(["false", "yalan", "sahte", "fake", "0"]), 0)
            .otherwise(
                when(col("label_raw").cast("int").isNotNull(),
                     when(col("label_raw").cast("int") > 0, 1).otherwise(0))
                .otherwise(0)  # Bilinmeyen değerleri 0 yap
            )
        )

    return df.drop("label_raw")


In [None]:
# ========================
# 3. VERİ KEŞFİ VE ANALİZİ
# ========================
def explore_data(df):
    """Veri keşfi ve analizi"""

    print("\n" + "="*50)
    print("📊 VERİ KEŞFİ VE ANALİZİ")
    print("="*50)

    # Temel istatistikler
    print("📋 Temel İstatistikler:")
    df.describe().show()

    # Etiket dağılımı
    print("📊 Etiket Dağılımı:")
    df.groupBy("label").count().orderBy("label").show()

    # Metin uzunluk istatistikleri
    print("📏 Metin Uzunluk İstatistikleri:")
    df.select(
        length(col("text")).alias("text_length")
    ).describe().show()

    # Örnek metinler
    print("📝 Örnek Metinler:")
    sample_texts = df.select("text", "label").limit(3).collect()
    for i, row in enumerate(sample_texts, 1):
        label_name = "Doğru" if row.label == 1 else "Yalan"
        print(f"\n{i}. {label_name} Haber:")
        print(f"   {row.text[:200]}...")

    # Null değer kontrolü
    print("🔍 Null Değer Kontrolü:")
    null_counts = df.select(
        sum(col("text").isNull().cast("int")).alias("text_nulls"),
        sum(col("label").isNull().cast("int")).alias("label_nulls")
    ).collect()[0]

    print(f"   Text null: {null_counts.text_nulls}")
    print(f"   Label null: {null_counts.label_nulls}")


In [None]:
# ========================
# 4. METİN ÖN İŞLEME PİPELİNE
# ========================
def create_preprocessing_pipeline():
    """Metin önişleme pipeline'ı oluşturur"""

    print("🔧 Metin önişleme pipeline'ı oluşturuluyor...")

    # 1. Tokenizasyon
    tokenizer = Tokenizer(inputCol="text", outputCol="words")

    # 2. Stop words kaldırma (Türkçe + İngilizce)
    turkish_stopwords = [
        "ve", "ile", "bu", "şu", "o", "da", "de", "ta", "te", "ki", "mi", "mu", "mü",
        "için", "gibi", "kadar", "sonra", "önce", "üzere", "göre", "karşı", "rağmen",
        "bir", "iki", "üç", "dört", "beş", "altı", "yedi", "sekiz", "dokuz", "on",
        "ben", "sen", "biz", "siz", "onlar", "benim", "senin", "bizim", "sizin",
        "bunun", "şunun", "onun", "bunlar", "şunlar", "olan", "oldu", "olur", "olsa",
        "daha", "çok", "az", "en", "hem", "ya", "veya", "ama", "fakat", "ancak",
        "her", "hiç", "kendi", "tüm", "bütün", "hangi", "nasıl", "neden", "niçin"
    ]

    english_stopwords = StopWordsRemover.loadDefaultStopWords("english")
    all_stopwords = list(set(turkish_stopwords + english_stopwords))

    stop_remover = StopWordsRemover(
        inputCol="words",
        outputCol="filtered_words",
        stopWords=all_stopwords
    )

    # 3. TF-IDF
    hashing_tf = HashingTF(
        inputCol="filtered_words",
        outputCol="raw_features",
        numFeatures=20000  # Özellik sayısını artırdık
    )

    idf = IDF(
        inputCol="raw_features",
        outputCol="features",
        minDocFreq=2  # En az 2 dokümanda geçen terimleri al
    )

    # Pipeline oluştur
    preprocessing_pipeline = Pipeline(stages=[
        tokenizer,
        stop_remover,
        hashing_tf,
        idf
    ])

    print("✅ Metin önişleme pipeline'ı oluşturuldu")
    return preprocessing_pipeline


In [None]:

# ========================
# 5. MODEL EĞİTİMİ
# ========================
def train_logistic_regression(train_df):
    """Logistic Regression modeli eğitir"""
    lr = LogisticRegression(
        featuresCol="features",
        labelCol="label",
        maxIter=100,
        regParam=0.1,
        elasticNetParam=0.8,
        family="binomial"
    )

    print("🔄 Logistic Regression eğitiliyor...")
    lr_model = lr.fit(train_df)
    print("✅ Logistic Regression eğitimi tamamlandı")

    return lr_model

def train_naive_bayes(train_df):
    """Naive Bayes modeli eğitir"""
    nb = NaiveBayes(
        featuresCol="features",
        labelCol="label",
        smoothing=1.0,
        modelType="multinomial"
    )

    print("🔄 Naive Bayes eğitiliyor...")
    nb_model = nb.fit(train_df)
    print("✅ Naive Bayes eğitimi tamamlandı")

    return nb_model


In [None]:

# ========================
# 6. MODEL DEĞERLENDİRME
# ========================
def evaluate_model(model, test_df, model_name):
    """Model performansını değerlendirir"""

    print(f"\n🔍 {model_name} modeli değerlendiriliyor...")

    # Tahmin yap
    predictions = model.transform(test_df)

    # Binary evaluator (AUC)
    binary_evaluator = BinaryClassificationEvaluator(
        labelCol="label",
        rawPredictionCol="rawPrediction",
        metricName="areaUnderROC"
    )
    auc = binary_evaluator.evaluate(predictions)

    # Multiclass evaluator
    multi_evaluator = MulticlassClassificationEvaluator(
        labelCol="label",
        predictionCol="prediction"
    )

    accuracy = multi_evaluator.evaluate(predictions, {multi_evaluator.metricName: "accuracy"})
    precision = multi_evaluator.evaluate(predictions, {multi_evaluator.metricName: "weightedPrecision"})
    recall = multi_evaluator.evaluate(predictions, {multi_evaluator.metricName: "weightedRecall"})
    f1 = multi_evaluator.evaluate(predictions, {multi_evaluator.metricName: "f1"})

    print(f"\n📊 {model_name} Model Sonuçları:")
    print(f"   AUC-ROC: {auc:.4f}")
    print(f"   Accuracy: {accuracy:.4f}")
    print(f"   Precision: {precision:.4f}")
    print(f"   Recall: {recall:.4f}")
    print(f"   F1-Score: {f1:.4f}")

    # Confusion Matrix
    print(f"\n📊 {model_name} Confusion Matrix:")
    confusion_matrix = predictions.groupBy("label", "prediction").count().orderBy("label", "prediction")
    confusion_matrix.show()

    # Detaylı sınıf bazlı metrikler
    print(f"\n📊 {model_name} Sınıf Bazlı Metrikler:")
    for label_val in [0, 1]:
        label_name = "Yalan Haber" if label_val == 0 else "Doğru Haber"

        tp = predictions.filter((col("label") == label_val) & (col("prediction") == label_val)).count()
        fp = predictions.filter((col("label") != label_val) & (col("prediction") == label_val)).count()
        fn = predictions.filter((col("label") == label_val) & (col("prediction") != label_val)).count()
        tn = predictions.filter((col("label") != label_val) & (col("prediction") != label_val)).count()

        precision_class = tp / (tp + fp) if (tp + fp) > 0 else 0
        recall_class = tp / (tp + fn) if (tp + fn) > 0 else 0
        f1_class = 2 * (precision_class * recall_class) / (precision_class + recall_class) if (precision_class + recall_class) > 0 else 0

        print(f"   {label_name}:")
        print(f"      Precision: {precision_class:.4f}")
        print(f"      Recall: {recall_class:.4f}")
        print(f"      F1-Score: {f1_class:.4f}")

    return {
        "model_name": model_name,
        "auc": auc,
        "accuracy": accuracy,
        "precision": precision,
        "recall": recall,
        "f1": f1
    }


In [None]:
# ========================
# 7. MODEL KAYDETME VE YÜKLEME
# ========================
def save_complete_pipeline(preprocessing_model, ml_model, model_path):
    """Tüm pipeline'ı kaydeder"""

    import os
    os.makedirs(model_path, exist_ok=True)

    # Preprocessing pipeline'ı kaydet
    preprocessing_path = f"{model_path}/preprocessing_pipeline"
    preprocessing_model.write().overwrite().save(preprocessing_path)

    # ML modelini kaydet
    ml_model_path = f"{model_path}/ml_model"
    ml_model.write().overwrite().save(ml_model_path)

    print(f"✅ Tüm pipeline kaydedildi: {model_path}")

def load_complete_pipeline(spark, model_path):
    """Kaydedilmiş pipeline'ı yükler"""
    from pyspark.ml import PipelineModel
    from pyspark.ml.classification import LogisticRegressionModel, NaiveBayesModel

    preprocessing_path = f"{model_path}/preprocessing_pipeline"
    ml_model_path = f"{model_path}/ml_model"

    # Pipeline'ı yükle
    preprocessing_pipeline = PipelineModel.load(preprocessing_path)

    # ML modelini yükle
    try:
        ml_model = LogisticRegressionModel.load(ml_model_path)
        model_type = "LogisticRegression"
    except:
        try:
            ml_model = NaiveBayesModel.load(ml_model_path)
            model_type = "NaiveBayes"
        except Exception as e:
            raise Exception(f"Model yüklenemedi: {e}")

    print(f"✅ Pipeline yüklendi: {model_type}")
    return preprocessing_pipeline, ml_model, model_type


In [None]:
# ========================
# 8. YENİ METİN TAHMİNİ
# ========================
def predict_single_text(spark, preprocessing_pipeline, ml_model, text):
    """Tek bir metin için tahmin yapar"""

    # DataFrame oluştur
    new_data = spark.createDataFrame([(text,)], ["text"])

    # Önişleme uygula
    processed_data = preprocessing_pipeline.transform(new_data)

    # Tahmin yap
    prediction = ml_model.transform(processed_data)

    # Sonucu al
    result = prediction.select("text", "prediction", "probability").collect()[0]

    pred_label = int(result.prediction)
    probability = result.probability.toArray()
    confidence = max(probability)

    news_type = "DOĞRU HABER" if pred_label == 1 else "YALAN HABER"

    print(f"\n🔍 Tahmin Sonucu:")
    print(f"   Metin: {text[:150]}...")
    print(f"   Sonuç: {news_type}")
    print(f"   Güven: {confidence:.4f}")
    print(f"   Olasılıklar:")
    print(f"      Yalan Haber: {probability[0]:.4f}")
    print(f"      Doğru Haber: {probability[1]:.4f}")

    return pred_label, confidence


In [None]:

# ========================
# 9. BATCH TAHMİN
# ========================
def predict_batch_texts(spark, preprocessing_pipeline, ml_model, texts):
    """Birden fazla metin için toplu tahmin"""

    # DataFrame oluştur
    texts_df = spark.createDataFrame([(text,) for text in texts], ["text"])

    # Önişleme uygula
    processed_df = preprocessing_pipeline.transform(texts_df)

    # Tahmin yap
    predictions_df = ml_model.transform(processed_df)

    # Sonuçları al
    results = predictions_df.select("text", "prediction", "probability").collect()

    print(f"\n🔍 Toplu Tahmin Sonuçları ({len(results)} metin):")
    print("-" * 80)

    batch_results = []
    for i, result in enumerate(results, 1):
        pred_label = int(result.prediction)
        probability = result.probability.toArray()
        confidence = max(probability)
        news_type = "DOĞRU" if pred_label == 1 else "YALAN"

        print(f"{i}. Metin: {result.text[:100]}...")
        print(f"   Sonuç: {news_type} HABER (Güven: {confidence:.4f})")

        batch_results.append({
            "text": result.text,
            "prediction": pred_label,
            "confidence": confidence,
            "probabilities": probability
        })

    return batch_results


In [None]:

# ========================
# 10. ANA FONKSİYON
# ========================
def main():
    """Ana çalıştırma fonksiyonu"""

    # Spark başlat
    spark = initialize_spark()

    try:
        # 1. CSV veri yükleme
        csv_path = "etiketli_haberler.csv"  # CSV dosyasının yolu
        df = load_csv_data(spark, csv_path)

        # 2. Veri keşfi
        explore_data(df)

        # 3. Train-Test split
        print("\n" + "="*50)
        print("📊 VERİ BÖLME")
        print("="*50)

        train_df, test_df = df.randomSplit([0.8, 0.2], seed=42)

        train_count = train_df.count()
        test_count = test_df.count()

        print(f"📊 Eğitim seti: {train_count}")
        print(f"📊 Test seti: {test_count}")

        # Eğitim setinde etiket dağılımı
        print("\n📊 Eğitim Seti Etiket Dağılımı:")
        train_df.groupBy("label").count().orderBy("label").show()

        # 4. Önişleme pipeline'ı
        print("\n" + "="*50)
        print("🔧 ÖNİŞLEME PİPELİNE")
        print("="*50)

        preprocessing_pipeline = create_preprocessing_pipeline()
        preprocessing_model = preprocessing_pipeline.fit(train_df)

        # Önişlemeyi uygula
        train_processed = preprocessing_model.transform(train_df).cache()
        test_processed = preprocessing_model.transform(test_df).cache()

        print("✅ Önişleme tamamlandı")

        # 5. Model eğitimi ve karşılaştırma
        print("\n" + "="*50)
        print("🎯 MODEL EĞİTİMİ VE KARŞILAŞTIRMA")
        print("="*50)

        # Logistic Regression
        lr_model = train_logistic_regression(train_processed)
        lr_results = evaluate_model(lr_model, test_processed, "Logistic Regression")

        # Naive Bayes
        nb_model = train_naive_bayes(train_processed)
        nb_results = evaluate_model(nb_model, test_processed, "Naive Bayes")

        # 6. En iyi modeli seç
        print("\n" + "="*50)
        print("🏆 MODEL KARŞILAŞTIRMA")
        print("="*50)

        models_comparison = [lr_results, nb_results]
        best_result = max(models_comparison, key=lambda x: x["f1"])
        best_model = lr_model if best_result["model_name"] == "Logistic Regression" else nb_model

        print(f"🏆 En iyi model: {best_result['model_name']}")
        print(f"   F1-Score: {best_result['f1']:.4f}")
        print(f"   Accuracy: {best_result['accuracy']:.4f}")
        print(f"   AUC-ROC: {best_result['auc']:.4f}")

        # 7. Model kaydetme
        import traceback

        print("\n" + "="*50)
        print("💾 MODEL KAYDETME")
        print("="*50)

        model_save_path = "./fake_news_model"

        try:
            save_complete_pipeline(preprocessing_model, best_model, model_save_path)
        except Exception as e:
            print("❌ Hata oluştu:")
            traceback.print_exc()  # Hata detaylarını tam gösterir

        # 8. Kaydedilmiş modeli test et
        print("\n" + "="*50)
        print("🔄 KAYDEDİLMİŞ MODEL TESTİ")
        print("="*50)

        loaded_preprocessing, loaded_model, model_type = load_complete_pipeline(spark, model_save_path)

        # 9. Örnek tahminler
        print("\n" + "="*50)
        print("🧪 ÖRNEK TAHMİNLER")
        print("="*50)

        sample_texts = [
            "Cumhurbaşkanı bugün yeni ekonomik reformları açıkladı ve enflasyonla mücadele planını duyurdu.",
            "Şok eden iddia: 5G kulelerinin koronavirüs yaydığı bilimsel olarak kanıtlandı!",
            "Türkiye'nin ihracat rakamları geçen yıla göre yüzde 15 artış gösterdi.",
            "İnanılmaz keşif: Bu bitki suyu içerek 1 ayda 20 kilo verebilirsiniz!",
            "Merkez Bankası faiz oranlarını değiştirmeme kararı aldı.",
            "Uzmanlar uyarıyor: Cep telefonu kullanımı beyin kanserine neden oluyor!"
        ]

        # Tekli tahminler
        for i, text in enumerate(sample_texts, 1):
            print(f"\n--- Örnek {i} ---")
            predict_single_text(spark, loaded_preprocessing, loaded_model, text)

        # Toplu tahmin
        print("\n" + "="*50)
        print("📊 TOPLU TAHMİN TESTİ")
        print("="*50)

        batch_results = predict_batch_texts(spark, loaded_preprocessing, loaded_model, sample_texts[:3])

        print("\n✅ Tüm işlemler başarıyla tamamlandı!")


    except Exception as e:
        print(f"❌ Hata: {e}")
        import traceback
        traceback.print_exc()


    finally:
        # Spark'ı kapat
        spark.stop()
        print("\n🔄 Spark session sonlandırıldı")


if __name__ == "__main__":
    main()

✅ Spark Session başlatıldı
📋 Spark Version: 3.5.1
📖 CSV dosyası okunuyor: etiketli_haberler.csv
📋 Bulunan kolonlar: ['metin', 'etiket']
📝 Metin kolonu: metin
🏷️ Etiket kolonu: etiket
📊 Unique etiketler: [1, 0]
🔍 Örnek etiket değerleri: [1, 1, 1, 1, 1, 1, 1, 1, 1, 1]
📊 Etiketler zaten sayısal
✅ Toplam 4458 haber yüklendi
   - Doğru Haber: <built-in method count of Row object at 0x7af8680efdd0>
   - Yalan Haber: <built-in method count of Row object at 0x7af8680ec180>

📊 VERİ KEŞFİ VE ANALİZİ
📋 Temel İstatistikler:
+-------+--------------------+-------------------+
|summary|                text|              label|
+-------+--------------------+-------------------+
|  count|                4458|               4458|
|   mean|                NULL| 0.5148048452220727|
| stddev|                NULL|0.49983683229829845|
|    min|a haber kar yağış...|                  0|
|    max|şırnaktan acı hab...|                  1|
+-------+--------------------+-------------------+

📊 Etiket Dağılımı:
+--

In [18]:
!zip -r fake_news_model.zip fake_news_model


  adding: fake_news_model/ (stored 0%)
  adding: fake_news_model/ml_model/ (stored 0%)
  adding: fake_news_model/ml_model/metadata/ (stored 0%)
  adding: fake_news_model/ml_model/metadata/part-00000 (deflated 48%)
  adding: fake_news_model/ml_model/metadata/_SUCCESS (stored 0%)
  adding: fake_news_model/ml_model/metadata/.part-00000.crc (stored 0%)
  adding: fake_news_model/ml_model/metadata/._SUCCESS.crc (stored 0%)
  adding: fake_news_model/ml_model/data/ (stored 0%)
  adding: fake_news_model/ml_model/data/part-00000-1fdc2f49-5e3e-4a87-aec2-a105ca894ca5-c000.snappy.parquet (deflated 9%)
  adding: fake_news_model/ml_model/data/.part-00000-1fdc2f49-5e3e-4a87-aec2-a105ca894ca5-c000.snappy.parquet.crc (stored 0%)
  adding: fake_news_model/ml_model/data/_SUCCESS (stored 0%)
  adding: fake_news_model/ml_model/data/._SUCCESS.crc (stored 0%)
  adding: fake_news_model/preprocessing_pipeline/ (stored 0%)
  adding: fake_news_model/preprocessing_pipeline/stages/ (stored 0%)
  adding: fake_news_m

In [19]:
from google.colab import files
files.download("fake_news_model.zip")


<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>