<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN"
"http://www.w3.org/TR/xhtml1/DTD/xhtml1-transitional.dtd">
<html xmlns="http://www.w3.org/1999/xhtml">
<head>
<meta http-equiv="Content-Type" content="text/html; charset=utf-8" />
<title>5.8. Kafka Connector Tutorial — Presto 0.153 Documentation</title>
<link rel="stylesheet" href="../_static/presto.css" type="text/css" />
<link rel="stylesheet" href="../_static/pygments.css" type="text/css" />
<script type="text/javascript">
var DOCUMENTATION_OPTIONS = {
URL_ROOT: '../',
VERSION: '0.153',
COLLAPSE_INDEX: false,
FILE_SUFFIX: '.html',
HAS_SOURCE: true
};
</script>
<script type="text/javascript" src="../_static/jquery.js"></script>
<script type="text/javascript" src="../_static/underscore.js"></script>
<script type="text/javascript" src="../_static/doctools.js"></script>
<link rel="top" title="Presto 0.153 Documentation" href="../index.html" />
<link rel="up" title="5. Connectors" href="../connector.html" />
<link rel="next" title="5.9. Local File Connector" href="localfile.html" />
<link rel="prev" title="5.7. Kafka Connector" href="kafka.html" />
</head>
<body role="document">
<div class="header">
<h1 class="heading"><a href="../index.html">
<span>Presto 0.153 Documentation</span></a></h1>
<h2 class="heading"><span>5.8. Kafka Connector Tutorial</span></h2>
</div>
<div class="topnav">
<p class="nav">
<span class="left">
« <a href="kafka.html">5.7. Kafka Connector</a>
</span>
<span class="right">
<a href="localfile.html">5.9. Local File Connector</a> »
</span>
</p>
</div>
<div class="content">
<div class="section" id="kafka-connector-tutorial">
<h1>5.8. Kafka Connector Tutorial</h1>
<div class="contents local topic" id="contents">
<ul class="simple">
<li><a class="reference internal" href="#introduction" id="id1">Introduction</a></li>
<li><a class="reference internal" href="#installation" id="id2">Installation</a><ul>
<li><a class="reference internal" href="#step-1-install-apache-kafka" id="id3">Step 1: Install Apache Kafka</a></li>
<li><a class="reference internal" href="#step-2-load-data" id="id4">Step 2: Load data</a></li>
<li><a class="reference internal" href="#step-3-make-the-kafka-topics-known-to-presto" id="id5">Step 3: Make the Kafka topics known to Presto</a></li>
<li><a class="reference internal" href="#step-4-basic-data-querying" id="id6">Step 4: Basic data querying</a></li>
<li><a class="reference internal" href="#step-5-add-a-topic-decription-file" id="id7">Step 5: Add a topic decription file</a></li>
<li><a class="reference internal" href="#step-6-map-all-the-values-from-the-topic-message-onto-columns" id="id8">Step 6: Map all the values from the topic message onto columns</a></li>
<li><a class="reference internal" href="#step-7-use-live-data" id="id9">Step 7: Use live data</a></li>
<li><a class="reference internal" href="#epilogue-time-stamps" id="id10">Epilogue: Time stamps</a></li>
</ul>
</li>
</ul>
</div>
<div class="section" id="introduction">
<h2>Introduction</h2>
<p>The Kafka Connector for Presto allows access to live topic data from
Apache Kafka using Presto. This tutorial shows how to set up topics and
how to create the topic description files that back Presto tables.</p>
</div>
<div class="section" id="installation">
<h2>Installation</h2>
<p>This tutorial assumes familiarity with Presto and a working local Presto
installation (see <a class="reference internal" href="../installation/deployment.html"><span class="doc">Deploying Presto</span></a>). It will focus on
setting up Apache Kafka and integrating it with Presto.</p>
<div class="section" id="step-1-install-apache-kafka">
<h3>Step 1: Install Apache Kafka</h3>
<p>Download and extract <a class="reference external" href="https://kafka.apache.org/">Apache Kafka</a>.</p>
<div class="admonition note">
<p class="first admonition-title">Note</p>
<p class="last">This tutorial was tested with Apache Kafka 0.8.1.
It should work with any 0.8.x version of Apache Kafka.</p>
</div>
<p>Start ZooKeeper and the Kafka server:</p>
<div class="highlight-none"><div class="highlight"><pre><span></span>$ bin/zookeeper-server-start.sh config/zookeeper.properties
[2013-04-22 15:01:37,495] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
...
</pre></div>
</div>
<div class="highlight-none"><div class="highlight"><pre><span></span>$ bin/kafka-server-start.sh config/server.properties
[2013-04-22 15:01:47,028] INFO Verifying properties (kafka.utils.VerifiableProperties)
[2013-04-22 15:01:47,051] INFO Property socket.send.buffer.bytes is overridden to 1048576 (kafka.utils.VerifiableProperties)
...
</pre></div>
</div>
<p>This will start ZooKeeper on port <code class="docutils literal"><span class="pre">2181</span></code> and Kafka on port <code class="docutils literal"><span class="pre">9092</span></code>.</p>
</div>
<div class="section" id="step-2-load-data">
<h3>Step 2: Load data</h3>
<p>Download the <code class="docutils literal"><span class="pre">kafka-tpch</span></code> loader from Maven Central:</p>
<div class="highlight-none"><div class="highlight"><pre><span></span>$ curl -o kafka-tpch https://repo1.maven.org/maven2/de/softwareforge/kafka_tpch_0811/1.0/kafka_tpch_0811-1.0.sh
$ chmod 755 kafka-tpch
</pre></div>
</div>
<p>Now run the <code class="docutils literal"><span class="pre">kafka-tpch</span></code> program to preload a number of topics with tpch data:</p>
<div class="highlight-none"><div class="highlight"><pre><span></span>$ ./kafka-tpch load --brokers localhost:9092 --prefix tpch. --tpch-type tiny
2014-07-28T17:17:07.594-0700 INFO main io.airlift.log.Logging Logging to stderr
2014-07-28T17:17:07.623-0700 INFO main de.softwareforge.kafka.LoadCommand Processing tables: [customer, orders, lineitem, part, partsupp, supplier, nation, region]
2014-07-28T17:17:07.981-0700 INFO pool-1-thread-1 de.softwareforge.kafka.LoadCommand Loading table 'customer' into topic 'tpch.customer'...
2014-07-28T17:17:07.981-0700 INFO pool-1-thread-2 de.softwareforge.kafka.LoadCommand Loading table 'orders' into topic 'tpch.orders'...
2014-07-28T17:17:07.981-0700 INFO pool-1-thread-3 de.softwareforge.kafka.LoadCommand Loading table 'lineitem' into topic 'tpch.lineitem'...
2014-07-28T17:17:07.982-0700 INFO pool-1-thread-4 de.softwareforge.kafka.LoadCommand Loading table 'part' into topic 'tpch.part'...
2014-07-28T17:17:07.982-0700 INFO pool-1-thread-5 de.softwareforge.kafka.LoadCommand Loading table 'partsupp' into topic 'tpch.partsupp'...
2014-07-28T17:17:07.982-0700 INFO pool-1-thread-6 de.softwareforge.kafka.LoadCommand Loading table 'supplier' into topic 'tpch.supplier'...
2014-07-28T17:17:07.982-0700 INFO pool-1-thread-7 de.softwareforge.kafka.LoadCommand Loading table 'nation' into topic 'tpch.nation'...
2014-07-28T17:17:07.982-0700 INFO pool-1-thread-8 de.softwareforge.kafka.LoadCommand Loading table 'region' into topic 'tpch.region'...
2014-07-28T17:17:10.612-0700 ERROR pool-1-thread-8 kafka.producer.async.DefaultEventHandler Failed to collate messages by topic, partition due to: Failed to fetch topic metadata for topic: tpch.region
2014-07-28T17:17:10.781-0700 INFO pool-1-thread-8 de.softwareforge.kafka.LoadCommand Generated 5 rows for table 'region'.
2014-07-28T17:17:10.797-0700 ERROR pool-1-thread-3 kafka.producer.async.DefaultEventHandler Failed to collate messages by topic, partition due to: Failed to fetch topic metadata for topic: tpch.lineitem
2014-07-28T17:17:10.932-0700 ERROR pool-1-thread-1 kafka.producer.async.DefaultEventHandler Failed to collate messages by topic, partition due to: Failed to fetch topic metadata for topic: tpch.customer
2014-07-28T17:17:11.068-0700 ERROR pool-1-thread-2 kafka.producer.async.DefaultEventHandler Failed to collate messages by topic, partition due to: Failed to fetch topic metadata for topic: tpch.orders
2014-07-28T17:17:11.200-0700 ERROR pool-1-thread-6 kafka.producer.async.DefaultEventHandler Failed to collate messages by topic, partition due to: Failed to fetch topic metadata for topic: tpch.supplier
2014-07-28T17:17:11.319-0700 INFO pool-1-thread-6 de.softwareforge.kafka.LoadCommand Generated 100 rows for table 'supplier'.
2014-07-28T17:17:11.333-0700 ERROR pool-1-thread-4 kafka.producer.async.DefaultEventHandler Failed to collate messages by topic, partition due to: Failed to fetch topic metadata for topic: tpch.part
2014-07-28T17:17:11.466-0700 ERROR pool-1-thread-5 kafka.producer.async.DefaultEventHandler Failed to collate messages by topic, partition due to: Failed to fetch topic metadata for topic: tpch.partsupp
2014-07-28T17:17:11.597-0700 ERROR pool-1-thread-7 kafka.producer.async.DefaultEventHandler Failed to collate messages by topic, partition due to: Failed to fetch topic metadata for topic: tpch.nation
2014-07-28T17:17:11.706-0700 INFO pool-1-thread-7 de.softwareforge.kafka.LoadCommand Generated 25 rows for table 'nation'.
2014-07-28T17:17:12.180-0700 INFO pool-1-thread-1 de.softwareforge.kafka.LoadCommand Generated 1500 rows for table 'customer'.
2014-07-28T17:17:12.251-0700 INFO pool-1-thread-4 de.softwareforge.kafka.LoadCommand Generated 2000 rows for table 'part'.
2014-07-28T17:17:12.905-0700 INFO pool-1-thread-2 de.softwareforge.kafka.LoadCommand Generated 15000 rows for table 'orders'.
2014-07-28T17:17:12.919-0700 INFO pool-1-thread-5 de.softwareforge.kafka.LoadCommand Generated 8000 rows for table 'partsupp'.
2014-07-28T17:17:13.877-0700 INFO pool-1-thread-3 de.softwareforge.kafka.LoadCommand Generated 60175 rows for table 'lineitem'.
</pre></div>
</div>
<p>Kafka now has a number of topics that are preloaded with data to query.</p>
</div>
<div class="section" id="step-3-make-the-kafka-topics-known-to-presto">
<h3>Step 3: Make the Kafka topics known to Presto</h3>
<p>In your Presto installation, add a catalog properties file
<code class="docutils literal"><span class="pre">etc/catalog/kafka.properties</span></code> for the Kafka connector.
This file lists the Kafka nodes and topics:</p>
<div class="highlight-none"><div class="highlight"><pre><span></span>connector.name=kafka
kafka.nodes=localhost:9092
kafka.table-names=tpch.customer,tpch.orders,tpch.lineitem,tpch.part,tpch.partsupp,tpch.supplier,tpch.nation,tpch.region
kafka.hide-internal-columns=false
</pre></div>
</div>
<p>Now start Presto:</p>
<div class="highlight-none"><div class="highlight"><pre><span></span>$ bin/launcher start
</pre></div>
</div>
<p>Because the Kafka tables all have the <code class="docutils literal"><span class="pre">tpch.</span></code> prefix in the configuration,
the tables are in the <code class="docutils literal"><span class="pre">tpch</span></code> schema. The connector is mounted into the
<code class="docutils literal"><span class="pre">kafka</span></code> catalog because the properties file is named <code class="docutils literal"><span class="pre">kafka.properties</span></code>.</p>
<p>Start the <a class="reference internal" href="../installation/cli.html"><span class="doc">Presto CLI</span></a>:</p>
<div class="highlight-none"><div class="highlight"><pre><span></span>$ ./presto --catalog kafka --schema tpch
</pre></div>
</div>
<p>List the tables to verify that things are working:</p>
<div class="highlight-none"><div class="highlight"><pre><span></span>presto:tpch> SHOW TABLES;
Table
----------
customer
lineitem
nation
orders
part
partsupp
region
supplier
(8 rows)
</pre></div>
</div>
</div>
<div class="section" id="step-4-basic-data-querying">
<h3>Step 4: Basic data querying</h3>
<p>Kafka data is unstructured, and it has no metadata to describe the format of
the messages. Without further configuration, the Kafka connector can access
the data and map it in raw form, but there are no actual columns besides the
built-in ones:</p>
<div class="highlight-none"><div class="highlight"><pre><span></span>presto:tpch> DESCRIBE customer;
Column | Type | Null | Partition Key | Comment
-------------------+---------+------+---------------+---------------------------------------------
_partition_id | bigint | true | false | Partition Id
_partition_offset | bigint | true | false | Offset for the message within the partition
_segment_start | bigint | true | false | Segment start offset
_segment_end | bigint | true | false | Segment end offset
_segment_count | bigint | true | false | Running message count per segment
_key | varchar | true | false | Key text
_key_corrupt | boolean | true | false | Key data is corrupt
_key_length | bigint | true | false | Total number of key bytes
_message | varchar | true | false | Message text
_message_corrupt | boolean | true | false | Message data is corrupt
_message_length | bigint | true | false | Total number of message bytes
(11 rows)
presto:tpch> SELECT count(*) FROM customer;
_col0
-------
1500
presto:tpch> SELECT _message FROM customer LIMIT 5;
_message
--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
{"rowNumber":1,"customerKey":1,"name":"Customer#000000001","address":"IVhzIApeRb ot,c,E","nationKey":15,"phone":"25-989-741-2988","accountBalance":711.56,"marketSegment":"BUILDING","comment":"to the even, regular platelets. regular, ironic epitaphs nag e"}
{"rowNumber":3,"customerKey":3,"name":"Customer#000000003","address":"MG9kdTD2WBHm","nationKey":1,"phone":"11-719-748-3364","accountBalance":7498.12,"marketSegment":"AUTOMOBILE","comment":" deposits eat slyly ironic, even instructions. express foxes detect slyly. blithel
{"rowNumber":5,"customerKey":5,"name":"Customer#000000005","address":"KvpyuHCplrB84WgAiGV6sYpZq7Tj","nationKey":3,"phone":"13-750-942-6364","accountBalance":794.47,"marketSegment":"HOUSEHOLD","comment":"n accounts will have to unwind. foxes cajole accor"}
{"rowNumber":7,"customerKey":7,"name":"Customer#000000007","address":"TcGe5gaZNgVePxU5kRrvXBfkasDTea","nationKey":18,"phone":"28-190-982-9759","accountBalance":9561.95,"marketSegment":"AUTOMOBILE","comment":"ainst the ironic, express theodolites. express, even pinto bean
{"rowNumber":9,"customerKey":9,"name":"Customer#000000009","address":"xKiAFTjUsCuxfeleNqefumTrjS","nationKey":8,"phone":"18-338-906-3675","accountBalance":8324.07,"marketSegment":"FURNITURE","comment":"r theodolites according to the requests wake thinly excuses: pending
(5 rows)
presto:tpch> SELECT sum(cast(json_extract_scalar(_message, '$.accountBalance') AS double)) FROM customer LIMIT 10;
_col0
------------
6681865.59
(1 row)
</pre></div>
</div>
<p>The data from Kafka can be queried using Presto, but it is not yet in
actual table form. The raw data is available through the <code class="docutils literal"><span class="pre">_message</span></code> and
<code class="docutils literal"><span class="pre">_key</span></code> columns, but it is not decoded into columns. As the sample data is
in JSON format, the <a class="reference internal" href="../functions/json.html"><span class="doc">JSON Functions</span></a> built into Presto can be used
to slice the data.</p>
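<p>For example, individual fields can be pulled out of
<code class="docutils literal"><span class="pre">_message</span></code> by JSON path and cast to SQL types.
The following query is a sketch of this pattern (field names taken from the
sample rows above; output omitted):</p>
<div class="highlight-none"><div class="highlight"><pre><span></span>presto:tpch> SELECT json_extract_scalar(_message, '$.name') AS name,
          ->        cast(json_extract_scalar(_message, '$.accountBalance') AS double) AS account_balance
          -> FROM customer
          -> LIMIT 3;
</pre></div>
</div>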
</div>
<div class="section" id="step-5-add-a-topic-decription-file">
<h3>Step 5: Add a topic decription file</h3>
<p>The Kafka connector supports topic description files to turn raw data into
table format. These files are located in the <code class="docutils literal"><span class="pre">etc/kafka</span></code> folder in the
Presto installation and must end with <code class="docutils literal"><span class="pre">.json</span></code>. It is recommended that
the file name match the table name, but this is not required.</p>
<p>Add the following file as <code class="docutils literal"><span class="pre">etc/kafka/tpch.customer.json</span></code> and restart Presto:</p>
<div class="highlight-json"><div class="highlight"><pre><span></span><span class="p">{</span>
<span class="nt">"tableName"</span><span class="p">:</span> <span class="s2">"customer"</span><span class="p">,</span>
<span class="nt">"schemaName"</span><span class="p">:</span> <span class="s2">"tpch"</span><span class="p">,</span>
<span class="nt">"topicName"</span><span class="p">:</span> <span class="s2">"tpch.customer"</span><span class="p">,</span>
<span class="nt">"key"</span><span class="p">:</span> <span class="p">{</span>
<span class="nt">"dataFormat"</span><span class="p">:</span> <span class="s2">"raw"</span><span class="p">,</span>
<span class="nt">"fields"</span><span class="p">:</span> <span class="p">[</span>
<span class="p">{</span>
<span class="nt">"name"</span><span class="p">:</span> <span class="s2">"kafka_key"</span><span class="p">,</span>
<span class="nt">"dataFormat"</span><span class="p">:</span> <span class="s2">"LONG"</span><span class="p">,</span>
<span class="nt">"type"</span><span class="p">:</span> <span class="s2">"BIGINT"</span><span class="p">,</span>
<span class="nt">"hidden"</span><span class="p">:</span> <span class="s2">"false"</span>
<span class="p">}</span>
<span class="p">]</span>
<span class="p">}</span>
<span class="p">}</span>
</pre></div>
</div>
<p>The customer table now has an additional column: <code class="docutils literal"><span class="pre">kafka_key</span></code>.</p>
<div class="highlight-none"><div class="highlight"><pre><span></span>presto:tpch> DESCRIBE customer;
Column | Type | Null | Partition Key | Comment
-------------------+---------+------+---------------+---------------------------------------------
kafka_key | bigint | true | false |
_partition_id | bigint | true | false | Partition Id
_partition_offset | bigint | true | false | Offset for the message within the partition
_segment_start | bigint | true | false | Segment start offset
_segment_end | bigint | true | false | Segment end offset
_segment_count | bigint | true | false | Running message count per segment
_key | varchar | true | false | Key text
_key_corrupt | boolean | true | false | Key data is corrupt
_key_length | bigint | true | false | Total number of key bytes
_message | varchar | true | false | Message text
_message_corrupt | boolean | true | false | Message data is corrupt
_message_length | bigint | true | false | Total number of message bytes
(12 rows)
presto:tpch> SELECT kafka_key FROM customer ORDER BY kafka_key LIMIT 10;
kafka_key
-----------
0
1
2
3
4
5
6
7
8
9
(10 rows)
</pre></div>
</div>
<p>The topic definition file maps the internal Kafka key (which is a raw,
eight-byte long) onto a Presto <code class="docutils literal"><span class="pre">BIGINT</span></code> column.</p>
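<p>To verify the key mapping, the decoded <code class="docutils literal"><span class="pre">kafka_key</span></code>
column can be compared with a field extracted from the raw message. A minimal
sketch (output omitted):</p>
<div class="highlight-none"><div class="highlight"><pre><span></span>presto:tpch> SELECT kafka_key,
          ->        cast(json_extract_scalar(_message, '$.rowNumber') AS bigint) AS row_number
          -> FROM customer
          -> LIMIT 5;
</pre></div>
</div>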
</div>
<div class="section" id="step-6-map-all-the-values-from-the-topic-message-onto-columns">
<h3>Step 6: Map all the values from the topic message onto columns</h3>
<p>Update the <code class="docutils literal"><span class="pre">etc/kafka/tpch.customer.json</span></code> file to add fields for the
message and restart Presto. Because the message is in JSON format, it uses
the <code class="docutils literal"><span class="pre">json</span></code> data format. This is an example where different data formats
are used for the key and the message.</p>
<div class="highlight-json"><div class="highlight"><pre><span></span><span class="p">{</span>
<span class="nt">"tableName"</span><span class="p">:</span> <span class="s2">"customer"</span><span class="p">,</span>
<span class="nt">"schemaName"</span><span class="p">:</span> <span class="s2">"tpch"</span><span class="p">,</span>
<span class="nt">"topicName"</span><span class="p">:</span> <span class="s2">"tpch.customer"</span><span class="p">,</span>
<span class="nt">"key"</span><span class="p">:</span> <span class="p">{</span>
<span class="nt">"dataFormat"</span><span class="p">:</span> <span class="s2">"raw"</span><span class="p">,</span>
<span class="nt">"fields"</span><span class="p">:</span> <span class="p">[</span>
<span class="p">{</span>
<span class="nt">"name"</span><span class="p">:</span> <span class="s2">"kafka_key"</span><span class="p">,</span>
<span class="nt">"dataFormat"</span><span class="p">:</span> <span class="s2">"LONG"</span><span class="p">,</span>
<span class="nt">"type"</span><span class="p">:</span> <span class="s2">"BIGINT"</span><span class="p">,</span>
<span class="nt">"hidden"</span><span class="p">:</span> <span class="s2">"false"</span>
<span class="p">}</span>
<span class="p">]</span>
<span class="p">},</span>
<span class="nt">"message"</span><span class="p">:</span> <span class="p">{</span>
<span class="nt">"dataFormat"</span><span class="p">:</span> <span class="s2">"json"</span><span class="p">,</span>
<span class="nt">"fields"</span><span class="p">:</span> <span class="p">[</span>
<span class="p">{</span>
<span class="nt">"name"</span><span class="p">:</span> <span class="s2">"row_number"</span><span class="p">,</span>
<span class="nt">"mapping"</span><span class="p">:</span> <span class="s2">"rowNumber"</span><span class="p">,</span>
<span class="nt">"type"</span><span class="p">:</span> <span class="s2">"BIGINT"</span>
<span class="p">},</span>
<span class="p">{</span>
<span class="nt">"name"</span><span class="p">:</span> <span class="s2">"customer_key"</span><span class="p">,</span>
<span class="nt">"mapping"</span><span class="p">:</span> <span class="s2">"customerKey"</span><span class="p">,</span>
<span class="nt">"type"</span><span class="p">:</span> <span class="s2">"BIGINT"</span>
<span class="p">},</span>
<span class="p">{</span>
<span class="nt">"name"</span><span class="p">:</span> <span class="s2">"name"</span><span class="p">,</span>
<span class="nt">"mapping"</span><span class="p">:</span> <span class="s2">"name"</span><span class="p">,</span>
<span class="nt">"type"</span><span class="p">:</span> <span class="s2">"VARCHAR"</span>
<span class="p">},</span>
<span class="p">{</span>
<span class="nt">"name"</span><span class="p">:</span> <span class="s2">"address"</span><span class="p">,</span>
<span class="nt">"mapping"</span><span class="p">:</span> <span class="s2">"address"</span><span class="p">,</span>
<span class="nt">"type"</span><span class="p">:</span> <span class="s2">"VARCHAR"</span>
<span class="p">},</span>
<span class="p">{</span>
<span class="nt">"name"</span><span class="p">:</span> <span class="s2">"nation_key"</span><span class="p">,</span>
<span class="nt">"mapping"</span><span class="p">:</span> <span class="s2">"nationKey"</span><span class="p">,</span>
<span class="nt">"type"</span><span class="p">:</span> <span class="s2">"BIGINT"</span>
<span class="p">},</span>
<span class="p">{</span>
<span class="nt">"name"</span><span class="p">:</span> <span class="s2">"phone"</span><span class="p">,</span>
<span class="nt">"mapping"</span><span class="p">:</span> <span class="s2">"phone"</span><span class="p">,</span>
<span class="nt">"type"</span><span class="p">:</span> <span class="s2">"VARCHAR"</span>
<span class="p">},</span>
<span class="p">{</span>
<span class="nt">"name"</span><span class="p">:</span> <span class="s2">"account_balance"</span><span class="p">,</span>
<span class="nt">"mapping"</span><span class="p">:</span> <span class="s2">"accountBalance"</span><span class="p">,</span>
<span class="nt">"type"</span><span class="p">:</span> <span class="s2">"DOUBLE"</span>
<span class="p">},</span>
<span class="p">{</span>
<span class="nt">"name"</span><span class="p">:</span> <span class="s2">"market_segment"</span><span class="p">,</span>
<span class="nt">"mapping"</span><span class="p">:</span> <span class="s2">"marketSegment"</span><span class="p">,</span>
<span class="nt">"type"</span><span class="p">:</span> <span class="s2">"VARCHAR"</span>
<span class="p">},</span>
<span class="p">{</span>
<span class="nt">"name"</span><span class="p">:</span> <span class="s2">"comment"</span><span class="p">,</span>
<span class="nt">"mapping"</span><span class="p">:</span> <span class="s2">"comment"</span><span class="p">,</span>
<span class="nt">"type"</span><span class="p">:</span> <span class="s2">"VARCHAR"</span>
<span class="p">}</span>
<span class="p">]</span>
<span class="p">}</span>
<span class="p">}</span>
</pre></div>
</div>
<p>Columns are now defined for all the fields in the JSON message, and
the sum query from earlier can operate on the <code class="docutils literal"><span class="pre">account_balance</span></code> column directly:</p>
<div class="highlight-none"><div class="highlight"><pre><span></span>presto:tpch> DESCRIBE customer;
Column | Type | Null | Partition Key | Comment
-------------------+---------+------+---------------+---------------------------------------------
kafka_key | bigint | true | false |
row_number | bigint | true | false |
customer_key | bigint | true | false |
name | varchar | true | false |
address | varchar | true | false |
nation_key | bigint | true | false |
phone | varchar | true | false |
account_balance | double | true | false |
market_segment | varchar | true | false |
comment | varchar | true | false |
_partition_id | bigint | true | false | Partition Id
_partition_offset | bigint | true | false | Offset for the message within the partition
_segment_start | bigint | true | false | Segment start offset
_segment_end | bigint | true | false | Segment end offset
_segment_count | bigint | true | false | Running message count per segment
_key | varchar | true | false | Key text
_key_corrupt | boolean | true | false | Key data is corrupt
_key_length | bigint | true | false | Total number of key bytes
_message | varchar | true | false | Message text
_message_corrupt | boolean | true | false | Message data is corrupt
_message_length | bigint | true | false | Total number of message bytes
(21 rows)
presto:tpch> SELECT * FROM customer LIMIT 5;
kafka_key | row_number | customer_key | name | address | nation_key | phone | account_balance | market_segment | comment
-----------+------------+--------------+--------------------+---------------------------------------+------------+-----------------+-----------------+----------------+---------------------------------------------------------------------------------------------------------
1 | 2 | 2 | Customer#000000002 | XSTf4,NCwDVaWNe6tEgvwfmRchLXak | 13 | 23-768-687-3665 | 121.65 | AUTOMOBILE | l accounts. blithely ironic theodolites integrate boldly: caref
3 | 4 | 4 | Customer#000000004 | XxVSJsLAGtn | 4 | 14-128-190-5944 | 2866.83 | MACHINERY | requests. final, regular ideas sleep final accou
5 | 6 | 6 | Customer#000000006 | sKZz0CsnMD7mp4Xd0YrBvx,LREYKUWAh yVn | 20 | 30-114-968-4951 | 7638.57 | AUTOMOBILE | tions. even deposits boost according to the slyly bold packages. final accounts cajole requests. furious
7 | 8 | 8 | Customer#000000008 | I0B10bB0AymmC, 0PrRYBCP1yGJ8xcBPmWhl5 | 17 | 27-147-574-9335 | 6819.74 | BUILDING | among the slyly regular theodolites kindle blithely courts. carefully even theodolites haggle slyly alon
9 | 10 | 10 | Customer#000000010 | 6LrEaV6KR6PLVcgl2ArL Q3rqzLzcT1 v2 | 5 | 15-741-346-9870 | 2753.54 | HOUSEHOLD | es regular deposits haggle. fur
(5 rows)
presto:tpch> SELECT sum(account_balance) FROM customer LIMIT 10;
_col0
------------
6681865.59
(1 row)
</pre></div>
</div>
<p>Now all the fields from the <code class="docutils literal"><span class="pre">customer</span></code> topic messages are available as
Presto table columns.</p>
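<p>With real columns in place, ordinary SQL works without any JSON functions.
As a sketch, the customers can be grouped by market segment (the exact counts
depend on the loaded data and are omitted here):</p>
<div class="highlight-none"><div class="highlight"><pre><span></span>presto:tpch> SELECT market_segment,
          ->        count(*) AS customers,
          ->        round(avg(account_balance), 2) AS avg_balance
          -> FROM customer
          -> GROUP BY market_segment
          -> ORDER BY customers DESC;
</pre></div>
</div>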
</div>
<div class="section" id="step-7-use-live-data">
<h3>Step 7: Use live data</h3>
<p>Presto can query live data in Kafka as it arrives. To simulate a live feed
of data, this tutorial sets up a feed of live tweets into Kafka.</p>
<div class="section" id="setup-a-live-twitter-feed">
<h4>Set up a live Twitter feed</h4>
<ul class="simple">
<li>Download the twistr tool</li>
</ul>
<div class="highlight-none"><div class="highlight"><pre><span></span>$ curl -o twistr https://repo1.maven.org/maven2/de/softwareforge/twistr_kafka_0811/1.2/twistr_kafka_0811-1.2.sh
$ chmod 755 twistr
</pre></div>
</div>
<ul class="simple">
<li>Create a developer account at <a class="reference external" href="https://dev.twitter.com/">https://dev.twitter.com/</a> and set up
access and consumer tokens.</li>
<li>Create a <code class="docutils literal"><span class="pre">twistr.properties</span></code> file and put the access and consumer keys
and secrets into it:</li>
</ul>
<div class="highlight-none"><div class="highlight"><pre><span></span>twistr.access-token-key=...
twistr.access-token-secret=...
twistr.consumer-key=...
twistr.consumer-secret=...
twistr.kafka.brokers=localhost:9092
</pre></div>
</div>
</div>
<div class="section" id="create-a-tweets-table-on-presto">
<h4>Create a tweets table on Presto</h4>
<p>Add the tweets table to the <code class="docutils literal"><span class="pre">etc/catalog/kafka.properties</span></code> file:</p>
<div class="highlight-none"><div class="highlight"><pre><span></span>connector.name=kafka
kafka.nodes=localhost:9092
kafka.table-names=tpch.customer,tpch.orders,tpch.lineitem,tpch.part,tpch.partsupp,tpch.supplier,tpch.nation,tpch.region,tweets
kafka.hide-internal-columns=false
</pre></div>
</div>
<p>Add a topic definition file for the Twitter feed as <code class="docutils literal"><span class="pre">etc/kafka/tweets.json</span></code>:</p>
<div class="highlight-json"><div class="highlight"><pre><span></span><span class="p">{</span>
<span class="nt">"tableName"</span><span class="p">:</span> <span class="s2">"tweets"</span><span class="p">,</span>
<span class="nt">"topicName"</span><span class="p">:</span> <span class="s2">"twitter_feed"</span><span class="p">,</span>
<span class="nt">"dataFormat"</span><span class="p">:</span> <span class="s2">"json"</span><span class="p">,</span>
<span class="nt">"key"</span><span class="p">:</span> <span class="p">{</span>
<span class="nt">"dataFormat"</span><span class="p">:</span> <span class="s2">"raw"</span><span class="p">,</span>
<span class="nt">"fields"</span><span class="p">:</span> <span class="p">[</span>
<span class="p">{</span>
<span class="nt">"name"</span><span class="p">:</span> <span class="s2">"kafka_key"</span><span class="p">,</span>
<span class="nt">"dataFormat"</span><span class="p">:</span> <span class="s2">"LONG"</span><span class="p">,</span>
<span class="nt">"type"</span><span class="p">:</span> <span class="s2">"BIGINT"</span><span class="p">,</span>
<span class="nt">"hidden"</span><span class="p">:</span> <span class="s2">"false"</span>
<span class="p">}</span>
<span class="p">]</span>
<span class="p">},</span>
<span class="nt">"message"</span><span class="p">:</span> <span class="p">{</span>
<span class="nt">"dataFormat"</span><span class="p">:</span><span class="s2">"json"</span><span class="p">,</span>
<span class="nt">"fields"</span><span class="p">:</span> <span class="p">[</span>
<span class="p">{</span>
<span class="nt">"name"</span><span class="p">:</span> <span class="s2">"text"</span><span class="p">,</span>
<span class="nt">"mapping"</span><span class="p">:</span> <span class="s2">"text"</span><span class="p">,</span>
<span class="nt">"type"</span><span class="p">:</span> <span class="s2">"VARCHAR"</span>
<span class="p">},</span>
<span class="p">{</span>
<span class="nt">"name"</span><span class="p">:</span> <span class="s2">"user_name"</span><span class="p">,</span>
<span class="nt">"mapping"</span><span class="p">:</span> <span class="s2">"user/screen_name"</span><span class="p">,</span>
<span class="nt">"type"</span><span class="p">:</span> <span class="s2">"VARCHAR"</span>
<span class="p">},</span>
<span class="p">{</span>
<span class="nt">"name"</span><span class="p">:</span> <span class="s2">"lang"</span><span class="p">,</span>
<span class="nt">"mapping"</span><span class="p">:</span> <span class="s2">"lang"</span><span class="p">,</span>
<span class="nt">"type"</span><span class="p">:</span> <span class="s2">"VARCHAR"</span>
<span class="p">},</span>
<span class="p">{</span>
<span class="nt">"name"</span><span class="p">:</span> <span class="s2">"created_at"</span><span class="p">,</span>
<span class="nt">"mapping"</span><span class="p">:</span> <span class="s2">"created_at"</span><span class="p">,</span>
<span class="nt">"type"</span><span class="p">:</span> <span class="s2">"TIMESTAMP"</span><span class="p">,</span>
<span class="nt">"dataFormat"</span><span class="p">:</span> <span class="s2">"rfc2822"</span>
<span class="p">},</span>
<span class="p">{</span>
<span class="nt">"name"</span><span class="p">:</span> <span class="s2">"favorite_count"</span><span class="p">,</span>
<span class="nt">"mapping"</span><span class="p">:</span> <span class="s2">"favorite_count"</span><span class="p">,</span>
<span class="nt">"type"</span><span class="p">:</span> <span class="s2">"BIGINT"</span>
<span class="p">},</span>
<span class="p">{</span>
<span class="nt">"name"</span><span class="p">:</span> <span class="s2">"retweet_count"</span><span class="p">,</span>
<span class="nt">"mapping"</span><span class="p">:</span> <span class="s2">"retweet_count"</span><span class="p">,</span>
<span class="nt">"type"</span><span class="p">:</span> <span class="s2">"BIGINT"</span>
<span class="p">},</span>
<span class="p">{</span>
<span class="nt">"name"</span><span class="p">:</span> <span class="s2">"favorited"</span><span class="p">,</span>
<span class="nt">"mapping"</span><span class="p">:</span> <span class="s2">"favorited"</span><span class="p">,</span>
<span class="nt">"type"</span><span class="p">:</span> <span class="s2">"BOOLEAN"</span>
<span class="p">},</span>
<span class="p">{</span>
<span class="nt">"name"</span><span class="p">:</span> <span class="s2">"id"</span><span class="p">,</span>
<span class="nt">"mapping"</span><span class="p">:</span> <span class="s2">"id_str"</span><span class="p">,</span>
<span class="nt">"type"</span><span class="p">:</span> <span class="s2">"VARCHAR"</span>
<span class="p">},</span>
<span class="p">{</span>
<span class="nt">"name"</span><span class="p">:</span> <span class="s2">"in_reply_to_screen_name"</span><span class="p">,</span>
<span class="nt">"mapping"</span><span class="p">:</span> <span class="s2">"in_reply_to_screen_name"</span><span class="p">,</span>
<span class="nt">"type"</span><span class="p">:</span> <span class="s2">"VARCHAR"</span>
<span class="p">},</span>
<span class="p">{</span>
<span class="nt">"name"</span><span class="p">:</span> <span class="s2">"place_name"</span><span class="p">,</span>
<span class="nt">"mapping"</span><span class="p">:</span> <span class="s2">"place/full_name"</span><span class="p">,</span>
<span class="nt">"type"</span><span class="p">:</span> <span class="s2">"VARCHAR"</span>
<span class="p">}</span>
<span class="p">]</span>
<span class="p">}</span>
<span class="p">}</span>
</pre></div>
</div>
<p>As this table does not have an explicit schema name, it will be placed
into the <code class="docutils literal"><span class="pre">default</span></code> schema.</p>
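<p>The table can therefore be addressed as
<code class="docutils literal"><span class="pre">kafka.default.tweets</span></code> from any session.
Even before the feed below is started, its shape can be inspected
(a sketch, output omitted):</p>
<div class="highlight-none"><div class="highlight"><pre><span></span>presto:tpch> DESCRIBE kafka.default.tweets;
</pre></div>
</div>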
</div>
<div class="section" id="feed-live-data">
<h4>Feed live data</h4>
<p>Start the twistr tool:</p>
<div class="highlight-none"><div class="highlight"><pre><span></span>$ java -Dness.config.location=file:$(pwd) -Dness.config=twistr -jar ./twistr
</pre></div>
</div>
<p><code class="docutils literal"><span class="pre">twistr</span></code> connects to the Twitter API and feeds the “sample tweet” feed
into a Kafka topic called <code class="docutils literal"><span class="pre">twitter_feed</span></code>.</p>
<p>Now run queries against live data:</p>
<div class="highlight-none"><div class="highlight"><pre><span></span>$ ./presto-cli --catalog kafka --schema default
presto:default> SELECT count(*) FROM tweets;
_col0
-------
4467
(1 row)
presto:default> SELECT count(*) FROM tweets;
_col0
-------
4517
(1 row)
presto:default> SELECT count(*) FROM tweets;
_col0
-------
4572
(1 row)
presto:default> SELECT kafka_key, user_name, lang, created_at FROM tweets LIMIT 10;
kafka_key | user_name | lang | created_at
--------------------+-----------------+------+-------------------------
494227746231685121 | burncaniff | en | 2014-07-29 14:07:31.000
494227746214535169 | gu8tn | ja | 2014-07-29 14:07:31.000
494227746219126785 | pequitamedicen | es | 2014-07-29 14:07:31.000
494227746201931777 | josnyS | ht | 2014-07-29 14:07:31.000
494227746219110401 | Cafe510 | en | 2014-07-29 14:07:31.000
494227746210332673 | Da_JuanAnd_Only | en | 2014-07-29 14:07:31.000
494227746193956865 | Smile_Kidrauhl6 | pt | 2014-07-29 14:07:31.000
494227750426017793 | CashforeverCD | en | 2014-07-29 14:07:32.000
494227750396653569 | FilmArsivimiz | tr | 2014-07-29 14:07:32.000
494227750388256769 | jmolas | es | 2014-07-29 14:07:32.000
(10 rows)
</pre></div>
</div>
<p>There is now a live feed into Kafka which can be queried using Presto.</p>
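<p>Because the mapped columns behave like any other table columns, the live
feed can also be aggregated. As a sketch, this counts tweets per language
(the numbers keep changing as new tweets arrive):</p>
<div class="highlight-none"><div class="highlight"><pre><span></span>presto:default> SELECT lang, count(*) AS tweets
             -> FROM tweets
             -> GROUP BY lang
             -> ORDER BY tweets DESC
             -> LIMIT 5;
</pre></div>
</div>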
</div>
</div>
<div class="section" id="epilogue-time-stamps">
<h3>Epilogue: Time stamps</h3>
<p>The tweets feed that was set up in the last step contains a timestamp in
RFC 2822 format as the <code class="docutils literal"><span class="pre">created_at</span></code> attribute of each tweet.</p>
<div class="highlight-none"><div class="highlight"><pre><span></span>presto:default> SELECT DISTINCT json_extract_scalar(_message, '$.created_at')) AS raw_date
-> FROM tweets LIMIT 5;
raw_date
--------------------------------
Tue Jul 29 21:07:31 +0000 2014
Tue Jul 29 21:07:32 +0000 2014
Tue Jul 29 21:07:33 +0000 2014
Tue Jul 29 21:07:34 +0000 2014
Tue Jul 29 21:07:35 +0000 2014
(5 rows)
</pre></div>
</div>
<p>The topic definition file for the tweets table contains a mapping onto a
timestamp using the <code class="docutils literal"><span class="pre">rfc2822</span></code> converter:</p>
<div class="highlight-none"><div class="highlight"><pre><span></span>...
{
"name": "created_at",
"mapping": "created_at",
"type": "TIMESTAMP",
"dataFormat": "rfc2822"
},
...
</pre></div>
</div>
<p>This allows the raw data to be mapped onto a Presto timestamp column:</p>
<div class="highlight-none"><div class="highlight"><pre><span></span>presto:default> SELECT created_at, raw_date FROM (
-> SELECT created_at, json_extract_scalar(_message, '$.created_at') AS raw_date
-> FROM tweets)
-> GROUP BY 1, 2 LIMIT 5;
created_at | raw_date
-------------------------+--------------------------------
2014-07-29 14:07:20.000 | Tue Jul 29 21:07:20 +0000 2014
2014-07-29 14:07:21.000 | Tue Jul 29 21:07:21 +0000 2014
2014-07-29 14:07:22.000 | Tue Jul 29 21:07:22 +0000 2014
2014-07-29 14:07:23.000 | Tue Jul 29 21:07:23 +0000 2014
2014-07-29 14:07:24.000 | Tue Jul 29 21:07:24 +0000 2014
(5 rows)
</pre></div>
</div>
<p>The Kafka connector contains converters for the ISO 8601 and RFC 2822 text
formats and for number-based timestamps using seconds or milliseconds
since the epoch. There is also a generic, text-based converter that uses
Joda-Time format strings to parse text columns.</p>
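<p>Once the conversion is in place, the column supports the usual timestamp
operations. As a sketch, tweets can be bucketed by the minute they arrived:</p>
<div class="highlight-none"><div class="highlight"><pre><span></span>presto:default> SELECT date_trunc('minute', created_at) AS minute_start, count(*) AS tweets
             -> FROM tweets
             -> GROUP BY 1
             -> ORDER BY 1 DESC
             -> LIMIT 5;
</pre></div>
</div>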
</div>
</div>
</div>
</div>
<div class="bottomnav">
<p class="nav">
<span class="left">
« <a href="kafka.html">5.7. Kafka Connector</a>
</span>
<span class="right">
<a href="localfile.html">5.9. Local File Connector</a> »
</span>
</p>
</div>
<div class="footer" role="contentinfo">
</div>
</body>
</html>