This repository has been archived by the owner on Jan 17, 2020. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 1
/
gtfs.clj
312 lines (270 loc) · 11.6 KB
/
gtfs.clj
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
(ns hiposfer.kamal.router.io.gtfs
"namespace for handling GTFS to Datascript convertion and vice versa.
The algorithms coerces GTFS csv files into entities that reference
each other through IDs. The IDs are taken directly from GTFS.
A small dynamic renaming is performed to make it more compatible
with Datascript design. The renaming is based on the GTFS naming
convention, for example: stop_lon, stop_url, trip_id, trip_headsign.
We transform those strings into: :stop/lon, :stop/url, :trip/id,
:trip/headsign, respectively.
This is important to be able to create relationships between entities through
transactions, for example {:trip/id 1, :trip/route [:route/id 1]}. This
allows us to create relationships between entities without using java references
... just data"
(:require [hiposfer.gtfs.edn :as reference]
[clojure.java.io :as io]
[clojure.data.csv :as csv]
[clojure.edn :as edn]
[datascript.impl.entity :as dimp]
[datascript.core :as data])
(:import (java.util.zip ZipInputStream ZipEntry ZipOutputStream)
(java.time Duration LocalDate ZoneId)
(java.time.format DateTimeFormatter)
(java.util List)))
;; detailed diagram of files relations
;; http://tommaps.com/wp-content/uploads/2016/09/gtfs-feed-diagram.png
;; Route Planning in Transportation Networks
;; Overview of the different algorithms and models used
;; - https://arxiv.org/pdf/1504.05140.pdf
;; A good introduction to time-dependent model for timetable information
;; - https://www.ceid.upatras.gr/webpages/faculty/zaro/pub/jou/J23-TTI-Springer.pdf
;; regular expressions used by `decode` to sniff the type of a raw csv value
(def re-number #"-?\d+(\.\d+)?")
;; HH:MM:SS where HH may exceed 23 (GTFS allows times past midnight)
(def re-duration #"(\d{2}):(\d{2}):(\d{2})")
;; we restrict this to avoid collisions with normal integers of length 8
;; i.e. only dates in the years 2000-2099 are recognized
(def re-date #"20\d{2}[0-1]\d[0-3]\d")
;; e.g. "Europe/Berlin"; NOTE: this also matches non-timezone text like
;; "foo/bar", which is why `timezone` falls back to returning the raw text
(def re-timezone #"\w+/\w+")
(defn- duration
  "parse the arrival and departure time into a total amount of seconds (long).
  This is due to GTFS allowing times greater than 23:59:59 which is the
  biggest Java Local Time. A Trip real arrival time can then be calculated
  as the start time + duration at each stop"
  [text]
  (let [[_ hh mm ss] (re-matches re-duration text)]
    ;; hh*3600 + mm*60 + ss - same result as the previous Duration-based
    ;; reduction but without allocating intermediate Duration objects
    (+ (* 3600 (Long/parseLong hh))
       (* 60 (Long/parseLong mm))
       (Long/parseLong ss))))
;; GTFS dates are plain "uuuuMMdd" strings, e.g. "20180802"
(def date-format (DateTimeFormatter/ofPattern "uuuuMMdd"))
;; parses a GTFS date string into a java.time.LocalDate
(defn- local-date [text] (LocalDate/parse text date-format))
(defn- number
  "parses text as a Long, falling back to Double for decimals. This is to
  avoid using edn/read-string which fails when encountering a number with
  leading zeros like 008"
  [text]
  (try
    (Long/parseLong text)
    ;; only catch parse failures - anything else (e.g. nil input) should
    ;; propagate instead of being silently retried as a double
    (catch NumberFormatException _
      (Double/parseDouble text))))
(defn- timezone
  "attempts to coerce text into a java.time.ZoneId; when text is not a
  valid timezone identifier the raw text is returned unchanged"
  [text]
  (try (ZoneId/of text)
       (catch Exception ignored
         ;; not a timezone after all - hand back the original string
         text)))
(defn decode
  "coerces a raw GTFS csv value into a date, number, duration or timezone
  depending on its textual shape; anything unrecognized is returned as-is"
  [text]
  ;; NOTE: date must be tried before number due to the overlapping regex :(
  (condp re-matches text
    re-date     (local-date text)
    re-number   (number text)
    re-duration (duration text)
    re-timezone (timezone text)
    text)) ;; simple string
;(decode "-0.2")
;(decode "212")
;(decode "25:30:02")
;(decode "20180802")
;(decode "65123245")
;(decode "Europe/Berlin")
(def truncators
  "map of GTFS filenames to post-processing functions. Useful to remove
  unnecessary information from the GTFS feed; just to reduce the amount
  of datoms in datascript"
  {"routes.txt" (fn [route] (dissoc route :route/url))
   "trips.txt"  (fn [trip] (dissoc trip :trip/shape))})
(defn ref?
  "a reference is a field that links to a unique field in another file
   and that is not that field itself"
  [field]
  (let [identifier-names (set (map :field-name reference/identifiers))]
    (and (contains? identifier-names (:field-name field))
         (not (:unique field)))))
(defn parse
  "takes a filename and parses its content if supported by this parser.
  Entries that do not conform to the gtfs spec are removed. Returns
  a lazy sequence of conformed entries.
  NOTE(review): the result is lazy over the open zipstream - it must be
  fully consumed before the current zip entry is advanced (see transaction*)"
  [^ZipInputStream zipstream filename]
  (let [file    (io/reader zipstream)
        content (csv/read-csv file)
        ;; map each header cell to its gtfs spec field description
        head    (map #(reference/get-mapping filename %) (first content))
        fields  (into {} (map (juxt :keyword identity) head))]
    (for [row (rest content)]
      (into {}
            (for [[k v] (map vector (map :keyword head) row)
                  :when (not-empty v)] ;; drop empty csv cells
              (if (ref? (get fields k))
                ;; replace the raw value with a datascript lookup-ref,
                ;; e.g. {:trip/route [:route/id 1]}
                [k [(keyword (name k) "id") (decode v)]]
                [k (decode v)]))))))
;(with-open [f (io/reader "resources/frankfurt.gtfs/trips.txt")]
; (into [] (take 10 (parse (csv/read-csv f) "trips.txt"))))
;; these are also the only supported files for the time being
;; we dont read anything that is not here yet
(def read-order
  "the order in which the supported GTFS files are transacted
   (see transaction!)"
  ["agency.txt" "calendar.txt" "routes.txt"
   "trips.txt" "stops.txt" "stop_times.txt"
   "frequencies.txt"])
(defn- transaction*
  "returns a sequence of [filename transaction].
  WARNING: The order of the sequence is not guaranteed to be consistent
  for a Datascript transaction"
  [^ZipInputStream zipstream]
  ;; repeatedly advance the zip stream entry by entry; getNextEntry returns
  ;; nil once the archive is exhausted, which stops the :while
  (for [^ZipEntry entry (repeatedly #(. zipstream (getNextEntry)))
        :while (some? entry)
        :when (contains? (set read-order) (. entry (getName)))]
    (let [filename (. entry (getName))
          truncate (get truncators filename identity)]
      ;; NOTE: we are forced to use into [] because otherwise the stream
      ;; would close before we get to read all values
      [filename (into [] (map truncate) (parse zipstream filename))])))
;; NOTE: the commented lines below allow a lazy sorting but are much
;; more complicated and since the processing time is completely dominated
;; by stop_times.txt file, it is almost irrelevant to make this lazy
;; maybe in the future it could be different
(defn transaction!
  "returns a Datascript transaction of the complete gtfs feed, with the
  file contents concatenated following read-order"
  [zipstream]
  ;; precompute filename -> position once instead of a linear List.indexOf
  ;; on every comparison of the sort
  (let [position (zipmap read-order (range))]
    (->> (transaction* zipstream)
         ;; unknown filenames sort first (-1), matching indexOf's behavior
         (sort-by (fn [[filename]] (get position filename -1)))
         (mapcat second))))
;([files-content stack]
; (lazy-seq
; (when (and (not-empty stack) (not-empty files-content))
; (let [tx (some (fn [[file content]]
; (when (= file (first stack))
; content))
; files-content)]
; (concat tx (transaction! (remove (fn [[file]] (= file (first stack)))
; files-content)
; (rest stack))))))))
;(with-open [z (ZipInputStream. (io/input-stream "resources/frankfurt.gtfs.zip"))]
; (time (vec (drop 100 (transaction! z)))))
;; ............................................................................
;; just for convenience
;; set of the :keyword ids of all gtfs fields marked unique, e.g. :trip/id
(def uniques (into #{} (comp (filter :unique) (map :keyword)) reference/fields))
;; set of the :keyword ids of every known gtfs field
(def keywords (into #{} (map :keyword reference/fields)))
(defn resource
  "takes a datascript entity and checks if any of its values are entities, if so
  replaces them by their unique identity value. Attributes that are not part
  of the gtfs spec are dropped"
  [entity]
  (into {} (remove nil?)
        (for [[k v] entity]
          (cond
            ;; not a gtfs attribute - drop it
            (not (contains? keywords k))
            nil
            ;; single entity reference -> keep only its unique id entry
            (dimp/entity? v)
            [k (select-keys v [(some (set (keys v)) uniques)])]
            ;; set of entity references -> same treatment per element
            (set? v)
            [k (map #(select-keys % [(some (set (keys %)) uniques)]) v)]
            ;; plain value; the first clause already guaranteed that k is a
            ;; gtfs keyword, so the former (contains? keywords k) re-check
            ;; was dead code
            :else [k v]))))
;; ............................................................................
(defn encode
  "encodes the value of a gtfs field back to the format specified in the
  reference spec"
  [k v]
  (case k
    ;; times are stored as total seconds - render them back as H:MM:SS
    (:stop_time/arrival_time :stop_time/departure_time
     :frequency/start_time :frequency/end_time)
    (let [^Duration span (Duration/ofSeconds v)
          hours          (.toHours span)
          minutes        (mod (.toMinutes span) 60)
          seconds        (mod (.getSeconds span) 60)]
      (format "%d:%02d:%02d" hours minutes seconds))
    ;; dates are stored as LocalDate - render them back as uuuuMMdd
    (:service/start_date :service/end_date)
    (.format ^LocalDate v date-format)
    ;; default - no op
    v))
(defn- reference-keyword
  "returns the attribute id for a reference field.
  For example: [trips.txt service_id] -> :service/id"
  [field]
  ;; first unique field with a matching name, or nil when none matches
  (->> reference/fields
       (filter :unique)
       (filter #(= (:field-name field) (:field-name %)))
       (first)
       (:keyword)))
;; NOTE(review): "hidrator" is presumably a typo for "hydrator"; the name is
;; kept as-is since feed-entities in this file refers to it
(def feed-hidrator
  "A map of filename -> (fn [network] [entity ...]). Useful to fetch entities
  that do not have a unique field to fetch from"
  {"stop_times.txt"
   (fn [network]
     ;; stop times carry no unique id: walk every trip and emit its stop
     ;; times sorted by stop sequence
     (for [trip (data/datoms network :aevt :trip/id)
           :let [std (data/datoms network :avet :stop_time/trip (:e trip))
                 entries (map #(data/entity network (:e %)) std)]
           stop_time (sort-by :stop_time/stop_sequence entries)]
       stop_time))
   "frequencies.txt"
   (fn [network]
     ;; same strategy for frequencies, sorted by start time
     (for [trip (data/datoms network :aevt :trip/id)
           :let [fd (data/datoms network :avet :frequency/trip (:e trip))
                 entries (map #(data/entity network (:e %)) fd)]
           frequency (sort-by :frequency/start_time entries)]
       frequency))})
;TODO
;calendar_dates.txt
;fare_rules.txt
;transfers.txt
;feed_info.txt
(defn- key-paths
  "returns a lazy sequence of key paths [k1 ...] used to fetch values out of
  entities of feed; reference fields yield a two-step path so that the
  referenced unique value is reached"
  [feed]
  (->> reference/fields
       (filter #(= (:filename feed) (:filename %)))
       (map (fn [field]
              (if (ref? field)
                [(:keyword field) (reference-keyword field)]
                [(:keyword field)])))))
(defn- feed-entities
  "returns a lazy sequence of all entities belonging to feed.
  Feeds with a unique id are fetched through the avet index; feeds without
  one fall back to their feed-hidrator entry. Returns nil when neither
  strategy applies (the feed is ignored)"
  [network feed]
  (let [filename (:filename feed)
        field-id (first (filter :unique (:fields feed)))
        id       (reference/get-mapping filename (:field-name field-id))
        ;; look the hidrator up once instead of twice
        hidrate  (feed-hidrator filename)]
    (cond
      ;; the feed contains a unique ID, use it to fetch all entities
      (some? id)
      (for [d (data/datoms network :avet (:keyword id))]
        (data/entity network (:e d)))
      ;; if there is a feed hidrator for the feed use it to fetch entities
      (some? hidrate)
      (hidrate network)))) ;; call directly - (apply f [network]) was needless
;; nothing found - ignore feed
(defn- dump
  "returns a lazy sequence of [filename content] for each feed of the gtfs
  spec present in network; content is a sequence of csv rows, header first"
  [network]
  (for [feed (:feeds reference/gtfs-spec)
        :let [paths    (key-paths feed)
              ;; bind once - the original computed feed-entities twice per
              ;; feed: once for the emptiness check and once for the content
              entities (feed-entities network feed)]
        :when (not-empty entities)]
    [(:filename feed)
     (cons (map :field-name (:fields feed)) ;; header
           (for [entity entities]
             (for [path paths]
               (if (= 1 (count path))
                 (encode (first path) (or (get-in entity path) ""))
                 ;; references MUST NOT be encoded
                 (get-in entity path)))))]))
(defn dump!
  "writes the complete GTFS information from network into outstream as a Zip file"
  [network outstream]
  (let [zipstream (ZipOutputStream. outstream)]
    ;; closing the writer also closes (and thereby finishes) the wrapped
    ;; ZipOutputStream, which writes the zip central directory
    (with-open [writer (io/writer zipstream)]
      (doseq [[filename content] (dump network)
              :let [zip-entry (ZipEntry. ^String filename)]]
        #_(println "writing" filename)
        ;; putNextEntry implicitly closes any previously open entry
        (.putNextEntry zipstream zip-entry)
        (csv/write-csv writer content)
        ;; flush so all csv bytes reach this entry before the next one starts
        (.flush writer)))))
#_(time (dump! @(first @(:networks (:router hiposfer.kamal.dev/system)))
(io/output-stream "resources/test/foo.zip")))
#_(reference/get-mapping "agency.txt" (:field-name (first (filter :unique reference/fields))))
#_(reference-keyword (reference/get-mapping "trips.txt" "service_id"))
#_(reference/get-mapping "calendar.txt" "start_date")
;(seq (Locale/getAvailableLocales))
;(seq (Locale/getISOLanguages))