# **Instalação (apenas na primeira vez de uso)**

**1.  Instalar o Java**

Pelo terminal bash:  sudo apt install default-jre


**2.  Instalar o Pyspark** 

In [None]:
!pip install pyspark

**3.  Baixar o arquivo compactado (tgz) do Apache Spark na versão 3.3.2. pelo comando** " !wget -q https://dlcdn.apache.org/spark/spark-3.3.2/spark-3.3.2-bin-hadoop3.tgz " 

**-q** é usado para desativar a saída de mensagens do wget, o que significa que o comando será executado em silêncio, sem exibir mensagens de progresso ou de conclusão

**!** indica que o comando será executado diretamente no sistema operacional, em vez de ser interpretado como código Python. 



In [13]:
!wget -q https://dlcdn.apache.org/spark/spark-3.3.2/spark-3.3.2-bin-hadoop3.tgz

**4. Extrair o conteúdo do arquivo "spark-3.3.2-bin-hadoop3.tgz" para o diretório atual pelo comando "!tar -xvzf spark-3.3.2-bin-hadoop3.tgz"**



In [None]:
!tar -xvzf spark-3.3.2-bin-hadoop3.tgz

**5. Instalar o "findspark" :** um pacote Python que ajuda a localizar e configurar automaticamente a biblioteca Apache Spark no ambiente de desenvolvimento. Ele permite que você use o Spark com facilidade em seu ambiente de desenvolvimento local, sem precisar configurar manualmente o caminho do Spark e outras variáveis de ambiente.

In [5]:
!pip install -q findspark


**6.Listar o conteúdo do diretório** "/usr/lib/jvm/" **para que ver as versões do Java instaladas no sistema.**

**7.Listar o conteúdo do diretório do spark.**

**8. Imprimir o diretório atual por pwd**


In [1]:
!ls /usr/lib/jvm/

default-java  java-1.11.0-openjdk-amd64  java-11-openjdk-amd64


In [2]:
!ls spark-3.3.2-bin-hadoop3  

LICENSE  R	    RELEASE  conf  examples  kubernetes  python  yarn
NOTICE	 README.md  bin      data  jars      licenses	 sbin


In [1]:
!pwd

/home/azureuser/projeto


**9.Fazer as instalações do ODBC e pyodbc para conexão com banco de dados:** 

- No terminal bash: instalar o ODBC colocando o código presente no "UBUNTU" em https://learn.microsoft.com/en-us/sql/connect/odbc/linux-mac/installing-the-microsoft-odbc-driver-for-sql-server?view=sql-server-ver15&tabs=ubuntu18-install%2Calpine17-install%2Cdebian8-install%2Credhat7-13-install%2Crhel7-offline 
- No terminal bash: Instalar "sudo apt install unixodbc-dev"

In [None]:
!pip install pyodbc

# **Iniciar o spark**

In [2]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = "/home/azureuser/projeto/spark-3.3.2-bin-hadoop3"
import findspark
findspark.init()

Este é um código em Python que define duas variáveis de ambiente em Python usando a biblioteca os.

os.environ é um dicionário que contém todas as variáveis de ambiente do sistema operacional em que o Python está sendo executado.

No código, os.environ["JAVA_HOME"] é definido como "/usr/lib/jvm/java-11-openjdk-amd64", que é o caminho para o diretório Java Home.

os.environ["SPARK_HOME"] é definido como "/content/spark-3.3.2-bin-hadoop3", que é o caminho para o diretório de instalação do Apache Spark.

Em seguida, o código usa a biblioteca findspark para inicializar o Spark.para inicializar o Spark. Isso é necessário para permitir que o Python e o Spark se comuniquem corretamente. findspark é uma biblioteca Python que ajuda a localizar o diretório Spark em um sistema e define a variável de ambiente PYSPARK_DRIVER_PYTHON para "jupyter" para permitir a execução de código Spark.

# **Criar uma sessão do Spark e manipular os CSVs**

In [None]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("Fraudes").getOrCreate()

#CLIENTES
# leitura do primeiro CSV com o cabecalho
df_client_header = spark.read.csv("/home/azureuser/projeto/dados/clientes/clients-001.csv", sep=';', inferSchema=True, header=True)

# leitura dos outros CSV em conjunto
file_path_clients = [f"/home/azureuser/projeto/dados/clientes/clients-00{i}.csv" for i in range(2, 5)]
df_clients = spark.read.csv(file_path_clients, sep=';', inferSchema=True, header=False)

# uniao do CSV com cabeçalho com os demais 
df_clients_final = df_client_header.union(df_clients)
df_clientes_order = df_clients_final.orderBy('id')
df_clientes_order.show()
df_clientes_order.count()


#TRANSAÇÃO_IN
df_transaction_in_header = spark.read.csv("/home/azureuser/projeto/dados/transação_in/transaction-in-001.csv", sep=';', inferSchema=True, header=True)

# leitura dos outros CSV em conjunto
file_paths_transaction_in = [f"/home/azureuser/projeto/dados/transação_in/transaction-in-00{i}.csv" for i in range(2, 10)]
df_transaction_in = spark.read.csv(file_paths_transaction_in, sep=';', inferSchema=True, header=False)
# df_transaction_in.orderBy("_c0").show()

# uniao do CSV com cabeçalho com os demais 
df_transaction_in_final = df_transaction_in_header.union(df_transaction_in)
df_transaction_in_order = df_transaction_in_final.orderBy('id')
df_transaction_in_order.show()
df_transaction_in_order.count()


#TRANSAÇÃO_OUT
df_transaction_out_header = spark.read.csv("/home/azureuser/projeto/dados/transação_out/transaction-out-001.csv", sep=';', inferSchema=True, header=True)

# leitura dos outros CSV em conjunto
file_paths_transaction_out = [f"/home/azureuser/projeto/dados/transação_out/transaction-out-{str(i).zfill(3)}.csv" for i in range(2, 64)]
df_transaction_out = spark.read.csv(file_paths_transaction_out, sep=';', inferSchema=True, header=False)

# df_transaction_in.orderBy("_c0").show()

# uniao do CSV com cabeçalho com os demais 
df_transaction_out_final = df_transaction_out_header.union(df_transaction_out)
df_transaction_out_order = df_transaction_out_final.orderBy('id')
df_transaction_out_order.show()
df_transaction_out_order.count()



# **Dataframe único de transações**

In [None]:
from pyspark.sql.functions import lit

df_in = df_transaction_in_final.withColumn("tipo_transação", lit("IN"))
df_out = df_transaction_out_final.withColumn("tipo_transação", lit("OUT"))

df_transactions = df_in.union(df_out)

df_transactions_order=df_transactions.orderBy('id')

df_transactions_order.show()

# **Verificando se tem colunas de ID repetidos no dataframe "transactions"** 

In [6]:
from pyspark.sql.functions import col

# contar o número de linhas antes de remover duplicatas
total_rows = df_transactions.count()

# remover duplicatas por 'id'
df_no_duplicates = df_transactions.dropDuplicates(['id'])

# contar o número de linhas após a remoção de duplicatas
unique_rows = df_no_duplicates.count()

# verificar se há duplicatas por 'id'
if total_rows > unique_rows:
    print("Há dados duplicados por ID no dataframe.")
else:
    print("Não há dados duplicados por ID no dataframe.")



Não há dados duplicados por ID no dataframe.


# **Verificando os tipos dos dados nas colunas nos dataframes**

In [4]:
#Imprimir os tipos de dado de cada coluna dos dataframes

df_clients_final.printSchema()

df_transaction_in_final.printSchema()

df_transaction_out_final.printSchema()

df_transactions.printSchema()


root
 |-- id: integer (nullable = true)
 |-- nome: string (nullable = true)
 |-- email: string (nullable = true)
 |-- data_cadastro: timestamp (nullable = true)
 |-- telefone: string (nullable = true)

root
 |-- id: integer (nullable = true)
 |-- cliente_id: integer (nullable = true)
 |-- valor: double (nullable = true)
 |-- data: timestamp (nullable = true)

root
 |-- id: integer (nullable = true)
 |-- cliente_id: integer (nullable = true)
 |-- valor: double (nullable = true)
 |-- data: timestamp (nullable = true)

root
 |-- id: integer (nullable = true)
 |-- cliente_id: integer (nullable = true)
 |-- valor: double (nullable = true)
 |-- data: timestamp (nullable = true)
 |-- tipo_transação: string (nullable = false)



# Conexão com o banco de dados e importar dados

In [None]:
import pyodbc

#Conectar com SQL server
try:
    conn = pyodbc.connect("Driver={ODBC Driver 18 for SQL Server};Server=tcp:projeto-accenture.database.windows.net,1433;Database=projeto_final;Uid=projeto;Pwd=4anfiHF5A;Encrypt=yes;TrustServerCertificate=no;Connection Timeout=30;")
    cursor = conn.cursor()
    print("Conexão estabelecida com sucesso!")
except pyodbc.Error as ex:
    print("Erro ao estabelecer conexão:", ex)

#Criar tabela no SQL
    # Tabela de clientes
cursor.execute('''
                CREATE TABLE clients (
                    id INT PRIMARY KEY,
                    nome VARCHAR(255),
                    email VARCHAR(255),
                    data_cadastro DATETIME,
                    telefone VARCHAR(20)
                )
                ''')


   # Tabela de transactions
cursor.execute('''
                CREATE TABLE transactions (
                    id INT PRIMARY KEY,
                    cliente_id INT,
                    valor FLOAT,
                    data DATETIME,
                    tipo_transacao VARCHAR(50),
                )
                ''')

# Tabela de transaction_in
cursor.execute('''
                CREATE TABLE transaction_in (
                    id INT PRIMARY KEY,
                    cliente_id INT,
                    valor FLOAT,
                    data DATETIME,
                )
                ''')

# Tabela de transaction_out
cursor.execute('''
                CREATE TABLE transaction_out(
                    id INT PRIMARY KEY,
                    cliente_id INT,
                    valor FLOAT,
                    data DATETIME,
                )
                ''')

 
# Inserir dados na tabela de clientes
try:
    for row in df_clientes_order.collect():             # Inserindo uma linha na tabela
        cursor.execute("INSERT INTO clients_2 (id, nome, email, data_cadastro, telefone) VALUES (?, ?, ?, ?, ? )", row[0], row[1], row[2], row[3], row[4])
    print("Dados inseridos na tabela de clientes com sucesso!")

except Exception as e:
    print("Não foi possível inserir dados na tabela de clientes:", e)

# Inserir dados na tabela de transactions
try:
    for row in df_transactions.collect():               # Inserindo uma linha na tabela
        cursor.execute("INSERT INTO transactions_2 (id, cliente_id, valor, data, tipo_transacao) VALUES (?, ?, ?, ?, ?)", row[0], row[1], row[2], row[3], row[4])
    print("Dados inseridos na tabela de transactions com sucesso!")

except Exception as e:
    print("Não foi possível inserir dados na tabela de transactions:", e)   
    
# Inserir dados na tabela de transaction_in
try:
    for row in df_transaction_in_order.collect():        # Inserindo uma linha na tabela
        cursor.execute("INSERT INTO transaction_in_2 (id, cliente_id, valor, data) VALUES (?, ?, ?, ? )", row[0], row[1], row[2], row[3])
    print("Dados inseridos na tabela de transaction_in com sucesso!")

except Exception as e:
    print("Não foi possível inserir dados na tabela de transaction_in:", e)

 
    # Inserir dados na tabela de transaction_out
try:
    for row in df_transaction_in_order.collect():    # Inserindo uma linha na tabela
        cursor.execute("INSERT INTO transaction_out_2 (id, cliente_id, valor, data) VALUES (?, ?, ?, ? )", row[0], row[1], row[2], row[3])
    print("Dados inseridos na tabela de transaction_out com sucesso!")

except Exception as e:
    print("Não foi possível inserir dados na tabela de transaction_out:", e)

    

   
# Salvando as mudanças
conn.commit()

# Fechando a conexão
conn.close()

# Encerrando a sessão Spark
spark.stop()