# Data preparation before import


In [1]:
import os
import warnings

import pandas as pd

# Silence every warning category for the rest of the session so the
# export cells below print only their progress messages.
warnings.filterwarnings("ignore")

In [2]:
# Directory that load_df reads the cleaned CSV inputs from.
CLEANED_DATA_PATH = "../data/cleaned"
# Default output directory of save_df_json (JSON exports).
MONGO_DATA_PATH = "../data/mongo"
# Default output directory of save_df (CSV exports).
POSTGRES_DATA_PATH = "../data/postgres"
# Output directory that generate_neo4j passes to save_df.
NEO4J_DATA_PATH = "../data/neo4j"

# Output directory that generate_hybrid uses for all of its exports.
OPTIMAL_DATA_PATH = "../data/optimal"

## Load data


In [3]:
def load_df(name: str) -> pd.DataFrame:
    """Read one cleaned dataset into a DataFrame.

    Args:
        name: Base name of the dataset; the file read is
            ``<CLEANED_DATA_PATH>/<name>.csv``.

    Returns:
        The CSV contents as parsed by ``pd.read_csv`` with default options.
    """
    csv_path = os.path.join(CLEANED_DATA_PATH, f"{name}.csv")
    return pd.read_csv(csv_path)

## Save data


In [4]:
def save_df_json(name: str, dataset: pd.DataFrame, path: str = MONGO_DATA_PATH):
    """Write *dataset* as line-delimited JSON to ``<path>/<name>.json``.

    Each row becomes one JSON object on its own line (``orient="records"``
    with ``lines=True``).

    Args:
        name: Base name of the output file (without extension).
        dataset: Frame to export.
        path: Target directory; created (including parents) when missing.
    """
    # exist_ok avoids the check-then-create race of a separate exists() test.
    os.makedirs(path, exist_ok=True)

    # No ``index=False`` here: the records orient never serialises the index,
    # and pandas releases before 2.1 reject index=False for this orient.
    dataset.to_json(
        os.path.join(path, f"{name}.json"),
        orient="records",
        lines=True,
    )


def save_df(name: str, dataset: pd.DataFrame, path: str = POSTGRES_DATA_PATH):
    """Write *dataset* as CSV (without the index) to ``<path>/<name>.csv``.

    Args:
        name: Base name of the output file (without extension).
        dataset: Frame to export.
        path: Target directory; created (including parents) when missing.
    """
    # exist_ok avoids the check-then-create race of a separate exists() test.
    os.makedirs(path, exist_ok=True)

    dataset.to_csv(os.path.join(path, f"{name}.csv"), index=False)

## Preparation


### MongoDB


In [None]:
def generate_mongo():
    """Export the cleaned datasets as JSON files for the MongoDB import.

    All five datasets are loaded with ``load_df``; their identifier columns
    are converted to strings and each frame is written with ``save_df_json``
    to its default directory. For campaigns the ``id`` column is replaced by
    a string ``campaign_id`` column appended at the end.
    """
    campaigns = load_df("campaigns")
    clients = load_df("client_first_purchase_date")
    events = load_df("events")
    friends = load_df("friends")
    messages = load_df("messages")

    def with_string_ids(frame, id_columns):
        # Cast only the listed columns; everything else keeps its dtype.
        return frame.astype({column: str for column in id_columns})

    # Campaign
    print("Preparing campaigns...")
    campaigns = campaigns.assign(campaign_id=campaigns["id"].astype(str))
    campaigns = campaigns.drop(columns=["id"])
    save_df_json("campaigns", campaigns)

    # Clients
    print("Preparing clients...")
    clients = with_string_ids(clients, ["client_id", "user_id"])
    save_df_json("client_first_purchase_date", clients)

    # Events
    print("Preparing events...")
    events = with_string_ids(events, ["product_id", "category_id", "user_id"])
    save_df_json("events", events)

    # Friends
    print("Preparing friends...")
    friends = with_string_ids(friends, ["friend1", "friend2"])
    save_df_json("friends", friends)

    # Messages
    print("Preparing messages...")
    messages = with_string_ids(messages, ["client_id", "campaign_id", "user_id"])
    save_df_json("messages", messages)


# generate_mongo()

### PostgreSQL


In [8]:
def generate_postgres():
    """Export the cleaned datasets as CSV files for the PostgreSQL import.

    Writes, through ``save_df`` and its default directory:

    * ``products`` -- one row per ``product_id`` taken from the events;
    * ``friends`` -- the friends dataset unchanged;
    * ``clients`` -- first-purchase rows outer-merged with the client
      columns found in the messages;
    * ``events`` -- events without the product attributes that were moved
      to ``products`` (``price`` is kept);
    * ``campaigns`` -- ``id`` renamed to ``campaign_id``, count-like columns
      stored as nullable integers;
    * ``messages`` -- messages without the columns moved to ``clients``.
    """
    campaigns_df = load_df("campaigns")
    client_purchase_df = load_df("client_first_purchase_date")
    events_df = load_df("events")
    friends_df = load_df("friends")
    messages_df = load_df("messages")

    # Products
    print("Building 'products'...")
    product_columns = ["product_id", "category_id", "category_code", "brand", "price"]
    unique_products = events_df[product_columns].drop_duplicates()
    # For each product keep the first non-null value of every attribute.
    products_df = unique_products.groupby(["product_id"], as_index=False).agg(
        {
            "brand": "first",
            "price": "first",
            "category_id": "first",
            "category_code": "first",
        }
    )
    save_df("products", products_df)

    # Friends
    print("Building 'friends'...")
    save_df("friends", friends_df)

    # Clients
    print("Building 'clients'...")
    client_key = ["client_id", "user_id", "user_device_id"]
    message_clients = messages_df[client_key + ["email_provider"]].drop_duplicates()
    merged_clients = pd.merge(
        client_purchase_df,
        message_clients,
        on=client_key,
        how="outer",
    )
    clients_df = merged_clients.groupby(client_key, as_index=False).agg(
        {"first_purchase_date": "first", "email_provider": "first"}
    )
    save_df("clients", clients_df)

    # Events
    print("Building 'events'...")
    slim_events = events_df.drop(columns=["category_id", "category_code", "brand"])
    save_df("events", slim_events)

    # Campaign
    print("Building 'campaigns'...")
    # Nullable Int64 keeps missing values without turning the column into floats.
    nullable_int_columns = dict.fromkeys(
        ["total_count", "position", "hour_limit", "subject_length"],
        pd.Int64Dtype(),
    )
    campaigns_out = campaigns_df.rename(columns={"id": "campaign_id"})
    campaigns_out = campaigns_out.astype(nullable_int_columns)
    save_df("campaigns", campaigns_out)

    # Messages
    print("Building 'messages'...")
    messages_out = messages_df.drop(
        columns=["id", "email_provider", "user_id", "user_device_id"]
    )
    save_df("messages", messages_out)


# generate_postgres()

### Neo4j


In [None]:
def generate_neo4j():
    """Export the cleaned datasets as node and relation CSVs for Neo4j.

    Everything is written with ``save_df`` into ``NEO4J_DATA_PATH``.

    Node files: ``products``, ``clients``, ``campaigns``, ``messages`` and
    ``users`` (every distinct user id seen in clients, events, messages or
    friends).

    Relation files: ``friends``, ``received`` (message/client pairs),
    ``sent_from`` (message/campaign pairs with ``sent_at``), ``belongs_to``
    (client/user pairs from the first-purchase dataset) and one file per
    event type (``view``, ``cart``, ``purchase``).
    """
    campaigns_df = load_df("campaigns")
    client_purchase_df = load_df("client_first_purchase_date")
    events_df = load_df("events")
    friends_df = load_df("friends")
    messages_df = load_df("messages")

    # NODES

    # Products
    print("Building 'products'...")
    product_columns = ["product_id", "category_id", "category_code", "brand", "price"]
    unique_products = events_df[product_columns].drop_duplicates()
    # For each product keep the first non-null value of every attribute.
    products_df = unique_products.groupby(["product_id"], as_index=False).agg(
        {
            "brand": "first",
            "price": "first",
            "category_id": "first",
            "category_code": "first",
        }
    )
    save_df("products", products_df, NEO4J_DATA_PATH)

    # Clients
    print("Building 'clients'...")
    client_key = ["client_id", "user_id", "user_device_id"]
    message_clients = messages_df[client_key + ["email_provider"]].drop_duplicates()
    merged_clients = pd.merge(
        client_purchase_df,
        message_clients,
        on=client_key,
        how="outer",
    )
    clients_df = merged_clients.groupby(client_key, as_index=False).agg(
        {"first_purchase_date": "first", "email_provider": "first"}
    )
    save_df("clients", clients_df, NEO4J_DATA_PATH)

    # Campaign
    print("Building 'campaigns'...")
    # Nullable Int64 keeps missing values without turning the column into floats.
    nullable_int_columns = dict.fromkeys(
        ["total_count", "position", "hour_limit", "subject_length"],
        pd.Int64Dtype(),
    )
    campaigns_out = campaigns_df.rename(columns={"id": "campaign_id"})
    campaigns_out = campaigns_out.astype(nullable_int_columns)
    save_df("campaigns", campaigns_out, NEO4J_DATA_PATH)

    # Messages
    print("Building 'messages'...")
    # Columns exported through the clients node or the received / sent_from
    # relation files are left out of the message node itself.
    message_node_excluded = [
        "id",
        "email_provider",
        "user_id",
        "user_device_id",
        "campaign_id",
        "client_id",
        "sent_at",
    ]
    save_df(
        "messages",
        messages_df.drop(columns=message_node_excluded),
        NEO4J_DATA_PATH,
    )

    # Users
    print("Building 'users'...")
    every_user_id = pd.concat(
        [
            client_purchase_df["user_id"],
            events_df["user_id"],
            messages_df["user_id"],
            friends_df["friend1"],
            friends_df["friend2"],
        ],
        ignore_index=True,
    )
    users_df = pd.DataFrame(
        {"user_id": every_user_id.drop_duplicates(ignore_index=True)}
    )
    save_df("users", users_df, NEO4J_DATA_PATH)

    # RELATIONS

    # FRIENDS_WITH
    print("Building 'friends'...")
    save_df("friends", friends_df, NEO4J_DATA_PATH)

    # RECEIVED
    print("Building 'received'...")
    received_df = messages_df[["message_id", "client_id"]]
    save_df("received", received_df, NEO4J_DATA_PATH)

    # sent_from: message/campaign pairs, carrying sent_at as a property column
    print("Building 'sent_from'...")
    sent_from_df = messages_df[["message_id", "campaign_id", "sent_at"]]
    save_df("sent_from", sent_from_df, NEO4J_DATA_PATH)

    # BELONGS_TO
    # NOTE(review): built from the first-purchase dataset only, whereas the
    # clients node file also includes clients seen only in messages -- confirm
    # this is intended.
    print("Building 'belongs_to'...")
    belongs_to_df = client_purchase_df[["client_id", "user_id"]]
    save_df("belongs_to", belongs_to_df, NEO4J_DATA_PATH)

    # Events
    event_relation_columns = [
        "user_id",
        "product_id",
        "price",
        "user_session",
        "event_time",
    ]
    for event_type in ("view", "cart", "purchase"):
        print(f"Building '{event_type}'...")
        is_this_type = events_df["event_type"] == event_type
        type_df = events_df.loc[is_this_type, event_relation_columns]
        save_df(event_type, type_df, NEO4J_DATA_PATH)


# generate_neo4j()

Building 'products'...
Building 'clients'...
Building 'campaigns'...
Building 'messages'...
Building 'users'...
Building 'friends'...
Building 'received'...
Building 'sent_from'...
Building 'belongs_to'...
Building 'view'...
Building 'cart'...
Building 'purchase'...


### Hybrid model

In [9]:
def generate_hybrid():
    """Build the import files for the hybrid PostgreSQL + MongoDB + Neo4j model.

    Loads the five cleaned datasets with ``load_df`` and writes every export
    into ``OPTIMAL_DATA_PATH``:

    * ``*_postgres.csv`` -- products, clients, a subset of the campaign
      columns and a subset of the message columns;
    * ``*_mongo.json`` -- the campaign and message columns not sent to
      PostgreSQL, plus the per-event attributes;
    * ``*_neo4j.csv`` -- id-only node files (products, clients, users) and
      the friends / belongs_to / view / cart / purchase relation files.

    Every event receives a string ``event_id`` (its row index), which is
    written to both the Mongo events file and the Neo4j event relation files.

    The loaded frames are never modified after loading (apart from the added
    ``event_id`` column): type casts are applied to the exported copies only,
    so the later exports see the identifiers with their original dtypes.
    """
    campaigns_df = load_df("campaigns")
    client_purchase_df = load_df("client_first_purchase_date")
    events_df = load_df("events")
    friends_df = load_df("friends")
    messages_df = load_df("messages")

    events_df["event_id"] = events_df.index.astype(str)

    # Postgres
    # --------------------------------------------------
    # Products
    print("Building 'products_postgres'...")
    products_df = (
        events_df[["product_id", "category_id", "category_code", "brand", "price"]]
        .drop_duplicates()
        .groupby(["product_id"], as_index=False)
        # For each product keep the first non-null value of every attribute.
        .agg(
            {
                "brand": "first",
                "price": "first",
                "category_id": "first",
                "category_code": "first",
            }
        )
    )
    save_df("products_postgres", products_df, OPTIMAL_DATA_PATH)

    # Clients
    print("Building 'clients_postgres'...")
    client_key = ["client_id", "user_id", "user_device_id"]
    clients_df = (
        pd.merge(
            client_purchase_df,
            messages_df[client_key + ["email_provider"]].drop_duplicates(),
            on=client_key,
            how="outer",
        )
        .groupby(client_key, as_index=False)
        .agg({"first_purchase_date": "first", "email_provider": "first"})
    )
    save_df("clients_postgres", clients_df, OPTIMAL_DATA_PATH)

    # Campaign
    print("Building 'campaigns_postgres'...")
    save_df(
        "campaigns_postgres",
        campaigns_df.rename(columns={"id": "campaign_id"})[
            [
                "campaign_id",
                "campaign_type",
                "channel",
                "topic",
                "started_at",
                "finished_at",
                "total_count",
            ]
        ].astype(
            {
                # Nullable integer: keeps missing counts without using floats.
                "total_count": pd.Int64Dtype(),
                "campaign_id": str,
            }
        ),
        OPTIMAL_DATA_PATH,
    )

    # Messages
    print("Building 'messages_postgres'...")
    save_df(
        "messages_postgres",
        messages_df[
            [
                "message_id",
                "campaign_id",
                "message_type",
                "client_id",
                "channel",
                "category",
                "date",
                "sent_at",
            ]
        ].astype(
            {
                "campaign_id": str,
            }
        ),
        OPTIMAL_DATA_PATH,
    )

    # --------------------------------------------------

    # MongoDB
    # --------------------------------------------------

    # Campaign
    print("Preparing 'campaigns_mongo'...")
    # The string campaign_id is appended as the last column; the columns
    # already exported to PostgreSQL are removed (campaign_type stays in both).
    campaigns_mongo_df = campaigns_df.assign(
        campaign_id=campaigns_df["id"].astype(str)
    ).drop(
        columns=[
            "id",
            "channel",
            "topic",
            "started_at",
            "finished_at",
            "total_count",
        ]
    )
    save_df_json("campaigns_mongo", campaigns_mongo_df, OPTIMAL_DATA_PATH)

    # Events
    print("Preparing 'events_mongo'...")
    save_df_json(
        "events_mongo",
        events_df[["event_id", "event_time", "event_type", "price"]],
        OPTIMAL_DATA_PATH,
    )

    # Messages
    print("Preparing 'messages_mongo'...")
    # Cast user_id on the exported copy only. Casting messages_df in place
    # would make the users export below mix string ids with the other
    # frames' ids, so drop_duplicates could no longer collapse the same user
    # (e.g. 123 vs "123") and users_neo4j would contain duplicate rows.
    # campaign_id and client_id are dropped here, so they need no cast.
    messages_mongo_df = messages_df.drop(
        columns=[
            "campaign_id",
            "client_id",
            "channel",
            "category",
            "date",
            "sent_at",
        ]
    ).astype({"user_id": str})
    save_df_json("messages_mongo", messages_mongo_df, OPTIMAL_DATA_PATH)

    # --------------------------------------------------

    # Neo4j
    # --------------------------------------------------

    # NODES

    # Products
    print("Building 'products_neo4j'...")
    save_df("products_neo4j", products_df[["product_id"]], OPTIMAL_DATA_PATH)

    # Clients
    print("Building 'clients_neo4j'...")

    save_df("clients_neo4j", clients_df[["client_id"]], OPTIMAL_DATA_PATH)

    # Users
    print("Building 'users_neo4j'...")
    users_df = pd.DataFrame(
        {
            "user_id": pd.concat(
                [
                    client_purchase_df["user_id"],
                    events_df["user_id"],
                    messages_df["user_id"],
                    friends_df["friend1"],
                    friends_df["friend2"],
                ],
                ignore_index=True,
            ).drop_duplicates(ignore_index=True)
        }
    )
    save_df(
        "users_neo4j",
        users_df,
        OPTIMAL_DATA_PATH,
    )

    # RELATIONS

    # FRIENDS_WITH
    print("Building 'friends_neo4j'...")
    save_df("friends_neo4j", friends_df, OPTIMAL_DATA_PATH)

    # BELONGS_TO
    print("Building 'belongs_to_neo4j'...")
    save_df(
        "belongs_to_neo4j",
        client_purchase_df[["client_id", "user_id"]],
        OPTIMAL_DATA_PATH,
    )

    # Events: one relation file per event type, keyed by the shared event_id.
    for event_type in ("view", "cart", "purchase"):
        print(f"Building '{event_type}_neo4j'...")
        type_df = events_df.loc[
            events_df["event_type"] == event_type,
            ["user_id", "product_id", "event_id"],
        ]
        save_df(
            f"{event_type}_neo4j",
            type_df,
            OPTIMAL_DATA_PATH,
        )


# generate_hybrid()