In [1]:
!pip install awswrangler


In [2]:
import json
import logging
import os
from datetime import datetime, timezone

import awswrangler as wr
import boto3
import pandas as pd
import redshift_connector
from sprinkleSdk import SprinkleSdk as sp

In [3]:
def getBoto3Session(aws_access_key, aws_secret_access_key, region):
    """
    Create a boto3 Session for the given credentials and region.

    Returns the Session, or None when boto3 raises while building it (the error is printed, not raised).
    """
    try:
        return boto3.Session(
            aws_access_key_id=aws_access_key,
            aws_secret_access_key=aws_secret_access_key,
            region_name=region,
        )
    except Exception as err:
        print(f"getBoto3Session: {str(err)}")
        return None

def getEnvConfig(env=None):
    """
    Build the configuration dict (AWS credentials/region, bucket, Redshift connection settings) for an environment.

    env: "DEV" (also used when env is None/empty) or "PROD"; compared case-insensitively.
    Returns the config dict, or None (after printing the error) if the boto3 session cannot be created,
    a PROD secret lookup fails, or env is not recognised.

    No secrets are stored in this notebook; they are read at run time:
      AWS_ACCESS_KEY_ID / AWS_SECRET_ACCESS_KEY  optional environment variables. When unset they stay None and
                                                 boto3 resolves credentials from its default chain.
      AWS_REGION_NAME                            optional environment variable, defaults to us-east-1.
      DEV Redshift                               REDSHIFT_PWD environment variable; REDSHIFT_HOST / REDSHIFT_PORT /
                                                 REDSHIFT_DATABASE / REDSHIFT_USER optionally override the defaults.
      PROD Redshift                              AWS Secrets Manager, read through the boto3 session.
    """
    # SECURITY: an AWS key pair and the DEV Redshift password used to be hard-coded here. Treat those values
    # as leaked and rotate them; never put credentials back into the notebook.
    aws_access_key_id = os.environ.get("AWS_ACCESS_KEY_ID")
    aws_secret_access_key = os.environ.get("AWS_SECRET_ACCESS_KEY")
    aws_region_name = os.environ.get("AWS_REGION_NAME", "us-east-1")
    env_name = (env or "DEV").upper()
    try:
        botoSession = getBoto3Session(aws_access_key_id, aws_secret_access_key, aws_region_name)
        if not botoSession:
            raise Exception("Failed to obtain boto session")

        config = {
            "AWS_ACCESS_KEY_ID": aws_access_key_id,
            "AWS_SECRET_ACCESS_KEY": aws_secret_access_key,
            "AWS_REGION_NAME": aws_region_name,
            "BUCKET_NAME": "neu-data-analytics",
        }
        if env_name == "DEV":
            config.update({
                "REDSHIFT_HOST": os.environ.get(
                    "REDSHIFT_HOST",
                    "neumoney-dev-redshift-cluster.crwu8dyj5u4p.us-east-1.redshift.amazonaws.com"),
                "REDSHIFT_PORT": int(os.environ.get("REDSHIFT_PORT", "5439")),
                "REDSHIFT_DATABASE": os.environ.get("REDSHIFT_DATABASE", "neumoney-dev"),
                "REDSHIFT_USER": os.environ.get("REDSHIFT_USER", "neumoney"),
                "REDSHIFT_PWD": os.environ.get("REDSHIFT_PWD"),
            })
            if not config["REDSHIFT_PWD"]:
                print("getConfig: REDSHIFT_PWD is not set; Redshift connections are likely to fail.")
        elif env_name == "PROD":
            for secret_name in ("REDSHIFT_HOST", "REDSHIFT_PORT", "REDSHIFT_DATABASE", "REDSHIFT_USER", "REDSHIFT_PWD"):
                config[secret_name] = wr.secretsmanager.get_secret(secret_name, botoSession)
        else:
            # Previously an unrecognised env left `config` unbound and crashed with UnboundLocalError.
            raise ValueError(f"Unknown env {env!r}; expected 'DEV' or 'PROD'.")
    except Exception as e:
        print(f"getConfig: {str(e)}")
        return None
    return config

In [4]:
class AwsS3():
    """
    Small wrapper around boto3 / awswrangler for a single S3 bucket.

    config keys read: AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, AWS_REGION_NAME, BUCKET_NAME.
    Call initialize() before any S3 operation; it creates the boto3 session and the S3 client.
    Every `file_prefix` argument is an object-key prefix relative to the bucket
    (e.g. "application_data/credit_limit/scienaptic_data_v0"), not a full s3:// URL.
    """
    def __init__(self, config):
        self.AWS_ACCESS_KEY_ID = config.get("AWS_ACCESS_KEY_ID")
        self.AWS_SECRET_ACCESS_KEY = config.get("AWS_SECRET_ACCESS_KEY")
        self.region_name = config.get("AWS_REGION_NAME")
        self.bucket_name = config.get("BUCKET_NAME")
        self.boto3session = None
        self.s3client = None

    def initialize(self):
        """Create the boto3 session and the S3 client. Returns True only if both were created."""
        # Previously always returned True, hiding setup failures from the caller.
        return self.getBoto3Session() and self.getBoto3S3Client()

    def getS3Url(self, file_prefix):
        """Return the s3:// URL of `file_prefix` inside this bucket."""
        return f"s3://{self.bucket_name}/{file_prefix}"

    def getBoto3Session(self):
        """Create self.boto3session. Returns True on success, False (error printed) otherwise."""
        try:
            self.boto3session = boto3.Session(aws_access_key_id=self.AWS_ACCESS_KEY_ID,
                                              aws_secret_access_key=self.AWS_SECRET_ACCESS_KEY,
                                              region_name=self.region_name)
        except Exception as e:
            print(f"getBoto3Session: {str(e)}")
            return False
        return True

    def getBoto3S3Client(self):
        """Create self.s3client. Returns True on success, False (error printed) otherwise."""
        try:
            self.s3client = boto3.client('s3',
                                         aws_access_key_id=self.AWS_ACCESS_KEY_ID,
                                         aws_secret_access_key=self.AWS_SECRET_ACCESS_KEY,
                                         region_name=self.region_name)
        except Exception as e:
            print(f"getBoto3S3Client: {str(e)}")
            return False
        return True

    def existS3Key(self, prefix):
        """
        Return True if at least one object key in the bucket starts with `prefix`.
        Note: this is a plain prefix match ("a/b" also matches "a/bc/..."). Errors are printed and give False.
        """
        try:
            # One key is enough to answer the question.
            result = self.s3client.list_objects_v2(Bucket=self.bucket_name, Prefix=prefix, MaxKeys=1)
            return 'Contents' in result
        except Exception as e:
            print(f"existS3Key: {str(e)}")
            return False

    def writeParquet(self, df, file_prefix, partition_cols, parquet_definition, glue_database, glue_table):
        """
        Write df as a parquet dataset under file_prefix and register it as glue_database.glue_table.
        Reference: https://aws-sdk-pandas.readthedocs.io/en/stable/stubs/awswrangler.s3.to_parquet.html

        file_prefix: key prefix of the dataset inside the bucket. E.g: "application_data/credit_limit/scienaptic_data_v0"
        partition_cols: list of columns the dataset is partitioned by, in hierarchy order
        parquet_definition: column -> Glue/Athena type, passed to to_parquet as `dtype`
        If the dataset already exists, only the partitions present in df are overwritten ("overwrite_partitions");
        otherwise the dataset is created ("append", awswrangler's default mode for datasets).
        Returns the to_parquet result dict, or {} on any failure (error printed), including an empty df.
        """
        resp = {}
        try:
            if df.empty:
                raise Exception("Dataframe empty. Nothing to write to S3.")

            s3url = self.getS3Url(file_prefix)
            if self.existS3Key(file_prefix):
                mode = 'overwrite_partitions'
                print(f"URL:{s3url} exists. Overwriting partitions...")
            else:
                mode = 'append'
                print(f"URL:{s3url} does not exist. Creating...")

            resp = wr.s3.to_parquet(df=df,
                                    path=s3url,
                                    boto3_session=self.boto3session,
                                    dataset=True,
                                    partition_cols=partition_cols,
                                    mode=mode,
                                    database=glue_database,
                                    table=glue_table,
                                    dtype=parquet_definition)
            if not resp:
                raise Exception(f'Failed to write parquet at: {s3url}')
        except Exception as e:
            print(f"writeParquet: {str(e)}")
        return resp

    def readParquet(self, file_prefix, partition_predicate):
        """
        Read the parquet dataset under file_prefix, or only one partition of it, into a pandas DataFrame.
        Reference: https://aws-sdk-pandas.readthedocs.io/en/stable/stubs/awswrangler.s3.read_parquet.html

        ::file_prefix: key prefix of the dataset inside the bucket. E.g: "persona_data/inquiry_data"
        ::partition_predicate: None/empty to read the whole dataset, or a partition path such as
          "<column_name1>=<value>/<column_name2>=<value>" (one level per partition column).
        Returns an empty DataFrame when nothing can be read (errors are printed, not raised).
        """
        df = pd.DataFrame()
        try:
            s3url = self.getS3Url(file_prefix)
            url = f"{s3url}/{partition_predicate}" if partition_predicate else s3url
            df = wr.s3.read_parquet(path=url, dataset=True, boto3_session=self.boto3session,
                                    ignore_empty=True, ignore_index=True, path_suffix=[".snappy.parquet"])
        except Exception as e:
            print(f"readParquet: {str(e)}")
        return df

    def readJSON(self, file_prefix):
        """
        Read the object at key `file_prefix` and return its decoded text.
        Returns {"data": <text>, "status": "ok"} on success, otherwise
        {"data": None, "status": "FileNotFound" | "ReadError" | "EmptyJson" | <error message>}.
        The text is not parsed; callers run json.loads themselves.
        """
        resp = {"data": None, "status": None}
        try:
            if not self.existS3Key(file_prefix):
                raise Exception("FileNotFound")
            result = self.s3client.get_object(Bucket=self.bucket_name, Key=file_prefix)
            if not result:
                raise Exception("ReadError")
            jsonbody = result.get("Body").read().decode()
            if not jsonbody:
                raise Exception("EmptyJson")
            resp = {"data": jsonbody, "status": "ok"}
        except Exception as e:
            resp = {"status": str(e), "data": None}
            print(f"readJSON: {str(e)}")
        return resp

In [5]:
def getUniquePartitionValues(df):
    """
    List the distinct (part_appcreated_month, part_appcreated_date) pairs present in df.

    Returns one dict per distinct pair, in first-seen order, e.g.
    [{"part_appcreated_month": "202210", "part_appcreated_date": "20221005"}, ...].
    """
    partition_keys = ["part_appcreated_month", "part_appcreated_date"]
    distinct_pairs = df.loc[:, partition_keys].drop_duplicates()
    return distinct_pairs.to_dict(orient="records")

In [22]:
# def getJsonData(dataframe,merge_keys_json,json_col_name,json_file_path,table_name):
#     try:
#         json_list = []
#         for index, row in df.iterrows():
#             row_json = dict()        
#             file_prefix_json = '{}/{}_{}_{}.json'.format(json_file_path,table_name,row[merge_keys_json[0]],row[merge_keys_json[1]].replace(':',''))
# #             print(file_prefix_json)
#             json_data = s3obj.readJSON(file_prefix_json)
#             if json_data['status'] == 'ok': 
#                 stud_obj = json.loads(json_data['data'])
#                 row_json[json_col_name] = str(stud_obj)
#             else:
#                 row_json[json_col_name] = str()
#             for ln in merge_keys_json:
#                 row_json[ln] = row[ln]

#             json_list.append(row_json)
#     except Exception as e:
#         print(f"getJSONData: {str(e)}")
#     return json_list

In [46]:
def getJsonData(dataframe, merge_keys_json, json_col_name, json_file_path, table_name, s3=None):
    """
    Load the per-record JSON log file from S3 for every row of `dataframe`.

    The object key is "<json_file_path>/<table_name>_<row[merge_keys_json[0]]>_<row[merge_keys_json[1]] without ':'>.json".
    ::dataframe: delta records; must contain every column in merge_keys_json
    ::merge_keys_json: columns copied into each output record so the result can be merged back onto `dataframe`.
                       merge_keys_json[0] should be the primary key and merge_keys_json[1] the created date.
    ::json_col_name: name of the output field holding the JSON text
    ::json_file_path: S3 key prefix of the JSON files
    ::table_name: file-name prefix
    ::s3: object exposing readJSON(key) (an AwsS3 instance); defaults to the notebook-level `s3obj`
    Returns a list with one dict per row: {json_col_name: <json text or "">, <merge key>: <value>, ...}.
    A missing or unparsable file gives "" for that row instead of aborting the whole batch.
    """
    json_list = []
    try:
        reader = s3 if s3 is not None else s3obj
        # Iterate the `dataframe` argument (the previous version iterated the notebook-global `df`).
        for row in dataframe.to_dict("records"):
            file_prefix_json = '{}/{}_{}_{}.json'.format(
                json_file_path, table_name, row[merge_keys_json[0]], str(row[merge_keys_json[1]]).replace(':', ''))
            json_text = ""
            json_data = reader.readJSON(file_prefix_json)
            if json_data.get('status') == 'ok':
                try:
                    # Validate, then store the original JSON text. str(dict) would not be valid JSON for getValueByPath.
                    json.loads(json_data['data'])
                    json_text = json_data['data']
                except ValueError as e:
                    print(f"getJSONData: invalid JSON in {file_prefix_json}: {str(e)}")

            row_json = {json_col_name: json_text}
            for key in merge_keys_json:
                row_json[key] = row[key]
            json_list.append(row_json)
    except Exception as e:
        print(f"getJSONData: {str(e)}")
    return json_list

In [7]:
def fetchExistingPartitions(parts_list, file_prefix, df_schema, s3=None):
    """
    Read the given partitions of the parquet dataset and concatenate them into one pandas DataFrame.

    ::parts_list: items like {"part_appcreated_month": "202210", "part_appcreated_date": "20221005"}
    ::file_prefix: key prefix of the parquet dataset inside the bucket
    ::df_schema: columns (and order) of the returned frame. Derived columns stored in parquet but absent
                 from df_schema are dropped. df_schema columns missing from older partitions come back as NaN.
    ::s3: object exposing readParquet(file_prefix, partition_predicate); defaults to the notebook-level `s3obj`
    Returns an empty DataFrame when nothing was read or on error (error printed).
    NOTE(review): readParquet swallows read errors and returns an empty frame, so an unreadable partition
    looks the same as a missing one here.
    """
    part_dfs = []
    existing_data_df = pd.DataFrame()
    try:
        reader = s3 if s3 is not None else s3obj
        for partition in parts_list:
            month = partition.get('part_appcreated_month')
            date = partition.get('part_appcreated_date')
            df_part = reader.readParquet(file_prefix, f"part_appcreated_month={month}/part_appcreated_date={date}")
            if not df_part.empty:
                # readParquet is pointed at the partition directory itself, so re-attach the partition values.
                part_dfs.append(df_part.assign(part_appcreated_month=month, part_appcreated_date=date))

        if part_dfs:
            # reindex instead of [df_schema]: a column added to the schema after these partitions were written
            # must not raise KeyError. That would return an empty frame, and the caller would then overwrite
            # the partitions with the delta alone.
            existing_data_df = pd.concat(part_dfs, ignore_index=True).reindex(columns=df_schema)
    except Exception as e:
        print(f"fetchExistingPartitions: {str(e)}")
    return existing_data_df

In [8]:
def getValueByPath(json_str, path):
    """
    Parse a JSON string and return the value found at `path`, or None if it cannot be reached.

    json_str: JSON text; it is parsed with json.loads internally.
    path: "|"-separated list of keys. A key may carry an array index as "<key>#<index>".
          E.g. 1: "address|zip" returns "56789" from {"address": {"zip": "56789", "city": "xyz"}}
          E.g. 2: "employees#1|name" returns "abc" from
                  {"employees": [{"name": "xyz", "age": 34}, {"name": "abc", "age": 30}, {"name": "lmnop", "age": 31}]}
    An empty path or json_str, invalid JSON, a missing key, a bad index, or a non-object intermediate value
    all give None. Nothing is raised.
    """
    try:
        if not path or not json_str:
            return None
        node = json.loads(json_str)
        for segment in path.split("|"):
            key, *index_parts = segment.split("#")
            # A negative (or absent) index means "take the value itself, no array lookup".
            position = int(index_parts[0]) if index_parts else -1
            child = node.get(key)
            node = child if position < 0 else child[position]
        return node
    except Exception:
        return None

In [9]:
def flatten_scienaptic_data(df, json_column="scienaptic_data"):
    """
    Extract selected attributes of the scienaptic JSON payload into flat json_* columns.

    ::df: frame holding the JSON text column. The new columns are added to this same frame, which is also returned.
    ::json_column: column holding the JSON text (default "scienaptic_data").
    Each new column holds getValueByPath(<json text>, <path>) for its row, i.e. None when the JSON is
    missing or invalid or does not contain the path.
    """
    # new column name -> getValueByPath path inside the JSON payload
    ext_cols = {
        "json_decision": "Decision",
        "json_final_grade": "Final_Grade",
        "json_final_score": "Final_Score",
        "json_applicant_grade": "Applicant_Grade",
        "json_applicant_score": "Applicant_score",
        "json_coapplicant_grade": "CoApplicant_Grade",
        "json_coapplicant_score": "CoApplicant_score",
        # NOTE(review): "Laon" looks like a typo of "Loan". It must match the key in the scienaptic payload; verify before changing.
        "json_recommended_loan_amount": "Recommended_Laon_Amount",
        "json_recommended_interest_rate": "Recommended_Interest_Rate",
        "json_recommended_term_of_loan_in_months": "Recommended_Term_of_Loan_in_months",
        # "_embedded|node|config|workflow_version_id"
    }

    if json_column not in df.columns:
        print(f"flatten_scienaptic_data: column [{json_column}] not found. Nothing to flatten.")
        return df

    try:
        for ext_col, json_path in ext_cols.items():
            # Map over the single JSON column instead of building a full row per record with apply(axis=1).
            df[ext_col] = df[json_column].map(lambda json_str, p=json_path: getValueByPath(json_str, p))
    except Exception as e:
        # Previously swallowed silently with `pass`.
        print(f"flatten_scienaptic_data: {str(e)}")
    return df

In [10]:
def mergeDeltaWithExistingParquet(df, file_prefix, partition_cols, merge_keys, df_schema, s3obj):
    """
    ::df: dataframe having delta records for the batch
    ::file_prefix: The parquet file name. E.g: persona_inquiry_data
    ::partition_cols: Columns used as partition keys. Must be a list, column names in the order of partition hierarchy. E.g: ["part_appcreated_month", "part_appcreated_date"]
    ::merge_keys: Unique identifying columns that can be used to join delta dataframe with existing parquet data, inorder to identify update candidates
    ::final_schema: Final list of columns to be stored into parquet, in the exact order.
    """
    if df.empty:
        print("mergeDeltaWithExistingParquet: Delta records dataset is empty. Nothing to merge.")
        return df
    
    df_final = pd.DataFrame()
    records_written = 0
    try:
        parquet_exists = s3obj.existS3Key(file_prefix)
        
        if not parquet_exists:
            print(f"mergeDeltaWithExistingParquet: Parquet file {file_prefix} Does not exist.")
            df_final = df
        else:
            print(f"mergeDeltaWithExistingParquet: Parquet file {file_prefix} Exists. Preparing delta")
            
            delta_partitions_list = getUniquePartitionValues(df)
            existing_data_df = fetchExistingPartitions(delta_partitions_list, file_prefix, df_schema)
    
            # Left join existing records(in S3) with the incoming delta records. Use the columns that uniquely identifies a record
            if not existing_data_df.empty:
                # get only the merge columns from the delta set - just to do a left join with existing data
                df_delta_filter = df[merge_keys].copy(deep=True).reset_index(drop=True)
                df_delta_filter["new_delta"] = 1
                
                # Join with the incoming delta set and get the new_delta column =1, if the record exists in the incoming delta set
                existing_marked_df = existing_data_df.merge(df_delta_filter, how="left", on=merge_keys).copy(deep=True)
    
                # Filter out records from existing S3 records, which found a match in the incoming delta set.
                df_existing_portion = existing_marked_df.query("~(new_delta==1)").copy(deep=True)
                df_existing_portion["new_delta"] = 0

                df["new_delta"] = 1 # Just to match the list of columns

                # Now concatenate the incoming delta set (full), with those existing rows from S3 which does not exist in the new delta set.
                # This makes the full record set for the target.
                df_final = pd.concat([df_existing_portion, df])[df_schema]
            else:
                df_final = df

    except Exception as e:
        print(f"mergeDeltaWithExistingParquet: {str(e)}")
    return df_final

In [11]:
def write2Parquet(df, file_prefix, partition_cols, merge_keys, parquet_schema, s3obj, parquet_definition, glue_database, glue_table):
    """
    Write the prepared dataframe to S3 as parquet, with columns ordered as in parquet_schema.

    s3obj.writeParquet uses "overwrite_partitions" for an existing dataset, so every partition present in df is
    replaced as a whole. df must therefore already hold all rows that belong in those partitions
    (see mergeDeltaWithExistingParquet). merge_keys is accepted for call-site symmetry and is not used.
    Returns {"status": bool, "records_written": int}. An empty df is a successful no-op with 0 records.
    """
    try:
        if df.empty:
            print("write2Parquet: Final delta record set is empty. Nothing to write to S3.")
            return {"status": True, "records_written": 0}

        ordered_df = df[parquet_schema].copy()
        print("write2Parquet: Writing parquet...")
        write_result = s3obj.writeParquet(ordered_df, file_prefix, partition_cols, parquet_definition, glue_database, glue_table)
        if not write_result:
            raise Exception(f"Failed to write into [{file_prefix}] parquet.")
        return {"status": True, "records_written": df.shape[0]}
    except Exception as err:
        print(f"write2Parquet: {str(err)}")
        return {"status": False, "records_written": 0}
    
def map_parquet_types_to_pandas(merge_keys, parquet_definition):
    """
    Map each merge-key column to the pandas dtype matching its Glue/parquet type, for use with DataFrame.astype.

    merge_keys: columns to map
    parquet_definition: column -> Glue/Athena type name (e.g. "bigint", "double", "date", "string").
    Type names are matched case-insensitively. Columns missing from parquet_definition, or with an unlisted
    type, map to "string".
    Returns {column: pandas dtype}, or None if mapping fails (error printed).
    """
    # Nullable Int64 so integer keys with missing values still cast. "datetime64[ns]" carries an explicit unit:
    # pandas 2 rejects astype to a unit-less "datetime64".
    type_lookup = {
        "tinyint": "Int64", "smallint": "Int64", "int": "Int64", "integer": "Int64", "bigint": "Int64",
        "float": "float64", "double": "float64",
        "date": "datetime64[ns]", "timestamp": "datetime64[ns]",
    }
    type_map = None
    try:
        type_map = {}
        for col in merge_keys:
            parquet_type = parquet_definition.get(col)
            type_key = parquet_type.lower() if isinstance(parquet_type, str) else parquet_type
            type_map[col] = type_lookup.get(type_key, "string")
    except Exception as e:
        print(f"map_parquet_types_to_pandas: {str(e)}")
    return type_map

In [12]:
def updateBatchControl(df, timestamp_column_name, batch_code, rdb_conn):
    """
    Advance stage.batch_control.LAST_TIMESTAMP for batch_code to the newest timestamp in df.

    df: dataframe with the delta records that were written into parquet
    timestamp_column_name: column holding epoch timestamps in milliseconds (preferably the modified date)
    batch_code: BATCH_CODE of this job's row in stage.batch_control
    rdb_conn: object with execute(sql, params) returning the affected row count (a RedshiftDb)
    Returns True when df is empty (nothing to advance) or exactly one row was updated. Otherwise returns False (error printed).
    """
    if df.empty:
        return True

    try:
        # Read from the `df` argument; this previously used the notebook-global `res` and always failed.
        max_timestamp = df[timestamp_column_name].max()
        if pd.isna(max_timestamp):
            raise Exception(f"No timestamp values in column [{timestamp_column_name}]")
        # Epoch milliseconds are UTC-based. Format them in UTC, because the batch SQL compares LAST_TIMESTAMP with
        # TIMESTAMP 'epoch' + ... values, which are UTC. Local time would shift the extraction window.
        next_start_time = datetime.fromtimestamp(float(max_timestamp) / 1000.0, tz=timezone.utc).strftime('%Y-%m-%d %H:%M:%S.%f')
        updsql = "update stage.batch_control set LAST_TIMESTAMP=%s where BATCH_CODE=%s"
        updparams = [next_start_time, batch_code]
        updresult = rdb_conn.execute(updsql, updparams)

        if updresult != 1:  # also covers None (execute failed)
            raise Exception("Batch_control update failed")
    except Exception as e:
        print(f"updateBatchControl: {str(e)}")
        return False
    return True

In [13]:
class RedshiftDb(object):
    """
    Minimal Redshift helper built on redshift_connector.

    The connection is opened once in __init__ from the config keys REDSHIFT_HOST, REDSHIFT_PORT,
    REDSHIFT_DATABASE, REDSHIFT_USER and REDSHIFT_PWD. If connecting fails, the error is logged and
    self.dbConn stays None; fetchAll/execute then return None.
    """
    def __init__(self, config):
        self.env = config.get("env")
        self.logger = logging.getLogger("RedshiftDb")
        self.dbConn = self._get_db_connection(config)

    def _get_db_connection(self, config):
        """Open and return a redshift_connector connection, or None (error logged) on failure."""
        dbConn = None
        try:
            dbConn = redshift_connector.connect(host=config.get("REDSHIFT_HOST"),
                                                port=config.get("REDSHIFT_PORT"),
                                                database=config.get("REDSHIFT_DATABASE"),
                                                user=config.get("REDSHIFT_USER"),
                                                password=config.get("REDSHIFT_PWD")
                                                )
            if not dbConn:
                raise Exception("Could not connect")
        except Exception as e:
            self.logger.error(f"connect: {str(e)}")
        return dbConn

    def _close_cursor(self, cursor):
        """Close a cursor, logging (not raising) errors so a close failure cannot mask the query result."""
        if cursor is None:
            return
        try:
            cursor.close()
        except Exception as e:
            self.logger.error(f"close cursor: {str(e)}")

    def fetchAll(self, sql, params=None):
        """
        Run a query with an optional list of parameters and return the result as a pandas DataFrame.
        Returns whatever cursor.fetch_dataframe() returns, or None on error (error logged).
        The cursor is always closed.
        """
        params = params or []
        result = None
        cursor = None
        try:
            if not self.dbConn:
                raise Exception("Redshift connection not available")

            cursor = self.dbConn.cursor()
            cursor.execute(sql, params)
            result = cursor.fetch_dataframe()
        except Exception as e:
            self.logger.error(f"fetchall: {str(e)}")
        finally:
            self._close_cursor(cursor)
        return result

    def execute(self, sql, params=None):
        """
        Execute one statement with an optional list of parameters and commit it.
        Returns the cursor's rowcount on success, or None on error (error logged, transaction rolled back).
        The cursor is always closed.
        """
        params = params or []
        result = None
        cursor = None
        try:
            if not self.dbConn:
                raise Exception("Redshift connection not available")

            cursor = self.dbConn.cursor()
            cursor.execute(sql, params)
            self.dbConn.commit()
            result = cursor.rowcount
        except Exception as e:
            self.logger.error(f"execute: {str(e)}")
            result = None
            if self.dbConn:
                try:
                    self.dbConn.rollback()
                except Exception as rollback_error:
                    # Log it so the original error above remains the one reported.
                    self.logger.error(f"rollback: {str(rollback_error)}")
        finally:
            self._close_cursor(cursor)
        return result

Start of the batch job

Load the environment configuration and connect to the S3 bucket.

In [14]:
# Load the environment configuration (AWS credentials/region, bucket, Redshift settings).
print("Starting Batch Job: Incremental dump of scienaptic table data into Parquet")
config = getEnvConfig(env="DEV") # Change env="PROD" while deploying to production
if not config:
    # Stop here. Every later cell needs `config`; continuing would only fail later with a less clear error.
    raise RuntimeError("ERROR: Could not get credential configurations.")
    

In [15]:
s3obj = AwsS3(config)
if not s3obj.initialize():
    raise RuntimeError("Could not initialise the S3 session/client. Check the AWS credentials and region.")

#rdb = RedshiftDb(config)  # <<<<<---- Uncomment once Sprinkle moves to neumoney n/w
# Row of this job in stage.batch_control, and the column whose max value advances it.
batch_code = "SCIENAPTIC_JSON"
timestamp_column = "modified_date"

# Glue catalog database/table the parquet dataset is registered under.
glue_database = "archive_redshift_db"
glue_table = "credit_limit_scienaptic_details"

# Key prefix of the parquet dataset inside the bucket, and its partition hierarchy.
file_prefix = "application_data/credit_limit/scienaptic_data_v0"
partition_cols = ["part_appcreated_month", "part_appcreated_date"]

# Columns used to identify unique records from the data set
merge_keys = ['application_id','scienaptic_uuid','part_appcreated_month','part_appcreated_date']
# Columns used to join the JSON payloads back onto the delta records. merge_keys_JSON[0] should be the
# primary key and merge_keys_JSON[1] the created date; getJsonData builds the JSON file name from both.
merge_keys_JSON = ['application_uuid','application_created_date','scienaptic_uuid','part_appcreated_month','part_appcreated_date']

In [16]:
# Read the delta records from scienaptic tables, using the explore object. (Explore name: scienaptic_detail_data)
explore_id = '902fcc373d1f49db9c811105b2921534' #scienaptic_detail_data
df = sp.read_explore(explore_id)

# Equivalent Redshift query, to be used once Sprinkle runs inside the neumoney network.
# modified_date is declared bigint (epoch ms). Divide by 1000.0, not 1000: Redshift integer division truncates
# to whole seconds, so a record modified later in the same second as LAST_TIMESTAMP (stored with ms) would
# compare below it and never be picked up. created_date keeps /1000 because application_created_date
# is part of the JSON file names.
sql = (
    " select tab.* from"
    " ( "
    "select "
    " apps.id as application_id,"
    " stg.id as scienaptic_id,"
    " stg.scienaptic_uuid,"
    " stg.application_uuid,"
    " stg.product_uuid,"
    " stg.user_id,"
    " stg.decision,"
    " stg.created_at,"
    " stg.created_date,"
    " stg.modified_date,"
    " stg.credit_score,"
    " stg.credit_limit,"
    " stg.applied_credit_limit,"
    " TIMESTAMP 'epoch' + stg.modified_date/1000.0 *INTERVAL '1 second' as modified_timestamp,"
    " TIMESTAMP 'epoch' + apps.created_date/1000 *INTERVAL '1 second' as application_created_date,"
    " current_timestamp as batch_timestamp "
    " from "
    " stage.ds_postgres_credit_limit_scienaptic_details_redshift_scienaptic_details_stg stg "
    " left join stage.ds_postgres_onboarding_application_redshift_application_stg apps on stg.application_uuid=apps.application_uuid "
    ") tab "
    " join stage.batch_control bc on tab.modified_timestamp >= bc.last_timestamp and bc.batch_code = %s "
)
sql_params = [batch_code]
# Uncomment once Sprinkle has moved to neumoney network
# Fetch data from stage table in redshift
# df = rdb.fetchAll(sql, sql_params)


# Derive the partition keys (YYYYMM and YYYYMMDD) from the application created datetime.
df["application_created_date"] = df["application_created_date"].fillna('1970-01-01T0:0:0.0+00:00')
# Parse each value once, element-wise so mixed formats/offsets are tolerated, and reuse it for both keys.
app_created_ts = df["application_created_date"].apply(pd.to_datetime)
df["part_appcreated_month"] = app_created_ts.apply(lambda ts: ts.strftime("%Y%m"))
df["part_appcreated_date"] = app_created_ts.apply(lambda ts: ts.strftime("%Y%m%d"))

print(f"Fetched {df.shape[0]} records to be added into [{file_prefix}] S3 file")

In [16]:
# bring in json column and any other derived columns
# df["scienaptic_data"] = str()
#  [....,part_appcreated_month, part_appcreated_date, json_data]


In [17]:
# Glue/parquet column types of the archived table. Dict order = column order written to S3.
# NOTE(review): modified_timestamp / application_created_date / batch_timestamp are declared 'date'. If awswrangler
# casts them to a parquet DATE, the time of day is lost; 'timestamp' may be intended. Confirm against
# the existing Glue table before changing.
parquet_definition = {
    'application_id': 'bigint',
    'scienaptic_id': 'bigint',
    'scienaptic_uuid': 'string',
    'application_uuid': 'string',
    'product_uuid': 'string',
    'user_id': 'string',
    'decision': 'string',
    'created_at': 'bigint',
    'created_date': 'bigint',
    'modified_date': 'bigint',
    'credit_score': 'double',
    'credit_limit': 'double',
    'applied_credit_limit': 'double',
    'modified_timestamp': 'date',
    'application_created_date': 'date',
    'batch_timestamp': 'date',
    'scienaptic_data': 'string',
}

# Columns of the delta frame: the data columns followed by the partition columns.
df_schema = [*parquet_definition.keys(), *partition_cols]

# Columns written to parquet. There are no derived columns yet, so this is a separate copy of the same list.
parquet_schema = list(df_schema)

In [0]:
# Fetch the per-record scienaptic JSON payloads from S3 and shape them into a frame that can be merged
# back onto the delta records on merge_keys_JSON.
json_col_name = 'scienaptic_data'
json_file_path = 'testing-dataset'  # NOTE(review): looks like a test location; confirm the production JSON prefix
table_name = 'SCIENAPTIC'

lst_json = getJsonData(df, merge_keys_JSON, json_col_name, json_file_path, table_name)
# Explicit columns keep the merge in the next cell working even when the list is empty.
json_df = pd.DataFrame(lst_json, columns=[json_col_name] + merge_keys_JSON)
print(f"Built JSON lookup for {json_df.shape[0]} records")


In [21]:
# json_col_name = 'scienaptic_data'
# json_file_path = 'testing-dataset'
# table_name = 'SCIENAPTIC'

# lst_json = getJsonData(df,merge_keys_JSON,json_col_name,json_file_path,table_name)
# json_df = pd.DataFrame(lst_json)


In [19]:
# Attach the JSON payload to each delta record. The lookup is de-duplicated on the join keys first, so that
# repeated keys cannot multiply rows in the left join.
df1 = pd.merge(df, json_df.drop_duplicates(subset=merge_keys_JSON), how="left", on=merge_keys_JSON)

# Keep only the schema columns, in schema order: [..., scienaptic_data, part_appcreated_month, part_appcreated_date]
df_new = df1[df_schema].copy()

In [20]:
# Cast the merge-key columns of the delta to the pandas equivalents of their parquet types. With mismatched
# key dtypes, the merge against existing parquet rows would miss matches and leave duplicate records behind.
pandas_type_map = map_parquet_types_to_pandas(merge_keys, parquet_definition)
print(pandas_type_map)
df_new = df_new.astype(dtype=pandas_type_map)

In [25]:
# Rebuild every partition touched by the delta: the delta rows plus the existing S3 rows they do not replace.
# The partitions present in final_df are overwritten in S3 by the write step.
final_df = mergeDeltaWithExistingParquet(
    df=df_new,
    file_prefix=file_prefix,
    partition_cols=partition_cols,
    merge_keys=merge_keys,
    df_schema=df_schema,
    s3obj=s3obj,
)
print(final_df.shape)

# TODO: extract attributes from the scienaptic_data JSON column into flat columns once enabled:
# print("Extracting JSON attributes and adding to final dataframe...")
# final_df = flatten_scienaptic_data(final_df)

In [26]:
# Write final_df to S3 as parquet (columns in parquet_schema order), registered as glue_database.glue_table.
resp = write2Parquet(
    df=final_df,
    file_prefix=file_prefix,
    partition_cols=partition_cols,
    merge_keys=merge_keys,
    parquet_schema=parquet_schema,
    s3obj=s3obj,
    parquet_definition=parquet_definition,
    glue_database=glue_database,
    glue_table=glue_table,
)
print(resp)

In [0]:
# Advance the batch_control record to the newest modified_date of the archived delta, so the next run
# (modified_timestamp >= last_timestamp) starts from there.
records_written = resp.get("records_written") or 0
rdb_conn = globals().get("rdb")  # defined only once `rdb = RedshiftDb(config)` is uncommented in the setup cell
if not (resp.get("status") and records_written > 0):
    print("No records written. batch_control not updated")
elif rdb_conn is None:
    # Previously this path printed "Updated batch_control" although the update call was commented out.
    print(f"Redshift connection not configured. batch_control NOT updated: {batch_code}")
elif updateBatchControl(df_new, timestamp_column, batch_code, rdb_conn):
    # Use the delta (df_new), not final_df: final_df also contains rows re-read from existing partitions.
    print(f"Updated batch_control: {batch_code}")
else:
    print(f"ERROR: batch_control update failed: {batch_code}")