# Comandos para realização do trabalho da matéria de Big Data com uso da biblioteca PySpark.

## <font color=red>Observação importante:</font>

<font color=yellow>Trabalho realizado com uso da biblioteca pandas não será aceito!</font>

## Upload do arquivo `imdb-reviews-pt-br.csv` para dentro do Google Colab

In [None]:
!wget https://raw.githubusercontent.com/N-CPUninter/Big_Data/main/data/imdb-reviews-pt-br.zip -O imdb-reviews-pt-br.zip
!unzip imdb-reviews-pt-br.zip
!rm imdb-reviews-pt-br.zip

## Instalação manual das dependências para uso do pyspark no Google Colab

In [None]:
!pip install pyspark

## Importar, instanciar e criar a SparkSession

In [None]:
from pyspark.sql import SparkSession

appName = "PySpark Trabalho de Big Data"
master = "local"

spark = SparkSession.builder.appName(appName).master(master).getOrCreate()

## Criar spark dataframe do CSV utilizando o método read.csv do spark

In [None]:
imdb_df = spark.read.csv('imdb-reviews-pt-br.csv',
                         header=True,
                         quote="\"",
                         escape="\"",
                         encoding="UTF-8")

# Questão 1

## Criar funções de MAP:
- Criar função para mapear o "sentiment" como chave e o "id" como valor do tipo inteiro

In [33]:
from google.colab import files

# Fazer upload do arquivo (ex: CSV)
uploaded = files.upload()




Saving imdb-reviews-pt-br.csv to imdb-reviews-pt-br (3).csv


In [71]:
# Obter o nome do arquivo a partir do dicionário de uploads
file_name = next(iter(uploaded))  # Pega a primeira chave do dicionário, que é o nome do arquivo

# Carregar o arquivo CSV em um DataFrame do PySpark
df = spark.read.csv(file_name, header=True, inferSchema=True)


In [72]:
df.show(5)


+---+--------------------+--------------------+--------------------+
| id|             text_en|             text_pt|           sentiment|
+---+--------------------+--------------------+--------------------+
|  1|Once again Mr. Co...|Mais uma vez, o S...|                 neg|
|  2|This is an exampl...|Este é um exemplo...|                 neg|
|  3|First of all I ha...|"Primeiro de tudo...| exceto Paxton e ...|
|  4|Not even the Beat...|Nem mesmo os Beat...|                 neg|
|  5|Brass pictures mo...|Filmes de fotos d...|                 neg|
+---+--------------------+--------------------+--------------------+
only showing top 5 rows



In [73]:
from pyspark.sql import functions as F

# 1. Carregar o arquivo CSV em um DataFrame do PySpark
df = spark.read.csv(file_name, header=True, inferSchema=True)

# 2. Alterar a coluna 'sentiment' para manter apenas 'neg' e 'pos', trocando outros por null
df_corrected = df.withColumn(
    'sentiment',
    F.when(df['sentiment'].isin(['neg', 'pos']), df['sentiment']).otherwise(None)  # ou outro valor padrão
)

# 3. Visualizar o resultado
df_corrected.show()


+---+--------------------+--------------------+---------+
| id|             text_en|             text_pt|sentiment|
+---+--------------------+--------------------+---------+
|  1|Once again Mr. Co...|Mais uma vez, o S...|      neg|
|  2|This is an exampl...|Este é um exemplo...|      neg|
|  3|First of all I ha...|"Primeiro de tudo...|     NULL|
|  4|Not even the Beat...|Nem mesmo os Beat...|      neg|
|  5|Brass pictures mo...|Filmes de fotos d...|      neg|
|  6|"A funny thing ha...| the hero is a de...|     NULL|
|  7|This German horro...|Este filme de ter...|      neg|
|  8|"Being a long-tim...| plots and twists...|     NULL|
|  9|"""Tokyo Eyes"" t...|                talk|     NULL|
| 10|Wealthy horse ran...|Fazendeiros ricos...|      neg|
| 11|"Cage plays a dru...|"Cage interpreta ...|      neg|
| 12|First of all, I w...|Primeiro de tudo,...|      neg|
| 13|So tell me - what...|Então me diga - q...|      neg|
| 14|A big disappointm...|Uma grande decepç...|      neg|
| 15|This film

In [74]:
# 1. Carregar o arquivo CSV em um DataFrame do PySpark
df = spark.read.csv(file_name, header=True, inferSchema=True)

# 2. Alterar a coluna 'sentiment' para manter apenas 'neg' e 'pos', trocando outros por null
df_corrected = df.withColumn(
    'sentiment',
    F.when(df['sentiment'].isin(['neg', 'pos']), df['sentiment']).otherwise(None)
)

# 3. Excluir as linhas que contêm valores null na coluna 'sentiment'
df_cleaned = df_corrected.dropna(subset=['sentiment'])

# 4. Visualizar o resultado
df_cleaned.show()


+---+--------------------+--------------------+---------+
| id|             text_en|             text_pt|sentiment|
+---+--------------------+--------------------+---------+
|  1|Once again Mr. Co...|Mais uma vez, o S...|      neg|
|  2|This is an exampl...|Este é um exemplo...|      neg|
|  4|Not even the Beat...|Nem mesmo os Beat...|      neg|
|  5|Brass pictures mo...|Filmes de fotos d...|      neg|
|  7|This German horro...|Este filme de ter...|      neg|
| 10|Wealthy horse ran...|Fazendeiros ricos...|      neg|
| 11|"Cage plays a dru...|"Cage interpreta ...|      neg|
| 12|First of all, I w...|Primeiro de tudo,...|      neg|
| 13|So tell me - what...|Então me diga - q...|      neg|
| 14|A big disappointm...|Uma grande decepç...|      neg|
| 15|This film is abso...|Este filme é abso...|      neg|
| 16|Heres a decidedly...|Heres um decidida...|      neg|
| 17|At the bottom end...|Na parte inferior...|      neg|
| 18|Earth has been de...|A terra foi destr...|      neg|
| 19|Many peop

In [75]:
# 1. Carregar o arquivo CSV em um DataFrame do PySpark
df = spark.read.csv(file_name, header=True, inferSchema=True)

# 2. Alterar a coluna 'sentiment' para manter apenas 'neg' e 'pos', trocando outros por null
df = df.withColumn(
    'sentiment',
    F.when(df['sentiment'].isin(['neg', 'pos']), df['sentiment']).otherwise(None)
)

# 3. Excluir as linhas que contêm valores null na coluna 'sentiment'
df = df.dropna(subset=['sentiment'])

# 4. Visualizar o resultado
df.show()


+---+--------------------+--------------------+---------+
| id|             text_en|             text_pt|sentiment|
+---+--------------------+--------------------+---------+
|  1|Once again Mr. Co...|Mais uma vez, o S...|      neg|
|  2|This is an exampl...|Este é um exemplo...|      neg|
|  4|Not even the Beat...|Nem mesmo os Beat...|      neg|
|  5|Brass pictures mo...|Filmes de fotos d...|      neg|
|  7|This German horro...|Este filme de ter...|      neg|
| 10|Wealthy horse ran...|Fazendeiros ricos...|      neg|
| 11|"Cage plays a dru...|"Cage interpreta ...|      neg|
| 12|First of all, I w...|Primeiro de tudo,...|      neg|
| 13|So tell me - what...|Então me diga - q...|      neg|
| 14|A big disappointm...|Uma grande decepç...|      neg|
| 15|This film is abso...|Este filme é abso...|      neg|
| 16|Heres a decidedly...|Heres um decidida...|      neg|
| 17|At the bottom end...|Na parte inferior...|      neg|
| 18|Earth has been de...|A terra foi destr...|      neg|
| 19|Many peop

In [86]:

  # Coloque aqui o seu código para retornar a tupla necessária.
df = df.dropna(subset=['sentiment'])
# 2. Transformar o DataFrame em um RDD
rdd = df.rdd

# 3. Criar uma função de mapeamento
mapped_rdd = rdd.map(lambda row: (row['sentiment'], int(row['id'])))

# 4. (Opcional) Converter o RDD de volta para um DataFrame, se você precisar dele em formato de DataFrame
mapped_df = mapped_rdd.toDF(["sentiment", "id"])

# 5. Mostrar o resultado
mapped_df.show()



  # Apague a linha abaixo para iniciar seu código.


+--------------------+---+
|           sentiment| id|
+--------------------+---+
|                 neg|  1|
|                 neg|  2|
| exceto Paxton e ...|  3|
|                 neg|  4|
|                 neg|  5|
|           excuse me|  6|
|                 neg|  7|
|         forcedly so|  8|
| and more talk. Y...|  9|
|                 neg| 10|
|                 neg| 11|
|                 neg| 12|
|                 neg| 13|
|                 neg| 14|
|                 neg| 15|
|                 neg| 16|
|                 neg| 17|
|                 neg| 18|
|                 neg| 19|
|"A família de Nov...| 20|
+--------------------+---+
only showing top 20 rows



## Cria funções de REDUCE:

- Criar função de reduce para somar os IDs por "sentiment".

In [84]:

# Coloque aqui o seu código para retornar o resultado necessário.
# 1. Carregar o arquivo CSV em um DataFrame do PySpark
df = df.dropna(subset=['sentiment'])

# 2. Transformar o DataFrame em um RDD
rdd = df.rdd

# 3. Criar um RDD com "sentiment" como chave e "id" como valor
mapped_rdd = rdd.map(lambda row: (row['sentiment'], int(row['id'])))

# 4. Criar a função de reduce para somar os IDs por "sentiment"
reduced_rdd = mapped_rdd.reduceByKey(lambda a, b: a + b)

# 5. Mostrar o resultado
reduced_df.show()


  # Apague a linha abaixo para iniciar seu código.


+--------------------+------------+
|           sentiment|total_id_sum|
+--------------------+------------+
|                 neg|   247015948|
| exceto Paxton e ...|           3|
|           excuse me|           6|
|         forcedly so|           8|
| and more talk. Y...|           9|
|"A família de Nov...|          20|
|      the dumb teens|          22|
| the bestest nurs...|          23|
| and is covered i...|          24|
| the back-story e...|          25|
| what was there t...|          26|
| they did. To be ...|          28|
| insufferable cha...|          29|
| campers are goin...|          30|
| youll want to fa...|          31|
| no pics of the m...|          33|
| but I think the ...|          34|
| and answers it a...|          36|
|"Eu não estou ent...|          44|
| from a novel by ...|          45|
+--------------------+------------+
only showing top 20 rows



## Aplicação do map/reduce e visualização do resultado

In [79]:
# Coloque aqui a sua linha de código para aplicar o map/reduce no seu
# dataframe spark e realize o collect() ao final para visualizar os dados.

# 1. Carregar o arquivo CSV em um DataFrame do PySpark
df = spark.read.csv(file_name, header=True, inferSchema=True)

# 2. Transformar o DataFrame em um RDD
rdd = df.rdd

# 3. Criar um RDD com "sentiment" como chave e "id" como valor
mapped_rdd = rdd.map(lambda row: (row['sentiment'], int(row['id'])))

# 4. Criar a função de reduce para somar os IDs por "sentiment"
reduced_rdd = mapped_rdd.reduceByKey(lambda a, b: a + b)

# 5. Coletar os resultados
result = reduced_rdd.collect()

# 6. Visualizar os resultados
for sentiment, total_id_sum in result:
    print(f"Sentiment: {sentiment}, Total ID Sum: {total_id_sum}")



[1;30;43mA saída de streaming foi truncada nas últimas 5000 linhas.[0m
Sentiment:  no ""showdown"" scenes. Thank Heaven. You have to get into the zone when watching this movie, Total ID Sum: 37140
Sentiment:  while others feel as if they have to make the story with the camera. I really appreciate when someone these days has the courage to just use the camera as its supposed to be utilized, Total ID Sum: 37141
Sentiment:  but throughout the film, Total ID Sum: 37142
Sentiment:  trite by-the-numbers script, Total ID Sum: 37143
Sentiment:  ""The Cat and the Canary"", Total ID Sum: 37144
Sentiment:  its not morbid at all. Such a horrifying testimony about how some childhood trauma can turn a man into a monster.My rate: 7/10", Total ID Sum: 37145
Sentiment: "Apenas dois comentários .... Sete anos separados? Dificilmente provas dos filmes implacável puxando poder! Como já foi mencionado, o status de telemovias de baixo orçamento de 13 GANTRY ROW é um fator atenuante em seu apelo limitado. 

# Questão 2:

## Criar funções de MAP:
- Criar função para mapear o "sentiment" como chave e uma tupla com a soma das palavras de cada texto como valor.

In [44]:

  # Coloque aqui o seu código para retornar a tupla necessária.

  # 1. Carregar o arquivo CSV em um DataFrame do PySpark
df = spark.read.csv(file_name, header=True, inferSchema=True)

# 2. Transformar o DataFrame em um RDD
rdd = df.rdd

# 3. Criar uma função de mapeamento para contar as palavras
def map_sentiment_word_count(row):
    # Contar o número de palavras no texto
    word_count = len(row['text'].split())  # Split para contar as palavras
    return (row['sentiment'], word_count)  # Retorna uma tupla

# 4. Aplicar a função de mapeamento
mapped_rdd = rdd.map(map_sentiment_word_count)


  # Apague a linha abaixo para iniciar seu código.


## Cria funções de REDUCE:

- Criar função de reduce para somar o numero de palavras de cada texto português e inglês por "sentiment".

In [47]:

  # Coloque aqui o seu código para retornar o resultado necessário.

  # 1. Carregar o arquivo CSV em um DataFrame do PySpark
df = spark.read.csv(file_name, header=True, inferSchema=True)

# 2. Transformar o DataFrame em um RDD
rdd = df.rdd

# 3. Criar uma função de mapeamento para contar as palavras
def map_sentiment_word_count(row):
    word_count = len(row['text'].split())  # Contar palavras
    return (row['sentiment'], word_count)  # Retornar tupla (sentiment, word_count)

# 4. Aplicar a função de mapeamento
mapped_rdd = rdd.map(map_sentiment_word_count)

# 5. Criar a função de reduce para somar o número de palavras por "sentiment"
reduced_rdd = mapped_rdd.reduceByKey(lambda a, b: a + b)


  # Apague a linha abaixo para iniciar seu código.


## Aplicação do map/reduce e visualização do resultado

1. Aplicar o map/reduce no seu dataframe spark e realizar o collect() ao final
2. Selecionar os dados referentes aos textos negativos para realizar a subtração.
3. Realizar a subtração das contagens de palavras dos textos negativos para obter o resultado final

In [48]:
# Coloque aqui suas linhas de código final
# 1. Carregar o arquivo CSV em um DataFrame do PySpark
df = spark.read.csv(file_name, header=True, inferSchema=True)

# 2. Transformar o DataFrame em um RDD
rdd = df.rdd

# 3. Criar uma função de mapeamento para contar as palavras
def map_sentiment_word_count(row):
    word_count = len(row['text'].split())  # Contar palavras
    return (row['sentiment'], word_count)  # Retornar tupla (sentiment, word_count)

# 4. Aplicar a função de mapeamento
mapped_rdd = rdd.map(map_sentiment_word_count)

# 5. Criar a função de reduce para somar o número de palavras por "sentiment"
reduced_rdd = mapped_rdd.reduceByKey(lambda a, b: a + b)

# 6. Coletar os resultados
result = reduced_rdd.collect()

# 7. Criar um dicionário para armazenar as contagens de palavras por sentimento
word_count_dict = dict(result)

# 8. Obter a contagem de palavras para textos negativos
negative_word_count = word_count_dict.get('negative', 0)

# 9. Obter a contagem de palavras para textos positivos (ou outro sentimento)
positive_word_count = word_count_dict.get('positive', 0)

# 10. Realizar a subtração das contagens de palavras
result_final = positive_word_count - negative_word_count

# 11. Visualizar o resultado final
print(f"Total Word Count (Positive): {positive_word_count}")
print(f"Total Word Count (Negative): {negative_word_count}")
print(f"Final Result (Positive - Negative): {result_final}")


Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 50.0 failed 1 times, most recent failure: Lost task 0.0 in stage 50.0 (TID 48) (c9e5b65a1b07 executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/local/lib/python3.10/dist-packages/pyspark/python/lib/pyspark.zip/pyspark/sql/types.py", line 2378, in __getitem__
    idx = self.__fields__.index(item)
ValueError: 'text' is not in list

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.10/dist-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 1247, in main
    process()
  File "/usr/local/lib/python3.10/dist-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 1237, in process
    out_iter = func(split_index, iterator)
  File "/usr/local/lib/python3.10/dist-packages/pyspark/rdd.py", line 5434, in pipeline_func
    return func(split, prev_func(split, iterator))
  File "/usr/local/lib/python3.10/dist-packages/pyspark/rdd.py", line 5434, in pipeline_func
    return func(split, prev_func(split, iterator))
  File "/usr/local/lib/python3.10/dist-packages/pyspark/rdd.py", line 840, in func
    return f(iterator)
  File "/usr/local/lib/python3.10/dist-packages/pyspark/rdd.py", line 3983, in combineLocally
    merger.mergeValues(iterator)
  File "/usr/local/lib/python3.10/dist-packages/pyspark/python/lib/pyspark.zip/pyspark/shuffle.py", line 256, in mergeValues
    for k, v in iterator:
  File "/usr/local/lib/python3.10/dist-packages/pyspark/python/lib/pyspark.zip/pyspark/util.py", line 83, in wrapper
    return f(*args, **kwargs)
  File "<ipython-input-48-f9974bd70bcf>", line 10, in map_sentiment_word_count
  File "/usr/local/lib/python3.10/dist-packages/pyspark/python/lib/pyspark.zip/pyspark/sql/types.py", line 2383, in __getitem__
    raise PySparkValueError(item)
pyspark.errors.exceptions.base.PySparkValueError: text

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:572)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:784)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:766)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:525)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:1211)
	at scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:1217)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:140)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:104)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:54)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2856)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2792)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2791)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2791)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1247)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1247)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1247)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3060)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2994)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2983)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:989)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2393)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2414)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2433)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2458)
	at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1049)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:410)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:1048)
	at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:195)
	at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/local/lib/python3.10/dist-packages/pyspark/python/lib/pyspark.zip/pyspark/sql/types.py", line 2378, in __getitem__
    idx = self.__fields__.index(item)
ValueError: 'text' is not in list

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.10/dist-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 1247, in main
    process()
  File "/usr/local/lib/python3.10/dist-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 1237, in process
    out_iter = func(split_index, iterator)
  File "/usr/local/lib/python3.10/dist-packages/pyspark/rdd.py", line 5434, in pipeline_func
    return func(split, prev_func(split, iterator))
  File "/usr/local/lib/python3.10/dist-packages/pyspark/rdd.py", line 5434, in pipeline_func
    return func(split, prev_func(split, iterator))
  File "/usr/local/lib/python3.10/dist-packages/pyspark/rdd.py", line 840, in func
    return f(iterator)
  File "/usr/local/lib/python3.10/dist-packages/pyspark/rdd.py", line 3983, in combineLocally
    merger.mergeValues(iterator)
  File "/usr/local/lib/python3.10/dist-packages/pyspark/python/lib/pyspark.zip/pyspark/shuffle.py", line 256, in mergeValues
    for k, v in iterator:
  File "/usr/local/lib/python3.10/dist-packages/pyspark/python/lib/pyspark.zip/pyspark/util.py", line 83, in wrapper
    return f(*args, **kwargs)
  File "<ipython-input-48-f9974bd70bcf>", line 10, in map_sentiment_word_count
  File "/usr/local/lib/python3.10/dist-packages/pyspark/python/lib/pyspark.zip/pyspark/sql/types.py", line 2383, in __getitem__
    raise PySparkValueError(item)
pyspark.errors.exceptions.base.PySparkValueError: text

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:572)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:784)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:766)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:525)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:1211)
	at scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:1217)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:140)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:104)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:54)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	... 1 more
