# Задача
__________

**Часть 1. Генерация информации**

Мы использовали библиотеку Faker для генерации логов веб-сервера. Логи содержат следующую информацию:

- IP-адрес клиента
- Временная метка запроса
- HTTP-метод (GET, POST, etc.)
- URL запроса
- Код ответа (200, 404, etc.)
- Размер ответа в байтах

Сгенерировали 100,000 записей логов и сохранили их в CSV-файл.

**Часть 2. Анализ информации**

1. Сгруппируйте данные по IP и посчитайте количество запросов для каждого IP, выводим 10 самых активных IP.

2. Сгруппируйте данные по HTTP-методу и посчитайте количество запросов для каждого метода.

3. Профильтруйте и посчитайте количество запросов с кодом ответа 404.

4. Сгруппируйте данные по дате и просуммируйте размер ответов, сортируйте по дате.

# Загрузка и настройка библиотек

In [1]:
import csv
import random
from datetime import datetime, timedelta

# faker
from faker import Faker
fake = Faker()

# pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import count, sum, desc

spark = SparkSession.builder \
    .appName("pyspark_hw_final") \
    .master("local[*]") \
    .getOrCreate()

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
25/08/11 21:30:00 WARN Utils: Your hostname, DESKTOP-O4NLJ3P, resolves to a loopback address: 127.0.1.1; using 172.27.58.90 instead (on interface eth0)
25/08/11 21:30:00 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/08/11 21:30:11 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
25/08/11 21:30:11 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
25/08/11 21:30:11 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.


# Генерация данных

In [2]:
num_records = 100000

http_methods = ['GET', 'POST', 'PUT', 'DELETE']
response_codes = [200, 301, 404, 500]

file_path = "/home/nick/learning/data_engineer/de_stepik/files/web_server_logs.csv"

with open(file_path, mode='w', newline='') as file:
    writer = csv.writer(file)
    writer.writerow(['ip', 'timestamp', 'method', 'url', 'response_code', 'response_size'])
    
    for _ in range(num_records):
        ip = fake.ipv4()
        timestamp = fake.date_time_this_year().isoformat()
        method = random.choice(http_methods)
        url = fake.uri_path()
        response_code = random.choice(response_codes)
        response_size = random.randint(100, 10000)
        
        writer.writerow([ip, timestamp, method, url, response_code, response_size])
        
print(f"Сгенерировано {num_records} записей и сохранено в {file_path}")

Сгенерировано 100000 записей и сохранено в /home/nick/learning/data_engineer/de_stepik/files/web_server_logs.csv


# Чтение данных

In [3]:
# 1. Чтение CSV файла
web_server_logs_df = spark.read \
    .option("header", "true") \
    .schema("ip STRING, timestamp TIMESTAMP, method STRING, url STRING, response_code INT, response_size INT") \
    .csv("/home/nick/learning/data_engineer/de_stepik/files/web_server_logs.csv")
    
    
# 2. Просмотр структуры данных
print("Схема данных books_df")
web_server_logs_df.printSchema()


# 3. Просмотр данных в табличном формате
web_server_logs_df.show(5, truncate=False)

Схема данных books_df
root
 |-- ip: string (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- method: string (nullable = true)
 |-- url: string (nullable = true)
 |-- response_code: integer (nullable = true)
 |-- response_size: integer (nullable = true)

+---------------+--------------------------+------+---------------+-------------+-------------+
|ip             |timestamp                 |method|url            |response_code|response_size|
+---------------+--------------------------+------+---------------+-------------+-------------+
|169.194.198.188|2025-07-15 11:48:19.288121|GET   |posts/tags     |301          |2438         |
|93.188.19.13   |2025-07-08 03:16:12.951191|PUT   |posts          |500          |1625         |
|115.57.151.168 |2025-08-07 19:52:45.659922|GET   |wp-content/main|404          |4696         |
|35.0.100.168   |2025-04-14 19:31:44.2974  |GET   |search/explore |500          |532          |
|173.165.186.142|2025-06-28 14:20:25.185811|DELETE|posts 

# Анализ информации

## 1. Сгруппируйте данные по IP и посчитайте количество запросов для каждого IP, выводим 10 самых активных IP.

In [None]:
print("Top 10 active IP addresses:")
web_server_logs_df \
    .groupBy("ip") \
    .agg(count("*").alias("request_count")) \
    .orderBy(desc("request_count")) \
    .limit(10) \
    .show()

Top 10 active IP addresses
+---------------+-------------+
|             ip|request_count|
+---------------+-------------+
|  96.45.194.167|            2|
|   31.113.43.53|            2|
|    49.32.45.26|            1|
|  54.140.47.132|            1|
|209.211.229.158|            1|
| 181.89.106.110|            1|
|   183.81.202.9|            1|
|   1.68.165.194|            1|
| 18.185.169.241|            1|
| 21.103.124.202|            1|
+---------------+-------------+



## 2. Сгруппируйте данные по HTTP-методу и посчитайте количество запросов для каждого метода.

In [None]:
print("Request count by HTTP method:")
web_server_logs_df \
    .groupBy("method") \
    .agg(count("*").alias("method_count")) \
    .show()

Request count by HTTP method
+------+------------+
|method|method_count|
+------+------------+
|  POST|       25075|
|DELETE|       25000|
|   PUT|       24958|
|   GET|       24967|
+------+------------+



## 3. Профильтруйте и посчитайте количество запросов с кодом ответа 404.

In [13]:
web_server_logs_df.createOrReplaceTempView("web_server_logs")

number_of_404 = \
    spark.sql("""
        SELECT COUNT(*)
        FROM web_server_logs
        WHERE response_code = 404
    """).first()[0]

print(f"Number of 404 response codes: {number_of_404}")

Number of 404 response codes: 24635


## 4. Сгруппируйте данные по дате и просуммируйте размер ответов, сортируйте по дате.

In [14]:
print("Total response size by day:")
spark.sql("""
    SELECT timestamp::date AS date,
           sum(response_size) AS total_response_size
    FROM web_server_logs
    GROUP BY date
    ORDER BY date
""").show()

Total response size by day:
+----------+-------------------+
|      date|total_response_size|
+----------+-------------------+
|2025-01-01|            2502829|
|2025-01-02|            2250096|
|2025-01-03|            2183530|
|2025-01-04|            2240402|
|2025-01-05|            2350401|
|2025-01-06|            1918317|
|2025-01-07|            2179389|
|2025-01-08|            2019062|
|2025-01-09|            2367440|
|2025-01-10|            2429901|
|2025-01-11|            2231846|
|2025-01-12|            2037497|
|2025-01-13|            2170417|
|2025-01-14|            2492886|
|2025-01-15|            2357468|
|2025-01-16|            2477730|
|2025-01-17|            2172260|
|2025-01-18|            2267976|
|2025-01-19|            2202596|
|2025-01-20|            2036961|
+----------+-------------------+
only showing top 20 rows


In [4]:
spark.stop()