Add a track for the NYC taxi rides dataset. #5

Merged (1 commit, Aug 16, 2016)
15 changes: 15 additions & 0 deletions nyc_taxis/README.txt
@@ -0,0 +1,15 @@
This dataset contains the rides performed in yellow taxis in New York in
December 2015. It can be downloaded from
http://www.nyc.gov/html/tlc/html/about/trip_record_data.shtml.

This has only been tested with the December 2015 dump, but it should work with
any dump of the yellow taxis, and should be easy to adapt to the green taxis.

Once downloaded, you can generate the mappings with:
python3 parse.py mappings

The JSON documents can then be generated with:
python3 parse.py json file_name.csv > documents.json

Finally, the JSON docs can be compressed with:
bzip2 -k documents.json
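
For reference, the generated artifacts can be loaded into a local node along these lines (a minimal sketch, assuming the elasticsearch-py client, a node on http://localhost:9200, and the index name nyc_taxis used by the track):

import json
import sys
from elasticsearch import Elasticsearch
from elasticsearch.helpers import streaming_bulk

es = Elasticsearch(["http://localhost:9200"])

# mappings.json holds {"type": {...}}, exactly what `parse.py mappings`
# prints, so it can be passed through as the index's "mappings" section.
with open("mappings.json") as f:
    es.indices.create(index="nyc_taxis", body={"mappings": json.load(f)})

def actions():
    with open("documents.json") as f:
        for line in f:
            yield {"_index": "nyc_taxis", "_type": "type", "_source": json.loads(line)}

for ok, item in streaming_bulk(es, actions(), chunk_size=10000):
    if not ok:
        print(item, file=sys.stderr)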
88 changes: 88 additions & 0 deletions nyc_taxis/mappings.json
@@ -0,0 +1,88 @@
{
  "type": {
    "properties": {
      "surcharge": {
        "scaling_factor": 100,
        "type": "scaled_float"
      },
      "dropoff_datetime": {
        "type": "date",
        "format": "yyyy-MM-dd HH:mm:ss"
      },
      "trip_type": {
        "type": "keyword"
      },
      "mta_tax": {
        "scaling_factor": 100,
        "type": "scaled_float"
      },
      "rate_code_id": {
        "type": "keyword"
      },
      "passenger_count": {
        "type": "integer"
      },
      "pickup_datetime": {
        "type": "date",
        "format": "yyyy-MM-dd HH:mm:ss"
      },
      "tolls_amount": {
        "scaling_factor": 100,
        "type": "scaled_float"
      },
      "tip_amount": {
        "scaling_factor": 100,
        "type": "scaled_float"
      },
      "payment_type": {
        "type": "keyword"
      },
      "extra": {
        "scaling_factor": 100,
        "type": "scaled_float"
      },
      "vendor_id": {
        "type": "keyword"
      },
      "store_and_fwd_flag": {
        "type": "keyword"
      },
      "improvement_surcharge": {
        "scaling_factor": 100,
        "type": "scaled_float"
      },
      "fare_amount": {
        "scaling_factor": 100,
        "type": "scaled_float"
      },
      "ehail_fee": {
        "scaling_factor": 100,
        "type": "scaled_float"
      },
      "cab_color": {
        "type": "keyword"
      },
      "dropoff_location": {
        "type": "geo_point"
      },
      "vendor_name": {
        "type": "text"
      },
      "total_amount": {
        "scaling_factor": 100,
        "type": "scaled_float"
      },
      "trip_distance": {
        "scaling_factor": 100,
        "type": "scaled_float"
      },
      "pickup_location": {
        "type": "geo_point"
      }
    },
    "_all": {
      "enabled": false
    },
    "dynamic": "strict"
  }
}
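
All monetary fields and the trip distance are mapped as scaled_float with a scaling_factor of 100: Elasticsearch multiplies the value by the factor and rounds it to the nearest long at index time, so two decimal places survive while the field compresses like an integer. A rough illustration of that storage rule (a sketch of the behavior, not Elasticsearch code):

def scaled_float_stored(value, scaling_factor=100):
    # Stored as round(value * scaling_factor) and divided
    # by the factor again when the value is read back.
    return round(value * scaling_factor)

assert scaled_float_stored(9.99) == 999     # fares keep cent precision
assert scaled_float_stored(12.344) == 1234  # anything finer is rounded away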
100 changes: 100 additions & 0 deletions nyc_taxis/parse.py
@@ -0,0 +1,100 @@
import json
import csv
import sys
import re

# Target Elasticsearch field type for each normalized CSV column.
types = {}
for f in ["vendor_id","cab_color","payment_type","trip_type","rate_code_id","store_and_fwd_flag"]:
    types[f] = 'keyword'
for f in ["vendor_name"]:
    types[f] = 'text'
for f in ["passenger_count"]:
    types[f] = 'integer'
for f in ["pickup_location", "dropoff_location"]:
    types[f] = 'geo_point'
for f in ["trip_distance", "fare_amount", "surcharge", "mta_tax", "extra", "ehail_fee", "improvement_surcharge", "tip_amount", "tolls_amount", "total_amount"]:
    types[f] = 'scaled_float'
for f in ["pickup_datetime", "dropoff_datetime"]:
    types[f] = 'date'

def write_mappings():
    # Derive the mapping for the "type" type from the `types` table above.
    mappings = {}
    for (k, v) in types.items():
        mappings[k] = { "type": v }
        if v == 'date':
            mappings[k]['format'] = "yyyy-MM-dd HH:mm:ss"
        elif v == 'scaled_float':
            mappings[k]['scaling_factor'] = 100
    mappings = { "properties": mappings }
    mappings['_all'] = { "enabled": False }
    mappings['dynamic'] = 'strict'
    mappings = { "type": mappings }
    print(json.dumps(mappings, indent=2))

def to_geo_point(d, f):
    # Merge `<f>_latitude`/`<f>_longitude` into a single [lon, lat]
    # `<f>_location` array, the format Elasticsearch expects for geo_points.
    lat_field = f + "_latitude"
    lon_field = f + "_longitude"
    if lat_field in d and lon_field in d:
        longitude = float(d[lon_field])
        latitude = float(d[lat_field])
        if longitude < -180 or longitude > 180 or latitude < -90 or latitude > 90:
            raise Exception("Malformed coordinates")
        d[f + '_location'] = [longitude, latitude]
        del d[lon_field]
        del d[lat_field]

def to_underscore(s):
    # Convert camel-case column names like "VendorID" to "vendor_id".
    s = re.sub('(.)([A-Z][a-z]+)', r'\1_\2', s)
    return re.sub('([a-z0-9])([A-Z])', r'\1_\2', s).lower()

def to_json(f):
    fields = []
    for field in f.readline().strip().split(','):
        field = to_underscore(field)
        if field.startswith('tpep_') or field.startswith('lpep_'):
            field = field[5:]
        elif field == 'ratecode_id':
            field = 'rate_code_id'
        fields.append(field)
    for line in f.readlines():
        cols = line.strip().split(',')
        if len(cols) < len(fields):
            raise Exception("Cannot parse '%s': number of fields does not match '%s'" % (line, ",".join(fields)))

        try:
            d = {}
            for i in range(len(fields)):
                field = fields[i]
                value = cols[i]
                if value != '': # the way csv says the field does not exist
                    d[field] = value

            to_geo_point(d, 'pickup')
            to_geo_point(d, 'dropoff')

            for (k, v) in d.items():
                if k not in types:
                    raise Exception("Unknown field '%s'" % k)
                t = types[k]
                try:
                    if t == 'integer':
                        d[k] = int(v)
                    elif t == 'float':
                        d[k] = float(v)
                except Exception as cause:
                    raise Exception("Cannot parse (%s,%s)" % (k, v)) from cause

            print(json.dumps(d))
        except KeyboardInterrupt:
            break
        except Exception as e:
            print("Skipping malformed entry '%s' because of %s" % (line, str(e)), file=sys.stderr)

if sys.argv[1] == "json":
    for file_name in sys.argv[2:]:
        with open(file_name) as f:
            to_json(f)
elif sys.argv[1] == "mappings":
    write_mappings()
else:
    raise Exception("Expected 'json' or 'mappings' but got %s" % sys.argv[1])
107 changes: 107 additions & 0 deletions nyc_taxis/track.json
@@ -0,0 +1,107 @@
{
  "meta": {
    "short-description": "Trip records completed in yellow and green taxis in New York in 2015",
    "description": "This test indexes 165M taxi rides using 8 client threads and 10,000 docs per bulk request against Elasticsearch",
    "data-url": "http://benchmarks.elasticsearch.org.s3.amazonaws.com/corpora/nyc_taxis"
  },
  "indices": [
    {
      "name": "nyc_taxis",
      "types": [
        {
          "name": "type",
          "mapping": "mappings.json",
          "documents": "documents.json.bz2",
          "document-count": 165346692,
          "compressed-bytes": 4812721501,
          "uncompressed-bytes": 79802445255
        }
      ]
    }
  ],
  "operations": [
    {
      "name": "index",
      "type": "index",
      "index-settings": {
        "index.number_of_shards": 1,
        "index.codec": "best_compression",
        "index.number_of_replicas": 0,
        "index.refresh_interval": "30s",
        "index.translog.flush_threshold_size": "4g"
      },
      "bulk-size": 10000,
      "force-merge": false,
      "clients": {
        "count": 8
      }
    },
    {
      "name": "search",
      "type": "search",
      "target-throughput": 1,
      "warmup-iterations": 100,
      "iterations": 100,
      "clients": {
        "count": 1
      },
      "queries": [
        {
          "name": "default",
          "body": {
            "query": {
              "match_all": {}
            }
          }
        },
        {
          "name": "range",
          "body": {
            "query": {
              "range": {
                "total_amount": {
                  "gte": 5,
                  "lt": 15
                }
              }
            }
          }
        },
        {
          "name": "distance_amount_agg",
          "cache": false,
          "body": {
            "size": 0,
            "aggs": {
              "distance_histo": {
                "histogram": {
                  "field": "trip_distance",
                  "interval": 1
                },
                "aggs": {
                  "total_amount_stats": {
                    "stats": {
                      "field": "total_amount"
                    }
                  }
                }
              }
            }
          }
        }
      ]
    }
  ],
  "challenges": [
    {
      "name": "append-no-conflicts",
      "description": "index data and then run a couple of search requests",
      "schedule": [
        "index",
        "search"
      ]
    }
  ]
}
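
Since track.json wires the two operations into the append-no-conflicts challenge, a quick sanity check (a minimal sketch; the path is assumed to be relative to the repository root) is to parse the file and print what the challenge will execute:

import json

with open("nyc_taxis/track.json") as f:
    track = json.load(f)

operations = {op["name"]: op for op in track["operations"]}
for challenge in track["challenges"]:
    print("%s: %s" % (challenge["name"], challenge["description"]))
    for step in challenge["schedule"]:
        print("  runs operation '%s' (type=%s)" % (step, operations[step]["type"]))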