<a href="https://colab.research.google.com/github/elemnurguner/data-ai-projects/blob/main/B%C3%BCy%C3%BCkVeriAnalizi(Apache_Spark).ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

🌟 OSM Istanbul Analysis Project
This project analyzes cafes and restaurants in Istanbul using OpenStreetMap (OSM) data. It combines big-data processing in Spark with map visualization in Folium.

🛠️ Technologies
PySpark (v3.5.0)

Folium (mapping)

Overpass API (data retrieval)

Parquet/CSV (data storage)

In [None]:
# 1. Install Java and PySpark
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!pip install -q pyspark==3.5.0 pyarrow==14.0.0

# 2. Create the SparkSession
from pyspark.sql import SparkSession
spark = SparkSession.builder \
    .master("local[*]") \
    .config("spark.driver.memory", "4g") \
    .appName("OSM_Analysis") \
    .getOrCreate()

In [None]:
import requests
import json

# Overpass API query (cafes and restaurants in Istanbul)
overpass_url = "https://overpass-api.de/api/interpreter"
query = """
[out:json];
area["name"="İstanbul"]->.a;
(
  node["amenity"="cafe"](area.a);
  node["amenity"="restaurant"](area.a);
  way["amenity"="cafe"](area.a);
  way["amenity"="restaurant"](area.a);
);
out center;
"""

# Download the data (fail early on an HTTP error instead of parsing an error page)
response = requests.get(overpass_url, params={'data': query})
response.raise_for_status()
osm_data = response.json()
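
The query string above hard-codes the amenity list. As a minimal sketch, the same Overpass QL text can be assembled from parameters. `build_overpass_query` and its arguments are hypothetical names introduced here for illustration; they are not part of the notebook or of any library.

```python
def build_overpass_query(area_name, amenities, timeout=None, element_types=("node", "way")):
    """Assemble an Overpass QL query shaped like the one above (hypothetical helper)."""
    # The optional [timeout:N] setting shares a statement with [out:json].
    header = "[out:json]"
    if timeout is not None:
        header += f"[timeout:{int(timeout)}]"
    lines = [header + ";", f'area["name"="{area_name}"]->.a;', "("]
    for element_type in element_types:
        for amenity in amenities:
            lines.append(f'  {element_type}["amenity"="{amenity}"](area.a);')
    lines += [");", "out center;"]
    return "\n".join(lines)


example_query = build_overpass_query("İstanbul", ["cafe", "restaurant"])
print(example_query)
```

The resulting string can be passed to requests.get in the same way as the hand-written query.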

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import lit

# Start Spark (reuses the session created above, if any)
spark = SparkSession.builder.getOrCreate()

# Define the schema
schema = StructType([
    StructField("id", LongType()),
    StructField("type", StringType()),
    StructField("lat", DoubleType()),
    StructField("lon", DoubleType()),
    StructField("tags", MapType(StringType(), StringType()))
])

# Process the OSM data
rows = []
for element in osm_data["elements"]:
    if "tags" in element:
        row = (
            element["id"],
            element["type"],
            # Nodes carry lat/lon directly; ways only have a "center" (from `out center`).
            # 0.0 is a placeholder for elements that have neither.
            element.get("lat", element.get("center", {}).get("lat", 0.0)),
            element.get("lon", element.get("center", {}).get("lon", 0.0)),
            element["tags"]
        )
        rows.append(row)

# Create the DataFrame
osm_df = spark.createDataFrame(rows, schema)
osm_df.show(3)

+---------+----+----------+----------+--------------------+
|       id|type|       lat|       lon|                tags|
+---------+----+----------+----------+--------------------+
|262402714|node|41.0171629|28.9698993|{name -> Hamdi, n...|
|269507766|node|41.0309965|28.9748683|{name -> Pera Ant...|
|277139848|node|41.0122714|28.9551177|{name -> Pizzeria...|
+---------+----+----------+----------+--------------------+
only showing top 3 rows
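
The loop above substitutes 0.0 when an element has neither its own coordinates nor a center, which places such rows at latitude 0, longitude 0 instead of marking them as missing. A possible alternative, sketched here in plain Python, returns None so that the nullable DoubleType columns hold nulls. `element_to_row` is a hypothetical helper and the sample elements are made up for illustration.

```python
def element_to_row(element):
    """Convert one Overpass JSON element into an (id, type, lat, lon, tags) tuple."""
    center = element.get("center", {})
    # Prefer the element's own coordinates, then the way's center, else None.
    lat = element.get("lat", center.get("lat"))
    lon = element.get("lon", center.get("lon"))
    return (element["id"], element["type"], lat, lon, element.get("tags", {}))


# Invented elements: a node, a way with a center, and a way with no coordinates.
sample_elements = [
    {"type": "node", "id": 1, "lat": 41.01, "lon": 28.97, "tags": {"amenity": "cafe"}},
    {"type": "way", "id": 2, "center": {"lat": 41.03, "lon": 28.98}, "tags": {"amenity": "restaurant"}},
    {"type": "way", "id": 3, "tags": {"amenity": "cafe"}},
]
rows_example = [element_to_row(e) for e in sample_elements]
for row in rows_example:
    print(row)
```

Rows with None coordinates can then be dropped with an isNotNull() filter before mapping.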



📊 3. Real Analysis Examples
A) Most Popular Venue Chains

In [None]:
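from pyspark.sql.functions import col, lower

# Case-insensitive substring match on the venue name.
# Note: "mcdonalds" does not match names written with an apostrophe ("McDonald's").
osm_df.filter(
    (lower(col("tags").getItem("name")).contains("starbucks")) |
    (lower(col("tags").getItem("name")).contains("mcdonalds"))
).groupBy("tags.name").count().orderBy("count", ascending=False).show()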

+---------+-----+
|     name|count|
+---------+-----+
|Starbucks|  128|
+---------+-----+
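
Only Starbucks appears in the result above. One possible reason is that a substring test for "mcdonalds" cannot match a name spelled "McDonald's", since the apostrophe sits in the middle of the keyword. A minimal sketch of name normalization in plain Python follows. `normalize_name`, `matches_chain`, and the sample names are hypothetical; the same idea could be applied in Spark with regexp_replace.

```python
def normalize_name(name):
    """Case-fold a venue name and keep only letters and digits."""
    if name is None:
        return ""
    return "".join(ch for ch in name.casefold() if ch.isalnum())


def matches_chain(name, chain_keywords):
    """Return True if the normalized name contains any chain keyword."""
    normalized = normalize_name(name)
    return any(keyword in normalized for keyword in chain_keywords)


# Invented names covering apostrophes, spaces, upper case, and a missing name.
sample_names = ["McDonald's", "Mc Donalds Taksim", "STARBUCKS Coffee", "Hamdi", None]
chains = ("starbucks", "mcdonalds")
for name in sample_names:
    print(name, matches_chain(name, chains))
```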



B) Distribution by District

In [None]:
from pyspark.sql.functions import when

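# Add a district ("semt") column, e.g. Kadıköy, Beşiktaş; "Diğer" means "other".
# The boxes are coarse illustrations, not real district boundaries.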
osm_df = osm_df.withColumn(
    "semt",
    when(col("lat").between(41.00, 41.02) & col("lon").between(28.96, 29.00), "Kadıköy")
    .when(col("lat").between(41.03, 41.05) & col("lon").between(28.98, 29.02), "Beşiktaş")
    .otherwise("Diğer")
)

osm_df.groupBy("semt").count().show()

+--------+-----+
|    semt|count|
+--------+-----+
|   Diğer| 7541|
| Kadıköy|  896|
|Beşiktaş|  450|
+--------+-----+
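
Before running the box rules over the whole DataFrame, it can help to check them on a few known points. The sketch below mirrors the when/otherwise chain in plain Python, using the same rough boxes as the cell above and two coordinates from the earlier sample output. `assign_district` is a hypothetical helper.

```python
# Rough bounding boxes copied from the cell above: (lat_min, lat_max, lon_min, lon_max).
DISTRICT_BOXES = {
    "Kadıköy": (41.00, 41.02, 28.96, 29.00),
    "Beşiktaş": (41.03, 41.05, 28.98, 29.02),
}


def assign_district(lat, lon, boxes=DISTRICT_BOXES, default="Diğer"):
    """Return the label of the first box containing the point, else the default."""
    if lat is None or lon is None:
        return default
    for label, (lat_min, lat_max, lon_min, lon_max) in boxes.items():
        # Spark's between() is inclusive on both ends, so this check is too.
        if lat_min <= lat <= lat_max and lon_min <= lon <= lon_max:
            return label
    return default


print(assign_district(41.0171629, 28.9698993))
print(assign_district(41.0309965, 28.9748683))
```

A point that falls outside every box, or has no coordinates, lands in the default group, which explains why that group dominates the counts.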



📍 4. Map Visualization


In [None]:
!pip install folium
import folium

# Create the map (centered on Istanbul)
harita = folium.Map(location=[41.0082, 28.9784], zoom_start=12)

# Plot the first 50 rows (limit() does not sample randomly)
for row in osm_df.limit(50).collect():
    folium.CircleMarker(
        location=[row["lat"], row["lon"]],
        radius=5,
        popup=row["tags"].get("name", "No Name"),
        color="blue",
        fill=True
    ).add_to(harita)

harita
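
limit(50) takes whichever 50 rows Spark returns first, which is not a random sample. If a random subset is preferred, one option is to sample after collecting, sketched below in plain Python. `sample_points` is a hypothetical helper and the points are synthetic; the approach assumes the collected data fits in driver memory.

```python
import random


def sample_points(points, k, seed=None):
    """Return up to k points chosen uniformly at random without replacement."""
    rng = random.Random(seed)
    k = min(k, len(points))
    return rng.sample(points, k)


# Synthetic (lat, lon) pairs standing in for collected rows.
all_points = [(41.0 + i * 0.001, 28.9 + i * 0.001) for i in range(200)]
subset = sample_points(all_points, 50, seed=42)
print(len(subset))
```

Fixing the seed keeps the plotted subset stable across reruns of the cell.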



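⚠️ Troubleshooting
1. Overpass API Limits:

The public Overpass API restricts how much data a client can pull in a short period (this notebook assumes about 10,000 elements per 2 minutes). For larger extracts, raise the query timeout: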

In [None]:
query = query.replace("[out:json];", "[out:json][timeout:300];", 1)  # 5-minute timeout
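
A plain string replace can touch text other than the setting it targets, or stack a second timeout onto a query that already has one. A slightly more defensive sketch uses a regular expression to replace an existing timeout and adds one otherwise. `set_overpass_timeout` is a hypothetical helper, and it assumes the query begins with the [out:json] setting as in this notebook.

```python
import re


def set_overpass_timeout(query, seconds):
    """Set or replace the [timeout:N] setting of an Overpass QL query string."""
    setting = f"[timeout:{int(seconds)}]"
    if re.search(r"\[timeout:\d+\]", query):
        # Replace the existing timeout setting.
        return re.sub(r"\[timeout:\d+\]", setting, query, count=1)
    # Otherwise add it right after the output-format setting.
    return query.replace("[out:json]", "[out:json]" + setting, 1)


print(set_overpass_timeout("[out:json];\nout center;", 300))
print(set_overpass_timeout("[out:json][timeout:25];\nout center;", 60))
```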

2. Missing Coordinates:

For way elements, use the center point:

In [None]:
lat = element.get("lat", element.get("center", {}).get("lat"))

3. Performance:

Use cache() on large datasets:

In [None]:
osm_df.cache().count()  # count() forces evaluation so the cache is actually populated

🛠️ 1. Environment Setup (Known-Good)


In [None]:
# Install all dependencies (tuned for Colab)
!apt-get update -qq
!apt-get install -y openjdk-8-jdk-headless
!pip install pyspark==3.5.0 pyarrow==14.0.0 folium geopandas

# Start Spark (with memory settings)
from pyspark.sql import SparkSession
spark = SparkSession.builder \
    .master("local[*]") \
    .config("spark.driver.memory", "4g") \
    .config("spark.executor.memory", "2g") \
    .getOrCreate()

W: Skipping acquire of configured file 'main/source/Sources' as repository 'https://r2u.stat.illinois.edu/ubuntu jammy InRelease' does not seem to provide it (sources.list entry misspelt?)
Reading package lists... Done
Building dependency tree... Done
Reading state information... Done
openjdk-8-jdk-headless is already the newest version (8u442-b06~us1-0ubuntu1~22.04).
0 upgraded, 0 newly installed, 0 to remove and 30 not upgraded.


🌍 2. Real OSM Data (Mini Example)


In [None]:
import requests
import json

# Fetch up to 50 cafes in Istanbul via the Overpass API (within 5 km of the center point)
overpass_url = "https://overpass-api.de/api/interpreter"
query = """
[out:json][timeout:25];
area["name"="İstanbul"]->.a;
(
  node["amenity"="cafe"](area.a)(around:5000,41.0082,28.9784);
  way["amenity"="cafe"](area.a)(around:5000,41.0082,28.9784);
);
out center 50;
"""

response = requests.get(overpass_url, params={'data': query})
osm_data = response.json()

# Convert the data to a DataFrame
from pyspark.sql.types import *
schema = StructType([
    StructField("id", LongType()),
    StructField("type", StringType()),
    StructField("lat", DoubleType()),
    StructField("lon", DoubleType()),
    StructField("tags", MapType(StringType(), StringType()))
])

rows = []
for element in osm_data["elements"]:
    tags = element.get("tags", {})
    rows.append((
        element["id"],
        element["type"],
        element.get("lat", element.get("center", {}).get("lat")),
        element.get("lon", element.get("center", {}).get("lon")),
        tags
    ))

osm_df = spark.createDataFrame(rows, schema)
osm_df.show(3)

+---------+----+----------+----------+--------------------+
|       id|type|       lat|       lon|                tags|
+---------+----+----------+----------+--------------------+
|463142937|node|41.0055966|28.9793175|{name -> Java Stu...|
|656087304|node|41.0317779|28.9821871|{name -> Hanımeli...|
|786899486|node|41.0334521|28.9764018|{name -> Mustafa ...|
+---------+----+----------+----------+--------------------+
only showing top 3 rows



📊 3. Reliable Analysis Examples
A) Grouping by Cafe Name

In [None]:
from pyspark.sql.functions import col

osm_df.filter(col("tags").getItem("amenity") == "cafe") \
     .select(col("tags").getItem("name").alias("kafe_adi")) \
     .groupBy("kafe_adi").count() \
     .orderBy("count", ascending=False) \
     .show(5, truncate=False)

+--------------------+-----+
|kafe_adi            |count|
+--------------------+-----+
|Starbucks           |4    |
|Mado                |2    |
|Simit Sarayı        |2    |
|Java Studio Istanbul|1    |
|Mustafa Abi         |1    |
+--------------------+-----+
only showing top 5 rows
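
For a result this small, the Spark counts can be cross-checked in plain Python. The sketch below applies collections.Counter to a made-up list of tag dictionaries. `count_names` is a hypothetical helper; cafes without a name are grouped under None, much as Spark's groupBy keeps a null group.

```python
from collections import Counter


def count_names(tag_dicts, amenity="cafe"):
    """Count venue names among tag dictionaries with the given amenity."""
    names = [tags.get("name") for tags in tag_dicts if tags.get("amenity") == amenity]
    return Counter(names)


# Invented tag dictionaries, including an unnamed cafe and a restaurant.
sample_tags = [
    {"amenity": "cafe", "name": "Starbucks"},
    {"amenity": "cafe", "name": "Starbucks"},
    {"amenity": "cafe", "name": "Mado"},
    {"amenity": "cafe"},
    {"amenity": "restaurant", "name": "Hamdi"},
]
name_counts = count_names(sample_tags)
print(name_counts.most_common(2))
```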



B) Coordinate-Based Density


In [None]:
!pip install folium
import folium

# Create the map
m = folium.Map(location=[41.0082, 28.9784], zoom_start=14)

# Mark the cafes
for row in osm_df.filter(col("type") == "node").collect():
    folium.CircleMarker(
        location=[row["lat"], row["lon"]],
        radius=5,
        popup=row["tags"].get("name", "Unknown"),
        color='blue',
        fill=True
    ).add_to(m)

m
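
The map above draws one marker per cafe; it does not compute a density. One simple approximation is to count points per grid cell, sketched below in plain Python. `grid_density`, the 0.01-degree cell size, and the sample points are assumptions for illustration.

```python
import math
from collections import Counter


def grid_density(points, cell_size=0.01):
    """Count points per square grid cell of cell_size degrees."""
    counts = Counter()
    for lat, lon in points:
        if lat is None or lon is None:
            continue  # skip elements without coordinates
        cell = (math.floor(lat / cell_size), math.floor(lon / cell_size))
        counts[cell] += 1
    return counts


# Invented points: two share a cell, one sits elsewhere, one has no latitude.
sample_points_for_grid = [
    (41.0055, 28.9793),
    (41.0057, 28.9795),
    (41.0317, 28.9821),
    (None, 28.9),
]
density = grid_density(sample_points_for_grid)
for cell, count in density.most_common():
    print(cell, count)
```

The per-cell counts could then drive marker radius or color on the Folium map instead of plotting every cafe individually.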



📌 Extra: Saving the Data


In [None]:
# Save as Parquet (overwrite so the cell can be re-run)
osm_df.write.mode("overwrite").parquet("istanbul_kafeler.parquet")

# Save as CSV (via pandas)
osm_df.limit(1000).toPandas().to_csv("kafeler.csv", index=False)
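
The tags column is a map, so each CSV cell likely ends up as the text form of a Python dict (an assumption about how pandas writes such values, worth checking on the actual file). If other tools will read the file, serializing the map as JSON may be easier to parse back. The sketch below uses only the standard library; `rows_to_csv` is a hypothetical helper and the sample row is invented.

```python
import csv
import io
import json


def rows_to_csv(rows):
    """Write (id, type, lat, lon, tags) rows to CSV text, with tags as a JSON string."""
    buffer = io.StringIO()
    writer = csv.writer(buffer)
    writer.writerow(["id", "type", "lat", "lon", "tags"])
    for row_id, row_type, lat, lon, tags in rows:
        # ensure_ascii=False keeps Turkish characters readable in the output.
        writer.writerow([row_id, row_type, lat, lon, json.dumps(tags, ensure_ascii=False)])
    return buffer.getvalue()


csv_text = rows_to_csv([(1, "node", 41.01, 28.97, {"amenity": "cafe", "name": "Simit Sarayı"})])
print(csv_text)
```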

⚠️ Error Fixes
1. Overpass API Timeout:

In [None]:
query = query.replace("[timeout:25]", "[timeout:60]")  # Raise the timeout to 60 seconds

2. Missing Coordinates:



In [None]:
osm_df = osm_df.filter(
    (col("lat").isNotNull()) &
    (col("lon").isNotNull()))
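
A null check catches rows built with None, but the first version of the parsing loop in this notebook writes 0.0 for missing coordinates, and those rows would pass it. A stricter check, sketched in plain Python below, also rejects the (0.0, 0.0) placeholder by requiring the point to fall inside a generous box around Istanbul. `is_valid_coordinate` and the box limits are assumptions for illustration, not exact city boundaries.

```python
# Assumed, deliberately generous box: (lat_min, lat_max, lon_min, lon_max).
ISTANBUL_BOX = (40.5, 41.7, 27.9, 30.0)


def is_valid_coordinate(lat, lon, box=ISTANBUL_BOX):
    """Return True only for non-null coordinates inside the given box."""
    if lat is None or lon is None:
        return False
    lat_min, lat_max, lon_min, lon_max = box
    return lat_min <= lat <= lat_max and lon_min <= lon <= lon_max


print(is_valid_coordinate(41.0082, 28.9784))
print(is_valid_coordinate(0.0, 0.0))
print(is_valid_coordinate(None, 28.9784))
```

The same condition can be expressed on the DataFrame with between() on the lat and lon columns.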


3. Spark Memory Error:



In [None]:
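from pyspark.sql import SparkSession

spark.stop()  # Stop the previous session
# Driver memory is fixed when the JVM starts, so a runtime restart may be needed for this to apply
spark = SparkSession.builder \
    .config("spark.driver.memory", "8g") \
    .getOrCreate()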