# AWS Glue: Feature Store Data Processing

This notebook processes data for Feature Stores used in the XGBoost ranking model.
It runs in AWS Glue and prepares User and Item Feature Groups.


## Initialize Glue Session


In [None]:
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

# Initialize Glue Context
# Reuse the SparkContext that is already running, or start one if none exists.
sc = SparkContext.getOrCreate()
# GlueContext wraps the SparkContext; the Spark session used by every later
# cell is taken from it.
glueContext = GlueContext(sc)
spark = glueContext.spark_session
# NOTE(review): the Job object is created here, but no visible cell calls
# job.init() or job.commit() — confirm whether that is intended.
job = Job(glueContext)


In [None]:
from pyspark.sql.functions import col, from_unixtime, count, avg, countDistinct
import pyspark.sql.functions as f

# Make Spark SQL resolve column names case-sensitively for this session.
# NOTE(review): presumably required because the raw JSON contains keys that
# differ only by case — confirm against the input data.
spark.conf.set("spark.sql.caseSensitive", "true")


## Step 1: Read and Process Reviews Dataset


In [None]:
# Raw input locations in S3 (reviews and product metadata).
reviews_input = "s3://recommendation-project-rapid/raw/All_Beauty.jsonl"
metadata_input = "s3://recommendation-project-rapid/raw/meta_All_Beauty.jsonl"

# Read the raw reviews; Spark infers the schema from the JSON.
df_reviews = spark.read.json(reviews_input)

# Project the reviews down to the fields used downstream and derive the
# event-time columns in a single pass:
#   - event_time_seconds: `timestamp` divided by 1000, as a double
#     (NOTE(review): presumably epoch milliseconds -> seconds; confirm).
#   - calendar_date: event_time_seconds rendered as yyyy-MM-dd.
# The raw `timestamp` column itself is not carried into the result.
reviews_cleaned = df_reviews.select(
    "user_id",
    "parent_asin",
    col("rating").cast("double"),
    (col("timestamp") / 1000).cast("double").alias("event_time_seconds"),
).withColumn(
    "calendar_date",
    from_unixtime(col("event_time_seconds"), "yyyy-MM-dd"),
)

print(f"Loaded {df_reviews.count()} reviews")


## Step 2: Read and Process Metadata Dataset


In [None]:
# Read the raw product metadata; Spark infers the schema from the JSON.
df_metadata = spark.read.json(metadata_input)

# Keep only the fields needed for the item Feature Group.
# The key is renamed to `meta_parent_asin` so it stays distinguishable from
# the reviews' `parent_asin` when the two frames are joined.
# NOTE(review): `title` is exposed as `movie_title` although the input is the
# All_Beauty dataset — confirm the column name is intended.
metadata_cleaned = (
    df_metadata
    .select("parent_asin", "title", "main_category", "price")
    .withColumnRenamed("parent_asin", "meta_parent_asin")
    .withColumnRenamed("title", "movie_title")
    .withColumn("price", col("price").cast("double"))
)

print(f"Loaded {df_metadata.count()} metadata records")


## Step 3: Create User Feature Group Data


In [None]:
# Aggregate user features: exactly one row per user.
#
# The grouping key is `user_id` only. Grouping by (user_id, event_time_seconds)
# would make every group a single review instant, so the "count by user" would
# almost always be 1 and the "average" would just be that review's rating.
#
# `event_time_seconds` for the aggregated record is the time of the user's
# most recent review, i.e. the moment at which these aggregates became current.
user_features = (
    reviews_cleaned
    .groupBy("user_id")
    .agg(
        count("*").alias("rating_count_by_user"),
        avg("rating").alias("avg_rating_by_user"),
        f.max("event_time_seconds").alias("event_time_seconds"),
    )
    # Fix the output column order and make the numeric features doubles.
    .select(
        col("user_id"),
        col("event_time_seconds"),
        col("rating_count_by_user").cast("double"),
        col("avg_rating_by_user").cast("double"),
    )
)

print(f"User features shape: {user_features.count()} records")
user_features.show(5)


## Step 4: Create Item Feature Group Data


In [None]:
# Join reviews with metadata.
# Inner join: reviews whose parent_asin has no metadata row are dropped.
# The metadata-side key is removed afterwards because it duplicates parent_asin.
reviews_with_metadata = reviews_cleaned.join(
    metadata_cleaned,
    reviews_cleaned.parent_asin == metadata_cleaned.meta_parent_asin,
    "inner"
).drop("meta_parent_asin")

# Aggregate item features: exactly one row per item.
#
# The grouping key is `parent_asin` only. Grouping by
# (parent_asin, event_time_seconds) would make every group a single review
# instant, so `rating_count` would almost always be 1 and `average_rating`
# would just be that review's rating.
#
# `event_time_seconds` for the aggregated record is the time of the item's
# most recent review. `main_category` and `price` come from the joined
# metadata; ignorenulls=True picks the first non-null value so a null in one
# joined row does not hide a value present in another.
item_features = (
    reviews_with_metadata
    .groupBy("parent_asin")
    .agg(
        count("*").alias("rating_count"),
        avg("rating").alias("average_rating"),
        f.max("event_time_seconds").alias("event_time_seconds"),
        f.first("main_category", ignorenulls=True).alias("main_category"),
        f.first("price", ignorenulls=True).alias("price"),
    )
    # Fix the output column order and make the numeric features doubles.
    .select(
        col("parent_asin"),
        col("event_time_seconds"),
        col("rating_count").cast("double"),
        col("average_rating").cast("double"),
        col("main_category"),
        col("price").cast("double"),
    )
)

print(f"Item features shape: {item_features.count()} records")
item_features.show(5)


## Step 5: Save to S3 for Feature Store Ingestion


In [None]:
# Output paths
user_features_output = "s3://recommendation-project-rapid/feature-store/users/"
item_features_output = "s3://recommendation-project-rapid/feature-store/items/"

# The outputs are written WITHOUT partitioning on event_time_seconds:
#   - it is a near-unique double, so partitioning on it creates roughly one
#     S3 prefix (and one tiny file) per record;
#   - Spark stores partition columns in the directory name rather than inside
#     the parquet files, so the event-time value would be missing from the
#     file contents that get ingested.
# mode("overwrite") replaces whatever a previous run left under the prefix.

# Write user features
user_features.write \
    .mode("overwrite") \
    .parquet(user_features_output)

# Write item features
item_features.write \
    .mode("overwrite") \
    .parquet(item_features_output)

print(f"User features saved to: {user_features_output}")
print(f"Item features saved to: {item_features_output}")
print("Job Complete! Data ready for Feature Store ingestion.")
