In [102]:
from pyspark.sql import SparkSession, Window, functions as fun

In [84]:
spark = SparkSession.\
    builder.\
    appName("spaaaaaark").\
    config("spark.jars", "postgresql-42.4.0.jar").master("local").\
    getOrCreate()

In [85]:
df_reader = spark.read\
    .format("jdbc")\
    .option("url", "jdbc:postgresql://localhost:8080/pagila")\
    .option("user", "postgres")\
    .option("password", "secret")\
    .option("driver", "org.postgresql.Driver")


In [86]:
actor = df_reader.option('dbtable', 'actor').load()
address = df_reader.option('dbtable', 'address').load()
category = df_reader.option('dbtable', 'category').load()
city = df_reader.option('dbtable', 'city').load()
country = df_reader.option('dbtable', 'country').load()
customer = df_reader.option('dbtable', 'customer').load()
film = df_reader.option('dbtable', 'film').load()
film_actor = df_reader.option('dbtable', 'film_actor').load()
film_category = df_reader.option('dbtable', 'film_category').load()
inventory = df_reader.option('dbtable', 'inventory').load()
language = df_reader.option('dbtable', 'language').load()
payment = df_reader.option('dbtable', 'payment').load()
rental = df_reader.option('dbtable', 'rental').load()
staff = df_reader.option('dbtable', 'staff').load()
store = df_reader.option('dbtable', 'store').load()

In [87]:
actor.show(5)

+--------+----------+------------+-------------------+
|actor_id|first_name|   last_name|        last_update|
+--------+----------+------------+-------------------+
|       1|  PENELOPE|     GUINESS|2020-02-15 12:34:33|
|       2|      NICK|    WAHLBERG|2020-02-15 12:34:33|
|       3|        ED|       CHASE|2020-02-15 12:34:33|
|       4|  JENNIFER|       DAVIS|2020-02-15 12:34:33|
|       5|    JOHNNY|LOLLOBRIGIDA|2020-02-15 12:34:33|
+--------+----------+------------+-------------------+
only showing top 5 rows



In [88]:
# task1

category.join(film_category, on='category_id')\
    .groupBy('name')\
    .count()\
    .orderBy(fun.col('count').desc())\
    .show()

+-----------+-----+
|       name|count|
+-----------+-----+
|     Sports|   74|
|    Foreign|   73|
|     Family|   69|
|Documentary|   68|
|  Animation|   66|
|     Action|   64|
|        New|   63|
|      Drama|   62|
|      Games|   61|
|     Sci-Fi|   61|
|   Children|   60|
|     Comedy|   58|
|     Travel|   57|
|   Classics|   57|
|     Horror|   56|
|      Music|   51|
+-----------+-----+



In [89]:
# task2

actor.join(film_actor, on='actor_id')\
    .join(film, on='film_id')\
    .groupBy('first_name', 'last_name')\
    .agg(fun.sum('rental_duration').alias('all'))\
    .orderBy(fun.col('all').desc())\
    .show(10)

+----------+---------+---+
|first_name|last_name|all|
+----------+---------+---+
|     SUSAN|    DAVIS|242|
|      GINA|DEGENERES|209|
|    WALTER|     TORN|201|
|      MARY|   KEITEL|192|
|   MATTHEW|   CARREY|190|
|   GROUCHO|    DUNST|183|
|    ANGELA|   HUDSON|183|
|    SANDRA|   KILMER|181|
|     HENRY|    BERRY|180|
|       UMA|     WOOD|179|
+----------+---------+---+
only showing top 10 rows



In [90]:
# task3

category.join(film_category, on='category_id')\
    .join(film, on='film_id')\
    .join(inventory, on='film_id')\
    .join(rental, on='inventory_id')\
    .join(payment, on='rental_id')\
    .groupBy('name')\
    .agg(fun.sum('amount').alias('total_sum'))\
    .orderBy(fun.col('total_sum').desc())\
    .show(1)

+------+---------+
|  name|total_sum|
+------+---------+
|Sports|  5314.21|
+------+---------+
only showing top 1 row



In [91]:
# task4

film.join(inventory, on='film_id', how='left')\
    .groupby('film_id', 'title')\
    .agg(fun.count('inventory_id').alias('number'))\
    .filter(fun.col('number') == 0)\
    .select(fun.col('title'))\
    .orderBy('title')\
    .show()

+--------------------+
|               title|
+--------------------+
|      ALICE FANTASIA|
|         APOLLO TEEN|
|      ARGONAUTS TOWN|
|       ARK RIDGEMONT|
|ARSENIC INDEPENDENCE|
|   BOONDOCK BALLROOM|
|       BUTCH PANTHER|
|       CATCH AMISTAD|
| CHINATOWN GLADIATOR|
|      CHOCOLATE DUCK|
|COMMANDMENTS EXPRESS|
|    CROSSING DIVORCE|
|     CROWDS TELEMARK|
|    CRYSTAL BREAKING|
|          DAZED PUNK|
|DELIVERANCE MULHO...|
|   FIREHOUSE VIETNAM|
|       FLOATS GARDEN|
|FRANKENSTEIN STRA...|
|  GLADIATOR WESTWARD|
+--------------------+
only showing top 20 rows



In [92]:
# task5

actor_table = category.join(film_category, on='category_id')\
    .join(film, on='film_id')\
    .join(film_actor, on='film_id')\
    .join(actor, on='actor_id')\
    .where(fun.col('name') == 'Children')\
    .groupBy('first_name', 'last_name').count()

count_filter = actor_table.select(fun.col('count'))\
    .orderBy(fun.col('count').desc())\
    .distinct()\
    .limit(3)\
    .rdd.map(lambda row : row[0]).collect()

actor_table.filter(fun.col('count').isin(count_filter)).orderBy(fun.col('count').desc()).show()

+----------+---------+-----+
|first_name|last_name|count|
+----------+---------+-----+
|     HELEN|   VOIGHT|    7|
|     SUSAN|    DAVIS|    6|
|     KEVIN|  GARLAND|    5|
|     RALPH|     CRUZ|    5|
|      MARY|    TANDY|    5|
|    WHOOPI|     HURT|    5|
+----------+---------+-----+



In [93]:
# task6

city.join(address, on='city_id')\
    .join(customer, on='address_id')\
    .groupBy('city')\
    .agg(fun.sum(fun.when(customer.active == 1, 1).otherwise(0)).alias('active'),
        fun.sum(fun.when(customer.active == 0, 1).otherwise(0)).alias('inactive'))\
    .orderBy(fun.col('inactive').desc())\
    .show()

+------------------+------+--------+
|              city|active|inactive|
+------------------+------+--------+
|         Pingxiang|     0|       1|
|       Szkesfehrvr|     0|       1|
|  Charlotte Amalie|     0|       1|
|         Najafabad|     0|       1|
|           Wroclaw|     0|       1|
|            Ktahya|     0|       1|
|           Bat Yam|     0|       1|
|   Southend-on-Sea|     0|       1|
|            Amroha|     0|       1|
|            Kamyin|     0|       1|
|          Xiangfan|     0|       1|
|            Daxian|     0|       1|
|          Uluberia|     0|       1|
|     Coatzacoalcos|     0|       1|
|        Kumbakonam|     0|       1|
|A Corua (La Corua)|     1|       0|
|          Fengshan|     1|       0|
|          Chisinau|     1|       0|
|           Udaipur|     1|       0|
|              Linz|     1|       0|
+------------------+------+--------+
only showing top 20 rows



In [105]:
# task7

table = city.join(address, on='city_id')\
    .join(customer, on='address_id')\
    .join(rental, on='customer_id')\
    .join(inventory, on='inventory_id')\
    .join(film, on='film_id')\
    .join(film_category, on='film_id')\
    .join(category, on='category_id')\
    .filter(fun.col('return_date').isNotNull())\
    .select(fun.col('name').alias('category_name'), fun.col('return_date'), fun.col('rental_date'), fun.col('city'))

category_table1 = table\
    .filter(fun.col('city').like('A%') | fun.col('city').like('a%'))\
    .groupBy('category_name')\
    .agg(fun.sum(fun.col('return_date') - fun.col('rental_date')).alias('period'))\
    .orderBy(fun.col('period').desc())\
    .limit(1)

category_table2 = table\
    .filter(fun.col('city').like('%-%'))\
    .groupBy('category_name')\
    .agg(fun.sum(fun.col('return_date') - fun.col('rental_date')).alias('period'))\
    .orderBy(fun.col('period').desc())\
    .limit(1)

category_table1.union(category_table2).show()

+-------------+--------------------+
|category_name|              period|
+-------------+--------------------+
|       Sports|INTERVAL '515 00:...|
|      Foreign|INTERVAL '269 16:...|
+-------------+--------------------+

