# Comandos para realização do trabalho da matéria de Big Data com uso da biblioteca PySpark.

## <font color=red>Observação importante:</font>

<font color=yellow>Trabalho realizado com uso da biblioteca pandas não será aceito!</font>

## Upload do arquivo `imdb-reviews-pt-br.csv` para dentro do Google Colab

In [1]:
import os
import requests
import zipfile

url = "https://raw.githubusercontent.com/N-CPUninter/Big_Data/main/data/imdb-reviews-pt-br.zip"
filename = "imdb-reviews-pt-br.zip"

response = requests.get(url, stream=True)

if response.status_code == 200:
    with open(filename, "wb") as f:
        for chunk in response.iter_content(1024):
            f.write(chunk)
    print(f"Downloaded {filename} successfully!")
else:
    print(f"Error downloading file: {response.status_code}")

with zipfile.ZipFile(filename, 'r') as zip_ref:
    zip_ref.extractall()
    print("Extracted files from", filename)

os.remove(filename)
print(f"Removed {filename}")

Downloaded imdb-reviews-pt-br.zip successfully!
Extracted files from imdb-reviews-pt-br.zip
Removed imdb-reviews-pt-br.zip


## Importar, instanciar e criar a SparkSession

In [2]:
from pyspark.sql import SparkSession

MEU_RU = "3803786"
appName = f"PySpark Trabalho de Big Data - {MEU_RU}"
master = "local"

print(f"Iniciando spark session para Raquel - RU {MEU_RU}")
spark: SparkSession = SparkSession.builder.appName(appName).master(master).getOrCreate()
print("Sessão iniciado com sucesso! 🚀")

Iniciando spark session para Raquel - RU 3803786
Sessão iniciado com sucesso! 🚀


## Criar spark dataframe do CSV utilizando o método read.csv do spark

In [3]:
from pyspark.sql import DataFrame
from pyspark.sql.types import StructType, StructField, StringType

schema = StructType([
  StructField("id", StringType(), True),
  StructField("text_en", StringType(), True),
  StructField("text_pt", StringType(), True),
  StructField("sentiment", StringType(), True),
])


imdb_df: DataFrame = spark.read.csv('imdb-reviews-pt-br.csv',
                         header=True,
                         quote="\"",
                         escape="\"",
                         encoding="UTF-8")

# Questão 1

## Criar funções de MAP:
- Criar função para mapear o "sentiment" como chave e o "id" como valor do tipo inteiro

In [4]:
from pyspark.sql import DataFrame

def filter_negative_reviews(data: DataFrame) -> DataFrame:
    MEU_RU = "3803786"
    print(f"Meu RU é {MEU_RU}")

    return data.filter(data["sentiment"] == "neg")

## Cria funções de REDUCE:

- Criar função de reduce para somar os IDs por "sentiment".

In [5]:
from pyspark.sql import DataFrame
from pyspark.sql.functions import col, sum

def sum_negative_ids(reviews: DataFrame) -> DataFrame:
  MEU_RU = "3803786"
  print(f"Lembrando que meu RU é {MEU_RU}")

  return reviews.withColumn("id", col("id").cast("int")).select(sum("id"))

## Aplicação do map/reduce e visualização do resultado

In [6]:
negative_reviews = filter_negative_reviews(imdb_df)
sum_of_negative_ids = sum_negative_ids(negative_reviews).collect()[0][0]

print(f"Soma de IDs das reviews negativas: {sum_of_negative_ids}")

Meu RU é 3803786
Lembrando que meu RU é 3803786
Soma de IDs das reviews negativas: 459568555


# Questão 2:

## Criar funções de MAP:
- Criar função para mapear o "sentiment" como chave e uma tupla com a soma das palavras de cada texto como valor.

In [7]:
from pyspark.sql import DataFrame
from pyspark.sql.functions import col, split, size

def map_sentiment_to_word_count(data: DataFrame) -> DataFrame:
  MEU_RU = "3803786"
  print(f"Oi! Sou Raquel e meu RU é {MEU_RU}")

  return data.select(col("sentiment"), size(split(col("text_en"), "\\s+")).alias("text_en_word_count"), size(split(col("text_pt"), "\\s+")).alias("text_pt_word_count"))

## Cria funções de REDUCE:

- Criar função de reduce para somar o numero de palavras de cada texto português e inglês por "sentiment".

In [8]:
from pyspark.sql import DataFrame

def reduce_word_count_by_sentiment(sentiment_word_counts: DataFrame) -> DataFrame:
  MEU_RU = "3803786"
  print(f"Já falei que meu RU é {MEU_RU}?")

  return sentiment_word_counts.groupBy("sentiment").agg(sum("text_en_word_count").alias("total_text_en_words"), sum("text_pt_word_count").alias("total_text_pt_words"))


## Aplicação do map/reduce e visualização do resultado

1. Aplicar o map/reduce no seu dataframe spark e realizar o collect() ao final
2. Selecionar os dados referentes aos textos negativos para realizar a subtração.
3. Realizar a subtração das contagens de palavras dos textos negativos para obter o resultado final

In [9]:
negative_data = filter_negative_reviews(imdb_df)

sentiment_word_counts = map_sentiment_to_word_count(negative_data)
total_word_counts = reduce_word_count_by_sentiment(sentiment_word_counts)
result = total_word_counts.collect()[0]
word_count_difference = result[2] - result[1]

print(f"Diferença entre a contagem de palavras: {word_count_difference} (Texto em PT - Texto em EN)")


Meu RU é 3803786
Oi! Sou Raquel e meu RU é 3803786
Já falei que meu RU é 3803786?
Diferença entre a contagem de palavras: 54976 (Texto em PT - Texto em EN)
