In [None]:
from pyspark.sql import SparkSession
import time

In [None]:
# Obtain the notebook-wide Spark session (an already active one is reused),
# registered under the application name "SQL_Queries".
spark = (
    SparkSession.builder
    .appName("SQL_Queries")
    .getOrCreate()
)

In [None]:
# Start the stopwatch for the loading stage; a later cell reads start_time.
start_time = time.time()

# Read the four Parquet datasets from HDFS. All reads happen before any view
# is registered, so a failed read leaves the SQL catalog untouched.
objects = spark.read.parquet("hdfs://localhost:9000/gin/objects.parquet")
object_addresses = spark.read.parquet("hdfs://localhost:9000/gin/object_addresses.parquet")
source_undivided = spark.read.parquet("hdfs://localhost:9000/gin/source_undivided.parquet")
source_destructing = spark.read.parquet("hdfs://localhost:9000/gin/source_destructing.parquet")

# Expose every DataFrame to Spark SQL under a temp view of the same name.
for view_name, frame in (
    ("objects", objects),
    ("object_addresses", object_addresses),
    ("source_undivided", source_undivided),
    ("source_destructing", source_destructing),
):
    frame.createOrReplaceTempView(view_name)

In [None]:
# Stop the stopwatch started at the top of the data-loading cell and print the
# elapsed wall-clock time (message is in Russian: "The stage took N seconds").
# NOTE(review): start_time is set in a different cell, so when cells are run
# by hand the idle time between them is counted as well.
end_time = time.time()

print(f"Этап занял {end_time - start_time:.2f} секунд")

In [None]:
# Query 1: collect object ids that match a row of source_destructing, either
# by cadastral number (objects) or by address (object_addresses).
# The literal placeholder columns only pad both UNION branches to the same
# five-column shape; the outer SELECT keeps just the id.
start_time = time.time()

destructing_match_sql = """
    SELECT id
    FROM (
        SELECT DISTINCT o.id AS id, o.cadastral_number AS cn, 'asdfg' AS sa, 'dfgbhn' AS fa, 'hybgvf' AS ar
        FROM objects o
        JOIN source_destructing sd ON sd.cadastral_number = o.cadastral_number
        UNION
        SELECT DISTINCT oa.object_id AS id, '-21345' AS cn, oa.simple_address AS sa, oa.full_address AS fa, oa.address_reference AS ar
        FROM object_addresses oa
        JOIN source_destructing sd 
        WHERE sd.address = oa.full_address OR sd.address = oa.address_reference
    ) AS b
"""

# Only the query plan is built here; the following cell triggers execution.
result = spark.sql(destructing_match_sql)

In [None]:
# show() is the action that actually runs the query built in the previous cell
# (with no arguments it prints at most the first 20 rows).
result.show()
# Report the time elapsed since start_time was set in the query-1 cell.
end_time = time.time()
print(f"Этап занял {end_time - start_time:.2f} секунд")

In [None]:
# Query 2: same shape as query 1, but matched against source_undivided --
# by cadastral number for objects, and by location against any of the three
# address columns for object_addresses.
start_time = time.time()

undivided_match_sql = """
    SELECT id
    FROM (
        SELECT DISTINCT o.id AS id, o.cadastral_number AS cn, 'asdfg' AS sa, 'dfgbhn' AS fa, 'hybgvf' AS ar
        FROM objects o
        JOIN source_undivided su ON su.cadastral_number = o.cadastral_number
        UNION
        SELECT DISTINCT oa.object_id AS id, '-21345' AS cn, oa.simple_address AS sa, oa.full_address AS fa, oa.address_reference AS ar
        FROM object_addresses oa
        JOIN source_undivided su 
        WHERE su.location = oa.simple_address OR su.location = oa.full_address OR su.location = oa.address_reference
    ) AS b
"""

# Only the query plan is built here; the following cell triggers execution.
result = spark.sql(undivided_match_sql)

In [None]:
# show() is the action that actually runs query 2
# (with no arguments it prints at most the first 20 rows).
result.show()
# Report the time elapsed since start_time was set in the query-2 cell.
end_time = time.time()
print(f"Этап занял {end_time - start_time:.2f} секунд")

In [None]:
# Build the subquery used by query 3 and publish it as temp view "b".
# It has the same two UNION branches as query 2, but keeps all five columns
# (id, cn, sa, fa, ar) so the update step can match on them.
start_time = time.time()

matched_objects_sql = """
    SELECT DISTINCT o.id AS id, o.cadastral_number AS cn, 'asdfg' AS sa, 'dfgbhn' AS fa, 'hybgvf' AS ar
    FROM objects o
    JOIN source_undivided su ON su.cadastral_number = o.cadastral_number
    UNION
    SELECT DISTINCT oa.object_id AS id, '-21345' AS cn, oa.simple_address AS sa, oa.full_address AS fa, oa.address_reference AS ar
    FROM object_addresses oa
    JOIN source_undivided su 
    WHERE su.location = oa.simple_address OR su.location = oa.full_address OR su.location = oa.address_reference
"""

subquery = spark.sql(matched_objects_sql)
subquery.createOrReplaceTempView("b")

In [None]:
# Emulate an SQL UPDATE: DataFrames are immutable, so build a new DataFrame in
# which every source_undivided row gets a recomputed object id.
# A row takes b.id when it matches view "b" (registered in the previous cell)
# by cadastral number or by location; otherwise it keeps its own object_id.
# NOTE(review): the LEFT JOIN uses an OR condition, so a source row that
# matches several rows of "b" is emitted once per match -- confirm that this
# fan-out is intended.
updated_source_undivided = spark.sql("""
    SELECT 
        su.*,
        CASE
            WHEN su.cadastral_number = b.cn OR su.location IN (b.sa, b.fa, b.ar) THEN b.id
            ELSE su.object_id
        END AS new_object_id
    FROM source_undivided su
    LEFT JOIN b ON su.cadastral_number = b.cn OR su.location IN (b.sa, b.fa, b.ar)
""")

# Swap the original object_id column for the recomputed one
# (the renamed column ends up last in the schema).
final_source_undivided = updated_source_undivided.drop("object_id").withColumnRenamed("new_object_id", "object_id")

# Persist the refreshed dataset to HDFS. The output is a complete snapshot, so
# it is written with "overwrite": with "append" every re-run of the notebook
# would add another full copy of all rows to the same Parquet directory.
final_source_undivided.write.mode("overwrite").parquet("hdfs://localhost:9000/gin/source_undivided_updated.parquet")

In [None]:
# Take the timestamp *before* shutting Spark down, so the reported duration
# covers only the work since start_time (set in the subquery cell) and not the
# session teardown.
end_time = time.time()
print(f"Этап занял {end_time - start_time:.2f} секунд")

# All work is done; release the Spark session's resources.
spark.stop()

In [None]:
# Shut down the Spark session at the end of the notebook.
# NOTE(review): the preceding cell also calls spark.stop(), so this call looks
# redundant -- confirm and keep only one of them.
spark.stop()