# Models

In [127]:
import sqlalchemy as sa
from sqlalchemy.ext.declarative import declarative_base

from pathlib import Path
from datetime import datetime
from sqlalchemy.orm import sessionmaker

In [128]:
# Declarative base shared by every model defined in this cell.
Base = declarative_base()
class Repo( Base ):
    """ORM model mapped to the ``repos`` table.

    ``ssh_url`` is declared unique and is the column that
    ``SnykFinding.ssh_url`` references as a foreign key. ``created`` and
    ``updated`` are NOT NULL and have no default, so callers must supply
    them when constructing a row.
    """
    __tablename__ = 'repos'
    id              = sa.Column(sa.Integer, primary_key=True, autoincrement=True)
    repo_name       = sa.Column(sa.String,   nullable=False)
    ssh_url         = sa.Column(sa.String,   nullable=False, unique=True) 
    onboarded_snyk  = sa.Column(sa.Boolean,  default=False)
    onboarded_ghas  = sa.Column(sa.Boolean,  default=False)
    active          = sa.Column(sa.Boolean,  default=True)
    archived        = sa.Column(sa.Boolean,  default=False)
    created         = sa.Column(sa.DateTime, nullable=False)
    updated         = sa.Column(sa.DateTime, nullable=False)

    # relationship() lives in sqlalchemy.orm (the top-level package does not
    # export it), and its first argument is the mapped *class* name, not the
    # table name. Paired with SnykFinding.parent through back_populates.
    snyk_issues = sa.orm.relationship("SnykFinding", back_populates="parent")

    def __repr__(self):
        """Return a debug string with the id, name, URL, onboarding flags and timestamps."""
        msg = """<Repo(id={}, repo_name='{}', ssh_url='{}', onboarded_snyk={}, onboarded_ghas={}, created='{}', updated='{}')>"""
        msg = msg.format(self.id, self.repo_name, self.ssh_url, self.onboarded_snyk, self.onboarded_ghas, self.created, self.updated)
        return msg


class SnykFinding( Base ):
    """ORM model mapped to the ``snyk_findings`` table.

    Each row points at its repository through ``ssh_url``, a foreign key
    onto ``repos.ssh_url``.
    """
    __tablename__ = 'snyk_findings'

    id               = sa.Column(sa.Integer, primary_key=True, autoincrement=True)
    ssh_url          = sa.Column(sa.String, sa.ForeignKey("repos.ssh_url"))
    status_onboarded = sa.Column(sa.Boolean)
    finding          = sa.Column(sa.String)
    severity         = sa.Column(sa.Integer)
    created          = sa.Column(sa.DateTime)
    updated          = sa.Column(sa.DateTime)

    # relationship() lives in sqlalchemy.orm (the top-level package does not
    # export it), and its first argument is the mapped *class* name, not the
    # table name. Paired with Repo.snyk_issues through back_populates.
    parent = sa.orm.relationship("Repo", back_populates="snyk_issues")

    def __repr__(self):
        """Return a debug string with the id, URL, finding, severity and timestamps."""
        msg = """<SnykFinding(id={}, ssh_url='{}', finding='{}',  severity='{}',  created='{}',  updated='{}')>"""
        # Six placeholders need six arguments; severity was previously missing,
        # which made every repr() call raise IndexError.
        msg = msg.format(self.id, self.ssh_url, self.finding, self.severity, self.created, self.updated)
        return msg

In [124]:

from collections import namedtuple

# Named record of severity levels; SNYKSEVERITY maps each level name to an
# integer, from critical = 1 through info = 5.
SnykFindingSeverity = namedtuple(
    typename="SnykFindingSeverity",
    field_names="critical high medium low info",
)
SNYKSEVERITY = SnykFindingSeverity._make(range(1, 6))

class DbTester:
    """CRUD helper for the ORM models, backed by a local SQLite file.

    The database lives at ``./test.sqlite3`` (resolved against the current
    working directory). Every method opens a short-lived session from
    ``self.Session`` and closes it before returning.
    """

    def __init__(self, drop_all=False):
        """Create the engine, ensure the schema exists and build the session factory.

        Args:
            drop_all: When true, drop every table registered on ``Base``
                before re-creating them, so the database starts out empty.
        """
        sql_file = Path('./test.sqlite3')
        engine = sa.create_engine('sqlite:///' + str(sql_file.resolve()), echo=False)

        if drop_all:
            Base.metadata.drop_all(engine)

        Base.metadata.create_all(engine)
        self.Session = sa.orm.sessionmaker(bind=engine)

    def CreateRepo(self, repo_name, ssh_url ):
        """Insert one ``Repo`` row, stamping ``created`` and ``updated`` with the current time.

        Args:
            repo_name: Value for the ``repo_name`` column.
            ssh_url: Value for the ``ssh_url`` column (declared unique on the model).
        """
        now = datetime.now()
        item = Repo(repo_name=repo_name, ssh_url=ssh_url, created=now, updated=now) 
        with self.Session() as session: 
            session.add(item)
            session.commit()
        return

    def ReadRepos(self, ssh_url=None):
        """Build a query over ``Repo`` rows, optionally restricted to one URL.

        Args:
            ssh_url: When truthy, only rows whose ``ssh_url`` column equals
                this value are selected; otherwise every row is selected.

        Returns:
            The SQLAlchemy ``Query`` object. It has not been executed yet;
            iterate it or call ``.all()`` / ``.first()`` to fetch rows.
        """
        results = None
        with self.Session() as session: 
            results = session.query(Repo)
            if ssh_url:
                # Compare the mapped column with the argument. The previous
                # `ssh_url == ssh_url` compared the argument with itself,
                # which is always True and therefore filtered nothing.
                results = results.filter(Repo.ssh_url == ssh_url)

        # NOTE(review): the query is handed back after the `with` block has
        # closed the session; consider returning `results.all()` from inside
        # the block if callers only need the rows.
        return results

    def UpdateRepo(self):
        """Placeholder; not implemented yet."""
        pass

    def DeleteRepo(self, ssh_url=None):
        """Delete ``Repo`` rows and commit.

        Args:
            ssh_url: When truthy, only rows whose ``ssh_url`` column equals
                this value are deleted. When omitted or falsy, EVERY row in
                ``repos`` is deleted.
        """
        with self.Session() as session: 
            results = session.query(Repo)
            if ssh_url:
                # Compare the mapped column, not the argument with itself.
                results = results.filter(Repo.ssh_url == ssh_url)
            # Session.delete() accepts a single mapped instance, not a Query,
            # so mark each matching row for deletion individually.
            for repo in results:
                session.delete(repo)
            session.commit()
        return


    def CreateSnykFinding(self, ssh_url, finding, severity ):
        """Insert one ``SnykFinding`` row, stamping ``created`` and ``updated`` with the current time.

        Args:
            ssh_url: Value for the ``ssh_url`` column (foreign key onto ``repos.ssh_url``).
            finding: Value for the ``finding`` column.
            severity: Value for the integer ``severity`` column.
        """
        now = datetime.now()
        item = SnykFinding(ssh_url=ssh_url, finding=finding, severity=severity, created=now, updated=now) 
        with self.Session() as session: 
            session.add(item)
            session.commit()
        return

    # TODO: finish the SnykFinding read/update/delete helpers. The draft below
    # still queries Repo and compares `ssh_url` with itself; it needs to query
    # SnykFinding and filter on `SnykFinding.ssh_url == ssh_url`.
    # def ReadSnykFindings(self, ssh_url):
    #     results = None
    #     with self.Session() as session: 
    #         results = session.query(Repo).filter(ssh_url == ssh_url)
    #     return results        

    # def UpdateSnykFinding(self):
    #     pass

    # def DeleteSnykFinding(self):
    #     pass


In [126]:
# Demo: drop and re-create the tables, insert one sample repo, then call
# ReadRepos() with no filter.
db = DbTester(drop_all=True)
db.CreateRepo(ssh_url='git@github.com:/abcd.git', repo_name='abcd')
# ReadRepos() returns the Query object it built rather than fetched rows,
# so the cell displays a Query repr; call .all() on it to see the rows.
db.ReadRepos() 


<Repo(id=None, repo_name='abcd', ssh_url='git@github.com:/abcd.git', onboarded_snyk=None, onboarded_ghas=None, created='2022-06-15 18:05:29.184026', updated='2022-06-15 18:05:29.184026')>


<sqlalchemy.orm.query.Query at 0x1d198b58a30>

# My Code

In [114]:
# Fresh declarative base for this standalone experiment.
Base = declarative_base()


class Repo(Base):
    """ORM model mapped to the ``repos`` table.

    All columns are NOT NULL. The two ``onboarded_*`` flags default to
    ``False``; ``created`` and ``updated`` have no default and must be
    supplied by the caller.
    """

    __tablename__ = 'repos'

    id = sa.Column(sa.Integer, primary_key=True, autoincrement=True)
    repo_name = sa.Column(sa.String, nullable=False)
    ssh_url = sa.Column(sa.String, nullable=False)
    onboarded_snyk = sa.Column(sa.Boolean, nullable=False, default=False)
    onboarded_ghas = sa.Column(sa.Boolean, nullable=False, default=False)
    created = sa.Column(sa.DateTime, nullable=False)
    updated = sa.Column(sa.DateTime, nullable=False)

    # TODO: a relationship to SnykFinding (and a matching foreign key on
    # ssh_url) was sketched here but is not wired up in this version.

    def __repr__(self):
        """Return a debug string listing every column value of this row."""
        template = """<ResultIndexItem(id={}, repo_name='{}', ssh_url='{}', onboarded_snyk='{}', onboarded_ghas='{}', created='{}', updated='{}')>"""
        values = (
            self.id,
            self.repo_name,
            self.ssh_url,
            self.onboarded_snyk,
            self.onboarded_ghas,
            self.created,
            self.updated,
        )
        return template.format(*values)


class test:
    """Minimal harness that stores ``Repo`` rows in ``./test.sqlite3``."""

    def __init__(self):
        """Open the SQLite file, create any missing tables and build a session factory."""
        # An in-memory engine ('sqlite://') was tried earlier; the file-backed
        # database below is what is actually used.
        db_path = Path('./test.sqlite3').resolve()
        db_engine = sa.create_engine('sqlite:///' + str(db_path), echo=False)
        Base.metadata.create_all(db_engine)
        self.Session = sessionmaker(bind=db_engine)

    def CreateRepo(self, repo_name=None, ssh_url=None):
        """Print a new ``Repo`` and insert it.

        ``created`` and ``updated`` are both set to the current time.

        Args:
            repo_name: Value for the ``repo_name`` column.
            ssh_url: Value for the ``ssh_url`` column.
        """
        timestamp = datetime.now()
        repo = Repo(
            repo_name=repo_name,
            ssh_url=ssh_url,
            created=timestamp,
            updated=timestamp,
        )
        # Printed before the insert, so the id is still None at this point.
        print(repo)

        with self.Session() as db_session:
            db_session.add(repo)
            db_session.commit()

        #---

# Smoke test: build the harness (which creates any missing tables) and insert
# one sample repo; CreateRepo prints the new object before saving it.
tst = test() 
tst.CreateRepo(repo_name='abc', ssh_url='git@github.com:/abcd.git')

<ResultIndexItem(id=None, repo_name='abc', ssh_url='git@github.com:/abcd.git', onboarded_snyk='None', onboarded_ghas='None', created='2022-06-15 17:57:28.630849', updated='2022-06-15 17:57:28.630849')>


In [46]:
# Fetch the first Repo row (None when the table is empty) and print it.
# NOTE(review): `Session` is not defined in any visible cell — presumably a
# sessionmaker left over from an earlier notebook cell; confirm before re-running.
with Session() as session: 
    result = session.query(Repo).first()

print(result)

<ResultIndexItem(id=1, repo_name='abc', ssh_url='git@github.com:/abcd.git', onboarded_snyk='False', onboarded_ghas='False', created='2022-06-15 17:23:12.953072', updated='2022-06-15 17:23:12.953072')>


In [47]:
# Load the first Repo row, delete it, then query again to confirm it is gone.
# NOTE(review): `Session` is not defined in any visible cell — presumably a
# sessionmaker left over from an earlier notebook cell; confirm before re-running.
with Session() as session:
    result = session.query(Repo).first()

print(result)

# first() returns None on an empty table and Session.delete(None) raises, so
# only delete when a row was actually found; this keeps the cell re-runnable.
if result is not None:
    with Session() as session:
        session.delete(result)
        session.commit()

with Session() as session:
    result = session.query(Repo).first()

print(result)

None
