In [1]:
from json import loads
from os import getenv

from kafka import KafkaConsumer
from pymongo import MongoClient, GEOSPHERE
from tqdm.notebook import tqdm

In [2]:
# Connect to MongoDB as `root`; the password is read from the environment so it
# never appears in the notebook itself.
password = getenv('MONGODB_ROOT_PASSWORD')
if not password:
    # Without this check an unset variable would be formatted as the string 'None'
    # and only surface later as a confusing authentication failure.
    raise RuntimeError('MONGODB_ROOT_PASSWORD is not set; cannot authenticate to MongoDB')

# Credentials are passed as keyword options instead of being spliced into the URI:
# characters such as '@', ':' or '/' in the password would otherwise have to be
# percent-escaped to keep the URI parseable. With no database in the URI, the
# authentication database defaults to 'admin', as it did with the inline form.
client = MongoClient('mongodb://my-mongodb:27017', username='root', password=password)
db = client['geotest-db']

In [4]:
# Collection that receives the GeoJSON features consumed from Kafka further down.
geo_collection = db.get_collection('2020_pop_geojson')

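Create a `2dsphere` index on the `geometry` field, which holds each feature's GeoJSON geometry. `$nearSphere` (used below) requires a geospatial index, and the index also speeds up other spherical geospatial queries.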

In [5]:
# Index key spec: a single 2dsphere key on the `geometry` field. create_index
# returns the resulting index name, which is displayed as this cell's output.
geometry_index_keys = [('geometry', GEOSPHERE)]
geo_collection.create_index(geometry_index_keys)

'geometry_2dsphere'

Consume every message from the `2020_pop_points` Kafka topic and insert its deserialized JSON value into the MongoDB collection.

In [None]:
# Drain the Kafka topic into MongoDB.
#
# Offsets are committed under a consumer group only after the corresponding
# documents have been written, so re-running this cell resumes where the previous
# run stopped instead of re-inserting the entire topic from the beginning.
# Delivery is at-least-once: a failure between an insert and its commit can
# re-deliver that last batch on the next run.
#
# The iteration stops once no message has arrived for KAFKA_IDLE_TIMEOUT_MS, so the
# cell finishes and "Run All" can reach the query cells below.
KAFKA_TOPIC = '2020_pop_points'
KAFKA_GROUP_ID = 'geotest-db-loader'
KAFKA_IDLE_TIMEOUT_MS = 10_000
INSERT_BATCH_SIZE = 1_000

consumer = KafkaConsumer(
    KAFKA_TOPIC,
    bootstrap_servers=['my-kafka:9092'],
    group_id=KAFKA_GROUP_ID,           # required for committing offsets
    auto_offset_reset='earliest',      # only used when the group has no committed offset yet
    enable_auto_commit=False,          # commit manually, after the insert succeeds
    consumer_timeout_ms=KAFKA_IDLE_TIMEOUT_MS,
    value_deserializer=lambda raw: loads(raw.decode('utf-8')),
)

try:
    batch = []
    for message in tqdm(consumer, desc='consuming'):
        batch.append(message.value)  # already deserialized from JSON
        if len(batch) >= INSERT_BATCH_SIZE:
            geo_collection.insert_many(batch)
            consumer.commit()
            batch = []
    # Flush the final partial batch once the topic is drained.
    if batch:
        geo_collection.insert_many(batch)
        consumer.commit()
finally:
    # Release the broker connection and group membership even if an insert fails.
    consumer.close()

In [22]:
# Show up to 5 features whose geometry lies inside the planar box spanning
# (0, 0) to (100, 100).
# `$geoWithin` replaces the deprecated `$within` operator, and the `$box` corners
# are listed in the documented [bottom-left, upper-right] order.
# NOTE: `$box` is a legacy flat-plane shape; per the MongoDB docs only a 2d index
# supports it, so this query does not use the 2dsphere index created above.
BOX_BOTTOM_LEFT = [0, 0]
BOX_UPPER_RIGHT = [100, 100]

features = geo_collection.find(
    {'geometry': {'$geoWithin': {'$box': [BOX_BOTTOM_LEFT, BOX_UPPER_RIGHT]}}}
).limit(5)

for f in features:
    print(f)

{'_id': ObjectId('5e7716c61340a85849bbed77'), 'type': 'Feature', 'properties': {'raster_val': '0.00079989445'}, 'geometry': {'type': 'Point', 'coordinates': [62.13208309980001, 33.86625006591001]}}
{'_id': ObjectId('5e7716d31340a85849bbed78'), 'type': 'Feature', 'properties': {'raster_val': '0.00849228'}, 'geometry': {'type': 'Point', 'coordinates': [62.13208309980001, 33.86625006591001]}}
{'_id': ObjectId('5e7716d41340a85849bbed79'), 'type': 'Feature', 'properties': {'raster_val': '0.017656343'}, 'geometry': {'type': 'Point', 'coordinates': [62.13208309980001, 33.86625006591001]}}
{'_id': ObjectId('5e7716d51340a85849bbed7a'), 'type': 'Feature', 'properties': {'raster_val': '0.0330576'}, 'geometry': {'type': 'Point', 'coordinates': [62.13208309980001, 33.86625006591001]}}
{'_id': ObjectId('5e7716d61340a85849bbed7b'), 'type': 'Feature', 'properties': {'raster_val': '0.027320117'}, 'geometry': {'type': 'Point', 'coordinates': [62.13208309980001, 33.86625006591001]}}


In [21]:
# Show the 5 features nearest to the query point, closest first.
# The query point is expressed as a GeoJSON Point ([longitude, latitude]) to match
# the GeoJSON stored in `geometry`. This form is served by the 2dsphere index, and
# any `$maxDistance` added later is interpreted in meters (the legacy
# coordinate-pair form would interpret it in radians).
QUERY_POINT = {'type': 'Point', 'coordinates': [62.21541, 34.34125]}

features = geo_collection.find(
    {'geometry': {'$nearSphere': {'$geometry': QUERY_POINT}}}
).limit(5)

for f in features:
    print(f)

{'_id': ObjectId('5e7716c61340a85849bbed77'), 'type': 'Feature', 'properties': {'raster_val': '0.00079989445'}, 'geometry': {'type': 'Point', 'coordinates': [62.13208309980001, 33.86625006591001]}}
{'_id': ObjectId('5e7716d41340a85849bbed79'), 'type': 'Feature', 'properties': {'raster_val': '0.017656343'}, 'geometry': {'type': 'Point', 'coordinates': [62.13208309980001, 33.86625006591001]}}
{'_id': ObjectId('5e7716d71340a85849bbed7d'), 'type': 'Feature', 'properties': {'raster_val': '0.04882768'}, 'geometry': {'type': 'Point', 'coordinates': [62.13208309980001, 33.86625006591001]}}
{'_id': ObjectId('5e77189e1340a85849bbed85'), 'type': 'Feature', 'properties': {'raster_val': '0.027320117'}, 'geometry': {'type': 'Point', 'coordinates': [62.13208309980001, 33.86625006591001]}}
{'_id': ObjectId('5e77189e1340a85849bbed95'), 'type': 'Feature', 'properties': {'raster_val': '0.14413558'}, 'geometry': {'type': 'Point', 'coordinates': [62.13208309980001, 33.86625006591001]}}
