# Tipos de fuentes de datos con **PySpark**: estructuradas, semiestructuradas y no estructuradas

**Fecha:** 2025-09-19  
**Creado con:** ChatGPT (GPT-5 Pro)

Este cuaderno reescribe los ejemplos usando **PySpark** para mostrar cómo trabajar con
datos **estructurados**, **semiestructurados** y **no estructurados** a escala.
Incluye consultas con DataFrames, aplanamiento de JSON/XML, y procesamiento básico
de texto e imágenes apoyado en Spark. Las figuras se generan con **matplotlib**.


## Requisitos

- **Java 8/11** y **PySpark 3.x** instalados.
- Para XML con el conector de Spark: paquete `com.databricks:spark-xml_2.12` (opcional, ver fallback).
- Bibliotecas de apoyo: `matplotlib`, `numpy`, `pillow` (para el ejemplo de imágenes).

```bash
# Opción rápida (entorno local o Colab)
pip install pyspark matplotlib numpy pillow
# Para leer XML (opcional)
# spark-submit --packages com.databricks:spark-xml_2.12:0.18.0 ...
```


## Arranque de la sesión de Spark

In [None]:
from pyspark.sql import SparkSession, functions as F, types as T

spark = (
    SparkSession.builder
    .appName("FuentesDeDatos_PySpark")
    .config("spark.ui.showConsoleProgress", "false")
    # Descomenta para añadir el conector XML si usas spark-submit interactivo:
    # .config("spark.jars.packages", "com.databricks:spark-xml_2.12:0.18.0")
    .getOrCreate()
)

print("Spark version:", spark.version)

---
## 1) Datos **estructurados** con PySpark


### Ejemplo: `clientes` y `ventas` (JOIN y agregaciones)

In [None]:
from pyspark.sql import Row
from pyspark.sql import functions as F, types as T
import matplotlib.pyplot as plt

# --- Definimos esquemas explícitos ---
schema_clientes = T.StructType([
    T.StructField("cliente_id", T.IntegerType(), False),
    T.StructField("nombre", T.StringType(), False),
    T.StructField("ciudad", T.StringType(), True),
    T.StructField("segmento", T.StringType(), True),
])

schema_ventas = T.StructType([
    T.StructField("venta_id", T.IntegerType(), False),
    T.StructField("fecha", T.StringType(), False),  # parsearemos a date
    T.StructField("cliente_id", T.IntegerType(), False),
    T.StructField("categoria", T.StringType(), False),
    T.StructField("monto", T.DoubleType(), False),
])

clientes_rows = [
    (101, "Ana", "Bogotá", "Retail"),
    (102, "Bruno", "Medellín", "Enterprise"),
    (103, "Carla", "Cali", "Retail"),
    (104, "Diego", "Barranquilla", "SMB"),
    (105, "Elena", "Bogotá", "Enterprise"),
    (106, "Fabio", "Medellín", "SMB"),
]

ventas_rows = [
    (1,"2025-01-10",101,"Hardware", 250.50),
    (2,"2025-01-12",102,"Software", 899.99),
    (3,"2025-01-13",103,"Servicios",120.00),
    (4,"2025-01-15",104,"Hardware", 450.00),
    (5,"2025-01-18",105,"Servicios", 89.50),
    (6,"2025-02-01",101,"Software", 1300.00),
    (7,"2025-02-03",106,"Hardware", 99.90),
    (8,"2025-02-07",105,"Servicios", 500.00),
    (9,"2025-02-08",102,"Software", 650.00),
    (10,"2025-02-15",103,"Servicios", 220.00),
    (11,"2025-03-01",101,"Hardware", 1200.00),
    (12,"2025-03-05",102,"Software", 75.00),
    (13,"2025-03-07",103,"Servicios", 180.00),
    (14,"2025-03-15",104,"Hardware", 330.00),
    (15,"2025-03-20",106,"Software", 420.00),
]

clientes = spark.createDataFrame(clientes_rows, schema_clientes)
ventas = spark.createDataFrame(ventas_rows, schema_ventas).withColumn("fecha", F.to_date("fecha"))

df = ventas.join(clientes, on="cliente_id", how="left")

# Agregación por categoría
resumen_cat = (df.groupBy("categoria").agg(F.sum("monto").alias("monto_total"))
                 .orderBy(F.desc("monto_total")))

resumen_pd = resumen_cat.toPandas()

# --- Visualización (un gráfico, sin estilos ni colores definidos) ---
plt.figure()
plt.bar(resumen_pd["categoria"], resumen_pd["monto_total"])
plt.title("Monto total por categoría (PySpark)")
plt.xlabel("Categoría")
plt.ylabel("Monto (unidades monetarias)")
plt.tight_layout()
plt.show()

df.show(10, truncate=False)

### Validación ligera de esquema y reglas

In [None]:
# Tipos observados
print("dtypes:", df.dtypes)

# Nulos por columna
nulls = df.select([F.sum(F.col(c).isNull().cast("int")).alias(c) for c in df.columns])
nulls.show()

# Dominios y reglas
dominio_cat = ["Hardware","Software","Servicios"]
out_of_domain = df.filter(~F.col("categoria").isin(dominio_cat)).count()
negativos = df.filter(F.col("monto") < 0).count()

print("Filas con categoría fuera de dominio:", out_of_domain)
print("Filas con montos negativos:", negativos)

assert out_of_domain == 0, "Se encontraron categorías fuera de dominio"
assert negativos == 0, "Se encontraron montos negativos" 

---
## 2) Datos **semiestructurados** con PySpark

**Definición.** No siguen un esquema tabular rígido, pero tienen **estructura explícita** (pares clave-valor, etiquetas, anidamiento).  
**Ejemplos.** JSON/XML de APIs, logs de eventos.  
**Operaciones.** Aplanamiento con `select`, `explode`, `from_json`, validación mediante esquemas.


### Ejemplo A: Normalizar **JSON** de eventos

In [None]:
from pyspark.sql import functions as F, types as T
import matplotlib.pyplot as plt

# Datos de ejemplo con anidamiento y campos opcionales
logs = [
    {
        "timestamp": "2025-03-01T10:00:00Z",
        "service": "auth",
        "severity": "INFO",
        "user": {"id": 101, "role": "admin"},
        "action": "login",
        "payload": {"ip": "10.0.0.1", "agent": {"browser": "Chrome", "version": "123"}},
    },
    {
        "timestamp": "2025-03-01T10:05:12Z",
        "service": "billing",
        "severity": "WARN",
        "action": "charge_failed",
        "payload": {"ip": "10.0.0.2", "error_code": "CARD_DECLINED"},
    },
    {
        "timestamp": "2025-03-01T10:06:48Z",
        "service": "auth",
        "severity": "ERROR",
        "user": {"id": 102, "role": "viewer"},
        "action": "login",
        "payload": {"ip": "10.0.0.3", "agent": {"browser": "Firefox", "version": "124"}},
    },
    {
        "timestamp": "2025-03-01T10:07:02Z",
        "service": "orders",
        "severity": "INFO",
        "user": {"id": 103, "role": "editor"},
        "action": "create_order",
        "payload": {"ip": "10.0.0.4", "items": [{"sku": "A1","qty": 2},{"sku": "B9","qty": 1}]},
    },
]

# Esquema explícito (recomendado para estabilidad)
schema_logs = T.StructType([
    T.StructField("timestamp", T.StringType(), True),
    T.StructField("service", T.StringType(), True),
    T.StructField("severity", T.StringType(), True),
    T.StructField("action", T.StringType(), True),
    T.StructField("user", T.StructType([
        T.StructField("id", T.IntegerType(), True),
        T.StructField("role", T.StringType(), True),
    ]), True),
    T.StructField("payload", T.StructType([
        T.StructField("ip", T.StringType(), True),
        T.StructField("agent", T.StructType([
            T.StructField("browser", T.StringType(), True),
            T.StructField("version", T.StringType(), True),
        ]), True),
        T.StructField("error_code", T.StringType(), True),
        T.StructField("items", T.ArrayType(T.StructType([
            T.StructField("sku", T.StringType(), True),
            T.StructField("qty", T.IntegerType(), True),
        ])), True),
    ]), True),
])

df_logs = spark.createDataFrame(logs, schema_logs)

# Aplanamiento: seleccionamos campos útiles
flat = df_logs.select(
    "timestamp","service","severity","action",
    F.col("user.id").alias("user_id"),
    F.col("user.role").alias("user_role"),
    F.col("payload.ip").alias("ip"),
    F.col("payload.agent.browser").alias("browser"),
    F.col("payload.agent.version").alias("browser_version"),
    F.col("payload.error_code").alias("error_code"),
    F.col("payload.items").alias("items")
)

# Explode de items (mantiene eventos sin items con explode_outer)
items_flat = flat.withColumn("item", F.explode_outer("items"))                  .select("*",
                         F.col("item.sku").alias("sku"),
                         F.col("item.qty").alias("qty"))                  .drop("item","items")

items_flat.show(truncate=False)

# Métrica simple: eventos por severidad
sev = df_logs.groupBy("severity").count().orderBy(F.desc("count"))
sev_pd = sev.toPandas()

import matplotlib.pyplot as plt
plt.figure()
plt.bar(sev_pd["severity"], sev_pd["count"])
plt.title("Eventos por severidad (PySpark)")
plt.xlabel("Severidad")
plt.ylabel("Cuenta")
plt.tight_layout()
plt.show()

### Ejemplo B: Extraer campos desde **XML**

Mostramos dos rutas:
1) **Conector `spark-xml`** (si está disponible).  
2) **Fallback**: parsear XML con Python y luego **crear un DataFrame de Spark**.


In [None]:
# Guardamos un XML de ejemplo en disco
from pathlib import Path
xml_path = Path("/mnt/data/books.xml")
xml_path.parent.mkdir(parents=True, exist_ok=True)

xml_str = '''<catalog>
  <book id="b1">
    <title>Ingeniería de Datos</title>
    <author>A. Rivera</author>
    <year>2021</year>
    <price currency="USD">39.90</price>
    <tags><tag>ETL</tag><tag>Modelado</tag></tags>
  </book>
  <book id="b2">
    <title>APIs con Python</title>
    <author>L. Gómez</author>
    <year>2023</year>
    <price currency="EUR">29.00</price>
    <tags><tag>API</tag><tag>Web</tag></tags>
  </book>
  <book id="b3">
    <title>Bases NoSQL</title>
    <author>M. Torres</author>
    <year>2020</year>
    <price currency="USD">35.50</price>
    <tags><tag>NoSQL</tag><tag>Escalabilidad</tag></tags>
  </book>
</catalog>'''
xml_path.write_text(xml_str, encoding="utf-8")
print("Escrito:", xml_path)

In [None]:
# Intento 1: usando spark-xml (si está instalado)
from pyspark.sql import functions as F, types as T

df_books = None
try:
    df_xml = (spark.read.format("xml")
              .option("rowTag", "book")
              .load(str(xml_path)))
    # Campos: _attr_id para atributo id, price._value, price._attr_currency, etc. según spark-xml
    # Normalizamos columnas posibles
    cols = [c for c in df_xml.columns]
    # Selección robusta según nombres habituales de spark-xml
    df_books = df_xml.select(
        F.col("_attr_id").alias("id") if "_attr_id" in cols else F.col("id"),
        "title","author","year",
        F.col("price._value").cast("double").alias("price") if "price._value" in cols else F.col("price").cast("double"),
        F.col("price._attr_currency").alias("currency") if "price._attr_currency" in cols else F.lit(None).alias("currency"),
        F.concat_ws(", ", "tags.tag").alias("tags") if "tags" in cols else F.lit(None).alias("tags")
    )
    print("Lectura XML con spark-xml OK")
except Exception as e:
    print("spark-xml no disponible, usando fallback:", e)

if df_books is None:
    # Fallback: parseo con Python y creación de DataFrame Spark
    import xml.etree.ElementTree as ET
    root = ET.fromstring(xml_path.read_text(encoding="utf-8"))
    rows = []
    for book in root.findall("book"):
        book_id = book.attrib.get("id")
        title = book.findtext("title")
        author = book.findtext("author")
        year = int(book.findtext("year"))
        price_el = book.find("price")
        price = float(price_el.text)
        currency = price_el.attrib.get("currency")
        tags = [t.text for t in book.findall("tags/tag")]
        rows.append((book_id, title, author, year, price, currency, ", ".join(tags)))
    schema = T.StructType([
        T.StructField("id", T.StringType(), False),
        T.StructField("title", T.StringType(), True),
        T.StructField("author", T.StringType(), True),
        T.StructField("year", T.IntegerType(), True),
        T.StructField("price", T.DoubleType(), True),
        T.StructField("currency", T.StringType(), True),
        T.StructField("tags", T.StringType(), True),
    ])
    df_books = spark.createDataFrame(rows, schema)
    print("XML leído con fallback y cargado a Spark DataFrame.")

df_books.show(truncate=False)

---
## 3) Datos **no estructurados** con PySpark

**Definición.** Sin esquema predefinido; texto, binarios, multimedia.  
**Estrategias.** Para texto: tokenización, stopwords, conteos. Para binarios: lectura como `binaryFile` y UDFs.


### Ejemplo A: procesamiento básico de **texto** (tokenización + stopwords + regex)

In [None]:
from pyspark.ml.feature import RegexTokenizer, StopWordsRemover
from pyspark.sql import functions as F, types as T
import matplotlib.pyplot as plt

docs = [
    ("politica_seguridad.txt", '''
La política pública de seguridad busca reducir la violencia mediante estrategias 
de prevención, inteligencia y participación comunitaria. Contacto: seguridad@alcaldia.gov.
Reunión programada para 12/03/2025.
'''),
    ("informe_tecnico.txt", '''
Este informe técnico evalúa variables ambientales (temperatura y humedad) y su relación
con el funcionamiento de equipos. Responsable: carla.romero@universidad.edu.
Próxima calibración: 2025-04-15.
''')
]

df_docs = spark.createDataFrame(docs, ["filename","text"])

# Tokenización con regex (palabras)
tokenizer = RegexTokenizer(inputCol="text", outputCol="tokens_raw", pattern="\W+")
df_tok = tokenizer.transform(df_docs)

# Stopwords (lista breve en español, puedes ampliarla según necesidad)
stopwords_es = """de la el y en que los las un una para es se con por del al lo como mas o
mediante su sus este esta eso esa entre sobre a e u y/o""".split()

remover = StopWordsRemover(inputCol="tokens_raw", outputCol="tokens", stopWords=stopwords_es, caseSensitive=False)
df_clean = remover.transform(df_tok)

# Conteo de palabras (excluimos tokens puramente numéricos)
df_words = (df_clean
            .select(F.explode("tokens").alias("word"))
            .filter(~F.col("word").rlike("^[0-9]+$"))
            .groupBy("word").count()
            .orderBy(F.desc("count"))
           )

top10_pd = df_words.limit(10).toPandas()

plt.figure()
plt.bar(top10_pd["word"], top10_pd["count"])
plt.title("Top 10 palabras (PySpark)")
plt.xlabel("Palabra")
plt.ylabel("Frecuencia")
plt.xticks(rotation=45)
plt.tight_layout()
plt.show()

df_words.show(20, truncate=False)

#### Extracción de patrones (regex): correos y fechas

- Opción directa: `regexp_extract` (primer match).  
- Opción avanzada: UDF que devuelve **todas** las coincidencias.


In [None]:
from pyspark.sql import functions as F, types as T
import re

email_pat = r"[\w\.-]+@[\w\.-]+"
date_pat  = r"(?:\b\d{2}/\d{2}/\d{4}\b|\b\d{4}-\d{2}-\d{2}\b)"

# Primer match por archivo (puede ser vacío si no hay coincidencia)
df_first = df_docs.select(
    "filename",
    F.regexp_extract("text", email_pat, 0).alias("email"),
    F.regexp_extract("text", date_pat, 0).alias("fecha")
)
df_first.show(truncate=False)

# Todas las coincidencias usando UDF
def find_all(pattern, text):
    return re.findall(pattern, text or "")

find_all_udf = F.udf(find_all, T.ArrayType(T.StringType()))

df_all = df_docs.select(
    "filename",
    F.explode_outer(find_all_udf(F.lit(email_pat), F.col("text"))).alias("email"),
    F.explode_outer(find_all_udf(F.lit(date_pat),  F.col("text"))).alias("fecha")
)
df_all.show(truncate=False)

### Ejemplo: **Imágenes** como binarios + UDF de histograma

- Generamos una imagen **en escala de grises** sintética (gradiente 256×256).
- La leemos con `binaryFile` y calculamos un **histograma de intensidades** con una UDF.


In [None]:
# Crear imagen sintética y guardarla
from pathlib import Path
import numpy as np
from PIL import Image

img_path = Path("/mnt/data/gradient.png")
img_path.parent.mkdir(parents=True, exist_ok=True)

arr = np.tile(np.linspace(0, 255, 256, dtype=np.uint8), (256,1))
Image.fromarray(arr, mode="L").save(img_path)
print("Imagen guardada en:", img_path)

In [None]:
# Cargar binario con Spark y calcular histograma
from pyspark.sql import functions as F, types as T
from io import BytesIO
import numpy as np
from PIL import Image
import matplotlib.pyplot as plt

df_img = spark.read.format("binaryFile").load(str(img_path))

def hist256(content: bytes):
    im = Image.open(BytesIO(content)).convert("L")
    a = np.array(im, dtype=np.uint8)
    hist, _ = np.histogram(a, bins=256, range=(0,255))
    return hist.tolist()

hist_udf = F.udf(hist256, T.ArrayType(T.IntegerType()))

hist_df = df_img.select(hist_udf("content").alias("hist"))
hist_ex = hist_df.select(F.posexplode("hist").alias("intensity","count"))

hist_pd = hist_ex.toPandas()

plt.figure()
plt.bar(hist_pd["intensity"], hist_pd["count"])
plt.title("Histograma de intensidades (imagen sintética, PySpark)")
plt.xlabel("Intensidad (0-255)")
plt.ylabel("Frecuencia")
plt.tight_layout()
plt.show()

hist_ex.orderBy("intensity").show(12)

## Cierre de la sesión

In [None]:
# Parar Spark (opcional en notebooks)
spark.stop()