In [1]:
print('Hello Anton')

Hello Anton


In [2]:
from pyspark.sql import SparkSession

import pyspark.sql.functions as f

spark = SparkSession.builder\
        .config('spark.driver.extraClassPath'
                ,'/home/user/shared_folder/postgresql-42.3.1.jar')\
        .master('local')\
        .appName('hw')\
        .getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [3]:
pg_url = "jdbc:postgresql://127.0.0.1:5432/postgres"
pg_creds = {"user": "pguser", "password": "secret"}

In [4]:
film = spark.read.jdbc(pg_url, table = 'film', properties = pg_creds)
film_category = spark.read.jdbc(pg_url, table = 'film_category', properties = pg_creds)
category = spark.read.jdbc(pg_url, table = 'category', properties = pg_creds)
rental = spark.read.jdbc(pg_url, table = 'rental', properties = pg_creds)
inventory = spark.read.jdbc(pg_url, table = 'inventory', properties = pg_creds)
film_actor = spark.read.jdbc(pg_url, table = 'film_actor', properties = pg_creds)
actor = spark.read.jdbc(pg_url, table = 'actor', properties = pg_creds)
payment = spark.read.jdbc(pg_url, table = 'payment', properties = pg_creds)
customer = spark.read.jdbc(pg_url, table = 'customer', properties = pg_creds)
address = spark.read.jdbc(pg_url, table = 'address', properties = pg_creds)
city = spark.read.jdbc(pg_url, table = 'city', properties = pg_creds)

In [5]:
film.createOrReplaceTempView('film')
film_category.createOrReplaceTempView('film_category')
category.createOrReplaceTempView('category')
rental.createOrReplaceTempView('rental')
inventory.createOrReplaceTempView('inventory')
film_actor.createOrReplaceTempView('film_actor')
actor.createOrReplaceTempView('actor')
payment.createOrReplaceTempView('payment')
customer.createOrReplaceTempView('customer')
address.createOrReplaceTempView('address')
city.createOrReplaceTempView('city')

### Task 1

In [6]:
spark.sql("""
select c.name AS cat
      ,COUNT(a.film_id) AS f_cnt
FROM film AS a
JOIN film_category AS b ON b.film_id=a.film_id
JOIN category AS c ON c.category_id=b.category_id
group BY 1
order BY 2 DESC;
""").show(100,False)



+-----------+-----+
|cat        |f_cnt|
+-----------+-----+
|Sports     |74   |
|Foreign    |73   |
|Family     |69   |
|Documentary|68   |
|Animation  |66   |
|Action     |64   |
|New        |63   |
|Drama      |62   |
|Sci-Fi     |61   |
|Games      |61   |
|Children   |60   |
|Comedy     |58   |
|Travel     |57   |
|Classics   |57   |
|Horror     |56   |
|Music      |51   |
+-----------+-----+





Task 2

In [7]:
spark.sql("""
SELECT CONCAT(e.first_name,' ',e.last_name) AS full_name
      ,count(DISTINCT a.rental_id) AS rent_cnt
FROM rental AS a
JOIN inventory AS b ON b.inventory_id=a.inventory_id
JOIN film AS c ON c.film_id=b.film_id
JOIN film_actor AS d ON d.film_id=c.film_id
JOIN actor AS e ON e.actor_id=d.actor_id
group BY 1
order by 2 DESC
LIMIT 10

""").show(100,False)



+------------------+--------+
|full_name         |rent_cnt|
+------------------+--------+
|SUSAN DAVIS       |825     |
|GINA DEGENERES    |753     |
|MATTHEW CARREY    |678     |
|MARY KEITEL       |674     |
|ANGELA WITHERSPOON|654     |
|WALTER TORN       |640     |
|HENRY BERRY       |612     |
|JAYNE NOLTE       |611     |
|VAL BOLGER        |605     |
|SANDRA KILMER     |604     |
+------------------+--------+





Task 3

In [8]:
spark.sql("""
SELECT f.name
      ,SUM(a.amount) AS revenue
FROM payment AS a
JOIN rental AS b ON b.rental_id=a.rental_id
JOIN inventory AS c ON c.inventory_id=b.inventory_id
JOIN film AS d ON d.film_id=c.film_id
JOIN film_category AS e ON e.film_id=d.film_id
JOIN category AS f ON f.category_id=e.category_id
group BY 1
order BY 2 DESC
limit 1;
""").show(100,False)



+------+-------+
|name  |revenue|
+------+-------+
|Sports|5314.21|
+------+-------+





Task 4

In [9]:
spark.sql("""
SELECT a.title
FROM film AS a
left JOIN inventory i ON a.film_id = i.film_id
WHERE 1=1
  AND i.film_id is null;
""").show(100,False)

+----------------------+
|title                 |
+----------------------+
|CHOCOLATE DUCK        |
|BUTCH PANTHER         |
|VOLUME HOUSE          |
|ORDER BETRAYED        |
|TADPOLE PARK          |
|KILL BROTHERHOOD      |
|FRANKENSTEIN STRANGER |
|CROSSING DIVORCE      |
|SUICIDES SILENCE      |
|CATCH AMISTAD         |
|PERDITION FARGO       |
|FLOATS GARDEN         |
|GUMP DATE             |
|WALLS ARTIST          |
|GLADIATOR WESTWARD    |
|HOCUS FRIDA           |
|ARSENIC INDEPENDENCE  |
|MUPPET MILE           |
|FIREHOUSE VIETNAM     |
|ROOF CHAMPION         |
|DAZED PUNK            |
|PEARL DESTINY         |
|RAINBOW SHOCK         |
|KENTUCKIAN GIANT      |
|BOONDOCK BALLROOM     |
|COMMANDMENTS EXPRESS  |
|HATE HANDICAP         |
|ARK RIDGEMONT         |
|CROWDS TELEMARK       |
|DELIVERANCE MULHOLLAND|
|RAIDERS ANTITRUST     |
|SISTER FREDDY         |
|VILLAIN DESPERATE     |
|APOLLO TEEN           |
|ALICE FANTASIA        |
|CRYSTAL BREAKING      |
|TREASURE COMMAND      |


Task 5

In [10]:
spark.sql("""
SELECT a.full_name
      ,a.f_cnt
FROM(
    SELECT COUNT(DISTINCT a.title) AS f_cnt
          ,CONCAT(e.first_name,' ',e.last_name) AS full_name
          ,RANK() OVER(order BY COUNT(DISTINCT a.title) DESC) AS r
    FROM film AS a
    JOIN film_category AS b ON b.film_id=a.film_id
    JOIN category AS c ON c.category_id=b.category_id
    JOIN film_actor AS d ON d.film_id=a.film_id
    join actor AS e ON e.actor_id=d.actor_id
    WHERE 1=1
      AND c.name = 'Children'
    group BY 2
    order BY 1 DESC) AS a
WHERE 1=1
  and r <=3;
""").show(100,False)

21/11/04 17:19:09 WARN window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.

+-------------+-----+
|full_name    |f_cnt|
+-------------+-----+
|HELEN VOIGHT |7    |
|SUSAN DAVIS  |6    |
|MARY TANDY   |5    |
|RALPH CRUZ   |5    |
|KEVIN GARLAND|5    |
|WHOOPI HURT  |5    |
+-------------+-----+



                                                                                

Task 6

In [11]:
spark.sql("""
select c.city
      ,COUNT(CASE WHEN a.active=1 then a.customer_id ELSE NULL end) AS cnt_active
      ,COUNT(CASE WHEN a.active=0 then a.customer_id ELSE NULL end) AS cnt_inactive
  from customer AS a
JOIN address AS b ON b.address_id=a.address_id
JOIN city AS c ON c.city_id=b.city_id
group BY 1
order BY 3 desc;
""").show(100,False)



+---------------------+----------+------------+
|city                 |cnt_active|cnt_inactive|
+---------------------+----------+------------+
|Kamyin               |0         |1           |
|Xiangfan             |0         |1           |
|Charlotte Amalie     |0         |1           |
|Wroclaw              |0         |1           |
|Southend-on-Sea      |0         |1           |
|Najafabad            |0         |1           |
|Pingxiang            |0         |1           |
|Kumbakonam           |0         |1           |
|Szkesfehrvr          |0         |1           |
|Daxian               |0         |1           |
|Coatzacoalcos        |0         |1           |
|Ktahya               |0         |1           |
|Amroha               |0         |1           |
|Uluberia             |0         |1           |
|Bat Yam              |0         |1           |
|Toulon               |1         |0           |
|Abha                 |1         |0           |
|Chisinau             |1         |0     

                                                                                

Task 7

In [12]:
spark.sql("""
SELECT a.city_group
      ,name AS Category
      ,rent_hours
FROM(
      SELECT e.name
            ,CASE WHEN h.city LIKE 'a%' OR h.city LIKE 'A%' THEN 'start_w_A'
                  WHEN h.city LIKE '%-%' THEN 'contains_-'
                  ELSE 'T_T' end AS city_group
            ,SUM(date_part('day',return_date - rental_date)* 24 + date_part('hour',return_date - rental_date)) AS rent_hours
            ,ROW_NUMBER() OVER (partition BY CASE WHEN h.city LIKE 'a%' OR h.city LIKE 'A%' THEN 'start_w_A'
                                                  WHEN h.city LIKE '%-%' THEN 'contains_-'
                                                  ELSE 'T_T' END  
                                order BY SUM(date_part('day',return_date - rental_date)* 24 + date_part('hour',return_date - rental_date)) desc) AS rn
      FROM rental AS a
      JOIN inventory AS b ON b.inventory_id=a.inventory_id
      JOIN film AS c ON c.film_id=b.film_id
      JOIN film_category AS d ON d.film_id=c.film_id
      JOIN category AS e ON e.category_id=d.category_id
      JOIN customer AS f ON f.customer_id=a.customer_id
      JOIN address AS g ON g.address_id=f.address_id
      JOIN city AS h ON h.city_id=g.city_id
      WHERE 1=1
        AND h.city LIKE 'a%' OR h.city LIKE 'A%' OR h.city LIKE '%-%'
      GROUP BY 1,2
      order BY 3 desc, 2) AS a
WHERE 1=1
AND rn=1;
""").show(100,False)



+----------+--------+----------+
|city_group|Category|rent_hours|
+----------+--------+----------+
|start_w_A |Sports  |12309     |
|contains_-|Foreign |5538      |
+----------+--------+----------+



                                                                                