In [0]:
# Job parameters arrive as notebook widgets; each is a free-text box with an empty default.
for _widget_name, _widget_label in (
    ("proc_name", "Process Name"),
    ("proc_run_id", "Process Run ID"),
    ("data_dt", "Data Date"),
):
    dbutils.widgets.text(_widget_name, "", _widget_label)
del _widget_name, _widget_label  # keep the notebook namespace limited to the three parameters below

proc_name, proc_run_id, data_dt = (
    dbutils.widgets.get("proc_name"),
    dbutils.widgets.get("proc_run_id"),
    dbutils.widgets.get("data_dt"),
)

In [0]:
%run /Workspace/Users/themallpocaws@inteltion.com/Data-Integration-Scenarios/00_common/common_function

In [0]:
%pip install paramiko
%pip install boto3 python-gnupg

In [0]:
import boto3
from botocore.exceptions import ClientError
import paramiko
from io import StringIO, BytesIO
import pandas as pd
import os
import tempfile
import sys
import time
import gnupg

In [0]:
def get_secret(secret_name="TMG/POC/SFTP", region_name="ap-southeast-1"):
    """Fetch a JSON secret from AWS Secrets Manager and return it parsed as a dict.

    The defaults point at the secret this notebook uses; the caller reads the keys
    "hostname", "username", "password", "port" and "gpg_decrypt_passphrase" from it.

    Args:
        secret_name: Secrets Manager secret id.
        region_name: AWS region that hosts the secret.

    Returns:
        dict parsed from the secret's ``SecretString``.

    Raises:
        botocore.exceptions.ClientError: propagated unchanged from ``get_secret_value``; for the
            list of error codes see
            https://docs.aws.amazon.com/secretsmanager/latest/apireference/API_GetSecretValue.html
    """
    # `json` is not imported by this notebook's import cell; import it here instead of
    # relying on whatever the %run'd common_function notebook happens to import.
    import json

    session = boto3.session.Session()
    client = session.client(service_name='secretsmanager', region_name=region_name)
    response = client.get_secret_value(SecretId=secret_name)
    return json.loads(response['SecretString'])

In [0]:
def progress_callback(transferred, total):
    """Print download progress on a single, in-place updated line (at most once per second).

    Intended as the ``callback`` of ``paramiko.SFTPClient.get``. Throttling state lives in the
    module-level list ``last_update_time`` (one element: epoch seconds of the last printed
    update), which must be initialised before the download starts, e.g. ``last_update_time = [0]``.

    Args:
        transferred: bytes transferred so far.
        total: total bytes expected; may be 0 for an empty remote file.
    """
    current_time = time.time()
    # Only update if more than 1 second has passed since the last update; always show the final one.
    if current_time - last_update_time[0] >= 1 or transferred == total:
        # An empty file reports (0, 0); treat it as complete instead of dividing by zero.
        percent = transferred / total * 100 if total else 100.0
        sys.stdout.write(f"\rTransferred: {transferred} / {total} bytes ({percent:.2f}%)")
        sys.stdout.flush()
        last_update_time[0] = current_time

In [0]:
def get_file_path(data_dt):
    """Look up where the extract for ``data_dt`` lives on SFTP and where it should land in S3.

    Only two deliveries are known: 2019-09-21 (GPG-encrypted ``.csv.gpg``) and
    2019-09-20 (plain ``.csv``).

    Args:
        data_dt: data date such as "2019-09-21" or "20190921" (dashes are ignored).

    Returns:
        A new dict with keys "remote_file_path" (path on the SFTP server) and
        "s3_bucket" (full S3 URI the file is copied to).

    Raises:
        SystemError: when no delivery exists for ``data_dt``.
    """
    known_deliveries = {
        '20190921': ("/shared/uat/DWH_POC/Out/POC_20190921.csv.gpg",
                     "s3://tmg-poc-awsdb-apse1-stack-97db8-bucket/unity-catalog/catalog/poc_raw/POC_20190921.csv.gpg"),
        '20190920': ("/shared/uat/DWH_POC/Out/POC_20190920.csv",
                     "s3://tmg-poc-awsdb-apse1-stack-97db8-bucket/unity-catalog/catalog/poc_raw/POC_20190920.csv"),
    }
    compact_dt = data_dt.replace("-", "")
    if compact_dt not in known_deliveries:
        raise SystemError(f"Invalid data date, File with date {data_dt} does not exist (TMG didn't provided)")
    remote_path, s3_uri = known_deliveries[compact_dt]
    return {"remote_file_path": remote_path, "s3_bucket": s3_uri}
    

In [0]:
def decrypt_gpg(data_dt, gpg_decrypt_passphrase):
    """Decrypt the GPG-encrypted POC extract for ``data_dt`` in S3 and archive the encrypted original.

    Steps:
      1. Copy the vendor private key and ``poc_raw/POC_<YYYYMMDD>.csv.gpg`` from S3 to local ``/tmp``.
      2. Import the private key with python-gnupg and decrypt the file using the passphrase.
      3. Upload the decrypted ``poc_raw/POC_<YYYYMMDD>.csv`` back to S3.
      4. Copy the ``.csv.gpg`` to ``poc_raw/archived/<timestamp>/`` and delete it from ``poc_raw/``.
    Local copies are deleted afterwards whether or not the steps succeeded.

    Args:
        data_dt: data date string such as "2019-09-21"; dashes are stripped.
        gpg_decrypt_passphrase: passphrase protecting the imported private key.

    Raises:
        Exception: when gpg reports that decryption failed. Any other error (S3 copy,
            key import, file I/O, ...) is printed and re-raised unchanged.
    """
    # Bound before `try` so the `finally` clause never hits an unbound name when an
    # early step fails; filled in once the local paths are known.
    local_files = ()
    try:
        thai_tz = pytz.timezone('Asia/Bangkok')
        # The archive folder name uses Bangkok local time.
        timestamp = datetime.now(thai_tz).strftime('%Y%m%d%H%M%S')
        _data_dt = data_dt.replace("-", "")

        # Define S3 Paths
        s3_bucket = "s3://tmg-poc-awsdb-apse1-stack-97db8-bucket/unity-catalog/catalog/"
        gpg_private_key_s3 = s3_bucket + "poc_landing/TMG-POC-DWH-Vendors_0x4BB20771_SECRET.asc"
        encrypted_csv_s3 = s3_bucket + f"poc_raw/POC_{_data_dt}.csv.gpg"
        decrypted_csv_s3 = s3_bucket + f"poc_raw/POC_{_data_dt}.csv"

        # Define Local Paths
        local_privkey_path = "/tmp/TMG-POC-DWH-Vendors_0x4BB20771_SECRET.asc"
        encrypted_csv_path = f"/tmp/POC_{_data_dt}.csv.gpg"
        decrypted_csv_path = f"/tmp/POC_{_data_dt}.csv"
        local_files = (local_privkey_path, encrypted_csv_path, decrypted_csv_path)

        # Step 1: Download the Private Key and Encrypted CSV from S3 to Local
        dbutils.fs.cp(gpg_private_key_s3, "file:" + local_privkey_path)
        print(" Private key downloaded from S3 to:", local_privkey_path)

        dbutils.fs.cp(encrypted_csv_s3, "file:" + encrypted_csv_path)
        print(" Encrypted CSV file downloaded from S3 to:", encrypted_csv_path)

        # Step 2: Initialize GPG and Import the Private Key
        # NOTE(review): no gnupghome is passed, so the key goes into the default GnuPG home and
        # presumably stays there after this run; consider a throwaway gnupghome for the secret key.
        gpg = gnupg.GPG()

        with open(local_privkey_path, "r") as key_file:
            private_key_data = key_file.read()

        import_result = gpg.import_keys(private_key_data)
        print("Imported private key:", import_result.results)

        # Step 3: Read the Encrypted CSV File
        with open(encrypted_csv_path, 'rb') as enc_file:
            encrypted_content = enc_file.read()

        # Step 4: Decrypt the CSV File Using the Private Key and Passphrase
        decrypted_data = gpg.decrypt(encrypted_content, passphrase=gpg_decrypt_passphrase)
        if not decrypted_data.ok:
            # f-string: `status` can be None, and string concatenation would then raise a
            # TypeError that hides the actual decryption failure.
            raise Exception(f"Decryption failed: {decrypted_data.status}")

        # Step 5: Save the decrypted bytes exactly as gpg produced them. str(decrypted_data)
        # decodes with gnupg's configured encoding and a text-mode write re-encodes with the
        # locale encoding, which can corrupt non-ASCII CSV content.
        with open(decrypted_csv_path, "wb") as dec_file:
            dec_file.write(decrypted_data.data)
        print("Decrypted CSV file saved locally at:", decrypted_csv_path)

        # Step 6: Upload the Decrypted CSV File Back to S3
        dbutils.fs.cp("file:" + decrypted_csv_path, decrypted_csv_s3)
        print("Decrypted CSV file uploaded to S3 at:", decrypted_csv_s3)

        # Move the encrypted original into poc_raw/archived/<timestamp>/ (copy, then delete)
        _archived_data_file = f'{s3_bucket}poc_raw/archived/{timestamp}/POC_{_data_dt}.csv.gpg'
        dbutils.fs.cp(encrypted_csv_s3, _archived_data_file, True)
        dbutils.fs.rm(encrypted_csv_s3)

    except Exception as err:
        print(f'error : {err}')
        raise

    finally:
        # Never leave the private key or plaintext data on the driver's local disk.
        for local_path in local_files:
            if os.path.exists(local_path):
                os.remove(local_path)
                print(f'Deleted local file : {local_path}')

In [0]:
# SFTP login details and the GPG passphrase all come from a single Secrets Manager secret.
secret = get_secret()
hostname, username, password, port, gpg_decrypt_passphrase = (
    secret["hostname"],
    secret["username"],
    secret["password"],
    int(secret["port"]),  # stored as a string in the secret; paramiko needs an int
    secret["gpg_decrypt_passphrase"],
)


In [0]:
# Main process: download the day's extract from SFTP, land it in S3, decrypt it when it is
# GPG-encrypted, and report the outcome to the workflow through dbutils.notebook.exit.
batch_status = set_process_success()

# Pre-bind everything the `finally` clause touches, so an early failure (unknown data_dt,
# SFTP connect error, ...) is reported as a failed batch instead of being masked by a NameError.
transport = None
sftp = None
tmp_dir = None
local_file_path = None
try:
    print(f"=> start process. : {proc_name} Process Run ID : {proc_run_id}")

    file_path = get_file_path(data_dt)
    remote_file_path = file_path['remote_file_path']  # Remote file location on the SFTP server
    s3_bucket = file_path['s3_bucket']  # Full S3 URI the file is copied to

    timestamp = datetime.now().strftime('%Y%m%d%H%M%S')
    tmp_dir = f'/tmp/{timestamp}/'

    # Throttling state read by progress_callback (epoch seconds of its last printed update).
    last_update_time = [0]

    # Create a transport object
    transport = paramiko.Transport((hostname, port))

    # Authenticate with username/password
    transport.connect(username=username, password=password)

    # Create an SFTP session
    sftp = paramiko.SFTPClient.from_transport(transport)

    # Reserve a temporary file on the driver's local disk (not DBFS)
    os.makedirs(tmp_dir, exist_ok=True)
    with tempfile.NamedTemporaryFile(delete=False, dir=tmp_dir) as tmp_file:
        local_file_path = tmp_file.name

    # Download the remote file to the local temporary file
    file_stat = sftp.stat(remote_file_path)
    print(f"File size: {file_stat.st_size} bytes")
    sftp.get(remote_file_path, local_file_path, callback=progress_callback)
    print()  # terminate the "\r"-updated progress line

    print(f"Downloaded {remote_file_path} to local temporary file: {local_file_path}")

    dbutils.fs.cp("file:" + local_file_path, s3_bucket)
    print(f'Copied {local_file_path} to {s3_bucket}')
    if s3_bucket.endswith("gpg"):
        print(f'Starting decrypt on : {s3_bucket}')
        decrypt_gpg(data_dt, gpg_decrypt_passphrase)
    else:
        print("Extracted file does not encrypted, skip decrypt phase")

except Exception as e:
    print(f"An error occurred: {e}")
    batch_status = set_process_failed(e)

finally:
    # Clean up only what this run actually created, then close the connections.
    if local_file_path and os.path.exists(local_file_path):
        os.remove(local_file_path)
    if tmp_dir and os.path.isdir(tmp_dir) and not os.listdir(tmp_dir):
        os.rmdir(tmp_dir)
    if sftp is not None:
        sftp.close()
    if transport is not None:
        transport.close()
dbutils.notebook.exit(batch_status)