# Calling the ODM Rule Execution Server (RES) from Python in Spark
## Prerequisites
   * PySpark

In [None]:
!pip install pyspark

## Initializing the Python environment with the ODM JAR files and the ODM model archive

   * Create a Spark Session
   * Initialize the Python environment

In [None]:
from io import StringIO

import requests
import json
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
import os

import os

cwd = os.getcwd()
print(cwd)
# Download Material for Rule Exection
!curl -o {cwd}/miniloan-xom.jar https://raw.githubusercontent.com/ODMDev/decisions-on-spark/pythonintegration/data/miniloan/miniloan-xom.jar
!curl -o {cwd}/miniloan-ruleapp.jar https://raw.githubusercontent.com/ODMDev/decisions-on-spark/pythonintegration/data/miniloan/miniloan-ruleapp.jar
# Download ODM Library
!curl -o {cwd}/j2ee_connector-1_5-fr.jar http://159.122.179.123:31329/download/lib/ODM8920/j2ee_connector-1_5-fr.jar
!curl -o {cwd}/jrules-engine.jar http://159.122.179.123:31329/download/lib/ODM8920/jrules-engine.jar
!curl -o {cwd}/jrules-res-execution.jar http://159.122.179.123:31329/download/lib/ODM8920/jrules-res-execution-memory.jar

os.environ['PYSPARK_SUBMIT_ARGS'] = "--jars local:"+cwd+"/miniloan-ruleapp.jar,local:"+cwd+"/miniloan-xom.jar,local:"+cwd+"/jrules-engine.jar,local:"+cwd+"/j2ee_connector-1_5-fr.jar,local:"+cwd+"/jrules-res-execution.jar pyspark-shell"


import pyspark # only run after findspark.init()
from pyspark.sql import SparkSession

# Create a Spark Session
sc = SparkSession.builder.getOrCreate()
sc

## Load Dataset

In [None]:

# Read the sample loan requests with pandas, then turn them into a Spark DataFrame.
new_decisions_pd = pd.read_csv("https://raw.githubusercontent.com/ODMDev/decisions-on-spark/master/data/miniloan/miniloan-decisions-defaultly-1K.csv")
# The SparkSession held in `sc` converts a pandas DataFrame directly, so the
# deprecated SQLContext wrapper is not needed.
request_df = sc.createDataFrame(new_decisions_pd)
request_df.printSchema()
request_df.show(10)

In [None]:
from  pyspark.sql import Row

def CreateODMSession(sc):
    """Return the ODM J2SE session factory, building it on the first call.

    The factory is cached as the ``fac`` attribute of this function, so the
    configuration below runs once per Python process; later calls return the
    cached object without touching ``sc``.

    Args:
        sc: object exposing the JVM view as ``sc._jvm`` (the caller in this
            notebook passes a SparkSession).

    Returns:
        The ``IlrJ2SESessionFactory`` instance created on the first call.
    """
    if hasattr(CreateODMSession, "fac"):
        return CreateODMSession.fac

    res_session = sc._jvm.ilog.rules.res.session
    config = res_session.IlrJ2SESessionFactory.createDefaultConfig()

    xu = config.getXUConfig()
    xu.setLogAutoFlushEnabled(True)

    # Use in-memory persistence for both the main and the managed-XOM
    # persistence configurations.
    in_memory = res_session.config.IlrPersistenceType.MEMORY
    xu.getPersistenceConfig().setPersistenceType(in_memory)
    xu.getManagedXOMPersistenceConfig().setPersistenceType(in_memory)

    CreateODMSession.fac = res_session.IlrJ2SESessionFactory(config)
    return CreateODMSession.fac

def execute(row):
    """Run the miniloan ruleset for one loan request and return the decision.

    Args:
        row: record exposing ``name``, ``creditScore``, ``income``,
            ``loanAmount``, ``monthDuration`` and ``rate`` attributes.

    Returns:
        A ``Row`` with ``isApproved`` (read from the ``loan`` output
        parameter) and ``firedRulesCount`` (the
        ``ilog.rules.firedRulesCount`` output parameter).
    """
    # NOTE(review): this function is mapped over an RDD, so it presumably runs
    # in Python worker processes; obtaining a session / JVM view there may not
    # be supported by every Spark version or deployment mode -- confirm.
    sc = SparkSession.builder.getOrCreate()
    jvm = sc._jvm
    factory = CreateODMSession(sc)

    sessionRequest = factory.createRequest()
    sessionRequest.setRulesetPath(jvm.ilog.rules.res.model.IlrPath.parsePath("/miniloanruleapp/miniloanrules"))
    # Ensure latest version of the ruleset is taken into account
    sessionRequest.setForceUptodate(True)

    # Build the Java-side input objects from the row's fields
    borrower = jvm.miniloan.Borrower(row.name, row.creditScore, row.income)
    loan = jvm.miniloan.Loan()
    loan.setAmount(row.loanAmount)
    loan.setDuration(row.monthDuration)
    loan.setYearlyInterestRate(row.rate)

    # Set the input parameters for the execution of the rules
    inputParameters = jvm.java.util.HashMap()
    inputParameters["loan"] = loan
    inputParameters["borrower"] = borrower
    sessionRequest.setInputParameters(inputParameters)

    # Perform ODM execution
    session = factory.createStatelessSession()
    response = session.execute(sessionRequest)

    # Fetch the output parameters once and read both results from the same map
    outputs = response.getOutputParameters()
    loanResult = outputs.get("loan")
    return Row(isApproved=loanResult.isApproved(), firedRulesCount=outputs['ilog.rules.firedRulesCount'])
# Apply the rules to every loan request and collect the decisions in a DataFrame.
dfResult = request_df.rdd.map(execute).toDF()
# count() forces the evaluation of every row and gives the number of decisions produced.
count = dfResult.count()
print("Execution finished:", count, "decisions computed")

# The execution results in dfResult can now be queried with Spark SQL


In [None]:
# Expose the decisions as a temporary view named "loan" so they can be queried with SQL.
dfResult.createOrReplaceTempView("loan")

# Select the approval flag of each decision and display the first rows.
approval_df = sc.sql("SELECT isApproved FROM loan")
approval_df.show()
