
# Topics Processor 💬

This notebook cleans up news section labels and assigns them to simpler, better-structured topics.

#### Notebook Properties
* Upstream Notebook: `src.engineering.word_counts_and_sentiments`
* Compute Resources: `64 GB RAM, 4 CPUs`
* Last Updated: `Dec 10 2023`

#### Data

| **Name** | **Type** | **Location Type** | **Description** | **Location** | 
| --- | --- | --- | --- | --- | 
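| `all_the_news` | `input` | `Delta` | Read full delta dataset of `AllTheNews` | `catalog/text_eda/all_the_news.delta` | 
| `all_the_news` | `output` | `Delta` | Write the filtered articles with the added `simple_topic` column | `catalog/simple_topic/all_the_news.delta` | 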

In [0]:
!python -m spacy download en_core_web_sm -q

In [0]:
import spacy

# Small English spaCy pipeline; its named-entity output (`doc.ents`) is what
# the geography detection relies on.
SPACY_MODEL_NAME = "en_core_web_sm"
nlp = spacy.load(SPACY_MODEL_NAME)

In [0]:
import warnings

# Silence every warning for the rest of the kernel session. The filter is set
# before the library imports below, so import-time warnings are hidden too.
# NOTE(review): this also hides pandas FutureWarning/DeprecationWarning
# messages about deprecated behavior; consider narrowing by category/module.
warnings.filterwarnings("ignore")

import pandas as pd
import numpy as np
import json
import plotly.express as px
import contextlib

from tqdm.autonotebook import tqdm
from deltalake import DeltaTable
from deltalake.exceptions import TableNotFoundError
import pyarrow as pa
from src.utils.io import FileSystemHandler, partition_dataframe
from src.utils.schemas import all_the_news_raw_schema
from src.utils.functions import (
    assign_simple_topics_to_dataframe,
    get_topics_for_section,
    stop_words,
)

In [0]:
# Display / plotting configuration for the session.
pd.set_option("display.max_columns", None)
pd.set_option("display.max_info_rows", None)

pd.options.plotting.backend = "plotly"

# Enables `.progress_apply` and friends on pandas objects.
tqdm.pandas()
datafs = FileSystemHandler("s3")

# Section -> topic lexicon consumed by `assign_simple_topics_to_dataframe`.
# Read explicitly as UTF-8 so results do not depend on the platform's default
# encoding (e.g. cp1252 on Windows).
with open("./section_to_topic_lexicon.json", "r", encoding="utf-8") as f:
    simple_topic_lexicon: dict[str, list[str]] = json.load(f)

In [0]:
LIMIT_PARTITIONS: int | None = None
"""An input parameter to limit the number of table partitions to read from delta. Useful to perform EDA on a sample of data."""

SHUFFLE_PARTITIONS: bool = False
"""Whether to randomize the partitions before reading"""

INPUT_TABLE: str = "all_the_news" 
INPUT_CATALOG: str = "text_eda"
OUTPUT_TABLE: str = "all_the_news"
OUTPUT_CATALOG: str = "simple_topic"


### Read Data

In [0]:
# Get a DeltaTable handle for the input table (as_pandas=False), then read its
# partitions (optionally limited/shuffled) into a single pandas DataFrame.
atn_delta_table: DeltaTable = datafs.read_delta(
    catalog_name=INPUT_CATALOG,
    table=INPUT_TABLE,
    as_pandas=False,
)

df: pd.DataFrame = datafs.read_delta_partitions(
    shuffle_partitions=SHUFFLE_PARTITIONS,
    N_partitions=LIMIT_PARTITIONS,
    delta_table=atn_delta_table,
)

In [0]:
# Parse dates, order articles chronologically, drop rows without a section,
# and normalize section labels to title case.
df = (
    df.assign(date=lambda frame: pd.to_datetime(frame["date"]))
    .sort_values(by=["date"])
    .dropna(subset=["section"])
)
df["section"] = df["section"].str.title()

print(df.shape)
df.head()

In [0]:
# Collapse every section whose name starts with "opinion" (case-insensitive)
# into a single "Opinion" section. Uses the vectorized `.str` accessor
# instead of a per-row Python lambda.
is_opinion_section = df["section"].fillna("").str.lower().str.startswith("opinion")
df.loc[is_opinion_section, "section"] = "Opinion"

In [0]:
# Null out sections that occur on exactly one article across the whole
# dataset; rows left with a null section are dropped further down.
unique_section_counts: pd.Series = df["section"].value_counts()
# Boolean-filtering a value_counts Series and taking `.index` yields a
# pd.Index (not a list).
sections_with_single_article: pd.Index = unique_section_counts[
    unique_section_counts == 1
].index

print(len(sections_with_single_article))

df.loc[df["section"].isin(sections_with_single_article), "section"] = None

In [0]:
# Repeat the singleton filter within each year: a section that appears on only
# one article in a given year is nulled out for that year's rows.
for year in df["year"].unique():
    in_year = df["year"] == year
    section_counts_in_year = df.loc[in_year, "section"].value_counts()
    # A pd.Index of the section labels with a count of exactly 1.
    sections_with_single_article_in_year: pd.Index = section_counts_in_year[
        section_counts_in_year == 1
    ].index
    print(
        f"Year {year} has {len(sections_with_single_article_in_year)} single article sections, that will be nullified."
    )
    df.loc[
        in_year & df["section"].isin(sections_with_single_article_in_year),
        "section",
    ] = None

In [0]:
# Remove the rows whose section was nulled by the singleton filters.
df = df[df["section"].notna()]

In [0]:
# Catch-all section labels (presumably too broad to map to a topic); articles
# filed under any of them are removed.
generic_sections: list[str] = [
    "Market News", "World News", "Business News",
    "Wires", "World", "Intel", "News",
]

is_generic_section = df["section"].isin(generic_sections)
df = df.loc[~is_generic_section]
print(df.shape)


### Assign Sections by Geography

In [0]:
def get_geographical_entity(text: str) -> list[str] | None:
    """Collect place-like named entities found in a section name.

    The text is split on whitespace and each word again on hyphens; every
    resulting fragment is passed through the spaCy pipeline ``nlp`` on its own.

    Args:
        text: Section name to inspect.

    Returns:
        Texts of all entities labelled GPE, LOC or NORP, in order of
        appearance, or ``None`` when no such entity is found.
    """
    geo_labels = ("GPE", "LOC", "NORP")
    fragments = [
        fragment
        for word in text.split()
        for fragment in word.split("-")
    ]
    parsed_docs = [nlp(fragment) for fragment in fragments]

    found: list[str] = []
    for doc in parsed_docs:
        for ent in doc.ents:
            if ent.label_ in geo_labels:
                found.append(ent.text)
    return found or None


# One row per distinct section, so NER runs once per section rather than per article.
section_df = df["section"].drop_duplicates().to_frame().reset_index(drop=True)
section_df["geo"] = section_df["section"].apply(get_geographical_entity)
# A section is geographic when NER returned a non-empty entity list.
# The flag is built directly as a bool column. Relying on fillna(False) to
# downcast an object column to bool is deprecated since pandas 2.2.
section_df["is_geo"] = section_df["geo"].apply(
    lambda entities: isinstance(entities, list) and len(entities) > 0
)
print(section_df.shape)
section_df.head()

In [0]:
# Percentage of distinct sections flagged / not flagged as geographic.
section_df["is_geo"].value_counts(normalize=True).mul(100)

In [0]:
from itertools import islice

# Map each section name to the entity list NER found in it (or None).
# `section_df` is indexed on the fly rather than reassigned, so re-running
# this cell does not fail with a missing "section" column.
geo_section_mapping: dict[str, list[str] | None] = (
    section_df.set_index("section")["geo"].to_dict()
)
# Peek at the first five entries without materializing the whole mapping.
tuple(islice(geo_section_mapping.items(), 5))

In [0]:
# Attach each article's section-level geographic entities and flag it.
df["geo"] = df["section"].map(geo_section_mapping)
# Build the flag as a bool column directly. None/NaN (no entities, or a section
# missing from the mapping) becomes False. The previous fillna(False) approach
# depended on a deprecated object->bool downcast. Without that downcast,
# `~df.is_geo` yields integers instead of a boolean mask.
df["is_geo"] = df["geo"].apply(
    lambda entities: isinstance(entities, list) and len(entities) > 0
)
df["is_geo"].value_counts(normalize=True)

In [0]:
# Article counts per section among articles not flagged as geographic (top 5 shown).
non_geo_sections = df.loc[~df["is_geo"], "section"].value_counts()
non_geo_sections.head()

In [0]:
# Spot-check a few geographic articles. The fixed seed makes the sample
# reproducible on re-run. Capping n avoids a ValueError when fewer than 5
# geographic articles exist.
print(df.shape)
geo_articles = df.loc[df["is_geo"], ["date", "section", "publication", "is_geo", "geo"]]
geo_articles.sample(n=min(5, len(geo_articles)), random_state=0)


## Section Coalescing

Here, we use a topic lexicon (`section_to_topic_lexicon.json`) to map the remaining non-geographic news sections to simple topics, after some additional preprocessing.

In [0]:
# Working frame for topic mapping: non-geographic articles that have a
# section, without the geo helper columns.
df_y = (
    df.copy()
    .loc[lambda frame: ~frame["is_geo"]]
    .dropna(subset=["section"])
    .drop(columns=["geo", "is_geo"])
)
print(df_y.shape)
df_y.head()

In [0]:
# Map each section to a simple topic via the lexicon. The helper returns the
# updated article frame and a section-level frame.
df_y, section_df = assign_simple_topics_to_dataframe(
    df_y, simple_topic_lexicon=simple_topic_lexicon
)
# Normalize missing topics to None.
df_y["simple_topic"] = df_y["simple_topic"].replace([np.nan], [None])
# `.info()` prints its own report and returns None, so it is not wrapped in print().
section_df["simple_topic"].info()
section_df.head()

In [0]:
# Non-null count and dtype of the article-level topic column.
df_y["simple_topic"].info()

In [0]:
# Preview how sections and titles line up with their assigned topic.
df_y.loc[:, ["section", "simple_topic", "title"]].head(5)

In [0]:
# First ten sections the lexicon could not map to any topic.
unmapped_sections = section_df.loc[
    section_df["simple_topic"].isna(),
    ["section", "article_count", "section_clean"],
]
unmapped_sections.head(10)

In [0]:
# Keep only articles whose section was mapped to a simple topic.
df_y = df_y.loc[df_y["simple_topic"].notna()]
print(df_y.shape)

In [0]:
# Number of section rows mapped to each simple topic, largest first.
sections_per_topic = (
    section_df.groupby(["simple_topic"])["section"]
    .count()
    .sort_values(ascending=False)
)
sections_per_topic.plot(
    kind="bar",
    template="plotly_white",
    title="Section Capture per Mapped Simple Topic",
)

In [0]:
# Spot-check sections mapped to "Sports". The fixed seed keeps the sample
# reproducible. Capping n avoids a ValueError when fewer than 5 such sections
# exist.
sports_sections = section_df[section_df.simple_topic == "Sports"]
sports_sections.sample(n=min(5, len(sports_sections)), random_state=0)

In [0]:
# Share of articles falling under each simple topic.
topic_shares = df_y["simple_topic"].value_counts(normalize=True)
topic_shares.plot(
    kind="bar",
    template="plotly_white",
    title="Articles per Simple Topic",
)

In [0]:
# Article counts per topic, colored by the section that contributed them.
section_dist = section_df.dropna(subset=["simple_topic"])[
    ["simple_topic", "section", "article_count"]
]

section_dist_fig = px.bar(
    section_dist,
    title="Section Distributions per Topic across Article Counts",
    x="simple_topic",
    y="article_count",
    color="section",
    template="plotly_white",
)
section_dist_fig.show()

In [0]:
# Compare each publication's share of articles before (df) and after (df_y)
# the geo/topic filtering, in percent, and the change in percentage points.
old_pub_dist = df["publication"].value_counts(normalize=True)
# A publication that lost all of its articles has a 0% share afterwards, not
# NaN. NaN rows would be silently skipped by std() and sorted last.
new_pub_dist = (
    df_y["publication"]
    .value_counts(normalize=True)
    .reindex(old_pub_dist.index, fill_value=0.0)
)

pub_dist_df = pd.DataFrame(
    {
        "old_pub_ratio": old_pub_dist * 100,
        "new_pub_ratio": new_pub_dist * 100,
    },
    index=old_pub_dist.index,
)
pub_dist_df["ratio_pp"] = pub_dist_df["new_pub_ratio"] - pub_dist_df["old_pub_ratio"]
pub_dist_df = pub_dist_df.sort_values(by=["ratio_pp"])

print(pub_dist_df.shape)

print(
    "Standard Deviation of Publication Representation Percentage Points",
    round(pub_dist_df["ratio_pp"].std(), 2),
)

pub_dist_df.transpose().round(2)

In [0]:
# Clear the output table before the writes below (presumably so reruns start
# fresh). A TableNotFoundError, raised when the table does not exist yet
# (e.g. on the first run), is ignored.
with contextlib.suppress(TableNotFoundError):
    print(datafs.clear_delta(table=OUTPUT_TABLE, catalog_name=OUTPUT_CATALOG))

In [0]:
# Output schema = raw schema + the word-count/sentiment columns produced
# upstream + the new simple_topic column. `pa.field(...)` is a factory that
# returns `pa.Field` instances, hence the annotation.
new_text_fields: list[pa.Field] = [
    pa.field("title_word_count", pa.int64()),
    pa.field("article_word_count", pa.int64()),
    pa.field("title_textblob_sentiment", pa.float64()),
    pa.field("article_textblob_sentiment", pa.float64()),
    pa.field("vader_prob_positive_title", pa.float64()),
    pa.field("vader_prob_negative_title", pa.float64()),
    pa.field("vader_prob_neutral_title", pa.float64()),
    pa.field("vader_compound_title", pa.float64()),
    pa.field("simple_topic", pa.string()),
]

# `Schema.append` returns a new schema, so the raw schema object itself is
# never modified; the name is simply rebound on each iteration.
all_the_news_simple_topic_schema = all_the_news_raw_schema

for new_field in new_text_fields:
    all_the_news_simple_topic_schema = all_the_news_simple_topic_schema.append(new_field)

all_the_news_simple_topic_schema

In [0]:
# Columns about to be written; compare against the output schema above.
df_y.keys()

In [0]:
# Split the output into 25 chunks and write each one to the output Delta
# table with the extended schema, showing a progress bar.
# NOTE(review): assumes write_delta appends rather than overwrites; confirm in FileSystemHandler.
df_partitions: list[pd.DataFrame] = partition_dataframe(df_y, N_Partitions=25)

for partition_df in tqdm(df_partitions):
    datafs.write_delta(
        schema=all_the_news_simple_topic_schema,
        catalog_name=OUTPUT_CATALOG,
        table=OUTPUT_TABLE,
        dataframe=partition_df,
    )