# Setup

## Instalación de paquetes

In [1]:
!apt-get update  > /dev/null
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-3.5.3/spark-3.5.3-bin-hadoop3.tgz
!tar xf spark-3.5.3-bin-hadoop3.tgz
!pip install findspark pyspark

W: Skipping acquire of configured file 'main/source/Sources' as repository 'https://r2u.stat.illinois.edu/ubuntu jammy InRelease' does not seem to provide it (sources.list entry misspelt?)
Collecting findspark
  Downloading findspark-2.0.1-py2.py3-none-any.whl.metadata (352 bytes)
Downloading findspark-2.0.1-py2.py3-none-any.whl (4.4 kB)
Installing collected packages: findspark
Successfully installed findspark-2.0.1


In [2]:
!ls

sample_data  spark-3.5.3-bin-hadoop3  spark-3.5.3-bin-hadoop3.tgz


## Preparación del ambiente

In [3]:
%env PYTHONHASHSEED=1234
%env JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64
%env SPARK_HOME=/content/spark-3.5.3-bin-hadoop3

env: PYTHONHASHSEED=1234
env: JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64
env: SPARK_HOME=/content/spark-3.5.3-bin-hadoop3


In [4]:
!ls $SPARK_HOME

bin   data	jars	    LICENSE   NOTICE  R		 RELEASE  yarn
conf  examples	kubernetes  licenses  python  README.md  sbin


In [8]:
import findspark
findspark.init("/content/spark-3.5.3-bin-hadoop3")
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").appName("MobyDickSearch").getOrCreate()

## *Txt a buscar

In [6]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [10]:
# Leer el archivo como RDD
rdd = spark.sparkContext.textFile('drive/MyDrive/bigdata/mobydick.txt')
# Filtrar las líneas que contienen la palabra "whale"
keyword = "whale"
filtered = rdd.filter(lambda line: keyword.lower() in line.lower())
# Mapear cada línea encontrada a 1
ones = filtered.map(lambda line: 1)
# Reducir para contar la cantidad total de líneas que contienen la palabra
count = ones.reduce(lambda a, b: a + b)

print(f'Líneas que contienen la palabra "{keyword}": {count}')


Líneas que contienen la palabra "whale": 1620


## Regresión Logística
Generar datos sintéticos

In [11]:
import numpy as np

# Función para generar puntos sintéticos
def generate_point(dim=10):
    x = np.random.randn(dim)
    y = 1 if np.dot(x, np.ones(dim)) + np.random.randn() > 0 else -1
    return (x.tolist(), y)

# Generar 10,000 puntos sintéticos en dimensión 10
points_list = [generate_point(10) for _ in range(10000)]

# Crear RDD
rdd_points = spark.sparkContext.parallelize(points_list).cache()


Preparar descenso por gradiente con acumuladores

In [14]:
import numpy as np

# Función para generar puntos sintéticos
def generate_point(dim=10):
    x = np.random.randn(dim)
    y = 1 if np.dot(x, np.ones(dim)) + np.random.randn() > 0 else -1
    return (x.tolist(), y)

# Crear 10,000 puntos sintéticos
points_list = [generate_point(10) for _ in range(10000)]

# Crear RDD
rdd_points = spark.sparkContext.parallelize(points_list).cache()


Funciones auxiliares estables numéricamente

In [15]:
from pyspark import AccumulatorParam
import math

D = 10  # dimensión del vector
ITERATIONS = 10
LR = 0.1  # tasa de aprendizaje

# Inicializar vector de pesos aleatorio
w = np.random.randn(D)

# Clase acumuladora para vectores
class VectorAccumulatorParam(AccumulatorParam):
    def zero(self, value):
        return [0.0] * len(value)
    def addInPlace(self, val1, val2):
        return [v1 + v2 for v1, v2 in zip(val1, val2)]

# Registrar acumulador
grad_acc = spark.sparkContext.accumulator([0.0]*D, VectorAccumulatorParam())


Entrenar el modelo

In [23]:
# Función sigmoide con protección contra overflow
def sigmoid(z):
    if z < -20:
        return 0.0
    elif z > 20:
        return 1.0
    else:
        return 1 / (1 + math.exp(-z))

# Función de gradiente para un punto
def gradient(p):
    x, y = p
    dot = sum(w[j] * x[j] for j in range(D))
    s = (sigmoid(dot * y) - 1) * y
    return [s * x[j] for j in range(D)]

for i in range(ITERATIONS):
    grad_acc.value = [0.0]*D  # reiniciar acumulador
    rdd_points.foreach(lambda p: grad_acc.add(gradient(p)))
    w = [w[j] - LR * grad_acc.value[j] for j in range(D)]
    print(f"Iteración {i+1} completada. ||grad|| = {round(np.linalg.norm(grad_acc.value), 4)}")


Iteración 1 completada. ||grad|| = 178.3077
Iteración 2 completada. ||grad|| = 179.9626
Iteración 3 completada. ||grad|| = 182.4941
Iteración 4 completada. ||grad|| = 185.6186
Iteración 5 completada. ||grad|| = 190.6954
Iteración 6 completada. ||grad|| = 195.9549
Iteración 7 completada. ||grad|| = 208.9553
Iteración 8 completada. ||grad|| = 236.6531
Iteración 9 completada. ||grad|| = 319.7692
Iteración 10 completada. ||grad|| = 545.6071


Evaluar el modelo

In [24]:
# Función de predicción
def predict(x):
    return 1 if sum(w[j] * x[j] for j in range(D)) >= 0 else -1

# Calcular precisión
accuracy = rdd_points.map(lambda p: 1 if predict(p[0]) == p[1] else 0).mean()
print(f"Precisión en datos de entrenamiento: {accuracy:.4f}")


Precisión en datos de entrenamiento: 0.8718


## ALS

In [19]:
from pyspark.sql import Row

# Creamos un pequeño conjunto de datos de ejemplo: userId, itemId, rating
data = [
    Row(userId=0, itemId=0, rating=4.0),
    Row(userId=0, itemId=1, rating=2.0),
    Row(userId=1, itemId=1, rating=3.0),
    Row(userId=1, itemId=2, rating=4.0),
    Row(userId=2, itemId=2, rating=5.0),
    Row(userId=2, itemId=0, rating=3.0)
]

df = spark.createDataFrame(data)
df.show()


+------+------+------+
|userId|itemId|rating|
+------+------+------+
|     0|     0|   4.0|
|     0|     1|   2.0|
|     1|     1|   3.0|
|     1|     2|   4.0|
|     2|     2|   5.0|
|     2|     0|   3.0|
+------+------+------+



Entrenar el modelo ALS

In [20]:
from pyspark.ml.recommendation import ALS

als = ALS(
    userCol="userId",
    itemCol="itemId",
    ratingCol="rating",
    maxIter=10,
    regParam=0.1,
    rank=5,
    coldStartStrategy="drop"  # evita NaN en predicciones
)

model = als.fit(df)


Hacer predicciones

In [21]:
# Crear combinaciones usuario-producto para predecir
from pyspark.sql.functions import lit

users = df.select("userId").distinct()
items = df.select("itemId").distinct()

user_item_pairs = users.crossJoin(items)

predictions = model.transform(user_item_pairs)
predictions.show()


+------+------+----------+
|userId|itemId|prediction|
+------+------+----------+
|     0|     0|  3.837964|
|     1|     0| 2.5495687|
|     2|     0| 3.0063426|
|     0|     1| 2.0184698|
|     1|     1| 2.8155203|
|     2|     1| 2.8713424|
|     0|     2| 3.4623556|
|     1|     2|  4.003248|
|     2|     2| 4.8675914|
+------+------+----------+



Extraer recomendaciones

In [22]:
# Recomendaciones de películas para cada usuario
model.recommendForAllUsers(numItems=3).show(truncate=False)

# Recomendaciones de usuarios para cada producto
model.recommendForAllItems(numUsers=3).show(truncate=False)


+------+------------------------------------------------+
|userId|recommendations                                 |
+------+------------------------------------------------+
|0     |[{0, 3.837964}, {2, 3.4623556}, {1, 2.0184698}] |
|1     |[{2, 4.003248}, {1, 2.8155203}, {0, 2.5495687}] |
|2     |[{2, 4.8675914}, {0, 3.0063426}, {1, 2.8713424}]|
+------+------------------------------------------------+

+------+------------------------------------------------+
|itemId|recommendations                                 |
+------+------------------------------------------------+
|0     |[{0, 3.837964}, {2, 3.0063426}, {1, 2.5495687}] |
|1     |[{2, 2.8713424}, {1, 2.8155203}, {0, 2.0184698}]|
|2     |[{2, 4.8675914}, {1, 4.003248}, {0, 3.4623556}] |
+------+------------------------------------------------+

