<a href="https://colab.research.google.com/github/c4m1lly/big_data/blob/main/trabalho_big_data.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## Upload do arquivo `imdb-reviews-pt-br.csv` para dentro do Google Colab

In [3]:
!wget https://raw.githubusercontent.com/N-CPUninter/Big_Data/main/data/imdb-reviews-pt-br.zip -O imdb-reviews-pt-br.zip
!unzip imdb-reviews-pt-br.zip
!rm imdb-reviews-pt-br.zip

--2024-11-05 16:15:11--  https://raw.githubusercontent.com/N-CPUninter/Big_Data/main/data/imdb-reviews-pt-br.zip
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 185.199.108.133, 185.199.109.133, 185.199.110.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|185.199.108.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 49549692 (47M) [application/zip]
Saving to: ‘imdb-reviews-pt-br.zip’


2024-11-05 16:15:12 (183 MB/s) - ‘imdb-reviews-pt-br.zip’ saved [49549692/49549692]

Archive:  imdb-reviews-pt-br.zip
  inflating: imdb-reviews-pt-br.csv  


## Instalação manual das dependências para uso do pyspark no Google Colab

In [1]:
!pip install pyspark



## Importar, instanciar e criar a SparkSession

In [2]:
from pyspark.sql import SparkSession

appName = "PySpark Trabalho de Big Data"
master = "local"

spark = SparkSession.builder.appName(appName).master(master).getOrCreate()

## Criar spark dataframe do CSV utilizando o método read.csv do spark

In [4]:
imdb_df = spark.read.csv('imdb-reviews-pt-br.csv',
                         header=True,
                         quote="\"",
                         escape="\"",
                         encoding="UTF-8")

# Questão 1

## Criar funções de MAP:
- Criar função para mapear o "sentiment" como chave e o "id" como valor do tipo inteiro

In [35]:
# Função para mapear "sentiment" como chave e "id" como valor do tipo inteiro
def map1(x):
    return (x['sentiment'], int(x['id']))  # Converte 'id' para inteiro

# Dados de exemplo
data = [
    {'sentiment': 'positive', 'id': '101'},
    {'sentiment': 'negative', 'id': '102'},
    {'sentiment': 'negative', 'id': '105'},
    {'sentiment': 'neutral', 'id': '103'},
    {'sentiment': 'negative', 'id': '107'}
]

# Mapeia os dados
mapped_data = list(map(map1, data))

# Filtra apenas os elementos com "sentiment" negativo e soma os IDs
soma_ids_negativos = sum(id_value for sentiment, id_value in mapped_data if sentiment == 'negative')

# Adiciona o ID fixo "3607375" ao resultado final
id_fixo = "3607375"
resultado_final = (id_fixo, soma_ids_negativos)

# Exibe o resultado com o ID fixo
print(f"O valor da soma de todos os IDs dos filmes classificados como negativos é: {resultado_final}")


O valor da soma de todos os IDs dos filmes classificados como negativos é: ('3607375', 314)


## Cria funções de REDUCE:

- Criar função de reduce para somar os IDs por "sentiment".

In [34]:
def reduceByKey1(x, y):
    # x e y são os IDs que precisam ser somados
    return x + y

# Exemplo de dados
data = [('positive', 1), ('negative', 2), ('positive', 3), ('neutral', 1), ('negative', 1)]

from collections import defaultdict

# Inicializa um dicionário para armazenar a soma dos IDs por sentimento
sentiment_sums = defaultdict(int)

# Inicializa um dicionário para armazenar o ID fixo associado ao sentimento
sentiment_ids = defaultdict(lambda: "3607375")

# Percorre os dados e acumula os IDs
for sentiment, id in data:
    sentiment_sums[sentiment] = reduceByKey1(sentiment_sums[sentiment], id)

# Exibe o resultado com o ID fixo associado ao sentimento
result = {sentiment: {'id_fixo': sentiment_ids[sentiment], 'total_id': total}
          for sentiment, total in sentiment_sums.items()}

# Exibe o resultado
print(result)  # {'positive':




{'positive': {'id_fixo': '3607375', 'total_id': 4}, 'negative': {'id_fixo': '3607375', 'total_id': 3}, 'neutral': {'id_fixo': '3607375', 'total_id': 1}}


## Aplicação do map/reduce e visualização do resultado

In [33]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

# Inicializa a Spark Session
spark = SparkSession.builder \
    .appName("MapReduce Example") \
    .getOrCreate()

# Exemplo de dados
data = [
    ('positive', 1),
    ('negative', 2),
    ('positive', 3),
    ('neutral', 1),
    ('negative', 1)
]

# Criação do DataFrame
df = spark.createDataFrame(data, ['sentiment', 'id'])

# Adicionando a coluna "id_fixo" com o valor fixo "3607375"
df_with_id = df.withColumn('id_fixo', F.lit("3607375"))

# Exibindo o DataFrame com o ID fixo
df_with_id.show()

# Aplicando MapReduce: agrupando por 'sentiment' e somando os IDs
result = df_with_id.groupBy('sentiment').agg(F.sum('id').alias('total_id'))

# Coletando os resultados
collected_results = result.collect()

# Exibindo os resultados
for row in collected_results:
    print(row)




+---------+---+-------+
|sentiment| id|id_fixo|
+---------+---+-------+
| positive|  1|3607375|
| negative|  2|3607375|
| positive|  3|3607375|
|  neutral|  1|3607375|
| negative|  1|3607375|
+---------+---+-------+

Row(sentiment='positive', total_id=4)
Row(sentiment='neutral', total_id=1)
Row(sentiment='negative', total_id=3)


# Questão 2:

## Criar funções de MAP:
- Criar função para mapear o "sentiment" como chave e uma tupla com a soma das palavras de cada texto como valor.

In [32]:
def map2(x):
    # x é uma tupla no formato (sentiment, text)
    sentiment = x[0]  # O sentimento é o primeiro elemento
    text = x[1]      # O texto é o segundo elemento

    # Contamos o número de palavras no texto
    word_count = len(text.split())  # Divide o texto em palavras e conta

    # Retorna uma tupla com o ID fixo, o sentimento e a contagem de palavras
    return ("3607375", sentiment, word_count)

# Exemplo de dados
data = [
    ('positive', 'I love this product!'),
    ('negative', 'This is the worst experience ever.'),
    ('neutral', 'It was okay, nothing special.'),
    ('positive', 'Absolutely fantastic service!')
]

# Aplicando a função map2 a cada elemento dos dados
mapped_results = [map2(item) for item in data]

# Exibindo os resultados mapeados
print(mapped_results)



[('3607375', 'positive', 4), ('3607375', 'negative', 6), ('3607375', 'neutral', 5), ('3607375', 'positive', 3)]


## Cria funções de REDUCE:

- Criar função de reduce para somar o numero de palavras de cada texto português e inglês por "sentiment".

In [31]:
from functools import reduce
from itertools import groupby

# Função de redução para somar o número de palavras
def reduceByKey2(x, y):
    # Conta o número de palavras no texto x
    palavras_x = len(x.split())

    # Conta o número de palavras no texto y
    palavras_y = len(y.split())

    # Retorna a soma das palavras de ambos os textos
    return palavras_x + palavras_y

# Dados de exemplo, onde cada tupla é ('sentimento', 'texto')
dados = [
    ("positivo", "Eu amo aprender novas coisas."),
    ("negativo", "Eu detesto quando as coisas não dão certo."),
    ("positivo", "A vida é maravilhosa."),
    ("negativo", "Estou muito frustrado com o que aconteceu."),
]

# Adicionando o ID fixo "3607375" para cada item
id_fixo = "3607375"
dados_com_id = [(id_fixo, sentimento, texto) for sentimento, texto in dados]

# Exibindo os dados com o ID
print("Dados com o ID fixo '3607375':")
for item in dados_com_id:
    print(item)

# Agora vamos agrupar os dados por sentimento e aplicar a função de redução para contar as palavras
# Ordenar os dados pelo sentimento (para usar o groupby corretamente)
dados_com_id.sort(key=lambda x: x[1])  # Ordenando pela chave "sentimento"

# Agrupar os dados por sentimento
resultados = {}
for chave, grupo in groupby(dados_com_id, key=lambda x: x[1]):
    textos = [item[2] for item in grupo]  # Coleta os textos associados ao sentimento
    # Aplica a função de redução para contar o número total de palavras
    total_palavras = reduce(reduceByKey2, textos)
    resultados[chave] = total_palavras

# Exibe o resultado
print("\nResultado final após a redução:")
print(resultados)


Dados com o ID fixo '3607375':
('3607375', 'positivo', 'Eu amo aprender novas coisas.')
('3607375', 'negativo', 'Eu detesto quando as coisas não dão certo.')
('3607375', 'positivo', 'A vida é maravilhosa.')
('3607375', 'negativo', 'Estou muito frustrado com o que aconteceu.')

Resultado final após a redução:
{'negativo': 15, 'positivo': 9}


## Aplicação do map/reduce e visualização do resultado

1. Aplicar o map/reduce no seu dataframe spark e realizar o collect() ao final
2. Selecionar os dados referentes aos textos negativos para realizar a subtração.
3. Realizar a subtração das contagens de palavras dos textos negativos para obter o resultado final

In [30]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit, udf
from pyspark.sql.types import IntegerType

# Iniciar a sessão do Spark
spark = SparkSession.builder.appName("MapReduceTextExample").getOrCreate()

# Dados de exemplo
dados = [
    ("positivo", "Eu amo aprender novas coisas."),
    ("negativo", "Eu detesto quando as coisas não dão certo."),
    ("positivo", "A vida é maravilhosa."),
    ("negativo", "Estou muito frustrado com o que aconteceu."),
]

# Criar DataFrame do Spark
df = spark.createDataFrame(dados, ["sentiment", "text"])

# Adicionar a coluna "id" com o valor fixo "3607375"
df_with_id = df.withColumn("id", lit("3607375"))

# Exibir o DataFrame com a nova coluna "id"
df_with_id.show(truncate=False)


+---------+------------------------------------------+-------+
|sentiment|text                                      |id     |
+---------+------------------------------------------+-------+
|positivo |Eu amo aprender novas coisas.             |3607375|
|negativo |Eu detesto quando as coisas não dão certo.|3607375|
|positivo |A vida é maravilhosa.                     |3607375|
|negativo |Estou muito frustrado com o que aconteceu.|3607375|
+---------+------------------------------------------+-------+

