In [1]:
"""
Bu modül, PySpark kullanarak zaman damgası (timestamp) işlemleri yapmak için gerekli SparkSession oluşturulmasını
ve tarih/zaman fonksiyonlarının içe aktarımını içerir.
"""
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, to_timestamp, when, year, current_timestamp, expr,coalesce,month,day,date_format,lit
from pyspark.sql.types import TimestampType,StructType,StructField

In [2]:
"""
Create (or reuse) the SparkSession and switch the datetime parser to LEGACY mode.
"""
spark = (
    SparkSession.builder
    .appName("FinalTimestampComputation")
    .getOrCreate()
)
# LEGACY = Spark 2.x (SimpleDateFormat-based) parsing rules for to_timestamp & co.
spark.conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY")

In [3]:
"""
Manual schema definition for the DataFrame.
"""

# NOTE: the schema below lives inside a string literal, so it is NOT defined and
# NOT used; the CSV is read without a schema in the next cell (all columns as
# strings). Declaring these columns as TimestampType would presumably make the
# CSV reader parse them with its default timestamp format, which values such as
# kafka_ts "12.02.2025 12:28" do not follow — TODO confirm before enabling.
# Because this string is the cell's last expression, Jupyter echoes it as output.
"""
schema = StructType([
    StructField("ts",TimestampType(),True),
    StructField("kafka_ts",TimestampType(),True),
    StructField("signal_timestamp",TimestampType(),True),
    StructField("signal_oem_timestamp",TimestampType(),True),
    StructField("final_ts",TimestampType(),True),
])

"""

'\nschema = StructType([\n    StructField("ts",TimestampType(),True),\n    StructField("kafka_ts",TimestampType(),True),\n    StructField("signal_timestamp",TimestampType(),True),\n    StructField("signal_oem_timestamp",TimestampType(),True),\n    StructField("final_ts",TimestampType(),True),\n])\n\n'

In [4]:
"""
Read the semicolon-separated CSV file into a Spark DataFrame.

No schema is passed (the schema in the previous cell is only a disabled string)
and schema inference is off, so every column - including the timestamp columns -
is read as a string. The timestamp columns are parsed later by
`convert_timestamp()`.
"""
CSV_PATH = "dummy_data.csv"
df = spark.read.csv(CSV_PATH, header=True, sep=";", inferSchema=False)

In [5]:
df.show(n=20, truncate=False)  # first 20 raw rows, full cell contents

+------------------------+-----------------------+-----------------------+-----------------------+--------+
|ts                      |kafka_ts               |signal_timestamp       |signal_oem_timestamp   |final_ts|
+------------------------+-----------------------+-----------------------+-----------------------+--------+
|2025-02-12T09:28:32.611Z|12.02.2025 12:28       |2025-02-12 12:28:32.000|2025-02-12 12:28:32.000|NULL    |
|2025-02-12T09:57:35.677Z|12.02.2025 12:57       |2025-02-12 12:57:35.000|2025-02-12 12:57:35.000|NULL    |
|2025-02-06T09:24:06.421Z|6.02.2025 12:24        |2025-02-06 12:24:03.000|2025-02-06 12:24:03.000|NULL    |
|2025-02-18T11:14:23.573Z|18.02.2025 14:14       |2025-02-18 14:14:21.000|2025-02-18 14:14:21.000|NULL    |
|2025-02-13T11:46:41.500Z|13.02.2025 14:46       |2025-02-13 14:46:40.000|2025-02-13 14:46:40.000|NULL    |
|2025-02-20T10:51:39.210Z|20.02.2025 13:51       |2025-02-20 13:51:38.000|2025-02-20 13:51:38.000|NULL    |
|2025-02-12T04:35:48.127Z|12

In [6]:
# Shorthand column references used by the conditions below:
#   ingestion   -> column `ts`
#   internalRTC -> column `signal_timestamp`
#   modemUTC    -> column `signal_oem_timestamp`
ingestion, internalRTC, modemUTC = (
    col(name) for name in ("ts", "signal_timestamp", "signal_oem_timestamp")
)

In [7]:
"""
Filter conditions over the timestamp columns.

All conditions are lazy Column expressions resolved by column name, so they are
evaluated against whichever DataFrame they are applied to (before or after
`final_ts` has been computed / corrected).
"""
# Reference instant for the relative windows (cond_future_range / cond_recent_past).
# None keeps the original behaviour (wall-clock `current_timestamp()`). Set a literal
# such as "2025-03-01 00:00:00" to make runs reproducible on historical data: with the
# February 2025 sample data and the wall clock, both windows match nothing once more
# than four weeks have passed.
REFERENCE_TIME = None
_reference_ts = (
    current_timestamp() if REFERENCE_TIME is None
    else to_timestamp(lit(REFERENCE_TIME))
)

# Null checks on the two candidate sources for final_ts.
cond_internal_null = internalRTC.isNull()
cond_modem_null = modemUTC.isNull()
cond_both_null = cond_internal_null & cond_modem_null

# True when internalRTC has a non-zero fractional second. The timestamp -> string
# cast drops an all-zero fraction (".000" is shown as no fraction), so such values
# count as "no milliseconds". Evaluates to null when internalRTC is null.
cond_has_millis = internalRTC.cast("string").contains(".")

cols = ["ts", "signal_timestamp", "signal_oem_timestamp", "final_ts"]

_final_ts = col("final_ts")
cond_final_ts_is_null = _final_ts.isNull()
cond_invalid_years = year(_final_ts).isin([2000, 2001, 2002])

# Known bogus dates: March 2021, January 1970, January 2000, 1 Jan 2018,
# and the closed range [2014-01-01, 2014-08-01].
cond_weird_dates = (
    ((year(_final_ts) == 2021) & (month(_final_ts) == 3))
    | ((year(_final_ts) == 1970) & (month(_final_ts) == 1))
    | ((year(_final_ts) == 2000) & (month(_final_ts) == 1))
    | ((year(_final_ts) == 2018) & (month(_final_ts) == 1) & (day(_final_ts) == 1))
    | ((_final_ts >= to_timestamp(lit("2014-01-01")))
       & (_final_ts <= to_timestamp(lit("2014-08-01"))))
)
# Strictly after the reference instant, but less than one day ahead.
cond_future_range = (
    (_final_ts > _reference_ts)
    & (_final_ts < _reference_ts + expr("INTERVAL 1 DAY"))
)
# Strictly before the reference instant, but less than 28 days back.
cond_recent_past = (
    (_final_ts < _reference_ts)
    & (_final_ts > _reference_ts - expr("INTERVAL 28 DAYS"))
)

In [8]:
# Filter by a condition and display the result
def show_table(condition, cols, title, n=10, data=None):
    """
    Print a title, then show the rows matching ``condition`` restricted to ``cols``.

    Parameters
    ----------
    condition : pyspark.sql.Column
        Boolean filter expression passed to ``DataFrame.filter``.
    cols : list of str
        Names of the columns to display.
    title : str
        Header printed before the table so the output can be interpreted.
    n : int, default 10
        Maximum number of rows to show.
    data : DataFrame, optional
        Frame to filter. When omitted, the notebook-global ``df`` is looked up
        at call time. Pass it explicitly to avoid depending on hidden state,
        because ``df`` is reassigned several times in this notebook.
    """
    frame = df if data is None else data
    print(title)
    frame.filter(condition).select(cols).show(n, truncate=False)

In [9]:
# Convert the different timestamp formats
def convert_timestamp(
    column,
    formats=("yyyy-MM-dd HH:mm:ss.SSS", "dd.MM.yyyy HH:mm:ss", "dd.MM.yyyy HH:mm"),
):
    """
    Parse a column that may hold several date/time string formats into a timestamp.

    Each pattern in ``formats`` is tried in order, and the first non-null parse wins
    (``coalesce``). As a last resort, the value goes through ``to_timestamp`` with no
    pattern, which handles ISO-8601 strings such as ``2025-02-12T09:28:32.611Z``.

    Parameters
    ----------
    column : pyspark.sql.Column or str
        Column (or column name) holding the timestamp strings.
    formats : sequence of str
        Datetime patterns to try before the pattern-less fallback. The default is
        the original hard-coded list.

    Returns
    -------
    pyspark.sql.Column
        Timestamp column, or null when no pattern matches.

    NOTE(review): this relies on a failed pattern parse returning null so that the
    next pattern is tried. The session sets
    ``spark.sql.legacy.timeParserPolicy=LEGACY``; confirm the behaviour before
    changing that policy.
    """
    parsed = [to_timestamp(column, fmt) for fmt in formats]
    # The pattern-less fallback goes last, so explicit formats take precedence.
    parsed.append(to_timestamp(column))
    return coalesce(*parsed)

In [10]:
"""
Normalise the timestamp string columns to `TimestampType` with the multi-format
`convert_timestamp()` parser. `kafka_ts` is not converted here.
"""
df = df.withColumns({
    name: convert_timestamp(col(name))
    for name in ("ts", "signal_timestamp", "signal_oem_timestamp")
})

In [11]:
# Filter and display rows for each condition
show_table(condition=cond_both_null, cols=cols, title="internalRTC and modemUTC is null:")

internalRTC and modemUTC is null:
+---+----------------+--------------------+--------+
|ts |signal_timestamp|signal_oem_timestamp|final_ts|
+---+----------------+--------------------+--------+
+---+----------------+--------------------+--------+



In [12]:
show_table(condition=cond_internal_null, cols=cols, title="internalRTC is null:")  # missing internal RTC

internalRTC is null:
+---+----------------+--------------------+--------+
|ts |signal_timestamp|signal_oem_timestamp|final_ts|
+---+----------------+--------------------+--------+
+---+----------------+--------------------+--------+



In [13]:
show_table(condition=cond_modem_null, cols=cols, title="modemUTC is null")  # missing modem UTC

modemUTC is null
+---+----------------+--------------------+--------+
|ts |signal_timestamp|signal_oem_timestamp|final_ts|
+---+----------------+--------------------+--------+
+---+----------------+--------------------+--------+



In [14]:
show_table((~cond_internal_null) & (~cond_modem_null) & cond_has_millis, cols, "both  are not null and internalRTC has milisecond")  # each source non-null; ~cond_both_null would only mean "not both null"

both  are not null and internalRTC has milisecond
+-----------------------+-----------------------+--------------------+--------+
|ts                     |signal_timestamp       |signal_oem_timestamp|final_ts|
+-----------------------+-----------------------+--------------------+--------+
|2025-02-26 14:58:39.54 |2025-02-26 17:58:38.055|2025-02-26 17:58:39 |NULL    |
|2025-02-04 05:48:20.019|2025-02-04 08:48:20.042|2025-02-04 08:48:18 |NULL    |
|2025-02-25 15:18:54.229|2025-02-25 18:18:53.061|2025-02-25 18:18:53 |NULL    |
|2025-02-26 23:03:34.782|2025-02-26 21:48:55.091|2025-02-26 21:29:15 |NULL    |
|2025-02-27 18:39:02.066|2025-02-27 21:39:02.099|2025-02-27 21:39:01 |NULL    |
|2025-02-04 14:14:26.446|2025-02-04 17:14:22.057|2025-02-04 17:14:26 |NULL    |
|2025-02-26 09:42:53.002|2025-02-26 12:42:53.02 |2025-02-26 12:42:51 |NULL    |
|2025-02-26 06:16:01.567|2025-02-26 09:15:57.076|2025-02-26 09:16:01 |NULL    |
|2025-02-04 07:02:10.003|2025-02-04 10:02:10.021|2021-03-23 10:49:37 |

In [15]:
show_table((~cond_internal_null) & (~cond_modem_null) & (~cond_has_millis), cols, "both are not null and internalRTC doesn't have millisecond")  # each source non-null; ~cond_both_null would only mean "not both null"

both are not null and internalRTC doesn't have millisecond
+-----------------------+-------------------+--------------------+--------+
|ts                     |signal_timestamp   |signal_oem_timestamp|final_ts|
+-----------------------+-------------------+--------------------+--------+
|2025-02-12 09:28:32.611|2025-02-12 12:28:32|2025-02-12 12:28:32 |NULL    |
|2025-02-12 09:57:35.677|2025-02-12 12:57:35|2025-02-12 12:57:35 |NULL    |
|2025-02-06 09:24:06.421|2025-02-06 12:24:03|2025-02-06 12:24:03 |NULL    |
|2025-02-18 11:14:23.573|2025-02-18 14:14:21|2025-02-18 14:14:21 |NULL    |
|2025-02-13 11:46:41.5  |2025-02-13 14:46:40|2025-02-13 14:46:40 |NULL    |
|2025-02-20 10:51:39.21 |2025-02-20 13:51:38|2025-02-20 13:51:38 |NULL    |
|2025-02-12 04:35:48.127|2025-02-12 07:35:47|2025-02-12 07:35:47 |NULL    |
|2025-02-13 08:45:01.209|2025-02-13 11:45:00|2025-02-13 11:45:00 |NULL    |
|2025-02-11 12:39:57.759|2025-02-11 15:39:00|2025-02-11 15:39:58 |NULL    |
|2025-02-23 02:07:06.367|2025

In [16]:
"""
Choose `final_ts` from `internalRTC` (signal_timestamp) and `modemUTC`
(signal_oem_timestamp); the first matching rule wins:
- both null                          -> null
- only one present                   -> that one
- both present, RTC has a fraction   -> internalRTC
- otherwise                          -> modemUTC
"""
final_ts_choice = (
    when(cond_both_null, None)
    .when(cond_internal_null, modemUTC)
    .when(cond_modem_null, internalRTC)
    .when(cond_has_millis, internalRTC)
    .otherwise(modemUTC)
)
df = df.withColumn("final_ts", final_ts_choice)

In [17]:
# Timestamp filters and displays before message_time() is applied
show_table(condition=cond_invalid_years, cols=cols, title="Timestamp year 2000,2001,2002")

Timestamp year 2000,2001,2002
+-----------------------+-------------------+--------------------+-------------------+
|ts                     |signal_timestamp   |signal_oem_timestamp|final_ts           |
+-----------------------+-------------------+--------------------+-------------------+
|2025-02-01 12:08:57.477|2025-02-01 15:08:00|2000-01-01 14:18:11 |2000-01-01 14:18:11|
|2025-02-17 10:41:14.86 |2025-02-17 13:41:00|2000-05-10 16:42:21 |2000-05-10 16:42:21|
|2025-02-11 12:50:41.529|2025-02-11 15:50:00|2000-01-01 02:04:24 |2000-01-01 02:04:24|
|2025-02-27 10:28:32.555|2025-02-27 13:28:00|2000-01-02 09:33:16 |2000-01-02 09:33:16|
|2025-02-14 08:18:29.394|2025-02-14 11:18:00|2000-01-01 14:20:31 |2000-01-01 14:20:31|
+-----------------------+-------------------+--------------------+-------------------+



In [18]:
show_table(condition=cond_weird_dates, cols=cols, title="Timestamp March 2021,January 1970, January 2000,1/1/2018,[1/1/2014-1/8/2014]")  # known bogus dates

Timestamp March 2021,January 1970, January 2000,1/1/2018,[1/1/2014-1/8/2014]
+-----------------------+-------------------+--------------------+-------------------+
|ts                     |signal_timestamp   |signal_oem_timestamp|final_ts           |
+-----------------------+-------------------+--------------------+-------------------+
|2025-02-28 14:51:35.544|2025-02-28 17:51:00|2021-03-23 10:50:21 |2021-03-23 10:50:21|
|2025-02-01 12:08:57.477|2025-02-01 15:08:00|2000-01-01 14:18:11 |2000-01-01 14:18:11|
|2025-02-17 13:53:19.69 |2025-02-17 16:53:00|2021-03-23 13:02:54 |2021-03-23 13:02:54|
|2025-02-11 12:50:41.529|2025-02-11 15:50:00|2000-01-01 02:04:24 |2000-01-01 02:04:24|
|2025-02-05 05:23:33.944|2025-02-05 08:23:00|2021-03-23 10:53:47 |2021-03-23 10:53:47|
|2025-02-27 10:28:32.555|2025-02-27 13:28:00|2000-01-02 09:33:16 |2000-01-02 09:33:16|
|2025-02-14 08:18:29.394|2025-02-14 11:18:00|2000-01-01 14:20:31 |2000-01-01 14:20:31|
+-----------------------+-------------------+--------

In [19]:
show_table(condition=cond_future_range, cols=cols, title="Timestamp in the future but <24h in the future")  # slightly ahead of the reference time

Timestamp in the future but <24h in the future
+---+----------------+--------------------+--------+
|ts |signal_timestamp|signal_oem_timestamp|final_ts|
+---+----------------+--------------------+--------+
+---+----------------+--------------------+--------+



In [20]:
show_table(condition=cond_recent_past, cols=cols, title="Timestamp in the past but < 4 weeks in the past")  # before message_time()

Timestamp in the past but < 4 weeks in the past
+---+----------------+--------------------+--------+
|ts |signal_timestamp|signal_oem_timestamp|final_ts|
+---+----------------+--------------------+--------+
+---+----------------+--------------------+--------+



In [21]:
# MESSAGE_TIME
def message_time(df):
    """
    Validate ``final_ts`` and repair it according to known timestamp anomalies.

    Rules are evaluated in order, and the first match wins:
    - final_ts null                               -> null
    - year 2000/2001/2002, a known bogus date, or
      less than one day in the future             -> ingestion time (``ts``)
    - less than 28 days in the past               -> final_ts unchanged
    - anything else                               -> null

    NOTE(review): a plausible final_ts that is older than 28 days (or more than one
    day ahead, or exactly at the reference instant) becomes null. The windows are
    those of cond_future_range / cond_recent_past; confirm this is intended.
    """
    rules = [
        (cond_final_ts_is_null, None),
        (cond_invalid_years, ingestion),
        (cond_weird_dates, ingestion),
        (cond_future_range, ingestion),
        (cond_recent_past, col("final_ts")),
    ]
    first_cond, first_value = rules[0]
    repaired = when(first_cond, first_value)
    for cond, value in rules[1:]:
        repaired = repaired.when(cond, value)
    return df.withColumn("final_ts", repaired.otherwise(None))

In [22]:
"""
Apply the timestamp validation / repair rules of `message_time`.
"""
df = df.transform(message_time)

In [23]:
show_table(condition=col("ts") == col("final_ts"), cols=cols, title="Message Time=Ingtestion Time'ları gözlemleyelim!")  # rows where final_ts was replaced by ingestion time

Message Time=Ingtestion Time'ları gözlemleyelim!
+-----------------------+-------------------+--------------------+-----------------------+
|ts                     |signal_timestamp   |signal_oem_timestamp|final_ts               |
+-----------------------+-------------------+--------------------+-----------------------+
|2025-02-28 14:51:35.544|2025-02-28 17:51:00|2021-03-23 10:50:21 |2025-02-28 14:51:35.544|
|2025-02-01 12:08:57.477|2025-02-01 15:08:00|2000-01-01 14:18:11 |2025-02-01 12:08:57.477|
|2025-02-17 10:41:14.86 |2025-02-17 13:41:00|2000-05-10 16:42:21 |2025-02-17 10:41:14.86 |
|2025-02-17 13:53:19.69 |2025-02-17 16:53:00|2021-03-23 13:02:54 |2025-02-17 13:53:19.69 |
|2025-02-11 12:50:41.529|2025-02-11 15:50:00|2000-01-01 02:04:24 |2025-02-11 12:50:41.529|
|2025-02-05 05:23:33.944|2025-02-05 08:23:00|2021-03-23 10:53:47 |2025-02-05 05:23:33.944|
|2025-02-27 10:28:32.555|2025-02-27 13:28:00|2000-01-02 09:33:16 |2025-02-27 10:28:32.555|
|2025-02-14 08:18:29.394|2025-02-14 11:18

In [24]:
show_table(condition=cond_recent_past, cols=cols, title="Timestamp in the past but < 4 weeks in the past")  # after message_time()

Timestamp in the past but < 4 weeks in the past
+---+----------------+--------------------+--------+
|ts |signal_timestamp|signal_oem_timestamp|final_ts|
+---+----------------+--------------------+--------+
+---+----------------+--------------------+--------+



In [25]:
show_table(condition=~cond_final_ts_is_null, cols=cols, title="final_ts null olmayanları listeleyelim!")  # rows that kept a final_ts

final_ts null olmayanları listeleyelim!
+-----------------------+-------------------+--------------------+-----------------------+
|ts                     |signal_timestamp   |signal_oem_timestamp|final_ts               |
+-----------------------+-------------------+--------------------+-----------------------+
|2025-02-28 14:51:35.544|2025-02-28 17:51:00|2021-03-23 10:50:21 |2025-02-28 14:51:35.544|
|2025-02-01 12:08:57.477|2025-02-01 15:08:00|2000-01-01 14:18:11 |2025-02-01 12:08:57.477|
|2025-02-17 10:41:14.86 |2025-02-17 13:41:00|2000-05-10 16:42:21 |2025-02-17 10:41:14.86 |
|2025-02-17 13:53:19.69 |2025-02-17 16:53:00|2021-03-23 13:02:54 |2025-02-17 13:53:19.69 |
|2025-02-11 12:50:41.529|2025-02-11 15:50:00|2000-01-01 02:04:24 |2025-02-11 12:50:41.529|
|2025-02-05 05:23:33.944|2025-02-05 08:23:00|2021-03-23 10:53:47 |2025-02-05 05:23:33.944|
|2025-02-27 10:28:32.555|2025-02-27 13:28:00|2000-01-02 09:33:16 |2025-02-27 10:28:32.555|
|2025-02-14 08:18:29.394|2025-02-14 11:18:00|2000-