In [1]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

In [2]:
pg_url = "jdbc:postgresql://192.168.1.39:5432/pagila"
pg_properties = {"user":"pguser", "password":"secret", "driver":"org.postgresql.Driver"}

In [3]:
spark = SparkSession.builder\
    .config('spark.driver.extraClassPath'
            , '/home/user/shared_folder/postgresql-42.2.20.jar')\
    .master('local')\
    .appName("lesson")\
    .getOrCreate()

In [4]:
# вывести количество фильмов в каждой категории, отсортировать по убыванию.

In [5]:
film_df = spark.read.jdbc(pg_url,'public.film',properties=pg_properties)

In [6]:
category_df = spark.read.jdbc(pg_url,'public.category',properties=pg_properties)

In [7]:
film_category_df = spark.read.jdbc(pg_url,'public.film_category',properties=pg_properties)

In [8]:
film_df.join(film_category_df,(film_df.film_id == film_category_df.film_id))\
        .join(category_df,(film_category_df.category_id == category_df.category_id))\
        .groupBy('name')\
        .count()\
        .sort(F.desc('count'))\
        .show(10)   

+-----------+-----+
|       name|count|
+-----------+-----+
|     Sports|   74|
|    Foreign|   73|
|     Family|   69|
|Documentary|   68|
|  Animation|   66|
|     Action|   64|
|        New|   63|
|      Drama|   62|
|     Sci-Fi|   61|
|      Games|   61|
+-----------+-----+
only showing top 10 rows



In [9]:
# вывести 10 актеров, чьи фильмы большего всего арендовали, отсортировать по убыванию.

In [10]:
actor_df = spark.read.jdbc(pg_url,'public.actor',properties=pg_properties)

In [11]:
film_actor_df = spark.read.jdbc(pg_url,'public.film_actor',properties=pg_properties)

In [12]:
inventory_df = spark.read.jdbc(pg_url,'public.inventory',properties=pg_properties)

In [13]:
rental_df = spark.read.jdbc(pg_url,'public.rental',properties=pg_properties)

In [14]:
actor_df.join(film_actor_df,(actor_df.actor_id == film_actor_df.actor_id)).drop(film_actor_df.actor_id)\
        .join(inventory_df,(film_actor_df.film_id == inventory_df.film_id))\
        .join(rental_df, (inventory_df.inventory_id == rental_df.inventory_id))\
        .groupBy('actor_id','first_name','last_name')\
        .agg({'rental_id':'count'})\
        .sort(F.desc('count(rental_id)'))\
        .show(10)

+--------+----------+-----------+----------------+
|actor_id|first_name|  last_name|count(rental_id)|
+--------+----------+-----------+----------------+
|     107|      GINA|  DEGENERES|             753|
|     181|   MATTHEW|     CARREY|             678|
|     198|      MARY|     KEITEL|             674|
|     144|    ANGELA|WITHERSPOON|             654|
|     102|    WALTER|       TORN|             640|
|      60|     HENRY|      BERRY|             612|
|     150|     JAYNE|      NOLTE|             611|
|      37|       VAL|     BOLGER|             605|
|      23|    SANDRA|     KILMER|             604|
|      90|      SEAN|    GUINESS|             599|
+--------+----------+-----------+----------------+
only showing top 10 rows



In [15]:
#вывести 10 актеров, чьи фильмы большего всего арендовали, отсортировать по убыванию.

In [16]:
payment_df = spark.read.jdbc(pg_url,'public.payment',properties=pg_properties)

In [17]:
category_df.join(film_category_df,(category_df.category_id == film_category_df.category_id)).drop(film_category_df.category_id)\
        .join(inventory_df,(film_category_df.film_id == inventory_df.film_id))\
        .join(rental_df, (inventory_df.inventory_id == rental_df.inventory_id))\
        .join(payment_df, (payment_df.rental_id == rental_df.rental_id))\
        .groupBy('category_id','name')\
        .agg({'amount':'sum'})\
        .sort(F.desc('sum(amount)'))\
        .show(1)

+-----------+------+-----------+
|category_id|  name|sum(amount)|
+-----------+------+-----------+
|         15|Sports|    5314.21|
+-----------+------+-----------+
only showing top 1 row



In [18]:
#вывести категорию фильмов, на которую потратили больше всего денег.

In [19]:
film_df.join(inventory_df,(film_df.film_id == inventory_df.film_id),'left_outer').drop(film_df.film_id)\
        .filter('film_id is null')\
        .select('title')\
        .show()

+--------------------+
|               title|
+--------------------+
|      CHOCOLATE DUCK|
|       BUTCH PANTHER|
|        VOLUME HOUSE|
|      ORDER BETRAYED|
|        TADPOLE PARK|
|    KILL BROTHERHOOD|
|FRANKENSTEIN STRA...|
|    CROSSING DIVORCE|
|    SUICIDES SILENCE|
|       CATCH AMISTAD|
|     PERDITION FARGO|
|       FLOATS GARDEN|
|           GUMP DATE|
|        WALLS ARTIST|
|  GLADIATOR WESTWARD|
|         HOCUS FRIDA|
|ARSENIC INDEPENDENCE|
|         MUPPET MILE|
|   FIREHOUSE VIETNAM|
|       ROOF CHAMPION|
+--------------------+
only showing top 20 rows



In [20]:
#вывести топ 3 актеров, которые больше всего появлялись в фильмах в категории “Children”. Если у нескольких актеров одинаковое кол-во фильмов, вывести всех..

In [21]:
ch_df = actor_df.join(film_actor_df,(actor_df.actor_id == film_actor_df.actor_id)).drop(film_actor_df.actor_id)\
        .join(film_category_df,(film_actor_df.film_id == film_category_df.film_id))\
        .join(category_df,(category_df.category_id == film_category_df.category_id))\
        .filter('name = "Children"')\
        .groupBy('actor_id', 'first_name', 'last_name')\
        .count()

In [22]:
cnt_df = ch_df.select('count')\
        .distinct()\
        .select(F.collect_list('count')).first()[0]

cnt = sorted(cnt_df)[-3:]


In [23]:
ch_df.filter(F.col('count').isin(cnt))\
    .sort(F.desc('count'))\
    .show()

+--------+----------+---------+-----+
|actor_id|first_name|last_name|count|
+--------+----------+---------+-----+
|      17|     HELEN|   VOIGHT|    7|
|     140|    WHOOPI|     HURT|    5|
|      80|     RALPH|     CRUZ|    5|
|      66|      MARY|    TANDY|    5|
|     127|     KEVIN|  GARLAND|    5|
|      81|  SCARLETT|    DAMON|    4|
|      23|    SANDRA|   KILMER|    4|
|     109| SYLVESTER|     DERN|    4|
|      92|   KIRSTEN|   AKROYD|    4|
|     101|     SUSAN|    DAVIS|    4|
|     150|     JAYNE|    NOLTE|    4|
|     187|     RENEE|     BALL|    4|
|     142|      JADA|    RYDER|    4|
|     173|      ALAN| DREYFUSS|    4|
|      13|       UMA|     WOOD|    4|
|     131|      JANE|  JACKMAN|    4|
|      58| CHRISTIAN|   AKROYD|    4|
|      93|     ELLEN|  PRESLEY|    4|
|      37|       VAL|   BOLGER|    4|
+--------+----------+---------+-----+



In [24]:
#вывести города с количеством активных и неактивных клиентов (активный — customer.active = 1). Отсортировать по количеству неактивных клиентов по убыванию.

In [25]:
city_df = spark.read.jdbc(pg_url,'public.city',properties=pg_properties)

In [26]:
customer_df = spark.read.jdbc(pg_url,'public.customer',properties=pg_properties)

In [27]:
address_df = spark.read.jdbc(pg_url,'public.address',properties=pg_properties)

In [28]:
city_df.join(address_df,(city_df.city_id == address_df.city_id)).drop(address_df.city_id)\
        .join(customer_df, (customer_df.address_id == address_df.address_id))\
        .select('city','active')\
        .groupBy('city')\
        .agg(
            F.count(F.when(F.col("active") == 1, True)).alias('NN'),
            F.count(F.when(F.col("active") == 0, True)).alias('N')
            )\
        .sort(F.desc('N'))\
        .show()

+----------------+---+---+
|            city| NN|  N|
+----------------+---+---+
|          Ktahya|  0|  1|
|Charlotte Amalie|  0|  1|
|         Wroclaw|  0|  1|
|       Pingxiang|  0|  1|
|     Szkesfehrvr|  0|  1|
|          Daxian|  0|  1|
|   Coatzacoalcos|  0|  1|
|         Bat Yam|  0|  1|
| Southend-on-Sea|  0|  1|
|        Uluberia|  0|  1|
|       Najafabad|  0|  1|
|        Xiangfan|  0|  1|
|      Kumbakonam|  0|  1|
|          Kamyin|  0|  1|
|          Amroha|  0|  1|
|        Chisinau|  1|  0|
|         Esfahan|  1|  0|
|       Mit Ghamr|  1|  0|
|         Udaipur|  1|  0|
|  Dhule (Dhulia)|  1|  0|
+----------------+---+---+
only showing top 20 rows



In [29]:
#вывести категорию фильмов, у которой самое большое кол-во часов суммарной аренды в городах (customer.address_id в этом city), и которые начинаются на букву “a”. То же самое сделать для городов в которых есть символ “-”.

In [30]:
category_df.join(film_category_df,(category_df.category_id == film_category_df.category_id)).drop(film_category_df.category_id)\
        .join(inventory_df,(film_category_df.film_id == inventory_df.film_id))\
        .join(rental_df, (inventory_df.inventory_id == rental_df.inventory_id))\
        .join(customer_df, (customer_df.customer_id == rental_df.customer_id))\
        .join(address_df, (address_df.address_id == customer_df.address_id))\
        .join(city_df, (address_df.city_id == city_df.city_id))\
        .filter('city like "a%"')\
        .withColumn('rent', F.expr('extract(hour from return_date-rental_date)'))\
        .groupBy('category_id','name')\
        .sum('rent')\
        .sort(F.desc('sum(rent)'))\
        .show(1)

+-----------+------+---------+
|category_id|  name|sum(rent)|
+-----------+------+---------+
|         14|Sci-Fi|      243|
+-----------+------+---------+
only showing top 1 row



In [31]:
category_df.join(film_category_df,(category_df.category_id == film_category_df.category_id)).drop(film_category_df.category_id)\
        .join(inventory_df,(film_category_df.film_id == inventory_df.film_id))\
        .join(rental_df, (inventory_df.inventory_id == rental_df.inventory_id))\
        .join(customer_df, (customer_df.customer_id == rental_df.customer_id))\
        .join(address_df, (address_df.address_id == customer_df.address_id))\
        .join(city_df, (address_df.city_id == city_df.city_id))\
        .filter('city like "%-%"')\
        .withColumn('rent', F.expr('extract(hour from return_date-rental_date)'))\
        .groupBy('category_id','name')\
        .sum('rent')\
        .sort(F.desc('sum(rent)'))\
        .show(1)

+-----------+------+---------+
|category_id|  name|sum(rent)|
+-----------+------+---------+
|         14|Sci-Fi|      730|
+-----------+------+---------+
only showing top 1 row

