In [1]:
%%bash
if command -v uv > /dev/null; then
    uv add --upgrade "dask[dataframe]"
    uv add --upgrade "dask[distributed]"
    uv add pandas
    uv add numpy
    uv add plotly
else
    pip install --upgrade "dask[dataframe]"
    pip install --upgrade "dask[distributed]"
    pip install pandas
    pip install numpy
    pip install plotly
fi

[2mResolved [1m61 packages[0m [2min 385ms[0m[0m
[2mAudited [1m51 packages[0m [2min 0.06ms[0m[0m
[2mResolved [1m61 packages[0m [2min 189ms[0m[0m
[2mAudited [1m51 packages[0m [2min 0.04ms[0m[0m
[2mResolved [1m61 packages[0m [2min 2ms[0m[0m
[2mAudited [1m51 packages[0m [2min 0.07ms[0m[0m
[2mResolved [1m61 packages[0m [2min 0.85ms[0m[0m
[2mAudited [1m51 packages[0m [2min 0.03ms[0m[0m
[2mResolved [1m61 packages[0m [2min 0.63ms[0m[0m
[2mAudited [1m51 packages[0m [2min 0.04ms[0m[0m


In [2]:
import dask.dataframe as dd
import pandas as pd
from dask.distributed import Client, LocalCluster

# Existe integração com pandas, então customização da configuração do pandas
# também funciona aqui

# Não limitar a largura das colunas apresentadas
pd.options.display.max_colwidth = None
# Não usar a notação científica (ex: 6.125000e-02) e usar 6 casas decimais
# (ex: 0.061250)
pd.options.display.float_format = "{:.6f}".format
# Não utilizar matplotlib como engine de gráficos e usar plotly
pd.options.plotting.backend = "plotly"

In [3]:
# Criar cluster local anexado ao kernel do notebook
cluster = LocalCluster()
client = Client(cluster)
client

0,1
Connection method: Cluster object,Cluster type: distributed.LocalCluster
Dashboard: http://127.0.0.1:8787/status,

0,1
Dashboard: http://127.0.0.1:8787/status,Workers: 4
Total threads: 16,Total memory: 31.09 GiB
Status: running,Using processes: True

0,1
Comm: tcp://127.0.0.1:44139,Workers: 4
Dashboard: http://127.0.0.1:8787/status,Total threads: 16
Started: Just now,Total memory: 31.09 GiB

0,1
Comm: tcp://127.0.0.1:43289,Total threads: 4
Dashboard: http://127.0.0.1:35753/status,Memory: 7.77 GiB
Nanny: tcp://127.0.0.1:46093,
Local directory: /tmp/dask-scratch-space/worker-mpp8bf0x,Local directory: /tmp/dask-scratch-space/worker-mpp8bf0x

0,1
Comm: tcp://127.0.0.1:35381,Total threads: 4
Dashboard: http://127.0.0.1:46803/status,Memory: 7.77 GiB
Nanny: tcp://127.0.0.1:35535,
Local directory: /tmp/dask-scratch-space/worker-3ycqnl01,Local directory: /tmp/dask-scratch-space/worker-3ycqnl01

0,1
Comm: tcp://127.0.0.1:32867,Total threads: 4
Dashboard: http://127.0.0.1:39589/status,Memory: 7.77 GiB
Nanny: tcp://127.0.0.1:38061,
Local directory: /tmp/dask-scratch-space/worker-tqxuf9in,Local directory: /tmp/dask-scratch-space/worker-tqxuf9in

0,1
Comm: tcp://127.0.0.1:44111,Total threads: 4
Dashboard: http://127.0.0.1:39459/status,Memory: 7.77 GiB
Nanny: tcp://127.0.0.1:33861,
Local directory: /tmp/dask-scratch-space/worker-hhfawug9,Local directory: /tmp/dask-scratch-space/worker-hhfawug9


In [4]:
ROOT_DATA_PATH = "/home/mgrb/Workspaces/CESAR.School/big_data/data/ml-25m"

In [5]:
movies_df = dd.read_csv(f"{ROOT_DATA_PATH}/movies.csv")
movies_df

Unnamed: 0_level_0,movieId,title,genres
npartitions=1,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1
,int64,string,string
,...,...,...


In [6]:
movies_df.head()

Unnamed: 0,movieId,title,genres
0,1,Toy Story (1995),Adventure|Animation|Children|Comedy|Fantasy
1,2,Jumanji (1995),Adventure|Children|Fantasy
2,3,Grumpier Old Men (1995),Comedy|Romance
3,4,Waiting to Exhale (1995),Comedy|Drama|Romance
4,5,Father of the Bride Part II (1995),Comedy


In [7]:
len(movies_df)

62423

In [8]:
tags_df = dd.read_csv(f"{ROOT_DATA_PATH}/tags.csv")
tags_df

Unnamed: 0_level_0,userId,movieId,tag,timestamp
npartitions=1,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1
,int64,int64,string,int64
,...,...,...,...


In [9]:
tags_df.head()

Unnamed: 0,userId,movieId,tag,timestamp
0,3,260,classic,1439472355
1,3,260,sci-fi,1439472256
2,4,1732,dark comedy,1573943598
3,4,1732,great dialogue,1573943604
4,4,7569,so bad it's good,1573943455


In [10]:
len(tags_df)

1093360

In [11]:
# Converter os valores da coluna 'timestamp' de epoch em segundos para datetime
tags_df["timestamp"] = dd.to_datetime(tags_df["timestamp"], unit="s")
tags_df.head()

Unnamed: 0,userId,movieId,tag,timestamp
0,3,260,classic,2015-08-13 13:25:55
1,3,260,sci-fi,2015-08-13 13:24:16
2,4,1732,dark comedy,2019-11-16 22:33:18
3,4,1732,great dialogue,2019-11-16 22:33:24
4,4,7569,so bad it's good,2019-11-16 22:30:55


In [12]:
ratings_df = dd.read_csv(f"{ROOT_DATA_PATH}/ratings.csv")
ratings_df

Unnamed: 0_level_0,userId,movieId,rating,timestamp
npartitions=10,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1
,int64,int64,float64,int64
,...,...,...,...
...,...,...,...,...
,...,...,...,...
,...,...,...,...


In [13]:
ratings_df.head()

Unnamed: 0,userId,movieId,rating,timestamp
0,1,296,5.0,1147880044
1,1,306,3.5,1147868817
2,1,307,5.0,1147868828
3,1,665,5.0,1147878820
4,1,899,3.5,1147868510


In [14]:
# Converter os valores da coluna 'timestamp' de epoch em segundos para datetime
ratings_df["timestamp"] = dd.to_datetime(ratings_df["timestamp"], unit="s")
ratings_df.head()

Unnamed: 0,userId,movieId,rating,timestamp
0,1,296,5.0,2006-05-17 15:34:04
1,1,306,3.5,2006-05-17 12:26:57
2,1,307,5.0,2006-05-17 12:27:08
3,1,665,5.0,2006-05-17 15:13:40
4,1,899,3.5,2006-05-17 12:21:50


In [15]:
%%time

len(ratings_df)

CPU times: user 68.4 ms, sys: 10.2 ms, total: 78.6 ms
Wall time: 1.01 s


25000095

In [16]:
links_df = dd.read_csv(
    f"{ROOT_DATA_PATH}/links.csv",
    dtype={"movieId": int, "imdbId": int},
    assume_missing=True,
)
links_df

Unnamed: 0_level_0,movieId,imdbId,tmdbId
npartitions=1,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1
,int64,int64,float64
,...,...,...


In [17]:
links_df.head()

Unnamed: 0,movieId,imdbId,tmdbId
0,1,114709,862.0
1,2,113497,8844.0
2,3,113228,15602.0
3,4,114885,31357.0
4,5,113041,11862.0


In [18]:
len(links_df)

62423

In [19]:
links_df.isnull().sum().compute()

movieId      0
imdbId       0
tmdbId     107
dtype: int64

In [20]:
gtags_df = dd.read_csv(f"{ROOT_DATA_PATH}/genome-tags.csv")
gtags_df

Unnamed: 0_level_0,tagId,tag
npartitions=1,Unnamed: 1_level_1,Unnamed: 2_level_1
,int64,string
,...,...


In [21]:
gtags_df.head()

Unnamed: 0,tagId,tag
0,1,007
1,2,007 (series)
2,3,18th century
3,4,1920s
4,5,1930s


In [22]:
len(gtags_df)

1128

In [23]:
gscores_df = dd.read_csv(f"{ROOT_DATA_PATH}/genome-scores.csv")
gscores_df

Unnamed: 0_level_0,movieId,tagId,relevance
npartitions=6,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1
,int64,int64,float64
,...,...,...
...,...,...,...
,...,...,...
,...,...,...


In [24]:
gscores_df.head()

Unnamed: 0,movieId,tagId,relevance
0,1,1,0.02875
1,1,2,0.02375
2,1,3,0.0625
3,1,4,0.07575
4,1,5,0.14075


In [25]:
%%time

len(gscores_df)

CPU times: user 28.4 ms, sys: 7.12 ms, total: 35.5 ms
Wall time: 577 ms


15584448

## Executando merde de Dados


In [26]:
movies_df.head()

Unnamed: 0,movieId,title,genres
0,1,Toy Story (1995),Adventure|Animation|Children|Comedy|Fantasy
1,2,Jumanji (1995),Adventure|Children|Fantasy
2,3,Grumpier Old Men (1995),Comedy|Romance
3,4,Waiting to Exhale (1995),Comedy|Drama|Romance
4,5,Father of the Bride Part II (1995),Comedy


In [27]:
links_df.head()

Unnamed: 0,movieId,imdbId,tmdbId
0,1,114709,862.0
1,2,113497,8844.0
2,3,113228,15602.0
3,4,114885,31357.0
4,5,113041,11862.0


In [28]:
# Calculando total de linhas
len(movies_df)

62423

In [29]:
len(links_df)

62423

In [30]:
# Criando operação de merge entre filmes e links
merged_df = dd.merge(movies_df, links_df, on="movieId", how="inner")
merged_df

Unnamed: 0_level_0,movieId,title,genres,imdbId,tmdbId
npartitions=1,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1
,int64,string,string,int64,float64
,...,...,...,...,...


In [31]:
merged_df.head()

Unnamed: 0,movieId,title,genres,imdbId,tmdbId
0,1,Toy Story (1995),Adventure|Animation|Children|Comedy|Fantasy,114709,862.0
1,2,Jumanji (1995),Adventure|Children|Fantasy,113497,8844.0
2,3,Grumpier Old Men (1995),Comedy|Romance,113228,15602.0
3,4,Waiting to Exhale (1995),Comedy|Drama|Romance,114885,31357.0
4,5,Father of the Bride Part II (1995),Comedy,113041,11862.0


In [32]:
len(merged_df)

62423

In [33]:
# Uma forma alternativa de fazer merge seria utilizando um dos dataframes
movies_df = movies_df.merge(links_df, on="movieId", how="inner")
movies_df

Unnamed: 0_level_0,movieId,title,genres,imdbId,tmdbId
npartitions=1,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1
,int64,string,string,int64,float64
,...,...,...,...,...


In [34]:
movies_df.head()

Unnamed: 0,movieId,title,genres,imdbId,tmdbId
0,1,Toy Story (1995),Adventure|Animation|Children|Comedy|Fantasy,114709,862.0
1,2,Jumanji (1995),Adventure|Children|Fantasy,113497,8844.0
2,3,Grumpier Old Men (1995),Comedy|Romance,113228,15602.0
3,4,Waiting to Exhale (1995),Comedy|Drama|Romance,114885,31357.0
4,5,Father of the Bride Part II (1995),Comedy,113041,11862.0


In [35]:
len(movies_df)

62423

## Quais são os top 10 fimes mais avaliados?


In [36]:
%%time

df_top10 = (
    ratings_df.groupby("movieId")\
        ["rating"]\
        .count()\
        .rename('ratings_count')\
        .nlargest(10)\
        .persist()
)

CPU times: user 10.8 ms, sys: 45 μs, total: 10.9 ms
Wall time: 10.3 ms


In [37]:
df_top10

Dask Series Structure:
npartitions=1
    int64
      ...
Dask Name: nlargest-tree, 1 expression
Expr=FromGraph(f918aba)

In [38]:
%%time

df_top10.compute()

CPU times: user 90.6 ms, sys: 15 ms, total: 106 ms
Wall time: 1.08 s


movieId
356     81491
318     81482
296     79672
593     74127
2571    72674
260     68717
480     64144
527     60411
110     59184
2959    58773
Name: ratings_count, dtype: int64

In [39]:
movies_df[["movieId", "title"]].compute()

Unnamed: 0,movieId,title
0,1,Toy Story (1995)
1,2,Jumanji (1995)
2,3,Grumpier Old Men (1995)
3,4,Waiting to Exhale (1995)
4,5,Father of the Bride Part II (1995)
...,...,...
62418,209157,We (2018)
62419,209159,Window of the Soul (2001)
62420,209163,Bad Poems (2018)
62421,209169,A Girl Thing (2001)


In [40]:
# Fazendo o merge da pior maneira possível

df_top10.compute()\
    .reset_index()\
    .merge(
        movies_df[['movieId', 'title']].compute(), # Atenção, essa linha pode estourar a memória do client
        on='movieId',
        how='inner',
    )\
    .set_index('movieId')

Unnamed: 0_level_0,ratings_count,title
movieId,Unnamed: 1_level_1,Unnamed: 2_level_1
356,81491,Forrest Gump (1994)
318,81482,"Shawshank Redemption, The (1994)"
296,79672,Pulp Fiction (1994)
593,74127,"Silence of the Lambs, The (1991)"
2571,72674,"Matrix, The (1999)"
260,68717,Star Wars: Episode IV - A New Hope (1977)
480,64144,Jurassic Park (1993)
527,60411,Schindler's List (1993)
110,59184,Braveheart (1995)
2959,58773,Fight Club (1999)


In [41]:
# Solução alternativa mais segura porque o merge acontece de maneira distribuída no cluster
# Assim, somente o resultado final que terá 10 linhas será enviado para o client

df = movies_df[['movieId', 'title']].merge(
    df_top10.reset_index(),
    on='movieId',
    how='inner',
).sort_values(by='ratings_count', ascending=False)\
    .compute()\
    .set_index('movieId') # O set_index do Dask Dataframe ordena os dados por padrão, por isso usei o do pandas
df

Unnamed: 0_level_0,title,ratings_count
movieId,Unnamed: 1_level_1,Unnamed: 2_level_1
356,Forrest Gump (1994),81491
318,"Shawshank Redemption, The (1994)",81482
296,Pulp Fiction (1994),79672
593,"Silence of the Lambs, The (1991)",74127
2571,"Matrix, The (1999)",72674
260,Star Wars: Episode IV - A New Hope (1977),68717
480,Jurassic Park (1993),64144
527,Schindler's List (1993),60411
110,Braveheart (1995),59184
2959,Fight Club (1999),58773


In [43]:
# Também é possível fazer merge de DataFrame Dask com DataFrame Pandas

result_df = movies_df[['movieId', 'title']].merge(
    df.reset_index(), # Esse DataFrame é Pandas
    on='movieId',
    how='inner',
)
result_df.compute()

Unnamed: 0,movieId,title_x,title_y,ratings_count
0,110,Braveheart (1995),Braveheart (1995),59184
1,260,Star Wars: Episode IV - A New Hope (1977),Star Wars: Episode IV - A New Hope (1977),68717
2,296,Pulp Fiction (1994),Pulp Fiction (1994),79672
3,318,"Shawshank Redemption, The (1994)","Shawshank Redemption, The (1994)",81482
4,356,Forrest Gump (1994),Forrest Gump (1994),81491
5,480,Jurassic Park (1993),Jurassic Park (1993),64144
6,527,Schindler's List (1993),Schindler's List (1993),60411
7,593,"Silence of the Lambs, The (1991)","Silence of the Lambs, The (1991)",74127
8,2571,"Matrix, The (1999)","Matrix, The (1999)",72674
9,2959,Fight Club (1999),Fight Club (1999),58773


In [55]:
# Testando com left
result_df = df.reset_index().merge(
    movies_df[['movieId', 'title']].compute(),
    on='movieId',
    how='left',
)
result_df

Unnamed: 0,movieId,title_x,ratings_count,title_y
0,356,Forrest Gump (1994),81491,Forrest Gump (1994)
1,318,"Shawshank Redemption, The (1994)",81482,"Shawshank Redemption, The (1994)"
2,296,Pulp Fiction (1994),79672,Pulp Fiction (1994)
3,593,"Silence of the Lambs, The (1991)",74127,"Silence of the Lambs, The (1991)"
4,2571,"Matrix, The (1999)",72674,"Matrix, The (1999)"
5,260,Star Wars: Episode IV - A New Hope (1977),68717,Star Wars: Episode IV - A New Hope (1977)
6,480,Jurassic Park (1993),64144,Jurassic Park (1993)
7,527,Schindler's List (1993),60411,Schindler's List (1993)
8,110,Braveheart (1995),59184,Braveheart (1995)
9,2959,Fight Club (1999),58773,Fight Club (1999)


In [56]:
ratings_df.dtypes

userId              float64
movieId             float64
rating              float64
timestamp    datetime64[ns]
dtype: object

# Atividade Turma


## Quais são os top 10 filmes com maior total da soma das avaliações?

In [None]:
print("Fim da execução")

In [None]:
client.close()
cluster.close()