# IMAT

**Asignatura:** Arquitectura de Big Data.

In [1]:
from pyspark.sql import SparkSession
import time
import sys
import os
import pandas as pd
from datetime import datetime

### VARIABLES DE ENTORNO

In [2]:
print(sys.executable)
print(sys.version)

print("PYSPARK_PYTHON:", os.environ.get("PYSPARK_PYTHON"))
print("PYSPARK_DRIVER_PYTHON:", os.environ.get("PYSPARK_DRIVER_PYTHON"))

/usr/bin/python3.9
3.9.23 (main, Jun  4 2025, 08:55:38) 
[GCC 11.4.0]
PYSPARK_PYTHON: /usr/bin/python3.9
PYSPARK_DRIVER_PYTHON: /usr/bin/python3.9


### CREAMOS LA SESION DE SPARK

In [3]:
appdate = datetime.now().strftime("%m-%d-%Y %H:%M:%S")
appName = '-'.join(['Procesamiento', appdate])

In [4]:
# Crear sesión Spark conectando al master del cluster
spark = SparkSession.builder \
    .appName(appName) \
    .master("spark://spark-master:7077") \
    .config("spark.executor.memory", "2g") \
    .config("spark.driver.memory", "1g") \
    .getOrCreate()

sc = spark.sparkContext

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/06/26 09:32:27 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


### LECTURA FICHERO USANDO PYTHON

In [5]:
dl = pd.read_csv('precios.csv')

In [6]:
dl

Unnamed: 0,Name,Age,Department,Salary,Avg_Salary
0,Ramesh,20,Finance,50000,40000
1,Suresh,22,Finance,50000,40000
2,Ram,28,Finance,20000,40000
3,Pradeep,22,Sales,20000,25000
4,Deep,25,Sales,30000,25000


### LECTURA FICHERO PYSPARK/ OPERACIONES

In [7]:
from pyspark.sql.window import Window as w
import pyspark.sql.functions as F

wt = w.partitionBy('Department')
df = spark.read.csv('precios.csv', inferSchema=True, header=True)
df = df.withColumn('maximo', F.max('Salary').over(wt))
df.show()

                                                                                

+-------+---+----------+------+----------+------+
|   Name|Age|Department|Salary|Avg_Salary|maximo|
+-------+---+----------+------+----------+------+
| Ramesh| 20|   Finance| 50000|     40000| 50000|
| Suresh| 22|   Finance| 50000|     40000| 50000|
|    Ram| 28|   Finance| 20000|     40000| 50000|
|Pradeep| 22|     Sales| 20000|     25000| 30000|
|   Deep| 25|     Sales| 30000|     25000| 30000|
+-------+---+----------+------+----------+------+



### PRUEBAS DE RENDIMIENTO 1

In [8]:
from pyspark.sql import SparkSession
import time

# Inicio del test
inicio = time.time()

# Crear un DataFrame de 10 millones de números
df = spark.range(0, 10_000_000)

# Simular carga: filtrar, transformar y agrupar
resultado = df \
    .withColumn("mod", (df.id % 100)) \
    .groupBy("mod") \
    .count() \
    .orderBy("mod")

# Acción: traer los resultados al driver
resultado_local = resultado.collect()

fin = time.time()

# Mostrar los resultados y tiempo
for row in resultado_local:
    print(f"{row['mod']}: {row['count']} registros")

print(f"\nTiempo total: {fin - inicio:.2f} segundos")

0: 100000 registros
1: 100000 registros
2: 100000 registros
3: 100000 registros
4: 100000 registros
5: 100000 registros
6: 100000 registros
7: 100000 registros
8: 100000 registros
9: 100000 registros
10: 100000 registros
11: 100000 registros
12: 100000 registros
13: 100000 registros
14: 100000 registros
15: 100000 registros
16: 100000 registros
17: 100000 registros
18: 100000 registros
19: 100000 registros
20: 100000 registros
21: 100000 registros
22: 100000 registros
23: 100000 registros
24: 100000 registros
25: 100000 registros
26: 100000 registros
27: 100000 registros
28: 100000 registros
29: 100000 registros
30: 100000 registros
31: 100000 registros
32: 100000 registros
33: 100000 registros
34: 100000 registros
35: 100000 registros
36: 100000 registros
37: 100000 registros
38: 100000 registros
39: 100000 registros
40: 100000 registros
41: 100000 registros
42: 100000 registros
43: 100000 registros
44: 100000 registros
45: 100000 registros
46: 100000 registros
47: 100000 registros
48

### PRUEBAS DE RENDIMIENTO 2

In [9]:
from pyspark.sql import SparkSession
import socket
import time
from collections import Counter

# Función que simula el trabajo y devuelve la IP del nodo que ejecuta la tarea
def get_ip(_):
    time.sleep(0.001)
    ip = socket.gethostbyname(socket.gethostname())
    return ip

# Crear un RDD con 10,000 elementos (simula 10,000 tareas)
rdd = sc.parallelize(range(10000))

# Mapear la función get_ip a cada elemento
ip_addresses = rdd.map(get_ip).collect()

# Contar cuántas tareas ejecutó cada IP (nodo)
counter = Counter(ip_addresses)

print("Tasks executed")
for ip, num_tasks in counter.items():
    print(f"    {num_tasks} tasks on {ip}")

[Stage 13:>                                                         (0 + 2) / 2]

Tasks executed
    10000 tasks on 172.19.0.5


                                                                                

### Finalizamos SPARK SESSION

In [10]:
spark.stop()