In [11]:
import pandas as pd
import re

from datetime import datetime
import os
import deltalake
from deltalake import DeltaTable
from deltalake.writer import write_deltalake
import duckdb
current_folder_path = os.path.abspath('.')

In [12]:
current_folder_path+ '\\data\\as_of\\'

'/Users/amuhammadzuharimi/personal_projects/data-gov-my-railway\\data\\as_of\\'

In [13]:
def load_railway() -> pd.DataFrame:
    filters = [("frequency", "==", "daily")
            ]
    df = pd.read_parquet('https://storage.data.gov.my/dashboards/prasarana_timeseries.parquet', filters=filters)
    
    df['date'] = pd.to_datetime(df['date'], format="%Y-%B-%d")
    df['year'] = df['date'].dt.year
    df['month'] = df['date'].dt.month
    df['day'] = df['date'].dt.day
    df['day_name'] = df['date'].dt.day_name()

    df['station_code_origin'] = [re.search(r'(\w+):', x).group(1) for x in df['origin']]
    df['station_code_destination'] = [re.search(r'(\w+):', x).group(1) for x in df['destination']]
    df['station_name_origin'] = [re.search(r':\s*([\w\s]+)', x).group(1) + ' (origin)' for x in df['origin']]
    df['station_name_destination'] = [re.search(r':\s*([\w\s]+)', x).group(1) + ' (destination)' for x in df['destination']]

    df['line_origin'] = [re.search(r'(\D+)\d+',x).group(1) for x in df['station_code_origin']]
    df['line_destination'] = [re.search(r'(\D+)\d+',x).group(1) for x in df['station_code_destination']]

    df = df.loc[(df.line_origin != 'A') & (df.line_destination != 'A')].reset_index(drop=True)

    line_mapper_dict = {'A':'All Station', 
                        'AG':'LRT Ampang Line', 
                        'BRT':'Bus Rapid Transit', 
                        'KG':'MRT Kajang Line', 
                        'KJ':'LRT Kajang Line', 
                        'MR':'Monorail', 
                        'PYL':'MRT Putrajaya Line', 
                        'SP':'LRT Sri Petaling Line'}

    df['line_name_origin'] = df['line_origin'].map(line_mapper_dict)
    df['line_name_destination'] = df['line_destination'].map(line_mapper_dict)

    str_cols = [x for x in df.columns if x not in ('date', 'passengers')]
    df[str_cols]= df[str_cols].astype('string')
    print(f"Data contains {df.shape[0]} rows")
    return df

In [14]:
df = load_railway()
# write_deltalake(current_folder_path + '\\data\\ridership\\', df, mode='overwrite')
df.to_parquet('./data/railway/ridership/passengers_daily.parquet')


Data contains 1038597 rows


In [15]:
df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 1038597 entries, 0 to 1038596
Data columns (total 18 columns):
 #   Column                    Non-Null Count    Dtype         
---  ------                    --------------    -----         
 0   service                   1038597 non-null  string        
 1   frequency                 1038597 non-null  string        
 2   origin                    1038597 non-null  string        
 3   destination               1038597 non-null  string        
 4   date                      1038597 non-null  datetime64[ns]
 5   passengers                1038597 non-null  int64         
 6   year                      1038597 non-null  string        
 7   month                     1038597 non-null  string        
 8   day                       1038597 non-null  string        
 9   day_name                  1038597 non-null  string        
 10  station_code_origin       1038597 non-null  string        
 11  station_code_destination  1038597 non-null  string

In [16]:
def store_load_date(df:pd.DataFrame):
    data_as_of = df.date.max().date().isoformat()
    df_data_as_of = pd.DataFrame({'load_date':[data_as_of]})
    print('Data as of:',data_as_of)
    try: 
        pd.read_parquet("./data/as_of/").load_date.max()
        if pd.read_parquet("./data/as_of/").load_date.max() <= data_as_of:
            pass
        else:
            write_deltalake(current_folder_path + '\\data\\as_of\\', df_data_as_of, mode='append')
    except:
        write_deltalake(current_folder_path + '\\data\\as_of\\', df_data_as_of, mode='append')

store_load_date(df)

Data as of: 2024-07-27


In [17]:
df.sample(10)

Unnamed: 0,service,frequency,origin,destination,date,passengers,year,month,day,day_name,station_code_origin,station_code_destination,station_name_origin,station_name_destination,line_origin,line_destination,line_name_origin,line_name_destination
527195,rail,daily,KJ25: Lembah Subang,BRT06: South Quay-USJ 1,2024-06-03,1,2024,6,3,Monday,KJ25,BRT06,Lembah Subang (origin),South Quay (destination),KJ,BRT,LRT Kajang Line,Bus Rapid Transit
208324,rail,daily,KG10: TTDI,KJ14: Pasar Seni,2024-07-17,152,2024,7,17,Wednesday,KG10,KJ14,TTDI (origin),Pasar Seni (destination),KG,KJ,MRT Kajang Line,LRT Kajang Line
542883,rail,daily,KJ27: Cgc Glenmarie,KJ08: Damai,2024-06-16,2,2024,6,16,Sunday,KJ27,KJ08,Cgc Glenmarie (origin),Damai (destination),KJ,KJ,LRT Kajang Line,LRT Kajang Line
163507,rail,daily,KG04: Kwasa Damansara,AG01: Sentul Timur,2024-07-02,6,2024,7,2,Tuesday,KG04,AG01,Kwasa Damansara (origin),Sentul Timur (destination),KG,AG,MRT Kajang Line,LRT Ampang Line
174564,rail,daily,KG05: Kwasa Sentral,KJ22: Taman Paramount,2024-07-01,3,2024,7,1,Monday,KG05,KJ22,Kwasa Sentral (origin),Taman Paramount (destination),KG,KJ,MRT Kajang Line,LRT Kajang Line
641879,rail,daily,MR06: Bukit Bintang,AG03: Titiwangsa,2024-06-03,47,2024,6,3,Monday,MR06,AG03,Bukit Bintang (origin),Titiwangsa (destination),MR,AG,Monorail,LRT Ampang Line
941160,rail,daily,SP15: Bandar Tasik Selatan,MR07: Raja Chulan,2024-07-04,58,2024,7,4,Thursday,SP15,MR07,Bandar Tasik Selatan (origin),Raja Chulan (destination),SP,MR,LRT Sri Petaling Line,Monorail
484909,rail,daily,KJ18: Kerinchi,SP31: Putra Heights,2024-06-11,114,2024,6,11,Tuesday,KJ18,SP31,Kerinchi (origin),Putra Heights (destination),KJ,SP,LRT Kajang Line,LRT Sri Petaling Line
1011973,rail,daily,SP27: Bandar Puteri,AG18: Ampang,2024-07-23,16,2024,7,23,Tuesday,SP27,AG18,Bandar Puteri (origin),Ampang (destination),SP,AG,LRT Sri Petaling Line,LRT Ampang Line
759933,rail,daily,PYL14: Kentomen,KG16: Pasar Seni,2024-06-10,7,2024,6,10,Monday,PYL14,KG16,Kentomen (origin),Pasar Seni (destination),PYL,KG,MRT Putrajaya Line,MRT Kajang Line


## Grouping

In [18]:

df_grouped = df.groupby(['origin',
            'line_origin',
            'line_name_origin',
            'date',
            'year',
            'month',
            'day',
            'day_name',]).agg({'passengers':'sum'}).reset_index()

In [19]:
df_grouped.sample(10)

Unnamed: 0,origin,line_origin,line_name_origin,date,year,month,day,day_name,passengers
2614,KG27: Taman Suntex,KG,MRT Kajang Line,2024-07-20,2024,7,20,Saturday,2866
4114,KJ20: Taman Jaya,KJ,LRT Kajang Line,2024-06-11,2024,6,11,Tuesday,4823
3359,KJ06: Jelatek,KJ,LRT Kajang Line,2024-07-24,2024,7,24,Wednesday,3737
4602,KJ28: Subang Jaya,KJ,LRT Kajang Line,2024-07-13,2024,7,13,Saturday,2860
5623,MR10: Chow Kit,MR,Monorail,2024-07-08,2024,7,8,Monday,4094
5794,PYL04: Sungai Buloh,PYL,MRT Putrajaya Line,2024-07-08,2024,7,8,Monday,2206
6728,PYL21: Persiaran KLCC,PYL,MRT Putrajaya Line,2024-06-03,2024,6,3,Monday,765
3449,KJ08: Damai,KJ,LRT Kajang Line,2024-06-30,2024,6,30,Sunday,1313
6864,PYL24: Chan Sow Lin,PYL,MRT Putrajaya Line,2024-06-25,2024,6,25,Tuesday,1048
2519,KG26: Taman Connaught,KG,MRT Kajang Line,2024-06-12,2024,6,12,Wednesday,5874


In [21]:
df_grouped.to_parquet('./data/railway/ridership/daily/passengers_by_station.parquet', )

# Test

df

In [22]:
fname = [x.date().isoformat() for x in df_grouped['date'].unique()]
fname

['2024-06-01',
 '2024-06-02',
 '2024-06-03',
 '2024-06-04',
 '2024-06-05',
 '2024-06-06',
 '2024-06-07',
 '2024-06-08',
 '2024-06-09',
 '2024-06-10',
 '2024-06-11',
 '2024-06-12',
 '2024-06-13',
 '2024-06-14',
 '2024-06-15',
 '2024-06-16',
 '2024-06-17',
 '2024-06-18',
 '2024-06-19',
 '2024-06-20',
 '2024-06-21',
 '2024-06-22',
 '2024-06-23',
 '2024-06-24',
 '2024-06-25',
 '2024-06-26',
 '2024-06-27',
 '2024-06-28',
 '2024-06-29',
 '2024-06-30',
 '2024-07-01',
 '2024-07-02',
 '2024-07-03',
 '2024-07-04',
 '2024-07-05',
 '2024-07-06',
 '2024-07-07',
 '2024-07-08',
 '2024-07-09',
 '2024-07-10',
 '2024-07-11',
 '2024-07-12',
 '2024-07-13',
 '2024-07-14',
 '2024-07-15',
 '2024-07-16',
 '2024-07-17',
 '2024-07-18',
 '2024-07-19',
 '2024-07-20',
 '2024-07-21',
 '2024-07-22',
 '2024-07-23',
 '2024-07-24',
 '2024-07-25',
 '2024-07-26',
 '2024-07-27']

In [23]:
df_grouped['date'] = df_grouped['date'].astype('str')

In [24]:
df_grouped.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 8664 entries, 0 to 8663
Data columns (total 9 columns):
 #   Column            Non-Null Count  Dtype 
---  ------            --------------  ----- 
 0   origin            8664 non-null   string
 1   line_origin       8664 non-null   string
 2   line_name_origin  8664 non-null   string
 3   date              8664 non-null   object
 4   year              8664 non-null   string
 5   month             8664 non-null   string
 6   day               8664 non-null   string
 7   day_name          8664 non-null   string
 8   passengers        8664 non-null   int64 
dtypes: int64(1), object(1), string(7)
memory usage: 609.3+ KB


In [27]:
for t in fname:
    temp_ = df_grouped.loc[df_grouped['date'] == t].reset_index(drop=True)
    file_name = f'data/railway/daily_agg/ridership_{t}.parquet'
    temp_.to_parquet(file_name)