In [1]:
from pyspark.context import SparkContext
from pyspark.sql import SparkSession
import psycopg2
import configparser
import pandas as pd
import findspark
findspark.init()
import warnings
warnings.filterwarnings("ignore")

In [2]:
config = configparser.ConfigParser()
config.read("config.ini")
config = config['postgresql']
conn = psycopg2.connect(
    host=config['db_host'],
    port=config['db_port'],
    dbname=config['db_name'],
    user=config['db_user'],
    password=config['db_password'])

In [3]:
query = "select tablename FROM pg_catalog.pg_tables where schemaname = 'public' and tablename not like 'payment_%';"
table_names = pd.read_sql_query(query, conn)['tablename'].to_list()
spark = SparkSession(SparkContext.getOrCreate())
table_dict = dict()
for table_name in table_names:
    query = f"select * from {table_name}"
    df_pandas = pd.read_sql_query(query, conn)
    df_pandas_dropna =  df_pandas.dropna(axis='columns', how='all')
    if table_name == 'staff':
        df_pandas_dropna = df_pandas_dropna.drop('picture',axis=1)
    table_dict[table_name] = spark.createDataFrame(df_pandas_dropna)

In [4]:
df_film = table_dict['film']
df_film_category = table_dict['film_category']
df_category = table_dict['category']
df_film_actor = table_dict['film_actor']
df_actor = table_dict['actor']
df_inventory = table_dict['inventory']
df_customer = table_dict['customer']
df_address = table_dict['address']
df_city = table_dict['city']
df_rental = table_dict['rental']

In [5]:
from pyspark.sql.functions import col, desc, sum, dense_rank, lit
from pyspark.sql import Window

In [16]:
spark.conf.set("spark.sql.shuffle.partitions",2)
spark.conf.set("spark.sql.inMemoryColumnarStorage.compressed", True)
spark.conf.set("spark.sql.inMemoryColumnarStorage.batchSize",10000)

In [29]:
#Task1. Вывести количество фильмов в каждой категории, отсортировать по убыванию.

df_film.join(df_film_category,'film_id').groupby('category_id').count().join(df_category,'category_id').select(['name','count']).sort(desc("count")).collect()

[Row(name='Sports', count=74),
 Row(name='Foreign', count=73),
 Row(name='Family', count=69),
 Row(name='Documentary', count=68),
 Row(name='Animation', count=66),
 Row(name='Action', count=64),
 Row(name='New', count=63),
 Row(name='Drama', count=62),
 Row(name='Games', count=61),
 Row(name='Sci-Fi', count=61),
 Row(name='Children', count=60),
 Row(name='Comedy', count=58),
 Row(name='Classics', count=57),
 Row(name='Travel', count=57),
 Row(name='Horror', count=56),
 Row(name='Music', count=51)]

In [32]:
#Task2. Вывести 10 актеров, чьи фильмы большего всего арендовали, отсортировать по убыванию.

df_film.join(df_film_actor,'film_id').join(df_actor,'actor_id').groupby(['first_name','last_name']).agg(sum('rental_duration').alias('sum_ren_dur')).sort(desc('sum_ren_dur')).limit(10).show()

+----------+---------+-----------+
|first_name|last_name|sum_ren_dur|
+----------+---------+-----------+
|     SUSAN|    DAVIS|        242|
|      GINA|DEGENERES|        209|
|    WALTER|     TORN|        201|
|      MARY|   KEITEL|        192|
|   MATTHEW|   CARREY|        190|
|    ANGELA|   HUDSON|        183|
|   GROUCHO|    DUNST|        183|
|    SANDRA|   KILMER|        181|
|     HENRY|    BERRY|        180|
|       UMA|     WOOD|        179|
+----------+---------+-----------+



In [33]:
#Task3. Вывести категорию фильмов, на которую потратили больше всего денег.
df_film.join(df_film_category,'film_id').join(df_category,'category_id').groupby('name').agg(sum('replacement_cost').alias('sum_cost')).sort(desc('sum_cost')).limit(1).show()

+------+------------------+
|  name|          sum_cost|
+------+------------------+
|Sports|1509.2600000000004|
+------+------------------+



In [34]:
#Task4. Вывести названия фильмов, которых нет в inventory.
df_film.join(df_inventory,'film_id','leftanti').select('title').sort('title').show()

+--------------------+
|               title|
+--------------------+
|      ALICE FANTASIA|
|         APOLLO TEEN|
|      ARGONAUTS TOWN|
|       ARK RIDGEMONT|
|ARSENIC INDEPENDENCE|
|   BOONDOCK BALLROOM|
|       BUTCH PANTHER|
|       CATCH AMISTAD|
| CHINATOWN GLADIATOR|
|      CHOCOLATE DUCK|
|COMMANDMENTS EXPRESS|
|    CROSSING DIVORCE|
|     CROWDS TELEMARK|
|    CRYSTAL BREAKING|
|          DAZED PUNK|
|DELIVERANCE MULHO...|
|   FIREHOUSE VIETNAM|
|       FLOATS GARDEN|
|FRANKENSTEIN STRA...|
|  GLADIATOR WESTWARD|
+--------------------+
only showing top 20 rows



In [35]:
#Task5. Вывести топ 3 актеров, которые больше всего появлялись в фильмах в категории “Children”. 
#       Если у нескольких актеров одинаковое кол-во фильмов, вывести всех
df_film.join(df_film_category,'film_id').join(df_category.where(df_category['name']=='Children'),'category_id').join(df_film_actor,'film_id').join(df_actor,'actor_id').groupby(['first_name','last_name']).count() \
.withColumn('dns_rnk',dense_rank().over(Window.orderBy(desc('count')))).where(col('dns_rnk')<=3).select(['first_name','last_name','count']).show()

+----------+---------+-----+
|first_name|last_name|count|
+----------+---------+-----+
|     HELEN|   VOIGHT|    7|
|     SUSAN|    DAVIS|    6|
|    WHOOPI|     HURT|    5|
|     KEVIN|  GARLAND|    5|
|      MARY|    TANDY|    5|
|     RALPH|     CRUZ|    5|
+----------+---------+-----+



In [67]:
#Task6. Вывести города с количеством активных и неактивных клиентов (активный — customer.active = 1). 
#       Отсортировать по количеству неактивных клиентов по убыванию.
df_customer.join(df_address,'address_id').join(df_city,'city_id').groupby(['city','active']).count().sort(desc('active'),desc('count')).show()

+-----------+------+-----+
|       city|active|count|
+-----------+------+-----+
|     London|     1|    2|
|     Aurora|     1|    2|
|     Moscow|     1|    1|
|       Ondo|     1|    1|
|    Fontana|     1|    1|
|    Sullana|     1|    1|
|    Asuncin|     1|    1|
| Kragujevac|     1|    1|
| Greensboro|     1|    1|
|   Monclova|     1|    1|
|      Tanza|     1|    1|
|  Florencia|     1|    1|
|     al-Ayn|     1|    1|
|   Boksburg|     1|    1|
|     Salala|     1|    1|
|Antofagasta|     1|    1|
|       Fuyu|     1|    1|
|Saint Louis|     1|    1|
|  Syktyvkar|     1|    1|
|  al-Manama|     1|    1|
+-----------+------+-----+
only showing top 20 rows



In [28]:
#Task7. Вывести категорию фильмов, у которой самое большое кол-во часов суммарной аренды в городах (customer.address_id в этом city), и которые начинаются на букву “a”. 
#       Тоже самое сделать для городов в которых есть символ “-”.
df_temp = df_film.join(df_film_category,'film_id').join(df_inventory,'film_id').join(df_rental,'inventory_id').join(df_customer,'customer_id').join(df_address,'address_id').join(df_city.where(col('city').like('a%') | col('city').like('A%') | col('city').like('%-%')),'city_id').join(df_category,'category_id') 
df_first = df_temp.where(col('city').like('a%') | col('city').like('A%')).groupby('name').agg(sum('rental_duration').alias('max_rent')).sort(desc('max_rent')).limit(1).withColumn('city_template',lit('a.. or A..'))
df_second = df_temp.where(col('city').like('%-%')).groupby('name').agg(sum('rental_duration').alias('max_rent')).sort(desc('max_rent')).limit(1).withColumn('city_template',lit('..-..'))
df_first.union(df_second).show()

+-------+--------+-------------+
|   name|max_rent|city_template|
+-------+--------+-------------+
| Sports|     469|   a.. or A..|
|Foreign|     275|        ..-..|
+-------+--------+-------------+

