In [1]:
import os
# Avoid the "python3 not found" error on Windows: point both the Spark
# workers and the driver at the `python` executable.
os.environ.update(PYSPARK_PYTHON="python", PYSPARK_DRIVER_PYTHON="python")



from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_unixtime, to_date, sum as _sum, count, avg
import pandas as pd
from sklearn.preprocessing import MinMaxScaler
import numpy as np

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.tensorboard import SummaryWriter
from sklearn.metrics import mean_squared_error, mean_absolute_error



# Load và xử lý dữ liệu


In [2]:
import os
import pandas as pd

def _fill_missing_days(history):
    """Return ``history`` reindexed to exactly one row per calendar day.

    ``history`` must have ``date`` and ``daily_revenue`` columns with unique
    dates. Rows without a date are dropped, ``date`` is converted to pandas
    datetimes, and calendar days absent from the input are inserted with
    ``daily_revenue`` equal to 0.0.
    """
    history = history.dropna(subset=["date"])
    if history.empty:
        return history.reset_index(drop=True)
    dates = pd.to_datetime(history["date"])
    calendar = pd.date_range(dates.min(), dates.max(), freq="D", name="date")
    return (
        history.assign(date=dates)
        .set_index("date")
        .reindex(calendar, fill_value=0.0)
        .rename_axis("date")
        .reset_index()
    )


def load_and_prepare_data(meta_path, review_path, top_n=5, output_dir=r"C:\Users\PC\OneDrive\Dokumen\Amazon_sales_forecasting\data\sales_history", fill_missing_days=True):
    """Build per-product daily revenue histories for the top-N products.

    "Revenue" is a proxy: every review of a product on a given day counts as
    one sale at the product's listed ``price``.

    Steps:
      1. Read product metadata (parquet) and reviews (JSON) with Spark.
      2. Join reviews to metadata on ``parent_asin`` and aggregate, per
         (parent_asin, date), the summed price (``daily_revenue``).
      3. Keep products with more than 30 active days and select the
         ``top_n`` by total revenue.
      4. For each selected product, write ``<asin>_sales_history.csv`` into
         ``output_dir`` and collect the pandas frame.

    Args:
        meta_path: Path or glob of the metadata parquet file(s).
        review_path: Path of the reviews JSON file. Rows must carry
            ``parent_asin`` and ``timestamp``. The timestamp is divided by 1000
            before ``from_unixtime``, i.e. it is treated as milliseconds.
        top_n: Number of products to return.
        output_dir: Directory for the CSV files; created if missing.
        fill_missing_days: If True, calendar days without any review are
            inserted with ``daily_revenue == 0`` so each history has one row
            per day. Downstream windowing treats consecutive rows as
            consecutive days, which only holds once the gaps are filled.

    Returns:
        List of ``(asin_id, DataFrame)`` tuples in descending total-revenue
        order. Each frame is sorted by date and has the columns ``date``,
        ``daily_revenue``, ``asin_id``, ``title``, ``price``,
        ``average_rating`` and ``num_reviews`` (the metadata
        ``rating_number``). A field is ``"Unknown"`` when the metadata row or
        column is missing.
    """
    spark = SparkSession.builder \
        .appName("Sales Revenue Forecasting") \
        .master("local[*]") \
        .config("spark.driver.memory", "8g") \
        .getOrCreate()

    # Read the raw inputs.
    df_meta = spark.read.parquet(meta_path)
    df_reviews = spark.read.json(review_path)

    # Keep only the metadata columns this pipeline uses (those that exist).
    cols_needed = [
        "parent_asin", "title", "price", "average_rating", "rating_number",
        "categories", "features", "description", "main_category", "store"
    ]
    df_meta_filtered = df_meta.select([c for c in cols_needed if c in df_meta.columns])

    # to_date already yields a date column, so no second cast is needed later.
    reviews = df_reviews.withColumn("date", to_date(from_unixtime(col("timestamp") / 1000)))
    joined_df = reviews.join(df_meta_filtered, on="parent_asin", how="left")

    # Daily revenue per product. Cached because it is consumed several times
    # below; without the cache each use would rescan and rejoin the reviews.
    revenue_df = joined_df.filter(col("price").isNotNull()) \
        .groupBy("parent_asin", "date") \
        .agg(
            _sum("price").alias("daily_revenue"),
            count("*").alias("num_reviews")
        ) \
        .cache()

    os.makedirs(output_dir, exist_ok=True)

    try:
        # Rank products with more than 30 active days by total revenue.
        asin_metrics = revenue_df.groupBy("parent_asin") \
            .agg(
                count("*").alias("num_days"),
                _sum("daily_revenue").alias("total_revenue"),
                avg("daily_revenue").alias("avg_daily_revenue")
            ) \
            .filter(col("num_days") > 30) \
            .orderBy(col("total_revenue").desc())

        top_asins = [
            row["parent_asin"]
            for row in asin_metrics.limit(top_n).select("parent_asin").collect()
        ]
        if not top_asins:
            return []

        # Fetch the histories and the metadata of all selected products with
        # one query each, instead of one filter/scan per product.
        histories = revenue_df.filter(col("parent_asin").isin(top_asins)) \
            .select("parent_asin", "date", "daily_revenue") \
            .toPandas()
        info_by_asin = {}
        for row in df_meta_filtered.filter(col("parent_asin").isin(top_asins)).collect():
            # Like `.first()`: keep the first metadata row seen per product.
            info_by_asin.setdefault(row["parent_asin"], row.asDict())
    finally:
        revenue_df.unpersist()

    result = []
    for asin_id in top_asins:
        df_pd = histories.loc[histories["parent_asin"] == asin_id, ["date", "daily_revenue"]] \
            .sort_values("date") \
            .reset_index(drop=True)
        if fill_missing_days:
            df_pd = _fill_missing_days(df_pd)

        # Attach static product info as constant columns.
        info = info_by_asin.get(asin_id, {})
        df_pd["asin_id"] = asin_id
        df_pd["title"] = info.get("title", "Unknown")
        df_pd["price"] = info.get("price", "Unknown")
        df_pd["average_rating"] = info.get("average_rating", "Unknown")
        # The metadata `rating_number` is stored as `num_reviews` (this is not
        # the per-day review count aggregated in revenue_df).
        df_pd["num_reviews"] = info.get("rating_number", "Unknown")

        file_path = os.path.join(output_dir, f"{asin_id}_sales_history.csv")
        df_pd.to_csv(file_path, index=False)

        result.append((asin_id, df_pd))

    return result


def prepare_sequence(df_pd, seq_length=14, column="daily_revenue"):
    """Scale a series to [0, 1] with MinMaxScaler and cut it into windows.

    Args:
        df_pd: DataFrame holding the series in ``column``, one row per time
            step, in chronological order.
        seq_length: Number of past steps per input window; must be >= 1.
        column: Name of the column to model.

    Returns:
        X: array of shape ``(n - seq_length, seq_length, 1)``.
        y: array of shape ``(n - seq_length, 1)``; ``y[i]`` is the value
            right after window ``X[i]``.
        scaler: The fitted MinMaxScaler, needed to invert model predictions.
        sales_scaled: The whole scaled series, shape ``(n, 1)``.

        When the series has ``seq_length`` rows or fewer, X and y are empty
        but keep the 3-D / 2-D shapes above.

    Raises:
        ValueError: If ``seq_length`` is smaller than 1.

    Note: the scaler is fit on the full series. If part of it is later held
    out for evaluation, fit on the training portion only to avoid leakage.
    """
    if seq_length < 1:
        raise ValueError(f"seq_length must be >= 1, got {seq_length}")

    scaler = MinMaxScaler()
    sales_scaled = scaler.fit_transform(df_pd[column].to_numpy().reshape(-1, 1))

    n_samples = len(sales_scaled) - seq_length
    if n_samples <= 0:
        # Keep the ranks that the model expects even when there is no sample.
        X = np.empty((0, seq_length, 1), dtype=sales_scaled.dtype)
        y = np.empty((0, 1), dtype=sales_scaled.dtype)
    else:
        X = np.stack([sales_scaled[i:i + seq_length] for i in range(n_samples)])
        # Target of window i is the step at i + seq_length; copy so y does not
        # alias sales_scaled.
        y = sales_scaled[seq_length:].copy()

    return X, y, scaler, sales_scaled


# Xây dựng LSTM model

In [3]:
class LSTMModel(nn.Module):
    """Stacked LSTM followed by a linear head that predicts one value.

    Input ``x`` is batch-first, ``(batch, seq_len, input_size)`` (the LSTM is
    built with ``batch_first=True``). The output has shape ``(batch, 1)`` and
    is computed from the last time step only.
    """

    def __init__(self, input_size=1, hidden_size=64, num_layers=2):
        super().__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True)
        self.fc = nn.Linear(hidden_size, 1)

    def forward(self, x):
        # Zero initial states created with x's device AND dtype, so the model
        # also works after .double()/.half() and on GPU.
        h0 = x.new_zeros(self.num_layers, x.size(0), self.hidden_size)
        c0 = x.new_zeros(self.num_layers, x.size(0), self.hidden_size)
        out, _ = self.lstm(x, (h0, c0))
        # Only the final time step's hidden state feeds the regression head.
        return self.fc(out[:, -1, :])



# Training

In [4]:
def train_model(X, y, asin_id, log_dir=r"C:\Users\PC\OneDrive\Dokumen\Amazon_sales_forecasting\logs", num_epochs=150, lr=0.001, save_path = r"C:\Users\PC\OneDrive\Dokumen\Amazon_sales_forecasting\best_models", seed=None):
    """Fit an LSTMModel on one product's windows with full-batch Adam.

    Per-epoch loss, MSE and MAE are logged to TensorBoard.

    Args:
        X: Input windows, array-like of shape ``(n_samples, seq_len, input)``.
        y: Targets, ``n_samples`` values (shape ``(n,)`` or ``(n, 1)``).
        asin_id: Product id, used in the TensorBoard run name and the
            checkpoint file name. When falsy, no checkpoint is written.
        log_dir: TensorBoard root; scalars go to ``<log_dir>/asin_<asin_id>``.
        num_epochs: Number of full-batch gradient steps.
        lr: Adam learning rate.
        save_path: Directory for ``lstm_model_<asin_id>.pt`` (state_dict).
        seed: Optional torch seed, for reproducible weight initialisation.

    Returns:
        The trained model, on the selected device and in eval mode.

    Raises:
        ValueError: If X is empty or X and y hold different sample counts.

    Note: the checkpoint holds the weights after the final epoch, not those
    of the epoch with the lowest loss.
    """
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    X_tensor = torch.as_tensor(X, dtype=torch.float32, device=device)
    # The model outputs (n, 1). Force targets to the same shape, otherwise
    # MSELoss would broadcast (n, 1) against (n,) into an (n, n) comparison.
    y_tensor = torch.as_tensor(y, dtype=torch.float32, device=device).reshape(-1, 1)
    if X_tensor.shape[0] == 0:
        raise ValueError(f"No training samples for ASIN {asin_id}: X is empty")
    if X_tensor.shape[0] != y_tensor.shape[0]:
        raise ValueError(
            f"X has {X_tensor.shape[0]} samples but y has {y_tensor.shape[0]}"
        )

    if seed is not None:
        torch.manual_seed(seed)
    model = LSTMModel().to(device)
    criterion = nn.MSELoss()
    optimizer = optim.Adam(model.parameters(), lr=lr)

    # Targets never change, so copy them to the host once for the metrics.
    y_true = y_tensor.detach().cpu().numpy()

    writer = SummaryWriter(log_dir=os.path.join(log_dir, f"asin_{asin_id}"))
    try:
        model.train()
        for epoch in range(num_epochs):
            outputs = model(X_tensor)
            loss = criterion(outputs, y_tensor)

            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            # Metrics use this epoch's predictions (made before the step).
            y_pred = outputs.detach().cpu().numpy()
            writer.add_scalar("Loss/MSE", loss.item(), epoch)
            writer.add_scalar("Metrics/MSE", mean_squared_error(y_true, y_pred), epoch)
            writer.add_scalar("Metrics/MAE", mean_absolute_error(y_true, y_pred), epoch)
    finally:
        # Flush and release the event file even if training raises.
        writer.close()

    model.eval()

    if asin_id:
        os.makedirs(save_path, exist_ok=True)
        torch.save(model.state_dict(), os.path.join(save_path, f"lstm_model_{asin_id}.pt"))

    return model

# main process

In [5]:
meta_path = r"file:///C:/Users/PC/OneDrive/Dokumen/Amazon_sales_forecasting/data/metadata/meta_Clothing_Shoes_and_Jewelry_*.parquet"
review_path = r"D:/Clothing_Shoes_and_Jewelry/Clothing_Shoes_and_Jewelry.jsonl"
model_dir = r"C:\Users\PC\OneDrive\Dokumen\Amazon_sales_forecasting\best_models"

products = load_and_prepare_data(meta_path, review_path, top_n=5)

# Keep every product's model and scaler, not just the last loop iteration's.
# Predictions are in scaled units and need the matching scaler to be turned
# back into revenue.
trained = {}
for asin_id, df_pd in products:
    print(f"Training model for ASIN: {asin_id}")
    X, y, scaler, scaled_series = prepare_sequence(df_pd)
    model = train_model(X, y, asin_id=asin_id, save_path=model_dir)

    # Persist the scaler's fitted range next to the weights so that a
    # reloaded model can inverse-transform its predictions.
    os.makedirs(model_dir, exist_ok=True)
    np.savez(
        os.path.join(model_dir, f"scaler_{asin_id}.npz"),
        data_min=scaler.data_min_,
        data_max=scaler.data_max_,
    )
    trained[asin_id] = {"model": model, "scaler": scaler, "scaled_series": scaled_series}
    print(f"Finished training for ASIN: {asin_id}\n")

Training model for ASIN: B07TVHSDMQ
Finished training for ASIN: B07TVHSDMQ

Training model for ASIN: B0BHTPQ72V
Finished training for ASIN: B0BHTPQ72V

Training model for ASIN: B0C62MD9JY
Finished training for ASIN: B0C62MD9JY

Training model for ASIN: B07XFXXZMV
Finished training for ASIN: B07XFXXZMV

Training model for ASIN: B01M17LSEE
Finished training for ASIN: B01M17LSEE

