# Contador de Palavras com PySpark
Este notebook demonstra como usar PySpark para contar palavras em arquivos de texto armazenados no HDFS.

In [1]:
import findspark
from pyspark.sql.session import SparkSession
from pyspark import SparkContext, SparkConf

## Inicializando Spark

In [2]:
# Parar qualquer SparkContext existente
try:
    spark.stop()
    print("SparkContext anterior finalizado")
except NameError:
    print("Nenhum SparkContext anterior encontrado")
except Exception as e:
    print(f"Erro ao finalizar SparkContext: {e}")

# Inicializando Spark
findspark.init("/usr/spark-3.5.0/")

# Configura√ß√£o para usar YARN e HDFS
spark = (
    SparkSession.builder.appName("contagem_palavras_lab6")
    .master("yarn")  # Usar YARN agora que est√° funcionando
    .config("spark.sql.warehouse.dir", "hdfs://spark-master:9000/tmp/hive")
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .config("spark.sql.adaptive.enabled", "true")
    .config("spark.sql.adaptive.coalescePartitions.enabled", "true")
    .config("spark.hadoop.fs.defaultFS", "hdfs://spark-master:9000")
    .getOrCreate()
)

# criar um contexto de sess√£o do spark (cria um "programa")
sc = spark.sparkContext

print(f"‚úÖ Spark iniciado com sucesso!")
print(f"Spark Version: {spark.version}")
print(f"Spark Master: {sc.master}")
print(f"App Name: {sc.appName}")
print(f"Default FS: {sc._jsc.hadoopConfiguration().get('fs.defaultFS')}")

Nenhum SparkContext anterior encontrado


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/06/05 14:49:05 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
25/06/05 14:49:10 WARN Client: Neither spark.yarn.jars nor spark.yarn.archive is set, falling back to uploading libraries under SPARK_HOME.


‚úÖ Spark iniciado com sucesso!
Spark Version: 3.5.0
Spark Master: yarn
App Name: contagem_palavras_lab6
Default FS: hdfs://spark-master:9000


## Fun√ß√£o Principal - Contagem de Palavras

In [6]:
def main():
    print("Iniciando an√°lise de palavras...")
    
    try:
        # Lendo arquivos do HDFS agora que est√° funcionando
        print("Lendo arquivos do HDFS...")
        hdfs_path = "hdfs://spark-master:9000/datasets/*.txt"
        
        # Leitura dos arquivos de texto pelo programa spark
        print(f"Lendo arquivos: {hdfs_path}")
        text_files = sc.textFile(hdfs_path)
        
        # Verificar se conseguiu ler algum arquivo
        file_count = text_files.getNumPartitions()
        print(f"N√∫mero de parti√ß√µes: {file_count}")
        
        # Transforma√ß√£o: dividir linhas em palavras e limpar
        print("Processando texto...")
        words = (text_files
                .flatMap(lambda line: line.split())
                .filter(lambda word: len(word) > 2)  # Filtrar palavras muito pequenas
                .map(lambda word: ''.join(c.lower() for c in word if c.isalpha()))  # Apenas letras
                .filter(lambda word: len(word) > 0))  # Remover strings vazias
        
        # Contagem de palavras
        print("Contando palavras...")
        word_counts = words.map(lambda word: (word, 1)).reduceByKey(lambda a, b: a + b)
        
        # Coletar estat√≠sticas
        total_words = words.count()
        unique_words = word_counts.count()
        
        print(f"\nüìä Estat√≠sticas:")
        print(f"Total de palavras processadas: {total_words:,}")
        print(f"Palavras √∫nicas: {unique_words:,}")
        
        # Mostrar as 15 palavras mais comuns
        print("\nüîù Top 15 palavras mais comuns:")
        top_words = word_counts.takeOrdered(15, key=lambda x: -x[1])
        for i, (word, count) in enumerate(top_words, 1):
            print(f"  {i:2d}. {word:<15} : {count:>6,}")
        
        # Salvar resultado no HDFS
        output_path = "hdfs://spark-master:9000/datasets_processed/word_count_result"
        print(f"\nüíæ Salvando resultado em: {output_path}")
        
        # Remover diret√≥rio se existir
        try:
            sc._jvm.org.apache.hadoop.fs.FileSystem.get(sc._jsc.hadoopConfiguration()).delete(
                sc._jvm.org.apache.hadoop.fs.Path(output_path), True)
        except:
            pass
        
        # Salvar resultado
        word_counts.saveAsTextFile(output_path)
        print("‚úÖ Resultado salvo com sucesso no HDFS!")
        
        # Verificar arquivos salvos
        print("\nüìÅ Verificando arquivos salvos:")
        saved_files = sc.textFile(output_path + "/part-*")
        print(f"Linhas salvas: {saved_files.count()}")
        
        return True
        
    except Exception as e:
        print(f"‚ùå Erro durante o processamento: {str(e)}")
        import traceback
        traceback.print_exc()
        return False

# Executar a an√°lise
if __name__ == "__main__":
    success = main()
    if success:
        print("\nüéâ An√°lise conclu√≠da com sucesso!")
    else:
        print("\nüòû An√°lise falhou. Verifique os logs acima.")

Iniciando an√°lise de palavras...
Lendo arquivos do HDFS...
Lendo arquivos: hdfs://spark-master:9000/datasets/*.txt
N√∫mero de parti√ß√µes: 5
Processando texto...
Contando palavras...


                                                                                


üìä Estat√≠sticas:
Total de palavras processadas: 614,956
Palavras √∫nicas: 33,976

üîù Top 15 palavras mais comuns:
   1. the             : 36,279
   2. and             : 21,402
   3. that            : 10,899
   4. was             :  9,166
   5. his             :  8,155
   6. her             :  7,340
   7. you             :  7,174
   8. with            :  6,804
   9. not             :  6,489
  10. had             :  6,323
  11. for             :  6,026
  12. but             :  5,830
  13. she             :  5,726
  14. have            :  4,254
  15. him             :  4,168

üíæ Salvando resultado em: hdfs://spark-master:9000/datasets_processed/word_count_result


                                                                                

‚úÖ Resultado salvo com sucesso no HDFS!

üìÅ Verificando arquivos salvos:
Linhas salvas: 33976

üéâ An√°lise conclu√≠da com sucesso!


## Verificando Resultado Salvo
Vamos verificar o que foi salvo no HDFS:

## Executando a contagem de palavras

In [7]:
# Verificar se o Spark est√° funcionando antes de executar
if 'spark' in globals() and 'sc' in globals():
    print("Executando an√°lise de palavras...")
    main()
else:
    print("‚ùå Spark n√£o est√° inicializado corretamente. Execute a c√©lula de inicializa√ß√£o primeiro.")

Executando an√°lise de palavras...
Iniciando an√°lise de palavras...
Lendo arquivos do HDFS...
Lendo arquivos: hdfs://spark-master:9000/datasets/*.txt
N√∫mero de parti√ß√µes: 5
Processando texto...
Contando palavras...


                                                                                


üìä Estat√≠sticas:
Total de palavras processadas: 614,956
Palavras √∫nicas: 33,976

üîù Top 15 palavras mais comuns:
   1. the             : 36,279
   2. and             : 21,402
   3. that            : 10,899
   4. was             :  9,166
   5. his             :  8,155
   6. her             :  7,340
   7. you             :  7,174
   8. with            :  6,804
   9. not             :  6,489
  10. had             :  6,323
  11. for             :  6,026
  12. but             :  5,830
  13. she             :  5,726
  14. have            :  4,254
  15. him             :  4,168

üíæ Salvando resultado em: hdfs://spark-master:9000/datasets_processed/word_count_result


                                                                                

‚úÖ Resultado salvo com sucesso no HDFS!

üìÅ Verificando arquivos salvos:
Linhas salvas: 33976


## Finalizando Spark

In [8]:
# Finalizar Spark de forma segura
try:
    if 'spark' in globals():
        spark.stop()
        print("‚úÖ Spark finalizado com sucesso!")
    else:
        print("‚ö†Ô∏è  Nenhuma sess√£o Spark ativa encontrada")
except Exception as e:
    print(f"‚ö†Ô∏è  Erro ao finalizar Spark: {e}")

‚úÖ Spark finalizado com sucesso!


In [9]:
# Verificar resultado salvo no HDFS
try:
    result_path = "hdfs://spark-master:9000/datasets_processed/word_count_result"
    print(f"üìÇ Verificando arquivos em: {result_path}")
    
    # Ler resultado salvo
    saved_results = sc.textFile(result_path + "/part-*")
    
    print(f"\nüìä Total de linhas no resultado: {saved_results.count()}")
    print("\nüîç Primeiras 10 linhas do resultado:")
    
    for i, line in enumerate(saved_results.take(10), 1):
        print(f"  {i:2d}. {line}")
        
except Exception as e:
    print(f"‚ùå Erro ao verificar resultado: {e}")

üìÇ Verificando arquivos em: hdfs://spark-master:9000/datasets_processed/word_count_result
‚ùå Erro ao verificar resultado: 'NoneType' object has no attribute 'sc'


In [10]:
# Finalizar SparkContext (boa pr√°tica)
print("\nüîÑ Finalizando Spark...")
try:
    spark.stop()
    print("‚úÖ SparkContext finalizado com sucesso!")
except Exception as e:
    print(f"‚ö†Ô∏è Erro ao finalizar: {e}")


üîÑ Finalizando Spark...
‚úÖ SparkContext finalizado com sucesso!
