In [1]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.window import Window
from pyspark.sql.types import StringType, BooleanType

import psycopg2
from hdfs import InsecureClient

In [None]:
pg_url = "jdbc:postgresql://192.168.31.218:5432/pagila"
pg_properties = {'user':input('enter user name: '), 'password':input('enter password: ')}

In [3]:
spark = SparkSession.builder\
    .config('spark.driver.extraClassPath', '/home/user/shared_folder/postgresql-42.2.20.jar')\
    .master('local')\
    .appName('homework_6')\
    .getOrCreate()

In [4]:
#loading all tables needed for a task
category_df = spark.read.jdbc(pg_url, "public.category", properties=pg_properties)
film_category_df = spark.read.jdbc(pg_url, "public.film_category", properties=pg_properties)
actor_df = spark.read.jdbc(pg_url, "public.actor", properties=pg_properties)
film_actor_df = spark.read.jdbc(pg_url, "public.film_actor", properties=pg_properties)
inventory_df = spark.read.jdbc(pg_url, "public.inventory", properties=pg_properties)
rental_df = spark.read.jdbc(pg_url, "public.rental", properties=pg_properties)
payment_df = spark.read.jdbc(pg_url, "public.payment", properties=pg_properties)
film_df = spark.read.jdbc(pg_url, "public.film", properties=pg_properties)
customer_df = spark.read.jdbc(pg_url, "public.customer", properties=pg_properties)
address_df = spark.read.jdbc(pg_url, "public.address", properties=pg_properties)
city_df = spark.read.jdbc(pg_url, "public.city", properties=pg_properties)

## Task 1
вывести количество фильмов в каждой категории, отсортировать по убыванию

In [5]:
category_df\
    .join(film_category_df, on=['category_id'])\
    .select('name')\
    .groupBy('name')\
    .count()\
    .orderBy('count',ascending=False)\
    .show()

+-----------+-----+
|       name|count|
+-----------+-----+
|     Sports|   74|
|    Foreign|   73|
|     Family|   69|
|Documentary|   68|
|  Animation|   66|
|     Action|   64|
|        New|   63|
|      Drama|   62|
|     Sci-Fi|   61|
|      Games|   61|
|   Children|   60|
|     Comedy|   58|
|     Travel|   57|
|   Classics|   57|
|     Horror|   56|
|      Music|   51|
+-----------+-----+



## Task 2
вывести 10 актеров, чьи фильмы большего всего арендовали, отсортировать по убыванию.

In [6]:
actor_df\
    .join(film_actor_df, on=['actor_id'], how='inner')\
    .join(inventory_df, on=['film_id'], how='inner')\
    .join(rental_df, on=['inventory_id'], how='inner')\
    .select('actor_id','first_name', 'last_name')\
    .groupBy('actor_id','first_name', 'last_name')\
    .count()\
    .orderBy('count', ascending=False)\
    .limit(10)\
    .show()

+--------+----------+-----------+-----+
|actor_id|first_name|  last_name|count|
+--------+----------+-----------+-----+
|     107|      GINA|  DEGENERES|  753|
|     181|   MATTHEW|     CARREY|  678|
|     198|      MARY|     KEITEL|  674|
|     144|    ANGELA|WITHERSPOON|  654|
|     102|    WALTER|       TORN|  640|
|      60|     HENRY|      BERRY|  612|
|     150|     JAYNE|      NOLTE|  611|
|      37|       VAL|     BOLGER|  605|
|      23|    SANDRA|     KILMER|  604|
|      90|      SEAN|    GUINESS|  599|
+--------+----------+-----------+-----+



## Task 3
вывести категорию фильмов, на которую потратили больше всего денег

In [7]:
payment_df\
    .join(rental_df, on=['rental_id'], how='inner')\
    .join(inventory_df, on=['inventory_id'], how='inner')\
    .join(film_category_df, on=['film_id'], how='inner')\
    .join(category_df, on=['category_id'], how='inner')\
    .select('name','amount')\
    .groupBy('name')\
    .agg(F.sum('amount').alias('total amount'))\
    .orderBy('total amount', ascending=False)\
    .limit(1)\
    .show()

+------+------------+
|  name|total amount|
+------+------------+
|Sports|     5314.21|
+------+------------+



## Task 4
вывести названия фильмов, которых нет в inventory.

In [8]:
film_df\
    .join(inventory_df, on=['film_id'], how='left')\
    .filter(F.col('inventory_id').isNull())\
    .select('title')\
    .show()

+--------------------+
|               title|
+--------------------+
|      CHOCOLATE DUCK|
|       BUTCH PANTHER|
|        VOLUME HOUSE|
|      ORDER BETRAYED|
|        TADPOLE PARK|
|    KILL BROTHERHOOD|
|FRANKENSTEIN STRA...|
|    CROSSING DIVORCE|
|    SUICIDES SILENCE|
|       CATCH AMISTAD|
|     PERDITION FARGO|
|       FLOATS GARDEN|
|           GUMP DATE|
|        WALLS ARTIST|
|  GLADIATOR WESTWARD|
|         HOCUS FRIDA|
|ARSENIC INDEPENDENCE|
|         MUPPET MILE|
|   FIREHOUSE VIETNAM|
|       ROOF CHAMPION|
+--------------------+
only showing top 20 rows



## Task 5
вывести топ 3 актеров, которые больше всего появлялись в фильмах в категории “Children”. Если у нескольких актеров одинаковое кол-во фильмов, вывести всех..

In [9]:
w = Window()
film_actor_df\
        .join(film_category_df, on=['film_id'], how='inner')\
        .join(category_df, on=['category_id'], how='inner')\
        .filter(F.col('name')=='Children')\
        .select('actor_id')\
        .groupBy('actor_id')\
        .count()\
        .withColumn('rank', F.dense_rank().over(w.orderBy(F.col('count').desc())))\
        .join(actor_df, on=['actor_id'], how='inner')\
        .filter(F.col('rank')<4)\
        .select('first_name','last_name', 'count')\
        .orderBy('count', ascending=False)\
        .show()

+----------+---------+-----+
|first_name|last_name|count|
+----------+---------+-----+
|     HELEN|   VOIGHT|    7|
|     RALPH|     CRUZ|    5|
|    WHOOPI|     HURT|    5|
|      MARY|    TANDY|    5|
|     KEVIN|  GARLAND|    5|
|     SUSAN|    DAVIS|    4|
|  SCARLETT|    DAMON|    4|
|     ELLEN|  PRESLEY|    4|
|       VAL|   BOLGER|    4|
|       UMA|     WOOD|    4|
|      JADA|    RYDER|    4|
|   KIRSTEN|   AKROYD|    4|
|     JAYNE|    NOLTE|    4|
|     RENEE|     BALL|    4|
|      ALAN| DREYFUSS|    4|
|    SANDRA|   KILMER|    4|
| SYLVESTER|     DERN|    4|
| CHRISTIAN|   AKROYD|    4|
|      JANE|  JACKMAN|    4|
+----------+---------+-----+



## Task 6
вывести города с количеством активных и неактивных клиентов (активный — customer.active = 1). Отсортировать по количеству неактивных клиентов по убыванию.

In [10]:
customer_df\
    .join(address_df, on=['address_id'], how='inner')\
    .join(city_df, on=['city_id'], how='inner')\
    .select('city','active')\
    .groupBy('city')\
    .agg(F.sum('active').alias('a_customers'), F.sum(1-F.col('active')).alias('na_customers'))\
    .orderBy('a_customers', ascending=False)\
    .show()

+---------------+-----------+------------+
|           city|a_customers|na_customers|
+---------------+-----------+------------+
|         London|          2|           0|
|         Aurora|          2|           0|
|       Fengshan|          1|           0|
|           Linz|          1|           0|
|       Chisinau|          1|           0|
|            Oyo|          1|           0|
|        Udaipur|          1|           0|
|       Myingyan|          1|           0|
|        El Alto|          1|           0|
|       Salzburg|          1|           0|
|   Juiz de Fora|          1|           0|
|    Sultanbeyli|          1|           0|
|        Esfahan|          1|           0|
|         Monywa|          1|           0|
|         Jining|          1|           0|
|      Mit Ghamr|          1|           0|
| Dhule (Dhulia)|          1|           0|
|        Tanauan|          1|           0|
|       Sogamoso|          1|           0|
|Jastrzebie-Zdrj|          1|           0|
+----------

## Task 7
вывести категорию фильмов, у которой самое большое кол-во часов суммарной аренды в городах (customer.address_id в этом city), и которые начинаются на букву “a”. То же самое сделать для городов в которых есть символ “-”.

In [11]:
city_type_map_1 = F.udf(lambda x: x[0].lower()=='a', BooleanType())
city_type_map_2 = F.udf(lambda x: '-' in x, BooleanType())

main_df = category_df\
            .join(film_category_df, on=['category_id'], how='inner')\
            .join(inventory_df, on=['film_id'], how='inner')\
            .join(
                rental_df.select('inventory_id','customer_id', ((F.col('return_date').cast('long')-F.col('rental_date').cast('long'))/3600).alias('watched_hours')), 
                on=['inventory_id'], 
                how='inner')\
            .join(customer_df, on=['customer_id'], how='inner')\
            .join(address_df, on=['address_id'], how='inner')\
            .join(
                city_df.select('city_id','city',city_type_map_1(F.col('city')).alias('starts_with_a'), city_type_map_2(F.col('city')).alias('contains_hyphen')),
                on=['city_id'],
                how='inner')\
            .filter(F.col('starts_with_a') | F.col('contains_hyphen'))\
            .select('name', 'starts_with_a', 'contains_hyphen', 'watched_hours')
print('CONTAINS HYPHEN')
main_df\
        .filter(F.col('contains_hyphen')==True)\
        .select('name', 'watched_hours')\
        .groupBy('name')\
        .agg(F.sum('watched_hours').alias('watched_hours'))\
        .orderBy('watched_hours', ascending=False)\
        .limit(1)\
        .show()
print('STARTS WITH "A"')
main_df\
        .filter(F.col('starts_with_a')==True)\
        .select('name', 'watched_hours')\
        .groupBy('name')\
        .agg(F.sum('watched_hours').alias('watched_hours'))\
        .orderBy('watched_hours', ascending=False)\
        .limit(1)\
        .show()

CONTAINS HYPHEN
+-------+-------------+
|   name|watched_hours|
+-------+-------------+
|Foreign|      6472.15|
+-------+-------------+

STARTS WITH "A"
+------+------------------+
|  name|     watched_hours|
+------+------------------+
|Sports|12360.350000000002|
+------+------------------+

