In [114]:
import os
import re

import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.functions import lit
from pyspark.sql.functions import max, avg, min
from pyspark.sql.functions import row_number
from pyspark.sql.functions import when
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
from pyspark.sql.window import Window

In [115]:
# Build (or reuse) a local SparkSession named "task" with the PostgreSQL JDBC
# driver jar on the driver classpath. The Windows path is written as a raw
# string so that "\s" and "\p" are not parsed as (invalid) escape sequences;
# the resulting value is unchanged.
spark = (
    SparkSession.builder
    .master("local")
    .appName("task")
    .config("spark.driver.extraClassPath", r"C:\spark\postgresql-42.5.1.jar")
    .getOrCreate()
)

In [116]:
# JDBC connection settings for the PostgreSQL database queried below.
# Each value can be overridden through an environment variable so that real
# credentials do not have to be stored in the notebook; when a variable is not
# set, the original local-development value is used as the fallback.
url = os.environ.get('PG_JDBC_URL', 'jdbc:postgresql://localhost:5432/postgres')
user = os.environ.get('PGUSER', 'postgres')
password = os.environ.get('PGPASSWORD', '123456')

In [117]:
# SQL statements sent one by one to PostgreSQL through the Spark JDBC reader.
# The order matters: the execution loop numbers them 1..7 by position.
queries = [
    # 1. Number of films in each category, largest category first.
    '''SELECT COUNT(*), category.name
            FROM film INNER JOIN film_category USING(film_id)
            INNER JOIN category USING(category_id)
            GROUP BY category.name
            ORDER BY COUNT(*) DESC
''',
    # 2. Ten highest rental counts, with the actor's first and last name.
    # NOTE(review): the GROUP BY includes film.title, so each row is an
    # (actor, film) pair rather than an actor's total across all films --
    # confirm whether per-actor totals were intended.
    '''SELECT COUNT(rental.rental_id), actor.first_name, actor.last_name
            FROM film
            INNER JOIN film_actor USING(film_id)
            INNER JOIN actor USING(actor_id) INNER JOIN inventory USING(film_id)
            INNER JOIN rental USING(inventory_id) GROUP BY film.title, actor.first_name, actor.last_name
            ORDER BY COUNT(rental.rental_id) DESC LIMIT 10
''',
    # 3. Per film: number of rentals multiplied by the film's replacement cost,
    # highest total first. The rental count is taken from film -> inventory ->
    # rental only; joining film_actor/actor here would repeat every rental once
    # per actor in the film and inflate the count.
    '''SELECT db1.title, db1.numOfRented * db2.replacement_cost as total
            FROM (SELECT COUNT (rental.rental_id) as numOfRented, film.title FROM film INNER JOIN inventory USING(film_id) INNER JOIN rental USING(inventory_id) GROUP BY film.title) as db1
            INNER JOIN (SELECT film.title, film.replacement_cost FROM film GROUP BY film.title, film.replacement_cost ORDER BY film.replacement_cost DESC) as db2
            USING(title)
            ORDER BY total DESC
''',
    # 4. Films that have no row in inventory (anti-join via LEFT JOIN + IS NULL).
    '''SELECT film.title, film_id
            FROM film
            LEFT JOIN inventory USING(film_id)
            WHERE inventory.film_id IS NULL
            GROUP BY film.title, film_id
            ORDER BY film_id DESC
''',
    # 5. Actors and their number of films in the 'Children' category, keeping
    # only actors whose count matches one of the three highest per-actor counts
    # returned by the subquery (so actors tied on such a count are all listed).
    '''SELECT COUNT(film.title) as numOfFilms, actor.first_name, actor.last_name, category.name as category
            FROM film
            INNER JOIN film_actor USING(film_id)
            INNER JOIN actor USING(actor_id)
            INNER JOIN film_category USING(film_id)
            INNER JOIN category USING(category_id)
            WHERE category.name = 'Children'
            GROUP BY first_name, last_name, category
            HAVING COUNT(film.title) IN (SELECT COUNT(film.title) as numOfFilms FROM film INNER JOIN film_actor USING(film_id) INNER JOIN actor USING(actor_id) INNER JOIN film_category USING(film_id) INNER JOIN category USING(category_id) WHERE category.name = 'Children' GROUP BY actor.first_name, actor.last_name, category.name ORDER BY COUNT(film.title) DESC LIMIT 3)
            ORDER BY numOfFilms DESC
''',
    # 6. Per city: number of active and inactive customers, most inactive first.
    # The inner query yields one row per customer (no inner GROUP BY) so the
    # outer COUNTs count customers; grouping the inner query by
    # (city, customer.active) would cap each count at 1.
    '''SELECT city, COUNT(active) as num_active, COUNT(inactive) as num_inactive FROM
            (SELECT city,
            CASE
            WHEN customer.active = 1
            THEN 'active'
            ELSE NULL
            END as active,
            CASE
            WHEN customer.active != 1
            THEN 'inactive'
            ELSE NULL
            END as inactive
            FROM city
            INNER JOIN address USING(city_id)
            INNER JOIN customer USING(address_id)) as db
            GROUP BY city
            ORDER BY num_inactive DESC
''',
    # 7. Total rental duration in hours per (store city, category), longest
    # first. The duration is the epoch (seconds) of return_date - rental_date
    # divided by 3600; casting the timestamps to date first would make
    # DATE_PART('hour', ...) always 0. Rentals with a NULL return_date are
    # ignored by SUM.
    '''SELECT category.name, city.city, SUM(DATE_PART('epoch', rental.return_date - rental.rental_date) / 3600) as hours
            FROM category
            INNER JOIN film_category USING(category_id)
            INNER JOIN film USING(film_id)
            INNER JOIN inventory USING(film_id)
            INNER JOIN store USING(store_id)
            INNER JOIN address USING(address_id)
            INNER JOIN city USING(city_id)
            INNER JOIN rental USING(inventory_id)
            GROUP BY city.city, category.name
            ORDER BY hours DESC
''',
]

In [118]:
# Labels "query1" .. "query7", one per SQL statement (displayed as cell output).
[f'query{number}' for number in range(1, 8)]

['query1', 'query2', 'query3', 'query4', 'query5', 'query6', 'query7']

In [119]:
# Run every SQL statement against PostgreSQL through the Spark JDBC reader and
# print the result table under a "query number N:" heading (N starts at 1).
# The result of .show() is not stored: it only prints and returns None, so
# binding it to a DataFrame-like name would be misleading.
for query_number, query in enumerate(queries, start=1):
    print(f'query number {query_number}:')
    (
        spark.read
        .format("jdbc")
        .option("url", url)
        .option("query", query)  # PostgreSQL executes the statement
        .option("user", user)
        .option("password", password)
        .load()
        .show()
    )

query number 1:
+-----+-----------+
|count|       name|
+-----+-----------+
|   74|     Sports|
|   73|    Foreign|
|   69|     Family|
|   68|Documentary|
|   66|  Animation|
|   64|     Action|
|   63|        New|
|   62|      Drama|
|   61|     Sci-Fi|
|   61|      Games|
|   60|   Children|
|   58|     Comedy|
|   57|     Travel|
|   57|   Classics|
|   56|     Horror|
|   51|      Music|
+-----+-----------+

query number 2:
+-----+----------+-----------+
|count|first_name|  last_name|
+-----+----------+-----------+
|   34|  CHARLIZE|      DENCH|
|   34|       RIP|   CRAWFORD|
|   34|      GARY|    PHOENIX|
|   34|   KIRSTEN|     AKROYD|
|   34|      BURT|     TEMPLE|
|   34|       TIM|    HACKMAN|
|   33|     CHRIS|    BRIDGES|
|   33|     MERYL|      ALLEN|
|   33|      JUDY|       DEAN|
|   33|     JAYNE|SILVERSTONE|
+-----+----------+-----------+

query number 3:
+--------------------+--------------------+
|               title|               total|
+--------------------+------