In [1]:
import warnings
warnings.filterwarnings("ignore")  # Ignore warnings coming from Arrow optimizations.

In [2]:
# import os
# os.environ['HADOOP_HOME'] = 'Caminho/para/o/Hadoop'
# os.environ['JAVA_HOME'] = 'Caminho/para/o/Java'

In [3]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

# spark = SparkSession.builder.appName('explore_spark') \
#     .config('spark.master', 'local') \
#     .config("spark.executor.memory", "24g") \
#     .config("spark.executor.instances", "3") \
#     .config("spark.executor.cores", "4") \
#     .getOrCreate()


spark = SparkSession.builder.appName('explore_spark') \
    .config('spark.master', 'local') \
    .config("spark.executor.memory", "24g") \
    .config("spark.executor.cores", "6") \
    .getOrCreate()

# # Configuração manual das bibliotecas do Hadoop no classpath
# spark = SparkSession.builder \
#     .appName("NomeDaAplicacao") \
#     .config("spark.driver.extraClassPath", "Caminho/para/as/bibliotecas/hadoop.dll") \
#     .getOrCreate()

spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", True)
# %timeit ps.range(300000).to_pandas()

## Carregar dados dos ratings/votos dos títulos

In [4]:
title_ratings = spark.read.load('../Data/title.ratings.tsv', format='csv', sep='\t', inferSchema=True, header=True)

In [5]:
# title_ratings.show(2)

## Carregar dados dos títulos e concatena os ratings/votos com as informações dos titulos
- somente aqueles que possuem rating
- somente filmes
- somente os que possuem algum gênero

In [6]:
title_basics = spark.read.load('../Data/title.basics.tsv', format='csv', sep='\t', inferSchema=True, header=True)

In [7]:
# title_basics.select(col('titleType')).distinct().show()
# title_basics.groupBy('titleType').count().orderBy(col('count').desc()).show()

In [8]:
title_basics = title_basics.filter(title_basics['titleType'] == 'movie')
title_basics = title_basics.filter(title_basics['genres'] != '\\N')

In [9]:
# Realiza um join usando 'tconst' como chave e 'inner' como tipo de join
# https://sparkbyexamples.com/pyspark/pyspark-join-explained-with-examples/
title_basics_filtered = title_basics.join(title_ratings, ['tconst'], 'inner').drop('titleType', 'endYear')

In [10]:
# title_basics.count()

In [11]:
# title_basics_filtered.count()

In [12]:
# title_basics_filtered.groupBy('isAdult').count().show()

In [13]:
# Criação da nova coluna com a condição de igualdade
title_basics_filtered = title_basics_filtered.withColumn('popularIsOriginal', when(title_basics_filtered['primaryTitle'] == title_basics_filtered['originalTitle'], 1).otherwise(0))

In [14]:
# Filtrar as linhas onde 'runtimeMinutes' é igual a '\N' e contar o número de ocorrências
# title_basics_filtered.filter(title_basics_filtered['runtimeMinutes'] == '\\N').count()

In [15]:
# title_basics_filtered.show()

In [16]:
title_basics_filtered.printSchema()

root
 |-- tconst: string (nullable = true)
 |-- primaryTitle: string (nullable = true)
 |-- originalTitle: string (nullable = true)
 |-- isAdult: string (nullable = true)
 |-- startYear: string (nullable = true)
 |-- runtimeMinutes: string (nullable = true)
 |-- genres: string (nullable = true)
 |-- averageRating: double (nullable = true)
 |-- numVotes: integer (nullable = true)
 |-- popularIsOriginal: integer (nullable = false)



## dumificar 'genres'

In [17]:
# Dividir a coluna 'genres' por vírgulas e expandir em colunas
genres_split = title_basics_filtered.withColumn('genres', split('genres', ','))

# Usar a função explode() para criar múltiplas linhas para cada gênero
genres_exploded = genres_split.withColumn('genre', explode('genres'))

# Criar dummies para cada gênero usando pivot()
dummies = genres_exploded.groupBy('tconst').pivot('genre').agg(lit(1)).fillna(0)

# Mostrar o DataFrame resultante com as colunas de dummies para gêneros
# dummies.show(5)

In [18]:
dummies.printSchema()

root
 |-- tconst: string (nullable = true)
 |-- Action: integer (nullable = true)
 |-- Adult: integer (nullable = true)
 |-- Adventure: integer (nullable = true)
 |-- Animation: integer (nullable = true)
 |-- Biography: integer (nullable = true)
 |-- Comedy: integer (nullable = true)
 |-- Crime: integer (nullable = true)
 |-- Documentary: integer (nullable = true)
 |-- Drama: integer (nullable = true)
 |-- Family: integer (nullable = true)
 |-- Fantasy: integer (nullable = true)
 |-- Film-Noir: integer (nullable = true)
 |-- History: integer (nullable = true)
 |-- Horror: integer (nullable = true)
 |-- Music: integer (nullable = true)
 |-- Musical: integer (nullable = true)
 |-- Mystery: integer (nullable = true)
 |-- News: integer (nullable = true)
 |-- Reality-TV: integer (nullable = true)
 |-- Romance: integer (nullable = true)
 |-- Sci-Fi: integer (nullable = true)
 |-- Sport: integer (nullable = true)
 |-- Talk-Show: integer (nullable = true)
 |-- Thriller: integer (nullable = tru

## Carregar dados dos atores principais/equipe dos títulos
- somente dos titulos que possuem rating
- somente filmes
- somente os que possuem algum gênero
- somente atores/atrizes

In [19]:
title_principals = spark.read.load('../Data/title.principals.tsv', format='csv', sep='\t', inferSchema=True, header=True)

In [20]:
# title_principals.count()

In [21]:
# Obter os 'tconst' do DataFrame movies
tconst_movies = title_basics_filtered.select('tconst')

# Filtrar o DataFrame title_principals para manter apenas as linhas em que 'tconst' está presente em movies
title_principals_filtered = title_principals.join(tconst_movies, 'tconst', 'inner').drop('job', 'ordering', 'characters')

In [22]:
# title_principals_filtered.count()

In [23]:
# title_principals_filtered.show()

In [24]:
# title_principals_filtered.select('category').distinct().show()

In [25]:
# separar as categorias
actors = ['actress', 'actor', 'self']
producers = ['writer', 'director', 'producer']
crew = ['composer', 'editor', 'cinematographer', 'archive_sound', 'production_designer', 'archive_footage']

title_principals_filtered_actors = title_principals_filtered.filter(col('category').isin(actors))
title_principals_filtered_producers = title_principals_filtered.filter(col('category').isin(producers))
title_principals_filtered_crew = title_principals_filtered.filter(col('category').isin(crew))

In [26]:
# title_principals_filtered_actors.count()

In [27]:
# title_principals_filtered_actors.select('nconst').distinct().count()

In [28]:
# title_principals_filtered_producers.count()

In [29]:
# title_principals_filtered_crew.count()

In [30]:
# title_principals_filtered_actors.show(2)

## Fazer a media de cada grupo pros filmes

In [31]:
# title_basics_filtered.orderBy('tconst').show(2)

### Actors

In [32]:
# separa os dados de nome, filme, avgRating e numVotes para todos atores.
title_principals_filtered_actors_full = title_principals_filtered_actors.join(title_basics_filtered, 'tconst', 'inner').select('nconst','tconst','averageRating','numVotes')

In [33]:
# para cada ator, calcula a média dele das colunas 'averageRating' e 'numVotes'
avg_ratings_votes_actors = title_principals_filtered_actors_full.groupBy('nconst') \
    .agg(avg('averageRating').alias('avgRatingActor'), avg('numVotes').alias('avgNumVotesActor'))

In [34]:
# avg_ratings_votes_actors.show(2)

In [35]:
# Agora para cada filme, pegar a lista de atores dele
# e depois fazer a média das médias dos atores

# aqui tem pares filme, ator
#title_principals_filtered_actors

# Adicionar as médias de cada ator no dataset de atores
title_principals_filtered_actors_with_avg = title_principals_filtered_actors.join(avg_ratings_votes_actors, 'nconst', 'inner')

In [36]:
# No DF com todos os filmes (title_basics_filtered), manter somente as entradas que possuam algum ator. 
title_basics_filtered = title_basics_filtered.join(title_principals_filtered_actors_with_avg.select('tconst').distinct(), 'tconst', 'inner')

In [37]:
# agora a coluna tconst e category não são mais necessária, então podemos removê-la e manter somente as entradas únicas
# Remover a coluna 'tconst'
actors_with_avg = title_principals_filtered_actors_with_avg.drop('tconst', 'category').dropDuplicates()

In [38]:
# actors_with_avg.show(2)

In [39]:
# agora o próximo passo é pegar a lista de todos atores de todos os filmes, 
# juntar com as médias de cada ator, para depois fazer pros filmes

# a coluna category não é relevante, removê-la e manter somente as entradas únicas
title_principals_filtered_actors = title_principals_filtered_actors.drop('category').dropDuplicates()

In [40]:
title_principals_filtered_actors = title_principals_filtered_actors.join(actors_with_avg, 'nconst', 'inner')

In [41]:
# title_principals_filtered_actors.show(2)

In [42]:
# por fim, fazer a média por filme
title_avg_ratings_actors = title_principals_filtered_actors.groupBy('tconst') \
    .agg(avg('avgRatingActor').alias('avgRatingActorInMovie'), avg('avgNumVotesActor').alias('avgNumVotesActorInMovie'))

In [43]:
# title_avg_ratings_actors.show(2)

### Producers

In [44]:
# agora basta replicar oq foi feito para o actors nos outros dois grupos

# separa os dados de nome, filme, avgRating e numVotes para todos producers.
title_principals_filtered_producers_full = title_principals_filtered_producers.join(title_basics_filtered, 'tconst', 'inner').select('nconst','tconst','averageRating','numVotes')

In [45]:
# para cada producer, calcula a média dele das colunas 'averageRating' e 'numVotes'
avg_ratings_votes_producers = title_principals_filtered_producers_full.groupBy('nconst') \
    .agg(avg('averageRating').alias('avgRatingProducer'), avg('numVotes').alias('avgNumVotesProducer'))

In [46]:
# Agora para cada filme, pegar a lista de producers dele
# e depois fazer a média das médias dos producers

# Adicionar as médias de cada producer no dataset de producers
title_principals_filtered_producers_with_avg = title_principals_filtered_producers.join(avg_ratings_votes_producers, 'nconst', 'inner')

In [47]:
# No DF com todos os filmes (title_basics_filtered), manter somente as entradas que possuam algum producer. 
title_basics_filtered = title_basics_filtered.join(title_principals_filtered_producers_with_avg.select('tconst').distinct(), 'tconst', 'inner')

In [48]:
# agora a coluna tconst e category não são mais necessária, então podemos removê-la e manter somente as entradas únicas
# Remover a coluna 'tconst'
producers_with_avg = title_principals_filtered_producers_with_avg.drop('tconst', 'category').dropDuplicates()

In [49]:
# agora o próximo passo é pegar a lista de todos producers de todos os filmes, 
# juntar com as médias de cada producer, para depois fazer pros filmes

# a coluna category não é relevante, removê-la e manter somente as entradas únicas
title_principals_filtered_producers = title_principals_filtered_producers.drop('category').dropDuplicates()

In [50]:
title_principals_filtered_producers = title_principals_filtered_producers.join(producers_with_avg, 'nconst', 'inner')

In [51]:
# por fim, fazer a média por filme
title_avg_ratings_producers = title_principals_filtered_producers.groupBy('tconst') \
    .agg(avg('avgRatingProducer').alias('avgRatingProducerInMovie'), avg('avgNumVotesProducer').alias('avgNumVotesProducerInMovie'))

In [52]:
# title_avg_ratings_producers.show(2)

### Crew

In [53]:
# title_principals_filtered_crew

In [54]:
# separa os dados de nome, filme, avgRating e numVotes para todos crew.
title_principals_filtered_crew_full = title_principals_filtered_crew.join(title_basics_filtered, 'tconst', 'inner').select('nconst','tconst','averageRating','numVotes')

In [55]:
# para cada crew, calcula a média dele das colunas 'averageRating' e 'numVotes'
avg_ratings_votes_crew = title_principals_filtered_crew_full.groupBy('nconst') \
    .agg(avg('averageRating').alias('avgRatingCrew'), avg('numVotes').alias('avgNumVotesCrew'))

In [56]:
# Agora para cada filme, pegar a lista de crew dele
# e depois fazer a média das médias dos crew

# Adicionar as médias de cada crew no dataset de crew
title_principals_filtered_crew_with_avg = title_principals_filtered_crew.join(avg_ratings_votes_crew, 'nconst', 'inner')

In [57]:
# No DF com todos os filmes (title_basics_filtered), manter somente as entradas que possuam algum crew. 
title_basics_filtered = title_basics_filtered.join(title_principals_filtered_crew_with_avg.select('tconst').distinct(), 'tconst', 'inner')

In [58]:
# agora a coluna tconst e category não são mais necessária, então podemos removê-la e manter somente as entradas únicas
# Remover a coluna 'tconst'
crew_with_avg = title_principals_filtered_crew_with_avg.drop('tconst', 'category').dropDuplicates()

In [59]:
# agora o próximo passo é pegar a lista de todos crew de todos os filmes, 
# juntar com as médias de cada crew, para depois fazer pros filmes

# a coluna category não é relevante, removê-la e manter somente as entradas únicas
title_principals_filtered_crew = title_principals_filtered_crew.drop('category').dropDuplicates()

In [60]:
title_principals_filtered_crew = title_principals_filtered_crew.join(crew_with_avg, 'nconst', 'inner')

In [61]:
# por fim, fazer a média por filme
title_avg_ratings_crew = title_principals_filtered_crew.groupBy('tconst') \
    .agg(avg('avgRatingCrew').alias('avgRatingCrewInMovie'), avg('avgNumVotesCrew').alias('avgNumVotesCrewInMovie'))

In [62]:
# title_avg_ratings_crew.show(2)

## Juntar os dados das médias no df de titulos

In [63]:
# title_basics_filtered.show(2)

In [64]:
title_basics_filtered_final = title_basics_filtered.join(title_avg_ratings_actors, 'tconst', 'inner')
title_basics_filtered_final = title_basics_filtered_final.join(title_avg_ratings_producers, 'tconst', 'inner')
title_basics_filtered_final = title_basics_filtered_final.join(title_avg_ratings_crew, 'tconst', 'inner')

In [65]:
# title_basics_filtered_final.show()

## Analisar titulos localizados

In [66]:
title_akas = spark.read.load('../Data/title.akas.tsv', format='csv', sep='\t', inferSchema=True, header=True)

In [67]:
# title_akas.show()

In [68]:
# Obter os 'tconst' do DataFrame movies
# tconst_movies contém os IDs de filmes já selecionados
tconst_movies = title_basics_filtered_final.select('tconst')

In [69]:
# filtrar somente as entradas de titulos que estamos considerando
title_akas_filtered = title_akas.join(tconst_movies, title_akas['titleId'] == tconst_movies['tconst'], 'inner')

In [70]:
# title_akas_filtered.show()

In [71]:
title_akas_filtered_translations = title_akas_filtered.groupBy('tconst').count().withColumnRenamed('count', 'numberOfTranslations')

In [72]:
# title_akas_filtered_translations.show()

## Agregar o resto do dataset de features

In [73]:
titles_features_complete = title_basics_filtered_final.join(title_akas_filtered_translations, 'tconst', 'inner')
titles_features_complete = titles_features_complete.join(dummies, 'tconst', 'inner')

In [74]:
# titles_features_complete.count()

In [75]:
titles_features_complete.show(2)

## Liberar memoria

In [76]:
title_ratings.unpersist()
title_basics.unpersist()
title_basics_filtered.unpersist()
title_basics_filtered_final.unpersist()
dummies.unpersist()
title_avg_ratings_actors.unpersist()
title_avg_ratings_crew.unpersist()
title_avg_ratings_producers.unpersist()
title_principals.unpersist()
title_principals_filtered.unpersist()
title_principals_filtered_actors.unpersist()
title_principals_filtered_actors_full.unpersist()
title_principals_filtered_actors_with_avg.unpersist()
title_principals_filtered_crew.unpersist()
title_principals_filtered_crew_full.unpersist()
title_principals_filtered_crew_with_avg.unpersist()
title_principals_filtered_producers.unpersist()
title_principals_filtered_producers_full.unpersist()
title_principals_filtered_producers_with_avg.unpersist()
title_akas_filtered_translations.unpersist()

DataFrame[tconst: string, numberOfTranslations: bigint]

## Salvar os dados selecionados

In [77]:
# Reduzir o número de partições para 1 antes de salvar
titles_features_complete = titles_features_complete.coalesce(1)

In [None]:
# Salvar como CSV
titles_features_complete.write.mode('overwrite').option('header', 'true').csv('../Data/movies.features.complete.csv')

Py4JJavaError: An error occurred while calling o360.csv.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 142.0 failed 1 times, most recent failure: Lost task 0.0 in stage 142.0 (TID 250) (DESKTOP-OM1DEKF executor driver): org.apache.spark.SparkException: [TASK_WRITE_FAILED] Task failed while writing rows to file:/c:/Users/pichau/Desktop/BigData e Aplicacoes/IMDB-BigData-Spark/Data/movies.features.complete.csv.
	at org.apache.spark.sql.errors.QueryExecutionErrors$.taskFailedWhileWritingRowsError(QueryExecutionErrors.scala:774)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:420)
	at org.apache.spark.sql.execution.datasources.WriteFilesExec.$anonfun$doExecuteWrite$1(WriteFiles.scala:100)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:890)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:890)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.base/java.lang.Thread.run(Thread.java:842)
Caused by: org.apache.spark.memory.SparkOutOfMemoryError: [UNABLE_TO_ACQUIRE_MEMORY] Unable to acquire 65536 bytes of memory, got 0.
	at org.apache.spark.errors.SparkCoreErrors$.outOfMemoryError(SparkCoreErrors.scala:467)
	at org.apache.spark.errors.SparkCoreErrors.outOfMemoryError(SparkCoreErrors.scala)
	at org.apache.spark.memory.MemoryConsumer.throwOom(MemoryConsumer.java:157)
	at org.apache.spark.memory.MemoryConsumer.allocateArray(MemoryConsumer.java:98)
	at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.growPointerArrayIfNecessary(UnsafeExternalSorter.java:415)
	at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.allocateMemoryForRecordIfNecessary(UnsafeExternalSorter.java:449)
	at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.insertRecord(UnsafeExternalSorter.java:487)
	at org.apache.spark.sql.execution.UnsafeExternalRowSorter.insertRow(UnsafeExternalRowSorter.java:138)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage800.sort_addToSorter_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage800.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage804.smj_findNextJoinRows_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage804.hashAgg_doAggregateWithKeys_1$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage804.hashAgg_doAggregateWithKeys_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage804.sort_addToSorter_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage804.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage805.smj_findNextJoinRows_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage805.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage810.smj_findNextJoinRows_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage810.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:490)
	at org.apache.spark.sql.execution.datasources.FileFormatDataWriter.writeWithIterator(FileFormatDataWriter.scala:91)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$executeTask$1(FileFormatWriter.scala:403)
	at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1397)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:410)
	... 17 more

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2844)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2780)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2779)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2779)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1242)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1242)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1242)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3048)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2982)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2971)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:984)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2398)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$executeWrite$4(FileFormatWriter.scala:307)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.writeAndCommit(FileFormatWriter.scala:271)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeWrite(FileFormatWriter.scala:304)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:190)
	at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:190)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:113)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:111)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.executeCollect(commands.scala:125)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$executeCollect$1(AdaptiveSparkPlanExec.scala:374)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.withFinalPlanUpdate(AdaptiveSparkPlanExec.scala:402)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.executeCollect(AdaptiveSparkPlanExec.scala:374)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:107)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:125)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:201)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:108)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:107)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:461)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:76)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:461)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:437)
	at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:98)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:85)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:83)
	at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:142)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:859)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:388)
	at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:361)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:240)
	at org.apache.spark.sql.DataFrameWriter.csv(DataFrameWriter.scala:850)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:568)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:842)
Caused by: org.apache.spark.SparkException: [TASK_WRITE_FAILED] Task failed while writing rows to file:/c:/Users/pichau/Desktop/BigData e Aplicacoes/IMDB-BigData-Spark/Data/movies.features.complete.csv.
	at org.apache.spark.sql.errors.QueryExecutionErrors$.taskFailedWhileWritingRowsError(QueryExecutionErrors.scala:774)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:420)
	at org.apache.spark.sql.execution.datasources.WriteFilesExec.$anonfun$doExecuteWrite$1(WriteFiles.scala:100)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:890)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:890)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	... 1 more
Caused by: org.apache.spark.memory.SparkOutOfMemoryError: [UNABLE_TO_ACQUIRE_MEMORY] Unable to acquire 65536 bytes of memory, got 0.
	at org.apache.spark.errors.SparkCoreErrors$.outOfMemoryError(SparkCoreErrors.scala:467)
	at org.apache.spark.errors.SparkCoreErrors.outOfMemoryError(SparkCoreErrors.scala)
	at org.apache.spark.memory.MemoryConsumer.throwOom(MemoryConsumer.java:157)
	at org.apache.spark.memory.MemoryConsumer.allocateArray(MemoryConsumer.java:98)
	at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.growPointerArrayIfNecessary(UnsafeExternalSorter.java:415)
	at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.allocateMemoryForRecordIfNecessary(UnsafeExternalSorter.java:449)
	at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.insertRecord(UnsafeExternalSorter.java:487)
	at org.apache.spark.sql.execution.UnsafeExternalRowSorter.insertRow(UnsafeExternalRowSorter.java:138)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage800.sort_addToSorter_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage800.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage804.smj_findNextJoinRows_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage804.hashAgg_doAggregateWithKeys_1$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage804.hashAgg_doAggregateWithKeys_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage804.sort_addToSorter_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage804.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage805.smj_findNextJoinRows_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage805.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage810.smj_findNextJoinRows_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage810.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:490)
	at org.apache.spark.sql.execution.datasources.FileFormatDataWriter.writeWithIterator(FileFormatDataWriter.scala:91)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$executeTask$1(FileFormatWriter.scala:403)
	at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1397)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:410)
	... 17 more
