## <span style='color:#ff5f27'> 📝 Imports

In [None]:
# !pip install -r requirements.txt --quiet

In [1]:
import pandas as pd
import numpy as np
import great_expectations as ge
from great_expectations.core import ExpectationSuite, ExpectationConfiguration

from features.users import generate_users
from features.videos import generate_video_content
from features.interactions import generate_interactions, generate_user_interactions_window_agg, generate_video_interactions_window_agg
from streaming import config

## <span style='color:#ff5f27'> ⚙️ Data Generation</span>


### <span style='color:#ff5f27'> 👥 Users Data Generation</span>

In [2]:
user_data = generate_users(
    config.USERS_AMOUNT_HISTORICAL,
    historical=True,
)

data_users_df = pd.DataFrame(user_data)
data_users_df.registration_date = data_users_df.registration_date.apply(pd.to_datetime)
data_users_df.head()

Unnamed: 0,user_id,gender,age,country,registration_date,registration_month
0,CS970A,Other,29,Madagascar,2023-09-22 09:34:00,2023-09
1,TR063X,Male,23,Bermuda,2024-01-18 09:34:00,2024-01
2,MM136F,Other,87,Libya,2023-05-12 09:34:00,2023-05
3,ZO655G,Male,52,Svalbard & Jan Mayen,2022-11-13 09:34:00,2022-11
4,TG286Q,Female,36,Paraguay,2023-09-01 09:34:00,2023-09


In [3]:
data_users_df.shape

(25000, 6)

### <span style='color:#ff5f27'> 🎥 Content Data Generation</span>


In [4]:
# Generate data for videos
video_data = generate_video_content(
    config.VIDEO_AMOUNT_HISTORICAL, 
    historical=True,
)

data_video_df = pd.DataFrame(video_data)
data_video_df.upload_date = data_video_df.upload_date.apply(pd.to_datetime)

data_video_df.head()

Unnamed: 0,video_id,category,views,likes,video_length,upload_date,upload_month
0,9YU52H,Sports,119401,87940,232,2023-03-29 09:34:03,2023-03
1,7QV37W,Music,296033,24250,229,2024-03-14 09:34:03,2024-03
2,4GM38U,Technology,101622,52096,78,2024-03-30 09:34:03,2024-03
3,9TM98X,Sports,196662,49376,221,2023-10-05 09:34:03,2023-10
4,6MN27O,Cooking,12995,7523,236,2022-05-15 09:34:03,2022-05


### <span style='color:#ff5f27'> 🔗 Interactions Generation</span>


In [5]:
# Generate interactions
interactions = generate_interactions(
    config.INTERACTIONS_AMOUNT_HISTORICAL, 
    user_data, 
    video_data,
)

data_interactions_df = pd.DataFrame(interactions)
data_interactions_df.interaction_date = data_interactions_df.interaction_date.apply(pd.to_datetime)
data_interactions_df.head()

Unnamed: 0,interaction_id,user_id,video_id,video_category,interaction_type,watch_time,interaction_date,interaction_day
0,2629-93-0330,EO463T,9EW94Y,Education,skip,198,2024-03-10 09:34:03,2024-03-10
1,9882-17-4396,CL565A,2NR54A,Lifestyle,skip,90,2023-02-17 09:34:00,2023-02-17
2,5872-38-9277,SA809C,5HN30A,Music,skip,61,2024-03-02 09:34:00,2024-03-02
3,1353-75-9585,CV182O,9CQ22Q,Dance,comment,24,2024-02-27 09:34:00,2024-02-27
4,6136-75-5775,OQ522I,6RG02X,Comedy,skip,93,2023-10-05 09:34:03,2023-10-05


In [6]:
user_interactions_window_agg = generate_user_interactions_window_agg(
    config.INTERACTIONS_AMOUNT_HISTORICAL, 
    user_data, 
    video_data,
)

In [7]:
user_interactions_window_agg_df = pd.DataFrame(user_interactions_window_agg)
user_interactions_window_agg_df.window_end_time = user_interactions_window_agg_df.window_end_time.apply(pd.to_datetime)
user_interactions_window_agg_df

Unnamed: 0,user_id,video_category,window_end_time,interaction_day,like_count,dislike_count,view_count,comment_count,share_count,skip_count,total_watch_time
0,YB094V,Technology,2022-05-04 14:34:03,2022-05-04,0,74,99,61,47,56,69
1,OR189I,Entertainment,2023-11-20 11:34:03,2023-11-20,78,10,64,25,79,13,86
2,GO239O,Entertainment,2022-11-18 00:34:03,2022-11-18,33,74,12,40,66,75,82
3,FU575T,Travel,2022-05-13 06:34:03,2022-05-13,41,44,41,9,1,69,18
4,VC840U,Lifestyle,2023-10-14 07:34:03,2023-10-14,45,72,35,100,74,10,82
...,...,...,...,...,...,...,...,...,...,...,...
999995,NP477J,Sports,2023-08-07 07:34:03,2023-08-07,22,82,35,79,64,12,98
999996,KA512K,Cooking,2024-03-22 12:34:03,2024-03-22,52,21,32,74,39,93,68
999997,PK178A,Comedy,2023-07-05 13:34:03,2023-07-05,81,76,64,2,83,39,43
999998,FD801A,Technology,2023-01-18 21:34:03,2023-01-18,37,17,10,87,44,46,66


In [8]:
video_interactions_window_agg = generate_video_interactions_window_agg(    
    config.INTERACTIONS_AMOUNT_HISTORICAL, 
    user_data, 
    video_data,
)

In [9]:
video_interactions_window_agg_df = pd.DataFrame(video_interactions_window_agg)
video_interactions_window_agg_df.window_end_time = video_interactions_window_agg_df.window_end_time.apply(pd.to_datetime)
video_interactions_window_agg_df

Unnamed: 0,video_id,window_end_time,interaction_day,like_count,dislike_count,view_count,comment_count,share_count,skip_count,total_watch_time
0,1FY50Q,2022-10-14 22:34:03,2022-10-14,28,95,90,17,34,69,48
1,1RR97E,2022-05-22 04:34:03,2022-05-22,71,55,42,47,52,2,3
2,8AR54U,2023-11-11 11:34:03,2023-11-11,26,30,44,22,17,61,15
3,3LR58N,2023-02-08 06:34:03,2023-02-08,78,34,88,4,2,57,25
4,5DQ37G,2023-02-05 12:34:03,2023-02-05,59,22,64,58,29,21,58
...,...,...,...,...,...,...,...,...,...,...
999995,5NE56Z,2022-12-22 11:34:03,2022-12-22,21,92,93,15,1,50,61
999996,1AJ05X,2023-05-03 02:34:03,2023-05-03,94,34,9,38,92,52,55
999997,0VJ10Y,2024-03-30 06:34:03,2024-03-30,50,98,53,73,18,75,78
999998,2GG92N,2022-05-08 09:34:03,2022-05-08,82,19,32,55,84,99,37


## <span style="color:#ff5f27">👮🏻‍♂️ Great Expectations </span>

In [10]:
# Create a Great Expectations DataFrame from the pandas DataFrame
ge_users_df = ge.from_pandas(data_users_df)

# Initialize the expectation suite
expectation_suite_users = ge_users_df.get_expectation_suite()
expectation_suite_users.expectation_suite_name = "user_data_suite"

# Expectation: Age should be between 0 and 120
expectation_suite_users.add_expectation(
    ExpectationConfiguration(
        expectation_type="expect_column_values_to_be_between",
        kwargs={"column": "age", "min_value": 12, "max_value": 100}
    )
)

# Expectations: Columns should not have null values
for column in ge_users_df.columns:
    expectation_suite_users.add_expectation(
        ExpectationConfiguration(
            expectation_type="expect_column_values_to_not_be_null",
            kwargs={"column": column}
        )
    )

# Expectation: Gender should only contain specific values
expectation_suite_users.add_expectation(
    ExpectationConfiguration(
        expectation_type="expect_column_distinct_values_to_be_in_set",
        kwargs={"column": "gender", "value_set": ["Male", "Female", "Other"]}
    )
)

{"meta": {}, "kwargs": {"column": "gender", "value_set": ["Male", "Female", "Other"]}, "expectation_type": "expect_column_distinct_values_to_be_in_set"}

In [11]:
# Create a Great Expectations DataFrame from the pandas DataFrame
ge_video_df = ge.from_pandas(data_video_df)

# Initialize the expectation suite
expectation_suite_videos = ge_video_df.get_expectation_suite()
expectation_suite_videos.expectation_suite_name = "video_data_suite"

# Expectation: Views, Likes, and Video Length should be non-negative
for column in ["views", "likes", "video_length"]:
    expectation_suite_videos.add_expectation(
        ExpectationConfiguration(
            expectation_type="expect_column_values_to_be_between",
            kwargs={"column": column, "min_value": 0, "max_value": None}
        )
    )

# Expectation: Valid date format for upload_date
expectation_suite_videos.add_expectation(
    ExpectationConfiguration(
        expectation_type="expect_column_values_to_be_dateutil_parseable",
        kwargs={"column": "upload_date"}
    )
)

{"meta": {}, "kwargs": {"column": "upload_date"}, "expectation_type": "expect_column_values_to_be_dateutil_parseable"}

In [12]:
# Create a Great Expectations DataFrame from the pandas DataFrame
ge_interactions_df = ge.from_pandas(data_interactions_df)

# Initialize the expectation suite
expectation_suite_interactions = ge_interactions_df.get_expectation_suite()
expectation_suite_interactions.expectation_suite_name = "interactions_data_suite"

# Expectations: Non-null values in all columns
for column in ge_interactions_df.columns:
    expectation_suite_interactions.add_expectation(
        ExpectationConfiguration(
            expectation_type="expect_column_values_to_not_be_null",
            kwargs={"column": column}
        )
    )

# Expectation: Validate interaction types
expectation_suite_interactions.add_expectation(
    ExpectationConfiguration(
        expectation_type="expect_column_distinct_values_to_be_in_set",
        kwargs={
            "column": "interaction_type",
            "value_set": ['like', 'dislike', 'view', 'comment', 'share', 'skip']
        }
    )
)

# Expectation: Positive watch time
expectation_suite_interactions.add_expectation(
    ExpectationConfiguration(
        expectation_type="expect_column_values_to_be_between",
        kwargs={"column": "watch_time", "min_value": 0}
    )
)

{"meta": {}, "kwargs": {"column": "watch_time", "min_value": 0}, "expectation_type": "expect_column_values_to_be_between"}

## <span style="color:#ff5f27">🔮 Connect to Hopsworks Feature Store </span>

In [13]:
import hopsworks

project = hopsworks.login()

fs = project.get_feature_store()

Connected. Call `.close()` to terminate connection gracefully.

Multiple projects found. 

	 (1) Bytewax_pipeline
	 (2) flink_tiktok



Enter project to access:  2



Logged in to project, explore it here https://60342400-fd68-11ee-a374-5db5bf1f1917.cloud.hopsworks.ai:443/p/120
Connected. Call `.close()` to terminate connection gracefully.


## <span style="color:#ff5f27">🪄 Feature Group Creation </span>


In [14]:
users_fg = fs.get_or_create_feature_group(
    name="users",
    description="Users data.",
    version=1,
    primary_key=["user_id"],
    partition_key=["registration_month"],
    event_time="registration_date",
    online_enabled=True,
    expectation_suite=expectation_suite_users,
)

users_fg.insert(data_users_df)
print('Done ✅')

Feature Group created successfully, explore it at 
https://60342400-fd68-11ee-a374-5db5bf1f1917.cloud.hopsworks.ai:443/p/120/fs/68/fg/2094
Validation succeeded.
Validation Report saved successfully, explore a summary at https://60342400-fd68-11ee-a374-5db5bf1f1917.cloud.hopsworks.ai:443/p/120/fs/68/fg/2094


Uploading Dataframe: 0.00% |          | Rows 0/25000 | Elapsed Time: 00:00 | Remaining Time: ?

Launching job: users_1_offline_fg_materialization
Job started successfully, you can follow the progress at 
https://60342400-fd68-11ee-a374-5db5bf1f1917.cloud.hopsworks.ai/p/120/jobs/named/users_1_offline_fg_materialization/executions
Done ✅


In [15]:
videos_fg = fs.get_or_create_feature_group(
    name="videos",
    description="Videos data.",
    version=1,
    primary_key=["video_id"],
    partition_key=["upload_month"],
    online_enabled=True,
    event_time="upload_date",
    expectation_suite=expectation_suite_videos,
)

videos_fg.insert(data_video_df)
print('Done ✅')

Feature Group created successfully, explore it at 
https://60342400-fd68-11ee-a374-5db5bf1f1917.cloud.hopsworks.ai:443/p/120/fs/68/fg/2095
Validation failed.
Validation Report saved successfully, explore a summary at https://60342400-fd68-11ee-a374-5db5bf1f1917.cloud.hopsworks.ai:443/p/120/fs/68/fg/2095


Uploading Dataframe: 0.00% |          | Rows 0/25000 | Elapsed Time: 00:00 | Remaining Time: ?

Launching job: videos_1_offline_fg_materialization
Job started successfully, you can follow the progress at 
https://60342400-fd68-11ee-a374-5db5bf1f1917.cloud.hopsworks.ai/p/120/jobs/named/videos_1_offline_fg_materialization/executions
Done ✅


In [16]:
interactions_fg = fs.get_or_create_feature_group(
    name="interactions",
    description="Interactions data.",
    version=1,
    primary_key=["interaction_id", "user_id", "video_id"],
    partition_key = ["interaction_day"],
    online_enabled=True,
    event_time="interaction_date",
    expectation_suite=expectation_suite_interactions,
)

interactions_fg.insert(data_interactions_df)
print('Done ✅')

Feature Group created successfully, explore it at 
https://60342400-fd68-11ee-a374-5db5bf1f1917.cloud.hopsworks.ai:443/p/120/fs/68/fg/2096
Validation succeeded.
Validation Report saved successfully, explore a summary at https://60342400-fd68-11ee-a374-5db5bf1f1917.cloud.hopsworks.ai:443/p/120/fs/68/fg/2096


Uploading Dataframe: 0.00% |          | Rows 0/1000000 | Elapsed Time: 00:00 | Remaining Time: ?

Launching job: interactions_1_offline_fg_materialization
Job started successfully, you can follow the progress at 
https://60342400-fd68-11ee-a374-5db5bf1f1917.cloud.hopsworks.ai/p/120/jobs/named/interactions_1_offline_fg_materialization/executions
Done ✅


In [17]:
user_window_agg_1h_fg = fs.get_or_create_feature_group(
    "user_window_agg_1h",
    version=1,
    statistics_config=False,
    primary_key=["user_id"],
    partition_key=["interaction_day"],
    event_time="window_end_time",
    online_enabled=True,
    stream=True,
)

user_window_agg_1h_fg.insert(user_interactions_window_agg_df)

Feature Group created successfully, explore it at 
https://60342400-fd68-11ee-a374-5db5bf1f1917.cloud.hopsworks.ai:443/p/120/fs/68/fg/2097


Uploading Dataframe: 0.00% |          | Rows 0/1000000 | Elapsed Time: 00:00 | Remaining Time: ?

Launching job: user_window_agg_1h_1_offline_fg_materialization
Job started successfully, you can follow the progress at 
https://60342400-fd68-11ee-a374-5db5bf1f1917.cloud.hopsworks.ai/p/120/jobs/named/user_window_agg_1h_1_offline_fg_materialization/executions


(<hsfs.core.job.Job at 0x33ff0d720>, None)

In [18]:
video_window_agg_1h_fg = fs.get_or_create_feature_group(
    "video_window_agg_1h",
    version=1,
    statistics_config=False,
    primary_key=["video_id"],
    partition_key=["interaction_day"],
    event_time="window_end_time",
    online_enabled=True,
    stream=True,
)

video_window_agg_1h_fg.insert(video_interactions_window_agg_df)

Feature Group created successfully, explore it at 
https://60342400-fd68-11ee-a374-5db5bf1f1917.cloud.hopsworks.ai:443/p/120/fs/68/fg/2098


Uploading Dataframe: 0.00% |          | Rows 0/1000000 | Elapsed Time: 00:00 | Remaining Time: ?

Launching job: video_window_agg_1h_1_offline_fg_materialization
Job started successfully, you can follow the progress at 
https://60342400-fd68-11ee-a374-5db5bf1f1917.cloud.hopsworks.ai/p/120/jobs/named/video_window_agg_1h_1_offline_fg_materialization/executions


(<hsfs.core.job.Job at 0x33ff64f40>, None)

## <span style="color:#ff5f27">🪄 Ranking Feature Group </span>


In [19]:
video_interactions_df = pd.merge(
    data_interactions_df, 
    data_video_df, 
    on='video_id', 
    how='inner',
)

ranking_df = pd.merge(
    video_interactions_df, 
    data_users_df, 
    on='user_id', 
    how='inner',
)

ranking_df['label'] = np.where(
    ranking_df.interaction_type.isin(['view', 'like', 'share', 'comment']), 
    1, 
    0,
)

ranking_df.drop(
    ['interaction_id', 'interaction_type', 'watch_time', 'interaction_date', 'upload_date', 'registration_date'], 
    axis=1,
    inplace=True,
)

ranking_df.head()

Unnamed: 0,user_id,video_id,video_category,interaction_day,category,views,likes,video_length,upload_month,gender,age,country,registration_month,label
0,EO463T,9EW94Y,Education,2024-03-10,Education,173225,155626,198,2023-11,Male,86,St. Barthélemy,2022-08,0
1,EO463T,5ME15H,Lifestyle,2023-10-07,Lifestyle,45880,4041,234,2022-07,Male,86,St. Barthélemy,2022-08,1
2,EO463T,8JD08X,Comedy,2023-06-24,Comedy,46231,26117,143,2022-10,Male,86,St. Barthélemy,2022-08,0
3,EO463T,3KS70N,Education,2023-11-08,Education,56552,47677,197,2023-10,Male,86,St. Barthélemy,2022-08,0
4,EO463T,9LO34X,Dance,2022-08-23,Dance,32679,30403,131,2022-06,Male,86,St. Barthélemy,2022-08,0


In [20]:
ranking_fg = fs.get_or_create_feature_group(
    name="ranking",
    description="Ranking Data.",
    version=1,
    primary_key=["user_id", "video_id"],
    partition_key = ["interaction_day"],    
    online_enabled=True, # why online?
)

ranking_fg.insert(ranking_df)
print('Done ✅')

Feature Group created successfully, explore it at 
https://60342400-fd68-11ee-a374-5db5bf1f1917.cloud.hopsworks.ai:443/p/120/fs/68/fg/2099


Uploading Dataframe: 0.00% |          | Rows 0/1002387 | Elapsed Time: 00:00 | Remaining Time: ?

Launching job: ranking_1_offline_fg_materialization
Job started successfully, you can follow the progress at 
https://60342400-fd68-11ee-a374-5db5bf1f1917.cloud.hopsworks.ai/p/120/jobs/named/ranking_1_offline_fg_materialization/executions
Done ✅


---