# Cálculo de elegibilidade

Descrição: Este notebook faz o cálculo de elegibilidade para o projeto NBA.

## Inicializar variáveis


In [1]:
# Notebook-wide flags, read later through os.getenv().
# %env stores the text after "=" verbatim: IS_TESTING holds the quoted string "Yes"
# and CREATE_DATAPROC_CLUSTER holds the string None (not Python's None object),
# so both values are non-empty strings.
%env IS_TESTING="Yes"
%env CREATE_DATAPROC_CLUSTER=None

env: IS_TESTING="Yes"
env: CREATE_DATAPROC_CLUSTER=None


In [2]:
PROJECT_ID = "<PROJECT_ID>" 
!gcloud config set project {PROJECT_ID}     

Updated property [core/project].


## Instalação

In [3]:
import os

if os.getenv("IS_TESTING"):
    """
    The testing suite does not currently support testing on Dataproc clusters,
    so the testing environment is setup to replicate Dataproc via the following steps.
    """
    JAVA_VER = "8u332-b09"
    JAVA_FOLDER = "/tmp/java"
    FILE_NAME = f"openlogic-openjdk-{JAVA_VER}-linux-x64"
    TAR_FILE = f"{JAVA_FOLDER}/{FILE_NAME}.tar.gz"
    DOWNLOAD_LINK = f"https://builds.openlogic.com/downloadJDK/openlogic-openjdk/{JAVA_VER}/openlogic-openjdk-{JAVA_VER}-linux-x64.tar.gz"
    PYSPARK_VER = "3.1.3"

    # Download Open JDK 8. Spark requires Java to execute.
    ! rm -rf $JAVA_FOLDER
    ! mkdir $JAVA_FOLDER
    ! wget -P $JAVA_FOLDER $DOWNLOAD_LINK
    os.environ["JAVA_HOME"] = f"{JAVA_FOLDER}/{FILE_NAME}"
    ! tar -zxf $TAR_FILE -C $JAVA_FOLDER
    ! echo $JAVA_HOME

    # Pin the Spark version to match that the Dataproc 2.0 cluster.
    ! pip install pyspark==$PYSPARK_VER -q

--2024-06-13 16:04:13--  https://builds.openlogic.com/downloadJDK/openlogic-openjdk/8u332-b09/openlogic-openjdk-8u332-b09-linux-x64.tar.gz
Resolving builds.openlogic.com (builds.openlogic.com)... 13.32.164.58, 13.32.164.69, 13.32.164.34, ...
Connecting to builds.openlogic.com (builds.openlogic.com)|13.32.164.58|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 105558622 (101M) [application/x-gzip]
Saving to: ‘/tmp/java/openlogic-openjdk-8u332-b09-linux-x64.tar.gz’


2024-06-13 16:04:14 (229 MB/s) - ‘/tmp/java/openlogic-openjdk-8u332-b09-linux-x64.tar.gz’ saved [105558622/105558622]

/tmp/java/openlogic-openjdk-8u332-b09-linux-x64


In [None]:
if os.getenv("CREATE_DATAPROC_CLUSTER"):
    !gcloud dataproc clusters create "data-eng-bv-cluster" \
        --region=us-central1 \
        --enable-component-gateway \
        --image-version=2.0 \
        --optional-components=JUPYTER

## Passos de Transformação

#### 1. **Spark Session:**

In [4]:
from pyspark.sql import SparkSession

In [5]:
# Build the SparkSession with the BigQuery connector jar attached.
VER = "0.26.0"
FILE_NAME = f"spark-bigquery-with-dependencies_2.12-{VER}.jar"

# In testing mode the jar is taken from the GitHub release; otherwise from the
# gs://spark-lib bucket.
connector = (
    f"https://github.com/GoogleCloudDataproc/spark-bigquery-connector/releases/download/{VER}/{FILE_NAME}"
    if os.getenv("IS_TESTING")
    else f"gs://spark-lib/bigquery/{FILE_NAME}"
)

builder = SparkSession.builder.appName("spark-bigquery-data-eng-sessions-demo")
builder = builder.config("spark.jars", connector)
builder = builder.config("spark.sql.debug.maxToStringFields", "500")
spark = builder.getOrCreate()

24/06/13 16:04:22 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


#### 2. **Carga de dados:**

In [6]:
# Sample accounts: each row links an account and its bank to the owning party.
account_schema = ["account_id", "bank_id", "party_id"]
account_data = [
    ("ACC001", "BANK01", "P001"),
    ("ACC002", "BANK01", "P002"),
    ("ACC003", "BANK02", "P003"),
]
account_df = spark.createDataFrame(account_data, account_schema)

In [7]:
# Sample parties (customers): hashed name, age and an opaque extra-data field.
party_schema = ["party_id", "hashed_name", "age", "other_customer_data"]
party_data = [
    ("P001", "hashed_name1", 35, "other_data1"),
    ("P002", "hashed_name2", 75, "other_data2"),
    ("P003", "hashed_name3", 22, "other_data3"),
]
party_df = spark.createDataFrame(party_data, party_schema)

In [8]:
# Sample product catalogue: product id and display name.
product_schema = ["product_id", "product_name"]
product_data = [
    ("PROD01", "Car Loan"),
    ("PROD02", "Solar Equipment Loan"),
    ("PROD03", "Credit Line"),
]
product_df = spark.createDataFrame(product_data, product_schema)

In [9]:
# Sample product events per party; event_date is kept as an ISO-formatted string.
product_vs_party_schema = [
    "party_id",
    "product_id",
    "product_name",
    "loan_amount",
    "credit_score",
    "event_date",
]

product_vs_party_data = [
    ("P001", "PROD01", "Car Loan", 30000, 720, "2023-12-25"),
    ("P001", "PROD02", "Solar Equipment Loan", 15000, 720, "2024-01-15"),
    ("P002", "PROD01", "Car Loan", 25000, 630, "2024-02-14"),
    ("P003", "PROD03", "Credit Line", 10000, 580, "2024-03-17"),
]

product_vs_party_df = spark.createDataFrame(product_vs_party_data, product_vs_party_schema)

#### 3. **Definindo as transformações:**

In [10]:
# Attach each party's attributes (age included) to its product events, matching on party_id.
joined_df = product_vs_party_df.join(party_df, on="party_id", how="inner")

In [11]:
pip install ..

Processing /home/jupyter
  Preparing metadata (setup.py) ... [?25ldone
[?25hBuilding wheels for collected packages: rules
  Building wheel for rules (setup.py) ... [?25ldone
[?25h  Created wheel for rules: filename=rules-1.3-py3-none-any.whl size=1675 sha256=c029bf5cf874a2c9dd1e0b4bce273e0753737b44846e94387c4e6ca1db8ea71c
  Stored in directory: /var/tmp/pip-ephem-wheel-cache-87xx19_k/wheels/a5/eb/36/799b873945ed0b79b38fc9a1580abda5f4e3699c5c138d7fa9
Successfully built rules
Installing collected packages: rules
  Attempting uninstall: rules
    Found existing installation: rules 1.3
    Uninstalling rules-1.3:
      Successfully uninstalled rules-1.3
Successfully installed rules-1.3
Note: you may need to restart the kernel to use updated packages.


In [12]:
# Register the eligibility rules source file with the Spark job.
rules_module_path = '/home/jupyter/eligibility/rules.py'
sc = spark.sparkContext
sc.addFile(rules_module_path)

In [13]:
from eligibility.rules import is_eligible_age_udf, has_recent_car_loan_udf
from pyspark.sql.functions import avg, col, count, desc, round, size, udf

In [14]:
# Preview the joined rows before any eligibility rule is applied.
joined_df.show()

                                                                                

+--------+----------+--------------------+-----------+------------+----------+------------+---+-------------------+
|party_id|product_id|        product_name|loan_amount|credit_score|event_date| hashed_name|age|other_customer_data|
+--------+----------+--------------------+-----------+------------+----------+------------+---+-------------------+
|    P003|    PROD03|         Credit Line|      10000|         580|2024-03-17|hashed_name3| 22|        other_data3|
|    P002|    PROD01|            Car Loan|      25000|         630|2024-02-14|hashed_name2| 75|        other_data2|
|    P001|    PROD01|            Car Loan|      30000|         720|2023-12-25|hashed_name1| 35|        other_data1|
|    P001|    PROD02|Solar Equipment Loan|      15000|         720|2024-01-15|hashed_name1| 35|        other_data1|
+--------+----------+--------------------+-----------+------------+----------+------------+---+-------------------+



                                                                                

#### Idade

In [21]:
# Evaluate the age rule on every row and store the result as "eligible_age".
age_rule_result = is_eligible_age_udf(col("age"))
eligible_df = joined_df.withColumn("eligible_age", age_rule_result)

In [22]:
# Display eligible_df to inspect the eligible_age column.
eligible_df.show()



+--------+----------+--------------------+-----------+------------+----------+------------+---+-------------------+------------+
|party_id|product_id|        product_name|loan_amount|credit_score|event_date| hashed_name|age|other_customer_data|eligible_age|
+--------+----------+--------------------+-----------+------------+----------+------------+---+-------------------+------------+
|    P003|    PROD03|         Credit Line|      10000|         580|2024-03-17|hashed_name3| 22|        other_data3|        true|
|    P002|    PROD01|            Car Loan|      25000|         630|2024-02-14|hashed_name2| 75|        other_data2|       false|
|    P001|    PROD01|            Car Loan|      30000|         720|2023-12-25|hashed_name1| 35|        other_data1|        true|
|    P001|    PROD02|Solar Equipment Loan|      15000|         720|2024-01-15|hashed_name1| 35|        other_data1|        true|
+--------+----------+--------------------+-----------+------------+----------+------------+---+--

                                                                                

#### Tem financiamento de carro nos últimos 90 dias?

In [23]:
# Evaluate the recent-car-loan rule from the product name and event date of each row,
# storing the result as "eligible_has_loan_car".
car_loan_rule_result = has_recent_car_loan_udf(col("product_name"), col("event_date"))
eligible_df = eligible_df.withColumn("eligible_has_loan_car", car_loan_rule_result)

In [24]:
# Display eligible_df to inspect the eligible_has_loan_car column.
eligible_df.show()



+--------+----------+--------------------+-----------+------------+----------+------------+---+-------------------+------------+---------------------+
|party_id|product_id|        product_name|loan_amount|credit_score|event_date| hashed_name|age|other_customer_data|eligible_age|eligible_has_loan_car|
+--------+----------+--------------------+-----------+------------+----------+------------+---+-------------------+------------+---------------------+
|    P003|    PROD03|         Credit Line|      10000|         580|2024-03-17|hashed_name3| 22|        other_data3|        true|                false|
|    P002|    PROD01|            Car Loan|      25000|         630|2024-02-14|hashed_name2| 75|        other_data2|       false|                false|
|    P001|    PROD01|            Car Loan|      30000|         720|2023-12-25|hashed_name1| 35|        other_data1|        true|                false|
|    P001|    PROD02|Solar Equipment Loan|      15000|         720|2024-01-15|hashed_name1| 35

                                                                                

#### 4. Salvando os dados transformados

In [26]:
from pyspark.sql import SparkSession, functions as F

# Collapse the per-product rows into one row per party.
key_column = "party_id"
attribute_columns = ["hashed_name", "other_customer_data"]
flag_columns = ["eligible_age", "eligible_has_loan_car"]
columns_to_keep = [key_column, *attribute_columns, *flag_columns]

# Customer attributes are taken with first().
# Boolean rule flags are combined with max(), i.e. a logical "any": first() returns
# an arbitrary row after the groupBy shuffle, so a party with several product rows
# could otherwise end up with an order-dependent flag.
aggregated_df = (
    eligible_df.select(columns_to_keep)
    .groupBy(key_column)
    .agg(
        *[F.first(name).alias(name) for name in attribute_columns],
        *[F.max(name).alias(name) for name in flag_columns],
    )
)

# Show the results
aggregated_df.show()

                                                                                

+--------+------------+-------------------+------------+---------------------+
|party_id| hashed_name|other_customer_data|eligible_age|eligible_has_loan_car|
+--------+------------+-------------------+------------+---------------------+
|    P003|hashed_name3|        other_data3|        true|                false|
|    P002|hashed_name2|        other_data2|       false|                false|
|    P001|hashed_name1|        other_data1|        true|                false|
+--------+------------+-------------------+------------+---------------------+

