<a href="https://colab.research.google.com/github/andrey-de/stepik_de_jun_spark_in_collab/blob/main/SparkTask3_movies_and_actors.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [10]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.functions import sum, desc, max, min, count
from pyspark.sql.functions import year, current_date
from pyspark.sql.functions import year, month, avg

# Create SparkSession
# getOrCreate() returns an already-running session if one exists, otherwise
# it starts a new one configured with the application name set here.
session_builder = SparkSession.builder.appName("Read CSV Example")
spark = session_builder.getOrCreate()

In [11]:
# Read CSV-files
# All three inputs sit in the same directory and are read with the same
# options, so the location and the reader settings are declared once here
# instead of being repeated on every line.
DATA_DIR = "/content/sample_data"


def load_csv(file_name):
    """Read ``DATA_DIR/file_name`` into a Spark DataFrame.

    The first row of the file is used as the header and column types are
    inferred from the data (``header=True``, ``inferSchema=True``).

    Args:
        file_name: Name of the CSV file inside ``DATA_DIR``.

    Returns:
        The DataFrame produced by ``spark.read.csv`` for that file.
    """
    return spark.read.csv(f"{DATA_DIR}/{file_name}", header=True, inferSchema=True)


df_actors = load_csv("actors.csv")
df_movies = load_csv("movies.csv")
df_movie_actors = load_csv("movie_actors.csv")

In [12]:
# Print schemas
# Show the column names and inferred types of each loaded DataFrame,
# in the order: actors, movies, movie_actors.
for frame in (df_actors, df_movies, df_movie_actors):
    frame.printSchema()

root
 |-- actor_id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- birth_date: date (nullable = true)
 |-- country: string (nullable = true)

root
 |-- movie_id: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- genre: string (nullable = true)
 |-- release_date: date (nullable = true)
 |-- budget: double (nullable = true)

root
 |-- movie_id: integer (nullable = true)
 |-- actor_id: integer (nullable = true)



In [13]:
# Create temp views
# Register every DataFrame under the table name that the SQL queries in the
# following cells refer to (an existing view with the same name is replaced).
for view_name, frame in (
    ("actors", df_actors),
    ("movies", df_movies),
    ("movie_actors", df_movie_actors),
):
    frame.createOrReplaceTempView(view_name)

In [14]:
# Preview the "actors" view. show() without arguments prints only the
# leading rows (20 by default) as a text table.
actors_preview = spark.sql("SELECT * FROM actors")
actors_preview.show()

+--------+--------+----------+---------+
|actor_id|    name|birth_date|  country|
+--------+--------+----------+---------+
|       1| Actor_1|1960-12-31|   Canada|
|       2| Actor_2|1962-12-31|       UK|
|       3| Actor_3|1964-12-31|       UK|
|       4| Actor_4|1966-12-31|       UK|
|       5| Actor_5|1968-12-31|    India|
|       6| Actor_6|1970-12-31|      USA|
|       7| Actor_7|1972-12-31|    India|
|       8| Actor_8|1974-12-31|Australia|
|       9| Actor_9|1976-12-31|      USA|
|      10|Actor_10|1978-12-31|Australia|
|      11|Actor_11|1980-12-31|      USA|
|      12|Actor_12|1982-12-31|    India|
|      13|Actor_13|1984-12-31|       UK|
|      14|Actor_14|1986-12-31|   Canada|
|      15|Actor_15|1988-12-31|       UK|
|      16|Actor_16|1990-12-31|    India|
|      17|Actor_17|1992-12-31|      USA|
|      18|Actor_18|1994-12-31|       UK|
|      19|Actor_19|1996-12-31|    India|
|      20|Actor_20|1998-12-31|Australia|
+--------+--------+----------+---------+
only showing top

In [15]:
# Preview the "movies" view. show() without arguments prints only the
# leading rows (20 by default) as a text table.
movies_preview = spark.sql("SELECT * FROM movies")
movies_preview.show()

+--------+--------+------+------------+-------------+
|movie_id|   title| genre|release_date|       budget|
+--------+--------+------+------------+-------------+
|       1| Movie_1|Horror|  2000-12-31|8.660058311E7|
|       2| Movie_2|Comedy|  2001-12-31|1.274740083E7|
|       3| Movie_3|Action|  2002-12-31| 1.80157747E7|
|       4| Movie_4| Drama|  2003-12-31|4.817612061E7|
|       5| Movie_5| Drama|  2004-12-31| 7.40501611E7|
|       6| Movie_6|Action|  2005-12-31|1.476121831E7|
|       7| Movie_7| Drama|  2006-12-31|4.456703643E7|
|       8| Movie_8| Drama|  2007-12-31|4.880227617E7|
|       9| Movie_9|Action|  2008-12-31|2.201627853E7|
|      10|Movie_10|Action|  2009-12-31|1.244027929E7|
|      11|Movie_11|Comedy|  2010-12-31|8.380567138E7|
|      12|Movie_12|Comedy|  2011-12-31|5.074409933E7|
|      13|Movie_13|Action|  2012-12-31|   2423742.36|
|      14|Movie_14|Sci-Fi|  2013-12-31|8.049514883E7|
|      15|Movie_15| Drama|  2014-12-31|9.809858674E7|
|      16|Movie_16|Comedy|  

In [18]:
#result 4.1
# Count the movies in each genre and show the five genres with the most
# movies. Genres with equal counts have no defined relative order because
# the query sorts by cnt only.
top_genres_query = """
  SELECT genre, COUNT(*) AS cnt
  FROM movies
  GROUP BY genre
  ORDER BY cnt DESC
  LIMIT 5
"""
spark.sql(top_genres_query).show()

+------+---+
| genre|cnt|
+------+---+
| Drama|  6|
|Action|  6|
|Comedy|  4|
|Horror|  2|
|Sci-Fi|  2|
+------+---+



In [20]:
#result 4.2
# Count the movie_actors rows per actor, keep the single actor with the
# highest count, then look up that actor's name in the actors view.
# NOTE: LIMIT 1 reports exactly one row, so actors tied for first place
# are not shown.
busiest_actor_query = """
  WITH movies_cnt AS (
    SELECT actor_id, COUNT(*) AS num_movies
    FROM movie_actors
    GROUP BY actor_id
    ORDER BY num_movies DESC
    LIMIT 1
    )

  SELECT name, num_movies
  FROM movies_cnt
  LEFT JOIN actors ON movies_cnt.actor_id = actors.actor_id
"""
spark.sql(busiest_actor_query).show()

+--------+----------+
|    name|num_movies|
+--------+----------+
|Actor_17|         5|
+--------+----------+



In [21]:
#result 4.3
# Average movie budget per genre, listed from the highest average to the
# lowest.
avg_budget_query = """
  SELECT genre, AVG(budget) AS avg_budget
  FROM movies
  GROUP BY genre
  ORDER BY avg_budget DESC
"""
spark.sql(avg_budget_query).show()

+------+--------------------+
| genre|          avg_budget|
+------+--------------------+
|Horror|      8.7281876775E7|
|Sci-Fi|       7.809715175E7|
| Drama| 6.076021856166667E7|
|Comedy|     5.20709662225E7|
|Action|2.7492742561666667E7|
+------+--------------------+



In [24]:
#result 4.4 (was labelled 4.3, duplicating the average-budget cell - confirm numbering)
# For every (movie, actor country) pair, count the cast members from that
# country and keep only pairs with more than one actor; then attach the
# movie title. Rows are sorted by title, country and num_actors; title is a
# string column, so "Movie_10" sorts before "Movie_2".
same_country_cast_query = """
  WITH actors_cnt AS (
    SELECT movie_id, country, COUNT(*) AS num_actors
    FROM movie_actors
    LEFT JOIN actors ON movie_actors.actor_id = actors.actor_id
    GROUP BY movie_id, country
    HAVING num_actors > 1
    )

  SELECT title, country, num_actors
  FROM actors_cnt
  LEFT JOIN movies ON actors_cnt.movie_id = movies.movie_id
  ORDER BY title, country, num_actors
"""
spark.sql(same_country_cast_query).show()

+--------+---------+----------+
|   title|  country|num_actors|
+--------+---------+----------+
| Movie_1|    India|         3|
|Movie_10|       UK|         2|
|Movie_10|      USA|         2|
|Movie_15|    India|         2|
|Movie_18|Australia|         2|
| Movie_2|      USA|         2|
| Movie_3|      USA|         2|
| Movie_7|    India|         2|
| Movie_7|      USA|         2|
+--------+---------+----------+



In [25]:
# Stop the Spark session created in the first code cell; this is the last
# step of the notebook, after all queries have been shown.
spark.stop()