## **Ratings Data Ingestion**

### **Exploring Data**

In [1]:
import pandas as pd
# Absolute location of the raw ratings CSV; this path is machine-specific.
ratings_csv_path = r"C:\Users\Shahe\movie-recommender\data\rating.csv"
ratings_df = pd.read_csv(ratings_csv_path)

# Preview the first rows of the loaded frame.
ratings_df.head()

Unnamed: 0,userId,movieId,rating,timestamp
0,1,2,3.5,2005-04-02 23:53:47
1,1,29,3.5,2005-04-02 23:31:16
2,1,32,3.5,2005-04-02 23:33:39
3,1,47,3.5,2005-04-02 23:32:07
4,1,50,3.5,2005-04-02 23:29:40


In [2]:
# Dimensions of the loaded ratings frame as (rows, columns).
ratings_df.shape

(20000263, 4)

In [3]:
# Column dtypes and memory footprint; note that `timestamp` is loaded as object (string).
ratings_df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 20000263 entries, 0 to 20000262
Data columns (total 4 columns):
 #   Column     Dtype  
---  ------     -----  
 0   userId     int64  
 1   movieId    int64  
 2   rating     float64
 3   timestamp  object 
dtypes: float64(1), int64(2), object(1)
memory usage: 610.4+ MB


In [4]:
# Count of missing values per column.
ratings_df.isnull().sum()

userId       0
movieId      0
rating       0
timestamp    0
dtype: int64

### **Preprocessing Data**

In [5]:
import numpy as np
from datetime import datetime

# Allowed rating scale: values from RATING_MIN to RATING_MAX (inclusive)
# that are whole multiples of RATING_STEP.
RATING_MIN = 0.5
RATING_MAX = 5.0
RATING_STEP = 0.5


# -----------------------------
# Rating validation
# -----------------------------
def validate_ratings(ratings: pd.DataFrame) -> pd.DataFrame:
    """Return a filtered copy of ``ratings`` that contains only valid rows.

    A row is kept when its ``rating`` value:

    * can be parsed as a number,
    * lies within ``[RATING_MIN, RATING_MAX]`` (both ends inclusive),
    * is a whole multiple of ``RATING_STEP``.

    After filtering, only the first row for each ``(userId, movieId)`` pair
    is kept.

    Parameters
    ----------
    ratings : pd.DataFrame
        Frame with at least the columns ``userId``, ``movieId`` and
        ``rating``. The input frame is not modified.

    Returns
    -------
    pd.DataFrame
        The surviving rows, with ``rating`` converted to a numeric dtype.
        The original index labels of the kept rows are preserved.

    Raises
    ------
    KeyError
        If one of the required columns is missing.
    """
    # Copy the *argument* (not a notebook global) so the function works on
    # whatever frame the caller passes and never mutates it.
    ratings = ratings.copy()

    # Ensure numeric: convert the column itself so that string-typed ratings
    # can be compared below; unparseable entries become NaN and are dropped.
    ratings["rating"] = pd.to_numeric(ratings["rating"], errors="coerce")
    ratings = ratings[ratings["rating"].notna()]

    # Range validation (inclusive on both ends).
    ratings = ratings[ratings["rating"].between(RATING_MIN, RATING_MAX)]

    # Step validation (e.g., multiples of 0.5). A tiny absolute tolerance
    # keeps the check correct for steps that are not exactly representable
    # in binary floating point (e.g. 0.1) without admitting off-grid values.
    steps = (ratings["rating"] / RATING_STEP).to_numpy(dtype=float)
    on_grid = np.isclose(steps, np.round(steps), rtol=0.0, atol=1e-9)
    ratings = ratings[on_grid]

    # Remove duplicates: keep the first rating per (user, movie) pair.
    ratings = ratings.drop_duplicates(subset=["userId", "movieId"])

    return ratings


# -----------------------------
# Rating normalization
# -----------------------------
def normalize_ratings(ratings: pd.DataFrame) -> pd.DataFrame:
    """Return a copy of ``ratings`` with a min-max scaled ``rating_scaled`` column.

    The new column is ``(rating - RATING_MIN) / (RATING_MAX - RATING_MIN)``,
    so a rating equal to ``RATING_MIN`` maps to 0 and one equal to
    ``RATING_MAX`` maps to 1. The input frame is left untouched.
    """
    rating_span = RATING_MAX - RATING_MIN
    scaled = (ratings["rating"] - RATING_MIN) / rating_span

    # ``assign`` builds a new frame, so the caller's frame is not modified.
    return ratings.assign(rating_scaled=scaled)

# -----------------------------
# Main pipeline
# -----------------------------
def run_preprocessing(
    ratings=None,
    output_path=r"C:\Users\Shahe\movie-recommender\src\data\ratings_cleaned.parquet",
):
    """Validate, normalize, rename and persist the ratings data.

    Parameters
    ----------
    ratings : pd.DataFrame, optional
        Raw ratings frame, forwarded to ``validate_ratings``. When omitted,
        the notebook-level ``ratings_df`` is used, which matches the
        original zero-argument behaviour.
    output_path : str, optional
        Destination of the cleaned Parquet file. Defaults to the location
        the notebook originally wrote to.

    Returns
    -------
    pd.DataFrame
        The cleaned frame that was written to ``output_path``, with the
        columns ``userId``/``movieId`` renamed to ``user_id``/``movie_id``.
    """
    # Fall back to the frame loaded earlier in the notebook so that the
    # existing no-argument call keeps working.
    if ratings is None:
        ratings = ratings_df

    ratings_valid = validate_ratings(ratings)
    ratings_clean = normalize_ratings(ratings_valid)

    # Rename columns to snake_case
    ratings_clean = ratings_clean.rename(
        columns={"userId": "user_id", "movieId": "movie_id"}
    )
    print(ratings_clean.head())

    # Save outputs (index is dropped; it carries no information here)
    ratings_clean.to_parquet(output_path, index=False)

    print("Preprocessing completed successfully")

    return ratings_clean


# Entry-point guard: inside a notebook kernel `__name__` is "__main__", so
# executing this cell runs the whole preprocessing pipeline immediately.
if __name__ == "__main__":
    run_preprocessing()


   user_id  movie_id  rating            timestamp  rating_scaled
0        1         2     3.5  2005-04-02 23:53:47       0.666667
1        1        29     3.5  2005-04-02 23:31:16       0.666667
2        1        32     3.5  2005-04-02 23:33:39       0.666667
3        1        47     3.5  2005-04-02 23:32:07       0.666667
4        1        50     3.5  2005-04-02 23:29:40       0.666667
Preprocessing completed successfully
