# Data migration

## Required imports

Remember to install all packages required inside your working virtual Python environment

In [1]:
import pandas
import sqlalchemy
import json
import numpy
import requests
import re
import subprocess
import datetime
from json import JSONDecodeError
from copy import deepcopy
from http.cookiejar import CookieJar, MozillaCookieJar
from tqdm.notebook import tqdm
from datetime import date, datetime
from dateutil.relativedelta import relativedelta

## Oracle Client Libraries

To use **cx_Oracle connector**, it is required to install some libraries inside the execution environment: [Oracle Client Library](https://oracle.github.io/odpi/doc/installation.html#oracle-client-library-loading). Please install them and set the required environment variables before running this notebook

## Constants

Space string

In [2]:
SPACE = " "

Clean all interspaces for a given string

In [3]:
clean_spaces = lambda content: SPACE.join([c.strip() for c in content.strip().split(SPACE)])

Custom JSON parser

In [4]:
def parse(obj):
    """JSON serializer for objects not serializable by default json code"""
    if isinstance(obj, (datetime, date)):
        return obj.isoformat()
    if isinstance(obj, set):
        return list(obj)
    if isinstance(obj, requests.Response):
        return obj.json()
    raise TypeError ("Type %s not serializable" % type(obj))

Date reference to label a campaign as open

In [5]:
DATE_REFERENCE = (datetime.now() - relativedelta(months=2)).date()

Email pattern

In [6]:
EMAIL_REGEX = r"(^[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+)"
email_regex = re.compile(EMAIL_REGEX)

Retrieve the email address if this exists inside into a given string, return empty string otherwise

In [7]:
def sanetize(pattern: re.Pattern, content: str) -> str:
    result = pattern.search(content)
    if not result:
        return ""
    return result[0]

Sanetize release links

In [8]:
LINKS_REGEX = r"(\[([\d+][\.\d+]*)\]\s)?(https?://[-|_|A-Z|a-z|.|/|0-9]+)"
link_regex = re.compile(LINKS_REGEX)

In [9]:
def sanetize_links(content: str, pattern: re.Pattern = link_regex):
    links: str = ""
    default: str = "No links provided"
    links_content = pattern.findall(content)
    for link in links_content:
        links += f"{link[0]} {link[2]} \n"
    if not links:
        return default
    return links

Groups identifiers retrieved from: [groups.py](https://github.com/cms-PdmV/ValDB2/blob/main/data/group.py)

In [10]:
uppercase = lambda str_list: [el.strip().upper() for el in str_list]
dict_token = lambda token_list: dict(zip(uppercase(token_list), token_list))

In [11]:
# Tokens for fast parsing
# Categories
CATEGORIES = ["Reconstruction", "HLT", "PAGs", "HIN", "GEN"]
CATEGORIES_TOKENS = dict_token(CATEGORIES)

# Subcategories
SUBCATEGORIES = ["Data", "FastSim", "FullSim", "Gen"]
SUBCATEGORIES_TOKENS = dict_token(SUBCATEGORIES)

# Groups
reconstruction_groups = ['Tracker', 'Ecal', 'HGcal', 'Hcal', 'CASTOR', 'DT', 'CSC', 'RPC', 'GEM',
    'MTD', 'PPS', 'L1', 'Tracking', 'Electron', 'Photon', 'Muon', 'Jet', 'MET', 'bTag', 'Tau',
    'PF'
]
hlt_groups = ['Tracking', 'Electron', 'Photon', 'Muon', 'Jet', 'MET', 'bTag', 'Tau', 'SMP',
    'Higgs', 'Top', 'NPS', 'Exotica', 'B2G', 'B', 'Fwd', 'HIN'
]
pags_groups = ['SMP', 'Higgs', 'Top', 'NPS', 'Exotica', 'B2G', 'B', 'Fwd', 'HIN']
hin_groups = ['Tracking', 'Electron', 'Photon', 'Muon', 'Jet']
gen_groups = ['GEN']

GROUPS = set(reconstruction_groups + hlt_groups + pags_groups + hin_groups + gen_groups)
GROUPS_TOKENS = dict_token(GROUPS)

Be carefull, for some reason it seems **HIN** category has two main identifiers: **HIN** & **IN** -> [Details](https://github.com/cms-PdmV/ValDB/blob/master/ajax_app.py#L489). This is the same case for **TK** group which means **TRACKER**

In [12]:
def parse_group_id(category: str, subcategory: str, status_kind: str = None) -> str:
        # Retrieve proper identifiers using 
        # the available tokens
        # Be aware of some special cases
        if category == "IN":
            category = "HIN"
        if status_kind == "TK":
            status_kind = "TRACKER"

        category = CATEGORIES_TOKENS[category.upper()]
        subcategory = SUBCATEGORIES_TOKENS[subcategory.upper()]
        if status_kind:
            status_kind = GROUPS_TOKENS[status_kind.upper()]
            return ".".join([category, subcategory, status_kind])
        else:
            return ".".join([category, subcategory])

Tokens to parse the release status. This entity will be known as report for the new application

In [13]:
REPORT_STATUS = {
    "NOT YET DONE": 2,
    "OK": 1,
    "OK TO BE SIGNED-OFF BY THE VALIDATORS": 1,
    "FAILURE": 3,
    "CHANGES EXPECTED": 4,
    "IN PROGRESS": 5,
    "KNOWN ISSUE": 6
}

Send HTTP requests to the server

In [14]:
def bulk_data(endpoint: str, data: dict, cookies: MozillaCookieJar) -> dict:
    """
    Bulk data to the server via HTTP requests
    """
    start_time = datetime.now()
    requests_errors = {}
    for key, value in data.items():
        response = requests.post(url=endpoint, json=value, cookies=cookies)
        if response.status_code != 200:
            requests_errors[key] = {
                "body": value,
                "response": response
            }
    
    end_time = datetime.now()
    print(f"Elapsed time: {end_time - start_time}")
    return requests_errors

Retrieve target and references releases from report content

In [15]:
CAMPAIGN_REGEX = r"[0-9]{1,3}_[0-9]{1,3}_[0-9]{1,3}.{0,40}"
TARGET_REGEX = re.compile(f"Target: {CAMPAIGN_REGEX}")
REFERENCE_REGEX = re.compile(f"Reference: {CAMPAIGN_REGEX}")

## Working variables

### Production Database

In [16]:
CREDENTIALS_PATH = "./data/credentials.json"

Import database credentials

In [17]:
with open(file=CREDENTIALS_PATH, encoding="utf-8", mode="r") as cf:
    credentials = json.load(cf)

Import cookies to authenticate to send requests to the server

In [18]:
cookies = MozillaCookieJar(credentials["new"]["cookie_location"])
cookies.load()

Connection URI for SQLAlchemy

In [19]:
prod_db_user = credentials['old']['database_username']
prod_db_password = credentials['old']['database_password']
prod_db = credentials['old']['database_url']
prob_db_service = credentials['old']['database_service']
prod_db_uri = f"oracle+cx_oracle://{prod_db_user}:{prod_db_password}@{prod_db}/?service_name={prob_db_service}"

Create a SQLAlchemy Session

In [20]:
engine = sqlalchemy.create_engine(prod_db_uri, arraysize=1000)

### User data

To retrieve user information from CERN LDAP, install first the following package available inside CC8 Linux: [PyPhoneBook](https://pyphonebook.docs.cern.ch/index.html)

In [21]:
def retrieve_user_information(email: str = None, login: str = None, fullname = None) -> dict:
    """
    Retrieve user email and fullname from CERN LDAP using the user email, login or its fullname 
    
    Parameters
    ------------
    email: str
        User email
    login: str
        CERN user login
    fullname: str
        User display name register in CERN LDAP
    
    Returns
    --------
    dict:
        Username, email and user fullname if user is still registered and active in CERN.
        None otherwise
    """
    SPACE= " "
    
    if not email and not login and not fullname:
        raise ValueError("Please set the user email or its login or its fullname")
        
    def execute_parse_output(command: str) -> dict:
        shell_execution = subprocess.run(
            command,
            capture_output=True,
            shell=True,
            encoding="utf-8"
        )
        try:
            execution_result = shell_execution.stdout
            user_data = json.loads(execution_result)
            if user_data:
                user_info_list = user_data[0]
                # For some reason, the PyPhonebook application delivers this as a list
                return {
                    "displayname": user_info_list["displayname"][0],
                    "user_email": user_info_list["email"][0],
                    "login": user_info_list["login"][0]
                } 
            return None
        except JSONDecodeError as e:
            print(f"Error decoding: {execution_result}")
            return None
            
    
    if email:
        command = f"pyphonebook --email {email} --json displayname --json email --json login"
        return execute_parse_output(command=command)
    
    if login:
        command = f"pyphonebook --login {login} --json displayname --json email --json login"
        return execute_parse_output(command=command)
    
    if fullname:
        # Here we have 2 possibilities
        # Sadly, it is not possible to query by fullname directly, So, consider the following cases:
        # 1. Fullname has only two members: Name and Lastname
        # 2. Fullname has three or more members: Two firstnames and one or more lastnames or one firstname and two or more lastnames
        fullname_parts = fullname.strip().split(SPACE)
        fullname_parts = [part.strip() for part in fullname_parts if part]
        
        # Case 1
        if len(fullname_parts) == 2:
            command = f"pyphonebook --firstname '{fullname_parts[0]}' --surname '{fullname_parts[1]}' --json displayname --json email --json login"
            return execute_parse_output(command=command)
        
        # Case 2: 
        if len(fullname_parts) >= 3:
            # Two firstnames and two or more lastnames
            command = f"pyphonebook --firstname '{fullname_parts[0]} {fullname_parts[1]}' --surname '{SPACE.join(fullname_parts[2:])}' --json displayname --json email --json login"
            user_data = execute_parse_output(command=command)
            if user_data:
                return user_data
            else:
                # One firstname and two or more lastnames
                one_firstname_more_two_lastnames = f"pyphonebook --firstname '{fullname_parts[0]}' --surname '{SPACE.join(fullname_parts[1:])}' --json displayname --json email --json login"
                user_data = execute_parse_output(command=one_firstname_more_two_lastnames)
                if user_data:
                    return user_data
                
        # Something unexpected happened
        print(f"Please be aware of case: {fullname}")
        return None

## Retrieve data using Pandas and SQL queries

### Validators

Retrieve all user data related to validators

In [22]:
user_get_query = """
SELECT U.USER_NAME, U.EMAIL, U.ADMIN, U.VALIDATOR, UR.CATEGORY, UR.SUBCATEGORY,	UR.STATUS_KIND
FROM CMS_PDMV_VAL.USERS U, CMS_PDMV_VAL.USER_RIGHTS UR
WHERE U.USER_NAME = UR.USER_NAME AND U.ADMIN = 0 AND U.VALIDATOR = 1
"""

Execute the query using Pandas

In [23]:
user_data: pandas.DataFrame = pandas.read_sql(user_get_query, engine)

Remove all rows where the username has more than 8 characters.

In [24]:
user_data = user_data[(user_data["user_name"].str.len() <= 8) & (user_data["user_name"].str.len() > 0)]

Sanetize email information

In [25]:
user_data["email"] = user_data["email"].apply(lambda email: sanetize(pattern=email_regex, content=email))

Query CERN Phonebook and append user data

In [26]:
validators_start_time = datetime.now()
validators_user_name = [
    retrieve_user_information(login=validator_username)
    for validator_username
    in list(user_data["user_name"].unique())
]
validators_end_time = datetime.now()

In [27]:
print(f"Elapsed time: {validators_end_time - validators_start_time}")

Elapsed time: 0:00:22.598646


Clean the list from **None** values

In [28]:
validators_user_name = [el for el in validators_user_name if el]

Parse it as a DataFrame to merge this data with the available

In [29]:
validators_full_data: pandas.DataFrame = pandas.DataFrame(validators_user_name)

In [30]:
validators_data = user_data.merge(
    right=validators_full_data,
    how="inner",
    left_on="user_name",
    right_on="login",
    copy=True
)

Below, there is the schema the application is expecting to register a user

In [31]:
%%capture
{
    "role": 3,
    "email": "example@example.com",
    "fullname": "John Doe",
    "groups": []
}

Group all user categories, subcategories and status kind per user

In [32]:
def parse_validators_schema(user_data: pandas.DataFrame) -> dict:
    validators = {}
    schema = {
        "role": 2,
        "email": "",
        "fullname": None,
        "groups": []
    }
    user_data_json = user_data.to_dict(orient="records")
    for row in user_data_json:        
        current_user_name = row["user_name"]
        current_user = validators.get(current_user_name)
        if not current_user:
            current_user = deepcopy(schema)
            ldap_user_email = str(row["user_email"]).lower()
            old_valdb_user_email = str(row["email"]).lower()
            if ldap_user_email:
                current_user["email"] = ldap_user_email
                current_user["fullname"] = row["displayname"]
            else:
                current_user["email"] = old_valdb_user_email
        
        # Build group identifier
        category = row["category"]
        subcategory = row["subcategory"]
        status_kind = row["status_kind"]
        
        # Build Group Identifier
        group_id = parse_group_id(
            category=category,
            subcategory=subcategory,
            status_kind=status_kind
        )
        current_user["groups"].append(group_id)
        
        # Persist for next iteration
        validators[current_user_name] = current_user
        
    return validators

In [33]:
user_data_parsed = parse_validators_schema(user_data=validators_data)

Insert all validator users

In [34]:
valdb_new_url = credentials["new"]["host_url"]
valdb_users = f"{valdb_new_url}/api/users/"
valdb_validator_mig_errors = bulk_data(
    endpoint=valdb_users,
    data=user_data_parsed,
    cookies=cookies
)

Elapsed time: 0:01:08.813626


Display errors if they happened

In [35]:
if not valdb_validator_mig_errors:
    print("Validator user migration successfully")
else:
    for user, report in valdb_validator_mig_errors.items():
        print(f"Issues migrating user: {user} information")
        print("Request body sent")
        print(report)

Validator user migration successfully


### Administrators

In [36]:
admins_get_query = """
SELECT U.USER_NAME, U.EMAIL, U.ADMIN, U.VALIDATOR
FROM CMS_PDMV_VAL.USERS U
WHERE U.ADMIN = 1
"""

In [37]:
def parse_administrator_schema(user_data: pandas.DataFrame) -> dict:
    admins = {}
    schema = {
        "role": 1,
        "email": "",
        "fullname": None,
        "groups": []
    }
    user_data_json = user_data.to_dict(orient="records")
    for row in user_data_json:        
        current_user_name = row["user_name"]
        current_user = admins.get(current_user_name)
        if not current_user:
            current_user = deepcopy(schema)
            current_user["email"] = str(row["email"]).lower()
            current_user["fullname"] = row["fullname"]
        
        # Persist for next iteration
        admins[current_user_name] = current_user
        
    return admins

In [38]:
admins_data: pandas.DataFrame = pandas.read_sql(admins_get_query, engine)

In [39]:
admins_sanitized_data: pandas.DataFrame = pandas.read_csv(filepath_or_buffer=credentials["new"]["admins_list_path"])

The sanitized data for administrators has the following attributes

1. user_name: CERN 
2. email: Primary alias email registered at CERN
3. fullname: User fullname registered at CERN

Fetch login username for the current administrator registered inside the production environment 

In [40]:
admins_data_logins = list(admins_data["user_name"])

Retrieve the subset of those who remain active

In [41]:
admins_sanitized_data = admins_sanitized_data[admins_sanitized_data["user_name"].isin(admins_data_logins)]

Parse the data

In [42]:
admin_data_parsed = parse_administrator_schema(user_data=admins_sanitized_data)

Insert all admins

In [43]:
valdb_new_url = credentials["new"]["host_url"]
valdb_users = f"{valdb_new_url}/api/users/"
valdb_admins_mig_errors = bulk_data(
    endpoint=valdb_users,
    data=admin_data_parsed,
    cookies=cookies
)

Elapsed time: 0:00:09.332622


Display errors if they happened

In [44]:
if not valdb_admins_mig_errors:
    print("Administrator user migration successfully")
else:
    for user, report in valdb_admins_mig_errors.items():
        print(f"Issues migrating user: {user} information")
        print("Request body sent")
        print(report)

Administrator user migration successfully


### Releases

The data related to the release is splitted between the metadata and its details (status)

In [45]:
releases_query = """
SELECT *
FROM CMS_PDMV_VAL.RELEASES R, CMS_PDMV_VAL.STATUS S
WHERE R.ID = S.ID
"""

In [46]:
releases_data: pandas.DataFrame = pandas.read_sql(releases_query, engine)

Delete rows with unknown status kind

In [47]:
releases_data["status_kind"] = releases_data["status_kind"].str.upper()
releases_data = releases_data[releases_data["status_kind"].isin(list(GROUPS_TOKENS.keys()) + ["SUMMARY"])]

Make sure that the names have no spaces in between or borders

In [48]:
releases_data["user_name"] = releases_data["user_name"].apply(lambda name: clean_spaces(name))

Create all author users that exist in all reports. We may find usernames and emails related to users who are no longer active at CERN.
They will be omitted, their name will be added as a comment within the report content.

In [49]:
%%capture
start_time_releases_user_name = datetime.now()
releases_user_name = [
    retrieve_user_information(fullname=report_username)
    for report_username
    in list(releases_data["user_name"].unique())
]
end_time_releases_user_name = datetime.now()

In [50]:
print(f"Elapsed time: {end_time_releases_user_name - start_time_releases_user_name}")

Elapsed time: 0:01:26.240251


There are **325** registered users in the ValDB production environment. Only **216** of them are currently active at CERN.

In [51]:
releases_user_name = [r for r in releases_user_name if r]

Next step: Register users who are currently active

In [52]:
valdb_new_url = credentials["new"]["host_url"]
valdb_users = f"{valdb_new_url}/api/users/"
valdb_report_users_mig_errors = {}
for report_user_data in releases_user_name:    
    request_user_data = {
        "email": str(report_user_data["user_email"]).lower(),
        "fullname": report_user_data["displayname"],
        "groups": [],
        "role": 3
    }
    response = requests.post(url=valdb_users, json=request_user_data, cookies=cookies)
    if response.status_code != 200:
        valdb_report_users_mig_errors[username] = {
            "body": info,
            "response": response
        }

In [53]:
if not valdb_report_users_mig_errors:
    print("Users who wrote releases migrated successfully")
else:
    for user, report in valdb_admins_mig_errors.items():
        print(f"Issues migrating user: {user} information")
        print("Request body sent")
        print(report)

Users who wrote releases migrated successfully


In [54]:
releases_user_data: pandas.DataFrame = pandas.DataFrame(releases_user_name)

In [55]:
releases_with_user_data = releases_data.merge(
    right=releases_user_data,
    how="left",
    left_on="user_name",
    right_on="displayname",
    copy=True
)

**Be aware of NaN values**

In [56]:
releases_with_user_data = releases_with_user_data.fillna("")

Remove useless columns

In [57]:
releases_with_user_data = releases_with_user_data.drop(labels="id", axis=1)

Append the author email to the content in case the user is no longer active inside CERN

In [63]:
def parse_releases_schema(releases_data: pandas.DataFrame) -> dict:
    campaigns = {}
    campaign_schema = {
        "name": None,
        "description": "",
        "deadline": None,
        "target_release": None,
        "reference_release": None,
        "relmon": None,
        "subcategories": set(),
        "reports": [],
        "is_open": False        
    }
    report_schema = {
        "authors": [],
        "group": None,
        "campaign_name": None,
        "status": None,
        "content": ""
    }
    releases_data_json = releases_data.to_dict(orient="records")
    for row in releases_data_json:
            current_release_name = str(row["release_name"])
            current_campaign = campaigns.get(current_release_name)
            release_date = row["date"].to_pydatetime().date()
            if not current_campaign:
                current_campaign = deepcopy(campaign_schema)
                current_campaign["name"] = current_release_name
                current_campaign["deadline"] = release_date
                current_campaign["relmon"] = row["relmon_url"]

            # Build the report object
            current_report = deepcopy(report_schema)
            
            # If author email was successfully retrieved from CERN LDAP
            # append it inside the report object. Web server will query for it
            # and append the user ID.
            #
            # Else, append the author name into the content section
            author_email = row["user_email"]
            author_name = row["user_name"]
            
            if author_email:                
                current_report["authors"] += [str(author_email).lower()]

            # Build group identifier
            category = row["category"]
            subcategory = row["subcategory"]
            status_kind = row["status_kind"]
            
            # If STATUS_KIND == SUMMARY, this release has the description for the campaign
            # Do not persist it as a report            
            comments = row["comments"] if row["comments"] else "No comments provided"
            links = row["links"] if row["links"] else ""
            links = sanetize_links(content=links)
            author_not_found_inside_ldap = f"Author: {author_name}" if not author_email else ""
            
            if status_kind == "SUMMARY":
                current_campaign["description"] = f"""Comments: {comments} \n Links: {links}"""
                target_release = str(sanetize(pattern=TARGET_REGEX, content=comments))
                reference_release = str(sanetize(pattern=REFERENCE_REGEX, content=comments))
                
                if target_release:
                    target_release = target_release.split(SPACE)[1]
                
                if reference_release:
                    reference_release = reference_release.split(SPACE)[1] if target_release else ""
                
                current_campaign["target_release"] = target_release
                current_campaign["reference_release"] = reference_release
            else:
                current_report["content"] = f"""Comments: {comments} \n Links: {links} \n {author_not_found_inside_ldap}"""

                # Build Group Identifier
                group_id = parse_group_id(
                    category=category,
                    subcategory=subcategory,
                    status_kind=status_kind
                )

                current_report["group"] = group_id
                current_report["campaign_name"] = current_release_name
                current_report["status"] = REPORT_STATUS[row["validation_status"].strip()]

                # Append the report and its group
                current_campaign["reports"] += [current_report]
                
            # Fill other campaign attributes
            subcategory_id = parse_group_id(
                category=category,
                subcategory=subcategory,
            )
            current_campaign["subcategories"] = current_campaign["subcategories"].union(set([subcategory_id]))
            
            # Final checks:
            # 1. Always set the deadline with the latest available date
            # 2. If the deadline date is after some reference date, set the campaign as open
            if current_campaign["deadline"] < release_date:
                current_campaign["deadline"] = release_date

            if current_campaign["deadline"] > DATE_REFERENCE:
                current_campaign["is_open"] = True

            # Persist for next iteration
            campaigns[current_release_name] = current_campaign
        
    return campaigns

In [64]:
releases_data_parsed = parse_releases_schema(releases_data=releases_with_user_data)
releases_data_parsed = json.loads(json.dumps(releases_data_parsed, default=parse))

Migrate all the releases

In [65]:
release_migration_start_time = datetime.now()

valdb_new_url = credentials["new"]["host_url"]
valdb_migration_campaign = f"{valdb_new_url}/api/campaigns/migrate/"
valdb_campaign_mig_errors = {}
execute_migration = True
if execute_migration:
    valdb_campaign_mig_errors = bulk_data(
        endpoint=valdb_migration_campaign,
        data=releases_data_parsed,
        cookies=cookies
    )

if not valdb_campaign_mig_errors:
    print("Releases migrated successfully")
else:
    storage_path = "./data/migration-issues.json"
    with open(storage_path, "w", encoding="utf-8") as fp:
        json.dump(valdb_campaign_mig_errors, fp=fp, indent=4, default=parse)
        
release_migration_end_time = datetime.now()

Elapsed time: 0:11:29.493949


In [66]:
print(f"Elapsed time: {release_migration_end_time - release_migration_start_time}")

Elapsed time: 0:11:29.527442
