In [1]:
from google.colab import drive
# Mount Google Drive so the notebook can read/write files under /content/drive/.
drive.mount(mountpoint='/content/drive/')

Mounted at /content/drive/


In [2]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
import matplotlib.pyplot as plt

In [3]:
# Project root on the mounted Drive; dataset paths are built by appending to it.
# The space must NOT be backslash-escaped: '\ ' is not a valid Python escape
# (SyntaxWarning on Python 3.12+) and the backslash would stay in the string,
# breaking any non-Spark file access (pandas, os). Spark's path globbing appears to
# have tolerated the escaped form, so Spark reads resolve to the same directory.
WORKING_DIRECTORY = '/content/drive/MyDrive/Colab Notebooks/diploma/'

In [4]:
 # Создаём SparkSession
# Reuse the active Spark session if there is one, otherwise start one named "PetCo".
spark = (
    SparkSession.builder
    .appName("PetCo")
    .getOrCreate()
)

# Сначала подготовим датасет с последовательностями покупок

In [5]:
# Load the behavioural event logs (Parquet) from the project data folder.
data_logs = spark.read.parquet(f"{WORKING_DIRECTORY}data/needed_beh_logs")

In [6]:
data_logs.show(n=20, truncate=True)  # preview the first 20 rows, long values truncated

+----------------+--------------------+-----+--------------------+--------------------+--------------------+-----------------+-------------+--------------------+--------------------+-----------+--------------------+------------+--------------------+--------------------+--------------------+-------+
|customer_user_id|              ac_key|ac_id|             user_id|          session_id|           timestamp|         raw_term|  filter_name|        filter_value|              action|customer_id|           item_name|variation_id|      variation_name|               items|            items_v2|revenue|
+----------------+--------------------+-----+--------------------+--------------------+--------------------+-----------------+-------------+--------------------+--------------------+-----------+--------------------+------------+--------------------+--------------------+--------------------+-------+
|       435675978|key_afiSr5Y4gCaaSW5X| 2560|e5fbf045-94fa-498...|e5fbf045-94fa-498...|2024-11-29 10

In [7]:
data_logs.printSchema()  # Print the schema: column names, types and nullability

root
 |-- customer_user_id: string (nullable = true)
 |-- ac_key: string (nullable = true)
 |-- ac_id: long (nullable = true)
 |-- user_id: string (nullable = true)
 |-- session_id: string (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- raw_term: string (nullable = true)
 |-- filter_name: string (nullable = true)
 |-- filter_value: string (nullable = true)
 |-- action: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- item_name: string (nullable = true)
 |-- variation_id: long (nullable = true)
 |-- variation_name: string (nullable = true)
 |-- items: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- customer_id: string (nullable = true)
 |    |    |-- item_id: long (nullable = true)
 |    |    |-- item_name: string (nullable = true)
 |    |    |-- price: double (nullable = true)
 |-- items_v2: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- customer_id: string (nullabl

In [8]:
# Keep only purchase events; each row is one purchase that may contain several items.
data_purchases = data_logs.where(F.col("action") == "purchase")
print(f'Number of group purchases: {data_purchases.count()}')

Number of group purchases: 98248


In [9]:
data_purchases.show(n=20, truncate=True)  # preview the first 20 purchase events

+----------------+--------------------+-----+--------------------+--------------------+--------------------+--------+-----------+------------+--------+-----------+---------+------------+--------------+--------------------+--------------------+-------+
|customer_user_id|              ac_key|ac_id|             user_id|          session_id|           timestamp|raw_term|filter_name|filter_value|  action|customer_id|item_name|variation_id|variation_name|               items|            items_v2|revenue|
+----------------+--------------------+-----+--------------------+--------------------+--------------------+--------+-----------+------------+--------+-----------+---------+------------+--------------+--------------------+--------------------+-------+
|       635619080|key_afiSr5Y4gCaaSW5X| 1947|6521ca44-0aed-409...|6521ca44-0aed-409...|2024-11-29 01:34:...|    NULL|       NULL|        NULL|purchase|       NULL|     NULL|        NULL|          NULL|[{5190834, 612302...|[{5190834, 612302...| 

In [10]:
from pyspark.sql.functions import explode, col, lit

# One output row per purchased item: `items.customer_id` yields the array of item ids
# of each purchase event, which is then exploded. Events whose `items` is null or
# empty produce no rows. `purchase` is a constant 1 marker column.
data_purchases_transformed = data_purchases.select(
    "customer_user_id",
    explode(col("items.customer_id")).alias("customer_id"),
    lit(1).alias("purchase"),
    "timestamp",
)

data_purchases_transformed.show()

+----------------+-----------+--------+--------------------+
|customer_user_id|customer_id|purchase|           timestamp|
+----------------+-----------+--------+--------------------+
|       635619080|    5190834|       1|2024-11-29 01:34:...|
|       635619080|    5025044|       1|2024-11-29 01:34:...|
|       635619080|    5025045|       1|2024-11-29 01:34:...|
|       636184329|    5028719|       1|2024-11-29 19:42:...|
|       636184329|    5025045|       1|2024-11-29 19:42:...|
|       636184329|    5095824|       1|2024-11-29 19:42:...|
|       636184329|    5025044|       1|2024-11-29 19:42:...|
|       636184329|    5001348|       1|2024-11-29 19:42:...|
|       636184329|    5095826|       1|2024-11-29 19:42:...|
|       636184329|    5077002|       1|2024-11-29 19:42:...|
|       636184329|    5077002|       1|2024-11-29 19:42:...|
|       636184329|    5077004|       1|2024-11-29 19:42:...|
|        25837854|    5081330|       1|2024-11-29 23:10:...|
|        25837854|      

In [12]:
# Target directory for the Parquet output of the per-item purchases table.
# NOTE(review): overwrite mode replaces everything under this directory, so nothing
# else should be stored inside it.
output_path = '/content/drive/My Drive/Colab Notebooks/diploma/results/processed datasets/'

data_purchases_transformed.write.parquet(output_path, mode="overwrite")

print(f"Датасет сохранён по пути {output_path}.")

Датасет сохранён по пути /content/drive/My Drive/Colab Notebooks/diploma/results/processed datasets/.


In [18]:
import os

# Convert the PySpark DataFrame to pandas (collects all rows onto the driver).
pandas_data_purchases_transformed = data_purchases_transformed.toPandas()

# Destination of the CSV export.
output_csv_path = '/content/drive/My Drive/Colab Notebooks/diploma/results/processed_data/purchases.csv'

# pandas.DataFrame.to_csv does not create missing directories; make sure the
# target folder exists so the cell also works on a fresh Drive.
os.makedirs(os.path.dirname(output_csv_path), exist_ok=True)

# Save without the pandas index.
pandas_data_purchases_transformed.to_csv(output_csv_path, index=False)

print(f"Датасет сохранён в формате CSV по пути {output_csv_path}.")

Датасет сохранён в формате CSV по пути /content/drive/My Drive/Colab Notebooks/diploma/results/processed_data/purchases.csv.


In [19]:
import pandas as pd
pd.read_csv(filepath_or_buffer='/content/drive/My Drive/Colab Notebooks/diploma/results/processed_data/purchases.csv')  # read back the exported CSV to check it

Unnamed: 0,customer_user_id,customer_id,purchase,timestamp
0,635619080,5190834,1,2024-11-29 01:34:57.295
1,635619080,5025044,1,2024-11-29 01:34:57.295
2,635619080,5025045,1,2024-11-29 01:34:57.295
3,636184329,5028719,1,2024-11-29 19:42:45.191
4,636184329,5025045,1,2024-11-29 19:42:45.191
...,...,...,...,...
339486,555831271,5106697,1,2024-06-23 11:56:22.133
339487,76786633,5224513,1,2024-06-23 19:02:41.761
339488,76786633,5052535,1,2024-06-23 19:02:41.761
339489,76786633,5169632,1,2024-06-23 19:02:41.761


In [20]:
# NOTE(review): this whole cell is commented out (dead code). If re-enabled:
#   * it uses final_df, which is only defined further down in the notebook;
#   * its output path is a sub-directory of the "processed datasets/" directory
#     that the purchases cell writes with mode("overwrite"), so re-running that
#     cell would delete this output.
# import os
# from google.colab import drive

# # Mount Google Drive
# drive.mount('/content/drive')

# # Check whether the directory exists (create it if not)
# directory_path = '/content/drive/My Drive/Colab Notebooks/diploma/results/processed datasets/'
# if not os.path.exists(directory_path):
#     os.makedirs(directory_path)
#     print(f"Папка {directory_path} создана.")
# else:
#     print(f"Папка {directory_path} существует.")

# # Save the DataFrame to Google Drive
# output_path = '/content/drive/My Drive/Colab Notebooks/diploma/results/processed datasets/processed_data.parquet'
# final_df.write.mode("overwrite").parquet(output_path)

# # Read the Parquet file back
# df = spark.read.parquet(output_path)
# df.show(truncate=False)

# Теперь подготовим датасет с описанием товаров

In [22]:
# Load the items dataset (item names and JSON metadata) and preview it.
data_items = spark.read.parquet(f"{WORKING_DIRECTORY}data/data_set_items")
data_items.show(n=20)

+----------+---------------+--------------------+-----+-----------+--------------------+--------------------+--------------+-------------+--------------------+------------+-------------------+-------------------+----------+--------------------+
|        id|autocomplete_id|                name|score|customer_id|          name_lower|       metadata_json|computed_score|boost_or_bury|    ds_metadata_json|section_name|         created_at|         updated_at|       day|              ac_key|
+----------+---------------+--------------------+-----+-----------+--------------------+--------------------+--------------+-------------+--------------------+------------+-------------------+-------------------+----------+--------------------+
|3668992957|           1947|Kaytee Forti-Diet...|   -1|    5017277|kaytee forti-diet...|{"url": "/shop/en...|         68803|            0|{"weighted_keywor...|    Products|2022-05-16 14:44:33|2024-06-26 02:09:22|2024-06-26|key_afiSr5Y4gCaaSW5X|
|3668994348|        

In [23]:
data_items.printSchema()  # Print the schema: column names, types and nullability

root
 |-- id: long (nullable = true)
 |-- autocomplete_id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- score: integer (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- name_lower: string (nullable = true)
 |-- metadata_json: string (nullable = true)
 |-- computed_score: integer (nullable = true)
 |-- boost_or_bury: integer (nullable = true)
 |-- ds_metadata_json: string (nullable = true)
 |-- section_name: string (nullable = true)
 |-- created_at: timestamp (nullable = true)
 |-- updated_at: timestamp (nullable = true)
 |-- day: string (nullable = true)
 |-- ac_key: string (nullable = true)



In [25]:
# Keep only the id, item id (customer_id), name and raw JSON metadata columns.
data_items_filtered = data_items.select(
    ["id", "customer_id", "name", "name_lower", "metadata_json"]
)

# Preview the first 20 rows without truncating the long JSON strings.
data_items_filtered.show(truncate=False)

+----------+-----------+------------------------------------------------------------------------------------+------------------------------------------------------------------------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [26]:
from pyspark.sql.functions import from_json, col
# Import every schema type used below in this cell. StructType, StructField and
# BooleanType were never imported anywhere, and ArrayType/StringType only in a later
# cell, so this cell raised NameError on a fresh kernel (Restart & Run All).
from pyspark.sql.types import ArrayType, BooleanType, StringType, StructField, StructType

# Schema of the JSON stored in metadata_json. Keys missing from a record parse as null.
metadata_schema = StructType([
    StructField("url", StringType(), True),
    StructField("image_url", StringType(), True),
    StructField("facets", StructType([
        StructField("How to get it", ArrayType(StringType()), True),
        StructField("Primary Brand", ArrayType(StringType()), True),
        StructField("Primary Color", ArrayType(StringType()), True),
        StructField("Killed Item flag", ArrayType(StringType()), True),
        StructField("Primary Pet Type", ArrayType(StringType()), True),
        StructField("Personalized Item flag", ArrayType(StringType()), True),
        StructField("Prescription Medication", ArrayType(StringType()), True)
    ]), True),
    StructField("group_ids", ArrayType(StringType()), True),
    StructField("PTC_OMNI_REPEAT_DELIVERY_FL", StringType(), True),
    StructField("parentCatEntryIDAsString", StringType(), True),
    StructField("PTC_OMNI_PRIMARY_ITEM_FLAG", StringType(), True),
    StructField("mfName", StringType(), True),
    StructField("PTC_OMNI_IN_STORE_ONLY_FLAG", StringType(), True),
    StructField("PTC_OMNI_PROP_65_FLAG", StringType(), True),
    StructField("PTC_OMNI_TAXONOMY", StringType(), True),
    StructField("PTC_OMNI_PERSONALIZED_ITEM_FL", StringType(), True),
    StructField("parentCatEntryID", StringType(), True),
    StructField("startDate", StringType(), True),
    StructField("deactivated", BooleanType(), True),
    StructField("PTC_OMNI_SAME_DAY_DELIVERY_FG", StringType(), True),
    StructField("PTC_OMNI_PDP_BEHAVIOR_TEMPLATE", StringType(), True),
    StructField("itemname", StringType(), True),
    StructField("PTC_OMNI_BOPUS_FLAG", StringType(), True),
    StructField("PTC_OMNI_BRAND_PRIMARY", StringType(), True),
    StructField("itemurl", StringType(), True),
    StructField("itemimg", StringType(), True),
    StructField("UPC_NUMBER", StringType(), True)
])

# Parse metadata_json into a struct column.
data_items_parsed = data_items_filtered.withColumn(
    "metadata_parsed", from_json(col("metadata_json"), metadata_schema)
)

# Pull the needed attributes out of metadata_parsed into top-level columns
# (the facets struct is kept whole and flattened later).
data_items_with_columns = data_items_parsed.select(
    "id", "customer_id", "name", "name_lower",
    "metadata_parsed.url", "metadata_parsed.image_url", "metadata_parsed.itemname",
    "metadata_parsed.facets", "metadata_parsed.group_ids",
    "metadata_parsed.PTC_OMNI_REPEAT_DELIVERY_FL", "metadata_parsed.PTC_OMNI_PRIMARY_ITEM_FLAG",
    "metadata_parsed.PTC_OMNI_TAXONOMY", "metadata_parsed.PTC_OMNI_PERSONALIZED_ITEM_FL",
    "metadata_parsed.PTC_OMNI_SAME_DAY_DELIVERY_FG", "metadata_parsed.PTC_OMNI_PDP_BEHAVIOR_TEMPLATE",
    "metadata_parsed.PTC_OMNI_BOPUS_FLAG", "metadata_parsed.PTC_OMNI_BRAND_PRIMARY",
    "metadata_parsed.itemurl", "metadata_parsed.itemimg", "metadata_parsed.UPC_NUMBER"
)

# Preview:
# data_items_with_columns.show(truncate=False)

In [27]:
# NOTE(review): commented-out debugging code. If re-enabled, filter on
# col("customer_id") == "5190834": data_items.customer_id is a column object of a
# different DataFrame (data_items), and customer_id is a string column.
# one_item = data_items_with_columns.filter(data_items.customer_id == 5190834)
# one_item.show()

In [28]:
# NOTE(review): dead code; one_item is only defined in commented-out code (In [27]).
# one_item.select('facets').collect()

In [29]:
# Item attributes reduced to one value per customer_id when aggregating
# (presumably identical across the rows of one item — TODO confirm).
same_columns = [
    "name",
    "name_lower",
    "url",
    "image_url",
    "itemname",
    "facets",
]

In [30]:
from pyspark.sql.functions import collect_list, udf
from pyspark.sql.types import ArrayType, StringType

# Intersection of a list of lists (wrapped as a Spark UDF below).
def array_intersection(arrays):
    """Return the values present in every non-null list of ``arrays``.

    Intended for a ``collect_list("group_ids")`` column: each element is the
    ``group_ids`` array of one row that shares the same ``customer_id``.

    Args:
        arrays: a list of lists. It may be ``None`` or empty. ``None`` sub-lists
            are ignored instead of raising ``TypeError``.

    Returns:
        A new list of the common values, de-duplicated and ordered by first
        appearance in the first non-null list. Iterating a ``set`` gives an
        arbitrary order, so ordering explicitly keeps the output reproducible.
        Returns ``[]`` when there is nothing to intersect.
    """
    present = [arr for arr in (arrays or []) if arr is not None]
    if not present:
        return []
    common = set(present[0]).intersection(*present[1:])
    # dict.fromkeys keeps the first occurrence of each value, preserving order.
    return list(dict.fromkeys(value for value in present[0] if value in common))

# Wrap the Python helper as a Spark UDF returning array<string>.
intersection_udf = udf(array_intersection, ArrayType(StringType()))

# Per customer_id: gather each row's group_ids array (collect_list skips null arrays),
# then intersect them with the UDF.
data_items_intersect = (
    data_items_with_columns
    .groupBy("customer_id")
    .agg(collect_list("group_ids").alias("group_ids_list"))
    .withColumn("group_ids_intersect", intersection_udf("group_ids_list"))
)

# Inspect the result:
# data_items_intersect.select("customer_id", "group_ids_intersect").show(truncate=False)

In [31]:
# Spot-check the intersected group ids of one item. Filter on this DataFrame's own
# column (data_items.customer_id belongs to another DataFrame and only resolved via
# lineage) and compare with a string, because customer_id is a string column.
# .first() returns None instead of raising IndexError when the id is absent.
data_items_intersect.filter(col("customer_id") == "5190834").select('group_ids_intersect').first()

Row(group_ids_intersect=['same-day-delivery-cat-products', 'buy-online-pick-up-in-store-dog-products', 'buy-online-pick-up-in-store-cat-products', 'same-day-delivery-dog-products', 'cat-feeding-scoops', 'dog-food-containers', 'dog-bowls'])

In [32]:
from pyspark.sql.functions import first

# One row per customer_id (item), with one representative value for each column in
# same_columns. ignorenulls=True makes first() skip nulls, so a null in one row does
# not hide a non-null value that another row of the same item has.
# NOTE(review): without an explicit ordering, first() is non-deterministic when the
# rows of one item disagree on a value.
aggregated_same = data_items_with_columns.groupBy("customer_id").agg(
    *[
        first(col_name, ignorenulls=True).alias(col_name)
        for col_name in same_columns
        if col_name != "customer_id"  # guard: the grouping key is already present
    ]
)

# Attach the per-item intersection of group_ids (data_items_intersect); both sides
# are grouped by customer_id from the same source, so the join is one-to-one.
final_df = aggregated_same.join(
    data_items_intersect.select("customer_id", "group_ids_intersect"),
    on="customer_id",
    how="left",
)

In [33]:
from pyspark.sql.functions import col

# Promote each facets sub-field (names contain spaces, hence the back-ticks) to its
# own top-level column, appended after the existing columns, then drop the struct.
# NOTE(review): this rebinds final_df and removes "facets", so re-running this cell
# on its own fails; re-run the aggregation cell first.
final_df = final_df.select(
    "*",
    col("facets.`How to get it`").alias("How_to_get_it"),
    col("facets.`Primary Brand`").alias("Primary_Brand"),
    col("facets.`Primary Color`").alias("Primary_Color"),
    col("facets.`Killed Item flag`").alias("Killed_Item_flag"),
    col("facets.`Primary Pet Type`").alias("Primary_Pet_Type"),
    col("facets.`Personalized Item flag`").alias("Personalized_Item_flag"),
    col("facets.`Prescription Medication`").alias("Prescription_Medication"),
).drop("facets")

final_df.show(truncate=False)

+-----------+-------------------------------------------------------------------------------+-------------------------------------------------------------------------------+-------------------------------------------------------------------------------------+------------------------------------------------------------------------------------------+-------------------------------------------------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--------------------------------------------------------------------------+-----------------------+-------------+----------------+----------------+----------------------+-----------------------+
|customer_id|name                                      

In [34]:
import os

# Convert the PySpark DataFrame to pandas (collects all rows onto the driver).
pandas_final_df = final_df.toPandas()

# Destination of the CSV export.
output_csv_path = '/content/drive/My Drive/Colab Notebooks/diploma/results/processed_data/items.csv'

# pandas.DataFrame.to_csv does not create missing directories; make sure the
# target folder exists so the cell also works on a fresh Drive.
os.makedirs(os.path.dirname(output_csv_path), exist_ok=True)

# Save without the pandas index.
# NOTE(review): array-typed columns (group_ids_intersect and the facet columns) are
# written as their Python string form (e.g. "['No']"), so readers must parse them back.
pandas_final_df.to_csv(output_csv_path, index=False)

print(f"Датасет сохранён в формате CSV по пути {output_csv_path}.")

Датасет сохранён в формате CSV по пути /content/drive/My Drive/Colab Notebooks/diploma/results/processed_data/items.csv.


In [36]:
import pandas as pd
pd.read_csv(filepath_or_buffer=output_csv_path)  # read back the exported items CSV to check it

Unnamed: 0,customer_id,name,name_lower,url,image_url,itemname,group_ids_intersect,How_to_get_it,Primary_Brand,Primary_Color,Killed_Item_flag,Primary_Pet_Type,Personalized_Item_flag,Prescription_Medication
0,100039,Prevue Pet Products Cockatiel Court Tabletop P...,prevue pet products cockatiel court tabletop p...,/shop/en/petcostore/product/prevue-pet-cockati...,https://assets.petco.com/petco/image/upload/f_...,Prevue Pet Products Cockatiel Court Tabletop P...,"['cockatiel-perches', 'parakeet-perches', 'con...",['One Time Delivery'],['Prevue Pet Products'],['Multi-Color'],['No'],['Bird'],['No'],
1,100140,Nature's Miracle Orange Scent Oxy Formula Set-...,nature's miracle orange scent oxy formula set-...,/shop/en/petcostore/product/natures-miracle-or...,https://assets.petco.com/petco/image/upload/f_...,Nature's Miracle Orange Scent Oxy Formula Set-...,"['stain-and-odor-removers', 'same-day-delivery...","['Same Day Delivery', 'Free Pickup Today', 'On...","[""Nature's Miracle""]",,['No'],"['Cat,Dog']",['No'],
2,100243,Kaytee Forti-Diet Pro Health Honey Stick Cocka...,kaytee forti-diet pro health honey stick cocka...,/shop/en/petcostore/product/kaytee-forti-diet-...,https://assets.petco.com/petco/image/upload/f_...,Kaytee Forti-Diet Pro Health Honey Stick Cocka...,"['conure-treats', 'repeat-delivery-eligible-pr...","['Same Day Delivery', 'Free Pickup Today', 'On...",['Kaytee'],,['No'],['Bird'],['No'],
3,100244,Kaytee Forti-Diet Pro Health Honey Stick Parro...,kaytee forti-diet pro health honey stick parro...,/shop/en/petcostore/product/kaytee-forti-diet-...,https://assets.petco.com/petco/image/upload/f_...,Kaytee Forti-Diet Pro Health Honey Stick Parro...,"['repeat-delivery-eligible-products', 'bird-tr...","['Same Day Delivery', 'Free Pickup Today', 'On...",['Kaytee'],,['No'],['Bird'],['No'],
4,100254,Royal Canin Shih Tzu Adult Breed Specific Dry ...,royal canin shih tzu adult breed specific dry ...,/shop/en/petcostore/product/royal-canin-mini-c...,https://assets.petco.com/petco/image/upload/f_...,Royal Canin Shih Tzu Adult Breed Specific Dry ...,"['small-breed-dog-food', 'repeat-delivery-elig...","['Same Day Delivery', 'Free Pickup Today', 'On...",['Royal Canin'],,['No'],['Dog'],['No'],
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
31996,8830,PoochPads Reusable Housebreaking Pad,poochpads reusable housebreaking pad,/shop/en/petcostore/product/poochpads-reusable...,https://assets.petco.com/petco/image/upload/f_...,PoochPads Reusable Housebreaking Pad,"['dog-potty-training', 'dog-repeat-delivery-pr...","['One Time Delivery', 'Repeat Delivery']",['PoochPads'],,['No'],['Dog'],['No'],
31997,8864,PetSafe Cat Flap,petsafe cat flap,/shop/en/petcostore/product/petsafe-cat-flap,https://assets.petco.com/petco/image/upload/f_...,PetSafe Cat Flap,['cat-doors-and-flaps'],['One Time Delivery'],['PetSafe'],['White'],['No'],['Cat'],['No'],['No']
31998,9095,TropiClean Papaya Mist Deodorizing Spray for Pets,tropiclean papaya mist deodorizing spray for pets,/shop/en/petcostore/product/tropiclean-natural...,https://assets.petco.com/petco/image/upload/f_...,TropiClean Papaya Mist Deodorizing Spray for Pets,"['animal-welfare-dog-supplies', 'sustainable-d...","['Same Day Delivery', 'Free Pickup Today', 'On...",['TropiClean'],,['No'],['Dog'],['No'],
31999,9612,Zoo Med Little Dripper,zoo med little dripper,/shop/en/petcostore/product/zoo-med-little-dri...,https://assets.petco.com/petco/image/upload/f_...,Zoo Med Little Dripper,['buy-online-pick-up-in-store-reptile-products...,"['Same Day Delivery', 'Free Pickup Today', 'On...",['Zoo Med'],,['No'],['Reptile'],['No'],
