Ссылка на данный блокнот: https://colab.research.google.com/drive/1Z5fVuajC1rLZQe_w2rJ2LFEzh1SV6KXq?usp=drive_link

Устанавка необходимых модулей, как и в файле "Generation_files.ipynb" для работы Spark.

In [1]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://archive.apache.org/dist/spark/spark-3.1.1/spark-3.1.1-bin-hadoop3.2.tgz
!tar xf spark-3.1.1-bin-hadoop3.2.tgz
!pip install -q findspark

import os

os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.1-bin-hadoop3.2"
import findspark

findspark.init()

# TODO: тоже про порядок импортов, еще импортнула dataframe, чтобы в типах функций прописать
import pyspark.sql.functions as F
import pyspark.sql.types as T
from google.colab import drive
from pyspark.sql import DataFrame, SparkSession, Window


spark = SparkSession.builder.master("local[*]").getOrCreate()
spark.conf.set("spark.sql.repl.eagerEval.enabled", True)

drive.mount("/content/gdrive")

Mounted at /content/gdrive


Объявление схемы данных для дальнейшего считывания JSON файлов и составления Data Frame.

In [2]:
schema = T.StructType([
    T.StructField("inn", T.StringType(), True),
    T.StructField(
        "raw_cookie",
        T.ArrayType(
            T.MapType(
                T.StringType(),
                T.StringType()
            )
        )
    ),
    T.StructField("event_type", T.StringType(), True),
    T.StructField("event_action", T.StringType(), True),
    T.StructField("data_value", T.StringType(), True),
    T.StructField("geocountry", T.StringType(), True),
    T.StructField("city", T.StringType(), True),
    T.StructField("user_os", T.StringType(), True),
    T.StructField("systemlanguage", T.StringType(), True),
    T.StructField("geoaltitude", T.StringType(), True),
    T.StructField("meta_platform", T.StringType(), True),
    T.StructField("screensize", T.StringType(), True),
    T.StructField("timestampcolumn", T.DateType(), True)
])
# TODO: конечные скобки на уровне переменной

Объявление обяхательных переменных, успользуемых в функциях

In [3]:
PATH_TO_FILES = "/content/gdrive/MyDrive/data/json/"
DICT_MATCH_CODE = {"IOS": "IDFA", "Android": "GAID"}
filenames = sorted(os.listdir(PATH_TO_FILES), reverse=True)

Функция считывания JSON файла

In [4]:
# TODO: давай и тут во всех функциях добавим доку
def read_file_json(file_name: str, schema_json_file: T.StructType) -> DataFrame:
    return spark.read.format("json")
                .load(f"{file_name}", schema=schema_json_file)

Функция объединения данных JSON файлов и удаления дубликатов в них. Удаление производится с помощью оконной функции rank.

In [5]:
# TODO: тут поправила по форматированию, точки функций должны быть на одном уровне
def union_and_dropduplicate(df1: DataFrame, df2: DataFrame) -> DataFrame:
    return (
        df1.union(df2)
           .withColumn(
               "rank",
               F.rank().over(Window.partitionBy("inn").orderBy(F.desc("timestampcolumn")))
           )
           .filter("rank = 1")
           .drop("rank")
    )

Функция считывания и создания таблицы из сгенерированных файлов

In [6]:
def init_data(schema_json_file: T.StructType, count: int = len(filenames)) -> DataFrame:
    blank_df = spark.createDataFrame([], schema=schema_json_file)
    for i in filenames[:count]:
        json_file = read_file_json(f"{PATH_TO_FILES}{i}", schema_json_file)
        blank_df = union_and_dropduplicate(blank_df, json_file)
    return blank_df

Функция, предоставляющая возможность получить значение словоря по его ключу

In [54]:
def search_values_from_maptype(col_name: str, key: str) -> str:
    return F.expr(f"filter({col_name}, x -> x.key='{key}')")[0]["value"]

Функция преобразования соответствия ОС. Используется в витрине "G"

In [8]:
@F.udf
def match_code(x: str) -> str:
    return DICT_MATCH_CODE.get(x, None)

Чтобы не обращаться постоянно к функции, считывание данных занесем в переменную "initial_data_df"

In [36]:
# TODO: функции как глаголы, поменять initial на init, select * идет по дефолту, так никогда не пишут
initial_data_df = init_data(schema)

In [None]:
# TODO: еще есть крутая штука - кеширование. сохраняет в памяти датасет, и все считается быстрее.
# count - чтобы запустить процесс (cache - это транфсормация, ничего не произойдет, если не запустить действие)
# в конце нужен unpersist()
initial_data_df.cache().count()

# Создание витрины "A".
В общем и целом данная витрина - это наша сгенерированная таблица, только без поля "INN".

In [None]:
data_mart_a = initial_data_df.drop("inn")
data_mart_a.show(5, truncate=False)

# Создание витрины "В".
В данной витрине необходимо найти все "INN" пользователей. Для поля "ID" в данной витрине была примена оконная функция, с целью показать свои знания, в последующих витриниках оконных функций использовано не будет из-за ее долгой работы при сортировке данных.

In [None]:
# Создать цикл счтывания DF по датам. удалять дубликать на объединении дней, удалять cookie по INN

# TODO: думаю, нужна просто насмотренность кода по поводу форматирования, просто пройдись по моим изменениям
data_mart_b = initial_data_df.select(
    F.row_number().over(Window.orderBy(F.col("inn").desc())).alias("id"),
    "inn"
)
data_mart_b.show(5)

# Создание витрины "С"
В данной витрине необходимо собрать куки сайта. Для это использовалась функция
"pyspark.sql.functions.expr", в которую передовалось сгенирированное значение из функции "search_values_from_cookie". Для генерации ID используется метод "monotonically_increasing_id". В последующий витринах используются эти функции.

In [None]:
# TODO: тут почему-то отступ в 2 пробела стал вместо 4, как везде
data_mart_c = initial_data_df.select(
    (F.monotonically_increasing_id() + 1).alias("id"),
    search_values_from_maptype("raw_cookie", "_sa_cookie_a").alias("sa_cookie_a"))
data_mart_c.show(5)

# Создание витрины "D"
Тоже самое что и в витрине "В". Подразумевается что запущен файл "Generation_files.ipynb", который к этому моменту, должен сгенерировать дополнительные JSON файлы, которые не пересекутся с витриной "В" при объединении.

In [None]:
data_mart_d = initial_data_df.select(
    (F.monotonically_increasing_id() + 1).alias("id"),
    "inn"
)
data_mart_d.show(5, truncate=False)

# Создание витрины "E"
Необходимо найти телефон пользователя и захешировать его алгоритмом хешировани **md5**. Делается это с помощью встроеной функции "pyspark.sql.functions.md5"

In [None]:
data_mart_e = initial_data_df.select(
    (F.monotonically_increasing_id() + 1).alias("id"),
    F.md5(
        search_values_from_maptype("raw_cookie", "user_phone")
    ).alias("hash_phone_md5")
)
# TODO: элиасы лучше не переносить от скобочки поля или от всего поля, если помещается

data_mart_e.show(5, truncate=False)

# Создание витрины "F"
Тоже самое что и в витрине "E", только с полем "user_mail".

In [None]:
data_mart_f = initial_data_df.select(
    (F.monotonically_increasing_id() + 1).alias("id"),
    F.md5(
        search_values_from_maptype("raw_cookie", "user_mail")
    ).alias("hash_email_md5")
)

data_mart_f.show(5, truncate=False)

# Создание витрины "G"

Замена значения через функцию "match_code".

In [None]:
data_mart_g = initial_data_df.select(
    (F.monotonically_increasing_id() + 1).alias("id"),
    search_values_from_maptype("raw_cookie", "user_uid").alias("user_uid"),
    match_code("user_os").alias("match_code")
)

data_mart_g.show(5, truncate=False)

# Создание обобщенной витрины объединяющая все предыдущие витрины



In [None]:
main_marts = (
    data_mart_a.select("raw_cookie", "data_value").alias("df_a")
    .join(
        data_mart_d.alias("df_d"),
        # TODO: обычно on, how можно опустить
        (
            (F.md5(F.col("df_d.inn")) == F.col("df_a.data_value")) |
            (F.sha2(F.col("df_d.inn"), 256) == F.col("df_a.data_value"))
        # TODO: закрывающая скобка всегда на уровне открывающей
        ),
        "right"
    )
    .join(
        data_mart_b.alias("df_b"),
        (
            (F.md5(F.col("df_b.inn")) == F.col("df_a.data_value")) |
            (F.sha2(F.col("df_b.inn"), 256) == F.col("df_a.data_value"))
        ),
        "left"
    )
    .join(
        data_mart_e.alias("df_e"),
        F.md5(
            search_values_from_maptype("raw_cookie", "user_phone")
        ) == F.col("df_e.hash_phone_md5"),
        "left"
    )
    .join(
        data_mart_f.alias("df_f"),
        # TODO: когда одно условие, тогда скобочки не нужны
        F.md5(
            search_values_from_maptype("raw_cookie", "user_mail")
        ) == F.col("df_f.hash_email_md5"),
        "left"
    )
    .join(
        data_mart_g.alias("df_g"),
        search_values_from_maptype("raw_cookie", "user_uid") == F.col("df_g.user_uid"),
        "left"
    )
    .join(
        data_mart_c.alias("df_c"),
        search_values_from_maptype("raw_cookie", "_sa_cookie_a") == F.col("df_c.sa_cookie_a"),
        "left"
    )
    .select(
        F.col("df_g.user_uid"),
        F.col("df_d.inn").alias("inn"),
        F.col("df_a.data_value").alias("inn_hash"),
        F.col("df_e.hash_phone_md5"),
        F.col("df_f.hash_email_md5"),
        search_values_from_maptype(
            "raw_cookie",
            "user_phone"
        ).alias("user_phone"),
        search_values_from_maptype(
            "raw_cookie",
            "user_mail"
        ).alias("user_email"),
        search_values_from_maptype(
            "raw_cookie",
            "org_uid"
        ).alias("org_uid"),
        F.col("df_d.id").alias("id_d"),
        F.col("df_c.id").alias("id_c"),
        F.col("df_b.id").alias("id_b"),
        F.col("df_e.id").alias("id_e"),
        F.col("df_f.id").alias("id_f"),
        F.col("df_g.id").alias("id_g"),
        F.array(
            search_values_from_maptype("raw_cookie", "_sa_cookie_a"),
            search_values_from_maptype("raw_cookie", "_fa_cookie_a"),
            search_values_from_maptype("raw_cookie", "_ym_cookie_c"),
            search_values_from_maptype("raw_cookie", "_fbp")
        ).alias("array_cookie")
     )
)
main_marts.show(10, truncate=False)

In [None]:
# TODO: тот самый unpersist - возвращаем ресурсы
initial_data_df.unpersist()

In [None]:
# TODO: всегда останавливаем спарк сессию
spark.stop()