In [2]:
#importing libraries
import pyspark
import os
from pyspark.sql import SparkSession
from pyspark.sql import functions
from pyspark.sql import Window
from pyspark.sql.functions import col

In [3]:
#check env variable
os.environ['SPARK_HOME']

'C:\\Spark\\spark'

In [4]:
#Create a spark session
spark = SparkSession \
    .builder \
    .appName("Task4") \
    .config("spark.jars", "postgresql-42.4.0.jar") \
    .getOrCreate()

In [5]:
spark

In [6]:
#Get all table names available in db
all_tables = spark.read.format("jdbc"). \
options(
         url='jdbc:postgresql://localhost:5432/Test', # jdbc:postgresql://<host>:<port>/<database>
         dbtable = 'information_schema.tables',
         user='user_1',
         password='password').\
load()

In [7]:
#Filtering table_names
df_names = all_tables.select('table_name').filter((all_tables.table_schema == 'public')&(all_tables.table_type == 'BASE TABLE'))
table_names = [data[0] for data in df_names.select('table_name').collect()]

In [8]:
table_names

['athletes',
 'routes',
 'payment_p2022_07',
 'payment',
 'film',
 'actor',
 'address',
 'category',
 'city',
 'country',
 'customer',
 'film_actor',
 'film_category',
 'inventory',
 'language',
 'rental',
 'staff',
 'store',
 'payment_p2022_01',
 'payment_p2022_02',
 'payment_p2022_03',
 'payment_p2022_04',
 'payment_p2022_05',
 'payment_p2022_06']

In [9]:
#Function for creating a pyspark DataFrame; As well as seeing if it was done successfully
def create_table(name):
    try:
        result = spark.read.format("jdbc"). \
        options(
                 url='jdbc:postgresql://localhost:5432/Test', # jdbc:postgresql://<host>:<port>/<database>
                 dbtable = name,
                 user='user_1',
                 password='password').\
        load()
        print(f"Table '{name}' loaded")
        return result
    except Exception:
        return f'Error table [{name}] not loaded'

In [10]:
#Creating pyspark DataFrame using all fetched table_names
for t in table_names:
    value = create_table(t)
    globals()[f"{t}"] = value

Table 'athletes' loaded
Table 'routes' loaded
Table 'payment_p2022_07' loaded
Table 'payment' loaded
Table 'film' loaded
Table 'actor' loaded
Table 'address' loaded
Table 'category' loaded
Table 'city' loaded
Table 'country' loaded
Table 'customer' loaded
Table 'film_actor' loaded
Table 'film_category' loaded
Table 'inventory' loaded
Table 'language' loaded
Table 'rental' loaded
Table 'staff' loaded
Table 'store' loaded
Table 'payment_p2022_01' loaded
Table 'payment_p2022_02' loaded
Table 'payment_p2022_03' loaded
Table 'payment_p2022_04' loaded
Table 'payment_p2022_05' loaded
Table 'payment_p2022_06' loaded


<div class="alert alert-block alert-info">
<b>&#9658</b> #Q1 Вывести количество фильмов в каждой категории, отсортировать по убыванию.</div>

In [11]:
joined = film_category.join(film, film_category.film_id == film.film_id)\
.join(category, film_category.category_id == category.category_id)\
.select([film["film_id"],"title", category["category_id"], category["name"]])

joined.groupBy('name').count().orderBy('count', ascending = False).show()

+-----------+-----+
|       name|count|
+-----------+-----+
|     Sports|   74|
|    Foreign|   73|
|     Family|   69|
|Documentary|   68|
|  Animation|   66|
|     Action|   64|
|        New|   63|
|      Drama|   62|
|      Games|   61|
|     Sci-Fi|   61|
|   Children|   60|
|     Comedy|   58|
|     Travel|   57|
|   Classics|   57|
|     Horror|   56|
|      Music|   51|
+-----------+-----+



<div class="alert alert-block alert-info">
<b>&#9658</b> #Q2 Вывести 10 актеров, чьи фильмы большего всего арендовали, отсортировать по убыванию.</div>

In [12]:
joined = film.join(film_actor, film['film_id'] == film_actor['film_id'])
joined.select([film['film_id'], film_actor["actor_id"], "rental_duration"])

df = joined.groupBy(film_actor["actor_id"]).sum("rental_duration").orderBy("sum(rental_duration)", ascending=False)
df.join(actor, df['actor_id'] == actor['actor_id']).\
select(actor['actor_id'], 'sum(rental_duration)', 'first_name', 'last_name').\
orderBy("sum(rental_duration)", ascending=False).show(10)

+--------+--------------------+----------+---------+
|actor_id|sum(rental_duration)|first_name|last_name|
+--------+--------------------+----------+---------+
|     107|                 209|      GINA|DEGENERES|
|     102|                 201|    WALTER|     TORN|
|     198|                 192|      MARY|   KEITEL|
|     181|                 190|   MATTHEW|   CARREY|
|      65|                 183|    ANGELA|   HUDSON|
|     106|                 183|   GROUCHO|    DUNST|
|      23|                 181|    SANDRA|   KILMER|
|      60|                 180|     HENRY|    BERRY|
|      13|                 179|       UMA|     WOOD|
|     119|                 178|    WARREN|  JACKMAN|
+--------+--------------------+----------+---------+
only showing top 10 rows



<div class="alert alert-block alert-info">
<b>&#9658</b> #Q3 Вывести категорию фильмов, на которую потратили больше всего денег.</div>

In [13]:
joined = film.join(film_category, film['film_id'] == film_category['film_id'])\
.join(category, film_category['category_id'] == category['category_id'])

df = joined.withColumn("total_costs", joined.rental_rate * joined.rental_duration + joined.replacement_cost)

df.groupBy("name").sum("total_costs").orderBy("sum(total_costs)", ascending = False).show(1)

+------+----------------+
|  name|sum(total_costs)|
+------+----------------+
|Sports|         2640.77|
+------+----------------+
only showing top 1 row



<div class="alert alert-block alert-info">
<b>&#9658</b> #Q4 Вывести названия фильмов, которых нет в inventory.</div>

In [14]:
film.join(inventory, film['film_id'] == inventory['film_id'], 'left_anti').\
select(film['film_id'], film['title']).orderBy('film_id').show(film.count())

+-------+--------------------+
|film_id|               title|
+-------+--------------------+
|     14|      ALICE FANTASIA|
|     33|         APOLLO TEEN|
|     36|      ARGONAUTS TOWN|
|     38|       ARK RIDGEMONT|
|     41|ARSENIC INDEPENDENCE|
|     87|   BOONDOCK BALLROOM|
|    108|       BUTCH PANTHER|
|    128|       CATCH AMISTAD|
|    144| CHINATOWN GLADIATOR|
|    148|      CHOCOLATE DUCK|
|    171|COMMANDMENTS EXPRESS|
|    192|    CROSSING DIVORCE|
|    195|     CROWDS TELEMARK|
|    198|    CRYSTAL BREAKING|
|    217|          DAZED PUNK|
|    221|DELIVERANCE MULHO...|
|    318|   FIREHOUSE VIETNAM|
|    325|       FLOATS GARDEN|
|    332|FRANKENSTEIN STRA...|
|    359|  GLADIATOR WESTWARD|
|    386|           GUMP DATE|
|    404|       HATE HANDICAP|
|    419|         HOCUS FRIDA|
|    495|    KENTUCKIAN GIANT|
|    497|    KILL BROTHERHOOD|
|    607|         MUPPET MILE|
|    642|      ORDER BETRAYED|
|    669|       PEARL DESTINY|
|    671|     PERDITION FARGO|
|    701

<div class="alert alert-block alert-info">
<b>&#9658</b> Q5 Вывести топ 3 актеров, которые больше всего появлялись в фильмах в категории “Children”. Если у нескольких актеров одинаковое кол-во фильмов, вывести всех.</div>

In [20]:
joined = film_category.join(film_actor, film_category['film_id'] == film_actor['film_id']).\
join(category, film_category['category_id'] == category['category_id']).\
where(category['name'] == 'Children').select(film_category["film_id"], "name", "actor_id")

df = joined.groupBy('actor_id').count()
window = Window.partitionBy().orderBy(functions.desc('count'))
df = df.withColumn('rank', functions.dense_rank().over(window))

In [21]:
df = df.withColumn('rank', functions.dense_rank().over(window))
df = df.join(actor, df['actor_id'] == actor['actor_id'])
df.where(df['rank'] <= 3).select(actor['actor_id'], 'first_name', 'last_name', 'count', 'rank').orderBy('rank').show(df.count())

+--------+----------+---------+-----+----+
|actor_id|first_name|last_name|count|rank|
+--------+----------+---------+-----+----+
|      17|     HELEN|   VOIGHT|    7|   1|
|      66|      MARY|    TANDY|    5|   2|
|     127|     KEVIN|  GARLAND|    5|   2|
|     140|    WHOOPI|     HURT|    5|   2|
|      80|     RALPH|     CRUZ|    5|   2|
|     101|     SUSAN|    DAVIS|    4|   3|
|      93|     ELLEN|  PRESLEY|    4|   3|
|      81|  SCARLETT|    DAMON|    4|   3|
|      13|       UMA|     WOOD|    4|   3|
|     142|      JADA|    RYDER|    4|   3|
|      37|       VAL|   BOLGER|    4|   3|
|      23|    SANDRA|   KILMER|    4|   3|
|      58| CHRISTIAN|   AKROYD|    4|   3|
|     131|      JANE|  JACKMAN|    4|   3|
|      92|   KIRSTEN|   AKROYD|    4|   3|
|     173|      ALAN| DREYFUSS|    4|   3|
|     109| SYLVESTER|     DERN|    4|   3|
|     150|     JAYNE|    NOLTE|    4|   3|
|     187|     RENEE|     BALL|    4|   3|
+--------+----------+---------+-----+----+



<div class="alert alert-block alert-info">
<b>&#9658</b> #Q6 Вывести города с количеством активных и неактивных клиентов (активный — customer.active = 1). Отсортировать по количеству неактивных клиентов по убыванию.</div>

In [22]:
joined = customer.join(address, customer['address_id'] == address['address_id']).\
join(city, address['city_id'] == city['city_id'])

In [23]:
joined = joined.withColumn('non_active', functions.when(joined['active'] == 1, 0).otherwise(1))
joined.select('customer_id', address['city_id'], 'city', 'active', 'non_active').show(10)

+-----------+-------+----------+------+----------+
|customer_id|city_id|      city|active|non_active|
+-----------+-------+----------+------+----------+
|        144|    340| Molodetno|     1|         0|
|        458|    532|     Tegal|     1|         0|
|        466|    145|  Dongying|     1|         0|
|        491|    337| Mit Ghamr|     1|         0|
|        239|    118|    Ciomas|     1|         0|
|        387|    315|Lubumbashi|     1|         0|
|        534|     24|    Amroha|     0|         1|
|         27|    138|     Davao|     1|         0|
|        511|    502|    Suihua|     1|         0|
|         81|    319| Mahajanga|     1|         0|
+-----------+-------+----------+------+----------+
only showing top 10 rows



In [24]:
df = joined.groupBy('city').sum('active', 'non_active')
df.orderBy('sum(non_active)', ascending = False).show()

+------------------+-----------+---------------+
|              city|sum(active)|sum(non_active)|
+------------------+-----------+---------------+
|         Pingxiang|          0|              1|
|       Szkesfehrvr|          0|              1|
|  Charlotte Amalie|          0|              1|
|         Najafabad|          0|              1|
|           Wroclaw|          0|              1|
|            Ktahya|          0|              1|
|            Amroha|          0|              1|
|   Southend-on-Sea|          0|              1|
|           Bat Yam|          0|              1|
|            Kamyin|          0|              1|
|          Xiangfan|          0|              1|
|            Daxian|          0|              1|
|          Uluberia|          0|              1|
|     Coatzacoalcos|          0|              1|
|        Kumbakonam|          0|              1|
|          Fengshan|          1|              0|
|A Corua (La Corua)|          1|              0|
|           El Alto|

<div class="alert alert-block alert-info">
<b>&#9658</b> #Q7 Вывести категорию фильмов, у которой самое большое кол-во часов суммарной аренды в городах (customer.address_id в этом city), и которые начинаются на букву “a”. Тоже самое сделать для городов в которых есть символ “-”.</div>

In [25]:
joined = film.join(film_category, film['film_id'] == film_category['film_id'])\
.join(category, film_category['category_id'] == category['category_id']).\
select(film['film_id'], film_category['category_id'], category['name'])

df = joined.join(inventory, joined['film_id'] == inventory['film_id'])\
.join(rental, inventory['inventory_id'] == rental['inventory_id']).\
select(joined['film_id'], joined['category_id'], joined['name'], inventory['inventory_id']\
, 'rental_id', 'customer_id', 'rental_date', 'return_date')

df = df.withColumn('duration_days', functions.datediff(df['return_date'], df['rental_date']))
df.show(5)

+-------+-----------+---------+------------+---------+-----------+-------------------+-------------------+-------------+
|film_id|category_id|     name|inventory_id|rental_id|customer_id|        rental_date|        return_date|duration_days|
+-------+-----------+---------+------------+---------+-----------+-------------------+-------------------+-------------+
|    697|          1|   Action|        3175|    14173|        152|2022-08-21 05:01:01|2022-08-22 04:40:01|            1|
|    697|          1|   Action|        3175|     7933|        297|2022-07-28 14:27:27|2022-07-29 12:34:27|            1|
|    583|          2|Animation|        2659|    13714|        277|2022-08-20 11:41:09|2022-08-22 08:28:09|            2|
|    583|          2|Animation|        2659|     9176|        260|2022-07-30 13:50:54|2022-08-02 16:25:54|            3|
|    583|          2|Animation|        2659|     6235|        379|2022-07-11 16:17:51|2022-07-17 13:14:51|            6|
+-------+-----------+---------+-

In [26]:
df_final = df.join(customer, df['customer_id'] == customer['customer_id']).\
join(address, customer['address_id'] == address['address_id']).\
join(city, address['city_id'] == city['city_id']).\
select('film_id', customer['store_id'], 'name', 'duration_days', customer['address_id'], df['customer_id'], city['city_id'], city['city'])

window = Window.partitionBy('city').orderBy(functions.desc('sum(duration_days)'))
df_final = df_final.groupBy('city', 'name').sum('duration_days')
df_final = df_final.withColumn('rank', functions.dense_rank().over(window))
df_final.where((df_final['rank'] == 1) & ((col('city').like('A%')) | (col('city').like('%-%')))).show(df_final.count())

+--------------------+-----------+------------------+----+
|                city|       name|sum(duration_days)|rank|
+--------------------+-----------+------------------+----+
|  A Corua (La Corua)|     Comedy|                24|   1|
|                Abha|     Sci-Fi|                23|   1|
|           Abu Dhabi|     Sci-Fi|                21|   1|
|                Acua|      Drama|                23|   1|
|               Adana|     Comedy|                22|   1|
|         Addis Abeba|     Family|                21|   1|
|                Aden|        New|                33|   1|
|               Adoni|   Children|                17|   1|
|          Ahmadnagar|   Children|                25|   1|
|            Akishima|   Children|                26|   1|
|               Akron|     Sports|                22|   1|
|               Akron|     Horror|                22|   1|
|         Alessandria|     Comedy|                15|   1|
|Allappuzha (Allep...|        New|                23|   