# AWS Glue Studio Notebook
##### You are now running a AWS Glue Studio notebook; To start using your notebook you need to start an AWS Glue Interactive Session.


#### Optional: Run this cell to see available notebook commands ("magics").


In [1]:
#%help

In [1]:
#import os

#mysql_jar = "/opt/aws/glue/lib/mysql-connector-java-8.0.23.jar"
#os.environ['PYSPARK_SUBMIT_ARGS'] = f'--jars {mysql_jar} pyspark-shell'

#print( os.environ['PYSPARK_SUBMIT_ARGS'] )

--jars /opt/aws/glue/lib/mysql-connector-java-8.0.23.jar pyspark-shell


####  Run this cell to set up and start your interactive session.


In [3]:
#%idle_timeout 2880
#%glue_version 5.0
#%worker_type G.1X
#%number_of_workers 5

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.sql import SparkSession

# JDBC_JAR_PATH = "/opt/aws/glue/lib/mysql-connector-j-9.2.0.jar"

sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)

SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/home/glue_user/spark/jars/log4j-slf4j-impl-2.17.2.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/home/glue_user/spark/jars/slf4j-reload4j-1.7.36.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/home/glue_user/aws-glue-libs/jars/log4j-slf4j-impl-2.17.2.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/home/glue_user/aws-glue-libs/jars/slf4j-reload4j-1.7.36.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


#### Extract DynamicFrames from an AWS Glue Data Catalog and display their schemas

In [4]:
import boto3

glue = boto3.client('glue')

database_name = 'fraud-detection-gluedb'

# Obtener las tablas de la base de datos especificada
response = glue.get_tables(DatabaseName=database_name)

datatables = response['TableList']

print([e['Name'] for e in datatables])

['fraud_detection_certs', 'fraud_detection_payment_methods', 'fraud_detection_review', 'fraud_detection_shops', 'fraud_detection_ssl_issuer']


In [5]:
import boto3
from botocore.exceptions import ClientError
import json

def get_secret():

    secret_name = "prod/fraud-detection-db"
    region_name = "us-east-1"

    # Create a Secrets Manager client
    session = boto3.session.Session()
    client = session.client(
        service_name='secretsmanager',
        region_name=region_name
    )

    try:
        get_secret_value_response = client.get_secret_value(
            SecretId=secret_name
        )
    except ClientError as e:
        # For a list of exceptions thrown, see
        # https://docs.aws.amazon.com/secretsmanager/latest/apireference/API_GetSecretValue.html
        raise e

    secret = get_secret_value_response['SecretString']
    return json.loads(secret)


In [8]:
import boto3

connection_mysql_options = lambda dbtable : {
    "url": f"jdbc:mysql://{get_secret()['host']}:{get_secret()['port']}/fraud-detection", 
    "dbtable": dbtable, 
    "user": get_secret()["username"], 
    "password": get_secret()["password"], 
    "customJdbcDriverS3Path": "s3://fraud-detection-data-ars/mysql-connector-java-8.0.23.jar",
    "customJdbcDriverClassName": "com.mysql.cj.jdbc.Driver"
}

# Crear un cliente de Glue
glue = boto3.client('glue')

database_name = 'fraud-detection-gluedb'

# Obtener las tablas de la base de datos especificada
response = glue.get_tables(DatabaseName=database_name)

datatables = response['TableList']

local_flag=True

dyf_set = dict()
for name in datatables:
    table_name = name['Name'].removeprefix("fraud_detection_")
    print(f"Table {table_name}")
    
    # fixed access with from_catalog from docker container
    if local_flag: # USING DOCKER IMAGE
        dyf_set[table_name] = glueContext.create_dynamic_frame.from_options(connection_type="mysql", connection_options=connection_mysql_options(table_name.upper()))
        dyf_set[table_name].printSchema()
    else: # IN AWS PLATFORM
        dyf_set[table_name] = glueContext.create_dynamic_frame.from_catalog(database=database_name, table_name=name['Name'])
        dyf_set[table_name].printSchema()


Table certs
root
|-- cert_id: int
|-- cert_expire_date: timestamp
|-- shop_id: int
|-- ssl_id: int

Table payment_methods
root
|-- payment_method_id: int
|-- allow_credit_card: boolean
|-- allow_money_back: boolean
|-- allow_cash_on_delivery: boolean
|-- allow_crypto: boolean
|-- shop_id: int

Table review
root
|-- review_id: int
|-- trust_pilot_score: int
|-- site_jabber_review: boolean
|-- tranco_rank: int
|-- shop_id: int

Table shops
root
|-- shop_id: int
|-- domain_url: string
|-- domain_registration_date: timestamp
|-- is_young_domain: boolean
|-- contact_email: boolean
|-- logo_url: boolean
|-- legitimate: boolean

Table ssl_issuer
root
|-- ssl_id: int
|-- ssl_issuer: string
|-- ssl_organization: string



#### Convert the DynamicFrame to a Spark DataFrame and display a sample of the data

In [9]:
df_set = dict()

for table_name, dyf in dyf_set.items():
    df_set[table_name] = dyf.toDF()
    print(f"Table {table_name}")
    df_set[table_name].show(5)



Table certs


                                                                                

+-------+-------------------+-------+------+
|cert_id|   cert_expire_date|shop_id|ssl_id|
+-------+-------------------+-------+------+
|    777|2023-10-24 13:20:41|    777|     3|
|    954|2023-09-13 20:01:27|    954|     2|
|     60|2023-09-18 20:00:40|     60|     2|
|    206|2023-09-05 04:24:48|    206|     2|
|    427|2023-10-17 08:48:18|    427|     3|
+-------+-------------------+-------+------+
only showing top 5 rows

Table payment_methods


                                                                                

+-----------------+-----------------+----------------+----------------------+------------+-------+
|payment_method_id|allow_credit_card|allow_money_back|allow_cash_on_delivery|allow_crypto|shop_id|
+-----------------+-----------------+----------------+----------------------+------------+-------+
|             1052|             true|            true|                 false|       false|   1052|
|               80|             true|            true|                 false|       false|     80|
|              214|             true|            true|                 false|       false|    214|
|              561|            false|            true|                 false|       false|    561|
|              979|             true|           false|                  true|       false|    979|
+-----------------+-----------------+----------------+----------------------+------------+-------+
only showing top 5 rows

Table review


                                                                                

+---------+-----------------+------------------+-----------+-------+
|review_id|trust_pilot_score|site_jabber_review|tranco_rank|shop_id|
+---------+-----------------+------------------+-----------+-------+
|      579|             null|             false|         -1|    579|
|      225|               -1|             false|         -1|    225|
|     1095|             null|             false|         -1|   1095|
|      826|               -1|             false|         -1|    826|
|      144|               -1|             false|         -1|    144|
+---------+-----------------+------------------+-----------+-------+
only showing top 5 rows

Table shops


                                                                                

+-------+--------------------+------------------------+---------------+-------------+--------+----------+
|shop_id|          domain_url|domain_registration_date|is_young_domain|contact_email|logo_url|legitimate|
+-------+--------------------+------------------------+---------------+-------------+--------+----------+
|    709| https://ikonisch.pt|                    null|           true|         true|    true|      true|
|    186|https://www.gowbl...|     2023-06-23 05:43:00|           true|         true|    true|     false|
|   1115|https://www.maxim...|     2000-10-26 00:00:00|          false|         true|    true|      true|
|   1072|https://www.enigm...|     2023-05-29 00:00:00|           true|        false|    true|     false|
|    268|https://www.footw...|     2023-04-20 00:00:00|           true|        false|   false|     false|
+-------+--------------------+------------------------+---------------+-------------+--------+----------+
only showing top 5 rows

Table ssl_issuer


[Stage 8:>                                                          (0 + 1) / 1]

+------+--------------------+----------------+
|ssl_id|          ssl_issuer|ssl_organization|
+------+--------------------+----------------+
|    31|GeoTrust TLS RSA ...|    DigiCert Inc|
|    37|Thawte EV RSA CA ...|    DigiCert Inc|
|     3|                  E1|   Let's Encrypt|
|     1|Cloudflare Inc EC...|Cloudflare, Inc.|
|    35|GeoTrust Global T...|    DigiCert Inc|
+------+--------------------+----------------+
only showing top 5 rows



                                                                                

### Join dataframes

In [23]:
data = df_set["shops"].join(
    df_set["payment_methods"], 
    "shop_id", "left"
).drop("payment_method_id")

data = data.join(
    df_set["review"],
    "shop_id", "left"
).drop("review_id")

data.toPandas()

  series = series.astype(t, copy=False)


Unnamed: 0,shop_id,domain_url,domain_registration_date,is_young_domain,contact_email,logo_url,legitimate,allow_credit_card,allow_money_back,allow_cash_on_delivery,allow_crypto,trust_pilot_score,site_jabber_review,tranco_rank
0,496,https://delozastore.de,NaT,True,True,True,False,False,True,False,False,,False,-1
1,1005,https://www.barnesandnoble.com,1995-08-10 04:00:00,False,False,True,True,False,False,False,False,4.0,True,-1
2,744,https://www.chicballus.com,2023-05-09 01:57:00,True,False,True,False,True,True,False,False,,False,-1
3,811,https://www.usridingapparel.com,2023-07-26 03:37:00,True,False,True,False,True,True,False,False,,False,-1
4,474,http://www.aloevera.lt,2014-12-09 00:00:00,False,True,True,True,True,False,False,False,-1.0,False,-1
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
1135,154,https://www.sprintis.eu,NaT,True,False,True,True,True,True,True,False,-1.0,False,-1
1136,1119,https://store.dji.com/l,1996-01-16 05:00:00,False,True,True,True,True,True,False,False,2.0,False,-1
1137,856,https://stact.de,NaT,True,True,True,True,True,True,False,False,-1.0,False,-1
1138,1129,https://www.knitwearonlienshop.com,2023-07-27 04:53:00,True,False,True,False,True,True,True,False,,False,-1


#### Write the data in the DynamicFrame to a location in Amazon S3

In [17]:
from awsglue.dynamicframe import DynamicFrame

glueContext.write_dynamic_frame.from_options(
    frame=DynamicFrame.fromDF(data.coalesce(1), glueContext, "random"),
    connection_type="s3",
    connection_options={"path": "s3://fraud-detection-data-ars/dataset"},
    format="csv",
    format_options={
        "quoteChar": -1,
    },
)

25/03/20 17:40:18 WARN MetricsConfig: Cannot locate configuration: tried hadoop-metrics2-s3a-file-system.properties,hadoop-metrics2.properties
                                                                                

<awsglue.dynamicframe.DynamicFrame at 0x7f367b405a20>