In [5]:
import numpy as np
import pandas as pd

In [6]:
def create_sample_data(n_rows=1000000):
    """
    Örnek büyük veri seti oluşturur
    """
    np.random.seed(42)
    
    data = {
        'user_id': np.random.randint(1, 100000, n_rows),
        'timestamp': pd.date_range('2020-01-01', periods=n_rows, freq='1min'),
        'product_id': np.random.randint(1, 10000, n_rows),
        'category': np.random.choice(['Elektronik', 'Giyim', 'Ev&Bahçe', 'Spor', 'Kitap'], n_rows),
        'price': np.random.lognormal(3, 1, n_rows),
        'quantity': np.random.randint(1, 10, n_rows),
        'rating': np.random.uniform(1, 5, n_rows),
        'is_mobile': np.random.choice([True, False], n_rows, p=[0.7, 0.3]),
        'city': np.random.choice(['İstanbul', 'Ankara', 'İzmir', 'Antalya', 'Bursa'], n_rows)
    }
    
    df = pd.DataFrame(data)
    df['total_amount'] = df['price'] * df['quantity']
    
    return df

# 1 milyon kayıtlık örnek veri oluşturalım
sample_data = create_sample_data(1000000)

print(f"Veri seti boyutu: {sample_data.shape}")
print(f"Bellek kullanımı: {sample_data.memory_usage(deep=True).sum() / 1024**2:.2f} MB")
print("\nİlk 5 satır:")
sample_data.head()


Veri seti boyutu: (1000000, 10)
Bellek kullanımı: 171.85 MB

İlk 5 satır:


Unnamed: 0,user_id,timestamp,product_id,category,price,quantity,rating,is_mobile,city,total_amount
0,15796,2020-01-01 00:00:00,448,Elektronik,55.785318,7,1.782854,True,İzmir,390.497224
1,861,2020-01-01 00:01:00,7658,Spor,15.208016,3,2.73345,True,Antalya,45.624049
2,76821,2020-01-01 00:02:00,3227,Kitap,17.19924,4,1.727572,True,Bursa,68.796962
3,54887,2020-01-01 00:03:00,8424,Giyim,110.84594,2,1.30179,True,İzmir,221.69188
4,6266,2020-01-01 00:04:00,3710,Spor,4.096469,4,3.93006,False,Bursa,16.385875


In [7]:
# 1. Bellek optimizasyonu
def optimize_dtypes(df):
    """
    DataFrame'in veri tiplerini optimize eder
    """
    optimized_df = df.copy()
    
    # Integer kolonları optimize et
    for col in optimized_df.select_dtypes(include=['int64']).columns:
        if optimized_df[col].min() >= 0:
            if optimized_df[col].max() < 255:
                optimized_df[col] = optimized_df[col].astype('uint8')
            elif optimized_df[col].max() < 65535:
                optimized_df[col] = optimized_df[col].astype('uint16')
            else:
                optimized_df[col] = optimized_df[col].astype('uint32')
    
    # Float kolonları optimize et
    for col in optimized_df.select_dtypes(include=['float64']).columns:
        optimized_df[col] = optimized_df[col].astype('float32')
    
    # String kolonları kategori yap
    for col in optimized_df.select_dtypes(include=['object']).columns:
        if optimized_df[col].nunique() < optimized_df.shape[0] * 0.5:
            optimized_df[col] = optimized_df[col].astype('category')
    
    return optimized_df

# Orijinal ve optimize edilmiş veriyi karşılaştıralım
original_size = sample_data.memory_usage(deep=True).sum() / 1024**2
optimized_data = optimize_dtypes(sample_data)
optimized_size = optimized_data.memory_usage(deep=True).sum() / 1024**2

print(f"Orijinal boyut: {original_size:.2f} MB")
print(f"Optimize edilmiş boyut: {optimized_size:.2f} MB")
print(f"Tasarruf: {((original_size - optimized_size) / original_size * 100):.1f}%")

Orijinal boyut: 171.85 MB
Optimize edilmiş boyut: 28.61 MB
Tasarruf: 83.4%


In [8]:
# 2. Chunked Processing (Parça Parça İşleme)
import time

def process_in_chunks(df, chunk_size=100000, operation=None):
    """
    Büyük DataFrame'i parçalar halinde işler
    """
    results = []
    total_chunks = len(df) // chunk_size + (1 if len(df) % chunk_size != 0 else 0)
    
    for i, chunk in enumerate(np.array_split(df, total_chunks)):
        print(f"İşleniyor: Parça {i+1}/{total_chunks} ({len(chunk)} kayıt)")
        
        if operation:
            result = operation(chunk)
            results.append(result)
        else:
            # Basit aggregation örneği
            result = chunk.groupby('category')['total_amount'].sum()
            results.append(result)
    
    return results

# Kategori bazında toplam satış miktarını parçalı olarak hesaplayalım
start_time = time.time()
chunk_results = process_in_chunks(optimized_data)
end_time = time.time()

# Sonuçları birleştirelim
# final_result = sum(chunk_results)
print(f"\nİşlem süresi: {end_time - start_time:.2f} saniye")
# print("\nKategori bazında toplam satış:")
# print(final_result.sort_values(ascending=False))

İşleniyor: Parça 1/10 (100000 kayıt)
İşleniyor: Parça 2/10 (100000 kayıt)
İşleniyor: Parça 3/10 (100000 kayıt)
İşleniyor: Parça 4/10 (100000 kayıt)
İşleniyor: Parça 5/10 (100000 kayıt)
İşleniyor: Parça 6/10 (100000 kayıt)
İşleniyor: Parça 7/10 (100000 kayıt)
İşleniyor: Parça 8/10 (100000 kayıt)
İşleniyor: Parça 9/10 (100000 kayıt)
İşleniyor: Parça 10/10 (100000 kayıt)

İşlem süresi: 0.03 saniye


  return bound(*args, **kwds)
  result = chunk.groupby('category')['total_amount'].sum()


In [9]:
import time
# Yavaş yöntem: Loop
def calculate_discount_loop(df):
    discounts = []
    for _, row in df.iterrows():
        if row['total_amount'] > 1000:
            discount = row['total_amount'] * 0.1
        elif row['total_amount'] > 500:
            discount = row['total_amount'] * 0.05
        else:
            discount = 0
        discounts.append(discount)
    return discounts

# Hızlı yöntem: Vectorization
def calculate_discount_vectorized(df):
    conditions = [
        df['total_amount'] > 1000,
        df['total_amount'] > 500,
    ]
    choices = [
        df['total_amount'] * 0.1,
        df['total_amount'] * 0.05,
    ]
    return np.select(conditions, choices, default=0)

# Küçük bir örneklem ile test edelim (1000 kayıt)
test_data = sample_data.head(1000)

# Loop yöntemi
start = time.time()
discount_loop = calculate_discount_loop(test_data)
loop_time = time.time() - start

# Vectorized yöntem
start = time.time()
discount_vectorized = calculate_discount_vectorized(test_data)
vectorized_time = time.time() - start

print(f"Loop yöntemi: {loop_time:.4f} saniye")
print(f"Vectorized yöntem: {vectorized_time:.4f} saniye")
print(f"Hız artışı: {loop_time / vectorized_time:.1f}x")

# Sonuçların aynı olduğunu doğrulayalım
print(f"Sonuçlar eşit: {np.allclose(discount_loop, discount_vectorized)}")

Loop yöntemi: 0.0471 saniye
Vectorized yöntem: 0.0007 saniye
Hız artışı: 65.4x
Sonuçlar eşit: True


## Dask

In [14]:
# %%
# Dask kurulumu ve temel kullanım
import dask.dataframe as dd
from dask import delayed
import dask

# Dask'ı synchronous scheduler ile kullanma (daha güvenli)
print("=== DASK KURULUMU ===")
try:
    # Distributed client yerine synchronous scheduler kullan
    dask.config.set(scheduler='synchronous')
    print("✅ Dask synchronous scheduler ile yapılandırıldı")
    dask_available = True
    client = None  # Synchronous modda client gerekmez
except Exception as e:
    print(f"⚠️ Dask yapılandırılamadı: {e}")
    dask_available = False
    client = None

# Alternatif: Distributed client deneme (opsiyonel)
if dask_available:
    try:
        from dask.distributed import Client
        # Önce mevcut client'ları kapat
        try:
            from dask.distributed import default_client
            old_client = default_client()
            old_client.close()
        except:
            pass
        
        # Yeni client oluştur - daha konservatif ayarlarla
        client = Client(
            processes=False,  # Thread-based workers kullan (daha stabil)
            threads_per_worker=1, 
            n_workers=1,  # Tek worker ile başla
            memory_limit='512MB',  # Daha düşük memory limit
            silence_logs=True,
            dashboard_address=None  # Dashboard'u devre dışı bırak
        )
        print(f"✅ Dask distributed client başarıyla oluşturuldu")
        print(client)
        dask.config.set(scheduler='distributed')
    except Exception as e:
        print(f"⚠️ Distributed client oluşturulamadı: {e}")
        print("Synchronous scheduler kullanılacak...")
        dask.config.set(scheduler='synchronous')
        client = None

# %%
# Pandas DataFrame'i Dask DataFrame'e dönüştürme

if dask_available:
    # 1. Pandas'dan Dask'a
    dask_df = dd.from_pandas(sample_data, npartitions=4)
    print(f"Dask DataFrame oluşturuldu: {dask_df.npartitions} partition")
    print(f"Her partition boyutu: ~{len(sample_data) // dask_df.npartitions:,} kayıt")
    
    # 2. Dosyadan direkt okuma (daha verimli)
    # Önce veriyi kaydetelim
    sample_data.to_csv('sample_data.csv', index=False)
    dask_df_from_file = dd.read_csv('sample_data.csv')
    
    print(f"\nDosyadan okunan Dask DataFrame: {dask_df_from_file.npartitions} partition")
else:
    print("⚠️ Dask kullanılamıyor, bu bölüm atlanıyor...")

# %%
# Dask ile temel operasyonlar

if dask_available:
    # 1. Lazy evaluation örneği
    print("=== LAZY EVALUATION ÖRNEĞİ ===")
    
    # Bu işlemler hemen çalışmaz, sadece task graph oluşturur
    filtered_data = dask_df[dask_df['total_amount'] > 100]
    grouped_data = filtered_data.groupby('category')['total_amount'].mean()
    
    print("İşlemler tanımlandı, henüz çalıştırılmadı...")
    
    # compute() ile işlemleri çalıştıralım
    start_time = time.time()
    result = grouped_data.compute()
    end_time = time.time()
    
    print(f"İşlem süresi: {end_time - start_time:.2f} saniye")
    print("\nSonuç:")
    print(result.sort_values(ascending=False))
else:
    print("⚠️ Dask kullanılamıyor, bu bölüm atlanıyor...")

# %%
# Dask vs Pandas performans karşılaştırması

if dask_available:
    print("=== PERFORMANS KARŞILAŞTIRMASI ===")
    
    # Kompleks bir aggregation işlemi tanımlayalım
    def complex_aggregation_pandas(df):
        return df.groupby(['category', 'city']).agg({
            'total_amount': ['sum', 'mean', 'count'],
            'rating': 'mean',
            'quantity': 'sum'
        })
    
    def complex_aggregation_dask(df):
        return df.groupby(['category', 'city']).agg({
            'total_amount': ['sum', 'mean', 'count'],
            'rating': 'mean',
            'quantity': 'sum'
        }).compute()
    
    # Pandas ile test
    start = time.time()
    pandas_result = complex_aggregation_pandas(sample_data)
    pandas_time = time.time() - start
    
    # Dask ile test
    start = time.time()
    dask_result = complex_aggregation_dask(dask_df)
    dask_time = time.time() - start
    
    print(f"Pandas süresi: {pandas_time:.2f} saniye")
    print(f"Dask süresi: {dask_time:.2f} saniye")
    print(f"Hız karşılaştırması: {pandas_time/dask_time:.1f}x {'(Dask daha hızlı)' if dask_time < pandas_time else '(Pandas daha hızlı)'}")
    
    print(f"\nSonuç boyutu: {pandas_result.shape}")
    print("İlk 5 satır:")
    print(pandas_result.head())
else:
    print("⚠️ Dask kullanılamıyor, performans karşılaştırması atlanıyor...")

=== DASK KURULUMU ===
✅ Dask synchronous scheduler ile yapılandırıldı


2025-08-14 11:37:14,700 - distributed.http.proxy - INFO - To route to workers diagnostics web server please install jupyter-server-proxy: python -m pip install jupyter-server-proxy
2025-08-14 11:37:14,704 - distributed.scheduler - INFO - State start
2025-08-14 11:37:14,709 - distributed.scheduler - INFO -   Scheduler at:   inproc://10.0.5.2/765/1
2025-08-14 11:37:14,710 - distributed.scheduler - INFO -   dashboard at:  http://10.0.5.2:8787/status
2025-08-14 11:37:14,711 - distributed.scheduler - INFO - Registering Worker plugin shuffle


2025-08-14 11:37:14,718 - distributed.worker - INFO -       Start worker at:    inproc://10.0.5.2/765/4
2025-08-14 11:37:14,719 - distributed.worker - INFO -          Listening to:             inproc10.0.5.2
2025-08-14 11:37:14,720 - distributed.worker - INFO -           Worker name:                          0
2025-08-14 11:37:14,720 - distributed.worker - INFO -          dashboard at:             10.0.5.2:38537
2025-08-14 11:37:14,721 - distributed.worker - INFO - Waiting to connect to:    inproc://10.0.5.2/765/1
2025-08-14 11:37:14,722 - distributed.worker - INFO - -------------------------------------------------
2025-08-14 11:37:14,723 - distributed.worker - INFO -               Threads:                          1
2025-08-14 11:37:14,723 - distributed.worker - INFO -                Memory:                 488.28 MiB
2025-08-14 11:37:14,724 - distributed.worker - INFO -       Local Directory: /tmp/dask-scratch-space/worker-iwa8orvm
2025-08-14 11:37:14,724 - distributed.worker - INFO

✅ Dask distributed client başarıyla oluşturuldu
<Client: 'inproc://10.0.5.2/765/1' processes=1 threads=1, memory=488.28 MiB>




ImportError: pyarrow>=10.0.1 is required for PyArrow backed StringArray.

In [12]:
%pip install pyarrow

Collecting pyarrow
  Downloading pyarrow-21.0.0-cp312-cp312-manylinux_2_28_x86_64.whl.metadata (3.3 kB)
Downloading pyarrow-21.0.0-cp312-cp312-manylinux_2_28_x86_64.whl (42.8 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m42.8/42.8 MB[0m [31m159.6 MB/s[0m  [33m0:00:00[0mm0:00:01[0m
[?25hInstalling collected packages: pyarrow
Successfully installed pyarrow-21.0.0
Note: you may need to restart the kernel to use updated packages.
