## Importing Libraries

In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.window import Window

**3451. Find Invalid IP Addresses (Hard)**

**Table: logs**

| Column Name | Type    |
|-------------|---------|
| log_id      | int     |
| ip          | varchar |
| status_code | int     |

log_id is the unique key for this table.
Each row contains server access log information including IP address and HTTP status code.

**Write a solution to find invalid IP addresses. An IPv4 address is invalid if it meets any of these conditions:**
- Contains numbers greater than 255 in any octet
- Has leading zeros in any octet (like 01.02.03.04)
- Has fewer or more than 4 octets
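
As a quick sanity check, the three conditions above can be sketched in plain Python, independent of Spark. The helper name `is_invalid_ipv4` is hypothetical, and the choice to leave non-numeric octets unflagged is an assumption, since the problem statement does not cover them.

```python
def is_invalid_ipv4(ip: str) -> bool:
    """Return True if `ip` breaks any of the three rules listed above."""
    octets = ip.split(".")
    # Rule 3: anything other than exactly 4 octets is invalid.
    if len(octets) != 4:
        return True
    for octet in octets:
        # Rule 2: a multi-character octet must not start with "0".
        if len(octet) > 1 and octet.startswith("0"):
            return True
        # Rule 1: a numeric octet must not exceed 255.
        # Assumption: non-numeric octets are outside the stated rules,
        # so they are not flagged here.
        if octet.isdecimal() and int(octet) > 255:
            return True
    return False


for ip in ["192.168.1.1", "256.1.2.3", "192.168.001.1", "192.168.1"]:
    print(ip, is_invalid_ipv4(ip))
```

Only the first address in the loop passes all three rules; the other three each violate exactly one of them.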

Return the result table ordered by invalid_count, then by ip, both in descending order.

The result format is in the following example.

**Example:**

**Input:**

**logs table:**

| log_id | ip            | status_code | 
|--------|---------------|-------------|
| 1      | 192.168.1.1   | 200         | 
| 2      | 256.1.2.3     | 404         | 
| 3      | 192.168.001.1 | 200         | 
| 4      | 192.168.1.1   | 200         | 
| 5      | 192.168.1     | 500         | 
| 6      | 256.1.2.3     | 404         | 
| 7      | 192.168.001.1 | 200         | 

**Output:**

| ip            | invalid_count |
|---------------|---------------|
| 256.1.2.3     | 2             |
| 192.168.001.1 | 2             |
| 192.168.1     | 1             |

**Explanation:**
- 256.1.2.3 is invalid because 256 > 255
- 192.168.001.1 is invalid because of leading zeros
- 192.168.1 is invalid because it has only 3 octets

The output table is ordered by invalid_count, then by ip, both in descending order.
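
The ordering rule is easy to misread, so here is a small plain-Python sketch of it. It assumes the invalid rows from the example have already been identified, and that `ip` is compared as a plain string (so `256.1.2.3` sorts above `192.168.001.1` because `"2" > "1"`, not because of any numeric comparison).

```python
from collections import Counter

# Invalid IPs from the example input, one entry per offending log row.
invalid_ips = [
    "256.1.2.3",
    "192.168.001.1",
    "192.168.1",
    "256.1.2.3",
    "192.168.001.1",
]

counts = Counter(invalid_ips)

# Sort by count descending, then by ip descending (plain string comparison).
result = sorted(counts.items(), key=lambda kv: (kv[1], kv[0]), reverse=True)

for ip, invalid_count in result:
    print(ip, invalid_count)
```

The resulting order matches the expected output table: the two IPs tied at a count of 2 are separated by the string comparison on `ip`.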

In [0]:
logs_data_3451 = [
    (1, "192.168.1.1", 200),
    (2, "256.1.2.3", 404),
    (3, "192.168.001.1", 200),
    (4, "192.168.1.1", 200),
    (5, "192.168.1", 500),
    (6, "256.1.2.3", 404),
    (7, "192.168.001.1", 200),
]

logs_columns_3451 = ["log_id", "ip", "status_code"]
logs_df_3451 = spark.createDataFrame(logs_data_3451, logs_columns_3451)
logs_df_3451.show()

+------+-------------+-----------+
|log_id|           ip|status_code|
+------+-------------+-----------+
|     1|  192.168.1.1|        200|
|     2|    256.1.2.3|        404|
|     3|192.168.001.1|        200|
|     4|  192.168.1.1|        200|
|     5|    192.168.1|        500|
|     6|    256.1.2.3|        404|
|     7|192.168.001.1|        200|
+------+-------------+-----------+



In [0]:
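# split() takes a regex, so the dot is escaped; the raw string avoids Python's invalid "\." escape warning
octets = split(col("ip"), r"\.")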
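
The escaping matters because the split pattern is treated as a regular expression, where a bare `.` matches any character. The sketch below uses Python's `re` module as a stand-in to illustrate the idea; Spark uses Java regular expressions, so the exact output of an unescaped split there may differ.

```python
import re

ip = "192.168.1.1"

# Escaped dot: split on literal "." characters only.
escaped = re.split(r"\.", ip)
print(escaped)

# Unescaped dot: "." matches any character, so every character acts as a
# delimiter and only empty strings are left.
unescaped = re.split(".", ip)
print(unescaped)
```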

In [0]:
invalid_df_3451 = logs_df_3451.withColumn(
    "is_invalid",
    when(
        # Rule 3: not exactly 4 octets
        (size(octets) != 4)
        # Rule 1: any octet greater than 255
        | (octets[0].cast("int") > 255)
        | (octets[1].cast("int") > 255)
        | (octets[2].cast("int") > 255)
        | (octets[3].cast("int") > 255)
        # Rule 2: any multi-character octet with a leading zero
        | ((length(octets[0]) > 1) & octets[0].startswith("0"))
        | ((length(octets[1]) > 1) & octets[1].startswith("0"))
        | ((length(octets[2]) > 1) & octets[2].startswith("0"))
        | ((length(octets[3]) > 1) & octets[3].startswith("0")),
        1,
    ).otherwise(0),
)

In [0]:
# Count flagged rows per ip, then sort by invalid_count and ip, both descending.
# Note: sum here is pyspark.sql.functions.sum (from the wildcard import), not the Python builtin.
invalid_df_3451 \
    .filter(col("is_invalid") == 1) \
    .groupBy("ip") \
    .agg(sum("is_invalid").alias("invalid_count")) \
    .orderBy(col("invalid_count").desc(), col("ip").desc()) \
    .display()

ip,invalid_count
256.1.2.3,2
192.168.001.1,2
192.168.1,1
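
As an alternative to checking each octet separately, validity can also be expressed as a single regular expression. The sketch below is plain Python and is not the approach used in this notebook; `OCTET`, `VALID_IPV4`, and `is_valid_ipv4` are hypothetical names. The same pattern could presumably be passed to PySpark's `Column.rlike` (for example `~col("ip").rlike(VALID_IPV4)`), but that is an untested assumption here. One behavioral difference to be aware of: this pattern also rejects non-numeric octets, which the three stated rules do not mention.

```python
import re

# One octet: 0-255 with no leading zeros.
OCTET = r"(25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[1-9]?[0-9])"
# Exactly four octets separated by literal dots, anchored at both ends.
VALID_IPV4 = rf"^{OCTET}(\.{OCTET}){{3}}$"


def is_valid_ipv4(ip: str) -> bool:
    """Return True if `ip` is four dot-separated octets in 0-255 without leading zeros."""
    return re.fullmatch(VALID_IPV4, ip) is not None


for ip in ["192.168.1.1", "256.1.2.3", "192.168.001.1", "192.168.1"]:
    print(ip, "valid" if is_valid_ipv4(ip) else "invalid")
```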
