# Установка Spark

In [None]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://archive.apache.org/dist/spark/spark-3.4.1/spark-3.4.1-bin-hadoop3.tgz
!tar xf spark-3.4.1-bin-hadoop3.tgz
!pip install -q findspark

Установка переменных среды

In [None]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.4.1-bin-hadoop3"

### Подключение необходимых библиотек

In [None]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()
spark.conf.set("spark.sql.repl.eagerEval.enabled", True)
spark

Загружаем [датасет](https://www.kaggle.com/datasets/asaniczka/amazon-uk-products-dataset-2023) с кагла

In [None]:
! pip install -q kaggle
from google.colab import files
files.upload()

In [None]:
! mkdir ~/.kaggle
! cp kaggle.json ~/.kaggle/
! chmod 600 ~/.kaggle/kaggle.json
! kaggle datasets download asaniczka/amazon-uk-products-dataset-2023
! unzip -q /content/amazon-uk-products-dataset-2023.zip -d /content/Dataset
!rm /content/amazon-uk-products-dataset-2023.zip

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import regexp_replace, col
def replace_double_quotes_with_empty(cell):
    return regexp_replace(cell, '"', '')

In [None]:
df = spark.read.csv('Dataset/products.csv', header=True, sep=',"')

for column_name in df.columns:
    df = df.withColumn(column_name, replace_double_quotes_with_empty(col(column_name)))

expressions = [col(column).alias(column.replace('"', '')) for column in df.columns]

df = df.select(expressions)
df.limit(5)

asin,title,imgUrl,productURL,stars,reviews,price,isBestSeller,boughtInLastMonth,categoryName
B09B96TG33,Echo Dot (5th gen...,https://m.media-a...,https://www.amazo...,4.7,15308,21.99,False,0,Hi-Fi Speakers
B01HTH3C8S,Anker Soundcore m...,https://m.media-a...,https://www.amazo...,4.7,98099,23.99,True,0,Hi-Fi Speakers
B09B8YWXDF,Echo Dot (5th gen...,https://m.media-a...,https://www.amazo...,4.7,15308,21.99,False,0,Hi-Fi Speakers
B09B8T5VGV,Echo Dot with clo...,https://m.media-a...,https://www.amazo...,4.7,7205,31.99,False,0,Hi-Fi Speakers
B09WX6QD65,Introducing Echo ...,https://m.media-a...,https://www.amazo...,4.6,1881,17.99,False,0,Hi-Fi Speakers


In [None]:
# Преобразование типов данных
df = df.withColumn("stars", df["stars"].cast("float"))
df = df.withColumn("reviews", df["reviews"].cast("int"))
df = df.withColumn("price", df["price"].cast("float"))
df = df.withColumn("boughtInLastMonth", df["boughtInLastMonth"].cast("int"))
df = df.withColumn("isBestSeller", df["isBestSeller"].cast("boolean"))
df.dtypes

[('asin', 'string'),
 ('title', 'string'),
 ('imgUrl', 'string'),
 ('productURL', 'string'),
 ('stars', 'float'),
 ('reviews', 'int'),
 ('price', 'float'),
 ('isBestSeller', 'boolean'),
 ('boughtInLastMonth', 'int'),
 ('categoryName', 'string')]

# Spark запросы

### 1. Топ 10 самых дорогих категорий, в которой товары купили хотя бы 1 раз за последний месяц и звезд больше 4

In [None]:
filtered_df = df.filter((col("boughtInLastMonth") >= 1) & (col("stars") > 4))
result = filtered_df.groupBy("categoryName").agg(round(avg(col("price")), 2).alias("avgPrice"))
sorted_result = result.orderBy(col("avgPrice").desc())
sorted_result.show(10, truncate=False)

+----------------------------+--------+
|categoryName                |avgPrice|
+----------------------------+--------+
|Mobile Phones & Smartphones |189.25  |
|3D Printers                 |176.66  |
|PA & Stage                  |159.99  |
|Coffee & Espresso Machines  |124.76  |
|Mowers & Outdoor Power Tools|100.6   |
|Home Office Furniture       |88.48   |
|Karaoke Equipment           |86.12   |
|Bedroom Furniture           |82.78   |
|Home Audio Record Players   |79.99   |
|Bedding Collections         |79.99   |
+----------------------------+--------+
only showing top 10 rows



### 2. Топ категорий по количеству бестселлеров

In [None]:
from pyspark.sql.functions import count
bestsellers_count = df.filter(col("isBestSeller")).groupBy("categoryName").agg(count("*").alias("BestSellersCount")).orderBy(col("BestSellersCount").desc())
bestsellers_count.show(truncate=False)

+---------------------------------+----------------+
|categoryName                     |BestSellersCount|
+---------------------------------+----------------+
|Health & Personal Care           |485             |
|Grocery                          |313             |
|Sports & Outdoors                |300             |
|Pet Supplies                     |283             |
|Power & Hand Tools               |243             |
|Baby                             |188             |
|Women                            |182             |
|Arts & Crafts                    |163             |
|Men                              |147             |
|Hardware                         |146             |
|Storage & Organisation           |142             |
|Large Appliances                 |139             |
|PC & Video Games                 |136             |
|Customers' Most Loved            |108             |
|Office Supplies                  |107             |
|Building Supplies                |93         

### 3. Вывести yникальные категории

In [None]:
distinct_categories = df.select("categoryName").distinct().show(truncate=False)

+---------------------------+
|categoryName               |
+---------------------------+
|Bird & Wildlife Care       |
|CD, Disc & Tape Players    |
|USB Gadgets                |
|200                        |
|Skiing Poles               |
|Motorbike Batteries        |
|Storage & Home Organisation|
|Projectors                 |
|Graphics Cards             |
|3D Printers                |
|Motorbike Accessories      |
|Motherboards               |
|Wind Instruments           |
|0                          |
|PC Gaming Accessories      |
|False                      |
|Handmade Gifts             |
|Pet Supplies               |
|Living Room Furniture      |
|Kids' Art & Craft Supplies |
+---------------------------+
only showing top 20 rows



### 4. Продукты с самой высокой оценкой (количеством звезд), у которых отзывов больше среднего

In [None]:
average_reviews = df.select(avg(col("reviews"))).collect()[0][0]

top_rated_products = df.filter((col("stars") == df.selectExpr("max(stars)").collect()[0][0]) & (col("reviews") > average_reviews))
top_rated_products.select("title").show(truncate=False)

+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|title                                                                                                                                                                  |
+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|PocBuds Wireless Earbuds, Bluetooth 5.3 Headphones with ENC Mic, 2023 Earpods 40H Deep Base Wireless Earphones IP5 Waterproof, LED Display USB-C Charging Case Ear buds|
+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------+



### 5. Названия продуктов категории Pet Supplies и количество покупок за последний месяц, у определенных товаров
- звезд > 4,5
- отзывов > 100
- цена больше 15
- лучший продавец
- за последний месяц куплено > 1000.

In [None]:
result = df.filter(
    (col("stars") > 4.5) &
    (col("reviews") > 100) &
    (col("price") > 15) &
    (col("isBestSeller")) &
    (col("boughtInLastMonth") > 1000) &
    (col("categoryName") == "Pet Supplies")
)

result = result.orderBy(col("stars").desc(), col("reviews").desc())
result.select('title', 'boughtInLastMonth').limit(10)

title,boughtInLastMonth
Purina ONE Adult ...,9000
Sheba Fine Flakes...,10000
FELIWAY Classic 3...,5000
YuMOVE Senior Dog...,10000
James Wellbeloved...,2000
ICF - CLX Wipes -...,3000
