# Exercise 5


To detect possible attacks or errors, we want to know the number of non-correct responses
returned to clients that have received at least one error. [Optional]
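The statement above can be read in more than one way, so the following is only a minimal sketch of the counting logic in plain Python, without Spark. It assumes that an "error" is any 4xx or 5xx status and that a "non-correct" response is anything outside the 2xx range; both definitions, the helper names, and the sample rows are illustrative assumptions, not part of the dataset or the original notebook.

```python
from collections import Counter


def is_error(status: int) -> bool:
    """Assumption: an 'error' is any 4xx or 5xx status code."""
    return 400 <= status <= 599


def is_not_correct(status: int) -> bool:
    """Assumption: a 'non-correct' response is anything outside 2xx."""
    return not (200 <= status <= 299)


def non_correct_counts(rows):
    """Count non-correct responses per client, keeping only clients
    that received at least one error.

    rows: iterable of (ip, status_text) pairs, as read from a CSV file.
    """
    parsed = []
    for ip, status_text in rows:
        if ip is None or status_text is None:
            continue  # skip rows with missing fields
        status_text = status_text.strip()
        if not status_text.isdigit():
            continue  # skip rows whose status is not a number
        parsed.append((ip.strip(), int(status_text)))

    # Step 1: clients that received at least one error.
    clients_with_error = {ip for ip, status in parsed if is_error(status)}

    # Step 2: non-correct responses, restricted to those clients.
    counts = Counter(
        ip
        for ip, status in parsed
        if ip in clients_with_error and is_not_correct(status)
    )
    return dict(counts)


# Hypothetical sample rows (not taken from the real dataset).
sample_rows = [
    ("10.128.2.1", "200"),
    ("10.128.2.1", "302"),
    ("10.128.2.1", "404"),
    ("10.130.2.1", "302"),
    ("10.130.2.1", "200"),
    ("10.131.0.1", "500"),
    ("not-an-ip", "abc"),
]

result = non_correct_counts(sample_rows)
print(result)
```

In this sample, `10.130.2.1` receives a redirect but never an error, so it is excluded from the result. With Spark, the same two steps would typically become a filter plus a `groupBy` on the `IP` column.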

## Hadoop installation

In [None]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.2.4-bin-hadoop3.2"

In [None]:
!wget https://archive.apache.org/dist/spark/spark-3.2.4/spark-3.2.4-bin-hadoop3.2.tgz
!tar -xf spark-3.2.4-bin-hadoop3.2.tgz
!rm spark-3.2.4-bin-hadoop3.2.tgz

--2025-01-19 19:24:45--  https://archive.apache.org/dist/spark/spark-3.2.4/spark-3.2.4-bin-hadoop3.2.tgz
Resolving archive.apache.org (archive.apache.org)... 65.108.204.189, 2a01:4f9:1a:a084::2
Connecting to archive.apache.org (archive.apache.org)|65.108.204.189|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 301183180 (287M) [application/x-gzip]
Saving to: ‘spark-3.2.4-bin-hadoop3.2.tgz’


2025-01-19 19:24:58 (22.2 MB/s) - ‘spark-3.2.4-bin-hadoop3.2.tgz’ saved [301183180/301183180]



## Spark installation

In [None]:
!pip install findspark

Collecting findspark
  Downloading findspark-2.0.1-py2.py3-none-any.whl.metadata (352 bytes)
Downloading findspark-2.0.1-py2.py3-none-any.whl (4.4 kB)
Installing collected packages: findspark
Successfully installed findspark-2.0.1


## Downloading the dataset

In [None]:
!wget "https://www.kaggle.com/api/v1/datasets/download/shawon10/web-log-dataset"
!unzip web-log-dataset -d data

--2025-01-19 19:25:13--  https://www.kaggle.com/api/v1/datasets/download/shawon10/web-log-dataset
Resolving www.kaggle.com (www.kaggle.com)... 35.244.233.98
Connecting to www.kaggle.com (www.kaggle.com)|35.244.233.98|:443... connected.
HTTP request sent, awaiting response... 302 Found
Location: https://storage.googleapis.com:443/kaggle-data-sets/14835/848738/bundle/archive.zip?X-Goog-Algorithm=GOOG4-RSA-SHA256&X-Goog-Credential=gcp-kaggle-com%40kaggle-161607.iam.gserviceaccount.com%2F20250119%2Fauto%2Fstorage%2Fgoog4_request&X-Goog-Date=20250119T192513Z&X-Goog-Expires=259200&X-Goog-SignedHeaders=host&X-Goog-Signature=8661ac9407ab51e059bc89e632bde1dc998178ad8e793a128d7d242875d340db3aa0944ad20db7aeeaedc02eceeabc33dd8539caa431b91ef6abb52cc9e91380dc4257aa0da251f8a30123ec6861f2cbe2a467bc5f7c8e6854e16110ed0cbdad7bfaca872c5c4689cfeaac78ef796e6dd2b928e3568a4bd65176020e5a88a1f26daae11bd24e85197f26ab1a9222e820887f1928f6205c3795449427dacd697658d2e861aa8e8de009f709ce7f15adcfdfbc73ffcdaa6519f678438

We initialize the Spark session.

In [None]:
import findspark
findspark.init()
from pyspark.sql import SparkSession

spark = SparkSession.builder\
.master("local[*]") \
.appName("Spark_Dataframes") \
.getOrCreate()

spark

In [None]:
ds = spark.read.csv('data/weblog.csv', header=True)
ds.show(5, truncate=False)

+----------+---------------------+---------------------------------------------+-----+
|IP        |Time                 |URL                                          |Staus|
+----------+---------------------+---------------------------------------------+-----+
|10.128.2.1|[29/Nov/2017:06:58:55|GET /login.php HTTP/1.1                      |200  |
|10.128.2.1|[29/Nov/2017:06:59:02|POST /process.php HTTP/1.1                   |302  |
|10.128.2.1|[29/Nov/2017:06:59:03|GET /home.php HTTP/1.1                       |200  |
|10.131.2.1|[29/Nov/2017:06:59:04|GET /js/vendor/moment.min.js HTTP/1.1        |200  |
|10.130.2.1|[29/Nov/2017:06:59:06|GET /bootstrap-3.3.7/js/bootstrap.js HTTP/1.1|200  |
+----------+---------------------+---------------------------------------------+-----+
only showing top 5 rows



## Code

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, regexp_extract, count, trim

# Initialize the SparkSession (getOrCreate reuses the session if one is already active)
spark = SparkSession.builder.appName("Exercise_D").getOrCreate()

# Load the dataset
file_path = "data/weblog.csv"
logs = spark.read.csv(file_path, header=True)

# Regular expression that validates IP addresses with octets in the range 0-255
valid_ip_regex = r'^((25[0-5]|2[0-4][0-9]|[0-1]?[0-9]{1,2})\.){3}(25[0-5]|2[0-4][0-9]|[0-1]?[0-9]{1,2})$'


# Keep only rows with a valid IP
logs = logs.withColumn("IP", trim(col("IP"))) \
           .filter(regexp_extract(col("IP"), valid_ip_regex, 0) != "")

# Classify resources by type (the extension may appear anywhere in the URL)
logs = logs.withColumn(
    "ResourceType",
    when(col("URL").rlike(r"\.php"), "PHP")
    .when(col("URL").rlike(r"\.js"), "JS")
    .when(col("URL").rlike(r"\.css"), "CSS")
    .when(col("URL").rlike(r"\.(jpg|png|gif)"), "Image")
    .otherwise("Other")
)

# Group by IP and resource type, then sort so that rows for the same IP appear together
result = logs.groupBy("IP", "ResourceType") \
             .agg(count("*").alias("AccessCount")) \
             .orderBy("IP", "ResourceType", col("AccessCount"))



result.show(truncate=False)

+----------+------------+-----------+
|IP        |ResourceType|AccessCount|
+----------+------------+-----------+
|10.128.2.1|CSS         |451        |
|10.128.2.1|Image       |57         |
|10.128.2.1|JS          |776        |
|10.128.2.1|Other       |337        |
|10.128.2.1|PHP         |2636       |
|10.129.2.1|CSS         |298        |
|10.129.2.1|Image       |29         |
|10.129.2.1|JS          |240        |
|10.129.2.1|Other       |208        |
|10.129.2.1|PHP         |877        |
|10.130.2.1|CSS         |477        |
|10.130.2.1|Image       |55         |
|10.130.2.1|JS          |725        |
|10.130.2.1|Other       |283        |
|10.130.2.1|PHP         |2516       |
|10.131.0.1|CSS         |486        |
|10.131.0.1|Image       |47         |
|10.131.0.1|JS          |711        |
|10.131.0.1|Other       |351        |
|10.131.0.1|PHP         |2603       |
+----------+------------+-----------+
only showing top 20 rows
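
The two row-level rules used in the cell above (IP validation and resource classification) can be checked in isolation with Python's `re` module. This is only a sketch: it assumes that Python's regex engine treats these simple patterns the same way as Spark's `rlike` and `regexp_extract`, and the helper names and sample strings are hypothetical.

```python
import re

# Same pattern as in the Spark cell: four octets, each in the range 0-255.
VALID_IP = re.compile(
    r'^((25[0-5]|2[0-4][0-9]|[0-1]?[0-9]{1,2})\.){3}'
    r'(25[0-5]|2[0-4][0-9]|[0-1]?[0-9]{1,2})$'
)

# Rules are tried in order, mirroring the when(...).when(...) chain.
RESOURCE_RULES = [
    (r"\.php", "PHP"),
    (r"\.js", "JS"),
    (r"\.css", "CSS"),
    (r"\.(jpg|png|gif)", "Image"),
]


def is_valid_ip(text: str) -> bool:
    """Return True if the trimmed text is a dotted IPv4 address."""
    return VALID_IP.match(text.strip()) is not None


def resource_type(url: str) -> str:
    """Return the label of the first rule whose pattern occurs in the URL."""
    for pattern, label in RESOURCE_RULES:
        if re.search(pattern, url):
            return label
    return "Other"


# Hypothetical inputs, including malformed values.
for candidate in ["10.128.2.1", "256.1.1.1", "1.2.3", "chmod:"]:
    print(candidate, is_valid_ip(candidate))

for request in [
    "GET /login.php HTTP/1.1",
    "GET /js/vendor/moment.min.js HTTP/1.1",
    "GET / HTTP/1.1",
    "GET /data.json HTTP/1.1",
]:
    print(request, "->", resource_type(request))
```

Because the patterns are not anchored to the end of the path, a request such as `/data.json` is labelled `JS`. A stricter pattern (for example one that requires the extension to be followed by a space or `?`) would avoid this, at the cost of changing the counts shown above.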

