
In [None]:
# Install Java 8 and PySpark. The pip package bundles Spark, so no separate Spark tarball is needed
# (the spark-3.0.1 archive is no longer served from downloads.apache.org).
# Spark 3.5 still runs on Java 8, whereas Spark 4.x requires Java 17, hence the version pin.
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!pip install -q "pyspark==3.5.*"
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
!update-alternatives --set java /usr/lib/jvm/java-8-openjdk-amd64/jre/bin/java
!java -version


import pyspark

from pyspark.sql import SparkSession

# Create (or reuse) the SparkSession, the entry point to Spark SQL
spark = SparkSession \
    .builder \
    .appName("Python Spark SQL demo") \
    .getOrCreate()

print("Initialization successful")



# Spark SQL

**Spark SQL API reference:** https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql.html


<ul>
<li> <b>Spark SQL</b>: interface for working with structured and semi-structured data in Spark.
<li> Load data from structured sources (JSON, Hive, Parquet, Cassandra...)
<li> Query data using SQL.
<li> Mix SQL and Python/Java/Scala code.
<li> Spark SQL operates on <b>DataFrames</b>: distributed collections of rows organized into named columns.
</ul>
</div>
</div>

# Reading Structured Data

In [None]:
!wget -q https://gquercini.github.io/courses/plp/tutorials/moviesEmbedded.json

In [None]:
movies_data = spark.read.json("./moviesEmbedded.json")

In [None]:
# Displays the content of the DataFrame to stdout
movies_data.show(vertical=True, truncate=False)

In [None]:
# Print the schema in a tree format
movies_data.printSchema()

## Getting data from a DataFrame

In [None]:
# Select only the "title" column
movies_data.select("title").show()

In [None]:
title_and_country = movies_data.select("title", "country")
title_and_country.show()

In [None]:
french_movies = title_and_country.filter(title_and_country['country']=='FR')
french_movies.show()

In [None]:
french_movies.rdd.collect()

In [None]:
# french_movies is a DataFrame;
# its .rdd attribute gives an RDD of Row objects, mapped here to (title, country) tuples
my_french_movies = french_movies.rdd.map(lambda x: (x.title, x.country))
# The result of collect() is a plain Python list.
french_list = my_french_movies.collect()
print(french_list[2])

In [None]:
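# mapValues() transforms only the value of each (key, value) pair and keeps the key:
# every country code is replaced by the string "France"
my_french_movies = my_french_movies.mapValues(lambda x: "France")
my_french_movies.collect()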

In [None]:
# movies_data is a DataFrame; groupBy().count() returns a new DataFrame with a "count" column.
movies_by_country = movies_data.groupBy("country").count()
movies_by_country.show()

In [None]:
from pyspark.sql.functions import col

movies_by_country = movies_by_country.sort(col("count").desc())
movies_by_country.show()

# Querying Data with SQL

In [None]:
# Register the DataFrame as a SQL temporary view
movies_data.createOrReplaceTempView("movies_embedded")

In [None]:
movies_by_country = spark.sql('''
            SELECT country, count(*) as nbMovies
            FROM movies_embedded
            GROUP BY country
            ORDER BY nbMovies DESC''')
movies_by_country.show()

In [None]:
# first() returns a Row (the country with the most movies); here its field is read by position
top_country = movies_by_country.first()[0]
print(top_country)

In [None]:
# Row fields can also be read by column name
top_country = movies_by_country.first().country
print(top_country)

<h2>Exercise: Normalizing the dataset</h2>

<p>
<div class="prez">
<ul>
 <li> We have one table <b>movies_embedded</b> that contains the information about the movies
        and also embeds the information about their actors and directors.
<li> In relational-database parlance, the table <b>movies_embedded</b> is not in first normal form: its <b>actors</b> column holds a list of records instead of an atomic value.
</ul>

We will normalize this table by breaking it into several tables:

<ul>
<li> A table <b>movies</b> containing: _id, title, year, genre, summary, country
<li> A table <b>artists</b> containing: _id, first_name, last_name, birth_date
<li> A table <b>movies_actors</b> containing: _id_movie, _id_actor, role
<li> A table <b>movies_directors</b> containing: _id_movie, _id_director
</ul>

</div>
</p>

## Create the table movies


* Create a new dataframe `movies_table` from the dataframe `movies_data`.

* In the process, rename the column `_id` to `movie_id`.

* By using the function `createOrReplaceTempView`, create a new view `movies`.


In [None]:
movies_table = movies_data\
  .select("_id", "title", "year", "genre", "summary", "country")\
  .withColumnRenamed("_id", "movie_id")
movies_table.show()
movies_table.createOrReplaceTempView("movies")

# Another possibility:
#movies_table = movies_data.selectExpr("_id as movie_id", "title", \
#                                      "year", "genre", "summary", "country")
#movies_table.show()

## Create the table movies_actors

From the dataframe `movies_data`, create a new dataframe `movies_actors`, where each row contains a mapping between a movie and an actor.
Also, create a view `movies_actors` from this dataframe.

**HINT.** In the dataframe `movies_data`, the information about the actors of a film is embedded in a list. We need to take the actors out of this list, so that each (movie, actor) pair becomes its own row.

In [None]:
import pyspark.sql.functions as F

movies_actors = movies_data\
      .select("_id", F.explode("actors").alias("actor"))\
      .selectExpr("_id as movie_id", "actor._id as actor_id", "actor.role as actor_role" )

movies_actors.show(vertical=False, truncate=False)
movies_actors.createOrReplaceTempView("movies_actors")

## Create the table movies_directors

From the dataframe `movies_data`, create a new dataframe `movies_directors`, where each row contains a mapping between a movie and a director.
Also, create a view `movies_directors` from this dataframe.



In [None]:
movies_directors = movies_data.selectExpr("_id as movie_id",
                                          "director._id as director_id")
movies_directors.show()
movies_directors.createOrReplaceTempView("movies_directors")

## Create the table artists

From the dataframe `movies_data`, create a new dataframe `artists` that contains the information about actors and directors.
Also, create a view `artists` from this dataframe.

**HINT.** First obtain a dataframe `actors`, then a dataframe `directors` and finally compute the union of the two dataframes. Make sure that the dataframe `artists` does not contain any duplicates.

In [None]:
actors = movies_data.select(F.explode("actors").alias("actor"))\
    .selectExpr("actor._id as artist_id", \
                "actor.first_name as first_name", \
                "actor.last_name as last_name",
               "actor.birth_date as birth_date")

directors = movies_data.select("director")\
    .selectExpr("director._id as artist_id", \
                "director.first_name as first_name", \
                "director.last_name as last_name",
               "director.birth_date as birth_date")

artists = actors.union(directors).distinct()
artists.show()
artists.createOrReplaceTempView("artists")

## Querying the new collection of data

Write a SQL query to count the number of movies for each artist. Sort the artists so that the one with the most movies appears as the first result.

In [None]:
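# Count both acting and directing credits; DISTINCT avoids counting a movie twice for one artist
spark.sql('''
    SELECT a.artist_id, a.first_name, a.last_name, count(DISTINCT p.movie_id) AS nb_movies
    FROM artists a
    JOIN (SELECT movie_id, actor_id AS artist_id FROM movies_actors
          UNION
          SELECT movie_id, director_id AS artist_id FROM movies_directors) p
      ON a.artist_id = p.artist_id
    GROUP BY a.artist_id, a.first_name, a.last_name
    ORDER BY nb_movies DESC''').show()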

## Writing the new tables to file

In [None]:
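# Spark writes a directory of JSON part files; mode("overwrite") lets the cell be re-run
movies_table.write.mode("overwrite").json("movies_table.json")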