**Цель задания:** Использовать Apache Spark для создания синтетического набора данных, который имитирует информацию о покупках в интернет-магазине.   
Набор данных должен включать в себя информацию о заказах, включая дату заказа, идентификатор пользователя, название товара, количество и цену.  
Сгенерированные данные будут использованы для последующего анализа покупательской активности и понимания потребительских трендов.  
**Задачи**
* Создать DataFrame с полями: Дата, UserID, Продукт, Количество, Цена.
* Данные для поля Продукт генерируются из списка возможных товаров ( не меньше 5 товаров )
* Количество и Цена должны генерироваться случайно в заданных пределах.
* Дата должна быть в пределах последнего года.
* UserID представляет собой случайное число, имитирующее идентификаторы пользователей.
* Обратите внимание, что должна быть возможности изменять количество сгенерированных строк. Минимальное количество - 1000 строк.
---
*Сохранение данных:
Сохранить сгенерированный DataFrame в формате CSV для последующего анализа.
Результат выполнения задания (код генерации синтетических данных и созданный файл *.csv) необходимо выложить в github/gitlab и указать ссылку на Ваш репозиторий (не забудьте: репозиторий должен быть публичным).*

In [587]:
import requests
from bs4 import BeautifulSoup
import re
from pyspark.sql import SparkSession 
import pyspark.sql.functions as F
import numpy as np
from datetime import date
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DateType, FloatType

In [588]:

spark = SparkSession.builder.appName("Shop").getOrCreate()

In [589]:
req = requests.get('https://bystronom.ru/catalog/sales/supercena/')
# считываем текст HTML-документа
src = req.text
# инициализируем html-код страницы
soup = BeautifulSoup(src, 'html.parser')

# достаем максимальное кол-во страниц
pages_soup = soup.find_all('ul', {'class':'bx_pagination_page_list_num'})
page_numbers = int(re.findall(r'PAGEN_1=(\d+)', str(pages_soup))[-1])

prices = []
names = []

# в каждом цикле считывем новую страницу и добавляем к спискам
for num in range(1, page_numbers+1):
    req = requests.get(f'https://bystronom.ru/catalog/sales/supercena/?PAGEN_1={num}&SIZEN_1=12')
    # считываем текст HTML-документа
    src = req.text
    # инициализируем html-код страницы
    soup = BeautifulSoup(src, 'html.parser')


    names_soup = soup.find_all('div',{'class':'name'})
    names.extend(list(map(lambda x: x.decode_contents(), names_soup)))

    
    prices_soup = soup.find_all('div',{'class':'price'})
    for div in prices_soup:
        match = re.search(r'(\d+)<sup>(\d+)', div.decode_contents()) #  `decode_contents()` предоставляет только внутреннее содержимое без внешнего тега
        if match:
            prices.append(float(match.group(0).replace('<sup>','.')))

if len(names) == len(prices) > 0:
    products = dict(zip(names, prices))
else:
    print('Разная длина списков')


In [590]:
# cоздание серии дат
# с начала года по текущую дату
date_range = [["2024-01-01",date.today()]]
df = spark.createDataFrame(date_range,['start','end'])

# Функция sequence позволяет конструировать массив, генерируя последовательность элементов от start до stop (включительно). 
# Поддерживаемые типы: byte, short, integer, long, date и timestamp.
dates_df = df.selectExpr("sequence(to_date(start), to_date(end), interval 1 day) as date_sequence")

# explode - Возвращает новую строку для каждого элемента в заданном массиве или карте.
dates_df = dates_df.selectExpr("explode(date_sequence) AS date")

In [591]:
# генерируем данные и склеиваем в список
def generation_store(volume=5):
    user_id = np.random.randint(1,100000, size=volume).tolist()
    product= np.random.choice(list(products.keys()), size=volume).tolist()
    # подтягиваем цену из словаря для каждого продукта
    price = list(map(lambda x: products.get(x), product))
    cnt = np.random.randint(1,5, size=volume).tolist()
    dt = np.random.choice(list(dates_df.rdd.flatMap(lambda x: x).collect()), size=volume).tolist()
    
    return user_id, product, price, cnt, dt
client_id, product, price, cnt, dt = generation_store(volume=10000)  

if len(client_id) == len(product) == len(price) == len(cnt) == len(dt):
    shop_data = list(zip(dt, client_id, product, price, cnt))

In [592]:
# Создаем схему для DataFrame
schema = StructType([
    StructField("dt", DateType(), True),
    StructField("client_id", IntegerType(), True), 
    StructField("item", StringType(), True),
    StructField("price", FloatType(), True),
    StructField("quantity", IntegerType(), True)

])
# Создаем DataFrame с использованием схемы
df_shop = spark.createDataFrame(shop_data, schema)
df_shop.show(5)


+----------+---------+--------------------+-----+--------+
|        dt|client_id|                item|price|quantity|
+----------+---------+--------------------+-----+--------+
|2024-04-24|    26231|Горошек БОНДЮЭЛЬ ...| 79.9|       1|
|2024-02-01|    63049|Подгузники-трусик...|849.0|       1|
|2024-03-05|    44526|Колготки ГЛАМУР В...|289.9|       4|
|2024-04-07|    95720|Молоко ТОМСКОЕ МО...| 94.9|       2|
|2024-02-16|    44606|Мак.изделия МАКФА...| 49.9|       2|
+----------+---------+--------------------+-----+--------+
only showing top 5 rows



In [593]:
# сохраняем в 1 файл csv
df_shop.coalesce(1).write.csv('df_shop.csv')




                                                                                