In [1]:
from pyspark.sql import SparkSession
from pyspark.sql import Window
import pyspark.sql.functions as F

In [2]:
pg_url = 'jdbc:postgresql://xxx.xxx.xxx.xxxx:5432/postgres'
pg_properties = {'user': 'xxx', 'password': 'xxx'}

In [3]:
spark = SparkSession.builder\
        .config('spark.driver.extraClassPath', '/home/user/shared_folder/postgresql-42.2.20.jar')\
        .master('local')\
        .appName('lesson')\
        .getOrCreate()

In [4]:
film_df = spark.read.jdbc(pg_url, 'film', properties=pg_properties)
film_list_df = spark.read.jdbc(pg_url, 'film_list', properties=pg_properties)
film_category_df = spark.read.jdbc(pg_url, 'film_category', properties=pg_properties)
category_df = spark.read.jdbc(pg_url, 'category', properties=pg_properties)

actor_df = spark.read.jdbc(pg_url, 'actor', properties=pg_properties)
film_actor_df = spark.read.jdbc(pg_url, 'film_actor', properties=pg_properties)

inventory_df = spark.read.jdbc(pg_url, 'inventory', properties=pg_properties)
rental_df = spark.read.jdbc(pg_url, 'rental', properties=pg_properties)
payment_df = spark.read.jdbc(pg_url, 'payment', properties=pg_properties)
customer_df = spark.read.jdbc(pg_url, 'customer', properties=pg_properties)

address_df = spark.read.jdbc(pg_url, 'address', properties=pg_properties)
city_df = spark.read.jdbc(pg_url, 'city', properties=pg_properties)
country_df = spark.read.jdbc(pg_url, 'country', properties=pg_properties)

In [5]:
# Task 1
film_list_df.groupBy(F.col('category'))\
            .count()\
            .select('category', F.col('count').alias('category_count'))\
            .orderBy(F.col('category_count').desc())\
            .show()

+-----------+--------------+
|   category|category_count|
+-----------+--------------+
|    Foreign|            73|
|     Sports|            73|
|     Family|            69|
|Documentary|            68|
|  Animation|            66|
|     Action|            64|
|        New|            63|
|      Drama|            61|
|      Games|            61|
|     Sci-Fi|            61|
|   Children|            60|
|     Comedy|            58|
|   Classics|            57|
|     Travel|            56|
|     Horror|            56|
|      Music|            51|
+-----------+--------------+



In [6]:
#Task 2
actor_df.join(film_actor_df, actor_df.actor_id == film_actor_df.actor_id)\
        .join(film_df, film_actor_df.film_id == film_df.film_id)\
        .join(inventory_df, film_df.film_id == inventory_df.film_id)\
        .join(rental_df, inventory_df.inventory_id == rental_df.inventory_id)\
        .groupBy(F.concat(F.col('first_name'), F.lit(' '), F.col('last_name')).alias("top_rented_actor"))\
        .agg(F.count(rental_df.rental_id).alias("total_rental_amount"))\
        .orderBy(F.col('total_rental_amount').desc())\
        .limit(10)\
        .show()

+------------------+-------------------+
|  top_rented_actor|total_rental_amount|
+------------------+-------------------+
|       SUSAN DAVIS|                825|
|    GINA DEGENERES|                753|
|    MATTHEW CARREY|                678|
|       MARY KEITEL|                674|
|ANGELA WITHERSPOON|                654|
|       WALTER TORN|                640|
|       HENRY BERRY|                612|
|       JAYNE NOLTE|                611|
|        VAL BOLGER|                605|
|     SANDRA KILMER|                604|
+------------------+-------------------+



In [7]:
#Task 3
payment_df.join(rental_df, payment_df.rental_id == rental_df.rental_id)\
        .join(inventory_df, rental_df.inventory_id == inventory_df.inventory_id)\
        .join(film_df, inventory_df.film_id == film_df.film_id)\
        .join(film_category_df, film_df.film_id == film_category_df.film_id)\
        .join(category_df, film_category_df.category_id == category_df.category_id)\
        .groupBy(category_df.name)\
        .agg(F.sum(payment_df.amount).alias('total_sales'))\
        .orderBy(F.col('total_sales').desc())\
        .limit(1)\
        .show()

+------+-----------+
|  name|total_sales|
+------+-----------+
|Sports|   26571.05|
+------+-----------+



In [8]:
#Task 4
film_df.join(inventory_df, film_df.film_id == inventory_df.film_id, 'left')\
        .select(film_df.title)\
        .filter(inventory_df.film_id.isNull())\
        .orderBy(film_df.title)\
        .show()

+--------------------+
|               title|
+--------------------+
|      ALICE FANTASIA|
|         APOLLO TEEN|
|      ARGONAUTS TOWN|
|       ARK RIDGEMONT|
|ARSENIC INDEPENDENCE|
|   BOONDOCK BALLROOM|
|       BUTCH PANTHER|
|       CATCH AMISTAD|
| CHINATOWN GLADIATOR|
|      CHOCOLATE DUCK|
|COMMANDMENTS EXPRESS|
|    CROSSING DIVORCE|
|     CROWDS TELEMARK|
|    CRYSTAL BREAKING|
|          DAZED PUNK|
|DELIVERANCE MULHO...|
|   FIREHOUSE VIETNAM|
|       FLOATS GARDEN|
|FRANKENSTEIN STRA...|
|  GLADIATOR WESTWARD|
+--------------------+
only showing top 20 rows



In [9]:
# Task 5
cnt_cond = lambda cond: F.sum(F.when(cond, 1).otherwise(0))

actor_df.join(film_actor_df, actor_df.actor_id == film_actor_df.actor_id)\
        .join(film_df, film_actor_df.film_id == film_df.film_id)\
        .join(film_category_df, film_df.film_id == film_category_df.film_id)\
        .join(category_df, film_category_df.category_id == category_df.category_id)\
        .groupBy(F.concat(F.col('first_name'), F.lit(' '), F.col('last_name')).alias("actor_name"))\
        .agg(cnt_cond(category_df.name == 'Children').alias('total_children_films'))\
        .withColumn("denserank",F.dense_rank().over(Window.orderBy(F.col("total_children_films").desc())))\
        .filter(F.col('denserank') <= 3)\
        .select(F.col('actor_name'), F.col('total_children_films'))\
        .show()

+-------------+--------------------+
|   actor_name|total_children_films|
+-------------+--------------------+
| HELEN VOIGHT|                   7|
|  SUSAN DAVIS|                   6|
|   MARY TANDY|                   5|
|   RALPH CRUZ|                   5|
|  WHOOPI HURT|                   5|
|KEVIN GARLAND|                   5|
+-------------+--------------------+



In [10]:
#Task 6
# Как и в домашнем задании с SQL тут вывел не по городам, а странам для наглядности, потому что малое количество неактивных пользователей даже
# в рамках страны
cnt_cond = lambda cond: F.sum(F.when(cond, 1).otherwise(0))

customer_df.join(address_df, customer_df.address_id == address_df.address_id)\
        .join(city_df, address_df.city_id == city_df.city_id)\
        .join(country_df, city_df.country_id == country_df.country_id)\
        .groupBy(F.col('country'))\
        .agg(cnt_cond(F.col('active') == 1).alias('total_active'),
             cnt_cond(F.col('active') == 0).alias('total_nonactive'))\
        .orderBy(F.col('total_nonactive').desc())\
        .show()

+--------------------+------------+---------------+
|             country|total_active|total_nonactive|
+--------------------+------------+---------------+
|               China|          50|              3|
|               India|          57|              3|
|              Poland|           7|              1|
|                Iran|           7|              1|
|              Mexico|          29|              1|
|             Hungary|           0|              1|
|Virgin Islands, U.S.|           0|              1|
|  Russian Federation|          27|              1|
|              Israel|           3|              1|
|              Turkey|          14|              1|
|      United Kingdom|           8|              1|
|             Senegal|           1|              0|
|           Sri Lanka|           1|              0|
|            Paraguay|           3|              0|
|              Malawi|           1|              0|
|              Sweden|           1|              0|
|         Ph

In [11]:
unionBase = rental_df.join(inventory_df, rental_df.inventory_id == inventory_df.inventory_id)\
                .join(film_df, inventory_df.film_id == film_df.film_id)\
                .join(film_category_df, film_df.film_id == film_category_df.film_id)\
                .join(category_df, film_category_df.category_id == category_df.category_id)\
                .join(customer_df, rental_df.customer_id == customer_df.customer_id)\
                .join(address_df, customer_df.address_id == address_df.address_id)\
                .join(city_df, address_df.city_id == city_df.city_id)\
                .withColumn('hours_of_rental',
                            F.round((F.col('return_date').cast('long') - F.col('rental_date').cast('long'))/3600, 0))

part_one = unionBase.filter(F.lower(F.substring(city_df.city, 1, 1)) == 'a')\
                    .groupBy(category_df.name)\
                    .agg(F.sum(F.col('hours_of_rental')).alias('category_hours_of_rental'))\
                    .orderBy(F.col('category_hours_of_rental').desc())\
                    .limit(1)

part_two = unionBase.filter(city_df.city.like('%-%'))\
                    .groupBy(category_df.name)\
                    .agg(F.sum(F.col('hours_of_rental')).alias('category_hours_of_rental'))\
                    .orderBy(F.col('category_hours_of_rental').desc())\
                    .limit(1)

part_one.union(part_two).show()

+-------+------------------------+
|   name|category_hours_of_rental|
+-------+------------------------+
| Sports|                 12363.0|
|Foreign|                  6475.0|
+-------+------------------------+

