In [1]:
import collections
import csv
import logging
import os
import re
from typing import Iterable, Any, Callable, Optional

import dateparser
import pandas as pd
from IPython.display import display
from ipywidgets import FileUpload, widgets
from rich.jupyter import print
# Root logger at DEBUG so the notebook's own log.debug(...) messages are emitted.
logging.getLogger().setLevel(logging.DEBUG)
log = logging.getLogger()

# Directory holding the on-disk memoization cache used by the processing functions.
cache_dir = "./.cache"
# exist_ok avoids the check-then-create race of a separate os.path.exists() test.
os.makedirs(cache_dir, exist_ok=True)
from diskcache import Cache
cache = Cache(os.path.join(cache_dir, "diskcache"))


In [2]:
import nltk
from nltk.corpus import stopwords as st

def create_sources_data_frame(data=None):
    """Build the table of known source files.

    Args:
        data: optional rows of ``(file_name, date)``; ``None`` yields an empty table.

    Returns:
        A DataFrame indexed by ``file_name`` with a single ``date`` column.
    """
    frame = pd.DataFrame(data, columns=['file_name', 'date'])
    return frame.set_index('file_name')

# Table of uploaded source files (index: file name, column: date); rows are
# added by input_file_change_handler.
sources = create_sources_data_frame()
# Maps file name -> the upload entry for that file (the handlers read its
# 'content' and 'metadata' keys).
all_uploaded_files = {}

# Fetch the NLTK resources used below: 'punkt' for nltk.word_tokenize and
# 'stopwords' for the stopword corpora.
nltk.download('punkt')
nltk.download('stopwords')

ital_stopwords = st.words('italian')
en_stopwords = st.words('english')

# Active stopword list; mutated in place by stopwords_input_change_handler,
# so other code must keep referring to this same list object.
stopwords = []


[nltk_data] Downloading package punkt to /home/markus/nltk_data...
[nltk_data]   Package punkt is already up-to-date!
[nltk_data] Downloading package stopwords to /home/markus/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!


In [3]:
# widgets
# Output areas: upload controls, input summary/pre-processing UI, and result table.
input_widgets = widgets.Output()
input_details_widget = widgets.Output()
data_widget = widgets.Output()

# Stays disabled until disable_input(False) finds both sources and stopwords.
start_input_processing_button = widgets.Button(description="Process input", disabled=True)

upload_data = FileUpload(accept=".txt", multiple=True)
upload_stopword = FileUpload(accept=".csv", multiple=False)
# Placeholder range for the date slider; the real options are installed by
# input_file_change_handler once files have been uploaded.
dates = [pd.Timestamp.now(), pd.Timestamp.now()]
options = [(date.strftime(' %d %b %Y '), date) for date in dates]
index = (0, len(options)-1)
selection_range_slider = widgets.SelectionRangeSlider(
    options=options,
    index=index,
    description='Dates',
    orientation='horizontal',
    layout={'width': '500px'},
    continuous_update=False
)
selection_range_slider.disabled = True

# Rebinds the module-level name only; the slider above already received the
# earlier list.
options = []
# Dropdown choosing which uploaded file feeds the pre-processing preview.
text_preview_select_widget = widgets.Dropdown(
    # placeholder='Choose Someone',
    options=["No sources yet..."],
    description='Select the item to use for preview:',
    disabled=True
)

preview_button = widgets.Button(description="Preview")

# Output area written to by display_update.
filtered_table = widgets.Output()

In [4]:
# pre-processing utils
class ProcessingStep(object):
    """One stage of a processing chain, shown as a checkbox plus a read-only text area.

    Steps form a singly linked list through ``next_step``. ``process`` applies this
    step's function (when its checkbox is ticked), shows the outcome in the text
    area and forwards the value to the following step.
    """

    def __init__(self, task_name: str, process_function: Callable, always_enabled: bool=False):
        """Create the widgets for the step.

        Args:
            task_name: label of the checkbox.
            process_function: callable applied to the incoming value.
            always_enabled: when True the checkbox is ticked and locked.
        """
        self._always_enabled: bool = always_enabled
        self._process_function = process_function
        self._checkbox = widgets.Checkbox(description=task_name)
        if self._always_enabled:
            self._checkbox.value = True
            self._checkbox.disabled = True
        self._textbox = widgets.Textarea(value="", layout=widgets.Layout(height="150px"))
        self._textbox.disabled = True

        self._container = widgets.VBox((self._checkbox, self._textbox))

        self._result: Any = None
        self._next: Optional[ProcessingStep] = None

    def disable(self, disable: bool):
        """Lock (``True``) or unlock (``False``) the checkbox.

        Steps created with ``always_enabled`` are never unlocked.
        """
        if disable:
            self._checkbox.disabled = True
        elif not self._always_enabled:
            self._checkbox.disabled = False

    @property
    def container(self):
        """The VBox holding the checkbox and the text area."""
        return self._container

    def clear(self):
        """Empty the text area."""
        self._textbox.value = ""

    @property
    def next_step(self):
        """The step that receives this step's result, or ``None``."""
        return self._next

    @next_step.setter
    def next_step(self, next_step: "ProcessingStep"):
        self._next = next_step

    @staticmethod
    def _render(result: Any) -> str:
        """Turn a step result into the text shown in the text area."""
        if isinstance(result, str):
            return result
        if isinstance(result, collections.abc.Iterable):
            # str() each item so results holding non-string items (numbers,
            # tuples, ...) do not make str.join raise a TypeError.
            return "\n".join(str(item) for item in result)
        return str(result)

    def _forward(self):
        """Pass the stored result to the next step, or return it at the end of the chain."""
        if self._next:
            return self._next.process(self._result)
        return self._result

    def process(self, input_value: Any):
        """Run this step and every following step.

        Args:
            input_value: value produced by the previous step; ``None`` stops the chain.

        Returns:
            The result of the last step in the chain, or ``None`` when
            ``input_value`` is ``None``.

        Raises:
            Whatever the step's process function raises; the text area then
            shows the failure instead of staying on "-- processing --".
        """
        if input_value is None:
            return

        if not self._checkbox.value:
            # Disabled step: hand the input through untouched.
            self._result = input_value
            self._textbox.value = "-- not enabled --"
            return self._forward()

        self._textbox.value = "-- processing --"

        try:
            self._result = self._process_function(input_value)
        except Exception as exc:
            self._textbox.value = f"-- failed: {exc} --"
            raise

        self._textbox.value = self._render(self._result)

        return self._forward()


class ProcessingStepSource(ProcessingStep):
    """Head of a processing chain: stores the raw input and feeds it to the next step."""

    def __init__(self):
        """Create an always-enabled step labelled "Source" with no source set yet."""
        self._source = None
        super().__init__(
            task_name="Source",
            process_function=self._source_func,
            always_enabled=True,
        )

    def _source_func(self, input_value):
        """Ignore the incoming value and return the stored source."""
        return self._source

    def set_source(self, source):
        """Store ``source`` and show its string form in the text area."""
        self._source = source
        self._textbox.value = str(source)

    def start(self):
        """Send the stored source to the next step.

        Returns:
            The result of the chain, or ``None`` when no source has been set or
            no next step is attached.
        """
        ready = self._source is not None and self._next is not None
        return self._next.process(self._source) if ready else None


class ProcessingPipeline(object):
    """Chains processing steps together and lays their widgets out side by side.

    The first step is expected to provide ``set_source`` and ``start`` (as
    ``ProcessingStepSource`` does); each step is linked to its successor
    through ``next_step``.
    """

    def __init__(self, *steps: "ProcessingStep"):
        """Link ``steps`` in the given order and build the HBox that shows them."""
        # Kept as the tuple received through *steps: the methods below index and slice it.
        self._steps = steps
        child_containers = []
        previous = None
        for step in self._steps:
            child_containers.append(step.container)
            if previous is not None:
                previous.next_step = step
            previous = step
        self._container = widgets.HBox(child_containers)

    @property
    def container(self):
        """The HBox holding every step's container."""
        return self._container

    def set_source(self, source: Any):
        """Give ``source`` to the first step and clear the text of all later steps."""
        self._steps[0].set_source(source)
        for step in self._steps[1:]:
            step.clear()

    def process(self):
        """Run the whole chain, starting from the first step.

        The step checkboxes are locked while the chain runs and are unlocked
        again afterwards, including when a step raises.

        Returns:
            Whatever the first step's ``start()`` returns (the final result of
            the chain for a ``ProcessingStepSource``).
        """
        for step in self._steps:
            step.disable(True)

        try:
            return self._steps[0].start()
        finally:
            # Without this, a failing step would leave every checkbox locked.
            for step in self._steps:
                step.disable(False)


def generate_preprocessing_preview_pipeline() -> ProcessingPipeline:
    """Build the pipeline used to preview pre-processing on a single document.

    Stages, in order: source text, tokenization, lowercasing (which also drops
    non-alphabetic tokens and tokens of at most two characters), stopword removal.

    Returns:
        The assembled ``ProcessingPipeline``.
    """

    def _lowercase(tokens):
        return [w.lower() for w in tokens if (w.isalpha() and len(w) > 2)]

    def _remove_stopwords(tokens):
        # Read the module-level list at call time so uploads made after the
        # pipeline was built are honoured; a set gives constant-time lookups.
        blocked = set(stopwords)
        return [w for w in tokens if w not in blocked]

    s_source = ProcessingStepSource()
    s_tokenize = ProcessingStep(task_name="Tokenize", process_function=nltk.wordpunct_tokenize, always_enabled=True)
    s_lowercase = ProcessingStep(task_name="Lowercase", process_function=_lowercase, always_enabled=False)
    s_stopwords = ProcessingStep(task_name="Remove stopwords", process_function=_remove_stopwords, always_enabled=False)

    return ProcessingPipeline(s_source, s_tokenize, s_lowercase, s_stopwords)

preview_processing_pipeline = generate_preprocessing_preview_pipeline()

# helper methods
def input_is_ready() -> bool:
    """Return True once at least one source file and one stopword are loaded."""
    has_sources = len(sources) > 0
    return has_sources and len(stopwords) > 0

def disable_input(disable: bool):
    """Lock or unlock the input controls.

    Args:
        disable: ``True`` locks the date slider, both upload widgets and the
            process button. ``False`` unlocks the upload widgets, enables the
            slider only when sources exist, and enables the process button only
            when ``input_is_ready()`` is true.
    """
    if not disable:
        start_input_processing_button.disabled = not input_is_ready()
        selection_range_slider.disabled = len(sources) == 0
        upload_data.disabled = False
        upload_stopword.disabled = False
        return

    locked_widgets = (
        selection_range_slider,
        upload_data,
        upload_stopword,
        start_input_processing_button,
    )
    for widget in locked_widgets:
        widget.disabled = True

# event-handling methods
def refresh_input_details():
    """Re-enable the inputs and redraw the "Input information" panel.

    Shows how many sources fall inside the selected date range and how many
    stopwords are loaded (in red when the count is zero), followed by the
    preview controls, the date slider and the process button.
    """

    def _highlight(message: str, warn: bool) -> str:
        # rich markup: render the message in bold red when it reports a problem.
        return f"[red bold]{message}[/]" if warn else message

    disable_input(False)

    start_date = selection_range_slider.value[0]
    # The upper slider value marks the start of a month; extend it to that month's last second.
    end_date = selection_range_slider.value[1] + pd.DateOffset(months=1) - pd.Timedelta(seconds=1)

    filtered_sources = sources[(sources['date']>=start_date) & (sources['date']<=end_date)]

    source_names = tuple(sources.index.values)
    if source_names:
        # Assigning new options resets the dropdown selection, so restore the
        # user's previous choice when it is still available.
        previous_choice = text_preview_select_widget.value
        text_preview_select_widget.options = source_names
        if previous_choice in source_names:
            text_preview_select_widget.value = previous_choice
        text_preview_select_widget.disabled = False
    # With no sources yet, the dropdown keeps its disabled "No sources yet..." placeholder.

    input_details_widget.clear_output()
    with input_details_widget:
        display(widgets.HTML(value="<h3>Input information</h3>"))
        print(_highlight(f"Number of selected datasets: {len(filtered_sources)}", not len(filtered_sources)))
        print(_highlight(f"Number of stopwords: {len(stopwords)}", not len(stopwords)))

        display(widgets.HTML(value="<h3>Pre-processing steps</h3>"))
        display(text_preview_select_widget)
        display(preview_processing_pipeline.container)
        display(preview_button)
        display(widgets.HTML(value="<h3>Corpus time-frame</h3>"))
        print("Select the timeframe to use for processing:")
        display(selection_range_slider)
        display(widgets.HTML(value="<h3>Pre-process</h3>"))

        display(start_input_processing_button)

def extract_date(file_details):
    """Derive a file's date from its name.

    The date is taken from the second underscore-separated field of the name.

    Args:
        file_details: upload entry whose ``['metadata']['name']`` is the file name.

    Returns:
        ``(file_name, date)``. ``date`` is ``None`` when the name has no second
        underscore-separated field; otherwise it is whatever ``dateparser.parse``
        returns for that field.
    """
    file_name = file_details['metadata']['name']
    name_parts = file_name.split("_")
    if len(name_parts) < 2:
        # No date field at all: report "no date" instead of raising IndexError.
        return (file_name, None)
    _date = dateparser.parse(name_parts[1], settings={'TIMEZONE': 'Europe/Berlin'})
    return (file_name, _date)

def prepare_data_for_file(row):
    """Assemble the processed record for one row of the sources table.

    Args:
        row: a row of ``sources``; ``row.name`` is the file name and
            ``row.date`` its date.

    Returns:
        The dict from ``process_file_content`` extended with ``date``, ``ref``
        (the identifier preceding the ``YYYY-MM-DD`` date in the file name, or
        ``None`` when the name does not follow that pattern) and ``pub_name``
        (the publication title for known identifiers, else ``None``).
    """
    # Publication titles for the identifiers this notebook knows about.
    publication_names = {
        'sn85066408': 'L\'Italia',
        '2012271201': 'Cronaca Sovversiva',
    }

    file_name = row.name

    result = process_file_content(file_name)
    result["date"] = row.date

    # re.search + None check instead of findall(...)[0], which raised
    # IndexError for names not matching "<ref>_<YYYY-MM-DD>_...".
    match = re.search(r'(\w+\d+)_\d{4}-\d{2}-\d{2}_', file_name)
    ref = match.group(1) if match else None

    result["ref"] = ref
    result["pub_name"] = publication_names.get(ref)
    return result

@cache.memoize(typed=True, tag="tokenize")
def _tokenize_file_content(file_name, content_digest):
    """Tokenize one uploaded file; memoized on disk.

    ``content_digest`` is not used in the body. It is part of the cache key so
    that a different file uploaded under the same name is not served a stale
    cached result.
    """
    log.debug(f"computing: {file_name}")

    content_bytes = all_uploaded_files[file_name]['content']
    content = ' ' + str(content_bytes, 'utf-8').replace('\n', ' ') + ' '

    tokenized = nltk.word_tokenize(content)  # TODO: language italian?
    doc_prep = [w.lower() for w in tokenized if (w.isalpha() and len(w) > 2 )]

    result = {"text": content[0:20], "tokenized": tokenized, "doc_prep": doc_prep}
    log.debug(f"finished computing: {file_name}")
    return result


def process_file_content(file_name):
    """Tokenize and clean the uploaded file called ``file_name``.

    Tokenization is cached on disk, keyed on the file name and a digest of its
    content. Stopword removal runs on every call because it depends on the
    current ``stopwords`` list, which can change between calls and is not part
    of the cache key.

    Args:
        file_name: key into ``all_uploaded_files``.

    Returns:
        A new dict with ``text`` (the first 20 characters of the content),
        ``tokenized``, ``doc_prep`` (lowercased alphabetic tokens longer than
        two characters) and ``doc_prep_nonstop`` (``doc_prep`` without stopwords).
    """
    import hashlib

    content_bytes = all_uploaded_files[file_name]['content']
    content_digest = hashlib.sha256(content_bytes).hexdigest()

    result = dict(_tokenize_file_content(file_name, content_digest))

    blocked = set(stopwords)
    result["doc_prep_nonstop"] = [w for w in result["doc_prep"] if w not in blocked]
    return result

def get_filtered_df(start_date, end_date):
    """Process every source dated within ``[start_date, end_date]``.

    Args:
        start_date: inclusive lower bound on the ``date`` column of ``sources``.
        end_date: inclusive upper bound on the ``date`` column of ``sources``.

    Returns:
        A DataFrame indexed like the selected rows of ``sources``, with one
        column per key returned by ``prepare_data_for_file``. It is an empty
        DataFrame when no source falls inside the range.
    """
    # we only read the files in the timeframe we are interested in
    selected = sources[(sources['date']>=start_date) & (sources['date']<=end_date)]

    # Build the records with a plain loop: DataFrame.apply(axis=1) on an empty
    # selection returns a DataFrame, which has no to_list().
    records = [prepare_data_for_file(row) for _, row in selected.iterrows()]
    return pd.DataFrame(records, index=selected.index)


def display_update(start_date, end_date):
    """Show summary statistics and the full table for the given date range.

    The output of ``get_filtered_df(start_date, end_date)`` replaces whatever
    ``filtered_table`` currently shows.
    """
    frame = get_filtered_df(start_date, end_date)
    filtered_table.clear_output()
    with filtered_table:
        for view in (frame.describe(), frame):
            display(view)

def date_range_change_handler(change):
    """Observer for the date slider: redraw the input details panel.

    Args:
        change: the change notification; its content is not used.
    """
    # Lock the inputs first; refresh_input_details() unlocks them again.
    disable_input(True)
    refresh_input_details()


def input_file_change_handler(change):
    """Observer for the text-file upload widget.

    Registers every uploaded file whose name yields a date, rebuilds the date
    slider so it spans all known sources by month, resets the upload widget
    and redraws the input details panel.

    Args:
        change: change notification; ``change.new`` maps to the upload entries
            (each with ``metadata`` and ``content``).
    """
    disable_input(True)

    data = {}
    for file_details in change.new.values():
        try:
            file_id, _date = extract_date(file_details)
        except IndexError:
            # File name without a date field.
            file_id, _date = file_details['metadata']['name'], None
        if _date is None:
            # A missing date would break the min()/max() below; skip the file.
            log.error(f"Cannot extract a date from file name: {file_id}")
            continue
        data[file_id] = _date
        # The most recent upload of a name always replaces the stored content.
        all_uploaded_files[file_id] = file_details

    for file_id, _date in data.items():
        # Test the index: `file_id in sources` would look at the column labels.
        if file_id in sources.index:
            log.error(f"Duplicate file: {file_id}")
            continue

        sources.loc[file_id] = (_date,)

    if len(sources) > 0:
        # Step back one month so the month start preceding the earliest date is included.
        min_date = min(sources['date']) - pd.DateOffset(months=1)
        max_date = max(sources['date'])

        dates = pd.date_range(min_date, max_date, freq='MS')
        if len(dates) == 1:
            # The range slider gets a lower and an upper option even for a single month.
            dates = (dates[0], dates[0])

        options = [(date.strftime(' %b %Y '), date) for date in dates]

        selection_range_slider.options = options
        selection_range_slider.index = (0, len(options)-1)

    upload_data.value.clear()
    upload_data._counter = 0

    refresh_input_details()

    # upload_data.disabled = False


def stopwords_input_change_handler(change):
    """Observer for the stopword upload widget.

    Replaces the contents of ``stopwords`` (in place) with the first column of
    the uploaded CSV, skipping a ``stopword`` header row and blank entries,
    then appends the English NLTK stopwords and redraws the input details panel.

    Args:
        change: the change notification; the content is read from
            ``upload_stopword.data`` instead.
    """
    disable_input(True)

    if not upload_stopword.data:
        # Nothing uploaded: keep the current list and just unlock the inputs.
        refresh_input_details()
        return

    stopwords.clear()
    stopword_content = upload_stopword.data[0]
    # utf-8-sig drops a leading byte-order mark, which would otherwise stick to
    # the first cell and defeat the header check below.
    stopwords_string = str(stopword_content, "utf-8-sig")

    reader = csv.reader(stopwords_string.split("\n"))
    for row in reader:
        if not row or row == ['stopword']:
            continue
        word = row[0].strip()
        if word:
            stopwords.append(word)

    stopwords.extend(en_stopwords)

    refresh_input_details()

def start_input_processing(event_source):
    """Click handler of the "Process input" button.

    Processes every source inside the selected date range and shows the
    resulting table in ``data_widget``. The inputs are locked while this runs
    and are unlocked again afterwards, including when processing fails.

    Args:
        event_source: the clicked button; not used.
    """
    disable_input(True)

    try:
        data_widget.clear_output()

        with data_widget:
            print("tokenizing input data...")

        start_date = selection_range_slider.value[0]
        # The upper slider value marks the start of a month; extend it to that month's last second.
        end_date = selection_range_slider.value[1] + pd.DateOffset(months=1) - pd.Timedelta(seconds=1)

        filtered_df = get_filtered_df(start_date=start_date, end_date=end_date)

        data_widget.clear_output()

        with data_widget:

            display(filtered_df)
    finally:
        # Without this, an exception above would leave every input disabled.
        disable_input(False)

def set_preview_source(event):
    """Load the file selected in the preview dropdown into the preview pipeline.

    Does nothing when the dropdown value is not the name of an uploaded file.

    Args:
        event: the change notification; not used.
    """
    selected_name = text_preview_select_widget.value
    if selected_name in all_uploaded_files:
        raw_content = all_uploaded_files[selected_name]["content"]
        # Same normalisation as process_file_content: newlines become spaces
        # and the text is padded with one space on each side.
        flattened = str(raw_content, 'utf-8').replace('\n', ' ')
        preview_processing_pipeline.set_source(' ' + flattened + ' ')

def execute_preview(source):
    """Click handler of the "Preview" button: run the preview pipeline.

    Args:
        source: the clicked button; not used.
    """
    preview_processing_pipeline.process()


In [5]:
# React to uploads: each handler runs when the widget's `value` trait changes.
upload_data.observe(input_file_change_handler, names=['value'])
upload_stopword.observe(stopwords_input_change_handler, names=['value'])

# Place both upload controls, with their captions, inside the input_widgets output area.
with input_widgets:
    print("Input data (text files)")
    display(upload_data)
    print("Stopword (csv format)")
    display(upload_stopword)

display(widgets.HTML("<h2>Upload input data</h2>"))
display(input_widgets)
# Fill input_details_widget once, so it has content before the first upload.
refresh_input_details()

HTML(value='<h2>Upload input data</h2>')

Output()

In [6]:
# NOTE(review): unlike the other observers, no `names` filter is given here, so
# the handler presumably fires for every trait change of the dropdown rather
# than only for `value` -- confirm this is intended.
text_preview_select_widget.observe(set_preview_source)
selection_range_slider.observe(date_range_change_handler, names=['value'])
preview_button.on_click(execute_preview)
display(widgets.HTML("<h2>Input details</h2>"))
# Shows the panel that refresh_input_details() draws into.
display(input_details_widget)
start_input_processing_button.on_click(start_input_processing)

HTML(value='<h2>Input details</h2>')

Output()

In [7]:
# Result area: start_input_processing() writes the processed table into data_widget.
display(widgets.HTML("<h2>Data</h2>"))
display(data_widget)


HTML(value='<h2>Data</h2>')

Output()