<a href="https://colab.research.google.com/github/EngSivaldo/-Integra-o-Python---E-mail-/blob/main/trabalho_big_data.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Comandos para realização do trabalho da matéria de Big Data com uso da biblioteca PySpark.

Este notebook foi projetado para guiar os alunos na realização das práticas de Big Data utilizando PySpark. Certifique-se de seguir cada etapa cuidadosamente para garantir a correta execução das atividades.

Seu trabalho começará na célula 5. Execute as 4 primeiras células para iniciar a atividade.

## <font color=red>Observação importante:</font>

<font color=yellow>Trabalho realizado com uso da biblioteca pandas não será aceito!</font>

## Upload do arquivo `imdb-reviews-pt-br.csv` para dentro do Google Colab

Aqui, você fará o download do dataset necessário para as atividades. Certifique-se de que o arquivo foi descompactado corretamente antes de prosseguir.

In [58]:
!wget https://raw.githubusercontent.com/N-CPUninter/Big_Data/main/data/imdb-reviews-pt-br.zip -O imdb-reviews-pt-br.zip
!unzip imdb-reviews-pt-br.zip
!rm imdb-reviews-pt-br.zip


--2025-05-18 23:09:59--  https://raw.githubusercontent.com/N-CPUninter/Big_Data/main/data/imdb-reviews-pt-br.zip
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 185.199.111.133, 185.199.109.133, 185.199.110.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|185.199.111.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 49549692 (47M) [application/zip]
Saving to: ‘imdb-reviews-pt-br.zip’


2025-05-18 23:10:00 (246 MB/s) - ‘imdb-reviews-pt-br.zip’ saved [49549692/49549692]

Archive:  imdb-reviews-pt-br.zip
replace imdb-reviews-pt-br.csv? [y]es, [n]o, [A]ll, [N]one, [r]ename: 

## Instalação manual das dependências para uso do pyspark no Google Colab

Esta etapa garante que todas as bibliotecas necessárias para o PySpark sejam instaladas no Google Colab.

In [None]:
!pip install pyspark

## Importar, instanciar e criar a SparkSession

A SparkSession é o ponto de entrada para usar o PySpark. Certifique-se de configurar corretamente o nome do aplicativo e o master.

In [59]:
from pyspark.sql import SparkSession

appName = "PySpark Trabalho de Big Data"
master = "local"

spark = SparkSession.builder.appName(appName).master(master).getOrCreate()

## Criar spark dataframe do CSV utilizando o método read.csv do spark

Não altere este código e use o dataframe imdb_df criado aqui em todo o seu trabalho. A criação de um dataframe diferente deste poderá causar erros na coluna sentiment e isso refletirá em erros de resposta das questões.

In [60]:
imdb_df = spark.read.csv('imdb-reviews-pt-br.csv',
                         header=True,
                         quote="\"",
                         escape="\"",
                         encoding="UTF-8")

# Questão 1

Nesta questão, você irá calcular a soma dos IDs para entradas onde o sentimento ('sentiment') é 'neg'.

### Objetivo:
- Usar a coluna 'sentiment' como chave e somar os valores da coluna 'id'.

## Criar funções de MAP:
- Criar função para mapear o "sentiment" como chave e o "id" como valor do tipo inteiro

A função map irá transformar cada linha do dataframe em uma **tupla** (chave-valor), onde:
- Chave: coluna 'sentiment'
- Valor: coluna 'id' convertida para inteiro.

In [61]:
# 📌 Informações do aluno
ru = "4139872"
nome = "Sivaldo"

from pyspark.sql.functions import col
from pyspark.sql.types import IntegerType

# 📦 INICIANDO ANÁLISE DA ESTRUTURA DO DATAFRAME...
print("\n📦 INICIANDO ANÁLISE DA ESTRUTURA DO DATAFRAME...\n")
print("━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━")
print("🔍 COLUNAS DISPONÍVEIS E TIPOS DE DADOS NO DATAFRAME:")
print("━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━\n")

# Exibir o schema do DataFrame
imdb_df.printSchema()

print("\n━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━")
print("📌 INTERPRETAÇÃO:")
print("  • 'id'         → identificador do registro (string)")
print("  • 'text_en'    → texto da review em inglês")
print("  • 'text_pt'    → texto da review em português")
print("  • 'sentiment'  → sentimento da review (pos ou neg)")
print("━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━\n")

# 📌 Removendo registros com 'id' nulo ou vazio
df_valid = imdb_df.filter((col("id").isNotNull()) & (col("id") != ""))

# 🔁 Convertendo coluna 'id' para inteiro
df_valid = df_valid.withColumn("id_int", col("id").cast(IntegerType()))

# 🔍 Filtrando apenas sentimentos negativos com IDs válidos
df_neg = df_valid.filter((col("sentiment") == "neg") & col("id_int").isNotNull())

# ✅ Soma com Spark SQL (forma segura)
soma_sql = df_neg.agg({"id_int": "sum"}).collect()[0][0]

# Função MAP para o RDD
def map1(row):
    try:
        return (row["sentiment"], int(row["id"]))
    except:
        return None

# Função REDUCE para o RDD
def reduceByKey1(x, y):
    return x + y

# Aplicando map/reduce com RDD
resultado_rdd = imdb_df.rdd.map(map1).filter(lambda x: x is not None and x[0] == "neg").reduceByKey(reduceByKey1).collect()
soma_rdd = [v for k, v in resultado_rdd if k == "neg"][0]

# 🎯 Exibição final com layout profissional
print("📦 Dados processados com sucesso!\n")
print("🔎 Tipo de análise: Soma dos IDs para sentimentos negativos")
print(f"🧮 Método SQL → Resultado: {soma_sql}")
print(f"🧮 Método RDD (map/reduce) → Resultado: {soma_rdd}")

if soma_sql == soma_rdd:
    print("\n📈 Conclusão: Ambos os métodos retornaram o mesmo valor.")
    print("✅ A soma foi validada com sucesso!\n")
else:
    print("\n⚠️ ATENÇÃO: As somas não coincidem. Verifique os dados!")

print("📍 Informações do participante:")
print(f"   - Nome: {nome}")
print(f"   - Registro Universitário (RU): {ru}")

print("\n📊 Resultado Final:")
print(f"   A soma total dos IDs associados ao sentimento 'neg' é **{soma_sql:,.0f}**.")



📦 INICIANDO ANÁLISE DA ESTRUTURA DO DATAFRAME...

━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
🔍 COLUNAS DISPONÍVEIS E TIPOS DE DADOS NO DATAFRAME:
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━

root
 |-- id: string (nullable = true)
 |-- text_en: string (nullable = true)
 |-- text_pt: string (nullable = true)
 |-- sentiment: string (nullable = true)


━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
📌 INTERPRETAÇÃO:
  • 'id'         → identificador do registro (string)
  • 'text_en'    → texto da review em inglês
  • 'text_pt'    → texto da review em português
  • 'sentiment'  → sentimento da review (pos ou neg)
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━

📦 Dados processados com sucesso!

🔎 Tipo de análise: Soma dos IDs para sentimentos negativos
🧮 Método SQL → Resultado: 459568555
🧮 Método RDD (map/reduce) → Resultado: 459568555

📈 Conclusão: Ambos os métodos retornaram o mesmo valor.
✅ A soma foi validada com sucesso!

📍 Informações do participante:
   - Nome: Sivaldo
   -

## Cria funções de REDUCE:

- Criar função de reduce para somar os IDs por "sentiment".

A função reduce irá somar os valores dos IDs agrupados por chave ('sentiment').

In [None]:
def reduceByKey1(x,y):
  # Coloque aqui o seu código para retornar o resultado necessário.
  # x e y são valores que serão somados, pois o reduceByKey receberá
  # apenas o segundo elemento da tupla vinda da saída da função map.
  # Apague a linha abaixo para iniciar seu código.
  pass

## Aplicação do map/reduce e visualização do resultado

Aqui, você aplicará as funções de map e reduce ao dataframe Spark para calcular os resultados. Não se esqueça de usar o método `.collect()` para visualizar os resultados.

In [None]:
# Linha de código para aplicar o map/reduce no seu dataframe spark
resultado = imdb_df.rdd.map(map1).reduceByKey(reduceByKey1).collect()
# Coloque aqui o código para imprimir o resultado. Não esqueça seu RU:


# Questão 2:

Nesta questão, você irá calcular a diferença no número total de palavras entre textos negativos em português e inglês.

### Objetivo:
- Contar as palavras em cada idioma (colunas 'text_pt' e 'text_en') para entradas onde o sentimento ('sentiment') é 'neg'.
- Subtrair o total de palavras em inglês do total em português.

## Criar funções de MAP:
- Criar função para mapear o "sentiment" como chave de uma tupla principal e como valor uma outra tupla com a soma das palavras de cada idioma como valor.

A função map irá transformar cada linha do dataframe em uma tupla (chave-valor), onde:
- Chave: coluna 'sentiment'
- Valor: Nova tupla com:
  - Elemento 0: soma das palavras da coluna 'text_en'
  - Elemento 1: soma das palavras da coluna 'text_pt'

OU
- Chave: coluna 'sentiment'
- Valor: (soma das palavras da coluna 'text_pt') - (soma das palavras da coluna 'text_en')
  

Para contar as palavras deve-se primeiro separar os textos em uma lista de palavras para então descobrir o tamanho desta lista.
Dicas:

1. Use o método .split() e não .split(" ") de string para separar as palavras em uma lista ou use a função split(coluna de texto, regex) do pyspark com o regex igual à "[ ]+" ou "\s+"
2. Use len() para descobrir o tamanho da lista de palavras.

In [62]:
# Importações necessárias
from pyspark.sql.functions import col, split, size

# Dados do participante
ru = "4139872"
nome = "Sivaldo"

print("📦 Iniciando processamento dos dados...")

# Filtrar registros com sentimento negativo
neg_df = imdb_df.filter(col("sentiment") == "neg")

print("🔍 Contando palavras nos textos negativos (inglês e português)...")

# Contar palavras na coluna 'text_en'
neg_df = neg_df.withColumn("count_en", size(split(col("text_en"), r"\s+")))

# Contar palavras na coluna 'text_pt'
neg_df = neg_df.withColumn("count_pt", size(split(col("text_pt"), r"\s+")))

# Somar o total de palavras por idioma
soma_en = neg_df.agg({"count_en": "sum"}).collect()[0][0]
soma_pt = neg_df.agg({"count_pt": "sum"}).collect()[0][0]

# Calcular diferença total (pt - en)
diferenca = soma_pt - soma_en

print("🔁 Aplicando map/reduce para validação...")

# Função map para mapear sentimento e contagem de palavras
def map2(row):
    return (row['sentiment'], (row['count_en'], row['count_pt']))

# Função reduce para somar palavras por idioma
def reduceByKey2(x, y):
    return (x[0] + y[0], x[1] + y[1])

# Executar map/reduce
resultado_rdd = neg_df.rdd.map(map2).reduceByKey(reduceByKey2).collect()

# Extrair resultado do sentimento 'neg'
total_en_rdd, total_pt_rdd = [v for k, v in resultado_rdd if k == 'neg'][0]

# Validar soma
if (total_en_rdd == soma_en) and (total_pt_rdd == soma_pt):
    validacao = "✅ VALIDAÇÃO BEM-SUCEDIDA! Contagens coincidem."
else:
    validacao = "⚠️ ATENÇÃO! Diferença entre métodos detectada."

# Impressão do resultado final com visual impactante
print(f"""
📦 Dados processados com sucesso!

🔎 Tipo de análise: Diferença no total de palavras em textos negativos
🧮 Total palavras (Inglês) → {soma_en:,}
🧮 Total palavras (Português) → {soma_pt:,}
🧮 Diferença (PT - EN) → {diferenca:,}

📈 Conclusão: {validacao}

📍 Informações do participante:
   - Nome: {nome}
   - Registro Universitário (RU): {ru}

📊 Resultado Final:
   Os textos negativos em português possuem **{diferenca:,}** palavras a mais do que os textos em inglês.
""")


📦 Iniciando processamento dos dados...
🔍 Contando palavras nos textos negativos (inglês e português)...
🔁 Aplicando map/reduce para validação...

📦 Dados processados com sucesso!

🔎 Tipo de análise: Diferença no total de palavras em textos negativos
🧮 Total palavras (Inglês) → 5,400,297
🧮 Total palavras (Português) → 5,455,273
🧮 Diferença (PT - EN) → 54,976

📈 Conclusão: ✅ VALIDAÇÃO BEM-SUCEDIDA! Contagens coincidem.

📍 Informações do participante:
   - Nome: Sivaldo
   - Registro Universitário (RU): 4139872

📊 Resultado Final:
   Os textos negativos em português possuem **54,976** palavras a mais do que os textos em inglês.



## Cria funções de REDUCE:

- Criar função de reduce para somar o numero de palavras de cada texto português e inglês por "sentiment" (dependerá de como você optou por fazer sua função map2).

A função reduce irá somar os valores das quantidades de palavras agrupados por chave ('sentiment').

In [None]:
def reduceByKey2(x,y):
  # Coloque aqui o seu código para retornar o resultado necessário.
  # x e y são valores que podem ser ou a tupla vinda da saída da função map
  # contendo quantidade de palavras em inglês e português, ou a diferença, a
  # depender da sua implementação da função map2.
  # Apague a linha abaixo para iniciar seu código.
  pass

## Aplicação do map/reduce e visualização do resultado

1. Aplicar o map/reduce no seu dataframe spark e realizar o collect() ao final
2. Selecionar os dados referentes aos textos negativos para realizar a subtração.
3. Realizar a subtração das contagens de palavras dos textos negativos para obter o resultado final

In [None]:
# Linha de código para aplicar o map/reduce no seu dataframe spark
resultado = imdb_df.rdd.map(map2).reduceByKey(reduceByKey2).collect()
# Coloque aqui suas linhas de código final para imprimir o resultado.
# Não esqueça seu RU:
