In [None]:
# default_exp db
# hide
_FNAME='db'

import unittest
from unittest import mock
from nbdev.export import notebook2script
import os

TESTCASE = unittest.TestCase()
_nbpath = os.path.join(_dh[0], _FNAME+'.ipynb')

In [None]:
#export
import time
from neo4j.exceptions import ClientError
from mranderson.db import Query, fetch
import mranderson.node as node
import mranderson.constraints as constraints

In [None]:
from mranderson.sandbox import start_neo4j, start_fresh, create_totem

In [None]:
container_name = 'sewing_test_db'
container, driver = start_neo4j(container_name=container_name)

In [None]:
#export
def ensure_unique_nodes():
    results = {}
    for n in ('Guild', 'User', 'Channel'):
        constraints.uniqueness(n, 'id')
    return results
    

In [None]:
TESTCASE.assertTrue(start_fresh())
create_totem()
ensure_unique_nodes()
ensure_unique_nodes()

{}

In [None]:
#export

def guild_node(guild, variable_name='guild'):
    gid = guild.id
    return node.txt('Guild', variable=variable_name, id=gid) 

def user_node(user, variable_name='user'):
    name = user.name
    uid = user.id
    discrim = user.discriminator
    return node.txt('User', variable=variable_name, id=uid, name=name, discriminator=discrim)

def channel_node(channel, variable_name='channel'):
    return node.txt('Channel', variable=variable_name, id=channel.id, name=channel.name)

In [None]:
from dataclasses import dataclass

@dataclass
class FakeGuild:
    id='abc123'
    name='fake'

@dataclass
class FakeUser:
    id='user789'
    name='user#789'
    discriminator='789'
    
@dataclass
class FakeChannel:
    id='89123'
    name='fakechannel'
    
TESTCASE.assertEqual(guild_node(FakeGuild()), '(guild:Guild {id: "abc123"})')
TESTCASE.assertEqual(user_node(FakeUser()), '(user:User {id: "user789",name: "user#789",discriminator: "789"})')


In [None]:
#export

def ensure_guild(guild):
    gnode = guild_node(guild)
    node.ensure('Guild', 
                search_properties={'id': guild.id},
                node_properties={'name': guild.name})
    
def guilds() -> list:
    q = Query()
    q.add("MATCH (guild:Guild) RETURN guild")
    return q.data()

In [None]:
def test_ensure_guild():
    TESTCASE.assertTrue(start_fresh())
    create_totem()
    fg = FakeGuild()
    ensure_guild(fg)
    ensure_guild(fg)    
    ensure_guild(fg)
    
    q = Query()
    q.add("MATCH")
    q.add(guild_node(fg))
    q.add('RETURN count(guild) as num')    
    resp = q.single()
    TESTCASE.assertEqual(resp, 1)
    print("SUCCESS")
test_ensure_guild()

SUCCESS


In [None]:
#export

def register_user(user, guild):
    ensure_guild(guild)
    
    unode = user_node(user)
    if guild:
        gnode = guild_node(guild)
    q = Query()
    q.add("MERGE")
    q.add(unode)
    q.add("WITH user")
    q.add("MATCH")
    q.add(gnode)
    q.add("WITH user, guild")
    q.add("MERGE (user)-[:BELONGS_TO]->(guild)")
    return q.create()

def unregister_user(user, guild):
    q = Query()
    guild_n = guild_node(guild)
    q.add('MATCH')
    q.add(user_node(user))
    q.add('-[belongs:BELONGS_TO]->')
    q.add(guild_n)
    q.add("DELETE belongs")
    return q.create()
    

In [None]:
#export
def set_email(user, email):
    unode = user_node(user)    
    q = Query()
    q.add("MERGE")
    q.add(unode)
    q.add("WITH user")
    q.add("SET user.email='{}'".format(email))
    return q.create()

def get_email(user):
    unode = user_node(user)    
    q = Query()
    q.add("MATCH")
    q.add(unode)
    q.add("RETURN user.email as email")
    return q.only() 

def get_guild_emails(guild):
    gnode = guild_node(guild)
    q = Query()
    q.add("MATCH")
    q.add(gnode)
    q.add('<-[:BELONGS_TO]-(user:User)')
    q.add("RETURN user.email as email")
    data = q.data() 
    return [entry['email'] for entry in data]
    

In [None]:
#export
def seeall():
    q = Query()
    q.add("MATCH (n) return n")
    return q.data()

#seeall()


In [None]:
def test_register():
    TESTCASE.assertTrue(start_fresh())
    create_totem()
    
    user = FakeUser()
    
    ensure_guild(FakeGuild())
    print(seeall())
    
    resp = register_user(user, FakeGuild())
    print(seeall())
    TESTCASE.assertEqual(resp.relationships_created, 1)
    TESTCASE.assertEqual(resp.nodes_created, 1)

    TESTCASE.assertIsNone(get_email(user)['email'])
    #run it again
    resp = register_user(user, FakeGuild())
    TESTCASE.assertEqual(resp.relationships_created, 0)
    TESTCASE.assertEqual(resp.nodes_created, 0)
    TESTCASE.assertEqual(resp.properties_set, 0)
    
    set_email(user, 'adifferent@email.com')

    TESTCASE.assertEqual(fetch('User', id='user789')['email'], 'adifferent@email.com')
    resp = unregister_user(FakeUser(), FakeGuild())
    TESTCASE.assertEqual(resp.relationships_deleted, 1)
    
    TESTCASE.assertEqual(get_email(user)['email'], 'adifferent@email.com')
    print("SUCCESS")
test_register()

[{'n': {'eid': 'totem', 'is_test_data': True}}, {'n': {'name': 'fake', 'id': 'abc123'}}]
[{'n': {'name': 'user#789', 'id': 'user789', 'discriminator': '789'}}, {'n': {'eid': 'totem', 'is_test_data': True}}, {'n': {'name': 'fake', 'id': 'abc123'}}]
SUCCESS


In [None]:
def test_unregister():
    TESTCASE.assertTrue(start_fresh())
    create_totem()
    resp = unregister_user(FakeUser(), FakeGuild())    
    TESTCASE.assertEqual(resp.relationships_deleted, 0)
    print("SUCCESS")
test_unregister()


SUCCESS


In [None]:
#export
def add_channel(channel, guild):
    ensure_guild(guild)
    
    q = Query()
    q.add("MERGE")
    q.add(channel_node(channel))
    q.add("WITH channel")
    q.add("MATCH")
    q.add(guild_node(guild))
    q.add("WITH channel, guild")
    q.add("MERGE (channel)-[:IN]->(guild)")
    return q.create()    

def remove_channel(channel, guild):
    q = Query()
    q.add("MATCH")
    q.add(channel_node(channel))
    q.add('-[:IN]->')
    q.add(guild_node(guild))
    q.add("DETACH DELETE channel")
    return q.create()    

def summarized_channels(guild, names_only=True):
    q = Query()
    q.add("MATCH")
    q.add(guild_node(guild))
    q.add('<-[:IN]-')
    q.add('(channel:Channel)')
    q.add("RETURN channel")
    resp = q.data()
    if names_only:
        return [chan['channel']['name'] for chan in resp]
    else:
        return resp

In [None]:
def test_add_channel():
    TESTCASE.assertTrue(start_fresh())
    create_totem()
    
    resp = add_channel(FakeChannel(), FakeGuild())
    TESTCASE.assertEqual(resp.relationships_created, 1)
    TESTCASE.assertEqual(resp.nodes_created, 1)

    #run it again
    resp = add_channel(FakeChannel(), FakeGuild())
    TESTCASE.assertEqual(resp.relationships_created, 0)
    TESTCASE.assertEqual(resp.nodes_created, 0)
    TESTCASE.assertEqual(resp.properties_set, 0)

    TESTCASE.assertEqual(fetch('Channel', id='89123')['name'], 'fakechannel')
    
    print(summarized_channels(FakeGuild(), names_only=False))

    
    resp = remove_channel(FakeChannel(), FakeGuild())
    TESTCASE.assertEqual(resp.nodes_deleted, 1)
    TESTCASE.assertEqual(resp.relationships_deleted, 1)
    print("SUCCESS")
test_add_channel()

[{'channel': {'name': 'fakechannel', 'id': '89123'}}]
SUCCESS


In [None]:
#export
def add_suggestion(user, *args, ts=None):
    ts = ts or time.time()
    suggestion = ' '.join(args)
    q = Query()
    q = Query()
    q.add("MERGE")
    q.add(user_node(user))
    q.add("MERGE")
    q.add(node.txt('Suggestion', 'suggestion', body=suggestion, ts=ts))
    q.add("WITH user, suggestion")
    q.add("MERGE (user)-[:MADE]->(suggestion)")
    return q.create()

In [None]:
def test_add_suggestion():
    TESTCASE.assertTrue(start_fresh())
    create_totem()
    resp = add_suggestion(FakeUser(), 'make', 'it', 'good')
    TESTCASE.assertTrue(resp.nodes_created, 2)
    resp = add_suggestion(FakeUser(), 'make it better')
    TESTCASE.assertTrue(resp.nodes_created, 1)

    q = Query()
    q.add("MATCH (s:Suggestion) return count(s) as num")
    TESTCASE.assertEqual(q.only()['num'], 2)
test_add_suggestion()

In [None]:
#export
def add_report(guild, ts):
    gnode = guild_node(guild)
    q = Query()
    q.add("MATCH")
    q.add(gnode)
    q.add("SET guild.last_report={}".format(ts))
    return q.create()

def last_report(guild):
    gnode = guild_node(guild)
    q = Query()
    q.add("MATCH")
    q.add(gnode)
    q.add("RETURN guild.last_report as last_report")
    return q.single()
    

In [None]:
def test_last_report():
    TESTCASE.assertTrue(start_fresh())
    create_totem()
    fg = FakeGuild()
    ensure_guild(fg)

    TESTCASE.assertIsNone(last_report(fg))
    add_report(fg, 100)
    TESTCASE.assertEqual(last_report(fg), 100)
    print("SUCCESS")
test_last_report()

SUCCESS


In [None]:
#export
import logging
logger = logging.getLogger()
from sewing import is_main, start_log
import pandas as pd

def set_last_report(guild_name, ts):
    """
    Sets the last report time to be ts
    
    :param guild_name: the name of the guild
    :param ts: The time to set as a pandas to_datetime legible string.
    """
    lr_time = pd.to_datetime(ts)
    q = Query()
    q.add("MATCH")
    q.add('(guild:Guild) where guild.name="{}"'.format(guild_name))
    q.add("SET guild.last_report={}".format(lr_time.value/10**9))
    q.add("RETURN guild.last_report as last_report")
    return q.only()

if is_main(globals()):
    logger = start_log()
    logger.setLevel(logging.INFO)
    from clize import run
    run(set_last_report)
    
    

In [None]:
notebook2script(_nbpath)

Converted db.ipynb.
