In [40]:
import findspark
findspark.init('C:/Spark')

from pyspark.sql import SparkSession
from pyspark.sql import functions as f
from pyspark.sql.window import Window as w
from pyspark.sql import DataFrame

from functools import reduce

In [2]:
# для подключения к БД требуется скачать драйвер postgresql и указать его расположение в параметре config
spark = SparkSession.builder \
    .config('spark.driver.extraClassPath', r'C:\Users\Andre\sql_ex\postgresql-42.3.4.jar') \
    .getOrCreate()

In [3]:
url = 'jdbc:postgresql://localhost:5432/sql_ex_data'
properties = {'user':'user', 'password': 'password'}

1. Найдите номер модели, скорость и размер жесткого диска для всех ПК стоимостью менее 500 дол. Вывести: model, speed и hd

In [37]:
df_pc = spark.read.jdbc(url=url, table='PC', properties=properties)

df_pc.select(f.col('model'), f.col('speed'), f.col('hd')).filter(f.col('price') < 500).show()

+-----+-----+----+
|model|speed|  hd|
+-----+-----+----+
| 1232|  500|10.0|
| 1232|  450| 8.0|
| 1232|  450|10.0|
| 1260|  500|10.0|
+-----+-----+----+



2. Найдите производителей принтеров. Вывести: maker

In [5]:
df_product = spark.read.jdbc(url=url, table='product', properties=properties)

df_product.select(f.col('maker')).filter(f.col('type') == 'Printer').distinct().show()

+-----+
|maker|
+-----+
|    E|
|    D|
|    A|
+-----+



3. Найдите номер модели, объем памяти и размеры экранов ПК-блокнотов, цена которых превышает 1000 дол.

In [6]:
df_laptop = spark.read.jdbc(url=url, table='laptop', properties=properties)

df_laptop.select(f.col('model'), f.col('ram'), f.col('screen')).filter(f.col('price') > 1000).show()

+-----+---+------+
|model|ram|screen|
+-----+---+------+
| 1750|128|    14|
| 1298| 64|    15|
| 1752|128|    14|
+-----+---+------+



4. Найдите все записи таблицы Printer для цветных принтеров.

In [7]:
df_printer = spark.read.jdbc(url=url, table='printer', properties=properties)

df_printer.filter(f.col('color') == 'y').show()

+----+-----+-----+----+------+
|code|model|color|type| price|
+----+-----+-----+----+------+
|   2| 1433|    y| Jet|270.00|
|   3| 1434|    y| Jet|290.00|
+----+-----+-----+----+------+



5. Найдите номер модели, скорость и размер жесткого диска ПК, имеющих 12x или 24x CD и цену менее 600 дол.

In [8]:
df_pc = spark.read.jdbc(url=url, table='PC', properties=properties)

df_pc.select(f.col('model'), f.col('speed'), f.col('hd')).filter((f.col('price') < 600) & ((f.col('cd') == '12x') | (f.col('cd') == '24x'))).show()

+-----+-----+----+
|model|speed|  hd|
+-----+-----+----+
| 1232|  500|10.0|
| 1232|  450| 8.0|
| 1232|  450|10.0|
| 1260|  500|10.0|
+-----+-----+----+



6. Для каждого производителя, выпускающего ПК-блокноты c объёмом жесткого диска не менее 10 Гбайт, найти скорости таких ПК-блокнотов. Вывод: производитель, скорость.

In [17]:
df_product = spark.read.jdbc(url=url, table='product', properties=properties)
df_laptop = spark.read.jdbc(url=url, table='laptop', properties=properties)

df_product.join(df_laptop, df_product['model'] == df_laptop['model'], how='inner').select(f.col('maker'), f.col('speed')).filter(f.col('hd') >= 10).distinct().show()

+-----+-----+
|maker|speed|
+-----+-----+
|    B|  750|
|    A|  600|
|    A|  750|
|    A|  450|
+-----+-----+



7. Найдите номера моделей и цены всех имеющихся в продаже продуктов (любого типа) производителя B (латинская буква).

In [46]:
df_product = spark.read.jdbc(url=url, table='product', properties=properties)
df_pc = spark.read.jdbc(url=url, table='PC', properties=properties)
df_laptop = spark.read.jdbc(url=url, table='laptop', properties=properties)
df_printer = spark.read.jdbc(url=url, table='printer', properties=properties)

df_product_pc = df_product.join(df_pc, df_product['model'] == df_pc['model'], how='inner'). \
                select(df_product['model'], f.col('price')).filter(f.col('maker') == 'B')

df_product_laptop = df_product.join(df_laptop, df_product['model'] == df_laptop['model'], how='inner'). \
                    select(df_product['model'], f.col('price')).filter(f.col('maker') == 'B')

df_product_printer = df_product.join(df_printer, df_product['model'] == df_printer['model'], how='inner'). \
                    select(df_product['model'], f.col('price')).filter(f.col('maker') == 'B')

#делаем объединение сразу по трем временным фреймам
df = reduce(DataFrame.union, [df_product_pc, df_product_laptop, df_product_laptop])
df.distinct().show()

+-----+-------+
|model|  price|
+-----+-------+
| 1750|1200.00|
| 1121| 850.00|
+-----+-------+



8. Найдите производителя, выпускающего ПК, но не ПК-блокноты.

In [53]:
df_product = spark.read.jdbc(url=url, table='product', properties=properties)

df_product_pc = df_product.select(f.col('maker')).filter(f.col('type') == 'PC')
df_product_laptop = df_product.select(f.col('maker')).filter(f.col('type') == 'Laptop')

df_product_pc.exceptAll(df_product_laptop).distinct().show()

+-----+
|maker|
+-----+
|    E|
+-----+



9. Найдите производителей ПК с процессором не менее 450 Мгц. Вывести: Maker

In [57]:
df_product = spark.read.jdbc(url=url, table='product', properties=properties)
df_pc = spark.read.jdbc(url=url, table='PC', properties=properties)

df_product.join(df_pc, df_product['model'] == df_pc['model'], how='inner').select(f.col('maker')).filter(f.col('speed') >= 450).distinct().show()

+-----+
|maker|
+-----+
|    E|
|    B|
|    A|
+-----+



10. Найдите модели принтеров, имеющих самую высокую цену. Вывести: model, price

In [70]:
df_printer = spark.read.jdbc(url=url, table='printer', properties=properties)

max_price = df_printer.agg(f.max(df_printer['price'])).collect()

df_printer.select(f.col('model'), f.col('price')).filter(f.col('price') == max_price[0][0]).show()

+-----+------+
|model| price|
+-----+------+
| 1276|400.00|
| 1288|400.00|
+-----+------+



11. Найдите среднюю скорость ПК.

In [85]:
df_pc = spark.read.jdbc(url=url, table='PC', properties=properties)

df_pc.agg(f.avg(df_pc['speed'])).select(f.round(f.col('avg(speed)'), 0).alias('avg_speed')).show()

+---------+
|avg_speed|
+---------+
|    608.0|
+---------+



12. Найдите среднюю скорость ПК-блокнотов, цена которых превышает 1000 дол.

In [88]:
df_laptop = spark.read.jdbc(url=url, table='laptop', properties=properties)

df_laptop.filter(f.col('price') > 1000).agg(f.avg(df_laptop['speed']).alias('avg_speed')).show()

+---------+
|avg_speed|
+---------+
|    700.0|
+---------+



13. Найдите среднюю скорость ПК, выпущенных производителем A.

In [91]:
df_product = spark.read.jdbc(url=url, table='product', properties=properties)
df_pc = spark.read.jdbc(url=url, table='PC', properties=properties)

df_product_pc = df_product.join(df_pc, df_product['model'] == df_pc['model'], how='inner').filter(f.col('maker') == 'A')

df_product_pc.agg(f.avg(f.col('speed'))).select(f.round(f.col('avg(speed)'), 0).alias('avg_speed')).show()

+---------+
|avg_speed|
+---------+
|    606.0|
+---------+



14. Найдите класс, имя и страну для кораблей из таблицы Ships, имеющих не менее 10 орудий.

In [94]:
df_classes = spark.read.jdbc(url=url, table='classes', properties=properties)
df_ships = spark.read.jdbc(url=url, table='ships', properties=properties)

df_classes.join(df_ships, df_classes['class'] == df_ships['class'], how='inner').select(df_classes['class'], f.col('name'), f.col('country')). \
            filter(f.col('numGuns') > 10).show()

+--------------+--------------+-------+
|         class|          name|country|
+--------------+--------------+-------+
|North Carolina|North Carolina|    USA|
|North Carolina|    Washington|    USA|
|North Carolina|  South Dakota|    USA|
|     Tennessee|    California|    USA|
|     Tennessee|     Tennessee|    USA|
+--------------+--------------+-------+



15. Найдите размеры жестких дисков, совпадающих у двух и более PC. Вывести: HD

In [102]:
df_pc = spark.read.jdbc(url=url, table='PC', properties=properties)

df_pc.groupBy(f.col('hd')).agg(f.count(f.col('hd')).alias('count_hd')).select(f.col('hd')).filter(f.col('count_hd') >= 2).orderBy(['hd']).show()

+----+
|  hd|
+----+
| 5.0|
| 8.0|
|10.0|
|14.0|
|20.0|
+----+



16. Найдите пары моделей PC, имеющих одинаковые скорость и RAM. В результате каждая пара указывается только один раз, т.е. (i,j), но не (j,i), Порядок вывода: модель с большим номером, модель с меньшим номером, скорость и RAM.

In [112]:
df_pc = spark.read.jdbc(url=url, table='PC', properties=properties)
df_pc2 = df_pc.alias('df_pc2')
df_pc2 = df_pc2.select(*(f.col(x).alias(x + '2') for x in df_pc2.columns))

df_pc.join(df_pc2, (df_pc['speed'] == df_pc2['speed2']) & (df_pc['ram'] == df_pc2['ram2']), how='full'). \
        select(df_pc['model'], df_pc2['model2'], df_pc['speed'], df_pc['ram']).filter(df_pc['model'] > df_pc2['model2']).show()

+-----+------+-----+---+
|model|model2|speed|ram|
+-----+------+-----+---+
| 1233|  1232|  500| 64|
| 1233|  1121|  750|128|
| 1260|  1232|  500| 32|
+-----+------+-----+---+



17. Найдите модели ПК-блокнотов, скорость которых меньше скорости каждого из ПК. Вывести: type, model, speed

In [117]:
df_product = spark.read.jdbc(url=url, table='product', properties=properties)
df_pc = spark.read.jdbc(url=url, table='PC', properties=properties)
df_laptop = spark.read.jdbc(url=url, table='laptop', properties=properties)

min_speed = df_pc.agg(f.min(df_pc['speed'])).collect()[0][0]

df_product.join(df_laptop, df_product['model'] == df_laptop['model'], how='inner').select(df_product['type'], df_laptop['model'], df_laptop['speed']). \
        filter(df_laptop['speed'] < min_speed).show()

+------+-----+-----+
|  type|model|speed|
+------+-----+-----+
|Laptop| 1298|  350|
+------+-----+-----+



18. Найдите производителей самых дешевых цветных принтеров. Вывести: maker, price

In [126]:
df_product = spark.read.jdbc(url=url, table='product', properties=properties)
df_printer = spark.read.jdbc(url=url, table='printer', properties=properties)

min_price = df_printer.filter(f.col('color') == 'y').agg(f.min(f.col('price'))).collect()[0][0]

df_product.join(df_printer, df_product['model'] == df_printer['model'], how='inner').select(f.col('maker'), f.col('price')). \
        filter((df_printer['color'] == 'y') & (df_printer['price'] == min_price)).show()

+-----+------+
|maker| price|
+-----+------+
|    D|270.00|
+-----+------+



19. Для каждого производителя, имеющего модели в таблице Laptop, найдите средний размер экрана выпускаемых им ПК-блокнотов. Вывести: maker, средний размер экрана.

In [127]:
df_product = spark.read.jdbc(url=url, table='product', properties=properties)
df_laptop = spark.read.jdbc(url=url, table='laptop', properties=properties)

df_product.join(df_laptop, df_product['model'] == df_laptop['model'], how='inner').groupby(f.col('maker')).agg(f.avg(f.col('screen')).alias('avg_screen')). \
        select(f.col('maker'), f.col('avg_screen')).show()

+-----+----------+
|maker|avg_screen|
+-----+----------+
|    B|      14.0|
|    C|      12.0|
|    A|      13.0|
+-----+----------+



20. Найдите производителей, выпускающих по меньшей мере три различных модели ПК. Вывести: Maker, число моделей ПК.

In [131]:
df_product = spark.read.jdbc(url=url, table='product', properties=properties)

df_product.groupBy(['maker', 'type']).agg(f.count(f.col('model')).alias('count_model')).filter((f.col('count_model') > 2) & (f.col('type') == 'PC')). \
        select(['type', 'count_model']).show()

+----+-----------+
|type|count_model|
+----+-----------+
|  PC|          3|
+----+-----------+



21. Найдите максимальную цену ПК, выпускаемых каждым производителем, у которого есть модели в таблице PC. Вывести: maker, максимальная цена.

In [134]:
df_product = spark.read.jdbc(url=url, table='product', properties=properties)
df_pc = spark.read.jdbc(url=url, table='pc', properties=properties)

df_product.join(df_pc, df_product['model'] == df_pc['model'], how='inner').groupby(f.col('maker')).agg(f.max(f.col('price')).alias('max_price')). \
        select(['maker', 'max_price']).show()

+-----+---------+
|maker|max_price|
+-----+---------+
|    E|   350.00|
|    B|   850.00|
|    A|   980.00|
+-----+---------+



22. Для каждого значения скорости ПК, превышающего 600 МГц, определите среднюю цену ПК с такой же скоростью. Вывести: speed, средняя цена.