In [84]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
import psycopg2
from pyspark.sql.window import Window

In [2]:
pg_url = 'jdbc:postgresql://192.168.0.108:5432/postgres'
pg_properties = {'user':'pguser', 'password':'secret', "driver":"org.postgresql.Driver"}

In [3]:
spark = SparkSession.builder\
        .config('spark.driver.extraClassPath', '/home/user/shared_folder/postgresql-42.2.20.jar')\
        .master('local')\
        .appName('homework_5')\
        .getOrCreate()

In [4]:
spark.version

'3.1.1'

#### Вывести количество фильмов в каждой категории, отсортировать по убыванию.

In [8]:
category_df = spark.read.jdbc(pg_url, 'public.category', properties=pg_properties)
film_category_df = spark.read.jdbc(pg_url, 'public.film_category', properties=pg_properties)

In [46]:
category_df.join(film_category_df, category_df.category_id == film_category_df.category_id, 'left')\
            .groupBy('name')\
            .agg(F.count('film_id')\
            .alias('movies_count'))\
            .sort('movies_count', ascending=False).show()

+-----------+------------+
|       name|movies_count|
+-----------+------------+
|     Sports|          74|
|    Foreign|          73|
|     Family|          69|
|Documentary|          68|
|  Animation|          66|
|     Action|          64|
|        New|          63|
|      Drama|          62|
|     Sci-Fi|          61|
|      Games|          61|
|   Children|          60|
|     Comedy|          58|
|     Travel|          57|
|   Classics|          57|
|     Horror|          56|
|      Music|          51|
+-----------+------------+



#### Вывести 10 актеров, чьи фильмы большего всего арендовали, отсортировать по убыванию.

In [47]:
rental_df = spark.read.jdbc(pg_url, 'public.rental', properties=pg_properties)
inventory_df = spark.read.jdbc(pg_url, 'public.inventory', properties=pg_properties)
film_actor_df = spark.read.jdbc(pg_url, 'public.film_actor', properties=pg_properties)
actor_df = spark.read.jdbc(pg_url, 'public.actor', properties=pg_properties)

In [54]:
rental_df.join(inventory_df, rental_df.inventory_id == inventory_df.inventory_id, 'left')\
         .join(film_actor_df, inventory_df.film_id == film_actor_df.film_id, 'left')\
         .join(actor_df, film_actor_df.actor_id == actor_df.actor_id, 'left')\
         .groupBy(F.concat(F.col('first_name'), F.lit(' '), F.col('last_name')).alias('actor'))\
         .agg(F.count('rental_id')\
         .alias('rental_count'))\
         .sort('rental_count', ascending=False)\
         .limit(10).show()

+------------------+------------+
|             actor|rental_count|
+------------------+------------+
|       SUSAN DAVIS|         825|
|    GINA DEGENERES|         753|
|    MATTHEW CARREY|         678|
|       MARY KEITEL|         674|
|ANGELA WITHERSPOON|         654|
|       WALTER TORN|         640|
|       HENRY BERRY|         612|
|       JAYNE NOLTE|         611|
|        VAL BOLGER|         605|
|     SANDRA KILMER|         604|
+------------------+------------+



#### Вывести категорию фильмов, на которую потратили больше всего денег.

In [53]:
payment_df = spark.read.jdbc(pg_url, 'public.payment', properties=pg_properties)

In [55]:
payment_df.join(rental_df, payment_df.rental_id == rental_df.rental_id, 'left')\
          .join(inventory_df, rental_df.inventory_id == inventory_df.inventory_id, 'left')\
          .join(film_category_df, inventory_df.film_id == film_category_df.film_id, 'left')\
          .join(category_df, film_category_df.category_id == category_df.category_id, 'left')\
          .groupBy('name')\
          .agg(F.sum('amount')\
          .alias('rental_sum'))\
          .sort('rental_sum', ascending=False)\
          .limit(1).show()

+------+----------+
|  name|rental_sum|
+------+----------+
|Sports|   5314.21|
+------+----------+



#### Вывести названия фильмов, которых нет в inventory

In [57]:
film_df = spark.read.jdbc(pg_url, 'public.film', properties=pg_properties)

In [66]:
film_df.join(inventory_df, film_df.film_id == inventory_df.film_id, 'left')\
        .where(F.col('inventory_id').isNull())\
        .select('title')\
        .show()

+--------------------+
|               title|
+--------------------+
|      CHOCOLATE DUCK|
|       BUTCH PANTHER|
|        VOLUME HOUSE|
|      ORDER BETRAYED|
|        TADPOLE PARK|
|    KILL BROTHERHOOD|
|FRANKENSTEIN STRA...|
|    CROSSING DIVORCE|
|    SUICIDES SILENCE|
|       CATCH AMISTAD|
|     PERDITION FARGO|
|       FLOATS GARDEN|
|           GUMP DATE|
|        WALLS ARTIST|
|  GLADIATOR WESTWARD|
|         HOCUS FRIDA|
|ARSENIC INDEPENDENCE|
|         MUPPET MILE|
|   FIREHOUSE VIETNAM|
|       ROOF CHAMPION|
+--------------------+
only showing top 20 rows



#### Вывести топ 3 актеров, которые больше всего появлялись в фильмах в категории “Children”. Если у нескольких актеров одинаковое кол-во фильмов, вывести всех

In [99]:
film_actor_df.join(actor_df, film_actor_df.actor_id == actor_df.actor_id, 'left')\
         .join(film_category_df, film_actor_df.film_id == film_category_df.film_id, 'left')\
         .join(category_df, film_category_df.category_id == category_df.category_id, 'left')\
         .where(F.col('name') == 'Children')\
         .groupBy(F.concat(F.col('first_name'), F.lit(' '), F.col('last_name')).alias('actor'))\
         .agg(F.count(film_actor_df.film_id)\
         .alias('count_movies'))\
         .sort('count_movies', ascending=False)\
         .select('actor', 'count_movies')\
         .withColumn('rank', F.rank().over(Window.orderBy(F.desc('count_movies'))))\
         .where(F.col('rank') <= 3).show()

+-------------+------------+----+
|        actor|count_movies|rank|
+-------------+------------+----+
| HELEN VOIGHT|           7|   1|
|  SUSAN DAVIS|           6|   2|
|   MARY TANDY|           5|   3|
|   RALPH CRUZ|           5|   3|
|  WHOOPI HURT|           5|   3|
|KEVIN GARLAND|           5|   3|
+-------------+------------+----+



#### Вывести города с количеством активных и неактивных клиентов (активный — customer.active = 1). Отсортировать по количеству неактивных клиентов по убыванию.

In [100]:
customer_df = spark.read.jdbc(pg_url, 'public.customer', properties=pg_properties)
address_df = spark.read.jdbc(pg_url, 'public.address', properties=pg_properties)
city_df = spark.read.jdbc(pg_url, 'public.city', properties=pg_properties)

In [122]:
customer_df.join(address_df, customer_df.address_id == address_df.address_id, 'left')\
           .join(city_df, address_df.city_id == city_df.city_id, 'left')\
           .withColumn('active_users', F.when(F.col('active') == 1,'Active').otherwise(None))\
           .withColumn('no_active_users', F.when(F.col('active') == 0,'No active').otherwise(None))\
           .groupBy(F.col('city'))\
           .agg(F.count('active_users').alias('active_users'), F.count('no_active_users').alias('no_active_users'))\
           .sort('no_active_users', ascending=False).show()   

+----------------+------------+---------------+
|            city|active_users|no_active_users|
+----------------+------------+---------------+
|          Ktahya|           0|              1|
|Charlotte Amalie|           0|              1|
|         Wroclaw|           0|              1|
|       Pingxiang|           0|              1|
|     Szkesfehrvr|           0|              1|
|          Daxian|           0|              1|
|   Coatzacoalcos|           0|              1|
|          Amroha|           0|              1|
| Southend-on-Sea|           0|              1|
|        Uluberia|           0|              1|
|       Najafabad|           0|              1|
|        Xiangfan|           0|              1|
|      Kumbakonam|           0|              1|
|          Kamyin|           0|              1|
|         Bat Yam|           0|              1|
|        Myingyan|           1|              0|
|          Monywa|           1|              0|
|          Jining|           1|         

#### Вывести категорию фильмов, у которой самое большое кол-во часов суммарной аренды в городах (customer.address_id в этом city), и которые начинаются на букву “a”. То же самое сделать для городов в которых есть символ “-”

In [145]:
rental_df.join(inventory_df, rental_df.inventory_id == inventory_df.inventory_id, 'left')\
          .join(film_category_df, inventory_df.film_id == film_category_df.film_id, 'left')\
          .join(category_df, film_category_df.category_id == category_df.category_id, 'left')\
          .join(customer_df, rental_df.customer_id == customer_df.customer_id, 'left')\
          .join(address_df, customer_df.address_id == address_df.address_id, 'left')\
          .join(city_df, address_df.city_id == city_df.city_id, 'left')\
          .where(F.year('rental_date')=='2005')\
          .where(F.col('city').like('a%')\
                 |F.col('city').like('A%')\
                 |F.col('city').like('%-%')\
                 |F.col('city').like('-%')\
                 |F.col('city').like('%-'))\
          .groupBy('name')\
          .agg(F.sum((F.month('return_date') - F.month('rental_date'))*720\
                   + (F.dayofmonth('return_date') - F.dayofmonth('rental_date'))*24\
                   + (F.hour('return_date') - F.hour('rental_date')))\
          .alias('sum'))\
          .sort('sum', ascending=False).show()

+-----------+-----+
|       name|  sum|
+-----------+-----+
|     Sports|16198|
|     Action|14643|
|     Sci-Fi|14453|
|      Drama|13080|
|   Children|13006|
|     Family|12723|
|     Comedy|12028|
|    Foreign|11902|
|      Music|11721|
|      Games|11509|
|Documentary|11372|
|  Animation|11227|
|        New|11176|
|   Classics|10701|
|     Horror|10517|
|     Travel| 9908|
+-----------+-----+

