In [None]:
%pip install pyspark
%pip install faker

In [2]:
import csv
from faker import Faker
import random

from pyspark.sql import SparkSession
from pyspark.sql.functions import count, desc

Сгенерируем 100,000 записей логов и сохраним их в CSV-файл

In [3]:
fake = Faker()

num_records = 100000

http_methods = ['GET', 'POST', 'PUT', 'DELETE']
response_codes = [200, 301, 404, 500]

file_path = "web_server_logs.csv"

with open(file_path, mode='w', newline='') as file:
    writer = csv.writer(file)
    writer.writerow(['ip', 'timestamp', 'method', 'url', 'response_code', 'response_size'])
    
    for _ in range(num_records):
        ip = fake.ipv4()
        timestamp = fake.date_time_this_year().isoformat()
        method = random.choice(http_methods)
        url = fake.uri_path()
        response_code = random.choice(response_codes)
        response_size = random.randint(100, 10000)
        
        writer.writerow([ip, timestamp, method, url, response_code, response_size])

print(f"Сгенерировано {num_records} записей и сохранено в {file_path}")

Сгенерировано 100000 записей и сохранено в web_server_logs.csv


Создаем SparkSession для чтения csv файла

In [4]:
spark = SparkSession.builder \
    .appName('web_server_logs') \
    .master('local[*]') \
    .getOrCreate()
    
logs_df = spark.read.csv(r'web_server_logs.csv', header=True, inferSchema=True)
logs_df.show()

+---------------+-------------------+------+--------------------+-------------+-------------+
|             ip|          timestamp|method|                 url|response_code|response_size|
+---------------+-------------------+------+--------------------+-------------+-------------+
|159.175.219.111|2024-06-27 16:57:18|  POST|      app/posts/blog|          301|         7427|
|    51.8.211.65|2024-02-03 10:42:16|  POST|          categories|          301|         1597|
| 182.197.64.200|2024-01-16 16:14:17|  POST|         app/explore|          404|         3482|
| 116.212.216.37|2024-08-09 07:38:36|   GET|       main/main/tag|          500|         9887|
|   3.192.187.20|2024-06-22 23:52:55|  POST|           blog/list|          301|         3340|
| 140.35.234.134|2024-07-09 16:17:03|  POST|     search/app/main|          301|         3442|
| 174.122.70.119|2024-06-21 22:38:38|   PUT|          categories|          200|         4971|
|177.104.219.119|2024-05-30 16:34:10|   PUT|                

Группируем данные по IP и считаем количество запросов для каждого IP, выводим 10 самых активных IP

In [5]:
active_ip_addresses = logs_df.groupBy('ip') \
    .agg(count('response_size').alias('response_count')) \
    .orderBy(desc('response_count')) \
    .limit(10)

print('Top 10 active IP addresses:')
active_ip_addresses.show()


Top 10 active IP addresses:
+---------------+--------------+
|             ip|response_count|
+---------------+--------------+
| 118.49.251.236|             2|
| 76.222.127.203|             2|
|110.135.193.211|             1|
|187.252.136.135|             1|
|  196.119.15.53|             1|
|  177.77.163.23|             1|
|   16.206.5.172|             1|
| 112.137.136.89|             1|
|  31.125.175.52|             1|
|   27.40.27.217|             1|
+---------------+--------------+



Сгруппируем данные по HTTP-методу и считаем количество запросов для каждого метода

In [19]:
count_method = logs_df.groupBy('method').agg(count('response_code'))


print(f'Request count by HTTP method:')
count_method.show()

Request count by HTTP method:
+------+--------------------+
|method|count(response_code)|
+------+--------------------+
|  POST|               24846|
|DELETE|               24959|
|   PUT|               25136|
|   GET|               25059|
+------+--------------------+



Фильтруем и считаем количество запросов с кодом ответа 404   
Группируем данные по дате и суммируем размер ответов, сортируем по дате

In [37]:
logs_df.createOrReplaceTempView('logs')

response_404 = spark.sql("""
                         SELECT DATE(timestamp) AS date, sum(response_size) AS total_response_size
                         FROM logs
                         WHERE response_code = 404
                         GROUP BY DATE(timestamp)
                         ORDER BY date                        
                         """)

number_404 = spark.sql("""
                       SELECT COUNT(response_code)
                       FROM logs
                       WHERE response_code = 404
                       """)

print(f'Number of 404 response codes: {number_404.collect()[0][0]}')
print('Total response size by day:')
response_404.show()


Number of 404 response codes: 25132
Total response size by day:
+----------+-------------------+
|      date|total_response_size|
+----------+-------------------+
|2024-01-01|             614352|
|2024-01-02|             600045|
|2024-01-03|             462107|
|2024-01-04|             450255|
|2024-01-05|             599210|
|2024-01-06|             523533|
|2024-01-07|             562389|
|2024-01-08|             638383|
|2024-01-09|             525968|
|2024-01-10|             662050|
|2024-01-11|             508995|
|2024-01-12|             541372|
|2024-01-13|             510950|
|2024-01-14|             549425|
|2024-01-15|             563182|
|2024-01-16|             574433|
|2024-01-17|             623054|
|2024-01-18|             584093|
|2024-01-19|             484968|
|2024-01-20|             492107|
+----------+-------------------+
only showing top 20 rows

