MVP da Disciplina Engenharia de Dados

Aluno: Marcelo Chagas Mathias Netto

In [0]:
display(dbutils.fs.ls('dbfs:/databricks-datasets/cs100/lab4/data-001/'))

path,name,size,modificationTime
dbfs:/databricks-datasets/cs100/lab4/data-001/movies.dat,movies.dat,171308,1455234596000
dbfs:/databricks-datasets/cs100/lab4/data-001/ratings.dat.gz,ratings.dat.gz,2837683,1455234597000


In [0]:
# Exibir algumas linhas do arquivo
dbutils.fs.head("dbfs:/databricks-datasets/cs100/lab4/data-001/movies.dat", 100)

[Truncated to first 100 bytes]
Out[34]: "1::Toy Story (1995)::Animation|Children's|Comedy\n2::Jumanji (1995)::Adventure|Children's|Fantasy\n3::"

In [0]:
arquivo = "dbfs:/databricks-datasets/cs100/lab4/data-001/movies.dat"

# Ler o arquivo usando '::' como delimitador
movies_spark_df = spark.read.option("delimiter", "::").csv(arquivo, inferSchema=True, header=False)

# Renomear as colunas
movies_spark_df = movies_spark_df.withColumnRenamed("_c0", "movieId") \
       .withColumnRenamed("_c1", "title") \
       .withColumnRenamed("_c2", "genres")

# Mostrar os primeiros registros
movies_spark_df.show(5, truncate=False)

+-------+----------------------------------+----------------------------+
|movieId|title                             |genres                      |
+-------+----------------------------------+----------------------------+
|1      |Toy Story (1995)                  |Animation|Children's|Comedy |
|2      |Jumanji (1995)                    |Adventure|Children's|Fantasy|
|3      |Grumpier Old Men (1995)           |Comedy|Romance              |
|4      |Waiting to Exhale (1995)          |Comedy|Drama                |
|5      |Father of the Bride Part II (1995)|Comedy                      |
+-------+----------------------------------+----------------------------+
only showing top 5 rows



In [0]:
# Tipo dos dados - Movies
movies_spark_df.printSchema()

root
 |-- movieId: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- genres: string (nullable = true)



In [0]:
arquivo = "dbfs:/databricks-datasets/cs100/lab4/data-001/ratings.dat.gz"

# Ler o arquivo com o delimitador "::"
ratings_spark_df = spark.read.option("delimiter", "::").csv(arquivo, inferSchema=True, header=False)

# Renomear colunas para um formato mais claro
ratings_spark_df = ratings_spark_df.withColumnRenamed("_c0", "userId") \
       .withColumnRenamed("_c1", "movieId") \
       .withColumnRenamed("_c2", "rating") \
       .withColumnRenamed("_c3", "timestamp")

# Exibir as 5 primeiras linhas
ratings_spark_df.show(5)

+------+-------+------+---------+
|userId|movieId|rating|timestamp|
+------+-------+------+---------+
|     1|   1193|     5|978300760|
|     1|    661|     3|978302109|
|     1|    914|     3|978301968|
|     1|   3408|     4|978300275|
|     1|   2355|     5|978824291|
+------+-------+------+---------+
only showing top 5 rows



In [0]:
# Tipo dos dados - Ratings
ratings_spark_df.printSchema()

root
 |-- userId: integer (nullable = true)
 |-- movieId: integer (nullable = true)
 |-- rating: integer (nullable = true)
 |-- timestamp: integer (nullable = true)



In [0]:
%sql DROP DATABASE bronze CASCADE

In [0]:
%sql CREATE DATABASE bronze;

In [0]:
movies_spark_df.write.mode("overwrite").saveAsTable("bronze.movies")

In [0]:
ratings_spark_df.write.mode("overwrite").saveAsTable("bronze.ratings")

In [0]:
%sql SELECT * FROM bronze.movies LIMIT 10

movieId,title,genres
1,Toy Story (1995),Animation|Children's|Comedy
2,Jumanji (1995),Adventure|Children's|Fantasy
3,Grumpier Old Men (1995),Comedy|Romance
4,Waiting to Exhale (1995),Comedy|Drama
5,Father of the Bride Part II (1995),Comedy
6,Heat (1995),Action|Crime|Thriller
7,Sabrina (1995),Comedy|Romance
8,Tom and Huck (1995),Adventure|Children's
9,Sudden Death (1995),Action
10,GoldenEye (1995),Action|Adventure|Thriller


In [0]:
%sql SELECT * FROM bronze.ratings LIMIT 10

userId,movieId,rating,timestamp
1,1193,5,978300760
1,661,3,978302109
1,914,3,978301968
1,3408,4,978300275
1,2355,5,978824291
1,1197,3,978302268
1,1287,5,978302039
1,2804,5,978300719
1,594,4,978302268
1,919,4,978301368


In [0]:
%sql DROP DATABASE silver CASCADE

In [0]:
%sql CREATE DATABASE silver;

In [0]:
movies_spark_df.write.mode("overwrite").saveAsTable("silver.movies")

In [0]:
ratings_spark_df.write.mode("overwrite").saveAsTable("silver.ratings")

In [0]:
%sql
SELECT 
    COUNT(CASE WHEN movieId IS NULL THEN 1 END) AS null_count_movieId,
    COUNT(CASE WHEN title IS NULL THEN 1 END) AS null_count_title,
    COUNT(CASE WHEN genres IS NULL THEN 1 END) AS null_count_genres
FROM silver.movies;

null_count_movieId,null_count_title,null_count_genres
0,0,0


In [0]:
%sql
SELECT 
    COUNT(CASE WHEN userId IS NULL THEN 1 END) AS null_count_userId,
    COUNT(CASE WHEN movieId IS NULL THEN 1 END) AS null_count_movieId,
    COUNT(CASE WHEN rating IS NULL THEN 1 END) AS null_count_rating,
    COUNT(CASE WHEN timestamp IS NULL THEN 1 END) AS null_count_timestamp
FROM silver.ratings;

null_count_userId,null_count_movieId,null_count_rating,null_count_timestamp
0,0,0,0


In [0]:
%sql DROP DATABASE gold CASCADE

In [0]:
%sql CREATE DATABASE gold;

In [0]:
movies_spark_df.write.mode("overwrite").saveAsTable("gold.movies")

In [0]:
ratings_spark_df.write.mode("overwrite").saveAsTable("gold.ratings")

In [0]:
# Realizar o JOIN usando a coluna "movieId" e ordenar
movies_ratings_spark_df = ratings_spark_df.join(movies_spark_df, on="movieId", how="inner") \
                              .orderBy("movieId")

# Mostrar os primeiros registros
movies_ratings_spark_df.show(10)

+-------+------+------+---------+----------------+--------------------+
|movieId|userId|rating|timestamp|           title|              genres|
+-------+------+------+---------+----------------+--------------------+
|      1|    21|     3|978139347|Toy Story (1995)|Animation|Childre...|
|      1|    10|     5|978226474|Toy Story (1995)|Animation|Childre...|
|      1|    23|     4|978463614|Toy Story (1995)|Animation|Childre...|
|      1|     6|     4|978237008|Toy Story (1995)|Animation|Childre...|
|      1|    26|     3|978130703|Toy Story (1995)|Animation|Childre...|
|      1|     8|     4|978233496|Toy Story (1995)|Animation|Childre...|
|      1|    28|     3|978985309|Toy Story (1995)|Animation|Childre...|
|      1|    19|     5|978555994|Toy Story (1995)|Animation|Childre...|
|      1|    34|     5|978102970|Toy Story (1995)|Animation|Childre...|
|      1|    38|     5|978046225|Toy Story (1995)|Animation|Childre...|
+-------+------+------+---------+----------------+--------------

In [0]:
from pyspark.sql.functions import from_unixtime, col
movies_ratings_spark_df = movies_ratings_spark_df.withColumn("timestamp", from_unixtime(col("timestamp")))

In [0]:
movies_ratings_spark_df.write.mode("overwrite").saveAsTable("gold.movies_ratings")

In [0]:
%sql SELECT * FROM gold.movies_ratings LIMIT 100

movieId,userId,rating,timestamp,title,genres
1,1,5,2001-01-06 23:37:48,Toy Story (1995),Animation|Children's|Comedy
1,6,4,2000-12-31 04:30:08,Toy Story (1995),Animation|Children's|Comedy
1,8,4,2000-12-31 03:31:36,Toy Story (1995),Animation|Children's|Comedy
1,9,5,2000-12-31 01:25:52,Toy Story (1995),Animation|Children's|Comedy
1,10,5,2000-12-31 01:34:34,Toy Story (1995),Animation|Children's|Comedy
1,18,4,2000-12-30 05:39:28,Toy Story (1995),Animation|Children's|Comedy
1,19,5,2001-01-03 21:06:34,Toy Story (1995),Animation|Children's|Comedy
1,21,3,2000-12-30 01:22:27,Toy Story (1995),Animation|Children's|Comedy
1,23,4,2001-01-02 19:26:54,Toy Story (1995),Animation|Children's|Comedy
1,26,3,2000-12-29 22:58:23,Toy Story (1995),Animation|Children's|Comedy


In [0]:
%sql
SELECT title, COUNT(*) AS cont_rating
FROM gold.movies_ratings
GROUP BY title
ORDER BY cont_rating DESC
LIMIT 5;


title,cont_rating
American Beauty (1999),1775
Star Wars: Episode IV - A New Hope (1977),1447
Star Wars: Episode V - The Empire Strikes Back (1980),1438
Jurassic Park (1993),1423
Star Wars: Episode VI - Return of the Jedi (1983),1390


In [0]:
%sql
SELECT title, AVG(rating) AS avg_rating
FROM gold.movies_ratings
GROUP BY title
ORDER BY avg_rating DESC
LIMIT 3;

title,avg_rating
Bittersweet Motel (2000),5.0
"Baby, The (1973)",5.0
Those Who Love Me Can Take the Train (Ceux qui m'aiment prendront le train) (1998),5.0


In [0]:
%sql
SELECT genres, AVG(rating) AS avg_rating
FROM gold.movies_ratings
GROUP BY genres
ORDER BY avg_rating DESC
LIMIT 1;

genres,avg_rating
Sci-Fi|War,4.45468509984639


In [0]:
%sql
SELECT COUNT(DISTINCT title) AS total_movies
FROM gold.movies_ratings;

total_movies
3615


In [0]:
%sql
SELECT COUNT(DISTINCT userId) AS total_users
FROM gold.movies_ratings;

total_users
2999


In [0]:
%sql
SELECT YEAR(timestamp) AS year, COUNT(*) AS total_ratings
FROM gold.movies_ratings
GROUP BY YEAR(timestamp)
ORDER BY year;

year,total_ratings
2000,424406
2001,47080
2002,14894
2003,1270


In [0]:
%sql
SELECT title, AVG(rating) AS avg_rating
FROM gold.movies_ratings
GROUP BY title
ORDER BY avg_rating, title ASC
LIMIT 10;

title,avg_rating
Autopsy (Macchie Solari) (1975),1.0
Better Living (1998),1.0
"Big Squeeze, The (1996)",1.0
"Blood Spattered Bride, The (La Novia Ensangrentada) (1972)",1.0
"Bloody Child, The (1996)",1.0
Brenda Starr (1989),1.0
Carnosaur 3: Primal Species (1996),1.0
Cheetah (1989),1.0
Diebinnen (1995),1.0
Frogs for Snakes (1998),1.0
