In [0]:
# Source DB: Defaults to MySQL
# Source DB Access Name: Required
# Source DB Table: Required [schema_name.table_name] [If multiple tables used in DML, fill first table name]
# DML: Defaults to: SELECT * FROM Source-Table
# Target S3 Access Name: Required
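# Target S3 File Path: Required [Formats allowed: s3://bucket/prefix/filename.txt, s3://bucket/prefix/filename_yyyymmdd.txt, s3://bucket/prefix/filename_yyyymmddHHMMSS.txt (.csv, .txt and .parquet file names are recognised).
# If no file name is given, the file name defaults to table_name_yyyymmddHHMMSS.csv ]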
# Target File Delimiter: Defaults to comma [comma: ',', tab: '\t', pipe: '|']
# Email Notification: Defaults to skip [Requires email id to send notification]

In [0]:
# #Source Inputs
# dbutils.widgets.removeAll()
# dbutils.widgets.dropdown("Source_DB", "MySQL", ["MySQL", "PSQL"])
# dbutils.widgets.text("Source_DB_Access", "")
# dbutils.widgets.text("Source_DB_Table", "")
# dbutils.widgets.text("DML", "")

# #Target Inputs
# dbutils.widgets.text("Target_S3_Access", "")
# dbutils.widgets.text("Target_S3_File_Path", "")
# dbutils.widgets.text("Target_File_Delimiter", ",")
# dbutils.widgets.text("Notification_Recipient","")

In [0]:
%run ./configs

In [0]:
# Reading user inputs (widget / job-parameter values); surrounding whitespace is dropped.
# Source_DB falls back to its documented default (MySQL) when left blank.
source_db = getArgument('Source_DB').strip() or 'MySQL'
source_access = getArgument('Source_DB_Access').strip()
source_table = getArgument('Source_DB_Table').strip()
DML = getArgument('DML').strip()
target_access = getArgument('Target_S3_Access').strip()
target_s3 = getArgument('Target_S3_File_Path').strip()
notify_recipient = getArgument('Notification_Recipient').strip()

# Delimiter defaults to comma when blank. A literal tab character is whitespace and
# would be removed by strip(), so it is detected on the raw value and preserved.
raw_delimiter = getArgument('Target_File_Delimiter')
delimiter = raw_delimiter.strip()
if delimiter == '':
  delimiter = '\t' if '\t' in raw_delimiter else ','

s3_conn ={}
db_conn ={}

# Validating user inputs
# dbutils.notebook.exit() ends the notebook run with the given message, so only the
# first failing check is reported.
s3_path_parts = target_s3.split('/')
if source_access == '':
  dbutils.notebook.exit("Source DB Access name missing!")
elif source_table == '':
  dbutils.notebook.exit("Source Table info missing!")
elif len(source_table.split('.')) != 2:
  dbutils.notebook.exit("Schema name missing in source table!")
elif target_access == '':
  dbutils.notebook.exit("Target S3 Access Name missing!")
elif target_s3 == '':
  dbutils.notebook.exit("Target S3 File Path missing!")
elif len(s3_path_parts) < 3 or s3_path_parts[2] == '':
  # Later steps read the bucket name from the third '/'-separated component
  dbutils.notebook.exit("Invalid Target S3 File Path! Expected format: s3://bucket/prefix/filename")

# A path ending in a known extension carries the target file name; otherwise the
# path is treated as a folder and the file name is generated at write time.
if target_s3.endswith(('.csv','.txt','.parquet')):
  target_file_name = target_s3.split('/')[-1]
else:
  target_file_name = ''

# Resolving DB credentials for the access name
# (db_access_list, db_hosts, db_usernames, db_passwords are not defined in this
# notebook - presumably provided by the ./configs notebook run above)
if source_access in db_access_list:
  db_conn['db_host'] = db_hosts[source_access]
  db_conn['db_username'] = db_usernames[source_access]
  db_conn['db_password'] = db_passwords[source_access]
else:
  dbutils.notebook.exit("Invalid DB Access Name!")

# Resolving S3 credentials for the access name
# (s3_access_list, s3_access_keys, s3_secret_keys - presumably from ./configs as well)
if target_access in s3_access_list: 
  s3_conn['s3_access_key'] = s3_access_keys[target_access]
  s3_conn['s3_secret_key'] = s3_secret_keys[target_access]
else:
  dbutils.notebook.exit("Invalid S3 Access Name!")

# Service name used in the notification subject
service = 'DB-S3'

In [0]:
import boto3
import uuid
import datetime, time
from datetime import datetime
import logging
import re
# Create a logger
# getLogger() without a name returns the root logger, which is the logger the
# module-level logging.info()/logging.error() calls in this notebook write to.
logger = logging.getLogger()
# Emit INFO and above so the progress messages show up in the cell output
logger.setLevel(logging.INFO)

In [0]:
# Unique identifier of this notebook run: run_logger writes it into every run_log
# row and the status report at the end of the notebook filters run_log on it.
run_id = uuid.uuid4()
print('run_id:',run_id)

run_id: a394b018-597c-4f3d-b93a-091b8db07145


In [0]:
def run_logger(service,log_op, opn, DML, rc, target,status):
  """
  Records the outcome of one operation of the current run in the run_log table.

  Inputs:
    service: service name stored in the log row
    log_op:  'insert' adds a new row, 'update' changes the status of the row that
             matches the current run_id and the given operation
    opn:     operation being logged (callers in this notebook pass 'read' or 'write')
    DML:     query text stored in the row ('insert' only). It is embedded in a
             single-quoted SQL literal, so single quotes must already be escaped
    rc:      record count stored in the row ('insert' only)
    target:  target path stored in the row ('insert' only)
    status:  status text stored in the row
  Output: Returns True if success
  Raises: ValueError if log_op is neither 'insert' nor 'update'

  Also reads the notebook-level run_id, source_access, source_table and target_access.
  """
  # Reject unknown operations up front instead of failing later on an unbound query
  if log_op not in ('insert', 'update'):
    raise ValueError(f"Unsupported log_op '{log_op}': expected 'insert' or 'update'")

  logging.info(f"Updating run log table for {opn} operation.....")
  if log_op == 'insert':
    query = f"INSERT INTO TABLE run_log VALUES('{service}','{run_id}','{source_access}','{source_table}','{DML}','{opn}',{rc},'{target_access}','{target}','{status}',current_timestamp())"
  else:
    query = f"UPDATE run_log SET status='{status}' where run_id = '{run_id}' and operation='{opn}'"
  # Executing SQL Query
  spark.sql(query)
  return True

In [0]:
def table_data_read(db_conn, source_table, DML):
  """
  Reads data from table using DML if DML is blank SELECT * will be used
  Inputs:
    db_conn: dict with 'db_host', 'db_username' and 'db_password'
    source_table: table as schema_name.table_name; the schema part is used as the
                  database name in the JDBC url
    DML: optional SELECT statement; a trailing ';' is dropped because the statement
         is wrapped as a subquery
  Output: [dataframe, record count] on success, None on failure

  Every outcome is recorded in run_log through run_logger. Also reads the
  notebook-level source_db and target_s3.
  """

  # Reading user input DML (a trailing semicolon would be invalid inside the subquery)
  statement = DML.strip().rstrip(';').strip()
  if statement == '':
    query = f"(SELECT * FROM {source_table})as query"
  else:
    query = '( '+ statement+' ) as query'
  
  print('Executable Query:',query)

  # run_logger embeds the query in a single-quoted SQL literal, so quotes are
  # escaped once here and the escaped text is used on every logging path
  logged_query = query.replace("'", "\\'")

  db_name = source_table.split('.')[0]

  # Preparing url string for JDBC connection
  if source_db == 'MySQL':
    url = f"jdbc:mysql://{db_conn['db_host']}:3306/{db_name}" 
  elif source_db == 'PSQL':
    url = f"jdbc:postgresql://{db_conn['db_host']}:5432/{db_name}" 
  else:
    logging.error(f"Unsupported source DB: {source_db}")
    run_logger('DB-S3','insert','read', logged_query, 0, target_s3 ,'failed')
    return None

  try:
    read_df = (spark.read
    .format("jdbc")
    .option("url", url)
    .option("dbtable", query)
    .option("user", db_conn['db_username'])
    .option("password", db_conn['db_password'])
    .load()
    )
    # count() runs the query against the database, so it can fail as well
    rc = read_df.count()
  except Exception as e:
    logging.error(f"Error during reading from table: {e}")
    run_logger('DB-S3','insert','read', logged_query, 0, target_s3 ,'failed')
    return None

  run_logger('DB-S3','insert','read', logged_query, rc, target_s3 ,'success')
  logging.info("Completed reading from table ......")
  return [read_df,rc]

In [0]:
def move_and_rename_file_in_s3(s3_conn, target_s3, new_file_name):
  """
  This function moves the s3 file & renames it to required file name

  It looks for part files under the folder <prefix>/<new_file_name>/, copies the
  last matching one to the object <prefix>/<new_file_name> and then deletes
  everything under that folder.

  Inputs:
    s3_conn: dict with 's3_access_key' and 's3_secret_key'
    target_s3: target path; the bucket is its third '/'-separated component. When it
               ends in .csv/.txt/.parquet the last component is treated as a file
               name and excluded from the prefix
    new_file_name: final object name; its extension selects which part files match
                   (.txt and .csv both match '.csv' part files)
  Outputs: True when a part file was moved, None when no matching part file exists
  """

  access_key = s3_conn['s3_access_key']
  secret_key = s3_conn['s3_secret_key']
  
  # Creating boto3 client
  s3_client = boto3.client('s3', aws_access_key_id=access_key, aws_secret_access_key=secret_key)

  s3_parts = target_s3.split('/')
  bucket_name = s3_parts[2]
  if target_s3.endswith(('.csv','.txt','.parquet')):
    prefix = '/'.join(s3_parts[3:-1])
  else:
    prefix = '/'.join(s3_parts[3:])

  # Drop surrounding slashes so that an empty prefix (file at the bucket root) does
  # not produce a key with a leading '/'
  prefix = prefix.strip('/')
  key = f"{prefix}/{new_file_name}" if prefix else new_file_name
  folder_prefix = key + '/'

  # .txt targets are written as csv part files, so both map to the '.csv' suffix
  extension = '.' + new_file_name.split('.')[-1]
  part_suffix = '.csv' if extension in ('.csv', '.txt') else extension

  # List objects in the folder
  response = s3_client.list_objects_v2(Bucket=bucket_name, Prefix=folder_prefix)
  # Retrieve the filenames from the list of objects
  part_files = [obj['Key'] for obj in response.get('Contents', []) if obj['Key'].endswith(part_suffix)]

  if not part_files:
    logging.error("Error during moving & renaming files")
    return None

  # Pick the last part file
  last_part_file = part_files[-1]
  # Move the file to upper directory
  s3_client.copy_object(Bucket=bucket_name, CopySource=f"{bucket_name}/{last_part_file}", Key=key)
  # Delete original directory
  s3_resource = boto3.resource('s3', aws_access_key_id=access_key, aws_secret_access_key=secret_key)
  bucket = s3_resource.Bucket(bucket_name)
  for obj in bucket.objects.filter(Prefix=folder_prefix):
    s3_resource.Object(bucket.name, obj.key).delete()
  logging.info("Moved and Renamed files")
  return True

In [0]:
def write_data_s3(s3_conn, target_s3, target_file_name, delimiter, input_df):

  """
  This function writes spark df read from DB to S3 path provided

  Spark writes a folder named after the target file; move_and_rename_file_in_s3
  then turns the part file inside it into the final single object.

  Inputs:
    s3_conn: dict with 's3_access_key' and 's3_secret_key'
    target_s3: s3 landing path (folder, or full path ending in the file name)
    target_file_name: final file name; '' generates <table_name>_<timestamp>.csv.
                      '_yyyymmddHHMMSS' / '_yyyymmdd' in the name are replaced by
                      the current timestamp / date
    delimiter: field delimiter for csv/txt output
    input_df: spark dataframe to write
  Output: [record count, file name] on success, None on failure

  The outcome is recorded in run_log through run_logger. Also reads the
  notebook-level sc and source_table.
  """

  s3_parts = target_s3.split('/')
  bucket_name = s3_parts[2]
  if target_s3.endswith(('.csv','.txt','.parquet')):
    prefix = '/'.join(s3_parts[3:-1])
  else:
    prefix = '/'.join(s3_parts[3:])
  # Avoid doubled or leading slashes in the object path
  prefix = prefix.strip('/')

  # Accessing keys from connection inputs
  access_key = s3_conn['s3_access_key']
  secret_key = s3_conn['s3_secret_key']  

  # Setting Spark configs to access S3
  sc._jsc.hadoopConfiguration().set("fs.s3n.awsAccessKeyId", access_key)
  sc._jsc.hadoopConfiguration().set("fs.s3n.awsSecretAccessKey", secret_key)

  # One clock reading for both stamps so the date and timestamp always agree
  current_time = datetime.now()
  timestamp = current_time.strftime("%Y%m%d%H%M%S")
  date = current_time.strftime("%Y%m%d")

  if target_file_name == '':
    file_name = source_table.split('.')[1]+'_'+timestamp+'.csv'
    if not target_s3.endswith('/'):
      target_s3 = target_s3 + '/'
  elif '_yyyymmddHHMMSS' in target_file_name:
    # Checked before '_yyyymmdd' because the shorter placeholder is a prefix of this one
    file_name = target_file_name.replace('_yyyymmddHHMMSS', '_'+timestamp)
  elif '_yyyymmdd' in target_file_name:
    file_name = target_file_name.replace('_yyyymmdd', '_'+date)
  else:
    file_name = target_file_name
  print('Target File name:',file_name)

  if prefix:
    file_path = 's3://'+bucket_name+'/'+prefix+'/'+file_name
  else:
    file_path = 's3://'+bucket_name+'/'+file_name
  
  file_format = file_name.split('.')[-1]

  try:
    rc = input_df.count()
    # Both formats are written as a single partition: the move step keeps only one
    # part file and deletes the rest of the folder, so more than one would lose data
    if file_format in ['txt','csv']:
      input_df.coalesce(1).write.format('csv').option('header','True').option("delimiter",delimiter).mode('overwrite').save(file_path)
    else:
      input_df.coalesce(1).write.mode('overwrite').parquet(file_path)
  except Exception as e:
    logging.error(f"Error during writing file: {e}")
    run_logger('DB-S3','insert','write','',0,target_s3, 'failed')
    return None

  res = move_and_rename_file_in_s3(s3_conn, target_s3, file_name )
  if res:
    run_logger('DB-S3','insert','write','',rc,file_path, 'success')
    logging.info("Completed writing file.....")
    return [rc, file_name]
  else:
    run_logger('DB-S3','insert','write','',0,target_s3, 'failed')
    logging.error("Error during writing file.....")
    return None


In [0]:
def db_s3(s3_conn, source_table, DML, target_s3, target_file_name, delimiter, db_conn):
  """
  Entry point of the transfer: reads the source table, then writes it to S3.

  Inputs: source parameters (db_conn, source_table, DML) and target parameters
          (s3_conn, target_s3, target_file_name, delimiter)
  Output: whatever write_data_s3 returns on success ([record count, file name]);
          None when the read or the write did not succeed
  """
  outcome = None

  # The write is attempted only when the read produced a result
  read_result = table_data_read(db_conn, source_table, DML)
  if read_result:
    dataframe = read_result[0]
    outcome = write_data_s3(s3_conn, target_s3, target_file_name, delimiter, dataframe)

  # Any failure marks the 'write' entry of this run as failed
  if not outcome:
    run_logger('DB-S3','update','write','','','','failed')

  return outcome

In [0]:
# Calling main function
# status is [record count, file name] on success and None on failure
status = db_s3(s3_conn, source_table, DML, target_s3, target_file_name, delimiter, db_conn)

# Checking status
if status :
  logging.info(f"{status[0]} records transferred from DB to S3 {target_s3} with filename {status[1]}")
else:
  # This notebook moves data from the DB to S3, so the message states that direction
  logging.error("Failed to transfer data from DB to S3")

Executable Query: ( select * from reddit.reddit_posts_agg limit 1000 ) as query


INFO:root:Updating run log table for read operation.....
INFO:root:Completed reading from table ......


Target File name: db_s3_20240323162918.csv


INFO:root:Moved and Renamed files
INFO:root:Updating run log table for write operation.....
INFO:root:Completed writing file.....
INFO:root:1000 records transferred from DB to S3 s3://sdevalla-portfolio/orch_test/db_s3_yyyymmddhhmmss.csv with filename db_s3_20240323162918.csv


In [0]:
from botocore.exceptions import ClientError
def send_email(subject, body_html, sender, recipients):
  """
  Sends the run-status notification through AWS SES.

  Inputs:
    subject: subject line
    body_html: HTML body of the message
    sender: address used as the message source
    recipients: list of receiver email addresses
  Output: nothing is returned; the SES message id is printed on success and the
          SES error message is printed when the call raises ClientError

  Credentials are read from the notebook-level ses_conn dict; the SES region is
  fixed to us-east-2.
  """
  charset = "UTF-8"
  # Plain-text alternative sent alongside the HTML body
  body_text = "This email requires HTML support. Please view in a HTML-compatible email client."

  # Creating boto3 ses client
  ses_client = boto3.client(
    'ses',
    aws_access_key_id=ses_conn['access_key'],
    aws_secret_access_key=ses_conn['secret_key'],
    region_name='us-east-2',
  )

  # Assemble the email
  destination = {'ToAddresses': recipients}
  message = {
    'Body': {
      'Html': {'Charset': charset, 'Data': body_html},
      'Text': {'Charset': charset, 'Data': body_text},
    },
    'Subject': {'Charset': charset, 'Data': subject},
  }

  try:
    response = ses_client.send_email(Destination=destination, Message=message, Source=sender)
  except ClientError as e:
    print(e.response['Error']['Message'])
    return
  print("Email sent! Message ID:", response['MessageId'])

In [0]:
def dataframe_to_html_table(df):
    """
    Renders a pandas DataFrame as an HTML document containing one table.

    Inputs: df - pandas DataFrame (the index is not rendered)
    Output: HTML string; every plain <th> cell gets an inline style with an
            orange background and white text, and the table is wrapped in
            <html><body>...</body></html>
    """
    # Inline CSS is used for the header colouring
    header_cell = '<th style="background-color: #FB451D; color: white;">'
    table_markup = df.to_html(index=False).replace('<th>', header_cell)
    return '<html><body>' + table_markup + '</body></html>'

In [0]:
# Build the status report for this run from the run_log rows written above
subject = f"{service} run status for run id - {run_id}"
df = spark.sql(""" select distinct service as Service, source_path_table as `Source Table`, source_file_dml as DML, operation as Task,  record_count as Record_Count, target_path_table as Target_S3, Status, Timestamp from run_log where run_id = '{}'  """.format(run_id ))
html_table = dataframe_to_html_table(df.toPandas())
body_html = f"<html><body><p>Hi,</p><p>Please find the status of service: DB-S3 with run id: {run_id}</p>{html_table}</body></html>"
displayHTML(body_html)
sender = "noreplyd22snotification@gmail.com"

# Notification_Recipient is a comma-separated list: trim each address and drop empty
# entries so inputs like "a@x.com, b@y.com" or a trailing comma yield clean addresses
recipients = [address.strip() for address in notify_recipient.split(',') if address.strip()]
if len(recipients) > 0:
  send_email(subject, body_html, sender, recipients)
else:
  print('No recipients to send email!')

Service,Source Table,DML,Task,Record_Count,Target_S3,Status,Timestamp
DB-S3,reddit.reddit_posts_agg,( select * from reddit.reddit_posts_agg limit 1000 ) as query,read,1000,s3://sdevalla-portfolio/orch_test/db_s3_yyyymmddhhmmss.csv,success,2024-03-23 16:29:14.637
DB-S3,reddit.reddit_posts_agg,,write,1000,s3://sdevalla-portfolio/orch_test/db_s3_20240323162918.csv,success,2024-03-23 16:29:22.805


No recipients to send email!
