<a href="https://colab.research.google.com/github/gandesirijugalkishore/Learning/blob/main/READ_DATA_FROM_ORACLE_WRITE_TO_ELASTIC.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.types import IntegerType, DateType, StringType
from pyspark.sql.functions import col, unix_timestamp
from pyspark.sql import functions as sf
import time
import yaml
import pickle
from datetime import datetime


def createSparkSession(app_name=None):
    """Create (or reuse) the SparkSession for this job.

    Args:
        app_name: Spark application name. When omitted, falls back to the
            module-level ``appname`` global (assigned in the ``__main__``
            section), so the original zero-argument call keeps working.

    Returns:
        The session returned by ``SparkSession.builder.getOrCreate()``.
    """
    if app_name is None:
        # Backward-compatible default: the original always read the global.
        app_name = appname
    return SparkSession.builder.appName(app_name).getOrCreate()

    
def readDataFromOracleStats(spark_session,dbtable,oracle_config):
    """Run a small, non-partitioned JDBC read such as an aggregate/stats query.

    Args:
        spark_session: Active SparkSession.
        dbtable: Table name or parenthesised subquery used as the JDBC
            ``dbtable`` option.
        oracle_config: Mapping providing ``url``, ``driver``, ``user`` and
            ``password``.

    Returns:
        The DataFrame produced by ``load()``.
    """
    reader = spark_session.read.format("jdbc")
    # Connection settings come straight from the config, in the same order
    # the keys were originally looked up.
    for conn_key in ("url", "driver"):
        reader = reader.option(conn_key, oracle_config[conn_key])
    reader = reader.option("dbtable", dbtable)
    for cred_key in ("user", "password"):
        reader = reader.option(cred_key, oracle_config[cred_key])
    # Stats queries return a handful of rows; a small fetch size is enough.
    reader = reader.option("fetchsize", "1000")
    return reader.load()

def readDataFromOracleDB(spark_session,dbtable,oracle_config):
    """Read ``dbtable`` from Oracle as a range-partitioned parallel JDBC scan.

    Spark issues ``numPartitions`` queries, each covering a stride of
    ``partitionColumn`` values between ``lowerBound`` and ``upperBound``.
    The bounds only decide the stride; they do not filter rows.

    Args:
        spark_session: Active SparkSession.
        dbtable: Table name or parenthesised subquery for the ``dbtable``
            option.
        oracle_config: Mapping providing ``url``, ``driver``, ``user``,
            ``password``, ``lowerBound``, ``upperBound``, ``numPartitions``
            and ``fetchsize``. An optional ``partitionColumn`` key overrides
            the default partition column ``"PARTN_NUM"``.

    Returns:
        The DataFrame produced by ``load()``.
    """
    # Previously hard-coded; the config key lets other tables reuse this
    # reader while existing configs (without the key) behave as before.
    partition_column = oracle_config.get('partitionColumn', 'PARTN_NUM')
    df = spark_session.read.format("jdbc")\
                        .option("url", oracle_config['url'])\
                        .option("driver", oracle_config['driver'])\
                        .option("dbtable", dbtable)\
                        .option("lowerBound", oracle_config['lowerBound'])\
                        .option("upperBound", oracle_config['upperBound'])\
                        .option("partitionColumn", partition_column)\
                        .option("numPartitions", oracle_config['numPartitions'])\
                        .option("user", oracle_config['user'])\
                        .option("password", oracle_config['password'])\
                        .option("fetchsize", oracle_config['fetchsize'])\
                        .load()
    return df
                                   
def dataCleaning(df):
    """Normalise column types and nulls so the DataFrame indexes cleanly in Elasticsearch.

    Column handling is keyed on substrings of the Spark dtype string:

    * ``int`` / ``double`` / ``float``: nulls filled with ``0``.
    * ``string``: nulls filled with ``"nan"``.
    * ``date`` / ``time``: formatted as ``yyyy-MM-dd'T'HH:mm:ss.SSS`` strings.
    * ``decimal``: cast to ``double``. This was ``float`` before; a 32-bit
      float keeps only about 7 significant digits and corrupted large Oracle
      NUMBER values such as IDs.
    * ``array``: cast to ``string``.

    An empty DataFrame is returned unchanged.

    Args:
        df: Source DataFrame.

    Returns:
        The cleaned DataFrame, with the original column order and names.
    """
    # Emptiness probe: head(1) needs at most one row, whereas count() forced a
    # full scan of the (JDBC-backed) source just to answer "is it empty?".
    if not df.head(1):
        return df

    dtypes = df.dtypes
    numeric_cols = [name for name, dtype in dtypes
                    if "int" in dtype or "double" in dtype or "float" in dtype]
    string_cols = [name for name, dtype in dtypes if "string" in dtype]

    # Fill nulls on the ORIGINAL types, before any column is re-typed below.
    # This matches the old per-column order, where fills used the original
    # dtype. Spark ignores subset columns whose type does not match the fill
    # value.
    if numeric_cols:
        df = df.fillna(0, subset=numeric_cols)
    if string_cols:
        df = df.fillna("nan", subset=string_cols)
    # NOTE(review): the original also called fillna("nan") on boolean columns.
    # A string fill value skips non-string columns, so that call never changed
    # anything and has been dropped. Decide whether boolean nulls should be
    # filled (e.g. with False).

    projected = []
    for name, dtype in dtypes:
        expr = df[name]
        # Transforms are applied in the same sequence as before, so a dtype
        # matching several rules is handled the same way.
        if "date" in dtype or "time" in dtype:
            expr = sf.date_format(expr, "yyyy-MM-dd'T'HH:mm:ss.SSS")
        if "decimal" in dtype:
            expr = expr.cast("double")
        if "array" in dtype:
            expr = expr.cast("string")
        projected.append(expr.alias(name))
    # A single select instead of one withColumn per column keeps the logical
    # plan shallow for wide tables.
    return df.select(*projected)

def writeToElasticSearch(df,es_config,snap_eff_dt):
    """Clean ``df`` and append it to a per-snapshot Elasticsearch index.

    The target is ``<index_name lowercased>_<snap_eff_dt with '-' -> '_'>/_doc``.
    It is written through the ``org.elasticsearch.spark.sql`` data source
    with HTTP basic auth, in ``append`` mode.

    Args:
        df: DataFrame to write; passed through ``dataCleaning`` first.
        es_config: Mapping providing ``host``, ``port``, ``username``,
            ``password`` and ``index_name``.
        snap_eff_dt: Snapshot date string (presumably ``YYYY-MM-DD``; verify).
    """
    cleaned = dataCleaning(df)
    connector_options = {
        "es.nodes": f"http://{es_config['host']}:{es_config['port']}",
        "es.net.http.auth.user": es_config['username'],
        "es.net.http.auth.pass": es_config['password'],
    }
    date_suffix = snap_eff_dt.replace('-','_')
    index_name = f"{es_config['index_name'].lower()}_{date_suffix}"
    write_log(f'Index name : {index_name}')

    writer = cleaned.write.format("org.elasticsearch.spark.sql")
    for opt_key, opt_value in connector_options.items():
        writer = writer.option(opt_key, opt_value)
    writer.mode("append").save(f"{index_name}/_doc")

    write_log({'response_Code':'200','response':'Inserted Sucessfully'})


def write_log(log_data):
    """Append one timestamped line for this job to ``logs.txt``.

    The line format is ``<datetime.now()>: Job: <appname> msg: <log_data>``.
    ``appname`` is the module-level global assigned in the ``__main__``
    section.

    Args:
        log_data: Any object; rendered via f-string formatting.
    """
    ts = str(datetime.now())
    # Explicit encoding: the platform default is locale-dependent and could
    # mix encodings inside one append-only log across environments.
    with open('logs.txt', 'a', encoding='utf-8') as f:
        print(f"{ts}: Job: {appname} msg: {log_data}", file=f)

    
def dataMigration():
    """Copy one Oracle table into a dated Elasticsearch index.

    Reads ``config.yml`` for the Oracle and Elasticsearch settings. Three
    small stats queries then supply the snapshot date and the min/max
    partition numbers. The table is read with a range-partitioned JDBC scan,
    every row is tagged with ``SNAP_EFF_DT``, and the result is appended to
    Elasticsearch.

    Relies on module-level globals assigned in the ``__main__`` section:

    * ``num_partition``: JDBC partition count.
    * ``filter_data``: when truthy, only rows with ``ROWNUM <= 100`` are read.

    The Spark session is stopped even when a step raises.
    """
    #Create spark session:
    spark_session = createSparkSession()
    try:
        # Configuration
        config_file = 'config.yml'
        with open(config_file) as f:
            config = yaml.safe_load(f)
        oracle_config = config['.........']
        es_config = config['es_config']

        #Read data from Oracle DB
        db = oracle_config['db']
        table = oracle_config['table']
        # NOTE(review): service_name is never used below; the lookup only
        # enforces that the key exists in the config.
        service_name = oracle_config['service_name']

        # NOTE(review): in this copy the three stats queries are identical and
        # the selected column names are placeholders (presumably redacted);
        # confirm against the production script.
        ############ Get snap_ef_dt
        dbtable = f'(select max(TABLE_NAME) as TABLE_NAME from DATABASE.{table})'
        df = readDataFromOracleStats(spark_session,dbtable,oracle_config)
        snap_eff_dt = str(df.select('......').collect()[0]['....'].date())
        write_log(f'........ {snap_eff_dt}')

        ############ Get max partn_num
        dbtable = f'(select max(TABLE_NAME) as TABLE_NAME from DATABASE.{table})'
        df = readDataFromOracleStats(spark_session,dbtable,oracle_config)
        max_partn_num = int(df.select('...........').collect()[0]['..........'])
        write_log(f'max_partn_num {max_partn_num}')

        ############ Get min partn_num
        dbtable = f'(select max(TABLE_NAME) as TABLE_NAME from DATABASE.{table})'
        df = readDataFromOracleStats(spark_session,dbtable,oracle_config)
        min_partn_num = int(df.select('min_partn_num').collect()[0]['min_partn_num'])
        write_log(f'min_partn_num {min_partn_num}')

        # Partitioned-read settings consumed by readDataFromOracleDB.
        oracle_config['lowerBound'] = min_partn_num
        oracle_config['upperBound'] = max_partn_num
        oracle_config['numPartitions'] = num_partition
        oracle_config['fetchsize'] = 50000

        if filter_data:
            # Limit the read to at most 100 rows.
            dbtable = f'(select * from {db}.{table} WHERE ROWNUM <= 100)'
        else:
            dbtable = f'{db}.{table}'

        write_log("Reading Data..")
        df = readDataFromOracleDB(spark_session,dbtable,oracle_config)
        write_log(f"Number of partition {df.rdd.getNumPartitions()}")
        df = df.withColumn('SNAP_EFF_DT', sf.lit(snap_eff_dt))

        #Write data to Elasticsearch
        write_log(f"Start writitng to ES")
        writeToElasticSearch(df,es_config,snap_eff_dt)
        # NOTE(review): df is not cached, so this count re-executes the JDBC
        # read against Oracle. The number may differ from what was written if
        # the table changed in between.
        write_log(f"Data Written to Elasticsearch with count: {df.count()}")
    finally:
        #Stop the spark session (also on failure, so resources are released)
        spark_session.stop()


if __name__ == '__main__':
    # Job-wide settings. appname, num_partition and filter_data are read as
    # module globals by the functions above.
    appname = "Oracle to Elasticsearch Per Day Partitioned by ......."
    num_executers, num_cores = 4, 4
    # NOTE(review): base_partitions is not referenced anywhere in this file.
    base_partitions = num_cores * num_executers
    # Hard-coded JDBC partition count. The alternative once noted here,
    # 4 * base_partitions, would evaluate to 64, not 200.
    num_partition = 200
    # True -> read only ROWNUM <= 100 rows instead of the whole table.
    filter_data = False
    dataMigration()