<a href="https://colab.research.google.com/github/GrunCrow/MUICE_UCO_Code/blob/main/1%C2%BA%20Cuatrimestre/%5BBDA%5D%20-%20Introducci%C3%B3n%20al%20Big%20Data%20An%C3%A1lisis/Pr%C3%A1cticas/Bloque%202/01_PySpark_dataframes.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# <b>Introducción a DataFrames en Spark (Python)</b>
## <i>Big Data Analytics</i>

Curso 2023/24

Prof. *Dr. José Raúl Romero Salguero*

---



Vamos a ver en este notebook los conceptos básicos de manejo de conjuntos de datos con *PySpark*

---

# **Instalación del entorno**
## Instalación de Hadoop

Instalamos la versión de Hadoop/Spark 3.2.4
Se puede visitar el sitio de Apache Spark para descargar otra versión, siempre que se trate de una versión estable y con mantenimiento en Apache:

https://spark.apache.org/downloads.html

Se configuran posteriormente las variables de entorno `JAVA_HOME` y `SPARK_HOME`

In [1]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.2.4-bin-hadoop3.2"

La descarga de Hadoop puede tomar su tiempo, según la conexión disponible. En caso de que fallara, se puede descargar el archivo de Moodle y subirlo al espacio de ejecución. Se borra posteriormente de la máquina virtual el archivo `.tgz`

In [2]:
# Descomentar las líneaa según la necesidad
!wget https://archive.apache.org/dist/spark/spark-3.2.4/spark-3.2.4-bin-hadoop3.2.tgz
!tar -xf spark-3.2.4-bin-hadoop3.2.tgz
#!rm spark-3.2.4-bin-hadoop3.2.tgz

--2024-01-24 15:20:14--  https://archive.apache.org/dist/spark/spark-3.2.4/spark-3.2.4-bin-hadoop3.2.tgz
Resolving archive.apache.org (archive.apache.org)... 65.108.204.189, 2a01:4f9:1a:a084::2
Connecting to archive.apache.org (archive.apache.org)|65.108.204.189|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 301183180 (287M) [application/x-gzip]
Saving to: ‘spark-3.2.4-bin-hadoop3.2.tgz’


2024-01-24 15:20:32 (16.7 MB/s) - ‘spark-3.2.4-bin-hadoop3.2.tgz’ saved [301183180/301183180]



## Instalación e iniciación de la sesión de Spark

* Buscamos la librería `findspark` con `pip install`


In [3]:
!pip install findspark

Collecting findspark
  Downloading findspark-2.0.1-py2.py3-none-any.whl (4.4 kB)
Installing collected packages: findspark
Successfully installed findspark-2.0.1


* Con `SparkSession` inicializamos

In [4]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder\
        .master("local[*]")\
        .appName("Spark_Dataframes")\
        .getOrCreate()

In [5]:
spark

# **Lectura del dataset**


* Para acceder al dataset, **solo si está en Google Drive**, debemos montar la unidad de Google Drive. Igualmente, es importante activarlo con el botón del menú de la izquierda.

> Es posible que pida autenticación y autorización para acceder a Google Drive.

In [6]:
## Utilizar esta celda SOLO si el dataset lo tenemos en Google Drive
from google.colab import drive
drive.mount('/content/gdrive')

Mounted at /content/gdrive


* Leemos en spark el dataset (formato csv)

In [8]:
# Descomentar la siguiente línea si el dataset se ha descargado de una ruta de Google Drive (actualizar ruta)
# OJO: Comentar las dos últimas líneas !wget, spark.read si se descomenta esta.
##ds = spark.read.csv('/content/drive/MyDrive/Colab Notebooks/Material/Data/weblog.csv')

# Si subimos el CSV directamente al espacio de la máquina virtual, lo cargamos directamente
ds = spark.read.csv('weblog.csv')

# Recuerda que los ficheros de este espacio de almacenamiento desaparecen cuando finaliza la ejecución de la máquina

* Realizamos varias operaciones básicas sobre la estructura, como:
  * Imprimir el esquema con `printSchema`, incluyendo el nombre y tipo
  * Mostrar las 5 primeras filas con `show`
  * Contar el número de filas (tamaño del dataset) con `count`

También podríamos obtener el nombre de la columna `i` con `.columns[i]`. De forma genérica, se le nombrará como "_c" y el índice de orden `_c0`, `_c1`, ...


In [9]:
# Mostramos el esquema del dataset
ds.printSchema()
# Mostramos las 5 primeras filas
ds.show(5, False)

root
 |-- _c0: string (nullable = true)
 |-- _c1: string (nullable = true)
 |-- _c2: string (nullable = true)
 |-- _c3: string (nullable = true)

+----------+---------------------+-------------------------------------+-----+
|_c0       |_c1                  |_c2                                  |_c3  |
+----------+---------------------+-------------------------------------+-----+
|IP        |Time                 |URL                                  |Staus|
|10.128.2.1|[29/Nov/2017:06:58:55|GET /login.php HTTP/1.1              |200  |
|10.128.2.1|[29/Nov/2017:06:59:02|POST /process.php HTTP/1.1           |302  |
|10.128.2.1|[29/Nov/2017:06:59:03|GET /home.php HTTP/1.1               |200  |
|10.131.2.1|[29/Nov/2017:06:59:04|GET /js/vendor/moment.min.js HTTP/1.1|200  |
+----------+---------------------+-------------------------------------+-----+
only showing top 5 rows



In [10]:
# El número total de filas - Tamaño del dataset
ds.count()

16008

## Selección de filas y operaciones SQL sobre el dataframe
Como si de SQL se tratara, podemos realizar un filtrado de las filas en base al valor de alguna de sus columnas utilizando métodos que nos devolverán un nuevo `DataFrame`.

Por ejemplo, seleccionar aquellos accesos cuya respuesta HTTP es `200 OK`:

In [11]:
ds_200 = ds.filter("_c3 = 200")
ds_200.select("_c0", "_c2").show(10, False)

+----------+---------------------------------------------+
|_c0       |_c2                                          |
+----------+---------------------------------------------+
|10.128.2.1|GET /login.php HTTP/1.1                      |
|10.128.2.1|GET /home.php HTTP/1.1                       |
|10.131.2.1|GET /js/vendor/moment.min.js HTTP/1.1        |
|10.130.2.1|GET /bootstrap-3.3.7/js/bootstrap.js HTTP/1.1|
|10.130.2.1|GET /profile.php?user=bala HTTP/1.1          |
|10.128.2.1|GET /js/jquery.min.js HTTP/1.1               |
|10.131.2.1|GET /js/chart.min.js HTTP/1.1                |
|10.131.2.1|GET /edit.php?name=bala HTTP/1.1             |
|10.131.2.1|GET /login.php HTTP/1.1                      |
|10.130.2.1|GET /login.php HTTP/1.1                      |
+----------+---------------------------------------------+
only showing top 10 rows



Como podemos ver arriba, `filter` equivale a la cláusula *WHERE* de un *SELECT*, mientras que `select` equivale a la proyección.

In [12]:
# Filtramos los códigos de respuesta en el rango 300
ds_filtrado = ds.filter("_c3 >= 300 AND _c3 < 400")
print("Hay",ds_filtrado.count(),"accesos que devolvieron código de redirección:")
# Se muestran los *distintos* códigos de redirección que encontramos
ds_filtrado.select("_c3").distinct().show(10)

Hay 4156 accesos que devolvieron código de redirección:
+---+
|_c3|
+---+
|302|
|304|
+---+



Pero, ¿cuántos accesos de cada tipo hay?

In [13]:
ds_filtrado = ds.filter("_c3 >= 300 AND _c3 < 400")
ds_filtrado.groupby("_c3").count().show(10)

+---+-----+
|_c3|count|
+---+-----+
|302| 3498|
|304|  658|
+---+-----+



## Uso de sentencias SQL

Podemos utilizar directamente SQL sobre el DataFrame. Para ello, la sesión Spark (`spark`) debe estar inicializada.

Se crea una tabla temporal en caché asociada al DataFrame sobre la que se ejecuta SQL.

Ya podemos realizar operaciones y escribir el resultado en un DataFrame, si fuera necesario.

**Ejemplo**: Obtener aquellas instancias cuyo estado es ```200```.

In [14]:
ds.createOrReplaceTempView("ds")
spark.sql("select * from ds where _c3 = 200").show(10, False)

+----------+---------------------+---------------------------------------------+---+
|_c0       |_c1                  |_c2                                          |_c3|
+----------+---------------------+---------------------------------------------+---+
|10.128.2.1|[29/Nov/2017:06:58:55|GET /login.php HTTP/1.1                      |200|
|10.128.2.1|[29/Nov/2017:06:59:03|GET /home.php HTTP/1.1                       |200|
|10.131.2.1|[29/Nov/2017:06:59:04|GET /js/vendor/moment.min.js HTTP/1.1        |200|
|10.130.2.1|[29/Nov/2017:06:59:06|GET /bootstrap-3.3.7/js/bootstrap.js HTTP/1.1|200|
|10.130.2.1|[29/Nov/2017:06:59:19|GET /profile.php?user=bala HTTP/1.1          |200|
|10.128.2.1|[29/Nov/2017:06:59:19|GET /js/jquery.min.js HTTP/1.1               |200|
|10.131.2.1|[29/Nov/2017:06:59:19|GET /js/chart.min.js HTTP/1.1                |200|
|10.131.2.1|[29/Nov/2017:06:59:30|GET /edit.php?name=bala HTTP/1.1             |200|
|10.131.2.1|[29/Nov/2017:06:59:37|GET /login.php HTTP/1.1        

**Ejemplo**: Obtener las IPs que han solicitado un recurso con extensión ```.php```y han devuelto un estado ```200```.

In [15]:
ds_sql = spark.sql("SELECT DISTINCT(_c0) FROM ds WHERE _c2 LIKE '%.php%' AND _c3 = 200")
ds_sql.show(10)

+----------+
|       _c0|
+----------+
|10.131.2.1|
|10.128.2.1|
|10.130.2.1|
|10.131.0.1|
|10.129.2.1|
+----------+



¿Cuántas IPs se han obtenido?

In [16]:
ds_sql.count()

5

# Finalización de la sesión de Spark

In [17]:
spark.stop()

# <b>Referencias</b>

Información adicional sobre:

* PySpark y el uso de Spark SQL en DataFrames: https://towardsdatascience.com/pyspark-and-sparksql-basics-6cb4bf967e53

* Disponible la *Spark SQL Guide*: https://spark.apache.org/docs/latest/sql-data-sources.html

* Tutorial básico/guía de referencia de SQL: https://www.w3schools.com/sql/default.asp

* Tutorial básico/guía de referencia de Python: https://www.w3schools.com/python/