# Benzinga-Nachrichten-Verarbeitung

## Setup

### Cluster spin up

In [1]:
import sys, io, tarfile, os
import subprocess
from src.cloud_manager import spin_up_cluster, fn_to_targz_string
from distributed.diagnostics.plugin import WorkerPlugin
from dask.distributed import Client
import dask
from concurrent.futures import ProcessPoolExecutor

# Dask options are exported as DASK__* environment variables because dask.config.set
# did not seem to work: other config files appear to take precedence on the scheduler.
# The worker TTL stayed at 5 minutes even when it was set to None (or anything else)
# through dask.config.set executed on the scheduler.
dask_configs = {
    "distributed.comm.retry.count": 10,
    "distributed.comm.timeouts.connect": 60,
    # To prevent warnings "Event loop was unresponsive", see dask issue #3484
    "admin.tick.interval": "20ms",
    "admin.tick.limit": "30min",
    "distributed.scheduler.worker-ttl": None,
}
# Option name -> environment variable name: upper-case, "." becomes "__", "-" becomes "_".
env_vars = {
    "DASK__" + option.upper().replace(".", "__").replace("-", "_"): str(setting)
    for option, setting in dask_configs.items()
}
env_vars

{'DASK__DISTRIBUTED__COMM__RETRY__COUNT': '10',
 'DASK__DISTRIBUTED__COMM__TIMEOUTS__CONNECT': '60',
 'DASK__ADMIN__TICK__INTERVAL': '20ms',
 'DASK__ADMIN__TICK__LIMIT': '30min',
 'DASK__DISTRIBUTED__SCHEDULER__WORKER_TTL': 'None'}

In [2]:
# Cant extract this out of the notebook as then import would be missing...
def extract_targz_string(s,*args,**kwargs):
    """Unpack a gzip-compressed tar archive that is held in memory.

    Parameters
    ----------
    s : bytes
        Raw content of a ``.tar.gz`` archive.
    *args, **kwargs
        Forwarded unchanged to ``TarFile.extractall`` (for example ``path``).
    """
    with io.BytesIO(s) as buffer, tarfile.open(fileobj=buffer, mode='r:gz') as archive:
        archive.extractall(*args, **kwargs)


class AddProcessPool(WorkerPlugin):
    """Worker plugin that registers a process-based executor on a worker.

    The pool is stored under the ``"processes"`` key of ``worker.executors``
    and gets as many processes as the worker has threads.
    """

    def setup(self, worker):
        worker.executors["processes"] = ProcessPoolExecutor(max_workers=worker.nthreads)


class SourceDirectoryCopier(WorkerPlugin):
    """Plugin that unpacks a source archive on a node and makes it importable.

    Parameters
    ----------
    targz : bytes
        In-memory ``.tar.gz`` archive that is handed to ``extract_targz_string``.
    """

    def __init__(self, targz):
        self.targz = targz
        super().__init__()

    def setup(self, worker=None, **kwargs):
        """Extract the archive, extend ``sys.path`` and install ``loky``.

        ``worker`` is optional so the same method can also be called directly,
        as the notebook does through ``client.run_on_scheduler``.

        Raises
        ------
        subprocess.CalledProcessError
            If the ``pip install`` command exits with a non-zero status.
        """
        self.worker = worker
        # No target path is passed, so the archive is extracted into the current working directory.
        extract_targz_string(self.targz)
        # NOTE(review): "/" is only the right import root if the working directory above is "/" - confirm.
        # Guarded so that repeated setup calls in one process do not pile up duplicate entries.
        if "/" not in sys.path:
            sys.path.append("/")
        print("Added a new worker at:", worker)
        subprocess.run([sys.executable, "-m", "pip", "install", "loky"], check=True)


# Start the cluster; `token` is passed later as the `token` storage option for the gcs:// reads and writes.
cluster, token = spin_up_cluster()
# Pack the "src" argument into an in-memory archive (presumably the local `src` package directory - confirm in cloud_manager).
targz = fn_to_targz_string("src")
plugin = SourceDirectoryCopier(targz)
client: Client = Client(cluster)
# Run the plugin's setup once on the scheduler, then register it as a plugin on the client.
client.run_on_scheduler(plugin.setup)
client.register_plugin(plugin)
# Adds the "processes" executor that the parsing step selects via dask.annotate(executor="processes").
client.register_plugin(AddProcessPool())

{}

In [4]:
client

0,1
Connection method: Cluster object,Cluster type: dask_kubernetes.KubeCluster
Dashboard: http://localhost:55831/status,

0,1
Dashboard: http://localhost:55831/status,Workers: 0
Total threads: 0,Total memory: 0 B

0,1
Comm: tcp://10.12.2.18:8786,Workers: 0
Dashboard: http://10.12.2.18:8787/status,Total threads: 0
Started: 2 minutes ago,Total memory: 0 B


In [5]:
client.run_on_scheduler(os.environ.get, "DASK__DISTRIBUTED__SCHEDULER__WORKER_TTL")

In [6]:
client.run_on_scheduler(dask.config.get, "distributed.comm.timeouts.connect")
# distributed.comm.timeouts.connect
# distributed.scheduler.worker-ttl

'30s'

### Source File Copy Preparation

In [None]:
# client.run_on_scheduler(lambda x: sys.path, x=2)
# client.run_on_scheduler(subprocess.run, args='export PYTHONPATH=$PYTHONPATH:', shell=True)
# client.run_on_scheduler(subprocess.run, args='echo "', shell=True)

In [None]:
# def myfunc():
#     from src.preprocessing.news_parser import filter_body
#     return filter_body

# client.run(myfunc)
# client.run_on_scheduler(myfunc)
# x = client.submit(myfunc)
# x.result()

# client.run_on_scheduler()

### System settings

In [None]:
import os

# Switches describing where the notebook is executed; later cells branch on `using_colab`.
using_colab = False
using_laptop = True
# Base directory for the paths that later cells build as `cwd + '/data/...'`.
# Without this definition those cells raise NameError on a fresh kernel.
cwd = os.getcwd()

%%capture
if using_colab:
  from google.colab import drive
  drive.mount('/content/drive')

### CUDF für hardware acceleration

In [None]:
# !git clone https://github.com/rapidsai/rapidsai-csp-utils.git
# !python rapidsai-csp-utils/colab/pip-install.py
# import cudf

### Imports

In [None]:
%%capture
if using_colab:
  !sudo apt update
  !sudo apt install maven;

  !pip install pyarrow==11.0.0
  !pip install html2text
  !pip install datefinder
  !pip install -U dask[complete]
  !pip install nltk
  !pip install dateparser
  !pip install pyngrok
  !pip install sutime
  !pip install pyngrok

  # This is required for sutime
  !mvn dependency:copy-dependencies -DoutputDirectory=./jars -f $(python3 -c 'import importlib; import pathlib; print(pathlib.Path(importlib.util.find_spec("sutime").origin).parent / "pom.xml")');

In [7]:
%load_ext autoreload
%autoreload 2
import dask.dataframe as dd
import pandas as pd
from src.preprocessing.news_parser import get_company_abbreviation, yahoo_get_wrapper, \
                                          infer_author, filter_body
# import nltk

# Mainly for topic modeling
# from sklearn.pipeline import Pipeline, FeatureUnion
from sklearn.feature_extraction.text import TfidfVectorizer
# import gcsfs
import re

Task exception was never retrieved
future: <Task finished name='Task-48' coro=<PortForward._sync_sockets() done, defined at /opt/conda/lib/python3.10/site-packages/kr8s/_portforward.py:167> exception=ExceptionGroup('unhandled errors in a TaskGroup', [ConnectionClosedError('TCP socket closed')])>
  + Exception Group Traceback (most recent call last):
  |   File "/opt/conda/lib/python3.10/site-packages/kr8s/_portforward.py", line 171, in _sync_sockets
  |     async with anyio.create_task_group() as tg:
  |   File "/opt/conda/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 664, in __aexit__
  |     raise BaseExceptionGroup(
  | exceptiongroup.ExceptionGroup: unhandled errors in a TaskGroup (1 sub-exception)
  +-+---------------- 1 ----------------
    | Traceback (most recent call last):
    |   File "/opt/conda/lib/python3.10/site-packages/kr8s/_portforward.py", line 183, in _tcp_to_ws
    |     raise ConnectionClosedError("TCP socket closed")
    | kr8s._exceptions.Conne

### Setup NGROK tunnel

In [None]:
# ------- Only required when working on google colab, not required when working with google cloud engine -----------------
if using_colab:
  from pyngrok import ngrok, conf
  # The ngrok auth token is a secret and must not live in the notebook: it is read from the
  # NGROK_AUTH_TOKEN environment variable (KeyError if it is not set).
  # The token that used to be hardcoded here has been exposed and should be revoked.
  conf.get_default().auth_token = os.environ["NGROK_AUTH_TOKEN"]

  # Expose local port 8787 through a public ngrok URL.
  public_url = ngrok.connect(8787).public_url
  print(" * ngrok tunnel \"{}\" -> \"http://127.0.0.1:{}\"".format(public_url, 8787))

In [None]:
%%capture
if 'client' not in globals():
  !python -m pip install jupyter-server-proxy
  client = Client()

## Grobes HTML-Parsing
Als erstes müssen wir die HTML-Dokumente zu normalem Text umwandeln, ansonsten sind die Text-Zellen zu groß und führen zu Problemen mit PyArrow/Dask.

In [None]:
dask.config.set(scheduler="threads")

In [None]:
input_dir = "data/raw_bzg/"
output_dir = 'data/unraw1_bzg/'

In [None]:
# for year in range(2019, 2020):
#     print(year)
#     df = pd.read_parquet(f"{input_dir}story_df_raw_{year}.parquet")
#     df = dd.from_pandas(df, npartitions=12)
#     df["html_body"] = df["html_body"].apply(body_formatter, meta=pd.Series(dtype="str"))
#     df = df.rename(columns={"html_body":"body"})
#     name_function = lambda x: f"data-{year}-{x}.parquet"
#     df.to_parquet(output_dir, name_function=name_function)

## Neu-Partitionierung
Sodass alle Partitionen etwa die gleiche Größe haben.

In [None]:
input_dir = 'data/unraw1_bzg/'
output_dir = 'data/unraw2_bzg/'

# ddf = dd.read_parquet(input_dir+"*.parquet")
# ddf2 = ddf.repartition(npartitions=50)
# name_function = lambda x: f"data-{x}.parquet"
# ddf2.to_parquet(output_dir, name_function=name_function)

## Author-Inferenz

Ein bisschen die Daten säubern...

In [None]:
input_dir = cwd+'/data/unraw2_bzg/'
output_dir = cwd+'/data/unraw3_bzg/'

In [None]:
ddf = dd.read_parquet(input_dir+"*.parquet")

In [None]:
# Remove rows for which no stock ticker is recorded (empty string in `stocks`)
ddf = ddf[ddf.stocks != '']

In [None]:
# Convert `channels` datatype from string to list by evaluating each cell as a Python expression.
# NOTE(review): `eval` executes whatever code the column values contain. If the parquet files can
# come from an untrusted source, a literal-only parser such as `ast.literal_eval` would be safer.
ddf["channels"] = ddf["channels"].apply(eval, meta=pd.Series(dtype='object'))

Untersuche als nächstes die Behauptung, dass **PRNewswire** und **Businesswire** den gesamten Markt für Pressemeldungen in den USA kontrollieren. Wenn dem so ist, und sie nicht noch weitere, unwichtige Meldungen veröffentlichen, dann können wir einfach die Newsartikel nach diesen Autoren filtern und uns viel Arbeit ersparen.

In [None]:
dask.config.set(scheduler="processes")
ddf["inferred_author"] = None
ddf["inferred_author"] = ddf.body.apply(infer_author, meta=pd.Series(dtype="string"))

In [None]:
# value_counts for authors
auhtor_value_counts = pd.concat([ddf.author.value_counts().head(10), ddf.inferred_author.value_counts().head(10)], axis=1)

In [None]:
auhtor_value_counts

In [None]:
auhtor_value_counts.sum().diff()

Ungefähr 650k Nachrichten werden ausgelassen, wenn nur die vier Hauptvertreiber von Pressemeldungen berücksichtigt werden.

In [None]:
ddf = ddf[~ddf.inferred_author.isna()]

In [None]:
ddf["inferred_author"] = ddf["inferred_author"].astype("string")

In [None]:
ddf["channels"] = ddf.channels.apply(lambda x: str(x), meta=pd.Series(dtype="string"))

In [None]:
ddf.inferred_author.value_counts().compute()

In [None]:
ddf.inferred_author.value_counts().sum().compute()

In [None]:
name_function = lambda x: f"data-{x}.parquet"
ddf.to_parquet(output_dir, name_function=name_function)

In [None]:
# Contains 100k rows
earnings_ddf = ddf[ddf.channels.apply(lambda x: "Earnings" in x, meta=pd.Series(dtype=bool))]

In [None]:
# value counts for authors of earnings reports (contrast to value counts of all news articles)
earnings_ddf.inferred_author.value_counts().head(10)

Hier sehen wir, dass es keine einzige Pressemeldung von **Business Wire** gibt, die mit *Earnings* gekennzeichnet ist. Trotzdem gibt es relevante *Earnings*-Reports von Business Wire. Dies habe ich kurz verifiziert...

## Vorgehen

Benötigt:
---------
- data/tickers.pkl (alt) \\
- data_shared/corporation_endings.txt \\
- input_dir = data/unraw3_bzg/ \\
- output_dir = data/unraw4_bzg/

Produziert:
-----------
- data_shared/all_ticker_name_mapper.parquet

--------------

Wie viele Nachrichten bleiben, wenn wir auf relevante Ticker filtern? Wir wollen nicht(!) - so ist es momentan - auf die momentane Russell 3k-Zusammensetzung filtern, denn wir wollen auch ungelistete bzw. ehemalige Russell-Aktien beachten.


**1. Full-Name-Discovery:**

Herausfinden des vollen Namens des Unternehmens für jeden Ticker, damit 1. der Text richtig geparst werden kann und 2. damit wir einen Anhaltspunkt für das Ticker-Grouping haben.


**2. Ticker-Filtering:**

Alle Ticker herausfiltern, die wir nicht brauchen. Wenn wir aber ein großes Aktienuniversum (mit inzwischen ungelisteten Aktien) benutzen, werden wir fast alle Nachrichten behalten können. Allerdings lassen sich so fehlerhafte Nachrichten/Ticker etc. herausfiltern.


**3. Ticker-Grouping:**

Was machen wir, wenn wir mehrere Aktiengattungen für ein Unternehmen haben? Z.B. Vorzugs- und Stammaktien. Wir können i.A. die Stammaktie nehmen, da diese normalerweise ein höheres Handelsvolumen aufweist. D.h. wir bilden alle Ticker der Benzinga-Nachrichten auf den Ticker der Stammaktie ab.


**4. Firmennamen-Nachrichtenkörper-Verifikation:**

Da Ticker wiederverwendet werden können bzw. sich verändern können, wollen wir sicherstellen, dass der Unternehmensname im Nachrichtenkörper vorkommt! Bzw. generell ist das eine gute Datensäuberungs-Maßnahme.

In [None]:
input_dir = cwd+'/data/unraw3_bzg/'
output_dir = cwd+'/data/unraw4_bzg/'
ddf = dd.read_parquet(input_dir)

In [None]:
all_tickers = ddf.stocks.unique().compute()

### Full-Name-Discovery

In [None]:
company_names = all_tickers.apply(lambda x: yahoo_get_wrapper(x))

In [None]:
all_mapper = pd.concat([all_tickers, company_names], axis=1)

In [None]:
all_mapper.columns = ["ticker", "company_name"]

In [None]:
all_mapper = all_mapper.dropna() # This drops like half of the tickers.

In [None]:
all_mapper[all_mapper.company_name.apply(lambda x: "Alphabet" in x)]

In [None]:
print(len(all_mapper))
print(len(all_mapper.company_name.unique()))

In [None]:
vcs = all_mapper.company_name.value_counts()
vcs = vcs[vcs >= 2]

In [None]:
# Ticker-Grouping
all_mapper[all_mapper.company_name.isin(vcs.index)].sort_values("company_name")

Es ist nicht leicht zu sagen, welchen von den Tickern wir bevorzugen sollten. Abgleichen mit den Aktientickern des Kursdatensatzes notwendig, um zu sehen, ob überhaupt nur ein Ticker übereinstimmt. Wenn es für beide Ticker eine Kurszeitreihe gibt, dann sollten wir die nehmen, die ein höheres historisches Volumen hat. Dies ist allerdings etwas, was wir später machen und nicht jetzt. Hier wollen wir zunächst nur die Nachrichten verarbeiten, weswegen wir nur die NaN-Unternehmen rausnehmen und den Rest - *ohne Ticker-Filtering* - weiterverarbeiten.

In [None]:
# Work on a copy: the column rename applied to `mapper` in the next cell would otherwise
# also rename the columns of `all_mapper`, breaking the earlier `all_mapper.company_name` cells on re-run.
mapper = all_mapper.copy()

In [None]:
mapper.columns = ["ticker", "company_names"]
mapper = mapper[mapper.isna().sum(axis=1) == 0]

In [None]:
mapper = mapper.set_index("ticker")

In [None]:
company_endings = pd.read_table("data_shared/corporation_endings.txt").iloc[:, 0]
# Apply get_company_abbreviation twice in order to get rid of Enterprise, Ltd.
# Otherwise , Ltd. remains.
mapper["short_name"] = mapper.company_names.apply(lambda x: get_company_abbreviation(x, company_endings=company_endings))

In [None]:
print(mapper.short_name.isna().sum()) # 2037 stocks for which we don't have an ending to abbreviate
# mapper.loc[:, "short_name"] = mapper.short_name.fillna(mapper.company_names)
# Strip surrounding spaces from string cells only. Missing short names (counted above) are not
# strings and have no `.strip`, so they are passed through unchanged instead of raising AttributeError.
mapper = mapper.applymap(lambda x: x.strip(" ") if isinstance(x, str) else x)

In [None]:
mapper.to_parquet(cwd + "/data_shared/ticker_name_mapper.parquet")

In [None]:
mapper = pd.read_parquet(cwd + "/data_shared/ticker_name_mapper.parquet")

In [None]:
filt_ddf = ddf[ddf.stocks.isin(mapper.index.to_list())]

In [None]:
ddf.shape[0].compute()

In [None]:
filt_ddf.shape[0].compute()

Es verbleiben circa 1 Mio. Nachrichten, für die wir den Ticker zu einem Firmennamen auflösen können.

Diese Nachrichten können wir nun wirklich parsen, und danach ordentlich kategorisieren.

In [None]:
ddf = filt_ddf
ddf = ddf.drop(columns=["author"]).rename(columns={"inferred_author":"author"})

In [None]:
# TODO: Kann effizienter werden mit GroupBy-Operation
ddf["company_name"] = ddf.stocks.apply(lambda x: mapper.company_names.loc[x], meta=pd.Series(dtype="string"))
ddf["short_name"] = ddf.stocks.apply(lambda x: mapper.short_name.loc[x], meta=pd.Series(dtype="string"))

In [None]:
name_function = lambda x: f"data-{x}.parquet"
ddf.to_parquet(cwd+'/data/latest/', name_function=name_function)

### Firmennamen-Nachrichtenkörper-Verifikation

In [None]:
ddf = dd.read_parquet(cwd+'/data/latest/')

In [None]:
# ddf["time"] = ddf["time"].astype(str)

In [None]:
def f(df):
    """Return a copy of ``df`` whose ``time`` column is cast to ``str``.

    The input frame is left untouched, so the function is safe to use on
    partitions that are shared with other tasks (e.g. via ``map_partitions``).

    Parameters
    ----------
    df : pandas.DataFrame
        Frame containing a ``time`` column.

    Returns
    -------
    pandas.DataFrame
        New frame with the same columns, ``time`` converted to strings.
    """
    return df.assign(time=df["time"].astype(str))
# futures = client.submit(f, ddf)
# ddf = ddf.map_partitions(f)

In [None]:
# results = client.gather(futures)

In [None]:
# True for rows whose abbreviated company name occurs (case-insensitively) in the body or the title.
# The name is passed through re.escape so that characters such as ".", "(" or "+" are matched
# literally instead of being interpreted as regex syntax (which mis-matches or raises re.error).
mask = ddf.apply(lambda x: bool(re.search(re.escape(x["short_name"]), x["body"].replace("\n", " "), re.IGNORECASE)) or \
                 bool(re.search(re.escape(x["short_name"]), x.title, re.IGNORECASE)),
                 axis=1,
                 meta=pd.Series(dtype=bool))

In [None]:
# Around 11k stocks before `filtering`
# len(ddf.stocks.unique())

# Around 10k stocks after `filtering`
# len(ddf[mask].stocks.unique())

In [None]:
ddf[mask].shape[0].compute()

In [None]:
# Case 1: Firmenname wird in abgekürzter Version benutzt, die wir nicht methodisch rekonstruieren können
#   Z.B.: IBM -> International Business Machines,
#   oder UPS -> United Parcel Service
#   oder GE -> General Electric
# Case 2: Ticker wurde recycled und zeigt inzwischen auf eine andere Firma.
# Will hierzu nicht auch noch Daten kaufen, deswegen werden hier leider einige Nachrichten verworfen werden.

ddf[~mask].shape[0].compute() # ~120k Nachrichten mit `FALSCHEM` Firmennamen -> Textinhalt ableichen mit Firmennamen des Kursdaten-Datensatzes.

In [None]:
# ddf[~mask].stocks.value_counts().compute().head(15)

In [None]:
# Filter for high conviction entries
ddf = ddf[mask]

In [None]:
ddf = ddf.repartition(npartitions=30)

In [None]:
ddf.head()

### Duplikate Entfernen

In [None]:
# import cudf

In [None]:
# ddf = ddf.map_partitions(cudf.from_pandas)

In [None]:
# Drops duplicate rows within each partition only: identical rows that sit in
# different partitions are not compared with each other by this call.
res = ddf.map_partitions(lambda x: x.drop_duplicates())

In [None]:
# res.shape[0].compute()

In [None]:
ddf = res

In [None]:
ddf.head()

In [None]:
name_function = lambda x: f"data-{x}.parquet"
ddf.to_parquet(cwd+'/data/latest2/', name_function=name_function)

#### Convert ddf to pd.DataFrame

In [None]:
# The timedelta cells below use pandas-only operations (`.iloc[0]` on a Series and
# `.loc[:, col] = ...` assignment), so the dask frame is materialised here first.
# NOTE(review): this loads the whole data set into local memory - confirm it fits.
ddf = ddf.compute()
ddf = ddf.sort_values("time")

### Timedeltas zwischen Nachrichtenmeldungen



Wir sehen, dass einige Nachrichten dupliziert vorkommen, d.h. mit einem Timedelta von 0 und mit derselben Überschrift etc. Diese gilt es zu eliminieren.

In [None]:
tmp = ddf[["time", "stocks"]]

In [None]:
#### Adding timedeltas to the data frame
news_timedeltas = tmp.groupby("stocks").transform(lambda x: x.diff())

In [None]:
# ~3 minutes evaluates to true
# (news_timedeltas.index == ddf.index).all()

In [None]:
ddf.loc[:, "timedelta"] = news_timedeltas.time.fillna(pd.Timedelta(days=100))

In [None]:
news_timedeltas = ddf.timedelta

In [None]:
news_timedeltas.iloc[0].components

In [None]:
same_day_timedeltas = news_timedeltas.apply(lambda x: x.components.days == 0)

In [None]:
(same_day_timedeltas == 0).sum()

In [None]:
same_hour_timedeltas = news_timedeltas.apply(lambda x: (x.components.days == 0) & \
                                               (x.components.hours == 0))

In [None]:
print(same_hour_timedeltas.sum())

In [None]:
same_minute_timedeltas = news_timedeltas.apply(lambda x: (x.components.days == 0) & \
                                               (x.components.hours == 0) & \
                                               (x.components.minutes == 0))

In [None]:
print(same_minute_timedeltas.sum())

In [None]:
same_day_ddf = ddf.loc[same_day_timedeltas]

In [None]:
ddf.stocks.value_counts().describe()

Bis zu 5k Nachrichten pro Firma, z.B. AT&T, was in 13 Jahren ca. einer Nachricht pro Tag entspricht. Wir wollen nicht, dass eine Firma mit vielen Junk-Nachrichten das Modell dominiert.

Kategorisieren von Nachrichten (mit Text2Topic, wie Salbrechter?) und eliminieren von Business/Strategic etc.
Im Falle von Text2Topic, versuche Estimates des Unternehmens von Dritten zu unterscheiden.

Wichtig!!! Unterscheide zwischen LERN-Phase und PRODUKTIONS-Phase.
Wir können z.B. CLS-Token in der Produktions-Phase vergleichen, in der Lern-Phase aber noch nicht.

Text2Vec -> Business category evtl. entfernen-> Intrastock variance average

## Topic Modeling (nach Abschluss in Python-File auslagern)

In [None]:
# Scaling up cluster and installing required packages
cluster.scale(5)
# Block until five workers have connected.
client.wait_for_workers(5)
# NOTE(review): `worker_setup` is not defined in the cells visible here - confirm where it
# comes from; on a fresh kernel this line would raise NameError.
client.run(worker_setup)

In [None]:
ddf = dd.read_parquet("gcs://extreme-lore-398917-bzg/latest2/",
                      storage_options={'token': token})

In [None]:
# Remove numbers
docs = ddf.body.map(lambda x: re.sub(r'\d+', '', x), meta=pd.Series(dtype="string"))

In [None]:
docs.name="body"

### TF-IDF

In [None]:
from src.topic_modeling.helpers import tokenize, keyword_filter
import numpy as np

In [None]:
keywords_list1 = ["launch", "business", "strategy", "management", "product", "service", "app", "customer", "merge"]
keywords_list2 = ["upgrade", "downgrade", "raise", "cut", "buy", "sell", "hold", "outperform", "underperform", "analyst", "estimate"]
keywords_list3 = ["ebit", "eps", "earnings", "report", "financial", "quarter", "annual", "year", "ended", "net", "income"]
total_keywords = keywords_list1 + keywords_list2 + keywords_list3
len(total_keywords)

In [None]:
# Tokenization + rough filtering
tfidf_docs = docs.map(lambda x: keyword_filter(tokenize(x), total_keywords), meta=pd.Series(dtype="object"))
tfidf_docs = tfidf_docs.reset_index().repartition(npartitions=5)
tfidf_docs.columns = ["index", "body"]
tfidf_docs.to_parquet("gcs://extreme-lore-398917-bzg/tfidf-tokens",
                                                    storage_options={'token': token})

In [None]:
tfidf_docs = dd.read_parquet("gcs://extreme-lore-398917-bzg/tfidf-tokens/",
                                                    storage_options={'token': token},
                                                    dtype_backend="pyarrow")

In [None]:
tfidf_docs = tfidf_docs.compute()

In [None]:
# Numpy representation is without commas making us unable to convert strings to list via eval
tfidf_docs.loc[:, "body"] = tfidf_docs.body.map(lambda x: x.replace(" ", ", "))

In [None]:
def f(tokens):
    """Turn a token sequence into a numpy array.

    An empty sequence is replaced by a single placeholder token so that no
    document is empty when the TF-IDF values are calculated.
    """
    placeholder = ["wordtopreventemptydocumentswhencalculatingtfidf"]
    return np.array(tokens if len(tokens) != 0 else placeholder)
        
tfidf_docs.loc[:, "body"] = tfidf_docs.body.map(lambda x: f(eval(x)))

In [None]:
tfidf_docs.iloc[0].body

In [None]:
# Collect one lazily evaluated per-document TF-IDF sum for each topic.
topic_tfidfs_list = []
# NOTE(review): `keywords1`, `keywords2`, `keywords3` are not defined in the cells visible here
# (the keyword cell defines `keywords_list1..3`). They are passed as `tokenizer=`, so they
# would have to be callables - confirm what was intended.
for topic_tokenizer in [keywords1, keywords2, keywords3]:
    # Fit and apply the vectorizer lazily; nothing is computed until dask.compute is called.
    lazy_result = dask.delayed(TfidfVectorizer(tokenizer=topic_tokenizer, lowercase=False).fit_transform)(tfidf_docs.body)
    # Sum the dense TF-IDF matrix along axis 1, i.e. one total per row (document).
    lazy_sums = dask.delayed(np.apply_along_axis)(np.sum, 1, lazy_result.todense())
    topic_tfidfs_list.append(lazy_sums)

In [None]:
topic_indicator_list = dask.compute(*topic_tfidfs_list)

In [None]:
df = pd.DataFrame(topic_indicator_list)
df=df.transpose()

In [None]:
tfidf_docs.head().iloc[4].body

In [None]:
df.head()

### LDA

In [None]:
from gensim.corpora import Dictionary
from gensim.models import Phrases
import nltk

In [None]:
# Split the lower-cased documents into tokens.
# `word_tokenize` is reached through the imported `nltk` package; the bare name is
# not imported in the cells visible here and would raise NameError.
lda_docs = docs.apply(lambda x: nltk.word_tokenize(str.lower(x)), meta=pd.Series(dtype="object"))

In [None]:
# Remove words that are only one character.
# Lemmatize the documents.
# `WordNetLemmatizer` is reached through the imported `nltk` package; the bare name is
# not imported in the cells visible here and would raise NameError.
lemmatizer = nltk.stem.WordNetLemmatizer()
lda_docs = lda_docs.apply(lambda x: [lemmatizer.lemmatize(token) for token in x if len(token) >1], meta=pd.Series(dtype="object"))

In [None]:
lda_docs.head()

In [None]:
# Add bigrams to the documents (only ones that appear 20 times or more).
# The phrase model is trained on the tokenised documents in `lda_docs`: Phrases expects an
# iterable of token lists, whereas `docs` holds the raw body strings.
bigram = Phrases(lda_docs, min_count=20)
def f(doc):
    """Append every detected bigram (a token containing '_') to the token list ``doc`` and return it."""
    for token in bigram[doc]:
        if '_' in token:
            # Token is a bigram, add to document.
            doc.append(token)
    return doc
# `meta` is given explicitly, like in the tokenising/lemmatising cells, so dask does not have to infer it.
lda_docs = lda_docs.apply(f, meta=pd.Series(dtype="object"))

In [None]:
# lda_docs.name = "body"
# lda_docs = lda_docs.to_frame().reset_index()
# lda_docs = lda_docs.reset_index().repartition()

In [None]:
# lda_docs.to_parquet(cwd + "/data/lda_docs", schema={"body": pa.string(),  "index":pa.int32()})
# lda_docs = dd.read_parquet(cwd + "/data/lda_docs")

In [None]:
# Remove rare and common tokens.

# Create a dictionary representation of the tokenised documents.
# `lda_docs` holds the token lists; `docs` holds raw strings, which the dictionary cannot index as tokens.
dictionary = Dictionary(lda_docs)

# Filter out words that occur in fewer than 1000 documents, or in more than 60% of the documents.
dictionary.filter_extremes(no_below=1000, no_above=0.6)

In [None]:
# Bag-of-words representation of the documents.
# Built from the token lists in `lda_docs`; doc2bow expects a list of tokens, not a raw string from `docs`.
corpus = [dictionary.doc2bow(doc) for doc in lda_docs]

In [None]:
print('Number of unique tokens: %d' % len(dictionary))
print('Number of documents: %d' % len(corpus))

In [None]:
# Train LDA model.
from gensim.models import LdaModel

# Set training parameters.
num_topics = 10
chunksize = 2000
passes = 20
iterations = 400
eval_every = None  # Don't evaluate model perplexity, takes too much time.

# Make an index to word dictionary.
temp = dictionary[0]  # This is only to "load" the dictionary.
id2word = dictionary.id2token

model = LdaModel(
    corpus=corpus,
    id2word=id2word,
    chunksize=chunksize,
    alpha='auto',
    eta='auto',
    iterations=iterations,
    num_topics=num_topics,
    passes=passes,
    eval_every=eval_every
)

In [None]:
top_topics = model.top_topics(corpus)

# Average topic coherence is the sum of topic coherences of all topics, divided by the number of topics.
avg_topic_coherence = sum([t[1] for t in top_topics]) / num_topics
print('Average topic coherence: %.4f.' % avg_topic_coherence)

from pprint import pprint
pprint(top_topics)

## Nachrichten-Parsing


In [None]:
# dask.config.set({'distributed.scheduler.worker-ttl': "60 minutes"})

In [8]:
# Wait an appropriate amount of time to give autoscaler opportunity to create nodes.
# Even CommClosedError occurrs... just wait
n = 10
# cluster.scale(n, worker_group="highmem")
client.wait_for_workers(n)

In [None]:
n = 1
# cluster.scale(0, worker_group="highmem-single")
client.wait_for_workers(n)

In [9]:
ddf = dd.read_parquet("gcs://extreme-lore-398917-bzg/latest2/",
                      storage_options={'token': token})

### Beispiel/ Untersuchung

In [None]:
sample_partition = ddf.get_partition(20)
y = sample_partition.head(5)
y.loc[:, "time"] = pd.to_datetime(y.time).dt.tz_convert("UTC")
x = y.iloc[2]
x.body

In [None]:
res = client.submit(filter_body, row=x, logging=False)

In [None]:
res.result()

### Anwenden der filter_body-Funktion auf alle Reihen:

In [10]:
def handle_timezone(x):
    """Return ``x`` expressed in the ``US/Eastern`` time zone.

    Time-zone-aware values are converted to Eastern time. Naive values carry
    no zone to convert from, so they are localized, i.e. their wall-clock time
    is interpreted as Eastern time.

    Parameters
    ----------
    x : pandas.Timestamp
        Value offering ``tz_convert`` / ``tz_localize``.

    Returns
    -------
    pandas.Timestamp
        The same instant (aware input) or wall-clock time (naive input) in ``US/Eastern``.
    """
    try:
        return x.tz_convert("US/Eastern")
    except TypeError:
        # pandas raises TypeError when tz_convert is called on a tz-naive value;
        # any other error is unrelated to time zones and is allowed to propagate.
        return x.tz_localize("US/Eastern")

In [11]:
# Still need to parse time correctly...
ddf["time"] = ddf["time"].map(lambda x: handle_timezone(pd.to_datetime(x)), meta=pd.Series(dtype="datetime64[ns, US/Eastern]"))

In [12]:
# part_n = 1
# par = ddf.partitions[:3]
par = ddf  
# par = ddf

In [13]:
with dask.annotate(executor="processes", retries=5):
    par["parsed_body"] = par.map_partitions(lambda y: y.apply(lambda x: filter_body(x),
                                                    axis=1),
                                                    meta=pd.Series(dtype="string"))

In [14]:
# Hier der error letztes Mal
# from src.cloud_manager import VMManager

In [15]:
par = client.persist(par) 

Task exception was never retrieved
future: <Task finished name='Task-463' coro=<PortForward._sync_sockets() done, defined at /opt/conda/lib/python3.10/site-packages/kr8s/_portforward.py:167> exception=ClientOSError(104, 'Connection reset by peer')>
Traceback (most recent call last):
  File "/opt/conda/lib/python3.10/site-packages/kr8s/_portforward.py", line 170, in _sync_sockets
    async with self._connect_websocket() as ws:
  File "/opt/conda/lib/python3.10/contextlib.py", line 199, in __aenter__
    return await anext(self.gen)
  File "/opt/conda/lib/python3.10/site-packages/kr8s/_portforward.py", line 149, in _connect_websocket
    async with self.pod.api.open_websocket(
  File "/opt/conda/lib/python3.10/contextlib.py", line 199, in __aenter__
    return await anext(self.gen)
  File "/opt/conda/lib/python3.10/site-packages/kr8s/_api.py", line 189, in open_websocket
    async with session.ws_connect(**kwargs) as response:
  File "/opt/conda/lib/python3.10/site-packages/aiohttp/clien

In [None]:
from dask.distributed import wait, progress

In [None]:
# progress(par)

In [None]:
wait(par)

In [None]:
# Do this only after having persisted, because this blocks the console making us unable to interact witth client/cluster
# and can interfere with communication between client and scheduler.

# VMManager needs to be redone... do os.system(kubectl delete dsk example), its a cleaner shutdown and also deletes nodes
# with VMManager(cluster) as _:
par.to_parquet("gcs://extreme-lore-398917-bzg/processed_news",
        storage_options={'token': token})

In [None]:
client # no connection, can we connect via ip??

In [None]:
cluster # connection still here?

## Analyse der durchschnittlichen Tokenlänge

## Generating timestamp-stock pairs in order to know for which days we need stock data