In [2]:
#ODM stands of Object Document mapper
#Just like ORM like SQLAlchemy was for relational databases. ODM is for document databases like MongoDB

# It maps documents to python objects.
# MongoEngine is the ODM we will work with in our day to day lives. similar to Django ORM API

# Data modelling with document model classes.

# We will use python now to model the database instead of JSON

#ORMs work with realtional databases. These database mostly are 2 diemnsional tables. This allows SQLAclhemy to be used with multiple relational databases.
# ODMs like MongoEngine usually works for NoSQL database, specfically MongoDb

#MongoEngine API is very similar to Django ORM API. However, it is not a replacement for Django ORM API

#MongoEngine does not do anything to the MongoDB database. You could still interoperably use any other tool with MonoDb

#Install MongoEngine

%pip install mongoengine

Collecting mongoengine
  Downloading mongoengine-0.29.1-py3-none-any.whl.metadata (6.7 kB)
Downloading mongoengine-0.29.1-py3-none-any.whl (112 kB)
Installing collected packages: mongoengine
Successfully installed mongoengine-0.29.1
Note: you may need to restart the kernel to use updated packages.


In [None]:
#Just like SQLAlchemy where you had to create a base class from DerivativeClass to then subclass inot model classes.
# mongoengine also has a similar concept. You need to subclass Document class to start modeling collections,
# Unlike SQLAlchemy, you can directly use subclass to model collection.

from mongoengine import Document
#Field types for Mongo DB to immport from MongoEngine
from mongoengine import fields #fields submodule has most of the MongoDB Field types
import datetime

# will create a collection by name 'investments' by default in the Db
class Investment(Document):
    coin = fields.StringField(max_length=32)
    currency = fields.StringField(max_length=3)
    amount = fields.FloatField(min_value=0.00001)
    timestamp = fields.DateTimeField(default=datetime.datetime.now())
    sell = fields.BooleanField(default=False)
    
#Above was a simple model class using Mongoengine

#Let's now create a document from it
bitcoin = Investment(coin="bitcoin", currency="USD", amount=1.0)

#Adding to the database is just to call the save() method, it's that easy
bitcoin.save()

#To retreive the document from database we can use a class attribute from the document model called "objects".
# The objects class attribute is special attribute that can return all the instances of the documemt

#Retreives all documents
for investment in Investment.objects:
    print(investment.coin)
    
#query a document

usd_investment = Investment.objects.filter(currency="USD")
print(usd_investment)

#There are numerous operators that you append to the field name using double underscore
small_investment = Investment.objects.filter(amount__lt-1.0) #__lt is appended to the fieldname amount. this means return doucments that have amounts less that 1.0

#If you want to update an document. you can call update method, no need to call save()
usd_investment.update(amount=1.5)

#delete method will remove the document, again no need to call save()

usd_investment.delete()

ConnectionFailure: You have not defined a default connection

In [None]:
import datetime
import random

from mongoengine import connect, Document, EmbeddedDocument
from mongoengine import fields
import click

from utils import get_coin_prices


class Investment(Document):
    coin = fields.StringField(max_length=32)
    currency = fields.StringField(max_length=3)
    amount = fields.FloatField(min_value=0.00001)
    timestamp = fields.DateTimeField(default=datetime.datetime.now)
    sell = fields.BooleanField(default=False)

    def __str__(self):
        return f"<Investment | coin: {self.coin}, currency: {self.currency}, amount: {self.amount}>"


class WatchlistMetadata(EmbeddedDocument):
    currency = fields.StringField(max_length=3)
    description = fields.StringField()
    date_created = fields.DateField(default=datetime.datetime.now().date)


class WatchlistCoin(EmbeddedDocument):
    coin = fields.StringField(max_length=32)
    note = fields.StringField()
    date_added = fields.DateField(default=datetime.datetime.now().date)


class Watchlist(Document):
    name = fields.StringField(max_length=256)
    metadata = fields.EmbeddedDocumentField(WatchlistMetadata)
    coins = fields.EmbeddedDocumentListField(WatchlistCoin)

    def __str__(self):
        return f"<Watchlist name={self.name}, currency={self.metadata.currency} with {len(self.coins)} coin(s)>"


def _select_investment():
    investment_coins = Investment.objects.all().fields(coin=1)
    for index, coin in enumerate(investment_coins):
        print(f"{index + 1}: {coin.coin}")
    selected_investment_index = int(input("Select an investment: ")) - 1
    selected_investment_oid = investment_coins[selected_investment_index].id
    return Investment.objects(id=selected_investment_oid).first()


def _select_watchlist():
    watchlist_names = Watchlist.objects.all().fields(name=1)
    for index, name in enumerate(watchlist_names):
        print(f"{index + 1}: {name.name}")
    selected_watchlist_index = int(input("Select a watchlist: ")) - 1
    selected_watchlist_oid = watchlist_names[selected_watchlist_index].id
    return Watchlist.objects(id=selected_watchlist_oid).first()


def _seed_data():
    data = [
        ("bitcoin", "USD", 1.0, False),
        ("ethereum", "GBP", 10.0, True),
        ("dogecoin", "EUR", 100.0, False)
    ]

    watchlist_data = [
        ("Bulls", "Coins to buy", "USD", [
         ("bitcoin", "Bitcoin is number one!"), ("ethereum", "Ethereum is number two!")]),
        ("Bears", "Coins to sell", "GBP", [("solana", "Meh ...")])
    ]

    for row in data:
        Investment(
            coin=row[0],
            currency=row[1],
            amount=row[2],
            sell=row[3],
            timestamp=datetime.datetime.now() - datetime.timedelta(
                days=random.randint(0, 7), minutes=random.randint(0, 60), seconds=random.randint(0, 60)
            )).save()

    for row in watchlist_data:
        Watchlist(
            name=row[0],
            metadata=WatchlistMetadata(description=row[1], currency=row[2]),
            coins=[WatchlistCoin(coin=coin[0], note=coin[1])
                   for coin in row[3]]
        ).save()


@click.group()
def cli():
    pass


@click.command(help="Clear the database")
def clear_data():
    Investment.drop_collection()
    Watchlist.drop_collection()

    print("Cleared data!")


@click.command(help="Seed the database with sample data, use the --force flag to ignore existing data")
@click.option("--force", is_flag=True, default=False)
def seed_data(force):
    if force:
        _seed_data()
    elif Investment.objects.count() > 0:
        print("Data not empty!  Use --force flag to seed database")
    else:
        _seed_data()


@click.command(help="Add a new investment to the portfolio")
@click.option("--coin", prompt=True, help="The name of the coin")
@click.option("--currency", prompt=True, help="The fiat currency to show prices in")
@click.option("--amount", prompt=True, help="The purchase amount")
@click.option("--sell", is_flag=True, default=False, help="If this is a sell (default is False)")
def add_investment(coin, currency, amount, sell):
    investment = Investment(
        coin=coin,
        currency=currency,
        amount=amount,
        sell=sell
    )
    investment.save()

    print(
        f"Added {'buy' if not sell else 'sell'} for {amount} {coin} in {currency}")


@click.command(help="See the details of an investment")
def view_investment():
    selected_investment = _select_investment()
    coin_price = get_coin_prices([selected_investment.coin], selected_investment.currency.lower())[
        selected_investment.coin]
    print(f"You {'bought' if not selected_investment.sell else 'sold'} {selected_investment.amount} {selected_investment.coin} for {coin_price * selected_investment.amount} {selected_investment.currency}")


@click.command(help="Add a new watchlist to the portfolio")
@click.option("--name", help="The name of the watchlist", prompt=True)
@click.option("--description", help="The description of the watchlist", prompt=True)
@click.option("--currency", help="The currency to display coin prices in", prompt=True)
def add_watchlist(name, description, currency):
    metadata = WatchlistMetadata(currency=currency, description=description)
    watchlist = Watchlist(name=name, metadata=metadata, coins=[])
    watchlist.save()

    print(f"Added watchlist {name}")


@click.command(help="View the coins in a watchlist")
def view_watchlist():
    selected_watchlist = _select_watchlist()
    coins = [coin.coin for coin in selected_watchlist.coins]
    coin_prices = get_coin_prices(
        coins, selected_watchlist.metadata.currency.lower())
    print(
        f"Watchlist: {selected_watchlist.name} in {selected_watchlist.metadata.currency}")
    print(f"{selected_watchlist.metadata.description}")
    print("Coins: ")
    for index, coin in enumerate(coins):
        print(f"{index + 1}: {coin} | {coin_prices[coin]}")
    print("Prices provided by CoinGecko")


@click.command(help="Add a coin to a watchlist")
@click.option("--coin", help="The coin to add", prompt=True)
@click.option("--note", help="A note", prompt=True)
def add_coin(coin, note):
    selected_watchlist = _select_watchlist()
    selected_watchlist.coins.append(
        WatchlistCoin(coin=coin, note=note)
    )
    selected_watchlist.save()

    print(f"Added {coin} to {selected_watchlist.name}")


cli.add_command(add_coin)
cli.add_command(add_investment)
cli.add_command(add_watchlist)
cli.add_command(clear_data)
cli.add_command(seed_data)
cli.add_command(view_watchlist)
cli.add_command(view_investment)

if __name__ == "__main__":
    connect("portfolio_me")
    cli()


1: bitcoin
2: ethereum
3: dogecoin
4: solana
You bought 1000.0 solana for 13234950.0 inr


In [3]:
#Embedded documents and lists

#for model that is to be embedded in another doucment or be in a list, the document model class should derive from EmbeddedDocument
#If you try to embed document class you saw earlier, it will throw an error

from mongoengine import Document, EmbeddedDocument, fields
import datetime

class WatchlistMetadata(EmbeddedDocument):
    #now model embedded fields as needed like shown below
    currency= fields.StringField(max_length=3)
    description= fields.StringField()
    date_created= fields.DateField(default=datetime.datetime.now().date)
    

class WatchlistCoins(EmbeddedDocument):
    coin= fields.StringField(max_length=32)
    note= fields.StringField()
    date_added = fields.DateField(default=datetime.datetime.now().date)
    
class Watchlist(Document):
    name = fields.StringField(max_length=256)
    metadata = fields.EmbeddedDocumentField(WatchlistMetadata) #Metadata is an embedded document field, we have to give it a EmbeddedDocument model class type to expect
    coins= fields.EmbeddedDocumentListField(WatchlistCoins) #coins is an list of embedded documents of type Watchcoin
    
#creation of embedded document is no different than creating a document 
metadata = WatchlistMetadata(description="It's a watchlist", currency="USD")

#Assigning an embedded document to a doucment
watchlist = Watchlist(name="My Watchlist", metadata=metadata, coins=[])

#Embedded document list fieilds support methods like append

watchlist.coins.append(WatchlistCoins(coin="dogecoin", note="It's a joke"))

watchlist.save() #This will save all the document, EmbeddedDocuementsFields and EmbeddedDocuemntListFields

#Querying embedded documents

one_week = datetime.datetime.now().date - datetime.timedelta(days=7)

recent_lists = watchlist.objects(metadata__date_created__gte=one_week) #The filter passed to the objects was like this DocumentField__EmbeddedDocumentField__operator = value

#Querying embedded document fields
bitcoin_watchlist = watchlist.objects(coins__coin="bitcoin").first() # coins__coin is equivalent to saying coins.coin first() returns the first Document that matched the filter.



ConnectionFailure: You have not defined a default connection

In [None]:
# Final implementation including embedded documents and relationships.
# We now have a watchlist document in our database that has a list of coins as investments, and some metadata.

import click, datetime
from mongoengine import Document, EmbeddedDocument, fields, connect
from bson import ObjectId
from MongoDemoUtils import get_coin_price, seed_data as utils_seed_data


connect("watchlists")


#Document model classes
class WatchlistMetadata(EmbeddedDocument):
    #now model embedded fields as needed like shown below
    currency= fields.StringField(max_length=3)
    description= fields.StringField()
    date_created= fields.DateField(default=datetime.datetime.now().date)
    

class WatchlistCoins(EmbeddedDocument):
    coin= fields.StringField(max_length=32)
    note= fields.StringField()
    date_added = fields.DateField(default=datetime.datetime.now().date)
    
class Watchlist(Document):
    name = fields.StringField(max_length=256)
    metadata = fields.EmbeddedDocumentField(WatchlistMetadata) #Metadata is an embedded document field, we have to give it a EmbeddedDocument model class type to expect
    coins= fields.EmbeddedDocumentListField(WatchlistCoins) #coins is an list of embedded documents of type Watchcoin


#Helper methods

def _get_all_watchlist_names():
    return list(watchlists.find({},{"name": 1})) #Even though you set a projection document specefying you just want a name attribute, it will still have object Id of the docuemnt in the Cursor object

def _select_watchlist(watchlist_names): #Takes in a cursor object with just names of the watchlist
    for index, watchlist in enumerate(watchlist_names):
        print(f"{index + 1}: {watchlist['name']} ")
    selected_watchlist_index = int(input("Select a watchlist: ")) - 1
    selected_watchlist_id = watchlist_names[selected_watchlist_index]["_id"] #_id attribute stores the document ID
    return watchlists.find_one({"_id": ObjectId(selected_watchlist_id)}) #find_one returns a dictionary directly rather than a Cursor object.

def _select_coin_from_watchlist(watchlist):
    for index, coin in enumerate(watchlist['coins']):
        print(f"{index + 1}: {coin['coin']} ")
    selected_coin_index = int(input("Select a coin: ")) - 1
    return watchlist['coins'][selected_coin_index]

def _add_coin_to_watchlist(watchlist_oid, coin):
    filter = {"_id": ObjectId(watchlist_oid)}
    update = {"$push": {"coins": coin}} # $push operator actually adds the document passed to a list within the doucment
    watchlists.update_one(filter,update)    
    
def _remove_a_coin_from_watchlist(watchlist_oid, coin):
    filter = {"_id": ObjectId(watchlist_oid)}
    update = {"$pull": {"coins": coin}}
    watchlists.update_one(filter,update)

@click.group()
def cli():
    pass

@cli.command(help="Seed the database.")
@click.option("--force", is_flag=True, help="Seed even if database is not empty.")
def seed_data(force):
    if force:
        utils_seed_data(watchlists)
    elif watchlists.count_documents({}) > 0:
        print("The database is not empty, use the --force option or the clear-data command")
    else:
        utils_seed_data(watchlists)
    
@cli.command(help = "View the coins and current prices of the data")
def view_watchlists():
    selected_watchlist = _select_watchlist(_get_all_watchlist_names())
    
    print(f"Watchlist: {selected_watchlist['name']} in {selected_watchlist['metadata']['currency']}")
    print(f"Description: {selected_watchlist['metadata']['description']}")
    print(f"{"-" * 25}")
    print("Coins:")
    coin_prices = get_coin_price([coin['coin'] for coin in selected_watchlist['coins']], selected_watchlist['metadata']['currency'])
    for index, coin in enumerate(selected_watchlist['coins']):
        print(f"{str(index + 1).rjust(3, " ")}: {coin['coin']} = {coin['note']}")
        print(f"       Current price: {coin_prices[coin['coin']]} ")
    print("Prices provided by CoinGecko")
        

@cli.command(help="Add a new watchlist to the portfolio")
@click.option("--name", help="Name of the watchlist")
@click.option("--description", help="Description of the watchlist")
@click.option("--currency", help="Currency to display prices in", default="usd")
def add_watchlist(name, description, currency):
    metadata = {
        "description": description,
        "currency": currency,
        "date_added": datetime.datetime.now()
    }
    watchlist = {
        "name": name,
        "metadata": metadata,
        "coins": []
    }
    watchlists.insert_one(watchlist)
    
    print(f"Added {name} watchlist")
    
@cli.command(help="Add a coin to a watchlist")
@click.option("--coin", help="The coin id to add")
@click.option("--note", help="A description about the coin")
def add_coin(coin, note):
    selected_watchlist = _select_watchlist(_get_all_watchlist_names())
    coin_to_add = {
        "coin": coin,
        "note": note,
        "date_added": datetime.datetime.now()
    }
    _add_coin_to_watchlist(selected_watchlist['_id'], coin_to_add)
    print(f"Added {coin} to the {selected_watchlist['name']} watchlist")
    
@cli.command(help="Remove a coin from the watchlist")
def remove_coin():
    selected_watchlist = _select_watchlist(_get_all_watchlist_names())
    selected_coin = _select_coin_from_watchlist(selected_watchlist)
    
    _remove_a_coin_from_watchlist(selected_watchlist["_id"], selected_coin)
    print(f"Removed {selected_coin['coin']} from {selected_watchlist['name']} watchlist")
    
@cli.command()
def show_db():
    results = investments.find({})
    [print(result) for result  in results]

# Function to run commands in Jupyter
def run_command(command, *args):
    try:
        cli.main(args=[command] + list(args), standalone_mode=False)
    except SystemExit:
        pass
    
@cli.command()
def clear_data():
    database.drop_collection(watchlists)
    print("All data cleared from the watchlists collection")

# # Example usage in Jupyter
run_command('clear-data')
run_command('seed-data')
run_command('view-watchlists')
run_command('add-watchlist', '--name', 'My Watchlist', '--currency', 'usd', '--description', 'My Watchlist')
run_command('add-coin', '--coin', 'dogecoin', '--note', "It's a joke")
run_command('remove-coin')

NameError: name 'database' is not defined