# Spark DataFrame Extra Task (готовий розвʼязок)

Вимоги ТЗ:
- **тільки DataFrame API** (без SQL-рядків)
- локальний Spark `master('local[3]')`
- результат кожної задачі відтворюється в ноутбуці через `.show()`

Дані очікуються в папці `data/` (можуть бути й у підпапках — ноутбук знайде файли).


In [1]:
from pyspark.sql import SparkSession, functions as F
from pathlib import Path


In [2]:
# Spark session (LOCAL)
spark = (
    SparkSession.builder
    .master("local[3]")
    .appName("spark-hw")
    .getOrCreate()
)
spark.sparkContext.setLogLevel("WARN")
print("Spark:", spark.version)


26/01/19 21:01:40 WARN Utils: Your hostname, MacBook-Pro-Sergii.local resolves to a loopback address: 127.0.0.1; using 192.168.0.33 instead (on interface en0)
26/01/19 21:01:40 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
26/01/19 21:01:40 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
26/01/19 21:01:40 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


Spark: 3.5.1


In [3]:
# Robust repo/data detection
cwd = Path.cwd().resolve()
repo_root = None
for p in [cwd, *cwd.parents]:
    if (p / "data").exists():
        repo_root = p
        break
if repo_root is None:
    repo_root = cwd

REPO_ROOT = repo_root
DATA_DIR = (REPO_ROOT / "data").resolve()

print("CWD:", cwd)
print("Repo:", REPO_ROOT)
print("Data dir:", DATA_DIR, "exists:", DATA_DIR.exists())


CWD: /Users/sergiibeshliaga/PycharmProjects/spark-base
Repo: /Users/sergiibeshliaga/PycharmProjects/spark-base
Data dir: /Users/sergiibeshliaga/PycharmProjects/spark-base/data exists: True


In [4]:
def locate_file(name: str, base: Path = None) -> Path | None:
    """Find file under base folder (recursive, case-insensitive)."""
    if base is None:
        base = DATA_DIR
    base = Path(base)
    if not base.exists():
        return None

    p = base / name
    if p.exists():
        return p

    lower = name.lower()
    for x in base.iterdir():
        if x.name.lower() == lower:
            return x

    hits = list(base.rglob(name))
    if hits:
        return hits[0]

    for x in base.rglob("*"):
        if x.is_file() and x.name.lower() == lower:
            return x
    return None

def read_csv(name: str):
    path = locate_file(name)
    if path is None:
        raise FileNotFoundError(f"{name} not found under {DATA_DIR}")
    return spark.read.csv(str(path), header=True, inferSchema=True)


## Load tables (CSV)


In [5]:
actor_df = read_csv('actor.csv')
address_df = read_csv('address.csv')
category_df = read_csv('category.csv')
city_df = read_csv('city.csv')
country_df = read_csv('country.csv')
customer_df = read_csv('customer.csv')
film_df = read_csv('film.csv')
film_actor_df = read_csv('film_actor.csv')
film_category_df = read_csv('film_category.csv')
inventory_df = read_csv('inventory.csv')
language_df = read_csv('language.csv')
payment_df = read_csv('payment.csv')
rental_df = read_csv('rental.csv')
staff_df = read_csv('staff.csv')
store_df = read_csv('store.csv')

print('Tables loaded')


Tables loaded


# Домашнє завдання на тему Spark SQL

Задачі з домашнього завдання на SQL потрібно розвʼязати за допомогою Spark SQL DataFrame API.

- Дампи таблиць знаходяться в папці `data`.
- Розвʼязок кожної задачі має бути відображений в самому файлі (використати метод `.show()`).
- Використовувати SQL-рядки заборонено — **тільки DataFrame API**.


## 1. Вивести кількість фільмів в кожній категорії. Результат відсортувати за спаданням.


In [6]:
task1 = (
    film_category_df
    .join(category_df, on='category_id', how='inner')
    .groupBy('name')
    .agg(F.countDistinct('film_id').alias('films_cnt'))
    .orderBy(F.desc('films_cnt'))
)
task1.show(truncate=False)


+-----------+---------+
|name       |films_cnt|
+-----------+---------+
|Sports     |74       |
|Foreign    |73       |
|Family     |69       |
|Documentary|68       |
|Animation  |66       |
|Action     |64       |
|New        |63       |
|Drama      |62       |
|Games      |61       |
|Sci-Fi     |61       |
|Children   |60       |
|Comedy     |58       |
|Travel     |57       |
|Classics   |57       |
|Horror     |56       |
|Music      |51       |
+-----------+---------+



## 2. Вивести 10 акторів, чиї фільми брали на прокат найбільше. Результат відсортувати за спаданням.


In [7]:
task2 = (
    rental_df
    .join(inventory_df.select('inventory_id', 'film_id'), on='inventory_id', how='inner')
    .join(film_actor_df, on='film_id', how='inner')
    .join(actor_df, on='actor_id', how='inner')
    .groupBy('actor_id', 'first_name', 'last_name')
    .agg(F.count('*').alias('rentals_cnt'))
    .orderBy(F.desc('rentals_cnt'))
    .limit(10)
)
task2.show(truncate=False)


+--------+----------+-----------+-----------+
|actor_id|first_name|last_name  |rentals_cnt|
+--------+----------+-----------+-----------+
|107     |GINA      |DEGENERES  |753        |
|181     |MATTHEW   |CARREY     |678        |
|198     |MARY      |KEITEL     |674        |
|144     |ANGELA    |WITHERSPOON|654        |
|102     |WALTER    |TORN       |640        |
|60      |HENRY     |BERRY      |612        |
|150     |JAYNE     |NOLTE      |611        |
|37      |VAL       |BOLGER     |605        |
|23      |SANDRA    |KILMER     |604        |
|90      |SEAN      |GUINESS    |599        |
+--------+----------+-----------+-----------+



## 3. Вивести категорію фільмів, на яку було витрачено найбільше грошей в прокаті


In [8]:
task3 = (
    payment_df
    .join(rental_df.select('rental_id', 'inventory_id'), on='rental_id', how='inner')
    .join(inventory_df.select('inventory_id', 'film_id'), on='inventory_id', how='inner')
    .join(film_category_df, on='film_id', how='inner')
    .join(category_df, on='category_id', how='inner')
    .groupBy('name')
    .agg(F.sum('amount').alias('total_amount'))
    .orderBy(F.desc('total_amount'))
    .limit(1)
)
task3.show(truncate=False)


+------+-----------------+
|name  |total_amount     |
+------+-----------------+
|Sports|5314.209999999848|
+------+-----------------+



## 4. Вивести назви фільмів, яких не має в inventory.


In [9]:
task4 = (
    film_df
    .select('film_id', 'title')
    .join(inventory_df.select('film_id').distinct(), on='film_id', how='left_anti')
    .orderBy('title')
)
task4.show(truncate=False)


+-------+----------------------+
|film_id|title                 |
+-------+----------------------+
|14     |ALICE FANTASIA        |
|33     |APOLLO TEEN           |
|36     |ARGONAUTS TOWN        |
|38     |ARK RIDGEMONT         |
|41     |ARSENIC INDEPENDENCE  |
|87     |BOONDOCK BALLROOM     |
|108    |BUTCH PANTHER         |
|128    |CATCH AMISTAD         |
|144    |CHINATOWN GLADIATOR   |
|148    |CHOCOLATE DUCK        |
|171    |COMMANDMENTS EXPRESS  |
|192    |CROSSING DIVORCE      |
|195    |CROWDS TELEMARK       |
|198    |CRYSTAL BREAKING      |
|217    |DAZED PUNK            |
|221    |DELIVERANCE MULHOLLAND|
|318    |FIREHOUSE VIETNAM     |
|325    |FLOATS GARDEN         |
|332    |FRANKENSTEIN STRANGER |
|359    |GLADIATOR WESTWARD    |
+-------+----------------------+
only showing top 20 rows



## 5. Вивести топ 3 актори, які найбільше зʼявлялись в категорії фільмів “Children”


In [10]:
children_cat = category_df.filter(F.col('name') == F.lit('Children')).select('category_id')

task5 = (
    film_category_df
    .join(children_cat, on='category_id', how='inner')
    .join(film_actor_df, on='film_id', how='inner')
    .join(actor_df, on='actor_id', how='inner')
    .groupBy('actor_id', 'first_name', 'last_name')
    .agg(F.countDistinct('film_id').alias('films_in_children'))
    .orderBy(F.desc('films_in_children'))
    .limit(3)
)
task5.show(truncate=False)


+--------+----------+---------+-----------------+
|actor_id|first_name|last_name|films_in_children|
+--------+----------+---------+-----------------+
|17      |HELEN     |VOIGHT   |7                |
|127     |KEVIN     |GARLAND  |5                |
|140     |WHOOPI    |HURT     |5                |
+--------+----------+---------+-----------------+



## Stop Spark session


In [11]:
spark.stop()
print('Spark stopped')


Spark stopped
