# ITABLES

In [1]:
# to enable itables
from itables import init_notebook_mode

init_notebook_mode(all_interactive=True)

# CONFIGURACION DELTA SPARK

In [2]:
import os
import shutil

from delta import configure_spark_with_delta_pip
from pyspark.sql import SparkSession


def ensure_directories_exist(warehouse_dir, metastore_db_path):
    """
    Ensures the necessary directories for the warehouse and metastore exist.

    Parameters:
        warehouse_dir (str): Path to the warehouse directory (Spark catalog).
        metastore_db_path (str): Path to the metastore database.
    """
    os.makedirs(warehouse_dir, exist_ok=True)
    os.makedirs(os.path.dirname(metastore_db_path), exist_ok=True)

def create_spark_session(
    app_name="DeltaCatalog",
    warehouse_dir="./warehouse-spark/spark_catalog",
    metastore_db_path="./warehouse-spark/metastore_db",
):
    """
    Creates and initializes a SparkSession with Delta Lake support and persistent metastore.

    Parameters:
        app_name (str): Name of the Spark application.
        warehouse_dir (str): Path to the Spark catalog warehouse directory.
        metastore_db_path (str): Path to the persistent metastore database (Derby).

    Returns:
        SparkSession: Configured SparkSession instance.
    """
    # Ensure required directories exist
    ensure_directories_exist(warehouse_dir, metastore_db_path)

    # Configure SparkSession with Delta Lake
    builder = (
        SparkSession.builder.appName(app_name)
        .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
        .config(
            "spark.sql.catalog.spark_catalog",
            "org.apache.spark.sql.delta.catalog.DeltaCatalog",
        )
        .config("spark.sql.parquet.compression.codec", "gzip")
        .config("spark.databricks.delta.optimizeWrite.enabled", "true")
        .config("spark.databricks.delta.autoCompact.enabled", "true")
        .config("spark.sql.warehouse.dir", os.path.abspath(warehouse_dir))
        .config(
            "javax.jdo.option.ConnectionURL",
            f"jdbc:derby:{os.path.abspath(metastore_db_path)};create=true",
        )
    )  # .config("spark.sql.catalogImplementation", "hive") \
    # .enableHiveSupport()

    # Initialize Spark with Delta
    spark = configure_spark_with_delta_pip(builder).getOrCreate()

    print(
        f"SparkSession created with Delta and persistent metastore at: {warehouse_dir}"
    )
    return spark

# CREAR DATABASE AND DELTA TABLES

In [3]:
def create_database(spark_session, database_name):
    """
    Creates a database if it does not already exist.

    Parameters:
        spark_session (SparkSession): The active SparkSession.
        database_name (str): The name of the database to be created.
    """
    spark_session.sql(f"CREATE DATABASE IF NOT EXISTS {database_name}")
    print(f"Database '{database_name}' created or already exists.")


def list_all_databases_and_tables(spark_session):
    """
    Lists all databases and their tables in Spark.

    Parameters:
        spark_session (SparkSession): The active SparkSession.

    Returns:
        dict: A dictionary where keys are database names and values are lists of table names in each database.
    """
    # List all databases
    databases = spark_session.catalog.listDatabases()

    # Create a dictionary to store database names and their corresponding table lists
    database_tables = {}

    print("The following databases and tables are present in the Spark Catalog.")
    print()

    for database in databases:
        # Set the current database to the specified database
        spark_session.sql(f"USE {database.name}")

        # List all tables in the database
        tables = spark_session.catalog.listTables(database.name)

        # Extract table names from the list of table objects
        table_names = [table.name for table in tables]

        # Store the tables for the current database
        database_tables[database.name] = table_names

        # Print the database and tables
        for table in table_names:
            print(f"Database: {database.name}, Table: {table}")
            print()

    return database_tables

# CREATE SPARK DATAFRAME

In [4]:
def create_dataframe_from_list_dict_using_alphabetical_order_from_columns(
    spark_session, list_data_dict
):
    """
    Creates a Spark DataFrame from a list of dictionaries, reordering columns in alphabetical order.

    Parameters:
        spark_session (SparkSession): The active Spark session.
        list_data_dict (list): A list of dictionaries, where each dictionary represents a row.

    Returns:
        DataFrame: The created Spark DataFrame with columns in alphabetical order.
    """
    if list_data_dict:
        df = spark_session.createDataFrame(list_data_dict)
        return df
    else:
        raise ValueError("The input list is empty.")


def create_dataframe_from_list_dict(spark_session, list_data_dict):
    """
    Creates a Spark DataFrame from a list of dictionaries, preserving the order of the keys.

    Parameters:
        spark_session (SparkSession): The active Spark session.
        list_data_dict (list): A list of dictionaries, where each dictionary represents a row.

    Returns:
        DataFrame: The created Spark DataFrame with columns in the order of the keys.
    """
    if not list_data_dict:
        raise ValueError("The input list is empty.")

    # Get the order of keys from the first dictionary
    columns_order = list(list_data_dict[0].keys())

    # Create the DataFrame and reorder columns
    df = spark_session.createDataFrame(list_data_dict)
    df = df.select(*columns_order)  # Reorder columns explicitly

    return df


def split_spark_dataframe(spark_dataframe, num_parts):
    """
    Splits a Spark DataFrame into the specified number of parts, ensuring each part has at least one row.
    If the requested number of parts exceeds the total rows, it creates as many balanced parts as possible.

    Parameters:
        spark_dataframe (DataFrame): The Spark DataFrame to be split.
        num_parts (int): The desired number of parts to split the DataFrame into.

    Returns:
        List[DataFrame]: A list containing the split DataFrames.
    """
    total_rows = spark_dataframe.count()

    if total_rows == 0:
        print("The DataFrame is empty. No parts created.")
        return []

    # Adjust number of parts if more parts are requested than rows
    actual_parts = min(num_parts, total_rows)

    # Calculate base rows per part and distribute remaining rows
    rows_per_part = total_rows // actual_parts
    extra_rows = total_rows % actual_parts

    split_dataframes = []
    start_row = 0
    
    for i in range(actual_parts):
        # Calculate rows for the current part
        rows_in_this_part = rows_per_part + (1 if i < extra_rows else 0)
        end_row = start_row + rows_in_this_part
        
        # Select the rows for the current part using offset and limit
        split_dataframes.append(
            spark_dataframe.offset(start_row).limit(rows_in_this_part)
        )
        
        start_row = end_row  # Update start row for the next part

    print(f"Successfully created {len(split_dataframes)} DataFrames.")
    return split_dataframes

# CREAR TABLAS DELTA

In [5]:
import os
import shutil

def create_delta_table_with_spark_dataframe_and_register(
    spark_session, database_name, table_name, spark_dataframe, warehouse_dir, partition_by=None
):
    """
    Creates a Delta table in the specified database and registers it with SQL.

    Parameters:
        spark_session (SparkSession): The active SparkSession.
        database_name (str): The name of the database where the table will be created.
        table_name (str): The name of the table to be created.
        spark_dataframe (DataFrame): The Spark DataFrame whose data will be used to create the table.
        warehouse_dir (str): The root directory for the warehouse.
        partition_by (list, optional): List of column names to partition the table by (default is None).
    """
    # Ensure the database exists or create it
    available_databases = [db.name for db in spark_session.catalog.listDatabases()]
    if database_name not in available_databases:
        spark_session.sql(f"CREATE DATABASE IF NOT EXISTS {database_name}")

    # Set the active database
    spark_session.sql(f"USE {database_name}")

    # Define the table path based on the database and table name
    table_path = f"{warehouse_dir}/{database_name}/{table_name}"

    write_options = spark_dataframe.write.format("delta").mode("overwrite")

    # Apply partitioning if specified
    if partition_by:
        write_options.partitionBy(*partition_by)

    # Save the DataFrame as a Delta table in the location
    write_options.save(table_path)

    # Register the Delta table in the Spark catalog using SQL
    spark_session.sql(
        f"""
        CREATE OR REPLACE TABLE {table_name}
        USING DELTA
        AS
        SELECT * FROM parquet.`{table_path}`;
    """
    )

    # Delete the files generated in the table path
    if os.path.exists(table_path):
        shutil.rmtree(table_path)
        print(f"Temporary files at '{table_path}' have been deleted.")
    else:
        print(f"No temporary files found at '{table_path}'.")

    print(
        f"Table '{table_name}' created and registered at '{table_path}' in database '{database_name}' with partitioning by {partition_by if partition_by else 'None'}."
    )
    
def save_dataframe_as_parquet(spark_dataframe, file_name, file_path, partition_by=None):
    """
    Saves a Spark DataFrame to the specified path in Parquet format.

    Parameters:
        spark_dataframe (DataFrame): The Spark DataFrame to be saved.
        file_name (str): The name of the file (or dataset) to be created.
        file_path (str): The location where the file will be stored.
        partition_by (list, optional): List of column names to partition the data by (default is None).
    """
    write_options = spark_dataframe.write.format("parquet").mode("overwrite")

    if partition_by:
        write_options.partitionBy(*partition_by)
        
    write_options.save(file_path)
    print(f"DataFrame saved as Delta Parquet at '{file_path}/{file_name}' with partitioning by {partition_by if partition_by else 'None'}.")

def save_dataframe_as_delta_parquet(spark_dataframe, file_name, file_path, partition_by=None):
    """
    Saves a Spark DataFrame to the specified path in Delta Parquet format.

    Parameters:
        spark_dataframe (DataFrame): The Spark DataFrame to be saved.
        file_name (str): The name of the file (or dataset) to be created.
        file_path (str): The location where the file will be stored.
        partition_by (list, optional): List of column names to partition the data by (default is None).
    """
    write_options = spark_dataframe.write.format("delta").mode("overwrite")

    # Apply partitioning if partition_by is provided
    if partition_by:
        write_options.partitionBy(*partition_by)

    # Save the DataFrame as a Delta Parquet file at the specified location
    write_options.save(f"{file_path}/{file_name}")

    print(f"DataFrame saved as Delta Parquet at '{file_path}/{file_name}' with partitioning by {partition_by if partition_by else 'None'}.")


def create_delta_table_in_database(spark_session, database_name, table_name, spark_dataframe, warehouse_dir, partition_by=None):
    """
    Creates a Delta table in a Spark database, ensuring that any existing table or directory is deleted before creating
    the new table. The table is created with the delta.enableChangeDataFeed property enabled.

    Parameters:
    spark_session (SparkSession): The active Spark session to run SQL queries.
    database_name (str): The name of the database where the table will be stored.
    table_name (str): The name of the table to be created.
    spark_dataframe (DataFrame): The DataFrame to be saved as a Delta table.
    warehouse_dir (str): The base directory where Delta tables are stored.
    partition_by (list, optional): List of columns to partition the table by. If not provided, the table will not be partitioned.

    Exceptions:
    - If the specified partition columns do not exist in the DataFrame, a ValueError will be raised.
    """
    
    # Define the path for the Delta table
    table_path = f"{warehouse_dir}/{database_name}/{table_name}"
    full_table_name = f"{database_name}.{table_name}"

    # Remove the table directory if it exists (manually)
    if os.path.exists(table_path):
        shutil.rmtree(table_path)  # Remove the directory and its contents
        print(f"Directory for table '{full_table_name}' deleted.")

    # Drop the table from the database if it exists (to avoid conflicts)
    try:
        spark_session.sql(f"DROP TABLE IF EXISTS {full_table_name}")
        print(f"Table '{full_table_name}' dropped.")
    except Exception as e:
        print(f"Could not drop the table: {str(e)}")

    # Write the new Delta table with overwrite mode
    write_options = spark_dataframe.write.format("delta").mode("overwrite")

    # Check if partitioning is specified and verify columns exist in the DataFrame
    if partition_by:
        missing_columns = [col for col in partition_by if col not in spark_dataframe.columns]
        if missing_columns:
            raise ValueError(f"The following partition columns do not exist in the DataFrame: {missing_columns}")
        write_options = write_options.partitionBy(*partition_by)

    # Save the Delta table in the database
    write_options.saveAsTable(full_table_name)

    # Enable Change Data Feed on the Delta table
    try:
        spark_session.sql(f"ALTER TABLE {full_table_name} SET TBLPROPERTIES ('delta.enableChangeDataFeed' = 'true')")
        print(f"Property 'delta.enableChangeDataFeed' enabled for table '{full_table_name}'.")
    except Exception as e:
        print(f"Error enabling 'delta.enableChangeDataFeed': {str(e)}")

    print(f"Table '{table_name}' created in database '{database_name}' at '{table_path}' with partitioning by {partition_by if partition_by else 'none'}.")

def read_parquet_or_delta_file(spark_session, directory_path):
    """
    Reads a Parquet or Delta file from the specified directory path.

    This function attempts to read a Parquet file from the provided path using the Spark session. 
    If the operation fails (e.g., the file does not exist or the format is incompatible), 
    an error message is displayed, and None is returned.

    Parameters:
        spark_session (SparkSession): The active Spark session used for reading the file.
        directory_path (str): The path to the directory containing the Parquet or Delta file.

    Returns:
        DataFrame: The DataFrame containing the file's data if successfully read.
        None: If an error occurs during the file reading process.

    Example:
        # Create a Spark session
        spark = SparkSession.builder.appName("ReadExample").getOrCreate()

        # Read a Parquet or Delta file
        df = read_parquet_or_delta_file(spark, "/path/to/directory")

        if df is not None:
            df.show()
        else:
            print("Failed to read the file.")
    """
    try:
        df = spark_session.read.parquet(directory_path)
        print(f"Successfully read Parquet file from '{directory_path}'.")
        return df
    except Exception as e:
        print(f"Error reading file from '{directory_path}': {e}")
        return None
        
def execute_spark_sql_query(
    spark_session: SparkSession, query: str
) -> "pyspark.sql.dataframe.DataFrame":
    """
    Execute a SQL query and return the results as a DataFrame.

    :param spark_session: The SparkSession object.
    :param query: The SQL query to execute.
    :return: A PySpark DataFrame containing the query results.
    """
    return spark_session.sql(query)

# CRUD SPARK DATAFRAMES

In [6]:
from pyspark.sql.functions import expr, col
from pyspark.sql import SparkSession
from delta.tables import DeltaTable
import pandas as pd
from pandas import Timestamp

def is_delta_table(spark_session: SparkSession, delta_table_path: str) -> bool:
    """
    Checks if a given path corresponds to a Delta table.

    Parameters:
        spark_session (SparkSession): The active Spark session.
        delta_table_path (str): The file path to check.

    Returns:
        bool: True if the path is a Delta table, False otherwise.
    """
    return (
        DeltaTable.forPath(spark_session, delta_table_path)
        if DeltaTable.isDeltaTable(spark_session, delta_table_path)
        else False
    )


def restore_delta_lake_to_version(
    spark_session: SparkSession, delta_table_path: str, version: int = None, timestamp: str = None
):
    """
    Restores a Delta table to a specific version or timestamp.

    Parameters:
        spark_session (SparkSession): The active Spark session.
        delta_table_path (str): The Delta table path.
        version (int, optional): The version to restore to.
        timestamp (str, optional): The timestamp to restore to in ISO format.

    Returns:
        pandas.DataFrame: The updated Delta table details or history.
    """
    try:
        delta_table = is_delta_table(spark_session, delta_table_path)

        if delta_table:
            delta_table.optimize().executeCompaction()
            history_df = delta_table.history().toPandas()
            available_versions = history_df["version"].tolist()
            available_timestamps = history_df["timestamp"].tolist()

            if version is not None:
                if version not in available_versions:
                    print(f"Error: Version {version} does not exist.")
                    return history_df
                delta_table.restoreToVersion(version)
                print(f"Restored to version {version}.")
                return delta_table.detail().toPandas()

            if timestamp is not None:
                timestamp_obj = Timestamp(timestamp)
                if timestamp_obj not in available_timestamps:
                    print(f"Error: Timestamp {timestamp} does not exist.")
                    return history_df
                delta_table.restoreToTimestamp(timestamp)
                print(f"Restored to timestamp {timestamp}.")
                return delta_table.detail().toPandas()

            return history_df
        else:
            print(f"{delta_table_path} does not contain a Delta table.")
    except Exception as e:
        print(f"Error restoring Delta table: {str(e)}")

def write_into_delta_lake(
    spark_session: SparkSession, delta_table_path: str, spark_dataframe
):
    """
    Writes data into a Delta table, avoiding duplicates by comparing existing columns 
    and adding new columns if necessary.

    Parameters:
        spark_session (SparkSession): The active Spark session.
        delta_table_path (str): The Delta table path.
        spark_dataframe (DataFrame): The Spark DataFrame to write.

    Returns:
        None
    """
    try:
        # Check if the path corresponds to a Delta table
        delta_table = is_delta_table(spark_session, delta_table_path)

        if delta_table:
            # Optimize the Delta table before processing
            delta_table.optimize().executeCompaction()

            # Load the existing data from the Delta table
            existing_data = spark_session.read.format("delta").load(delta_table_path)

            # Identify common columns between the existing data and the new data
            common_columns = list(set(existing_data.columns).intersection(set(spark_dataframe.columns)))

            # Join the existing data with the new data on the common columns
            # We will use 'left_anti' to find new records based on differences
            new_rows = spark_dataframe.join(
                existing_data, on=common_columns, how="left_anti"
            )

            # If there are new rows, append them to the table
            if not new_rows.isEmpty():
                # Write the new rows (with new columns if present)
                new_rows.write.option("mergeSchema", "true").mode("append").format("delta").save(delta_table_path)
                print("Added new data without duplicates.")
            else:
                print("No new rows to append.")
        else:
            # If the Delta table does not exist, create a new table
            print(f"{delta_table_path} does not contain a Delta table.")
            spark_dataframe.write.format("delta").mode("overwrite").save(delta_table_path)
            print("Created Delta table with new data.")
    except Exception as e:
        print(f"Error writing to Delta table: {str(e)}")

def delete_from_delta_lake(
    spark_session: SparkSession, delta_table_path: str, condition: str
):
    """
    Deletes rows from a Delta table based on a condition.

    Parameters:
        spark_session (SparkSession): The active Spark session.
        delta_table_path (str): The Delta table path.
        condition (str): The condition for deletion.

    Returns:
        pandas.DataFrame: Updated Delta table details.
    """
    try:
        delta_table = is_delta_table(spark_session, delta_table_path)

        if delta_table:
            delta_table.optimize().executeCompaction()
            preview_df = delta_table.toDF().filter(condition).limit(1).collect()

            if not preview_df:
                print(f"No records match the condition '{condition}'.")
                return delta_table.toDF().filter(condition).toPandas()

            delta_table.delete(condition)
            print(f"Deleted records with condition '{condition}'.")
            return delta_table.detail().toPandas()
        else:
            print(f"{delta_table_path} does not contain a Delta table.")
    except Exception as e:
        print(f"Error deleting from Delta table: {str(e)}")


def update_from_delta_lake(
    spark_session: SparkSession, delta_table_path: str, condition: str, set_expression: dict
):
    """
    Updates rows in a Delta table based on a condition.

    Parameters:
        spark_session (SparkSession): The active Spark session.
        delta_table_path (str): The Delta table path.
        condition (str): The condition for updating rows.
        set_expression (str): The update expression (e.g., "column = value").

    Returns:
        pandas.DataFrame: Updated Delta table details.
    """
    try:
        delta_table = is_delta_table(spark_session, delta_table_path)

        if delta_table:
            delta_table.optimize().executeCompaction()
            preview_df = delta_table.toDF().filter(condition).limit(1).collect()

            if not preview_df:
                print(f"No records match the condition '{condition}'.")
                return delta_table.toDF().filter(condition).toPandas()

            delta_table.update(condition=expr(condition), set=set_expression)
            print(f"Updated records with condition '{condition}'.")
            return delta_table.detail().toPandas()
        else:
            print(f"{delta_table_path} does not contain a Delta table.")
    except Exception as e:
        print(f"Error updating Delta table: {str(e)}")

def merge_from_delta_lake(
    spark_session: SparkSession, delta_table_path: str, sync_data_df, identifier_column: str
):
    """
    Merges synchronized data into a Delta table.

    This function performs a merge operation to update existing records or insert new ones
    from the source DataFrame into the target Delta table. The merge is based on a specified
    unique identifier column.

    Parameters:
        spark_session (SparkSession): The active Spark session.
        delta_table_path (str): The file path to the Delta table.
        sync_data_df (DataFrame): The source DataFrame containing the synchronized data.
        identifier_column (str): The column used as the unique identifier for the merge.

    Returns:
        None

    Example:
        >>> merge_from_delta_lake(spark_session, "/path/to/delta", sync_data_df, "id")
        Merge completed.
    """
    try:
        # Check if the path corresponds to a Delta table
        delta_table = is_delta_table(spark_session, delta_table_path)

        if delta_table:
            # Optimize the Delta table before the merge
            delta_table.optimize().executeCompaction()

            # Perform the merge operation
            delta_table.alias("sync").merge(
                sync_data_df.alias("source"),
                f"sync.{identifier_column} = source.{identifier_column}",
            ).whenMatchedUpdate(
                set={col_name: col(f"source.{col_name}") for col_name in sync_data_df.columns}
            ).whenNotMatchedInsert(
                values={col_name: col(f"source.{col_name}") for col_name in sync_data_df.columns}
            ).execute()

            print("Merge operation completed successfully.")
        else:
            # Create the Delta table if it does not exist
            print(f"The directory '{delta_table_path}' does not contain a Delta table.")
            sync_data_df.write.format("delta").mode("overwrite").save(delta_table_path)
            print(f"Delta table created at '{delta_table_path}' with synchronized data.")

    except Exception as e:
        print(f"Error during the merge operation on the Delta table '{delta_table_path}': {str(e)}")

def show_historic_version_from_delta_file(
    spark_session, file_path, version=None, operation_filter=None, sort_by=None
):
    """
    Show historic changes from a Delta table version, handling column mismatches.
    """
    try:
        from pyspark.sql.functions import lit

        # Retrieve Delta Table and its history
        delta_table = DeltaTable.forPath(spark_session, file_path)
        history_df = delta_table.history()

        if operation_filter:
            # Filter history based on operation type
            result_df = history_df.filter(
                history_df.operation.contains(operation_filter)
            ).toPandas()
        elif version is not None:
            if version < 0 or version >= history_df.count():
                result_df = history_df.toPandas()
            else:
                # Get metadata for the requested version
                history_row = history_df.filter(history_df.version == version).collect()[0]
                operation, timestamp, user = (
                    history_row.operation,
                    history_row.timestamp,
                    history_row.userName or "Unknown",
                )

                # Load current and previous versions of the Delta table
                df_current = (
                    spark_session.read.format("delta")
                    .option("versionAsOf", version)
                    .load(file_path)
                )
                df_previous = (
                    spark_session.read.format("delta")
                    .option("versionAsOf", version - 1)
                    .load(file_path)
                    if version > 0
                    else None
                )

                # Align columns
                if df_previous:
                    # Get all unique columns from both DataFrames
                    current_columns = set(df_current.columns)
                    previous_columns = set(df_previous.columns)

                    # Add missing columns to each DataFrame with null values
                    for col in previous_columns - current_columns:
                        df_current = df_current.withColumn(col, lit(None))
                    for col in current_columns - previous_columns:
                        df_previous = df_previous.withColumn(col, lit(None))

                    # Reorder columns to ensure the same order in both DataFrames
                    common_columns = sorted(list(current_columns | previous_columns))
                    df_current = df_current.select(common_columns)
                    df_previous = df_previous.select(common_columns)

                # Handle operations
                if operation == "UPDATE":
                    df_removed = (
                        df_previous.subtract(df_current).toPandas()
                        if df_previous
                        else None
                    )
                    df_added = (
                        df_current.subtract(df_previous).toPandas()
                        if df_previous
                        else df_current.toPandas()
                    )

                    if df_removed is not None:
                        df_removed["ChangeType"] = "PRE UPDATE"
                    if df_added is not None:
                        df_added["ChangeType"] = "UPDATE"

                    result_df = pd.concat([df_removed, df_added]).reset_index(drop=True)
                elif operation == "DELETE" and df_previous:
                    result_df = (
                        df_previous.subtract(df_current)
                        .toPandas()
                        .assign(ChangeType="Deleted")
                    )
                else:
                    result_df = (
                        df_current.subtract(df_previous).toPandas()
                        if df_previous
                        else df_current.toPandas()
                    )

            # Sort results if requested
            if sort_by and sort_by in result_df.columns:
                result_df = result_df.sort_values(by=sort_by)
            else:
                print(f"Warning: Column '{sort_by}' not found in the DataFrame.")

        else:
            # Return full history if no version is specified
            result_df = history_df.toPandas()

        return result_df

    except Exception as e:
        print(f"Error: {str(e)}. Could not retrieve version or history from the Delta table.")
        return None

def read_delta_table_with_change_data_control(spark_session, delta_table_path, starting_version=0, ending_version=0):
    """
    Reads data from a Delta table with change data capture (CDC) enabled.

    This function loads data from a Delta table and retrieves the changes between specified versions.
    If the provided versions are invalid or if `starting_version` is greater than `ending_version`,
    the function adjusts the versions to ensure the correct range is used.

    Args:
        spark_session (SparkSession): The Spark session used to load the Delta table.
        delta_table_path (str): The path to the Delta table.
        starting_version (int, optional): The starting version for change data retrieval. Defaults to 0.
        ending_version (int, optional): The ending version for change data retrieval. Defaults to 0.

    Returns:
        pandas.DataFrame: A Pandas DataFrame containing the data from the Delta table.
    """
    
    try:
        # Initialize DeltaTable object
        delta_table = DeltaTable.forPath(spark_session, delta_table_path)
        
        # Get the Delta table history to check version range
        history_df = delta_table.history()
        max_version = history_df.select("version").rdd.max()[0]

        # Adjust if starting_version is greater than ending_version
        if starting_version > ending_version:
            starting_version, ending_version = ending_version, starting_version

        # Check if the provided versions are valid
        if 0 <= starting_version <= max_version and 0 <= ending_version <= max_version:
            return (
                spark_session.read
                .format("delta")
                .option("readChangeData", "true")
                .option("startingVersion", starting_version)
                .option("endingVersion", ending_version)
                .load(delta_table_path)
                .toPandas()
            )
        else:
            # Default behavior when versions are out of range
            return (
                spark_session.read
                .format("delta")
                .option("readChangeData", "true")
                .option("startingVersion", 0)
                .load(delta_table_path)
                .toPandas()
            )
    except Exception as e:
        print(f"Error reading Delta table: {e}")
        return None

#### CRUD VERSION 2

In [7]:
from pyspark.sql import functions as F
from delta.tables import DeltaTable

def insert_into_delta_table(spark_session, database_name, table_name, spark_dataframe):
    """
    Inserts records into a Delta table.

    Parameters:
        spark_session (SparkSession): The active Spark session.
        database_name (str): The name of the database where the Delta table exists.
        table_name (str): The name of the Delta table where data will be inserted.
        df (DataFrame): The Spark DataFrame containing the data to insert.

    Returns:
        None
    """
    # Create the full table name with the database
    full_table_name = f"{database_name}.{table_name}"

    # Insert the data into the Delta table in append mode
    spark_dataframe.write.format("delta").mode("append").saveAsTable(full_table_name)

    print(f"Records inserted into Delta table: {full_table_name}")

def delete_from_delta_table(spark_session, database_name, table_name, condition):
    """
    Deletes records from a Delta table based on a specified condition.

    Parameters:
        spark_session (SparkSession): The active Spark session.
        database_name (str): The name of the database where the Delta table exists.
        table_name (str): The name of the Delta table where data will be deleted.
        condition (str): The SQL condition to specify which records to delete.

    Returns:
        None
    """
    from delta.tables import DeltaTable

    try:
        # Create the full table name with the database
        full_table_name = f"{database_name}.{table_name}"

        # Load the Delta table
        delta_table = DeltaTable.forName(spark_session, full_table_name)

        # Perform the delete operation
        delta_table.delete(condition)

        print(f"Records matching condition '{condition}' deleted from Delta table: {full_table_name}")

    except Exception as e:
        print(f"Error: Could not delete records from Delta table. {str(e)}")
        
def update_in_delta_table(spark_session, database_name, table_name, condition, set_dict):
    """
    Updates records in a Delta table based on a specified condition and set clause.

    Parameters:
        spark_session (SparkSession): The active Spark session.
        database_name (str): The name of the database where the Delta table exists.
        table_name (str): The name of the Delta table where data will be updated.
        set_dict (dict): A dictionary where the keys are column names and the values are the new values for those columns.
        condition (str): The SQL condition to specify which records to update.

    Returns:
        None
    """
    try:
        # Create the full table name with the database
        full_table_name = f"{database_name}.{table_name}"

        # Load the Delta table
        delta_table = DeltaTable.forName(spark_session, full_table_name)

        # Convert integer and other literals into Spark SQL expressions using F.lit()
        set_expr = {col: F.lit(value) if isinstance(value, (int, float, str)) else value 
                    for col, value in set_dict.items()}

        # Perform the update operation
        delta_table.update(condition, set_expr)

        print(f"Records matching condition '{condition}' updated in Delta table: {full_table_name}")

    except Exception as e:
        print(f"Error: Could not update records in Delta table. {str(e)}")

# FUNCION PARA CREAR JOINS EN SPARK

In [8]:
from pyspark.sql import DataFrame
from pyspark.sql import functions as F

def join_spark_dataframes(
    left_df: DataFrame,
    right_df: DataFrame,
    join_type: str,
    join_condition: str = None,
    left_column: str = None,
    right_column: str = None,
    return_pandas: bool = False,
    execution_plan: bool = False,
):
    """
    Realiza diferentes tipos de uniones entre dos DataFrames y permite capturar los nombres de las variables reales.
    """
    # Diccionario de alias para cada tipo de unión
    join_aliases = {
        "inner": ["inner", "inner_join"],
        "left": ["left", "left_join"],
        "left_anti": ["left_anti", "anti_left", "anti_left_join"],
        "right": ["right", "right_join"],
        "right_anti": ["right_anti", "anti_right", "anti_right_join"],
        "outer": ["outer", "outer_join"],
        "outer_anti": ["outer_anti", "anti_outer_join", "outer_join_anti"],
        "cross": ["cross", "cross_join"],
        "self": ["self", "self_join"],
        "except": ["except", "exception","exceptAll", "except All", "except_all", "except all"],
        "intersect": ["intersect", "intersection"],
        "union": ["union"],
        "union_all": ["union_all", "union all"],
    }

    # Mapeo único de uniones a su tipo principal
    join_operations = {
        "inner": lambda: left_df.join(
            right_df, left_df[left_column] == right_df[right_column], "inner"
        ),
        "left": lambda: left_df.join(
            right_df, left_df[left_column] == right_df[right_column], "left"
        ),
        "left_anti": lambda: left_df.join(
            right_df, left_df[left_column] == right_df[right_column], "left_anti"
        ),
        "right": lambda: left_df.join(
            right_df, left_df[left_column] == right_df[right_column], "right"
        ),
        "right_anti": lambda: right_df.join(
            left_df, right_df[right_column] == left_df[left_column], "left_anti"
        ),
        "right_anti_v2": lambda: left_df.join(
            right_df, left_df[left_column] == right_df[right_column], "right"
        )
        .filter(left_df[left_column].isNull())
        .select(*right_df.columns),
        "outer": lambda: left_df.join(
            right_df, left_df[left_column] == right_df[right_column], "outer"
        ),
        "outer_anti": lambda: left_df.join(
            right_df, left_df[left_column] == right_df[right_column], "outer"
        ).filter(
            (left_df[left_column].isNotNull() & right_df[right_column].isNull())
            | (left_df[left_column].isNull() & right_df[right_column].isNotNull())
        ),
        "cross": lambda: left_df.crossJoin(right_df),
        "self": lambda: left_df.alias("table_one").join(
            left_df.alias("table_two"),
            F.col(f"table_one.{left_column}") == F.col(f"table_two.{right_column}"),
            "inner"
        ),
        "intersect": lambda: left_df.intersect(right_df),
        "except": lambda: left_df.exceptAll(right_df),
        "union": lambda: left_df.union(right_df),
        "union_all": lambda: left_df.unionByName(right_df, allowMissingColumns=True),  # tiene que ser los mismos nombres de columnas en ambos dataframes, sino toma la de la izquierda
    }

    # Validar que el tipo de join sea válido (revisando en el mapeo de alias)
    requested_join = join_type
    join_type = next(
        (key for key, aliases in join_aliases.items() if join_type in aliases), None
    )

    if not join_type:
        raise ValueError(f"Tipo de join '{requested_join}' no es válido.")

    if join_condition and isinstance(join_condition, str):
        # Si join_condition es una cadena, convertirla a expresión de columna de PySpark
        join_condition = F.expr(join_condition)

        joined_df = left_df.join(right_df, join_condition, join_type)

    elif left_column and right_column:
        # Ejecutar la operación de unión
        joined_df = join_operations[join_type]()

    if execution_plan:
        print(f"Query ejecutado:")
        joined_df.explain(True)  # Usar .explain(True) para obtener detalles del plan de ejecución

    # Retornar el resultado en Pandas o como DataFrame de Spark
    if return_pandas:
        return joined_df.toPandas()
    else:
        return joined_df

# <center> CLAUSULAS PARA CONSULTAS </center>

| **Operación**                | **SQL Normal (ANSI SQL)**                                          | **Spark SQL**                                                      | **DataFrame API**                                                        |
|------------------------------|-------------------------------------------------------------------|---------------------------------------------------------------------|-------------------------------------------------------------------------|
| **SELECT**                    | `SELECT columna FROM tabla`                                      | `SELECT columna FROM tabla`                                        | `df.select("columna")`                                                   |
| **FROM**                      | `SELECT * FROM tabla`                                             | `SELECT * FROM tabla`                                               | `df`                                                                     |
| **WHERE**                     | `SELECT * FROM tabla WHERE condicion`                             | `SELECT * FROM tabla WHERE condicion`                               | `df.filter("condicion")`                                                 |
| **GROUP BY**                  | `SELECT columna, COUNT(*) FROM tabla GROUP BY columna`            | `SELECT columna, COUNT(*) FROM tabla GROUP BY columna`              | `df.groupBy("columna").count()`                                          |
| **HAVING**                    | `SELECT columna, COUNT(*) FROM tabla GROUP BY columna HAVING COUNT(*) > 10` | `SELECT columna, COUNT(*) FROM tabla GROUP BY columna HAVING COUNT(*) > 10` | `df.groupBy("columna").count().filter("count > 10")`                      |
| **JOIN (INNER)**              | `SELECT * FROM tabla1 INNER JOIN tabla2 ON tabla1.id = tabla2.id` | `SELECT * FROM tabla1 INNER JOIN tabla2 ON tabla1.id = tabla2.id`   | `df1.join(df2, df1.id == df2.id)`                                        |
| **JOIN (LEFT OUTER)**         | `SELECT * FROM tabla1 LEFT JOIN tabla2 ON tabla1.id = tabla2.id`  | `SELECT * FROM tabla1 LEFT JOIN tabla2 ON tabla1.id = tabla2.id`    | `df1.join(df2, df1.id == df2.id, "left")`                                |
| **JOIN (RIGHT OUTER)**        | `SELECT * FROM tabla1 RIGHT JOIN tabla2 ON tabla1.id = tabla2.id` | `SELECT * FROM tabla1 RIGHT JOIN tabla2 ON tabla1.id = tabla2.id`   | `df1.join(df2, df1.id == df2.id, "right")`                               |
| **JOIN (FULL OUTER)**         | `SELECT * FROM tabla1 FULL OUTER JOIN tabla2 ON tabla1.id = tabla2.id` | `SELECT * FROM tabla1 FULL OUTER JOIN tabla2 ON tabla1.id = tabla2.id` | `df1.join(df2, df1.id == df2.id, "outer")`                               |
| **LEFT ANTI JOIN (Equivalente en ANSI SQL)**            | `SELECT * FROM tabla1 LEFT JOIN tabla2 ON tabla1.id = tabla2.id WHERE tabla2.id IS NULL` | `SELECT * FROM tabla1 LEFT ANTI JOIN tabla2 ON tabla1.id = tabla2.id` | `df1.join(df2, df1.id == df2.id, "left_anti")`                           |
| **UNION**                     | `SELECT columna FROM tabla1 UNION SELECT columna FROM tabla2`    | `SELECT columna FROM tabla1 UNION SELECT columna FROM tabla2`      | `df1.union(df2)`                                                         |
| **UNION ALL**                 | `SELECT columna FROM tabla1 UNION ALL SELECT columna FROM tabla2` | `SELECT columna FROM tabla1 UNION ALL SELECT columna FROM tabla2`  | `df1.unionByName(df2)`                                                   |
| **INTERSECT**                 | `SELECT columna FROM tabla1 INTERSECT SELECT columna FROM tabla2` | `SELECT columna FROM tabla1 INTERSECT SELECT columna FROM tabla2`  | `df1.intersect(df2)`                                                     |
| **EXCEPT**                    | `SELECT columna FROM tabla1 EXCEPT SELECT columna FROM tabla2`   | `SELECT columna FROM tabla1 EXCEPT SELECT columna FROM tabla2`     | `df1.exceptAll(df2)`                                                        |
| **EXCEPT**    | `SELECT columna FROM tabla1 EXCEPT SELECT columna FROM tabla2` | `SELECT columna FROM tabla1 EXCEPT SELECT columna FROM tabla2` | `df1.select("columna").subtract(df2.select("columna"))` |
| **AGGREGATE (SUM)**           | `SELECT SUM(columna) FROM tabla`                                  | `SELECT SUM(columna) FROM tabla`                                    | `df.agg({"columna": "sum"})`                                             |
| **AGGREGATE (AVG)**           | `SELECT AVG(columna) FROM tabla`                                  | `SELECT AVG(columna) FROM tabla`                                    | `df.agg({"columna": "avg"})`                                             |
| **AGGREGATE (MAX)**           | `SELECT MAX(columna) FROM tabla`                                  | `SELECT MAX(columna) FROM tabla`                                    | `df.agg({"columna": "max"})`                                             |
| **AGGREGATE (MIN)**           | `SELECT MIN(columna) FROM tabla`                                  | `SELECT MIN(columna) FROM tabla`                                    | `df.agg({"columna": "min"})`                                             |
| **AGGREGATE (COUNT)**         | `SELECT COUNT(*) FROM tabla`                                      | `SELECT COUNT(*) FROM tabla`                                        | `df.count()`                                                             |
| **DISTINCT**                  | `SELECT DISTINCT columna FROM tabla`                              | `SELECT DISTINCT columna FROM tabla`                                | `df.select("columna").distinct()`                                        |
| **ORDER BY**                  | `SELECT * FROM tabla ORDER BY columna ASC`                        | `SELECT * FROM tabla ORDER BY columna ASC`                          | `df.orderBy("columna")`                                                 |
| **LIMIT**                     | `SELECT * FROM tabla LIMIT 10`                                    | `SELECT * FROM tabla LIMIT 10`                                      | `df.limit(10)`                                                           |
| **OFFSET**                     | `SELECT columna FROM tabla LIMIT 10 OFFSET 5`                    | `SELECT columna FROM tabla LIMIT 10 OFFSET 5`                       | `df.offset(5).limit(10)`                                                           |
| **CASE WHEN (IF ELSE)**       | `SELECT CASE WHEN columna > 10 THEN 'Alto' ELSE 'Bajo' END FROM tabla` | `SELECT CASE WHEN columna > 10 THEN 'Alto' ELSE 'Bajo' END FROM tabla` | `df.withColumn("nuevo_columna", when(df["columna"] > 10, "Alto").otherwise("Bajo"))` |
| **IS NULL**                   | `SELECT * FROM tabla WHERE columna IS NULL`                       | `SELECT * FROM tabla WHERE columna IS NULL`                         | `df.filter(df["columna"].isNull())`                                      |
| **IS NOT NULL**               | `SELECT * FROM tabla WHERE columna IS NOT NULL`                   | `SELECT * FROM tabla WHERE columna IS NOT NULL`                     | `df.filter(df["columna"].isNotNull())`                                   |
| **CAST**                      | `SELECT CAST(columna AS INT) FROM tabla`                          | `SELECT CAST(columna AS INT) FROM tabla`                            | `df.withColumn("columna", df["columna"].cast("int"))`                    |

# DOWNLOAD DATASET Y CREATE PANDAS DATAFRAME

In [9]:
import pandas as pd
from pandas_dataset_handler import PandasDatasetHandler
import gc

# URL del archivo Parquet en GitHub
parquet_file = "https://raw.githubusercontent.com/JorgeCardona/data-collection-json-csv-sql/main/parquet/bulk_data_20.parquet"
pandas_df = PandasDatasetHandler.load_dataset(parquet_file)

parquet_file_name = "dataset.parquet"
pandas_df.to_parquet(parquet_file_name, index=False)

print(pandas_df.shape)
pandas_df.head()

# Liberar memoria eliminando el DataFrame
del pandas_df
gc.collect()  # Forzar recolección de basura

print("Pandas DataFrame eliminado de la memoria.")

File 'https://raw.githubusercontent.com/JorgeCardona/data-collection-json-csv-sql/main/parquet/bulk_data_20.parquet' successfully loaded as parquet.
(250000, 19)
Pandas DataFrame eliminado de la memoria.


# <center> SESION DE SPARK

In [10]:
# Example usage
app_name = "Delta Spark"
# Ruta para el directorio del metastore
base_dir = "./warehouse-spark"
warehouse_dir = f"{base_dir}/spark_catalog/database"
metastore_db_path = f"{base_dir}/metastore/metastore_db"

spark_session = create_spark_session(
    app_name=app_name, warehouse_dir=warehouse_dir, metastore_db_path=metastore_db_path
)

# Set this configuration before running your queries
spark_session.conf.set(
    "spark.sql.debug.maxToStringFields", "1000"
)  # Set this to a higher number

# Ajusta el número de particiones dinámicamente en base al número de núcleos disponibles
num_particiones = max(2, spark_session.sparkContext.defaultParallelism)  # Al menos 2 particiones, ajustado a los núcleos
spark_session.conf.set("spark.sql.shuffle.partitions", num_particiones)  # Ajusta el número de particiones según el entorno
spark_session.conf.set("spark.databricks.delta.schema.autoMerge.enabled", "true") # enable schema evolution
spark_session.conf.set("spark.databricks.delta.properties.defaults.enableChangeDataFeed", "true")#config to enable all new Delta tables with Change Data Feed



:: loading settings :: url = jar:file:/usr/local/lib/python3.12/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /root/.ivy2/cache
The jars for the packages stored in: /root/.ivy2/jars
io.delta#delta-spark_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-04b8c5cb-1ce8-427c-a23a-cd93ec9c8b52;1.0
	confs: [default]
	found io.delta#delta-spark_2.12;3.2.1 in central
	found io.delta#delta-storage;3.2.1 in central
	found org.antlr#antlr4-runtime;4.9.3 in central
:: resolution report :: resolve 156ms :: artifacts dl 6ms
	:: modules in use:
	io.delta#delta-spark_2.12;3.2.1 from central in [default]
	io.delta#delta-storage;3.2.1 from central in [default]
	org.antlr#antlr4-runtime;4.9.3 from central in [default]
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts   |
	|       conf       | number| search|dwnlded|evicted|| number|dwnlded|
	---------------------------------------------------------------------
	|      default     |   3   |   0   |   0   |   

SparkSession created with Delta and persistent metastore at: ./warehouse-spark/spark_catalog/database


# [SPARK USER INTERFACE](http://localhost:4040/)

[CLICK HERE](http://localhost:4040/)

# <center> CREATE SPARK DATAFRAME DE UN ARCHIVO PARQUET CLASICO

In [11]:
spark_dataframe_sample = spark_session.read.parquet(parquet_file_name)

# Almacenar en caché el DataFrame para evitar recalcularlo en operaciones subsecuentes, mejorando el rendimiento.
spark_dataframe_sample.cache()

spark_dataframe_sample.show(truncate=False)

                                                                                

+-----------------------------+--------------------------------------------------------+-------------------------+------+---+---------------+-----------+-------------+--------------+--------------------+---------------+------------------+-----------------+------------------------+--------------+--------------+--------------+-------------------------+------------+
|email                        |address                                                 |country                  |sex   |age|profession     |zodiac_sign|favorite_food|favorite_sport|favorite_movie_genre|favorite_animal|preferred_language|hobby            |favorite_tv_show        |favorite_color|favorite_drink|favorite_music|favorite_technology      |favorite_car|
+-----------------------------+--------------------------------------------------------+-------------------------+------+---+---------------+-----------+-------------+--------------+--------------------+---------------+------------------+-----------------+------------

# <center> DIVIDIR DATAFRAME

In [12]:
# divide el dataframe
num_parts = 2
spark_dataframe_sql, spark_dataframe_delta = split_spark_dataframe(spark_dataframe_sample, num_parts)

# Almacenar en caché el DataFrame para evitar recalcularlo en operaciones subsecuentes, mejorando el rendimiento.
spark_dataframe_sql.cache()
spark_dataframe_delta.cache()

spark_dataframe_sql.show()
spark_dataframe_delta.show()

Successfully created 2 DataFrames.


                                                                                

+--------------------+--------------------+--------------------+------+---+---------------+-----------+-------------+--------------+--------------------+---------------+------------------+-----------------+--------------------+--------------+--------------+--------------+--------------------+------------+
|               email|             address|             country|   sex|age|     profession|zodiac_sign|favorite_food|favorite_sport|favorite_movie_genre|favorite_animal|preferred_language|            hobby|    favorite_tv_show|favorite_color|favorite_drink|favorite_music| favorite_technology|favorite_car|
+--------------------+--------------------+--------------------+------+---+---------------+-----------+-------------+--------------+--------------------+---------------+------------------+-----------------+--------------------+--------------+--------------+--------------+--------------------+------------+
|kathryn69@example...|060 Steven Row Ap...|                Guam|  Male| 38|    



+--------------------+--------------------+--------------------+------+---+---------------+-----------+-------------+--------------+--------------------+---------------+------------------+--------------+--------------------+--------------+--------------+--------------+--------------------+-------------+
|               email|             address|             country|   sex|age|     profession|zodiac_sign|favorite_food|favorite_sport|favorite_movie_genre|favorite_animal|preferred_language|         hobby|    favorite_tv_show|favorite_color|favorite_drink|favorite_music| favorite_technology| favorite_car|
+--------------------+--------------------+--------------------+------+---+---------------+-----------+-------------+--------------+--------------------+---------------+------------------+--------------+--------------------+--------------+--------------+--------------+--------------------+-------------+
|  dawn38@example.net|14388 Li Flat\nSo...|                Togo|Female| 22|   Entrepr

                                                                                

# <center> CREATE DATABASE

In [13]:
# Ejemplo de uso
database_name = "delta_spark_database"
create_database(spark_session, database_name)

print()

list_all_databases_and_tables(spark_session)

Database 'delta_spark_database' created or already exists.

The following databases and tables are present in the Spark Catalog.



{'default': [], 'delta_spark_database': []}

# <center> GUARDAR DELTA TABLE CON SQL

In [14]:
database_sql = "default"
table_sql = "delta_sql"
dataframe_sql = spark_dataframe_sql
sql_warehouse_dir = warehouse_dir
partition_by = ['zodiac_sign']

create_delta_table_with_spark_dataframe_and_register(
    spark_session, database_sql, table_sql, dataframe_sql, sql_warehouse_dir, partition_by
)

24/12/22 02:56:16 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors
                                                                                

Temporary files at './warehouse-spark/spark_catalog/database/default/delta_sql' have been deleted.
Table 'delta_sql' created and registered at './warehouse-spark/spark_catalog/database/default/delta_sql' in database 'default' with partitioning by ['zodiac_sign'].


# <center> GUARDAR DELTA TABLE DE UN SPARK DATAFRAME

In [15]:
database_delta = "delta_spark_database"
table_delta_1 = "delta_dataframe_1"
partition_by = ['zodiac_sign']
table_delta_2 = "delta_dataframe_2"
dataframe_delta_1 = spark_dataframe_delta
dataframe_delta_2 = spark_dataframe_sql
delta_warehouse_dir = warehouse_dir

create_delta_table_in_database(
    spark_session, database_delta, table_delta_1, dataframe_delta_1, delta_warehouse_dir, partition_by
)
create_delta_table_in_database(
    spark_session, database_delta, table_delta_2, dataframe_delta_2, delta_warehouse_dir
)

Table 'delta_spark_database.delta_dataframe_1' dropped.


                                                                                

Property 'delta.enableChangeDataFeed' enabled for table 'delta_spark_database.delta_dataframe_1'.
Table 'delta_dataframe_1' created in database 'delta_spark_database' at './warehouse-spark/spark_catalog/database/delta_spark_database/delta_dataframe_1' with partitioning by ['zodiac_sign'].
Table 'delta_spark_database.delta_dataframe_2' dropped.


                                                                                

Property 'delta.enableChangeDataFeed' enabled for table 'delta_spark_database.delta_dataframe_2'.
Table 'delta_dataframe_2' created in database 'delta_spark_database' at './warehouse-spark/spark_catalog/database/delta_spark_database/delta_dataframe_2' with partitioning by none.


# <center> GUARDAR DATAFRAME COMO **DELTA** PARQUET

In [16]:
parquet_dataframe = spark_dataframe_sample
file_name = "spark_dataframe_delta"
file_path_delta = f"{base_dir}/spark_files/delta_parquet"
partition_by = ['zodiac_sign']

save_dataframe_as_delta_parquet(spark_dataframe_sample, file_name, file_path_delta, partition_by)

24/12/22 02:56:30 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers
24/12/22 02:56:30 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 84.44% for 9 writers
24/12/22 02:56:30 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 76.00% for 10 writers
24/12/22 02:56:30 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 69.09% for 11 writers
24/12/22 02:56:30 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 63.33% for 12 writers
24/12/22 02:56:31 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 69.09% for 11 writers
24/12/22 02:56:31 WARN MemoryManager: Total allocation exceeds 95.

DataFrame saved as Delta Parquet at './warehouse-spark/spark_files/delta_parquet/spark_dataframe_delta' with partitioning by ['zodiac_sign'].


# <center> GUARDAR DATAFRAME COMO PARQUET CLASICO  1 SOLO ARCHIVO

In [17]:
parquet_dataframe = spark_dataframe_sample
file_name_compact = "spark_dataframe_compact"
file_path_compact_parquet = f"{base_dir}/spark_files/classic_parquet/parquet_clasico/parque_compacto"

save_dataframe_as_parquet(spark_dataframe_sample, file_name_compact, file_path_compact_parquet)



DataFrame saved as Delta Parquet at './warehouse-spark/spark_files/classic_parquet/parquet_clasico/parque_compacto/spark_dataframe_compact' with partitioning by None.


                                                                                

# <center> GUARDAR DATAFRAME COMO PARQUET CLASICO PARTICIONADO

In [18]:
parquet_dataframe = spark_dataframe_sample
file_name = "spark_dataframe_patitioned"
file_path_partitioned_parquet = f"{base_dir}/spark_files/classic_parquet/parquet_clasico/parquet_particionado"
partition_by = ['zodiac_sign']

save_dataframe_as_parquet(spark_dataframe_sample, file_name, file_path_partitioned_parquet)



DataFrame saved as Delta Parquet at './warehouse-spark/spark_files/classic_parquet/parquet_clasico/parquet_particionado/spark_dataframe_patitioned' with partitioning by None.


                                                                                

# <center> LEER ARCHIVOS PARQUET

### **DELTA PARQUET**

In [19]:
delta_parquet = read_parquet_or_delta_file(spark_session, file_path_delta)
display(delta_parquet.toPandas())

del delta_parquet
gc.collect()

Successfully read Parquet file from './warehouse-spark/spark_files/delta_parquet'.


email,address,country,sex,age,profession,favorite_food,favorite_sport,favorite_movie_genre,favorite_animal,preferred_language,hobby,favorite_tv_show,favorite_color,favorite_drink,favorite_music,favorite_technology,favorite_car,zodiac_sign
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?),,,,,,,,,,,,,,,,,,


20

### **REGULAR PARQUET**

#### COMPACT PARQUET

In [20]:
file_parquet = read_parquet_or_delta_file(spark_session, file_path_compact_parquet)
display(file_parquet.toPandas())

del file_parquet
gc.collect()

Successfully read Parquet file from './warehouse-spark/spark_files/classic_parquet/parquet_clasico/parque_compacto'.


                                                                                

email,address,country,sex,age,profession,zodiac_sign,favorite_food,favorite_sport,favorite_movie_genre,favorite_animal,preferred_language,hobby,favorite_tv_show,favorite_color,favorite_drink,favorite_music,favorite_technology,favorite_car
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?),,,,,,,,,,,,,,,,,,


29

#### PARTITIONED PARQUET

In [21]:
file_parquet = read_parquet_or_delta_file(spark_session, file_path_partitioned_parquet)
display(file_parquet.toPandas())

del file_parquet 
gc.collect()

Successfully read Parquet file from './warehouse-spark/spark_files/classic_parquet/parquet_clasico/parquet_particionado'.


                                                                                

email,address,country,sex,age,profession,zodiac_sign,favorite_food,favorite_sport,favorite_movie_genre,favorite_animal,preferred_language,hobby,favorite_tv_show,favorite_color,favorite_drink,favorite_music,favorite_technology,favorite_car
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?),,,,,,,,,,,,,,,,,,


20

# <center> LISTAR LAS TABLAS DE LA BASE DE DATOS

In [22]:
list_all_databases_and_tables(spark_session)

The following databases and tables are present in the Spark Catalog.

Database: default, Table: delta_sql

Database: delta_spark_database, Table: delta_dataframe_1

Database: delta_spark_database, Table: delta_dataframe_2



{'default': ['delta_sql'],
 'delta_spark_database': ['delta_dataframe_1', 'delta_dataframe_2']}

# CONSULTAS

# <center> SELECT, WHERE-FILTER, LIMIT

### USANDO SPARK SQL

In [23]:
database = "default"
table = "delta_sql"

query = f"""
SELECT email, hobby, country 
FROM {database}.{table} 
WHERE AGE <= 18 
LIMIT 5
"""

spark_dataframe = execute_spark_sql_query(spark_session, query)
spark_dataframe.show(truncate=False)
spark_dataframe.toPandas()

+---------------------------+---------+--------------+
|email                      |hobby    |country       |
+---------------------------+---------+--------------+
|esparzalouis@example.net   |Traveling|Timor-Leste   |
|melinda87@example.net      |Gaming   |Montserrat    |
|fnelson@example.net        |Knitting |American Samoa|
|jenniferjohnson@example.net|Chess    |New Zealand   |
|vmiller@example.net        |Antiquing|Egypt         |
+---------------------------+---------+--------------+



email,hobby,country
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?),,


### USANDO SPARK DATAFRAME

In [24]:
from pyspark.sql.functions import col

# Aplicar el filtro directamente sobre el DataFrame
filtered_dataframe = (
    spark_dataframe_sql.select("email", "hobby", "country").filter(col("AGE") <= 18).limit(5)
)

# Mostrar los resultados
filtered_dataframe.toPandas()

email,hobby,country
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?),,


In [25]:
# Aplicar el filtro directamente sobre el DataFrame
filtered_dataframe = (
    spark_dataframe_sql.select("email", "hobby", "country").where(spark_dataframe_sql["AGE"] <= 18).limit(5)
)

# Mostrar los resultados
filtered_dataframe.toPandas()

email,hobby,country
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?),,


# <center> CTE, SELECT, WHERE, BETWEEN, GROUP BY, HAVING, ORDER BY, LIMIT, OFFSET

In [26]:
database = "delta_spark_database"
table = "delta_dataframe_1"

query = f"""
WITH FOURTY AS (
    SELECT * 
    FROM {database}.{table}
    WHERE AGE <> 0
    AND AGE BETWEEN 1 AND 99
)

SELECT age, country, zodiac_sign, count(*) as TOTAL
FROM FOURTY
GROUP BY age, country, zodiac_sign
HAVING TOTAL > 0
ORDER BY TOTAL DESC, AGE DESC, COUNTRY
LIMIT 13
OFFSET 31
"""

spark_dataframe = execute_spark_sql_query(spark_session, query)
spark_dataframe.toPandas()

age,country,zodiac_sign,TOTAL
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?),,,


### USANDO SPARK SQL

In [27]:
database = "delta_spark_database"
table = "delta_dataframe_1"

query = f"""
WITH FOURTY AS (
    SELECT * 
    FROM {database}.{table}
    WHERE AGE <> 0
    AND AGE BETWEEN 1 AND 99
)

SELECT age, country, zodiac_sign, count(*) as TOTAL
FROM FOURTY
GROUP BY age, country, zodiac_sign
HAVING TOTAL > 0
ORDER BY TOTAL DESC, AGE DESC, COUNTRY
LIMIT 13
OFFSET 31
"""

spark_dataframe = execute_spark_sql_query(spark_session, query)
spark_dataframe.toPandas()

age,country,zodiac_sign,TOTAL
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?),,,


### USANDO SPARK DATAFRAME

In [28]:
# Bloque único de operaciones
result_df = (
    spark_dataframe_delta.filter(
        (spark_dataframe_delta["AGE"] > 0) & (spark_dataframe_delta["AGE"].between(1, 99))
    )  # Filtrar filas donde AGE <> 0 y AGE BETWEEN 1 AND 99
    .groupBy("age", "country", "zodiac_sign")  # Agrupar por age, country y zodiac_sign
    .count()  # Contar las filas en cada grupo
    .withColumnRenamed("count", "TOTAL")  # Renombrar la columna 'count' a 'TOTAL'
    .filter("TOTAL > 0")  # Filtrar grupos donde TOTAL > 0 (HAVING en SQL)
    .orderBy(
        ["TOTAL", "age", "country"], ascending=[False, False, True]
    )  # Ordenar por TOTAL y age en orden descendente y por country en orden Ascendente
    .offset(31)  # Saltar las primeras 31 filas
    .limit(13)  # Limitar el resultado a 31 filas
)

# Mostrar resultados
result_df.toPandas()

age,country,zodiac_sign,TOTAL
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?),,,


# <center> FUNCIONES DE AGREGADO

### USANDO SPARK SQL

In [29]:
database = "delta_spark_database"
table = "delta_dataframe_1"

query = f"""
SELECT
    SUM(age) AS total_age,
    AVG(age) AS average_age,
    COUNT(age) AS total_records,
    MAX(age) AS max_age,
    MIN(age) AS min_age,
    APPROX_COUNT_DISTINCT(age) AS distinct_ages,
    STDDEV(age) AS stddev_age,
    VARIANCE(age) AS variance_age,
    SKEWNESS(age) AS skewness_age,
    KURTOSIS(age) AS kurtosis_age,
    COLLECT_LIST(age) AS age_list,
    COLLECT_SET(age) AS unique_ages
FROM {database}.{table}
"""

spark_dataframe = execute_spark_sql_query(spark_session, query)
spark_dataframe.toPandas()

total_age,average_age,total_records,max_age,min_age,distinct_ages,stddev_age,variance_age,skewness_age,kurtosis_age,age_list,unique_ages
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?),,,,,,,,,,,


### USANDO SPARK DATAFRAME

In [30]:
from pyspark.sql.functions import (
    sum as spark_sum, avg, count, max as spark_max, min as spark_min, 
    approx_count_distinct, stddev, variance, skewness, kurtosis, collect_list, collect_set
)

spark_dataframe_delta.agg(
    spark_sum("age").alias("total_age"),
    avg("age").alias("average_age"),
    count("age").alias("total_records"),
    spark_max("age").alias("max_age"),
    spark_min("age").alias("min_age"),
    approx_count_distinct("age").alias("distinct_ages"),
    stddev("age").alias("stddev_age"),
    variance("age").alias("variance_age"),
    skewness("age").alias("skewness_age"),
    kurtosis("age").alias("kurtosis_age"),
    collect_list("age").alias("age_list"),
    collect_set("age").alias("unique_ages")
).toPandas()

total_age,average_age,total_records,max_age,min_age,distinct_ages,stddev_age,variance_age,skewness_age,kurtosis_age,age_list,unique_ages
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?),,,,,,,,,,,


# <center> CASE WHEN - THEN

### USANDO SPARK SQL

In [31]:
database = "delta_spark_database"
table = "delta_dataframe_1"

query = f"""
SELECT
    CASE 
        WHEN age < 1 THEN 'Newborn'
        WHEN age >= 1 AND age < 3 THEN 'Infant'
        WHEN age >= 3 AND age < 5 THEN 'Toddler'
        WHEN age >= 5 AND age < 7 THEN 'Preschooler'
        WHEN age >= 7 AND age < 10 THEN 'Early School Age'
        WHEN age >= 10 AND age < 14 THEN 'Pre-Adolescent'
        WHEN age >= 14 AND age < 18 THEN 'Teenager'
        WHEN age >= 18 AND age < 28 THEN 'Young Adult'
        WHEN age >= 28 AND age < 40 THEN 'Adult'
        WHEN age >= 40 AND age < 50 THEN 'Midlife Adult'
        WHEN age >= 50 AND age < 60 THEN 'Experienced Adult'
        WHEN age >= 60 AND age < 70 THEN 'Mature Adult'
        ELSE 'Senior'
    END AS age_category,
    *
FROM {database}.{table}
"""

spark_dataframe = execute_spark_sql_query(spark_session, query)
spark_dataframe.toPandas()

age_category,email,address,country,sex,age,profession,zodiac_sign,favorite_food,favorite_sport,favorite_movie_genre,favorite_animal,preferred_language,hobby,favorite_tv_show,favorite_color,favorite_drink,favorite_music,favorite_technology,favorite_car
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?),,,,,,,,,,,,,,,,,,,


### USANDO SPARK DATAFRAME

In [32]:
import pyspark.sql.functions as F

spark_dataframe_delta_case = spark_dataframe_delta.withColumn(
    "age_category",
    F.when(spark_dataframe_delta["age"] < 1, "Newborn")
    .when((spark_dataframe_delta["age"] >= 1) & (spark_dataframe_delta["age"] < 3), "Infant")
    .when((spark_dataframe_delta["age"] >= 3) & (spark_dataframe_delta["age"] < 5), "Toddler")
    .when((spark_dataframe_delta["age"] >= 5) & (spark_dataframe_delta["age"] < 7), "Preschooler")
    .when((spark_dataframe_delta["age"] >= 7) & (spark_dataframe_delta["age"] < 10), "Early School Age")
    .when((spark_dataframe_delta["age"] >= 10) & (spark_dataframe_delta["age"] < 14), "Pre-Adolescent")
    .when((spark_dataframe_delta["age"] >= 14) & (spark_dataframe_delta["age"] < 18), "Teenager")
    .when((spark_dataframe_delta["age"] >= 18) & (spark_dataframe_delta["age"] < 28), "Young Adult")
    .when((spark_dataframe_delta["age"] >= 28) & (spark_dataframe_delta["age"] < 40), "Adult")
    .when((spark_dataframe_delta["age"] >= 40) & (spark_dataframe_delta["age"] < 50), "Midlife Adult")
    .when((spark_dataframe_delta["age"] >= 50) & (spark_dataframe_delta["age"] < 60), "Experienced Adult")
    .when((spark_dataframe_delta["age"] >= 60) & (spark_dataframe_delta["age"] < 70), "Mature Adult")
    .otherwise("Senior")
)

# Reorganizar para poner "age_category" en la primera posición
cols = ['age_category'] + [col for col in spark_dataframe_delta_case.columns if col != 'age_category']
spark_dataframe_delta_case.select(cols).toPandas()

                                                                                

age_category,email,address,country,sex,age,profession,zodiac_sign,favorite_food,favorite_sport,favorite_movie_genre,favorite_animal,preferred_language,hobby,favorite_tv_show,favorite_color,favorite_drink,favorite_music,favorite_technology,favorite_car
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?),,,,,,,,,,,,,,,,,,,


# <center> DISTINCT - CAST

### USANDO SPARK SQL

In [33]:
database = "delta_spark_database"
table = "delta_dataframe_1"

query = f"""
SELECT DISTINCT
    sex, 
    country, 
    FORMAT_NUMBER(CAST(age AS DOUBLE), 2) AS age,
    zodiac_sign, 
    profession
FROM {database}.{table}
ORDER BY sex ASC, country DESC, age ASC, zodiac_sign DESC, profession ASC
"""

spark_dataframe = execute_spark_sql_query(spark_session, query)
spark_dataframe.toPandas()

sex,country,age,zodiac_sign,profession
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?),,,,


### USANDO SPARK DATAFRAME

In [34]:
from pyspark.sql import functions as F

spark_dataframe_delta.select(
        "sex", 
        "country", 
        F.format_number(F.col("age").cast("float"), 2).alias("age"), 
        "zodiac_sign", 
        "profession"
    ) \
    .distinct() \
    .orderBy(
        F.col("sex").asc(), 
        F.col("country").desc(), 
        F.col("age").asc(), 
        F.col("zodiac_sign").desc(), 
        F.col("profession").asc()
    ).toPandas()

                                                                                

sex,country,age,zodiac_sign,profession
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?),,,,


# <center> CREAR DATASET PARA EJEMPLOS CORTOS

In [35]:
# Esquema explícito con el orden deseado
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, ArrayType, MapType
from pyspark.sql.functions import col, struct

schema = StructType(
    [
        StructField("id", IntegerType(), True),
        StructField("name", StringType(), True),
        StructField("age", IntegerType(), True),
        StructField("related_id", IntegerType(), True),
        StructField("zodiac_sign", StringType(), True),
        StructField("country", StringType(), True),
        StructField("values", ArrayType(IntegerType()), True),
        StructField("fruits_and_vitamins", MapType(StringType(), StringType()), True)
    ]
)

list_data_dict = [
  {"id": 1, "name": "Nathalie", "age": 1, "related_id": 11, "zodiac_sign": "Capricorn", "country": "Colombia", "values": [1, 2, 3, 4, 5], "fruits_and_vitamins": {"Apple": "Vitamin C", "Banana": "Vitamin B6", "Orange": "Vitamin C"}}, 
  {"id": 2, "name": "Cora", "age": 3, "related_id": 5, "zodiac_sign": "Taurus", "country": "USA", "values": [6, 7, 8, 9, 10], "fruits_and_vitamins": {"Mango": "Vitamin A", "Grapes": "Vitamin K", "Papaya": "Vitamin C"}}, 
  {"id": 3, "name": "Gaby", "age": 5, "related_id": 14, "zodiac_sign": "Gemini", "country": "Colombia", "values": [11, 12, 13, 14, 15], "fruits_and_vitamins": {"Kiwi": "Vitamin C", "Pineapple": "Vitamin A", "Strawberry": "Vitamin C"}}, 
  {"id": 4, "name": "Muneca", "age": 7, "related_id": 6, "zodiac_sign": "Cancer", "country": "Portugal", "values": [16, 17, 18, 19, 20], "fruits_and_vitamins": {"Peach": "Vitamin A", "Blueberry": "Vitamin C", "Watermelon": "Vitamin A"}}, 
  {"id": 5, "name": "Principe", "age": 9, "related_id": 2, "zodiac_sign": "Leo", "country": "Argentina", "values": [21, 22, 23, 24, 25], "fruits_and_vitamins": {"Avocado": "Vitamin E", "Apple": "Vitamin C", "Grapefruit": "Vitamin C"}}, 
  {"id": 6, "name": "Ana", "age": 11, "related_id": 3, "zodiac_sign": "Virgo", "country": "Colombia", "values": [26, 27, 28, 29, 30], "fruits_and_vitamins": {"Pineapple": "Vitamin C", "Mango": "Vitamin A", "Banana": "Vitamin B6"}}, 
  {"id": 7, "name": "Cecilia", "age": 13, "related_id": 20, "zodiac_sign": "Libra", "country": "Colombia", "values": [31, 32, 33, 34, 35], "fruits_and_vitamins": {"Watermelon": "Vitamin C", "Apple": "Vitamin C", "Orange": "Vitamin C"}}, 
  {"id": 8, "name": "Lucia", "age": 15, "related_id": 13, "zodiac_sign": "Scorpio", "country": "Peru", "values": [36, 37, 38, 39, 40], "fruits_and_vitamins": {"Peach": "Vitamin A", "Mango": "Vitamin A", "Pineapple": "Vitamin C"}}, 
  {"id": 9, "name": "Zeus", "age": 17, "related_id": 7, "zodiac_sign": "Sagittarius", "country": "Mexico", "values": [41, 42, 43, 44, 45], "fruits_and_vitamins": {"Strawberry": "Vitamin C", "Orange": "Vitamin C", "Papaya": "Vitamin C"}}, 
  {"id": 10, "name": "Guadalupe", "age": 15, "related_id": 17, "zodiac_sign": "Aries", "country": "Colombia", "values": [46, 47, 48, 49, 50], "fruits_and_vitamins": {"Blueberry": "Vitamin C", "Banana": "Vitamin B6", "Grapes": "Vitamin K"}}, 
  {"id": 21, "name": "Spark", "age": 3, "related_id": 22, "zodiac_sign": "Aquarius", "country": "Portugal", "values": [51, 52, 53, 54, 55], "fruits_and_vitamins": {"Kiwi": "Vitamin C", "Papaya": "Vitamin C", "Mango": "Vitamin A"}}, 
  {"id": 22, "name": "Delta", "age": 7, "related_id": 21, "zodiac_sign": "Pisces", "country": "Rusia", "values": [56, 57, 58, 59, 60], "fruits_and_vitamins": {"Apple": "Vitamin C", "Banana": "Vitamin B6", "Strawberry": "Vitamin C"}}, 
  {"id": 11, "name": "Augusto", "age": 17, "related_id": 1, "zodiac_sign": "Aries", "country": "Argentina", "values": [61, 62, 63, 64, 65], "fruits_and_vitamins": {"Peach": "Vitamin A", "Grapefruit": "Vitamin C", "Mango": "Vitamin A"}}, 
  {"id": 12, "name": "Muiscas", "age": 13, "related_id": 16, "zodiac_sign": "Taurus", "country": "Portugal", "values": [66, 67, 68, 69, 70], "fruits_and_vitamins": {"Watermelon": "Vitamin A", "Orange": "Vitamin C", "Papaya": "Vitamin C"}}, 
  {"id": 13, "name": "Jorge", "age": 11, "related_id": 8, "zodiac_sign": "Gemini", "country": "USA", "values": [71, 72, 73, 74, 75], "fruits_and_vitamins": {"Strawberry": "Vitamin C", "Apple": "Vitamin C", "Kiwi": "Vitamin C"}}, 
  {"id": 14, "name": "Sandra", "age": 9, "related_id": 4, "zodiac_sign": "Cancer", "country": "Argentina", "values": [76, 77, 78, 79, 80], "fruits_and_vitamins": {"Grapes": "Vitamin K", "Orange": "Vitamin C", "Blueberry": "Vitamin C"}}, 
  {"id": 15, "name": "Carlos", "age": 2, "related_id": 18, "zodiac_sign": "Leo", "country": "Peru", "values": [81, 82, 83, 84, 85], "fruits_and_vitamins": {"Watermelon": "Vitamin C", "Mango": "Vitamin A", "Apple": "Vitamin C"}}, 
  {"id": 16, "name": "Isabel", "age": 4, "related_id": 12, "zodiac_sign": "Virgo", "country": "Mexico", "values": [86, 87, 88, 89, 90], "fruits_and_vitamins": {"Papaya": "Vitamin C", "Kiwi": "Vitamin C", "Pineapple": "Vitamin A"}}, 
  {"id": 17, "name": "Paola", "age": 6, "related_id": 9, "zodiac_sign": "Libra", "country": "Colombia", "values": [91, 92, 93, 94, 95], "fruits_and_vitamins": {"Strawberry": "Vitamin C", "Grapefruit": "Vitamin C", "Banana": "Vitamin B6"}}, 
  {"id": 18, "name": "David", "age": 8, "related_id": 15, "zodiac_sign": "Scorpio", "country": "Mexico", "values": [96, 97, 98, 99, 100], "fruits_and_vitamins": {"Kiwi": "Vitamin C", "Papaya": "Vitamin C", "Apple": "Vitamin C"}}, 
  {"id": 19, "name": "Sara", "age": 10, "related_id": 19, "zodiac_sign": "Sagittarius", "country": "Colombia", "values": [101, 102, 103, 104, 105], "fruits_and_vitamins": {"Pineapple": "Vitamin C", "Orange": "Vitamin C", "Mango": "Vitamin A"}}, 
  {"id": 20, "name": "Claudia", "age": 12, "related_id": 10, "zodiac_sign": "Capricorn", "country": "Mexico", "values": [106, 107, 108, 109, 110], "fruits_and_vitamins": {"Peach": "Vitamin A", "Apple": "Vitamin C", "Grapefruit": "Vitamin C"}},
  {"id": 21, "name": "Spark", "age": 3, "related_id": 22, "zodiac_sign": "Aquarius", "country": "Portugal", "values": [51, 52, 53, 54, 55], "fruits_and_vitamins": {"Kiwi": "Vitamin C", "Papaya": "Vitamin C", "Mango": "Vitamin A"}}, 
  {"id": 22, "name": "Delta", "age": 7, "related_id": 21, "zodiac_sign": "Pisces", "country": "Rusia", "values": [56, 57, 58, 59, 60], "fruits_and_vitamins": {"Apple": "Vitamin C", "Banana": "Vitamin B6", "Strawberry": "Vitamin C"}}, 
]

# Crear DataFrame con el esquema explícito
spark_dataframe_joins_full = spark_session.createDataFrame(list_data_dict, schema)

# Almacenar en caché el DataFrame para evitar recalcularlo en operaciones subsecuentes, mejorando el rendimiento.
spark_dataframe_joins_full.cache()

# Mostrar el DataFrame con el orden correcto
display(spark_dataframe_joins_full.toPandas())

# elimina columna hashmap para evitar issues con union, except.....
spark_dataframe_joins = spark_dataframe_joins_full.drop("fruits_and_vitamins")

# Almacenar en caché el DataFrame para evitar recalcularlo en operaciones subsecuentes, mejorando el rendimiento.
spark_dataframe_joins.cache()

# Mostrar el DataFrame con el orden correcto
display(spark_dataframe_joins.toPandas())

                                                                                

id,name,age,related_id,zodiac_sign,country,values,fruits_and_vitamins
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?),,,,,,,


id,name,age,related_id,zodiac_sign,country,values
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?),,,,,,


# DESCOMPONER LISTA Y HASH MAP EN MULTIPLES FILAS Y COMBINARLOS EN NUEVAS FILAS

In [36]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window

spark_dataframe_explode = spark_dataframe_joins_full \
    .withColumn("row_number", F.row_number().over(Window.partitionBy(F.col("country")).orderBy(F.lit(1)))) \
    .withColumn("value", F.explode(F.col("values"))) \
    .withColumn("fruit_vitamin", F.explode(F.map_entries(F.col("fruits_and_vitamins")))) \
    .select(
        "*", 
        F.col("fruit_vitamin.key").alias("fruit"), 
        F.col("fruit_vitamin.value").alias("vitamin")
    ) \
    .drop("fruit_vitamin") 

# muestra el dataframe en version pandas
spark_dataframe_explode.toPandas()

id,name,age,related_id,zodiac_sign,country,values,fruits_and_vitamins,row_number,value,fruit,vitamin
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?),,,,,,,,,,,


# CREAR MULTIPLES DATASETS

In [37]:
num_parts = 2
spark_dataframe_uno, spark_dataframe_dos = split_spark_dataframe(spark_dataframe_joins, num_parts)

# Almacenar en caché el DataFrame para evitar recalcularlo en operaciones subsecuentes, mejorando el rendimiento.
spark_dataframe_uno.cache()
spark_dataframe_dos.cache()

display(spark_dataframe_uno.toPandas(), spark_dataframe_dos.toPandas())

Successfully created 2 DataFrames.


id,name,age,related_id,zodiac_sign,country,values
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?),,,,,,


id,name,age,related_id,zodiac_sign,country,values
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?),,,,,,


## CROSS TAB

In [38]:
# Realiza el crosstab entre las 2 columnas
df_crosstab = spark_dataframe_joins.crosstab("age", "zodiac_sign")

df_crosstab.toPandas()

age_zodiac_sign,Aquarius,Aries,Cancer,Capricorn,Gemini,Leo,Libra,Pisces,Sagittarius,Scorpio,Taurus,Virgo
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?),,,,,,,,,,,,


### ADICIONAR COLUMNA SUMATORIA TOTAL

In [39]:
from pyspark.sql import functions as F

# Calcula la suma de todas las columnas y agrega la columna 'total'
df_crosstab_with_total = df_crosstab.select(
    "*",  # Mantén todas las columnas del crosstab
    sum(F.coalesce(F.col(col), F.lit(0)) for col in df_crosstab.columns[1:]).alias(
        "total"
    ),
)

# Ordena el DataFrame de mayor a menor por la columna 'total'
df_crosstab_with_total_sorted = df_crosstab_with_total.orderBy(
    ["total", "age_zodiac_sign"], ascending=[False, True]
)


# Convertir el resultado a un DataFrame de Pandas si es necesario
df_crosstab_with_total_sorted.toPandas()

age_zodiac_sign,Aquarius,Aries,Cancer,Capricorn,Gemini,Leo,Libra,Pisces,Sagittarius,Scorpio,Taurus,Virgo,total
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?),,,,,,,,,,,,,


## CREAR NUEVAS TABLAS

In [40]:
database_delta = "delta_spark_database"
table_delta_all = "delta_dataframe_all"
table_delta_uno = "delta_dataframe_uno"
table_delta_dos = "delta_dataframe_dos"
dataframe_delta_all = spark_dataframe_joins
dataframe_delta_1 = spark_dataframe_uno
dataframe_delta_2 = spark_dataframe_dos
delta_warehouse_dir = warehouse_dir
partition_by = ['age']

create_delta_table_in_database(
    spark_session,
    database_delta,
    table_delta_all,
    dataframe_delta_all,
    delta_warehouse_dir,
    partition_by
)

create_delta_table_in_database(
    spark_session,
    database_delta,
    table_delta_uno,
    dataframe_delta_1,
    delta_warehouse_dir,
    partition_by
)
create_delta_table_in_database(
    spark_session,
    database_delta,
    table_delta_dos,
    dataframe_delta_2,
    delta_warehouse_dir,
    partition_by
)
list_all_databases_and_tables(spark_session)

Table 'delta_spark_database.delta_dataframe_all' dropped.


24/12/22 02:57:04 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers
24/12/22 02:57:04 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 84.44% for 9 writers
24/12/22 02:57:04 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 76.00% for 10 writers
24/12/22 02:57:04 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 69.09% for 11 writers
24/12/22 02:57:04 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 76.00% for 10 writers
24/12/22 02:57:04 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 84.44% for 9 writers
24/12/22 02:57:04 WARN MemoryManager: Total allocation exceeds 95.0

Property 'delta.enableChangeDataFeed' enabled for table 'delta_spark_database.delta_dataframe_all'.
Table 'delta_dataframe_all' created in database 'delta_spark_database' at './warehouse-spark/spark_catalog/database/delta_spark_database/delta_dataframe_all' with partitioning by ['age'].
Table 'delta_spark_database.delta_dataframe_uno' dropped.
Property 'delta.enableChangeDataFeed' enabled for table 'delta_spark_database.delta_dataframe_uno'.
Table 'delta_dataframe_uno' created in database 'delta_spark_database' at './warehouse-spark/spark_catalog/database/delta_spark_database/delta_dataframe_uno' with partitioning by ['age'].
Table 'delta_spark_database.delta_dataframe_dos' dropped.
Property 'delta.enableChangeDataFeed' enabled for table 'delta_spark_database.delta_dataframe_dos'.
Table 'delta_dataframe_dos' created in database 'delta_spark_database' at './warehouse-spark/spark_catalog/database/delta_spark_database/delta_dataframe_dos' with partitioning by ['age'].
The following databa

{'default': ['delta_sql'],
 'delta_spark_database': ['delta_dataframe_1',
  'delta_dataframe_2',
  'delta_dataframe_all',
  'delta_dataframe_dos',
  'delta_dataframe_uno']}

# <center> WINDOW FUNCTIONS

### USANDO SPARK SQL

In [41]:
database = "delta_spark_database"
table = "delta_dataframe_all"

query = f"""
SELECT
    country,  -- País de residencia
    age,  -- Edad del usuario
    -- Funciones de ventana aplicadas a la edad y el país
    ROW_NUMBER() OVER (PARTITION BY country ORDER BY age DESC) AS row_number,  -- Asigna un número de fila según la edad dentro del país
    COUNT(*) OVER (PARTITION BY country) AS count_by_country,  -- Cuenta el número de usuarios por país
    RANK() OVER (PARTITION BY country ORDER BY age DESC) AS rank_by_country,  -- Asigna un rango según la edad dentro de cada país, considerando empates
    DENSE_RANK() OVER (PARTITION BY country ORDER BY age DESC) AS dense_rank_by_country,  -- Rango denso por país, sin saltos de rango en caso de empate
    MIN(age) OVER (PARTITION BY country) AS min_age_by_country,  -- Edad mínima dentro de cada país
    MAX(age) OVER (PARTITION BY country) AS max_age_by_country,  -- Edad máxima dentro de cada país
    SUM(age) OVER (PARTITION BY country) AS sum_age_by_country,  -- Suma de las edades dentro de cada país
    AVG(age) OVER (PARTITION BY country) AS avg_age_by_country,  -- Promedio de edades dentro de cada país

    -- Funciones de agrupación para la edad
    NTILE(3) OVER (PARTITION BY country ORDER BY age DESC) AS ntile_by_country,  -- Divide los usuarios por cuartiles según la edad dentro de cada país
    zodiac_sign,  -- Signo zodiacal del usuario
    LAG(zodiac_sign, 1) OVER (PARTITION BY country ORDER BY age DESC) AS lag_zodiac_sign,  -- Edad del usuario anterior dentro de cada país
    LEAD(zodiac_sign, 1) OVER (PARTITION BY country ORDER BY age DESC) AS lead_zodiac_sign,  -- Edad del usuario siguiente dentro de cada país
    FIRST_VALUE(zodiac_sign) OVER (PARTITION BY country ORDER BY age DESC) AS first_zodiac_sign,  -- Primera edad registrada dentro de cada país
    LAST_VALUE(zodiac_sign) OVER (PARTITION BY country ORDER BY age DESC ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS last_zodiac_sign -- Última edad registrada dentro de cada país, considerando toda la partición

FROM {database}.{table}
ORDER BY country ASC
"""

spark_dataframe = execute_spark_sql_query(spark_session, query)
spark_dataframe.toPandas()

country,age,row_number,count_by_country,rank_by_country,dense_rank_by_country,min_age_by_country,max_age_by_country,sum_age_by_country,avg_age_by_country,ntile_by_country,zodiac_sign,lag_zodiac_sign,lead_zodiac_sign,first_zodiac_sign,last_zodiac_sign
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?),,,,,,,,,,,,,,,


### USANDO SPARK DATAFRAME

In [42]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Aplicar todas las operaciones en una sola sentencia
spark_dataframe_joins.select(
    "country",
    "age",
    F.row_number().over(Window.partitionBy("country").orderBy(F.col("age").desc())).alias("row_number"),
    F.count("*").over(Window.partitionBy("country")).alias("count_by_country"),
    F.rank().over(Window.partitionBy("country").orderBy(F.col("age").desc())).alias("rank_by_country"),
    F.dense_rank().over(Window.partitionBy("country").orderBy(F.col("age").desc())).alias("dense_rank_by_country"),
    F.min("age").over(Window.partitionBy("country")).alias("min_age_by_country"),
    F.max("age").over(Window.partitionBy("country")).alias("max_age_by_country"),
    F.sum("age").over(Window.partitionBy("country")).alias("sum_age_by_country"),
    F.avg("age").over(Window.partitionBy("country")).alias("avg_age_by_country"),
    F.ntile(3).over(Window.partitionBy("country").orderBy(F.col("age").desc())).alias("ntile_by_country"),
    "zodiac_sign",
    F.lag("zodiac_sign", 1).over(Window.partitionBy("country").orderBy(F.col("age").desc())).alias("lag_zodiac_sign"),
    F.lead("zodiac_sign", 1).over(Window.partitionBy("country").orderBy(F.col("age").desc())).alias("lead_zodiac_sign"),
    F.first("zodiac_sign").over(Window.partitionBy("country").orderBy(F.col("age").desc())).alias("first_zodiac_sign"),
    F.last("zodiac_sign").over(
        Window.partitionBy("country")
        .orderBy(F.col("age").desc())
        .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
    ).alias("last_zodiac_sign")
).orderBy("country", "row_number").toPandas()

country,age,row_number,count_by_country,rank_by_country,dense_rank_by_country,min_age_by_country,max_age_by_country,sum_age_by_country,avg_age_by_country,ntile_by_country,zodiac_sign,lag_zodiac_sign,lead_zodiac_sign,first_zodiac_sign,last_zodiac_sign
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?),,,,,,,,,,,,,,,


# <center> CONCATENATE VALUES AND CREATE ADDITIONAL COLUMNS

### USANDO SPARK SQL

In [43]:
database = "delta_spark_database"
table = "delta_dataframe_all"

query = f"""
WITH NumberedRows AS (
    SELECT
        ROW_NUMBER() OVER (PARTITION BY country ORDER BY id) AS row_number,  -- Asigna números consecutivos por cada país
        (id + age + related_id) AS SUM_NUMBERS,                             -- Suma las columnas numéricas fila por fila
        CONCAT(name, ' ', zodiac_sign, ' ', country) AS CONCATENATE_STRINGS,  -- Une las cadenas con espacios
        values
    FROM {database}.{table}
)
SELECT * FROM NumberedRows;
"""

spark_dataframe = execute_spark_sql_query(spark_session, query)
spark_dataframe.toPandas()

row_number,SUM_NUMBERS,CONCATENATE_STRINGS,values
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?),,,


### USANDO SPARK DATAFRAME

In [44]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Crear DataFrame procesado
spark_dataframe_joins \
    .withColumn("row_number", F.row_number().over(Window.partitionBy(col("country")).orderBy(F.lit(1)))) \
    .select(
        "row_number",
        (F.col("id") + F.col("age") + F.col("related_id")).alias("SUM_NUMBERS"),
        F.concat_ws(" ", F.col("name"), F.col("zodiac_sign"), F.col("country")).alias("CONCATENATE_STRINGS"),
        F.col("values")
    ) \
    .toPandas()

row_number,SUM_NUMBERS,CONCATENATE_STRINGS,values
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?),,,


# SEPARAR LOS VALORES DE LA LISTA PARA CADA REGISTRO INDEPENDIENTE

### USANDO SPARK SQL

In [45]:
database = "delta_spark_database"
table = "delta_dataframe_all"

query = f"""
WITH NumberedRows AS (
    SELECT
        ROW_NUMBER() OVER (PARTITION BY country ORDER BY id) AS row_number,  -- Asigna números consecutivos por cada país
        (id + age + related_id) AS SUM_NUMBERS,                             -- Suma las columnas numéricas fila por fila
        CONCAT(name, ' ', zodiac_sign, ' ', country) AS CONCATENATE_STRINGS,  -- Une las cadenas con espacios
        explode(values) AS value                                              -- Expande los elementos del array
    FROM {database}.{table}
)
SELECT * FROM NumberedRows;
"""

spark_dataframe = execute_spark_sql_query(spark_session, query)
spark_dataframe.toPandas()

row_number,SUM_NUMBERS,CONCATENATE_STRINGS,value
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?),,,


### USANDO SPARK DATAFRAME

In [46]:
from pyspark.sql.functions import explode

# Crear DataFrame procesado
spark_dataframe_joins \
    .withColumn("row_number", F.row_number().over(Window.partitionBy(col("country")).orderBy(F.lit(1)))) \
    .select(
        "row_number",
        (F.col("id") + F.col("age") + F.col("related_id")).alias("SUM_NUMBERS"),
        F.concat_ws(" ", F.col("name"), F.col("zodiac_sign"), F.col("country")).alias("CONCATENATE_STRINGS"),
        explode(F.col("values")).alias("value")
    ) \
    .toPandas()

row_number,SUM_NUMBERS,CONCATENATE_STRINGS,value
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?),,,


# <center> OPERACIONES DE CONJUNTOS

# <center> UNION

### USANDO SPARK SQL

In [47]:
database = "delta_spark_database"
table1 = "delta_dataframe_uno"
table2 = "delta_dataframe_dos"

query = f"""
SELECT * 
FROM {database}.{table1}

UNION

SELECT * 
FROM {database}.{table2}
"""

spark_dataframe = execute_spark_sql_query(spark_session, query)
spark_dataframe.toPandas()

id,name,age,related_id,zodiac_sign,country,values
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?),,,,,,


### USANDO SPARK DATAFRAME

In [48]:
spark_dataframe_uno.union(spark_dataframe_dos).toPandas()

id,name,age,related_id,zodiac_sign,country,values
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?),,,,,,


# <center> UNION ALL

### USANDO SPARK SQL

In [49]:
database = "delta_spark_database"
table1 = "delta_dataframe_uno"
table2 = "delta_dataframe_dos"

query = f"""
SELECT * 
FROM {database}.{table1}

UNION ALL

SELECT * 
FROM {database}.{table2}
"""

spark_dataframe = execute_spark_sql_query(spark_session, query)
spark_dataframe.toPandas()

id,name,age,related_id,zodiac_sign,country,values
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?),,,,,,


### USANDO SPARK DATAFRAME

In [50]:
spark_dataframe_uno.unionByName(spark_dataframe_dos).toPandas()

id,name,age,related_id,zodiac_sign,country,values
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?),,,,,,


# <center> INTERCEPT

### USANDO SPARK SQL

In [51]:
database = "delta_spark_database"
table1 = "delta_dataframe_uno"
table2 = "delta_dataframe_dos"

query = f"""
SELECT *
FROM {database}.{table1}

INTERSECT

SELECT *
FROM {database}.{table2}
"""

spark_dataframe = execute_spark_sql_query(spark_session, query)
spark_dataframe.toPandas()

id,name,age,related_id,zodiac_sign,country,values
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?),,,,,,


### USANDO SPARK DATAFRAME

In [52]:
spark_dataframe_uno.intersect(spark_dataframe_dos).toPandas()

id,name,age,related_id,zodiac_sign,country,values
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?),,,,,,


# <center> EXCEPT

### USANDO SPARK SQL

In [53]:
database = "delta_spark_database"
table1 = "delta_dataframe_uno"
table2 = "delta_dataframe_dos"

query = f"""
SELECT *
FROM {database}.{table1}

EXCEPT

SELECT *
FROM {database}.{table2}
"""

spark_dataframe = execute_spark_sql_query(spark_session, query)
spark_dataframe.toPandas()

id,name,age,related_id,zodiac_sign,country,values
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?),,,,,,


### USANDO SPARK DATAFRAME

In [54]:
spark_dataframe_uno.exceptAll(spark_dataframe_dos).toPandas()

id,name,age,related_id,zodiac_sign,country,values
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?),,,,,,


In [55]:
spark_dataframe_uno.subtract(spark_dataframe_dos).toPandas()

id,name,age,related_id,zodiac_sign,country,values
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?),,,,,,


# <center> RENAME DATAFRAMES COLUMNS FOR EXAMPLES

# <center> SELF JOIN

### USANDO SPARK SQL

In [56]:
database = "delta_spark_database"
table = "delta_dataframe_uno"

query = f"""
SELECT *
FROM {database}.{table} AS TABLE_UNO_1
INNER JOIN {database}.{table} AS TABLE_UNO_2
    ON TABLE_UNO_1.id = TABLE_UNO_2.related_id
"""
spark_dataframe = execute_spark_sql_query(spark_session, query)
spark_dataframe.toPandas()

id,name,age,related_id,zodiac_sign,country,values,id.1,name.1,age.1,related_id.1,zodiac_sign.1,country.1,values.1
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?),,,,,,,,,,,,,


### USANDO SPARK DATAFRAME

In [57]:
from pyspark.sql import functions as F

# Obtener todos los nombres de las columnas de ambos DataFrames
spark_dataframe_uno_columns = [F.col(f"spark_dataframe_uno.{col}").alias(f"spark_dataframe_uno.{col}") for col in spark_dataframe_uno.columns]
spark_dataframe_dos_columns = [F.col(f"spark_dataframe_dos.{col}").alias(f"spark_dataframe_dos.{col}") for col in spark_dataframe_dos.columns]

# Realizamos el self join
spark_dataframe_uno.alias("spark_dataframe_uno") \
    .join(
        spark_dataframe_dos.alias("spark_dataframe_dos"),
        F.col("spark_dataframe_uno.id") == F.col("spark_dataframe_dos.related_id"),
        "inner"
    ) \
    .select(*spark_dataframe_uno_columns, *spark_dataframe_dos_columns) \
    .toPandas()

spark_dataframe_uno.id,spark_dataframe_uno.name,spark_dataframe_uno.age,spark_dataframe_uno.related_id,spark_dataframe_uno.zodiac_sign,spark_dataframe_uno.country,spark_dataframe_uno.values,spark_dataframe_dos.id,spark_dataframe_dos.name,spark_dataframe_dos.age,spark_dataframe_dos.related_id,spark_dataframe_dos.zodiac_sign,spark_dataframe_dos.country,spark_dataframe_dos.values
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?),,,,,,,,,,,,,


# <center> INNER JOIN

### USANDO SPARK SQL

In [58]:
database = "delta_spark_database"
table1 = "delta_dataframe_uno"
table2 = "delta_dataframe_dos"

query = f"""
SELECT *
FROM {database}.{table1} AS TABLE_UNO
INNER JOIN {database}.{table2} AS TABLE_DOS
    ON TABLE_UNO.id = TABLE_DOS.related_id
"""
spark_dataframe = execute_spark_sql_query(spark_session, query)
spark_dataframe.toPandas()

id,name,age,related_id,zodiac_sign,country,values,id.1,name.1,age.1,related_id.1,zodiac_sign.1,country.1,values.1
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?),,,,,,,,,,,,,


### USANDO SPARK DATAFRAME

In [59]:
# Obtener todos los nombres de las columnas de ambos DataFrames
spark_dataframe_uno_columns = [F.col(f"spark_dataframe_uno.{col}").alias(f"spark_dataframe_uno.{col}") for col in spark_dataframe_uno.columns]
spark_dataframe_dos_columns = [F.col(f"spark_dataframe_dos.{col}").alias(f"spark_dataframe_dos.{col}") for col in spark_dataframe_dos.columns]

spark_dataframe_uno.alias("spark_dataframe_uno").join(
    spark_dataframe_dos.alias("spark_dataframe_dos"),
    col("spark_dataframe_uno.id") == col("spark_dataframe_dos.related_id"),
    "inner"
) \
.select(*spark_dataframe_uno_columns, *spark_dataframe_dos_columns) \
.toPandas()

spark_dataframe_uno.id,spark_dataframe_uno.name,spark_dataframe_uno.age,spark_dataframe_uno.related_id,spark_dataframe_uno.zodiac_sign,spark_dataframe_uno.country,spark_dataframe_uno.values,spark_dataframe_dos.id,spark_dataframe_dos.name,spark_dataframe_dos.age,spark_dataframe_dos.related_id,spark_dataframe_dos.zodiac_sign,spark_dataframe_dos.country,spark_dataframe_dos.values
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?),,,,,,,,,,,,,


# <center> LEFT JOIN

### USANDO SPARK SQL

In [60]:
database = "delta_spark_database"
table1 = "delta_dataframe_uno"
table2 = "delta_dataframe_dos"

query = f"""
SELECT *
FROM {database}.{table1} AS TABLE_UNO
LEFT OUTER JOIN {database}.{table2} AS TABLE_DOS
    ON TABLE_UNO.id = TABLE_DOS.related_id
"""
spark_dataframe = execute_spark_sql_query(spark_session, query)
spark_dataframe.toPandas()

id,name,age,related_id,zodiac_sign,country,values,id.1,name.1,age.1,related_id.1,zodiac_sign.1,country.1,values.1
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?),,,,,,,,,,,,,


### USANDO SPARK DATAFRAME

In [61]:
# Obtener todos los nombres de las columnas de ambos DataFrames
spark_dataframe_uno_columns = [F.col(f"spark_dataframe_uno.{col}").alias(f"spark_dataframe_uno.{col}") for col in spark_dataframe_uno.columns]
spark_dataframe_dos_columns = [F.col(f"spark_dataframe_dos.{col}").alias(f"spark_dataframe_dos.{col}") for col in spark_dataframe_dos.columns]

spark_dataframe_uno.alias("spark_dataframe_uno") \
    .join(
        spark_dataframe_dos.alias("spark_dataframe_dos"),
        F.col("spark_dataframe_uno.id") == F.col("spark_dataframe_dos.related_id"),
        "left"
) \
.select(*spark_dataframe_uno_columns, *spark_dataframe_dos_columns) \
.toPandas()

spark_dataframe_uno.id,spark_dataframe_uno.name,spark_dataframe_uno.age,spark_dataframe_uno.related_id,spark_dataframe_uno.zodiac_sign,spark_dataframe_uno.country,spark_dataframe_uno.values,spark_dataframe_dos.id,spark_dataframe_dos.name,spark_dataframe_dos.age,spark_dataframe_dos.related_id,spark_dataframe_dos.zodiac_sign,spark_dataframe_dos.country,spark_dataframe_dos.values
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?),,,,,,,,,,,,,


# <center> ANTI LEFT JOIN

### USANDO SPARK SQL

In [62]:
database = "delta_spark_database"
table1 = "delta_dataframe_uno"
table2 = "delta_dataframe_dos"

query = f"""
SELECT *
FROM {database}.{table1} AS TABLE_UNO
LEFT ANTI JOIN {database}.{table2} AS TABLE_DOS
    ON TABLE_UNO.id = TABLE_DOS.related_id
"""
spark_dataframe = execute_spark_sql_query(spark_session, query)
spark_dataframe.toPandas()

id,name,age,related_id,zodiac_sign,country,values
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?),,,,,,


In [63]:
database = "delta_spark_database"
table1 = "delta_dataframe_uno"
table2 = "delta_dataframe_dos"

query = f"""
SELECT TABLE_UNO.*
FROM {database}.{table1} AS TABLE_UNO
LEFT OUTER JOIN {database}.{table2} AS TABLE_DOS
    ON TABLE_UNO.id = TABLE_DOS.related_id
WHERE TABLE_DOS.related_id IS NULL
"""
spark_dataframe = execute_spark_sql_query(spark_session, query)
spark_dataframe.toPandas()

id,name,age,related_id,zodiac_sign,country,values
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?),,,,,,


### USANDO SPARK DATAFRAME

In [64]:
spark_dataframe_uno.alias("spark_dataframe_uno") \
    .join(
        spark_dataframe_dos.alias("spark_dataframe_dos"),
        F.col("spark_dataframe_uno.id") == F.col("spark_dataframe_dos.related_id"),
        "left_anti"
    ) \
    .toPandas()

id,name,age,related_id,zodiac_sign,country,values
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?),,,,,,


# <center> RIGHT JOIN

### USANDO SPARK SQL

In [65]:
database = "delta_spark_database"
table1 = "delta_dataframe_uno"
table2 = "delta_dataframe_dos"

query = f"""
SELECT *
FROM {database}.{table1} AS TABLE_UNO
RIGHT OUTER JOIN {database}.{table2} AS TABLE_DOS
    ON TABLE_UNO.id = TABLE_DOS.related_id
"""
spark_dataframe = execute_spark_sql_query(spark_session, query)
spark_dataframe.toPandas()

id,name,age,related_id,zodiac_sign,country,values,id.1,name.1,age.1,related_id.1,zodiac_sign.1,country.1,values.1
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?),,,,,,,,,,,,,


### USANDO SPARK DATAFRAME

In [66]:
# Obtener todos los nombres de las columnas de ambos DataFrames
spark_dataframe_uno_columns = [F.col(f"spark_dataframe_uno.{col}").alias(f"spark_dataframe_uno.{col}") for col in spark_dataframe_uno.columns]
spark_dataframe_dos_columns = [F.col(f"spark_dataframe_dos.{col}").alias(f"spark_dataframe_dos.{col}") for col in spark_dataframe_dos.columns]

spark_dataframe_uno.alias("spark_dataframe_uno") \
    .join(
        spark_dataframe_dos.alias("spark_dataframe_dos"),
        F.col("spark_dataframe_uno.id") == F.col("spark_dataframe_dos.related_id"),
        "right"
) \
.select(*spark_dataframe_uno_columns, *spark_dataframe_dos_columns) \
.toPandas()

spark_dataframe_uno.id,spark_dataframe_uno.name,spark_dataframe_uno.age,spark_dataframe_uno.related_id,spark_dataframe_uno.zodiac_sign,spark_dataframe_uno.country,spark_dataframe_uno.values,spark_dataframe_dos.id,spark_dataframe_dos.name,spark_dataframe_dos.age,spark_dataframe_dos.related_id,spark_dataframe_dos.zodiac_sign,spark_dataframe_dos.country,spark_dataframe_dos.values
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?),,,,,,,,,,,,,


# <center> ANTI RIGHT JOIN

### USANDO SPARK SQL

In [67]:
database = "delta_spark_database"
table1 = "delta_dataframe_uno"
table2 = "delta_dataframe_dos"

query = f"""
SELECT TABLE_DOS.*
FROM {database}.{table1} AS TABLE_UNO
RIGHT OUTER JOIN {database}.{table2} AS TABLE_DOS
    ON TABLE_UNO.id = TABLE_DOS.related_id
WHERE TABLE_UNO.id IS NULL
"""
spark_dataframe = execute_spark_sql_query(spark_session, query)
spark_dataframe.toPandas()

id,name,age,related_id,zodiac_sign,country,values
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?),,,,,,


### USANDO SPARK DATAFRAME

In [68]:
spark_dataframe_uno.alias("spark_dataframe_uno") \
    .join(
        spark_dataframe_dos.alias("spark_dataframe_dos"),
        F.col("spark_dataframe_uno.id") == F.col("spark_dataframe_dos.related_id"),
        "right"
    ) \
    .filter(F.col("spark_dataframe_uno.id").isNull()) \
    .select([F.col(f"spark_dataframe_dos.{col}") for col in spark_dataframe_dos.columns]) \
    .toPandas()

id,name,age,related_id,zodiac_sign,country,values
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?),,,,,,


### INVIRTIENDO LAS TABLAS Y USANDO EL ANTILEFT JOIN

In [69]:
spark_dataframe_dos.alias("spark_dataframe_dos") \
    .join(
        spark_dataframe_uno.alias("spark_dataframe_uno"),
        F.col("spark_dataframe_dos.id") == F.col("spark_dataframe_uno.related_id"),
        "left_anti"
    ) \
    .toPandas()

id,name,age,related_id,zodiac_sign,country,values
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?),,,,,,


# <center> OUTER JOIN

### USANDO SPARK SQL

In [70]:
database = "delta_spark_database"
table1 = "delta_dataframe_uno"
table2 = "delta_dataframe_dos"

query = f"""
SELECT *
FROM {database}.{table1} AS TABLE_UNO
FULL OUTER JOIN {database}.{table2} AS TABLE_DOS
    ON TABLE_UNO.id = TABLE_DOS.related_id
"""
spark_dataframe = execute_spark_sql_query(spark_session, query)
spark_dataframe.toPandas()

id,name,age,related_id,zodiac_sign,country,values,id.1,name.1,age.1,related_id.1,zodiac_sign.1,country.1,values.1
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?),,,,,,,,,,,,,


### USANDO SPARK DATAFRAME

In [71]:
# Obtener todos los nombres de las columnas de ambos DataFrames
spark_dataframe_uno_columns = [F.col(f"spark_dataframe_uno.{col}").alias(f"spark_dataframe_uno.{col}") for col in spark_dataframe_uno.columns]
spark_dataframe_dos_columns = [F.col(f"spark_dataframe_dos.{col}").alias(f"spark_dataframe_dos.{col}") for col in spark_dataframe_dos.columns]

# Realizamos la unión con los alias y seleccionamos todas las columnas automáticamente
spark_dataframe_uno.alias("spark_dataframe_uno") \
    .join(
        spark_dataframe_dos.alias("spark_dataframe_dos"),
        F.col("spark_dataframe_uno.id") == F.col("spark_dataframe_dos.related_id"),
        "outer"
    ) \
    .select(*spark_dataframe_uno_columns, *spark_dataframe_dos_columns) \
    .toPandas()  # Convertir el resultado a un DataFrame de Pandas

spark_dataframe_uno.id,spark_dataframe_uno.name,spark_dataframe_uno.age,spark_dataframe_uno.related_id,spark_dataframe_uno.zodiac_sign,spark_dataframe_uno.country,spark_dataframe_uno.values,spark_dataframe_dos.id,spark_dataframe_dos.name,spark_dataframe_dos.age,spark_dataframe_dos.related_id,spark_dataframe_dos.zodiac_sign,spark_dataframe_dos.country,spark_dataframe_dos.values
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?),,,,,,,,,,,,,


# <center> ANTI OUTER JOIN

### USANDO SPARK SQL

In [72]:
database = "delta_spark_database"
table1 = "delta_dataframe_uno"
table2 = "delta_dataframe_dos"

# Ejecutar la consulta SQL
query = f"""
SELECT 
    TABLE_UNO.*, 
    TABLE_DOS.* 
FROM {database}.{table1} AS TABLE_UNO
FULL OUTER JOIN {database}.{table2} AS TABLE_DOS
    ON TABLE_UNO.id = TABLE_DOS.related_id
WHERE TABLE_UNO.id IS NULL
    OR TABLE_DOS.related_id IS NULL
"""
spark_dataframe = execute_spark_sql_query(spark_session, query)
spark_dataframe.toPandas()

id,name,age,related_id,zodiac_sign,country,values,id.1,name.1,age.1,related_id.1,zodiac_sign.1,country.1,values.1
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?),,,,,,,,,,,,,


### USANDO SPARK DATAFRAME

In [73]:
# Obtener todos los nombres de las columnas de ambos DataFrames
spark_dataframe_uno_columns = [F.col(f"spark_dataframe_uno.{col}").alias(f"spark_dataframe_uno.{col}") for col in spark_dataframe_uno.columns]
spark_dataframe_dos_columns = [F.col(f"spark_dataframe_dos.{col}").alias(f"spark_dataframe_dos.{col}") for col in spark_dataframe_dos.columns]

spark_dataframe_uno.alias("spark_dataframe_uno") \
    .join(
        spark_dataframe_dos.alias("spark_dataframe_dos"),  # Alias de spark_dataframe_dos
        F.col("spark_dataframe_uno.id") == F.col("spark_dataframe_dos.related_id"),  # Condición de unión
        "outer"  # Tipo de unión: OUTER JOIN
    ) \
    .filter(
        (
            (F.col("spark_dataframe_uno.id").isNotNull() & F.col("spark_dataframe_dos.related_id").isNull())  # Condición cuando spark_dataframe_uno.id no es nulo y spark_dataframe_dos.related_id es nulo
            | (F.col("spark_dataframe_uno.id").isNull() & F.col("spark_dataframe_dos.related_id").isNotNull())  # Condición cuando spark_dataframe_uno.id es nulo y spark_dataframe_dos.related_id no es nulo
        )
) \
.select(*spark_dataframe_uno_columns, *spark_dataframe_dos_columns) \
.toPandas()

spark_dataframe_uno.id,spark_dataframe_uno.name,spark_dataframe_uno.age,spark_dataframe_uno.related_id,spark_dataframe_uno.zodiac_sign,spark_dataframe_uno.country,spark_dataframe_uno.values,spark_dataframe_dos.id,spark_dataframe_dos.name,spark_dataframe_dos.age,spark_dataframe_dos.related_id,spark_dataframe_dos.zodiac_sign,spark_dataframe_dos.country,spark_dataframe_dos.values
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?),,,,,,,,,,,,,


# <center> CROSS JOIN

### USANDO SPARK SQL

In [74]:
database = "delta_spark_database"
table1 = "delta_dataframe_uno"
table2 = "delta_dataframe_dos"

# RESTORE TABLE {database}.{table1} TO VERSION AS OF {version};
query = f"""
SELECT * FROM {database}.{table1}
CROSS JOIN
{database}.{table2}
"""

spark_dataframe = execute_spark_sql_query(spark_session, query)
spark_dataframe.toPandas()

id,name,age,related_id,zodiac_sign,country,values,id.1,name.1,age.1,related_id.1,zodiac_sign.1,country.1,values.1
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?),,,,,,,,,,,,,


In [75]:
database = "delta_spark_database"
table1 = "delta_dataframe_uno"
table2 = "delta_dataframe_dos"

query = f"""
SELECT *
FROM {database}.{table1} AS TABLE_UNO
CROSS JOIN {database}.{table2} AS TABLE_DOS
"""
spark_dataframe = execute_spark_sql_query(spark_session, query)
spark_dataframe.toPandas()

id,name,age,related_id,zodiac_sign,country,values,id.1,name.1,age.1,related_id.1,zodiac_sign.1,country.1,values.1
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?),,,,,,,,,,,,,


### USANDO SPARK DATAFRAME

In [76]:
from pyspark.sql import functions as F

# Obtener todos los nombres de las columnas de ambos DataFrames
spark_dataframe_uno_columns = [F.col(f"spark_dataframe_uno.{col}").alias(f"spark_dataframe_uno.{col}") for col in spark_dataframe_uno.columns]
spark_dataframe_dos_columns = [F.col(f"spark_dataframe_dos.{col}").alias(f"spark_dataframe_dos.{col}") for col in spark_dataframe_dos.columns]

# Realizamos el CROSS JOIN con los alias y seleccionamos todas las columnas automáticamente
spark_dataframe_uno.alias("spark_dataframe_uno") \
    .crossJoin(spark_dataframe_dos.alias("spark_dataframe_dos")) \
    .select(*spark_dataframe_uno_columns, *spark_dataframe_dos_columns) \
    .toPandas()  # Convertir el resultado a un DataFrame de Pandas

spark_dataframe_uno.id,spark_dataframe_uno.name,spark_dataframe_uno.age,spark_dataframe_uno.related_id,spark_dataframe_uno.zodiac_sign,spark_dataframe_uno.country,spark_dataframe_uno.values,spark_dataframe_dos.id,spark_dataframe_dos.name,spark_dataframe_dos.age,spark_dataframe_dos.related_id,spark_dataframe_dos.zodiac_sign,spark_dataframe_dos.country,spark_dataframe_dos.values
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?),,,,,,,,,,,,,


# <center> REALIZAR LOS JOINs USANDO LA FUNCION CON PARAMETROS 

# RENAME DATAFRAMES COLUMNS FOR EXAMPLES

In [77]:
spark_dataframe_uno_renamed = spark_dataframe_uno.select(
    spark_dataframe_uno["id"].alias("spark_dataframe_uno_id"),
    spark_dataframe_uno["name"].alias("spark_dataframe_uno_name"),
    spark_dataframe_uno["age"].alias("spark_dataframe_uno_age"),
    spark_dataframe_uno["related_id"].alias("spark_dataframe_uno_related_id"),
    spark_dataframe_uno["zodiac_sign"].alias("spark_dataframe_uno_zodiac_sign"),
    spark_dataframe_uno["country"].alias("spark_dataframe_uno_country"),
    spark_dataframe_uno["values"].alias("spark_dataframe_uno_values"),
)

spark_dataframe_dos_renamed = spark_dataframe_dos.select(
    spark_dataframe_dos["id"].alias("spark_dataframe_dos_id"),
    spark_dataframe_dos["name"].alias("spark_dataframe_dos_name"),
    spark_dataframe_dos["age"].alias("spark_dataframe_dos_age"),
    spark_dataframe_dos["related_id"].alias("spark_dataframe_dos_related_id"),
    spark_dataframe_dos["zodiac_sign"].alias("spark_dataframe_dos_zodiac_sign"),
    spark_dataframe_dos["country"].alias("spark_dataframe_dos_country"),
    spark_dataframe_dos["values"].alias("spark_dataframe_dos_values"),
)

In [78]:
spark_dataframe_uno_renamed.toPandas()

spark_dataframe_uno_id,spark_dataframe_uno_name,spark_dataframe_uno_age,spark_dataframe_uno_related_id,spark_dataframe_uno_zodiac_sign,spark_dataframe_uno_country,spark_dataframe_uno_values
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?),,,,,,


In [79]:
joins = [
    "inner_join",
    "left_join",
    "anti_left_join",
    "right_join",
    "anti_right_join",
    "outer_join",
    "anti_outer_join",
    "cross_join",
    "intersect",
    "except",
    "union",
    "union_all",
]

for join in joins:
    join_results = join_spark_dataframes(
        left_df=spark_dataframe_uno_renamed,
        right_df=spark_dataframe_dos_renamed,
        join_type=join,
        left_column="spark_dataframe_uno_id",
        right_column="spark_dataframe_dos_related_id",
        return_pandas=True,
    )

    print(f"Join -> {join}")
    display(join_results)
    print()

print("Join -> self")
display(join_spark_dataframes(
        left_df=spark_dataframe_uno_renamed,
        right_df=spark_dataframe_dos_renamed,
        join_type='self',
        left_column="spark_dataframe_uno_id",
        right_column="spark_dataframe_uno_related_id",
        return_pandas=True,
    ))
print()

Join -> inner_join


spark_dataframe_uno_id,spark_dataframe_uno_name,spark_dataframe_uno_age,spark_dataframe_uno_related_id,spark_dataframe_uno_zodiac_sign,spark_dataframe_uno_country,spark_dataframe_uno_values,spark_dataframe_dos_id,spark_dataframe_dos_name,spark_dataframe_dos_age,spark_dataframe_dos_related_id,spark_dataframe_dos_zodiac_sign,spark_dataframe_dos_country,spark_dataframe_dos_values
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?),,,,,,,,,,,,,



Join -> left_join


spark_dataframe_uno_id,spark_dataframe_uno_name,spark_dataframe_uno_age,spark_dataframe_uno_related_id,spark_dataframe_uno_zodiac_sign,spark_dataframe_uno_country,spark_dataframe_uno_values,spark_dataframe_dos_id,spark_dataframe_dos_name,spark_dataframe_dos_age,spark_dataframe_dos_related_id,spark_dataframe_dos_zodiac_sign,spark_dataframe_dos_country,spark_dataframe_dos_values
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?),,,,,,,,,,,,,



Join -> anti_left_join


spark_dataframe_uno_id,spark_dataframe_uno_name,spark_dataframe_uno_age,spark_dataframe_uno_related_id,spark_dataframe_uno_zodiac_sign,spark_dataframe_uno_country,spark_dataframe_uno_values
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?),,,,,,



Join -> right_join


spark_dataframe_uno_id,spark_dataframe_uno_name,spark_dataframe_uno_age,spark_dataframe_uno_related_id,spark_dataframe_uno_zodiac_sign,spark_dataframe_uno_country,spark_dataframe_uno_values,spark_dataframe_dos_id,spark_dataframe_dos_name,spark_dataframe_dos_age,spark_dataframe_dos_related_id,spark_dataframe_dos_zodiac_sign,spark_dataframe_dos_country,spark_dataframe_dos_values
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?),,,,,,,,,,,,,



Join -> anti_right_join


spark_dataframe_dos_id,spark_dataframe_dos_name,spark_dataframe_dos_age,spark_dataframe_dos_related_id,spark_dataframe_dos_zodiac_sign,spark_dataframe_dos_country,spark_dataframe_dos_values
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?),,,,,,



Join -> outer_join


spark_dataframe_uno_id,spark_dataframe_uno_name,spark_dataframe_uno_age,spark_dataframe_uno_related_id,spark_dataframe_uno_zodiac_sign,spark_dataframe_uno_country,spark_dataframe_uno_values,spark_dataframe_dos_id,spark_dataframe_dos_name,spark_dataframe_dos_age,spark_dataframe_dos_related_id,spark_dataframe_dos_zodiac_sign,spark_dataframe_dos_country,spark_dataframe_dos_values
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?),,,,,,,,,,,,,



Join -> anti_outer_join


spark_dataframe_uno_id,spark_dataframe_uno_name,spark_dataframe_uno_age,spark_dataframe_uno_related_id,spark_dataframe_uno_zodiac_sign,spark_dataframe_uno_country,spark_dataframe_uno_values,spark_dataframe_dos_id,spark_dataframe_dos_name,spark_dataframe_dos_age,spark_dataframe_dos_related_id,spark_dataframe_dos_zodiac_sign,spark_dataframe_dos_country,spark_dataframe_dos_values
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?),,,,,,,,,,,,,



Join -> cross_join


spark_dataframe_uno_id,spark_dataframe_uno_name,spark_dataframe_uno_age,spark_dataframe_uno_related_id,spark_dataframe_uno_zodiac_sign,spark_dataframe_uno_country,spark_dataframe_uno_values,spark_dataframe_dos_id,spark_dataframe_dos_name,spark_dataframe_dos_age,spark_dataframe_dos_related_id,spark_dataframe_dos_zodiac_sign,spark_dataframe_dos_country,spark_dataframe_dos_values
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?),,,,,,,,,,,,,



Join -> intersect


spark_dataframe_uno_id,spark_dataframe_uno_name,spark_dataframe_uno_age,spark_dataframe_uno_related_id,spark_dataframe_uno_zodiac_sign,spark_dataframe_uno_country,spark_dataframe_uno_values
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?),,,,,,



Join -> except


spark_dataframe_uno_id,spark_dataframe_uno_name,spark_dataframe_uno_age,spark_dataframe_uno_related_id,spark_dataframe_uno_zodiac_sign,spark_dataframe_uno_country,spark_dataframe_uno_values
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?),,,,,,



Join -> union


spark_dataframe_uno_id,spark_dataframe_uno_name,spark_dataframe_uno_age,spark_dataframe_uno_related_id,spark_dataframe_uno_zodiac_sign,spark_dataframe_uno_country,spark_dataframe_uno_values
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?),,,,,,



Join -> union_all


spark_dataframe_uno_id,spark_dataframe_uno_name,spark_dataframe_uno_age,spark_dataframe_uno_related_id,spark_dataframe_uno_zodiac_sign,spark_dataframe_uno_country,spark_dataframe_uno_values,spark_dataframe_dos_id,spark_dataframe_dos_name,spark_dataframe_dos_age,spark_dataframe_dos_related_id,spark_dataframe_dos_zodiac_sign,spark_dataframe_dos_country,spark_dataframe_dos_values
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?),,,,,,,,,,,,,



Join -> self


spark_dataframe_uno_id,spark_dataframe_uno_name,spark_dataframe_uno_age,spark_dataframe_uno_related_id,spark_dataframe_uno_zodiac_sign,spark_dataframe_uno_country,spark_dataframe_uno_values,spark_dataframe_uno_id.1,spark_dataframe_uno_name.1,spark_dataframe_uno_age.1,spark_dataframe_uno_related_id.1,spark_dataframe_uno_zodiac_sign.1,spark_dataframe_uno_country.1,spark_dataframe_uno_values.1
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?),,,,,,,,,,,,,





In [80]:
list_all_databases_and_tables(spark_session)

The following databases and tables are present in the Spark Catalog.

Database: default, Table: delta_sql

Database: delta_spark_database, Table: delta_dataframe_1

Database: delta_spark_database, Table: delta_dataframe_2

Database: delta_spark_database, Table: delta_dataframe_all

Database: delta_spark_database, Table: delta_dataframe_dos

Database: delta_spark_database, Table: delta_dataframe_uno



{'default': ['delta_sql'],
 'delta_spark_database': ['delta_dataframe_1',
  'delta_dataframe_2',
  'delta_dataframe_all',
  'delta_dataframe_dos',
  'delta_dataframe_uno']}

# <center> OPERACIONES CRUD SPARK

In [81]:
file_path_delta_1 = 'warehouse-spark/spark_catalog/database/delta_spark_database.db/delta_dataframe_uno'

### INSERT

In [82]:
# Llamar a la función para insertar los datos en la tabla Delta dentro de la base de datos 'delta_spark_database'
insert_into_delta_table(spark_session, "delta_spark_database", "delta_dataframe_uno", spark_dataframe_dos)

write_into_delta_lake(spark_session, file_path_delta_1, spark_dataframe_dos)

Records inserted into Delta table: delta_spark_database.delta_dataframe_uno
No new rows to append.


### UPDATE

In [83]:
update_in_delta_table(spark_session, "delta_spark_database", "delta_dataframe_uno", 
                      "zodiac_sign = 'Capricorn'", {"zodiac_sign": "Capricornio"})

update_in_delta_table(spark_session, "delta_spark_database", "delta_dataframe_uno",
                      "zodiac_sign = 'Cancer'", {"zodiac_sign": "Capricornio"})

update_from_delta_lake(spark_session, file_path_delta_1, "age >= 15", {"age": "age * 1.5", "name": "UPPER(name)"})

Records matching condition 'zodiac_sign = 'Capricorn'' updated in Delta table: delta_spark_database.delta_dataframe_uno
Records matching condition 'zodiac_sign = 'Cancer'' updated in Delta table: delta_spark_database.delta_dataframe_uno
Updated records with condition 'age >= 15'.


format,id,name,description,location,createdAt,lastModified,partitionColumns,clusteringColumns,numFiles,sizeInBytes,properties,minReaderVersion,minWriterVersion,tableFeatures
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?),,,,,,,,,,,,,,


### DELETE

In [84]:
delete_condition = "id > 20"

# Llamada a la función
delete_from_delta_table(spark_session, "delta_spark_database", "delta_dataframe_uno", delete_condition)

delete_from_delta_lake(spark_session, file_path_delta_1, 'age >= 40')

Records matching condition 'id > 20' deleted from Delta table: delta_spark_database.delta_dataframe_uno
No records match the condition 'age >= 40'.


id,name,age,related_id,zodiac_sign,country,values
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?),,,,,,


# RESTORE VERSION

In [85]:
restore_delta_lake_to_version(spark_session, file_path_delta_1, 1)

24/12/22 02:57:32 WARN DAGScheduler: Broadcasting large task binary with size 1078.9 KiB


Restored to version 1.


format,id,name,description,location,createdAt,lastModified,partitionColumns,clusteringColumns,numFiles,sizeInBytes,properties,minReaderVersion,minWriterVersion,tableFeatures
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?),,,,,,,,,,,,,,


### MERGE

In [86]:
# Sample data
data0 = [
    {
        "id": 23, "name": "Sun", "age": 13, "related_id": 7, "zodiac_sign": "Leo", "country": "Brazil", "values": [31, 13, 3, 7, 11]
    },
    {
        "id": 24, "name": "Moon", "age": 31, "related_id": 7, "zodiac_sign": "Capricorn", "country": "Bolivia", "values": [12, 13, 27, 31]
    },
    {
        "id": 25, "name": "Earth", "age": 25, "related_id": 7, "zodiac_sign": "Taurus", "country": "USA", "values": [15, 28, 7]
    },
    {
        "id": 26, "name": "Mars", "age": 35, "related_id": 8, "zodiac_sign": "Aries", "country": "Argentina", "values": [18, 21, 4]
    }, 
]

data1 = [
    {
        "id": 27, "name": "Venus", "age": 29, "related_id": 5, "zodiac_sign": "Libra", "country": "Argentina", "values": [19, 25, 8, 11]
    },
    {
        "id": 28, "name": "Jupiter", "age": 45, "related_id": 6, "zodiac_sign": "Sagittarius", "country": "Brazil", "values": [23, 30, 12]
    },
    {
        "id": 29, "name": "Saturn", "age": 60, "related_id": 3, "zodiac_sign": "Aquarius", "country": "Peru", "values": [17, 35, 22, 40]
    },
    {
        "id": 30, "name": "Uranus", "age": 28, "related_id": 9, "zodiac_sign": "Scorpio", "country": "Uruguay", "values": [20, 26, 14]
    },
    {
        "id": 31, "name": "Neptune", "age": 50, "related_id": 4, "zodiac_sign": "Pisces", "country": "Boliva", "values": [29, 19, 33]
    }
]

data2 = [
    {
        "id": 32, "name": "Pluto", "age": 15, "related_id": 2, "zodiac_sign": "Cancer", "country": "Peru", "values": [8, 13, 5]
    },
    {
        "id": 33, "name": "Ceres", "age": 38, "related_id": 10, "zodiac_sign": "Virgo", "country": "Brazil", "values": [21, 18, 6]
    },
    {
        "id": 34, "name": "Eris", "age": 22, "related_id": 11, "zodiac_sign": "Gemini", "country": "Peru", "values": [15, 25, 12]
    },
    {
        "id": 35, "name": "Haumea", "age": 19, "related_id": 1, "zodiac_sign": "Aries", "country": "Mexico", "values": [10, 14, 7, 9]
    },
    {
        "id": 36, "name": "Makemake", "age": 27, "related_id": 12, "zodiac_sign": "Taurus", "country": "Colombia", "values": [18, 22, 11]
    }
]

# Define schema for data (this step is optional, but ensures correct data types)
schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True),
    StructField("related_id", IntegerType(), True),
    StructField("zodiac_sign", StringType(), True),
    StructField("country", StringType(), True),
    StructField("values", ArrayType(IntegerType()), True),
])

# Create a DataFrame from the sample data

for data in [data0, data1, data2]:
    new_data_df = spark_session.createDataFrame(data, schema)
    display(merge_from_delta_lake(spark_session, file_path_delta_1, new_data_df, "id"))

Merge operation completed successfully.


None

Merge operation completed successfully.


None

Merge operation completed successfully.


None

# INSERT DATA AVOING DUPLICATES

In [87]:
data2 = [
    {
        "id": 32, "name": "Pluto", "age": 15, "related_id": 2, "zodiac_sign": "Cancer", "country": "Peru", "values": [8, 13, 5]
    },
    {
        "id": 33, "name": "Ceres", "age": 38, "related_id": 10, "zodiac_sign": "Virgo", "country": "Brazil", "values": [21, 18, 6]
    },
    {
        "id": 99, "name": "Eris", "age": 22, "related_id": 11, "zodiac_sign": "Gemini", "country": "Peru", "values": [15, 25, 12]
    },
    {
        "id": 35, "name": "Haumea", "age": 19, "related_id": 1, "zodiac_sign": "Aries", "country": "Mexico", "values": [10, 14, 7, 9]
    },
    {
        "id": 36, "name": "Makemake", "age": 27, "related_id": 12, "zodiac_sign": "Taurus", "country": "Colombia", "values": [18, 22, 11]
    }
]


for data in [data0, data1, data2]:
    new_data_df = spark_session.createDataFrame(data, schema)
    display(write_into_delta_lake(spark_session, file_path_delta_1, new_data_df))

No new rows to append.


None

No new rows to append.


None

Added new data without duplicates.


None

# EVOLUTION SCHEMA

In [88]:
evolution_schema = [
    {
        "id": 36, "name": "Makemake", "age": 27, "related_id": 12, "zodiac_sign": "Taurus", "country": "Colombia", "values": [18, 22, 11], "color":"Blue"
    },
    {
        "id": 1227, "name": "Pluto", "age": 15, "related_id": 2, "zodiac_sign": "Cancer", "country": "Peru", "values": [8, 13, 5], "fruit": "Guama"
    }
]

schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True),
    StructField("related_id", IntegerType(), True),
    StructField("zodiac_sign", StringType(), True),
    StructField("country", StringType(), True),
    StructField("values", ArrayType(IntegerType()), True),
    StructField("color", StringType(), True),
    StructField("fruit", StringType(), True),
])

for data in [evolution_schema]:
    new_data_df = spark_session.createDataFrame(data, schema)
    display(write_into_delta_lake(spark_session, file_path_delta_1, new_data_df))


evolution_schema_2 = [
    {
    "id": 7831, "name": "Pluto", "age": 15, "related_id": 2, "zodiac_sign": "Capricorn", "country": "Peru", "values": [8, 13, 5], "continent": "America"
    },
        {
    "id": 7831, "name": "Pluto", "age": 15, "related_id": 2, "zodiac_sign": "Leo", "country": "Peru", "values": [8, 13, 5], "continent": "America", "sport":"Tennis" 
    }
]

schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True),
    StructField("related_id", IntegerType(), True),
    StructField("zodiac_sign", StringType(), True),
    StructField("country", StringType(), True),
    StructField("values", ArrayType(IntegerType()), True),
    StructField("continent", StringType(), True),
])

for data in [evolution_schema_2]:
    new_data_df = spark_session.createDataFrame(data, schema)
    merge_from_delta_lake(spark_session, file_path_delta_1, new_data_df, "id")

Added new data without duplicates.


None

Merge operation completed successfully.


# Review the version history to explore Delta Lake's time travel functionality.

In [89]:
historic_version = show_historic_version_from_delta_file(spark_session, file_path_delta_1)
historic_version

version,timestamp,userId,userName,operation,operationParameters,job,notebook,clusterId,readVersion,isolationLevel,isBlindAppend,operationMetrics,userMetadata,engineInfo
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?),,,,,,,,,,,,,,


# CONTENT CHANGES BY VERSION Method 1 - History Version

In [90]:
total_versions = max(historic_version.version)
total_operations = historic_version.operation.to_list()
total_operations.reverse()

for version in range(total_versions + 1):
    print(f"Version : {version}, Operation : {total_operations[version]}, Content")
    display(show_historic_version_from_delta_file(spark_session, file_path_delta_1, version, None, 'id'))
    print()

Version : 0, Operation : CREATE OR REPLACE TABLE AS SELECT, Content


Unnamed: 0,id,name,age,related_id,zodiac_sign,country,values
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?),,,,,,,



Version : 1, Operation : SET TBLPROPERTIES, Content


age,country,id,name,related_id,values,zodiac_sign
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?),,,,,,



Version : 2, Operation : WRITE, Content


Unnamed: 0,age,country,id,name,related_id,values,zodiac_sign
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?),,,,,,,



Version : 3, Operation : OPTIMIZE, Content


age,country,id,name,related_id,values,zodiac_sign
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?),,,,,,



Version : 4, Operation : UPDATE, Content


Unnamed: 0,age,country,id,name,related_id,values,zodiac_sign,ChangeType
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?),,,,,,,,



Version : 5, Operation : UPDATE, Content


Unnamed: 0,age,country,id,name,related_id,values,zodiac_sign,ChangeType
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?),,,,,,,,



Version : 6, Operation : UPDATE, Content


Unnamed: 0,age,country,id,name,related_id,values,zodiac_sign,ChangeType
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?),,,,,,,,



Version : 7, Operation : DELETE, Content


Unnamed: 0,age,country,id,name,related_id,values,zodiac_sign,ChangeType
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?),,,,,,,,



Version : 8, Operation : RESTORE, Content


Unnamed: 0,age,country,id,name,related_id,values,zodiac_sign
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?),,,,,,,



Version : 9, Operation : MERGE, Content


Unnamed: 0,age,country,id,name,related_id,values,zodiac_sign
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?),,,,,,,



Version : 10, Operation : OPTIMIZE, Content


age,country,id,name,related_id,values,zodiac_sign
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?),,,,,,



Version : 11, Operation : MERGE, Content


Unnamed: 0,age,country,id,name,related_id,values,zodiac_sign
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?),,,,,,,



Version : 12, Operation : MERGE, Content


Unnamed: 0,age,country,id,name,related_id,values,zodiac_sign
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?),,,,,,,



Version : 13, Operation : OPTIMIZE, Content


age,country,id,name,related_id,values,zodiac_sign
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?),,,,,,



Version : 14, Operation : WRITE, Content


age,country,id,name,related_id,values,zodiac_sign
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?),,,,,,



Version : 15, Operation : OPTIMIZE, Content


age,country,id,name,related_id,values,zodiac_sign
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?),,,,,,



Version : 16, Operation : WRITE, Content


age,color,country,fruit,id,name,related_id,values,zodiac_sign
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?),,,,,,,,



Version : 17, Operation : OPTIMIZE, Content


age,color,country,fruit,id,name,related_id,values,zodiac_sign
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?),,,,,,,,



Version : 18, Operation : MERGE, Content


age,color,continent,country,fruit,id,name,related_id,values,zodiac_sign
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?),,,,,,,,,





# CONTENT CHANGES BY VERSION Method 2 - Change Data Feed Version Validation and Control

In [91]:
total_versions = max(historic_version.version)
total_operations = historic_version.operation.to_list()
total_operations.reverse()

for version in range(total_versions + 1):
    print(f"Version : {version}, Operation : {total_operations[version]}, Content")
    display(read_delta_table_with_change_data_control(spark_session, file_path_delta_1, version, version))
    print()


Version : 0, Operation : CREATE OR REPLACE TABLE AS SELECT, Content


id,name,age,related_id,zodiac_sign,country,values,color,fruit,continent,_change_type,_commit_version,_commit_timestamp
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?),,,,,,,,,,,,



Version : 1, Operation : SET TBLPROPERTIES, Content


id,name,age,related_id,zodiac_sign,country,values,color,fruit,continent,_change_type,_commit_version,_commit_timestamp
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?),,,,,,,,,,,,



Version : 2, Operation : WRITE, Content


id,name,age,related_id,zodiac_sign,country,values,color,fruit,continent,_change_type,_commit_version,_commit_timestamp
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?),,,,,,,,,,,,



Version : 3, Operation : OPTIMIZE, Content


id,name,age,related_id,zodiac_sign,country,values,color,fruit,continent,_change_type,_commit_version,_commit_timestamp
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?),,,,,,,,,,,,



Version : 4, Operation : UPDATE, Content


id,name,age,related_id,zodiac_sign,country,values,color,fruit,continent,_change_type,_commit_version,_commit_timestamp
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?),,,,,,,,,,,,



Version : 5, Operation : UPDATE, Content


id,name,age,related_id,zodiac_sign,country,values,color,fruit,continent,_change_type,_commit_version,_commit_timestamp
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?),,,,,,,,,,,,



Version : 6, Operation : UPDATE, Content


id,name,age,related_id,zodiac_sign,country,values,color,fruit,continent,_change_type,_commit_version,_commit_timestamp
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?),,,,,,,,,,,,



Version : 7, Operation : DELETE, Content


id,name,age,related_id,zodiac_sign,country,values,color,fruit,continent,_change_type,_commit_version,_commit_timestamp
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?),,,,,,,,,,,,



Version : 8, Operation : RESTORE, Content


id,name,age,related_id,zodiac_sign,country,values,color,fruit,continent,_change_type,_commit_version,_commit_timestamp
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?),,,,,,,,,,,,



Version : 9, Operation : MERGE, Content


id,name,age,related_id,zodiac_sign,country,values,color,fruit,continent,_change_type,_commit_version,_commit_timestamp
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?),,,,,,,,,,,,



Version : 10, Operation : OPTIMIZE, Content


id,name,age,related_id,zodiac_sign,country,values,color,fruit,continent,_change_type,_commit_version,_commit_timestamp
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?),,,,,,,,,,,,



Version : 11, Operation : MERGE, Content


id,name,age,related_id,zodiac_sign,country,values,color,fruit,continent,_change_type,_commit_version,_commit_timestamp
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?),,,,,,,,,,,,



Version : 12, Operation : MERGE, Content


id,name,age,related_id,zodiac_sign,country,values,color,fruit,continent,_change_type,_commit_version,_commit_timestamp
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?),,,,,,,,,,,,



Version : 13, Operation : OPTIMIZE, Content


id,name,age,related_id,zodiac_sign,country,values,color,fruit,continent,_change_type,_commit_version,_commit_timestamp
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?),,,,,,,,,,,,



Version : 14, Operation : WRITE, Content


id,name,age,related_id,zodiac_sign,country,values,color,fruit,continent,_change_type,_commit_version,_commit_timestamp
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?),,,,,,,,,,,,



Version : 15, Operation : OPTIMIZE, Content


id,name,age,related_id,zodiac_sign,country,values,color,fruit,continent,_change_type,_commit_version,_commit_timestamp
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?),,,,,,,,,,,,



Version : 16, Operation : WRITE, Content


id,name,age,related_id,zodiac_sign,country,values,color,fruit,continent,_change_type,_commit_version,_commit_timestamp
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?),,,,,,,,,,,,



Version : 17, Operation : OPTIMIZE, Content


id,name,age,related_id,zodiac_sign,country,values,color,fruit,continent,_change_type,_commit_version,_commit_timestamp
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?),,,,,,,,,,,,



Version : 18, Operation : MERGE, Content


id,name,age,related_id,zodiac_sign,country,values,color,fruit,continent,_change_type,_commit_version,_commit_timestamp
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?),,,,,,,,,,,,





# Review the version history FROM FILE

# DESCARGAR ARCHIVOS

In [92]:
import pandas as pd
import os
import gc

# Ruta del directorio
file_path = "documents/"
file_text_1 = "https://raw.githubusercontent.com/JorgeCardona/recursos/refs/heads/main/delta-spark/documents/IngenieriaDeDatos.txt"
file_text_2 = "https://raw.githubusercontent.com/JorgeCardona/recursos/refs/heads/main/delta-spark/documents/DeltaLake.txt"
file_text_3 = "https://raw.githubusercontent.com/JorgeCardona/recursos/refs/heads/main/delta-spark/documents/MLOps.txt"

# Crear el directorio si no existe
os.makedirs(file_path, exist_ok=True)

for file_url in [file_text_1, file_text_2, file_text_3]:
    # Leer el archivo de texto desde la URL
    pandas_df = pd.read_csv(file_url, encoding="utf-8")

    # Obtener el nombre del archivo
    file_name = file_url.split("/")[-1]
    print(file_name)

    # Ruta para guardar el archivo
    save_path = f"{file_path}/{file_name}"

    # Guardar el archivo de texto sin modificaciones
    pandas_df.to_csv(save_path, index=False, encoding="utf-8")

    print(f"Archivo descargado: {file_name} y guardado en el directorio -> {save_path}")

# Liberar memoria eliminando el DataFrame
del pandas_df
gc.collect()  # Forzar recolección de basura

print("Pandas DataFrame eliminado de la memoria.")


IngenieriaDeDatos.txt
Archivo descargado: IngenieriaDeDatos.txt y guardado en el directorio -> documents//IngenieriaDeDatos.txt
DeltaLake.txt
Archivo descargado: DeltaLake.txt y guardado en el directorio -> documents//DeltaLake.txt
MLOps.txt
Archivo descargado: MLOps.txt y guardado en el directorio -> documents//MLOps.txt
Pandas DataFrame eliminado de la memoria.


### PROCESAR ARCHIVOS

In [93]:
# Leer el archivo de texto como un DataFrame de Spark
df_text_1 = spark_session.read.text("documents/IngenieriaDeDatos.txt")
df_text_2 = spark_session.read.text("documents/DeltaLake.txt")
df_text_3 = spark_session.read.text("documents/MLOps.txt")

delta_lake_text_path = "warehouse-spark/spark_files/delta_parquet/texto"

# Guardar el DataFrame combinado en una tabla Delta
for new_data_df in [df_text_1, df_text_2, df_text_3]:
    display(write_into_delta_lake(spark_session, delta_lake_text_path, new_data_df))

spark_session.read.format("delta").load(delta_lake_text_path).toPandas()

warehouse-spark/spark_files/delta_parquet/texto does not contain a Delta table.
Created Delta table with new data.


None

Added new data without duplicates.


None

Added new data without duplicates.


None

value
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?)


# Method 1 - History Version

In [94]:
historic_version_file = show_historic_version_from_delta_file(spark_session, delta_lake_text_path)
historic_version_file

version,timestamp,userId,userName,operation,operationParameters,job,notebook,clusterId,readVersion,isolationLevel,isBlindAppend,operationMetrics,userMetadata,engineInfo
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?),,,,,,,,,,,,,,


In [95]:
total_versions_file = max(historic_version_file.version)
total_operations_file = historic_version_file.operation.to_list()
total_operations_file.reverse()

for version in range(total_versions_file + 1):
    print(f"Version : {version}, Operation : {total_operations_file[version]}, Content")
    display(show_historic_version_from_delta_file(spark_session, delta_lake_text_path, version))
    print()

Version : 0, Operation : WRITE, Content


value
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?)



Version : 1, Operation : WRITE, Content


value
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?)



Version : 2, Operation : OPTIMIZE, Content


value
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?)



Version : 3, Operation : WRITE, Content


value
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?)





# Method 2 - Change Data Feed Version Validation and Control

In [96]:
total_versions_file = max(historic_version_file.version)
total_operations_file = historic_version_file.operation.to_list()
total_operations_file.reverse()

for version in range(total_versions_file + 1):
    print(f"Version : {version}, Operation : {total_operations_file[version]}, Content")
    display(read_delta_table_with_change_data_control(spark_session, delta_lake_text_path, version, version))
    print()

Version : 0, Operation : WRITE, Content


value,_change_type,_commit_version,_commit_timestamp
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?),,,



Version : 1, Operation : WRITE, Content


value,_change_type,_commit_version,_commit_timestamp
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?),,,



Version : 2, Operation : OPTIMIZE, Content


value,_change_type,_commit_version,_commit_timestamp
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?),,,



Version : 3, Operation : WRITE, Content


value,_change_type,_commit_version,_commit_timestamp
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?),,,





# DELTA STREAMING

In [97]:
import pandas as pd
import numpy as np
from time import sleep
import os

def process_and_save_files(
    csv_streaming_path, 
    base_streaming_directory, 
    metadata_streaming_path, 
    delta_output_streaming_path, 
    file_name='flight_logs', 
    num_files=5, 
    sleep_time=10, 
    raw_url_base='https://raw.githubusercontent.com/JorgeCardona/recursos/main/datasets/', 
    file_format='csv', 
    multiline_json=False
):
    """
    Processes and saves streaming files in the specified formats (csv, json, or parquet), merging data from two parts
    based on the 'id' or 'flight_id' column, and saves the final merged file.

    Parameters:
    csv_streaming_path (str): Path to save the processed CSV files.
    base_streaming_directory (str): Base directory for file storage.
    metadata_streaming_path (str): Path for metadata storage.
    delta_output_streaming_path (str): Path to save Delta format files.
    file_name (str, optional): Base name for the files. Default is 'flight_logs'.
    num_files (int, optional): Number of files to process. Default is 5.
    sleep_time (int, optional): Time in seconds to wait before processing the next file. Default is 10 seconds.
    raw_url_base (str, optional): Base URL for downloading files. Default is a GitHub URL.
    file_format (str, optional): File format to process ('csv', 'json', 'parquet'). Default is 'csv'.
    multiline_json (bool, optional): Whether to handle JSON files as multiline. Default is False.

    Raises:
    ValueError: If the file format is unsupported.
    """
    # Create directories if they do not exist
    os.makedirs(csv_streaming_path, exist_ok=True)
    os.makedirs(metadata_streaming_path, exist_ok=True)
    os.makedirs(delta_output_streaming_path, exist_ok=True)

    # Process files
    for index in range(1, num_files + 1):
        final_file = f'{csv_streaming_path}/{file_name}_{index}.{file_format}'

        # Download and read the first part of the file in the appropriate format
        file_1 = f'{file_name}_part_1_{index}.{file_format}'
        base_url1 = f'{raw_url_base}{file_1}'

        if file_format == 'csv':
            df1 = pd.read_csv(base_url1)
        elif file_format == 'json':
            if multiline_json:
                df1 = pd.read_json(base_url1, lines=True)  # For multiline JSON
            else:
                df1 = pd.read_json(base_url1)  # For JSON in list
        elif file_format == 'parquet':
            df1 = pd.read_parquet(base_url1)
        else:
            raise ValueError(f"Unsupported file format {file_format}")

        # Download and read the second part of the file in the appropriate format
        file_2 = f'{file_name}_part_2_{index}.{file_format}'
        base_url2 = f'{raw_url_base}{file_2}'

        if file_format == 'csv':
            df2 = pd.read_csv(base_url2)
        elif file_format == 'json':
            if multiline_json:
                df2 = pd.read_json(base_url2, lines=True)  # For multiline JSON
            else:
                df2 = pd.read_json(base_url2)  # For JSON in list
        elif file_format == 'parquet':
            df2 = pd.read_parquet(base_url2)
        else:
            raise ValueError(f"Unsupported file format {file_format}")

        # Merge the data on the 'id' or 'flight_id' column
        df3 = pd.merge(df1, df2, left_on='id', right_on='flight_id', how='inner')

        # Save the final file in the appropriate format
        if file_format == 'csv':
            df3.to_csv(f'{final_file}', index=False)
        elif file_format == 'json':
            df3.to_json(f'{final_file}', orient='records', lines=True)
        elif file_format == 'parquet':
            df3.to_parquet(f'{final_file}')
        else:
            raise ValueError(f"Unsupported file format {file_format}")

        print(f'{final_file} saved Successfully!!')
        
        # Pause for the specified time
        sleep(sleep_time)


# DEFINE LA ESTRUCTIRA DE LOS DATOS A LEER, PARA ASIGNAR EL ESQUEMA ADECUADO

In [98]:
import shutil
from datetime import datetime

from pyspark.sql import functions as F
from pyspark.sql.functions import col, expr, lit
from pyspark.sql.types import (
    DoubleType,
    FloatType,
    IntegerType,
    StringType,
    StructField,
    StructType,
)

# Define the schema
customSchema = StructType(
    [
        StructField("id", IntegerType(), True),
        StructField("secure_code", StringType(), True),
        StructField("airline", StringType(), True),
        StructField("departure_city", StringType(), True),
        StructField("departure_date", StringType(), True),
        StructField("arrival_airport", StringType(), True),
        StructField("arrival_city", StringType(), True),
        StructField("arrival_time", StringType(), True),
        StructField("passenger_name", StringType(), True),
        StructField("passenger_gender", StringType(), True),
        StructField("seat_number", StringType(), True),
        StructField("currency", StringType(), True),
        StructField("departure_gate", StringType(), True),
        StructField("flight_status", StringType(), True),
        StructField("co_pilot_name", StringType(), True),
        StructField("aircraft_type", StringType(), True),
        StructField("fuel_consumption", DoubleType(), True),
        StructField("flight_id", IntegerType(), True),
        StructField("flight_number", IntegerType(), True),
        StructField("departure_airport", StringType(), True),
        StructField("departure_country", StringType(), True),
        StructField("departure_time", StringType(), True),
        StructField("arrival_country", StringType(), True),
        StructField("arrival_date", StringType(), True),
        StructField("flight_duration", DoubleType(), True),
        StructField("passenger_age", IntegerType(), True),
        StructField("passenger_nationality", StringType(), True),
        StructField("ticket_price", DoubleType(), True),
        StructField("baggage_weight", DoubleType(), True),
        StructField("arrival_gate", StringType(), True),
        StructField("pilot_name", StringType(), True),
        StructField("cabin_crew_count", IntegerType(), True),
        StructField("aircraft_registration", StringType(), True),
        StructField("flight_distance", DoubleType(), True),
    ]
)

# EJECUTA LA VERSION DE DELTA STREAMING

In [99]:
import time
from datetime import datetime
from pyspark.sql.functions import upper

def process_streaming_files(
    spark_session,
    input_path,
    output_path,
    checkpoint_path,
    file_format="csv",
    custom_schema=None,
    header=True,
    output_mode="append",
    idle_timeout=20,
    multiline_json=False,  # For multiline JSON
    capitalized_column="departure_city"  # New parameter for the column to capitalize
):
    """
    Processes streaming files from the specified input path, handles different file formats (csv, json, parquet),
    and writes the processed data to an output path in Delta format. It also handles checkpoints and manages idle timeout.
    Additionally, it capitalizes the values of the specified column before writing the stream.

    Parameters:
    spark_session (SparkSession): The active Spark session used to run the streaming job.
    input_path (str): The path where the streaming files are located.
    output_path (str): The path where the processed Delta files will be written.
    checkpoint_path (str): The path to store the checkpoint data.
    file_format (str, optional): The format of the input files ('csv', 'json', or 'parquet'). Default is 'csv'.
    custom_schema (StructType, optional): The schema to apply to the incoming data. Default is None.
    header (bool, optional): Whether to include the header in CSV files. Default is True.
    output_mode (str, optional): The output mode for the streaming job. Default is 'append'.
    idle_timeout (int, optional): The timeout in seconds to stop the stream if no new data is processed. Default is 20 seconds.
    multiline_json (bool, optional): Whether to handle JSON files as multiline. Default is False.
    capitalized_column (str, optional): The name of the column whose values should be capitalized. Default is 'departure_city'.

    Raises:
    ValueError: If the file format is unsupported.
    """
    
    # Set up the stream reader depending on the file format
    stream_reader = spark_session.readStream.format(file_format)
    
    if file_format == "csv":
        stream_reader = stream_reader.option("header", str(header).lower())
    elif file_format == "json":
        if multiline_json:
            stream_reader = stream_reader.option("multiline", "true")  # For multiline JSON
        else:
            stream_reader = stream_reader.option("multiline", "false")  # For JSON by line
    elif file_format == "parquet":
        # No additional options needed for Parquet
        pass
    else:
        raise ValueError(f"Unsupported file format {file_format}")

    # Apply custom schema if provided
    if custom_schema:
        stream_reader = stream_reader.schema(custom_schema)

    # Read the stream from the input path
    input_stream = stream_reader.load(input_path)

    # DATA TRANSFORMATION
    # Capitalize the specified column if it exists in the DataFrame
    if capitalized_column in input_stream.columns:
        input_stream = input_stream.withColumn(
            capitalized_column, 
            upper(input_stream[capitalized_column])
        )

    # Write the stream to Delta format with checkpointing
    output_stream = input_stream.writeStream \
        .format("delta") \
        .outputMode(output_mode) \
        .option("checkpointLocation", checkpoint_path) \
        .trigger(processingTime="10 seconds")  # Process every 10 seconds

    # Start the streaming query
    query = output_stream.start(output_path)

    last_processed_time = time.time()  # Initialize the last processed time

    while query.isActive:
        # Get the last progress of the stream
        last_progress = query.lastProgress
        if last_progress:
            processing_time = last_progress.get("processedRowsPerSecond", 0)
            print(f"Last progress: {last_progress}, Processed per second: {processing_time}")

            if processing_time == 0:
                # If no new rows have been processed, check the idle time
                time_since_last_processed = time.time() - last_processed_time
                print(f"Time since last processed: {time_since_last_processed} seconds.")
                
                if time_since_last_processed > idle_timeout:
                    print(f"No new data for {idle_timeout} seconds. Stopping the stream.")
                    query.stop()
                    break

            else:
                last_processed_time = time.time()  # Update the time when data is processed
        
        # Sleep briefly before checking again
        time.sleep(5)

    print("Stream has stopped.")

# PROCESAMIENTO DE DATOS EN STREAMING CON HILOS

In [100]:
import threading

# Function to execute 'process_and_save_files'
def run_process_and_save_files():
    """
    Runs the process_and_save_files function to download, process, and save files in the specified directories.
    """
    process_and_save_files(
        csv_streaming_path="csv_streaming_files/",
        base_streaming_directory="warehouse-spark/delta_spark_streaming",
        metadata_streaming_path="warehouse-spark/delta_spark_streaming/metadata",
        delta_output_streaming_path="warehouse-spark/delta_spark_streaming/proccesed",
        file_name="flight_logs",
        num_files=5,
        sleep_time=10,
        raw_url_base="https://raw.githubusercontent.com/JorgeCardona/recursos/main/datasets/",
    )

# Function to execute 'process_streaming_files'
def run_process_streaming_files():
    """
    Runs the process_streaming_files function to process streaming data and write it to a Delta table.
    """
    process_streaming_files(
        spark_session=spark_session,
        input_path=csv_streaming_path,
        output_path=delta_output_streaming_path,
        checkpoint_path=metadata_streaming_path,
        file_format="csv",
        custom_schema=customSchema,
        header=True,
        output_mode="append",
        idle_timeout=20,
        capitalized_column="departure_city",
    )

# Define directorios para procesamiento
csv_streaming_path = "csv_streaming_files/"
base_streaming_directory = "warehouse-spark/delta_spark_streaming"
metadata_streaming_path = f"{base_streaming_directory}/metadata" 
delta_output_streaming_path = f"{base_streaming_directory}/proccesed"

# Crear hilos para ejecutar ambas funciones en paralelo
process_and_save_files_thread = threading.Thread(target=run_process_and_save_files)
process_streaming_files_thread = threading.Thread(target=run_process_streaming_files)

# Iniciar ambos hilos en paralelo
process_and_save_files_thread.start()
process_streaming_files_thread.start()

# Esperar a que ambos hilos terminen antes de continuar con la siguiente celda
process_and_save_files_thread.join()
process_streaming_files_thread.join()

print("Both processes have finished.")

24/12/22 02:58:50 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


csv_streaming_files//flight_logs_1.csv saved Successfully!!
Last progress: {'id': 'b29132dd-faab-499c-867a-0786807473ba', 'runId': '2c9c8da3-78ea-4da2-8745-11b9a80d3eff', 'name': None, 'timestamp': '2024-12-22T02:58:50.600Z', 'batchId': 0, 'numInputRows': 0, 'inputRowsPerSecond': 0.0, 'processedRowsPerSecond': 0.0, 'durationMs': {'latestOffset': 3, 'triggerExecution': 8}, 'stateOperators': [], 'sources': [{'description': 'FileStreamSource[file:/notebooks/csv_streaming_files]', 'startOffset': None, 'endOffset': None, 'latestOffset': None, 'numInputRows': 0, 'inputRowsPerSecond': 0.0, 'processedRowsPerSecond': 0.0}], 'sink': {'description': 'DeltaSink[warehouse-spark/delta_spark_streaming/proccesed]', 'numOutputRows': -1}}, Processed per second: 0.0
Time since last processed: 5.0137786865234375 seconds.
Last progress: {'id': 'b29132dd-faab-499c-867a-0786807473ba', 'runId': '2c9c8da3-78ea-4da2-8745-11b9a80d3eff', 'name': None, 'timestamp': '2024-12-22T02:58:50.600Z', 'batchId': 0, 'numInp

INFO:py4j.clientserver:Closing down clientserver connection


Last progress: {'id': 'b29132dd-faab-499c-867a-0786807473ba', 'runId': '2c9c8da3-78ea-4da2-8745-11b9a80d3eff', 'name': None, 'timestamp': '2024-12-22T03:00:10.000Z', 'batchId': 5, 'numInputRows': 0, 'inputRowsPerSecond': 0.0, 'processedRowsPerSecond': 0.0, 'durationMs': {'latestOffset': 2, 'triggerExecution': 2}, 'stateOperators': [], 'sources': [{'description': 'FileStreamSource[file:/notebooks/csv_streaming_files]', 'startOffset': {'logOffset': 4}, 'endOffset': {'logOffset': 4}, 'latestOffset': None, 'numInputRows': 0, 'inputRowsPerSecond': 0.0, 'processedRowsPerSecond': 0.0}], 'sink': {'description': 'DeltaSink[warehouse-spark/delta_spark_streaming/proccesed]', 'numOutputRows': -1}}, Processed per second: 0.0
Time since last processed: 20.006556510925293 seconds.
No new data for 20 seconds. Stopping the stream.
Stream has stopped.
Both processes have finished.


# Method 1 - History Version

In [101]:
historic_version_file = show_historic_version_from_delta_file(spark_session, delta_output_streaming_path)
historic_version_file

version,timestamp,userId,userName,operation,operationParameters,job,notebook,clusterId,readVersion,isolationLevel,isBlindAppend,operationMetrics,userMetadata,engineInfo
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?),,,,,,,,,,,,,,


In [102]:
total_versions_file = max(historic_version_file.version)
total_operations_file = historic_version_file.operation.to_list()
total_operations_file.reverse()

for version in range(total_versions_file + 1):
    print(f"Version : {version}, Operation : {total_operations_file[version]}, Content")
    display(show_historic_version_from_delta_file(spark_session, delta_output_streaming_path, version))
    print()

Version : 0, Operation : STREAMING UPDATE, Content


id,secure_code,airline,departure_city,departure_date,arrival_airport,arrival_city,arrival_time,passenger_name,passenger_gender,seat_number,currency,departure_gate,flight_status,co_pilot_name,aircraft_type,fuel_consumption,flight_id,flight_number,departure_airport,departure_country,departure_time,arrival_country,arrival_date,flight_duration,passenger_age,passenger_nationality,ticket_price,baggage_weight,arrival_gate,pilot_name,cabin_crew_count,aircraft_registration,flight_distance
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?),,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,



Version : 1, Operation : STREAMING UPDATE, Content


aircraft_registration,aircraft_type,airline,arrival_airport,arrival_city,arrival_country,arrival_date,arrival_gate,arrival_time,baggage_weight,cabin_crew_count,co_pilot_name,currency,departure_airport,departure_city,departure_country,departure_date,departure_gate,departure_time,flight_distance,flight_duration,flight_id,flight_number,flight_status,fuel_consumption,id,passenger_age,passenger_gender,passenger_name,passenger_nationality,pilot_name,seat_number,secure_code,ticket_price
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?),,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,



Version : 2, Operation : STREAMING UPDATE, Content


aircraft_registration,aircraft_type,airline,arrival_airport,arrival_city,arrival_country,arrival_date,arrival_gate,arrival_time,baggage_weight,cabin_crew_count,co_pilot_name,currency,departure_airport,departure_city,departure_country,departure_date,departure_gate,departure_time,flight_distance,flight_duration,flight_id,flight_number,flight_status,fuel_consumption,id,passenger_age,passenger_gender,passenger_name,passenger_nationality,pilot_name,seat_number,secure_code,ticket_price
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?),,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,



Version : 3, Operation : STREAMING UPDATE, Content


aircraft_registration,aircraft_type,airline,arrival_airport,arrival_city,arrival_country,arrival_date,arrival_gate,arrival_time,baggage_weight,cabin_crew_count,co_pilot_name,currency,departure_airport,departure_city,departure_country,departure_date,departure_gate,departure_time,flight_distance,flight_duration,flight_id,flight_number,flight_status,fuel_consumption,id,passenger_age,passenger_gender,passenger_name,passenger_nationality,pilot_name,seat_number,secure_code,ticket_price
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?),,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,



Version : 4, Operation : STREAMING UPDATE, Content


aircraft_registration,aircraft_type,airline,arrival_airport,arrival_city,arrival_country,arrival_date,arrival_gate,arrival_time,baggage_weight,cabin_crew_count,co_pilot_name,currency,departure_airport,departure_city,departure_country,departure_date,departure_gate,departure_time,flight_distance,flight_duration,flight_id,flight_number,flight_status,fuel_consumption,id,passenger_age,passenger_gender,passenger_name,passenger_nationality,pilot_name,seat_number,secure_code,ticket_price
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?),,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,





# Method 2 - Change Data Feed Version Validation and Control

In [103]:
total_versions_file = max(historic_version_file.version)
total_operations_file = historic_version_file.operation.to_list()
total_operations_file.reverse()

for version in range(total_versions_file + 1):
    print(f"Version : {version}, Operation : {total_operations_file[version]}, Content")
    display(read_delta_table_with_change_data_control(spark_session, delta_output_streaming_path, version, version))
    print()

Version : 0, Operation : STREAMING UPDATE, Content


id,secure_code,airline,departure_city,departure_date,arrival_airport,arrival_city,arrival_time,passenger_name,passenger_gender,seat_number,currency,departure_gate,flight_status,co_pilot_name,aircraft_type,fuel_consumption,flight_id,flight_number,departure_airport,departure_country,departure_time,arrival_country,arrival_date,flight_duration,passenger_age,passenger_nationality,ticket_price,baggage_weight,arrival_gate,pilot_name,cabin_crew_count,aircraft_registration,flight_distance,_change_type,_commit_version,_commit_timestamp
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?),,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,



Version : 1, Operation : STREAMING UPDATE, Content


id,secure_code,airline,departure_city,departure_date,arrival_airport,arrival_city,arrival_time,passenger_name,passenger_gender,seat_number,currency,departure_gate,flight_status,co_pilot_name,aircraft_type,fuel_consumption,flight_id,flight_number,departure_airport,departure_country,departure_time,arrival_country,arrival_date,flight_duration,passenger_age,passenger_nationality,ticket_price,baggage_weight,arrival_gate,pilot_name,cabin_crew_count,aircraft_registration,flight_distance,_change_type,_commit_version,_commit_timestamp
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?),,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,



Version : 2, Operation : STREAMING UPDATE, Content


id,secure_code,airline,departure_city,departure_date,arrival_airport,arrival_city,arrival_time,passenger_name,passenger_gender,seat_number,currency,departure_gate,flight_status,co_pilot_name,aircraft_type,fuel_consumption,flight_id,flight_number,departure_airport,departure_country,departure_time,arrival_country,arrival_date,flight_duration,passenger_age,passenger_nationality,ticket_price,baggage_weight,arrival_gate,pilot_name,cabin_crew_count,aircraft_registration,flight_distance,_change_type,_commit_version,_commit_timestamp
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?),,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,



Version : 3, Operation : STREAMING UPDATE, Content


id,secure_code,airline,departure_city,departure_date,arrival_airport,arrival_city,arrival_time,passenger_name,passenger_gender,seat_number,currency,departure_gate,flight_status,co_pilot_name,aircraft_type,fuel_consumption,flight_id,flight_number,departure_airport,departure_country,departure_time,arrival_country,arrival_date,flight_duration,passenger_age,passenger_nationality,ticket_price,baggage_weight,arrival_gate,pilot_name,cabin_crew_count,aircraft_registration,flight_distance,_change_type,_commit_version,_commit_timestamp
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?),,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,



Version : 4, Operation : STREAMING UPDATE, Content


id,secure_code,airline,departure_city,departure_date,arrival_airport,arrival_city,arrival_time,passenger_name,passenger_gender,seat_number,currency,departure_gate,flight_status,co_pilot_name,aircraft_type,fuel_consumption,flight_id,flight_number,departure_airport,departure_country,departure_time,arrival_country,arrival_date,flight_duration,passenger_age,passenger_nationality,ticket_price,baggage_weight,arrival_gate,pilot_name,cabin_crew_count,aircraft_registration,flight_distance,_change_type,_commit_version,_commit_timestamp
Loading ITables v2.2.4 from the init_notebook_mode cell... (need help?),,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,





# LIBERAR TODOS LOS RECURSOS UTILIZADOS

In [104]:
# Liberar memoria eliminando los DataFrames de Spark del caché
# Esto es necesario para evitar que ocupen espacio innecesario en memoria
spark_dataframe_sample.unpersist()  # Elimina el caché del DataFrame 'spark_dataframe_sample'
spark_dataframe_sql.unpersist()  # Elimina el caché del DataFrame 'spark_dataframe_sql'
spark_dataframe_delta.unpersist()  # Elimina el caché del DataFrame 'spark_dataframe_delta'
spark_dataframe_joins.unpersist()  # Elimina el caché del DataFrame 'spark_dataframe_joins'
spark_dataframe_uno.unpersist()  # Elimina el caché del DataFrame 'spark_dataframe_uno'
spark_dataframe_dos.unpersist()  # Elimina el caché del DataFrame 'spark_dataframe_dos'
spark_dataframe_uno_renamed.unpersist()  # Elimina el caché del DataFrame 'spark_dataframe_uno_renamed'
spark_dataframe_dos_renamed.unpersist()  # Elimina el caché del DataFrame 'spark_dataframe_dos_renamed'

# Detener la sesión de Spark para liberar todos los recursos asociados
spark_session.stop()