# üóÇÔ∏è SESI√ìN 1: HDFS OPERACIONES AVANZADAS
## An√°lisis y Optimizaci√≥n de HDFS

### üéØ **OBJETIVO**
Aprender operaciones avanzadas de HDFS y t√©cnicas de optimizaci√≥n

### üìã **CONTENIDO**
- An√°lisis de bloques y replicaci√≥n
- Optimizaci√≥n de almacenamiento
- Monitoreo y recuperaci√≥n

In [None]:
# üîß CONFIGURACI√ìN INICIAL
import subprocess
import os
import json
from hdfs import InsecureClient
from datetime import datetime

# Conectar a HDFS
hdfs_client = InsecureClient('http://namenode:9870')

# Crear directorio para an√°lisis
analysis_dir = '/analisis_avanzado'
hdfs_client.makedirs(analysis_dir)

print("üîó Conexi√≥n HDFS establecida")
print(f"üìÅ Directorio de an√°lisis: {analysis_dir}")

print("‚úÖ Configuraci√≥n completada")

## üìù EJERCICIO 1: An√°lisis de Bloques

Analizaremos la distribuci√≥n de bloques y replicaci√≥n en HDFS

In [None]:
# 1.1 Crear archivo de prueba grande
print("üìÑ CREANDO ARCHIVO DE PRUEBA")
print("=========================")

try:
    # Crear archivo local grande
    test_file = 'test_large.txt'
    with open(test_file, 'w') as f:
        for i in range(1000000):  # 1 mill√≥n de l√≠neas
            f.write(f"L√≠nea de prueba {i}\n")
    
    # Subir a HDFS
    hdfs_path = f'{analysis_dir}/test_large.txt'
    hdfs_client.upload(hdfs_path, test_file)
    print(f"‚úÖ Archivo subido a {hdfs_path}")
    
except Exception as e:
    print(f"‚ùå Error: {e}")

print("\n‚úÖ Archivo creado y subido")

In [None]:
# 1.2 Analizar distribuci√≥n de bloques
print("üìä ANALIZANDO DISTRIBUCI√ìN DE BLOQUES")
print("===============================")

try:
    # Obtener informaci√≥n de bloques
    result = subprocess.run(['hdfs', 'fsck', hdfs_path, '-files', '-blocks', '-locations'], 
                          capture_output=True, text=True)
    
    if result.returncode == 0:
        print("Informaci√≥n de bloques:")
        print(result.stdout)
        
        # Extraer estad√≠sticas
        lines = result.stdout.split('\n')
        for line in lines:
            if 'Total blocks' in line:
                print(f"\nüìä {line}")
            elif 'Average block size' in line:
                print(f"üìä {line}")
    else:
        print(f"‚ùå Error: {result.stderr}")
        
except Exception as e:
    print(f"‚ùå Error: {e}")

print("\n‚úÖ An√°lisis completado")

## üìù EJERCICIO 2: Optimizaci√≥n de Almacenamiento

Implementaremos t√©cnicas de optimizaci√≥n de almacenamiento

In [None]:
# 2.1 Configurar factor de replicaci√≥n
print("‚öôÔ∏è CONFIGURANDO FACTOR DE REPLICACI√ìN")
print("================================")

try:
    # Cambiar factor de replicaci√≥n
    result = subprocess.run(['hdfs', 'dfs', '-setrep', '-w', '2', hdfs_path], 
                          capture_output=True, text=True)
    
    if result.returncode == 0:
        print("‚úÖ Factor de replicaci√≥n actualizado")
        print(result.stdout)
    else:
        print(f"‚ùå Error: {result.stderr}")
        
except Exception as e:
    print(f"‚ùå Error: {e}")

print("\n‚úÖ Configuraci√≥n completada")

In [None]:
# 2.2 Implementar compresi√≥n
print("üóúÔ∏è IMPLEMENTANDO COMPRESI√ìN")
print("========================")

try:
    # Crear archivo comprimido
    compressed_path = f'{analysis_dir}/test_compressed.gz'
    
    # Comprimir archivo
    result = subprocess.run(['hdfs', 'dfs', '-copyFromLocal', '-f', test_file, compressed_path], 
                          capture_output=True, text=True)
    
    if result.returncode == 0:
        # Verificar tama√±os
        original_size = hdfs_client.status(hdfs_path)['length']
        compressed_size = hdfs_client.status(compressed_path)['length']
        
        print(f"üìä Tama√±o original: {original_size} bytes")
        print(f"üìä Tama√±o comprimido: {compressed_size} bytes")
        print(f"üìä Ratio de compresi√≥n: {original_size/compressed_size:.2f}x")
    else:
        print(f"‚ùå Error: {result.stderr}")
        
except Exception as e:
    print(f"‚ùå Error: {e}")

print("\n‚úÖ Compresi√≥n completada")

## üìù EJERCICIO 3: Monitoreo y Recuperaci√≥n

Implementaremos t√©cnicas de monitoreo y recuperaci√≥n

In [None]:
# 3.1 Monitorear estado del cluster
print("üìä MONITOREANDO ESTADO DEL CLUSTER")
print("=============================")

try:
    # Obtener m√©tricas del cluster
    result = subprocess.run(['hdfs', 'dfsadmin', '-report'], 
                          capture_output=True, text=True)
    
    if result.returncode == 0:
        # Extraer m√©tricas importantes
        lines = result.stdout.split('\n')
        metrics = {}
        
        for line in lines:
            if 'Live datanodes' in line:
                metrics['datanodes'] = line.split(':')[1].strip()
            elif 'Dead datanodes' in line:
                metrics['dead_nodes'] = line.split(':')[1].strip()
            elif 'Configured Capacity' in line:
                metrics['capacity'] = line.split(':')[1].strip()
            elif 'DFS Used' in line:
                metrics['used'] = line.split(':')[1].strip()
        
        print("üìä M√©tricas del cluster:")
        for key, value in metrics.items():
            print(f"  - {key}: {value}")
    else:
        print(f"‚ùå Error: {result.stderr}")
        
except Exception as e:
    print(f"‚ùå Error: {e}")

print("\n‚úÖ Monitoreo completado")

In [None]:
# 3.2 Implementar recuperaci√≥n
print("üîÑ IMPLEMENTANDO RECUPERACI√ìN")
print("=========================")

try:
    # Verificar integridad
    result = subprocess.run(['hdfs', 'fsck', '/', '-files', '-blocks', '-locations'], 
                          capture_output=True, text=True)
    
    if result.returncode == 0:
        print("‚úÖ Verificaci√≥n de integridad completada")
        
        # Crear punto de recuperaci√≥n
        backup_dir = f'{analysis_dir}/backup_{datetime.now().strftime("%Y%m%d_%H%M%S")}'
        hdfs_client.makedirs(backup_dir)
        
        # Copiar archivos importantes
        hdfs_client.copy(hdfs_path, f'{backup_dir}/test_large.txt')
        hdfs_client.copy(compressed_path, f'{backup_dir}/test_compressed.gz')
        
        print(f"‚úÖ Punto de recuperaci√≥n creado en {backup_dir}")
    else:
        print(f"‚ùå Error: {result.stderr}")
        
except Exception as e:
    print(f"‚ùå Error: {e}")

print("\n‚úÖ Recuperaci√≥n implementada")

## üìù LIMPIEZA FINAL

Limpiaremos los recursos utilizados

In [None]:
# Limpiar recursos
print("üßπ LIMPIANDO RECURSOS")
print("===================")

try:
    # Eliminar archivos locales
    if os.path.exists(test_file):
        os.remove(test_file)
        print(f"‚úÖ Archivo local {test_file} eliminado")
    
    # Eliminar directorio de an√°lisis
    hdfs_client.delete(analysis_dir, recursive=True)
    print(f"‚úÖ Directorio {analysis_dir} eliminado de HDFS")
    
except Exception as e:
    print(f"‚ùå Error: {e}")

print("\n‚úÖ Limpieza completada")