# <b>Prácticas uso de Spark</b>

# **Instalación del entorno**
## Instalación de Hadoop

Instalamos la versión de Hadoop/Spark 3.2.3
Se recomienda visitar el sitio de Apache Spark para descargar la última versión estable:

https://spark.apache.org/downloads.html

Se configuran posteriormente las variables de entorno `JAVA_HOME` y `SPARK_HOME`

In [None]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.2.3-bin-hadoop3.2"

La descarga de Hadoop puede tomar su tiempo, según la conexión disponible. Se borra posteriormente de la máquina virtual el archivo `.tgz`

In [None]:
# Descomentar las líneaa según la necesidad
!wget https://dlcdn.apache.org/spark/spark-3.2.3/spark-3.2.3-bin-hadoop3.2.tgz
!tar -xf spark-3.2.3-bin-hadoop3.2.tgz
!rm spark-3.2.3-bin-hadoop3.2.tgz

--2023-01-22 13:06:45--  https://dlcdn.apache.org/spark/spark-3.2.3/spark-3.2.3-bin-hadoop3.2.tgz
Resolving dlcdn.apache.org (dlcdn.apache.org)... 151.101.2.132, 2a04:4e42::644
Connecting to dlcdn.apache.org (dlcdn.apache.org)|151.101.2.132|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 301136158 (287M) [application/x-gzip]
Saving to: ‘spark-3.2.3-bin-hadoop3.2.tgz’


2023-01-22 13:06:48 (97.1 MB/s) - ‘spark-3.2.3-bin-hadoop3.2.tgz’ saved [301136158/301136158]



## Instalación e iniciación de la sesión de Spark

* Buscamos la librería `findspark` con `pip install`


In [None]:
!pip install -q findspark

* Con `SparkSession` inicializamos

In [None]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder\
        .master("local[*]")\
        .appName("Spark_Dataframes")\
        .getOrCreate()

In [None]:
spark

# ***MapReduce*** en Spark, Ejercicio 1 (B)



In [None]:
#Invocamos a métodos de un objeto de pyspark.context.SparkContext, que representa al contexto de ejecución.

In [None]:
from pyspark import SparkContext
sc =SparkContext.getOrCreate()

In [None]:
#Invocamos las funciones de spark para poder usarlas a futuro

In [None]:
from pyspark.sql import functions as F

In [None]:
#Cargamos el dataset con el que vamos a trabajar, y se convierte directamente a rdd

In [None]:
#weblog_rdd = sc.textFile("/content/drive/MyDrive/weblog.csv")
weblog_rdd = spark.read.format("CSV").option("header","true").load("/content/drive/MyDrive/weblog.csv")

In [None]:
#Comprobamos si se ha realizado correctamente

In [None]:
weblog_rdd.take(5)

[Row(IP='10.128.2.1', Time='[29/Nov/2017:06:58:55', URL='GET /login.php HTTP/1.1', Staus='200'),
 Row(IP='10.128.2.1', Time='[29/Nov/2017:06:59:02', URL='POST /process.php HTTP/1.1', Staus='302'),
 Row(IP='10.128.2.1', Time='[29/Nov/2017:06:59:03', URL='GET /home.php HTTP/1.1', Staus='200'),
 Row(IP='10.131.2.1', Time='[29/Nov/2017:06:59:04', URL='GET /js/vendor/moment.min.js HTTP/1.1', Staus='200'),
 Row(IP='10.130.2.1', Time='[29/Nov/2017:06:59:06', URL='GET /bootstrap-3.3.7/js/bootstrap.js HTTP/1.1', Staus='200')]

In [None]:
# Primero, se filtran las columnas con la ip y codigo 20x
weblog_rdd_filtrado = weblog_rdd.filter(F.col("URL").like("%php%"))
weblog_rdd_filtrado = weblog_rdd_filtrado.filter("Staus >= 200 AND Staus < 210")

In [None]:
#selecionamos las columnas con las que deseamos trabajar
datos_filtrados = weblog_rdd_filtrado.select("IP", "URL")

In [None]:
#compruebo los datos_filtrados

In [None]:
datos_filtrados.take(5)

[Row(IP='10.128.2.1', URL='GET /login.php HTTP/1.1'),
 Row(IP='10.128.2.1', URL='GET /home.php HTTP/1.1'),
 Row(IP='10.130.2.1', URL='GET /profile.php?user=bala HTTP/1.1'),
 Row(IP='10.131.2.1', URL='GET /edit.php?name=bala HTTP/1.1'),
 Row(IP='10.131.2.1', URL='GET /login.php HTTP/1.1')]

In [None]:
#tranformo el dataset a rdd y realizo mapeo 

In [None]:
rddObj=datos_filtrados.rdd
tuplas_rddObj = rddObj.map(lambda x: (x, 1))
tuplas_rddObj

PythonRDD[258] at RDD at PythonRDD.scala:53

In [None]:
#compruebo si se realiza correctamente

In [None]:
tuplas_rddObj.take(10)

[(Row(IP='10.128.2.1', URL='GET /login.php HTTP/1.1'), 1),
 (Row(IP='10.128.2.1', URL='GET /home.php HTTP/1.1'), 1),
 (Row(IP='10.130.2.1', URL='GET /profile.php?user=bala HTTP/1.1'), 1),
 (Row(IP='10.131.2.1', URL='GET /edit.php?name=bala HTTP/1.1'), 1),
 (Row(IP='10.131.2.1', URL='GET /login.php HTTP/1.1'), 1),
 (Row(IP='10.130.2.1', URL='GET /login.php HTTP/1.1'), 1),
 (Row(IP='10.130.2.1', URL='GET /login.php HTTP/1.1'), 1),
 (Row(IP='10.130.2.1', URL='GET /login.php HTTP/1.1'), 1),
 (Row(IP='10.131.0.1', URL='GET /home.php HTTP/1.1'), 1),
 (Row(IP='10.131.0.1', URL='GET /contestproblem.php?name=RUET%20OJ%20Server%20Testing%20Contest HTTP/1.1'),
  1)]

In [None]:
#agrupo por clave para reducir

In [None]:
tuplas_rddObj = tuplas_rddObj.reduceByKey(lambda x,y: x + y)

In [None]:
#muestro para comprobar el funcionamiento

In [None]:
tuplas_rddObj.take(10)

[(Row(IP='10.128.2.1', URL='GET /login.php HTTP/1.1'), 987),
 (Row(IP='10.128.2.1', URL='GET /home.php HTTP/1.1'), 90),
 (Row(IP='10.130.2.1', URL='GET /profile.php?user=bala HTTP/1.1'), 1),
 (Row(IP='10.131.2.1', URL='GET /edit.php?name=bala HTTP/1.1'), 1),
 (Row(IP='10.131.2.1', URL='GET /login.php HTTP/1.1'), 171),
 (Row(IP='10.130.2.1', URL='GET /login.php HTTP/1.1'), 959),
 (Row(IP='10.131.0.1', URL='GET /home.php HTTP/1.1'), 113),
 (Row(IP='10.131.0.1', URL='GET /contestproblem.php?name=RUET%20OJ%20Server%20Testing%20Contest HTTP/1.1'),
  88),
 (Row(IP='10.131.2.1', URL='GET /contestproblem.php?name=RUET%20OJ%20Server%20Testing%20Contest HTTP/1.1'),
  79),
 (Row(IP='10.131.2.1', URL='GET /login.php?value=fail HTTP/1.1'), 8)]

In [None]:
#una vez agrupado, finalmente se agrupa por página únicamente para obtener el resultado pedido

In [None]:
#primero transformamos en lista

In [None]:
tuplas_lista = tuplas_rddObj.collect()

In [None]:
#copiamos los elementos de la lista en una nueva lista con aquellos valores que interesan (eliminamos la agrupación de valores y nos quedamos con la clave)

In [None]:
tuplas_lista_reducida = []
for i in range(len(tuplas_lista)):
  tuplas_lista_reducida.append(tuplas_lista[i][0])

print(tuplas_lista_reducida)


[Row(IP='10.128.2.1', URL='GET /login.php HTTP/1.1'), Row(IP='10.128.2.1', URL='GET /home.php HTTP/1.1'), Row(IP='10.130.2.1', URL='GET /profile.php?user=bala HTTP/1.1'), Row(IP='10.131.2.1', URL='GET /edit.php?name=bala HTTP/1.1'), Row(IP='10.131.2.1', URL='GET /login.php HTTP/1.1'), Row(IP='10.130.2.1', URL='GET /login.php HTTP/1.1'), Row(IP='10.131.0.1', URL='GET /home.php HTTP/1.1'), Row(IP='10.131.0.1', URL='GET /contestproblem.php?name=RUET%20OJ%20Server%20Testing%20Contest HTTP/1.1'), Row(IP='10.131.2.1', URL='GET /contestproblem.php?name=RUET%20OJ%20Server%20Testing%20Contest HTTP/1.1'), Row(IP='10.131.2.1', URL='GET /login.php?value=fail HTTP/1.1'), Row(IP='10.131.2.1', URL='GET /home.php HTTP/1.1'), Row(IP='10.130.2.1', URL='GET /countdown.php?name=RUET%20OJ%20Server%20Testing%20Contest HTTP/1.1'), Row(IP='10.128.2.1', URL='GET /countdown.php?name=RUET%20OJ%20Server%20Testing%20Contest HTTP/1.1'), Row(IP='10.128.2.1', URL='GET /compiler.php HTTP/1.1'), Row(IP='10.129.2.1', UR

In [None]:
#transformo la lista a un dataframe de spark

In [None]:
rdd_tuplas_lista_reducida = spark.createDataFrame(data=tuplas_lista_reducida)

In [None]:
#selecionamos las columnas con las que deseamos trabajar
datos_filtrados = rdd_tuplas_lista_reducida.select("URL")

In [None]:
#tranformo el dataset a rdd y realizo mapeo 

In [None]:
rddObj=datos_filtrados.rdd
tuplas_rddObj = rddObj.map(lambda x: (x, 1))
tuplas_rddObj

PythonRDD[275] at RDD at PythonRDD.scala:53

In [None]:
#compruebo si se realiza correctamente

In [None]:
tuplas_rddObj.take(10)

[(Row(URL='GET /login.php HTTP/1.1'), 1),
 (Row(URL='GET /home.php HTTP/1.1'), 1),
 (Row(URL='GET /profile.php?user=bala HTTP/1.1'), 1),
 (Row(URL='GET /edit.php?name=bala HTTP/1.1'), 1),
 (Row(URL='GET /login.php HTTP/1.1'), 1),
 (Row(URL='GET /login.php HTTP/1.1'), 1),
 (Row(URL='GET /home.php HTTP/1.1'), 1),
 (Row(URL='GET /contestproblem.php?name=RUET%20OJ%20Server%20Testing%20Contest HTTP/1.1'),
  1),
 (Row(URL='GET /contestproblem.php?name=RUET%20OJ%20Server%20Testing%20Contest HTTP/1.1'),
  1),
 (Row(URL='GET /login.php?value=fail HTTP/1.1'), 1)]

In [None]:
#agrupo por clave para reducir y mostrar cuanto se ha visitado cada página

In [None]:
tuplas_rddObj = tuplas_rddObj.reduceByKey(lambda x,y: x + y)

In [None]:
#muestro el resultado que se ha pedido en el ejercicio

In [None]:
tuplas_rddObj.collect()[:10]

[(Row(URL='GET /home.php HTTP/1.1'), 5),
 (Row(URL='GET /profile.php?user=bala HTTP/1.1'), 1),
 (Row(URL='GET /edit.php?name=bala HTTP/1.1'), 1),
 (Row(URL='GET /login.php?value=fail HTTP/1.1'), 5),
 (Row(URL='GET /countdown.php?name=RUET%20OJ%20Server%20Testing%20Contest HTTP/1.1'),
  5),
 (Row(URL='GET /compiler.php HTTP/1.1'), 5),
 (Row(URL='GET /details.php?id=44 HTTP/1.1'), 5),
 (Row(URL='GET /contestproblem.php?name=RUET%20OJ%20TLE%20Testing%20Contest HTTP/1.1'),
  5),
 (Row(URL='GET /allsubmission.php HTTP/1.1'), 5),
 (Row(URL='GET /showcode.php?id=281&nm=shawon HTTP/1.1'), 4)]

# ***MapReduce*** en Spark, Ejercicio 2 (D)



In [None]:
#Invocamos a métodos de un objeto de pyspark.context.SparkContext, que representa al contexto de ejecución.

In [None]:
from pyspark import SparkContext
sc =SparkContext.getOrCreate()

In [None]:
#Invocamos las funciones de spark para poder usarlas a futuro

In [None]:
from pyspark.sql import functions as F

In [None]:
#Cargamos el dataset con el que vamos a trabajar, y se convierte directamente a rdd

In [None]:
#weblog_rdd = sc.textFile("/content/drive/MyDrive/weblog.csv")
weblog_rdd = spark.read.format("CSV").option("header","true").load("/content/drive/MyDrive/weblog.csv")

In [None]:
#Comprobamos si se ha realizado correctamente

In [None]:
weblog_rdd.take(5)

[Row(IP='10.128.2.1', Time='[29/Nov/2017:06:58:55', URL='GET /login.php HTTP/1.1', Staus='200'),
 Row(IP='10.128.2.1', Time='[29/Nov/2017:06:59:02', URL='POST /process.php HTTP/1.1', Staus='302'),
 Row(IP='10.128.2.1', Time='[29/Nov/2017:06:59:03', URL='GET /home.php HTTP/1.1', Staus='200'),
 Row(IP='10.131.2.1', Time='[29/Nov/2017:06:59:04', URL='GET /js/vendor/moment.min.js HTTP/1.1', Staus='200'),
 Row(IP='10.130.2.1', Time='[29/Nov/2017:06:59:06', URL='GET /bootstrap-3.3.7/js/bootstrap.js HTTP/1.1', Staus='200')]

In [None]:
# Primero, se filtran las columnas con la ip y codigo 20x
weblog_rdd_filtrado = weblog_rdd.filter(F.col("URL").like("%php%"))

In [None]:
#selecionamos las columnas con las que deseamos trabajar
datos_filtrados = weblog_rdd_filtrado.select("IP", "URL")

In [None]:
#compruebo los datos_filtrados

In [None]:
datos_filtrados.take(5)

[Row(IP='10.128.2.1', URL='GET /login.php HTTP/1.1'),
 Row(IP='10.128.2.1', URL='POST /process.php HTTP/1.1'),
 Row(IP='10.128.2.1', URL='GET /home.php HTTP/1.1'),
 Row(IP='10.130.2.1', URL='GET /profile.php?user=bala HTTP/1.1'),
 Row(IP='10.131.2.1', URL='GET /edit.php?name=bala HTTP/1.1')]

In [None]:
#tranformo el dataset a rdd y realizo mapeo 

In [None]:
rddObj=datos_filtrados.rdd
tuplas_rddObj = rddObj.map(lambda x: (x, 1))
tuplas_rddObj

PythonRDD[224] at RDD at PythonRDD.scala:53

In [None]:
#compruebo si se realiza correctamente

In [None]:
tuplas_rddObj.take(10)

[(Row(IP='10.128.2.1', URL='GET /login.php HTTP/1.1'), 1),
 (Row(IP='10.128.2.1', URL='POST /process.php HTTP/1.1'), 1),
 (Row(IP='10.128.2.1', URL='GET /home.php HTTP/1.1'), 1),
 (Row(IP='10.130.2.1', URL='GET /profile.php?user=bala HTTP/1.1'), 1),
 (Row(IP='10.131.2.1', URL='GET /edit.php?name=bala HTTP/1.1'), 1),
 (Row(IP='10.131.2.1', URL='GET /logout.php HTTP/1.1'), 1),
 (Row(IP='10.131.2.1', URL='GET /login.php HTTP/1.1'), 1),
 (Row(IP='10.130.2.1', URL='GET /login.php HTTP/1.1'), 1),
 (Row(IP='10.130.2.1', URL='GET /login.php HTTP/1.1'), 1),
 (Row(IP='10.130.2.1', URL='GET /login.php HTTP/1.1'), 1)]

In [None]:
#agrupo por clave para reducir

In [None]:
tuplas_rddObj = tuplas_rddObj.reduceByKey(lambda x,y: x + y)

In [None]:
#muestro para comprobar el funcionamiento

In [None]:
tuplas_rddObj.take(10)

[(Row(IP='10.128.2.1', URL='GET /login.php HTTP/1.1'), 991),
 (Row(IP='10.128.2.1', URL='POST /process.php HTTP/1.1'), 65),
 (Row(IP='10.128.2.1', URL='GET /home.php HTTP/1.1'), 872),
 (Row(IP='10.130.2.1', URL='GET /profile.php?user=bala HTTP/1.1'), 1),
 (Row(IP='10.131.2.1', URL='GET /edit.php?name=bala HTTP/1.1'), 1),
 (Row(IP='10.131.2.1', URL='GET /logout.php HTTP/1.1'), 9),
 (Row(IP='10.131.2.1', URL='GET /login.php HTTP/1.1'), 173),
 (Row(IP='10.130.2.1', URL='GET /login.php HTTP/1.1'), 959),
 (Row(IP='10.129.2.1', URL='POST /process.php HTTP/1.1'), 44),
 (Row(IP='10.131.0.1', URL='GET /home.php HTTP/1.1'), 788)]

In [None]:
#una vez agrupado, solo faltaría calcula la frecuencia de cada uno para cada página

In [None]:
#Primero, habría que revertir el orden 

In [None]:
def revertir_tupla(par):
  return (par[1], par[0])

# Se revierte la clave y el valor para poder ordenar por clave
secuencias = tuplas_rddObj.map(revertir_tupla)

# Se ordena por clave (descendiente: False)
secuencias_ord = secuencias.sortByKey(False)
secuencias_ord.take(5)

[(991, Row(IP='10.128.2.1', URL='GET /login.php HTTP/1.1')),
 (979, Row(IP='10.131.0.1', URL='GET /login.php HTTP/1.1')),
 (959, Row(IP='10.130.2.1', URL='GET /login.php HTTP/1.1')),
 (872, Row(IP='10.128.2.1', URL='GET /home.php HTTP/1.1')),
 (788, Row(IP='10.131.0.1', URL='GET /home.php HTTP/1.1'))]

In [None]:
# Primero, calculamos el total de accesos
total = secuencias_ord.keys().sum()

# Aplicamos el cálculo de la frecuencia con comprensión de lista (list comprehension)
secuencias = [(sec[0]/total,sec[1]) for sec in secuencias_ord.collect()]

#mostramos el resultado pedido por el ejercicio
print("Número total de accesos de cada cliente a recursos de cada servidor.: ",total)
secuencias[:10]

Número total de accesos de cada cliente a recursos de cada servidor.:  9499


[(0.1043267712390778, Row(IP='10.128.2.1', URL='GET /login.php HTTP/1.1')),
 (0.10306348036635435, Row(IP='10.131.0.1', URL='GET /login.php HTTP/1.1')),
 (0.10095799557848195, Row(IP='10.130.2.1', URL='GET /login.php HTTP/1.1')),
 (0.09179913675123698, Row(IP='10.128.2.1', URL='GET /home.php HTTP/1.1')),
 (0.08295610064217286, Row(IP='10.131.0.1', URL='GET /home.php HTTP/1.1')),
 (0.08253500368459837, Row(IP='10.130.2.1', URL='GET /home.php HTTP/1.1')),
 (0.01915991156963891, Row(IP='10.129.2.1', URL='GET /login.php HTTP/1.1')),
 (0.018212443415096327, Row(IP='10.131.2.1', URL='GET /login.php HTTP/1.1')),
 (0.0108432466575429, Row(IP='10.129.2.1', URL='GET /home.php HTTP/1.1')),
 (0.01073797241814928,
  Row(IP='10.131.2.1', URL='GET /contestproblem.php?name=RUET%20OJ%20Server%20Testing%20Contest HTTP/1.1'))]