In [3]:
import os
import sys
from dotenv import load_dotenv
from evds import evdsAPI
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, to_date

# --- 1. ORTAM VE GÜVENLİK YAPILANDIRMASI ---
os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable

env_path = os.path.abspath("../src/ingestion/.env")
load_dotenv(env_path)
API_KEY = os.getenv("EVDS_API_KEY")

# --- 2. VERİ EKSTRAKSİYONU (EXTRACT) ---
evds = evdsAPI(API_KEY)
df_pandas = evds.get_data(
    series=["TP.DK.USD.S.YTL"],
    startdate="01-01-2024",
    enddate="31-01-2024"
)

# --- 3. SPARK OTURUMU VE VERİ DÖNÜŞÜMÜ ---
spark = SparkSession.builder \
    .appName("FinancialDataPlatform_ETL") \
    .master("local") \
    .config("spark.sql.execution.arrow.pyspark.enabled", "false") \
    .getOrCreate()

# Pandas -> Spark Dönüşümü
spark_df = spark.createDataFrame(df_pandas)

# --- 4. VERİ TEMİZLEME VE TİP DÖNÜŞÜMÜ (DF_FINAL OLUŞTURMA) ---
# İşte senin o meşhur 'df_final' değişkenin burada doğuyor:
df_cleaned = spark_df.na.drop()
df_final = df_cleaned.withColumn("Tarih", to_date(col("Tarih"), "dd-MM-yyyy"))

# Kontrol
df_final.show(5)
df_final.printSchema()

+----------+---------------+
|     Tarih|TP_DK_USD_S_YTL|
+----------+---------------+
|2024-01-02|        29.4913|
|2024-01-03|        29.7209|
|2024-01-04|         29.793|
|2024-01-05|        29.7988|
|2024-01-08|         29.825|
+----------+---------------+
only showing top 5 rows
root
 |-- Tarih: date (nullable = true)
 |-- TP_DK_USD_S_YTL: double (nullable = true)



In [4]:
from pyspark.sql.window import Window
from pyspark.sql.functions import lag, col, round

# --- 1. ANALİTİK PENCERE YAPILANDIRMASI ---
# Veri seti üzerinde zamansal bir sıralama (Window) tanımlıyoruz.
# Bu yapı, satırlar arasında 'önceki' (lag) veya 'sonraki' (lead) değerlere erişmemizi sağlar.
windowSpec = Window.orderBy("Tarih")

# --- 2. ZAMAN SERİSİ ANALİZİ (LAGGING) ---
# Bir önceki iş gününün kur değerini yeni bir sütun olarak ekliyoruz.
# DE Notu: İlk satırda karşılaştıracak veri olmadığı için bu değer 'null' dönecektir.
df_with_lag = df_final.withColumn("Onceki_Gun_Kur", lag("TP_DK_USD_S_YTL").over(windowSpec))

# --- 3. İŞ MANTIĞI HESAPLAMA (BUSINESS LOGIC) ---
# Günlük kur değişimini yüzde (%) bazında hesaplıyoruz.
# Formül: ((Bugünkü Değer - Önceki Değer) / Önceki Değer) * 100
df_transformed = df_with_lag.withColumn(
    "Gunluk_Degisim_Yuzde", 
    round(((col("TP_DK_USD_S_YTL") - col("Onceki_Gun_Kur")) / col("Onceki_Gun_Kur")) * 100, 4)
)

# --- 4. ANALİZ DOĞRULAMA ---
# İlk 10 satırı inceleyerek hesaplamaların doğruluğunu kontrol ediyoruz.
df_transformed.show(10)

+----------+---------------+--------------+--------------------+
|     Tarih|TP_DK_USD_S_YTL|Onceki_Gun_Kur|Gunluk_Degisim_Yuzde|
+----------+---------------+--------------+--------------------+
|2024-01-02|        29.4913|          NULL|                NULL|
|2024-01-03|        29.7209|       29.4913|              0.7785|
|2024-01-04|         29.793|       29.7209|              0.2426|
|2024-01-05|        29.7988|        29.793|              0.0195|
|2024-01-08|         29.825|       29.7988|              0.0879|
|2024-01-09|        29.8879|        29.825|              0.2109|
|2024-01-10|        29.9361|       29.8879|              0.1613|
|2024-01-11|        29.9675|       29.9361|              0.1049|
|2024-01-12|        30.0121|       29.9675|              0.1488|
|2024-01-15|        30.0704|       30.0121|              0.1943|
+----------+---------------+--------------+--------------------+
only showing top 10 rows


In [7]:
import os

# --- 1. DEPOLAMA DİZİN YÖNETİMİ ---
# Çıktıların proje kök dizininde düzenli bir klasörde toplanmasını sağlıyoruz.
output_path = "output_data"
if not os.path.exists(output_path):
    os.makedirs(output_path)

# --- 2. ÇOKLU FORMATTA KAYIT (MULTI-FORMAT STORAGE) ---
# A. CSV Formatı: İş birimleri ve son kullanıcıların kolayca açabilmesi için (İnsan-okunabilir).
df_transformed.write.csv(f"{output_path}/usd_kur_analiz.csv", header=True, mode="overwrite")

# B. Parquet Formatı: Büyük veri sistemleri ve analitik motorlar için (Makine-okunabilir, optimize).
# DE Notu: Parquet, kolon bazlı depolama yaparak disk alanından tasarruf ve hızlı sorgulama sağlar.
df_transformed.write.parquet(f"{output_path}/usd_kur_analiz.parquet", mode="overwrite")

print(f"--- ETL SÜRECİ TAMAMLANDI ---")
print(f"İşlenmiş veriler '{output_path}' klasörüne başarıyla kaydedildi.")

--- ETL SÜRECİ TAMAMLANDI ---
İşlenmiş veriler 'output_data' klasörüne başarıyla kaydedildi.


In [6]:
# Kaydettiğimiz veriyi klasör yolunu vererek geri okuyoruz
check_df = spark.read.parquet("output_data/usd_kur_analiz.parquet")

# Eğer tablo geliyorsa verin oradadır!
check_df.show(5)

+----------+---------------+--------------+--------------------+
|     Tarih|TP_DK_USD_S_YTL|Onceki_Gun_Kur|Gunluk_Degisim_Yuzde|
+----------+---------------+--------------+--------------------+
|2024-01-02|        29.4913|          NULL|                NULL|
|2024-01-03|        29.7209|       29.4913|              0.7785|
|2024-01-04|         29.793|       29.7209|              0.2426|
|2024-01-05|        29.7988|        29.793|              0.0195|
|2024-01-08|         29.825|       29.7988|              0.0879|
+----------+---------------+--------------+--------------------+
only showing top 5 rows
