# Code Structure:
## - start spark session
## - classes
## - main

# Imports

In [78]:
from pyspark.sql import SparkSession, Row
from pyspark.sql.functions import col, lower, trim, regexp_replace, to_timestamp, round,lag, unix_timestamp, when
from pyspark.sql.types import FloatType
from pyspark.sql.utils import AnalysisException
from pyspark.sql.window import Window

StatementMeta(pool, 0, 78, Finished, Available)

# Start spark session

In [79]:
# Get (or lazily create) the Spark session used by every cell below.
spark = (
    SparkSession.builder
    .appName("Transform")
    .getOrCreate()
)

StatementMeta(pool, 0, 79, Finished, Available)

# Cleaner Dataframe Class
The DataframeCleaner class provides methods that clean and transform dataframes; it is used by the CsvDataframe class.

In [80]:
class DataframeCleaner:
    """Clean and normalise a Spark DataFrame held on ``self.df``.

    Spark DataFrames are immutable: every transformation returns a new
    DataFrame. Each method therefore rebinds ``self.df`` to its result, and
    callers read the cleaned frame back from ``self.df``. The class is used
    by ``CsvDataframe.clean_data``.
    """

    # Wrapped Spark DataFrame; replaced by every cleaning step.
    df = None

    # (regex character class, replacement) pairs applied in order by
    # remove_accents(). Only lower-case accented letters are listed.
    _ACCENT_PATTERNS = (
        ("[áàâãä]", "a"),
        ("[éèêë]", "e"),
        ("[íìîï]", "i"),
        ("[óòôõö]", "o"),
        ("[úùûü]", "u"),
        ("[ç]", "c"),
    )

    def __init__(self, df):
        self.df = df

    def drop_empty_rows(self):
        """Remove rows in which every column is null."""
        # dropna returns a new DataFrame; it must be reassigned to take effect.
        self.df = self.df.dropna(how='all', subset=self.df.columns)

    def drop_duplicate(self):
        """Remove rows that are exact duplicates of another row."""
        self.df = self.df.dropDuplicates()

    def drop_empty_valor(self, col_name):
        """Remove rows whose ``col_name`` column (e.g. "valor") is null.

        Args:
            col_name: name of the column that must not be null.
        """
        self.df = self.df.filter(col(col_name).isNotNull())

    def format_lowercase(self, column_name):
        """Lower-case every value of ``column_name``."""
        self.df = self.df.withColumn(column_name, lower(col(column_name)))

    def remove_extra_space(self, column_name):
        """Collapse runs of whitespace into one space and trim both ends."""
        # Raw string so "\s" reaches the regex engine without an invalid-escape warning.
        self.df = self.df.withColumn(
            column_name, trim(regexp_replace(col(column_name), r"\s+", " "))
        )

    def remove_accents(self, column_name):
        """Replace accented lower-case vowels and 'ç' in ``column_name`` with plain letters.

        Upper-case accented letters are not handled, so call format_lowercase() first
        when the column may contain them.
        """
        for pattern, replacement in self._ACCENT_PATTERNS:
            self.df = self.df.withColumn(
                column_name, regexp_replace(column_name, pattern, replacement)
            )



StatementMeta(pool, 0, 80, Finished, Available)

# Dataframe Class
The CsvDataframe class provides methods that read CSV files, build dataframes from them, and modify those dataframes.

In [81]:
class CsvDataframe():
    """Read ';'-separated CSV files into a Spark DataFrame, then clean and modify it.

    The file(s) read are ``csv_path + category + '.csv'``; ``category`` may be a
    glob such as ``'clients*'``. The resulting DataFrame is kept on ``self.df``.
    """

    # Working Spark DataFrame; set by read_csv() and replaced by later steps.
    df = None

    def __init__(self, csv_path, category):
        self.csv_path = csv_path
        self.category = category

    def _add_header(self):
        """Turn the embedded ``id;...`` row of ``self.raw_df`` into column names.

        The CSVs are read with ``header=False``, so each file's header line is
        just a data row whose first column is ``"id"``. Every such row is removed
        (a glob may match several files), and the first one supplies the names.

        Raises:
            ValueError: if no row has ``"id"`` in its first column.
        """
        header_row = self.raw_df.filter(col("_c0") == "id").first()
        if header_row is None:
            raise ValueError(
                f'No header row (first column == "id") found in {self.complete_path}.'
            )

        # Rows whose first column is null are dropped as well: `null != "id"`
        # evaluates to null, and `where` keeps only rows where it is true.
        data_rows = self.raw_df.where(col("_c0") != "id")
        self.df = data_rows.toDF(*header_row)

    @staticmethod
    def _conversion_for(value):
        """Decide how a column should be typed, judging by one sample cell.

        Returns:
            "timestamp" for strings containing ':';
            "float" for strings containing '.' but no '@' (excludes e-mails);
            None otherwise (the column is left as is).
        """
        if not isinstance(value, str):
            return None
        if ':' in value:
            return "timestamp"
        if '.' in value and '@' not in value:
            return "float"
        return None

    def _fix_schema(self):
        """Convert columns to timestamp/float based on the first row's values."""
        first_row = self.df.first()
        if first_row is None:
            # Empty DataFrame: nothing to sample, leave the schema unchanged.
            return

        # zip pairs every value with its own column. A value-based lookup
        # (e.g. row.index(value)) would return the first column holding an
        # equal value and mis-target columns that happen to share a value.
        for column_name, value in zip(self.df.columns, first_row):
            conversion = self._conversion_for(value)
            if conversion == "timestamp":
                self.df = self.df.withColumn(
                    column_name, to_timestamp(col(column_name), "yyyy-MM-dd HH:mm:ss Z")
                )
            elif conversion == "float":
                # Cast to float and keep 2 decimal places.
                self.df = self.df.withColumn(
                    column_name, round(col(column_name).cast(FloatType()), 2)
                )

    def read_csv(self, spark):
        """Read the CSV file(s) for this category and build ``self.df`` with a header.

        Args:
            spark: the active SparkSession.

        Returns:
            0 if Spark raises AnalysisException (e.g. the path does not exist);
            an error message is printed. None on success.
        """
        self.complete_path = self.csv_path + self.category + '.csv'

        try:
            self.raw_df = spark.read.csv(self.complete_path, sep=";", header=False, inferSchema=True)
        except AnalysisException:
            print('The path does not exist. Please, insert a valid path.\nThe "category" passed might not be "clients", "transaction_in" or "transaction_out" or similar.')
            return 0

        # Fix and clean the dataframe
        self._add_header()
        self._fix_schema()

    def clean_data(self):
        """Clean ``self.df`` with DataframeCleaner and return the cleaned DataFrame.

        All categories drop fully-empty and duplicate rows. Client categories
        (those starting with "clients", e.g. "clients*") also normalise the
        "nome" column. Any other category drops rows with a null "valor".
        """
        cleaner = DataframeCleaner(self.df)
        cleaner.drop_empty_rows()
        cleaner.drop_duplicate()

        if self.category.startswith('clients'):
            cleaner.format_lowercase('nome')
            cleaner.remove_extra_space('nome')
            cleaner.remove_accents('nome')
        else:
            # Only transactions have a "valor" column.
            cleaner.drop_empty_valor("valor")

        self.df = cleaner.df
        return self.df

    def save_csv_file(self, file_name, csv_final_path):
        """Write ``self.df`` as a single CSV part file, with a header, to ``csv_final_path + file_name``.

        Existing output at that location is overwritten.
        """
        self.df.coalesce(1).write.mode('overwrite').option("header", "true").csv(csv_final_path + file_name)

    def rename_column(self, old_name, new_name):
        """Rename column ``old_name`` to ``new_name`` in ``self.df``."""
        self.df = self.df.withColumnRenamed(old_name, new_name)


StatementMeta(pool, 0, 81, Finished, Available)

# Class Fraud Utils

In [82]:
class FraudUtils:
    """Helpers that reshape the fraud DataFrame built by the Fraud class.

    The DataFrame being modified lives on ``self.frauds_df``; each method
    replaces it with a new one.
    """
    frauds_df = None

    def __init__(self, frauds_df):
        self.frauds_df = frauds_df

    def join_csv(self, df_transactions):
        """Left-join ``df_transactions`` on the transaction key columns.

        Afterwards, keep only those key columns plus "fraude".
        """
        key_columns = ["id", "cliente_id", "valor", "data_transacao"]
        joined = self.frauds_df.join(df_transactions, key_columns, "left_outer")
        wanted = [col(name) for name in key_columns + ["fraude"]]
        self.frauds_df = joined.select(*wanted)

    def add_category_column(self, column_name, condition_column):
        """Add ``column_name``: 'entrada' when ``condition_column`` > 0, else 'saida'.

        Null values in ``condition_column`` also fall into 'saida'.
        """
        positive = col(condition_column) > 0
        category = when(positive, 'entrada').otherwise('saida')
        self.frauds_df = self.frauds_df.withColumn(column_name, category)

StatementMeta(pool, 0, 82, Finished, Available)

# Class Fraud

In [83]:
class Fraud():
    """Flag suspicious banking transactions.

    A transaction is marked as fraud ('1') when it happens less than 120 seconds
    after the same client's previous transaction.
    """
    frauds_df = None

    def __init__(self, df_transactions):
        self.df_transactions = df_transactions

    def _identify_frauds(self):
        """Build ``self.fraud_df``: the transactions plus helper columns and a "fraude" flag."""
        ts_format = "yyyy-MM-dd HH:mm:ss Z"
        # Each client's transactions, in chronological order.
        per_client = Window.partitionBy("cliente_id").orderBy("data_transacao")

        # Timestamp of the client's previous transaction (null for the first one).
        with_previous = self.df_transactions.withColumn(
            "data_anterior", lag("data_transacao", 1).over(per_client)
        )

        # Both timestamps as epoch seconds, so they can be subtracted.
        in_seconds = (
            with_previous
            .withColumn("data_seg", unix_timestamp("data_transacao", ts_format))
            .withColumn("data_anterior_seg", unix_timestamp("data_anterior", ts_format))
        )

        with_gap = in_seconds.withColumn("diff", col("data_seg") - col("data_anterior_seg"))

        # A null gap (first transaction) compares as not-true and falls into '0'.
        self.fraud_df = with_gap.withColumn(
            "fraude", when(col("diff") < 120, '1').otherwise('0')
        )

    def create_fraud_df(self):
        """Return the transactions with "fraude" and "tipo" (entrada/saida) columns."""
        self._identify_frauds()

        helper = FraudUtils(self.fraud_df)
        self.fraud_utils_obj = helper
        helper.join_csv(self.df_transactions)
        helper.add_category_column('tipo', 'valor')

        return helper.frauds_df

StatementMeta(pool, 0, 83, Finished, Available)

# Class Database

In [84]:
class Database():
    """Write Spark DataFrames into a database table over JDBC.

    All connection settings are passed as keyword-only arguments and are
    forwarded to Spark's JDBC writer.
    """

    def __init__(self, *, jdbc_url, database, username, password, driver):
        self.jdbc_url = jdbc_url
        self.database = database
        self.username = username
        self.password = password
        self.driver = driver

    def create_table(self, df, table_name):
        """Write ``df`` to ``table_name``, overwriting any existing table, in batches of 100 rows."""
        self.df = df
        self.table_name = table_name

        jdbc_options = {
            "url": self.jdbc_url,
            "database": self.database,
            "dbtable": self.table_name,
            "user": self.username,
            "password": self.password,
            "driver": self.driver,
            "batchsize": 100,
        }

        writer = df.write.format("jdbc")
        for option_name, option_value in jdbc_options.items():
            writer = writer.option(option_name, option_value)
        writer.mode("overwrite").save()

StatementMeta(pool, 0, 84, Finished, Available)

# Create dataframes based on our relational model

In [85]:
# Folder holding the raw CSV exports; each category glob is appended to it.
csv_path = 'abfss://container@sheanalyzessdatalake.dfs.core.windows.net/dados_brutos/'

clients_df_obj, transactions_df_obj = (
    CsvDataframe(csv_path, pattern) for pattern in ('clients*', 'transaction*')
)

# Load each source and give it a proper header/schema.
for _source in (clients_df_obj, transactions_df_obj):
    _source.read_csv(spark)

StatementMeta(pool, 0, 85, Finished, Available)

# Clean Dataframes

In [86]:
# Clients: clean the data (including the "nome" normalisation) and preview it.
clients_df = clients_df_obj.clean_data()
clients_df.show()

# Transactions: rename "data" to "data_transacao" (the name the Fraud class
# orders by) before cleaning.
transactions_df_obj.rename_column('data', 'data_transacao')
transactions_df = transactions_df_obj.clean_data()

StatementMeta(pool, 0, 86, Finished, Available)

+---+--------------------+--------------------+-------------------+----------------+
| id|                nome|               email|      data_cadastro|        telefone|
+---+--------------------+--------------------+-------------------+----------------+
| 55|   edmilson da silva|edmilson-da-silva...|2019-08-30 00:54:33|+55(22)2922-2626|
| 78|maxson barros do ...|maxson-barros-do-...|2019-09-10 02:03:42|+55(22)2126-2529|
| 61| bruno cesar e silva|bruno-cesar-e-sil...|2019-08-30 01:05:21|+55(23)2528-2729|
|106|dernival passos d...|dernival-passos-d...|2019-09-27 02:40:14|+55(29)2927-2322|
|107| jose rubian de goes|jose-rubian-de-go...|2019-09-28 13:09:43|+55(22)2023-2620|
|108|angelica dos sant...|angelica-dos-sant...|2019-09-28 14:48:16|+55(20)2521-3030|
|109|          alanderson|alanderson_109@gm...|2019-09-29 00:15:12|+55(22)2323-2426|
|142|      silvio gabriel|silvio-gabriel_14...|2019-10-04 12:47:21|+55(22)3026-2123|
|144|jose carlos da si...|jose-carlos-da-si...|2019-10-06 14:24:2

# Save the cleaned dataframes as CSV files

In [87]:
# Both cleaned datasets are written under the same "dados_finais" folder.
_final_root = 'abfss://container@sheanalyzessdatalake.dfs.core.windows.net/dados_finais/'
csv_final_path_transac = _final_root
csv_final_path_client = _final_root

for _obj, _file_name, _destination in (
    (transactions_df_obj, 'clean-transaction', csv_final_path_transac),
    (clients_df_obj, 'clean-clients', csv_final_path_client),
):
    _obj.save_csv_file(_file_name, _destination)

StatementMeta(pool, 0, 87, Finished, Available)

# Find Frauds

In [88]:
# NOTE: these two globals are not passed to Fraud; create_fraud_df() uses its
# own 'tipo' / 'valor' values. They are kept only as notebook globals.
column_name, condition_column = 'categoria', 'valor'

fraud_detector = Fraud(transactions_df)
frauds_df = fraud_detector.create_fraud_df()

StatementMeta(pool, 0, 88, Finished, Available)

# ConnectionDB Class

In [89]:
import pandas as pd
import pyodbc

class ConnectionDB:
    """pyodbc helper that creates and fills the ``clientess`` and ``transacoess`` tables.

    Every public create/insert method opens its own connection and always closes
    it before returning. Errors inside those methods are printed, not raised.
    """

    # NOTE(security): this connection string embeds a plaintext password. It is
    # kept only as a backward-compatible fallback. Rotate the credential, then
    # pass the string to the constructor or set the environment variable named
    # by _ENV_VAR instead.
    _LEGACY_CONNECTION_STRING = "Driver={ODBC Driver 18 for SQL Server};Server=tcp:projetosfinalserver.database.windows.net,1433;Database=ProjetoFinal;Uid=projetosfinalserver;Pwd={paamforgbwirj423r%};Encrypt=yes;TrustServerCertificate=no;Connection Timeout=30;"

    # Environment variable consulted when no connection string is passed.
    _ENV_VAR = "PROJETOFINAL_ODBC_CONNECTION_STRING"

    def __init__(self, connection_string=None):
        """Args:
            connection_string: ODBC connection string. Defaults to the value of
                the environment variable named by ``_ENV_VAR``, and then to the
                legacy built-in string.
        """
        import os

        self.conn = None
        self.cursor = None
        self.connection_string = (
            connection_string
            or os.environ.get(self._ENV_VAR)
            or self._LEGACY_CONNECTION_STRING
        )

    def connectToDatabase(self):
        """Open a connection and a cursor, store them on the instance, and return ``(conn, cursor)``.

        Raises:
            Whatever ``pyodbc.connect`` raises. The error is printed first and
            then re-raised, so callers never receive ``None`` to unpack.
        """
        try:
            self.conn = pyodbc.connect(self.connection_string)
            self.cursor = self.conn.cursor()
        except Exception as e:
            print(f"Erro de conexão com banco: {e}")
            raise
        return self.conn, self.cursor

    def closeConnection(self):
        """Close the cursor and connection if they are open. Safe to call repeatedly."""
        if self.cursor is not None:
            self.cursor.close()
            self.cursor = None
        if self.conn is not None:
            self.conn.close()
            self.conn = None

    def _execute_and_commit(self, query, success_message, error_prefix):
        """Run one statement in a fresh connection, commit it, and print the outcome."""
        try:
            conn, cursor = self.connectToDatabase()
            cursor.execute(query)
            conn.commit()
            print(success_message)
        except Exception as e:
            print(f"{error_prefix}: {e}")
        finally:
            self.closeConnection()

    def _insert_missing_rows(self, df, table, insert_query, source_columns, table_label):
        """Insert each row of the pandas DataFrame ``df`` whose id is not yet in ``table``.

        Values are taken from ``df`` by column NAME, in the order of
        ``source_columns``, which must match the placeholders of ``insert_query``.
        The first entry must be the id column. Each inserted row is committed
        individually.
        """
        # `table` is an internal constant, never user input.
        check_query = f"SELECT COUNT(*) FROM {table} WHERE id = ?;"
        try:
            conn, cursor = self.connectToDatabase()
            # Selecting by name makes the parameter order independent of the
            # DataFrame's column order. itertuples also yields plain Python
            # scalars and avoids deprecated positional Series[int] access.
            rows = df[list(source_columns)].itertuples(index=False, name=None)
            for values in rows:
                row_id = values[0]
                cursor.execute(check_query, row_id)
                if cursor.fetchone()[0] == 0:
                    cursor.execute(insert_query, *values)
                    conn.commit()
                    print(f"Registro com id={row_id} inserido com sucesso.")
                else:
                    print(f"Registro com id={row_id} já existe na tabela {table_label}. Não foi inserido novamente.")
        except Exception as e:
            print(f"Erro para inserir dados na tabela {table_label}: {e}")
        finally:
            self.closeConnection()

    def create_clients_table(self):
        """Create ``dbo.clientess`` if it does not exist yet."""
        self._execute_and_commit(
            "IF OBJECT_ID('dbo.clientess', 'U') IS NULL CREATE TABLE clientess (id INT PRIMARY KEY, nome VARCHAR(100), email VARCHAR(200), data_cadastro DATETIMEOFFSET, telefone VARCHAR(20));",
            "Tabela clientes criada com sucesso!",
            "Erro para criar a tabela clientes",
        )

    def insert_clients_data(self, df):
        """Insert client rows from a pandas DataFrame.

        ``df`` needs the columns id, nome, email, data_cadastro and telefone.
        """
        self._insert_missing_rows(
            df,
            table="clientess",
            insert_query="INSERT INTO clientess ([id], [nome], [email], [data_cadastro], [telefone]) VALUES (?, ?, ?, CONVERT(DATETIMEOFFSET, ?, 127), ?);",
            source_columns=("id", "nome", "email", "data_cadastro", "telefone"),
            table_label="clientes",
        )

    def create_transactions_table(self):
        """Create ``dbo.transacoess`` if it does not exist yet."""
        # valor is DECIMAL(18, 2): the amounts carry 2 decimal places, which an
        # INT column would silently drop.
        self._execute_and_commit(
            "IF OBJECT_ID('dbo.transacoess', 'U') IS NULL CREATE TABLE transacoess (id INT PRIMARY KEY, cliente_id INT, valor DECIMAL(18, 2), data DATETIMEOFFSET, tipo VARCHAR(45), fraude INT);",
            "Tabela transacoes criada com sucesso!",
            "Erro para criar a tabela transacoes",
        )

    def insert_transactions_data(self, df):
        """Insert transaction rows from a pandas DataFrame.

        ``df`` needs the columns id, cliente_id, valor, data_transacao, tipo and
        fraude. Values are matched by name, so the 'entrada'/'saida' label always
        goes to [tipo] and the 0/1 flag to [fraude], whatever the column order.
        """
        self._insert_missing_rows(
            df,
            table="transacoess",
            insert_query="INSERT INTO transacoess ([id], [cliente_id], [valor], [data], [tipo], [fraude]) VALUES (?, ?, ?, CONVERT(DATETIMEOFFSET, ?, 127), ?, ?);",
            source_columns=("id", "cliente_id", "valor", "data_transacao", "tipo", "fraude"),
            table_label="transacoes",
        )

StatementMeta(pool, 0, 89, Finished, Available)

# Connect to the database and create the tables

In [90]:
# Collect the Spark results on the driver as pandas DataFrames for row-by-row insertion.
df_c, df_f = clients_df.toPandas(), frauds_df.toPandas()

conexao = ConnectionDB()

# Clients first, then transactions. Each table is created only if it is missing.
conexao.create_clients_table()
conexao.insert_clients_data(df_c)

conexao.create_transactions_table()
conexao.insert_transactions_data(df_f)

StatementMeta(pool, 0, 90, Finished, Available)

Tabela clientes criada com sucesso!
Registro com id=55 inserido com sucesso.
Registro com id=78 inserido com sucesso.
Registro com id=61 inserido com sucesso.
Registro com id=106 inserido com sucesso.
Registro com id=107 inserido com sucesso.
Registro com id=108 inserido com sucesso.
Registro com id=109 inserido com sucesso.
Registro com id=142 inserido com sucesso.
Registro com id=144 inserido com sucesso.
Registro com id=145 inserido com sucesso.
Registro com id=146 inserido com sucesso.
Registro com id=88 inserido com sucesso.
Registro com id=159 inserido com sucesso.
Registro com id=150 inserido com sucesso.
Registro com id=248 inserido com sucesso.
Registro com id=156 inserido com sucesso.
Registro com id=157 inserido com sucesso.
Registro com id=160 inserido com sucesso.
Registro com id=152 inserido com sucesso.
Registro com id=153 inserido com sucesso.
Registro com id=31 inserido com sucesso.
Registro com id=199 inserido com sucesso.
Registro com id=200 inserido com sucesso.
Reg

# End Spark Session

In [91]:
# Shut down the Spark session created at the top of the notebook.
spark.stop()

StatementMeta(pool, 0, 91, Finished, Available)

# NOTE(review): this repeats the spark.stop() call from the previous cell and is
# presumably an export artifact. Stopping an already-stopped session is expected
# to be harmless; confirm, then remove.
spark.stop()