<a href="https://colab.research.google.com/github/EderLara/Fundamentos-Big-Data/blob/main/Python_y_Spark.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>



```
Explicación del Código:
  Se crea una sesión de Spark.
  Se lee el conjunto de datos desde HDFS utilizando spark.read.csv().
  Se utiliza la función lag() para acceder a la página anterior visitada por cada usuario.
  Se utiliza la función groupBy() y agg(count("*")) para contar el número de veces que se visita cada secuencia de páginas.
  Se muestran los resultados utilizando show().
  Se detiene la sesión de Spark.
```



In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import lag, col, count

# Crear una sesión de Spark
spark = SparkSession.builder.appName("AnalisisRegistrosWeb").getOrCreate()

# Leer el conjunto de datos desde HDFS
registros_df = spark.read.csv("hdfs:///registros_web.csv", header=True, inferSchema=True)

# Identificar patrones de navegación
patrones_df = registros_df.withColumn(
    "pagina_anterior", lag("pagina_visitada", 1, "inicio").over(partitionBy="usuario_id", orderBy="fecha_acceso")
)

# Contar el número de veces que se visita cada secuencia de páginas
secuencias_df = patrones_df.groupBy("pagina_anterior", "pagina_visitada").agg(count("*").alias("conteo"))

# Mostrar los resultados
secuencias_df.show()

# Detener la sesión de Spark
spark.stop()



```
Ejemplo 1: Análisis de Datos de Ventas desde una Base de Datos SQL:

Escenario:
Tenemos datos de ventas almacenados en una base de datos MySQL.
Queremos analizar estos datos utilizando Spark para identificar patrones de ventas y tendencias.
```



In [None]:
from pyspark.sql import SparkSession

# Crear una sesión de Spark
spark = SparkSession.builder.appName("AnalisisVentasSQL").getOrCreate()

# Configuración de la conexión a la base de datos MySQL
jdbc_url = "jdbc:mysql://tu_servidor:tu_puerto/tu_base_de_datos"
jdbc_properties = {
    "user": "tu_usuario",
    "password": "tu_contraseña",
    "driver": "com.mysql.cj.jdbc.Driver"
}

# Leer los datos de la tabla de ventas
ventas_df = spark.read.jdbc(url=jdbc_url, table="ventas", properties=jdbc_properties)

# Realizar análisis de ventas (ejemplo: ventas totales por producto)
ventas_por_producto_df = ventas_df.groupBy("producto").sum("monto").withColumnRenamed("sum(monto)", "ventas_totales")

# Mostrar los resultados
ventas_por_producto_df.show()

# Detener la sesión de Spark
spark.stop()

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import regexp_extract, to_timestamp

# Crear una sesión de Spark
spark = SparkSession.builder.appName("AnalisisLogsServidor").getOrCreate()

# Leer los registros de logs desde archivos de texto
logs_df = spark.read.text("hdfs:///logs_servidor/*.log")

# Extraer información relevante utilizando expresiones regulares
logs_df = logs_df.withColumn("fecha_hora", regexp_extract("value", r"\[(.*?)\]", 1))
logs_df = logs_df.withColumn("url", regexp_extract("value", r"\"GET (.*?) HTTP\"", 1))
logs_df = logs_df.withColumn("estado", regexp_extract("value", r"\"HTTP/1.1 (\d+)", 1))

# Convertir la columna de fecha y hora a un tipo timestamp
logs_df = logs_df.withColumn("fecha_hora", to_timestamp("fecha_hora", "dd/MMM/yyyy:HH:mm:ss Z"))

# Realizar análisis (ejemplo: contar el número de accesos por URL)
accesos_por_url_df = logs_df.groupBy("url").count().withColumnRenamed("count", "num_accesos")

# Mostrar los resultados
accesos_por_url_df.show()

# Detener la sesión de Spark
spark.stop()

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import upper, col

# Crear una sesión de Spark
spark = SparkSession.builder.appName("ETLClientes").getOrCreate()

# 1. Extracción: Leer datos desde el archivo CSV
clientes_df = spark.read.csv("clientes.csv", header=True, inferSchema=True)

# 2. Transformación: Limpiar y transformar los datos
# Convertir nombres y apellidos a mayúsculas
clientes_df = clientes_df.withColumn("nombre", upper(col("nombre")))
clientes_df = clientes_df.withColumn("apellido", upper(col("apellido")))

# Filtrar clientes activos
clientes_activos_df = clientes_df.filter(col("activo") == True)

# 3. Carga: Escribir los datos transformados en una tabla de MySQL
jdbc_url = "jdbc:mysql://tu_servidor:tu_puerto/tu_base_de_datos"
jdbc_properties = {
    "user": "tu_usuario",
    "password": "tu_contraseña",
    "driver": "com.mysql.cj.jdbc.Driver"
}

clientes_activos_df.write.jdbc(url=jdbc_url, table="clientes_limpios", mode="overwrite", properties=jdbc_properties)

# Detener la sesión de Spark
spark.stop()



```

Extracción:
spark.read.csv("clientes.csv", header=True, inferSchema=True): Lee el archivo CSV y crea un DataFrame de Spark.

Transformación:
clientes_df.withColumn("nombre", upper(col("nombre"))): Convierte la columna "nombre" a mayúsculas.
clientes_df.withColumn("apellido", upper(col("apellido"))): Convierte la columna "apellido" a mayúsculas.
clientes_df.filter(col("activo") == True): Filtra solo los clientes activos.

Carga:
clientes_activos_df.write.jdbc(...): Escribe el DataFrame transformado en la tabla "clientes_limpios" de MySQL.
mode="overwrite": Sobreescribe la tabla si ya existe.
```

