In [1]:
import pyspark as spark
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, sum, when
from pyspark.sql.functions import max as mx
import os
from dotenv import load_dotenv

load_dotenv()

True

In [2]:
postgresql_user = os.getenv('POSTGRES_USER')
postgresql_password = os.getenv('POSTGRES_PASSWORD')
postgresql_host = os.getenv('POSTGRES_HOST')
postgresql_port = os.getenv('POSTGRES_PORT')
jdbc_file = os.getenv('JDBC_FILE')


In [3]:
session = SparkSession.builder \
    .config('spark.jars', f"{jdbc_file}") \
    .master('local[*]') \
    .appName('pagila-queries') \
    .getOrCreate()

24/03/18 16:38:24 WARN Utils: Your hostname, yura resolves to a loopback address: 127.0.1.1; using 10.202.34.22 instead (on interface wlo1)
24/03/18 16:38:24 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
24/03/18 16:38:25 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [4]:
data_frame_reader = session.read \
    .format('jdbc') \
    .option('driver', 'org.postgresql.Driver') \
    .option('url', f'jdbc:postgresql://{postgresql_host}:{postgresql_port}/{postgresql_user}') \
    .option('user', postgresql_user) \
    .option('password', postgresql_password)

In [14]:

actor: spark.sql.DataFrame = data_frame_reader.option('dbtable', 'actor').load()
address: spark.sql.DataFrame = data_frame_reader.option('dbtable', 'address').load()
category: spark.sql.DataFrame = data_frame_reader.option('dbtable', 'category').load()
city: spark.sql.DataFrame = data_frame_reader.option('dbtable', 'city').load()
country: spark.sql.DataFrame = data_frame_reader.option('dbtable', 'country').load()
customer: spark.sql.DataFrame = data_frame_reader.option('dbtable', 'customer').load()
film: spark.sql.DataFrame = data_frame_reader.option('dbtable', 'film').load()
film_actor: spark.sql.DataFrame = data_frame_reader.option('dbtable', 'film_actor').load()
film_category: spark.sql.DataFrame = data_frame_reader.option('dbtable', 'film_category').load()
inventory: spark.sql.DataFrame = data_frame_reader.option('dbtable', 'inventory').load()
rental: spark.sql.DataFrame = data_frame_reader.option('dbtable', 'rental').load()

Вывести количество фильмов в каждой категории, отсортировать по убыванию.

In [7]:
category\
    .join(film_category, on='category_id', how='left')\
    .groupby('category_id', 'name')\
    .agg(count('film_id').alias('number_of_films'))\
    .select('name', 'number_of_films')\
    .orderBy(col('number_of_films').desc()).show()

                                                                                

+-----------+---------------+
|       name|number_of_films|
+-----------+---------------+
|     Sports|            213|
|    Foreign|            211|
|     Family|            209|
|Documentary|            204|
|     Action|            200|
|      Drama|            200|
|  Animation|            200|
|        New|            199|
|   Children|            199|
|      Games|            199|
|     Travel|            198|
|   Classics|            198|
|     Sci-Fi|            196|
|      Music|            194|
|     Comedy|            193|
|     Horror|            188|
+-----------+---------------+



Вывести 10 актеров, чьи фильмы большего всего арендовали, отсортировать по убыванию.

In [8]:
actor\
    .join(film_actor, on='actor_id')\
    .join(film, on='film_id')\
    .groupby('actor_id', 'first_name', 'last_name')\
    .agg(sum('rental_duration').alias('sum_rental_duration'))\
    .select('actor_id', 'first_name', 'last_name', 'sum_rental_duration')\
    .orderBy(col('sum_rental_duration').desc()).show()

+--------+----------+-----------+-------------------+
|actor_id|first_name|  last_name|sum_rental_duration|
+--------+----------+-----------+-------------------+
|     107|      GINA|  DEGENERES|                209|
|     102|    WALTER|       TORN|                201|
|     198|      MARY|     KEITEL|                192|
|     181|   MATTHEW|     CARREY|                190|
|      65|    ANGELA|     HUDSON|                183|
|     106|   GROUCHO|      DUNST|                183|
|      23|    SANDRA|     KILMER|                181|
|      60|     HENRY|      BERRY|                180|
|      13|       UMA|       WOOD|                179|
|     119|    WARREN|    JACKMAN|                178|
|      50|   NATALIE|    HOPKINS|                174|
|     144|    ANGELA|WITHERSPOON|                174|
|      37|       VAL|     BOLGER|                173|
|      27|     JULIA|    MCQUEEN|                172|
|     105|    SIDNEY|      CROWE|                172|
|     158|    VIVIEN|   BASI

Вывести категорию фильмов, на которую потратили больше всего денег.

In [9]:
category\
    .join(film_category, on='category_id')\
    .join(film, on='film_id')\
    .groupby('category_id', 'name')\
    .agg(sum('replacement_cost').alias('sum_replacement_cost'))\
    .orderBy(col('sum_replacement_cost').desc())\
    .limit(1).show()

+-----------+------+--------------------+
|category_id|  name|sum_replacement_cost|
+-----------+------+--------------------+
|          8|Family|             4228.91|
+-----------+------+--------------------+



Вывести названия фильмов, которых нет в inventory.

In [10]:
film\
    .join(inventory, on='film_id', how='leftanti')\
    .select('film_id', 'title').show()

+-------+--------------------+
|film_id|               title|
+-------+--------------------+
|    148|      CHOCOLATE DUCK|
|    108|       BUTCH PANTHER|
|    950|        VOLUME HOUSE|
|    642|      ORDER BETRAYED|
|    874|        TADPOLE PARK|
|    497|    KILL BROTHERHOOD|
|    332|FRANKENSTEIN STRA...|
|    192|    CROSSING DIVORCE|
|    860|    SUICIDES SILENCE|
|    128|       CATCH AMISTAD|
|    671|     PERDITION FARGO|
|    325|       FLOATS GARDEN|
|    386|           GUMP DATE|
|    955|        WALLS ARTIST|
|    359|  GLADIATOR WESTWARD|
|    419|         HOCUS FRIDA|
|     41|ARSENIC INDEPENDENCE|
|    607|         MUPPET MILE|
|    318|   FIREHOUSE VIETNAM|
|    742|       ROOF CHAMPION|
+-------+--------------------+
only showing top 20 rows



Вывести топ 3 актеров, которые больше всего появлялись в фильмах в категории “Children”. Если у нескольких актеров одинаковое кол-во фильмов, вывести всех..

In [11]:
act = actor\
    .join(film_actor, on='actor_id')\
    .join(film, on='film_id')\
    .join(film_category, on='film_id')\
    .join(category, on='category_id')\
    .where(col('name') == 'Children')\
    .groupBy('actor_id', 'first_name', 'last_name')\
    .agg(count('film_id').alias('count_films'))\
    .select('actor_id', 'first_name', 'last_name', 'count_films')

max_count_films = act.agg(mx(col('count_films'))).collect()[0][0]
act.where(col('count_films') == max_count_films).show()

+--------+----------+---------+-----------+
|actor_id|first_name|last_name|count_films|
+--------+----------+---------+-----------+
|     105|    SIDNEY|    CROWE|         12|
|      66|      MARY|    TANDY|         12|
|      17|     HELEN|   VOIGHT|         12|
|     142|      JADA|    RYDER|         12|
+--------+----------+---------+-----------+



Вывести города с количеством активных и неактивных клиентов (активный — customer.active = 1). Отсортировать по количеству неактивных клиентов по убыванию.

In [12]:
city\
    .join(address, on='city_id')\
    .join(customer, on='address_id')\
    .groupBy('city_id', 'city')\
    .agg(sum(when(col('active') == 1, 1).otherwise(0)).alias('active_customer'),
         sum(when(col('active') == 0, 1).otherwise(0)).alias('unactive_customer'))\
    .orderBy(col('unactive_customer').desc()).show(truncate=False)

+-------+----------------+---------------+-----------------+
|city_id|city            |active_customer|unactive_customer|
+-------+----------------+---------------+-----------------+
|577    |Wroclaw         |0              |1                |
|578    |Xiangfan        |0              |1                |
|111    |Charlotte Amalie|0              |1                |
|259    |Kamyin          |0              |1                |
|512    |Szkesfehrvr     |0              |1                |
|139    |Daxian          |0              |1                |
|283    |Kumbakonam      |0              |1                |
|57     |Bat Yam         |0              |1                |
|554    |Uluberia        |0              |1                |
|495    |Southend-on-Sea |0              |1                |
|356    |Najafabad       |0              |1                |
|24     |Amroha          |0              |1                |
|125    |Coatzacoalcos   |0              |1                |
|281    |Ktahya         

Вывести категорию фильмов, у которой самое большое кол-во часов суммарной аренды в городах (customer.address_id в этом city), и которые начинаются на букву “a”. Тоже самое сделать для городов в которых есть символ “-”.

In [13]:
category\
    .join(film_category, on='category_id')\
    .join(film, on='film_id')\
    .join(inventory, on='film_id')\
    .join(rental, on='inventory_id')\
    .join(customer, on='customer_id')\
    .join(address, on='address_id')\
    .join(city, on='city_id')\
    .where((col('city').startswith('A')) & (col('city').contains('-')))\
    .groupBy('name', 'category_id', 'city', 'city_id')\
    .agg(sum((col('return_date').cast('long') - col('rental_date').cast('long')) / 3600).alias('sum_rental_in_hours'))\
    .select('category_id', 'name', 'sum_rental_in_hours')\
    .orderBy(col('sum_rental_in_hours').desc())\
    .limit(1)\
    .show(truncate=False)

+-----------+-----+-------------------+
|category_id|name |sum_rental_in_hours|
+-----------+-----+-------------------+
|10         |Games|1581.8166666666666 |
+-----------+-----+-------------------+

