1. Reads metadata from a table in the AWS Glue Data Catalog.
2. Applies transformations to the data source.
3. Writes the metadata back to the table in the Data Catalog and stores the transformed data in an S3 path.


In [1]:
import sys
from io import BytesIO, StringIO
from urllib.parse import urlparse

import boto3
import databricks.koalas as ks
import pandas
import pyspark.sql.functions as fc
from awsglue.context import GlueContext
from awsglue.dynamicframe import DynamicFrame
from awsglue.job import Job
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from neptune_python_utils.glue_gremlin_csv_transforms import GlueGremlinCsvTransforms
from pyspark.context import SparkContext
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.sql.functions import log10

ModuleNotFoundError: No module named 'awsglue'

In [5]:
def get_features_and_labels(transactions_df, transactions_id_cols, transactions_cat_cols):
    """Encode categorical columns and assemble the model feature vector.

    Every column of ``transactions_df`` except 'isFraud', 'TransactionDT' and the
    identifier columns is treated as a feature. Each categorical column is
    string-indexed into ``<col>_index`` and one-hot encoded into ``<col>_vec``.
    The ``_vec`` column replaces the raw column in the feature list and is
    appended at the end. 'TransactionAmt' is replaced by its log10. All feature
    columns are then assembled into a single 'features' vector column.

    Args:
        transactions_df: Spark DataFrame of transactions.
        transactions_id_cols: comma-separated identifier column names, excluded
            from the features. Blank entries and surrounding spaces are ignored.
        transactions_cat_cols: comma-separated categorical column names to
            encode. Blank entries and surrounding spaces are ignored.

    Returns:
        A ``(features, labels)`` tuple. ``features`` is the transformed
        DataFrame with the added ``*_index``, ``*_vec`` and 'features' columns.
        ``labels`` holds the 'TransactionID' and 'isFraud' columns.

    Raises:
        ValueError: if a categorical column is not one of the feature columns
            (for example, it is also listed as an identifier column).
    """
    # Parse each comma-separated list once. Tolerate "a, b" and trailing commas,
    # which would otherwise produce invalid column names like " b" or "".
    id_cols = [name.strip() for name in transactions_id_cols.split(",") if name.strip()]
    cat_cols = [name.strip() for name in transactions_cat_cols.split(",") if name.strip()]

    # Get features
    non_feature_cols = {'isFraud', 'TransactionDT', *id_cols}
    feature_cols = [col for col in transactions_df.columns if col not in non_feature_cols]
    logger.info(f'transaction_id_cols columns: {transactions_id_cols}')
    logger.info(f'feature columns: {feature_cols}')
    logger.info(f'categorical columns: {cat_cols}')

    # Transform categorical features
    for col in cat_cols:
        index_col = col + "_index"
        vec_col = col + "_vec"
        indexer = StringIndexer(inputCol=col, outputCol=index_col)
        transactions_df = indexer.fit(transactions_df).transform(transactions_df)

        encoder = OneHotEncoder(inputCol=index_col, outputCol=vec_col)
        # Spark >= 3.0 made OneHotEncoder an Estimator that must be fitted before
        # it can transform. In Spark 2.x it is a plain Transformer with no `fit`.
        if hasattr(encoder, "fit"):
            encoder = encoder.fit(transactions_df)
        transactions_df = encoder.transform(transactions_df)

        # Swap the raw categorical column for its one-hot vector. The position
        # (appended last) is kept so the assembled vector layout is unchanged.
        feature_cols.remove(col)
        feature_cols.append(vec_col)

    # Apply log transformation to 'TransactionAmt'
    transactions_df = transactions_df.withColumn('TransactionAmt', log10('TransactionAmt'))

    # Assemble features into a single vector
    assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
    features = assembler.transform(transactions_df)
    logger.info(f'Transformed feature columns: {list(features.columns)}')

    # get labels
    labels = transactions_df.select('TransactionID', 'isFraud')
    logger.info(f'Label columns: {list(labels.columns)}')

    return features, labels


In [None]:
def dump_edge_as_graph(name, dataframe):
    """Write the transaction-to-``name`` relation as a Gremlin CSV edge list.

    The Spark ``dataframe`` is wrapped in a Glue DynamicFrame. ``~from`` and
    ``~to`` columns are derived with ``create_prefixed_columns``: ``~from``
    uses the transaction-id column with prefix 't', and ``~to`` uses the
    ``name`` column with prefix ``name``. An ``~id`` column is then built from
    the two endpoints, only ``~id``/``~from``/``~to`` are kept, and the result
    is written via ``dump_df_to_s3`` under the graph prefix as
    ``relation_<name>_edgelist``.
    """
    logger.info(f'Creating glue dynamic frame from spark dataframe for the relation between transaction and {name}...')
    edge_frame = DynamicFrame.fromDF(dataframe, glueContext, f'{name}EdgeDF')

    # NOTE(review): TRANSACTION_ID is not defined anywhere in this notebook. It
    # presumably names the transaction id column (e.g. 'TransactionID'); confirm
    # it is set before this function runs.
    endpoint_mappings = [('~from', TRANSACTION_ID, 't'), ('~to', name, name)]
    prefixed = GlueGremlinCsvTransforms.create_prefixed_columns(edge_frame, endpoint_mappings)
    with_edge_ids = GlueGremlinCsvTransforms.create_edge_id_column(prefixed, '~from', '~to')
    edge_list = SelectFields.apply(
        frame=with_edge_ids,
        paths=["~id", '~from', '~to'],
        transformation_ctx=f'selection_{name}',
    )

    logger.info(f'Upserting edges between \'{name}\' and transaction...')
    dump_df_to_s3(edge_list.toDF(), f'relation_{name}_edgelist', graph=True)

In [6]:
def get_relations_and_edgelist(transactions_df, identity_df, transactions_id_cols, output_dir):
    """Build one edge list per relation type from the transaction identity data.

    The relation types are the identifier columns named in
    ``transactions_id_cols`` followed by every column of ``identity_df``. The
    transaction id columns are left-joined with ``identity_df`` on
    'TransactionID'. For each relation type, the ('TransactionID', <relation>)
    rows with no missing value form that relation's edge list.

    NOTE(review): because ``identity_df`` must contain 'TransactionID' for the
    join, 'TransactionID' itself also becomes a relation type. This is probably
    unintended; confirm it with the graph consumers.

    Args:
        transactions_df: DataFrame with 'TransactionID' and the id columns.
        identity_df: DataFrame joined on its 'TransactionID' column.
        transactions_id_cols: comma-separated identifier column names.
        output_dir: currently unused. Writing edge lists to disk is disabled.

    Returns:
        dict mapping each relation name to its edge-list DataFrame, in
        relation-type order.
    """
    id_col_names = transactions_id_cols.split(",")
    relation_names = id_col_names + list(identity_df.columns)

    joined = transactions_df[['TransactionID'] + id_col_names].merge(
        identity_df, on='TransactionID', how='left'
    )

    return {
        relation: joined[['TransactionID', relation]].dropna()
        for relation in relation_names
    }

In [None]:
def dump_df_to_s3(df, objectName, header = True, graph = False):
    """Write a Spark DataFrame as CSV under this job run's S3 output prefix.

    The target path is ``{output_prefix}{JOB_RUN_ID}/{objectName}``, or
    ``{output_prefix}{JOB_RUN_ID}/graph/{objectName}`` when ``graph`` is truthy.
    It relies on the module-level ``args``, ``glueContext`` and ``logger``.

    Args:
        df: Spark DataFrame to write.
        objectName: final path component. It is also used to name the
            intermediate DynamicFrame.
        header: whether to write a CSV header row.
        graph: write under the ``graph/`` sub-prefix.
    """
    run_prefix = f"{args['output_prefix']}{args['JOB_RUN_ID']}"
    if graph:
        objectKey = f"{run_prefix}/graph/{objectName}"
        logger.info(f'Dumping edge "{objectName}" as graph to bucket prefix {objectKey}')
    else:
        objectKey = f"{run_prefix}/{objectName}"
        logger.info(f'Dumping edge "{objectName}" to bucket prefix {objectKey}')

    glueContext.write_dynamic_frame.from_options(
        frame=DynamicFrame.fromDF(df, glueContext, f"{objectName}DF"),
        connection_type="s3",
        connection_options={"path": objectKey},
        format_options={"writeHeader": header},
        format="csv",
    )

In [None]:
## @params: [JOB_NAME, Database, transaction_table, identity_table, id_cols, cat_cols, output_prefix]
# The keys of `args` are exactly the option names listed here (case-sensitive).
# Keep a comma after every entry: adjacent string literals silently concatenate.
args = getResolvedOptions(sys.argv,
                          ['JOB_NAME',
                           'Database',
                           'transaction_table',
                           'identity_table',
                           'id_cols',
                           'cat_cols',
                           'output_prefix'])

sc = SparkContext()
glueContext = GlueContext(sc)  # Creates a new glue context
spark = glueContext.spark_session
logger = glueContext.get_logger()  # use to output the log messages

# Read the source tables from the Glue Data Catalog.
transactions = glueContext.create_dynamic_frame.from_catalog(database=args['Database'], table_name=args['transaction_table'])
identities = glueContext.create_dynamic_frame.from_catalog(database=args['Database'], table_name=args['identity_table'])

# TODO: transform `transactions` / `identities` (get_features_and_labels,
# get_relations_and_edgelist) and persist the results (dump_df_to_s3,
# dump_edge_as_graph). The previous placeholder here read and wrote back a
# catalog table through `db_name` / `tbl_name`, which are never defined in this
# notebook, so it raised NameError before any real work ran.

#job.commit()  # needs `job = Job(glueContext); job.init(args['JOB_NAME'], args)` first
