In [1]:
cd ..

/Users/jisooryu/Projects/lease-version-reliability


In [2]:
import pandas as pd 
import numpy as np
import typing 
import structlog
from lease_version_reliability.models.train import train_model
from lease_version_reliability.models.inference import run_inference

from lease_version_reliability.data.database_io import get_logo_df, get_all_data, get_reliable_data

from lease_version_reliability.data.database_io import attribute_to_label_dict

from lease_version_reliability.config.settings import settings 
from lease_version_reliability.data.database import (
    CompstakServicesMySQL,
    get_snowflake_connection,
)
from lease_version_reliability.data.database import cs_mysql_instance as mysql
from lease_version_reliability.data.database_io import read_file

logger = structlog.get_logger()

In [4]:
# await run_inference(download=True)

### Importing Submitter_df

In [130]:
from lease_version_reliability.config.attributes import attributes

col = attributes.copy()

In [132]:
col_reliability = [s + '_reliability' for s in col]
col.insert(0,'submitter_person_id')
col.insert(len(col), 'general_reliability')
col_reliability.insert(0,'submitter_person_id')
col_reliability.insert(len(col), 'general_reliability')

In [133]:
temp = submitter_df[col_reliability]

In [116]:
temp = temp.set_axis(col, axis=1)

In [117]:
import datetime as dt 

temp['date_created'] = pd.Timestamp.now()
temp['date_created'] = temp['date_created'].dt.strftime('%Y-%m-%d %X')
temp.columns = map(lambda x: str(x).upper(), temp.columns)

In [1]:
from sqlalchemy import create_engine
from snowflake.connector.pandas_tools import pd_writer

conn = get_snowflake_connection()
engine = create_engine(f"snowflake://{settings.SNOWFLAKE_ACCOUNT}.{settings.SNOWFLAKE_REGION}.snowflakecomputing.com", creator=lambda: conn)

with engine.connect() as con:
    temp.to_sql('submitter', engine, schema = 'LEASE_VERSION_RELIABILITY', index=False, if_exists='append', chunksize=10000, method=pd_writer)

### Version_df

In [139]:
col = attributes.copy()
col_reliability = [s + '_prob' for s in col]
col.insert(0,'comp_data_id_version')
col_reliability.insert(0,'comp_data_id_version')

In [140]:
temp = version_df[col_reliability]

In [141]:
temp = temp.set_axis(col, axis=1)

In [142]:
temp['date_created'] = pd.Timestamp.now()
temp['date_created'] = temp['date_created'].dt.strftime('%Y-%m-%d %X')
temp.columns = map(lambda x: str(x).upper(), temp.columns)

In [3]:
# conn = get_snowflake_connection()
# engine = create_engine(f"snowflake://{settings.SNOWFLAKE_ACCOUNT}.{settings.SNOWFLAKE_REGION}.snowflakecomputing.com", creator=lambda: conn)

# with engine.connect() as con:
#     temp.to_sql('version', engine, schema = 'LEASE_VERSION_RELIABILITY', index=False, if_exists='append', chunksize=10000, method=pd_writer)

### OOM Error - Read ALL_DATA in Batches

In [16]:
async def get_version_max_id(db: CompstakServicesMySQL) -> typing.Any:
    """
    Retrun max id of comp_version table
    """

    query = read_file(settings.SQL_QUERY, "version_max_id.sql")

    return await db.fetch_val(query)

In [5]:
async def get_all_versions(
    db: CompstakServicesMySQL,
    min: int,
    max: int,
) -> pd.DataFrame:
    """
    Return version data from MySQL
    """
    query = read_file(settings.SQL_QUERY, "all_data.sql").format(min=min, max=max)
    data = [dict(item) for item in await db.fetch_all(query)]

    return pd.DataFrame(data)

In [6]:
async def temp_get_all_data(db:CompstakServicesMySQL) -> pd.DataFrame:
    id = await get_version_max_id(mysql)
    logger.info("Start processing lease data")
    all_df = pd.DataFrame()
    for i in range(0, id, settings.BATCH_CONFIG.BATCH_SIZE):
        logger.info(f"Processing {i + settings.BATCH_CONFIG.BATCH_SIZE}/{id}")
        data = await get_all_versions(mysql, i, i + settings.BATCH_CONFIG.BATCH_SIZE)
        all_df = pd.concat([all_df, data], ignore_index=True)
    
    all_df = await get_logo_df(all_df)

    return all_df

In [7]:
await mysql.connect()

temp_all_df = await temp_get_all_data(mysql)

await mysql.disconnect()

INFO:databases:Connected to database mysql://admin:********@localhost:3308/compstak


2023-03-01 10:30:33 [info     ] Start processing lease data
2023-03-01 10:30:33 [info     ] Processing 500000/4017690


  data = [dict(item) for item in await db.fetch_all(query)]


2023-03-01 10:31:57 [info     ] Processing 1000000/4017690
2023-03-01 10:32:45 [info     ] Processing 1500000/4017690
2023-03-01 10:33:44 [info     ] Processing 2000000/4017690
2023-03-01 10:35:15 [info     ] Processing 2500000/4017690
2023-03-01 10:36:42 [info     ] Processing 3000000/4017690
2023-03-01 10:38:19 [info     ] Processing 3500000/4017690
2023-03-01 10:40:23 [info     ] Processing 4000000/4017690
2023-03-01 10:42:08 [info     ] Processing 4500000/4017690


INFO:databases:Disconnected from database mysql://admin:********@localhost:3308/compstak


In [12]:
temp_all_df.shape

(2722225, 34)

In [10]:
await mysql.connect()
all_df = await get_all_data(mysql)
await mysql.disconnect()

INFO:databases:Disconnected from database mysql://admin:********@localhost:3308/compstak


### Reliable Data - Batch

In [3]:
async def get_reliable_data(
    db: CompstakServicesMySQL,
    min: int,
    max: int,
) -> pd.DataFrame:
    """
    Return reliable data (more than 3 submitted versions) from MySQL
    """
    query = read_file(settings.SQL_QUERY, "reliable_data.sql").format(min=min, max=max)
    data = [dict(item) for item in await db.fetch_all(query)]
    
    return pd.DataFrame(data)

In [4]:
async def temp_get_reliable_data() -> pd.DataFrame:
    id = await get_version_max_id(mysql)
    logger.info("Start processing lease data")
    df = pd.DataFrame()
    for i in range(0, id, settings.BATCH_CONFIG.BATCH_SIZE):
        logger.info(f"Processing {i + settings.BATCH_CONFIG.BATCH_SIZE}/{id}")
        data = await get_reliable_data(mysql, i, i + settings.BATCH_CONFIG.BATCH_SIZE)
        df = pd.concat([df, data], ignore_index=True)
    
    df = await get_logo_df(df)

    return df

In [3]:
from lease_version_reliability.data.database_io import get_reliable_data

await mysql.connect()
reliable_data = await get_reliable_data()
await mysql.disconnect()

INFO:databases:Connected to database mysql://admin:********@localhost:3308/compstak


2023-03-02 14:38:06 [info     ] Processed 10000/4017690
2023-03-02 14:38:28 [info     ] Processed 100000/4017690
2023-03-02 14:38:57 [info     ] Processed 200000/4017690
2023-03-02 14:39:21 [info     ] Processed 300000/4017690
2023-03-02 14:39:40 [info     ] Processed 400000/4017690
2023-03-02 14:39:59 [info     ] Processed 500000/4017690
2023-03-02 14:40:18 [info     ] Processed 600000/4017690
2023-03-02 14:40:36 [info     ] Processed 700000/4017690
2023-03-02 14:40:52 [info     ] Processed 800000/4017690
2023-03-02 14:41:07 [info     ] Processed 900000/4017690
2023-03-02 14:41:25 [info     ] Processed 1000000/4017690
2023-03-02 14:41:44 [info     ] Processed 1100000/4017690
2023-03-02 14:42:00 [info     ] Processed 1200000/4017690
2023-03-02 14:42:16 [info     ] Processed 1300000/4017690
2023-03-02 14:42:36 [info     ] Processed 1400000/4017690
2023-03-02 14:42:49 [info     ] Processed 1500000/4017690
2023-03-02 14:43:07 [info     ] Processed 1600000/4017690
2023-03-02 14:43:26 [info

INFO:databases:Disconnected from database mysql://admin:********@localhost:3308/compstak


In [20]:
await mysql.connect()
temp_reliable_data = await temp_get_reliable_data()
await mysql.disconnect()

2023-03-01 11:27:14 [info     ] Start processing lease data
2023-03-01 11:27:14 [info     ] Processing 500000/4017690
2023-03-01 11:28:26 [info     ] Processing 1000000/4017690
2023-03-01 11:29:03 [info     ] Processing 1500000/4017690
2023-03-01 11:29:45 [info     ] Processing 2000000/4017690
2023-03-01 11:30:49 [info     ] Processing 2500000/4017690
2023-03-01 11:31:46 [info     ] Processing 3000000/4017690
2023-03-01 11:32:48 [info     ] Processing 3500000/4017690
2023-03-01 11:33:39 [info     ] Processing 4000000/4017690
2023-03-01 11:34:25 [info     ] Processing 4500000/4017690


INFO:databases:Disconnected from database mysql://admin:********@localhost:3308/compstak


### Label attributes to vectorization 

In [3]:
# await mysql.connect()
# reliable_data = await get_reliable_data()
# await mysql.disconnect()

In [11]:
temp = reliable_data.copy()

In [20]:
import numpy as np
from datetime import timedelta

def label_date(data, att):

    idx_null = np.where((data[att + '_version'].isnull()) | (data[att + '_master'].isnull()))[0]
    idx_execution_date =  np.where((data[att + '_version'] <= data[att + '_master']+timedelta(days=90)) & (data[att + '_version'] >= data[att + '_master']-timedelta(days=90)))[0]

    data[att + '_label'] = 0 
    data.loc[idx_null, att + '_label'] = -1
    data.loc[idx_execution_date, att + '_label'] = 1

    return data 

In [21]:
from lease_version_reliability.data.database_io import attribute_to_label_dict, label_tenant_name, label_strict_equality, label_lease_term

def get_labels(data: pd.DataFrame, attributes: list[str]) -> pd.DataFrame:
    """
    Populate each attribute column based on label calculation rules
    """
    for att in attributes:
        logger.info(f"Calculating Labels: {att}")
        data[att + "_filled"] = np.where(
            (pd.notnull(data[att + "_version"])),
            1,
            0,
        )

        if ((att == 'execution_date') | (att == 'commencement_date') | (att == 'expiration_date')):
            data = label_date(data, att)

        else:
            data[att + "_label"] = data.apply(
            lambda x: attribute_to_label_dict[att](
                x[att + "_version"],
                x[att + "_master"],
            ),
            axis=1,
        )

    return data

In [24]:
from lease_version_reliability.data.database_io import label_tenant_name

def label_strict_equality(
    data:pd.DataFrame, 
    att: str
) -> float:
    """
    Replace attribute columns with indicator values
    Based on strict equality for masters and versions
    """
    idx_null = np.where((data[att + '_version'].isnull()) | (data[att + '_master'].isnull()))[0]
    idx_equality =  np.where((data[att + '_version'] == data[att + '_master']))[0]

    data[att + '_label'] = 0 
    data.loc[idx_null, att + '_label'] = -1
    data.loc[idx_equality, att + '_label'] = 1

    return data 


def label_transaction_size(
    data:pd.DataFrame,
    att:str

) -> float:
    """
    Replace transaction_size attribute column with indicator values
    Given size threshold for masters and versions
    """
    idx_null = np.where((data[att + '_version'].isnull()) | (data[att + '_master'].isnull()))[0]
    idx_transaction_size =  np.where((data[att + '_version'] >= 0.95 * data[att + '_master']) & (data[att + '_version'] <= 1.05 * data[att + '_master']))[0]

    data[att + '_label'] = 0 
    data.loc[idx_null, att + '_label'] = -1
    data.loc[idx_transaction_size, att + '_label'] = 1

    return data 

def label_lease_term(
    data:pd.DataFrame,
    att: str
) -> float:
    """
    Replace lease_term attribute column with indicator values
    Given term threshold for masters and versions
    """
    idx_null = np.where((data[att + '_version'].isnull()) | (data[att + '_master'].isnull()))[0]
    idx_equality =  np.where((data[att + '_version'] >= 0.92 * data[att + '_master']) & (data[att + '_version'] <= 1.08 * data[att + '_master']))[0]

    data[att + '_label'] = 0 
    data.loc[idx_null, att + '_label'] = -1
    data.loc[idx_equality, att + '_label'] = 1

    return data

attribute_to_label_dict = {
    "tenant_name": label_tenant_name,
    "space_type_id": label_strict_equality,
    "transaction_size": label_transaction_size,
    "starting_rent": label_strict_equality,
    "execution_date": label_date,
    "commencement_date": label_date,
    "lease_term": label_lease_term,
    "expiration_date": label_date,
    "work_value": label_strict_equality,
    "free_months": label_strict_equality,
    "transaction_type_id": label_strict_equality,
    "rent_bumps_percent_bumps": label_strict_equality,
    "rent_bumps_dollar_bumps": label_strict_equality,
    "lease_type_id": label_strict_equality,
}