-
Notifications
You must be signed in to change notification settings - Fork 63
/
context.py
1114 lines (971 loc) · 46.8 KB
/
context.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
"""CartoContext class for authentication with CARTO and high-level operations
such as reading tables from CARTO into dataframes, writing dataframes to CARTO
tables, and creating custom maps from dataframes and CARTO tables. Future
methods interact with CARTO's services like
`Data Observatory <https://carto.com/data-observatory>`__, and `routing,
geocoding, and isolines <https://carto.com/location-data-services/>`__.
"""
import json
import os
import random
import sys
import time
import collections
import binascii as ba
from warnings import warn
import requests
import IPython
import pandas as pd
from tqdm import tqdm
from carto.auth import APIKeyAuthClient
from carto.sql import SQLClient
from carto.exceptions import CartoException
from .utils import dict_items, normalize_colnames
from .layer import BaseMap
from .maps import non_basemap_layers, get_map_name, get_map_template
if sys.version_info >= (3, 0):
from urllib.parse import urlparse, urlencode
else:
from urlparse import urlparse
from urllib import urlencode
try:
import matplotlib.image as mpi
import matplotlib.pyplot as plt
# set dpi based on CARTO Static Maps API dpi
mpi.rcParams['figure.dpi'] = 72.0
except (ImportError, RuntimeError):
mpi = None
plt = None
# True only when matplotlib imported successfully above; static (non-interactive)
# maps fall back to an IPython Image when it is absent.
HAS_MATPLOTLIB = plt is not None
# Choose constant to avoid overview generation which are triggered at a
# half million rows
MAX_IMPORT_ROWS = 499999
class CartoContext(object):
    """Manages connections with CARTO for data and map operations. Modeled
    after `SparkContext
    <https://jaceklaskowski.gitbooks.io/mastering-apache-spark/content/spark-sparkcontext.html>`__.

    Example:
        Create a CartoContext object::

            import cartoframes
            cc = cartoframes.CartoContext(BASEURL, APIKEY)

    Args:
        base_url (str): Base URL of CARTO user account. Cloud-based accounts
            are of the form ``https://{username}.carto.com`` (e.g.,
            https://eschbacher.carto.com for user ``eschbacher``). On-premises
            installation users should ask their admin.
        api_key (str): CARTO API key.
        session (requests.Session, optional): requests session. See `requests
            documentation
            <http://docs.python-requests.org/en/master/user/advanced/>`__
            for more information.
        verbose (bool, optional): Output underlying process states (True), or
            suppress (False, default)

    Returns:
        :obj:`CartoContext`: A CartoContext object that is authenticated
        against the user's CARTO account.
    """
    def __init__(self, base_url=None, api_key=None, session=None, verbose=0):
        # Fall back to locally stored credentials when either value is None.
        self.api_key, self.base_url = _process_credentials(api_key,
                                                           base_url)
        self.auth_client = APIKeyAuthClient(base_url=self.base_url,
                                            api_key=self.api_key,
                                            session=session)
        self.sql_client = SQLClient(self.auth_client)
        self.username = self.auth_client.username
        # Org accounts use the username (not `public`) when cartodbfy-ing.
        self.is_org = self._is_org_user()
        # Cache of named-map templates already registered this session.
        self._map_templates = {}
        # Lazily loaded HTML template used for interactive map iframes.
        self._srcdoc = None
        self._verbose = verbose
def _is_org_user(self):
"""Report whether user is in a multiuser CARTO organization or not"""
res = self.sql_client.send('SHOW search_path')
paths = [p.strip() for p in res['rows'][0]['search_path'].split(',')]
# is an org user if first item is not `public`
return paths[0] != 'public'
def read(self, table_name, limit=None, index='cartodb_id',
decode_geom=False):
"""Read tables from CARTO into pandas DataFrames.
Example:
.. code:: python
import cartoframes
cc = cartoframes.CartoContext(BASEURL, APIKEY)
df = cc.read('acadia_biodiversity')
Args:
table_name (str): Name of table in user's CARTO account.
limit (int, optional): Read only ``limit`` lines from
``table_name``. Defaults to `None`, which reads the full table.
index (str, optional): Not currently in use.
decode_geom (bool, optional): Defaults to `False`, which reads the
table into a pandas DataFrame as is. If `True`, reads table into
a pandas DataFrame with wkb geometries found in column
`the_geom` decoded as shapely geometries in column named
`geometry`.
Returns:
pandas.DataFrame: DataFrame representation of `table_name` from
CARTO.
"""
query = 'SELECT * FROM "{table_name}"'.format(table_name=table_name)
if limit:
if isinstance(limit, int) and (limit >= 0):
query += ' LIMIT {limit}'.format(limit=limit)
else:
raise ValueError("`limit` parameter must an integer >= 0")
return self.query(query, decode_geom=decode_geom)
    def write(self, df, table_name, temp_dir='/tmp', overwrite=False,
              lnglat=None, encode_geom=False, geom_col=None):
        """Write a DataFrame to a CARTO table.

        Example:
            .. code:: python

                cc.write(df, 'brooklyn_poverty', overwrite=True)

        Args:
            df (pandas.DataFrame): DataFrame to write to ``table_name`` in
                user CARTO account
            table_name (str): Table to write ``df`` to in CARTO.
            temp_dir (str, optional): Directory for temporary storage of data
                that is sent to CARTO. Default is ``/tmp`` (Unix-like systems).
            overwrite (bool, optional): Behavior for overwriting ``table_name``
                if it exits on CARTO. Defaults to ``False``.
            lnglat (tuple, optional): lng/lat pair that can be used for
                creating a geometry on CARTO. Defaults to ``None``. In some
                cases, geometry will be created without specifying this. See
                CARTO's `Import API
                <https://carto.com/docs/carto-engine/import-api/standard-tables>`__
                for more information.
            encode_geom (bool, optional): Whether to write `geom_col` to CARTO
                as `the_geom`.
            geom_col (str, optional): The name of the column where geometry
                information is stored. Used in conjunction with `encode_geom`.

        Returns:
            None
        """
        if encode_geom:
            _add_encoded_geom(df, geom_col)
        if not overwrite:
            # error if table exists and user does not want to overwrite
            self._table_exists(table_name)
        pgcolnames = normalize_colnames(df.columns)
        if df.shape[0] > MAX_IMPORT_ROWS:
            # NOTE: schema is set using different method than in _set_schema
            final_table_name = self._send_batches(df, table_name, temp_dir,
                                                  geom_col, pgcolnames)
        else:
            final_table_name = self._send_dataframe(df, table_name, temp_dir,
                                                    geom_col, pgcolnames)
            self._set_schema(df, final_table_name, pgcolnames)

        # create geometry column from lat/longs if requested
        if lnglat:
            # TODO: make this a batch job if it is a large dataframe or move
            # inside of _send_dataframe and/or batch
            tqdm.write('Creating geometry out of columns '
                       '`{lng}`/`{lat}`'.format(lng=lnglat[0],
                                                lat=lnglat[1]))
            self.sql_client.send('''
                UPDATE "{table_name}"
                SET the_geom = CDB_LatLng("{lat}"::numeric,
                                          "{lng}"::numeric)
            '''.format(table_name=final_table_name,
                       lng=lnglat[0],
                       lat=lnglat[1]))

        tqdm.write('Table successfully written to CARTO: '
                   '{base_url}dataset/{table_name}'.format(
                       base_url=self.base_url,
                       table_name=final_table_name))
def delete(self, table_name):
"""Delete table
Args:
table_name (str): Table name to delete
Returns:
None
"""
try:
self.auth_client.send(
'api/v1/viz/{table_name}'.format(table_name=table_name),
http_method='DELETE'
)
except CartoException as err:
warn('Failed to delete the following table from CARTO '
'account: `{table_name}`. ({err})'.format(
table_name=table_name,
err=err))
return None
def _table_exists(self, table_name):
"""Checks to see if table exists"""
try:
self.sql_client.send('''
EXPLAIN SELECT * FROM "{table_name}"
'''.format(table_name=table_name))
raise NameError(
'Table `{table_name}` already exists. '
'Run with `overwrite=True` if you wish to replace the '
'table.'.format(table_name=table_name))
except CartoException as err:
# If table doesn't exist, we get an error from the SQL API
self._debug_print(err=err)
return False
return False
    def _send_batches(self, df, table_name, temp_dir, geom_col, pgcolnames):
        """Batch sending a dataframe

        Args:
            df (pandas.DataFrame): DataFrame that will be batched up for
                sending to CARTO
            table_name (str): Name of table to send DataFrame to
            temp_dir (str): Local directory for temporary storage of DataFrame
                written to file that will be sent to CARTO
            geom_col (str): Name of encoded geometry column (if any) that will
                be dropped or converted to `the_geom` column
            pgcolnames (list of str): List of SQL-normalized column names

        Returns:
            final_table_name (str): Final table name on CARTO that the
                DataFrame is stored in

        Exceptions:
            * TODO: add more (Out of storage)
        """
        subtables = []
        # send dataframe chunks to carto: group rows into MAX_IMPORT_ROWS-size
        # chunks so each upload stays under the overview-generation threshold
        for chunk_num, chunk in tqdm(df.groupby([i // MAX_IMPORT_ROWS
                                                 for i in range(df.shape[0])]),
                                     desc='Uploading in batches: '):
            # table name truncated to 40 chars to leave room for the suffix
            temp_table = '{orig}_cartoframes_temp_{chunk}'.format(
                orig=table_name[:40],
                chunk=chunk_num)
            try:
                # send dataframe chunk, get new name if collision
                temp_table = self._send_dataframe(chunk, temp_table, temp_dir,
                                                  geom_col, pgcolnames)
            except CartoException as err:
                # roll back any chunks already uploaded before re-raising
                for table in subtables:
                    self.delete(table)
                raise CartoException(err)

            if temp_table:
                subtables.append(temp_table)
            self._debug_print(chunk_num=chunk_num,
                              chunk_shape=str(chunk.shape),
                              temp_table=temp_table)

        # combine chunks into final table, then drop the temp tables and
        # cartodbfy the result in a single SQL batch
        try:
            select_base = 'SELECT {schema} FROM "{{table}}"'.format(
                schema=_df2pg_schema(df, pgcolnames))
            unioned_tables = '\nUNION ALL\n'.join([select_base.format(table=t)
                                                   for t in subtables])
            self._debug_print(unioned=unioned_tables)
            drop_tables = '\n'.join(
                'DROP TABLE IF EXISTS {table};'.format(table=table)
                for table in subtables)
            query = '''
                DROP TABLE IF EXISTS "{table_name}";
                CREATE TABLE "{table_name}" As {unioned_tables};
                ALTER TABLE {table_name} DROP COLUMN IF EXISTS cartodb_id;
                {drop_tables}
                SELECT CDB_CartoDBFYTable('{org}', '{table_name}');
            '''.format(table_name=table_name,
                       unioned_tables=unioned_tables,
                       org=self.username if self.is_org else 'public',
                       drop_tables=drop_tables)
            self._debug_print(query=query)
            _ = self.sql_client.send(query)
        except CartoException as err:
            # clean up temp chunk tables before surfacing the failure
            for table in subtables:
                self.delete(table)
            raise Exception('Failed to upload dataframe: {}'.format(err))

        return table_name
    def _send_dataframe(self, df, table_name, temp_dir, geom_col, pgcolnames):
        """Send a DataFrame to CARTO to be imported as a SQL table.

        Note:
            Schema from ``df`` is not enforced with this method. Use
            ``self._set_schema`` to enforce the schema.

        Args:
            df (pandas.DataFrame): DataFrame that is will be sent to CARTO
            table_name (str): Name on CARTO for the table that will have the
                data from ``df``
            temp_dir (str): Name of directory used for temporarily storing the
                DataFrame file to sent to CARTO
            geom_col (str): Name of geometry column
            pgcolnames (list of str): SQL-normalized column names used as the
                CSV header row

        Returns:
            final_table_name (str): Name of final table. This method will
                overwrite the table `table_name` if it already exists.
        """
        def remove_tempfile(filepath):
            """removes temporary file"""
            os.remove(filepath)

        tempfile = '{temp_dir}/{table_name}.csv'.format(temp_dir=temp_dir,
                                                        table_name=table_name)
        self._debug_print(tempfile=tempfile)
        # drop the encoded geometry column (if any); type_guessing is disabled
        # below so _set_schema can apply column types explicitly afterwards
        df.drop(geom_col, axis=1, errors='ignore').to_csv(path_or_buf=tempfile,
                                                          na_rep='',
                                                          header=pgcolnames)

        with open(tempfile, 'rb') as f:
            res = self._auth_send('api/v1/imports', 'POST',
                                  files={'file': f},
                                  params={'type_guessing': 'false'},
                                  stream=True)
            self._debug_print(res=res)

            if not res['success']:
                remove_tempfile(tempfile)
                raise CartoException('Failed to send DataFrame')
            import_id = res['item_queue_id']

        remove_tempfile(tempfile)
        final_table_name = table_name
        # poll the Import API until the upload job completes
        while True:
            import_job = self._check_import(import_id)
            self._debug_print(import_job=import_job)
            final_table_name = self._handle_import(import_job, table_name)
            if import_job['state'] == 'complete':
                break
            # Wait a second before doing another request
            time.sleep(1.0)

        return final_table_name
def _set_schema(self, dataframe, table_name, pgcolnames):
"""Update a table associated with a dataframe to have the equivalent
schema
Args:
dataframe (pandas.DataFrame): Dataframe that CARTO table is cloned
from
table_name (str): Table name where schema is being altered
pgcolnames (list of str): List of column names from ``dataframe``
as they appear on the database
Returns:
None
"""
util_cols = ('the_geom', 'the_geom_webmercator', 'cartodb_id')
alter_temp = ('ALTER COLUMN "{col}" TYPE {ctype} USING '
'NULLIF("{col}", \'\')::{ctype}')
# alter non-util columns that are not text type
alter_cols = ', '.join(alter_temp.format(col=c,
ctype=_dtypes2pg(t))
for c, t in zip(pgcolnames,
dataframe.dtypes)
if c not in util_cols and t != 'object')
alter_query = 'ALTER TABLE "{table}" {alter_cols};'.format(
table=table_name,
alter_cols=alter_cols)
self._debug_print(alter_query=alter_query)
try:
_ = self.sql_client.send(alter_query)
except CartoException as err:
warn('DataFrame written to CARTO but the table schema failed to '
'update to match DataFrame. All columns in CARTO table have '
'data type `text`. CARTO error: `{err}`.'.format(
err=err,
query=alter_query))
def _check_import(self, import_id):
"""Check the status of an Import API job"""
res = self._auth_send('api/v1/imports/{}'.format(import_id),
'GET')
return res
def _handle_import(self, import_job, table_name):
"""Handle state of import job"""
if import_job['state'] == 'failure':
if import_job['error_code'] == 8001:
raise CartoException('Over CARTO account storage limit for '
'user `{}`. Try subsetting your '
'DataFrame or dropping columns to reduce '
'the data size.'.format(self.username))
elif import_job['error_code'] == 6668:
raise CartoException('Too many rows in DataFrame. Try '
'subsetting DataFrame before writing to '
'CARTO.')
else:
raise CartoException('Error code: `{}`. See CARTO Import '
'API error documentation for more '
'information: https://carto.com/docs/'
'carto-engine/import-api/import-errors'
''.format(import_job['error_code']))
elif import_job['state'] == 'complete':
self._debug_print(final_table=import_job['table_name'])
if import_job['table_name'] != table_name:
try:
res = self.sql_client.send('''
DROP TABLE IF EXISTS {orig_table};
ALTER TABLE {dupe_table}
RENAME TO {orig_table};
'''.format(
orig_table=table_name,
dupe_table=import_job['table_name']))
self._debug_print(res=res)
except Exception as err:
self._debug_print(err=err)
raise Exception('Cannot overwrite table `{table_name}` '
'({err}). DataFrame was written to '
'`{new_table}` instead.'.format(
table_name=table_name,
err=err,
new_table=import_job['table_name']))
return table_name
def sync(self, dataframe, table_name):
"""Depending on the size of the DataFrame or CARTO table, perform
granular operations on a DataFrame to only update the changed cells
instead of a bulk upload. If on the large side, perform granular
operations, if on the smaller side use Import API.
Note:
Not yet implemented.
"""
pass
    def query(self, query, table_name=None, decode_geom=False):
        """Pull the result from an arbitrary SQL query from a CARTO account
        into a pandas DataFrame. Can also be used to perform database
        operations (creating/dropping tables, adding columns, updates, etc.).

        Args:
            query (str): Query to run against CARTO user database.
            table_name (str, optional): If set, this will create a new
                table in the user's CARTO account that is the result of the
                query. Defaults to None (no table created).
            decode_geom (bool, optional): Defaults to `False`, which does not
                decode geometries. If set to `True`, this will decode wkb
                geometries into shapely geometries.

        Returns:
            pandas.DataFrame: DataFrame representation of query supplied.
            Pandas data types are inferred from PostgreSQL data types.
            In the case of invalid PostgreSQL date types, the data type
            'object' is used.
        """
        self._debug_print(query=query)
        if table_name:
            # Materialize the query into a new table, then cartodbfy it so it
            # becomes a first-class CARTO dataset.
            create_table_query = '''
                CREATE TABLE {table_name} As
                SELECT *
                  FROM ({query}) As _wrap;
                SELECT CDB_CartodbfyTable('{org}', '{table_name}');
            '''.format(table_name=table_name,
                       query=query,
                       org=(self.username if self.is_org else 'public'))
            self._debug_print(create_table_query=create_table_query)
            create_table_res = self.sql_client.send(create_table_query)
            self._debug_print(create_table_res=create_table_res)
            # cartodbfy may rename the table (e.g., schema-qualify it)
            new_table_name = create_table_res['rows'][0]['cdb_cartodbfytable']
            self._debug_print(new_table_name=new_table_name)
            select_res = self.sql_client.send(
                'SELECT * FROM {table_name}'.format(table_name=new_table_name))
        else:
            select_res = self.sql_client.send(query)
        self._debug_print(select_res=select_res)

        fields = select_res['fields']
        if not len(fields):
            # e.g., DDL/UPDATE statements return no result columns
            return pd.DataFrame()
        df = pd.DataFrame(data=select_res['rows'])
        for field in fields:
            if fields[field]['type'] == 'date':
                # leave unparseable date values as-is rather than failing
                df[field] = pd.to_datetime(df[field], errors='ignore')
        self._debug_print(columns=df.columns,
                          dtypes=df.dtypes)
        if 'cartodb_id' in fields:
            df.set_index('cartodb_id', inplace=True)
        if decode_geom:
            df['geometry'] = df.the_geom.apply(_decode_geom)
        return df
    def map(self, layers=None, interactive=True,
            zoom=None, lat=None, lng=None, size=(800, 400),
            ax=None):
        """Produce a CARTO map visualizing data layers.

        Examples:
            Create a map with two data layers, and one BaseMap layer::

                import cartoframes
                from cartoframes import Layer, BaseMap, styling
                cc = cartoframes.CartoContext(BASEURL, APIKEY)
                cc.map(layers=[BaseMap(),
                               Layer('acadia_biodiversity',
                                     color={'column': 'simpson_index',
                                            'scheme': styling.tealRose(7)}),
                               Layer('peregrine_falcon_nest_sites',
                                     size='num_eggs',
                                     color={'column': 'bird_id',
                                            'scheme': styling.vivid(10)})],
                       interactive=True)

            Create a snapshot of a map at a specific zoom and center::

                cc.map(layers=Layer('acadia_biodiversity',
                                    color='simpson_index'),
                       interactive=False,
                       zoom=14,
                       lng=-68.3823549,
                       lat=44.3036906)

        Args:
            layers (list, optional): List of one or more of the following:

                - Layer: cartoframes Layer object for visualizing data from a
                  CARTO table. See `layer.Layer <#layer.Layer>`__ for all
                  styling options.
                - BaseMap: Basemap for contextualizng data layers. See
                  `layer.BaseMap <#layer.BaseMap>`__ for all styling options.
                - QueryLayer: Layer from an arbitrary query. See
                  `layer.QueryLayer <#layer.QueryLayer>`__ for all styling
                  options.
            interactive (bool, optional): Defaults to ``True`` to show an
                interactive slippy map. Setting to ``False`` creates a static
                map.
            zoom (int, optional): Zoom level of map. Acceptable values are
                usually in the range 0 to 19. 0 has the entire earth on a
                single tile (256px square). Zoom 19 is the size of a city
                block. Must be used in conjunction with ``lng`` and ``lat``.
                Defaults to a view to have all data layers in view.
            lat (float, optional): Latitude value for the center of the map.
                Must be used in conjunction with ``zoom`` and ``lng``. Defaults
                to a view to have all data layers in view.
            lng (float, optional): Longitude value for the center of the map.
                Must be used in conjunction with ``zoom`` and ``lat``. Defaults
                to a view to have all data layers in view.
            size (tuple, optional): List of pixel dimensions for the map.
                Format is ``(width, height)``. Defaults to ``(800, 400)``.
            ax: matplotlib axis on which to draw the image. Only used when
                ``interactive`` is ``False``.

        Returns:
            IPython.display.HTML: Interactive maps are rendered in an
            ``iframe``, while static maps are rendered in ``img`` tags.
        """
        # TODO: add layers preprocessing method like
        #       layers = process_layers(layers)
        #       that uses up to layer limit value error
        if not hasattr(IPython, 'display'):
            raise NotImplementedError('Nope, cannot display maps at the '
                                      'command line.')

        if layers is None:
            layers = []
        elif not isinstance(layers, collections.Iterable):
            # NOTE(review): `collections.Iterable` was removed in Python 3.10;
            # `collections.abc.Iterable` is the forward-compatible spelling.
            layers = [layers]
        else:
            layers = list(layers)

        if len(layers) > 8:
            raise ValueError('map can have at most 8 layers')

        # zoom/lat/lng must be provided together (or not at all)
        nullity = [zoom is None, lat is None, lng is None]
        if any(nullity) and not all(nullity):
            raise ValueError('zoom, lat, and lng must all or none be provided')

        # When no layers are passed, set default zoom
        if ((len(layers) == 0 and zoom is None) or
                (len(layers) == 1 and layers[0].is_basemap)):
            [zoom, lat, lng] = [3, 38, -99]
        has_zoom = zoom is not None

        # Check basemaps, add one if none exist; the basemap is moved to the
        # front of the layer list.
        base_layers = [idx for idx, layer in enumerate(layers)
                       if layer.is_basemap]
        if len(base_layers) > 1:
            raise ValueError('map can at most take 1 BaseMap layer')
        if len(base_layers) > 0:
            layers.insert(0, layers.pop(base_layers[0]))
        else:
            layers.insert(0, BaseMap())

        # Check for a time layer, if it exists move it to the front
        time_layers = [idx for idx, layer in enumerate(layers)
                       if not layer.is_basemap and layer.time]
        time_layer = layers[time_layers[0]] if len(time_layers) > 0 else None
        if len(time_layers) > 1:
            raise ValueError('Map can at most take 1 Layer with time '
                             'column/field')
        if time_layer:
            raise NotImplementedError('Animated maps are not yet supported')
            # NOTE(review): the two statements below are unreachable while the
            # raise above is in place (kept for when animation lands).
            if not interactive:
                raise ValueError('map cannot display a static image with a '
                                 'time_column')
            layers.append(layers.pop(time_layers[0]))

        # If basemap labels are on front, add labels layer
        basemap = layers[0]
        if basemap.is_basic() and basemap.labels == 'front':
            layers.append(BaseMap(basemap.source,
                                  labels=basemap.labels,
                                  only_labels=True))

        # Setup layers
        for idx, layer in enumerate(layers):
            layer._setup(layers, idx)

        nb_layers = non_basemap_layers(layers)
        options = {'basemap_url': basemap.url}

        # validate each data layer's query/style before building the map
        for idx, layer in enumerate(nb_layers):
            self._check_query(layer.query,
                              style_cols=layer.style_cols)
            options['cartocss_' + str(idx)] = layer.cartocss
            options['sql_' + str(idx)] = layer.query

        params = {
            'config': json.dumps(options),
            # cache-buster so the Maps API re-renders the static image
            'anti_cache': random.random(),
        }

        if has_zoom:
            params.update({'zoom': zoom, 'lat': lat, 'lon': lng})
            options.update({'zoom': zoom, 'lat': lat, 'lng': lng})
        else:
            # no explicit view: fit bounds of all data layers
            options.update(self._get_bounds(nb_layers))

        map_name = self._send_map_template(layers, has_zoom=has_zoom)
        api_url = '{base_url}api/v1/map'.format(base_url=self.base_url)

        static_url = ('{api_url}/static/named/{map_name}'
                      '/{width}/{height}.png?{params}').format(
                          api_url=api_url,
                          map_name=map_name,
                          width=size[0],
                          height=size[1],
                          params=urlencode(params))

        html = '<img src="{url}" />'.format(url=static_url)

        # TODO: write this as a private method
        if interactive:
            netloc = urlparse(self.base_url).netloc
            domain = 'carto.com' if netloc.endswith('.carto.com') else netloc

            def safe_quotes(text, escape_single_quotes=False):
                """htmlify string"""
                # NOTE(review): the HTML entities below were decoded by the
                # page scrape this file was recovered from; reconstructed as
                # &quot; / &#92; -- verify against upstream.
                if isinstance(text, str):
                    safe_text = text.replace('"', "&quot;")
                    if escape_single_quotes:
                        safe_text = safe_text.replace("'", "&#92;'")
                    return safe_text.replace('True', 'true')
                return text

            config = {
                'user_name': self.username,
                'maps_api_template': self.base_url[:-1],
                'sql_api_template': self.base_url[:-1],
                'tiler_protocol': 'https',
                'tiler_domain': domain,
                'tiler_port': '80',
                'type': 'torque' if time_layer else 'namedmap',
                'named_map': {
                    'name': map_name,
                    'params': {
                        k: safe_quotes(v, escape_single_quotes=True)
                        for k, v in dict_items(options)
                    },
                },
            }

            map_options = {
                'filter': ['http', 'mapnik', 'torque'],
                'https': True,
            }

            if time_layer:
                config.update({
                    'order': 1,
                    'options': {
                        'query': time_layer.query,
                        'user_name': self.username,
                        'tile_style': time_layer.torque_cartocss,
                    }
                })
                config['named_map'].update({
                    'layers': [{
                        'layer_name': 't',
                    }],
                })
                map_options.update({
                    'time_slider': True,
                    'loop': True,
                })
            bounds = [] if has_zoom else [[options['north'], options['east']],
                                          [options['south'], options['west']]]

            content = self._get_iframe_srcdoc(config=config,
                                              bounds=bounds,
                                              options=options,
                                              map_options=map_options)

            img_html = html
            html = (
                '<iframe srcdoc="{content}" width={width} height={height}>'
                ' Preview image: {img_html}'
                '</iframe>'
            ).format(content=safe_quotes(content),
                     width=size[0],
                     height=size[1],
                     img_html=img_html)
            return IPython.display.HTML(html)
        elif HAS_MATPLOTLIB:
            # render the static PNG onto a matplotlib axis
            raw_data = mpi.imread(static_url)
            if ax is None:
                dpi = mpi.rcParams['figure.dpi']
                mpl_size = (size[0] / dpi, size[1] / dpi)
                fig = plt.figure(figsize=mpl_size, dpi=dpi, frameon=False)
                fig.subplots_adjust(left=0, right=1, top=1, bottom=0)
                ax = plt.gca()
            ax.imshow(raw_data)
            ax.axis('off')
            return ax
        else:
            # no matplotlib: hand the static image URL straight to IPython
            return IPython.display.Image(url=static_url,
                                         embed=True,
                                         format='png',
                                         width=size[0],
                                         height=size[1],
                                         metadata=dict(origin_url=static_url))
def data_boundaries(self, df=None, table_name=None):
"""Not currently implemented"""
pass
def data_discovery(self, keywords=None, regex=None, time=None,
boundary=None):
"""Not currently implemented"""
pass
    def data_augment(self, table_name, metadata):
        """Augment an existing CARTO table with `Data Observatory
        <https://carto.com/data-observatory>`__ measures. See the full `Data
        Observatory catalog
        <https://cartodb.github.io/bigmetadata/index.html>`__ for all available
        measures. The result of this operation is:

        1. It updates `table_name` by adding columns from the Data Observatory
        2. It returns a pandas DataFrame representation of that newly augmented
           table.

        Note:
            This method alters `table_name` in the user's CARTO database by
            adding additional columns. To avoid this, create a copy of the
            table first and use the new copy instead.

        Example:
            Add new measures to a CARTO table and pass it to a pandas
            DataFrame. Using the "Median Household Income in the past 12
            months" measure from the `Data Observatory Catalog
            <https://cartodb.github.io/bigmetadata/united_states/income.html#median-household-income-in-the-past-12-months>`__.
            ::

                import cartoframes
                cc = cartoframes.CartoContext(BASEURL, APIKEY)
                median_income = [{'numer_id': 'us.census.acs.B19013001',
                                  'geom_id': 'us.census.tiger.block_group',
                                  'numer_timespan': '2011 - 2015'}]
                df = cc.data_augment('transaction_events',
                                     median_income)

        Args:
            table_name (str): Name of table on CARTO account that Data
                Observatory measures are to be added to.
            metadata (list of dicts): List of all measures to add to
                `table_name`. Each `dict` has the following keys:

                - `numer_id` (str): The identifier for the desired measurement
                - `geom_id` (str, optional): Identifier for a desired
                  geographic boundary level to use when calculating measures.
                  Will be automatically assigned if undefined
                - `normalization` (str, optional): The desired normalization.
                  One of 'area', 'prenormalized', or 'denominated'. 'Area' will
                  normalize the measure per square kilometer, 'prenormalized'
                  will return the original value, and 'denominated' will
                  normalize by a denominator.
                - `denom_id` (str, optional): Measure ID from DO catalog
                - `numer_timespan` (str, optional): The desired timespan for
                  the measurement. Defaults to most recent timespan available
                  if left unspecified.
                - `geom_timespan` (str, optional): The desired timespan for the
                  geometry. Defaults to timespan matching `numer_timespan` if
                  left unspecified.
                - `target_area` (str, optional): Instead of aiming to have
                  `target_geoms` in the area of the geometry passed as extent,
                  fill this area. Unit is square degrees WGS84. Set this to
                  `0` if you want to use the smallest source geometry for this
                  element of metadata, for example if you're passing in points.
                - `target_geoms` (str, optional): Override global
                  `target_geoms` for this element of metadata
                - `max_timespan_rank` (str, optional): Override global
                  `max_timespan_rank` for this element of metadata
                - `max_score_rank` (str, optional): Override global
                  `max_score_rank` for this element of metadata

        Returns:
            pandas.DataFrame: A DataFrame representation of `table_name` which
            has new columns for each measure in `metadata`.
        """
        try:
            # Install the DO augmentation SQL functions bundled with the
            # package onto the user's account before calling them.
            with open(os.path.join(os.path.dirname(__file__),
                                   'assets/data_obs_augment.sql'), 'r') as f:
                augment_functions = f.read()
            self.sql_client.send(augment_functions)
        except Exception as err:
            raise CartoException("Could not install `obs_augment_table` onto "
                                 "user account ({})".format(err))

        # augment with data observatory metadata
        augment_query = '''
            select obs_augment_table('{username}.{tablename}',
                                     '{cols_meta}');
        '''.format(username=self.username,
                   tablename=table_name,
                   cols_meta=json.dumps(metadata))
        _ = self.sql_client.send(augment_query)

        # read full augmented table
        return self.read(table_name)
def _auth_send(self, relative_path, http_method, **kwargs):
self._debug_print(relative_path=relative_path,
http_method=http_method,
kwargs=kwargs)
res = self.auth_client.send(relative_path, http_method, **kwargs)
if isinstance(res.content, str):
return json.loads(res.content)
return json.loads(res.content.decode('utf-8'))
def _check_query(self, query, style_cols=None):
"""Checks if query from Layer or QueryLayer is valid"""
try:
self.sql_client.send('''
EXPLAIN
SELECT
{style_cols}{comma}
the_geom, the_geom_webmercator
FROM ({query}) _wrap;
'''.format(query=query,
comma=',' if style_cols else '',
style_cols=(','.join(style_cols)
if style_cols else '')))
except Exception as err:
raise ValueError(('Layer query `{query}` and/or style column(s) '
'{cols} are not valid: {err}.'
'').format(query=query,
cols=', '.join(['`{}`'.format(c)
for c in style_cols]),
err=err))
def _send_map_template(self, layers, has_zoom):
map_name = get_map_name(layers, has_zoom=has_zoom)
if map_name not in self._map_templates:
try:
self._auth_send('api/v1/map/named', 'POST',
headers={'Content-Type': 'application/json'},
data=get_map_template(layers,
has_zoom=has_zoom))
except ValueError('map already exists'):
pass
self._map_templates[map_name] = True
return map_name
def _get_iframe_srcdoc(self, config, bounds, options, map_options):
if not hasattr(self, '_srcdoc') or self._srcdoc is None:
with open(os.path.join(os.path.dirname(__file__),
'assets/cartoframes.html'), 'r') as f:
self._srcdoc = f.read()
return (self._srcdoc
.replace('@@CONFIG@@', json.dumps(config))
.replace('@@BOUNDS@@', json.dumps(bounds))
.replace('@@OPTIONS@@', json.dumps(map_options))
.replace('@@ZOOM@@', str(options.get('zoom', 3)))
.replace('@@LAT@@', str(options.get('lat', 0)))
.replace('@@LNG@@', str(options.get('lng', 0))))
def _get_bounds(self, layers):
"""Return the bounds of all data layers involved in a cartoframes map.
Args:
layers (list): List of cartoframes layers. See `cartoframes.layers`
for all types.
Returns:
dict: Dictionary of northern, southern, eastern, and western bounds
of the superset of data layers. Keys are `north`, `south`,
`east`, and `west`. Units are in WGS84.
"""
extent_query = ('SELECT ST_EXTENT(the_geom) AS the_geom '
'FROM ({query}) AS t{idx}\n')
union_query = 'UNION ALL\n'.join(
[extent_query.format(query=layer.query, idx=idx)
for idx, layer in enumerate(layers)
if not layer.is_basemap])
extent = self.sql_client.send('''
SELECT
ST_XMIN(ext) AS west,
ST_YMIN(ext) AS south,
ST_XMAX(ext) AS east,
ST_YMAX(ext) AS north
FROM (
SELECT ST_Extent(the_geom) AS ext
FROM ({union_query}) AS _wrap1
) AS _wrap2
'''.format(union_query=union_query))
return extent['rows'][0]
def _debug_print(self, **kwargs):
if self._verbose <= 0:
return
for key, value in dict_items(kwargs):
if isinstance(value, requests.Response):
str_value = ("status_code: {status_code}, "
"content: {content}").format(
status_code=value.status_code,
content=value.content)
else:
str_value = str(value)
if self._verbose < 2 and len(str_value) > 300:
str_value = '{}\n\n...\n\n{}'.format(str_value[:250],
str_value[-50:])
print('{key}: {value}'.format(key=key,
value=str_value))
def _process_credentials(api_key, base_url):
"""process credentials"""
# use stored api key (if present)
if (api_key is None) or (base_url is None):
from cartoframes import credentials as cfcreds
creds = cfcreds.credentials()
api_key = creds['api_key'] if api_key is None else api_key
base_url = creds['base_url'] if base_url is None else base_url
if (api_key == '') and (base_url == ''):
raise ValueError('No credentials are stored on this '
'installation and none were provided. Use '
'`cartoframes.credentials.set_credentials` '
'to store your access URL and API key for '
'this installation.')
if api_key == '':