In [73]:
from os import listdir
from os.path import isfile, join
from multiprocessing import Process
from tqdm import tqdm
import json
import pickle 
import bz2

import time
import datetime

import pandas as pd
import numpy as np

import regex as re
import cld3
from bs4 import BeautifulSoup

from clickhouse_driver import connect
from geopy.geocoders import Nominatim

import math
from scipy.sparse import csr_matrix, vstack, save_npz, load_npz

<h1> Merkmalsextraktion von Tweets </h1>

Dieses Notebook behandelt die Extraktion von Tweetmerkmalen. **Vor Ausführung dieses Skriptes muss bereits ein kMeans-Clustering durchgeführt und persistiert worden sein. Dieses wurde im Notebook 1-CreateLabels.ipynb implementiert.**

Die meisten extrahierbaren Merkmale lassen sich direkt aus den Tweets entnehmen und ohne komplexere Vorverarbeitung in eine numerische Repräsentation umwandeln. Bei anderen Merkmalen ist dies nicht ohne weitere Schritte möglich. Konkret brauchen die Textpassagen des Algorithmus eine zusätzliche Verarbeitung durch ein vortrainiertes Word Embedding Tool und die Ortsangaben aus den Nutzerprofilen eine Konvertierung in Koordinaten durch einen Geocoder.

Die Extraktion wird daher in mehreren getrennten Schritten umgesetzt. Folgende Schritte sind dabei exemplarisch vorimplementiert:

1. Die Konvertierung von Tweets in eine CSV-Repräsentation, welche eine unvollständige und noch nicht vollends numerische Repräsentation aller Merkmale hält.
2. Eine exemplarische Implementierung des Profil-Location Geocoders.
3. Ein Modell zur Bestimmung von lokalen Wörtern aus Texten.
4. Die fastText Skripte zum Training und Labeling von Texten (auf Bash).


<h2>1. Konvertierung von Tweets in eine CSV-Repräsentation relevanter Merkmale</h2>

Die Konvertierung von Tweets ist hier in einer parallel implementierten Umgebung umgesetzt. Es wird eine feste Zahl an Prozessen erzeugt, welchen jeweils ein zugewiesenes Tweet-File umwandeln. Aus einem Geotweet File (vgl. Vorverarbeitung in Bash) werden dann mehrere temporäre Files erzeugt, welche zu jedem Tweet Teilinformationen beinhalten, die dann später getrennt voneinander weiterverarbeitet werden. 

Wie ein Tweet verarbeitet wird, ist in der nachfolgend ersichtlichen *run*-Funktion implementiert. Die Generierung von Arbeiterprozessen und die dynamische Zuordnung von Arbeitspaketen kann dann weiter unten in der nächsten Zelle eingesehen werden.

Zu Beginn der *run()*-Funktion sind einige Hilfsmethoden definiert, welche verschiedene später aus dem Tweet extrahierte Merkmale in einen festen Wertebereich $[-1; 1]$ normieren. Beispielsweise wird die UTC-referenzierte Uhrzeit, wie im Bericht beschrieben, so konvertiert, dass ein Wert von $-1$ auf Mitternacht, $0$ auf Mittags und $1$ auf 24 Uhr des nächsten Tages referenziert. Alle anderen Tageszeiten werden linear auf einen entsprechenden Wert dazwischen interpoliert.

Die weiter unten stehende Hilfsfunktion *tweet_to_dict()* führt die eigentliche Konvertierung von **exakt einem** Tweetobjekt durch. Konkret nimmt die Funktion die JSON-Repräsentation eines Tweets entgegen, d.h. ein Dictionary, und gibt schließlich wieder ein Dictionary aus. Das Ausgabedict ist eine kompaktere Form eines Tweets, welches als Schlüssel nur die, für die Vorhersage relevanten, Merkmale enthält. Die zugehörigen Wertepaare dieser Schlüssel werden jedoch - je nach benötigter Vorverarbeitung - nur teilweise in die schlussendliche numerische Form konvertiert. Für zeitliche oder popularitätsbezogene Merkmale wird dies beispielsweise direkt durchgeführt. Profil-Location von Nutzern oder Tweet-Texte werden zwar extrahiert und persistiert, müssen jedoch nachträglich von anderen Komponenten weiterverarbeitet werden.

Unter soeben beschriebener Hilfsmethode (d.h. ab dem Kommentarblock namens "*continuation of work flow*") findet nun die eigentliche Verarbeitung von zugewiesenen Tweet Files statt. Wie bereits beschrieben wird ein Tweet in mehrere temporäre Ergebnisfiles persistiert, die attributsbezogen gruppiert sind. So entstehen aus einem Tweet-File im *geotagged/*-Ordner drei neue Files:
- ein gleichnamiges **.texts*-File, welches nur eine vorverarbeitete Version des Tweettextes (zur Vorhersage eines Text-Modells) enthält,
- ein gleichnamiges **.geocode*-File, welches nur eine vorverarbeitete Version der Profil-Location (zur Vorhersage eines Geocoders) enthält und
- gleichnamiges **.features*-File, welches alle verbleibenden Merkmale als CSV-Datei ablegt. Numerische Merkmale werden hierin direkt in die entsprechende Feature-Repräsentation überführt. Kategorische Merkmale, bspw. Sprache oder Quellplattform verbleiben im CSV-File zunächst in ihrer String-Repräsentation und werden im nachfolgenden Machine Learning Skript durch entsprechende Encoder in eine numerische Repräsentation überführt. 

Wie im Code ersichtlich, wird jedem der drei Files zunächst eine Header-Line hinzugefügt, sodass die nachträgliche Zuordnung von Werten zu Merkmalen problemlos erfolgen kann. Danach wird der als Parameter übergebene File-Path verarbeitet. Dieser verweist auf das Tweet-File, welches der aktuelle Prozess zu verarbeiten hat. Dieses wird zeilenweise (also Tweet-weise) ausgelesen und der entsprechende Tweet der oben beschriebenen *tweet_to_dict()*-Hilfsfunktion übergeben. Das zurückgelieferte und aufbereitete Tweetfile wird dann Komponentenweise in die drei Ausgabedateien persistiert.

In [36]:
def run(file_path):   
    # in einem parallelisierten Umfeld müssen geteilte Variablen explizit deklariert werden
    global TSTAMP_OFFSET
    global path_to_place_model
    place_model = pickle.load(open(path_to_place_model, "rb"))
    out_file = "data/feature_files/" + file_path[:-6].split("/")[-1]
    
    """
        Define some normalization functions for converting tweets to feature vectors
    """
    def to_hour(timestamp_ms):
        return ((timestamp_ms / 1000) % 86400) / 3600

    def norm_time(timestamp_ms):
        return (to_hour(timestamp_ms) - 12) / 12  # -1 = 0 Uhr UTC, 0 = 12 Uhr UTC, 1 = 24 Uhr UTC
    
    def norm_ego(count, max_exp=5):
        count = np.log10(count + 1)
        count = count if count <= max_exp else max_exp  # max_exp gibt an, ab welchen werten auf 1 skaliert wird
        return (count - max_exp/2) / (max_exp/2)
    
    def predict_language(text):
        pred = cld3.get_language(text)
        try:
            if pred.is_reliable:
                return pred.language
        except:
            pass
        return None
    
    """
        This function extracts the features of a single tweet
    """
    def tweet_to_dict(tweet):
        d = {}
        
        # obtain label
        place = tweet.get("place")
        
        if place is None:
            return None
        
        try:
            place = place.get("bounding_box").get("coordinates")
        except:
            return None
        
        place = np.asarray(place)[0].mean(axis=0).reshape(1, -1)
        place_T = np.asarray([place[0][1], place[0][0]]).reshape(1, -1)
        d["label"] = place_model.predict(place_T)[0]
        
        # Text Preprocessing
        text = re.sub(r'[\s]+', ' ',  # remove multiple space characters
          re.sub(r'\b\w{1,2}\b', '',  # remove tokens with less than 3 characters
          re.sub(r'[^\p{Arabic}\p{Greek}\p{Cyrillic}\p{Latin}\p{Han}\p{Hangul} ]', '',  # some dirty multilingual regex ;-)
          re.sub(r'[\n\t]', ' ',  # remove linebreaks and tabs
          re.sub(r'@\w+', '',  # remove @ annotations
          re.sub(r'https?:\/\/.*[\r\n]*', '',
          tweet.get("text"))))))).strip()  # remove urls
        
        d["text"] = text
        
        # TID, TS, DC, WT
        tid = int(tweet.get("id_str"))
        ts = (tid >> 22) + TSTAMP_OFFSET 
        dc = (tid >> 17) & 0b11111
        wt = (tid >> 12) & 0b11111
        d["ts"] = norm_time(ts)
        d["dc"] = dc
        d["wt"] = wt
                        
        # Language
        lang = tweet.get("lang")
        
        # restrict on some languages for the prototype
        languages = ["en", "de", "ru", "fr", "es", "pt", "ar", "zh", "ko"]  
        
        if lang not in languages:
            return None
        
        d["lang"] = lang
 
        
        # User Obj.
        if tweet.get("user") is None:
            d['user_friends'] = None
            d['user_followers'] = None
            d['user_statuses'] = None
            d['user_lists'] = None
            d['user_account_created'] = None
            d['user_pred_lang'] = None
            d['user_place'] = None
        else:
            user_friends = tweet.get("user", {}).get("friends_count") 
            user_followers = tweet.get("user", {}).get("followers_count") 
            user_statuses = tweet.get("user", {}).get("statuses_count") 
            user_lists = tweet.get("user", {}).get("listed_count")
            user_account_created = tweet.get("user", {}).get("created_at") 
            user_pred_lang = None
            try:
                user_pred_lang = tweet.get("user", {}).get("description", "").strip().replace("\n", " ").replace(";", " ")
            except:
                pass
            user_place = None
            try:
                user_place = tweet.get("user", {}).get("location", "").strip().replace("\n", " ").replace(";", " ").replace(",", " ")
            except:
                pass
            
            if user_friends is None:
                d['user_friends'] = None
            else:
                d['user_friends'] = norm_ego(user_friends)
            if user_followers is None:
                d['user_followers'] = None
            else:
                d['user_followers'] = norm_ego(user_followers)
            if user_statuses is None:
                d['user_statuses'] = None
            else:
                d['user_statuses'] = norm_ego(user_statuses)
            if user_lists is None:
                d['user_lists'] = None
            else:
                d['user_lists'] = norm_ego(user_lists)
            if user_account_created is None:
                d['user_account_created'] = None
            else:
                user_account_created = datetime.datetime.strptime(user_account_created, "%a %b %d %H:%M:%S %z %Y")
                d['user_account_created'] = norm_time(user_account_created.timestamp())
            if user_pred_lang is None:
                d['user_pred_lang'] = None
            else:
                user_pred_lang = predict_language(user_pred_lang)
                d['user_pred_lang'] = user_pred_lang if user_pred_lang is not None else d["lang"]
            if user_place is None:
                d['user_place'] = None
            else:
                d['user_place'] = user_place if len(user_place) > 1 else None

        source = BeautifulSoup(tweet.get("source")).text
        d["source"] = source
        
        return d
    
    """
        Continuation of work flow
    """    
    texts_file = open(out_file + "texts", "w")
    geocode_file = open(out_file + "geocode", "w")
    feature_file = open(out_file + "features", "w")
    label_file = open(out_file + "label", "w")
    
    # Write Headers
    texts_file.write("text\n")
    geocode_file.write("profile_location\n")
    feature_file.write("label;post_created;dc;wt;language;friends;followers;statuses;lists;account_created;profile_language;source\n")
    label_file.write("label\n")
    
    debug_text = False
    
    with bz2.open(file_path, "rt") as file:
        # for line in file:
        for line in file:
            tweet = json.loads(line)
            tweet_dict = tweet_to_dict(tweet)
                        
            if tweet_dict is None:  # out of sample language
                if debug_text:
                    print("< Skipping tweet with out of sample language>")
                continue

            if debug_text:  # if console output for debugging
                for k, v in tweet_dict.items():
                    print(">>{}: {}".format(k, v))
                print("--------------------------")
            
            # persist tweet
            label_file.write("{}\n".format(tweet_dict["label"]))
            texts_file.write("{}\n".format(tweet_dict["text"]))
            geocode_file.write("{}\n".format(tweet_dict["user_place"]))
            del tweet_dict["text"]
            del tweet_dict["user_place"]
            feature_file.write("{}\n".format(";".join([str(v) for v in tweet_dict.values()])))

Nachfolgende Zelle zeigt die Zuordnung von Tweetfiles zu individuellen Prozessen. Alle Geotweets sind hierbei zuvor in den Ordner *data/geotagged/* extrahiert worden (vgl. vorherige Skripte). Alle in diesem Ordner enthaltenene Dateipfade werden zunächst aufgelistet und in der *files* Variable niedergeschrieben. Für jeden Dateipfad wird ein eigener *worker*-Prozess erzeugt, welcher mit Hilfe der oben beschriebenen *run()*-Methode aus den im Dateipfad enthaltenen Tweets drei Ausgabedateien erzeugt.

Nachdem die Prozesse gespawnt wurden, müssen sie mit *worker.start()* gestartet werden. Mit *worker.join()* kann im Hauptprozess ein Wartepunkt gesetzt werden, der die nachfolgende Programmausführt so lange verzögert, bis alle Arbeiterprozesse abgeschlossen sind.

In [37]:
TSTAMP_OFFSET = 1288834974657

folder = "data/geotagged/"
files = sorted([folder + f for f in listdir(folder) if isfile(join(folder, f)) and f[-6:] == "result"])
path_to_place_model = "kmeans_150.pcl"


workers = [Process(target=run, args=(file,)) for file in files]

for w in workers:
    w.start()
    
for w in workers:
    w.join()
    
print("All workers finished")

All workers finished


<h2>2. Geocoding von Profil-Locations</h2>

Geocoding bezieht sich auf das Verfahren, einen Ortstext auf eine konkrete Weltkoordinate abzubilden. Prinzipiell existieren dafür mehrere Dienste, welche wahlweise open source oder bezahlt sind. Bei Open Source besteht prinzipiell die Problematik, dass meist Anfragelimitierungen existieren. Im Projekt wurde das Geocoding über eine interne Datenbank umgesetzt, wie unten gezeigt. Gleichzeitig wird aber exemplarisch eine alternative Implementierung über das OSM-Tool Nominatim angedeutet.

Wir beginnen zunächst mit der internen Lösung mit der Datenbank *clickhouse*.

In [7]:
def getLatLong(synonym_list):
    conn = connect('clickhouse://default:clickhouse2020@localhost/superset')
    cursor = conn.cursor()
    cursor.execute("SELECT latitude, longitude , synonym\
        FROM cities \
        WHERE synonym in %(synonym)s AND population >= 1000 \
        ORDER BY synonym, population DESC \
        LIMIT 1 BY synonym", {'synonym': synonym_list})
    cursor.close()
    conn.close()
    records = cursor.fetchall()
    iResults = len(records)
    return {x[2]:x[:2] for x in records}

Clickhouse erlaubt Batchanfragen, was bei der Verarbeitung größerer Mengen an Tweets von immensem Vorteil gegenüber einzelnen Anfragen ist. Wie unten im Code ersichtlich, ist die Batchgröße auf $200$ gesetzt. 

Die Profil-Locations von Nutzern wurden bereits im vorherigen Skript *1-CreateLabels.ipynb* extrahiert und aufbereitet. Die entsprechenden Location-Strings wurden daraufhin in entsprechenden data/feature_files/\*.geocode-Dateien abgelegt. Diese Dateien werden nun schrittweise ausgelesen und der darin enthaltene Location-Text nach weiterer Vorverarbeitung schrittweise in die Anfragebatches eingefüllt. Wie die Vorverarbeitung aufgebaut ist, wird weiter unten beschrieben.

Nach der Verarbeitung der Datenbank werden die Geocodes als Lat/Long-Paare zurückgeliefert. Die *profile_location*, wie sie auch vom Algorithmus verarbeitet wird, erwartet jedoch keine Koordinatenpaare sondern diskrete Labels als Featurewerte. Wir nutzen hierzu das kMeans-Clustering Modell, welches bereits für die Erstellung der Zielklassen verwendet wird. Mit Hilfe der *predict()* Funktion können somit die Koordinaten auf eines der $150$ Zielklassen abgebildet werden, falls die Geocoding Anfrage zu einem Ergebnis geführt hat. In jedem anderen Fall (bspw. dann, wenn der Profilort leer war), wird ein Defaultwert von $-1$ zugeordnet. Die Ergebnisse aller Tweets werden dann schrittweise in einem neu angelegten File **.loc* abgelegt.

Hier noch einige Notizen zur Vorverarbeitung. Clickhouse (als verwendeter Geocoder) ist in der Lage, Synonyme von Ortsnamen entgegen zu nehmen. Bei der Disambiguierung mehrdeutiger Namen referenziert die Datenbank jedoch immer auf den Ort mit der größten Übereinstimmung oder bei mehreren Orten auf denjenigen mit den meisten Einwohnern. Umgekehrt ist die Datenbank (anders als viele andere Geocoder) jedoch nicht immer in der Lage, Landeskürzel oder variierende Schreibweisen verlässlich vorherzusagen, insbesondere dann, wenn der Ort aus mehreren Wörtern besteht. Um den Output des Geocoders zu verbessern, wurde ein provisorisches (jedoch nicht ausgereiftes) Preprocessing durchgeführt. Hierzu wurde in unten stehendem Codefragment schrittweise versucht, aus dem Ortstext zunächst einen Ort mit drei, dann mit zwei und zuletzt mit einem Wort zu extrahieren und vorherzusagen. Dediziertere Geocoder kommen auch ohne eine derartige Form der Vorverarbeitung aus.

In [85]:
batch_size = 200

folder = "data/feature_files/"
files = sorted([folder + f for f in listdir(folder) if isfile(join(folder, f)) and f[-7:] == "geocode"])
path_to_place_model = "kmeans_150.pcl"

place_model = pickle.load(open(path_to_place_model, "rb"))

# Keep the model sequential to not overload our database
for file in tqdm(files):
    out_file = open(folder + file.split("/")[-1][:-7] + "loc", "w")
    out_file.write("profile_location\n")
    
    # geocode one file
    with open(file, "r") as f:
        next(f)  # skip header
        
        # iterate as long as new batches are available
        data_available = True
        while data_available:

            # get next batch
            batch_raw = []
            try:
                # try to access next line until either batch_size or EOF is reached
                for _ in range(batch_size):
                    # some text preprocessing
                    text = next(f).strip()
                    text = re.sub(r'[\s]+', ' ',  # remove multiple space characters
                      re.sub(r'[^\p{Arabic}\p{Greek}\p{Cyrillic}\p{Latin}\p{Han}\p{Hangul} ]', '', text))
                    # add to batch
                    batch_raw.append(text)
            except StopIteration:
                # EOF reached
                data_available = False
                
            result = {}

            """
                First Iteration: resolve location descriptions with three words
            """
            # initialize some variables for request and mapping
            batch = []
            indices = []

            # Iterate over all location descriptions
            for i, item in enumerate(batch_raw): 
                splits = item.split(" ")
                # and filter on those with three or more words
                if len(splits) > 2:
                    # utilize the first three words while capitalizing the first and third one
                    # example: Rio de Janeiro
                    batch.append(" ".join([splits[0].capitalize(), splits[1], splits[2].capitalize()]))
                    # add index of location description for mapping the position within the batch
                    indices.append(i)

            # result is a dictionary of the form {"requested_item_name": (lat, long), ...}
            res = getLatLong(batch)
            # map from item name back to its position within the batch
            for k, v in res.items():
                result[indices[batch.index(k)]] = v

            """
                Second Iteration: resolve location descriptions with two words
            """

            # reset variables
            batch = []
            indices = []

            # iterate over all location descriptions
            for i, item in enumerate(batch_raw):
                splits = batch_raw[i].split(" ")
                # and filter on those with two or more words (that also yielded no results for three words)
                if len(splits) > 1 and i not in result:
                    # utilize the first two words and capitalizing both
                    # example: Sao Paolo, New York
                    batch.append(" ".join(splits[:2]).title())
                    # map from item name back to its position within the batch
                    indices.append(i)

            res = getLatLong(batch)
            for k, v in res.items():
                result[indices[batch.index(k)]] = v

            """
                Third Iteration: resolve location descriptions with one word
            """
            # reset variables
            batch = []
            indices = []

            # Iterate over all location descriptions
            for i, item in enumerate(batch_raw): 
                if i not in result:
                    batch.append(item.split(" ")[0].capitalize())
                    indices.append(i)

            res = getLatLong(batch)

            for k, v in res.items():
                result[indices[batch.index(k)]] = v

            """
                Convert (lat, long)-coordinates to labels
            """
            for k, v in result.items():
                result[k] = place_model.predict(np.asarray(v).reshape(1, -1))[0]

            """
                Persist results
            """

            # write one line per item in the batch
            for i in range(len(batch_raw)):
                # write predicted label if present
                if i in result:
                    out_file.write("{}\n".format(result[i]))
                else:
                # else write -1
                    out_file.write("-1\n")

            # force the file to write lines
            out_file.flush()
             
        # end while data available
    # end with open
# end for file

100%|██████████| 25/25 [00:50<00:00,  2.03s/it]


Anbei ist ein Codebeispiel mit Nominatim zu sehen. Zunächst muss unter der Angabe eines aussagekräftigen *user_agent* Namens ein Nominatim-Client erstellt werden. Mit der Schnittstelle *client*.geocode(*str*) kann ein einzelner übergebener String geparsed werden. Existiert eine Rückgabe $\neq$ *None*, so kann durch die beiden Attribute *tweet_location.latitude* und *tweet_location.longitude* die Weltkoordinate entnommen werden. Diese wird, äquivalent zu obigem Codefragment, mit dem kMeans-Clusteringmodell in eine Polygon-ID vorhergesagt, welche dann alternativ in das **.loc*-File als Ausgabe geschrieben werden könnte.

In [32]:
geolocator = Nominatim(user_agent="Tweetlocator_workshop")

folder = "data/feature_files/"
files = sorted([folder + f for f in listdir(folder) if isfile(join(folder, f)) and f[-7:] == "geocode"])

path_to_place_model = "kmeans_150.pcl"
kmeans = pickle.load(open(path_to_place_model, "rb"))
max_count = 5


# geocode one file
with open(files[0], "r") as f:
    next(f)  # skip header

    # iterate as long as new batches are available
    data_available = True
    while data_available:
        try:
            # try to access next line until either batch_size or EOF is reached
            text = next(f).strip()
            text = re.sub(r'[\s]+', ' ',  # remove multiple space characters
              re.sub(r'[^\p{Arabic}\p{Greek}\p{Cyrillic}\p{Latin}\p{Han}\p{Hangul} ]', '', text))
            if str(text) != "None":
                tweet_location = geolocator.geocode(text)
                predicted_cluster = kmeans.predict(np.asarray((tweet_location.latitude, tweet_location.longitude)).reshape(1, -1))

                print("Location '{}' is translated to coordinates ({}, {}). Cluster Prediction: {}".format(text, 
                                                                                   tweet_location.latitude, 
                                                                                   tweet_location.longitude,
                                                                                    predicted_cluster))
                max_count -= 1

            if max_count == 0:
                data_available = False
        except StopIteration:
            # EOF reached
            data_available = False

Location 'Meath' is translated to coordinates (53.649784350000004, -6.588529492009938). Cluster Prediction: [72]
Location 'Mossoró Brasil' is translated to coordinates (-5.1904332, -37.3443872). Cluster Prediction: [25]
Location 'London' is translated to coordinates (51.5073219, -0.1276474). Cluster Prediction: [85]
Location 'Kuala Terengganu Terengganu' is translated to coordinates (5.3296461, 103.1383265). Cluster Prediction: [59]
Location 'Jersey City NJ' is translated to coordinates (40.7215682, -74.047455). Cluster Prediction: [29]


<h2>3. Extraktion von lokalen Worten</h2>

Für die optionale Extraktion bzw. das Filtering von lokalen Worten kann beispielsweise ein Inverse Location Frequency (ILF) Score pro Wort im Vokabular berechnet werden. In obigem Arbeitsablauf ist diese Komponente nicht implementiert, nachfolgend soll aber gezeigt werden, wie eine derartige Filterliste erstellt werden kann. Diese könnte dann im obigen Ablauf eingebaut werden.

Zur Erstellung der Filterliste wird wieder ein Trainingskorpus mit gelabelten Texten benötigt. Im unten stehenden Code verwenden wir sehr wenige Tweetfiles (für ein sauberes Anwendungsszenario werden natürlich mehr Daten benötigt).

In [62]:
word_dict = {}
N_documents = 0
min_th = 15

folder = "data/feature_files/"
files = sorted([folder + f for f in listdir(folder) if isfile(join(folder, f)) and f[-5:] == "texts"])

for file in tqdm(files):
    text_file = open(file, "r")
    label_file = open(file.split("texts")[0] + "label", "r")

    next(text_file)
    next(label_file)


    # Sammle Informationen über Wörter des Textes
    data_available = True
    while data_available:
        try:
            text = next(text_file)
            label = next(label_file)

            words = text.split(" ")
            if label is not "-1":
                for word in words:
                    if word not in word_dict:
                        word_dict[word] = np.zeros(150)
                    word_dict[word][int(label)] += 1
                N_documents += 1
        except StopIteration:
            # EOF
            data_available = False
        
# Berechne ILF Score für Wörter des Textes
ILF_scores = []
for k, v in word_dict.items():
    #  Wort muss mindestens min_th mal vorgekommen sein, damit es nicht als Noise angesehen wird
    if np.sum(v) > min_th:  
        ILF_scores.append((np.log(N_documents/np.count_nonzero(v)), k))
        
# sortiere und filtere nach den ortsbezogensten Wörtern
ILF_scores = sorted(ILF_scores, reverse=True)
max_local_words = 20
for elt in ILF_scores[:max_local_words]:
    print(elt)


100%|██████████| 25/25 [00:00<00:00, 54.29it/s]


(10.745808764210109, 'pdx\n')
(10.745808764210109, 'Rom')
(10.745808764210109, 'PORT')
(10.745808764210109, 'IMFC\n')
(10.745808764210109, 'Buenos')
(10.052661583650163, 'بالخاطركلام')
(10.052661583650163, 'HOPE')
(10.052661583650163, 'Acción')
(9.647196475542, 'ايه')
(9.647196475542, 'tragedy')
(9.647196475542, 'Portland')
(9.647196475542, 'Florida\n')
(9.647196475542, 'Avenue')
(9.359514403090218, 'und')
(9.359514403090218, 'satélites')
(9.359514403090218, 'passando')
(9.359514403090218, 'kkkkkkkkkk\n')
(9.359514403090218, 'Sem')
(9.359514403090218, 'Janeiro\n')
(9.359514403090218, 'California\n')


<h2> 4. Language Ensemble </h2>

<h2>5. FastText Skripte</h2>

Die FastText Skripte werden auf der Kommandozeile ausgeführt. Prinzipiell kann auch das Python-Modul fastText mit identischer Funktionalität importiert werden. Hierdurch müssen jedoch Performanceeinbußen in Kauf genommen werden. Die nachfolgende Anleitung zeigt, wie ein beispielhafter Ablauf stattdessen auf der Kommandozeile durchgeführt werden kann. Es wird im Folgenden vorausgesetzt, dass fastText bereits installiert ist. Sämtliche nachfolgenden Schritte können im Ordner *text_embeddings* durchgeführt werden.

*cd text_embeddings/*

**Trainingspart**

Die Umwandlung von Texten erfolgt in zwei Schritten. Zunächst wird der Tweet-Text in eine Satzvektorrepräsentation überführt und diese im nächsten Schritt vom neuronalen Netz des fastText Modells vorhergesagt. Die Satzvektorrepräsentation muss entweder in einem unüberwachten Trainingsschritt gelernt werden. Alternativ kann eine vortrainierte Version bereits heruntergeladen werden. Wie im Bericht beschrieben, arbeiten wir mit vortrainierten Vektoren verschiedener sprachen, welche sich in einem ausgerichteten Vektorraum befinden.

In der Datei *emb_locations.txt* sind die URLs von vortrainierten, ausgerichteten Vektorfiles enthalten. Mit Hilfe des Skriptes *crawl_pretrained_embeddings.sh* können diese heruntergeladen werden (Vorsicht: Größe aller Daten zusammen etwa 24 GB).

*chmod +x crawl_pretrained_embeddings.sh*

*./crawl_pretrained_embeddings.sh*

Wir arbeiten mit einem kombinierten, d.h. sprachunabhängigen Vektorfile. Daher müssen alle Teilsprachen zusammengefügt werden:

*cat *.vec > multi.vec*

Das neu enstandende *multi.vec* File enthält alle Embedding-Repräsentationen von Wörtern aller gewünschter Sprachen. Mit diesem kann nun der zweite Schritt durchgeführt werden, d.h. in einem überwachten Lernprozess können Wortvektoren auf eines der 150 Labels abgebildet werden. Der Übersichtlichkeit halber wird bereits ein Trainingsfile von Tweettexten mit zugehörigen Labels bereit gestellt, siehe die Dateien *train.texts* bzw. *train.labels*.

*head tweets.texts*

*head tweets.labels*

Für den supervised Part erwartet FastText jedoch ein einziges Inputfile, welches Labels und Text kombiniert in folgender Form bereit stellt:

\_\_label\_\_*label*$_1$ *text*$_1$ 

\_\_label\_\_*label*$_2$ *text*$_2$ ...

Wir führen also eine kurze Konvertierung durch:

*paste -d ' ' tweets.labels tweets.texts > tweets.combined*

*sed -i -e 's/^/\_\_label\_\_/' tweets.combined*



Bei Betrachtung sieht das entstandene Files gut aus:

*head tweets.combined*

Ein sauberer Trainings- und Evaluationsprozess benötigt eigentlich eine saubere Trennung von Trainings-, Test- und Validierungsdaten. Das Skript *train_test_split.sh* hat eine solche Trennung für eine 10-fache CV bereits vorimplementiert. Prinzipiell wird der anfängliche Datensatz aus *train.combined* in zehn Teile zertrennt und diese im Verhältnis von 8:1:1 auf Trainings-, Test- und Validierungsdaten für eben 10 Folds aufgeteilt. Für eine möglichst faire Verteilung bietet sich zuvor noch ein Shuffling des Datensatzes an.

*shuf tweets.combined -o tweets.combined*

*chmod +x train_test_split.sh*

*./train_test_split.sh*

Es wird hierdurch eine Menge an temporären Files im aktuellen Ordner erstellt. Die *tweets-split.**-Dateien beziehen sich auf die 10 Splits, *tweets.trainN*, *tweets.testN* und *tweets.validN* auf die drei Sets für den Split Nummer $N$.

Das eigentliche (supervised) Training findet im nächsten Schritt statt. Theoretisch müsste nur ein Modell trainiert werden, will man jedoch gleich die Modellgüte per Kreuzvalidierung prüfen, so müssen natürlich alle zehn Splits betrachtet werden. Das Skript *train.sh* fasst den Trainingsprozess in einem File zusammen.

*chmod +x train.sh*

*./train.sh*

Hier soll noch kurz auf die Parameter des Trainings eingegangen werden. Betrachten wir beispielsweise die erste Zeile des *train.sh* Skripts:

*head -n1 train.sh* liefert

fasttext supervised -input tweets.train1 -output tweets.model1 -pretrainedVectors multi.vec -dim 300 -autotune-validation tweets.valid1 -autotune-duration 86400 -thread 40

Beim Aufruf des fasttext-Tools kann mit dem Parameter *supervised* der überwachte Trainingsprozess angestoßen werden. Dieser erwartet in jedem Fall (via **-input**) ein Trainings- und (via **-output**) ein Outputfile. 

Da wir bereits mit fertigen Wortvektoren arbeiten, müssen diese auch übergeben werden via **-pretrainedVectors**. Ohne Parametertuning arbeitet FastText mit genau $100$ Dimensionen für die Embeddings. Die vortrainierten Embeddings bestehen jedoch aus 300 Dimensionen, weswegen auch dies als Parameter angegeben werden muss: **-dim 300**. 

Schlussendlich kann vor dem Training ein Hyperparametertuning mit einem optionalen Validierungsset durchgeführt werden. Hierzu muss das entsprechende File übergeben werden **-autotune-validation**, sowie eine Zeit in Sekunden, welche für das Tuning verwendet werden darf, hier bspw. **-autotune-duration**.

Output des überwachten Trainings sind einerseits neue **.vec* Vektordateien (werden nicht benötigt, da wir ja bereits unser eigenes File besitzen), sowie die **fertigen *.bin Modelle**. Diese brauchen wir natürlich schon für die Vorhersage.

Optional können die trainierten Modelle an dieser Stelle noch evaluiert werden. Fasttext bietet dazu das Kommandozeilenargument **fasttext test [model] [test-data][k][th]**. **model** bezieht sich auf eines der soeben erhaltenen **.bin*-Modelle, **test-data** auf eines der *tweets.testN*-Files, $k$ und $th$ auf optionale Optimierungen mit der Vorhersage der $k$ besten Labels, welche jeweils einen minimalen Wahrscheinelichkeits-Grenzwert von $th$ erreichen müssen. Das Skript *test.sh* führt für alle zehn Folds eine Reihe von Tests mit variierenden Werten für $k\in[1;3]$ und $th=0.15\cdot i$ mit $i\in[0;4]$ durch.

*chmod +x test.sh*

*./test.sh*

Die Ergebnisse können in Textform in den neu entstehenden *tweets.evalN* Dateien ausgelesen werden.

**Anwendungspart**

Für die Konvertierung und Vorhersage von Texten aus Tweets wird das soeben trainierte *tweets.modelN.bin* File benötigt.

Ein derartiges Skript ist auch für diesen Task in der Datei *predict_texts.sh* vorimplementiert. Der Aufbau ist hier ähnlich wie zum bereits ausgeführten *0-filter_on_geotagged_tweets.sh*-Preprocessingskript.

Zunächst werden sämtliche *.texts*-Dateien aus dem am Anfang dieses Notebooks angegebenen *feature_files*-Ordners aufgelistet und jeder Dateipfad an die Funktion *processfile()* übergeben. In dieser wird jede Zeile des Dokuments mit dem Modell vorhergesagt, wobei als optimierte Hyperparameter $k=10$ und $th=0.05$ gesetzt sind. Es kann sinnvoll sein, dass mehr Labels mit niedrigeren Grenzwerten für $th$ zurückgeliefert werden. Hier können die Parameter nach eigenem Ermessen variiert werden. Die Ausgabe der probabilistischen Vorhersage wird zeilenweise in ein gleichnamiges **.pred* File geschrieben.

*head -n3 *.pred* liefert beispielsweise

\_\_label\_\_84 0.951783

\_\_label\_\_56 0.741892 \_\_label\_\_28 0.133467 \_\_label\_\_84 0.100436

\_\_label\_\_56 0.630091 \_\_label\_\_23 0.155819 \_\_label\_\_42 0.140562

Diese Ergebnisse sind also bereits die Vorhersageergebnisse des Algorithmus, aber müssen noch etwas aufbereitet werden, bspw. in eine Matrixdarstellung. Wir verwenden dazu wieder eine ähnliche dreistufige Pipeline wie bereits bekannt:
1. zunächst werden sämtliche **.pred* Files aufgelistet und diese als Work-Batches auf verschiedene Arbeiterprozesse aufgeteilt
2. Jeder Arbeiterprozess verarbeitet ein File, d.h. einige Tausend Elemente. Er tut dies, indem er das File zeilenweise ausliest, d.h. jeweils einen Tweet individuell betrachtet, und diesen 
3. gemäß einer festen Verarbeitungsvorschrift konvertiert.

In [86]:
# 3. Konvertierung eines Tweets (bzw. des Vorhersageergebnisses eines Tweets) in Vektorrepräsentation

def embedding_prediction_to_distribution(line):
    vec = np.zeros(N_classes + 1)  # probability distribution

    splits = line.split("__label__")

    if len(splits) == 1:  # no prediction possible
        vec[-1] = 1
    else:
        for split in splits[1:]:
            probs = split.split(" ")
            vec[int(probs[0])-1] = float(probs[1])
        vec[-1] = 1 - np.sum(vec)
    
    return vec

In [100]:
# 2. Konvertierung eines Text-Vorhersagefiles in eine Matrixrepräsentation (+Persistierung)
def run(file_list, show_progress=False):
    parts = []
    
    if show_progress:
        for file in tqdm(file_list):
            X = np.empty((0, N_classes + 1))
            X = csr_matrix(X)

            n_rows = 0
            with open(file, "r") as f:
                has_next = True

                while has_next:

                    try:
                        line = next(f).strip()
                        parts.append(embedding_prediction_to_distribution(line))
                        n_rows += 1
                    except StopIteration:
                        has_next = False
                        continue

            X = vstack((X, parts))
            save_npz(file + ".npz", X)
            if X.shape[0] != n_rows:
                print(X.shape, n_rows, file)
            parts = []
    else:
        for file in file_list:
            X = np.empty((0, N_classes + 1))
            X = csr_matrix(X)
            n_rows = 0

            with open(file, "r") as f:
                has_next = True

                while has_next:
                    try:
                        n_rows += 1
                        line = next(f).strip()
                        parts.append(embedding_prediction_to_distribution(line))

                    except StopIteration:
                        has_next = False
                        continue

            X = vstack((X, parts))
            save_npz(file + ".npz", X)
            print(X.shape, n_rows)
            parts = []

In [101]:
# 1. Identifikation von Vorhersage-Files und Zuordnung der Workbatches
N_cores = 1
path = "data/feature_files/"
N_classes = 150

files = [path + f for f in listdir(path) if f[-4:] == "pred" and isfile(join(path, f))]

batch_size = math.ceil(len(files) / N_cores)

workers = [Process(target=run, args=(files[i*batch_size:(i+1)*batch_size], i == 0)) for i in range(N_cores)]

for w in workers:
    w.start()
    
for w in workers:
    w.join()

100%|██████████| 25/25 [00:00<00:00, 43.56it/s]


Hier kann das Ergebnis der Text-Vorhersage eines einzelnen Tweets eingesehen werden. Die Ergebnismatrix für das komplette File ist als Sparse Matrix implementiert, um Speicherplatz zu sparen. Um die Matrix einzusehen, empfiehlt es sich daher zunächst, mit der Funktion *todense()* die Konvertierung in eine herkömmliche (dicht besetzte) Matrix durchzuführen.

Wie unten ersichtlich, ist der Output ein $151$-dimensionaler Vektor. Die ersten $150$ Dimensionen korrespondieren zu den Wahrscheinlichkeiten, dass der Text dem Modell zufolge einer der $150$ Zielklassen entstammt, **falls** der Wahrscheinlichkeitswert über $0.05$ liegt (sonst ist er auf $0$ gesetzt). Dei letzte, d.h. $151.$ Dimension gibt die Gegenwahrscheinlichkeit, also den Rauschfaktor des Ergebnisses an. Dieser Wert ist letztlich die Summe der Wahrscheinlichkeiten aller Zielklassen $< 0.05$.

In [84]:
X = load_npz(path + "tweets-2020-02-01_00:03:35.xz.texts.pred.npz")
X.todense()[2]

matrix([[0.      , 0.      , 0.      , 0.      , 0.      , 0.      ,
         0.      , 0.      , 0.      , 0.      , 0.      , 0.      ,
         0.      , 0.      , 0.      , 0.      , 0.      , 0.      ,
         0.      , 0.      , 0.      , 0.      , 0.      , 0.      ,
         0.      , 0.      , 0.      , 0.      , 0.      , 0.      ,
         0.      , 0.      , 0.      , 0.      , 0.      , 0.      ,
         0.      , 0.      , 0.      , 0.      , 0.      , 0.140562,
         0.      , 0.      , 0.      , 0.      , 0.      , 0.      ,
         0.      , 0.      , 0.      , 0.      , 0.      , 0.      ,
         0.      , 0.630091, 0.      , 0.      , 0.      , 0.      ,
         0.      , 0.      , 0.      , 0.      , 0.      , 0.      ,
         0.      , 0.      , 0.      , 0.      , 0.      , 0.      ,
         0.      , 0.      , 0.      , 0.      , 0.      , 0.      ,
         0.      , 0.      , 0.      , 0.      , 0.      , 0.155819,
         0.      , 0.      , 0.   