## Task 1

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, TimestampType
import random
from datetime import datetime, timedelta

spark = SparkSession.builder.appName("OrderHistoryGenerator").getOrCreate()

cities = [
    ("Moscow", 55.7558, 37.6173), ("Saint Petersburg", 59.9343, 30.3351),
    ("Novosibirsk", 55.0084, 82.9357), ("Ekaterinburg", 56.8389, 60.6057),
    ("Simferopol", 44.5653, 34.0615), ("Nizhny Novgorod", 56.2965, 43.9361),
    ("Sevastopol", 44.3600, 36.3200), ("Samara", 53.2415, 50.2212),
    ("Rostov-on-Don", 47.2357, 39.7015), ("Ufa", 54.7348, 55.9578),
    
]

city_dict = {city[0]: city for city in cities}

facilities = [
    ("Starik_Hynkalych", city_dict["Simferopol"]),
    ("Starik_Hynkalych", city_dict["Sevastopol"]),
    ("Starik_Hynkalych", city_dict["Saint Petersburg"]),
    ("Pushkin", city_dict["Moscow"]),
    ("Aziatiq", random.choice(cities)),
    ("Aziatiq", random.choice(cities)),
    ("The_byk", random.choice(cities)),
    ("The_byk", random.choice(cities)),
    ("The_byk", random.choice(cities)),
    ("Yumi_Yumi", city_dict["Saint Petersburg"]),
    ("DODOpizza", random.choice(cities)),
    ("DODOpizza", random.choice(cities)),
    ("DODOpizza", random.choice(cities)),
    ("LETH", random.choice(cities))
]

menu_items = [
    ("Pizza", 500), ("Tea", 100), ("Apple Juice", 150),
    ("Sushi", 600), ("Pasta", 400), ("Ramen", 450),
    ("Salad_Caesar", 350), ("Steak", 800), ("Harcho", 300),
    ("Hinkali", 600), ("Chebyrek", 190), ("Fried_potato", 200),
    ("Spicy_Jokable", 1000), ("Lobster", 3500)
]

def generate_orders():
    start_date = datetime(2024, 1, 1)
    end_date = datetime(2024, 12, 31)
    num_orders = 1_000_000
    
    data = []
    for _ in range(num_orders):
        order_id = random.randint(10000, 999999)
        facility_name, (city, lat, lng) = random.choice(facilities)
        order_time = start_date + timedelta(seconds=random.randint(0, (end_date - start_date).total_seconds()))
        order_time_str = order_time.strftime("%Y-%m-%d %H:%M:%S")
        
        if random.random() < 0.1:
            if random.random() < 0.5:
                lat, lng = -999.999, -999.999
            else:
                facility_name = None

        for _ in range(random.randint(1, 5)):
            menu_item, price = random.choice(menu_items)
            
            if random.random() < 0.1:
                menu_item, price = None, None
            
            data.append((order_id, city, "Russia", facility_name, lat, lng, menu_item, price, order_time_str))
    
    return data

schema = StructType([
    StructField("Order_id", IntegerType(), False),
    StructField("city", StringType(), False),
    StructField("country", StringType(), False),
    StructField("facility_name", StringType(), True),
    StructField("lat", DoubleType(), True),
    StructField("lng", DoubleType(), True),
    StructField("menu_item", StringType(), True),
    StructField("price", IntegerType(), True),
    StructField("datetime", StringType(), False)
])

df = spark.createDataFrame(generate_orders(), schema)
output_path = "orders_output"
df.repartition(10).write.csv(output_path, header=True, mode="overwrite")

print(f"CSV-файлы записаны в папку {output_path}")
spark.stop()

25/03/22 11:05:10 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/03/22 11:05:39 WARN TaskSetManager: Stage 0 contains a task of very large size (68756 KiB). The maximum recommended task size is 1000 KiB.
25/03/22 11:05:46 WARN TaskMemoryManager: Failed to allocate a page (8388592 bytes), try again.
25/03/22 11:05:46 WARN TaskMemoryManager: Failed to allocate a page (8388592 bytes), try again.
25/03/22 11:05:46 WARN TaskMemoryManager: Failed to allocate a page (8388592 bytes), try again.
25/03/22 11:05:46 WARN TaskMemoryManager: Failed to allocate a page (8388592 bytes), try again.
                                                                                

CSV-файлы записаны в папку orders_output


## Task 2

In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, to_date
from pyspark.sql.types import DoubleType

spark = SparkSession.builder.appName("OrderProcessing").getOrCreate()

input_path = "orders_output"
output_parquet = "orders_parquet"
output_avro = "orders_avro"
deadletter_path = "deadletter"

df = spark.read.option("header", "true").csv(input_path)

df = df.withColumn("price", col("price").cast("int")) \
       .withColumn("lat", col("lat").cast(DoubleType())) \
       .withColumn("lng", col("lng").cast(DoubleType())) \
       .withColumn("datetime", to_date(col("datetime")))

valid_records = df.filter(
    (col("facility_name").isNotNull()) & 
    (col("menu_item").isNotNull()) & 
    (col("price").isNotNull()) & 
    (col("lat") != -999.999) & 
    (col("lng") != -999.999)
)

invalid_records = df.subtract(valid_records)

invalid_records.write.mode("overwrite").csv(deadletter_path, header=True)

valid_records.write.mode("overwrite").partitionBy("datetime", "city").parquet(output_parquet)
valid_records.write.mode("overwrite").partitionBy("datetime", "city").format("avro").save(output_avro)

print("Фильтрация завершена. Данные сохранены в Parquet и Avro.")
spark.stop()

                                                                                

Фильтрация завершена. Данные сохранены в Parquet и Avro.


## Task 3

CREATE TABLE cities (
    id SERIAL PRIMARY KEY,
    name TEXT UNIQUE NOT NULL,
    country TEXT NOT NULL
);

CREATE TABLE facilities (
    id SERIAL PRIMARY KEY,
    name TEXT NOT NULL,
    city_id INTEGER REFERENCES cities(id) ON DELETE CASCADE
);

CREATE TABLE orders (
    id BIGINT PRIMARY KEY,
    facility_id INTEGER REFERENCES facilities(id) ON DELETE CASCADE,
    datetime TIMESTAMP NOT NULL
);

CREATE TABLE order_items (
    id SERIAL PRIMARY KEY,
    order_id BIGINT REFERENCES orders(id) ON DELETE CASCADE,
    menu_item TEXT NOT NULL,
    price INTEGER
);


Schema |    Name     | Type  |  Owner   
--------+-------------+-------+----------
 public | cities      | table | mlbduser
 public | facilities  | table | mlbduser
 public | order_items | table | mlbduser
 public | orders      | table | mlbduser


In [21]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

postgres_jar_path = "postgresql-42.7.3.jar"

spark = SparkSession.builder \
    .appName("LoadOrdersToPostgres") \
    .config("spark.jars", postgres_jar_path) \
    .config("spark.driver.extraClassPath", postgres_jar_path) \
    .config("spark.executor.extraClassPath", postgres_jar_path) \
    .getOrCreate()

df = spark.read.format("avro").load("orders_avro")

jdbc_url = "jdbc:postgresql://localhost:5432/mldbdb?ssl=false"
properties = {
    "user": "mlbduser",
    "password": "mlbdpasswd",
    "driver": "org.postgresql.Driver"
}

cities_df = df.select("city", "country").distinct().withColumnRenamed("city", "name")
cities_df = cities_df.dropDuplicates(["name"])

cities_db_df = spark.read.format("jdbc") \
    .option("url", jdbc_url) \
    .option("dbtable", "cities") \
    .option("user", "mlbduser") \
    .option("password", "mlbdpasswd") \
    .option("driver", "org.postgresql.Driver") \
    .load()

new_cities_df = cities_df.join(cities_db_df, cities_df.name == cities_db_df.name, "left_anti")
new_cities_df.write.option("driver", "org.postgresql.Driver").jdbc(jdbc_url, "cities", mode="append", properties=properties)

cities_db_df = spark.read.format("jdbc") \
    .option("url", jdbc_url) \
    .option("dbtable", "cities") \
    .option("user", "mlbduser") \
    .option("password", "mlbdpasswd") \
    .option("driver", "org.postgresql.Driver") \
    .load()

facilities_df = df.select("facility_name", "city").distinct().withColumnRenamed("facility_name", "name")

facilities_df = facilities_df.alias("facilities")
cities_db_df = cities_db_df.alias("cities")

facilities_df = facilities_df.join(
    cities_db_df,
    facilities_df.city == cities_db_df.name
).select(
    facilities_df["name"],
    cities_db_df["id"].alias("city_id")
)

facilities_df = facilities_df.dropDuplicates(["name", "city_id"])
facilities_df.write.option("driver", "org.postgresql.Driver").jdbc(jdbc_url, "facilities", mode="append", properties=properties)

facilities_db_df = spark.read.format("jdbc") \
    .option("url", jdbc_url) \
    .option("dbtable", "facilities") \
    .option("user", "mlbduser") \
    .option("password", "mlbdpasswd") \
    .option("driver", "org.postgresql.Driver") \
    .load()

orders_df = df.select("Order_id", "facility_name", "datetime").distinct().withColumnRenamed("Order_id", "id")

orders_df = orders_df.alias("orders")
facilities_db_df = facilities_db_df.alias("facilities")

orders_df = orders_df.join(
    facilities_db_df,
    orders_df.facility_name == facilities_db_df.name
).select(
    orders_df["id"],
    facilities_db_df["id"].alias("facility_id"),
    orders_df["datetime"]
)

orders_df = orders_df.filter(col("datetime").isNotNull())
orders_df = orders_df.withColumn("id", col("id").cast("bigint"))

orders_df = orders_df.dropDuplicates(["id"])

orders_db_df = spark.read.format("jdbc") \
    .option("url", jdbc_url) \
    .option("dbtable", "orders") \
    .option("user", "mlbduser") \
    .option("password", "mlbdpasswd") \
    .option("driver", "org.postgresql.Driver") \
    .load()

new_orders_df = orders_df.join(orders_db_df, orders_df.id == orders_db_df.id, "left_anti")

if new_orders_df.count() > 0:
    new_orders_df.write.option("driver", "org.postgresql.Driver").jdbc(jdbc_url, "orders", mode="append", properties=properties)
    print(f"Вставлено {new_orders_df.count()} новых заказов.")
else:
    print("Нет новых заказов для вставки.")

orders_db_df = spark.read.format("jdbc") \
    .option("url", jdbc_url) \
    .option("dbtable", "orders") \
    .option("user", "mlbduser") \
    .option("password", "mlbdpasswd") \
    .option("driver", "org.postgresql.Driver") \
    .load()

order_items_df = df.select("Order_id", "menu_item", "price")
order_items_df = order_items_df.join(orders_db_df, order_items_df.Order_id == orders_db_df.id).select(
    orders_db_df["id"].alias("order_id"),
    order_items_df["menu_item"],
    order_items_df["price"]
)

order_items_df = order_items_df.dropDuplicates(["order_id", "menu_item"])
order_items_df.write.option("driver", "org.postgresql.Driver").jdbc(jdbc_url, "order_items", mode="append", properties=properties)

print("Данные успешно загружены в PostgreSQL")
spark.stop()

                                                                                

Нет новых заказов для вставки.


                                                                                

Данные успешно загружены в PostgreSQL


## TASK 4

In [24]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit

postgres_jar_path = "postgresql-42.7.3.jar"

spark = SparkSession.builder \
    .appName("LoadOrdersToPostgresStarSchema") \
    .config("spark.jars", postgres_jar_path) \
    .config("spark.driver.extraClassPath", postgres_jar_path) \
    .config("spark.executor.extraClassPath", postgres_jar_path) \
    .getOrCreate()

df = spark.read.format("avro").load("orders_avro")

jdbc_url = "jdbc:postgresql://localhost:5432/mldbdb?ssl=false"
properties = {
    "user": "mlbduser",
    "password": "mlbdpasswd",
    "driver": "org.postgresql.Driver"
}

cities_df = df.select("city", "country").distinct().withColumnRenamed("city", "city_name")
cities_df.write.option("driver", "org.postgresql.Driver").jdbc(jdbc_url, "dim_cities", mode="append", properties=properties)

dim_cities_df = spark.read.format("jdbc") \
    .option("url", jdbc_url) \
    .option("dbtable", "dim_cities") \
    .option("user", "mlbduser") \
    .option("password", "mlbdpasswd") \
    .option("driver", "org.postgresql.Driver") \
    .load()

facilities_df = df.select("facility_name", "city").distinct().withColumnRenamed("facility_name", "name")
facilities_df = facilities_df.join(dim_cities_df, facilities_df.city == dim_cities_df.city_name) \
    .select(col("name").alias("facility_name"), col("city_id"))

facilities_df.write.option("driver", "org.postgresql.Driver").jdbc(jdbc_url, "dim_facilities", mode="append", properties=properties)

menu_items_df = df.select("menu_item", "price").distinct().withColumnRenamed("menu_item", "menu_item_name")
menu_items_df.write.option("driver", "org.postgresql.Driver").jdbc(jdbc_url, "dim_menu_items", mode="append", properties=properties)

dim_facilities_df = spark.read.format("jdbc") \
    .option("url", jdbc_url) \
    .option("dbtable", "dim_facilities") \
    .option("user", "mlbduser") \
    .option("password", "mlbdpasswd") \
    .option("driver", "org.postgresql.Driver") \
    .load()

dim_menu_items_df = spark.read.format("jdbc") \
    .option("url", jdbc_url) \
    .option("dbtable", "dim_menu_items") \
    .option("user", "mlbduser") \
    .option("password", "mlbdpasswd") \
    .option("driver", "org.postgresql.Driver") \
    .load()

orders_fact_df = df.join(dim_facilities_df, df.facility_name == dim_facilities_df.facility_name) \
    .join(dim_menu_items_df, df.menu_item == dim_menu_items_df.menu_item_name) \
    .select(
        col("Order_id").cast("bigint").alias("order_id"),
        col("facility_id"),
        col("menu_item_id"),
        col("datetime"),
        lit(1).alias("quantity")
    )

orders_fact_df = orders_fact_df.dropDuplicates(["order_id"])

orders_db_df = spark.read.format("jdbc") \
    .option("url", jdbc_url) \
    .option("dbtable", "orders_fact") \
    .option("user", "mlbduser") \
    .option("password", "mlbdpasswd") \
    .option("driver", "org.postgresql.Driver") \
    .load()

new_orders_df = orders_fact_df.join(orders_db_df, orders_fact_df.order_id == orders_db_df.order_id, "left_anti")

if new_orders_df.count() > 0:
    new_orders_df.write.option("driver", "org.postgresql.Driver").jdbc(jdbc_url, "orders_fact", mode="append", properties=properties)
    print(f"Вставлено {new_orders_df.count()} новых заказов.")
else:
    print("Нет новых заказов для вставки.")

print("Данные успешно загружены в PostgreSQL (Star Schema)")

spark.stop()

25/03/22 14:31:47 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/03/22 14:31:48 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/03/22 14:31:49 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/03/22 14:31:49 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/03/22 14:31:50 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/03/22 14:31:50 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/03/22 14:32:00 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/03/22 14:32:00 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/03/22 14:32:01 WARN RowBasedKeyValueBatch: Calling spill() on

Вставлено 0 новых заказов.
Данные успешно загружены в PostgreSQL (Star Schema)


## TASK 5

In [25]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum as _sum, collect_list, struct
from pymongo import MongoClient

spark = SparkSession.builder \
    .appName("AggregateOrdersToMongoDB") \
    .getOrCreate()

parquet_path = "orders_parquet"
df = spark.read.parquet(parquet_path)

aggregated_df = df.groupBy("order_id").agg(
    _sum("price").alias("total_amount"),
    collect_list(
        struct(
            col("menu_item").alias("item_name"),
            col("price").alias("item_price")
        )
    ).alias("items")
)

mongo_uri = "mongodb://localhost:27017/"
mongo_db = "mldbdb"
mongo_collection = "orders"

aggregated_data = aggregated_df.collect()

client = MongoClient(mongo_uri)
db = client[mongo_db]
collection = db[mongo_collection]

for record in aggregated_data:
    record_dict = record.asDict()
    collection.insert_one(record_dict)

print("Данные успешно загружены в MongoDB")

spark.stop()

                                                                                

Данные успешно загружены в MongoDB
