In [5]:
# ==============================================================================
# 1. IMPORTACIONES E INICIO DE SESIÓN DE SPARK
# ==============================================================================
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col, to_date, count, when
from pyspark.sql.types import StringType
import re

# Detiene cualquier sesión de Spark existente para asegurar una configuración limpia
try:
    spark.stop()
except NameError:
    pass

# --- Configuración de Conexión y Spark ---
path_al_jar = "/home/user/BigData_UPAO/postgresql-42.7.3.jar"
db_host = "aws-1-us-east-2.pooler.supabase.com"
db_password = "2EOJG5w9K48yY4Qq"
db_user = "postgres.nnoedhfsidxvosrvlrqb"
db_name = "postgres"

print("Configurando e iniciando la sesión de Spark...")
spark = SparkSession.builder \
    .appName("ETL_Supabase_To_HDFS") \
    .config("spark.jars", path_al_jar) \
    .config("spark.driver.extraClassPath", path_al_jar) \
    .getOrCreate()
print("Sesión de Spark iniciada.")

Configurando e iniciando la sesión de Spark...
Sesión de Spark iniciada.


In [6]:
# ==============================================================================
# 2. LECTURA DE DATOS DESDE SUPABASE
# ==============================================================================
jdbc_url = f"jdbc:postgresql://{db_host}:6543/{db_name}"
connection_properties = {"user": db_user, "password": db_password, "driver": "org.postgresql.Driver"}
query = "(SELECT * FROM public.instagram_posts) AS posts_completos"

print("Conectando a Supabase y cargando datos...")
df_from_supabase = spark.read.jdbc(url=jdbc_url, table=query, properties=connection_properties)
print(f"Datos cargados desde Supabase. Se leyeron {df_from_supabase.count()} registros.")

Conectando a Supabase y cargando datos...


[Stage 0:>                                                          (0 + 1) / 1]

Datos cargados desde Supabase. Se leyeron 100 registros.


                                                                                

In [7]:
df_from_supabase

DataFrame[id: string, post_id: string, username: string, caption: string, location: string, hashtags: string, mentions: string, engagement: string, created_at: timestamp]

In [8]:
# ==============================================================================
# 3. DEFINICIÓN DE LÓGICA DE PROCESAMIENTO (UDFs)
# ==============================================================================
TRANSPORT_KEYWORDS = {'aéreo': ['vuelo', 'volamos', 'avión', 'latam', 'sky airline', 'jetsmart', 'star peru', 'avioneta'],'terrestre': ['bus', 'carro', 'auto', 'camioneta', 'cruz del sur', 'movil tours', 'en carretera', 'van'],'tren': ['tren', 'perurail', 'inca rail', 'andean explorer'],'marítimo': ['bote', 'lancha', 'barco', 'navegando']}
ACCOMMODATION_KEYWORDS = {'hotel': ['hotel', 'resort', 'lodge', 'boutique'],'hostal': ['hostal', 'hostel', 'hospedaje'],'otro': ['airbnb', 'casa', 'bungalow', 'cabaña', 'campamento', 'acampamos', 'camping']}
PERUVIAN_DESTINATIONS = ['cusco', 'machu picchu', 'arequipa', 'lima', 'iquitos', 'mancora', 'paracas', 'huaraz','nazca', 'puno', 'lago titicaca', 'trujillo', 'chiclayo', 'tarapoto', 'ayacucho','cajamarca', 'chachapoyas', 'kuelap', 'gocta', 'vinicunca', 'montaña de 7 colores','huacachina', 'ica', 'ollantaytambo', 'valle sagrado', 'urubamba', 'colca', 'salkantay','choquequirao', 'tambopata', 'pacaya samiria', 'lunahuaná']

def extract_entity(text, keywords_dict):
    if text is None: return None
    text_lower = text.lower()
    for category, keywords in keywords_dict.items():
        for keyword in keywords:
            if re.search(r'\b' + re.escape(keyword) + r'\b', text_lower):
                return category
    return None

def extract_duration(text):
    if text is None: return None
    text_lower = text.lower()
    match = re.search(r'(\d+)\s*(días|noches)', text_lower)
    if match: return f"{match.group(1)} días/noches"
    if 'una semana' in text_lower or 'semana entera' in text_lower: return "1 semana"
    if 'dos semanas' in text_lower: return "2 semanas"
    if 'fin de semana' in text_lower or 'finde' in text_lower: return "fin de semana"
    if 'full day' in text_lower or 'un día' in text_lower or 'medio día' in text_lower: return "1 día (Full Day)"
    return None

def extract_location_spark(location_struct, caption):
    if location_struct and hasattr(location_struct, 'name') and location_struct.name:
        location_name = location_struct.name.split(',')[0].strip()
        if location_name.lower() not in ['peru', 'cusco', 'lima'] or len(location_struct.name.split(',')) == 1:
            return location_name
    if caption:
        text_lower = caption.lower()
        for dest in PERUVIAN_DESTINATIONS:
            if dest in text_lower:
                return dest.title()
    if location_struct and hasattr(location_struct, 'name') and location_struct.name:
        return location_struct.name.split(',')[0].strip()
    return None

extract_transport_udf = udf(lambda caption: extract_entity(caption, TRANSPORT_KEYWORDS), StringType())
extract_accommodation_udf = udf(lambda caption: extract_entity(caption, ACCOMMODATION_KEYWORDS), StringType())
extract_duration_udf = udf(extract_duration, StringType())
extract_location_udf = udf(extract_location_spark, StringType())

In [9]:
# ==============================================================================
# 4. APLICACIÓN DE TRANSFORMACIONES
# ==============================================================================
print("Aplicando transformaciones al DataFrame...")

df_processed = df_from_supabase.withColumn("lugar_viaje", extract_location_udf(col("location"), col("caption"))) \
                           .withColumn("fecha", to_date(col("created_at"))) \
                           .withColumn("medio_transporte", extract_transport_udf(col("caption"))) \
                           .withColumn("duracion", extract_duration_udf(col("caption"))) \
                           .withColumn("alojamiento", extract_accommodation_udf(col("caption")))

df_final = df_processed.select("lugar_viaje", "fecha", "medio_transporte", "duracion", "alojamiento")

Aplicando transformaciones al DataFrame...


In [10]:
# ==============================================================================
# 5. MOSTRAR RESULTADOS Y GUARDAR EN HDFS
# ==============================================================================
print("\n--- Vista Previa del Dataset Procesado ---")
df_final.show(15, truncate=False)

print("\n--- Resumen de Valores Nulos por Columna ---")
df_final.select([count(when(col(c).isNull(), c)).alias(c) for c in df_final.columns]).show()


--- Vista Previa del Dataset Procesado ---


                                                                                

+-----------+----------+----------------+----------------+-----------+
|lugar_viaje|fecha     |medio_transporte|duracion        |alojamiento|
+-----------+----------+----------------+----------------+-----------+
|Cusco      |2025-05-22|aéreo           |3 días/noches   |hotel      |
|Iquitos    |2025-08-10|aéreo           |1 semana        |hotel      |
|Lima       |2025-02-15|terrestre       |5 días/noches   |otro       |
|Cusco      |2025-09-05|tren            |4 días/noches   |NULL       |
|Lima       |2025-03-28|NULL            |fin de semana   |otro       |
|Arequipa   |2025-07-19|aéreo           |3 días/noches   |hotel      |
|Lima       |2025-04-12|terrestre       |2 días/noches   |hostal     |
|Chachapoyas|2025-06-30|aéreo           |1 semana        |hotel      |
|Nazca      |2025-10-14|aéreo           |1 día (Full Day)|NULL       |
|Arequipa   |2025-08-25|terrestre       |3 días/noches   |NULL       |
|Cusco      |2025-07-27|aéreo           |1 semana        |otro       |
|Lima 

[Stage 4:>                                                          (0 + 1) / 1]

+-----------+-----+----------------+--------+-----------+
|lugar_viaje|fecha|medio_transporte|duracion|alojamiento|
+-----------+-----+----------------+--------+-----------+
|         15|    0|              51|       5|         67|
+-----------+-----+----------------+--------+-----------+



                                                                                

In [11]:
hdfs_path_csv = "hdfs://localhost:9000/data/processed/instagram"
df_final.write.mode("overwrite").option("header", "true").csv(hdfs_path_csv)

print(f"Datos guardados como CSV en HDFS en la ruta: {hdfs_path_csv}")

[Stage 7:>                                                          (0 + 1) / 1]

Datos guardados como CSV en HDFS en la ruta: hdfs://localhost:9000/data/processed/instagram


                                                                                

In [9]:
# ==============================================================================
# 6. DETENER LA SESIÓN DE SPARK
# ==============================================================================
spark.stop()
print("Proceso completado. Sesión de Spark detenida.")

Proceso completado. Sesión de Spark detenida.
