In [13]:

import pandas as pd
import pymongo
from decouple import config
import country_converter
import sqlalchemy
import os
from dotenv import load_dotenv

In [14]:
# Load variables from a local `.env` file into the process environment
# (python-dotenv) before any settings are read further down.
load_dotenv()

True

In [16]:
# Disable pandas' 'setting with copy' warning for the whole notebook.
# NOTE(review): this also hides the warning for the column assignments made on
# `.loc[...]` slices further down — confirm those slices are meant to be copies.
pd.options.mode.chained_assignment = None

In [48]:
# Primary-key column of each destination table, keyed by table name.
primary_keys = dict(
    share_events="insert_id",
    users="user_id",
    organizations="organization__id",
    countries="country_code",
)

# Postgres connection settings, looked up one by one through `config`.
username, password, host, port, dbname = map(
    config,
    (
        "POSTGRES_USERNAME",
        "POSTGRES_PASSWORD",
        "POSTGRES_HOST",
        "POSTGRES_PORT",
        "POSTGRES_DB_NAME",
    ),
)

# NOTE(review): the credentials are interpolated verbatim into the URL — confirm
# the stored values contain no characters that would need URL-encoding.
engine = sqlalchemy.create_engine(
    f"postgresql://{username}:{password}@{host}:{port}/{dbname}"
)

In [49]:
def remove_prefix(df, prefix):
    """Strip `prefix` from every column label of `df` that starts with it.

    The rename is applied to `df` itself (in place); the same object is
    returned so the call can also be used in an assignment.

    Args:
        df: DataFrame whose column labels are strings.
        prefix: Leading text to remove from matching column labels.

    Returns:
        The same DataFrame instance that was passed in, with renamed columns.
    """
    cut = len(prefix)

    def _without_prefix(label):
        # Labels that do not carry the prefix are passed through unchanged.
        if label.startswith(prefix):
            return label[cut:]
        return label

    df.rename(columns=_without_prefix, inplace=True)
    return df

In [50]:
# Load the share-events export that the rest of the notebook works on.
# NOTE(review): the preview below shows `Unnamed: 0` / `Unnamed: 0.1` columns,
# which looks like a previously saved index — confirm whether they are needed.
share_events_df = pd.read_csv("shared_events_for_notebook.csv")
share_events_df.head()

Unnamed: 0.1,Unnamed: 0,_id,server_upload_time,server_received_time,event_type,user_creation_time,city,language,device_model,processed_time,...,event_properties_categories,user_properties_organization___v,user_properties_organization_status,user_properties_organization_logo_url_url,user_properties_organization_owner_id,user_properties_organization_updated_at,user_properties_organization_code,user_properties_organization_created_at,domain,insert_key
0,0,6101a7cf67189533badad509,1620034324392,1620034324384,Share Presentation:Share Center,1617104362001,Tananger,Norwegian,Mac,1620034326331,...,,,,,,,,,stream.masterwizr.com,
1,1,6101a7cf67189533badad512,1620204999636,1620204999630,Share Template Show Room:Studio,1620204999761,Stavanger,English,Windows,1620205001489,...,,,,,,,,,studio.masterwizr.com,
2,2,6101a7cf67189533badad52f,1624542558735,1624542558728,Share Presentation:Content Admin,1615276990031,Tirana,English,Windows,1624542561706,...,,,,,,,,,stream.masterwizr.com,
3,3,6101a7cf67189533badad4fd,1619102304194,1619102304183,Share Template Show Room:Studio,1619102164202,Ytre Arna,Norwegian,Windows,1619102306093,...,,,,,,,,,studio.masterwizr.com,
4,4,6101a7cf67189533badad505,1619531913152,1619531913138,Share Presentation:Content Admin,1616399457149,Oslo,Norwegian,Mac,1619531916312,...,,,,,,,,,stream.masterwizr.com,


In [54]:
prefix = "user_properties_"

# Pull every `user_properties*` column out of the events and carry the event's
# user_id along with it, because it will be needed as a foreign key to the
# ShareEvents table.
users_df = share_events_df.loc[
    :, share_events_df.columns.str.startswith("user_properties")
].assign(user_properties_user_id=share_events_df["user_id"])

# Remember the (still prefixed) column names: these are the columns that will
# later be dropped from the initial dataset.
user_cols_to_drop = users_df.columns.tolist()

# Strip the `user_properties_` prefix from the column names.
users_df = remove_prefix(users_df, prefix)

users_df.head()

Unnamed: 0,email,name,user_unique_identifier,device_id,session_id,organization_name,organization_type,organization__id,role_id,organization___v,organization_status,organization_logo_url_url,organization_owner_id,organization_updated_at,organization_code,organization_created_at,user_id
0,leif@masterwizr.no,Leif Birger Kleppa,943e2fc4-a66b-42c8-a7c6-81380a0a50b1,-Pco2mlaPZGQDdOEgKi9IY,1620034000000.0,mw-intranet,Company,5eff44d625053a0dc1c7bb98,5db8291717a3e1072298a291,,,,,,,,5f6082b63454bc01ebd20318
1,,,69dc57bf-5e52-4a07-a811-480e26e2c7d1,rbDpjPpB46N4_rBi2Oq5Sf,1620205000000.0,,,,,,,,,,,,69dc57bf-5e52-4a07-a811-480e26e2c7d1
2,info@masterwizr.com,Jone Smedsvig,a9866417-93b6-484d-9619-714bb34b4b4a,UIMMG1fEsYBH3bO6PLEDtq,1624541000000.0,mw-sales,Company,5f8670241916d600128dcfc0,5db8291717a3e1072298a291,,,,,,,,5eff451125053a0dc1c7bc52
3,,,eb41d33c-4552-47c2-b931-9ebdd74b4297,dZwxHg0a1s8FV050nTrmVT,1619100000000.0,,,,,,,,,,,,eb41d33c-4552-47c2-b931-9ebdd74b4297
4,nico@masterwizr.no,Nico Rasmussen,77640c84-db18-4f33-9b24-a201dea87991,RmU7tllO5CIQQZkxfyhGPp,1619532000000.0,mw-sales,Company,5f8670241916d600128dcfc0,5db8291717a3e1072298a291,,,,,,,,5f5a16c659be2422870c53cc


In [55]:
# Inspect the dtype pandas inferred for each extracted user column.
users_df.dtypes

email                         object
name                          object
user_unique_identifier        object
device_id                     object
session_id                   float64
organization_name             object
organization_type             object
organization__id              object
role_id                       object
organization___v             float64
organization_status          float64
organization_logo_url_url    float64
organization_owner_id        float64
organization_updated_at      float64
organization_code            float64
organization_created_at      float64
user_id                       object
dtype: object

In [40]:
def _set_table_name(frame, table_name):
    """Store `table_name` on `frame` so it can be read back as `frame.name`.

    A plain `frame.name = ...` goes through pandas' attribute handling, which
    assigns to an existing column called "name" (the users table has one)
    instead of storing an attribute. Writing through `object.__setattr__`
    creates a real instance attribute and leaves the column data untouched.
    """
    object.__setattr__(frame, "name", table_name)
    return frame


def prepare_data(share_events_df):
    """Split the raw share-events frame into per-table DataFrames.

    The function works on a copy of `share_events_df`, so the caller's frame
    is left untouched and the function can be called again on the same input.

    Args:
        share_events_df: DataFrame of raw events. It must contain the columns
            `user_id`, `insert_id`, `country`, `event_type` and `_id`, plus the
            `user_properties*` columns (including
            `user_properties_organization__id`, `..._organization_name` and
            `..._organization_type`).

    Returns:
        list: currently `[users_df]` only. The share_events, organizations and
        countries frames are prepared as well but not returned yet. Every
        prepared frame exposes its destination table name as `.name`.
    """
    # Never mutate the caller's frame: the drops below would otherwise make a
    # second call fail on the already-removed columns.
    share_events_df = share_events_df.copy()

    # Extracting `user_properties` data
    is_user_column = share_events_df.columns.str.startswith("user_properties")
    users_df = share_events_df.loc[:, is_user_column].copy()

    # Add user_id to events data because we will need it as a foreign key to the ShareEvents table
    users_df["user_properties_user_id"] = share_events_df["user_id"]

    # Prepare columns from `user_properties` that will later be dropped from the initial dataset
    user_cols_to_drop = list(users_df.columns)

    # Rename `user_properties` columns to remove prefix
    prefix = "user_properties_"
    users_df = remove_prefix(users_df, prefix)

    # Prepare to create user table - remove duplicate user ids
    users_df = users_df.drop_duplicates(subset=["user_id"])

    # Drop `user_properties` from the events, now that they have been extracted.
    # `user_properties_user_id` only exists on users_df, hence errors="ignore".
    share_events_df = share_events_df.drop(
        columns=user_cols_to_drop, errors="ignore"
    ).drop_duplicates(subset=["insert_id"])

    # Extract `organization` data
    organizations_df = (
        users_df[["organization__id", "organization_name", "organization_type"]]
        .dropna(subset=["organization__id"])
        .drop_duplicates(subset=["organization__id"])
        .reset_index(drop=True)
    )
    org_cols_to_drop = ["organization_name", "organization_type", "organization___v",
        "organization_status", "organization_logo_url_url", "organization_owner_id",
        "organization_updated_at", "organization_code", "organization_created_at"]
    # errors="ignore": tolerate inputs that lack some of the optional organization fields.
    users_df = users_df.drop(columns=org_cols_to_drop, errors="ignore").reset_index(drop=True)

    # Extract `location` data
    share_events_df["country_code"] = country_converter.convert(
        names=list(share_events_df["country"]), to="ISO3"
    )
    countries_df = (
        share_events_df[["country", "country_code"]]
        .drop_duplicates(subset=["country_code"])
        .reset_index(drop=True)
    )
    share_events_df = share_events_df.drop(columns="country")

    ### Cleaning up inconsistent share_events data
    # Add a name to event types that have no name and appear as links ("http...").
    # na=False keeps rows with a missing event_type out of the mask instead of
    # producing NaN entries that boolean indexing rejects.
    is_link = share_events_df["event_type"].str.startswith("http", na=False)
    share_events_df.loc[is_link, "event_type"] = "Share Show Room:Studio"

    # rename Share Presentation:ContentAdmin to Share Presentation:Content Admin
    # (assigned back explicitly rather than replaced in place on the attribute-accessed column)
    share_events_df["event_type"] = share_events_df["event_type"].replace(
        "Share Presentation:ContentAdmin",
        "Share Presentation:Content Admin",
    )

    # Final cleanup of dataframes
    share_events_df = share_events_df.drop(columns="_id").reset_index(drop=True)

    # Add dataframe names that will be used as table names in the database
    _set_table_name(share_events_df, "share_events")
    _set_table_name(users_df, "users")
    _set_table_name(organizations_df, "organizations")
    _set_table_name(countries_df, "countries")

    # NOTE(review): only the users table is handed back for now; the full list
    # would be [share_events_df, users_df, organizations_df, countries_df].
    return [users_df]

In [43]:
def read_mongo_data(collection_name):
    """Fetch every document of `collection_name` from the `masterwizr-data-db` database.

    Args:
        collection_name: Name of the MongoDB collection to read.

    Returns:
        list: all documents of the collection on success.
        dict: a `{"statusCode": 500, "body": {...}}` error response if anything
        goes wrong while connecting or reading. Callers have to check the
        return type to tell the two cases apart.
    """
    user = config("MONGO_USER")
    password = config("MONGO_PASSWORD")
    # NOTE(review): the credentials are interpolated verbatim; PyMongo expects
    # reserved characters in them to be percent-escaped — confirm the stored
    # values are already URI-safe.
    uri = f"mongodb+srv://{user}:{password}@cluster0.yhpvw.mongodb.net/masterwizr-data-db?retryWrites=true&w=majority"

    try:
        # The context manager closes the client (and its connections) even if
        # the query fails; previously the client was never closed.
        with pymongo.MongoClient(uri) as client:
            collection = client["masterwizr-data-db"][collection_name]
            return list(collection.find())
    except Exception as e:
        # Only the exception type is exposed, so no connection details leak into the response.
        return {
            "statusCode": 500,
            "body": {"message": "Error connecting to MongoDB", "error": type(e).__name__},
        }

def sql_insert(df, table_name):
    """Append `df` to the table `table_name` through the module-level `engine`.

    Args:
        df: DataFrame to write; its index is written as well (pandas default).
        table_name: Name of the destination table.

    Returns:
        None when the insert succeeded or was rejected with an integrity
        error (which is swallowed on purpose, see below); otherwise a
        `{"statusCode": 500, "body": {...}}` error response.
    """
    try:
        df.to_sql(table_name, con=engine, if_exists="append")
    except sqlalchemy.exc.IntegrityError:
        # Best-effort insert: integrity violations are ignored so the
        # migration keeps going.
        # NOTE(review): presumably this is meant for already-migrated keys, and
        # the whole batch is likely rejected when one row violates a
        # constraint — confirm.
        return None
    except Exception as e:
        # Report the exception type so a failure can be diagnosed; the message
        # itself is left out because it may echo row data.
        return {
            "statusCode": 500,
            "body": {"message": "Error inserting into Postgres DB", "error": type(e).__name__},
        }
    return None

def add_primary_key(table_name, primary_key):
    """(Re)create the primary key of `table_name` on the column `primary_key`.

    Any existing `<table_name>_pkey` constraint is dropped first. Both
    statements run in a single transaction on the module-level `engine`, so a
    failing ADD does not leave the table without its previous key.

    Args:
        table_name: Table to alter. Interpolated into the SQL text, so it must
            be a trusted identifier.
        primary_key: Column (or comma-separated columns) for the new key; also
            interpolated verbatim.
    """
    # IF EXISTS: a table that has no primary key yet must not make the DROP
    # fail, otherwise the key would never get added.
    drop_statement = f"ALTER TABLE {table_name} DROP CONSTRAINT IF EXISTS {table_name}_pkey"
    add_statement = f"ALTER TABLE {table_name} ADD PRIMARY KEY ({primary_key})"

    # `engine.begin()` + `text()` instead of `engine.execute(str)`, which no
    # longer exists in SQLAlchemy 2.0.
    with engine.begin() as connection:
        connection.execute(sqlalchemy.text(drop_statement))
        connection.execute(sqlalchemy.text(add_statement))

def migrate_data(enviroment):
    """Prepare the share-events data and insert each resulting frame into Postgres.

    Args:
        enviroment: Currently unused — reading the source data from MongoDB is
            disabled and the module-level `share_events_df` is used instead.
            (The parameter spelling is kept as is for existing callers.)

    Returns:
        list: `(table_name, error_response)` tuples for every frame whose
        insert reported an error; empty when all inserts went through.
        Previously these error responses were silently discarded.
    """
    # Disabled for now: data = read_mongo_data(enviroment)
    data_dfs = prepare_data(share_events_df)

    failures = []
    for df in data_dfs:
        table_name = df.name
        response = sql_insert(df, table_name)
        # sql_insert returns None unless the insert failed.
        if response is not None:
            failures.append((table_name, response))
        # Disabled for now: add_primary_key(df.name, primary_keys[df.name])
    return failures

In [None]:
# Run the migration. The argument is not used by `migrate_data` at the moment,
# because its MongoDB read is commented out.
migrate_data("production")

In [None]:
share_events_df_2 = 