In [2]:
import pyspark 
import pyspark.sql.functions
from pyspark.sql import SparkSession

In [3]:
spark = SparkSession.builder \
    .appName("FinancialTransactions") \
    .getOrCreate()

In [4]:
data = [  
    ("1", "15/01/2023", "1000.50", "Compra de Ações"),
    ("2", "20/02/2024", "1500.75", "Compra de Ações"),
    ("3", "17/03/2021", "500.00", "Venda de Ações"),
    ("1", "02/01/2022", "1000.50", "Venda de Ações"),  # Duplicado
    ("4", "18/02/2023", "2500.25", "Compra de Ações")
]

schema = ("id STRING, data STRING, valor STRING, desc STRING")

df = spark.createDataFrame(data, schema)

In [10]:
from pyspark.sql.functions import col

# Transforma a coluna valor em inteiro
df = df.withColumn('valor_int', col('valor').cast('int') )

print(df)

DataFrame[id: string, data: string, valor: string, desc: string, valor_int: int]


In [15]:
from pyspark.sql.functions import when

# defini um rating pelo valor gasto do cliente
condicional_1 = (col('valor_int') >= 1500) 
condicional_2 = (col('valor_int') <= 1000) 

df_1 = df.withColumn(
    "Rating por valor",
    when(condicional_1, "A")
    .when(condicional_2, "B")
    .otherwise("C")
)

# Cria a mesma coluna, mas agora deixando apenas uma letra para classificar
condicional_1 = col('desc') == 'Compra de Ações'
condicional_2 = col('desc') == 'Venda de Ações'

df_2 = df_1.withColumn(
    'desc'
    ,when( condicional_1, 'C').otherwise('V')
)
display(df_2)

DataFrame[id: string, data: string, valor: string, desc: string, valor_int: int, Rating por valor: string]

In [16]:
#  Uma segunda forma de cria uma coluna, usando CASE WHEN DO SELECT
df3 = df_2.selectExpr(
    '*',
    "(CASE WHEN DESC == 'C' THEN 'COMPROU' " +
    "ELSE 'VENDEU' END) AS TESTE_TROCA"
)

display(df3)

DataFrame[id: string, data: string, valor: string, desc: string, valor_int: int, Rating por valor: string, TESTE_TROCA: string]

In [21]:
import numpy as np
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import udf


# Definir uma UDF para atribuir um rating aleatório, semelhante ao APPLy do pandas
def ratingColuna():
    return np.random.randint(0, 10)

rating = udf(ratingColuna, IntegerType())

# Adicionar uma nova coluna de rating ao DataFrame
df_with_rating = df3.withColumn('rating', rating())
display(df_with_rating)

DataFrame[id: string, data: string, valor: string, desc: string, valor_int: int, Rating por valor: string, TESTE_TROCA: string, rating: int]