# Comandos para realização do trabalho da matéria de Big Data com uso da biblioteca PySpark.

## <font color=red>Observação importante:</font>

<font color=yellow>Trabalho realizado com uso da biblioteca pandas não será aceito!</font>

## Upload do arquivo `imdb-reviews-pt-br.csv` para dentro do Google Colab

In [1]:
!wget https://raw.githubusercontent.com/N-CPUninter/Big_Data/main/data/imdb-reviews-pt-br.zip -O imdb-reviews-pt-br.zip
!unzip imdb-reviews-pt-br.zip
!rm imdb-reviews-pt-br.zip

--2024-10-28 23:02:22--  https://raw.githubusercontent.com/N-CPUninter/Big_Data/main/data/imdb-reviews-pt-br.zip
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 185.199.109.133, 185.199.108.133, 185.199.111.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|185.199.109.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 49549692 (47M) [application/zip]
Saving to: ‘imdb-reviews-pt-br.zip’


2024-10-28 23:02:23 (94.8 MB/s) - ‘imdb-reviews-pt-br.zip’ saved [49549692/49549692]

Archive:  imdb-reviews-pt-br.zip
  inflating: imdb-reviews-pt-br.csv  


## Instalação manual das dependências para uso do pyspark no Google Colab

In [2]:
!pip install pyspark



## Importar, instanciar e criar a SparkSession

In [3]:
from pyspark.sql import SparkSession

appName = "PySpark Trabalho de Big Data"
master = "local"

spark = SparkSession.builder.appName(appName).master(master).getOrCreate()

## Criar spark dataframe do CSV utilizando o método read.csv do spark

In [4]:
imdb_df = spark.read.csv('imdb-reviews-pt-br.csv',
                         header=True,
                         quote="\"",
                         escape="\"",
                         encoding="UTF-8")

In [5]:
imdb_df.show(5)  # Mostra as 5 primeiras linhas do DataFrame para verificação

+---+--------------------+--------------------+---------+
| id|             text_en|             text_pt|sentiment|
+---+--------------------+--------------------+---------+
|  1|Once again Mr. Co...|Mais uma vez, o S...|      neg|
|  2|This is an exampl...|Este é um exemplo...|      neg|
|  3|First of all I ha...|Primeiro de tudo ...|      neg|
|  4|Not even the Beat...|Nem mesmo os Beat...|      neg|
|  5|Brass pictures mo...|Filmes de fotos d...|      neg|
+---+--------------------+--------------------+---------+
only showing top 5 rows



# Questão 1

## Criar funções de MAP:
- Criar função para mapear o "sentiment" como chave e o "id" como valor do tipo inteiro

In [6]:
# Função map para filtrar e mapear os IDs dos sentimentos negativos
def map1(row):
    # Verifica se o sentimento é "neg"
    if row['sentiment'] == 'neg':
        # Retorna tupla com (sentiment, id)
        return (row['sentiment'], int(row['id']))
    else:
        # Ignora os sentimentos não negativos
        return None


## Cria funções de REDUCE:

- Criar função de reduce para somar os IDs por "sentiment".

In [7]:
# Função reduce para somar os IDs por sentimento
def reduceByKey1(x, y):
    # Soma dos IDs (se ambos x e y forem IDs, simplesmente retorna a soma)
    return x + y


## Aplicação do map/reduce e visualização do resultado

In [8]:
# Converte DataFrame para RDD
rdd = imdb_df.rdd

# Aplica a função map e filtra valores None
mapped_rdd = rdd.map(map1).filter(lambda x: x is not None)

# Aplica reduceByKey para somar os IDs dos sentimentos negativos
result_rdd = mapped_rdd.reduceByKey(reduceByKey1)

# Coleta e exibe o resultado
results = result_rdd.collect()

# Filtrando e exibindo apenas os resultados para 'negativo'
negative_sum = sum(value for sentiment, value in results if sentiment == 'neg')

print("Soma dos IDs para sentimento negativo:", negative_sum)


Soma dos IDs para sentimento negativo: 459568555


# Questão 2:

## Criar funções de MAP:
- Criar função para mapear o "sentiment" como chave e uma tupla com a soma das palavras de cada texto como valor.

In [9]:
# Função map para contar palavras e retornar tupla com o número de palavras e o idioma
def map2(row):
    # Verifica se o sentimento é "negativo"
    if row['sentiment'] == 'neg':
        # Verifica se o texto é em português ou inglês
        if row['text_pt']:
            text = row['text_pt']
            language = 'pt'
        elif row['text_en']:
            text = row['text_en']
            language = 'en'
        else:
            # Se não houver texto em nenhum dos idiomas, ignora
            return None

        # Contagem de palavras
        words = len(text.split())
        # Retorna tupla com (idioma, número de palavras)
        return (language, words)
    else:
        # Ignora os sentimentos não negativos
        return None


## Cria funções de REDUCE:

- Criar função de reduce para somar o numero de palavras de cada texto português e inglês por "sentiment".

In [10]:
# Função reduce para somar as contagens de palavras por idioma
def reduceByKey2(x, y):
    # Soma das contagens de palavras
    return x + y

## Aplicação do map/reduce e visualização do resultado

1. Aplicar o map/reduce no seu dataframe spark e realizar o collect() ao final
2. Selecionar os dados referentes aos textos negativos para realizar a subtração.
3. Realizar a subtração das contagens de palavras dos textos negativos para obter o resultado final

In [12]:
# Converte DataFrame para RDD
rdd = imdb_df.rdd

# Função para mapear o "sentiment" como chave e a contagem de palavras como valor
def map2(row):
    if row['sentiment'] == 'neg':
        # Contando palavras nos textos em português e inglês
        pt_words = len(row['text_pt'].split()) if row['text_pt'] else 0
        en_words = len(row['text_en'].split()) if row['text_en'] else 0
        # Retorna duas tuplas: uma para português e outra para inglês
        return [('pt', pt_words), ('en', en_words)]
    return []  # Retorna uma lista vazia em vez de None

# Aplica a função map e usa flatMap para desdobrar as tuplas
mapped_rdd = rdd.flatMap(map2).filter(lambda x: x) # remove empty lists

# Função para somar as contagens de palavras
def reduceByKey2(x, y):
    return x + y

# Agrupa por idioma e soma as contagens de palavras
result_rdd = mapped_rdd.reduceByKey(reduceByKey2)

# Coleta os resultados
results = result_rdd.collect()

# Inicializa contadores
portuguese_count = 0
english_count = 0

# Itera pelos resultados para calcular os totais
for lang, count in results:
    if lang == 'pt':
        portuguese_count += count
    elif lang == 'en':
        english_count += count

# Calcula a diferença
difference = portuguese_count - english_count

# Exibe os resultados
print("Total de palavras nos textos negativos em português:", portuguese_count)
print("Total de palavras nos textos negativos em inglês:", english_count)
print("Diferença total de palavras (português - inglês):", difference)


Total de palavras nos textos negativos em português: 5455273
Total de palavras nos textos negativos em inglês: 5400324
Diferença total de palavras (português - inglês): 54949
