In [2]:
# Compliance Cow open sourced library. See https://github.com/ContiNube/compliancecow-data-library
from compliancecow.models import cowlib, cowreport
from compliancecow.utils import utils, dictutils, cowreportsutils


In [17]:
#   You can connect with the ComplianceCow API by following ways.

#   Way 1   :   You can connect with authtoken by login in UI or You can create the JWT token from the endpoint provided to you
client = cowlib.Client(auth_token="")

#   Way 2   :   You can connect with client_id and client secret which is provided to you.
# client = cowlib.Client(
#     client_id="", client_secret="")


In [4]:
# Each call will returns proper response and error(if any) objects. And every object in compliancecow library follows
# Python object pattern. Which means, you can get the dict view by calling to_dict() method in objects.


# To fetch the available plans for you, You can use this method.

plans, error = client.get_plans()




In [5]:
aws_iam_plan_id=None
for plan in plans:
    if plan.name=="Plan Name":
        aws_iam_plan_id = plan.id

# To fetch the plan instances for a specific plan.

plan_instances, error = client.get_plan_instances(
    plan_id=str(aws_iam_plan_id), is_base_fields_only=True)


In [6]:
# Get Any plan instances
plan_instance = plan_instances[1]


# Available filters - If you pass the filters, that will apply as AND condition

# along_with_heirarchy=False    -   return the leaf controls with heirarchy
# having_evidences=False    -   return the leaf controls which contains evidences
# having_notes=False    -   return the leaf controls which contains notes
# having_attachments=False  -   return the leaf controls which contains attachments
# having_checklists=False   -   return the leaf controls which contains checklists
# automated=True    -   return the leaf controls which is automated


# Fetch the controls with evidence
controlswithevidence = plan_instance.get_plan_instance_controls(
    having_evidences=True)



In [7]:
# Fetch the evidence data. It'll return data as Pandas DataFrame. So you can play around it easily

evidence_ = None

for control_with_evide in controlswithevidence:
    is_data_found = False
    for evidence in control_with_evide.evidences:
        if evidence.file_name == "ReportName":
            evidence_ = evidence
            is_data_found = True
            break
    if is_data_found:
        break

evidence_df, error = client.get_evidence_data(
    evidence_)



In [None]:
# grouping the data based on a column(user).
user_group = evidence_df.groupby(['user'])['user'].size()

# convert the result as dictionary
user_group = user_group.to_dict()



In [None]:
# create a simple pie-chart using bokeh

from math import pi

import pandas as pd

from bokeh.palettes import Category20c
from bokeh.plotting import figure, show
from bokeh.transform import cumsum
from bokeh.io import output_notebook

data = pd.Series(user_group).reset_index(name='value').rename(
    columns={'index': 'user'})
data['angle'] = data['value']/data['value'].sum() * 2*pi
data['color'] = Category20c[len(user_group)]

p = figure(height=350, title="Report Chart", toolbar_location=None,
           tools="hover", tooltips="@user: @value", x_range=(-0.5, 1.0))

p.wedge(x=0, y=1, radius=0.4,
        start_angle=cumsum('angle', include_zero=True), end_angle=cumsum('angle'),
        line_color="white", fill_color='color', legend_field='user', source=data)

p.axis.axis_label = None
p.axis.visible = False
p.grid.grid_line_color = None

output_notebook()
show(p)


In [None]:
#   Now we're going to combine the above steps to generate a sample report. So you can consume the reports from each run.

#   We're having solid structure to generate a report. So you can create your own reports and can integrate with the system.
#   After that you can consume it with various plan runs and the filters which u'r passing.

#   Currently you need to focus on creating the reports by consuming the evidence and generate a bokeh chart and related data as O/P.

#   Every report in ComplianceCow will have a mock data file. So you can play around even the data is not available.

#   Methods you need to override.

#   __get_data__()  -   This is where you need to produce the data. Here you need to consume the evidence data and also should have a switch to mock data

#   GenerateReportData()    -   Return the data as DataFrame
#   GenerateReportDataAsDataTable() -   Return the data as DataTable
#   GenerateReportDataAsParquet()   -   Return the data as PARQUET bytes
#   GenerateReportDataAsJSON()  -   Return the data as JSON
#   GenerateReportDataAsHTMLContent()   -   Return the data as HTML content which includes visualisation
#   GenerateReportDataAsBokehHTMLObject()   -   Return the data as Bokeh Object(As JSON object)
#   GenerateReportDataAsMDFile()    -   Return the data as markdown bytes
#   GenerateReportDataAsPDFFile()   -   Return the data as pdf bytes

import pandas as pd
from bokeh.embed import json_item


class CowAWSMFAReportEvidenceReport(cowreport.CowReportsBaseModel):
    def __init__(self, req_obj, *args, **kwargs):
        cowreport.CowReportsBaseModel.__init__(
            self, req_obj, *args, **kwargs)
        self.plan_instance = None
        self.req_params = None
        self.auth_token = None
        self.kwargs = kwargs

        if dictutils.is_valid_key(req_obj, "plan_instance_id"):
            self.plan_instance = req_obj['plan_instance_id']
        self.__init_data()

    def WhoAmI(self, *args, **kwargs):
        ''' Standard method present in all Synthesizer Filters. This will provide the context of the Filter and its outputs to the Synthesizer Orchestrator'''

        return None

    def GenerateReportData(self, df=pd.DataFrame()):
        '''
        This method will produce the base ouput as DataFrame, so other ouput types can consume this data and
        present back to the client in the required fromat
        '''
        return {'data': cowreportsutils.getdfasdict(self.report_data)}

    def GenerateReportDataAsDataTable(self, df=pd.DataFrame()):
        '''
        This method will produce report data as DataTable
        '''
        df = self.__get_data__(df)
        return {'data': cowreportsutils.getdatatableview(df)}

    def GenerateReportDataAsParquet(self, df=pd.DataFrame()):
        '''
        This method will produce report data as DataTable
        '''

        df = self.__get_data__(df)
        return {'data': cowreportsutils.getdfasstrencoded(df)}


    def GenerateReportDataAsJSON(self, df=pd.DataFrame()):
        '''
        This method will produce report data as JSON
        '''
        df = self.__get_data__(df)
        return {'data': cowreportsutils.getdfasdict(df)}

    def GenerateReportDataAsHTMLContent(self, df=pd.DataFrame()):
        '''
        This method will produce report data as HTML Content
        '''

        return {'widgets': None, "charts": None}

    def GenerateReportDataAsBokehHTMLObject(self, df=pd.DataFrame()):
        '''
        This method will produce report data as Bokeh obj
        '''
        charts = {
            "piechart": self.__pie_chart(),
        }
        data = {
            "charts": charts,
            "data": None
        }

        return data

    def GenerateReportDataAsMDFile(self, df=pd.DataFrame()):
        '''
        This method will produce report data as MarkDown file content
        '''
        df = self.__get_data__(df)

        data = cowreportsutils.dfToMarkDownStr(df, None, self.markdown)

        return {'data': data}

    def GenerateReportDataAsPDFFile(self):
        '''
        This method will produce report data as PDF file content
        '''
        return None

    def __init_data(self):
        self.__get_data__()

    def __get_data__(self, df=pd.DataFrame()):
        if self.is_mock:           
            self.report_data = {'xxx@continube.com': 10,
                                 'yyyy@continube.com': 10,
                                 'zzzzz': 10}
        else:
            if isinstance(self.report_data, pd.DataFrame):
                if not self.report_data.empty:
                    return self.report_data
            else:
                if self.report_data and bool(self.report_data):
                    return self.report_data

            client = cowlib.Client(
                auth_token="")
            plans, error = client.get_plans()
            if error is None:
                aws_iam_plan_id = None
                for plan in plans:
                    if plan.name == "PlanName":
                        aws_iam_plan_id = plan.id

                if aws_iam_plan_id is not None:
                    plan_instances, error = client.get_plan_instances(
                        plan_id=str(aws_iam_plan_id), is_base_fields_only=True)

                    if error is None:
                        plan_instance = plan_instances[1]
                        controlswithevidence = plan_instance.get_plan_instance_controls(
                            having_evidences=True)

                        if controlswithevidence:

                            evidence_ = None

                            for control_with_evide in controlswithevidence:
                                is_data_found=False
                                for evidence in control_with_evide.evidences:
                                    if evidence.file_name == "ReportName":
                                        evidence_ = evidence
                                        is_data_found = True
                                        break
                                if is_data_found:
                                    break
                            
                            if evidence_:
                                evidence_df, error = client.get_evidence_data(
                                    evidence_)
                                if error is None:
                                    self.report_data = evidence_df

        return self.report_data

    def __pie_chart(self, is_plot_return_call=False):

        pie_chart_json = None
        p = figure(height=350, title="Report Chart", toolbar_location=None,
                   tools="hover", tooltips="@user: @value", x_range=(-0.5, 1.0))

        if not self.report_data.empty:

            # grouping the data based on a column(user).
            user_group = self.report_data.groupby(['user'])['user'].size()

            # convert the result as dictionary
            user_group = user_group.to_dict()

            data = pd.Series(user_group).reset_index(name='value').rename(
                columns={'index': 'user'})


            data['angle'] = data['value']/data['value'].sum() * 2*pi
            data['color'] = Category20c[len(user_group)]

            p.wedge(x=0, y=1, radius=0.4,
                    start_angle=cumsum('angle', include_zero=True), end_angle=cumsum('angle'),
                    line_color="white", fill_color='color', legend_field='user', source=data)

            p.axis.axis_label = None
            p.axis.visible = False
            p.grid.grid_line_color = None

            
        if is_plot_return_call:
            return p

        pie_chart_json = json_item(p)

        return pie_chart_json


In [18]:
# Each reports will have an uinque id and name. You can consume the available reports by passing report id/name. 
# And also you can pass some filters. You can pass 'format_type' to get the specific data(by default it'll be JSON)

report_data, error = client.get_report_data(report_id='',
                                            plan_instance_id='', is_mock=False)



In [22]:
# Sample report generation from report data summary
import pydash

compliance_status_data = []
control_type_data = []
evidence_data = []
over_all_data = {}
if not error and report_data.data:

    if dictutils.is_valid_key(report_data.data, "planRun") and dictutils.is_valid_array(report_data.data["planRun"], "controls"):
        over_all_data = {
            "in_place": pydash.get(
                report_data.data, 'planRun.controlsCount.complianceStatus.compliant.count', default=0),
            "not_in_place": pydash.get(
                report_data.data, 'planRun.controlsCount.complianceStatus.notCompliant.count', default=0),
            "n_a": pydash.get(
                report_data.data, 'planRun.controlsCount.complianceStatus.notDetermined.count', default=0),
            "automated": pydash.get(
                report_data.data, 'planRun.controlsCount.automationStatus.automated.count', default=0),
            "manual": pydash.get(
                report_data.data, 'planRun.controlsCount.automationStatus.manual.count', default=0),
            "evidence_count": pydash.get(
                report_data.data, 'planRun.controlsCount.evidenceStatus.withEvidence.count', default=0),
            "without_evidence_count": pydash.get(
                report_data.data, 'planRun.controlsCount.evidenceStatus.withoutEvidence.count', default=0),
        }
        for control in report_data.data["planRun"]['controls']:

            # compliance_status_data handle

            in_place_count = pydash.get(
                control, 'controlsCount.complianceStatus.compliant.count', default=0)
            not_in_place_count = pydash.get(
                control, 'controlsCount.complianceStatus.notCompliant.count', default=0)
            n_a_count = pydash.get(
                control, 'controlsCount.complianceStatus.notDetermined.count', default=0)

            compliance_status_data.append(
                {"control_name": control['name'], "displayable": control['displayable'], "in_place": in_place_count, "not_in_place": not_in_place_count, "n_a": n_a_count})

            # control_type_data handle

            automated_count = pydash.get(
                control, 'controlsCount.automationStatus.automated.count', default=0)
            manual_count = pydash.get(
                control, 'controlsCount.automationStatus.manual.count', default=0)

            control_type_data.append(
                {"control_name": control['name'], "displayable": control['displayable'], "automated": automated_count, "manual": manual_count})

            # evidence_count

            evidence_count = pydash.get(
                control, 'controlsCount.evidenceStatus.withEvidence.count', default=0)

            without_evidence_count = pydash.get(
                control, 'controlsCount.evidenceStatus.withoutEvidence.count', default=0)

            evidence_data.append(
                {"control_name": control['name'], "displayable": control['displayable'], "evidence_count": evidence_count, "without_evidence_count": without_evidence_count})


In [20]:
from bokeh.plotting import figure, show
from bokeh.io import output_notebook

control_type_data_df = pd.DataFrame(control_type_data)
controls = list(control_type_data_df['displayable'])
evidence_type = ["automated", "manual"]


# fruits = ['Apples', 'Pears', 'Nectarines', 'Plums', 'Grapes', 'Strawberries']
# years = ["2015", "2016", "2017"]
colors = ["#c9d9d3", "#718dbf"]

data = {'controls': controls,
        'automated': list(control_type_data_df['automated']),
        'manual': list(control_type_data_df['manual'])}

p = figure(x_range=controls, height=250, title="Evidence count per control",
           toolbar_location=None, tools="hover", tooltips="$name @controls: @$name")

p.vbar_stack(evidence_type, x='controls', width=0.9, color=colors, source=data,
             legend_label=evidence_type)

p.y_range.start = 0
p.x_range.range_padding = 0.1
p.xgrid.grid_line_color = None
p.axis.minor_tick_line_color = None
p.outline_line_color = None
p.legend.location = "top_left"
p.legend.orientation = "horizontal"
output_notebook()
show(p)
