In [1]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.4.1.tar.gz (310.8 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m310.8/310.8 MB[0m [31m6.7 MB/s[0m eta [36m0:00:00[0m00:01[0m00:01[0m
[?25h  Preparing metadata (setup.py) ... [?25ldone
[?25hCollecting py4j==0.10.9.7 (from pyspark)
  Downloading py4j-0.10.9.7-py2.py3-none-any.whl (200 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m200.5/200.5 kB[0m [31m9.3 MB/s[0m eta [36m0:00:00[0m
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25ldone
[?25h  Created wheel for pyspark: filename=pyspark-3.4.1-py2.py3-none-any.whl size=311285398 sha256=29bc3a79b32f30aacdac67b533f0243298af5dafd651bbcd38a1abf2477688b7
  Stored in directory: /home/jovyan/.cache/pip/wheels/e9/b4/d8/38accc42606f6675165423e9f0236f8e825f6b6b6048d6743e
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.7 pyspark-3.4.1


In [58]:
from datetime import date
from pathlib import Path
from typing import Union
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.functions import col, when, expr, lit, concat, to_date, get_json_object, count, desc, asc
from pyspark.sql.types import StructType, StructField, StringType, TimestampType, IntegerType, DoubleType
from uuid import uuid4
import glob
import os
from datetime import date, timedelta

In [40]:
def prepare_raw_logs(
        external_system_name: str,
        date: Union[str, date],
        raw_logs_path: Union[str, Path],
        prepared_logs_path: Union[str, Path]
) -> None:
    """
    Внутри функции должно происходить чтение логов с помощью pyspark,
    приведение их к общей схеме и запись в директорию с препарированными логами.
    Препарированные логи должны быть сгрупированны по internal_app_id и date
    """
    spark = SparkSession\
    .builder\
    .master("local[*]")\
    .appName('PrepareRawLogs')\
    .getOrCreate()

    system_path = f"{raw_logs_path}/external_system_{external_system_name}"
    app_dirs = [name for name in os.listdir(system_path) if os.path.isdir(os.path.join(system_path, name))]
    app_id = app_dirs[0].split('=')[1]
    df = spark.read.parquet(f"{raw_logs_path}/external_system_{external_system_name}/application_id={app_id}/date={date}")
    log_schema = StructType([
        StructField('external_did', StringType(), True),
        StructField('event_name', StringType(), True),
        StructField('event_datetime', TimestampType(), True),
        StructField('event_json', StringType(), True),
        StructField('date', TimestampType(), True),
        StructField('push_token', StringType(), True),
        StructField('ios_ifa', StringType(), True),
        StructField('external_profile_id', StringType(), True),
        StructField('external_app_id', StringType(), True),
        StructField('external_system', StringType(), True),
        StructField('internal_app_id', StringType(), True)])
    df_new = spark.createDataFrame([], log_schema)
    if external_system_name == '1':
        app_uuid = '24a7a8f5-35f0-4c3a-9e51-02c7f62f7f06'
        df_new = df.select(df["device_id"].alias("external_did"),
                       df["event_type"].alias("event_name"),
                       df["event_time"].alias("event_datetime"),
                       concat(df["user_properties"], df["event_properties"]).alias("event_json"),
                       df["event_time"].cast("date").alias("date"),
                       get_json_object(col("user_properties"), "$.registration_id").alias("push_token"),
                       df["idfa"].alias("ios_ifa"),
                       df["user_id"].alias("external_profile_id"),
                       lit(app_id).alias("external_app_id"),
                       df["external_system"],
                       lit(app_uuid).alias("internal_app_id"))
    elif external_system_name == '2':
        app_uuid = '86ff5d12-55db-4bdf-a849-1b685bdff00b'
        df_new = df.select(df["uniq_device_id"].alias("external_did"),
                       df["event_name"].alias("event_name"),
                       df["event_datetime"].alias("event_datetime"),
                       df["event_json"],
                       df["event_datetime"].cast("date").alias("date"),
                       df["uniq_device_id"].alias("push_token"),
                       df["ios_ifa"].alias("ios_ifa"),
                       df["profile_id"].alias("external_profile_id"),
                       lit(app_id).alias("external_app_id"),
                       df["external_system"],
                       lit(app_uuid).alias("internal_app_id"))
    dirs = prepared_logs_path.split('/')
    prepared_logs_path = f'{dirs[0]}/{app_uuid}/{dirs[1]}'
    df_new.write.parquet(prepared_logs_path)
    df_new.show(10)

In [47]:

external_system = '2'
data_date = date(2023, 1, 7)
raw_logs_path = './'
prepare_raw_logs(external_system, f'{data_date.strftime("%Y-%m-%d")}', raw_logs_path, f'prepared/{data_date.strftime("%Y-%m-%d")}')

[Stage 131:>                                                        (0 + 1) / 1]

+--------------------+---------------+-------------------+--------------------+----------+--------------------+--------------------+-------------------+---------------+-----------------+--------------------+
|        external_did|     event_name|     event_datetime|          event_json|      date|          push_token|             ios_ifa|external_profile_id|external_app_id|  external_system|     internal_app_id|
+--------------------+---------------+-------------------+--------------------+----------+--------------------+--------------------+-------------------+---------------+-----------------+--------------------+
|bc28aebe94afbb39d...|Debug.Log.error|2023-01-07 16:49:42|{"event_property_...|2023-01-07|bc28aebe94afbb39d...|3d18d215cb20fe191...|                   |        1178448|external_system_2|86ff5d12-55db-4bd...|
|bc28aebe94afbb39d...|   subscription|2023-01-07 16:49:46|{"event_property_...|2023-01-07|bc28aebe94afbb39d...|3d18d215cb20fe191...|                   |        1178448|

                                                                                

In [80]:
def update_devices_table(
    internal_app_id: Union[int, str],
    date: Union[str, date],
    prepared_logs_path: Union[str, Path],
    devices_path: Union[str, Path]
) -> None:
    """
    date: дата препарированных логов
    internal_app_id: идентификатор приложения
    prepared_logs_path: путь к директории с препарированными логами
    devices_path: путь к директории с таблицами devices
    
    Внутри функции должно происходить чтение препарированных логов с помощью pyspark
    А также чтение таблицы за предыдущий день (если она есть)

    В результате должна получиться таблица, содержащая обновленные данные юзеров, которые совершали эвенты за день `date`
    Для тех юзеров, которые не совершали эвенты за предыдущий день, данные должны браться из таблицы за предыдущий день.
    
    Для каждого external_did нужно хранить datetime первого и последнего эвента из препарированных логов,
        а также актуальное значение идентификаторов и токенов (последнее ненулевое), если они встречались в логах
        
    Внутри директории devices таблицы должны быть сгруппированы по internal_app_id и date
    """
    spark = SparkSession\
    .builder\
    .master("local[*]")\
    .appName('UpdateDevicesTable')\
    .getOrCreate()

    current_day = data_date.strftime("%Y-%m-%d")
    previous_day = (data_date-timedelta(days=1)).strftime("%Y-%m-%d")
    
    df_current_day = spark.read.parquet(f"{prepared_logs_path}/{internal_app_id}/{current_day}")
    try:
        df_previous_day = spark.read.parquet(f"{prepared_logs_path}/{internal_app_id}/{previous_day}/")
    except:
        pass
    df_current_day.show(5)
    try:
        df_previous_day.show(5)
    except:
        pass

    updated_data_df = df_current_day.alias("logs").join(
        previous_day_table_df.alias("prev"),
        (col("logs.external_did") == col("prev.external_did")),
        "left"
    ).select(
        col("logs.external_did").alias("external_did"),
        col("logs.event_name").alias("event_name"),
        col("logs.event_datetime").alias("event_datetime"),
        col("logs.event_json").alias("event_json"),
        col("logs.date").alias("date"),
        col("logs.push_token").alias("push_token"),
        col("logs.ios_ifa").alias("ios_ifa"),
        col("logs.external_profile_id").alias("external_profile_id"),
        col("logs.external_app_id").alias("external_app_id"),
        col("logs.external_system").alias("external_system"),
        col("prev.internal_app_id").alias("internal_app_id")
    ).na.fill({"internal_app_id": "UNKNOWN"})
    

In [82]:
# application_1 = 24a7a8f5-35f0-4c3a-9e51-02c7f62f7f06
# 1178448 = 86ff5d12-55db-4bdf-a849-1b685bdff00b
app_id = '24a7a8f5-35f0-4c3a-9e51-02c7f62f7f06'
data_date = date(2023, 1, 5)
logs_path = './prepared/'
devices_path = 'devices/'
update_devices_table(app_id, f'{data_date.strftime("%Y-%m-%d")}', logs_path, devices_path)

AnalysisException: [PATH_NOT_FOUND] Path does not exist: file:/home/jovyan/prepared/24a7a8f5-35f0-4c3a-9e51-02c7f62f7f06/2023-01-04.