# **Instalação para utilizar o Spark em uma máquina Linux**

**1.  Instalar o Java**

Pelo terminal bash:  sudo apt install default-jre


**2.  Instalar o Pyspark** 

In [None]:
!pip install pyspark

**3.  Baixar o arquivo compactado (tgz) do Apache Spark na versão 3.3.2. pelo comando** " !wget -q https://dlcdn.apache.org/spark/spark-3.3.2/spark-3.3.2-bin-hadoop3.tgz " 

**-q** é usado para desativar a saída de mensagens do wget, o que significa que o comando será executado em silêncio, sem exibir mensagens de progresso ou de conclusão

**!** indica que o comando será executado diretamente no sistema operacional, em vez de ser interpretado como código Python. 



In [13]:
!wget -q https://dlcdn.apache.org/spark/spark-3.3.2/spark-3.3.2-bin-hadoop3.tgz

**4. Extrair o conteúdo do arquivo "spark-3.3.2-bin-hadoop3.tgz" para o diretório atual pelo comando "!tar -xvzf spark-3.3.2-bin-hadoop3.tgz"**



In [None]:
!tar -xvzf spark-3.3.2-bin-hadoop3.tgz

**5. Instalar o "findspark" :** um pacote Python que ajuda a localizar e configurar automaticamente a biblioteca Apache Spark no ambiente de desenvolvimento. Ele permite que você use o Spark com facilidade em seu ambiente de desenvolvimento local, sem precisar configurar manualmente o caminho do Spark e outras variáveis de ambiente.

In [5]:
!pip install -q findspark


**6.Listar o conteúdo do diretório** "/usr/lib/jvm/" **para que ver as versões do Java instaladas no sistema.**

**7.Listar o conteúdo do diretório do spark.**

**8. Imprimir o diretório atual por pwd**


In [1]:
!ls /usr/lib/jvm/

default-java  java-1.11.0-openjdk-amd64  java-11-openjdk-amd64


In [2]:
!ls spark-3.3.2-bin-hadoop3  

LICENSE  R	    RELEASE  conf  examples  kubernetes  python  yarn
NOTICE	 README.md  bin      data  jars      licenses	 sbin


In [1]:
!pwd

/home/azureuser/projeto


**9.Fazer as instalações do ODBC e pyodbc para conexão com banco de dados:** 

- No terminal bash: instalar o ODBC colocando o código presente no "UBUNTU" em https://learn.microsoft.com/en-us/sql/connect/odbc/linux-mac/installing-the-microsoft-odbc-driver-for-sql-server?view=sql-server-ver15&tabs=ubuntu18-install%2Calpine17-install%2Cdebian8-install%2Credhat7-13-install%2Crhel7-offline 
- No terminal bash: Instalar "sudo apt install unixodbc-dev"

In [None]:
!pip install pyodbc

# **Iniciar o spark**

In [9]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = "/home/azureuser/projeto/spark-3.3.2-bin-hadoop3"
import findspark
findspark.init()

# **Criar uma sessão do Spark e manipular os CSVs**

In [10]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("Fraudes").getOrCreate()

#CLIENTES
# leitura do primeiro CSV com o cabecalho
df_client_header = spark.read.csv("/home/azureuser/projeto/dados/clientes/clients-001.csv", sep=';', inferSchema=True, header=True)

# leitura dos outros CSV em conjunto
file_path_clients = [f"/home/azureuser/projeto/dados/clientes/clients-00{i}.csv" for i in range(2, 5)]
df_clients = spark.read.csv(file_path_clients, sep=';', inferSchema=True, header=False)

# uniao do CSV com cabeçalho com os demais 
df_clients_final = df_client_header.union(df_clients)
df_clientes_order = df_clients_final.orderBy('id')
df_clientes_order.show()


#TRANSAÇÃO_IN
df_transaction_in_header = spark.read.csv("/home/azureuser/projeto/dados/transação_in/transaction-in-001.csv", sep=';', inferSchema=True, header=True)

# leitura dos outros CSV em conjunto
file_paths_transaction_in = [f"/home/azureuser/projeto/dados/transação_in/transaction-in-00{i}.csv" for i in range(2, 10)]
df_transaction_in = spark.read.csv(file_paths_transaction_in, sep=';', inferSchema=True, header=False)

# uniao do CSV com cabeçalho com os demais 
df_transaction_in_final = df_transaction_in_header.union(df_transaction_in)
df_transaction_in_order = df_transaction_in_final.orderBy('id')
df_transaction_in_order.show()


#TRANSAÇÃO_OUT
df_transaction_out_header = spark.read.csv("/home/azureuser/projeto/dados/transação_out/transaction-out-001.csv", sep=';', inferSchema=True, header=True)

# leitura dos outros CSV em conjunto
file_paths_transaction_out = [f"/home/azureuser/projeto/dados/transação_out/transaction-out-{str(i).zfill(3)}.csv" for i in range(2, 64)]
df_transaction_out = spark.read.csv(file_paths_transaction_out, sep=';', inferSchema=True, header=False)

# uniao do CSV com cabeçalho com os demais 
df_transaction_out_final = df_transaction_out_header.union(df_transaction_out)
df_transaction_out_order = df_transaction_out_final.orderBy('id')
df_transaction_out_order.show()




+---+--------------------+--------------------+-------------------+----------------+
| id|                nome|               email|      data_cadastro|        telefone|
+---+--------------------+--------------------+-------------------+----------------+
|  1|       Fabio Rogerio|fabio-rogerio_1@g...|2019-06-20 15:41:28|+55(27)2529-2222|
|  3|           visitante|visitante_3@gmail...|2019-06-27 20:03:39|+55(26)2524-2423|
|  5|Elisângela Louren...|elisangela-louren...|2019-07-15 11:33:15|+55(27)2921-2822|
|  6|         José rafael|jose-rafael_6@gma...|2019-07-15 11:49:16|+55(24)2930-2521|
|  7|Jamersom Ferreira...|jamersom-ferreira...|2019-07-15 11:51:43|+55(22)2529-2028|
|  8|    Robson Fernandes|robson-fernandes_...|2019-07-16 10:56:22|+55(28)2627-2821|
|  9|        Gisele Lucia|gisele-lucia_9@gm...|2019-07-21 12:26:12|+55(24)2023-2626|
| 11|Luiz Felipe da silva|luiz-felipe-da-si...|2019-07-21 12:30:48|+55(22)2528-2320|
| 12|Leojaine da Silva...|leojaine-da-silva...|2019-07-21 12:33:4

# **Dataframe único de transações**

In [11]:
from pyspark.sql.functions import lit

#Cria uma nova coluna de tipo de  transação ("IN" ou "OUT")
df_in = df_transaction_in_final.withColumn("tipo_transacao", lit("IN"))
df_out = df_transaction_out_final.withColumn("tipo_transacao", lit("OUT"))

# Faz a união dos dois dataframes 
df_transactions = df_in.union(df_out)
df_transactions.show()

+----+----------+-----+-------------------+--------------+
|  id|cliente_id|valor|               data|tipo_transacao|
+----+----------+-----+-------------------+--------------+
|8615|       586|  0.2|2022-01-19 20:12:26|            IN|
|8613|       586|  0.2|2022-01-19 20:11:25|            IN|
|8611|       586|  0.2|2022-01-19 20:10:05|            IN|
|8606|       910|300.0|2022-01-19 19:59:36|            IN|
|8604|        76|100.0|2022-01-18 12:48:14|            IN|
|8603|        76|100.0|2022-01-18 12:48:04|            IN|
|8602|        76|100.0|2022-01-18 12:47:47|            IN|
|8601|        76|100.0|2022-01-18 12:47:43|            IN|
|8600|        76|100.0|2022-01-18 12:47:39|            IN|
|8599|        76|100.0|2022-01-18 12:43:05|            IN|
|8598|        76|100.0|2022-01-18 12:42:56|            IN|
|8597|        76|100.0|2022-01-18 12:40:28|            IN|
|8596|        76|100.0|2022-01-18 12:38:19|            IN|
|8595|        76|100.0|2022-01-18 12:37:59|            I

# **Verificando se tem colunas de ID repetidos no dataframe "transactions"** 

In [4]:
from pyspark.sql.functions import col

# contar o número de linhas antes de remover duplicatas
total_rows = df_transactions.count()

# remover duplicatas por 'id'
df_no_duplicates = df_transactions.dropDuplicates(['id'])

# contar o número de linhas após a remoção de duplicatas
unique_rows = df_no_duplicates.count()

# verificar se há duplicatas por 'id'
if total_rows > unique_rows:
    print("Há dados duplicados por ID no dataframe.")
else:
    print("Não há dados duplicados por ID no dataframe.")



Não há dados duplicados por ID no dataframe.


# **Verificando os tipos dos dados nas colunas nos dataframes**

In [5]:
#Imprimir os tipos de dado de cada coluna dos dataframes

df_clients_final.printSchema()

df_transactions.printSchema()


root
 |-- id: integer (nullable = true)
 |-- nome: string (nullable = true)
 |-- email: string (nullable = true)
 |-- data_cadastro: timestamp (nullable = true)
 |-- telefone: string (nullable = true)

root
 |-- id: integer (nullable = true)
 |-- cliente_id: integer (nullable = true)
 |-- valor: double (nullable = true)
 |-- data: timestamp (nullable = true)
 |-- tipo_transacao: string (nullable = false)

