In [0]:
import pyspark.sql.functions as F
from pyspark.sql.window import Window

In [0]:
actor = spark.read.format("csv").option("header", "true").load("dbfs:/FileStore/tables/actor.csv")
store = spark.read.format("csv").option("header", "true").load("dbfs:/FileStore/tables/store.csv")
language = spark.read.format("csv").option("header", "true").load("dbfs:/FileStore/tables/language.csv")
staff = spark.read.format("csv").option("header", "true").load("dbfs:/FileStore/tables/staff.csv")
inventory = spark.read.format("csv").option("header", "true").load("dbfs:/FileStore/tables/inventory.csv")
film_category = spark.read.format("csv").option("header", "true").load("dbfs:/FileStore/tables/film_category.csv")
film_actor = spark.read.format("csv").option("header", "true").load("dbfs:/FileStore/tables/film_actor.csv")
film = spark.read.format("csv").option("header", "true").load("dbfs:/FileStore/tables/film.csv")
customer = spark.read.format("csv").option("header", "true").load("dbfs:/FileStore/tables/customer.csv")
country = spark.read.format("csv").option("header", "true").load("dbfs:/FileStore/tables/country.csv")
city = spark.read.format("csv").option("header", "true").load("dbfs:/FileStore/tables/city.csv")
category = spark.read.format("csv").option("header", "true").load("dbfs:/FileStore/tables/category.csv")
rental = spark.read.format("csv").option("header", "true").load("dbfs:/FileStore/tables/rental.csv")
address = spark.read.format("csv").option("header", "true").load("dbfs:/FileStore/tables/address.csv")

In [0]:
#Вывести количество фильмов в каждой категории, отсортировать по убыванию.
def filmsByCategory():
    result = film.join(film_category, "film_id", "inner").drop(film_category.film_id)
    result = result.join(category, "category_id", "inner").drop(film_category.category_id).withColumnRenamed("name", "category_name")
    result = result.groupBy("category_name").count().orderBy(F.col("count").desc()).withColumnRenamed("count", "film_count")
    return result

result = filmsByCategory()
result.show(5)

+-------------+----------+
|category_name|film_count|
+-------------+----------+
|       Sports|        74|
|      Foreign|        73|
|       Family|        69|
|  Documentary|        68|
|    Animation|        66|
+-------------+----------+
only showing top 5 rows



In [0]:
#Вывести 10 актеров, чьи фильмы большего всего арендовали, отсортировать по убыванию.
def top10actors():
    result = film.join(film_actor, "film_id", "inner").drop(film_actor.film_id)
    result = result.groupBy("actor_id").agg(F.sum(F.col("rental_duration").cast("integer")).alias("total_hours"))
    result = result.join(actor, "actor_id", "inner").drop(actor.actor_id)
    return result.select("first_name", "last_name", "total_hours").orderBy(F.col("total_hours").desc())

result = top10actors()
result.show(20)

+----------+-----------+-----------+
|first_name|  last_name|total_hours|
+----------+-----------+-----------+
|      GINA|  DEGENERES|        209|
|    WALTER|       TORN|        201|
|      MARY|     KEITEL|        192|
|   MATTHEW|     CARREY|        190|
|   GROUCHO|      DUNST|        183|
|    ANGELA|     HUDSON|        183|
|    SANDRA|     KILMER|        181|
|     HENRY|      BERRY|        180|
|       UMA|       WOOD|        179|
|    WARREN|    JACKMAN|        178|
|    ANGELA|WITHERSPOON|        174|
|   NATALIE|    HOPKINS|        174|
|       VAL|     BOLGER|        173|
|    SIDNEY|      CROWE|        172|
|     JULIA|    MCQUEEN|        172|
|    VIVIEN|   BASINGER|        172|
|      MARY|      TANDY|        172|
|   RUSSELL|     TEMPLE|        171|
|      SEAN|    GUINESS|        171|
|     HELEN|     VOIGHT|        169|
+----------+-----------+-----------+
only showing top 20 rows



In [0]:
#Вывести названия фильмов, которых нет в inventory.
def notInInvenotry():
    result = film.join(inventory, "film_id", "anti")
    return result.select("title")
result = notInInvenotry()
result.show()

+--------------------+
|               title|
+--------------------+
|      ALICE FANTASIA|
|         APOLLO TEEN|
|      ARGONAUTS TOWN|
|       ARK RIDGEMONT|
|ARSENIC INDEPENDENCE|
|   BOONDOCK BALLROOM|
|       BUTCH PANTHER|
|       CATCH AMISTAD|
| CHINATOWN GLADIATOR|
|      CHOCOLATE DUCK|
|COMMANDMENTS EXPRESS|
|    CROSSING DIVORCE|
|     CROWDS TELEMARK|
|    CRYSTAL BREAKING|
|          DAZED PUNK|
|DELIVERANCE MULHO...|
|   FIREHOUSE VIETNAM|
|       FLOATS GARDEN|
|FRANKENSTEIN STRA...|
|  GLADIATOR WESTWARD|
+--------------------+
only showing top 20 rows



In [0]:
#Вывести топ 3 актеров, которые больше всего появлялись в фильмах в категории “Children”. Если у нескольких актеров одинаковое кол-во фильмов, вывести всех.
def top3actors():
    result = film.join(film_actor, "film_id", "inner").drop(film_actor.film_id) 
    result = result.join(film_category, "film_id", "inner").drop(film_category.film_id)
    result = result.join(actor, "actor_id", "inner").drop(actor.actor_id)
    result = result.join(category, "category_id", "inner").drop(category.category_id).withColumnRenamed("name", "category_name").filter(F.col("category_name") == "Children")
    
    windowSpec = Window.orderBy("actor_id")
    result = result.withColumn("rank", F.rank().over(windowSpec))
    
    result = result.filter(result["rank"] <= 3).orderBy("rank")
    return result.select("first_name", "last_name", "rank", "category_name")

result = top3actors()
result.show()

+----------+---------+----+-------------+
|first_name|last_name|rank|category_name|
+----------+---------+----+-------------+
|  PENELOPE|  GUINESS|   1|     Children|
| CHRISTIAN|    GABLE|   2|     Children|
|     SUSAN|    DAVIS|   3|     Children|
|     SUSAN|    DAVIS|   3|     Children|
|     SUSAN|    DAVIS|   3|     Children|
|     SUSAN|    DAVIS|   3|     Children|
+----------+---------+----+-------------+



In [0]:
#Вывести города с количеством активных и неактивных клиентов (активный — customer.active = 1). Отсортировать по количеству неактивных клиентов по убыванию.
def activeClients():
    result = customer.join(address, customer.address_id == address.address_id)
    result = result.join(city, address.city_id == city.city_id)
    result = result.groupBy("city").agg(F.count(F.when(F.col("active") == 1, 1)).alias("active_us"),
             F.count(F.when(F.col("active") == 0, 1)).alias("inactive_us"))
    result = result.orderBy(F.col("inactive_us").desc())
    return result

result = activeClients()
result.show()

+------------------+---------+-----------+
|              city|active_us|inactive_us|
+------------------+---------+-----------+
|          Uluberia|        0|          1|
|         Najafabad|        0|          1|
|         Pingxiang|        0|          1|
|          Xiangfan|        0|          1|
|        Kumbakonam|        0|          1|
|       Szkesfehrvr|        0|          1|
|  Charlotte Amalie|        0|          1|
|            Kamyin|        0|          1|
|            Daxian|        0|          1|
|     Coatzacoalcos|        0|          1|
|           Wroclaw|        0|          1|
|            Ktahya|        0|          1|
|   Southend-on-Sea|        0|          1|
|           Bat Yam|        0|          1|
|            Amroha|        0|          1|
|A Corua (La Corua)|        1|          0|
|          Fengshan|        1|          0|
|          Myingyan|        1|          0|
|          Chisinau|        1|          0|
|              Linz|        1|          0|
+----------

In [0]:
#Вывести категорию фильмов, у которой самое большое кол-во часов суммарной аренды в городах (customer.address_id в этом city), и которые начинаются на букву “a”. 
#Тоже самое сделать для городов в которых есть символ “-”.
def cityByHours():
    query1 = (
        category
        .join(film_category, category.category_id == film_category.category_id)
        .join(film, film.film_id == film_category.film_id)
        .join(inventory, film.film_id == inventory.film_id)
        .join(rental, inventory.inventory_id == rental.inventory_id)
        .join(customer, rental.customer_id == customer.customer_id)
        .join(address, customer.address_id == address.address_id)
        .join(city, address.city_id == city.city_id)
        .where(F.col("city").like("A%"))
        .groupBy("city")
        .agg(F.sum(F.col("rental_duration").cast("integer")).alias("total_hours"))
        .select("city", "total_hours")
    )
    query2 = (
        category
        .join(film_category, category.category_id == film_category.category_id)
        .join(film, film.film_id == film_category.film_id)
        .join(inventory, film.film_id == inventory.film_id)
        .join(rental, inventory.inventory_id == rental.inventory_id)
        .join(customer, rental.customer_id == customer.customer_id)
        .join(address, customer.address_id == address.address_id)
        .join(city, address.city_id == city.city_id)
        .where(F.col("city").like("%-%"))
        .groupBy("city")
        .agg(F.sum(F.col("rental_duration").cast("integer")).alias("total_hours"))
        .select("city", "total_hours")
    )
    result = query1.union(query2).orderBy(F.col("total_hours").desc())
    return result 

result = cityByHours()
result.show()

+-------------------+-----------+
|               city|total_hours|
+-------------------+-----------+
|             Aurora|        253|
|        Saint-Denis|        223|
|          Apeldoorn|        189|
|         Ahmadnagar|        169|
|            Arecibo|        166|
|  Usolje-Sibirskoje|        165|
|          Abu Dhabi|        164|
|            Atlixco|        162|
|               Aden|        161|
|   Shubra al-Khayma|        159|
|         Avellaneda|        157|
|    Jastrzebie-Zdrj|        157|
|              Akron|        154|
|          Lapu-Lapu|        153|
|             Ashdod|        153|
|Kamjanets-Podilskyi|        152|
|     Tel Aviv-Jaffa|        147|
| A Corua (La Corua)|        145|
|     Angra dos Reis|        145|
|             Atinsk|        145|
+-------------------+-----------+
only showing top 20 rows

