In [64]:
from datetime import datetime
import ssl
from config import datalake_url
from pymongo import MongoClient
from pprint import pprint

# Create connection objects

In [65]:
dl_atlas_conn = MongoClient(datalake_url, ssl_cert_reqs=ssl.CERT_NONE)['esteininger-personal-datalake-atlas']
dl_s3_conn = MongoClient(datalake_url, ssl_cert_reqs=ssl.CERT_NONE)['esteininger-personal-datalake-s3']

# Out to S3

In [None]:
# splitting collection in half
pipeline = [
    {
        '$match': {
            'inc': {
                '$lt': 25
            }
        }
    },
    {
        '$out': {
            's3': {
                'bucket': 'esteininger-personal-datalake',
                'filename': 'analytics',
                "region": "us-east-2",
                'format': {
                    'name': 'json'
                }
            }
        }
    }
]

c = dl_atlas_conn.clickstream.aggregate(pipeline)
pprint(list(c))
pprint('Archive created!')


# Create the schema

In [75]:
# optionally, can exclude some key value pairs (defaults to taking a sample size of 1000 docs)

s3_schema = dl_s3_conn.command({'sqlGenerateSchema': 1, 'setSchemas': True})
atlas_schema = dl_atlas_conn.command({'sqlGenerateSchema': 1, 'setSchemas': True})

print("s3 sql schema", s3_schema)
print("atlas sql schema", atlas_schema)

s3 sql schema {'ok': 1, 'schemas': [{'databaseName': 'esteininger-personal-datalake-s3', 'namespaces': [{'name': 'analytics.1', 'schema': {'version': 1, 'jsonSchema': {'bsonType': ['object'], 'properties': {'options': {'bsonType': ['object']}, '_id': {'bsonType': ['objectId']}, 'event': {'bsonType': ['string']}, 'properties': {'bsonType': ['object'], 'properties': {'url': {'bsonType': ['string']}, 'path': {'bsonType': ['string']}, 'referrer': {'bsonType': ['string']}, 'foo': {'bsonType': ['string']}, 'lorem': {'bsonType': ['string']}, 'title': {'bsonType': ['string']}}}, 'user_id': {'bsonType': ['string']}, 'meta': {'bsonType': ['object'], 'properties': {'timestamp': {'bsonType': ['double']}}}, 'inc': {'bsonType': ['int']}}}}}, {'name': 'less_than_25.1', 'schema': {'version': 1, 'jsonSchema': {'bsonType': ['object'], 'properties': {'event': {'bsonType': ['string']}, 'properties': {'bsonType': ['object'], 'properties': {'url': {'bsonType': ['string']}, 'path': {'bsonType': ['string']}, 

# Show the Schema was created

In [76]:
atlas_clickstream = dl_atlas_conn.command({'sqlGetSchema': "clickstream"})

s3_analytics = dl_atlas_conn.command({'sqlGetSchema': "clickstream"})

print(atlas_clickstream)
print(s3_analytics)

{'ok': 1, 'metadata': {'description': 'set using sqlGenerateSchema with setSchemas = true'}, 'schema': {'version': 1, 'jsonSchema': {'bsonType': ['object'], 'properties': {'meta': {'bsonType': ['object'], 'properties': {'timestamp': {'bsonType': ['double']}}}, 'inc': {'bsonType': ['int']}, 'options': {'bsonType': ['object']}, '_id': {'bsonType': ['objectId']}, 'event': {'bsonType': ['string']}, 'properties': {'bsonType': ['object'], 'properties': {'url': {'bsonType': ['string']}, 'path': {'bsonType': ['string']}, 'referrer': {'bsonType': ['string']}, 'foo': {'bsonType': ['string']}, 'lorem': {'bsonType': ['string']}, 'title': {'bsonType': ['string']}}}, 'user_id': {'bsonType': ['string']}}}}}
{'ok': 1, 'metadata': {'description': 'set using sqlGenerateSchema with setSchemas = true'}, 'schema': {'version': 1, 'jsonSchema': {'bsonType': ['object'], 'properties': {'meta': {'bsonType': ['object'], 'properties': {'timestamp': {'bsonType': ['double']}}}, 'inc': {'bsonType': ['int']}, 'option

# Query MongoDB cluster

In [77]:
sql = "select * from `clickstream` limit 2"

pipeline = [
    {
        '$sql': {
            'statement': sql,
            'format': "jdbc",
            'dialect': "mysql",
        }
    }
]

a_sql_q = dl_atlas_conn.aggregate(pipeline)
pprint(list(a_sql_q))

[{'values': [{'bsonType': 'objectId',
              'column': '_id',
              'columnAlias': '_id',
              'database': 'esteininger-personal-datalake-atlas',
              'table': 'clickstream',
              'tableAlias': 'clickstream',
              'value': ObjectId('5f68ba9d7f2fce2fedd6d17d')},
             {'bsonType': 'string',
              'column': 'event',
              'columnAlias': 'event',
              'database': 'esteininger-personal-datalake-atlas',
              'table': 'clickstream',
              'tableAlias': 'clickstream',
              'value': 'pageStart'},
             {'bsonType': 'long',
              'column': 'inc',
              'columnAlias': 'inc',
              'database': 'esteininger-personal-datalake-atlas',
              'table': 'clickstream',
              'tableAlias': 'clickstream',
              'value': 0},
             {'bsonType': 'double',
              'column': 'meta.timestamp',
              'columnAlias': 'meta.timestamp'

# Query S3 via Atlas Data Lake

In [78]:
sql = "select * from `analytics.1` limit 2"

pipeline = [
    {
        '$sql': {
            'statement': sql,
            'format': "jdbc",
            'dialect': "mysql",
        }
    }
]

r = dl_s3_conn.aggregate(pipeline)
pprint(list(r))

[{'values': [{'bsonType': 'objectId',
              'column': '_id',
              'columnAlias': '_id',
              'database': 'esteininger-personal-datalake-s3',
              'table': 'analytics.1',
              'tableAlias': 'analytics.1',
              'value': ObjectId('5f68ba9d7f2fce2fedd6d17d')},
             {'bsonType': 'string',
              'column': 'event',
              'columnAlias': 'event',
              'database': 'esteininger-personal-datalake-s3',
              'table': 'analytics.1',
              'tableAlias': 'analytics.1',
              'value': 'pageStart'},
             {'bsonType': 'long',
              'column': 'inc',
              'columnAlias': 'inc',
              'database': 'esteininger-personal-datalake-s3',
              'table': 'analytics.1',
              'tableAlias': 'analytics.1',
              'value': 0},
             {'bsonType': 'double',
              'column': 'meta.timestamp',
              'columnAlias': 'meta.timestamp',
       