<a href="https://colab.research.google.com/github/diverhaze/Overton_Pipeline/blob/main/BERT_test_py.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

Installer für BERT


In [None]:
pip install germansentiment

## Leicht modifizierter BERT Code  

Modifizierung des generierten Outputs: Anstatt nur die Labels auszugeben, werden weitere Informationen ausgegeben. Diese beinhalten:  
* tensor logits
* argmax
* labels


**Hinweis:** CUDA erhöht die Performance signifikant. Wenn GPU-Unterstützung vorhanden ist, ist es äußerst empfehlenswert, diese auch zu nutzen.

In [None]:
from transformers import AutoModelForSequenceClassification, AutoTokenizer
from typing import List
import torch
import re


class SentimentModel2():
    """Sentiment classifier wrapper that also exposes the raw model output.

    Compared to a labels-only prediction, ``predict_sentiment`` returns the
    model output, the argmax label ids and the label names.
    """

    # Translation table for ``replace_numbers``: every digit becomes a space
    # followed by its German word, applied in a single pass.
    _DIGIT_WORDS = str.maketrans({
        "0": " null", "1": " eins", "2": " zwei", "3": " drei", "4": " vier",
        "5": " fünf", "6": " sechs", "7": " sieben", "8": " acht", "9": " neun",
    })

    def __init__(self, model_name: str = "oliverguhr/german-sentiment-bert"):
        """Load model and tokenizer and move the model to the GPU if available.

        Args:
            model_name: Name passed to ``from_pretrained`` for both the model
                and the tokenizer.
        """
        self.device = 'cuda' if torch.cuda.is_available() else 'cpu'

        self.model = AutoModelForSequenceClassification.from_pretrained(model_name)
        self.model = self.model.to(self.device)
        self.tokenizer = AutoTokenizer.from_pretrained(model_name)

        self.clean_chars = re.compile(r'[^A-Za-züöäÖÜÄß ]', re.MULTILINE)
        self.clean_http_urls = re.compile(r'https*\S+', re.MULTILINE)
        self.clean_at_mentions = re.compile(r'@\S+', re.MULTILINE)

    def predict_sentiment(self, texts: List[str]) -> List:
        """Classify ``texts`` and return the raw output next to the labels.

        Args:
            texts: The texts to classify; they are cleaned with
                ``clean_text`` and encoded as one padded batch.

        Returns:
            A list ``[model_output, label_ids, labels]``: the object returned
            by the model call (its first element holds the logits), the
            argmax label ids as a tensor, and the label names looked up in
            ``model.config.id2label``.
        """
        texts = [self.clean_text(text) for text in texts]
        # Add special tokens takes care of adding [CLS], [SEP], <s>... tokens in the right way for each model.
        # limit number of tokens to model's limitations (512)
        encoded = self.tokenizer.batch_encode_plus(texts, padding=True, add_special_tokens=True,
                                                   truncation=True, return_tensors="pt")
        encoded = encoded.to(self.device)

        with torch.no_grad():
            # Pass the complete encoding, not only input_ids: without the
            # attention mask the padding tokens of the shorter texts in the
            # batch are attended to and distort their logits.
            model_output = self.model(**encoded)

        label_ids = torch.argmax(model_output[0], dim=1)
        labels = [self.model.config.id2label[label_id] for label_id in label_ids.tolist()]

        return [model_output, label_ids, labels]

    def replace_numbers(self, text: str) -> str:
        """Replace every digit in ``text`` by a space plus its German word."""
        return text.translate(self._DIGIT_WORDS)

    def clean_text(self, text: str) -> str:
        """Normalise ``text`` before tokenisation.

        Removes line breaks, URLs and @-mentions, spells out digits, drops
        every character that is not a letter or a space, collapses
        whitespace and lower-cases the result.
        """
        text = text.replace("\n", " ")
        text = self.clean_http_urls.sub('', text)
        text = self.clean_at_mentions.sub('', text)
        text = self.replace_numbers(text)
        text = self.clean_chars.sub('', text)  # use only text chars
        text = ' '.join(text.split())  # substitute multiple whitespace with single whitespace
        return text.strip().lower()


## Selbst geschriebene Pipeline 
(buggy (3 Warnings) und weiterhin in Bearbeitung) DataRefiner (find_amplitude) funktioniert noch nicht

**To do:**
* cuda anpassen  (**DONE**)
* Objektorientiert (**DONE**)
* Logging (**DONE**)
* testen um wieviel % Cuda schneller läuft
* schauen warum \*e zahlen dabei sind (*Können nun verarbeitet werden, ist also egal warum*) 
* Max- und Min-Werte herausfinden
* durch die max werte Prozente implementieren 
* confidence berechnen die von BERT ausgegeben wird
* Korrelation der Labels erörtern
* Vorfilter implementieren, welcher nach Keywords filtert
* Dokumente in Sätze splitten und filtern
* Boxplot erstellen, ggf. andere Visualisierungsmöglichkeiten ausprobieren
* Visualizer-Klasse erstellen
* CSVHandler erweitern um mehr Dateiformate
* auf git pushen


***Documenter*** erstellt eine Logging-Datei, um besser nachvollziehen zu können, wo ggf. Probleme entstehen

In [None]:
import logging

# Creates a logging file for easy debugging and failure searching -> pipeline.log
class Documenter:
    """Configure the root logger so that all records go to 'pipeline.log'."""

    def __init__(self):
        """Install the file logging configuration (overwrites an old log file)."""
        root_logger = logging.getLogger()
        # basicConfig() silently does nothing when the root logger already
        # has handlers, e.g. when this cell is run a second time or the host
        # environment installed its own handlers. Remove them first so the
        # log file is really created.
        for handler in list(root_logger.handlers):
            root_logger.removeHandler(handler)
            handler.close()

        logging.basicConfig(format='%(asctime)s %(levelname)s: %(message)s', datefmt='%d/%m/%Y %H:%M:%S', # Date in German format, if desired change it
                            filename='pipeline.log', filemode='w', level=logging.DEBUG)


***DataRefiner*** soll als Säuberungs- und Auswertungsklasse dienen


In [None]:
from typing import List
import re
import logging


class DataRefiner:
    """Cleaning and evaluation helper of the pipeline.

    Stores the line counts of the raw-data CSV and the result CSV and tracks
    the largest / smallest logit seen per label, separately for titles and
    bodies.

    NOTE(review): all extremes start at 0, so a minimum can never end up
    above 0 and a maximum never below 0 -- confirm that this is intended.
    """

    # One decimal number with an optional exponent, e.g. "-0.5" or "1.2e-05".
    # The exponent belongs to the same alternative as the mantissa; with the
    # former "mantissa | mantissa + exponent" alternation the first branch
    # always won and the exponent was silently cut off.
    _NUMBER_PATTERN = re.compile(r'-?\d+\.\d+(?:e[+-]?\d+)?')

    # Order of the labels inside one logit triple, taken over from the index
    # mapping of the previous implementation (0 = pos, 1 = neg, 2 = neu).
    # TODO confirm against model.config.id2label.
    _LABEL_ORDER = ('pos', 'neg', 'neu')

    def __init__(self):
        self.max_value_pos: float = 0  # pos = positive
        self.min_value_pos: float = 0
        self.max_value_neu: float = 0  # neu = neutral
        self.min_value_neu: float = 0
        self.max_value_neg: float = 0  # neg = negative
        self.min_value_neg: float = 0
        self.max_value_title_pos: float = 0
        self.min_value_title_pos: float = 0
        self.max_value_title_neu: float = 0
        self.min_value_title_neu: float = 0
        self.max_value_title_neg: float = 0
        self.min_value_title_neg: float = 0
        self.line_count_raw_data = 0
        self.line_count_raw_data_results = 0

# Setter
    def set_max_pos(self, value):  # possibly not needed
        self.max_value_pos = value

    def set_min_pos(self, value):  # possibly not needed
        self.min_value_pos = value

    def set_line_count_data(self, value):
        self.line_count_raw_data = value

    def set_line_count_result(self, value):
        self.line_count_raw_data_results = value

# Getter
    def get_max_pos(self):  # possibly not needed
        return self.max_value_pos

    def get_min_pos(self):  # possibly not needed
        return self.min_value_pos

    def get_line_count_data(self):
        return self.line_count_raw_data

    def get_line_count_result(self):
        return self.line_count_raw_data_results

# Methods
    def check_line_count(self) -> bool:
        """Return True if raw data and result have the same number of lines.

        On a mismatch the problem is logged and a console warning is printed.
        """
        if self.line_count_raw_data_results == self.line_count_raw_data:
            logging.debug("Line count equal: continue computation")
            return True

        logging.debug("Line Count unequal: double check filenames and make sure the code is not interrupted")
        print_warning()
        return False

    @staticmethod
    def clear_logits(logits):
        """Extract the numbers of the first two lines of ``str(logits)``.

        Args:
            logits: Object whose string form holds the title values on its
                first line and the body values on its second line.

        Returns:
            Tuple ``(title_values, body_values)``, each a list of number
            strings (exponents included).

        Raises:
            IndexError: If the string form has fewer than two lines.
        """
        raw = str(logits).split('\n')
        clean_title_logits = DataRefiner._NUMBER_PATTERN.findall(raw[0])
        clean_text_logits = DataRefiner._NUMBER_PATTERN.findall(raw[1])
        return clean_title_logits, clean_text_logits

    @staticmethod
    def clear_value(value: str) -> List:
        """Return all decimal numbers found in ``value`` as a flat list of strings."""
        return DataRefiner._NUMBER_PATTERN.findall(value)

    def _update_extremes(self, suffix: str, number: float) -> None:
        """Widen ``max_value_<suffix>`` / ``min_value_<suffix>`` to include ``number``."""
        max_name = "max_value_" + suffix
        min_name = "min_value_" + suffix
        if number > getattr(self, max_name):
            setattr(self, max_name, number)
        if number < getattr(self, min_name):
            setattr(self, min_name, number)

    def _track_cell(self, cell: str, prefix: str, description: str) -> bool:
        """Parse one stringified logit triple and update the matching extremes.

        Returns False (after logging and warning) when the cell holds fewer
        numbers than there are labels; nothing is updated in that case.
        """
        numbers = self.clear_value(cell)
        if len(numbers) < len(self._LABEL_ORDER):
            logging.error("Tensor values missing for %s: %r", description, cell)
            print_warning()
            return False

        for label, number in zip(self._LABEL_ORDER, numbers):
            self._update_extremes(prefix + label, float(number))
        return True

    def find_amplitude(self, tensors: List[List[str]]):
        """Update all min/max attributes from the loaded result rows.

        Args:
            tensors: Rows of ``[title_values, body_values]`` where each entry
                is the string form of a logit triple. Entries with missing
                values are logged and skipped.
        """
        for values in tensors:
            self._track_cell(values[0], 'title_', 'title')
            self._track_cell(values[1], '', 'body')
        logging.info("Found all amplitudes")


def print_warning():
    """Print a console hint that details can be found in the log file."""
    notice = "Warning: check 'pipeline.log'"
    print(notice)

Der ***CSVHandler*** liest und schreibt alle erforderlichen CSV-Dateien (Dateiformate sollen erweitert werden) und sendet dem DataRefiner wichtige Metadaten.
Maximalwerte werden nicht gemerkt, da diese auch ohne Einlesen einer Rohdaten-CSV gefunden werden sollen (falls man die Tensor-Werte schon gespeichert hat).

In [None]:
from typing import List
import csv
import sys
import logging


class CsvHandler:
    """Read the raw-data / result CSV files and write result rows.

    All files use ';' as delimiter and UTF-8 encoding. The number of loaded
    lines is pushed to the DataRefiner object given to the constructor.
    """

    # Header text -> attribute that stores the index of that column.
    _RAW_DATA_COLUMNS = {
        'title': 'headline_row', '"title"': 'headline_row',
        'body': 'target_row', '"body"': 'target_row',
        'id': 'id_row', '"id"': 'id_row',
        'date': 'date_row', '"date"': 'date_row',
    }
    _RESULT_COLUMNS = {'title': 'headline_row', 'body': 'target_row'}

    def __init__(self, datarefiner):
        """Store the refiner (must offer ``set_line_count_data`` / ``set_line_count_result``)."""
        self.refiner = datarefiner
        self._reset_state()

    def _reset_state(self):
        """Reset the output list, the counters and the column indices."""
        self.output = []
        self.line_count = 0
        self.row_count = 0
        self.headline_row = 0
        self.target_row = 0
        self.id_row = 0
        self.date_row = 0

    def _locate_columns(self, headline, wanted):
        """Store the index of every wanted header column on the instance."""
        found = set()
        for index, name in enumerate(headline):
            attribute = wanted.get(name)
            if attribute is not None:
                setattr(self, attribute, index)
                found.add(attribute)
        self.row_count = len(headline)

        # A missing column keeps index 0 as before, but no longer silently.
        for attribute in sorted(set(wanted.values()) - found):
            logging.warning("No header column found for '%s', falling back to column 0", attribute)

    def _collect_rows(self, csv_reader, attributes):
        """Append the selected columns of every non-empty row to ``self.output``."""
        indices = [getattr(self, attribute) for attribute in attributes]
        for row in csv_reader:
            if not row:                 # catch empty row
                continue
            self.output.append([row[index] for index in indices])
            self.line_count += 1

    def load_csv(self, source: str, mode: int) -> List[List[str]]:
        """Load a CSV file into a list of rows.

        Args:
            source: Path of the CSV file.
            mode: 0 for the raw-data CSV (rows become ``[id, date, title,
                body]``), 1 for the result CSV (rows become ``[title,
                body]``).

        Returns:
            The loaded rows; the same list is kept in ``self.output``.

        Raises:
            SystemExit: For an invalid mode, a CSV parse error, or a file
                without data rows (an entirely empty file included).
        """
        self._reset_state()                 # reset counter and output on every new method call
        if mode not in (0, 1):
            logging.error("Wrong MODE in [load_csv]: Mode must be 0 or 1 {load_csv('Filename','Mode')}")
            sys.exit("Wrong Mode to load any CSV, check 'pipeline.log'")

        # newline='' lets the csv module handle line endings itself, so
        # quoted fields containing line breaks are read correctly.
        with open(source, encoding='utf-8', newline='') as csv_file:
            csv_reader = csv.reader(csv_file, delimiter=';')
            logging.info("CSV successfully loaded")
            try:
                # An empty file yields no header; treat it like a file
                # without data instead of failing with StopIteration.
                headline = next(csv_reader, [])
                if mode == 0:
                    logging.info("Searching desired data")
                    self._locate_columns(headline, self._RAW_DATA_COLUMNS)
                    logging.info("Desired data found")

                    logging.info("Convert CSV into List")
                    self._collect_rows(csv_reader, ('id_row', 'date_row', 'headline_row', 'target_row'))
                    logging.info(f"Converting was successful: {self.line_count} Lines")
                else:
                    logging.info("Searching desired tensors")
                    self._locate_columns(headline, self._RESULT_COLUMNS)
                    logging.info("Desired tensors found")

                    logging.info("Convert CSV into List")
                    self._collect_rows(csv_reader, ('headline_row', 'target_row'))
                    logging.info("Tensors successfully converted")
            except csv.Error as error:
                sys.exit("file {}, line {}: {}".format(source, csv_reader.line_num, error))

        # push line_count to the DataRefiner Class
        if mode == 0:
            self.refiner.set_line_count_data(self.line_count)
            if not self.output:
                logging.error("Seems like your data CSV is empty, double check that CSV file has data in it")
                sys.exit("data CSV file empty")
        else:
            self.refiner.set_line_count_result(self.line_count)
            if not self.output:
                logging.error("Seems like your result CSV is empty, double check that CSV file has data in it")
                sys.exit("Result CSV file empty")
        return self.output

    @staticmethod
    def _write_row(filename, file_mode, row, log_message):
        """Write one row to ``filename``; log and exit if that fails."""
        try:
            with open(filename, file_mode, encoding='utf-8', newline='') as csv_file:
                csv.writer(csv_file, delimiter=';').writerow(row)
        except (OSError, csv.Error) as error:
            logging.error(log_message)
            sys.exit("file {}: {}".format(filename, error))

    @staticmethod
    def create_result_csv(filename: str):
        """Create (or overwrite) the result CSV and write its header row."""
        CsvHandler._write_row(filename, 'w', ["id", "date", "title", "body"],
                              "Could not create CSV file for results [create_result_csv]")

    @staticmethod
    def create_value_csv(filename: str):
        """Create (or overwrite) the min/max value CSV and write its header row."""
        CsvHandler._write_row(filename, 'w',
                              ["max_positive", "min_positive", "max_neutral",
                               "min_neutral", "max_negative", "min_negative"],
                              "Could not create CSV file for values [create_value_csv]")

    @staticmethod
    def write_result_csv(filename: str, arg1, arg2, arg3, arg4):
        """Append one row ``[arg1, arg2, arg3, arg4]`` to the result CSV."""
        CsvHandler._write_row(filename, 'a+', [arg1, arg2, arg3, arg4],
                              "Could not find CSV file for results [write_result_csv]")
        logging.info("Result saved")


Die ***Main***


In [None]:
from germansentiment import SentimentModel
#import csvhandler
#import datarefiner
#import documenter
import logging

# Initialisation
# The classes are defined in the cells above (the module imports are commented
# out), so they are referenced directly instead of via module prefixes.
doc = Documenter()                  # Logging
model = SentimentModel2()           # modified wrapper: write_data() needs the raw logits, not only labels
dr = DataRefiner()                  # data refining
csv_handler = CsvHandler(dr)        # csv reader/writer; must get a DataRefiner object.
                                    # Not named 'csv': that would rebind the csv module used inside CsvHandler.

string_source = "taz_filtered_christian_kahmann.csv"
string_results = "results_1.csv"
string_test = "test_2.csv"

logging.info("Model and classes loaded")


def write_data(information_list, filename):
    """Run the model on every article and append its logits to ``filename``.

    Args:
        information_list: Rows of ``[id, date, title, body]`` as returned by
            ``load_csv`` in mode 0.
        filename: Result CSV that the rows are appended to.
    """
    for entry in information_list:
        article_id, date, *texts = entry            # remaining items (title, body) go to the model
        result = model.predict_sentiment(texts)     # call BERT

        tensors_title, tensors_text = dr.clear_logits(result[0].logits)   # get tensor values
        print(f"Id: {article_id} /// Title: {tensors_title} /// Text: {tensors_text}", end=': ')

        csv_handler.write_result_csv(filename, article_id, date, tensors_title, tensors_text)  # write output csv
        # NOTE: the value lists are written in their string form; the CSV
        # writer probably still needs an explicit list-to-string conversion.


if __name__ == '__main__':

    data = csv_handler.load_csv(string_source, 0)       # load CSV and convert into list
    csv_handler.create_result_csv(string_test)          # create output CSV
    write_data(data, string_test)                       # Run BERT and write output CSV

    results = csv_handler.load_csv(string_test, 1)      # load result CSV and convert into list
    print(dr.check_line_count())                        # check if code ran smoothly so far
    # TODO: still disabled, find_amplitude is marked as not working yet
    #dr.find_amplitude(results)
