# **Задачи на тренировку SPARK**

Таблица в БД - building
entrance (подъезд)  floor (этаж)  flat (номер квартиры)  num_persons (количество человек).
Для начала подготовим исходную выборку данных.

In [1]:
class Building:
  def __init__(self, num_of_entrances=1, num_of_floors=1, num_of_flats=1):
    self.num_of_entrances = num_of_entrances
    self.num_of_floors = num_of_floors
    self.num_of_flats = num_of_flats
  
  def get_building(self):
    building = []
    import itertools
    import random
    temp = list(itertools.product(range(1, self.num_of_entrances+1), 
                             range(1, self.num_of_floors+1), range(1, self.num_of_flats+1)))
    for i, flat in enumerate(temp, start=1):
      building.append({'entrance': flat[0], 'floor': flat[1], 'flat': i, 'num_persons': random.randint(1, 5)})
    return building

Создадим здание из 2 подъездов 2 этажей по 4 квартиры на этаже.

In [2]:
building = Building(2, 2, 4)
one_building = building.get_building()

Создадим датасет Spark.

In [3]:
# Подключаем библиотеки и создаем sparksession
!pip install pyspark
from pyspark.sql import SparkSession
from pyspark.sql import functions as f
from pyspark.sql.window import Window

spark = SparkSession.builder \
                    .appName("buildings") \
                    .master("local") \
                    .enableHiveSupport() \
                    .getOrCreate()

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.3.2.tar.gz (281.4 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m281.4/281.4 MB[0m [31m4.4 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting py4j==0.10.9.5
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m199.7/199.7 KB[0m [31m13.4 MB/s[0m eta [36m0:00:00[0m
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.3.2-py2.py3-none-any.whl size=281824025 sha256=ba1a2a634cba09ed948c59733883f7f73ba53447654bb3e447bfea2f4f511778
  Stored in directory: /root/.cache/pip/wheels/6c/e3/9b/0525ce8a69478916513509d43693511463c6468db0de237c86
Successfully built pyspark
Installing collected packages: py4j, pyspa

Создаем датасет.

In [4]:
building = spark.createDataFrame(one_building)
building.show(5)

+--------+----+-----+-----------+
|entrance|flat|floor|num_persons|
+--------+----+-----+-----------+
|       1|   1|    1|          4|
|       1|   2|    1|          1|
|       1|   3|    1|          3|
|       1|   4|    1|          4|
|       1|   5|    2|          4|
+--------+----+-----+-----------+
only showing top 5 rows



Далее идут задачи с использованием данных этого датасета. Все задачи будут прорешаны 2-мя способами: с помощью spark.sql и spark.function.

### *1 Вывести все подъезды и этажи с количеством человек на этаже*

In [5]:
building.createOrReplaceTempView('building')

In [6]:
spark.sql("""
          select 
            entrance, 
            floor,
            sum(num_persons) as persons_on_the_floor
          from building
          group by entrance, floor
""").show()

+--------+-----+--------------------+
|entrance|floor|persons_on_the_floor|
+--------+-----+--------------------+
|       1|    1|                  12|
|       2|    2|                  12|
|       1|    2|                  12|
|       2|    1|                  16|
+--------+-----+--------------------+



In [7]:
building.groupBy("entrance", "floor").agg(f.sum("num_persons").alias("persons_on_the_floor")).show()

+--------+-----+--------------------+
|entrance|floor|persons_on_the_floor|
+--------+-----+--------------------+
|       1|    1|                  12|
|       2|    2|                  12|
|       1|    2|                  12|
|       2|    1|                  16|
+--------+-----+--------------------+



### *2 Вывести все подъезды и этажи с количеством человек на этаже больше 5*

In [8]:
spark.sql("""
          select
            entrance,
            floor,
            sum(num_persons) as persons_on_the_floor
          from building
          group by entrance, floor
          having sum(num_persons) > 5
          """).show()

+--------+-----+--------------------+
|entrance|floor|persons_on_the_floor|
+--------+-----+--------------------+
|       1|    1|                  12|
|       2|    2|                  12|
|       1|    2|                  12|
|       2|    1|                  16|
+--------+-----+--------------------+



In [9]:
building.groupby("entrance", "floor").agg(f.sum("num_persons").alias("persons_on_the_floor")).filter(f.col("persons_on_the_floor") > 5).show()

+--------+-----+--------------------+
|entrance|floor|persons_on_the_floor|
+--------+-----+--------------------+
|       1|    1|                  12|
|       2|    2|                  12|
|       1|    2|                  12|
|       2|    1|                  16|
+--------+-----+--------------------+



### *3 Вывести Подъезд (entrance) и этаж с самым большим суммарным количеством человек?*

In [14]:
spark.sql("""
          with sum_floor as (
            select
            entrance,
            floor,
            sum(num_persons) as persons_on_the_floor
            from building
            group by entrance, floor
          )
          select entrance, floor
          from sum_floor
          order by persons_on_the_floor desc
          limit 1          
          """).show()

+--------+-----+
|entrance|floor|
+--------+-----+
|       2|    1|
+--------+-----+



In [32]:
building.groupby("entrance", "floor").agg(f.sum(f.col("num_persons")).alias("persons_on_the_floor")).sort(f.desc("persons_on_the_floor")).limit(1).show()

+--------+-----+--------------------+
|entrance|floor|persons_on_the_floor|
+--------+-----+--------------------+
|       2|    1|                  16|
+--------+-----+--------------------+



### *4 Вывести для каждого подъезда самый густонаселенный этаж*

In [37]:
spark.sql("""
          with sum_floor as (
            select
            entrance,
            floor,
            sum(num_persons) as persons_on_the_floor
            from building
            group by entrance, floor
          ),

          sum_floor_with_rn as (
            select
            *,
            row_number() over (partition by entrance order by persons_on_the_floor desc) as rn
            from sum_floor
          )

          select
          entrance,
          floor,
          persons_on_the_floor
          from sum_floor_with_rn
          where rn = 1
          
          """).show()

+--------+-----+--------------------+
|entrance|floor|persons_on_the_floor|
+--------+-----+--------------------+
|       1|    1|                  12|
|       2|    1|                  16|
+--------+-----+--------------------+



In [59]:
building.groupBy("entrance", "floor")\
        .agg(f.sum("num_persons").alias("persons_on_the_floor"))\
        .withColumn("rn", f.row_number().over(Window.partitionBy("entrance").orderBy(f.desc("persons_on_the_floor"))))\
        .where("rn = 1")\
        .show()

+--------+-----+--------------------+---+
|entrance|floor|persons_on_the_floor| rn|
+--------+-----+--------------------+---+
|       1|    1|                  12|  1|
|       2|    1|                  16|  1|
+--------+-----+--------------------+---+

