
Merge pull request #5 from jpountz/nyc_taxis
Add a track for the NYC taxi rides dataset.
jpountz committed Aug 16, 2016
2 parents a4b0ba5 + cddb1b7 commit 0802c11
Showing 4 changed files with 310 additions and 0 deletions.
15 changes: 15 additions & 0 deletions nyc_taxis/README.txt
@@ -0,0 +1,15 @@
This dataset contains the rides performed by yellow taxis in New York in
December 2015. It can be downloaded from
http://www.nyc.gov/html/tlc/html/about/trip_record_data.shtml.

This has only been tested with the December 2015 dump, but it should work with
any dump of the yellow taxi data and should be easy to adapt to the green
taxis.

Once downloaded, you can generate the mappings with:
python3 parse.py mappings

The json documents can then be generated with:
python3 parse.py json file_name.csv > documents.json

Finally, the json docs can be compressed with:
bzip2 -k documents.json
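
Rally consumes mappings.json and documents.json.bz2 directly (see track.json
below). If you want to index a handful of generated documents by hand instead,
a minimal sketch using the elasticsearch-py client could look like the
following; the client library, the localhost:9200 endpoint and the index name
are assumptions, not part of this track:

# Minimal sketch, assuming elasticsearch-py is installed and a node is
# listening on localhost:9200.
import json

from elasticsearch import Elasticsearch, helpers

es = Elasticsearch(["http://localhost:9200"])

# mappings.json already wraps the properties in a "type" document type.
with open("mappings.json") as f:
    es.indices.create(index="nyc_taxis", body={"mappings": json.load(f)})

def actions():
    # documents.json holds one json document per line, as emitted by parse.py.
    with open("documents.json") as f:
        for line in f:
            yield {"_index": "nyc_taxis", "_type": "type", "_source": json.loads(line)}

helpers.bulk(es, actions())
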
88 changes: 88 additions & 0 deletions nyc_taxis/mappings.json
@@ -0,0 +1,88 @@
{
"type": {
"properties": {
"surcharge": {
"scaling_factor": 100,
"type": "scaled_float"
},
"dropoff_datetime": {
"type": "date",
"format": "yyyy-MM-dd HH:mm:ss"
},
"trip_type": {
"type": "keyword"
},
"mta_tax": {
"scaling_factor": 100,
"type": "scaled_float"
},
"rate_code_id": {
"type": "keyword"
},
"passenger_count": {
"type": "integer"
},
"pickup_datetime": {
"type": "date",
"format": "yyyy-MM-dd HH:mm:ss"
},
"tolls_amount": {
"scaling_factor": 100,
"type": "scaled_float"
},
"tip_amount": {
"scaling_factor": 100,
"type": "scaled_float"
},
"payment_type": {
"type": "keyword"
},
"extra": {
"scaling_factor": 100,
"type": "scaled_float"
},
"vendor_id": {
"type": "keyword"
},
"store_and_fwd_flag": {
"type": "keyword"
},
"improvement_surcharge": {
"scaling_factor": 100,
"type": "scaled_float"
},
"fare_amount": {
"scaling_factor": 100,
"type": "scaled_float"
},
"ehail_fee": {
"scaling_factor": 100,
"type": "scaled_float"
},
"cab_color": {
"type": "keyword"
},
"dropoff_location": {
"type": "geo_point"
},
"vendor_name": {
"type": "text"
},
"total_amount": {
"scaling_factor": 100,
"type": "scaled_float"
},
"trip_distance": {
"scaling_factor": 100,
"type": "scaled_float"
},
"pickup_location": {
"type": "geo_point"
}
},
"_all": {
"enabled": false
},
"dynamic": "strict"
}
}
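
All monetary and distance fields above are mapped as scaled_float with a
scaling_factor of 100: Elasticsearch stores each value as a long after
multiplying by the factor and rounding to the closest integer, which keeps the
index compact while staying exact to the cent. A rough sketch of the encoding
(Elasticsearch's exact rounding is Java's Math.round, so treat this as an
approximation):

# Rough sketch of scaled_float storage with scaling_factor 100.
def encode(value, scaling_factor=100):
    return round(value * scaling_factor)  # persisted as a long

def decode(stored, scaling_factor=100):
    return stored / scaling_factor

assert encode(9.3) == 930
assert decode(encode(9.3)) == 9.3
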
100 changes: 100 additions & 0 deletions nyc_taxis/parse.py
@@ -0,0 +1,100 @@
import json
import re
import sys

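# Map each output field name to the Elasticsearch field type used in
# mappings.json.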
types = {}
for f in ["vendor_id","cab_color","payment_type","trip_type","rate_code_id","store_and_fwd_flag"]:
types[f] = 'keyword'
for f in ["vendor_name"]:
types[f] = 'text'
for f in ["passenger_count"]:
types[f] = 'integer'
for f in ["pickup_location", "dropoff_location"]:
types[f] = 'geo_point'
for f in ["trip_distance", "fare_amount", "surcharge", "mta_tax", "extra", "ehail_fee", "improvement_surcharge", "tip_amount", "tolls_amount", "total_amount"]:
types[f] = 'scaled_float'
for f in ["pickup_datetime", "dropoff_datetime"]:
types[f] = 'date'

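# Print the mappings for the "type" document type (the content of
# mappings.json) to stdout.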
def write_mappings():
mappings = {}
for (k, v) in types.items():
mappings[k] = { "type": v }
if v == 'date':
mappings[k]['format'] = "yyyy-MM-dd HH:mm:ss"
elif v == 'scaled_float':
mappings[k]['scaling_factor'] = 100
mappings = { "properties": mappings }
mappings['_all'] = { "enabled": False }
mappings['dynamic'] = 'strict'
mappings = { "type": mappings }
print(json.dumps(mappings, indent=2))

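# Collapse the <f>_latitude/<f>_longitude columns of a row into a single
# <f>_location geo_point ([lon, lat]), rejecting out-of-range coordinates.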
def to_geo_point(d, f):
lat_field = f + "_latitude"
lon_field = f + "_longitude"
if lat_field in d and lon_field in d:
longitude = float(d[lon_field])
latitude = float(d[lat_field])
if longitude < -180 or longitude > 180 or latitude < -90 or latitude > 90:
raise Exception("Malformed coordinates")
        d[f + '_location'] = [longitude, latitude]
del d[lon_field]
del d[lat_field]

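# Convert a CamelCase csv header to snake_case, e.g. 'VendorID' -> 'vendor_id'.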
def to_underscore(s):
s = re.sub('(.)([A-Z][a-z]+)', r'\1_\2', s)
return re.sub('([a-z0-9])([A-Z])', r'\1_\2', s).lower()

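# Read a TLC csv dump and print one json document per ride to stdout;
# malformed rows are skipped with a warning on stderr.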
def to_json(f):
fields = []
for field in f.readline().strip().split(','):
field = to_underscore(field)
if field.startswith('tpep_') or field.startswith('lpep_'):
field = field[5:]
elif field == 'ratecode_id':
field = 'rate_code_id'
fields.append(field)
    for line in f:
cols = line.strip().split(',')
if len(cols) < len(fields):
raise Exception("Cannot parse '%s': number of fields does not match '%s'" %(line, ",".join(fields)))

try:
d = {}
for i in range(len(fields)):
field = fields[i]
value = cols[i]
                if value != '': # an empty value is how the csv marks a missing field
d[field] = value

to_geo_point(d, 'pickup')
to_geo_point(d, 'dropoff')

for (k, v) in d.items():
if k not in types:
raise Exception("Unknown field '%s'" %k)
t = types[k]
try:
if t == 'integer':
d[k] = int(v)
                    elif t in ('float', 'scaled_float'): # the types map uses 'scaled_float'
d[k] = float(v)
except Exception as cause:
raise Exception("Cannot parse (%s,%s)" %(k, v)) from cause

print(json.dumps(d))
except KeyboardInterrupt:
break
except Exception as e:
print("Skipping malformed entry '%s' because of %s" %(line, str(e)), file=sys.stderr)

if len(sys.argv) < 2:
    raise Exception("Usage: %s mappings|json [csv_file ...]" %sys.argv[0])
elif sys.argv[1] == "json":
for file_name in sys.argv[2:]:
with open(file_name) as f:
to_json(f)
elif sys.argv[1] == "mappings":
write_mappings()
else:
raise Exception("Expected 'json' or 'mappings' but got %s" %sys.argv[1])
107 changes: 107 additions & 0 deletions nyc_taxis/track.json
@@ -0,0 +1,107 @@
{
"meta": {
"short-description": "Trip records completed in yellow and green taxis in New York in 2015",
"description": "This test indexes 165M taxi rides using 8 client threads and 10,000 docs per bulk request against Elasticsearch",
"data-url": "http://benchmarks.elasticsearch.org.s3.amazonaws.com/corpora/nyc_taxis"
},
"indices": [
{
"name": "nyc_taxis",
"types": [
{
"name": "type",
"mapping": "mappings.json",
"documents": "documents.json.bz2",
"document-count": 165346692,
"compressed-bytes": 4812721501,
"uncompressed-bytes": 79802445255
}
]
}
],
"operations": [
{
"name": "index",
"type": "index",
"index-settings": {
"index.number_of_shards": 1,
"index.codec": "best_compression",
"index.number_of_replicas": 0,
"index.refresh_interval": "30s",
"index.translog.flush_threshold_size": "4g"
},
"bulk-size": 10000,
"force-merge": false,
"clients": {
"count": 8
}
},
{
"name": "search",
"type": "search",
"target-throughput": 1,
"warmup-iterations": 100,
"iterations": 100,
"clients": {
"count": 1
},
"queries": [
{
"name": "default",
"body": {
"query": {
"match_all": {}
}
}
},
{
"name": "range",
"body": {
"query": {
"range": {
"total_amount": {
"gte": 5,
"lt": 15
}
}
}
}
},
{
"name": "distance_amount_agg",
"cache": false,
"body": {
"size": 0,
"aggs": {
"distance_histo": {
"histogram": {
"field": "distance",
"interval": 1
},
"aggs": {
"total_amount_stats": {
"stats": {
"field": "total_amount"
}
}
}
}
}
}
}
]
}
],
"challenges": [
{
"name": "append-no-conflicts",
"description": "index data and then run a couple search requests",
"schedule": [
"index",
"search"
]
}

]
}
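
For a sense of scale: the index operation above has to push 165,346,692
documents at 10,000 documents per bulk request, spread over 8 clients. A quick
back-of-the-envelope check in Python:

# Back-of-the-envelope numbers derived from track.json above.
import math

document_count = 165346692
bulk_size = 10000
clients = 8

bulk_requests = math.ceil(document_count / bulk_size)
print(bulk_requests)            # 16535 bulk requests in total
print(bulk_requests / clients)  # ~2067 bulk requests per client
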
