In [None]:
from IPython import display
# Build an HTML snippet containing a script plus a "Toggle raw code" link.
# `code_toggle` hides or shows every `div.input` element and flips the
# `code_show` flag; it is registered to run once when the document is ready,
# so the inputs are hidden on load. The script relies on `$` being defined
# on the page. As the last expression of the cell, the HTML object is the
# cell's result.
display.HTML('''<script>
code_show=true;
function code_toggle() {
 if (code_show){
 $('div.input').hide();
 } else {
 $('div.input').show();
 }
 code_show = !code_show
}
$( document ).ready(code_toggle);
</script>
<a href="javascript:code_toggle()">Toggle raw code</a>.''')

In [None]:
import os
import threading
import time

import ipywidgets as widgets

def _display(out: widgets.Output, content, clear_output=True):
    """Show ``content`` inside the ``out`` widget, then pause for one second.

    Args:
        out: Output widget whose ``capture`` decorator collects the display.
        content: Object handed to ``display.display``.
        clear_output: Forwarded to ``out.capture``; the capture is always
            created with ``wait=True``.
    """
    def _render():
        display.display(content)
        time.sleep(1)

    # Wrap the renderer with the widget's capture decorator and run it once.
    captured_render = out.capture(clear_output=clear_output, wait=True)(_render)
    captured_render()


class Runner:
    """Callable that repeatedly pulls the grades database and redraws figures.

    An instance is meant to be used as a thread target: calling it loops
    until ``interrupt_refresh_plots`` is set, or until ``MAX_RUN_SECONDS``
    have elapsed, refreshing the figures every ``update_interval`` seconds.

    Args:
        url: Address passed to ``GradesDB.pull_db`` on every refresh.
        filter_value: Substring used to filter the database keys; the
            special value ``'All'`` means "no filtering".
        update_interval: Seconds between the start of two refreshes.
        interrupt_refresh_plots: ``threading.Event`` that stops the loop
            when set.
        outs: Sequence of Output widgets; one figure is drawn per widget.
    """

    # Safety net: a refresh session stops by itself after this many seconds.
    MAX_RUN_SECONDS = 3600
    # Directory where the figure images are written before being displayed.
    FIGURES_DIR = './figs'

    def __init__(self,
                 url,
                 filter_value,
                 update_interval,
                 interrupt_refresh_plots,
                 outs: 'list[widgets.Output]'
                 ):
        self.url = url
        self.filter_value = filter_value
        self.update_interval = update_interval
        self.interrupt_refresh_plots = interrupt_refresh_plots
        self.outs = outs
        self.db = GradesDB()

    def __call__(self):
        """Run the refresh loop until interrupted or timed out."""
        stop = self.interrupt_refresh_plots
        stop.clear()
        # Timer invokes its callback without arguments, so the bound ``set``
        # method is passed directly. The timer is a daemon and is cancelled
        # on exit so a finished session can never interrupt a later one.
        watchdog = threading.Timer(self.MAX_RUN_SECONDS, stop.set)
        watchdog.daemon = True
        watchdog.start()

        figures = Figures()
        try:
            while not stop.is_set():
                # The interval is measured from the start of the refresh, so
                # rendering time counts towards it.
                next_refresh = time.monotonic() + self.update_interval
                self.show_figures(figures)
                # Sleep for the remainder of the interval, waking up
                # immediately if an interrupt is requested.
                stop.wait(timeout=max(0.0, next_refresh - time.monotonic()))
        finally:
            watchdog.cancel()

    def _pull_and_filter_db(self):
        """Refresh the local database from ``self.url`` and return the filtered view."""
        self.db.pull_db(self.url)
        if self.filter_value == 'All':
            return self.db.filter_db('')
        return self.db.filter_db(self.filter_value)

    def show_figures(self, figures):
        """Pull fresh data and draw one figure into each output widget.

        Every slot currently shows the same ``num_passing_per_test`` chart;
        the image for slot ``i`` is stored as ``fig{i}.png``.
        """
        filtered_db = self._pull_and_filter_db()
        os.makedirs(self.FIGURES_DIR, exist_ok=True)
        for index, out in enumerate(self.outs):
            self._show_figure(
                fig=figures.num_passing_per_test(filtered_db),
                filename=f'fig{index}.png',
                out=out
            )

    def _show_figure(self, fig, filename, out):
        """Save ``fig`` under the figures directory and display the image in ``out``."""
        fullpath = f'{self.FIGURES_DIR}/{filename}'
        try:
            os.remove(fullpath)
        except FileNotFoundError:
            pass
        try:
            try:
                fig.savefig(fullpath, bbox_inches='tight')
            finally:
                # Always release the figure, even when saving fails.
                plt.close(fig)
            time.sleep(0.1)
            img = display.Image(filename=fullpath)
            _display(
                out,
                img,
                clear_output=False
            )
        except FileNotFoundError:
            # Best effort: a missing image file only skips this figure.
            pass

class Controller:
    """Wires the grades widgets to the database and to the plot refresher.

    Args:
        ui_out: Output widget in which the control panel is displayed.
        plots_out: Output widgets handed to the ``Runner`` for the figures.
    """

    def __init__(self, ui_out, plots_out):
        self.ui_out = ui_out
        self.plots_out = plots_out
        self.wg = GradesWidgets()
        self.db = GradesDB()
        # Event owned by the current refresh session; replaced on each resume.
        self.interrupt_refresh_plots = threading.Event()
        self.wg['update_url'].on_click(
            lambda _: self.apply_url(),
        )
        self.wg['search_filter'].observe(
            handler=lambda _: self.apply_filter(),
            names=['value']
        )
        self.wg['interrupt_button'].on_click(lambda _: self.apply_interrupt())
        self.wg['resume_button'].on_click(lambda _: self.apply_resume())
        self.wg._display(self.ui_out)

    def apply_interrupt(self):
        """Ask the running refresher to stop and re-enable the controls."""
        self.interrupt_refresh_plots.set()
        self.set_widgets_interaction(disabled=False)

    def apply_filter(self):
        """Refresh the dropdown choices from the current search filter."""
        filter_value = self.wg['search_filter'].value
        filtered_db = self.db.filter_db(filter_value)
        self.wg.update_dropdown(list(filtered_db.keys()))

    def apply_url(self):
        """Pull the database from the URL field and report the outcome.

        The filter is re-applied whether or not the pull succeeded, so the
        dropdown always reflects the database contents.
        """
        try:
            self.db.pull_db(self.wg['url'].value)
            self.wg['request_status'].value = True
            self.wg['request_status'].description = 'Success'
        except Exception as e:
            self.wg['request_status'].value = False
            self.wg['request_status'].description = f'{e}'
        finally:
            self.apply_filter()

    def pull_and_filter_db(self):
        """Pull the database and return it filtered by the dropdown or search field."""
        self.db.pull_db(self.wg['url'].value)
        if self.wg['dropdown'].value == 'All':
            return self.db.filter_db(self.wg['search_filter'].value)
        else:
            return self.db.filter_db(self.wg['dropdown'].value)

    def set_widgets_interaction(self, disabled: bool):
        """Enable/disable the input controls; the interrupt button gets the opposite state."""
        self.wg['url'].disabled = disabled
        self.wg['update_url'].disabled = disabled
        self.wg['search_filter'].disabled = disabled
        self.wg['dropdown'].disabled = disabled
        self.wg['update_interval'].disabled = disabled
        self.wg['resume_button'].disabled = disabled
        self.wg['interrupt_button'].disabled = not disabled

    def apply_resume(self):
        """Start a new background refresh session using the current widget values."""
        # Each session gets its own event. The previous event is set first so
        # that a runner still winding down stops for good; sharing one event
        # would let the new runner's initial clear() revive the old one.
        self.interrupt_refresh_plots.set()
        self.interrupt_refresh_plots = threading.Event()
        self.set_widgets_interaction(disabled=True)
        filter_value = self.wg['search_filter'].value \
            if self.wg['dropdown'].value == 'All' \
            else self.wg['dropdown'].value
        runner = Runner(
            url=self.wg['url'].value,
            filter_value=filter_value,
            update_interval=self.wg['update_interval'].value,
            interrupt_refresh_plots=self.interrupt_refresh_plots,
            outs=self.plots_out
        )
        threading.Thread(name='refresh_plots', target=runner, daemon=True).start()


In [None]:
import itertools

import matplotlib.pyplot as plt


class Figures:
    """Factory for the matplotlib figures shown in the dashboard."""

    def __init__(self):
        # Monotonic id appended to each figure title.
        self.counter = itertools.count()

    def _plot_barh(self, title, y, width):
        """Return a new figure holding one horizontal bar chart.

        Args:
            title: Text used as the chart title.
            y: Bar positions/labels.
            width: Bar lengths, in the same order as ``y``.
        """
        plt.ion()
        plt.style.use('ggplot')
        # Create the figure and axes explicitly: clearing the "current"
        # figure first would implicitly create an extra figure that is
        # never closed.
        fig, ax = plt.subplots()
        ax.barh(y=y, width=list(width))
        ax.set_title(title)
        return fig

    @staticmethod
    def _count_passing(filtered_db):
        """Count, for each test name, how many repositories pass it.

        Tests that nobody passes are still listed, with a count of 0.
        """
        group_results = dict()
        for test_dict in filtered_db.values():
            for test_name, test_result in test_dict.items():
                group_results.setdefault(test_name, 0)
                if test_result['passing']:
                    group_results[test_name] += 1
        return group_results

    def num_passing_per_test(self, filtered_db):
        """Return a bar chart of the number of passing repositories per test.

        Args:
            filtered_db: Mapping of repository name to a mapping of test name
                to a result whose ``'passing'`` entry is read.
        """
        group_results = self._count_passing(filtered_db)
        fig = self._plot_barh(
            title=f'Réussite en fonction des critères (n={len(filtered_db.keys())}) | ID#{next(self.counter)}',
            y=list(group_results.keys()),
            width=list(group_results.values())
        )
        return fig


In [None]:
import ipywidgets as widgets


class GradesWidgets:
    """Holds the widgets of the test-results control panel, indexed by name."""

    def __init__(self):
        self.wg = dict()
        self.create()

    def create(self):
        """Instantiate every widget of the panel and store it in ``self.wg``."""
        wide = widgets.Layout(width='500px')
        self.wg.update({
            'url': widgets.Text(
                value='',
                description='URL: ',
                layout=wide
            ),
            'update_url': widgets.Button(description='Update'),
            'request_status': widgets.Valid(layout=wide),
            'search_filter': widgets.Text(
                value='',
                placeholder='Must contain',
                description='Filter',
                layout=wide
            ),
            'dropdown': widgets.Dropdown(
                description='Select',
                layout=wide
            ),
            'num_results': widgets.Label(),
            'update_interval': widgets.IntSlider(
                value=10,
                min=1,
                max=60,
                step=1,
                layout=wide,
                continuous_update=True
            ),
            'interrupt_button': widgets.Button(description='Interrupt', disabled=True),
            'resume_button': widgets.Button(description='Resume'),
        })

    def _display(self, out: widgets.Output):
        """Lay the widgets out row by row inside the ``out`` widget."""
        def _render():
            panel = self.wg
            rows = [
                widgets.HTML('<h1>View submissions tests results</h1>'),
                widgets.HBox((panel['url'], panel['update_url'], panel['request_status'])),
                panel['search_filter'],
                widgets.HBox((panel['dropdown'], panel['num_results'])),
                widgets.HBox((widgets.Label('Update interval (seconds)'), panel['update_interval'])),
                widgets.HBox((panel['interrupt_button'], panel['resume_button'])),
            ]
            for row in rows:
                display.display(row)

        out.capture()(_render)()

    def __getitem__(self, item):
        """Return the widget registered under ``item``."""
        return self.wg[item]

    def update_dropdown(self, choices):
        """Offer ``'All'`` followed by ``choices`` and show how many choices there are."""
        options = ['All'] + choices
        self.wg['dropdown'].options = options
        self.wg['num_results'].value = f'({len(choices)} results)'


In [None]:

class GradesDB:
    """In-memory copy of the remote grades database.

    ``self.db`` maps top-level keys of the downloaded JSON object to their
    values; it is emptied whenever a pull fails.
    """

    # Seconds to wait for the server before giving up on a pull.
    REQUEST_TIMEOUT = 10
    # Top-level keys that are never returned by ``filter_db``.
    _RESERVED_KEYS = ('null', 'scripts')

    def __init__(self):
        self.db = dict()

    def pull_db(self, url):
        """Replace the local copy with the JSON object served at ``url``.

        Raises:
            Exception: ``'Bad url'`` when the request itself fails,
                ``'Bad request: ...'`` when the server answers with an error
                status, ``'Bad response: ...'`` when the body is not a JSON
                object. The local copy is cleared in every failure case.
        """
        # Each failure is reported separately: a single catch-all handler
        # would re-label the specific "Bad request" error as "Bad url".
        try:
            res = requests.get(url, timeout=self.REQUEST_TIMEOUT)
        except requests.RequestException as exc:
            self.db.clear()
            raise Exception('Bad url') from exc

        if not res.ok:
            self.db.clear()
            raise Exception(f'Bad request: {res.text}')

        try:
            payload = res.json()
        except ValueError as exc:
            self.db.clear()
            raise Exception('Bad response: not valid JSON') from exc
        if not isinstance(payload, dict):
            self.db.clear()
            raise Exception('Bad response: expected a JSON object')

        self.db.clear()
        self.db.update(payload)

    def filter_db(self, filter_value):
        """Return the entries whose key contains ``filter_value``.

        The reserved keys ``'null'`` and ``'scripts'`` are always excluded.
        An empty ``filter_value`` matches every key.
        """
        return {
            key: value
            for key, value in self.db.items()
            if filter_value in str(key) and key not in self._RESERVED_KEYS
        }

    def __getitem__(self, item):
        """Return the raw entry stored under ``item``."""
        return self.db[item]


In [None]:
from ipywidgets import widgets


class FilesWidgets:
    """Widget panel for browsing the file submitted in each repository.

    Args:
        out: Output widget in which the panel is displayed.
        get_filtered_db: Callable taking a filter string and returning a
            mapping of repository name to test results.

    Attributes set by the constructor: ``wg`` (widgets by name), ``files``
    (repository name -> downloaded file text) and ``scores`` (repository
    name -> list of formatted per-line score strings).
    """

    def __init__(self, out: 'widgets.Output', get_filtered_db):
        self.wg = dict()
        self.files = dict()
        self.scores = dict()
        self._get_filtered_db = get_filtered_db
        self._create()
        self._add_functionality()
        self._display(out)
        self.update_selection_options()

    @staticmethod
    def _escape(text):
        """Return ``text`` with HTML special characters escaped."""
        # Local import keeps this class self-contained within its cell.
        import html
        return html.escape(str(text))

    @classmethod
    def _code_block(cls, text):
        """Wrap ``text`` in ``<pre><code>`` after escaping it.

        Submitted files are untrusted; without escaping, any ``<`` in the
        code would be interpreted as markup by the HTML widget.
        """
        return '<pre><code>' + cls._escape(text) + '</code></pre>'

    def _create(self):
        """Instantiate every widget of the panel and store it in ``self.wg``."""
        default_layout = widgets.Layout(width='auto', height='auto')
        self.wg['organization'] = widgets.Text(
            value='TestOrgJustAymeric',
            description='Organization',
            layout=default_layout
        )
        self.wg['filter'] = widgets.Text(
            description='Filter',
            placeholder='Must contain',
            layout=default_layout
        )
        self.wg['filename'] = widgets.Text(
            value='exercice.py',
            description='Filename',
            layout=default_layout
        )
        self.wg['request_url'] = widgets.Text(
            description='Request',
            value=f'https://raw.githubusercontent.com/{self.wg["organization"].value}/%RepositoryName%/master/{self.wg["filename"].value}',
            layout=default_layout,
            disabled=True
        )
        self.wg['get_files'] = widgets.Button(description='Fetch submissions')
        self.wg['get_files_status'] = widgets.Valid(value=True, description='Ready', layout=default_layout)
        self.wg['previous_button'] = widgets.Button(description='Previous')
        self.wg['next_button'] = widgets.Button(description='Next')
        self.wg['open_in_browser'] = widgets.Button(description='Open in GitHub', layout=default_layout)
        self.wg['open_file'] = widgets.Checkbox(description='File only', layout=default_layout)
        self.wg['repository_select'] = widgets.Dropdown(
            description='Select',
            layout=widgets.Layout(width='600px'))
        self.wg['max_preview_lines'] = widgets.IntText(
            value=100,
            disabled=False,
            layout=widgets.Layout(width='50px')
        )
        self.wg['preview_lines_range'] = widgets.IntRangeSlider(
            value=[0, 20],
            min=0,
            max=self.wg['max_preview_lines'].value,
            step=1,
            description='Lines range:',
            continuous_update=True,
            orientation='horizontal',
            readout=True,
            readout_format='d',
            layout=widgets.Layout(width='500px')
        )
        self.wg['repository_grading'] = widgets.HTML(
            layout=widgets.Layout(
                width='auto',
                height='auto',
                border='solid 1px',
                padding='2px 10px 2px 10px'
            )
        )
        html_layout = widgets.Layout(width='auto', height='auto', padding='20px 100px 0px 20px')
        self.wg['file_preview_stats'] = widgets.HTML(layout=html_layout)
        self.wg['file_preview'] = widgets.HTML(layout=html_layout)
        self.wg['file_view_stats'] = widgets.HTML(layout=html_layout)
        self.wg['file_view'] = widgets.HTML(layout=html_layout)
        file_preview_box = widgets.HBox((self.wg['file_preview'], self.wg['file_preview_stats']))
        file_view_box = widgets.HBox((self.wg['file_view'], self.wg['file_view_stats']))
        lines_range_box = widgets.HBox((self.wg['preview_lines_range'], self.wg['max_preview_lines']))
        self.wg['accordion'] = widgets.Accordion(
            children=[
                widgets.VBox((lines_range_box, file_preview_box)),
                file_view_box
            ]
        )
        self.wg['accordion'].set_title(0, 'Preview')
        self.wg['accordion'].set_title(1, 'File')

    def _add_functionality(self):
        """Attach the callbacks to the widgets.

        Observers listen to ``value`` only, so a handler runs once per user
        change instead of once per modified trait.
        """
        self.wg['organization'].observe(lambda _: self._update_request_url(), names='value')
        self.wg['filename'].observe(lambda _: self._update_request_url(), names='value')
        self.wg['filter'].observe(lambda _: self._apply_filter(), names='value')
        self.wg['max_preview_lines'].observe(lambda _: self._update_max_preview_lines(), names='value')
        self.wg['get_files'].on_click(lambda _: self.update_files_and_scores())
        self.wg['previous_button'].on_click(lambda _: self._select_previous())
        self.wg['next_button'].on_click(lambda _: self._select_next())
        self.wg['open_in_browser'].on_click(lambda _: self._open_browser())
        self.wg['repository_select'].observe(lambda _: self._apply_select(), names='value')
        self.wg['preview_lines_range'].observe(lambda _: self._update_file_preview(), names='value')

    def _display(self, out: 'widgets.Output'):
        """Lay the widgets out row by row inside the ``out`` widget."""
        @out.capture()
        def _display():
            display.display(widgets.HTML('<h1>View submissions files</h1>'))
            display.display(self.wg['organization'])
            display.display(self.wg['filename'])
            display.display(self.wg['filter'])
            display.display(self.wg['request_url'])
            display.display(widgets.HBox((self.wg['get_files'], self.wg['get_files_status'])))
            display.display(
                widgets.HBox((self.wg['repository_select'], self.wg['open_in_browser'], self.wg['open_file'])))
            display.display(widgets.HBox((self.wg['previous_button'], self.wg['next_button'])))
            display.display(self.wg['repository_grading'])
            display.display(self.wg['accordion'])

        _display()

    def _apply_filter(self):
        """React to a filter change: refresh URL template, options and views."""
        self._update_request_url()
        self.update_selection_options()
        self._apply_select()

    def _update_max_preview_lines(self):
        """Propagate the maximum line count to the range slider."""
        slider = self.wg['preview_lines_range']
        # The slider rejects a maximum below its minimum, so clamp it.
        slider.max = max(self.wg['max_preview_lines'].value, slider.min)

    def _open_browser(self):
        """Open the selected repository, or the file inside it, on GitHub."""
        org = self.wg['organization'].value
        repo = self.wg['repository_select'].value
        filename = self.wg['filename'].value
        if self.wg['open_file'].value:
            webbrowser.open(f'https://github.com/{org}/{repo}/blob/master/{filename}')
        else:
            webbrowser.open(f'https://github.com/{org}/{repo}')

    def _update_request_url(self):
        """Refresh the read-only field that shows the request URL template."""
        org = self.wg['organization'].value
        filename = self.wg['filename'].value
        repository_filter = self.wg['filter'].value
        self.wg[
            'request_url'].value = f'https://raw.githubusercontent.com/{org}/%Repository name containing: {repository_filter}%/master/{filename}'

    def _update_file_view(self, file=None, line_scores=None):
        """Show the whole file of the selected repository with its line scores.

        The ``file`` and ``line_scores`` arguments are accepted for
        compatibility but ignored: both are looked up from the selection.
        """
        selection = self.wg['repository_select'].value
        try:
            file = self.files[selection]
            line_scores = self.scores[selection]
        except KeyError:
            self.wg['file_view_stats'].value = '<p>Couldn\'t get stats</p>'
            self.wg['file_view'].value = '<p>Couldn\'t get file</p>'
        else:
            self.wg['file_view_stats'].value = self._code_block(''.join(line_scores))
            self.wg['file_view'].value = self._code_block(file)

    def _update_file_preview(self):
        """Show the slider-selected line range of the selected repository's file."""
        selection = self.wg['repository_select'].value
        try:
            file = self.files[selection]
            line_scores = self.scores[selection]
        except KeyError:
            self.wg['file_preview_stats'].value = '<p>Couldn\'t get stats</p>'
            self.wg['file_preview'].value = '<p>Couldn\'t get file</p>'
        else:
            lines_range = self.wg['preview_lines_range']
            file_lines = [line + '\n' for line in file.split('\n')]
            # Slicing already stops at the end of the list.
            selected_lines = file_lines[lines_range.lower: lines_range.upper]
            selected_scores = line_scores[lines_range.lower: lines_range.upper]
            self.wg['file_preview_stats'].value = self._code_block(''.join(selected_scores))
            self.wg['file_preview'].value = self._code_block(''.join(selected_lines))

    def _get_request_urls(self, repository_names):
        """Return the raw-file URL of each repository in ``repository_names``."""
        org = self.wg['organization'].value
        filename = self.wg['filename'].value
        return [
            f'https://raw.githubusercontent.com/{org}/{repo}/master/{filename}'
            for repo in repository_names
        ]

    def _get_files(self, urls):
        """Download every URL; failures yield a placeholder text.

        The status widget ends up valid only if every download succeeded;
        otherwise it shows the message of the last failure.
        """
        files = []
        self.wg['get_files_status'].value = True
        self.wg['get_files_status'].description = 'Success'
        for url in urls:
            try:
                file = get_file(url)
                files.append(file)
            except Exception as e:
                files.append('Couldn\'t get file')
                self.wg['get_files_status'].value = False
                self.wg['get_files_status'].description = f'{e}'
        return files

    def update_selection_options(self):
        """Fill the repository dropdown from the filtered database."""
        repository_names = self._get_filtered_repository_names()
        if len(repository_names) > 0:
            self.wg['repository_select'].options = repository_names
            self.wg['repository_select'].value = repository_names[0]
        else:
            self.wg['repository_select'].options = ['Not found']
            self.wg['repository_select'].value = 'Not found'

    def update_files_and_scores(self):
        """Download the files of all filtered repositories and score their lines."""
        self.update_selection_options()
        repository_names = self._get_filtered_repository_names()
        urls = self._get_request_urls(repository_names)
        files = self._get_files(urls)
        formatted_scores = get_formatted_scores(files)
        self.files = dict(zip(repository_names, files))
        self.scores = dict(zip(repository_names, formatted_scores))
        self._apply_select()

    def _select_previous(self):
        """Move the selection one entry up, stopping at the first one."""
        options = list(self.wg['repository_select'].options)
        index = options.index(self.wg['repository_select'].value)
        new_index = max(0, index - 1)
        self.wg['repository_select'].value = options[new_index]
        self._apply_select()

    def _select_next(self):
        """Move the selection one entry down, stopping at the last one."""
        options = list(self.wg['repository_select'].options)
        index = options.index(self.wg['repository_select'].value)
        new_index = min(len(options) - 1, index + 1)
        self.wg['repository_select'].value = options[new_index]
        self._apply_select()

    def _apply_select(self):
        """Refresh grading, preview and full view for the current selection."""
        self._update_repository_grading()
        self._update_file_preview()
        self._update_file_view()

    def _update_repository_grading(self):
        """Show the per-test results of the selected repository."""
        filter_value = self.wg['filter'].value
        filtered_db = self._get_filtered_db(filter_value)
        try:
            test_dict = filtered_db[self.wg['repository_select'].value]
            grading_html = 'Test results:<br>'
            for test_name, test_result in test_dict.items():
                result = "ok" if test_result["passing"] else "fail"
                # Test names come from the database: escape before embedding.
                grading_html += f'- {self._escape(test_name)}: {result}<br>'
        except (KeyError, TypeError, AttributeError):
            # Unknown repository or malformed record: show a placeholder.
            self.wg['repository_grading'].value = '<p>Couldn\'t apply grading</p>'
        else:
            self.wg['repository_grading'].value = grading_html

    def _get_filtered_repository_names(self):
        """Return the repository names matching the current filter."""
        filter_value = self.wg['filter'].value
        filtered_db = self._get_filtered_db(filter_value)
        return list(filtered_db.keys())


In [None]:
import requests


def get_file(url, timeout=10):
    """Download ``url`` and return its body decoded as UTF-8 text.

    Args:
        url: Address of the file to fetch.
        timeout: Seconds to wait for the server before giving up.

    Raises:
        Exception: ``'Bad url'`` when the request itself fails (invalid
            address, connection error, timeout) and ``'Bad request: ...'``
            when the server answers with an error status.
    """
    # The two failures are handled separately: a catch-all handler around
    # the whole body would re-label the "Bad request" error as "Bad url".
    try:
        res = requests.get(url, timeout=timeout)
    except requests.RequestException as exc:
        raise Exception('Bad url') from exc
    if not res.ok:
        raise Exception(f'Bad request: {res.text}')
    return res.content.decode('utf-8')


def get_line_counts(files):
    """Count in how many files each distinct line appears.

    A line repeated inside one file is counted once for that file.

    Args:
        files: Iterable of file contents as strings.

    Returns:
        dict mapping each line (split on ``'\\n'``) to the number of files
        containing it.
    """
    line_counts = dict()
    for f in files:
        # Deduplicate within the file so each file contributes at most 1.
        for line in set(f.split('\n')):
            line_counts[line] = line_counts.get(line, 0) + 1
    return line_counts


def score_lines(file, line_counts):
    """Return, for each line of ``file``, its count looked up in ``line_counts``.

    Lines are obtained by splitting on ``'\\n'``; a line absent from
    ``line_counts`` raises ``KeyError``.
    """
    return [line_counts[text] for text in file.split('\n')]


def _get_scores(files):
    """Return one list of per-line counts for each file in ``files``.

    The counts come from ``get_line_counts`` computed over all ``files``.
    """
    counts = get_line_counts(files)
    return [score_lines(text, counts) for text in files]


def get_formatted_scores(files):
    """Format each line score as ``'<percent>% (<count>/<total>)\\n'``.

    ``total`` is the number of files; the percentage is printed with two
    decimals. Returns one list of formatted strings per file.
    """
    per_file_scores = _get_scores(files)
    return [
        [
            f'{100 * score / len(per_file_scores):.2f}% ({score}/{len(per_file_scores)})\n'
            for score in file_scores
        ]
        for file_scores in per_file_scores
    ]


In [None]:
import collections

from IPython import display
from ipywidgets import widgets

# Output areas created on first access by key: `out[...]` yields a default
# Output widget, `plots_out[...]` yields one with a fixed 300px x 600px
# scrollable layout. Later cells display and fill them by key.
out = collections.defaultdict(widgets.Output)
plots_out = collections.defaultdict(
    lambda: widgets.Output(layout={'height': '300px', 'width': '600px', 'overflow': 'scroll'}))

In [None]:
# Placeholder for the control panel; the Controller created below renders into it.
display.display(out['plots_UI'])

In [None]:
# Placeholder for the first plot area.
display.display(plots_out['0'])

In [None]:
# Placeholder for the second plot area.
display.display(plots_out['1'])

In [None]:
# Placeholder for the third plot area.
display.display(plots_out['2'])

In [None]:
# Placeholder for the fourth plot area.
display.display(plots_out['3'])

In [None]:
# Placeholder for the submissions viewer; FilesWidgets renders into it in the last cell.
display.display(out['submissions_viewer'])

In [None]:
# Build the control panel inside `out['plots_UI']` and give it the four plot
# areas, in order, for the refreshed figures.
controller = Controller(
    ui_out=out['plots_UI'],
    plots_out=[
        plots_out['0'],
        plots_out['1'],
        plots_out['2'],
        plots_out['3'],
    ])
# Pre-fill the URL field with the default database address and pull it once,
# which performs a network request when this cell runs.
db_url = 'https://teststudentgradesdb.firebaseio.com/.json?'
controller.wg['url'].value = db_url
controller.apply_url()

In [None]:
import webbrowser

# Open 'figures.html' in a new browser window; the result is bound to `_`
# so the cell shows no output.
# NOTE(review): 'figures.html' is not created anywhere in this notebook —
# presumably an exported page living next to it; confirm.
_ = webbrowser.open_new('figures.html')

In [None]:
# Build the submissions viewer in its output area; it filters repositories
# through the controller's database, so it sees whatever was last pulled.
wg = FilesWidgets(out['submissions_viewer'], controller.db.filter_db)