In [1]:
import polars as pl
import datetime as dt
import seaborn as sns

In [2]:
pth_data = f"C:/Users/z187070/Documents/Projects/PAXCOUNTER/data/"

In [3]:
region_name = "Ost"
mngmt_name = "Mecklenburg-Vorpommern"
mngmt_filter = "Mecklenburg-Vorpommern"

In [4]:
#pth_master = pth_data + f"master/master/{region_name}/master_data/master_data.csv"
pth_ris = pth_data + f"ris/ris/{region_name}/{mngmt_name}/ris_data/ris_data.csv"
pth_ris_extra = pth_data + f"{mngmt_name}/ris_data/ris_data.csv"
pth_pax = pth_data + f"pax_data_{region_name}_{mngmt_name}.parquet"

## read data from ris

In [5]:
ris = pl.read_csv(pth_ris, has_header=True, separator=";")
ris_extra = pl.read_csv(pth_ris_extra, has_header = True, separator=";")
ris = pl.concat([ris, ris_extra])

In [6]:
ris.select(pl.col("case").unique())

case
str
"""endbahnhof"""
"""ankunft"""
"""abfahrt"""
"""startbahnhof"""


In [7]:
ris.head(2)

fahrtid,ereignis_station_id,zeit_echt,zeit_echt_verspaetung,gleis_echt,tpname_bahnsteig_echt,gattung,case
str,i64,str,i64,str,str,str,str
"""20241130-6c7c3…",2468,"""2024-12-01 00:…",4,"""02468-01-B02-G…","""Bahnsteig 02""","""RE""","""ankunft"""
"""20241130-6c7c3…",2468,"""2024-12-01 00:…",5,"""02468-01-B02-G…","""Bahnsteig 02""","""RE""","""abfahrt"""


In [8]:
ris.head(2).select("zeit_echt").to_series().to_list()

['2024-12-01 00:03:14', '2024-12-01 00:04:21']

In [9]:
data_filter_date = dt.datetime(2025, 4, 1)

In [10]:
base_data = ris.select(
    pl.col("fahrtid"), 
    pl.col("ereignis_station_id"),
    pl.col("zeit_echt").str.to_datetime(), 
    pl.col("case")
).filter(
    pl.col("zeit_echt") >= data_filter_date
)

In [11]:
base_data.select(pl.count())

count
u32
270597


In [12]:
base_data.head()

fahrtid,ereignis_station_id,zeit_echt,case
str,i64,datetime[μs],str
"""20250331-f653a…",3830,2025-04-01 00:01:26,"""ankunft"""
"""20250331-086e5…",3559,2025-04-01 00:03:15,"""ankunft"""
"""20250331-086e5…",3559,2025-04-01 00:03:15,"""abfahrt"""
"""20250331-ff899…",2468,2025-04-01 00:03:38,"""ankunft"""
"""20250401-ebd72…",6050,2025-04-01 00:04:12,"""ankunft"""


In [13]:
base_data.group_by(["fahrtid", "ereignis_station_id"]).agg(pl.count()).filter(pl.col("count") > 2).head()

fahrtid,ereignis_station_id,count
str,i64,u32


checking data quality - some arrival/departure combinations are doubles (4 rows instead of 2) - there is no 3 and no more than 4, the time seems to be the same in all cases, i will just take one of these values

this is not the case for data later than 01.05.2025??

In [14]:
base_data.group_by(["fahrtid", "ereignis_station_id"]).agg(pl.count()).filter(pl.col("count") > 4).head(2).select("fahrtid").to_series().to_list()

[]

In [15]:
base_data.filter(pl.col("fahrtid") == '20250325-353a3053-1386-374c-83d3-c200e93212cb')

fahrtid,ereignis_station_id,zeit_echt,case
str,i64,datetime[μs],str


In [16]:
time_info = base_data.filter(pl.col("case").is_in(["abfahrt", "ankunft"])).\
pivot(columns = "case", index = ["fahrtid", "ereignis_station_id"], values = "zeit_echt", aggregate_function="min").\
with_columns(
    (pl.col("abfahrt")-pl.col("ankunft")).alias("time_diff")
)

In [17]:
time_info.head()

fahrtid,ereignis_station_id,ankunft,abfahrt,time_diff
str,i64,datetime[μs],datetime[μs],duration[μs]
"""20250331-f653a…",3830,2025-04-01 00:01:26,,
"""20250331-086e5…",3559,2025-04-01 00:03:15,2025-04-01 00:03:15,0µs
"""20250331-ff899…",2468,2025-04-01 00:03:38,2025-04-01 00:04:35,57s
"""20250401-ebd72…",6050,2025-04-01 00:04:12,2025-04-01 00:05:10,58s
"""20250331-9b2fd…",5127,2025-04-01 00:04:31,2025-04-01 00:04:44,13s


## creating the table

In [18]:
minutes_before_ankunft = 10
minutes_after_ankunft = 5

In [19]:
df_timematch = time_info.\
    filter(pl.col("ankunft").is_not_null()).\
    select(
        pl.col("fahrtid"),
        pl.col("ereignis_station_id"),
        pl.col("ankunft"),
        (pl.col("ankunft") - dt.timedelta(minutes=minutes_before_ankunft)).alias("begin_tw"),
        (pl.col("ankunft") + dt.timedelta(minutes=minutes_after_ankunft)).alias("end_tw")
).with_columns(
    pl.col("ankunft").dt.date().alias("date")
)
df_timematch.head()

fahrtid,ereignis_station_id,ankunft,begin_tw,end_tw,date
str,i64,datetime[μs],datetime[μs],datetime[μs],date
"""20250331-f653a…",3830,2025-04-01 00:01:26,2025-03-31 23:51:26,2025-04-01 00:06:26,2025-04-01
"""20250331-086e5…",3559,2025-04-01 00:03:15,2025-03-31 23:53:15,2025-04-01 00:08:15,2025-04-01
"""20250331-ff899…",2468,2025-04-01 00:03:38,2025-03-31 23:53:38,2025-04-01 00:08:38,2025-04-01
"""20250401-ebd72…",6050,2025-04-01 00:04:12,2025-03-31 23:54:12,2025-04-01 00:09:12,2025-04-01
"""20250331-9b2fd…",5127,2025-04-01 00:04:31,2025-03-31 23:54:31,2025-04-01 00:09:31,2025-04-01


In [20]:
df_pax = pl.read_parquet(pth_pax).\
    select(
        pl.col("pax_counter_id"),
        pl.col("time_iot").str.to_datetime(), 
        pl.col("station_id"),
        pl.col("data_pax")).\
    filter(
        pl.col("time_iot") >= data_filter_date,
        
    ).with_columns(
        pl.col("time_iot").dt.date().alias("date")
    )

df_pax.head()

pax_counter_id,time_iot,station_id,data_pax,date
str,datetime[μs],i64,i64,date
"""083af23fd0df""",2025-04-01 00:00:20,719,0,2025-04-01
"""083af23fd0df""",2025-04-01 00:01:20,719,0,2025-04-01
"""083af23fd0df""",2025-04-01 00:02:20,719,0,2025-04-01
"""083af23fd0df""",2025-04-01 00:03:20,719,0,2025-04-01
"""083af23fd0df""",2025-04-01 00:04:20,719,0,2025-04-01


In [21]:
dates_to_process = df_timematch.select(pl.col("date").unique()).to_series().to_list()
len(dates_to_process)

84

In [22]:
for date in dates_to_process:
    df_joined_date = df_timematch.filter(
            pl.col("date") == date
        ).join(
            df_pax.filter(pl.col("date") == date),
            left_on = "ereignis_station_id",
            right_on = "station_id",
            how = "inner"
        ).filter(
            (pl.col("time_iot") >= pl.col("begin_tw")),
            (pl.col("time_iot") <= pl.col("end_tw"))
        ).with_columns(
            pl.when(pl.col("time_iot") <= pl.col("ankunft")).\
                then("before").\
                otherwise(pl.when(pl.col("time_iot") > pl.col("ankunft")).\
                    then("after").\
                    otherwise("undefined")).alias("pax_status")
        )
    
    df_before = df_joined_date.filter(pl.col("pax_status") == "before").\
        group_by(["fahrtid", "ereignis_station_id", "pax_counter_id"]).\
        agg(pl.col("data_pax").max().alias("data_pax_before"))

    df_after = df_joined_date.filter(pl.col("pax_status") == "after").\
        group_by(["fahrtid", "ereignis_station_id", "pax_counter_id"]).\
        agg(pl.col("data_pax").min().alias("data_pax_after"))
    
    # if len(df_after) != len(df_before):
    #     raise ValueError("number of entries in after/before not the same")#
    
    df_pax_agg = df_before.\
    join(df_after,
    on = ["fahrtid", "ereignis_station_id", "pax_counter_id"],
    how = "outer")

    # if len(df_pax_agg) != len(df_before):
    #     raise ValueError("number of entries changed by joining")
    
    id_table = base_data.with_columns(
            pl.col("zeit_echt").dt.date().alias("date")
        ).filter(
            pl.col("date") == date
        ).\
        group_by(["fahrtid", "ereignis_station_id"]).agg(pl.count()).select(["fahrtid", "ereignis_station_id"])

    df_pax_result_date = df_pax_agg.join(id_table, on = ["fahrtid", "ereignis_station_id"], how = "left")

    df_pax_result_date.write_parquet(f"../../data/congestion_data/congestion_data_{region_name}_{mngmt_name}_{date}.parquet")


    

  then("before").\
  then("after").\
  otherwise("undefined")).alias("pax_status")


### read all the data in again

In [23]:
dat = pl.read_parquet(f"../../data/congestion_data/*.parquet")
dat.head()

fahrtid,ereignis_station_id,pax_counter_id,data_pax_before,data_pax_after
str,i64,str,i64,i64
"""20250401-288a8…",719,"""083af23fd0df""",1,0
"""20250401-a76ba…",719,"""083af23fd0df""",1,0
"""20250401-74933…",719,"""083af23fd0df""",3,1
"""20250401-6a4eb…",719,"""083af23fd0df""",0,0
"""20250401-12872…",2468,"""083af23ff6e7""",26,2


In [24]:
len(dat)

193641

In [25]:
len(base_data)

270597

## data quality checks - what is happening in singular steps

In [26]:
filter_date = date

In [27]:
first_try = df_timematch.filter(
    pl.col("date") == filter_date
).join(
    df_pax.filter(pl.col("date") == filter_date),
    left_on = "ereignis_station_id",
    right_on = "station_id",
    how = "inner"
).filter(
    (pl.col("time_iot") >= pl.col("begin_tw")),
    (pl.col("time_iot") <= pl.col("end_tw"))
).with_columns(
    pl.when(pl.col("time_iot") <= pl.col("ankunft")).\
        then("before").\
        otherwise(pl.when(pl.col("time_iot") > pl.col("ankunft")).\
            then("after").\
            otherwise("undefined")).alias("pax_status")
)

first_try.head()

  then("before").\
  then("after").\
  otherwise("undefined")).alias("pax_status")


fahrtid,ereignis_station_id,ankunft,begin_tw,end_tw,date,pax_counter_id,time_iot,data_pax,date_right,pax_status
str,i64,datetime[μs],datetime[μs],datetime[μs],date,str,datetime[μs],i64,date,str
"""20250622-c8d8d…",2468,2025-06-23 00:16:21,2025-06-23 00:06:21,2025-06-23 00:21:21,2025-06-23,"""083af23ff6e7""",2025-06-23 00:06:31,0,2025-06-23,"""before"""
"""20250622-c8d8d…",2468,2025-06-23 00:16:21,2025-06-23 00:06:21,2025-06-23 00:21:21,2025-06-23,"""083af23ff6e7""",2025-06-23 00:07:31,0,2025-06-23,"""before"""
"""20250622-c8d8d…",2468,2025-06-23 00:16:21,2025-06-23 00:06:21,2025-06-23 00:21:21,2025-06-23,"""083af23ff6e7""",2025-06-23 00:08:31,0,2025-06-23,"""before"""
"""20250622-c8d8d…",2468,2025-06-23 00:16:21,2025-06-23 00:06:21,2025-06-23 00:21:21,2025-06-23,"""083af23ff6e7""",2025-06-23 00:09:31,0,2025-06-23,"""before"""
"""20250622-c8d8d…",2468,2025-06-23 00:16:21,2025-06-23 00:06:21,2025-06-23 00:21:21,2025-06-23,"""083af23ff6e7""",2025-06-23 00:10:31,0,2025-06-23,"""before"""


In [28]:
first_try.select(pl.col("pax_status").unique())

pax_status
str
"""after"""
"""before"""


In [29]:
df_before = first_try.filter(pl.col("pax_status") == "before").\
    group_by(["fahrtid", "ereignis_station_id", "pax_counter_id"]).\
    agg(pl.col("data_pax").max().alias("data_pax_before"))

df_after = first_try.filter(pl.col("pax_status") == "after").\
    group_by(["fahrtid", "ereignis_station_id", "pax_counter_id"]).\
    agg(pl.col("data_pax").min().alias("data_pax_after"))

In [30]:
df_before.select(pl.count())

count
u32
2257


In [31]:
df_after.select(pl.count())

count
u32
2229


In [32]:
df_pax_agg = df_before.\
    join(df_after,
    on = ["fahrtid", "ereignis_station_id", "pax_counter_id"],
    how = "outer")

In [33]:
df_pax_agg.select(pl.count())

count
u32
2257


In [34]:
df_pax_agg.filter(pl.col("data_pax_after").is_null())

fahrtid,ereignis_station_id,pax_counter_id,data_pax_before,data_pax_after
str,i64,str,i64,i64
"""20250623-83966…",5758,"""244cab046a7b""",0,
"""20250623-83966…",5756,"""244cab06259f""",0,
"""20250623-83966…",2877,"""3494545a2107""",0,
"""20250623-e5aea…",4457,"""244cab01f457""",0,
"""20250623-99a29…",1263,"""244cab06fb47""",0,
"""20250623-bcf35…",5034,"""244cab068dcb""",0,
"""20250623-b6522…",5127,"""a0a3b38c53a3""",0,
"""20250623-b30a0…",6173,"""e831cdc2595b""",0,
"""20250623-362f8…",2468,"""083af23ff6e7""",1,
"""20250623-7fb72…",2468,"""083af23ff6e7""",0,


there are multiple sensors on some stations so more rows than in the cleaned timetable are expected


maximum should be 3 though

In [35]:
df_pax_agg.group_by(["fahrtid", "ereignis_station_id"]).agg(pl.count()).sort("count", descending=True).head()

fahrtid,ereignis_station_id,count
str,i64,u32
"""20250623-0296e…",2877,3
"""20250623-55593…",2877,3
"""20250623-caa20…",2877,3
"""20250623-d641c…",2877,3
"""20250623-e7d31…",2877,3


In [36]:
id_table = base_data.with_columns(
        pl.col("zeit_echt").dt.date().alias("date")
    ).filter(
        pl.col("date") == filter_date
    ).\
    group_by(["fahrtid", "ereignis_station_id"]).agg(pl.count()).select(["fahrtid", "ereignis_station_id"])

In [37]:
id_table.head()

fahrtid,ereignis_station_id
str,i64
"""20250623-dac59…",6103
"""20250623-15690…",4813
"""20250623-15690…",490
"""20250623-d67b7…",5756
"""20250623-e4dd9…",490


In [38]:
df_pax_result = df_pax_agg.join(id_table, on = ["fahrtid", "ereignis_station_id"], how = "left")

In [39]:
df_pax_result.head()

fahrtid,ereignis_station_id,pax_counter_id,data_pax_before,data_pax_after
str,i64,str,i64,i64
"""20250623-55593…",2468,"""083af23ff6e7""",4,1
"""20250623-bb30d…",2468,"""083af23ff6e7""",11,6
"""20250623-2e13c…",2468,"""083af23ff6e7""",35,0
"""20250623-f7b5b…",2468,"""083af23ff6e7""",17,7
"""20250623-ea38e…",2468,"""083af23ff6e7""",32,9


### something isn't quite right with the data it seems - if i merge the other way around, null entries exist but those sensors should work

In [40]:
result = id_table.join(df_pax_agg, 
    on = ["fahrtid", "ereignis_station_id"],
    how = "outer")

result

fahrtid,ereignis_station_id,pax_counter_id,data_pax_before,data_pax_after
str,i64,str,i64,i64
"""20250623-55593…",2468,"""083af23ff6e7""",4,1
"""20250623-bb30d…",2468,"""083af23ff6e7""",11,6
"""20250623-2e13c…",2468,"""083af23ff6e7""",35,0
"""20250623-f7b5b…",2468,"""083af23ff6e7""",17,7
"""20250623-ea38e…",2468,"""083af23ff6e7""",32,9
"""20250623-75665…",5758,"""244cab006177""",3,0
"""20250623-7b9e1…",5758,"""244cab006177""",9,0
"""20250623-0fdcf…",5758,"""244cab006177""",2,0
"""20250623-09193…",5758,"""244cab006177""",1,0
"""20250623-403b3…",5758,"""244cab006177""",0,0


In [41]:
result_nulls = result.filter(pl.col("pax_counter_id").is_null())
result_nulls.head()

fahrtid,ereignis_station_id,pax_counter_id,data_pax_before,data_pax_after
str,i64,str,i64,i64
"""20250623-3868e…",2730,,,
"""20250623-9bd12…",4866,,,
"""20250623-5da9b…",3559,,,
"""20250623-c674d…",1263,,,
"""20250623-a7e70…",3716,,,


In [42]:
ris.filter(pl.col("ereignis_station_id") == 7983,
           pl.col("fahrtid") == '20250515-51adc984-0a84-3e60-a391-665c5d3c6136')

fahrtid,ereignis_station_id,zeit_echt,zeit_echt_verspaetung,gleis_echt,tpname_bahnsteig_echt,gattung,case
str,i64,str,i64,str,str,str,str
"""20250515-51adc…",7983,"""2025-05-15 09:…",1,"""07983-01-B01-G…","""Bahnsteig 01""","""RB""","""abfahrt"""


In [43]:
pl.read_parquet(pth_pax).filter(pl.col("station_id") == 7983).\
filter(pl.col("time_iot") > "2025-05-15 09:06:20").\
filter(pl.col("time_iot") < "2025-05-15 09:20:20")

pax_counter_id,time_iot,data_pax,station_id,station_name,tpname,station_longitude,station_latitude
str,str,i64,i64,str,str,f64,f64
"""3494545a2a77""","""2025-05-15 09:…",0,7983,"""Tessin West""","""Bahnsteig 01""",12.44257,54.034436
"""3494545a2a77""","""2025-05-15 09:…",0,7983,"""Tessin West""","""Bahnsteig 01""",12.44257,54.034436
"""3494545a2a77""","""2025-05-15 09:…",0,7983,"""Tessin West""","""Bahnsteig 01""",12.44257,54.034436
"""3494545a2a77""","""2025-05-15 09:…",0,7983,"""Tessin West""","""Bahnsteig 01""",12.44257,54.034436
"""3494545a2a77""","""2025-05-15 09:…",0,7983,"""Tessin West""","""Bahnsteig 01""",12.44257,54.034436
"""3494545a2a77""","""2025-05-15 09:…",0,7983,"""Tessin West""","""Bahnsteig 01""",12.44257,54.034436
"""3494545a2a77""","""2025-05-15 09:…",0,7983,"""Tessin West""","""Bahnsteig 01""",12.44257,54.034436
"""3494545a2a77""","""2025-05-15 09:…",0,7983,"""Tessin West""","""Bahnsteig 01""",12.44257,54.034436
"""3494545a2a77""","""2025-05-15 09:…",0,7983,"""Tessin West""","""Bahnsteig 01""",12.44257,54.034436
"""3494545a2a77""","""2025-05-15 09:…",0,7983,"""Tessin West""","""Bahnsteig 01""",12.44257,54.034436


In [44]:
pl.Config(fmt_str_lengths=100)

<polars.config.Config at 0x22e8659e410>

### rolling mean

In [45]:
df_pax.select(pl.col("pax_counter_id").unique()).to_series().to_list()

['244cab0712df',
 'd48afc8efd5b',
 'c45bbe932723',
 '244cab065dbb',
 'a0a3b38c5463',
 '244cab0468b7',
 'e831cdc25847',
 '244cab01fe3f',
 '244cab067843',
 'e831cdc2581f',
 '244cab03803f',
 '244cab068dcb',
 '244cab02fcff',
 '244cab03ce33',
 'a0a3b3311b9f',
 'e831cdc26497',
 '244cab03c4df',
 '244cab0237a7',
 '3494545a2a1f',
 '1097bdd7722b',
 'a0a3b32f794b',
 '244cab020efb',
 '34ab9540699b',
 'e831cdc25877',
 'a0a3b38c538b',
 '244cab039cf3',
 '3494545a213b',
 'a0a3b38c5473',
 '3494545a2107',
 'a0a3b38c58c7',
 'a0a3b32f9e87',
 '244cab034443',
 '3494545a2a77',
 '244cab06fb47',
 '244cab02425b',
 '244cab039c97',
 '244cab02ea4f',
 '244cab03544b',
 'd48afc8ec603',
 '244cab06abe7',
 '244cab03b9ff',
 '34ab954069ab',
 '244cab06b913',
 '244cab06f80b',
 'd48afc8ec50f',
 'a0a3b32fac63',
 'e831cdc2597b',
 'a0a3b32f82e7',
 '244cab01f2b7',
 '083af23fd0df',
 '244cab05572b',
 '3494545a2c67',
 '244cab00fa6b',
 'e831cdc25833',
 'e831cdc2595b',
 '244cab06275b',
 '244cab05d187',
 '244cab0161bb',
 '244cab046a7b

In [46]:
df_pax.group_by(["pax_counter_id"]).\
    agg(pl.col("data_pax").rolling_mean(window_size=10)).\
    explode(pl.col("data_pax"))

pax_counter_id,data_pax
str,f64
"""244cab03c4df""",
"""244cab03c4df""",
"""244cab03c4df""",
"""244cab03c4df""",
"""244cab03c4df""",
"""244cab03c4df""",
"""244cab03c4df""",
"""244cab03c4df""",
"""244cab03c4df""",
"""244cab03c4df""",0.0
