In [0]:
### import libs

from pyspark.sql.functions import when, split, col, lit, regexp_extract
from pyspark.sql.types import LongType, StringType, IntegerType
from pyspark.sql import functions as F, Window, DataFrame
from typing import List
import logging


In [0]:
### Session configuration
# Each (key, value) pair is applied to the active Spark session, in this order.
for conf_key, conf_value in (
    ("spark.sql.shuffle.partitions", "100"),
    ("spark.databricks.delta.optimizeWrite.enabled", "true"),
):
    spark.conf.set(conf_key, conf_value)

In [0]:
### Define Functions

class DataValidator:
    """
        Data-quality gate for a Spark DataFrame.

        Every ``validate_*`` method raises ``ValueError`` when its rule is broken.
        ``validate_and_save`` runs the rule set of a given layer and writes the
        DataFrame as Delta only when all rules pass.
    """

    # One IPv4 octet (0-255) and the full dotted-quad pattern built from it.
    _IPV4_OCTET = r"(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)"
    _IPV4_PATTERN = r"^" + r"\.".join([_IPV4_OCTET] * 4) + r"$"

    def __init__(self, df: DataFrame):
        self.df = df

    def validate_required_columns(self, required_columns: List[str]):
        """
            Verify that every required column exists in the DataFrame

            parameters
            ---------------
            required_columns: list of necessary columns

            raises
            ---------------
            ValueError: when at least one required column is missing
        """
        # 'column_name' (not 'col') so the imported pyspark 'col' function is not shadowed
        missing_columns = [column_name for column_name in required_columns if column_name not in self.df.columns]
        if missing_columns:
            raise ValueError(f"The columns following columns are missing from the dataframe: {', '.join(missing_columns)}")

    def validate_column_types(self, expected_types: dict):
        """
            Verify that each column has the expected datatype

            parameters
            ---------------
            expected_types: dictionary mapping column name -> expected type,
                            written as the type's simpleString() (e.g. 'int', 'string')

            raises
            ---------------
            ValueError: on the first column whose type differs from the expected one
        """
        for column, expected_type in expected_types.items():
            actual_type = self.df.schema[column].dataType.simpleString()
            if actual_type != expected_type:
                raise ValueError(f"The column '{column}' should be of type {expected_type}, but it's of type {actual_type}")

    def validate_nulls_in_column(self, column: str):
        """
            Verify that a column has no null values

            parameters
            ---------------
            column: column to check

            raises
            ---------------
            ValueError: when the column contains at least one null
        """
        if self.df.filter(F.col(column).isNull()).count() > 0:
            raise ValueError(f"There is null values in column: {column}")

    def validate_ip_format(self):
        """
            Validate that every value of the 'ip' column is a dotted-quad IPv4 address

            raises
            ---------------
            ValueError: when at least one 'ip' value does not match the pattern
        """
        # Column.rlike receives the pattern verbatim. Embedding it in a SQL string
        # (F.expr("ip rlike '...\\.'")) lets the SQL parser unescape '\.' into '.',
        # which would accept ANY character as the octet separator.
        # Null ips evaluate to NULL here and are skipped; validate_nulls_in_column covers them.
        invalid_ips = self.df.filter(~F.col("ip").rlike(self._IPV4_PATTERN)).count()
        if invalid_ips > 0:
            raise ValueError(f"There is invalid ips in the 'ip' column: {invalid_ips}")

    def validate_status_range(self):
        """
            Verify that 'status' lies between 100 and 599 (a valid HTTP response status)

            raises
            ---------------
            ValueError: when at least one 'status' is below 100 or above 599
        """
        if self.df.filter((F.col("status") < 100) | (F.col("status") > 599)).count() > 0:
            raise ValueError("There is 'status' of response off the correct gap (100-599)")

    def validate_incompleted_avg(self):
        """
            Verify that the share of incomplete rows (average of the 0/1 'incompleted' flag)
            does not exceed 20%

            raises
            ---------------
            ValueError: when the average of 'incompleted' is greater than 0.2
        """
        incompleted_avg = self.df.agg(F.avg("incompleted")).collect()[0][0]
        # avg() yields None when there are no rows to average; there is nothing to compare then
        if incompleted_avg is not None and incompleted_avg > 0.2:
            raise ValueError("Data quality problem... the average of incompleted rows is bigger than 20%")

    def validate_date_format(self):
        """
            Validate the 'date' column: a null date is tolerated (with a warning) only
            on rows flagged as incompleted

            raises
            ---------------
            ValueError: when a row with incompleted == 0 has a null 'date'
        """
        null_dates = self.df.filter(F.col("date").isNull())
        if null_dates.count() > 0:
            if null_dates.filter(F.col("incompleted") == 0).count() > 0:
                raise ValueError("There is null record in 'date' column, verify this")
            logging.warning("There is null records in 'date' column, but only in incompleted rows. Check this!")

    def _validate_silver(self):
        """Rules of the silver layer: mandatory columns plus content checks."""
        self.validate_required_columns(['ip', 'date', 'request', 'status', 'size'])

        self.validate_incompleted_avg()
        self.validate_nulls_in_column("ip")
        self.validate_ip_format()
        self.validate_status_range()
        self.validate_date_format()

    def _validate_gold(self):
        """Rules of the gold layer: mandatory columns plus their exact datatypes."""
        self.validate_required_columns(['ip', 'date', 'request', 'status', 'size', 'endpoint', 'day_of_week'])

        expected_types = {
            "ip": "string",
            "request": "string",
            "endpoint": "string",
            "day_of_week": "string",
            "status": "int",
            "size": "int",
            "date": "timestamp"
        }
        self.validate_column_types(expected_types)

    def validate_and_save(self, layer: str, path: str):
        """
            Main entry point: run the tests of the given layer and, when they all pass,
            overwrite the Delta table at 'path' with the DataFrame

            A failed validation is reported with print() and nothing is written.

            parameters
            ---------------
            layer: 'silver' or 'gold'; any other value is reported as a validation error
            path: destination of the Delta table
        """
        try:
            # Defining rules based on the layer
            if layer == 'silver':
                self._validate_silver()
            elif layer == 'gold':
                self._validate_gold()
            else:
                # An unrecognised layer used to skip every check and still overwrite the target
                raise ValueError(f"Unknown layer '{layer}', expected 'silver' or 'gold'")

            print("Validation succeeded")
            print(f"Saving data in {layer} layer...")
            self.df.write.format("delta").mode("overwrite").save(path)
            print("Data saved with success")

        except ValueError as e:
            print(f"Validation error: {e}")

## General Changes

In [0]:
# Raw access-log table, queried as-is (all columns)
raw_log_query = "select * from default.access_log_10_txt"
df = spark.sql(raw_log_query)

In [0]:
# Regex for the Web Server Access Log layout:
#   <ip> - <extra> [<date>] "<request>" <status> <size>
# Everything after <extra> sits in optional groups, so shorter lines still match.
regex_pattern = r'(\S+) - (.+?)(?: \[(.*?)\] "(.*?)"(?: (\d{3}) (\d+|-)?)?)?$'

# First pass: one output column per capture group, in group order (group 1 -> 'ip', ...)
capture_names = ['ip', 'extra_field', 'date', 'request', 'status', 'size']
parsed_columns = [
    F.regexp_extract('_c0', regex_pattern, group_idx).alias(field_name)
    for group_idx, field_name in enumerate(capture_names, start=1)
]
df_extracted = df.select('_c0', *parsed_columns)

# Second pass: when 'date' / 'request' came out empty, try to recover them from 'extra_field'
# (text between '[' and ']' for the date, between double quotes for the request).
extra_field = F.col("extra_field")
date_inside_extra = F.split(F.split(extra_field, r'\[', 2)[1], r'\]', 2)[0]
request_inside_extra = F.split(F.split(extra_field, r'"', 2)[1], r'"', 2)[0]

recovered_date = F.when(
    (F.col("date") == "") & extra_field.rlike(r'\['), date_inside_extra
).otherwise(F.col("date"))
recovered_request = F.when(
    (F.col("request") == "") & extra_field.rlike(r'"'), request_inside_extra
).otherwise(F.col("request"))
# Once a date or a request is available, 'extra_field' is reset to '-'
cleared_extra_field = F.when(
    (F.col("date") != "") | (F.col("request") != ""), "-"
).otherwise(extra_field)

# Applied one at a time: the 'extra_field' rule must see the already recovered date/request
df_extracted = df_extracted.withColumn("date", recovered_date)
df_extracted = df_extracted.withColumn("request", recovered_request)
df_extracted = df_extracted.withColumn("extra_field", cleared_extra_field)


# Datatype correction
log_timestamp_format = "dd/MMM/yyyy:HH:mm:ss Z"
df_extracted_type_correction = df_extracted.select(
    F.col("ip").cast("string").alias("ip"),
    F.col("extra_field").cast("string").alias("extra_field"),
    F.to_timestamp(F.unix_timestamp("date", log_timestamp_format).cast("timestamp")).alias("date"),
    F.col("request").cast("string").alias("request"),
    F.col("status").cast(IntegerType()).alias("status"),
    F.col("size").cast("int").alias("size"),
)

df_extracted_type_correction.show(5)



+--------------+-----------+-------------------+--------------------+------+-----+
|            ip|extra_field|               date|             request|status| size|
+--------------+-----------+-------------------+--------------------+------+-----+
|10.223.157.186|          -|2009-07-15 21:58:59|      GET / HTTP/1.1|   403|  202|
|10.223.157.186|          -|2009-07-15 21:58:59|GET /favicon.ico ...|   404|  209|
|10.223.157.186|          -|2009-07-15 22:50:35|      GET / HTTP/1.1|   200| 9157|
|10.223.157.186|          -|2009-07-15 22:50:35|GET /assets/js/lo...|   200|10469|
|10.223.157.186|          -|2009-07-15 22:50:35|GET /assets/css/r...|   200| 1014|
+--------------+-----------+-------------------+--------------------+------+-----+
only showing top 5 rows



In [0]:
# Flag to identify 'incompleted' rows: lines where date, request, status and size are all
# missing (e.g. the line has an ip but nothing else could be parsed).
# 'date' (timestamp), 'status' and 'size' (int) are already typed at this point, so a missing
# value is NULL and can never equal ""; comparing them with "" only adds an implicit
# string-to-number/timestamp cast, which is rejected when ANSI mode is enabled.
# Only 'request' is still a string and may legitimately be "".
df_extracted_type_correction = df_extracted_type_correction.withColumn(
    "incompleted",
    F.when(
        F.col("date").isNull() &
        (F.col("request").isNull() | (F.col("request") == "")) &
        F.col("status").isNull() &
        F.col("size").isNull(),
        1
    ).otherwise(0).cast(IntegerType())
)
df_extracted_type_correction.cache()

Out[74]: DataFrame[ip: string, extra_field: string, date: timestamp, request: string, status: int, size: int, incompleted: int]

In [0]:
# Run the silver-layer checks on the typed DataFrame and write it to the silver Delta path
silver_path = "/mnt/delta/layers/silver/logs_delta_1"
df_silver = DataValidator(df_extracted_type_correction)
df_silver.validate_and_save(layer="silver", path=silver_path)



Validation succeeded
Saving data in silver layer...
Data saved with success


In [0]:
### Feature preparation:
# Read the silver table back, keeping only the columns carried into the gold layer
silver_columns = ['ip', 'date', 'request', 'status', 'size']
df_delta_read = (
    spark.read.format("delta")
    .load("/mnt/delta/layers/silver/logs_delta_1")
    .select(*silver_columns)
)
df_delta_read.cache()


# (question 2)
# 'request_type' is the first whitespace-free token of 'request';
# 'endpoint' is the text between that token and the trailing " HTTP/<version>"
request_type_col = F.regexp_extract("request", r"^\S+", 0).alias("request_type")
endpoint_col = F.regexp_extract("request", r"(?<=^\S+\s).+(?=\sHTTP/\d+\.\d+)", 0).alias("endpoint")

# (question 6)
# 'day_of_week' as text, formatted with the 'EEEE' pattern
day_of_week_col = F.date_format("date", "EEEE").alias("day_of_week")

# None of the new names exists among the selected columns, so appending them
# gives the same schema and order as adding them one by one
df = df_delta_read.select("*", request_type_col, endpoint_col, day_of_week_col)

In [0]:
# Run the gold-layer checks on the enriched DataFrame and write it to the gold Delta path
gold_output_path = "/mnt/delta/layers/gold/logs_delta_1"
df_gold = DataValidator(df)
df_gold.validate_and_save(layer="gold", path=gold_output_path)


Validation succeeded
Saving data in gold layer...
Data saved with success


--------

In [0]:
# Load the gold table once and cache it: the questions below all query this DataFrame
gold_input_path = "/mnt/delta/layers/gold/logs_delta_1"
df_delta_read = spark.read.load(gold_input_path, format="delta")
df_delta_read.cache()

Out[81]: DataFrame[ip: string, date: timestamp, request: string, status: int, size: int, request_type: string, endpoint: string, day_of_week: string]

## 1

Identifique as 10 maiores origens de acesso (Client IP) por quantidade de acessos

In [0]:
# Accesses per client ip, busiest first, keeping the ten largest
accesses_per_ip = df_delta_read.groupBy('ip').count()
top10 = accesses_per_ip.orderBy(F.col("count").desc()).limit(10)
top10.show()

+--------------+------+
|            ip| count|
+--------------+------+
|10.216.113.172|158614|
|  10.220.112.1| 51942|
|10.173.141.213| 47503|
|10.240.144.183| 43592|
|  10.41.69.177| 37554|
|10.169.128.121| 22516|
| 10.211.47.159| 20866|
| 10.96.173.111| 19667|
| 10.203.77.198| 18878|
|   10.31.77.18| 18721|
+--------------+------+



## 2

Liste os 6 endpoints mais acessados, desconsiderando aqueles que representam arquivos

In [0]:
# Keep only endpoints that do not end in a file extension (".ico", ".css", ...).
# regexp_extract yields "" when 'request' did not match, so empty endpoints are
# dropped as well: they are not endpoints and must not compete for the ranking.
df_no_file_endpoints = df_delta_read.filter(
    (F.col("endpoint") != "") & ~F.col("endpoint").rlike(r"\.\w+$")
)

# Global sort + limit, the same approach used for the top-10 ips.
# row_number() over a window without partitionBy would pull every row into a
# single partition just to number rows that are discarded right after.
top6 = (df_no_file_endpoints
        .groupBy("endpoint")
        .count()
        .orderBy(F.desc("count"))
        .limit(6))

top6.show(truncate=False)

+--------------------------------------+-----+
|endpoint                              |count|
+--------------------------------------+-----+
|/                                     |99303|
|/release-schedule/                    |25937|
|/search/                              |23055|
|/release-schedule                     |18940|
|/release-schedule/?p=1&r=&l=&o=&rpp=10|8415 |
|/news/                                |7505 |
+--------------------------------------+-----+



## 3

Qual a quantidade de Client IPs distintos?



In [0]:
# One row per distinct ip, then count the rows
uniqueIDS = df_delta_read.dropDuplicates(["ip"]).count()
print(f"Quantidade de Client IPs distintos: {uniqueIDS}")

Quantidade de Client IPs distintos: 333923


## 4

Quantos dias de dados estão representados no arquivo?

In [0]:
# Earliest and latest 'date' found in the data, fetched with a single aggregation
date_bounds = df_delta_read.agg(
    F.min("date").alias("min_date"),
    F.max("date").alias("max_date"),
).collect()[0]
min_date = date_bounds["min_date"]
max_date = date_bounds["max_date"]

# Whole days elapsed between the first and the last record
days_difference = (max_date - min_date).days

print(f"Minimum value: {min_date}")
print(f"Maximum value: {max_date}")
print(f"Days between: {days_difference}")


Minimum value: 2009-07-15 21:58:59
Maximum value: 2011-12-03 21:28:11
Days between: 870


In [0]:
# The question is ambiguous; this variant counts the calendar days that actually have data.

# 'date' is already a timestamp, so to_date() needs no format: it just drops the time part.
# countDistinct ignores NULLs, whereas distinct().count() would count the rows
# without a parsed date as one extra "day".
unique_days = df_delta_read.agg(
    F.countDistinct(F.to_date("date")).alias("days_with_data")
).collect()[0][0]
print(f"Days of existant data: {unique_days}")

Days of existant data: 792


## 5

Com base no tamanho (em bytes) do conteúdo das respostas, faça a seguinte análise:

In [0]:
# Single aggregation pass over 'size' (bytes)
stats = df_delta_read.agg(
    F.sum("size").alias("total_bytes"),  # 5.1 total volume
    F.max("size").alias("max_bytes"),    # 5.2 largest response
    F.min("size").alias("min_bytes"),    # 5.3 smallest response
    F.avg("size").alias("avg_bytes"),    # 5.4 average response
).collect()[0]

# Bytes -> megabytes for readability.
# NOTE: the variables keep their historical '_gb' suffix, but the values are MB.
bytes_per_mb = 1024 ** 2
total_gb, max_gb, min_gb, avg_gb = (
    stats[stat_name] / bytes_per_mb
    for stat_name in ("total_bytes", "max_bytes", "min_bytes", "avg_bytes")
)

print(f"Sum of response size: {total_gb:.2f} MB")
print(f"Biggest response in size: {max_gb:.2f} MB")
print(f"Minor response in size: {min_gb:.2f} MB")
print(f"Average of response size: {avg_gb:.2f} MB")


Sum of response size: 767916.81 MB
Biggest response in size: 76.50 MB
Minor response in size: 0.00 MB
Average of response size: 0.19 MB


## 6

Qual o dia da semana com o maior número de erros do tipo "HTTP Client Error"?

In [0]:
# Errors per 'day_of_week', keeping only the day with the most of them.
## "HTTP Client Error" means a status in the 4xx family (400-499)
is_client_error = F.col("status").between(400, 499)

common_error_day = (
    df_delta_read
    .filter(is_client_error)
    .groupBy("day_of_week")
    .count()
    .withColumnRenamed("count", "error_count")
    .orderBy(F.col("error_count").desc())
    .limit(1)
)

common_error_day.show(truncate=False)


+-----------+-----------+
|day_of_week|error_count|
+-----------+-----------+
|Friday     |15087      |
+-----------+-----------+

