# Büyük Veri Pipeline - B Seçeneği
## US Accidents Veri Analizi

**Mimari:** HDFS + Hive + Kafka + Spark + MongoDB

---

### Pipeline Bileşenleri:
1. Veri Alma (Kafka Producer)
2. Stream İşleme (Spark Streaming)
3. Veri Depolama (HDFS + Hive)
4. Veri Temizleme (PySpark)
5. kNN Sınıflandırma
6. K-Means Kümeleme
7. MongoDB Export

## 1. Kütüphaneler ve Bağlantılar

In [None]:
# Gerekli kütüphaneler
import os
import json
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from datetime import datetime
import warnings
warnings.filterwarnings('ignore')

# PySpark
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, avg, sum as spark_sum

# MongoDB
from pymongo import MongoClient

# Plot defaults: seaborn-like whitegrid style, wide figures, larger font.
plt.style.use('seaborn-v0_8-whitegrid')
plt.rcParams.update({
    'figure.figsize': (12, 6),
    'font.size': 12,
})

print("✓ Kütüphaneler yüklendi")

In [None]:
# Build (or reuse) a Hive-enabled Spark session.
# The settings below point the SQL warehouse at HDFS and the session at the
# Hive metastore thrift endpoint.
spark_settings = {
    "spark.sql.warehouse.dir": "hdfs://namenode:9000/user/hive/warehouse",
    "hive.metastore.uris": "thrift://hive-metastore:9083",
}

session_builder = SparkSession.builder.appName("BigData_Analysis")
for setting_key, setting_value in spark_settings.items():
    session_builder = session_builder.config(setting_key, setting_value)
spark = session_builder.enableHiveSupport().getOrCreate()

# Keep Spark's own logging down to warnings and errors.
spark.sparkContext.setLogLevel("WARN")
print("✓ Spark Session oluşturuldu")
print(f"Spark Version: {spark.version}")

In [None]:
# MongoDB connection.
# The URI may be overridden with the MONGO_URI environment variable so that
# credentials do not have to live in the notebook; the original URI is kept
# as the fallback so existing setups keep working unchanged.
mongo_uri = os.environ.get("MONGO_URI", "mongodb://admin:admin123@mongodb:27017/")
mongo_client = MongoClient(mongo_uri)
db = mongo_client['bigdata_pipeline']

# Listing the collections forces a server round trip, so the success message
# below is only printed once the server has actually answered.
collection_names = db.list_collection_names()
print("✓ MongoDB bağlantısı kuruldu")
print(f"Collections: {collection_names}")

## 2. HDFS'den Veri Okuma

In [None]:
# Load the cleaned dataset (Parquet) from HDFS and report its dimensions.
HDFS_CLEANED_PATH = 'hdfs://namenode:9000/user/bigdata/accidents_cleaned'

df = spark.read.parquet(HDFS_CLEANED_PATH)

row_total = df.count()
column_total = len(df.columns)
print(f"Toplam kayıt: {row_total:,}")
print(f"Sütun sayısı: {column_total}")

In [None]:
# Print the schema (column names and types) of the DataFrame read above.
df.printSchema()

In [None]:
# Show the first 10 rows of a few key columns, without truncating values.
preview_columns = ['ID', 'Severity', 'City', 'State', 'Weather_Condition']
df.select(*preview_columns).show(10, truncate=False)

## 3. Hive Sorguları

In [None]:
# List the databases visible through the Hive-enabled session.
hive_databases = spark.sql("SHOW DATABASES")
hive_databases.show()

In [None]:
# Switch to the project database and list its tables.
spark.sql("USE bigdata_db")
hive_tables = spark.sql("SHOW TABLES")
hive_tables.show()

In [None]:
# Severity distribution: number of records per Severity value, ascending.
severity_query = """
    SELECT Severity, COUNT(*) as count 
    FROM bigdata_db.us_accidents 
    GROUP BY Severity 
    ORDER BY Severity
"""
severity_dist = spark.sql(severity_query)
severity_dist.show()

In [None]:
# Top 15 states by accident count, with the average severity per state
# rounded to two decimals.
state_query = """
    SELECT State, COUNT(*) as accident_count,
           ROUND(AVG(Severity), 2) as avg_severity
    FROM bigdata_db.us_accidents
    GROUP BY State
    ORDER BY accident_count DESC
    LIMIT 15
"""
state_accidents = spark.sql(state_query)
state_accidents.show()

## 4. Veri Analizi ve Görselleştirme

In [None]:
# Convert a random sample of roughly `sample_size` rows to pandas.
sample_size = 50000
total_rows = df.count()

if total_rows > sample_size:
    # Sample without replacement; the fraction is strictly below 1 here.
    pdf = df.sample(fraction=sample_size / total_rows, seed=42).toPandas()
else:
    # The table has at most `sample_size` rows (possibly zero): a sampling
    # fraction would be >= 1 or a division by zero, so take every row.
    pdf = df.toPandas()

print(f"Örneklem boyutu: {len(pdf):,}")

In [None]:
# Severity distribution of the pandas sample: bar chart (left) and pie
# chart (right), saved as a PNG and shown inline.
fig, axes = plt.subplots(1, 2, figsize=(14, 5))

# Counts per severity level, ordered by level; one colour per level taken
# from the YlOrRd colormap.
severity_counts = pdf['Severity'].value_counts().sort_index()
colors = plt.cm.YlOrRd(np.linspace(0.3, 0.9, len(severity_counts)))

# Bar chart
axes[0].bar(severity_counts.index, severity_counts.values, color=colors)
axes[0].set_xlabel('Severity')
axes[0].set_ylabel('Kayıt Sayısı')
axes[0].set_title('Kaza Ciddiyet Dağılımı')

# Pie chart
axes[1].pie(severity_counts.values, labels=[f'Severity {i}' for i in severity_counts.index],
           autopct='%1.1f%%', colors=colors, startangle=90)
axes[1].set_title('Ciddiyet Yüzdeleri')

plt.tight_layout()

# Create the output directory first so saving works on a fresh checkout
# where the results folder does not exist yet.
severity_fig_path = '../results/visualizations/severity_distribution.png'
os.makedirs(os.path.dirname(severity_fig_path), exist_ok=True)
plt.savefig(severity_fig_path, dpi=150)
plt.show()

In [None]:
# Accidents per hour of day; skipped when the sample has no 'Hour' column.
if 'Hour' in pdf.columns:
    plt.figure(figsize=(14, 5))
    hourly = pdf.groupby('Hour').size()
    plt.bar(hourly.index, hourly.values, color='steelblue', alpha=0.8)
    plt.xlabel('Saat')
    plt.ylabel('Kaza Sayısı')
    plt.title('Saatlik Kaza Dağılımı')
    plt.xticks(range(24))
    plt.grid(True, alpha=0.3)
    plt.tight_layout()

    # Create the output directory first so saving works on a fresh checkout
    # where the results folder does not exist yet.
    hourly_fig_path = '../results/visualizations/hourly_accidents.png'
    os.makedirs(os.path.dirname(hourly_fig_path), exist_ok=True)
    plt.savefig(hourly_fig_path, dpi=150)
    plt.show()

In [None]:
# Mean severity per weather category as a horizontal bar chart; skipped when
# the sample has no 'Weather_Category' column.
if 'Weather_Category' in pdf.columns:
    plt.figure(figsize=(12, 6))
    weather_severity = pdf.groupby('Weather_Category')['Severity'].mean().sort_values(ascending=False)
    weather_severity.plot(kind='barh', color='coral')
    plt.xlabel('Ortalama Ciddiyet')
    plt.ylabel('Hava Durumu')
    plt.title('Hava Durumuna Göre Ortalama Kaza Ciddiyeti')
    plt.tight_layout()

    # Create the output directory first so saving works on a fresh checkout
    # where the results folder does not exist yet.
    weather_fig_path = '../results/visualizations/weather_severity.png'
    os.makedirs(os.path.dirname(weather_fig_path), exist_ok=True)
    plt.savefig(weather_fig_path, dpi=150)
    plt.show()

In [None]:
# Scatter plot of accident start coordinates coloured by severity; skipped
# unless both coordinate columns are present in the sample.
if 'Start_Lat' in pdf.columns and 'Start_Lng' in pdf.columns:
    plt.figure(figsize=(14, 8))

    # Plot at most 5000 points (fixed seed, so the subset is reproducible).
    sample = pdf.sample(n=min(5000, len(pdf)), random_state=42)

    scatter = plt.scatter(sample['Start_Lng'], sample['Start_Lat'],
                         c=sample['Severity'], cmap='YlOrRd',
                         alpha=0.5, s=5)
    plt.colorbar(scatter, label='Severity')
    plt.xlabel('Boylam')
    plt.ylabel('Enlem')
    plt.title('Kazaların Coğrafi Dağılımı (ABD)')
    plt.tight_layout()

    # Create the output directory first so saving works on a fresh checkout
    # where the results folder does not exist yet.
    geo_fig_path = '../results/visualizations/geographic_distribution.png'
    os.makedirs(os.path.dirname(geo_fig_path), exist_ok=True)
    plt.savefig(geo_fig_path, dpi=150)
    plt.show()

## 5. MongoDB Sonuçları

In [None]:
# Print the stored kNN classification results from MongoDB.
def _format_value(value, spec):
    """Format *value* with *spec*; fall back to ``str(value)`` if it cannot be.

    A missing metric is represented by the string 'N/A', which cannot take a
    numeric format spec such as '.4f'; in that case it is printed verbatim
    instead of raising.
    """
    try:
        return format(value, spec)
    except (TypeError, ValueError):
        return str(value)


knn_results = db.model_results.find_one({'model': 'kNN'})
if knn_results:
    print("=" * 50)
    print("kNN SINIFLANDIRMA SONUÇLARI")
    print("=" * 50)
    print(f"Optimal k: {knn_results.get('optimal_k')}")
    print("\nMetrikler:")
    # `or {}` also covers a document whose 'metrics' field is stored as null.
    metrics = knn_results.get('metrics') or {}
    for label, key in (
        ("Accuracy", "accuracy"),
        ("Precision (Macro)", "precision_macro"),
        ("Recall (Macro)", "recall_macro"),
        ("F1-Score (Macro)", "f1_macro"),
    ):
        print(f"  {label}: {_format_value(metrics.get(key, 'N/A'), '.4f')}")
    # AUC is optional: the line is printed only when the key is present.
    if 'auc_macro' in metrics:
        print(f"  AUC-ROC (Macro): {_format_value(metrics.get('auc_macro'), '.4f')}")

In [None]:
# Print the stored K-Means clustering results from MongoDB.
def _format_value(value, spec):
    """Format *value* with *spec*; fall back to ``str(value)`` if it cannot be.

    A missing metric is represented by the string 'N/A', which cannot take a
    numeric format spec such as '.4f'; in that case it is printed verbatim
    instead of raising.
    """
    try:
        return format(value, spec)
    except (TypeError, ValueError):
        return str(value)


kmeans_results = db.model_results.find_one({'model': 'K-Means'})
if kmeans_results:
    print("=" * 50)
    print("K-MEANS KÜMELEME SONUÇLARI")
    print("=" * 50)
    print(f"Optimal k: {kmeans_results.get('optimal_k')}")
    print("\nMetrikler:")
    # `or {}` also covers a document whose 'metrics' field is stored as null.
    metrics = kmeans_results.get('metrics') or {}
    print(f"  Silhouette Score: {_format_value(metrics.get('silhouette_score', 'N/A'), '.4f')}")
    print(f"  Calinski-Harabasz: {_format_value(metrics.get('calinski_harabasz', 'N/A'), '.2f')}")
    print(f"  Davies-Bouldin: {_format_value(metrics.get('davies_bouldin', 'N/A'), '.4f')}")

In [None]:
# Print the most recent statistics document (latest 'created_at') from MongoDB.
def _format_value(value, spec):
    """Format *value* with *spec*; fall back to ``str(value)`` if it cannot be.

    A missing count is represented by the string 'N/A', which cannot take the
    ',' thousands-separator spec; in that case it is printed verbatim instead
    of raising.
    """
    try:
        return format(value, spec)
    except (TypeError, ValueError):
        return str(value)


stats = db.statistics.find_one({}, sort=[('created_at', -1)])
if stats:
    print("=" * 50)
    print("VERİ İSTATİSTİKLERİ")
    print("=" * 50)
    print(f"Toplam kayıt: {_format_value(stats.get('total_records', 'N/A'), ',')}")

    if 'severity_distribution' in stats:
        print("\nSeverity Dağılımı:")
        for k, v in stats['severity_distribution'].items():
            print(f"  Severity {k}: {_format_value(v, ',')}")

    if 'top_states' in stats:
        print("\nEn Çok Kaza Olan Eyaletler:")
        # Only the first five entries, in the order stored in the document.
        for k, v in list(stats['top_states'].items())[:5]:
            print(f"  {k}: {_format_value(v, ',')}")

## 6. Kayıtlı Görselleştirmeler

In [None]:
# List the PNG files found in the visualization output directory.
viz_path = '../results/visualizations/'
if not os.path.exists(viz_path):
    print("Görselleştirme dizini bulunamadı")
else:
    files = os.listdir(viz_path)
    print("Kayıtlı Görselleştirmeler:")
    for f in files:
        # Anything that is not a PNG is ignored.
        if not f.endswith('.png'):
            continue
        print(f"  - {f}")

In [None]:
# Display the saved kNN result figures inline; files that are missing from
# the visualization directory are skipped silently.
from IPython.display import Image, display

knn_images = [
    'knn_confusion_matrix.png',
    'knn_roc_curves.png',
    'knn_k_optimization.png',
]

for img_name in knn_images:
    img_path = os.path.join(viz_path, img_name)
    if not os.path.exists(img_path):
        continue
    print(f"\n{img_name}:")
    display(Image(filename=img_path))

In [None]:
# Display the saved K-Means result figures inline; files that are missing
# from the visualization directory are skipped silently.
kmeans_images = [
    'kmeans_elbow_curve.png',
    'kmeans_clusters_2d.png',
    'kmeans_cluster_profiles.png',
    'kmeans_distribution.png',
    'kmeans_geographic.png',
]

for img_name in kmeans_images:
    img_path = os.path.join(viz_path, img_name)
    if not os.path.exists(img_path):
        continue
    print(f"\n{img_name}:")
    display(Image(filename=img_path))

## 7. Pipeline Özeti

In [None]:
# Print a summary of the most recent pipeline metadata document (latest
# 'created_at') stored in MongoDB.
pipeline_meta = db.pipeline_metadata.find_one({}, sort=[('created_at', -1)])
if pipeline_meta:
    banner = "=" * 60
    print(banner)
    print("PIPELINE ÖZET")
    print(banner)

    # Header fields: printed label paired with the document key it reads.
    for header_label, header_key in (
        ("Pipeline", "pipeline_name"),
        ("Versiyon", "version"),
        ("Mimari", "architecture"),
    ):
        print(f"{header_label}: {pipeline_meta.get(header_key)}")

    print("\nBileşenler:")
    for component_name, component_value in pipeline_meta.get('components', {}).items():
        print(f"  - {component_name}: {component_value}")

    print("\nİzlenen Metrikler:")
    for tracked_metric in pipeline_meta.get('metrics_tracked', []):
        print(f"  - {tracked_metric}")

In [None]:
# Shut down the Spark session and then the MongoDB client.
spark.stop()
mongo_client.close()

for closing_message in ("\n✓ Bağlantılar kapatıldı", "\nNotebook tamamlandı!"):
    print(closing_message)