<a href="https://colab.research.google.com/github/EsSanches/DE-step/blob/main/task1_git.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
from pyspark.sql.functions import col, to_date, sum, avg, month, year
from pyspark.sql.functions import *
import csv
from faker import Faker
import random

spark = SparkSession.builder.appName("task_spark").getOrCreate()


# Сгенерировать 100,000 записей логов и сохранили их в CSV-файл
fake = Faker()

num_records = 100000

http_methods = ['GET', 'POST', 'PUT', 'DELETE']
response_codes = [200, 301, 404, 500]

file_path = "web_server_logs.csv"

with open(file_path, mode='w', newline='') as file:
    writer = csv.writer(file)
    writer.writerow(['ip', 'timestamp', 'method', 'url', 'response_code', 'response_size'])

    for _ in range(num_records):
        ip = fake.ipv4()
        timestamp = fake.date_time_this_year().isoformat()
        method = random.choice(http_methods)
        url = fake.uri_path()
        response_code = random.choice(response_codes)
        response_size = random.randint(100, 10000)

        writer.writerow([ip, timestamp, method, url, response_code, response_size])

print(f"Сгенерировано {num_records} записей и сохранено в {file_path}")


logs_df = spark.read.csv("web_server_logs.csv", header=True, inferSchema=True)
logs_df.printSchema()

#1. Сгруппировать данные по IP и посчитайть количество запросов для каждого IP, выводим 10 самых активных IP.
top_ip = logs_df.groupBy("ip").count().orderBy(col("count").desc()).limit(10).withColumnRenamed("count", "request_count")
top_ip.show()

#2. Сгруппировать данные по HTTP-методу и посчитать количество запросов для каждого метода.
top_methods = logs_df.groupBy("method").count().orderBy(col("count").desc()).withColumnRenamed("count", "method_count")
top_methods.show()


#3. Профильтровать и посчитать количество запросов с кодом ответа 404.
count_request = logs_df.filter(col("response_code") == 404).count()
count_request


#4. Сгруппировать данные по дате и просуммировать размер ответов, сортировка по дате.
total_resp = logs_df.withColumn("date", to_date(col("timestamp"))).groupBy("date").agg(sum(col("response_size")).alias("total_response_size"))\
.orderBy(col("date")).show()

spark.stop()


