In [1]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.0/317.0 MB[0m [31m1.8 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488491 sha256=2910b794de162fe8b2b8063d273ff514d46a138e0d463165f7265a18ff6cf47a
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.1


In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import lower, col
from pyspark.ml.feature import Tokenizer, StopWordsRemover, HashingTF, IDF, StringIndexer
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
import firebase_admin
from firebase_admin import credentials, db
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.classification import LinearSVC, OneVsRest
from sklearn.neighbors import KNeighborsClassifier
from pyspark.ml.clustering import KMeans
from pyspark.ml.feature import PCA
from pyspark.ml.evaluation import ClusteringEvaluator
import matplotlib.pyplot as plt
import pandas as pd
from google.colab import drive
from sklearn.metrics import accuracy_score
import numpy as np
drive.mount('/content/drive')

Mounted at /content/drive


In [4]:
# Firebase bağlantısı
cred = credentials.Certificate('/content/drive/My Drive/BigData/Blog_Veri_Analizi/buyukveri.json')
firebase_admin.initialize_app(cred, {
    'databaseURL': 'https://buyukveri-5d9ba-default-rtdb.europe-west1.firebasedatabase.app/'})
ref = db.reference("/")

# Firebase'den verileri çekme fonksiyonu
def verileri_firebase_cek(collection_name):
    dokumanlar = ref.child(collection_name).get().values()
    veriler = [(dok['id'], dok['makale'], dok['tur']) for dok in dokumanlar]
    return veriler

In [9]:
# Spark session başlatma
spark = SparkSession.builder \
    .appName("TF-IDF Metin Sınıflandırma") \
    .getOrCreate()

veriler = verileri_firebase_cek("Makaleler")
df = spark.createDataFrame(veriler, ["id", "metin", "etiketler"])

df = df.withColumn("metin_kucuk", lower(col("metin")))

turkce_baglaclar = set(
    ["ve", "veya", "ile", "çünkü", "ama", "fakat", "ancak", "dolayısıyla", "çünkü", "ancak", "gibi"])
kelime_tokenizer = Tokenizer(inputCol="metin_kucuk", outputCol="kelimeler")
baglac_silici = StopWordsRemover(inputCol="kelimeler", outputCol="suzulmus_kelimeler").setStopWords(
    list(turkce_baglaclar))

# TF-IDF işlemleri
hashingTF = HashingTF(inputCol="suzulmus_kelimeler", outputCol="ham_veri", numFeatures=100)
idf = IDF(inputCol="ham_veri", outputCol="veri")

etiket_indexer = StringIndexer(inputCol="etiketler", outputCol="etiket")



 **Logistic Regression sınıflandırıcısı**

In [10]:
log_reg = LogisticRegression(featuresCol="veri", labelCol="etiket")

# Pipeline oluşturma
pipeline = Pipeline(stages=[kelime_tokenizer, baglac_silici, hashingTF, idf, etiket_indexer, log_reg])

model = pipeline.fit(df)
processed_df = model.transform(df)

evaluator = MulticlassClassificationEvaluator(labelCol="etiket", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(processed_df)
print(f"Accuracy: {accuracy}")


# Sonuçları parquet dosyasına kaydetme
processed_df.select("id", "veri", "etiket", "prediction").write.mode("overwrite").parquet("blog_veri.parquet")


df_sonuc = spark.read.parquet("blog_veri.parquet")
df_sonuc.show(truncate=False)


accuracies = []

# Modeli farklı veri oranlarında eğitip doğrulukları toplama
data_splits = [0.6, 0.65, 0.7, 0.75, 0.8, 0.85, 0.9, 0.95, 1.0]
for split in data_splits:
    train_data, test_data = df.randomSplit([split, 1 - split], seed=60)
    model = pipeline.fit(train_data)
    predictions = model.transform(test_data)
    accuracy = evaluator.evaluate(predictions)
    accuracies.append(accuracy)

# Accuracy grafiği çizme
plt.figure(figsize=(10, 5))
plt.plot(data_splits, accuracies, marker='o')
plt.title('Doğruluk Grafiği')
plt.xlabel('Eğitim Verisi Oranı')
plt.ylabel('Doğruluk')
plt.grid(True)
plt.show()

Accuracy: 1.0
+---+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/dist-packages/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "/usr/local/lib/python3.10/dist-packages/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
  File "/usr/lib/python3.10/socket.py", line 705, in readinto
    return self._sock.recv_into(b)
KeyboardInterrupt


KeyboardInterrupt: 

**Decision Tree Classifier sınıflandırıcısı**

In [11]:
dt = DecisionTreeClassifier(featuresCol="veri", labelCol="etiket")

pipeline = Pipeline(stages=[kelime_tokenizer, baglac_silici, hashingTF, idf, etiket_indexer, dt])

model = pipeline.fit(df)
processed_df = model.transform(df)


evaluator = MulticlassClassificationEvaluator(labelCol="etiket", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(processed_df)
print(f"Accuracy: {accuracy}")


processed_df.select("id", "veri", "etiket", "prediction").write.mode("overwrite").parquet("blog_veri.parquet")

df_sonuc = spark.read.parquet("blog_veri.parquet")


accuracies = []
data_splits = [0.6, 0.65, 0.7, 0.75, 0.8, 0.85, 0.9, 0.95]

for split in data_splits:
    train_data, test_data = df.randomSplit([split, 1 - split], seed=60)
    model = pipeline.fit(train_data)
    predictions = model.transform(test_data)
    accuracy = evaluator.evaluate(predictions)
    accuracies.append(accuracy)

Accuracy: 0.8088235294117647


**SVM Algoritması**

In [None]:
pipeline = Pipeline(stages=[kelime_tokenizer, baglac_silici, hashingTF, idf, etiket_indexer])

processed_df = pipeline.fit(df).transform(df)

train_data, test_data = processed_df.randomSplit([0.8, 0.2], seed=123)

svm = LinearSVC(labelCol="etiket", featuresCol="veri", maxIter=80)
ovr = OneVsRest(classifier=svm, labelCol="etiket", featuresCol="veri")
ovr_model = ovr.fit(train_data)

predictions = ovr_model.transform(test_data)

evaluator = MulticlassClassificationEvaluator(labelCol="etiket", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("SVM Accuracy:", accuracy)

predictions.select("id", "etiket", "prediction").show(truncate=False)


**K-NN algoritmasını**

In [None]:

def to_pandas(df):
    return df.toPandas()

# Eğitim ve test verilerini pandas DataFrame'e dönüştürme
train_pd = to_pandas(train_data.select("veri", "etiket"))
test_pd = to_pandas(test_data.select("veri", "etiket"))

knn = KNeighborsClassifier(n_neighbors=5)
knn.fit(np.vstack(train_pd['veri']), train_pd['etiket'])

knn_predictions = knn.predict(np.vstack(test_pd['veri']))
knn_accuracy = accuracy_score(test_pd['etiket'], knn_predictions)
print("K-NN Accuracy:", knn_accuracy)


**K-NN ve SVM nin grafiği**

In [None]:
# Doğruluk oranlarını görselleştireme
accuracies = [accuracy, knn_accuracy]
algorithms = ['SVM', 'K-NN']

plt.figure(figsize=(8, 5))
plt.bar(algorithms, accuracies, color=['blue', 'green'])
plt.xlabel('Algoritmalar')
plt.ylabel('Doğruluk Oranı')
plt.title('SVM ve K-NN Doğruluk Oranları')
plt.ylim(0, 1)
for i, v in enumerate(accuracies):
    plt.text(i, v + 0.01, f"{v:.2f}", ha='center', fontweight='bold')
plt.show()

# Spark oturumunu kapatın
spark.stop()


**K Means**

In [None]:
pipeline = Pipeline(stages=[kelime_tokenizer, baglac_silici, hashingTF, idf, etiket_indexer])

processed_df = pipeline.fit(df).transform(df)

kmeans = KMeans(featuresCol='veri', predictionCol='kume', k=5)  # k parametresi küme sayısını belirtir
model = kmeans.fit(processed_df)

kume_tahminleri = model.transform(processed_df)

kume_tahminleri.select("id", "veri", "etiket", "kume").show(truncate=False)

# Silhouette skoru hesapla
evaluator = ClusteringEvaluator(featuresCol='veri', predictionCol='kume')
silhouette_score = evaluator.evaluate(kume_tahminleri)
print(f'Silhouette Score: {silhouette_score}')

# Sonuçları kaydet
kume_tahminleri.select("id", "veri", "etiket", "kume").write.mode("overwrite").parquet("blog_veri_kume.parquet")

df_sonuc = spark.read.parquet("blog_veri_kume.parquet")

pca = PCA(k=2, inputCol="veri", outputCol="pca_features")
pca_model = pca.fit(df_sonuc)
df_pca = pca_model.transform(df_sonuc)

df_pd = df_pca.select("pca_features", "kume").toPandas()
pca_features = pd.DataFrame(df_pd['pca_features'].tolist(), columns=['PCA1', 'PCA2'])
pca_features['kume'] = df_pd['kume']

plt.figure(figsize=(10, 6))
plt.scatter(pca_features['PCA1'], pca_features['PCA2'], c=pca_features['kume'], cmap='viridis')
plt.xlabel('PCA Bileşeni 1')
plt.ylabel('PCA Bileşeni 2')
plt.title('K-Means Kümeleme Sonuçları')
plt.colorbar(label='Kümeler')
plt.show()



Pipeline

In [None]:
pipeline = Pipeline(stages=[kelime_tokenizer, baglac_silici, hashingTF, idf, etiket_indexer, log_reg, dt])