Для этого задания данные выгружены из 2х различных систем, хранящих пользовательские логи. \
Для каждой из систем данные разбиты по дню выгрузки. \
В данных за разные дни нет пересечений.

В результате ожидается, что можно будет взять сырые логи для разных аналитических систем, разбитые по дням, и привести их к общему виду с помощью первой функции. \
А после этого взять обработанные логи и последовательно день за днем с помощью второй функции получить итеративно увеличивающиеся день ото дня версии таблиц с девайсами. \
**Оба задания нужно сделать с использованием pyspark**

## Task 1: Унификация данных из внешних аналитических систем

Нужно написать функцию на python, которая будет принимать на вход путь к директории с логами и дату, читать логи за эту дату и унифицировать их, приводя к общей для 2х систем схеме препарированных логов

Препарированные логи должны содержать колонки:

* `external_did` – идентификатор устройства из внешней системы
* `event_name` – название эвента
* `event_datetime`
* `event_json` – json-строка с параметрами эвента. \
    Для одной из систем колонка заполнена для некоторых записей в таблице и нужно брать её из сырых логов без изменений \
    Для другой это не так и сюда нужно складывать user_properties + event_properties
* `date`
* `push_token` \
    Для одной из систем сюда нужно записать 'registration_id' из 'user_property' \
    Для другой (где этого property нет) сюда нужно записать то же значение, что в external_did
* `ios_ifa` – идентификатор, проставляемый для ios устройства (IDFA)
* `external_profile_id` – идентификатор юзера / профиля из внешней системы
* `external_app_id` – application_id из сырых логов
* `external_system`
* `internal_app_id` – какой-нибудь вымышленный generic идентификатор приложения, чтобы отличать препарированные логи одного  приложения от другого (потребуется для следующего задания)
* `country_iso_code` – ISO 3166-1 Alpha-2 code
* `device_locale` – язык пользователя

Можно обратить внимание, например, на https://www.geonames.org/ и доджойнить данные оттуда по Locale. 

In [1]:
# Импорт библиотек
import pandas as pd
import tarfile
import os
import sys
from datetime import date, timedelta
from pathlib import Path, PurePath
from typing import Union
from pyspark.sql import functions as f
from pyspark.sql import SparkSession

In [2]:
# Необходимо для корректной работы pyspark на windows
os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable

In [3]:
# Создание спарк сессии на локальной машине
spark = SparkSession\
    .builder\
    .master("local[*]")\
    .appName("Python Spark example")\
    .getOrCreate()

In [4]:
def unpack_tar_gz(
        archive_path: str, 
        destination_path: str
    ) -> None:
    """
    Функция распаковки архивов tar.

    Функция принимает на вход путь к архиву и
    путь для извлечения файлов
    
    Аргументы:
        "archive_path" - путь к архиву
        "destination_path" - путь для извлечения файлов
    """
    with tarfile.open(archive_path, 'r:gz') as tar:
        tar.extractall(destination_path)

# Распаковка архивов с данными
destination_path = 'Q:\Python\data_engineer_task' # Необходимо указать свой путь
unpack_tar_gz('raw_data_1.tar.gz', destination_path)
unpack_tar_gz('raw_data_2.tar.gz', destination_path)        

In [5]:
def prepare_raw_logs(
    external_system_name: str,
    date: Union[str, date],
    raw_logs_path: Union[str, Path],
    prepared_logs_path: Union[str, Path]
) -> None:
    """
    external_system_name: python-строка, название внешней системы
    date: дата выгрузки логов. Внутри директорий с сырыми логами данные сгрупированны по дате
    raw_logs_path: путь к директории с сырыми логами
    prepared_logs_path: путь к директории с препарированными логами
    
    Внутри функции должно происходить чтение логов с помощью pyspark,
    приведение их к общей схеме и запись в директорию с препарированными логами.
    Препарированные логи должны быть сгрупированны по internal_app_id и date
    """
    # Чтение сырых логов 
    raw_logs = spark.read.parquet(raw_logs_path, header=True)

    if external_system_name == "external_system_1":
       
        # Получаем данные о странах и кодах с сайта
        data = pd.read_html("http://www.geonames.org/statistics/") 
        data = pd.DataFrame(data[1])
        data = data[["Country", "CountryCode"]] # Выбираем из данных необходимые столбцы
        data.columns = ["device_locale", "country_iso_code"] # Переименуем столбцы
        geo_table = spark.createDataFrame(data) # Преобразуем Pandas датафрейм в Pyspark

        # Получаем значение application_id из пути сырых логов
        path = PurePath(raw_logs.inputFiles()[0])
        app_id = path.parts[-3]
        app_id = app_id.split("=")[1]
       
       # Преобразуем данные из первой системы
        raw_logs = raw_logs.withColumnRenamed("device_id", "external_did")
        raw_logs = raw_logs.withColumnRenamed("event_type", "event_name")
        raw_logs = raw_logs.withColumnRenamed("event_time", "event_datetime")
        raw_logs = raw_logs.withColumn("event_json", f.concat("user_properties", "event_properties"))
        raw_logs = raw_logs.withColumn("push_token", f.get_json_object(raw_logs.user_properties, "$.registration_id"))
        raw_logs = raw_logs.withColumnRenamed("idfa", "ios_ifa")
        raw_logs = raw_logs.withColumnRenamed("user_id", "external_profile_id")
        raw_logs = raw_logs.withColumn("external_app_id", f.lit(app_id))
        raw_logs = raw_logs.withColumn("internal_app_id", f.substring("external_system", -1, 1))
        raw_logs = raw_logs.withColumn("device_locale", f.substring("Language", 1, 6)) 

        # Выбор необходимых столбцов
        prepare_logs = raw_logs.join(geo_table, "device_locale").select(raw_logs.external_did,
                                                                        raw_logs.event_name,
                                                                        raw_logs.event_datetime,
                                                                        raw_logs.event_json,
                                                                        raw_logs.date,
                                                                        raw_logs.push_token,
                                                                        raw_logs.ios_ifa,
                                                                        raw_logs.external_profile_id,
                                                                        raw_logs.external_app_id,
                                                                        raw_logs.external_system,
                                                                        raw_logs.internal_app_id,
                                                                        geo_table.country_iso_code,
                                                                        raw_logs.device_locale
                                                                        )
        
    if external_system_name == "external_system_2":
        
        # Получаем значение application_id из пути сырых логов
        path = PurePath(raw_logs.inputFiles()[0])
        app_id = path.parts[-3]
        app_id = app_id.split("=")[1]

        # Преобразуем данные из второй системы
        raw_logs = raw_logs.withColumnRenamed("uniq_device_id", "external_did") 
        raw_logs = raw_logs.withColumn("push_token", raw_logs.external_did)
        raw_logs = raw_logs.withColumnRenamed("profile_id", "external_profile_id")
        raw_logs = raw_logs.withColumnRenamed("application_id", "external_app_id")
        raw_logs = raw_logs.withColumn("internal_app_id", f.substring("external_system", -1, 1))

        # Выбор необходимых столбцов
        prepare_logs = raw_logs.select(raw_logs.external_did,
                                       raw_logs.event_name,
                                       raw_logs.event_datetime,
                                       raw_logs.event_json,
                                       raw_logs.date,
                                       raw_logs.push_token,
                                       raw_logs.ios_ifa,
                                       raw_logs.external_profile_id,
                                       raw_logs.external_app_id,
                                       raw_logs.external_system,
                                       raw_logs.internal_app_id,
                                       raw_logs.country_iso_code,
                                       raw_logs.device_locale
                                       )

    prepare_logs = prepare_logs.filter(prepare_logs.date == date) # Отбираем логи по дате

    # Сохранение препарированных логов в файл 
    internal_app_id = external_system_name[-1]          
    (
    prepare_logs.write
                .mode("overwrite")
                .parquet(f"{prepared_logs_path}/internal_app_id={internal_app_id}/{str(date)}")
    )

In [6]:
prepare_raw_logs("external_system_1", date(year=2023, month=1, day=5), 
                 "Q:\Python\data_engineer_task\external_system_1", "Q:\Python\date")

df1 = spark.read.parquet(f"Q:\Python\date\internal_app_id=1\{date(year=2023, month=1, day=5)}")
df1.show()

+--------------------+--------------------+--------------------+--------------------+----------+--------------------+-------+--------------------+---------------+-----------------+---------------+----------------+-------------+
|        external_did|          event_name|      event_datetime|          event_json|      date|          push_token|ios_ifa| external_profile_id|external_app_id|  external_system|internal_app_id|country_iso_code|device_locale|
+--------------------+--------------------+--------------------+--------------------+----------+--------------------+-------+--------------------+---------------+-----------------+---------------+----------------+-------------+
|2bb2b7629ff9977c8...|         AppRolledUp|2023-01-05 03:04:...|{"user_property_1...|2023-01-05|ehuQJKJXpkcCt_NQt...|   null|45d3abd2-6756-4eb...|  application_1|external_system_1|              1|              RU|       Russia|
|2bb2b7629ff9977c8...|  PlaceScreen_Loaded|2023-01-05 03:04:...|{"user_property_1...|202

## Taks 2: создание таблицы девайсов с актуальным состоянием колонок из препарированных логов

In [7]:
def update_devices_table(
    internal_app_id: Union[int, str],
    date: Union[str, date],
    prepared_logs_path: Union[str, Path],
    devices_path: Union[str, Path]
) -> None:
    """
    date: дата препарированных логов
    internal_app_id: идентификатор приложения
    prepared_logs_path: путь к директории с препарированными логами
    devices_path: путь к директории с таблицами devices
    
    Внутри функции должно происходить чтение препарированных логов с помощью pyspark
    А также чтение таблицы за предыдущий день (если она есть)

    В результате должна получиться таблица, содержащая обновленные данные юзеров, которые совершали эвенты за день `date`
    Для тех юзеров, которые не совершали эвенты за предыдущий день, данные должны браться из таблицы за предыдущий день.
    
    Для каждого external_did нужно хранить datetime первого и последнего эвента из препарированных логов,
    а также актуальное значение идентификаторов и токенов (последнее ненулевое), если они встречались в логах
        
    Внутри директории devices таблицы должны быть сгруппированы по internal_app_id и date
    """
    
    # Чтение препарированных логов
    prep_logs = spark.read.parquet(f"{prepared_logs_path}/internal_app_id={internal_app_id}/{str(date)}")

    # Чтение таблицы за предыдущий день (если она есть)   
    start_day = date
    yesterday = start_day - timedelta(days=1)
    try:    
        yesterday_logs = spark.read.parquet(f"{devices_path}/internal_app_id={internal_app_id}/{str(yesterday)}")
    except Exception:
        yesterday_logs = None
        print("Таблица за предыдущий день не найдена")


    # Преобразование данных
    if (prep_logs.filter(f.col("event_name").isNull())).count() == 0:
        devices_logs = prep_logs.filter(prep_logs.event_name.isNotNull())

    elif (prep_logs.filter(f.col("event_name").isNull())).count() != 0 and yesterday_logs != None:                                 
        devices_logs = prep_logs.filter(prep_logs.event_name.isNull())          
        devices_logs = devices_logs.join(yesterday_logs, "external_did").select(devices_logs.external_did,
                                                                                devices_logs.event_datetime,
                                                                                devices_logs.date,
                                                                                yesterday_logs.first_event_datetime,
                                                                                yesterday_logs.last_event_datetime,  
                                                                                yesterday_logs.push_token,
                                                                                yesterday_logs.ios_ifa,
                                                                                yesterday_logs.external_profile_id,
                                                                                yesterday_logs.external_app_id  
                                                                                )
    else:
        devices_logs = prep_logs.filter(prep_logs.event_name.isNotNull())
        
    # Агрегация таблицы    
    devices_logs = devices_logs.groupBy(f.col("external_did"), f.col("date")).agg(
        f.min("event_datetime").alias("first_event_datetime"),
        f.max("event_datetime").alias("last_event_datetime"),
        f.max(f.when(f.col("push_token").isNotNull(), f.col("push_token"))).alias("push_token"),
        f.max(f.when(f.col("ios_ifa").isNotNull(), f.col("ios_ifa"))).alias("ios_ifa"),
        f.max(f.when(f.col("external_profile_id").isNotNull(), 
                     f.col("external_profile_id"))).alias("external_profile_id"),
        f.max(f.when(f.col("external_app_id").isNotNull(), 
                     f.col("external_app_id"))).alias("external_app_id")
    )

    # Сохранение таблицы
    (
    devices_logs.write
                .mode("overwrite")
                .parquet(f"{devices_path}/internal_app_id={internal_app_id}/{str(date)}")
    )

In [8]:
update_devices_table("1", date(year=2023, month=1, day=5), "Q:\Python\date", "Q:\Python\device")

df2 = spark.read.parquet(f"Q:\Python\device\internal_app_id=1\{date(year=2023, month=1, day=5)}")
df2.show()

Таблица за предыдущий день не найдена
+--------------------+----------+--------------------+--------------------+--------------------+-------+--------------------+---------------+
|        external_did|      date|first_event_datetime| last_event_datetime|          push_token|ios_ifa| external_profile_id|external_app_id|
+--------------------+----------+--------------------+--------------------+--------------------+-------+--------------------+---------------+
|0378698611e8ab440...|2023-01-05|2023-01-04 19:50:...|2023-01-05 15:48:...|d-mIqF1ZlkB9io7uJ...|   null|3329a672-dc49-483...|  application_1|
|08a10c8277cbf2675...|2023-01-05|2022-12-22 15:12:...|2023-01-05 20:24:...|fp5UpE4pKkI9uiPJX...|   null|0ba29b76-a579-470...|  application_1|
|0c0ec224afb1502b2...|2023-01-05|2022-11-06 21:12:...|2023-01-05 13:49:...|eEuS0B350ETlgwX58...|   null|6d3ecf59-fd51-454...|  application_1|
|0df32151a756d61c6...|2023-01-05|2023-01-05 13:12:...|2023-01-05 13:26:...|ddVYxsIfx0sQt1-mP...|   null|53fe6b

In [9]:
spark.stop()