In [None]:
import tensorflow as tf
import pandas as pd
import numpy as np
import pyarrow.parquet as pq
import os

In [2]:
# Mount Google Drive at /content/drive so files stored there can be read by
# path. Colab-only: `google.colab` is not importable outside that runtime.
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [3]:
# Locations of the parquet files on the mounted Drive, keyed by dataset name.
# Nothing is read here; this cell only records where each file lives.
# Change DATA_DIR to point the whole notebook at a different copy of the data.
DATA_DIR = "/content/drive/My Drive/deeplearningdata"
file_paths = {
    dataset: f"{DATA_DIR}/{dataset}.parquet"
    for dataset in ("behaviors", "history", "articles")
}



In [4]:
# Columns to read from each parquet file, keyed by the same dataset names
# used in `file_paths`.
selected_features = dict(
    behaviors=[
        "article_id",
        "impression_id",
        "impression_time",
        "article_ids_inview",
        "user_id",
        "session_id",
        "article_ids_clicked",
    ],
    history=[
        "article_id_fixed",
        "user_id",
        "impression_time_fixed",
        "scroll_percentage_fixed",
        "read_time_fixed",
    ],
    articles=[
        "article_id",
        "published_time",
        "ner_clusters",
        "topics",
        "category",
        "total_read_time",
        "total_pageviews",
    ],
)

In [5]:
# Helper that reads the selected columns of one parquet file and prints their dtypes
def print_feature_dtypes(file_path, selected_columns, dataset_name):
    """Print the pandas dtype of each selected column of a parquet file.

    Note that the selected columns are read in full (every row), not sampled.

    Args:
        file_path: Path of the parquet file to read.
        selected_columns: Column names passed to ``pd.read_parquet``.
        dataset_name: Label printed in the ``Dataset: ...`` header line.

    Returns:
        None. The header, the dtype listing and a 40-dash separator are
        written to stdout.
    """
    # The header goes out before the read so it is visible even if the read fails.
    print(f"Dataset: {dataset_name}")
    frame = pd.read_parquet(file_path, columns=selected_columns)
    separator = "-" * 40
    print(frame.dtypes, separator, sep="\n")
# Report the column dtypes of every configured dataset, in `file_paths` order
for dataset_name in file_paths:
    file_path = file_paths[dataset_name]
    selected_columns = selected_features[dataset_name]
    print_feature_dtypes(file_path, selected_columns, dataset_name)

Dataset: behaviors
article_id                    float64
impression_id                  uint32
impression_time        datetime64[us]
article_ids_inview             object
user_id                        uint32
session_id                     uint32
article_ids_clicked            object
dtype: object
----------------------------------------
Dataset: history
article_id_fixed           object
user_id                    uint32
impression_time_fixed      object
scroll_percentage_fixed    object
read_time_fixed            object
dtype: object
----------------------------------------
Dataset: articles
article_id                  int32
published_time     datetime64[us]
ner_clusters               object
topics                     object
category                    int16
total_read_time           float32
total_pageviews           float64
dtype: object
----------------------------------------


In [13]:
def _to_scalar_cell(value):
    """Turn one cell into a value that fits a rank-0 tensor slot."""
    if isinstance(value, (list, pd.Series, np.ndarray)):
        # List-like cells are emitted as their ``str()`` form so a
        # variable-length value still occupies a single scalar slot.
        return str(value)
    if isinstance(value, pd.Timestamp):
        return value.isoformat()
    return value


def parquet_generator(file_path, selected_columns, chunk_size=1000):
    """Yield one ``{column: value}`` dict per row of a parquet file.

    The file is streamed with ``ParquetFile.iter_batches`` so that at most
    ``chunk_size`` rows are converted to pandas at a time, instead of
    materialising the whole table before iterating.

    Args:
        file_path: Path of the parquet file to read.
        selected_columns: Names of the columns to read; also the keys (and
            key order) of every yielded dict.
        chunk_size: Maximum number of rows converted per batch. Must be >= 1.

    Yields:
        dict: One entry per selected column. ``list`` / ``pd.Series`` /
        ``np.ndarray`` cells are yielded as ``str(cell)``, ``pd.Timestamp``
        cells as ISO-8601 strings, everything else unchanged.

    Raises:
        ValueError: If ``chunk_size`` is smaller than 1 (raised when the
            generator is first advanced).
    """
    if chunk_size < 1:
        raise ValueError(f"chunk_size must be a positive integer, got {chunk_size!r}")

    columns = list(selected_columns)
    parquet_file = pq.ParquetFile(file_path)
    for batch in parquet_file.iter_batches(batch_size=chunk_size, columns=columns):
        # NOTE(review): dtypes are inferred per batch, so a nullable integer
        # column may surface as int in a batch without nulls and as float
        # (NaN) in a batch with nulls — confirm consumers tolerate both.
        frame = batch.to_pandas()
        # itertuples keeps each column's own dtype; iterrows would coerce the
        # whole row to one dtype (e.g. int -> float for all-numeric frames).
        for values in frame[columns].itertuples(index=False, name=None):
            yield {
                column: _to_scalar_cell(value)
                for column, value in zip(columns, values)
            }

def infer_signature(file_path, selected_columns):
    """Build the ``output_signature`` dict for ``tf.data.Dataset.from_generator``.

    Every selected column maps to a scalar (``shape=()``) ``tf.TensorSpec``
    whose dtype is chosen from the column's pandas dtype:

    * object dtype, or one of the columns listed in ``always_string`` -> ``tf.string``
    * float dtype -> ``tf.float32``
    * integer dtype -> ``tf.int64``
    * anything else -> ``tf.string``

    Args:
        file_path: Path of the parquet file to inspect.
        selected_columns: Names of the columns to include in the signature.

    Returns:
        dict mapping column name to ``tf.TensorSpec``.
    """
    # Columns that are always emitted as strings, whatever their pandas dtype.
    always_string = ("article_ids_inview", "article_ids_clicked", "impression_time")
    dtype_checks = pd.api.types

    # NOTE(review): the whole table is read and converted before being sliced
    # to zero rows; presumably intentional so the dtypes reflect the actual
    # data — confirm before replacing this with a schema-only read.
    empty_frame = pq.read_table(file_path, columns=selected_columns).to_pandas().iloc[:0]

    signature = {}
    for name, pandas_dtype in empty_frame.dtypes.items():
        if dtype_checks.is_object_dtype(pandas_dtype) or name in always_string:
            tensor_dtype = tf.string
        elif dtype_checks.is_float_dtype(pandas_dtype):
            tensor_dtype = tf.float32
        elif dtype_checks.is_integer_dtype(pandas_dtype):
            tensor_dtype = tf.int64
        else:
            # Fallback for dtypes with no direct mapping above.
            tensor_dtype = tf.string
        signature[name] = tf.TensorSpec(shape=(), dtype=tensor_dtype)
    return signature

def create_dataset(file_path, selected_columns, chunk_size=1000):
    """Wrap a parquet file in a ``tf.data.Dataset`` backed by ``parquet_generator``.

    Args:
        file_path: Path of the parquet file to read.
        selected_columns: Names of the columns to expose as dataset features.
        chunk_size: Forwarded unchanged to ``parquet_generator``.

    Returns:
        The dataset returned by ``tf.data.Dataset.from_generator``, using the
        signature produced by ``infer_signature`` for the same file/columns.
    """
    def row_stream():
        # from_generator expects a zero-argument callable, so bind the arguments here.
        return parquet_generator(file_path, selected_columns, chunk_size)

    return tf.data.Dataset.from_generator(
        row_stream,
        output_signature=infer_signature(file_path, selected_columns),
    )

# Build one TensorFlow dataset per configured parquet file, keyed by dataset name
datasets = {
    name: create_dataset(path, selected_features[name])
    for name, path in file_paths.items()
}

# Quick check: print the first five elements of the behaviors dataset
preview = datasets["behaviors"].take(5)
for row in preview:
    print(row)

{'article_id': <tf.Tensor: shape=(), dtype=float32, numpy=nan>, 'impression_id': <tf.Tensor: shape=(), dtype=int64, numpy=48401>, 'impression_time': <tf.Tensor: shape=(), dtype=string, numpy=b'2023-05-21T21:06:50'>, 'article_ids_inview': <tf.Tensor: shape=(), dtype=string, numpy=b'[9774516 9771051 9770028 9775402 9774461 9759544 9773947 9142581 9775331\n 9775371 9759966]'>, 'user_id': <tf.Tensor: shape=(), dtype=int64, numpy=22779>, 'session_id': <tf.Tensor: shape=(), dtype=int64, numpy=21>, 'article_ids_clicked': <tf.Tensor: shape=(), dtype=string, numpy=b'[9759966]'>}
{'article_id': <tf.Tensor: shape=(), dtype=float32, numpy=9778745.0>, 'impression_id': <tf.Tensor: shape=(), dtype=int64, numpy=152513>, 'impression_time': <tf.Tensor: shape=(), dtype=string, numpy=b'2023-05-24T07:31:26'>, 'article_ids_inview': <tf.Tensor: shape=(), dtype=string, numpy=b'[9778669 9778736 9778623 9089120 9778661 9777492 9778718 9778657 9778682\n 9482970 9718262 9718298 9778728 9080070 9420172 9717914 977