In [0]:
from pyspark.sql.datasource import DataSource, DataSourceReader, DataSourceWriter
from pyspark.sql.types import StructType, StructField, StringType
from typing import Iterator, Tuple

In [0]:
def caesar_cypher(text: str, shift: int, decrypt: bool = True):
    """
    Apply the Caesar cipher to a string of text.

    Depending on ``decrypt``, the function either encrypts or decrypts ``text``:
    - Encryption moves each ASCII letter forward by ``shift`` positions in the alphabet.
    - Decryption moves each ASCII letter backward by ``shift`` positions.

    Letter case is preserved. Every other character is returned unchanged,
    including digits, punctuation, whitespace and non-ASCII letters such as 'é'
    or 'ç'. As a result, ``caesar_cypher(caesar_cypher(s, k, decrypt=False), k)``
    always gives back ``s``.

    Parameters:
    - text (str): The text to process.
    - shift (int): Number of positions each letter is moved in the alphabet.
      Any integer (including negative or >= 26) is accepted; it is reduced modulo 26.
    - decrypt (bool, optional): If True, decrypt the text; if False, encrypt it.
      Defaults to True.

    Returns:
    - str: The text after applying the Caesar cipher.

    Examples:
    >>> caesar_cypher('abc', 3)
    'xyz'
    >>> caesar_cypher('xyz', 3, decrypt=False)
    'abc'
    """
    # Amount to add to each letter's alphabet index. Decryption moves letters
    # backward, encryption moves them forward. This is loop-invariant, so it
    # is computed once.
    offset = (-shift if decrypt else shift) % 26

    result = []
    for char in text:
        # Only ASCII letters are shifted. str.isalpha() is also True for letters
        # like 'é' or 'ç'. Shifting those relative to 'a'/'A' would turn them into
        # unrelated ASCII letters that decryption could never restore.
        if 'a' <= char <= 'z':
            base = ord('a')
        elif 'A' <= char <= 'Z':
            base = ord('A')
        else:
            result.append(char)
            continue
        result.append(chr(base + (ord(char) - base + offset) % 26))
    return ''.join(result)

In [0]:
class CypherDataSourceReader(DataSourceReader):
    """
    A custom DataSourceReader that reads a text file and applies the Caesar cipher to it.

    Each line of the file is passed through ``caesar_cypher`` and yielded as a
    one-element tuple, i.e. one row per line.

    Attributes:
        schema (StructType): The schema of the DataFrame to be returned.
        options (dict): Reader options. The following keys are used:
            - 'path': the file to read (required).
            - 'shift': cipher shift, parsed with int(); defaults to 3.
            - 'decrypt': 'true' or 'false', case-insensitive; defaults to 'true'.
    """

    def __init__(self, schema: StructType, options: dict):
        """
        Initialize the reader with the given schema and options.

        Args:
            schema (StructType): The schema of the DataFrame.
            options (dict): Options containing 'path', and optionally 'shift' and 'decrypt'.
        """
        self.schema = schema
        self.options = options

    def read(self, partition: int) -> Iterator[Tuple[str]]:
        """
        Read the configured file and yield each processed line as a one-element tuple.

        Args:
            partition (int): The partition being read (not used; the whole file is read).

        Yields:
            Tuple[str]: The cipher-processed text of one line of the file.

        Raises:
            ValueError: If no 'path' option is given, 'shift' is not an integer,
                or 'decrypt' is neither 'true' nor 'false'.
        """
        file_path = self.options.get("path")
        if not file_path:
            raise ValueError("No file path provided in options.")

        shift = int(self.options.get("shift", 3))

        # Accept only an explicit true/false. Previously any other value (e.g.
        # "yes" or "1") silently meant "encrypt", the opposite of what was likely intended.
        decrypt_flag = str(self.options.get("decrypt", "true")).strip().lower()
        if decrypt_flag not in ("true", "false"):
            raise ValueError(
                f"Invalid value for option 'decrypt': {decrypt_flag!r} "
                "(expected 'true' or 'false')."
            )
        decrypt = decrypt_flag == "true"

        # Use an explicit encoding so the result does not depend on the worker's
        # locale. This matters for text containing accented characters.
        with open(file_path, "r", encoding="utf-8") as file:
            for line in file:
                # NOTE: strip() removes the newline and any leading/trailing whitespace.
                yield (caesar_cypher(line.strip(), shift, decrypt=decrypt),)

In [0]:
class CypherDataSource(DataSource):
    """
    Spark data source for text files processed with the Caesar cipher.

    Usable through ``spark.read.format("cypher")``. It produces a single
    nullable string column, ``decrypted_text``. Reading is delegated to
    ``CypherDataSourceReader``, which receives this source's options.
    """

    @classmethod
    def name(cls):
        """
        Return the short format name of this data source.

        Returns:
            str: Always ``"cypher"``.
        """
        short_name = "cypher"
        return short_name

    def schema(self):
        """
        Return the output schema of this data source.

        Returns:
            StructType: One nullable ``StringType`` column named ``decrypted_text``.
        """
        text_column = StructField("decrypted_text", StringType(), True)
        return StructType([text_column])

    def reader(self, schema: StructType):
        """
        Build the reader that Spark will use to load the data.

        Args:
            schema (StructType): The schema requested for the DataFrame.

        Returns:
            CypherDataSourceReader: A reader built from ``schema`` and this source's options.
        """
        source_options = self.options
        return CypherDataSourceReader(schema, source_options)

In [0]:
# Register the custom data source with this Spark session so that it can be
# used via spark.read.format("cypher") (the name returned by CypherDataSource.name()).
spark.dataSource.register(CypherDataSource)

In [0]:
# Decrypt test_encrypted.txt using a shift of 5. The "decrypt" option is not
# set here, so the reader falls back to its default of decrypting.
df_decrypted = (
    spark.read
    .format("cypher")
    .option("shift", 5)
    .load("test_encrypted.txt")
)

display(df_decrypted)

In [0]:
# Encrypt test.txt with a shift of 3. With decrypt set to False, the reader
# shifts letters forward instead of backward.
# NOTE: despite its name, df_decrypted here holds the *encrypted* text.
df_decrypted = (
    spark.read
    .format("cypher")
    .option("decrypt", False)
    .option("shift", 3)
    .load("test.txt")
)

display(df_decrypted)