In [1]:
# Put these at the top of every notebook, to get automatic reloading and inline plotting.
# These are IPython magics, not plain Python: they only run inside an IPython/Jupyter kernel.
%reload_ext autoreload
%autoreload 2
%matplotlib inline

In [2]:
import pandas as pd
import partridge as ptg
import matplotlib.pyplot as plt
import seaborn as sns
import altair as alt
import datetime
import numpy as np

import gtfs_utils

# Altair configuration: select the 'notebook' renderer and the 'json' data transformer.
# NOTE(review): the 'json' transformer presumably stores chart data in side-car JSON files
# instead of embedding it in the notebook — confirm against the installed Altair version.
alt.renderers.enable('notebook')
alt.data_transformers.enable('json')

# Seaborn look-and-feel shared by every matplotlib figure in this notebook.
sns.set_style("white")
sns.set_context("talk")
sns.set_palette('Set2', 10)

In [3]:
import gzip

def parse_trips(path):
    """
    Parse a gzip-compressed, comma-separated SIRI log into a list of dicts.

    Every non-empty line is split on ``,``. The first two fields are
    discarded and the following ten are mapped by position to the keys
    ``agency``, ``route_id``, ``line_num``, ``service_id``, ``start_time``,
    ``bus_id``, ``end_time``, ``time_recorded`` and ``coordinates`` (a
    2-tuple built from the last two of those fields). All values are
    returned as UTF-8 decoded strings; no type conversion happens here.
    Fields beyond the twelfth are ignored.

    Parameters
    ----------
    path : str
        Path to the gzip-compressed log file.

    Returns
    -------
    list of dict
        One record per non-empty line, in file order.

    Raises
    ------
    IndexError
        If a non-empty line has fewer than twelve comma-separated fields.
    """
    # The ``with`` block closes the file on exit, so no explicit close() is needed.
    with gzip.open(path) as f:
        raw_lines = f.read().strip().splitlines()

    trips = []
    for raw_line in raw_lines:
        if not raw_line:
            continue
        # Drop the two leading fields, then decode the remaining ones.
        fields = [element.decode("utf-8") for element in raw_line.split(b',')[2:]]
        trips.append({
            "agency": fields[0],
            "route_id": fields[1],
            "line_num": fields[2],
            "service_id": fields[3],
            "start_time": fields[4],
            "bus_id": fields[5],
            "end_time": fields[6],
            "time_recorded": fields[7],
            "coordinates": (fields[8], fields[9]),
        })

    return trips

In [4]:
from os.path import join

In [5]:
import re
import datetime

In [6]:
def single_timestr_to_seconds(x, *, inverse=False, mod24=False, only_mins=False):
    """
    Given an HH:MM:SS time string ``x``, return the number of seconds
    past midnight that it represents.
    In keeping with GTFS standards, the hours entry may be greater than
    23.
    If ``only_mins``, ``x`` is expected to be an HH:MM string instead and
    the seconds component is taken to be zero.
    If ``mod24``, then return the number of seconds modulo ``24*3600``.
    If ``inverse``, then do the inverse operation: ``x`` is a number of
    seconds and an HH:MM:SS string is returned (``only_mins`` is ignored).
    In this case, if ``mod24`` also, then first take the number of
    seconds modulo ``24*3600``.
    Any input that cannot be converted yields ``np.nan`` rather than an
    exception.
    """
    try:
        if inverse:
            seconds = int(x)
            if mod24:
                seconds %= 24 * 3600
            hours, remainder = divmod(seconds, 3600)
            mins, secs = divmod(remainder, 60)
            return "{:02d}:{:02d}:{:02d}".format(hours, mins, secs)

        if only_mins:
            hours, mins = x.split(":")
            # This branch previously never assigned ``result``, so the call
            # raised UnboundLocalError instead of returning a value.
            result = int(hours) * 3600 + int(mins) * 60
        else:
            hours, mins, seconds = x.split(":")
            result = int(hours) * 3600 + int(mins) * 60 + int(seconds)
        if mod24:
            result %= 24 * 3600
        return result
    except Exception:
        # Best-effort conversion: malformed or missing values become NaN.
        # ``Exception`` (not a bare except) lets KeyboardInterrupt through.
        return np.nan

In [7]:
def timestr_to_seconds(x, *, only_mins=False):
    """
    Vectorised conversion of a pandas Series of time strings to seconds.

    Parameters
    ----------
    x : pandas.Series of str
        ``HH:MM:SS`` strings, or ``HH:MM`` strings when ``only_mins`` is set.
    only_mins : bool, keyword-only
        If true, only the hours and minutes components are used; any third
        component present in the strings is ignored.

    Returns
    -------
    pandas.Series of int, or float
        Seconds computed as ``hours*3600 + mins*60 (+ seconds)``. If the
        conversion fails for any reason (non-Series input, a missing
        component, non-numeric text) the scalar ``np.nan`` is returned
        instead of a Series, so callers get a whole column of NaN.
    """
    try:
        parts = x.str.split(':', expand=True)
        result = parts.iloc[:, 0].astype(int) * 3600 + parts.iloc[:, 1].astype(int) * 60
        if not only_mins:
            result = result + parts.iloc[:, 2].astype(int)
    except Exception:
        # Deliberate best-effort fallback; ``Exception`` instead of a bare
        # except so KeyboardInterrupt / SystemExit are not swallowed.
        result = np.nan

    return result

In [8]:
def create_trip_df(path):
    """
    Build a typed trips DataFrame from a gzipped SIRI log.

    The records returned by ``parse_trips(path)`` are loaded into a frame,
    the ``coordinates`` tuples are expanded into float ``lat`` / ``lon``
    columns, ``agency`` / ``service_id`` / ``route_id`` are cast to int, and
    the three time columns are converted with ``timestr_to_seconds``
    (``start_time`` and ``end_time`` with ``only_mins=True``). A ``date``
    column is added from the date embedded in the file name
    (``siri_rt_data.<YYYY-MM-DD>.<n>.log``).
    """
    trips = pd.DataFrame(parse_trips(path))

    # The capture group of the file-name pattern is the log date.
    date_str = re.findall('siri_rt_data\\.([^\\.]+)\\.\\d+\\.log', path)[0]
    log_date = datetime.datetime.strptime(date_str, '%Y-%m-%d')

    # Expand the (lat, lon) string tuples into two float columns.
    coords = pd.DataFrame(trips.coordinates.values.tolist()).astype(float)
    trips[['lat', 'lon']] = coords
    trips = trips.drop('coordinates', axis=1)

    return trips.assign(
        agency=lambda x: x.agency.astype(int),
        service_id=lambda x: x.service_id.astype(int),
        route_id=lambda x: x.route_id.astype(int),
        start_time=lambda x: timestr_to_seconds(x.start_time, only_mins=True),
        end_time=lambda x: timestr_to_seconds(x.end_time, only_mins=True),
        time_recorded=lambda x: timestr_to_seconds(x.time_recorded),
        date=log_date,
    )

In [27]:
# Location of the log converted below. Built with os.path.join (imported as
# ``join``) so the path separators are correct on every operating system,
# not only on Windows.
FOLDER = join('data', 'siri', '2018-10')
file = 'siri_rt_data.2018-10-08.11.log.gz'

In [28]:
# Parse the selected log into a typed DataFrame.
df = create_trip_df(join(FOLDER, file))

In [29]:
# Preview the first rows to sanity-check the parsed values.
df.head()

Unnamed: 0,agency,bus_id,end_time,line_num,route_id,service_id,start_time,time_recorded,lat,lon,date
0,31,9164801,75660,4,1684,35140039,73200,74116,31.661625,34.583431,2018-10-08
1,31,4349034,78600,4,1684,35140040,76200,69156,0.0,0.0,2018-10-08
2,31,4355134,82380,4,1684,35140041,80400,72726,0.0,0.0,2018-10-08
3,14,270B,74460,1,12128,30026850,73200,74107,33.068855,35.14418,2018-10-08
4,14,271B,76620,1,12128,30026855,75600,31417,0.0,0.0,2018-10-08


In [25]:
# Check the column dtypes produced by create_trip_df.
df.dtypes

agency                    int32
bus_id                   object
end_time                  int32
line_num                 object
route_id                  int32
service_id                int32
start_time                int32
time_recorded             int32
lat                     float64
lon                     float64
date             datetime64[ns]
dtype: object

In [30]:
# Persist the parsed frame as Parquet in the same folder as the source log.
df.to_parquet(join(FOLDER, 'siri_rt_data.2018-10-08.11.FIXED.parq'))

In [31]:
# Read the Parquet file back to verify the round trip.
pdf = pd.read_parquet(join(FOLDER, 'siri_rt_data.2018-10-08.11.FIXED.parq'))

In [32]:
# The reloaded rows should match the df.head() preview shown earlier.
pdf.head()

Unnamed: 0,agency,bus_id,end_time,line_num,route_id,service_id,start_time,time_recorded,lat,lon,date
0,31,9164801,75660,4,1684,35140039,73200,74116,31.661625,34.583431,2018-10-08
1,31,4349034,78600,4,1684,35140040,76200,69156,0.0,0.0,2018-10-08
2,31,4355134,82380,4,1684,35140041,80400,72726,0.0,0.0,2018-10-08
3,14,270B,74460,1,12128,30026850,73200,74107,33.068855,35.14418,2018-10-08
4,14,271B,76620,1,12128,30026855,75600,31417,0.0,0.0,2018-10-08


In [22]:
# dtypes after the Parquet round trip, for comparison with df.dtypes.
pdf.dtypes

agency                    int32
bus_id                   object
end_time                  int32
line_num                 object
route_id                  int32
service_id                int32
start_time                int32
time_recorded             int32
lat                     float64
lon                     float64
date             datetime64[ns]
dtype: object

In [17]:
from glob import glob
import os


[0, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]

In [53]:
# Read columns 2-11 of the header-less log with pandas, skipping malformed lines.
# NOTE(review): `tf` is only assigned in a later cell of this notebook (In [61]), so this
# cell fails on a fresh kernel when the notebook is run top to bottom.
# NOTE(review): `error_bad_lines` was deprecated in pandas 1.3 in favour of `on_bad_lines`
# — confirm which pandas version this notebook targets before re-running.
x = pd.read_csv(tf, usecols=list(range(2, 12)), header=None, error_bad_lines=False)
x.head()

  interactivity=interactivity, compiler=compiler, result=result)


Unnamed: 0,2,3,4,5,6,7,8,9,10,11
0,30,11688,1,33565052,06:00,65,06:52,06:25:00,32.79864501953125,35.018318
1,3,3701,25,1308393,08:45,7770269,08:59,04:00:28,0.0,0.0
2,14,6486,81,10416660,06:55,128A,07:05,05:41:36,0.0,0.0
3,7,1424,29,31447402,06:55,637D,07:38,05:31:25,0.0,0.0
4,8,8300,27,29000336,06:55,7262552,07:24,04:01:25,0.0,0.0


In [61]:
# Sample log used to try out the CSV-based loader defined below.
# NOTE(review): absolute path on one developer's machine — not portable.
tf = r"C:\dev\ds\open-bus-explore\data\siri\2018-11\siri_rt_data.2018-11-03.1.log.gz"
def create_trip_df(path):
    """
    Build a typed trips DataFrame by reading the SIRI log with pandas.

    This definition replaces the earlier gzip-based ``create_trip_df`` in
    this notebook. The file is read without a header row (malformed lines
    are skipped), the twelve columns are named, and the leading
    ``timestamp`` and ``desc`` columns are dropped. ``agency``,
    ``service_id`` and ``route_id`` are cast to int, ``lat`` / ``lon`` to
    float, and the time columns are converted with ``timestr_to_seconds``
    (``start_time`` and ``end_time`` with ``only_mins=True``). A ``date``
    column is added from the date embedded in the file name
    (``siri_rt_data.<YYYY-MM-DD>.<n>.log``).
    """
    column_names = [
        "timestamp", "desc",
        "agency", "route_id", "line_num", "service_id",
        "start_time", "bus_id", "end_time", "time_recorded",
        "lat", "lon",
    ]

    # The capture group of the file-name pattern is the log date.
    date_str = re.findall('siri_rt_data\\.([^\\.]+)\\.\\d+\\.log', path)[0]
    log_date = datetime.datetime.strptime(date_str, '%Y-%m-%d')

    raw = pd.read_csv(path, header=None, error_bad_lines=False)
    raw.columns = column_names
    trips = raw.drop(['timestamp', 'desc'], axis=1)

    return trips.assign(
        agency=lambda x: x.agency.astype(int),
        service_id=lambda x: x.service_id.astype(int),
        route_id=lambda x: x.route_id.astype(int),
        lat=lambda x: x.lat.astype(float),
        lon=lambda x: x.lon.astype(float),
        start_time=lambda x: timestr_to_seconds(x.start_time, only_mins=True),
        end_time=lambda x: timestr_to_seconds(x.end_time, only_mins=True),
        time_recorded=lambda x: timestr_to_seconds(x.time_recorded),
        date=log_date,
    )
    
# Smoke-test the CSV-based loader on the sample log and preview the result.
t = create_trip_df(tf)
t.head()

Unnamed: 0,agency,route_id,line_num,service_id,start_time,bus_id,end_time,time_recorded,lat,lon,date
0,3,10298,547,35593515,82800,7766969,85980,83712,31.648306,34.678215,2018-11-03
1,25,19308,485,24910479,82800,1536288,85920,83705,31.801535,35.128819,2018-11-03
2,25,19308,485,24910478,0,1536488,3300,82378,0.0,0.0,2018-11-03
3,18,10570,225,35537292,80400,7323552,83760,83704,31.696199,35.112934,2018-11-03
4,18,10570,225,35537293,82800,5591487,86280,83715,31.773964,35.182816,2018-11-03


In [62]:
# Convert every SIRI log in FOLDER to Parquet, then delete the source log.
# Paths are assembled with os.path.join so the cell is not tied to Windows separators.
FOLDER = os.path.join('data', 'siri', '2018-11')
for file in glob(os.path.join(FOLDER, '*.log.gz')):
    print(file)
    df = create_trip_df(file)
    # splitext strips only the final '.gz', so the output is named '<name>.log.parq'.
    bn = os.path.splitext(file)[0]
    df.to_parquet(bn + '.parq')
    # Destructive: the original log is removed once the Parquet file has been written.
    # If create_trip_df or to_parquet raises, the loop stops before reaching this line.
    os.remove(file)

data\siri\2018-11\siri_rt_data.2018-11-03.1.log.gz
data\siri\2018-11\siri_rt_data.2018-11-04.0.log.gz
data\siri\2018-11\siri_rt_data.2018-11-04.1.log.gz
data\siri\2018-11\siri_rt_data.2018-11-04.10.log.gz
data\siri\2018-11\siri_rt_data.2018-11-04.11.log.gz
data\siri\2018-11\siri_rt_data.2018-11-04.2.log.gz
data\siri\2018-11\siri_rt_data.2018-11-04.3.log.gz
data\siri\2018-11\siri_rt_data.2018-11-04.4.log.gz
data\siri\2018-11\siri_rt_data.2018-11-04.5.log.gz
data\siri\2018-11\siri_rt_data.2018-11-04.6.log.gz
data\siri\2018-11\siri_rt_data.2018-11-04.7.log.gz
data\siri\2018-11\siri_rt_data.2018-11-04.8.log.gz
data\siri\2018-11\siri_rt_data.2018-11-04.9.log.gz
data\siri\2018-11\siri_rt_data.2018-11-05.0.log.gz
data\siri\2018-11\siri_rt_data.2018-11-05.1.log.gz
data\siri\2018-11\siri_rt_data.2018-11-05.10.log.gz
data\siri\2018-11\siri_rt_data.2018-11-05.11.log.gz
data\siri\2018-11\siri_rt_data.2018-11-05.2.log.gz
data\siri\2018-11\siri_rt_data.2018-11-05.3.log.gz
data\siri\2018-11\siri_rt_d

In [9]:
import sys
# Extend the import search path, presumably so that `s3_wrapper` (imported in the
# following cell) can be found.
# NOTE(review): hard-coded absolute path to one developer's checkout — not portable.
sys.path.append('C:/dev/ds/open-bus-explore/open-bus/gtfs/retriever')

In [10]:
import s3_wrapper

import os
import warnings

# S3 credentials are read from the environment instead of being hard-coded,
# so they never end up in the saved notebook or in version control.
# NOTE: the key pair that used to be written here must be treated as leaked
# and rotated.
aki = os.environ.get('OBUS_S3_ACCESS_KEY_ID')
sak = os.environ.get('OBUS_S3_SECRET_ACCESS_KEY')
# The bucket name is not a secret; the previous value is kept as the default.
bucket = os.environ.get('OBUS_S3_BUCKET', 'obus-do1')

if not (aki and sak):
    warnings.warn('S3 credentials are not set: define OBUS_S3_ACCESS_KEY_ID and '
                  'OBUS_S3_SECRET_ACCESS_KEY in the environment before using s3_wrapper.')

In [12]:
# Connect to the bucket and list the objects whose key matches the date pattern
# (file names containing '.2018-11-02.' through '.2018-11-09.').
crud = s3_wrapper.S3Crud(aki, sak, bucket)
# Raw string: '\.' is a regex escape, not a valid Python string escape, and triggers an
# invalid-escape-sequence warning in a normal string literal. The pattern text is unchanged.
s3_wrapper.list_content(crud, regex_argument=r'(.*\.2018-11-0[2-9]\..*)')

['siri_logs/2018/11/02/siri_rt_data.2018-11-02.0.log.gz',
 'siri_logs/2018/11/03/siri_rt_data.2018-11-03.0.log.gz',
 'siri_logs/2018/11/03/siri_rt_data.2018-11-03.1.log.gz',
 'siri_logs/2018/11/04/siri_rt_data.2018-11-04.0.log.gz',
 'siri_logs/2018/11/04/siri_rt_data.2018-11-04.1.log.gz',
 'siri_logs/2018/11/04/siri_rt_data.2018-11-04.10.log.gz',
 'siri_logs/2018/11/04/siri_rt_data.2018-11-04.11.log.gz',
 'siri_logs/2018/11/04/siri_rt_data.2018-11-04.2.log.gz',
 'siri_logs/2018/11/04/siri_rt_data.2018-11-04.3.log.gz',
 'siri_logs/2018/11/04/siri_rt_data.2018-11-04.4.log.gz',
 'siri_logs/2018/11/04/siri_rt_data.2018-11-04.5.log.gz',
 'siri_logs/2018/11/04/siri_rt_data.2018-11-04.6.log.gz',
 'siri_logs/2018/11/04/siri_rt_data.2018-11-04.7.log.gz',
 'siri_logs/2018/11/04/siri_rt_data.2018-11-04.8.log.gz',
 'siri_logs/2018/11/04/siri_rt_data.2018-11-04.9.log.gz',
 'siri_logs/2018/11/05/siri_rt_data.2018-11-05.0.log.gz',
 'siri_logs/2018/11/05/siri_rt_data.2018-11-05.1.log.gz',
 'siri_logs/

In [16]:
# Download every matching log from the bucket into FOLDER, skipping files already present.
FOLDER = os.path.join('data', 'siri', '2018-11')
# Create the target folder on first run; does nothing when it already exists.
os.makedirs(FOLDER, exist_ok=True)
# Raw string so that '\.' reaches the regex engine without an invalid-escape warning.
for k in s3_wrapper.list_content(crud, regex_argument=r'(.*\.2018-11-0[2-9]\..*)'):
    # Keys look like 'siri_logs/2018/11/02/<file name>'; keep only the file name.
    file_name = k.split('/')[-1]
    output_path = os.path.join(FOLDER, file_name)
    # NOTE(review): the Parquet conversion cell in this notebook deletes the '.log.gz'
    # files, so after converting, this check no longer prevents a re-download.
    if not os.path.exists(output_path):
        print(f'Downloading {file_name}')
        s3_wrapper.download(crud, output_path, k)
    

Downloading siri_rt_data.2018-11-02.0.log.gz
Downloading siri_rt_data.2018-11-03.0.log.gz
Downloading siri_rt_data.2018-11-03.1.log.gz
Downloading siri_rt_data.2018-11-04.0.log.gz
Downloading siri_rt_data.2018-11-04.1.log.gz
Downloading siri_rt_data.2018-11-04.10.log.gz
Downloading siri_rt_data.2018-11-04.11.log.gz
Downloading siri_rt_data.2018-11-04.2.log.gz
Downloading siri_rt_data.2018-11-04.3.log.gz
Downloading siri_rt_data.2018-11-04.4.log.gz
Downloading siri_rt_data.2018-11-04.5.log.gz
Downloading siri_rt_data.2018-11-04.6.log.gz
Downloading siri_rt_data.2018-11-04.7.log.gz
Downloading siri_rt_data.2018-11-04.8.log.gz
Downloading siri_rt_data.2018-11-04.9.log.gz
Downloading siri_rt_data.2018-11-05.0.log.gz
Downloading siri_rt_data.2018-11-05.1.log.gz
Downloading siri_rt_data.2018-11-05.10.log.gz
Downloading siri_rt_data.2018-11-05.11.log.gz
Downloading siri_rt_data.2018-11-05.2.log.gz
Downloading siri_rt_data.2018-11-05.3.log.gz
Downloading siri_rt_data.2018-11-05.4.log.gz
Downlo