
- **vendor_id** :A code indicating the TPEP provider that provided the record.
- 1 = Creative Mobile Technologies, LLC
- 2 = Curb Mobility, LLC
- 6 = Myle Technologies Inc
- 7 = Helix
- **rate_code:** The final rate code in effect at the end of the trip.
- 1 = Standard rate
- 2 = JFK
- 3 = Newark
- 4 = Nassau or Westchester
- 5 = Negotiated fare
- 6 = Group ride
- 99 = Null/unknown
- **store_and_fwd_flag:** This flag indicates whether the trip record was held in vehicle memory before
- sending to the vendor, aka “store and forward,” because the vehicle did not
- have a connection to the server.
- Y = store and forward trip
- N = not a store and forward trip
- [https://www.nyc.gov/assets/tlc/downloads/pdf/data_dictionary_trip_records_yellow.pdf](link-URL)

 silver_uber_trip - Veri Temizleme ve Dönüşüm Adımları

- bronze_uber_data Delta tablosu Spark ile okundu
- Gereksiz kolonlar silindi: `EventProcessedUtcTime`, `PartitionId`, `EventEnqueuedUtcTime`
- `pickup_datetime` ve `dropoff_datetime` kolonları timestamp formatına dönüştürüldü
- `store_and_fwd_flag` kolonu boolean veri tipine çevrildi (`Y` → `True`, diğerleri → `False`)
- `medallion` ve `hack_license` kolonları string'e cast edildi
- `passenger_count`, `trip_time_in_secs`, `trip_distance` > 0 olan satırlar filtrelendi
- `rate_code` kolonunda 1–6 dışındaki tüm değerler `None` olarak işaretlendi
- `rate_code` ByteType’a, `passenger_count` ShortType’a çevrildi
- `medallion`, `hack_license`, `pickup_datetime` üçlüsüyle duplicate (satır tekrar) kontrolü yapıldı
- `pickup_longitude`, `pickup_latitude`, `dropoff_longitude`, `dropoff_latitude` koordinatlarının geçerliliği kontrol edildi
- Geçersiz koordinatlar filtrelendi (örneğin: 0.0 veya ekstrem değerler)
- IQR (Interquartile Range) yöntemiyle aykırı değer (outlier) analizi yapıldı.
- `trip_id` oluşturuldu:
  - SHA2 ile `medallion + hack_license + pickup_datetime` birleşimi hashlenerek üretildi
- `month` kolonu `pickup_datetime`'dan çıkarılarak oluşturuldu
- Son olarak veri `silver.silver_uber_trip` tablosuna `partitionBy("month")` ile delta formatında yazıldı


In [58]:
from pyspark.sql.functions import col, unix_timestamp, round, month

# 1. Bronze v3’ü oku
df_trip = spark.table("bronze.bronze_uber_data_partioned")


#pickup_datetime ve dropoff_datetime → timestamp formatına çevrilecek
#store_and_fwd_flag → boolean
#bazı long tipleri int’e veya short’a çevrilecek (örn. passenger_count)

StatementMeta(, 4e2e43bd-5086-4272-a8f4-5bab36fa89c2, 60, Finished, Available, Finished)

In [59]:
from pyspark.sql.functions import unix_timestamp, col

df_trip = df_trip.withColumn(
    "trip_duration",
    unix_timestamp("dropoff_datetime") - unix_timestamp("pickup_datetime")
).withColumn(
    "average_speed",
    round(col("trip_distance") / (col("trip_duration") / 3600), 2)
)


StatementMeta(, 4e2e43bd-5086-4272-a8f4-5bab36fa89c2, 61, Finished, Available, Finished)

In [60]:
df_trip_cleaned = df_trip.drop("EventProcessedUtcTime", "PartitionId", "EventEnqueuedUtcTime")
df_trip_cleaned.printSchema()
#gereksiz kolonlar silindi

StatementMeta(, 4e2e43bd-5086-4272-a8f4-5bab36fa89c2, 62, Finished, Available, Finished)

root
 |-- medallion: long (nullable = true)
 |-- hack_license: long (nullable = true)
 |-- vendor_id: string (nullable = true)
 |-- rate_code: long (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- pickup_datetime: string (nullable = true)
 |-- dropoff_datetime: string (nullable = true)
 |-- passenger_count: long (nullable = true)
 |-- trip_time_in_secs: long (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- pickup_longitude: double (nullable = true)
 |-- pickup_latitude: double (nullable = true)
 |-- dropoff_longitude: double (nullable = true)
 |-- dropoff_latitude: double (nullable = true)
 |-- ingest_date: date (nullable = true)
 |-- trip_duration: long (nullable = true)
 |-- average_speed: double (nullable = true)



In [61]:
from pyspark.sql.functions import to_timestamp

df_trip_datetime = df_trip_cleaned.withColumn(
    "pickup_datetime", to_timestamp("pickup_datetime", "yyyy-MM-dd HH:mm:ss")
).withColumn(
    "dropoff_datetime", to_timestamp("dropoff_datetime", "yyyy-MM-dd HH:mm:ss")
)

df_trip_datetime.select("pickup_datetime", "dropoff_datetime").show(5, truncate=False)
df_trip_datetime.printSchema()
#time-stamp e cevrim

StatementMeta(, 4e2e43bd-5086-4272-a8f4-5bab36fa89c2, 63, Finished, Available, Finished)

+-------------------+-------------------+
|pickup_datetime    |dropoff_datetime   |
+-------------------+-------------------+
|2013-12-21 14:44:00|2013-12-21 14:52:21|
|2013-12-21 14:44:00|2013-12-21 14:51:55|
|2013-12-21 14:44:01|2013-12-21 15:00:13|
|2013-12-21 14:44:01|2013-12-21 14:54:59|
|2013-12-21 14:44:01|2013-12-21 15:11:06|
+-------------------+-------------------+
only showing top 5 rows

root
 |-- medallion: long (nullable = true)
 |-- hack_license: long (nullable = true)
 |-- vendor_id: string (nullable = true)
 |-- rate_code: long (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: long (nullable = true)
 |-- trip_time_in_secs: long (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- pickup_longitude: double (nullable = true)
 |-- pickup_latitude: double (nullable = true)
 |-- dropoff_longitude: double (nullable = true)

In [62]:
from pyspark.sql.functions import when, col

df_trip_flag = df_trip_datetime.withColumn(
    "store_and_fwd_flag",
    when(col("store_and_fwd_flag") == "Y", True).otherwise(False)  #Eğer Y(store and forward) ise True, değilse False
)

df_trip_flag.select("store_and_fwd_flag").distinct().show()
df_trip_flag.printSchema()
#store_and_fwd_flag → boolean

StatementMeta(, 4e2e43bd-5086-4272-a8f4-5bab36fa89c2, 64, Finished, Available, Finished)

+------------------+
|store_and_fwd_flag|
+------------------+
|              true|
|             false|
+------------------+

root
 |-- medallion: long (nullable = true)
 |-- hack_license: long (nullable = true)
 |-- vendor_id: string (nullable = true)
 |-- rate_code: long (nullable = true)
 |-- store_and_fwd_flag: boolean (nullable = false)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: long (nullable = true)
 |-- trip_time_in_secs: long (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- pickup_longitude: double (nullable = true)
 |-- pickup_latitude: double (nullable = true)
 |-- dropoff_longitude: double (nullable = true)
 |-- dropoff_latitude: double (nullable = true)
 |-- ingest_date: date (nullable = true)
 |-- trip_duration: long (nullable = true)
 |-- average_speed: double (nullable = true)



In [63]:
from pyspark.sql.types import StringType

df_trip_string = df_trip_flag \
    .withColumn("medallion", col("medallion").cast(StringType())) \
    .withColumn("hack_license", col("hack_license").cast(StringType()))
    #string e donusum

StatementMeta(, 4e2e43bd-5086-4272-a8f4-5bab36fa89c2, 65, Finished, Available, Finished)

In [64]:
df_trip_valid = df_trip_string.filter(
    (col("passenger_count") > 0) &
    (col("trip_time_in_secs") > 0) &
    (col("trip_distance") > 0)
)

df_trip_valid.select("passenger_count", "trip_time_in_secs", "trip_distance","rate_code").describe().show()
#belirtilen kolonlarda hata kontrolu  
'''
| Kolon                | Dağılım Durumu              | Standart Sapmanın Yorumu                        |
| -------------------- | --------------------------- | ----------------------------------------------- |
| passenger\_count     | Homojen, düşük sapma        | Çoğu kişi yalnız seyahat ediyor                 |
| trip\_time\_in\_secs | Heterojen, yüksek sapma     | Kısa ve uzun yolculuklar karışık                |
| trip\_distance       | Heterojen, çok yüksek sapma | Kısa ve çok uzun mesafeler iç içe (uç değerler) |
'''

StatementMeta(, 4e2e43bd-5086-4272-a8f4-5bab36fa89c2, 66, Finished, Available, Finished)

+-------+------------------+-----------------+------------------+-------------------+
|summary|   passenger_count|trip_time_in_secs|     trip_distance|          rate_code|
+-------+------------------+-----------------+------------------+-------------------+
|  count|          21179750|         21179750|          21179750|           21179750|
|   mean| 1.287888383951652|784.3871663263259| 2.917831900754262| 1.0277089672918707|
| stddev|0.6425074755472334|587.4232462935475|3.3982951049889163|0.34199304912375383|
|    min|                 1|                1|               0.1|                  0|
|    max|                 9|            10503|             100.0|                210|
+-------+------------------+-----------------+------------------+-------------------+



'\n| Kolon                | Dağılım Durumu              | Standart Sapmanın Yorumu                        |\n| -------------------- | --------------------------- | ----------------------------------------------- |\n| passenger\\_count     | Homojen, düşük sapma        | Çoğu kişi yalnız seyahat ediyor                 |\n| trip\\_time\\_in\\_secs | Heterojen, yüksek sapma     | Kısa ve uzun yolculuklar karışık                |\n| trip\\_distance       | Heterojen, çok yüksek sapma | Kısa ve çok uzun mesafeler iç içe (uç değerler) |\n'

In [65]:
from pyspark.sql.functions import col

# 0 ve daha küçük olanlar (geçersiz)
count_below_1 = df_trip_valid.filter(col("rate_code") < 1).count()

# 6'dan büyük olanlar (geçersiz)
count_above_6 = df_trip_valid.filter(col("rate_code") > 6).count()

# Ekrana yazdır
print(f"rate_code < 1 (örneğin 0) olanlar: {count_below_1}")
print(f"rate_code > 6 olanlar: {count_above_6}")


StatementMeta(, 4e2e43bd-5086-4272-a8f4-5bab36fa89c2, 67, Finished, Available, Finished)

rate_code < 1 (örneğin 0) olanlar: 1940
rate_code > 6 olanlar: 49


In [66]:
df_trip_valid.dtypes


StatementMeta(, 4e2e43bd-5086-4272-a8f4-5bab36fa89c2, 68, Finished, Available, Finished)

[('medallion', 'string'),
 ('hack_license', 'string'),
 ('vendor_id', 'string'),
 ('rate_code', 'bigint'),
 ('store_and_fwd_flag', 'boolean'),
 ('pickup_datetime', 'timestamp'),
 ('dropoff_datetime', 'timestamp'),
 ('passenger_count', 'bigint'),
 ('trip_time_in_secs', 'bigint'),
 ('trip_distance', 'double'),
 ('pickup_longitude', 'double'),
 ('pickup_latitude', 'double'),
 ('dropoff_longitude', 'double'),
 ('dropoff_latitude', 'double'),
 ('ingest_date', 'date'),
 ('trip_duration', 'bigint'),
 ('average_speed', 'double')]

In [67]:
from pyspark.sql.functions import when, col

df_cleaned = df_trip_valid.withColumn(
    "rate_code",
    when((col("rate_code") >= 1) & (col("rate_code") <= 6), col("rate_code"))
    .otherwise(None)
)

StatementMeta(, 4e2e43bd-5086-4272-a8f4-5bab36fa89c2, 69, Finished, Available, Finished)

In [68]:
from pyspark.sql.types import ByteType, ShortType

df_optimized = df_cleaned.withColumn("rate_code", col("rate_code").cast(ByteType())) \
                         .withColumn("passenger_count", col("passenger_count").cast(ShortType()))

StatementMeta(, 4e2e43bd-5086-4272-a8f4-5bab36fa89c2, 70, Finished, Available, Finished)

In [69]:
df_optimized.dtypes


StatementMeta(, 4e2e43bd-5086-4272-a8f4-5bab36fa89c2, 71, Finished, Available, Finished)

[('medallion', 'string'),
 ('hack_license', 'string'),
 ('vendor_id', 'string'),
 ('rate_code', 'tinyint'),
 ('store_and_fwd_flag', 'boolean'),
 ('pickup_datetime', 'timestamp'),
 ('dropoff_datetime', 'timestamp'),
 ('passenger_count', 'smallint'),
 ('trip_time_in_secs', 'bigint'),
 ('trip_distance', 'double'),
 ('pickup_longitude', 'double'),
 ('pickup_latitude', 'double'),
 ('dropoff_longitude', 'double'),
 ('dropoff_latitude', 'double'),
 ('ingest_date', 'date'),
 ('trip_duration', 'bigint'),
 ('average_speed', 'double')]

In [70]:
from pyspark.sql.functions import count

df_optimized.groupBy("medallion", "hack_license", "pickup_datetime") \
    .agg(count("*").alias("row_count")) \
    .filter(col("row_count") > 1) \
    .show()
    #unique kontrolu

StatementMeta(, 4e2e43bd-5086-4272-a8f4-5bab36fa89c2, 72, Finished, Available, Finished)

+---------+------------+---------------+---------+
|medallion|hack_license|pickup_datetime|row_count|
+---------+------------+---------------+---------+
+---------+------------+---------------+---------+



In [83]:
from pyspark.sql.functions import col

# Hem geçersiz hem de sıfır koordinatları filtrele (drop et)
df_invalid_coords = df_optimized.filter(
    (col("pickup_latitude") > 0) & (col("pickup_latitude") <= 90) &
    (col("dropoff_latitude") > 0) & (col("dropoff_latitude") <= 90) &
    (col("pickup_longitude") > -180) & (col("pickup_longitude") < 0) &  # NYC - longitude'lar negatif
    (col("dropoff_longitude") > -180) & (col("dropoff_longitude") < 0)
)


StatementMeta(, 4e2e43bd-5086-4272-a8f4-5bab36fa89c2, 85, Finished, Available, Finished)

In [84]:
df_invalid_coords.groupBy(df_invalid_coords.columns) \
    .count() \
    .filter(col("count") > 1) \
    .show()
    #satur duplicate kontrolu

StatementMeta(, 4e2e43bd-5086-4272-a8f4-5bab36fa89c2, 86, Finished, Available, Finished)

+---------+------------+---------+---------+------------------+---------------+----------------+---------------+-----------------+-------------+----------------+---------------+-----------------+----------------+-----------+-------------+-------------+-----+
|medallion|hack_license|vendor_id|rate_code|store_and_fwd_flag|pickup_datetime|dropoff_datetime|passenger_count|trip_time_in_secs|trip_distance|pickup_longitude|pickup_latitude|dropoff_longitude|dropoff_latitude|ingest_date|trip_duration|average_speed|count|
+---------+------------+---------+---------+------------------+---------------+----------------+---------------+-----------------+-------------+----------------+---------------+-----------------+----------------+-----------+-------------+-------------+-----+
+---------+------------+---------+---------+------------------+---------------+----------------+---------------+-----------------+-------------+----------------+---------------+-----------------+----------------+-----------

In [85]:
from pyspark.sql.functions import when, col

# Trip Distance için IQR hesapla
q1_d, q3_d = df_invalid_coords.approxQuantile("trip_distance", [0.25, 0.75], 0.01)
iqr_d = q3_d - q1_d
low_d = q1_d - 1.5 * iqr_d
high_d = q3_d + 1.5 * iqr_d

# Trip Time için IQR hesapla
q1_t, q3_t = df_invalid_coords.approxQuantile("trip_time_in_secs", [0.25, 0.75], 0.01)
iqr_t = q3_t - q1_t
low_t = q1_t - 1.5 * iqr_t
high_t = q3_t + 1.5 * iqr_t

# Aykırı değerleri işaretle
df_flagged = df_invalid_coords.withColumn(
    "trip_distance_outlier",
    when((col("trip_distance") < low_d) | (col("trip_distance") > high_d), True).otherwise(False)
).withColumn(
    "trip_time_outlier",
    when((col("trip_time_in_secs") < low_t) | (col("trip_time_in_secs") > high_t), True).otherwise(False)
)
'''
Verinin yayılımını ölçtük → IQR yöntemiyle:
Q1: Verilerin %25'inin altında kaldığı değer

Q3: Verilerin %75'inin altında kaldığı değer

IQR = Q3 - Q1 → normal yayılım aralığı

Sonra bu aralığın çok dışındaki değerleri tespit ettik:

alt sınır: Q1 - 1.5 × IQR
üst sınır: Q3 + 1.5 × IQR
'''


StatementMeta(, 4e2e43bd-5086-4272-a8f4-5bab36fa89c2, 87, Finished, Available, Finished)

"\nVerinin yayılımını ölçtük → IQR yöntemiyle:\nQ1: Verilerin %25'inin altında kaldığı değer\n\nQ3: Verilerin %75'inin altında kaldığı değer\n\nIQR = Q3 - Q1 → normal yayılım aralığı\n\nSonra bu aralığın çok dışındaki değerleri tespit ettik:\n\nalt sınır: Q1 - 1.5 × IQR\nüst sınır: Q3 + 1.5 × IQR\n"

In [86]:

df_flagged.groupBy("trip_distance_outlier").count().show()
df_flagged.groupBy("trip_time_outlier").count().show()
#Outlier = "Sıradan olmayan, aşırı düşük veya aşırı yüksek değer". false → bu satırda aykırı (uç, sorunlu) bir değer yok


StatementMeta(, 4e2e43bd-5086-4272-a8f4-5bab36fa89c2, 88, Finished, Available, Finished)

+---------------------+--------+
|trip_distance_outlier|   count|
+---------------------+--------+
|                 true| 1953351|
|                false|18918113|
+---------------------+--------+

+-----------------+--------+
|trip_time_outlier|   count|
+-----------------+--------+
|             true|  947797|
|            false|19923667|
+-----------------+--------+



In [87]:
from pyspark.sql.functions import concat_ws, sha2

df_with_trip_id = df_flagged.withColumn(
    "trip_id",
    sha2(concat_ws("_", "medallion", "hack_license", "pickup_datetime"), 256)
)
#benzersiz bir kimlik olusumu. merge ve joınde kolaylık 

StatementMeta(, 4e2e43bd-5086-4272-a8f4-5bab36fa89c2, 89, Finished, Available, Finished)

In [88]:
df_with_trip_id.groupBy("trip_id").count().filter("count > 1").show()


StatementMeta(, 4e2e43bd-5086-4272-a8f4-5bab36fa89c2, 90, Finished, Available, Finished)

+-------+-----+
|trip_id|count|
+-------+-----+
+-------+-----+



In [89]:
from pyspark.sql.functions import min, max

df_bronze = spark.read.table("bronze.bronze_uber_data_partioned")  # veya doğru tablo adını yaz

df_bronze.select(
    min("pickup_datetime").alias("en_erken_tarih"),
    max("pickup_datetime").alias("en_gec_tarih")
).show()


StatementMeta(, 4e2e43bd-5086-4272-a8f4-5bab36fa89c2, 91, Finished, Available, Finished)

+-------------------+-------------------+
|     en_erken_tarih|       en_gec_tarih|
+-------------------+-------------------+
|2013-09-01 00:00:00|2013-12-31 23:59:57|
+-------------------+-------------------+



In [90]:
from pyspark.sql.functions import month

df_partitioned = df_with_trip_id.withColumn("month", month("pickup_datetime"))


StatementMeta(, 4e2e43bd-5086-4272-a8f4-5bab36fa89c2, 92, Finished, Available, Finished)

In [91]:
df_partitioned.write.format("delta") \
    .mode("overwrite") \
    .partitionBy("month") \
    .saveAsTable("silver.silver_uber_data_v3")

StatementMeta(, 4e2e43bd-5086-4272-a8f4-5bab36fa89c2, 93, Finished, Available, Finished)

In [92]:
df_partitioned.printSchema()

StatementMeta(, 4e2e43bd-5086-4272-a8f4-5bab36fa89c2, 94, Finished, Available, Finished)

root
 |-- medallion: string (nullable = true)
 |-- hack_license: string (nullable = true)
 |-- vendor_id: string (nullable = true)
 |-- rate_code: byte (nullable = true)
 |-- store_and_fwd_flag: boolean (nullable = false)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: short (nullable = true)
 |-- trip_time_in_secs: long (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- pickup_longitude: double (nullable = true)
 |-- pickup_latitude: double (nullable = true)
 |-- dropoff_longitude: double (nullable = true)
 |-- dropoff_latitude: double (nullable = true)
 |-- ingest_date: date (nullable = true)
 |-- trip_duration: long (nullable = true)
 |-- average_speed: double (nullable = true)
 |-- trip_distance_outlier: boolean (nullable = false)
 |-- trip_time_outlier: boolean (nullable = false)
 |-- trip_id: string (nullable = true)
 |-- month: integer (nullable = true)



In [93]:
spark.table("silver.silver_uber_data_v3").filter(
    (col("pickup_latitude") == 0) |
    (col("pickup_longitude") == 0) |
    (col("dropoff_latitude") == 0) |
    (col("dropoff_longitude") == 0)
).count()


StatementMeta(, 4e2e43bd-5086-4272-a8f4-5bab36fa89c2, 95, Finished, Available, Finished)

0

In [94]:
from pyspark.sql.functions import count

df_silver = spark.table("silver.silver_uber_data_v3")

# trip_id ile gruplandır, birden fazla varsa duplicate’tir
df_silver.groupBy("trip_id").count().filter("count > 1").show()


StatementMeta(, 4e2e43bd-5086-4272-a8f4-5bab36fa89c2, 96, Finished, Available, Finished)

+-------+-----+
|trip_id|count|
+-------+-----+
+-------+-----+



In [96]:
df_cleaned = df_silver.filter(col("trip_duration") <= 36000)


StatementMeta(, 4e2e43bd-5086-4272-a8f4-5bab36fa89c2, 98, Finished, Available, Finished)

In [97]:
df_v3 = spark.table("silver.silver_uber_data_v3")

print("Silver v3 toplam satır sayısı:", df_v3.count())


StatementMeta(, 4e2e43bd-5086-4272-a8f4-5bab36fa89c2, 99, Finished, Available, Finished)

Silver v3 toplam satır sayısı: 20871464


In [98]:
df_v3 = spark.table("silver.silver_uber_data_v3")
df_v = spark.table("silver.silver_uber_data")

count_v3 = df_v3.count()
count_v = df_v.count()

print(f"🟦 Silver v3 satır sayısı: {count_v3}")
print(f"✅ Silver v (temizlenmiş) satır sayısı: {count_v}")
print(f"🧮 Temizleme ile çıkarılan satır sayısı: {count_v3 - count_v}")


StatementMeta(, 4e2e43bd-5086-4272-a8f4-5bab36fa89c2, 100, Finished, Available, Finished)

🟦 Silver v3 satır sayısı: 20871464
✅ Silver v4 (temizlenmiş) satır sayısı: 21179750
🧮 Temizleme ile çıkarılan satır sayısı: -308286
