In [0]:
from pyspark.sql import functions as F
from datetime import datetime

# Watermark: the largest `latest_data_date` recorded in the silver ingestion
# log. It is None when the log has no rows yet (e.g. the very first run).
silver_log_df = spark.table("vr_hopea.ingestion_log")
last_write_time = silver_log_df.agg({'latest_data_date': 'max'}).first()[0]

# Exact set of bronze columns this notebook knows how to transform.
expected_columns = [
    'departureDate',
    'trainNumber',
    'trainCategory',
    'trainType',
    'commuterLineID',
    'stationShortCode',
    'trainStopping',
    'commercialStop',
    'type',
    'scheduledTime',
    'actualTime',
    'differenceInMinutes',
    'operatorUICCode',
    'operatorShortCode',
    'cancelled'
]

bronze_raw_df = spark.table("vr_pronssi.vr_raw")
if last_write_time is None:
    # No watermark yet: comparing against NULL (`col > NULL`) evaluates to NULL
    # and would drop every row, so take the whole bronze table instead.
    bronze_df_new_rows = bronze_raw_df
else:
    # NOTE(review): strict `>` means rows that reach bronze later for the
    # watermark date itself are never picked up — confirm this is intended.
    bronze_df_new_rows = bronze_raw_df.where(F.col("departureDate") > last_write_time)

if bronze_df_new_rows.isEmpty():
    # Message: "there were no new rows at the bronze level" — fails the run.
    raise Exception("PRONSSITASOLLA EI OLLUT UUSIA RIVEJÄ")

actual_columns = bronze_df_new_rows.columns

if set(expected_columns) != set(actual_columns):
    # Message: "the columns did not match".
    # NOTE(review): unlike the empty-input case above, this ends the notebook
    # via notebook.exit instead of raising; confirm callers treat this
    # returned message as a failure.
    dbutils.notebook.exit("SARAKKEET EIVÄT TÄSMÄNNEET")

In [0]:
# Show the watermark read from vr_hopea.ingestion_log (max latest_data_date).
last_write_time

datetime.date(2023, 3, 1)

In [0]:
# Preview the first few bronze rows selected for this batch.
display(bronze_df_new_rows.limit(5))

departureDate,trainNumber,trainCategory,trainType,commuterLineID,stationShortCode,type,scheduledTime,actualTime,differenceInMinutes,operatorUICCode,operatorShortCode,cancelled
2026-01-26,1,Long-distance,IC,,HKI,DEPARTURE,2026-01-26T04:54:00.000Z,2026-01-26T04:54:26.000Z,0.0,10,vr,False
2026-01-26,1,Long-distance,IC,,PSL,ARRIVAL,2026-01-26T04:59:00.000Z,2026-01-26T04:58:51.000Z,0.0,10,vr,False
2026-01-26,1,Long-distance,IC,,PSL,DEPARTURE,2026-01-26T05:00:00.000Z,2026-01-26T05:00:47.000Z,1.0,10,vr,False
2026-01-26,1,Long-distance,IC,,LOP,ARRIVAL,2026-01-26T05:00:53.000Z,,1.0,10,vr,False
2026-01-26,1,Long-distance,IC,,LOP,DEPARTURE,2026-01-26T05:00:53.000Z,,1.0,10,vr,False


In [0]:
# Print the schema of the bronze batch; the transform casts depend on these types.
bronze_df_new_rows.printSchema()

root
 |-- departureDate: string (nullable = true)
 |-- trainNumber: long (nullable = true)
 |-- trainCategory: string (nullable = true)
 |-- trainType: string (nullable = true)
 |-- commuterLineID: string (nullable = true)
 |-- stationShortCode: string (nullable = true)
 |-- type: string (nullable = true)
 |-- scheduledTime: string (nullable = true)
 |-- actualTime: string (nullable = true)
 |-- differenceInMinutes: double (nullable = true)
 |-- operatorUICCode: long (nullable = true)
 |-- operatorShortCode: string (nullable = true)
 |-- cancelled: boolean (nullable = true)



In [0]:
# Rename the bronze camelCase columns to snake_case and convert them to their
# silver types. Of the expected bronze columns only `cancelled` is absent from
# the drop list, so it passes through `'*'` unchanged.
#
# Timestamp pattern: the trailing `X` parses the 'Z' suffix of values such as
# 2026-01-26T04:54:00.000Z as a zero UTC offset. Quoting it as a literal
# ('Z') would discard the zone and interpret the time in the Spark session
# time zone, shifting every timestamp when the session is not UTC.
BRONZE_TIMESTAMP_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSSX"

col_changes = [
    F.to_date(F.col("departureDate"), "yyyy-MM-dd").alias('departure_date'),
    F.col('trainNumber').cast('int').alias('train_number'),
    F.col('trainCategory').alias('train_category'),
    F.col('trainType').alias('train_type'),
    F.col('commuterLineID').alias('commuter_line_id'),
    F.col('stationShortCode').alias('station_short_code'),
    F.col('trainStopping').alias('train_stopping'),
    F.col('commercialStop').alias('commercial_stop'),
    F.col('type').alias('event_type'),
    F.to_timestamp('scheduledTime', BRONZE_TIMESTAMP_FORMAT).alias('scheduled_time'),
    F.to_timestamp('actualTime', BRONZE_TIMESTAMP_FORMAT).alias('actual_time'),
    # Casting double -> int drops any fractional minutes (truncation, not rounding).
    F.col('differenceInMinutes').cast('int').alias('difference_in_minutes'),
    F.col('operatorUICCode').cast('int').alias('operator_uic_code'),
    F.col('operatorShortCode').alias('operator_short_code')
]

# Source columns replaced by the expressions above; dropped after the select.
column_names_changed = [
    'departureDate', 'trainNumber', 'trainCategory', 'trainType', 'commuterLineID',
    'stationShortCode', 'trainStopping', 'commercialStop', 'type', 'scheduledTime',
    'actualTime', 'differenceInMinutes', 'operatorUICCode', 'operatorShortCode',
]

silver_df_new_rows = (
    bronze_df_new_rows
    .select('*', *col_changes)
    .drop(*column_names_changed)
)

In [0]:
# Preview the renamed and retyped silver columns.
display(silver_df_new_rows.limit(5))

cancelled,departure_date,train_number,train_category,train_type,commuter_line_id,station_short_code,event_type,scheduled_time,actual_time,difference_in_minutes,operator_uic_code,operator_short_code
False,2026-01-26,1,Long-distance,IC,,HKI,DEPARTURE,2026-01-26T04:54:00.000Z,2026-01-26T04:54:26.000Z,0,10,vr
False,2026-01-26,1,Long-distance,IC,,PSL,ARRIVAL,2026-01-26T04:59:00.000Z,2026-01-26T04:58:51.000Z,0,10,vr
False,2026-01-26,1,Long-distance,IC,,PSL,DEPARTURE,2026-01-26T05:00:00.000Z,2026-01-26T05:00:47.000Z,1,10,vr
False,2026-01-26,1,Long-distance,IC,,LOP,ARRIVAL,2026-01-26T05:00:53.000Z,,1,10,vr
False,2026-01-26,1,Long-distance,IC,,LOP,DEPARTURE,2026-01-26T05:00:53.000Z,,1,10,vr


In [0]:

# Fallback delay for rows that have neither a reported delay nor both
# timestamps: the mean (actual - scheduled) difference in minutes over this
# batch, rounded with Python's round() to a whole minute; 0 when no row has
# both timestamps.
avg_diff_time = (
    silver_df_new_rows
    .select(
        (F.unix_timestamp('actual_time') - F.unix_timestamp('scheduled_time'))
        .alias('diff')
    )
    .where(F.col('diff').isNotNull())
    .agg(F.avg(F.col('diff') / 60))
    .first()[0]
)
avg_diff_time = int(round(avg_diff_time)) if avg_diff_time is not None else 0

silver_df_new_rows = (
    silver_df_new_rows
    # Delay resolution order: reported value, then computed from the two
    # timestamps, then the batch average above.
    .withColumn(
        'difference_in_minutes',
        F.when(
            F.col('difference_in_minutes').isNotNull(),
            F.col('difference_in_minutes')
        ).when(
            F.col('actual_time').isNotNull() & F.col('scheduled_time').isNotNull(),
            F.round((F.unix_timestamp('actual_time') - F.unix_timestamp('scheduled_time')) / 60).cast('int')
        ).otherwise(
            F.lit(avg_diff_time).cast('int')
        )
    )
    # Hash-based natural key of a timetable event. concat_ws skips NULL inputs,
    # so a missing component collapses into the remaining ones.
    .withColumn(
        'event_nk',
        F.sha2(
            F.concat_ws(
                '||',
                F.col('train_number'),
                F.col('station_short_code'),
                F.col('event_type'),
                F.col('scheduled_time')
            ),
            256
        )
    )
    # Hour of the event, falling back to the scheduled time when no actual time exists.
    .withColumn(
        'actual_hour',
        F.hour(F.coalesce('actual_time', 'scheduled_time'))
    )
    # Missing commercial-stop flags default to False. (This previously copied
    # `train_stopping` into `commercial_stop`, overwriting the real flag.)
    .withColumn(
        'commercial_stop',
        F.coalesce(F.col('commercial_stop'), F.lit(False))
    )
)

In [0]:
# Preview the batch after delay imputation and the derived event_nk / actual_hour columns.
display(silver_df_new_rows.limit(5))

cancelled,departure_date,train_number,train_category,train_type,commuter_line_id,station_short_code,event_type,scheduled_time,actual_time,difference_in_minutes,operator_uic_code,operator_short_code,difference_in_seconds,event_nk,actual_hour
False,2026-01-26,1,Long-distance,IC,,HKI,DEPARTURE,2026-01-26T04:54:00.000Z,2026-01-26T04:54:26.000Z,0,10,vr,-26.0,97b6cc28f73b4b484d4164257b00d38cc4cefc7aa9f75cd4b2f8c6b74e4e24ef,4
False,2026-01-26,1,Long-distance,IC,,PSL,ARRIVAL,2026-01-26T04:59:00.000Z,2026-01-26T04:58:51.000Z,0,10,vr,9.0,32db92735c1eb96168a1af31ffb0fbcd0b3f490f63c189d49ebe88f5c3ade07e,4
False,2026-01-26,1,Long-distance,IC,,PSL,DEPARTURE,2026-01-26T05:00:00.000Z,2026-01-26T05:00:47.000Z,1,10,vr,-47.0,f01b2f0a61aed7e51b06d5ccb4cc9a644bd1dfbdd5b5eef283a9452ad29d4307,5
False,2026-01-26,1,Long-distance,IC,,LOP,ARRIVAL,2026-01-26T05:00:53.000Z,,1,10,vr,-60.0,b44ab8ee224bc79bbaacbd281f1922444a67be14b73c4819cfd9de9877b6c798,5
False,2026-01-26,1,Long-distance,IC,,LOP,DEPARTURE,2026-01-26T05:00:53.000Z,,1,10,vr,-60.0,62b6835fb39fc0a4f9f376deb8d8f3ce5edcc642361557ff8959f9e910f0a35b,5


In [0]:
# Append this batch to the silver table.
# NOTE(review): if the ingestion-log insert that follows fails, the watermark is
# not advanced and a re-run appends the same rows again.
silver_df_new_rows.write.saveAsTable("vr_hopea.vr_processed", mode="append")

In [0]:
# Record this batch in the ingestion log: load time, latest departure date in
# the batch, and number of rows written.
# NOTE(review): presumably the second VALUES position is `latest_data_date`,
# the column read back as the watermark — confirm the table's column order.
now = datetime.now()

# One aggregation pass yields both statistics; calling agg() and count()
# separately would recompute the whole (uncached) lineage twice.
batch_stats = silver_df_new_rows.agg(
    F.max('departure_date').alias('max_date'),
    F.count(F.lit(1)).alias('row_count'),
).first()
max_date = batch_stats['max_date']
row_count = batch_stats['row_count']

spark.sql(f"INSERT INTO vr_hopea.ingestion_log VALUES ('{now}', '{max_date}', '{row_count}')")

# Message: "silver-level data and log written successfully".
dbutils.notebook.exit("HOPEATASON DATA JA LOKI KIRJOITETTU ONNISTUNEESTI")

