In [None]:
#Задание 1

from pyspark.sql import SparkSession
from pyspark.sql.functions import lit
import random
import uuid
from datetime import datetime, timedelta
import faker


# Spark session used only by the data-generation cell; it is stopped at the end of the cell.
spark = (
    SparkSession.builder
    .appName("OrderGeneration")
    .getOrCreate()
)

# City -> (lat, lng). Every facility generated for a city is stamped with these
# coordinates, and the dict's insertion order also defines the order of `cities`.
cities_coords = {
    "Moscow": (55.7558, 37.6173),
    "Saint Petersburg": (59.9343, 30.3351),
    "Novosibirsk": (55.0084, 82.9357),
    "Yekaterinburg": (56.8389, 60.6057),
    "Kazan": (55.8304, 49.0661),
    "Nizhny Novgorod": (56.2965, 43.9361),
    "Chelyabinsk": (55.1644, 61.4368),
    "Omsk": (54.9924, 73.3686),
    "Samara": (53.2007, 50.1500),
    "Rostov-on-Don": (47.2357, 39.7015),
}
cities = list(cities_coords)

# Facility names; each one is generated in every city.
restaurants = [
    "Café Moscow", "Pizza & Coffee", "Tea House", "Sushi World", "Burger King",
    "Cafe 24", "Fresh Food", "Pasta Bar", "The Diner", "Taste of Italy",
]

# (menu item, fixed price) pairs that line items are drawn from.
menu_items = [
    ("Pizza", 150), ("Tea", 100), ("Apple Juice", 200), ("Coffee", 250),
    ("Cake", 400), ("Soup", 300), ("Salad", 250), ("Burger", 350),
]

def generate_order(order_id, city, restaurant, date_time,
                   country="Russia", items=None, coords=None, max_items=5):
    """Generate the line items of one synthetic order.

    The order gets between 1 and ``max_items`` line items (inclusive). Each line
    item is drawn uniformly at random, with replacement, from ``items``, so the
    same menu item can appear several times in one order.

    Args:
        order_id: Identifier shared by every line item of the order.
        city: City name; must be a key of ``coords``.
        restaurant: Facility name written to every line item.
        date_time: Order timestamp, written as-is (the caller passes a string).
        country: Country written to every line item.
        items: Non-empty sequence of ``(menu_item, price)`` pairs; defaults to
            the module-level ``menu_items``.
        coords: Mapping ``city -> (lat, lng)``; defaults to the module-level
            ``cities_coords``.
        max_items: Inclusive upper bound on the number of line items (>= 1).

    Returns:
        A list of tuples ``(order_id, city, country, restaurant, lat, lng,
        menu_item, price, date_time)``, one per line item.

    Raises:
        KeyError: if ``city`` is not a key of ``coords``.
        IndexError: if ``items`` is empty.
    """
    if items is None:
        items = menu_items
    if coords is None:
        coords = cities_coords
    # Draw the item count first so the RNG call sequence matches earlier runs.
    item_count = random.randint(1, max_items)
    # Coordinates depend only on the city, so look them up once per order.
    lat, lng = coords[city]
    order = []
    for _ in range(item_count):
        item, price = random.choice(items)
        order.append((order_id, city, country, restaurant, lat, lng, item, price, date_time))
    return order

# Fix the RNG seed so that "Restart & Run All" regenerates the same dataset.
RANDOM_SEED = 42
random.seed(RANDOM_SEED)

ORDERS_PER_RESTAURANT = 1000
INVALID_COORD = -999.999  # sentinel coordinate that Task 2 routes to the dead-letter output
OUTPUT_PATH = "output/orders_data"

orders = []
order_id = 1000000
start_date = datetime(2024, 1, 1)
end_date = datetime(2024, 12, 31)
date_range = (end_date - start_date).days

# One order per iteration; each order expands to 1..5 line-item rows.
for city in cities:
    for restaurant in restaurants:
        for _ in range(ORDERS_PER_RESTAURANT):
            date_time = start_date + timedelta(
                days=random.randint(0, date_range),
                hours=random.randint(0, 23),
                minutes=random.randint(0, 59),
            )
            orders.extend(
                generate_order(order_id, city, restaurant, date_time.strftime('%Y-%m-%d %H:%M:%S'))
            )
            order_id += 1

# Corrupt the coordinates of ~10% of line items (indices are sampled with
# replacement, so a row may be hit more than once). Row layout:
# (order_id, city, country, facility_name, lat, lng, menu_item, price, datetime).
# Only lat/lng (indices 4-5) are replaced: half of the picks get missing values,
# the other half get the sentinel; menu_item and price are left intact.
for _ in range(len(orders) // 10):
    idx = random.randint(0, len(orders) - 1)
    row = orders[idx]
    if random.random() < 0.5:
        bad_coords = (None, None)
    else:
        bad_coords = (INVALID_COORD, INVALID_COORD)
    orders[idx] = row[:4] + bad_coords + row[6:]

columns = ["Order_id", "city", "country", "facility_name", "lat", "lng", "menu_item", "price", "datetime"]
df = spark.createDataFrame(orders, columns)
df.write.option("header", "true").csv(OUTPUT_PATH, mode="overwrite")
spark.stop()

In [None]:
#Задание 2

from pyspark.sql import SparkSession
from pyspark.sql.functions import col
import os

from pyspark.sql.functions import to_date

# NOTE(review): `spark.jars.packages` is resolved when the driver JVM is launched.
# If an earlier cell (e.g. Task 1) already started the JVM in this kernel, this
# setting may be ignored; if the "avro" data source cannot be found, restart the
# kernel and run this cell first, or put the package on the classpath another way.
spark = (
    SparkSession.builder
    .appName("AvroExample")
    .config("spark.jars.packages", "org.apache.spark:spark-avro_2.12:3.5.5")
    .getOrCreate()
)
spark.conf.set("spark.sql.avro.compression.codec", "snappy")

input_path = "output/orders_data"  # CSV written by Task 1
deadletter_path = "output/deadletter"
# Tasks 3 and 4 read Avro from "orders_avro"; Task 5 reads Parquet from "orders_parquet".
avro_output_path = "orders_avro"
parquet_output_path = "orders_parquet"
INVALID_COORD = -999.999  # sentinel Task 1 writes for corrupted coordinates

# Cached because the same input feeds the preview, both filters and three writes.
df = spark.read.option("header", "true").csv(input_path, inferSchema=True).cache()
df.show(5)

# A row is invalid when either coordinate is missing or equals the sentinel.
# The predicate is never NULL (a NULL coordinate makes its isNull() term true),
# so its negation selects exactly the remaining rows.
is_invalid = (
    col("lat").isNull()
    | col("lng").isNull()
    | (col("lat") == INVALID_COORD)
    | (col("lng") == INVALID_COORD)
)
invalid_df = df.filter(is_invalid)
# Use the negated predicate rather than `df.subtract(invalid_df)`: subtract is a set
# difference (EXCEPT DISTINCT) and would collapse identical rows, e.g. an order in
# which Task 1 drew the same menu item twice.
valid_df = df.filter(~is_invalid).withColumn("date", to_date(col("datetime")))

invalid_df.write.option("header", "true").csv(deadletter_path, mode="overwrite")
valid_df.show(5)

# Partition by calendar date and city. Partitioning by the raw "datetime" column
# would create one directory per distinct timestamp (minute resolution in Task 1's data).
valid_df.write.format("avro").partitionBy("date", "city").mode("overwrite").save(avro_output_path)
valid_df.write.format("parquet").partitionBy("date", "city").mode("overwrite").save(parquet_output_path)

spark.stop()

In [None]:
#Задание 3

from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.types import IntegerType, DoubleType, StringType, TimestampType

import os

# NOTE(review): `spark.jars.packages` only takes effect when this call launches the
# driver JVM; if an earlier cell already started one in this kernel it may be ignored.
# The PostgreSQL JDBC driver (org.postgresql.Driver) must also be on the classpath;
# TODO confirm how it is provided in this environment.
spark = (
    SparkSession.builder
    .appName("AvroToPostgres")
    .config("spark.jars.packages", "org.apache.spark:spark-avro_2.12:3.5.5")
    .getOrCreate()
)
input_avro = "orders_avro"
df = spark.read.format("avro").load(input_avro)

# Normalise column types; every table below is derived from this typed frame.
typed_df = (
    df
    .withColumn("Order_id", col("Order_id").cast(IntegerType()))
    .withColumn("lat", col("lat").cast(DoubleType()))
    .withColumn("lng", col("lng").cast(DoubleType()))
    .withColumn("price", col("price").cast(IntegerType()))
    .withColumn("datetime", col("datetime").cast(TimestampType()))
)

# 3NF split:
#   cities      city -> country
#   facilities  (facility_name, city) -> lat, lng. Task 1 reuses every restaurant
#               name in every city, so the name alone is not a key.
#   menu        menu_item -> price
#   orders      one row per line item; (facility_name, city) references facilities
#               and menu_item references menu
cities_df = typed_df.select("city", "country").distinct()
facilities_df = typed_df.select("facility_name", "city", "lat", "lng").distinct()
menu_df = typed_df.select("menu_item", "price").distinct()
orders_df = typed_df.select("Order_id", "datetime", "city", "facility_name", "menu_item")

# Connection settings can be overridden through environment variables; the
# fallbacks are local-development defaults. Do not commit real credentials.
postgres_url = os.environ.get("POSTGRES_URL", "jdbc:postgresql://localhost:5432/orders_db")
postgres_properties = {
    "user": os.environ.get("POSTGRES_USER", "postgres"),
    "password": os.environ.get("POSTGRES_PASSWORD", "password"),
    "driver": "org.postgresql.Driver",
}

for table_name, table_df in [
    ("cities", cities_df),
    ("facilities", facilities_df),
    ("menu", menu_df),
    ("orders", orders_df),
]:
    table_df.write.jdbc(postgres_url, table_name, mode="overwrite", properties=postgres_properties)

spark.stop()

In [None]:
#Задание 4

from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.types import IntegerType, DoubleType, StringType, TimestampType

import os

# NOTE(review): `spark.jars.packages` only takes effect when this call launches the
# driver JVM; if an earlier cell already started one in this kernel it may be ignored.
# The PostgreSQL JDBC driver (org.postgresql.Driver) must also be on the classpath;
# TODO confirm how it is provided in this environment.
spark = (
    SparkSession.builder
    .appName("AvroToPostgresStar")
    .config("spark.jars.packages", "org.apache.spark:spark-avro_2.12:3.5.5")
    .getOrCreate()
)

input_avro = "orders_avro"
df = spark.read.format("avro").load(input_avro)

# Normalise column types into a new frame instead of overwriting the raw `df`.
typed_df = (
    df
    .withColumn("Order_id", col("Order_id").cast(IntegerType()))
    .withColumn("lat", col("lat").cast(DoubleType()))
    .withColumn("lng", col("lng").cast(DoubleType()))
    .withColumn("price", col("price").cast(IntegerType()))
    .withColumn("datetime", col("datetime").cast(TimestampType()))
)

# NOTE(review): only a single denormalised fact table is written here; no separate
# dimension tables are produced. TODO confirm this is the intended star schema.
fact_orders_df = typed_df.select(
    "Order_id", "datetime", "city", "country", "facility_name", "lat", "lng", "menu_item", "price"
)

# Connection settings can be overridden through environment variables; the
# fallbacks are local-development defaults. Do not commit real credentials.
postgres_url = os.environ.get("POSTGRES_URL", "jdbc:postgresql://localhost:5432/orders_db")
postgres_properties = {
    "user": os.environ.get("POSTGRES_USER", "postgres"),
    "password": os.environ.get("POSTGRES_PASSWORD", "password"),
    "driver": "org.postgresql.Driver",
}

fact_orders_df.write.jdbc(postgres_url, "fact_orders", mode="overwrite", properties=postgres_properties)

spark.stop()

In [None]:
#Задание 5

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, collect_list, struct, sum as spark_sum

MONGO_OUTPUT_URI = "mongodb://localhost:27017/orders_db.orders"
ORDER_KEYS = ["Order_id", "city", "country", "facility_name", "datetime"]

# NOTE(review): the "spark.mongodb.output.uri" setting and the
# "com.mongodb.spark.sql.DefaultSource" format belong to a specific MongoDB Spark
# connector version; confirm they match the connector on the classpath.
spark = (
    SparkSession.builder
    .appName("ParquetToMongo")
    .config("spark.mongodb.output.uri", MONGO_OUTPUT_URI)
    .getOrCreate()
)

source_path = "orders_parquet"
line_items = spark.read.parquet(source_path)

# One document per order: its line items nested as an array of
# {menu_item, price} structs, plus the summed price of the order.
orders_aggregated_df = line_items.groupBy(*ORDER_KEYS).agg(
    collect_list(struct("menu_item", "price")).alias("order_items"),
    spark_sum("price").alias("total_price"),
)

(
    orders_aggregated_df.write
    .format("com.mongodb.spark.sql.DefaultSource")
    .mode("overwrite")
    .save()
)

spark.stop()