<a href="https://colab.research.google.com/github/cristianoaraujodasilveira/getnet/blob/main/desafio.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [12]:
# instalar as dependências
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-2.4.4/spark-2.4.4-bin-hadoop2.7.tgz
!tar xf spark-2.4.4-bin-hadoop2.7.tgz
!pip install -q findspark

In [13]:
# Configure the environment variables pointing at Java and the unpacked Spark distribution.
import os

SPARK_HOME = "/content/spark-2.4.4-bin-hadoop2.7"
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = SPARK_HOME

# Make pyspark importable. findspark.init exports the path it receives as SPARK_HOME and
# prepends <path>/python to sys.path, so it is given the same absolute path as above; the
# previous relative path overwrote SPARK_HOME and only worked while the cwd was /content.
import findspark
findspark.init(SPARK_HOME)

In [4]:
# Start (or reuse) a local Spark session that runs on all available cores.
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .master('local[*]')
    .getOrCreate()
)

In [9]:
# Build two small sample datasets (client contracts and transactions) and write them as CSV.
import csv


def write_csv(path, header, rows):
    """Write ``header`` followed by ``rows`` to the CSV file at ``path``.

    The file is opened with ``newline=""`` as the csv module requires; without it the
    writer's CRLF row terminator is newline-translated on Windows, producing blank rows.
    ``None`` values are written as empty fields.
    """
    with open(path, "w", newline="", encoding="utf-8") as f:
        writer = csv.writer(f)
        writer.writerow(header)
        writer.writerows(rows)


# Contracts: a client may have several contracts; `is_active` marks which one applies.
header = ["contract_id", "client_id", "client_name", "percentage", "is_active"]

data = [
  [3, 3545, "Magazine Luana", 2.00, True],
  [4, 3545, "Magazine Luana", 1.95, False],
  [5, 3509, "Lojas Italianas", 1, True],
  [6, 3510, "Carrerfive", 3.00, True]
]

write_csv("contratos.csv", header, data)

# Transactions: a missing discount_percentage (None) becomes an empty field.
# Client 4510 has no contract in the sample above.
header = ["transaction_id", "client_id", "total_amount", "discount_percentage"]

data = [
  [1, 3545, 3000, 6.99],
  [2, 3545, 4500, 0.45],
  [3, 3509, 69998, 0],
  [4, 3510, 1, None],
  [5, 4510, 34, 40]
]

write_csv("transacoes.csv", header, data)

In [5]:
from pyspark.sql.functions import udf, array
from pyspark.sql.types import FloatType

In [12]:
# Load both CSV files into Spark DataFrames with one shared reader configuration:
# the first row is the header, fields are comma-separated, column types are inferred.
csv_reader = (
    spark.read
    .option("inferSchema", "true")
    .option("header", True)
    .option("delimiter", ",")
)
df_c = csv_reader.csv("contratos.csv")
df_t = csv_reader.csv("transacoes.csv")

In [15]:
# Apply transformations: keep only active contracts, then attach each transaction to its
# client's contract (inner join: transactions from clients without one are dropped).
# Joining on the column *name* yields a single `client_id` column; the previous
# equality-expression join kept both sides' `client_id`, making later references ambiguous.
active_contracts = df_c.filter(df_c.is_active == True)  # noqa: E712 (Spark Column comparison)
df = df_t.join(active_contracts, on="client_id", how="inner")

# Cast the money columns to double (64-bit). The previous 'float' cast is 32-bit and keeps
# only ~7 significant decimal digits, so cents start to be lost on larger amounts.
df = df.withColumn("total_amount", df.total_amount.cast('double'))
df = df.withColumn("discount_percentage", df.discount_percentage.cast('double'))

# Net value: the transaction amount minus its discount.
def get_net_value(x):
    """Return the net amount of a transaction after its discount.

    Args:
        x: a two-item sequence ``(discount_percentage, total_amount)`` -- the order in
            which the columns are packed with ``array(...)`` for the UDF.
            ``discount_percentage`` is a percentage (6.99 means 6.99%); ``None`` means
            "no discount" and is treated as 0.

    Returns:
        ``total_amount - total_amount * discount_percentage / 100`` as a float, or
        ``None`` when ``total_amount`` itself is missing.
    """
    discount_percentage, total_amount = x
    if total_amount is None:
        # Nothing to discount; propagate the missing value instead of raising TypeError.
        return None
    # Only a *missing* discount defaults to 0%. The previous `isinstance(..., float)` check
    # also discarded legitimate int discounts (e.g. 40) as if they were missing.
    if discount_percentage is None:
        discount_percentage = 0.0
    return total_amount - (total_amount * discount_percentage) / 100.0

# Wrap get_net_value as a Spark UDF. The helper is passed directly (the lambda wrapper added
# nothing) and the result is declared DoubleType so the net value is not narrowed to a
# 32-bit float. Both columns are packed into one array because get_net_value takes a single
# (discount_percentage, total_amount) pair.
from pyspark.sql.types import DoubleType

udf_get_net_value = udf(get_net_value, returnType=DoubleType())
df = df.withColumn("net_value", udf_get_net_value(array(df.discount_percentage, df.total_amount)))

# Earned value: the contract's percentage applied to the transaction's net value.
def get_earned_value(x):
    """Return the amount earned on a transaction.

    Args:
        x: a two-item sequence ``(net_value, percentage)`` -- the order in which the
            columns are packed with ``array(...)`` for the UDF. ``percentage`` is a
            percentage (2.0 means 2%).

    Returns:
        ``net_value * percentage / 100`` as a float, or ``None`` when either input is
        missing (null in, null out, instead of a TypeError that would fail the job).
    """
    net_value, percentage = x
    if net_value is None or percentage is None:
        return None
    return (net_value * percentage) / 100.0
# Wrap get_earned_value as a Spark UDF. The helper is passed directly (no lambda wrapper) and
# the result is declared DoubleType rather than a 32-bit float, since it is summed as money.
from pyspark.sql.types import DoubleType

udf_get_earned_value = udf(get_earned_value, returnType=DoubleType())
df = df.withColumn("earned_value", udf_get_earned_value(array(df.net_value, df.percentage)))

In [16]:
# Total earned value over all joined sales, rounded to 2 decimals, queried through a
# temporary SQL view named "vendas".
df.createOrReplaceTempView("vendas")
gain_query = """select round(sum(earned_value), 2) as total_gain from vendas"""
# A global aggregate always yields exactly one row, so first() is that row.
total_gain = spark.sql(gain_query).first()['total_gain']
print(f"total ganho = {total_gain}")

# NOTE(review): this literal looks like a hand-recorded expected result; nothing here
# compares it with `total_gain` -- confirm its intended use.
total = 845.41
