# Team Octopus: Pipeline Skeleton
### This is the pseudocode for our pipeline plan for the RDMF surge hackathon

### Libraries import

In [4]:
import pandas as pd
from cryptography.fernet import Fernet
from datetime import datetime
from utils import Logger
from pyspark.sql import SparkSession

### Log info

In [None]:
logger = Logger("EncryptionLogger").get_logger()

### Class to generate salt from today's date and customer name

In [64]:
class SaltGenerator:
    def generate_salt(self, customer_name):
        today_date = datetime.now().strftime("%Y-%m-%d")
        salt = f"{today_date}_{customer_name}"
        return salt

### Class to encrypt a column

In [65]:
class ColumnEncryptor:
    def __init__(self, salt):
        self.salt = salt

    def encrypt_value_with_salt(self, value):
        cipher_suite = Fernet(Fernet.generate_key())
        value_with_salt = value + self.salt
        encrypted_value = cipher_suite.encrypt(value_with_salt.encode())
        return encrypted_value

### Encrypted Dataset Class

In [66]:
class EncryptedDataset:
    def __init__(self, bucket_name, file_name, column_name, salt_generator, column_encryptor):
        self.bucket_name = bucket_name
        self.file_name = file_name
        self.column_name = column_name
        self.salt_generator = salt_generator
        self.column_encryptor = column_encryptor
        self.dataset = pd.read_csv(f"gs://{self.bucket_name}/{self.file_name}")
    
    def encrypt_column_with_salt(self, df):
        selected_column = self.dataset[self.column_name].astype(str)
        encrypted_values = []
        
        df = self.dataset
        
        for value in selected_column:
            value = self.column_encryptor.encrypt_value_with_salt(value)
            encrypted_values.append(value)
            
        df["encrypted_id"] = encrypted_values
        
        return df

### Usage example

In [67]:
salt_generator = SaltGenerator()
salt = salt_generator.generate_salt("some_customer")
print(type(salt))

<class 'str'>


In [68]:
column_encryptor = ColumnEncryptor(salt)

In [69]:
dataset = EncryptedDataset("rdmf_mock_data", "BI_mockdata.csv", "BI_ID", salt_generator, column_encryptor)

In [70]:
processed = dataset.encrypt_column_with_salt(dataset)

In [71]:
processed.head()

Unnamed: 0,BI_ID,Business_name,PAYE_scheme_ref,SIC_code,UPRN,encrypted_id
0,606162501213544,Infinidum Enterprises,100000,25143,950187498346,b'gAAAAABl6SN4TRlirAu8M5dB21m_QqPDDm0SGJfybQn7...
1,606948361742718,Nitzsche PLC,100001,55303,950698168954,b'gAAAAABl6SN4HOepbXQ7Ssk8SB4zCtwgVIlXqeNXFDdp...
2,606089685526234,Schamberger-Smith,100002,52512,950320937277,b'gAAAAABl6SN4DcZZNkbiNSBp1yc38gucCz6r3OAwGCsI...
3,606885102242594,Heidenreich-Schimmel,100003,68588,950186421264,b'gAAAAABl6SN4V_8fYzt1QRmEWtBop6VcH2WV_f2q1iZE...
4,606517518834848,"Turner, Davis and Weber",100004,85252,950680091532,b'gAAAAABl6SN41ri3KYvw86xiPr8dd53sGijA8FvdyQXS...


In [None]:
spark = (SparkSession.builder.appName('rdmf_encryption')
         .master('yarn')
         .config('spark.driver.maxResultSize','2g')
         .config('spark.e

In [2]:
def save_to_bigquery(
    df: pyspark.sql.DataFrame,
    table: str,
    project: str = None,
    mode: str = "append",
    schema: pyspark.sql.types.StructType = None,
) -> None:
    """
    Save a PySpark DataFrame to BigQuery.

    Args:
        df (pyspark.sql.DataFrame): DataFrame to save.
        table (str): Name of the BigQuery table to save to.
        project (str, optional): BigQuery project to save to. Defaults to the project associated with the Spark context.
        mode (str, optional): Write mode. One of "append", "overwrite", or "ignore". Defaults to "append".
        schema (pyspark.sql.types.StructType, optional): Schema of the DataFrame. If not provided, it will be inferred from the DataFrame.

    Returns:
        None
    """

    if project is None:
        project = df.sql_ctx.sparkSession._jsc.hadoopConfiguration().get("fs.gs.project.id")

    if schema is None:
        schema = df.schema

    df.write \
        .format("bigquery") \
        .option("temporaryGcsBucket", "YOUR_GCS_BUCKET_NAME") \
        .option("table", table) \
        .option("project", project) \
        .option("writeDisposition", mode) \
        .schema(schema) \
        .save()

NameError: name 'pyspark' is not defined