In [1]:
import glob
import os
import pyodbc
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *
from dotenv import load_dotenv

Starting Spark Session

In [None]:
# Local Spark session using every available core. The UI port is pinned to
# 4051 (presumably to avoid clashing with another Spark app on the default port).
spark = (
    SparkSession.builder
    .master('local[*]')
    .appName("Starting Spark Session")
    .config('spark.ui.port', '4051')
    .getOrCreate()
)

Creating variables for raw data directories

In [3]:
# Directories (relative to the notebook's working directory) holding the raw CSV inputs.
clients = "Data/Clients"
transactions_in, transactions_out = (
    "Data/Transactions-in",
    "Data/Transactions-out",
)

Creating variables for the DataFrame schemas

In [4]:
# Column layout of the raw client CSV files (Portuguese names, renamed later).
# Every field is declared nullable.
clients_schema = StructType([
    StructField(field_name, field_type, True)
    for field_name, field_type in (
        ("id", IntegerType()),
        ("nome", StringType()),
        ("email", StringType()),
        ("data_cadastro", TimestampType()),
        ("telefone", StringType()),
    )
])

# Column layout shared by the incoming and outgoing transaction CSV files.
transactions_schema = StructType([
    StructField(field_name, field_type, True)
    for field_name, field_type in (
        ("id", IntegerType()),
        ("cliente_id", IntegerType()),
        ("valor", DoubleType()),
        ("data", TimestampType()),
    )
])

Function to load all .csv files of a directory into a single Spark DataFrame

In [5]:
def transform_csv_to_df(spark, path, schema, sep=';'):
    """Load every ``*.csv`` file directly inside ``path`` into one Spark DataFrame.

    Parameters
    ----------
    spark : SparkSession used to read the files.
    path : directory containing the CSV files (not searched recursively).
    schema : StructType applied to the files; it must contain an ``id`` column.
    sep : field delimiter, ``';'`` by default.

    Returns
    -------
    DataFrame with each file's header line skipped and rows whose ``id`` is
    null removed.

    Raises
    ------
    ValueError
        If ``path`` is not a directory or contains no ``.csv`` files.
    """
    if not os.path.isdir(path):
        raise ValueError(f"{path} is not a valid directory.")

    csv_paths = sorted(glob.glob(os.path.join(path, '*.csv')))
    if not csv_paths:
        raise ValueError(f"No csv files found in {path}.")

    # header=True skips the header line of every file explicitly. The explicit
    # schema already fixes the column types, so schema inference is not requested.
    df = spark.read.csv(csv_paths, sep=sep, schema=schema, header=True)

    # Rows without an id cannot be loaded into the id-keyed tables, so drop them.
    return df.filter(col('id').isNotNull())

Function to check each DataFrame column for null or missing data and, if any is found, fill it in with a default value

In [6]:
def verify_empty_data(df):
    """Report and fill missing values in String/Integer/Double/Timestamp columns.

    Missing values and their replacements:

    * string:    null or empty ``''``  -> ``'Not Informed'``
    * integer:   null                  -> ``0``
    * double:    null or NaN           -> ``0.00``
    * timestamp: null                  -> ``'1900-01-01 00:00:00'``

    Columns of any other type are left untouched. A message is printed for each
    column in which missing values were found.

    Returns the DataFrame with the replacements applied.
    """
    for col_name in df.columns:
        data_type = df.schema[col_name].dataType
        column = col(col_name)

        if data_type == StringType():
            # isnan is deliberately not used on text: it would cast strings such
            # as 'NaN' to a double NaN and treat them as missing.
            is_missing = column.isNull() | (column == '')
            description, fill_value = "empty/null", 'Not Informed'
        elif data_type == IntegerType():
            is_missing = column.isNull()
            description, fill_value = "null", 0
        elif data_type == DoubleType():
            # fillna also replaces NaN in double columns, so count NaN as well.
            is_missing = column.isNull() | isnan(column)
            description, fill_value = "null/NaN", 0.00
        elif data_type == TimestampType():
            is_missing = column.isNull()
            description, fill_value = "null", '1900-01-01 00:00:00'
        else:
            continue  # other column types are not checked

        missing_count = df.filter(is_missing).count()
        if missing_count == 0:
            continue

        print(f"Column '{col_name}' has {missing_count} {description} values.")
        if data_type == StringType():
            # fillna only replaces nulls; the same predicate used for counting
            # is used here so empty strings are replaced too.
            df = df.withColumn(col_name, when(is_missing, fill_value).otherwise(column))
        else:
            df = df.fillna({col_name: fill_value})
    return df

Function to standardize the value column of the transactions_in and transactions_out DataFrames, rounding every value to two decimal places and taking its absolute value

In [7]:
def correcting_data(df):
    """Round the ``value`` column to two decimal places, then take its absolute value.

    Both steps are expressed as one Spark SQL expression that replaces ``value``.
    """
    return df.withColumn("value", expr("abs(round(value, 2))"))

Function to create the state column in the clients DataFrame from the DDD (area code) extracted from the phone_number column

In [8]:
def add_state_column(df):
    """Add a ``state`` column derived from the area code (DDD) in ``phone_number``.

    The DDD is the second piece obtained by splitting ``phone_number`` on runs
    of ``(`` / ``)`` characters. For ``'(21) 99999-0000'`` that is ``'21'``.
    Codes absent from the mapping below yield ``'Invalid'``. The temporary
    ``DDD`` column is dropped before returning.
    """
    # NOTE(review): these codes do not match the real Brazilian DDD assignments
    # (20, 23, 25, 26, 29 and 30 are not assigned, 22 is Rio de Janeiro, ...);
    # presumably a mapping specific to this dataset — confirm with the source.
    ddd_to_state = {
        '20': 'Paraíba',
        '21': 'Rio de Janeiro',
        '22': 'Mato Grosso',
        '23': 'Pernambuco',
        '24': 'Rio de Janeiro',
        '25': 'Bahia',
        '26': 'Minas Gerais',
        '27': 'Espírito Santo',
        '28': 'Roraima',
        '29': 'São Paulo',
        '30': 'Maranhão',
    }

    # Build the when(...).when(...) chain from the mapping, preserving its order.
    area_code = col('DDD')
    state_expr = None
    for ddd, state_name in ddd_to_state.items():
        matches = area_code == ddd
        if state_expr is None:
            state_expr = when(matches, state_name)
        else:
            state_expr = state_expr.when(matches, state_name)

    with_ddd = df.withColumn('DDD', split(df['phone_number'], r'[()]+').getItem(1))
    with_state = with_ddd.withColumn('state', state_expr.otherwise('Invalid'))
    return with_state.drop('DDD')

Function to format customer names by splitting them into two columns, name (first name) and last_name; customers without a last name get 'Not Informed'

In [9]:
def format_names(df):
    """Split the full ``name`` into a first name and a last name.

    ``name`` keeps the first word and ``last_name`` receives every remaining
    word (any number of them), both title-cased with ``initcap``. A missing or
    empty last name becomes ``'Not Informed'``.

    The result is projected to the clients' columns in a fixed order, so
    ``email``, ``date_time_register``, ``phone_number`` and ``state`` must
    already exist on ``df``.
    """
    df = (
        df
        # Trim and split on any whitespace run so leading or repeated spaces
        # do not produce empty "words".
        .withColumn("name_split", split(trim(col("name")), r"\s+"))
        # All words after the first, joined back with single spaces; an empty
        # slice (one-word name) yields ''.
        .withColumn("last_name", expr("array_join(slice(name_split, 2, size(name_split)), ' ')"))
        .withColumn("name", col("name_split").getItem(0))
        .drop("name_split")
    )
    df = df.withColumn("name", initcap(col("name")))
    df = df.withColumn("last_name", initcap(col("last_name")))
    df = df.withColumn(
        "last_name",
        when(col("last_name").isNull() | (col("last_name") == ""), "Not Informed")
        .otherwise(col("last_name")),
    )
    return df.select("id", "name", "last_name", "email", "date_time_register", "phone_number", "state")

Function to check that every client id in the transactions_in and transactions_out DataFrames corresponds to an id in the clients DataFrame; each missing id is added to the clients DataFrame with placeholder values ('Not found' in the text columns)

In [10]:
def verify_client_id_existence(df_transactions, df_clients):
    """Append placeholder clients for ids used in transactions but absent from clients.

    Each distinct ``client_id`` of ``df_transactions`` without a matching
    ``df_clients.id`` becomes one new client row. Its text columns hold
    ``'Not found'`` and ``date_time_register`` is ``1900-01-01 00:00:00``.
    This lets every transaction reference an existing client id.

    Returns ``df_clients`` with the placeholder rows appended.
    """
    # Deduplicate before the anti-join so each missing id is looked up once.
    missing_ids = (
        df_transactions.select(col('client_id').alias('id'))
        .distinct()
        .join(df_clients.select(col('id')), on='id', how='leftanti')
    )
    placeholders = missing_ids.select(
        col('id'),
        lit('Not found').alias('name'),
        lit('Not found').alias('last_name'),
        lit('Not found').alias('email'),
        lit('1900-01-01 00:00:00').cast('timestamp').alias('date_time_register'),
        lit('Not found').alias('phone_number'),
        lit('Not found').alias('state'),
    )
    # Union by column name rather than position, so a different column order
    # in df_clients cannot silently shift values into the wrong columns.
    return df_clients.unionByName(placeholders)

Function to rename a DataFrame column

In [11]:
def renamed_column(df, previous_column, new_column):
    """Return ``df`` with ``previous_column`` renamed to ``new_column``.

    Thin wrapper around ``DataFrame.withColumnRenamed``.
    """
    renamed_df = df.withColumnRenamed(previous_column, new_column)
    return renamed_df

Function to connect to SQL Server database on Azure

In [12]:
def _odbc_value(value):
    """Brace-quote an ODBC connection-string value, escaping '}' as '}}'."""
    return "{" + value.replace("}", "}}") + "}"


def _build_connection_string(server_name, database_name, username, password):
    """Build the ODBC Driver 18 connection string with no embedded whitespace runs."""
    # Database, user and password are brace-quoted so characters such as ';'
    # or '}' (common in passwords) cannot break the connection string.
    parts = [
        "Driver={ODBC Driver 18 for SQL Server}",
        f"Server=tcp:{server_name},1433",
        f"Database={_odbc_value(database_name)}",
        f"Uid={_odbc_value(username)}",
        f"Pwd={_odbc_value(password)}",
        "Encrypt=yes",
        "TrustServerCertificate=no",
        "Connection Timeout=30",
    ]
    return ";".join(parts) + ";"


def connection_database():
    """Open a pyodbc connection to the SQL Server database on Azure.

    A ``.env`` file is loaded first with ``load_dotenv``. The credentials are
    then read from the environment variables ``server_name``,
    ``database_name``, ``username`` and ``password``.

    Raises
    ------
    KeyError
        If any of those environment variables is not set.
    """
    load_dotenv()
    # NOTE(review): load_dotenv() does not override variables that already
    # exist, and os.environ is case-insensitive on Windows, so "username" may
    # resolve to the OS login (USERNAME) instead of the .env value — confirm.
    server_name = os.environ["server_name"]
    database_name = os.environ["database_name"]
    username = os.environ["username"]
    password = os.environ["password"]

    connection_string = _build_connection_string(server_name, database_name, username, password)
    return pyodbc.connect(connection_string)

Function to create the clients table in SQL Server database on Azure

In [13]:
def create_table_clients(conn):
    """Create the ``clients`` table unless a table with that name already exists.

    Commits after creating the table and prints whether it was created or was
    already present. The cursor is closed even if a statement fails.
    """
    cursor = conn.cursor()
    try:
        cursor.execute("SELECT COUNT(*) FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_NAME = 'clients'")
        if cursor.fetchone()[0] == 0:
            cursor.execute(
                "CREATE TABLE clients ("
                "id INTEGER PRIMARY KEY, "
                "name VARCHAR(255), "
                "last_name VARCHAR(255), "
                "email VARCHAR(255), "
                "date_time_register DATETIME, "
                "phone_number VARCHAR(255), "
                "state VARCHAR(255)"
                ");"
            )
            conn.commit()
            print("clients table successfully created!")
        else:
            print("clients table is already in the database!")
    finally:
        cursor.close()

Function to create the transactions_in and transactions_out tables in SQL Server database on Azure

In [14]:
def create_table_transactions(conn, name_table):
    """Create a transactions table named ``name_table`` unless it already exists.

    Columns: ``id`` (primary key), ``client_id`` (references ``clients.id``),
    ``value`` DECIMAL(10,2) and ``date_time`` DATETIME. Commits after creating
    the table and prints whether it was created or already present. The cursor
    is closed even if a statement fails.
    """
    cursor = conn.cursor()
    try:
        # The table name is bound as a query parameter, not formatted into SQL.
        cursor.execute(
            "SELECT COUNT(*) FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_NAME = ?",
            name_table,
        )
        if cursor.fetchone()[0] == 0:
            # Identifiers cannot be bound as parameters; bracket-quote the name
            # (escaping ']') instead of pasting it raw into the DDL.
            quoted_name = "[" + name_table.replace("]", "]]") + "]"
            cursor.execute(
                f"CREATE TABLE {quoted_name} ("
                "id INTEGER PRIMARY KEY, "
                "client_id INTEGER REFERENCES clients (id), "
                "value DECIMAL(10,2), "
                "date_time DATETIME"
                ");"
            )
            conn.commit()
            print(f"{name_table} table successfully created!")
        else:
            print(f"{name_table} table is already in the database!")
    finally:
        cursor.close()

Function to insert DataFrames into existing tables in SQL Server database on Azure

In [15]:
def insert_df_into_db(conn, df, name_table):
    """Insert every row of ``df`` into the existing table ``name_table``.

    The DataFrame's column names are used as the target column names. All rows
    are sent in one ``executemany`` call and committed once, so a failure rolls
    the whole batch back instead of leaving the table half-filled.

    Errors are printed, not raised. An integrity violation (e.g. a duplicate
    primary key) prints a "data already exists" message; any other error
    prints its text. In both cases the transaction is rolled back.
    """
    cursor = None
    try:
        rows = [tuple(row) for row in df.collect()]
        if not rows:
            # pyodbc's executemany raises on an empty parameter list.
            print("The DataFrame is empty; nothing was inserted.")
            return

        def quote(identifier):
            # Bracket-quote SQL Server identifiers, escaping ']'.
            return "[" + identifier.replace("]", "]]") + "]"

        columns = ",".join(quote(column) for column in df.columns)
        placeholders = ",".join("?" for _ in df.columns)

        cursor = conn.cursor()
        cursor.executemany(
            f"INSERT INTO {quote(name_table)} ({columns}) VALUES ({placeholders})",
            rows,
        )
        conn.commit()
        print("The data has been successfully inserted into the table.")
    except pyodbc.IntegrityError:
        print("This data already exists in the database!")
        conn.rollback()
    except Exception as e:
        print(f"An error occurred while inserting data into the table: {e}")
        conn.rollback()
    finally:
        # cursor stays None if the failure happened before it was opened.
        if cursor is not None:
            cursor.close()

The main script of the application that uses the previously defined functions

In [None]:
try:
    print("Transforming CSV files into DataFrame....")
    df_clients = transform_csv_to_df(spark, clients, clients_schema)
    df_transactions_in = transform_csv_to_df(spark, transactions_in, transactions_schema)
    df_transactions_out = transform_csv_to_df(spark, transactions_out, transactions_schema)
    print("OK")

    print("Renaming DataFrames columns...")
    # The raw CSV headers are in Portuguese; map them to the English names that
    # the helper functions and the database tables use.
    client_renames = {
        "nome": "name",
        "data_cadastro": "date_time_register",
        "telefone": "phone_number",
    }
    transaction_renames = {
        "cliente_id": "client_id",
        "valor": "value",
        "data": "date_time",
    }
    for previous_column, new_column in client_renames.items():
        df_clients = renamed_column(df_clients, previous_column, new_column)
    for previous_column, new_column in transaction_renames.items():
        df_transactions_in = renamed_column(df_transactions_in, previous_column, new_column)
        df_transactions_out = renamed_column(df_transactions_out, previous_column, new_column)
    print("OK")

    print("Checking for missing data in DataFrames columns...")
    df_clients = verify_empty_data(df_clients)
    df_transactions_in = verify_empty_data(df_transactions_in)
    df_transactions_out = verify_empty_data(df_transactions_out)
    print("OK")

    print("Correcting data in the value column of transactions DataFrames..")
    df_transactions_in = correcting_data(df_transactions_in)
    df_transactions_out = correcting_data(df_transactions_out)
    print("OK")

    print("Formatting the clients DataFrame...")
    # add_state_column must run before format_names, which selects 'state'.
    df_clients = add_state_column(df_clients)
    df_clients = format_names(df_clients)
    # Add placeholder clients for ids that only appear in transactions.
    df_clients = verify_client_id_existence(df_transactions_in, df_clients)
    df_clients = verify_client_id_existence(df_transactions_out, df_clients)
    print("OK")

    conn = None
    try:
        print("Connecting to the database...")
        conn = connection_database()
        print("OK")
    except Exception as e:
        print(f"Unable to connect to the database! The following error occurred: {e}")
    else:
        # clients is created and filled first because the transaction tables
        # reference clients(id).
        print("\nCreating clients table in database...")
        create_table_clients(conn)

        print("\nInserting data into the table...")
        insert_df_into_db(conn, df_clients, "clients")

        print("\nCreating the transaction_in table in the database...")
        create_table_transactions(conn, "transactions_in")

        print("\nInserting data into the table...")
        insert_df_into_db(conn, df_transactions_in, "transactions_in")

        print("\nCreating the transaction_out table in the database...")
        create_table_transactions(conn, "transactions_out")

        print("\nInserting data into the table...")
        insert_df_into_db(conn, df_transactions_out, "transactions_out")
    finally:
        # Always release the database connection, even if a step above raised.
        if conn is not None:
            conn.close()

    print("\n")
    print("-" * 30)
    print("Transactions in")
    df_transactions_in.show()
    print("-" * 30)
    print("Transactions out")
    df_transactions_out.show()
    print("-" * 30)
    print("Clients")
    df_clients.show()

except Exception as e:
    print(f"The following error occurred: {e}!")