# Feature Engineering and Complex Transformations

This notebook demonstrates advanced feature engineering using PySpark functions on the processed YouTube Analytics data.

## Tasks:
1. **engagement_score**: Weighted metric combining likes, dislikes, and comments relative to views
2. **days_to_trend**: Calculate days between trending_date and publish_time
3. **trending_rank**: Rank videos within each trending_date and category using Window Functions

In [None]:
# Prepare the notebook environment before any project modules are imported
from notebook_setup import setup_notebook_environment, test_imports

project_root = setup_notebook_environment()  # keep the returned project root around for later cells
test_imports()  # sanity-check that imports resolve (see notebook_setup for what is verified)

In [None]:
# Standard library
import logging

# Third-party: PySpark column functions, window specs, and the types used for casts
from pyspark.sql import functions as F
from pyspark.sql.types import DoubleType, IntegerType
from pyspark.sql.window import Window

# Project modules (presumably importable because of the setup cell above)
from config.settings import Config
from src.data_ingestion.processed_data_loader import ProcessedDataLoader
from src.utils.spark_utils import SparkUtils

# Emit INFO-level log records in the notebook output
logging.basicConfig(level=logging.INFO)

In [None]:
# Obtain the Spark session from the project helper and report its version
spark = SparkUtils.get_spark_session()
print(f"Spark version: {spark.version}")

# Loader for the pipeline's processed output, plus the project configuration object
loader = ProcessedDataLoader(spark)
config = Config()

## Load Processed Data

Load the processed YouTube trending videos data that was generated by our pipeline.

In [None]:
# Read the processed trending-videos data; stop immediately if the loader returns nothing
print("Loading processed YouTube trending videos data...")
df = loader.load_processed_videos()
if df is None:
    raise ValueError("Could not load processed data. Please run the pipeline first.")

record_count = df.count()  # triggers a full Spark job over the dataset
print(f"Loaded {record_count:,} records")
print("\nSchema:")
df.printSchema()

In [None]:
# Preview a few rows, restricted to the columns used for feature engineering
preview_columns = [
    "video_id", "title", "channel_title", "trending_date", "publish_time",
    "views", "likes", "dislikes", "comment_count", "category_name", "country",
]
print("Sample of loaded data:")
df.select(*preview_columns).show(5, truncate=False)

## Task 1: Calculate Engagement Score

Create an `engagement_score` column using the formula:
```
engagement_score = ((likes * 0.5) + (dislikes * 0.2) + (comment_count * 0.3)) / views
```

To avoid division by zero, `engagement_score` is set to 0 when `views` is 0 or null. Missing `likes`, `dislikes`, or `comment_count` values are treated as 0.

In [None]:
# Task 1: Calculate engagement_score
print("Task 1: Calculating engagement_score...")

# Weighted interaction total; a null count contributes 0 instead of nulling the sum
weighted_interactions = (
    F.coalesce(F.col("likes"), F.lit(0)) * 0.5
    + F.coalesce(F.col("dislikes"), F.lit(0)) * 0.2
    + F.coalesce(F.col("comment_count"), F.lit(0)) * 0.3
)
# Rows where dividing by views is impossible: score them as 0.0
no_views = F.col("views").isNull() | (F.col("views") == 0)

df_with_engagement = df.withColumn(
    "engagement_score",
    F.when(no_views, 0.0)
    .otherwise(weighted_interactions / F.col("views"))
    .cast(DoubleType()),
)

print("Engagement score calculated successfully!")

# Highest-scoring rows first
print("\nSample engagement scores:")
(
    df_with_engagement
    .select("video_id", "title", "views", "likes", "dislikes", "comment_count", "engagement_score")
    .orderBy(F.desc("engagement_score"))
    .show(10, truncate=False)
)

In [None]:
# Summarise the engagement_score distribution in a single aggregation, then print each statistic
print("Engagement Score Statistics:")
score_col = "engagement_score"
engagement_stats = df_with_engagement.select(
    F.count(score_col).alias("count"),
    F.mean(score_col).alias("mean"),
    F.stddev(score_col).alias("stddev"),
    F.min(score_col).alias("min"),
    F.max(score_col).alias("max"),
    F.expr(f"percentile_approx({score_col}, 0.5)").alias("median"),
).collect()[0]

# Floats get 6 decimal places; anything else (e.g. the integer count, or None) prints as-is
for stat_name, value in engagement_stats.asDict().items():
    print(f"{stat_name}: {value:.6f}" if isinstance(value, float) else f"{stat_name}: {value}")

## Task 2: Calculate Days to Trend

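Calculate the number of days between `trending_date` and `publish_time`.
`trending_date` is stored as a `yy.dd.MM` string (e.g. `17.14.11` is 14 November 2017), while `publish_time` is an ISO-format timestamp. Both are reduced to calendar dates before taking the difference, so `days_to_trend` is a whole number of days.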

In [None]:
# Task 2: Calculate days_to_trend
print("Task 2: Calculating days_to_trend...")

# Look at raw values of both date columns before choosing how to parse them
date_columns = ["trending_date", "publish_time"]
print("Examining date formats:")
df_with_engagement.select(*date_columns).show(n=5, truncate=False)

In [None]:
# Parse trending_date (format: yy.dd.MM) and reduce the publish timestamp to a calendar date.
# NOTE(review): use the pipeline's `publish_time_parsed` column when it exists; otherwise
# fall back to the raw ISO `publish_time` column (the one this notebook displays), cast
# straight to a date by F.to_date without a format. Without the fallback this cell fails
# with an AnalysisException when the processed data only carries `publish_time`.
publish_source_col = (
    "publish_time_parsed"
    if "publish_time_parsed" in df_with_engagement.columns
    else "publish_time"
)

df_with_dates = (
    df_with_engagement
    .withColumn("trending_date_parsed", F.to_date(F.col("trending_date"), "yy.dd.MM"))
    .withColumn("publish_date_parsed", F.to_date(F.col(publish_source_col)))
)

# days_to_trend = trending date minus publish date, in whole days.
# datediff already returns null when either date is null (e.g. failed parse),
# so no explicit null guard is needed.
df_with_days_to_trend = df_with_dates.withColumn(
    "days_to_trend",
    F.datediff(F.col("trending_date_parsed"), F.col("publish_date_parsed")).cast(IntegerType())
)

print("Days to trend calculated successfully!")

# Show sample results (only rows where both dates parsed)
print("\nSample days_to_trend calculations:")
df_with_days_to_trend.select(
    "video_id", "title", "trending_date", "publish_time",
    "trending_date_parsed", "publish_date_parsed", "days_to_trend"
).filter(F.col("days_to_trend").isNotNull()).show(10, truncate=False)

In [None]:
# Summarise the days_to_trend distribution in a single aggregation, then print each statistic
print("Days to Trend Statistics:")
days_col = "days_to_trend"
days_stats = df_with_days_to_trend.select(
    F.count(days_col).alias("count"),
    F.mean(days_col).alias("mean"),
    F.stddev(days_col).alias("stddev"),
    F.min(days_col).alias("min"),
    F.max(days_col).alias("max"),
    F.expr(f"percentile_approx({days_col}, 0.5)").alias("median"),
).collect()[0]

# Floats get 2 decimal places; integers and None print as-is
for stat_name, value in days_stats.asDict().items():
    print(f"{stat_name}: {value:.2f}" if isinstance(value, float) else f"{stat_name}: {value}")

# Row count per days_to_trend value, ascending; only the first 20 rows are shown
print("\nDays to Trend Distribution:")
(
    df_with_days_to_trend
    .groupBy(days_col)
    .count()
    .orderBy(days_col)
    .show(20)
)

## Task 3: Calculate Trending Rank

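Use PySpark Window Functions to rank videos within each `trending_date` and `category_name` by `views`, in descending order. Because `row_number()` is used, every video in a partition receives a distinct rank even when view counts tie.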

In [None]:
# Task 3: Calculate trending_rank using Window Functions
print("Task 3: Calculating trending_rank using Window Functions...")

# Partition by trending_date and category_name, most-viewed first.
# row_number() assigns tied rows an arbitrary order unless the ordering is total, so
# video_id and country are added as secondary keys to make ranks reproducible across runs.
# NOTE(review): `country` is not part of the partition, so if several countries share a
# trending_date their videos are ranked together. Add "country" to partitionBy if
# per-country ranks are intended.
window_spec = (
    Window.partitionBy("trending_date", "category_name")
    .orderBy(F.desc("views"), F.asc("video_id"), F.asc("country"))
)

# Calculate trending_rank (1 = most views within the partition)
df_with_rank = df_with_days_to_trend.withColumn(
    "trending_rank",
    F.row_number().over(window_spec)
)

print("Trending rank calculated successfully!")

# Show sample results
print("\nSample trending ranks (Top 3 videos per category per day):")
df_with_rank.select(
    "trending_date", "category_name", "trending_rank", "title", "channel_title", "views"
).filter(F.col("trending_rank") <= 3).orderBy(
    "trending_date", "category_name", "trending_rank"
).show(20, truncate=False)

In [None]:
# Analyze trending rank distribution
print("Trending Rank Analysis:")

# Number of rows holding each rank position (first 10 positions shown)
print("\nDistribution of trending ranks:")
df_with_rank.groupBy("trending_rank").count().orderBy("trending_rank").show(10)

# Rank-1 rows: the top video of each (trending_date, category_name) partition
print("\nTop-ranked videos (rank = 1) by category and date:")
top_ranked = df_with_rank.where(F.col("trending_rank") == 1)
top_ranked_count = top_ranked.count()
print(f"Total #1 ranked videos: {top_ranked_count}")

(
    top_ranked
    .select("trending_date", "category_name", "title", "channel_title", "views", "engagement_score")
    .orderBy(F.desc("views"))
    .show(10, truncate=False)
)

## Final Dataset with All Features

Let's create the final dataset with all the engineered features and analyze the results.

In [None]:
# Assemble the final dataset: identifiers, raw metrics, then the three engineered features
print("Creating final dataset with all engineered features...")

final_columns = [
    "video_id", "title", "channel_title", "category_name", "country",
    "trending_date", "publish_time", "views", "likes", "dislikes", "comment_count",
    "engagement_score", "days_to_trend", "trending_rank",
]
final_df = df_with_rank.select(*final_columns)

print(f"Final dataset contains {final_df.count():,} records with {len(final_df.columns)} columns")

print("\nFinal dataset schema:")
final_df.printSchema()

In [None]:
# Preview the final dataset, highest engagement_score first
print("Sample of final dataset with all engineered features:")
final_df.orderBy(F.col("engagement_score").desc()).show(n=10, truncate=False)

In [None]:
# Summary statistics for all engineered features
print("Summary Statistics for Engineered Features:")
print("=" * 50)

# (heading label, column name) for each engineered feature, in display order.
# A single loop replaces three copy-pasted select/show snippets; output is unchanged.
engineered_features = [
    ("Engagement Score", "engagement_score"),
    ("Days to Trend", "days_to_trend"),
    ("Trending Rank", "trending_rank"),
]

for position, (label, column) in enumerate(engineered_features, start=1):
    print(f"\n{position}. {label}:")
    final_df.select(
        F.count(column).alias("count"),
        F.mean(column).alias("mean"),
        F.min(column).alias("min"),
        F.max(column).alias("max"),
    ).show()

In [None]:
# Release Spark resources via the project helper now that the analysis is finished
SparkUtils.stop_spark_session()
print("Spark session stopped.")