<a href="https://colab.research.google.com/github/1rd0/VK_test/blob/main/VK_TEST.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip install pyspark
!pip install faker




In [None]:
import pandas as pd
import random
from faker import Faker

# Shared Faker instance; generate_test_data uses it to produce emails and dates.
fake = Faker()

def generate_test_data(num_rows, seed=None):
    """Build a pandas DataFrame of synthetic user-action log rows.

    Args:
        num_rows: Number of rows to generate.
        seed: Optional seed. When given, the module-level ``fake`` instance is
            re-seeded and actions are drawn from a private ``random.Random``,
            so repeated calls with the same seed yield the same emails and
            actions. Dates are drawn relative to 'today', so absolute dates
            may still differ when the notebook is run on another day.
            When ``None`` (the default) the behaviour is unchanged: values
            come from the unseeded ``fake`` and the global ``random`` module.

    Returns:
        pandas.DataFrame with the columns ``email``, ``action`` and ``dt``.
    """
    if seed is not None:
        fake.seed_instance(seed)
        rng = random.Random(seed)
    else:
        rng = random

    actions = ['CREATE', 'READ', 'UPDATE', 'DELETE']

    # Columns are generated one after another (all emails, then all actions,
    # then all dates), matching the order in which the generators are consumed.
    data = {
        'email': [fake.email() for _ in range(num_rows)],
        'action': [rng.choice(actions) for _ in range(num_rows)],
        'dt': [fake.date_between(start_date='-1y', end_date='today') for _ in range(num_rows)]
    }
    return pd.DataFrame(data)


# Generate 1000 synthetic log rows. The pandas frame gets its own name so it is
# not confused with the Spark DataFrame `df` that is created later.
test_data_pdf = generate_test_data(1000)

# Write to the same absolute path the Spark reader loads from, so the two
# cells agree regardless of the kernel's current working directory.
test_data_pdf.to_csv('/content/test_data.csv', index=False)


In [None]:
from pyspark.sql import SparkSession


# Start (or reuse) the Spark session for this notebook.
spark = (
    SparkSession.builder
    .appName("Log Processing")
    .getOrCreate()
)

# Load the generated log with an explicit schema instead of inferSchema:
# `dt` is then a DATE on every Spark version, and Spark does not need an
# extra pass over the file to guess the column types.
df = spark.read.csv(
    "/content/test_data.csv",
    header=True,
    schema="email STRING, action STRING, dt DATE",
)


In [None]:
# Preview the loaded Spark DataFrame to check the parsed columns.
df.show()

+--------------------+------+----------+
|               email|action|        dt|
+--------------------+------+----------+
|eharrison@example...|CREATE|2024-07-21|
|leonardchristophe...|CREATE|2024-07-20|
|dorothy78@example...|UPDATE|2024-01-29|
| kelly86@example.net|CREATE|2024-05-21|
|heather66@example...|DELETE|2023-10-09|
| peter93@example.org|  READ|2023-12-20|
|ronniegomez@examp...|CREATE|2023-10-23|
|   ososa@example.net|CREATE|2024-06-05|
|derekdavis@exampl...|DELETE|2024-08-30|
|drodgers@example.net|  READ|2023-09-25|
| hwright@example.org|CREATE|2024-07-09|
|luisjenkins@examp...|  READ|2024-08-11|
|katherineball@exa...|CREATE|2024-06-28|
| wmartin@example.org|CREATE|2024-03-24|
|wendymunoz@exampl...|DELETE|2024-08-22|
| kylie23@example.net|CREATE|2024-02-26|
|   vking@example.org|DELETE|2024-08-11|
|imcbride@example.net|DELETE|2024-07-02|
|taylor51@example.com|  READ|2024-05-19|
|kingsamantha@exam...|  READ|2024-08-05|
+--------------------+------+----------+
only showing top 20 rows

In [None]:
from pyspark.sql.functions import col, count, date_sub

def calculate_weekly_aggregate(date_str):
    """Write per-user CRUD action counts for the 7 days before ``date_str``.

    Reads the module-level Spark DataFrame ``df`` (columns ``email``,
    ``action``, ``dt``), keeps the rows whose ``dt`` falls in the inclusive
    window [target - 7 days, target - 1 day], and produces one row per email
    with the columns ``create_counter``, ``delete_counter``, ``read_counter``
    and ``update_counter``.

    The result is written with Spark's CSV writer to
    ``/content/output/<YYYY-MM-DD>.csv``, overwriting any previous output.
    Spark creates a directory at that path containing the part file.

    Args:
        date_str: Target date in any format accepted by ``pd.to_datetime``,
            e.g. ``"2024-09-16"``. The target date itself is not included
            in the window.

    Returns:
        None. The function writes the CSV and prints the output path.
    """
    # Parse the requested report date.
    target_date = pd.to_datetime(date_str)

    # The window covers the seven full days preceding the target date.
    start_date = target_date - pd.Timedelta(days=7)
    end_date = target_date - pd.Timedelta(days=1)

    filtered_df = df.filter((col("dt") >= start_date.strftime('%Y-%m-%d')) &
                            (col("dt") <= end_date.strftime('%Y-%m-%d')))

    # Pivot the raw rows directly and count them. The previous version first
    # grouped by (email, action) and then counted those already-aggregated
    # rows, so every counter was 1 instead of the real number of actions.
    # Listing the pivot values explicitly guarantees that all four counter
    # columns exist even when an action does not occur in the window; the
    # alphabetical order matches the column order Spark produced before.
    actions = ["CREATE", "DELETE", "READ", "UPDATE"]
    pivoted_df = (
        filtered_df.groupBy("email")
        .pivot("action", actions)
        .count()
        .na.fill(0)  # an email/action pair with no rows is a count of zero
    )

    # Rename the columns: CREATE -> create_counter, READ -> read_counter, ...
    for action in actions:
        pivoted_df = pivoted_df.withColumnRenamed(action, f"{action.lower()}_counter")

    output_file = f"/content/output/{target_date.strftime('%Y-%m-%d')}.csv"
    # coalesce(1) makes Spark emit a single part file for the result.
    pivoted_df.coalesce(1).write.csv(output_file, mode="overwrite", header=True)

    print(f"Aggregated data saved to {output_file}")


In [None]:
import os

# Directory for the output files
os.makedirs("/content/output", exist_ok=True)


In [None]:
# Choose the target date for the report

In [None]:
# Aggregate the 7 days preceding 2024-09-16 (2024-09-09 through 2024-09-15).
calculate_weekly_aggregate("2024-09-16")


Aggregated data saved to /content/output/2024-09-16.csv
