# ETL Sistema Bancario - Projeto Integrador Grupo 05

## Configurando o ambiente:

In [1]:
!pip install pyspark
!wget -q https://dlcdn.apache.org/spark/spark-3.3.2/spark-3.3.2-bin-hadoop3.tgz
!tar -xvzf spark-3.3.2-bin-hadoop3.tgz
!pip install -q findspark



In [1]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = "/home/azureuser/spark-3.3.2-bin-hadoop3"
import findspark
findspark.init()

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import input_file_name

spark = SparkSession.builder\
    .master("local[*]")\
    .appName("Read CSV")\
    .getOrCreate()

## Importando os dados:

### Clientes:

In [2]:
# leitura de todos os arquivos CSV na pasta clientes
clientes = spark.read.option("sep", ";").schema("id int, nome string, email string, data_cadastro timestamp, telefone string").option("header", "false").csv("dados/clientes/*.csv")

# filtrando o dataframe para excluir o arquivo clientes-001.csv
clientes_sem_header = clientes.filter(~input_file_name().rlike("clients-001.csv"))
clientes_com_header = spark.read.option("sep", ";").schema("id int, nome string, email string, data_cadastro timestamp, telefone string").option("header", "true").csv("dados/clientes/clients-001.csv")

# unindo os dois dataframes
clientes = clientes_com_header.union(clientes_sem_header)

clientes.count()
clientes.show()

+---+--------------------+--------------------+-------------------+----------------+
| id|                nome|               email|      data_cadastro|        telefone|
+---+--------------------+--------------------+-------------------+----------------+
|641|Priscila Felix do...|priscila-felix-do...|2021-03-28 18:46:57|+55(30)2227-2428|
| 94|             idelmon|idelmon_94@gmail.com|2019-09-19 12:33:19|+55(29)3027-2026|
|584|Liliane soares da...|liliane-soares-da...|2021-02-10 19:15:30|+55(21)2024-2520|
|580|Fagner jose dos s...|fagner-jose-dos-s...|2021-02-07 01:47:04|+55(24)2624-2029|
| 21|               Cildo|  cildo_21@gmail.com|2019-07-30 11:40:10|+55(21)2222-2422|
|582|Nielton da Silva ...|nielton-da-silva-...|2021-02-09 00:11:22|+55(27)2028-2828|
|586|Armando Teles da ...|armando-teles-da-...|2021-02-12 15:20:14|+55(27)2720-2230|
|151|            Fabricio|fabricio_151@gmai...|2019-10-14 21:16:27|+55(20)2121-2326|
| 83|       Flavio junior|flavio-junior_83@...|2019-09-11 15:24:0

### Transaction-in:

In [3]:
# leitura de todos os arquivos CSV na pasta transaction-in
transaction_in = spark.read.option("sep", ";").schema("id int, cliente_id int, valor double, data timestamp").option("header", "false").csv("/home/azureuser/transaction-in/*.csv")

# filtrando o dataframe para excluir o arquivo transaction-in-001.csv
transaction_in_sem_header = transaction_in.filter(~input_file_name().rlike("transaction-in-001.csv"))
transaction_in_com_header = spark.read.option("sep", ";").schema("id int, cliente_id int, valor double, data timestamp").option("header", "true").csv("/home/azureuser/transaction-in/transaction-in-001.csv")

# unindo os dois dataframes
transaction_in = transaction_in_com_header.union(transaction_in_sem_header)

transaction_in.count()
transaction_in.show()

+----+----------+-----+-------------------+
|  id|cliente_id|valor|               data|
+----+----------+-----+-------------------+
|8615|       586|  0.2|2022-01-19 20:12:26|
|8613|       586|  0.2|2022-01-19 20:11:25|
|8611|       586|  0.2|2022-01-19 20:10:05|
|8606|       910|300.0|2022-01-19 19:59:36|
|8604|        76|100.0|2022-01-18 12:48:14|
|8603|        76|100.0|2022-01-18 12:48:04|
|8602|        76|100.0|2022-01-18 12:47:47|
|8601|        76|100.0|2022-01-18 12:47:43|
|8600|        76|100.0|2022-01-18 12:47:39|
|8599|        76|100.0|2022-01-18 12:43:05|
|8598|        76|100.0|2022-01-18 12:42:56|
|8597|        76|100.0|2022-01-18 12:40:28|
|8596|        76|100.0|2022-01-18 12:38:19|
|8595|        76|100.0|2022-01-18 12:37:59|
|8594|        76|100.0|2022-01-18 12:37:29|
|8593|        76|100.0|2022-01-18 12:37:19|
|8592|       907| 10.0|2022-01-18 12:30:26|
|8591|       907| 10.0|2022-01-18 12:30:14|
|8590|       907| 10.0|2022-01-18 12:30:10|
|8589|       907| 10.0|2022-01-1

### Transaction-out:

In [4]:
transaction_out = spark.read.option("sep", ";").schema("id int, cliente_id int, valor double, data timestamp").option("header", "false").csv("/home/azureuser/transaction-out/*.csv")

transaction_out_sem_header = transaction_out.filter(~input_file_name().rlike("transaction-out-001.csv"))
transaction_out_com_header = spark.read.option("sep", ";")\
    .schema("id int, cliente_id int, valor double, data timestamp")\
    .option("header", "true")\
    .csv("/home/azureuser/transaction-out/transaction-out-001.csv")

transaction_out = transaction_out_com_header.union(transaction_out_sem_header)

transaction_out.count()
transaction_out.show()

+----+----------+------+-------------------+
|  id|cliente_id| valor|               data|
+----+----------+------+-------------------+
|8607|       910|  -2.0|2022-01-19 20:15:26|
|8608|       910|  -2.0|2022-01-19 20:14:56|
|8609|       910|  -2.0|2022-01-19 20:14:26|
|8610|       910|  -2.0|2022-01-19 20:13:56|
|8612|       910|  -2.0|2022-01-19 20:13:26|
|8614|       910|  -2.0|2022-01-19 20:12:56|
|8573|       671| -10.0|2022-01-13 15:21:25|
|8574|       671| -10.0|2022-01-13 15:20:55|
|8575|       671|  -5.0|2022-01-13 15:20:25|
|8576|       671| -10.0|2022-01-13 15:19:55|
|8577|       671|  -2.0|2022-01-13 15:19:25|
|8580|       671| -10.0|2022-01-13 15:18:55|
|8581|       671|-100.0|2022-01-13 15:18:25|
|8582|       671| -15.0|2022-01-13 15:17:55|
|8583|       671|-100.0|2022-01-13 15:17:25|
|8584|       671|  -2.0|2022-01-13 15:16:55|
|8579|       370|  -7.0|2022-01-13 03:44:57|
|8578|       370|  -3.0|2022-01-13 03:37:43|
|8572|       671|  -2.0|2022-01-12 19:43:20|
|8539|    

## Análises/Transformações para identificar as fraudes:

### Unindo as duas tabelas de transações (in, out):

In [5]:
# df_transactions ----- O Dataframe de todas as transações (in e out)
df_transactions = (transaction_in.union(transaction_out))

print(f"Contagem de todas as transações: {df_transactions.count()}")

df_transactions.show(truncate=False)

Contagem de todas as transações: 7272
+----+----------+-----+-------------------+
|id  |cliente_id|valor|data               |
+----+----------+-----+-------------------+
|8615|586       |0.2  |2022-01-19 20:12:26|
|8613|586       |0.2  |2022-01-19 20:11:25|
|8611|586       |0.2  |2022-01-19 20:10:05|
|8606|910       |300.0|2022-01-19 19:59:36|
|8604|76        |100.0|2022-01-18 12:48:14|
|8603|76        |100.0|2022-01-18 12:48:04|
|8602|76        |100.0|2022-01-18 12:47:47|
|8601|76        |100.0|2022-01-18 12:47:43|
|8600|76        |100.0|2022-01-18 12:47:39|
|8599|76        |100.0|2022-01-18 12:43:05|
|8598|76        |100.0|2022-01-18 12:42:56|
|8597|76        |100.0|2022-01-18 12:40:28|
|8596|76        |100.0|2022-01-18 12:38:19|
|8595|76        |100.0|2022-01-18 12:37:59|
|8594|76        |100.0|2022-01-18 12:37:29|
|8593|76        |100.0|2022-01-18 12:37:19|
|8592|907       |10.0 |2022-01-18 12:30:26|
|8591|907       |10.0 |2022-01-18 12:30:14|
|8590|907       |10.0 |2022-01-18 12:3

### Adicionando na tabela df_transaction uma coluna para a diferença de tempo entre as transações:

In [6]:
from pyspark.sql.functions import lag, datediff, col, when, lit,desc, asc
from pyspark.sql.window import Window

# Essa window irá particionar a tabela através do cliente_id e ordenar essa partição por data
window_spec = Window.partitionBy("cliente_id").orderBy("data")

# Essa parte cria a coluna "diferenca_de_tempo" para o tempo entre as transações (obs.: O tempo será 0, quando a diferenca de tempo for nula, ou seja, quando subsequente houver um outro cliente e não houver diferença de tempo entre eles.
diferenca_de_tempo = (col("data").cast("long") - 
                   lag(col("data")).over(window_spec).cast("long"))

df_transactions = df_transactions.withColumn("diferenca_de_tempo", when(diferenca_de_tempo.isNull(), lit(0)).otherwise(diferenca_de_tempo))

df_transactions.show()

+---+----------+-----+-------------------+------------------+
| id|cliente_id|valor|               data|diferenca_de_tempo|
+---+----------+-----+-------------------+------------------+
| 58|         2|-20.0|2019-09-23 22:49:44|                 0|
| 60|         2| -5.0|2019-09-28 02:12:19|            357755|
| 99|         2| -2.0|2019-10-02 18:21:43|            403764|
|100|         2| -2.0|2019-10-02 18:23:58|               135|
|101|         2|-10.0|2019-10-02 19:37:18|              4400|
|220|         2| -2.0|2019-11-27 22:38:20|           4849262|
|226|         2| -2.0|2019-11-28 19:58:05|             76785|
|227|         2| -2.0|2019-11-28 19:58:24|                19|
|228|         2| -2.0|2019-11-28 19:58:44|                20|
|231|         2| -2.0|2019-11-30 14:38:14|            153570|
|232|         2| -2.0|2019-11-30 14:38:49|                35|
|233|         2| -2.0|2019-11-30 14:41:20|               151|
|234|         2| -2.0|2019-11-30 14:45:02|               222|
|244|   

### Adicionando a coluna 'fraudado', do tipo boolean, à tabela df_transactions:

In [9]:
from pyspark.sql.functions import col, when

df_transactions = df_transactions\
    .withColumn("fraudado", when((col("diferenca_de_tempo") <= 120) & (col("diferenca_de_tempo") != 0), True)\
    .otherwise(False))

df_transactions.show()

+---+----------+-----+-------------------+------------------+--------+
| id|cliente_id|valor|               data|diferenca_de_tempo|fraudado|
+---+----------+-----+-------------------+------------------+--------+
| 58|         2|-20.0|2019-09-23 22:49:44|                 0|   false|
| 60|         2| -5.0|2019-09-28 02:12:19|            357755|   false|
| 99|         2| -2.0|2019-10-02 18:21:43|            403764|   false|
|100|         2| -2.0|2019-10-02 18:23:58|               135|   false|
|101|         2|-10.0|2019-10-02 19:37:18|              4400|   false|
|220|         2| -2.0|2019-11-27 22:38:20|           4849262|   false|
|226|         2| -2.0|2019-11-28 19:58:05|             76785|   false|
|227|         2| -2.0|2019-11-28 19:58:24|                19|    true|
|228|         2| -2.0|2019-11-28 19:58:44|                20|    true|
|231|         2| -2.0|2019-11-30 14:38:14|            153570|   false|
|232|         2| -2.0|2019-11-30 14:38:49|                35|    true|
|233| 

## Analisando o DataFrame gerado:

In [None]:
df_transactions.printSchema()

In [None]:
df_transactions.count()

In [11]:
df_transactions.createOrReplaceTempView("transacoes_view")

In [12]:
spark.sql("""
    SELECT count(id) AS transacoes_fraudadas 
    FROM transacoes_view 
    WHERE fraudado == True""").show()

+--------------------+
|transacoes_fraudadas|
+--------------------+
|                2008|
+--------------------+



In [13]:
spark.sql("""
    SELECT cliente_id, COUNT(cliente_id) AS transacoes_por_cliente 
    FROM transacoes_view 
    GROUP BY cliente_id""").show()

+----------+----------------------+
|cliente_id|transacoes_por_cliente|
+----------+----------------------+
|       211|                   113|
|        76|                    12|
|       671|                  1069|
|       431|                    93|
|       732|                   933|
|       586|                    85|
|       907|                     5|
|       730|                   685|
|       702|                   191|
|       370|                   346|
|       910|                     7|
|       909|                     1|
|        74|                   582|
|       737|                   150|
|       530|                     6|
|       731|                    21|
|        44|                    25|
|       663|                     1|
|       685|                    11|
|       727|                     5|
+----------+----------------------+
only showing top 20 rows



In [14]:
spark.sql("""
    SELECT cliente_id, COUNT(cliente_id) AS qtd_fraudes_por_cliente 
    FROM transacoes_view 
    WHERE fraudado == True 
    GROUP BY cliente_id""").show()

+----------+-----------------------+
|cliente_id|qtd_fraudes_por_cliente|
+----------+-----------------------+
|         2|                     22|
|         4|                     91|
|        44|                      9|
|        73|                    227|
|        74|                    196|
|        76|                      8|
|       205|                    170|
|       211|                     11|
|       335|                     13|
|       370|                      1|
|       389|                      1|
|       431|                      5|
|       449|                      4|
|       495|                      2|
|       515|                     14|
|       523|                     91|
|       529|                      6|
|       530|                      1|
|       533|                    132|
|       534|                      9|
+----------+-----------------------+
only showing top 20 rows



In [15]:
spark.sql(
    """SELECT COUNT(DISTINCT cliente_id) AS total_clientes_distintos 
    FROM transacoes_view"""
).show()

+------------------------+
|total_clientes_distintos|
+------------------------+
|                      67|
+------------------------+



In [16]:
spark.sql(
    """SELECT COUNT(DISTINCT cliente_id) AS total_clientes_distintos_com_cartoes_fraudados 
    FROM transacoes_view 
    WHERE fraudado = True"""
).show()

+----------------------------------------------+
|total_clientes_distintos_com_cartoes_fraudados|
+----------------------------------------------+
|                                            38|
+----------------------------------------------+



In [21]:
quantidade_clientes_sem_fraude = spark.sql("SELECT COUNT(*) as quant_clientes_sem_fraude \
                                            FROM ( \
                                            SELECT cliente_id, SUM(CASE WHEN fraudado = True THEN 1 ELSE 0 END) as qtd_fraude \
                                            FROM transacoes_view \
                                            GROUP BY cliente_id \
                                            HAVING qtd_fraude = 0 \
                                            )")

quantidade_clientes_sem_fraude.show()

+-------------------------+
|quant_clientes_sem_fraude|
+-------------------------+
|                       29|
+-------------------------+



In [17]:
clientes.createOrReplaceTempView("clientes_view")

In [18]:
spark.sql(
    """SELECT COUNT(DISTINCT cliente_id) AS total_clientes_distintos 
    FROM transacoes_view INNER JOIN clientes_view ON clientes_view.id = transacoes_view.cliente_id"""
).show()

+------------------------+
|total_clientes_distintos|
+------------------------+
|                      60|
+------------------------+



#### Migração dos dados do Spark para o banco SQL Server (tabela clientes)

In [None]:
import pyodbc
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

# Criar uma sessão do Spark
spark = SparkSession.builder.appName("Exemplo").getOrCreate()

# Criando uma conexão com o SQL Server

conn = pyodbc.connect("Driver={ODBC Driver 18 for SQL Server};Server=tcp:projeto-integrador5.database.windows.net,1433;Database=projeto_integrador;Uid=ProjetoIntegrador;Pwd=#r00t123;Encrypt=yes;TrustServerCertificate=no;Connection Timeout=30;")
cursor = conn.cursor()


# Cria a tabela "clientes" caso ela não exista ou exclui a tabela existente e cria uma nova
cursor.execute("IF OBJECT_ID('clientes', 'U') IS NOT NULL "
               "DROP TABLE clientes; "
               "CREATE TABLE clientes "
               "( "
               "id INT PRIMARY KEY, "
               "nome VARCHAR(100), "
               "email VARCHAR(100), "
               "data_cadastro DATETIME, "
               "telefone VARCHAR(100)"
               "); ")
conn.commit()


array_clientes = [row.asDict() for row in clientes.collect()]

    
for cliente in array_clientes:
    cursor.execute("INSERT INTO clientes (id, nome, email, data_cadastro, telefone) "
            "VALUES (?, ?, ?, ?, ?)",
            cliente["id"], cliente["nome"], cliente["email"], cliente["data_cadastro"], cliente["telefone"])
    conn.commit()

cursor.close()
conn.close()  

#### Migração dos dados do Spark para o banco SQL Server (tabela transações)

In [None]:
import pyodbc
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

# Criar uma sessão do Spark
spark = SparkSession.builder.appName("Exemplo").getOrCreate()

# Criando uma conexão com o SQL Server

conn = pyodbc.connect("Driver={ODBC Driver 18 for SQL Server};Server=tcp:projeto-integrador5.database.windows.net,1433;Database=projeto_integrador;Uid=ProjetoIntegrador;Pwd=#r00t123;Encrypt=yes;TrustServerCertificate=no;Connection Timeout=30;")
cursor = conn.cursor()


# Cria a tabela "transacoes" caso ela não exista ou exclui a tabela existente e cria uma nova
cursor.execute("IF OBJECT_ID('transacoes', 'U') IS NOT NULL "
               "DROP TABLE transacoes; "
               "CREATE TABLE transacoes "
               "( "
               "id INT PRIMARY KEY, "
               "cliente_id INT, "
               "valor FLOAT, "
               "data DATETIME, "
               "diferenca_de_tempo INT, "
               "fraudado BIT "
               "); ")
conn.commit()




array_transaction = [row.asDict() for row in df_transactions.collect()]

    
for transacao in array_transaction:
    cursor.execute("INSERT INTO transacoes (id, cliente_id, valor, data, diferenca_de_tempo, fraudado) "
            "VALUES (?, ?, ?, ?, ?, ?)",
            transacao["id"], transacao["cliente_id"], transacao["valor"], transacao["data"], transacao["diferenca_de_tempo"], transacao["fraudado"])
    conn.commit()

cursor.close()
conn.close()