Создаем спарк сессию и читаем датасеты

In [53]:
from pyspark.sql import SparkSession
import os
import sys
from pyspark.sql.functions import *

os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable
 

spark = SparkSession.builder\
    .config("spark.driver.memory", "10g")\
    .master('local[*]')\
    .appName('Task2').getOrCreate()



city_and_rest = spark.read.format("csv").\
                option("delimiter", ",").\
                option("header","true").\
                option("encoding", "cp1251").\
                load("MLBD_Lab1_citysandrest.csv")\
                .withColumn('rid', monotonically_increasing_id())

menu_and_price = spark.read.format("csv").\
                option("delimiter", ",").\
                option("header","true").\
                option("encoding", "cp1251").\
                load("MLBD_Lab1_menuanddata.csv")


Множим датасет с городами

In [54]:
city_and_rest1 = city_and_rest.withColumn('facility_name',  expr(f"explode(array_repeat(facility_name,4))"))

Создаем временной ряд с колонкой rid (rest id) для джоина

In [64]:
unique_rest_num = city_and_rest.select('facility_name').distinct().count()
unique_menu_num = menu_and_price.select('Eat').distinct().count()

def timeseries(seed=2,count_=4,range_=80000):

  timeseries = spark.range(range_).withColumnRenamed('id','oid')
  random_timestamp = lit((rand(seed) * 31536000) + 1735689600) 
  random_datetime = from_unixtime(random_timestamp, "yyyy-MM-dd HH:mm:ss")

  timeseries = timeseries.withColumn('time', concat(random_datetime.substr(1, 10), lit(' '), random_datetime.substr(12, 8)))

  timeseries = timeseries.withColumn('rid',floor(rand()*unique_rest_num)).orderBy(rand()) #.\

  

  return timeseries

timeseries = timeseries(5,3,81000)
timeseries.show(10)

+--------+-------------------+---+
|order_id|               time|rid|
+--------+-------------------+---+
|   25578|2025-03-04 21:34:40|  7|
|   55345|2025-03-14 05:17:19| 11|
|   79069|2025-08-17 13:47:12| 11|
|   60111|2025-09-20 02:08:09|  6|
|   34663|2025-05-20 07:47:39| 21|
|   79286|2025-01-02 15:16:13| 11|
|   80745|2025-11-01 04:39:21| 14|
|   40301|2025-09-30 23:01:01|  5|
|   19211|2025-07-15 19:13:58| 16|
|   77262|2025-01-05 23:45:56| 15|
+--------+-------------------+---+
only showing top 10 rows



Джоним

In [65]:
result =  city_and_rest1.join(timeseries, on='rid')

Делаем кросс джоин меню со списком, состоящих из уникальных номеров ресторанов


Это нужно, чтобы для каждая позиция в меню была в каждом ресторане

In [66]:
menu_and_price1 = menu_and_price.crossJoin(spark.range(unique_rest_num)).orderBy(rand())

Далее джоиним рестораны и меню

In [67]:
result1 = result.join(menu_and_price1,menu_and_price1.id==result.rid).drop('id').drop('rid').orderBy(rand())

Сэмплируем и сортируем по времени наши заказы

In [68]:
dataset = result1.sample(0.1).sort('time')


Вот и наш датасет с заказами

Безусловно, один и тот же ресторан может повторяться несколько раз, так как время заказа может отличаться

In [69]:
dataset.show(800)

+----------------+-------+--------------------+----------+-----------+--------+-------------------+-------------------+-------+
|            city|country|       facility_name|       lat|        lng|order_id|               time|                Eat|  Price|
+----------------+-------+--------------------+----------+-----------+--------+-------------------+-------------------+-------+
|     Novosibirsk| Russia|       #СИБИРЬСИБИРЬ|55.0295233| 82.9049737|   73231|2025-01-01 03:06:34|         Cheesecake| 640.89|
|     Novosibirsk| Russia|       #СИБИРЬСИБИРЬ|55.0295233| 82.9049737|   73231|2025-01-01 03:06:34|     Sandwiches set| 555.47|
|     Novosibirsk| Russia|       #СИБИРЬСИБИРЬ|55.0295233| 82.9049737|   73231|2025-01-01 03:06:34|                Tea| 134.90|
|     Novosibirsk| Russia|       #СИБИРЬСИБИРЬ|55.0295233| 82.9049737|   73231|2025-01-01 03:06:34|       Black caviar|7777.77|
|     Novosibirsk| Russia|       #СИБИРЬСИБИРЬ|55.0295233| 82.9049737|   73231|2025-01-01 03:06:34|     

Размер датасета

In [70]:
dataset.count()

841536

Загрязняем

In [71]:
result = dataset.withColumn("city", when(rand() < 12e-2, None).otherwise(col("city"))) \
                .withColumn("country", when(rand() < 23e-2, None).otherwise(col("country"))) \
                .withColumn("facility_name", when(rand() < 14e-2, None).otherwise(col("facility_name"))) \
                .withColumn("lat", when(rand() < 35e-2 , -999.999).otherwise(col("lat"))) \
                .withColumn("lng", when(rand() < 46e-2, -999.999).otherwise(col("lng"))) \
                .withColumn("eat", when(rand() < 37e-2, None).otherwise(col("eat"))) \
                .withColumn("price", when(rand() < 28e-2, None).otherwise(col("price"))) \
                .withColumn("time", when(rand() < 19e-2, None).otherwise(col("time")))

In [72]:
result.show(800)

+----------------+-------+--------------------+----------+-----------+--------+-------------------+-------------------+-------+
|            city|country|       facility_name|       lat|        lng|order_id|               time|                eat|  price|
+----------------+-------+--------------------+----------+-----------+--------+-------------------+-------------------+-------+
|     Novosibirsk| Russia|       #СИБИРЬСИБИРЬ|  -999.999|   -999.999|   73231|2025-01-01 03:06:34|                Tea|   null|
|            null| Russia|       #СИБИРЬСИБИРЬ|  -999.999| 82.9049737|   73231|2025-01-01 03:06:34|               null| 134.90|
|     Novosibirsk| Russia|       #СИБИРЬСИБИРЬ|  -999.999|   -999.999|   73231|2025-01-01 03:06:34|               null|   null|
|     Novosibirsk| Russia|       #СИБИРЬСИБИРЬ|55.0295233| 82.9049737|   73231|2025-01-01 03:06:34|           BBQ ribs| 367.51|
|     Novosibirsk| Russia|       #СИБИРЬСИБИРЬ|  -999.999| 82.9049737|   73231|               null|     

Сохраняем

In [73]:
path = 'task1/'

result.repartition(1).write.option("header",False) \
    .option("delimiter",";") \
    .format("csv") \
    .mode("overwrite") \
    .save(path)