# Imports

In [153]:
from pyspark.sql import SparkSession,Window
from pyspark.sql.functions import col,dense_rank,when,rank

# Configure connection

In [154]:
spark=SparkSession.builder.appName("PagilaSpark")\
    .config("spark.jars", "jars/postgresql-42.7.3.jar")\
    .getOrCreate()
jdbc_url = "jdbc:postgresql://localhost:5432/postgres"
pg_properties = {"user": "postgres", "password": "123456", "driver": "org.postgresql.Driver"}


# Adding all tables from DB

In [155]:
actor=spark.read.jdbc(url=jdbc_url,table="public.actor",properties=pg_properties)
address=spark.read.jdbc(url=jdbc_url,table="address",properties=pg_properties)
category=spark.read.jdbc(url=jdbc_url,table="category",properties=pg_properties)
city=spark.read.jdbc(url=jdbc_url,table="city",properties=pg_properties)
country=spark.read.jdbc(url=jdbc_url,table="country",properties=pg_properties)
customer=spark.read.jdbc(url=jdbc_url,table="customer",properties=pg_properties)
film=spark.read.jdbc(url=jdbc_url,table="film",properties=pg_properties)
film_actor=spark.read.jdbc(url=jdbc_url,table="film_actor",properties=pg_properties)
film_category=spark.read.jdbc(url=jdbc_url,table="film_category",properties=pg_properties)
inventory=spark.read.jdbc(url=jdbc_url,table="inventory",properties=pg_properties)
language=spark.read.jdbc(url=jdbc_url,table="language",properties=pg_properties)
payment=spark.read.jdbc(url=jdbc_url,table="payment",properties=pg_properties)
rental=spark.read.jdbc(url=jdbc_url,table="rental",properties=pg_properties)


**1 Task:** Output the number of movies in each category, sorted in descending order.

In [156]:
category_counts=film_category.select("category_id").groupBy("category_id").count()
task_1_result=category_counts.join(on="category_id", how="inner",other=category)\
                             .orderBy(col("count").desc()).drop("last_update")\
                             .withColumnRenamed("count","number_of_films")\
                             .withColumnRenamed("name","category")

task_1_result.show()

+-----------+---------------+-----------+
|category_id|number_of_films|   category|
+-----------+---------------+-----------+
|         12|            152|      Music|
|          7|            152|      Drama|
|         16|            151|     Travel|
|          3|            150|   Children|
|          9|            150|    Foreign|
|         10|            150|      Games|
|          1|            149|     Action|
|         14|            149|     Sci-Fi|
|          2|            148|  Animation|
|         13|            147|        New|
|          4|            147|   Classics|
|          8|            147|     Family|
|          6|            145|Documentary|
|         15|            145|     Sports|
|          5|            143|     Comedy|
|         11|            142|     Horror|
+-----------+---------------+-----------+



**2 Task:** Output the 10 actors whose movies rented the most, sorted in descending order.

In [157]:
film_actor_data=film_actor.select("film_id","actor_id")\
                          .join(actor.select("actor_id","first_name","last_name"),on="actor_id",how="inner")\
                          .join(film.select("film_id"),on="film_id",how="inner")
film_rental_rate=rental.select("inventory_id")\
                       .join(other=inventory.select("inventory_id","film_id"),on="inventory_id",how="inner")\
                       .groupBy("film_id").count()
task_2_result=film_actor_data.select("film_id","actor_id" ,"first_name", "last_name")\
                             .join(other=film_rental_rate,on="film_id",how="inner")\
                             .groupBy("actor_id" ,"first_name", "last_name")\
                             .sum("count").sort(col("sum(count)").desc())\
                             .withColumnRenamed("sum(count)","sum").limit(10)
task_2_result.show()


+--------+----------+-----------+---+
|actor_id|first_name|  last_name|sum|
+--------+----------+-----------+---+
|     107|      GINA|  DEGENERES|753|
|     181|   MATTHEW|     CARREY|678|
|     198|      MARY|     KEITEL|674|
|     144|    ANGELA|WITHERSPOON|654|
|     102|    WALTER|       TORN|640|
|      60|     HENRY|      BERRY|612|
|     150|     JAYNE|      NOLTE|611|
|      37|       VAL|     BOLGER|605|
|      23|    SANDRA|     KILMER|604|
|      90|      SEAN|    GUINESS|599|
+--------+----------+-----------+---+



**3 Task:** Output the category of movies on which the most money was spent.

In [158]:
task_3_result=payment.select("amount","rental_id")\
                     .join(other=rental,on="rental_id",how="inner")\
                     .join(other=inventory,on="inventory_id",how="inner")\
                     .join(other=film_category,on="film_id",how="inner")\
                     .join(other=category,on="category_id",how="inner")\
                     .groupBy("category_id","name").sum("amount").orderBy(col("sum(amount)").desc())\
                     .select("name").limit(1)
task_3_result.show()

+-------+
|   name|
+-------+
|Foreign|
+-------+



**4 Task** Output the names of movies that are not in the inventory.

In [159]:
task_4_result=film.join(other=inventory,on="film_id",how="left_anti").select("title")
task_4_result.show()

+--------------------+
|               title|
+--------------------+
|      CHOCOLATE DUCK|
|       BUTCH PANTHER|
|        VOLUME HOUSE|
|      ORDER BETRAYED|
|        TADPOLE PARK|
|    KILL BROTHERHOOD|
|FRANKENSTEIN STRA...|
|    CROSSING DIVORCE|
|    SUICIDES SILENCE|
|       CATCH AMISTAD|
|     PERDITION FARGO|
|       FLOATS GARDEN|
|           GUMP DATE|
|        WALLS ARTIST|
|  GLADIATOR WESTWARD|
|         HOCUS FRIDA|
|ARSENIC INDEPENDENCE|
|         MUPPET MILE|
|   FIREHOUSE VIETNAM|
|       ROOF CHAMPION|
+--------------------+
only showing top 20 rows


**5 Task** Output the top 3 actors who have appeared most in movies in the “Children” category. If several actors have the same number of movies, output all of them.

In [160]:
all_movies = film_category.join(other=category.filter("name='Children'")\
                                ,on="category_id",how="inner").select("film_id")
film_count = film_actor.join(other=all_movies,on="film_id",how="inner")\
                       .groupBy("actor_id").count()

window1=Window.orderBy(col("count").desc()).rowsBetween(Window.unboundedPreceding, Window.currentRow)
task_5_result = actor.join(other=film_count,on="actor_id",how="left")\
                     .select("first_name", "last_name", "count").fillna(-1)\
                     .withColumn("rank",dense_rank().over(window1))\
                     .filter(col("rank")<=3)
task_5_result.show(task_5_result.count())

25/09/03 15:37:52 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/09/03 15:37:52 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/09/03 15:37:52 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/09/03 15:37:52 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/09/03 15:37:52 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/09/03 15:37:52 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/09/03 1

+----------+---------+-----+----+
|first_name|last_name|count|rank|
+----------+---------+-----+----+
|   RICHARD|     PENN|    9|   1|
|      EWAN|  GOODING|    9|   1|
|    SIDNEY|    CROWE|    9|   1|
|      JADA|    RYDER|    8|   2|
|   SPENCER|     PECK|    8|   2|
|   MATTHEW|   CARREY|    8|   2|
|      ALEC|    WAYNE|    8|   2|
|       KIM|    ALLEN|    8|   2|
|       DAN|   HARRIS|    8|   2|
|      JANE|  JACKMAN|    8|   2|
|   RUSSELL|   TEMPLE|    8|   2|
|      MARY|    TANDY|    8|   2|
|    MINNIE|ZELLWEGER|    7|   3|
|    ANGELA|   HUDSON|    7|   3|
|    WARREN|    NOLTE|    7|   3|
|    AUDREY|  OLIVIER|    7|   3|
|   KENNETH|     TORN|    7|   3|
|      GENE|   WILLIS|    7|   3|
|     HELEN|   VOIGHT|    7|   3|
|     JAMES|     PITT|    7|   3|
|     DARYL| WAHLBERG|    7|   3|
|  JULIANNE|    DENCH|    7|   3|
+----------+---------+-----+----+



25/09/03 15:37:52 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/09/03 15:37:52 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.


**6 Task** Output cities with the number of active and inactive customers (active - customer.active = 1). Sort by the number of inactive customers in descending order.

In [161]:
task_5_result=customer.select("active","address_id").join(other=address,on="address_id",how="inner")\
                      .withColumn("inactive", 1 - col("active"))
task_5_result=task_5_result.join(other=city,on="city_id",how="inner").orderBy(col("inactive").desc())\
                           .select("city","active","inactive").distinct()
task_5_result.show()

+-----------+------+--------+
|       city|active|inactive|
+-----------+------+--------+
|  Liaocheng|     1|       0|
|       Otsu|     1|       0|
|     Yantai|     1|       0|
|     Tabriz|     1|       0|
|       Vinh|     1|       0|
|      Laiwu|     1|       0|
|      Beira|     1|       0|
|    Uruapan|     1|       0|
|   Xiangfan|     0|       1|
|     London|     1|       0|
|      Sucre|     1|       0|
|    Memphis|     1|       0|
|  Newcastle|     1|       0|
|    Balaiha|     1|       0|
|    Bat Yam|     0|       1|
|Gandhinagar|     1|       0|
|      Cuman|     1|       0|
| al-Qadarif|     1|       0|
|  Halisahar|     1|       0|
| Portoviejo|     1|       0|
+-----------+------+--------+
only showing top 20 rows


**7 Task** Output the category of movies that have the highest number of total rental hours in the cities (customer.address_id in this city), and that start with the letter “a”. Do the same for cities with a “-” symbol.

In [162]:
film_time = rental.join(other=inventory,on="inventory_id",how="inner")\
                .join(other=film_category,on="film_id",how="inner")\
                .join(other=category,on="category_id",how="inner")\
                .withColumn("time",col("return_date").cast("long")-col("rental_date").cast("long"))\
                .select("customer_id","name","time").withColumnRenamed("name","category")


city_category_time = film_time.join(other=customer,on="customer_id",how="inner")\
                            .join(other=address,on="address_id",how="inner")\
                            .join(other=city,on="city_id",how="inner")\
                            .groupBy("city","category").sum("time")\
                            .select("city","category","sum(time)")

city_category_max_time = city_category_time.withColumn("city_type",when(col("city").startswith("A"),"A")\
                                                       .when(col("city").contains("-"),"-")\
                                                       .otherwise(None))
window2=Window.orderBy(col("total_time").desc()).partitionBy("city_type")
ranked=city_category_max_time.groupby("city_type","category").sum("sum(time)")\
                             .withColumnRenamed("sum(sum(time))","total_time")\
                             .withColumn("rank",rank().over(window2))\
                             .filter((col("rank")==1) & ((col("city_type")=="A") | (col("city_type")=="-")))\
                             .select("city_type","category",((col("total_time")/60/60/24).cast("int")).alias("total_time"))
ranked.show()



+---------+--------+----------+
|city_type|category|total_time|
+---------+--------+----------+
|        -|   Drama|       581|
|        A|Children|      1017|
+---------+--------+----------+

