In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    to_timestamp,
    when,
    sum as spark_sum,
    lag,
    unix_timestamp,
    col,
    monotonically_increasing_id,
    udf
)
from pyspark.sql.types import ArrayType, FloatType
from pyspark.sql.window import Window

import logging

from qdrant_client import QdrantClient
from qdrant_client.models import (
    Distance,
    VectorParams,
    PointStruct
)


# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


In [2]:
 spark = SparkSession.builder.appName("test qdrant ").config("spark.driver.memory", "8g").config("spark.executor.memory", "8g").getOrCreate() 

In [3]:
# Load the CSV into a DataFrame
df = spark.read.format("csv").option("header", "true").load("data/test.csv")

# Display the first 2 rows
# Show schema
df.printSchema()

# Show sample data
df.show(5, truncate=False)



root
 |-- event_time: string (nullable = true)
 |-- event_type: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- category_id: string (nullable = true)
 |-- category_code: string (nullable = true)
 |-- brand: string (nullable = true)
 |-- price: string (nullable = true)
 |-- user_id: string (nullable = true)
 |-- user_session: string (nullable = true)

+------------------------+----------+----------+-------------------+-------------------------+------+------+---------+------------------------------------+
|event_time              |event_type|product_id|category_id        |category_code            |brand |price |user_id  |user_session                        |
+------------------------+----------+----------+-------------------+-------------------------+------+------+---------+------------------------------------+
|2019-11-01T00:00:00.000Z|view      |1003461   |2053013555631882655|electronics.smartphone   |xiaomi|489.07|520088904|4d3b30da-a5e4-49df-b1a8-ba5943f1dd33|

In [4]:
# (Optional) Repartition for parallelism
df = df.repartition(200)

# ------------------------------------------------------------------------
# 4. Convert `event_time` to Timestamp
# ------------------------------------------------------------------------
df = df.withColumn(
    "event_time",
    to_timestamp("event_time", "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'")
)

# ------------------------------------------------------------------------
# 5. Generate flags for event types
# ------------------------------------------------------------------------
df = (
    df.withColumn("view_count", when(col("event_type") == "view", 1).otherwise(0))
      .withColumn("cart_count", when(col("event_type") == "cart", 1).otherwise(0))
      .withColumn("purchase_count", when(col("event_type") == "purchase", 1).otherwise(0))
)

# ------------------------------------------------------------------------
# 6. Compute session-level totals
# ------------------------------------------------------------------------
df_totals = df.groupBy("user_session").agg(
    spark_sum("view_count").alias("total_views"),
    spark_sum("cart_count").alias("total_carts"),
    spark_sum("purchase_count").alias("total_purchases")
)

# ------------------------------------------------------------------------
# 7. Compute product-level features (views, carts, purchases) and join
# ------------------------------------------------------------------------
df_product = df.groupBy("user_session", "product_id", "user_id").agg(
    spark_sum("view_count").alias("product_views"),
    spark_sum("cart_count").alias("product_carts"),
    spark_sum("purchase_count").alias("product_purchases")
)

df_features = df_product.join(df_totals, on="user_session", how="left")

df_features = (
    df_features
    .withColumn(
        "F1",
        when(col("total_views") != 0, col("product_views") / col("total_views")).otherwise(0)
    )
    .withColumn(
        "F2",
        when(col("total_carts") != 0, col("product_carts") / col("total_carts")).otherwise(0)
    )
    .withColumn(
        "F3",
        when(col("total_purchases") != 0, col("product_purchases") / col("total_purchases")).otherwise(0)
    )
)

# ------------------------------------------------------------------------
# 8. Create window for time-based features
# ------------------------------------------------------------------------
window_order = Window.partitionBy("user_session").orderBy("event_time")

df_time = (
    df.withColumn("prev_event_time", lag("event_time").over(window_order))
      .withColumn(
          "time_spent_seconds",
          unix_timestamp("event_time") - unix_timestamp("prev_event_time")
      )
      .na.fill(0, subset=["time_spent_seconds"])
      .withColumn("time_spent", col("time_spent_seconds").cast("double"))
)

# ------------------------------------------------------------------------
# 9. Aggregate time spent per product vs. total
# ------------------------------------------------------------------------
df_time_agg = df_time.groupBy("user_session", "product_id", "user_id").agg(
    spark_sum("time_spent").alias("product_time_spent")
)

df_total_time = df_time.groupBy("user_session").agg(
    spark_sum("time_spent").alias("total_time_spent")
)

df_features = (
    df_features
    .join(df_time_agg, on=["user_session", "product_id", "user_id"], how="left")
    .join(df_total_time, on="user_session", how="left")
)

df_features = df_features.withColumn(
    "F4",
    when(col("total_time_spent") != 0, col("product_time_spent") / col("total_time_spent"))
    .otherwise(0)
)

# ------------------------------------------------------------------------
# 10. Define weights and compute a final score
# ------------------------------------------------------------------------
w1, w2, w3, w4 = 0.1, 0.25, 0.45, 0.2
df_features = (
    df_features
    .withColumn(
        "score",
        w1 * col("F1") + w2 * col("F2") + w3 * col("F3") + w4 * col("F4")
    )
    .fillna({"score": 0})
)

final_df = df_features.select("user_id", "product_id", "score")
logger.info("Feature engineering completed.")
final_df.show(10)


INFO:__main__:Feature engineering completed.


+---------+----------+--------------------+
|  user_id|product_id|               score|
+---------+----------+--------------------+
|541999338|   1004873| 0.11002694380292534|
|513186499|   3700937|0.010434749505101263|
|512436136|   1004544| 0.06195899033158134|
|514439576|   1003310|                 1.0|
|554155613|   1005115|  0.0663168415792104|
|566401872|   1004767|  0.6991228070175439|
|556719801|   1307464| 0.05039044969033301|
|554683103|  14700087| 0.13874755381604698|
|544405527|   1480429|              0.1625|
|538524222|   1002532|  0.1473709557915535|
+---------+----------+--------------------+
only showing top 10 rows



In [5]:
df = final_df
df.show()

+---------+----------+--------------------+
|  user_id|product_id|               score|
+---------+----------+--------------------+
|512528032|  28713076|                 0.1|
|541999338|   1004873| 0.11002694380292534|
|544768462|  28721761| 0.17487179487179488|
|553464787|  22400054|0.006706955629129499|
|546526292|  28720354| 0.06014492753623188|
|513186499|   3700937|0.010434749505101263|
|512436136|   1004544| 0.06195899033158134|
|514439576|   1003310|                 1.0|
|566492425|   1004209|                 0.1|
|554155613|   1005115|  0.0663168415792104|
|547136026|   2702050|0.018813650169733784|
|566401872|   1004767|  0.6991228070175439|
|556719801|   1307464| 0.05039044969033301|
|554683103|  14700087| 0.13874755381604698|
|516453968|  21401294| 0.03378361475922452|
|544405527|   1480429|              0.1625|
|538524222|   1002532|  0.1473709557915535|
|565881130|   1004258| 0.30000000000000004|
|525147147|   5100798|0.030862282878411914|
|514095583|   1003475| 0.3000000

In [6]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import Word2Vec
from pyspark.sql.functions import split, col



# Chuyển đổi user_id thành chuỗi và đóng gói trong một danh sách (Word2Vec yêu cầu input là danh sách từ)
df_string = df.withColumn("user_id_str", col("user_id").cast("string"))
df_words = df_string.withColumn("user_id_list", split(col("user_id_str"), ""))  # Mỗi ký tự là một "word"

# Áp dụng Word2Vec
word2Vec = Word2Vec(vectorSize=10, minCount=0, inputCol="user_id_list", outputCol="user_embedding")
model = word2Vec.fit(df_words)
df_with_embedding = model.transform(df_words)

# Chọn các cột cần thiết
df_final = df_with_embedding.select("user_id", "product_id", "score", "user_embedding")

# Hiển thị DataFrame với embedding
df_final.show(truncate=False)




+---------+----------+--------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|user_id  |product_id|score               |user_embedding                                                                                                                                                                                                      |
+---------+----------+--------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|512528032|28713076  |0.1                 |[-0.15432980470359325,-0.02577964384108782,-0.2503759369254112,0.04343072660267353,-0.052165868878364566,-0.1265269317664206,-0.19043815284967425,-0.2899469114840031,-0.19845416061580182

In [None]:
client = QdrantClient(host='qdrant', port=6333)
df_pandas = df_final.select("user_id", "product_id", "score", "user_embedding").toPandas()
collection_name = "test"

# Kiểm tra xem collection đã tồn tại chưa
collections = client.get_collections()
if collection_name not in [collection.name for collection in collections.collections]:
    client.recreate_collection(
        collection_name=collection_name,
        vectors_config=VectorParams(size=10, distance=Distance.COSINE)  # Thay `size` theo kích thước embedding của bạn
    )
    print(f"Collection '{collection_name}' đã được tạo.")
else:
    print(f"Collection '{collection_name}' đã tồn tại.")


INFO:numexpr.utils:Note: NumExpr detected 12 cores but "NUMEXPR_MAX_THREADS" not set, so enforcing safe limit of 8.
INFO:numexpr.utils:NumExpr defaulting to 8 threads.


In [None]:
from pyspark.sql.functions import col

# Giả sử df_final là DataFrame PySpark cuối cùng của bạn
# Chọn các cột cần thiết và chuyển đổi sang pandas
df_pandas = df_final.select("user_id", "product_id", "score", "user_embedding").toPandas()
df_pandas['user_id'] = df_pandas['user_id'].str.replace(',', '').astype(int)
# Chuyển cột embedding từ Vector (PySpark) sang list (Python)
df_pandas['user_embedding'] = df_pandas['user_embedding'].apply(lambda x: x.toArray().tolist() if hasattr(x, 'toArray') else x)

# Lưu từng point vào Qdrant
for idx, row in df_pandas.iterrows():
    point = PointStruct(
        id=row['user_id'],  # Sử dụng user_id làm ID point
        vector=row['user_embedding'],
        payload={
            "product_id": row['product_id'],
            "score": row['score']
        }
    )
    client.upsert(
        collection_name=collection_name,
        points=[point]
    )
    print(f"Đã lưu user_id: {row['user_id']}")
