In [1]:
#Dependencies
import pandas as pd
import numpy as np
import tensorflow as tf
import itertools
import matplotlib.pyplot as plt
import time
from tensorflowonspark import TFCluster
from pyspark.sql import SparkSession

from envs import OfflineEnv
from recommender import DRRAgent

import os
# Python interpreter for both the Spark driver and the Python workers. PySpark
# requires the driver and workers to run the same Python version, so one value
# is used for both. Set the DRR_PYTHON environment variable to override the
# default instead of editing this path. These must be set before the
# SparkSession is created.
PYTHON_BIN = os.environ.get("DRR_PYTHON", "/data2/lc/my_envs/rec_rl_1/bin/python3.8")
os.environ["PYSPARK_PYTHON"] = PYTHON_BIN
os.environ["PYSPARK_DRIVER_PYTHON"] = PYTHON_BIN

# Paths are resolved against the notebook's working directory when this cell runs.
ROOT_DIR = os.getcwd()
DATA_DIR = os.path.join(ROOT_DIR, 'ml-1m')  # directory holding the MovieLens .dat files

# Passed to OfflineEnv / DRRAgent as their state size
# (presumably the number of recent items forming the state — confirm in envs.py).
STATE_SIZE = 10
# Number of episodes passed to DRRAgent.train.
MAX_EPISODE_NUM = 10

def _read_dat(filename, encoding='latin-1'):
    """Read a MovieLens ``::``-separated ``.dat`` file from ``DATA_DIR``.

    The file is opened in a ``with`` block so the handle is always closed.

    Args:
        filename: file name inside ``DATA_DIR`` (e.g. ``'ratings.dat'``).
        encoding: text encoding. ``movies.dat`` was already read as latin-1;
            using it for every file also avoids depending on the executor's
            locale default.

    Returns:
        list[list[str]]: one list of string fields per line.
    """
    path = os.path.join(DATA_DIR, filename)
    with open(path, 'r', encoding=encoding) as fh:
        return [line.strip().split("::") for line in fh]


def train_fn(argv, ctx):
    """TensorFlowOnSpark map function: load MovieLens-1M and train a DRR agent.

    Loads ratings and movie metadata from ``DATA_DIR`` and the pre-built
    per-user history arrays from ``ROOT_DIR/data``. It keeps the first 80% of
    user ids for training, builds ``OfflineEnv`` / ``DRRAgent``, and trains
    for ``MAX_EPISODE_NUM`` episodes.

    Args:
        argv: the ``tf_args`` given to ``TFCluster.run`` (unused).
        ctx: the TFoS node context (unused). NOTE(review): ``ctx.job_name`` /
            ``ctx.task_index`` are never consulted, so every node that runs
            this function loads the data and trains its own independent
            model. No ``tf.distribute`` strategy is set up here. Confirm this
            is intended.
    """
    print('Data loading...')
    ratings_list = _read_dat('ratings.dat')
    movies_list = _read_dat('movies.dat')
    # Only the id columns are used below, to size the user/item spaces.
    ratings_df = pd.DataFrame(
        ratings_list, columns=['UserID', 'MovieID', 'Rating', 'Timestamp']
    ).astype(np.int64)

    print("Data loading complete!")
    print("Data preprocessing...")

    # Movie id (string, as read from movies.dat) -> remaining fields [title, genres].
    movies_id_to_movies = {movie[0]: movie[1:] for movie in movies_list}

    # Per-user viewing history. The file presumably holds a pickled dict inside
    # an object array, hence allow_pickle=True and .item().
    # NOTE(review): unpickling can execute arbitrary code — only load trusted files.
    # Both .npy files are resolved against ROOT_DIR (like DATA_DIR) rather than
    # whatever working directory the executor process happens to have.
    users_dict = np.load(os.path.join(ROOT_DIR, 'data', 'user_dict.npy'),
                         allow_pickle=True).item()
    # Length of each user's movie history.
    users_history_lens = np.load(os.path.join(ROOT_DIR, 'data', 'users_histroy_len.npy'))

    # +1 so the largest id is still a valid index into a table of this size.
    users_num = ratings_df["UserID"].max() + 1
    items_num = ratings_df["MovieID"].max() + 1

    # Training split: user ids 1..train_users_num.
    train_users_num = int(users_num * 0.8)
    train_users_dict = {k: users_dict.get(k) for k in range(1, train_users_num + 1)}
    # NOTE(review): the dict is keyed 1..train_users_num, but this slice covers
    # positions 0..train_users_num-1. The two line up only if position i holds
    # user i+1's length — confirm against how the .npy was built.
    train_users_history_lens = users_history_lens[:train_users_num]

    print('DONE!')

    env = OfflineEnv(train_users_dict, train_users_history_lens, movies_id_to_movies, STATE_SIZE)
    recommender = DRRAgent(env, users_num, items_num, STATE_SIZE, use_wandb=False)
    recommender.actor.build_networks()
    recommender.critic.build_networks()
    recommender.train(MAX_EPISODE_NUM, load_model=False)


2025-03-26 13:28:56.604185: I tensorflow/core/util/port.cc:110] oneDNN custom operations are on. You may see slightly different numerical results due to floating-point round-off errors from different computation orders. To turn them off, set the environment variable `TF_ENABLE_ONEDNN_OPTS=0`.
2025-03-26 13:28:56.678110: I tensorflow/core/platform/cpu_feature_guard.cc:182] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: AVX2 AVX512F AVX512_VNNI FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.


In [2]:

# Create the Spark session, or reuse one already running in this kernel.
spark = (
    SparkSession.builder
    .appName("DistributedDRR")
    .getOrCreate()
)

# TFCluster.run below takes the SparkContext, not the session.
sc = spark.sparkContext

# Extra arguments that TFCluster.run hands to train_fn as its `argv`; none are needed.
tf_args = list()


25/03/26 13:29:12 WARN Utils: Your hostname, bit3090 resolves to a loopback address: 127.0.1.1; using 192.168.0.4 instead (on interface eno2np1)
25/03/26 13:29:12 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/03/26 13:29:13 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [None]:
# Launch train_fn on 4 Spark executors, one of which is reserved for the "ps"
# job (the cluster_template log shows ps [0], workers [1, 2, 3]). TensorBoard is off.
# NOTE(review): train_fn never reads ctx.job_name, so the ps node is not
# handled differently from the workers — confirm a parameter server is needed.
cluster = TFCluster.run(
    sc,
    train_fn,
    tf_args,
    num_executors=4,
    num_ps=1,
    tensorboard=False,
)


2025-03-26 13:29:23,101 INFO (MainThread-275056) Reserving TFSparkNodes 
2025-03-26 13:29:23,103 INFO (MainThread-275056) cluster_template: {'ps': [0], 'worker': [1, 2, 3]}
2025-03-26 13:29:23,110 INFO (MainThread-275056) Reservation server binding to port 0
2025-03-26 13:29:23,112 INFO (MainThread-275056) listening for reservations at ('192.168.0.4', 33299)
2025-03-26 13:29:23,114 INFO (MainThread-275056) Starting TensorFlow on executors
2025-03-26 13:29:23,422 INFO (MainThread-275056) Waiting for TFSparkNodes to start
2025-03-26 13:29:23,426 INFO (MainThread-275056) waiting for 4 reservations
2025-03-26 13:29:24,428 INFO (MainThread-275056) waiting for 4 reservations / 4]
2025-03-26 13:29:25,430 INFO (MainThread-275056) waiting for 4 reservations
2025-03-26 13:29:25.968768: I tensorflow/core/util/port.cc:110] oneDNN custom operations are on. You may see slightly different 2025-03-26 13:29:25.968768: I tensorflow/core/util/port.cc:110] oneDNN custom operations are on. You may see slig

2025-03-26 13:29:30,284 INFO (MainThread-275608) node: {'executor_id': 0, 'host': '192.168.0.4', 'job_name': 'ps', 'task_index': 0, 'port': 43891, 'tb_pid': 0, 'tb_port': 0, 'addr': ('192.168.0.4', 38705), 'authkey': b'\xd2\xd7\xae=eWF@\xb8\xd8\xc4\xf8N\xe8F.'}
2025-03-26 13:29:30,284 INFO (MainThread-275608) node: {'executor_id': 1, 'host': '192.168.0.4', 'job_name': 'worker', 'task_index': 0, 'port': 35689, 'tb_pid': 0, 'tb_port': 0, 'addr': '/tmp/pymp-9tr43j3_/listener-pqypl1ku', 'authkey': b'j\xd7\xdf;H\xe5A<\x86k\xaa\x86\xf8\xe5\xa9\xd4'}
2025-03-26 13:29:30,284 INFO (MainThread-275608) node: {'executor_id': 2, 'host': '192.168.0.4', 'job_name': 'worker', 'task_index': 1, 'port': 37933, 'tb_pid': 0, 'tb_port': 0, 'addr': '/tmp/pymp-8v4kn4u2/listener-odn8do21', 'authkey': b'\xff\x8bv\xfc\xca\x14Ed\xa0\xeb\x92\x8bd\x87\xd3F'}
2025-03-26 13:29:30,284 INFO (MainThread-275608) node: {'executor_id': 3, 'host': '192.168.0.4', 'job_name': 'worker', 'task_index': 2, 'port': 44765, 'tb_pid'

In [None]:
# Stop the TF cluster first, then Spark. The try/finally releases the Spark
# session even if cluster.shutdown() raises, so a failed run does not leave a
# live SparkContext behind in the kernel.
try:
    cluster.shutdown()
finally:
    spark.stop()