The demo and small datasets provided by EB-NeRD are subsets of the full dataset, designed for different levels of experimentation and prototyping.


- We will start with the demo dataset to develop our model and to quickly validate our code and preprocessing pipeline.

- Then we will use the small dataset to verify that the code developed on the demo dataset still works, because it is a more representative subset for training and evaluating the models.

- The large dataset requires significant computational resources and is more time-consuming to process, so we should use it only after confirming that the pipeline works correctly with the smaller datasets.
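
Moving from one dataset size to the next is easier if the size is a single parameter. The sketch below shows one possible way to do that. The helper `dataset_paths` and the folder names `ebnerd_small` and `ebnerd_large` are assumptions made for illustration; only `ebnerd_demo` and the file layout under it appear in this notebook.

```python
from pathlib import Path

# Assumed folder names; only "ebnerd_demo" is confirmed by this notebook.
DATASET_DIRS = {"demo": "ebnerd_demo", "small": "ebnerd_small", "large": "ebnerd_large"}

def dataset_paths(root, size="demo"):
    """Return the three parquet paths for the chosen dataset size (hypothetical helper)."""
    if size not in DATASET_DIRS:
        raise ValueError(f"Unknown dataset size: {size!r}")
    base = Path(root) / DATASET_DIRS[size]
    return {
        "history": base / "train" / "history.parquet",
        "behaviors": base / "train" / "behaviors.parquet",
        "articles": base / "articles.parquet",
    }

paths = dataset_paths("data", size="demo")
print(paths["history"].as_posix())  # data/ebnerd_demo/train/history.parquet
```

With this layout, switching to the small dataset would only require changing `size="demo"` to `size="small"`.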

# Let's start with the essential preprocessing steps required for the NRMS model, including data cleaning.

### What Was Included
1. Load and Merge Data:

Combined history.parquet and behaviors.parquet for both training and validation datasets.
Ensured the user's click history (clicked_articles) and behaviors (inview_articles) were merged properly.

2. Explode inview_articles:

Expanded inview_articles into individual rows.
Generated binary labels (1 for clicked articles, 0 for others).

3. Tokenize and Encode Articles:

Used XLM-RoBERTa to tokenize and generate embeddings for news titles and subtitles.
Prepared these embeddings for use by the NRMS model.

4. Create Article Mappings:

Mapped article IDs to their tokenized/encoded representations for efficient lookup.

5. Negative Sampling:

Paired each positive (clicked) article with multiple negative (non-clicked) samples.
Ensured balanced data for training the NRMS model.

6. Prepare Dataloaders:

Formatted the data into batches using NRMSDataLoaderPretransform.
This ensures compatibility with the NRMS training process.

7. Data Cleaning:

Dropped unnecessary columns (impression_time, articles_num, etc.) from the dataset.
Verified column existence before dropping to avoid errors.
Ensured the dataset is clean and optimized for processing.

8. Efficient Output:

Saved the final processed datasets in chunked Parquet files, allowing efficient handling of large files.


The constants defined in our code represent key parameters, column names, and configuration values used throughout the preprocessing and model pipeline.

In [71]:
from pathlib import Path
import polars as pl
from transformers import AutoTokenizer, AutoModel
import pyarrow.parquet as pq
import pyarrow as pa

# Define constants

#Dataset columns names
DEFAULT_USER_COL = "user_id"
DEFAULT_HISTORY_ARTICLE_ID_COL = "article_id_fixed"
DEFAULT_CLICKED_ARTICLES_COL = "article_ids_clicked"
DEFAULT_INVIEW_ARTICLES_COL = "article_ids_inview"
DEFAULT_TITLE_COL = "title"
DEFAULT_SUBTITLE_COL = "subtitle"
DEFAULT_LABELS_COL = "label"




# Transformer model name 
TRANSFORMER_MODEL_NAME = "FacebookAI/xlm-roberta-base"

# Preprocessing Parameters 
HISTORY_SIZE = 20
MAX_TITLE_LENGTH = 30
NPRATIO = 4
SEED = 42



1. Defines file paths
2. Defines an output directory to save processed data 

In [52]:
from pathlib import Path

# Define the base dataset path relative to your current location
DATASET_PATH = Path("c:/Users/Lydia/OneDrive - Danmarks Tekniske Universitet/DTU master , lectures and exercises/Deep learning my exercises/Deep-learning_final_project-2/ebnerd_demo")

# Define file paths
HISTORY_PATH = DATASET_PATH / "train/history.parquet"  # history.parquet in train folder
TRAIN_BEHAVIORS_PATH = DATASET_PATH / "train/behaviors.parquet"  # behaviors.parquet in train folder
ARTICLES_PATH = DATASET_PATH / "articles.parquet"  # articles.parquet directly in ebnerd_demo

# Verify that the files exist
def verify_file_paths(*paths):
    for path in paths:
        if not path.exists():
            raise FileNotFoundError(f"File not found: {path}")

verify_file_paths(HISTORY_PATH, TRAIN_BEHAVIORS_PATH, ARTICLES_PATH)

# Print confirmation
print("All files are found!")



All files are found!


In [53]:
from pathlib import Path

print(Path(HISTORY_PATH).exists())  # Should return True if the file exists
print(Path(TRAIN_BEHAVIORS_PATH).exists())  # Should return True
print(Path(ARTICLES_PATH).exists())  # Should return True


True
True
True


In this part of the script we perform the data cleaning and merging step.
1. Loading the datasets
2. Truncating click history: limits each click history to a fixed size for consistency.
This means limiting the number of articles in a user's click history to a fixed size. For users with a long click history, we keep only a fixed number of articles (for example, 5); if a user has a short click history, it is padded with zeros to match the required size. As a result, all click histories have the same length.



3. Merging datasets: combines each user's click history with their behavior logs.
4. Cleaning the data: drops irrelevant columns to reduce the dataset size and focus on the required information.
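
The truncation and padding rule from step 2 can be illustrated on plain Python lists. This is a minimal sketch, not the function used in the pipeline: `truncate_or_pad` is a hypothetical name, and the history size of 5 is only the example value from the text.

```python
def truncate_or_pad(history, history_size, padding_value=0):
    """Return exactly `history_size` items: cut long lists, pad short ones."""
    kept = list(history[:history_size])
    return kept + [padding_value] * (history_size - len(kept))

print(truncate_or_pad([11, 12, 13, 14, 15, 16, 17], 5))  # [11, 12, 13, 14, 15]
print(truncate_or_pad([21, 22], 5))                      # [21, 22, 0, 0, 0]
```

Note that this sketch keeps the first `history_size` entries. If the history is stored oldest first, keeping the last entries instead would retain the most recent clicks.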

In [64]:
# Define columns to drop
COLUMNS_TO_DROP = [
    "impression_time", "articles_num", "last_modified_time", "body",
    "published_time_x", "published_time_y", "subcategory", "ner_clusters",
    "entity_groups", "impr_pub_hour"
]

# Define a custom truncate_history function
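def truncate_history(column, history_size, padding_value=0):
    """
    Truncates or pads a list of clicked articles to a fixed history size.
    """
    # Keep at most `history_size` items per row.
    truncated = column.list.head(history_size)
    # Build the padding each row still needs (zero items for full-length rows).
    padding = pl.lit(padding_value).repeat_by(history_size - truncated.list.len())
    return pl.concat_list([truncated, padding])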



# Updated load_and_clean_data function
def load_and_clean_data(history_path, behaviors_path, articles_path):
    # Load history.parquet
    df_history = pl.read_parquet(history_path)

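    # Group by user_id and aggregate articles into a list
    # Note: recent Polars versions renamed `groupby` to `group_by`, so this call
    # raises the AttributeError shown after the main pipeline cell.
    df_history_grouped = df_history.groupby(DEFAULT_USER_COL).agg(
        pl.col(DEFAULT_HISTORY_ARTICLE_ID_COL).list().alias(DEFAULT_HISTORY_ARTICLE_ID_COL)
    )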

    # Truncate the article history
    df_history_grouped = df_history_grouped.with_columns(
        truncate_history(pl.col(DEFAULT_HISTORY_ARTICLE_ID_COL), HISTORY_SIZE, 0).alias(DEFAULT_HISTORY_ARTICLE_ID_COL)
    )

    # Load behaviors.parquet and join with grouped history
    df_behaviors = pl.read_parquet(behaviors_path).join(
        df_history_grouped,
        on=DEFAULT_USER_COL,
        how="left",
    )

    # Drop irrelevant columns
    columns_to_drop = [col for col in COLUMNS_TO_DROP if col in df_behaviors.columns]
    df_behaviors = df_behaviors.drop(columns_to_drop)  # Polars takes the column names positionally

    return df_behaviors




In this step we are preparing the inview articles column for the next steps in the preprocessing pipeline.

1. Converts the list of inview_articles into individual rows, with each row corresponding to a single article. The NRMS model works on a per-article basis, comparing each article the user saw against their click history to predict clicks, so exploding ensures that each article in the list is processed individually.



Example, before exploding:

| user_id | inview_articles |
|---|---|
| 1 | [101, 102, 103] |
| 2 | [201, 202] |

After exploding:

| user_id | inview_article |
|---|---|
| 1 | 101 |
| 1 | 102 |
| 1 | 103 |
| 2 | 201 |
| 2 | 202 |



2. Assigns a binary label (1 or 0) to each inview_article, indicating whether the user clicked on the article.
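
The two steps above can be sketched on plain Python data before looking at the DataFrame version. The helper name `explode_with_labels` and the clicked-article values are assumptions chosen for illustration; the in-view lists are the ones from the example.

```python
def explode_with_labels(rows):
    """Yield one (user_id, inview_article, label) tuple per in-view article."""
    for row in rows:
        clicked = set(row["article_ids_clicked"])
        for article_id in row["article_ids_inview"]:
            # Label is 1 when the user clicked this in-view article, else 0.
            yield row["user_id"], article_id, int(article_id in clicked)

# Toy impressions; the clicked lists are made up for this example.
impressions = [
    {"user_id": 1, "article_ids_inview": [101, 102, 103], "article_ids_clicked": [102]},
    {"user_id": 2, "article_ids_inview": [201, 202], "article_ids_clicked": [201]},
]

exploded = list(explode_with_labels(impressions))
for record in exploded:
    print(record)
# (1, 101, 0)
# (1, 102, 1)
# (1, 103, 0)
# (2, 201, 1)
# (2, 202, 0)
```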

In [60]:
# Explode and add labels
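def explode_inview_articles(df):
    return (
        # Copy the in-view list, then explode the copy so each article gets its own row.
        df.with_columns(
            pl.col(DEFAULT_INVIEW_ARTICLES_COL).alias("inview_article")
        )
        .explode("inview_article")
        # Label is 1 when the exploded article appears in the row's clicked list.
        .with_columns(
            pl.when(pl.col("inview_article").is_in(pl.col(DEFAULT_CLICKED_ARTICLES_COL)))
            .then(1)
            .otherwise(0)
            .alias(DEFAULT_LABELS_COL)
        )
    )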


This function is responsible for preparing the textual content in our dataset by tokenizing and encoding it into numerical embeddings using a transformer model.

In [56]:
# Tokenize articles
def tokenize_and_encode_articles(articles_path):
    df_articles = pl.read_parquet(articles_path)

    # Load transformer model and tokenizer
    transformer_model = AutoModel.from_pretrained(TRANSFORMER_MODEL_NAME)
    transformer_tokenizer = AutoTokenizer.from_pretrained(TRANSFORMER_MODEL_NAME)

    # Tokenize and encode text
    df_articles = convert_text2encoding_with_transformers(
        df_articles, transformer_tokenizer, DEFAULT_TITLE_COL, max_length=MAX_TITLE_LENGTH
    )

    return df_articles
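
Once titles are encoded to a fixed length, the article mapping step listed earlier amounts to a dictionary lookup from article ID to token IDs. The sketch below illustrates that idea only. It replaces the XLM-RoBERTa tokenizer with a toy whitespace tokenizer, and `toy_encode`, `build_article_mapping`, and `PAD_TOKEN_ID` are hypothetical names that do not come from the notebook or from any library.

```python
PAD_TOKEN_ID = 0  # assumption: 0 marks padding in this toy example

def toy_encode(title, vocab, max_length):
    """Stand-in for a real tokenizer: map words to integer ids, then truncate or pad."""
    ids = [vocab.setdefault(word, len(vocab) + 1) for word in title.lower().split()]
    ids = ids[:max_length]
    return ids + [PAD_TOKEN_ID] * (max_length - len(ids))

def build_article_mapping(articles, max_length):
    """Map each article_id to fixed-length token ids; id 0 maps to pure padding."""
    vocab = {}
    mapping = {0: [PAD_TOKEN_ID] * max_length}  # entry for padded history slots
    for article_id, title in articles:
        mapping[article_id] = toy_encode(title, vocab, max_length)
    return mapping

articles = [(101, "Storm hits Copenhagen"), (102, "Copenhagen wins final")]
mapping = build_article_mapping(articles, max_length=5)
print(mapping[101])  # [1, 2, 3, 0, 0]
print(mapping[102])  # [3, 4, 5, 0, 0]

# A padded click history can then be converted by lookup.
history_tokens = [mapping[article_id] for article_id in [101, 0]]
```

The extra entry for article ID 0 matters because truncated histories are padded with zeros, so every padded slot still resolves to a valid token sequence.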

In [57]:
# Negative sampling
def apply_negative_sampling(df):
    return df.pipe(
        sampling_strategy_wu2019,
        npratio=NPRATIO,
        shuffle=True,
        with_replacement=True,
        seed=SEED,
    )
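
`sampling_strategy_wu2019` is not defined or imported in this notebook, so it is presumably expected to come from an external helper library. To make the idea concrete, here is a minimal plain-Python sketch of negative sampling with a fixed ratio: one clicked article is paired with `npratio` non-clicked in-view articles. The function `sample_impression` is hypothetical and is not that library function. Unlike the call above, which passes `with_replacement=True`, this sketch falls back to sampling with replacement only when there are too few negatives.

```python
import random

def sample_impression(inview, clicked, npratio, rng):
    """Return (article_ids, labels): one clicked article plus `npratio` non-clicked ones."""
    clicked_set = set(clicked)
    positives = [a for a in inview if a in clicked_set]
    negatives = [a for a in inview if a not in clicked_set]
    if not positives or not negatives:
        return [], []  # nothing usable in this impression
    if len(negatives) >= npratio:
        sampled = rng.sample(negatives, npratio)     # without replacement
    else:
        sampled = rng.choices(negatives, k=npratio)  # with replacement
    pairs = [(positives[0], 1)] + [(a, 0) for a in sampled]
    rng.shuffle(pairs)  # avoid a fixed position for the positive
    article_ids, labels = zip(*pairs)
    return list(article_ids), list(labels)

rng = random.Random(42)  # fixed seed for reproducibility
ids, labels = sample_impression([101, 102, 103, 104, 105, 106], [103], npratio=4, rng=rng)
print(ids, labels)
```

With `npratio=4`, each training sample contains five candidates and exactly one positive label.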



In [65]:
# Save large datasets in chunks
def save_dataset_in_chunks(df, output_file, chunk_size=1_000_000):
    # `df` is a Polars DataFrame here, so slice it and convert each chunk to Arrow.
    schema = df.head(1).to_arrow().schema  # Use the first row to get schema
    writer = pq.ParquetWriter(output_file, schema, compression="gzip")

    total_rows = df.height
    print(f"Starting to write {total_rows} rows in chunks...")

    for start in range(0, total_rows, chunk_size):
        chunk = df.slice(start, chunk_size)
        writer.write_table(chunk.to_arrow())
        print(f"Written rows {start} to {start + chunk.height - 1}.")

    writer.close()
    print(f"Final dataset saved as '{output_file}'")

# Main preprocessing pipeline
df_train_behaviors = load_and_clean_data(HISTORY_PATH, TRAIN_BEHAVIORS_PATH, ARTICLES_PATH)
df_train_behaviors = explode_inview_articles(df_train_behaviors)
df_train_behaviors = apply_negative_sampling(df_train_behaviors)
save_dataset_in_chunks(df_train_behaviors, OUTPUT_PATH / "train.parquet")

AttributeError: 'DataFrame' object has no attribute 'groupby'

In [86]:
from pathlib import Path
import pandas as pd
from transformers import AutoTokenizer, AutoModel
import pyarrow as pa
import pyarrow.parquet as pq

# Define constants

# Dataset columns names
DEFAULT_USER_COL = "user_id"
DEFAULT_HISTORY_ARTICLE_ID_COL = "article_id_fixed"
DEFAULT_CLICKED_ARTICLES_COL = "article_ids_clicked"
DEFAULT_INVIEW_ARTICLES_COL = "article_ids_inview"
DEFAULT_TITLE_COL = "title"
DEFAULT_SUBTITLE_COL = "subtitle"
DEFAULT_LABELS_COL = "label"

# Transformer model name
TRANSFORMER_MODEL_NAME = "FacebookAI/xlm-roberta-base"

# Preprocessing Parameters
HISTORY_SIZE = 20
MAX_TITLE_LENGTH = 30
NPRATIO = 4
SEED = 42

# Define the base dataset path relative to your current location
DATASET_PATH = Path(
    "c:/Users/Lydia/OneDrive - Danmarks Tekniske Universitet/DTU master , lectures and exercises/Deep learning my exercises/Deep-learning_final_project-2/ebnerd_demo"
)

# Define file paths
HISTORY_PATH = DATASET_PATH / "train/history.parquet"  # history.parquet in train folder
TRAIN_BEHAVIORS_PATH = DATASET_PATH / "train/behaviors.parquet"  # behaviors.parquet in train folder
ARTICLES_PATH = DATASET_PATH / "articles.parquet"  # articles.parquet directly in ebnerd_demo

# Verify that the files exist
def verify_file_paths(*paths):
    for path in paths:
        if not path.exists():
            raise FileNotFoundError(f"File not found: {path}")

verify_file_paths(HISTORY_PATH, TRAIN_BEHAVIORS_PATH, ARTICLES_PATH)
print("All files are found!")

# Define columns to drop
COLUMNS_TO_DROP = [
    "impression_time", "scroll_percentage", "device_type", "session_id",
    "next_read_time", "next_scroll_percentage", "postcode", "age",
    "is_subscriber", "gender", "is_sso_user"
]

# Define a custom truncate_history function
def truncate_history_manual(history, history_size, padding_value=0):
    """
    Manually truncate or pad a list of article histories.
    """
    return [
        h[:history_size] + [padding_value] * max(0, history_size - len(h))
        for h in history
    ]

# Load and clean data
def load_and_clean_data(history_path, behaviors_path):
    # Load history.parquet
    df_history = pd.read_parquet(history_path)

    # Verify required columns
    required_columns = [DEFAULT_USER_COL, DEFAULT_HISTORY_ARTICLE_ID_COL]
    for col in required_columns:
        if col not in df_history.columns:
            raise ValueError(f"Missing required column: {col} in history.parquet")

    # Aggregate articles into lists per user
    user_history = df_history.groupby(DEFAULT_USER_COL)[DEFAULT_HISTORY_ARTICLE_ID_COL].apply(list).reset_index()

    # Truncate histories
    user_history[DEFAULT_HISTORY_ARTICLE_ID_COL] = truncate_history_manual(
        user_history[DEFAULT_HISTORY_ARTICLE_ID_COL], HISTORY_SIZE, 0
    )

    # Load behaviors.parquet
    df_behaviors = pd.read_parquet(behaviors_path)

    # Verify required columns
    required_behavior_columns = [
        DEFAULT_USER_COL,
        DEFAULT_CLICKED_ARTICLES_COL,
        DEFAULT_INVIEW_ARTICLES_COL,
    ]
    for col in required_behavior_columns:
        if col not in df_behaviors.columns:
            raise ValueError(f"Missing required column: {col} in behaviors.parquet")

    # Merge behaviors with truncated history
    df_behaviors = pd.merge(df_behaviors, user_history, on=DEFAULT_USER_COL, how="left")

    # Drop irrelevant columns
    df_behaviors.drop(columns=[col for col in COLUMNS_TO_DROP if col in df_behaviors.columns], inplace=True)

    return df_behaviors

# Explode and add labels
def explode_inview_articles(df):
    """
    Explode the inview articles and add labels.
    """
    df_exploded = df.explode(DEFAULT_INVIEW_ARTICLES_COL)
    df_exploded[DEFAULT_LABELS_COL] = df_exploded.apply(
        lambda row: 1 if row[DEFAULT_INVIEW_ARTICLES_COL] in row[DEFAULT_CLICKED_ARTICLES_COL] else 0,
        axis=1
    )
    return df_exploded

# Tokenize articles
def tokenize_and_encode_articles(articles_path):
    df_articles = pd.read_parquet(articles_path)

    # Verify required columns
    required_article_columns = [DEFAULT_TITLE_COL, DEFAULT_SUBTITLE_COL]
    for col in required_article_columns:
        if col not in df_articles.columns:
            raise ValueError(f"Missing required column: {col} in articles.parquet")

    # Load transformer model and tokenizer
    transformer_model = AutoModel.from_pretrained(TRANSFORMER_MODEL_NAME)
    transformer_tokenizer = AutoTokenizer.from_pretrained(TRANSFORMER_MODEL_NAME)

    # Tokenize and encode text
    # Add your text tokenization logic here, e.g., apply transformer tokenizer on titles

    return df_articles

# Negative sampling
def apply_negative_sampling(df):
    """
    Apply negative sampling to the dataset.
    """
    # Implement your negative sampling logic here
    pass

# Save large datasets in chunks
def save_dataset_in_chunks(df, output_file, chunk_size=1_000_000):
    """
    Save large datasets in chunks using PyArrow Parquet.
    """
    # Ensure all list columns are represented consistently as strings for saving
    for col in df.columns:
        if df[col].apply(lambda x: isinstance(x, list)).any():
            df[col] = df[col].apply(lambda x: x if isinstance(x, list) else [])
            df[col] = df[col].apply(str)  # Convert lists to strings
    
    # Convert to PyArrow table
    table = pa.Table.from_pandas(df)
    writer = pq.ParquetWriter(output_file, table.schema, compression="gzip")

    total_rows = len(df)
    print(f"Starting to write {total_rows} rows in chunks...")

    for start in range(0, total_rows, chunk_size):
        chunk = df.iloc[start:start + chunk_size]
        table = pa.Table.from_pandas(chunk)
        writer.write_table(table)
        print(f"Written rows {start} to {start + len(chunk) - 1}.")

    writer.close()
    print(f"Final dataset saved as '{output_file}'")


# Main preprocessing pipeline
OUTPUT_PATH = DATASET_PATH / "processed"
OUTPUT_PATH.mkdir(parents=True, exist_ok=True)

df_train_behaviors = load_and_clean_data(HISTORY_PATH, TRAIN_BEHAVIORS_PATH)
df_train_behaviors = explode_inview_articles(df_train_behaviors)
save_dataset_in_chunks(df_train_behaviors, OUTPUT_PATH / "train.parquet")

print("Data preprocessing complete.")


All files are found!
Starting to write 278139 rows in chunks...
Written rows 0 to 278138.
Final dataset saved as 'c:\Users\Lydia\OneDrive - Danmarks Tekniske Universitet\DTU master , lectures and exercises\Deep learning my exercises\Deep-learning_final_project-2\ebnerd_demo\processed\train.parquet'
Data preprocessing complete.


In [89]:
import polars as pl

# Check columns in history.parquet
history_df = pl.read_parquet(HISTORY_PATH)
print("Columns in history.parquet:", history_df.columns)

# Check columns in behaviors.parquet
behaviors_df = pl.read_parquet(TRAIN_BEHAVIORS_PATH)
print("Columns in behaviors.parquet:", behaviors_df.columns)




Columns in history.parquet: ['user_id', 'impression_time_fixed', 'scroll_percentage_fixed', 'article_id_fixed', 'read_time_fixed']
Columns in behaviors.parquet: ['impression_id', 'article_id', 'impression_time', 'read_time', 'scroll_percentage', 'device_type', 'article_ids_inview', 'article_ids_clicked', 'user_id', 'is_sso_user', 'gender', 'postcode', 'age', 'is_subscriber', 'session_id', 'next_read_time', 'next_scroll_percentage']


In [88]:
import pandas as pd

# Define the path to your train.parquet file
train_parquet_path = "c:/Users/Lydia/OneDrive - Danmarks Tekniske Universitet/DTU master , lectures and exercises/Deep learning my exercises/Deep-learning_final_project-2/ebnerd_demo/processed/train.parquet"

# Read the Parquet file
df_train = pd.read_parquet(train_parquet_path)

# Print the first few rows
df_train.head()


Unnamed: 0,impression_id,article_id,read_time,article_ids_inview,article_ids_clicked,user_id,article_id_fixed,label
0,48401,,21.0,9774516,[9759966],22779,"[array([9738452, 9737521, 9738760, 9733713, 97...",0
0,48401,,21.0,9771051,[9759966],22779,"[array([9738452, 9737521, 9738760, 9733713, 97...",0
0,48401,,21.0,9770028,[9759966],22779,"[array([9738452, 9737521, 9738760, 9733713, 97...",0
0,48401,,21.0,9775402,[9759966],22779,"[array([9738452, 9737521, 9738760, 9733713, 97...",0
0,48401,,21.0,9774461,[9759966],22779,"[array([9738452, 9737521, 9738760, 9733713, 97...",0


In [90]:
import pandas as pd

# Define the paths for the datasets
history_path = "c:/Users/Lydia/OneDrive - Danmarks Tekniske Universitet/DTU master , lectures and exercises/Deep learning my exercises/Deep-learning_final_project-2/ebnerd_demo/train/history.parquet"
behaviors_path = "c:/Users/Lydia/OneDrive - Danmarks Tekniske Universitet/DTU master , lectures and exercises/Deep learning my exercises/Deep-learning_final_project-2/ebnerd_demo/train/behaviors.parquet"
articles_path = "c:/Users/Lydia/OneDrive - Danmarks Tekniske Universitet/DTU master , lectures and exercises/Deep learning my exercises/Deep-learning_final_project-2/ebnerd_demo/articles.parquet"

# Read the datasets
history_df = pd.read_parquet(history_path)
behaviors_df = pd.read_parquet(behaviors_path)
articles_df = pd.read_parquet(articles_path)

# Print the columns of each dataset
{
    "History Columns": history_df.columns.tolist(),
    "Behaviors Columns": behaviors_df.columns.tolist(),
    "Articles Columns": articles_df.columns.tolist()
}


{'History Columns': ['user_id',
  'impression_time_fixed',
  'scroll_percentage_fixed',
  'article_id_fixed',
  'read_time_fixed'],
 'Behaviors Columns': ['impression_id',
  'article_id',
  'impression_time',
  'read_time',
  'scroll_percentage',
  'device_type',
  'article_ids_inview',
  'article_ids_clicked',
  'user_id',
  'is_sso_user',
  'gender',
  'postcode',
  'age',
  'is_subscriber',
  'session_id',
  'next_read_time',
  'next_scroll_percentage'],
 'Articles Columns': ['article_id',
  'title',
  'subtitle',
  'last_modified_time',
  'premium',
  'body',
  'published_time',
  'image_ids',
  'article_type',
  'url',
  'ner_clusters',
  'entity_groups',
  'topics',
  'category',
  'subcategory',
  'category_str',
  'total_inviews',
  'total_pageviews',
  'total_read_time',
  'sentiment_score',
  'sentiment_label']}

In [91]:
history_df.head()

Unnamed: 0,user_id,impression_time_fixed,scroll_percentage_fixed,article_id_fixed,read_time_fixed
0,13538,"[2023-04-27T10:17:43.000000, 2023-04-27T10:18:...","[100.0, 35.0, 100.0, 24.0, 100.0, 23.0, 100.0,...","[9738663, 9738569, 9738663, 9738490, 9738663, ...","[17.0, 12.0, 4.0, 5.0, 4.0, 9.0, 5.0, 46.0, 11..."
1,58608,"[2023-04-27T18:48:09.000000, 2023-04-27T18:48:...","[37.0, 61.0, 100.0, 100.0, 55.0, 100.0, 100.0,...","[9739362, 9739179, 9738567, 9739344, 9739202, ...","[2.0, 24.0, 72.0, 65.0, 11.0, 4.0, 101.0, 0.0,..."
2,95507,"[2023-04-27T15:20:28.000000, 2023-04-27T15:20:...","[60.0, 100.0, 100.0, 21.0, 29.0, 67.0, 49.0, 5...","[9739035, 9738646, 9634967, 9738902, 9735495, ...","[18.0, 29.0, 51.0, 12.0, 10.0, 10.0, 13.0, 24...."
3,106588,"[2023-04-27T08:29:09.000000, 2023-04-27T08:29:...","[24.0, 57.0, 100.0, nan, nan, 100.0, 100.0, 73...","[9738292, 9738216, 9737266, 9737556, 9737657, ...","[9.0, 15.0, 42.0, 9.0, 3.0, 58.0, 26.0, 214.0,..."
4,617963,"[2023-04-27T14:42:25.000000, 2023-04-27T14:43:...","[100.0, 100.0, nan, 46.0, 23.0, 19.0, 61.0, 70...","[9739035, 9739088, 9738902, 9738968, 9738760, ...","[45.0, 29.0, 116.0, 26.0, 34.0, 42.0, 58.0, 59..."


In [92]:
behaviors_df.head()

Unnamed: 0,impression_id,article_id,impression_time,read_time,scroll_percentage,device_type,article_ids_inview,article_ids_clicked,user_id,is_sso_user,gender,postcode,age,is_subscriber,session_id,next_read_time,next_scroll_percentage
0,48401,,2023-05-21 21:06:50,21.0,,2,"[9774516, 9771051, 9770028, 9775402, 9774461, ...",[9759966],22779,False,,,,False,21,16.0,27.0
1,152513,9778745.0,2023-05-24 07:31:26,30.0,100.0,1,"[9778669, 9778736, 9778623, 9089120, 9778661, ...",[9778661],150224,False,,,,False,298,2.0,48.0
2,155390,,2023-05-24 07:30:33,45.0,,1,"[9778369, 9777856, 9778500, 9778021, 9778627, ...",[9777856],160892,False,,,,False,401,215.0,100.0
3,214679,,2023-05-23 05:25:40,33.0,,2,"[9776715, 9776406, 9776566, 9776071, 9776808, ...",[9776566],1001055,False,,,,False,1357,40.0,47.0
4,214681,,2023-05-23 05:31:54,21.0,,2,"[9775202, 9776855, 9776688, 9771995, 9776583, ...",[9776553],1001055,False,,,,False,1358,5.0,49.0
