In [18]:
# Impor the necessary libs
from pyspark.sql.functions import *
from pyspark.sql import SparkSession
import sshtunnel
import socket
import sys
import json
from pyspark.sql.functions import current_timestamp, date_format, col, from_utc_timestamp
from pyspark.sql.types import StructType, StructField, StringType, FloatType, TimestampType, LongType, DoubleType, IntegerType


StatementMeta(, 025c3015-e649-4543-86ef-80d9a6d021e9, 22, Finished, Available, Finished)

In [2]:
def cast_numeric_columns_to_common_type(df, target_type=DoubleType()):
    '''
    This function casts all numeric columns in the given DataFrame to a specified common data type.
    
    By default, the target type is DoubleType, but it can be changed by providing a different target_type.
    
    The function performs the following steps:
    1. Iterates through each column in the DataFrame schema.
    2. Checks if the column's data type is one of the numeric types (IntegerType, LongType, or DoubleType).
    3. If the column is numeric, it casts the column to the specified target type.
    4. Returns the DataFrame with the updated column types.

    Args:
        df (DataFrame): The input DataFrame containing various columns with different data types.
        target_type (DataType, optional): The target data type to cast numeric columns to. Defaults to DoubleType().

    Returns:
        DataFrame: The resulting DataFrame with numeric columns cast to the specified target type.
    '''

    # Iterate over each field in the DataFrame schema
    for field in df.schema.fields:
        # Check if the field's data type is a numeric type (IntegerType, LongType, or DoubleType)
        if isinstance(field.dataType, (IntegerType, LongType, DoubleType)):
            # Cast the column to the target type
            df = df.withColumn(field.name, col(field.name).cast(target_type))
    
    # Return the DataFrame with updated column types
    return df


StatementMeta(, 025c3015-e649-4543-86ef-80d9a6d021e9, 6, Finished, Available, Finished)

In [3]:
def insert_metadata(df):
    '''
    This function inserts metadata into the given DataFrame by adding a new column with the current timestamp 
    formatted according to the Brazilian time zone.

    The function performs the following steps:
    1. Defines the Brazilian time zone ("America/Sao_Paulo").
    2. Converts the current UTC timestamp to the Brazilian time zone.
    3. Formats the timestamp to a specific string format ("yyyyMMddHHmmss").
    4. Adds a new column "INGESTION_DATE" to the DataFrame with the formatted timestamp.
    5. Returns the DataFrame with the new metadata column.

    Args:
        df (DataFrame): The input DataFrame to which the metadata column will be added.

    Returns:
        DataFrame: The resulting DataFrame with an additional "INGESTION_DATE" column containing the formatted timestamp.
    '''

    # Define the Brazilian time zone
    brazilian_time_zone = "America/Sao_Paulo"
    
    # Convert the current UTC timestamp to the Brazilian time zone
    brazilian_timestamp = from_utc_timestamp(current_timestamp(), brazilian_time_zone)
    
    # Format the timestamp as "yyyyMMddHHmmss"
    formatted_timestamp = date_format(brazilian_timestamp, "yyyyMMddHHmmss")
    
    # Add a new column "INGESTION_DATE" to the DataFrame with the formatted timestamp
    df = df.withColumn("INGESTION_DATE", formatted_timestamp)
    
    # Return the DataFrame with the added metadata column
    return df


StatementMeta(, 025c3015-e649-4543-86ef-80d9a6d021e9, 7, Finished, Available, Finished)

In [5]:
def write_parquet_file(df, table_name):
    '''
    This function writes a Spark DataFrame to a Parquet file and saves it as a Delta table.

    The function performs the following steps:
    1. Extracts the `INGESTION_DATE` value from the DataFrame and uses it to construct the file path.
    2. Organizes the file path into a specific directory structure based on the year, month, day, and time.
    3. Writes the DataFrame to the constructed file path as a Parquet file.
    4. Saves the DataFrame as a Delta table with schema overwrite enabled.

    Args:
        df (DataFrame): The Spark DataFrame to be written.
        table_name (str): The name of the table to be used for constructing the file path and saving the Delta table.

    Returns:
        None
    '''

    # Extract the distinct 'INGESTION_DATE' from the DataFrame
    parquet_table_name = df.select('INGESTION_DATE').distinct().collect()[0][0]
    
    # Parse the 'INGESTION_DATE' into year, month, day, and time components
    year = parquet_table_name[0:4]
    month = parquet_table_name[4:6]
    day = parquet_table_name[6:8]
    time = parquet_table_name[8:]
    
    # Construct the file path using the parsed components and the table name
    file_path = f"abfss://6870845f-6c5b-4f2b-a193-d7c7bac67d3b@onelake.dfs.fabric.microsoft.com/a139e00c-b185-4563-9a2d-ccae704b3457/Files/{table_name}/{year}/{month}/{day}/{time}"
    
    # Write the DataFrame to the constructed file path as a Parquet file
    df.write.parquet(file_path)
    
    # Save the DataFrame as a Delta table with schema overwrite enabled
    df.write \
        .option("overwriteSchema", "true") \
        .format("delta") \
        .mode("overwrite") \
        .saveAsTable(table_name, mode="overwrite", ifNotExists=True)


StatementMeta(, 025c3015-e649-4543-86ef-80d9a6d021e9, 9, Finished, Available, Finished)

In [6]:
table_names_dag = ''
table_names_dag = "['FPG010', 'FQ4010', 'STL010', 'SCP010']"


StatementMeta(, 025c3015-e649-4543-86ef-80d9a6d021e9, 10, Finished, Available, Finished)

In [7]:
print(table_names_dag)

StatementMeta(, 025c3015-e649-4543-86ef-80d9a6d021e9, 11, Finished, Available, Finished)

['FPG010', 'FQ4010', 'STL010', 'SCP010']


In [8]:
table_names = table_names_dag[1:-1].split(',')
table_names

StatementMeta(, 025c3015-e649-4543-86ef-80d9a6d021e9, 12, Finished, Available, Finished)

["'FPG010'", " 'FQ4010'", " 'STL010'", " 'SCP010'"]

In [10]:
def generate_table_schema(table_name):
    '''
    This function retrieves the schema of a specified table from a database.

    The function performs the following steps:
    1. Executes an SQL query to retrieve the schema information of the specified table from the `bronze_protheus_raw.tableschema` table.
    2. Returns the resulting schema as a Spark DataFrame.

    Args:
        table_name (str): The name of the table for which to retrieve the schema.

    Returns:
        DataFrame: A Spark DataFrame containing the schema information of the specified table.
    '''

    # Execute SQL query to retrieve the schema of the specified table
    tables_schema = spark.sql(f"SELECT * FROM bronze_protheus_raw.tableschema WHERE TABLE_NAME='{table_name}'")
    
    # Return the resulting schema as a Spark DataFrame
    return tables_schema


StatementMeta(, 025c3015-e649-4543-86ef-80d9a6d021e9, 14, Finished, Available, Finished)

In [9]:
# Types Documentation https://spark.apache.org/docs/latest/sql-ref-datatypes.html#:~:text=Spark%20SQL%20and%20DataFrames%20support%20the%20following%20data,6%20Interval%20types%20...%207%20Complex%20types%20

def get_mapped_lists(tables_schema):
    '''
    This function maps SQL column names and data types to PySpark data types and returns various lists used for query generation and schema creation.

    The function performs the following steps:
    1. Collects the `COLUMN_NAME` and `DATA_TYPE` from the provided table schema DataFrame.
    2. Maps the SQL data types to PySpark data types using a predefined mapping.
    3. Generates a list of column names.
    4. Removes specific columns from the list of column names (if required).
    5. Generates a comma-separated string of the final list of column names.

    Args:
        tables_schema (DataFrame): The Spark DataFrame containing the schema information of a table.

    Returns:
        tuple: A tuple containing the following elements:
            - mapped_columns (list): A list of tuples where each tuple contains a column name and its corresponding PySpark data type.
            - first_elements_list_final (list): A list of column names after removing any specified columns.
            - columns_string (str): A comma-separated string of the final list of column names.
    '''

    # Collect 'COLUMN_NAME' and 'DATA_TYPE' from the provided schema DataFrame
    aux_list = tables_schema.select('COLUMN_NAME', 'DATA_TYPE').collect()
    
    # Convert the collected rows into a list of tuples
    tuples_list = [tuple(row) for row in aux_list]

    # Define a mapping from SQL data types to PySpark data types
    data_type_map = {
        'varchar': StringType(),
        'varbinary': StringType(),
        'float': DoubleType(),
        'bigint': LongType(),
        'datetime': TimestampType()
    }
    
    # Map SQL column names and data types to PySpark data types
    mapped_columns = [(col[0], data_type_map[col[1]]) for col in tuples_list]

    # Generate a list of the first elements (column names) from the tuples
    first_elements_list = [item[0] for item in tuples_list]
    
    # Remove specific columns from the list (e.g., 'F1_ZJSONIN')
    columns_to_remove = ['F1_ZJSONIN']
    first_elements_list_final = list(set(first_elements_list) - set(columns_to_remove))

    # Create a comma-separated string of the final list of column names
    columns_string = ", ".join(first_elements_list_final)
    
    return mapped_columns, first_elements_list_final, columns_string


StatementMeta(, 025c3015-e649-4543-86ef-80d9a6d021e9, 13, Finished, Available, Finished)

In [11]:
def create_struct_type(mapped_columns):
    '''
    This function dynamically creates a PySpark `StructType` schema based on the provided mapped columns.

    The function performs the following steps:
    1. Iterates over the list of mapped columns to create a list of `StructField` objects.
    2. Combines the `StructField` objects into a `StructType` schema.
    3. Returns both the list of `StructField` objects and the constructed `StructType` schema.

    Args:
        mapped_columns (list): A list of tuples where each tuple contains a column name and its corresponding PySpark data type.

    Returns:
        tuple: A tuple containing the following elements:
            - fields (list): A list of `StructField` objects representing the columns and their data types.
            - schema (StructType): A `StructType` object representing the schema of the DataFrame.
    '''

    # Create a list of StructField objects from the mapped columns
    fields = [StructField(name, dtype, True) for name, dtype in mapped_columns]
    
    # Combine the StructField objects into a StructType schema
    schema = StructType(fields)
    
    # Return the list of StructField objects and the constructed StructType schema
    return fields, schema



StatementMeta(, 025c3015-e649-4543-86ef-80d9a6d021e9, 15, Finished, Available, Finished)

In [12]:
# Get secrets from Azure Key Vault
ssh_host = mssparkutils.credentials.getSecret('https://dataengineervault.vault.azure.net/','ssh-host')
ssh_port = mssparkutils.credentials.getSecret('https://dataengineervault.vault.azure.net/','ssh-port')
ssh_username = mssparkutils.credentials.getSecret('https://dataengineervault.vault.azure.net/','ssh-username')
ssh_password = mssparkutils.credentials.getSecret('https://dataengineervault.vault.azure.net/','ssh-password')
sql_host = mssparkutils.credentials.getSecret('https://dataengineervault.vault.azure.net/','sql-host')
sql_port = mssparkutils.credentials.getSecret('https://dataengineervault.vault.azure.net/','sql-port')
sql_username = mssparkutils.credentials.getSecret('https://dataengineervault.vault.azure.net/','sql-username')
sql_password = mssparkutils.credentials.getSecret('https://dataengineervault.vault.azure.net/','sql-password')
sql_database = mssparkutils.credentials.getSecret('https://dataengineervault.vault.azure.net/','sql-database')

StatementMeta(, 025c3015-e649-4543-86ef-80d9a6d021e9, 16, Finished, Available, Finished)

In [14]:
def generate_query_string(first_elements_list, table):
    '''
    This function generates an SQL SELECT query string for a specified table and list of columns.

    The function performs the following steps:
    1. Joins the list of column names into a comma-separated string.
    2. Forms the SELECT query using the provided table name and columns string.

    Args:
        first_elements_list (list): A list of column names to include in the SELECT query.
        table (str): The name of the table from which to select data.

    Returns:
        str: An SQL SELECT query string that can be executed against the specified table.
    '''

    # Join the list of column names into a comma-separated string
    columns_string = ", ".join(first_elements_list)

    # Forming the SELECT query using the table name and columns string
    query_string = f"SELECT {columns_string} FROM {table}"
    
    return query_string


StatementMeta(, 025c3015-e649-4543-86ef-80d9a6d021e9, 18, Finished, Available, Finished)

In [15]:

# Create SSH tunnel
with sshtunnel.SSHTunnelForwarder(
    (ssh_host, int(ssh_port)),
    ssh_username=ssh_username,
    ssh_password=ssh_password,
    remote_bind_address=(sql_host, int(sql_port))
) as tunnel:
    server = tunnel.local_bind_address[0]
    
    port = tunnel.local_bind_address[1]
    print(f'{server}:{port}')
    tunnel.skip_tunnel_checkup = False
    tunnel.start()
    tunnel.check_tunnels()
    print(tunnel.tunnel_is_up, flush=True)

    # Initialize Spark session with specific configurations
    spark = SparkSession.builder.appName("JDBCExample") \
        .config("spark.sql.parquet.vorder.enabled", "true") \
        .config("spark.microsoft.delta.optimizeWrite.enabled", "true") \
        .getOrCreate()

    for table in table_names:
        table = table.replace(' ','').replace('\'', '')
        print(table)
        tables_schema = generate_table_schema(table)

        mapped_columns, first_elements_list, columns_string = get_mapped_lists(tables_schema)
        #fields, schema = create_struct_type(mapped_columns)
        errors = []
        successes = []

        query_string = generate_query_string(first_elements_list,table)
        print(query_string)
        df = spark.read.format("jdbc") \
            .option("url", f"jdbc:sqlserver://;serverName={socket.gethostname()};port={port};databaseName={sql_database};encrypt=true;trustServerCertificate=true") \
            .option("query", query_string) \
            .option("user", sql_username) \
            .option("password", sql_password) \
            .load()
        

        df = cast_numeric_columns_to_common_type(df)
        df = insert_metadata(df)
        write_parquet_file(df,table)


StatementMeta(, 025c3015-e649-4543-86ef-80d9a6d021e9, 19, Finished, Available, Finished)

0.0.0.0:42701
{('0.0.0.0', 42701): True}
FPG010
SELECT FPG_VLUNIT, FPG_ZPLACA, FPG_DTENT, FPG_PROJET, FPG_OK, FPG_NATURE, FPG_FILIAL, FPG_VALOR, FPG_STATUS, FPG_PVNUM, FPG_DOCORI, FPG_DESPES, FPG_TAXAV, FPG_ZMULTR, FPG_TAXAP, FPG_TIPO, FPG_QUANT, FPG_DESCRI, FPG_VALTOT, FPG_OBRA, R_E_C_D_E_L_, FPG_NRAS, FPG_RECORI, FPG_CUSTO, FPG_COBRAT, FPG_JUNTO, FPG_COBRA, FPG_PVITEM, FPG_PRODUT, FPG_DTPAG, FPG_SEQ, FPG_USERGI, FPG_CODDES, FPG_USERGA, FPG_ZOBS, R_E_C_N_O_, D_E_L_E_T_ FROM FPG010
FQ4010
SELECT FQ4_NOME, FQ4_ZSUBST, FQ4_NOMTRA, FQ4_ZDESSU, FQ4_STSOLD, FQ4_SERVIC, FQ4_CODBEM, FQ4_OS, FQ4_DTFIM, FQ4_CODFAM, FQ4_PRELIB, FQ4_TPSERV, FQ4_SERREM, FQ4_DOCUME, FQ4_FABRIC, FQ4_CENTRA, FQ4_SERIE, FQ4_EST, FQ4_MUNIC, FQ4_DESTAT, FQ4_LOJCLI, R_E_C_D_E_L_, FQ4_POSCON, FQ4_STATUS, FQ4_AS, FQ4_SUBLOC, FQ4_FILIAL, FQ4_TIPMOD, FQ4_DTINI, FQ4_ZANAMU, FQ4_PREDES, FQ4_ZDTTIM, FQ4_LOG, FQ4_NFREM, FQ4_CODMUN, FQ4_NOMCLI, FQ4_SEQ, FQ4_ZUSER, FQ4_PROJET, R_E_C_N_O_, D_E_L_E_T_, FQ4_CODCLI, FQ4_OBRA FROM FQ40

In [None]:
# Create SSH tunnel
with sshtunnel.SSHTunnelForwarder(
    (ssh_host, int(ssh_port)),
    ssh_username=ssh_username,
    ssh_password=ssh_password,
    remote_bind_address=(sql_host, int(sql_port))
) as tunnel:
    '''
    Establishes an SSH tunnel to a remote SQL server, forwarding local connections to the remote server.

    This block:
    1. Creates and starts an SSH tunnel to forward local connections to the SQL server.
    2. Prints the local server and port information for verification.
    3. Ensures the tunnel is active and properly set up.

    Attributes:
        ssh_host (str): The SSH server hostname.
        ssh_port (int): The SSH server port.
        ssh_username (str): The username for SSH authentication.
        ssh_password (str): The password for SSH authentication.
        sql_host (str): The SQL server hostname.
        sql_port (int): The SQL server port.
    '''

    server = tunnel.local_bind_address[0]
    port = tunnel.local_bind_address[1]
    print(f'{server}:{port}')
    tunnel.skip_tunnel_checkup = False
    tunnel.start()
    tunnel.check_tunnels()
    print(tunnel.tunnel_is_up, flush=True)

    # Initialize Spark session with specific configurations
    spark = SparkSession.builder.appName("JDBCExample") \
        .config("spark.sql.parquet.vorder.enabled", "true") \
        .config("spark.microsoft.delta.optimizeWrite.enabled", "true") \
        .getOrCreate()

    '''
    Initializes a Spark session with configurations for parquet and delta optimizations.

    Attributes:
        spark (SparkSession): The Spark session object.
    '''

    for table in table_names:
        '''
        Processes each table in the provided list of table names by:
        1. Cleaning up the table name.
        2. Retrieving the schema of the table.
        3. Mapping columns and data types.
        4. Generating the SQL query string.
        5. Loading data from the SQL server into a DataFrame.
        6. Casting numeric columns to a common type.
        7. Inserting metadata.
        8. Writing the DataFrame to a Parquet file and a Delta table.

        Attributes:
            table_names (list): A list of table names to process.
        '''

        table = table.replace(' ', '').replace('\'', '')
        print(table)
        tables_schema = generate_table_schema(table)

        mapped_columns, first_elements_list, columns_string = get_mapped_lists(tables_schema)
        #fields, schema = create_struct_type(mapped_columns)
        errors = []
        successes = []

        query_string = generate_query_string(first_elements_list, table)
        print(query_string)
        df = spark.read.format("jdbc") \
            .option("url", f"jdbc:sqlserver://;serverName={socket.gethostname()};port={port};databaseName={sql_database};encrypt=true;trustServerCertificate=true") \
            .option("query", query_string) \
            .option("user", sql_username) \
            .option("password", sql_password) \
            .load()
        
        # Cast numeric columns to a common data type
        df = cast_numeric_columns_to_common_type(df)
        # Insert ingestion metadata into the DataFrame
        df = insert_metadata(df)
        # Write DataFrame to both Parquet file and Delta table
        write_parquet_file(df, table)


In [20]:
"""
This script establishes an SSH tunnel to securely connect to a remote SQL Server database, 
then initializes a Spark session to extract data from specified tables, process it, 
and write the resulting DataFrame as Parquet files.
 
The script performs the following steps:
1. Establishes an SSH tunnel using the `sshtunnel` library.
2. Initializes a Spark session with specific configurations for Parquet and Delta Lake optimization.
3. Iterates through a list of table names to:
    - Clean the table name.
    - Generate the schema and query string for the table.
    - Read the table data into a Spark DataFrame using JDBC.
    - Process the DataFrame (e.g., casting numeric columns, inserting metadata).
    - Write the DataFrame as a Parquet file.
"""
 
# Create SSH tunnel
with sshtunnel.SSHTunnelForwarder(
    (ssh_host, int(ssh_port)),
    ssh_username=ssh_username,
    ssh_password=ssh_password,
    remote_bind_address=(sql_host, int(sql_port))
) as tunnel:
    server = tunnel.local_bind_address[0]  # Get local server address
    port = tunnel.local_bind_address[1]    # Get local port address
    print(f'{server}:{port}')  # Output the server and port to confirm connection
 
    tunnel.skip_tunnel_checkup = False  # Ensure the tunnel performs a checkup
    tunnel.start()  # Start the SSH tunnel
    tunnel.check_tunnels()  # Check the status of the tunnel
    print(tunnel.tunnel_is_up, flush=True)  # Print tunnel status (True if up)
 
    # Initialize Spark session with specific configurations
    spark = SparkSession.builder.appName("JDBCExample") \
        .config("spark.sql.parquet.vorder.enabled", "true") \
        .config("spark.microsoft.delta.optimizeWrite.enabled", "true") \
        .getOrCreate()
 
    # Iterate over the list of table names
    for table in table_names:
        table = table.replace(' ', '').replace('\'', '')  # Clean the table name
        print(table)  # Print the cleaned table name
 
        tables_schema = generate_table_schema(table)  # Generate the schema for the table
 
        # Get the mapped columns, first elements list, and columns string
        mapped_columns, first_elements_list, columns_string = get_mapped_lists(tables_schema)
 
        # Initialize lists for errors and successes
        errors = []
        successes = []
 
        # Generate the SQL query string for the table
        query_string = generate_query_string(first_elements_list, table)
        print(query_string)  # Print the generated query string
 
        # Read data from the table into a Spark DataFrame using JDBC
        df = spark.read.format("jdbc") \
            .option("url", f"jdbc:sqlserver://;serverName={socket.gethostname()};port={port};databaseName={sql_database};encrypt=true;trustServerCertificate=true") \
            .option("query", query_string) \
            .option("user", sql_username) \
            .option("password", sql_password) \
            .load()
 
        # Process the DataFrame
        df = cast_numeric_columns_to_common_type(df)  # Cast numeric columns to a common type
        df = insert_metadata(df)  # Insert metadata into the DataFrame
 
        # Write the processed DataFrame as a Parquet file
        write_parquet_file(df, table)

StatementMeta(, 025c3015-e649-4543-86ef-80d9a6d021e9, 24, Finished, Available, Finished)

0.0.0.0:38975
{('0.0.0.0', 38975): True}
FPG010
SELECT FPG_VLUNIT, FPG_ZPLACA, FPG_DTENT, FPG_PROJET, FPG_OK, FPG_NATURE, FPG_FILIAL, FPG_VALOR, FPG_STATUS, FPG_PVNUM, FPG_DOCORI, FPG_DESPES, FPG_TAXAV, FPG_ZMULTR, FPG_TAXAP, FPG_TIPO, FPG_QUANT, FPG_DESCRI, FPG_VALTOT, FPG_OBRA, R_E_C_D_E_L_, FPG_NRAS, FPG_RECORI, FPG_CUSTO, FPG_COBRAT, FPG_JUNTO, FPG_COBRA, FPG_PVITEM, FPG_PRODUT, FPG_DTPAG, FPG_SEQ, FPG_USERGI, FPG_CODDES, FPG_USERGA, FPG_ZOBS, R_E_C_N_O_, D_E_L_E_T_ FROM FPG010
FQ4010
SELECT FQ4_NOME, FQ4_ZSUBST, FQ4_NOMTRA, FQ4_ZDESSU, FQ4_STSOLD, FQ4_SERVIC, FQ4_CODBEM, FQ4_OS, FQ4_DTFIM, FQ4_CODFAM, FQ4_PRELIB, FQ4_TPSERV, FQ4_SERREM, FQ4_DOCUME, FQ4_FABRIC, FQ4_CENTRA, FQ4_SERIE, FQ4_EST, FQ4_MUNIC, FQ4_DESTAT, FQ4_LOJCLI, R_E_C_D_E_L_, FQ4_POSCON, FQ4_STATUS, FQ4_AS, FQ4_SUBLOC, FQ4_FILIAL, FQ4_TIPMOD, FQ4_DTINI, FQ4_ZANAMU, FQ4_PREDES, FQ4_ZDTTIM, FQ4_LOG, FQ4_NFREM, FQ4_CODMUN, FQ4_NOMCLI, FQ4_SEQ, FQ4_ZUSER, FQ4_PROJET, R_E_C_N_O_, D_E_L_E_T_, FQ4_CODCLI, FQ4_OBRA FROM FQ40

In [16]:
for table in table_names:
    table = table.replace(' ','').replace('\'', '')
    print(table)

StatementMeta(, 025c3015-e649-4543-86ef-80d9a6d021e9, 20, Finished, Available, Finished)

FPG010
FQ4010
STL010
SCP010
