# 📊 Pipeline de Análise do MovieLens com DuckDB + DagsHub

Este projeto demonstra como construir um pipeline de dados **generalizável** para análise exploratória do dataset **MovieLens 10M**, utilizando:

- [DuckDB](https://duckdb.org/) para consultas SQL rápidas em arquivos locais ou remotos.
- [DagsHub](https://dagshub.com/) como repositório de dados e código, permitindo acesso direto via HTTP.
- Conversão dos arquivos originais `.dat` para o formato **Parquet**, mais eficiente e portátil.

---

## 🚀 Etapas do Pipeline

### 1. Extração dos dados (`.dat`)
- Os arquivos originais (`ratings.dat`, `movies.dat`, `tags.dat`) são lidos **diretamente da URL do DagsHub**.
- O DuckDB é usado com `read_csv_auto` para interpretar os arquivos delimitados por `::`.
- Parâmetros como `ignore_errors=true` e `quote=''` foram usados para lidar com linhas problemáticas e aspas malformadas.

### 2. Transformação para Parquet
- Após a leitura, os DataFrames são convertidos e salvos em **arquivos Parquet**:
  - `ratings.parquet`
  - `movies.parquet`
  - `tags.parquet`
- O Parquet é colunar, ocupa menos espaço e é muito mais rápido para consultas.

### 3. Upload para o DagsHub
- Os Parquets gerados são enviados de volta para o repositório remoto no DagsHub com:
  ```bash
  !dagshub upload Matheuskcode/Big-Data-Found "ratings.parquet" data/ratings.parquet --update
  !dagshub upload Matheuskcode/Big-Data-Found "movies.parquet" data/movies.parquet --update
  !dagshub upload Matheuskcode/Big-Data-Found "tags.parquet" data/tags.parquet --update

  

4. Consultas diretas nos Parquets remotos
- O DuckDB acessa os arquivos Parquet direto do link do DagsHub, sem precisar baixá-los manualmente.
- São criadas views (ratings, movies, tags) a partir dos Parquets remotos.
5. Análise Exploratória
Consultas SQL realizadas:
- Contagem de registros em cada tabela.
- Distribuição de notas.
- Top 10 filmes mais avaliados.
- Top 10 filmes com melhor média (mínimo 100 avaliações).
- Popularidade por gênero.
- Usuários mais ativos.
- Tags mais frequentes.

📂 Estrutura dos Dados
- ratings: userId, movieId, rating, timestamp
- movies: movieId, title, genres
- tags: userId, movieId, tag, timestamp

🖥️ Como Rodar as Consultas Online
Você pode rodar as consultas diretamente nos arquivos Parquet hospedados no DagsHub, sem precisar baixar nada.
Exemplo em Python


import duckdb

base_url = "https://dagshub.com/Matheuskcode/Big-Data-Found/raw/main/data"

files = {
    "ratings": f"{base_url}/ratings.parquet",
    "movies": f"{base_url}/movies.parquet",
    "tags": f"{base_url}/tags.parquet"
}

con = duckdb.connect()

con.execute(f"CREATE OR REPLACE VIEW ratings AS SELECT * FROM parquet_scan('{files['ratings']}')")
con.execute(f"CREATE OR REPLACE VIEW movies  AS SELECT * FROM parquet_scan('{files['movies']}')")
con.execute(f"CREATE OR REPLACE VIEW tags    AS SELECT * FROM parquet_scan('{files['tags']}')")

df = con.execute("""
    SELECT m.title, COUNT(r.rating) as n_ratings, AVG(r.rating) as avg_rating
    FROM ratings r
    JOIN movies m ON r.movieId = m.movieId
    GROUP BY m.title
    ORDER BY n_ratings DESC
    LIMIT 5
""").df()

print(df)

✅ Benefícios do Pipeline
• 	Generalizável: qualquer pessoa pode rodar em qualquer computador, basta ter Python + DuckDB.
• 	Sem download manual: os dados são lidos direto do DagsHub via HTTP.
• 	Eficiente: uso de Parquet acelera consultas e economiza espaço.
• 	Exploratório: consultas SQL permitem análises rápidas e flexíveis.



---

### 💻 Código Python (pipeline completo)

```python
import duckdb
import pandas as pd

# --- 1. Definir URLs dos arquivos no DagsHub (.dat) ---
base_url = "https://dagshub.com/Matheuskcode/Big-Data-Found/raw/main/data"

files = {
    "ratings": f"{base_url}/ratings.dat",
    "movies": f"{base_url}/movies.dat",
    "tags": f"{base_url}/tags.dat"
}

con = duckdb.connect()

# --- 2. Ler arquivos .dat direto da URL ---
ratings = con.execute(f"""
    SELECT * FROM read_csv_auto('{files['ratings']}',
        delim='::',
        columns={{'userId':'BIGINT','movieId':'BIGINT','rating':'DOUBLE','timestamp':'BIGINT'}},
        ignore_errors=true,
        quote=''
    )
""").df()

movies = con.execute(f"""
    SELECT * FROM read_csv_auto('{files['movies']}',
        delim='::',
        columns={{'movieId':'BIGINT','title':'VARCHAR','genres':'VARCHAR'}},
        ignore_errors=true,
        quote=''
    )
""").df()

tags = con.execute(f"""
    SELECT * FROM read_csv_auto('{files['tags']}',
        delim='::',
        columns={{'userId':'BIGINT','movieId':'BIGINT','tag':'VARCHAR','timestamp':'BIGINT'}},
        ignore_errors=true,
        quote=''
    )
""").df()

# --- 3. Salvar em Parquet ---
ratings.to_parquet("ratings.parquet", index=False)
movies.to_parquet("movies.parquet", index=False)
tags.to_parquet("tags.parquet", index=False)

# --- 4. Upload para DagsHub (executar em notebook com dagshub-cli instalado) ---
# !dagshub upload Matheuskcode/Big-Data-Found "ratings.parquet" data/ratings.parquet --update
# !dagshub upload Matheuskcode/Big-Data-Found "movies.parquet" data/movies.parquet --update
# !dagshub upload Matheuskcode/Big-Data-Found "tags.parquet" data/tags.parquet --update

# --- 5. Consultas diretas nos Parquets remotos ---
files_parquet = {
    "ratings": f"{base_url}/ratings.parquet",
    "movies": f"{base_url}/movies.parquet",
    "tags": f"{base_url}/tags.parquet"
}

con.execute(f"CREATE OR REPLACE VIEW ratings AS SELECT * FROM parquet_scan('{files_parquet['ratings']}')")
con.execute(f"CREATE OR REPLACE VIEW movies  AS SELECT * FROM parquet_scan('{files_parquet['movies']}')")
con.execute(f"CREATE OR REPLACE VIEW tags    AS SELECT * FROM parquet_scan('{files_parquet['tags']}')")

# --- 6. Análises Exploratórias ---
print("Tamanhos:")
print(con.execute("SELECT COUNT(*) AS n_ratings FROM ratings").df())
print(con.execute("SELECT COUNT(*) AS n_movies FROM movies").df())
print(con.execute("SELECT COUNT(*) AS n_tags FROM tags").df())

print("\nDistribuição de notas:")
print(con.execute("""
    SELECT rating, COUNT(*) as freq
    FROM ratings
    GROUP BY rating
    ORDER BY rating
""").df())

print("\nTop 10 filmes mais avaliados:")
print(con.execute("""
    SELECT m.title, COUNT(r.rating) as n_ratings, AVG(r.rating) as avg_rating
    FROM ratings r
    JOIN movies m ON r.movieId = m.movieId
    GROUP BY m.title
    ORDER BY n_ratings DESC
    LIMIT 10
""").df())

print("\nTop 10 filmes com melhor média (>=100 avaliações):")
print(con.execute("""
    SELECT m.title, COUNT(r.rating) as n_ratings, AVG(r.rating) as avg_rating
    FROM ratings r
    JOIN movies m ON r.movieId = m.movieId
    GROUP BY m.title
    HAVING COUNT(r.rating) >= 100
    ORDER BY avg_rating DESC
    LIMIT 10
""").df())

print("\nPopularidade por gênero:")
print(con.execute("""
    SELECT genre, COUNT(*) as n_movies
    FROM (
        SELECT movieId, UNNEST(STRING_SPLIT(genres, '|')) as genre
        FROM movies
    )
    GROUP BY genre
    ORDER BY n_movies DESC
""").df())

print("\nTop 10 usuários mais ativos:")
print(con.execute("""
    SELECT userId, COUNT(*) as n_ratings, AVG(rating) as avg_rating
    FROM ratings
    GROUP BY userId
    ORDER BY n_ratings DESC
    LIMIT 10
""").df())

print("\nTop 10 tags mais usadas:")
print(con.execute("""
    SELECT LOWER(tag) as tag, COUNT(*) as freq
    FROM tags
    GROUP