In [0]:
%pip install -r requirements.txt

In [0]:
import pyspark.sql.functions as f
import sys
import json
import entry_point
import importlib
import dbks_dbms.mgr.dbks_dms_mgr
import dbks_dbms.stg_tbl_mgr.mysql_tbl_mgr
import dbks_dbms.stg_tbl_mgr.redshift_tbl_mgr
import dbks_dbms.mysql_rw.mysql_writer
from dbks_dbms.stg_tbl_mgr.mysql_tbl_mgr import MysqlTblMgr
from dbks_dbms.mysql_rw.mysql_writer import MysqlWriter
from datetime import datetime

In [0]:

# Which source table to sync and which kind of target database to sync it to.
# These three values select the matching row(s) of udf_internal.udf_to_rdbms_sync.
src_table_name = "dw_dim_state"
src_database = "conformedpiiedw"  # logical name; translated to a path via database_path_mapping.json
tgt_db_typ = "redshift"


In [0]:

with open('database_path_mapping.json', 'r') as f:
    database_path_mapping = json.load(f)

src_database_path=database_path_mapping.get(src_database)

In [0]:
def get_current_notebook_path():
    """Return the workspace path of the currently running notebook.

    Reads it from the Databricks notebook context exposed through `dbutils`.
    """
    notebook_context = dbutils.entry_point.getDbutils().notebook().getContext()
    return notebook_context.notebookPath().get()

def get_environment_from_notebook_path():
    """Map the cluster owner's Databricks org id to an environment label.

    Despite the name, the notebook path is not used: the environment is derived from the
    Spark conf `spark.databricks.clusterUsageTags.clusterOwnerOrgId`.

    Returns:
        'PT' or 'prod' for a recognised org id, otherwise None.
    """
    owner_org_id = spark.conf.get("spark.databricks.clusterUsageTags.clusterOwnerOrgId")
    # TODO: 'add production id' is a placeholder and will never match a real org id,
    # so 'prod' is currently unreachable; fill in the production workspace org id.
    environment_by_org_id = {
        '4022166418081681': 'PT',
        'add production id': 'prod',
    }
    return environment_by_org_id.get(owner_org_id)

def get_user_mysql_cred(mysql_env, secret_mgr_scope='ds-edw-prod'):
    """Fetch MySQL connection credentials from a Databricks secret scope.

    Args:
        mysql_env: environment label; only 'pt' and 'prod' (case-insensitive) are accepted.
        secret_mgr_scope: secret scope holding the keys HOST_NAME, HOST_PORT, USERNAME and
            PASSWORD. (Previously this argument was ignored and 'ds-edw-prod' was always used;
            the default keeps that behaviour for callers that do not pass it.)

    Returns:
        Tuple (host, port, username, password) exactly as returned by dbutils.secrets.get.

    Raises:
        Exception: if mysql_env is None or not one of the accepted environments.
    """
    # Check None explicitly so the caller logs a meaningful message instead of an AttributeError.
    if mysql_env is None or mysql_env.lower() not in ('pt', 'prod'):
        raise Exception(f"Invalid environment for mysql user credentials {mysql_env}")

    mysql_ip = dbutils.secrets.get(scope=secret_mgr_scope, key='HOST_NAME')
    mysql_port = dbutils.secrets.get(scope=secret_mgr_scope, key='HOST_PORT')
    mysql_username = dbutils.secrets.get(scope=secret_mgr_scope, key='USERNAME')
    mysql_password = dbutils.secrets.get(scope=secret_mgr_scope, key='PASSWORD')
    return (mysql_ip, mysql_port, mysql_username, mysql_password)

def get_redshift_user_cred(redshift_env, secret_mgr_scope):
    """Fetch Redshift connection credentials from a Databricks secret scope.

    Args:
        redshift_env: environment label; only 'prod' (case-insensitive) is accepted.
        secret_mgr_scope: secret scope holding the keys HOST_IP, USERNAME and PASSWORD
            (the same keys the Redshift JDBC smoke-test cell reads from 'ds-rs-pii-prod').

    Returns:
        Tuple (host, port, username, password); port is the int 5439.

    Raises:
        Exception: if redshift_env is None or not 'prod'.
    """
    if redshift_env is None or redshift_env.lower() not in ('prod',):
        raise Exception(f"Invalid environment for redshift user credentials {redshift_env}")

    # Credentials now come from the secret scope. The password that used to be hard-coded
    # here must be treated as leaked (notebook/VCS history) and rotated.
    host = dbutils.secrets.get(scope=secret_mgr_scope, key='HOST_IP')
    # NOTE(review): port stays 5439 as before; the original carried a commented-out lookup of
    # key 'HOST_PORT' — switch to it once that key is confirmed to exist in the scope.
    port = 5439
    username = dbutils.secrets.get(scope=secret_mgr_scope, key='USERNAME')
    password = dbutils.secrets.get(scope=secret_mgr_scope, key='PASSWORD')
    return (host, port, username, password)


In [0]:


# Re-execute the local entry_point module so edits to it take effect in this session.
importlib.reload(entry_point)
# 'PT', 'prod', or None for an unrecognised workspace; derived from the cluster owner's
# org id (see get_environment_from_notebook_path), not from the notebook path.
environment = get_environment_from_notebook_path()




In [0]:
 
# Make the dbks_dbms workspace folders importable, then reload the project modules so edits
# to them are picked up without restarting the Python process.
# NOTE(review): the `import dbks_dbms...` statements in the imports cell run before this cell,
# so these sys.path additions only affect imports performed later. Several entries point at
# module paths rather than folders, and the last one is relative — kept as-is; confirm which
# entries are actually needed.
_DBKS_WORKSPACE_ROOT = '/Workspace/Users/madan.hajare@teladochealth.com'
pathlist = [
    f'{_DBKS_WORKSPACE_ROOT}/dbks_dbms',
    f'{_DBKS_WORKSPACE_ROOT}/dbks_dbms/mgr',
    f'{_DBKS_WORKSPACE_ROOT}/dbks_dbms/mgr/dbks_dms_mgr',
    f'{_DBKS_WORKSPACE_ROOT}/dbks_dbms/stg_tbl_mgr/mysql_tbl_mgr',
    'dbks_dbms/mysql_rw/mysql_writer',
]

for path in pathlist:
    if path not in sys.path:
        print("Adding path: " + path)
        sys.path.append(path)

# Reload the leaf modules before the manager module, so that if dbks_dms_mgr uses
# `from ... import X` (presumably — verify) it binds the freshly reloaded objects.
# redshift_tbl_mgr is imported alongside mysql_tbl_mgr but was previously never reloaded.
importlib.reload(dbks_dbms.stg_tbl_mgr.mysql_tbl_mgr)
importlib.reload(dbks_dbms.stg_tbl_mgr.redshift_tbl_mgr)
importlib.reload(dbks_dbms.mysql_rw.mysql_writer)
importlib.reload(dbks_dbms.mgr.dbks_dms_mgr)

# importlib.reload does not rebind names created by `from module import name`; refresh the
# notebook-level bindings from the imports cell so they refer to the reloaded classes.
MysqlTblMgr = dbks_dbms.stg_tbl_mgr.mysql_tbl_mgr.MysqlTblMgr
MysqlWriter = dbks_dbms.mysql_rw.mysql_writer.MysqlWriter

In [0]:
# Look up the sync configuration for this source table and hand each matching row to the
# DMS entry point. Any failure is appended to udf_internal.udf_to_rdbms_sync_log.
try:
    # Explicit raise instead of `assert`: asserts are stripped under `python -O`.
    if not src_database_path:
        raise ValueError(f"source database path for {src_database} is not set in database_path_mapping.json")

    sync_cfg = spark.table("udf_internal.udf_to_rdbms_sync")
    # Column expressions instead of f-string SQL predicates, so a quote in a table or
    # database name cannot break (or alter) the filter.
    df = (
        sync_cfg
        .where(
            (sync_cfg["src_db"] == src_database_path)
            & (sync_cfg["src_tbl"] == src_table_name)
            & (sync_cfg["tgt_db_typ"] == tgt_db_typ)
        )
        .select("udf_to_rdbms_sync_key", "src_db", "src_tbl", "tgt_db_typ", "tgt_db", "tgt_tbl", "merge_keys")
    )

    # Collect once; count() followed by collect() ran the same query as two Spark jobs.
    sync_rows = df.collect()
    if not sync_rows:
        raise ValueError(
            f"No records found in udf_internal.udf_to_rdbms_sync for src_db={src_database_path} "
            f"and src_tbl={src_table_name} and tgt_db_typ={tgt_db_typ}"
        )

    # check last successful log entry and get watermark value

    mysql_ip, mysql_port, mysql_username, mysql_password = get_user_mysql_cred(
        mysql_env=environment, secret_mgr_scope='ds-edw-prod'
    )
    redshift_ip, redshift_port, redshift_username, redshift_password = get_redshift_user_cred(
        redshift_env='prod', secret_mgr_scope='ds-rs-pii-prod'
    )

    # The connection tuple does not depend on the row, so build it once.
    conn = (mysql_ip, mysql_port, mysql_username, mysql_password,
            redshift_ip, redshift_port, redshift_username, redshift_password)
    for row in sync_rows:
        master_obj = entry_point.start_dbks_dms(row, conn)
except Exception as e:
    # One timestamp for both datetime columns of the log row.
    failed_at = datetime.now()
    # str(e) rather than e.args[0]: args may be empty (IndexError inside this handler) or hold
    # non-string values; fall back to the exception type name when the message is empty.
    error_message = str(e) or type(e).__name__
    data = (None, None, 'fail', None, failed_at, failed_at, 'UDF_SYNC_FRMWRK', 'UDF_SYNC_FRMWRK', error_message)
    schema = spark.table("udf_internal.udf_to_rdbms_sync_log").schema
    spark.createDataFrame([data], schema).write.mode("append").saveAsTable("udf_internal.udf_to_rdbms_sync_log")
    # NOTE(review): the failure is only logged, so the notebook/job still finishes successfully.
    # Re-raise here if orchestration should see the failure.
  
                                    




    

In [0]:
%sql
-- Inspect the sync configuration rows (src_db / src_tbl -> tgt_db_typ / tgt_db / tgt_tbl).
SELECT *
FROM udf_internal.udf_to_rdbms_sync;

In [0]:
%sql
-- Sync log, newest entries first; failed runs are appended here by the sync cell's exception handler.
SELECT *
FROM udf_internal.udf_to_rdbms_sync_log
ORDER BY udf_created_dt DESC;

In [0]:
# Redshift connectivity smoke test: run a trivial query over JDBC and show the result.
redshift_host = dbutils.secrets.get(scope='ds-rs-pii-prod', key='HOST_IP')
redshift_jdbc_url = f"jdbc:redshift://{redshift_host}:5439/prod"
redshift_properties = {
    "user": dbutils.secrets.get(scope='ds-rs-pii-prod', key='USERNAME'),
    "password": dbutils.secrets.get(scope='ds-rs-pii-prod', key='PASSWORD'),
    # Spark's jdbc() loads this value as a JDBC driver class. "com.databricks.spark.redshift"
    # is the spark-redshift data-source package name, not a driver class, so loading it fails.
    "driver": "com.amazon.redshift.jdbc42.Driver",
}

# Subquery alias form, as required when passing a query in place of a table name.
test_query = "(SELECT CURRENT_DATE, CURRENT_TIMESTAMP) as test_query"

result_df = spark.read.jdbc(redshift_jdbc_url, test_query, properties=redshift_properties)

result_df.show()
