In [26]:
# 👉 0. Importación de librerías necesarias
import os
from pyspark.sql import SparkSession
from Bio import SeqIO
import re

In [27]:
# 👉 1. Configuración de entorno Java para Spark
os.environ["JAVA_HOME"] = "/opt/homebrew/opt/openjdk@17"
os.environ["PATH"] = f"{os.environ['JAVA_HOME']}/bin:" + os.environ["PATH"]

In [28]:
# 👉 2. Crear sesión de Spark
spark = SparkSession.builder \
    .appName("busqueda_regex") \
    .master("local[*]") \
    .getOrCreate()

sc = spark.sparkContext

In [29]:
# 👉 3. Ruta de tus archivos fasta
BASE_PATH = "/Users/felipeespinoza/Documents/GitHub/DIASMA2024/Tecnicas Avanzadas de Programacion/Tarea 1/Prob1/"

archivos = [
    os.path.join(BASE_PATH, "Secuenciacion1_correctedLongReads.fasta"),
    os.path.join(BASE_PATH, "Secuenciacion3_correctedLongReads.fasta"),
    os.path.join(BASE_PATH, "Secuenciacion4_correctedshortReads.fasta"),
    os.path.join(BASE_PATH, "Secuenciacion5_correctedLongReads.fasta"),
    os.path.join(BASE_PATH, "Secuenciacion6_correctedshortReads.fasta")
]

In [19]:
# 👉 4. Cargar el genoma de referencia (una vez, fuera del worker)
genoma_path = os.path.join(BASE_PATH, "Genoma_referencia.fasta")
genoma = str(next(SeqIO.parse(genoma_path, "fasta")).seq)

In [20]:
# 👉 5. Función de búsqueda con regex
def buscar(secuencia, genoma):
    return [(m.start(), m.group()) for m in re.finditer(secuencia, genoma)]

In [21]:
# 👉 6. Función a paralelizar (nota: reimporta dentro del worker)
def procesar(archivo):
    from Bio import SeqIO
    import re
    import os

    try:
        secuencia = str(next(SeqIO.parse(archivo, "fasta")).seq)
        genoma = str(next(SeqIO.parse(genoma_path, "fasta")).seq)  # lectura local en cada worker
        matches = [(m.start(), m.group()) for m in re.finditer(secuencia, genoma)]
        return (os.path.basename(archivo), len(matches), [pos for pos, _ in matches])
    except Exception as e:
        return (os.path.basename(archivo), 0, f"❌ Error: {e}")

In [22]:
# 👉 7. Procesamiento distribuido
rdd = sc.parallelize(archivos)
resultados = rdd.map(procesar).collect()

                                                                                

In [23]:
# 👉 8. Mostrar resultados
for archivo, cantidad, posiciones in resultados:
    print(f"📄 {archivo} → {cantidad} calce(s)")
    if isinstance(posiciones, list):
        print("   Posiciones:", posiciones[:5], "..." if len(posiciones) > 5 else "")
    else:
        print("   Error:", posiciones)

# spark.stop()  # puedes usar esto si estás fuera de notebooks

📄 Secuenciacion1_correctedLongReads.fasta → 0 calce(s)
   Posiciones: [] 
📄 Secuenciacion3_correctedLongReads.fasta → 0 calce(s)
   Posiciones: [] 
📄 Secuenciacion4_correctedshortReads.fasta → 0 calce(s)
   Posiciones: [] 
📄 Secuenciacion5_correctedLongReads.fasta → 0 calce(s)
   Posiciones: [] 
📄 Secuenciacion6_correctedshortReads.fasta → 0 calce(s)
   Posiciones: [] 


Secuenciacion3_correctedLongReads.fasta — Calces: 0 — Posiciones: []
Secuenciacion1_correctedLongReads.fasta — Calces: 0 — Posiciones: []
Secuenciacion5_correctedLongReads.fasta — Calces: 0 — Posiciones: []
Secuenciacion6_correctedshortReads.fasta — Calces: 0 — Posiciones: []
Secuenciacion4_correctedshortReads.fasta — Calces: 0 — Posiciones: []
