In [1]:
# Record the interpreter and pip versions this notebook was run with (see output below).
! python3 --version
! pip3 --version

Python 3.7.15
pip 22.3.1 from /home/glue_user/.local/lib/python3.7/site-packages/pip (python 3.7)


In [2]:
! pip3 install mysql-connector-python

Defaulting to user installation because normal site-packages is not writeable
Collecting mysql-connector-python
  Downloading mysql_connector_python-8.0.32-cp37-cp37m-manylinux1_x86_64.whl (23.5 MB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 23.5/23.5 MB 6.1 MB/s eta 0:00:00
Collecting protobuf<=3.20.3,>=3.11.0
  Downloading protobuf-3.20.3-cp37-cp37m-manylinux_2_5_x86_64.manylinux1_x86_64.whl (1.0 MB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 1.0/1.0 MB 5.9 MB/s eta 0:00:00
Installing collected packages: protobuf, mysql-connector-python
Successfully installed mysql-connector-python-8.0.32 protobuf-3.20.3

[notice] A new release of pip available: 22.3.1 -> 23.0.1
[notice] To update, run: python3 -m pip install --upgrade pip

In [1]:
# Show the container OS, then copy the MySQL JDBC driver jar (expected in the notebook's
# working directory) into Spark's jars directory and list that directory.
# NOTE(review): the SparkSession cell also passes this jar through `spark.jars`; one of the two
# mechanisms is probably redundant — confirm which one the Glue image actually needs.
! cat /etc/os-release
! cp ./mysql-connector-java-5.1.49.jar $SPARK_HOME/jars
! ls $SPARK_HOME/jars

NAME="Amazon Linux"
VERSION="2"
ID="amzn"
ID_LIKE="centos rhel fedora"
VERSION_ID="2"
PRETTY_NAME="Amazon Linux 2"
ANSI_COLOR="0;33"
CPE_NAME="cpe:2.3:o:amazon:amazon_linux:2"
HOME_URL="https://amazonlinux.com/"


In [4]:
import sys
import boto3
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from pyspark.sql import SparkSession, Row
from pyspark.sql.functions import col, lit, concat, create_map
from pyspark.sql.types import StringType
from itertools import chain
from awsglue.context import GlueContext
from awsglue.job import Job

import csv, io
import mysql.connector
from functools import partial
from itertools import chain

In [5]:
# Get (or create) the Spark session, handing it the MySQL JDBC jar explicitly as well.
spark = (
    SparkSession.builder
    .appName('123.com')
    .config("spark.jars", "mysql-connector-java-5.1.49.jar")
    .getOrCreate()
)

In [14]:
class DB_Connection():
    """Connection settings for one database plus an on-demand DB-API connection.

    Settings come either from an AWS Glue connection (when both ``glue_cilent`` and
    ``connection_name`` are given) or from the explicit ``hostname``/``user``/``password``.

    Args:
        db_name: Database (schema) name; also the last segment of ``jdbc_url``.
        hostname, user, password: Explicit settings, used when not in Glue mode.
        driver_type: JDBC sub-protocol used in ``jdbc_url`` (default ``"mysql"``).
        driver_port: Port used in ``jdbc_url`` and when opening the MySQL connection.
        glue_cilent: A boto3 Glue client (misspelled name kept for existing callers).
        connection_name: Name of the Glue connection to read the settings from.
    """
    def __init__(self, db_name, hostname = None, user = None, password = None, driver_type = "mysql", driver_port = 3306, glue_cilent = None, connection_name = None):
        # Initialise every attribute before the fallible Glue lookup so the object is always
        # complete; previously a lookup failure left `conn`/`jdbc_url` undefined and later
        # calls such as close_conn() crashed with AttributeError.
        self.glue = False
        self.conn = None
        self.db_name = db_name
        self.driver_type = driver_type
        self.driver_port = driver_port
        self.hostname = hostname
        self.user = user
        self.password = password
        try:
            if glue_cilent is not None and connection_name is not None:
                self.glue = True
                props = glue_cilent.get_connection(Name=connection_name)['Connection']['ConnectionProperties']
                # JDBC_CONNECTION_URL looks like jdbc:<type>://<host>:<port>/<db>; keep only <host>.
                self.hostname = props['JDBC_CONNECTION_URL'].split('/')[2].split(':')[0]
                self.user = props['USERNAME']
                self.password = props['PASSWORD']
        except Exception as e:
            # Best effort (as before): report and fall back to the explicit arguments.
            print(f"Error getting connection settings for DB: {db_name} at {hostname}\n{e}")
        self.jdbc_url = f'jdbc:{self.driver_type}://{self.hostname}:{self.driver_port}/{self.db_name}'

    def get_options(self, table):
        """Return Spark JDBC reader/writer options (url, user, password, dbtable) for ``table``."""
        return {'url': self.jdbc_url,'user': self.user, 'password': self.password, 'dbtable': table}

    def get_conn(self, dbType = "mysql", mandatory = False, ssl_disabled= True):
        """Open a new connection and store it on ``self.conn``.

        Only ``dbType == "mysql"`` opens a connection; for any other value ``self.conn`` is left as is.

        Returns:
            ``self.conn``.
        Raises:
            AssertionError: if ``mandatory`` is true and ``self.conn`` is still ``None``.
        """
        if dbType == "mysql":
            # Pass the configured port; without it the connector silently uses its default port.
            self.conn = mysql.connector.connect(user=self.user, password=self.password, host=self.hostname, port=self.driver_port, database=self.db_name, ssl_disabled=ssl_disabled)
        # Explicit check rather than `assert`, so it is not stripped under `python -O`.
        if mandatory and self.conn is None:
            raise AssertionError(f"Did not manage to connect to {dbType}")
        return self.conn

    def close_conn(self):
        """Close ``self.conn`` if it exists and is connected; otherwise do nothing."""
        if self.conn is not None and self.conn.is_connected():
            self.conn.close()

def write_to_csv(output, filename):
    """Write the CSV rows buffered in ``output`` to ``filename``.

    Nothing is written when the buffer holds no rows.

    Args:
        output: ``io.StringIO`` holding CSV text (e.g. produced by ``csv.DictWriter``).
        filename: Destination path; an existing file is overwritten.
    """
    # Parse with csv.reader instead of splitting on '\r\n' and ',' so quoted fields that
    # contain commas, quotes or line breaks survive the round trip intact.
    rows = list(csv.reader(io.StringIO(output.getvalue())))
    if rows:
        # newline='' as the csv docs require, so the writer alone controls line endings.
        with open(filename, 'w', newline='') as out_file:
            csv.writer(out_file, delimiter=",").writerows(rows)

def db1_customer_f_partition(partitionData, db1, bucket = None, outputfolder = None):
    """Upsert one Spark partition of customers into the card DB ``customer`` table.

    Meant for ``DataFrame.foreachPartition``; each row needs ``id``, ``email`` and ``TENANT``.
    On a duplicate key the email is updated. Each row is committed on its own; rows that fail
    are buffered as CSV and written to a local error file (the S3 upload is disabled).

    Args:
        partitionData: Iterator of Spark ``Row`` objects for this partition.
        db1: ``DB_Connection`` for the card database; (re)connected on demand.
        bucket: S3 bucket for the error file (unused while the upload is commented out).
        outputfolder: Prefix for the reported error-file name; ``None`` is treated as ``''``.
    """
    output = io.StringIO()
    errorExist = False
    db1_cursor = None
    try:
        for row in partitionData:
            # (Re)connect when there is no live connection; a new connection needs a new cursor.
            if db1.conn is None or not db1.conn.is_connected():
                db1.get_conn(dbType = 'mysql')
                db1_cursor = None
            # One cursor per connection instead of an unclosed cursor per row.
            if db1_cursor is None:
                db1_cursor = db1.conn.cursor()

            row = row.asDict()
            try:
                mapping = {'customer_id': row['id'], 'email': row['email'], 'tenant': row['TENANT']}
                db1_cursor.execute("""
                    INSERT INTO customer(customer_id, email, tenant)
                    VALUES (%(customer_id)s,%(email)s,%(tenant)s)
                    ON DUPLICATE KEY UPDATE email = %(email)s;""", mapping)
                db1.conn.commit()
            except Exception as e:
                # Best effort: record the failing row and carry on with the partition.
                print(e)
                errorExist = True
                csv.DictWriter(output, row.keys()).writerow(row)
    finally:
        # Release the cursor and connection even if connecting failed mid-partition.
        if db1_cursor is not None:
            db1_cursor.close()
        db1.close_conn()
    if errorExist:
        print((outputfolder or '') + 'card_customer_error.csv')
        # NOTE(review): all partition writers use this same local file name and write_to_csv
        # opens it with 'w', so later partitions/tables overwrite earlier errors.
        write_to_csv(output, 'error1.csv') # local testing
        # s3_resource = boto3.resource('s3')
        # s3_resource.Object(bucket, outputfolder + 'card_customer_error.csv').put(Body=output.getvalue())
    print("DONE - db1_customer_f_partition")

def db1_card_f_partition(partitionData, db1, bucket = None, outputfolder = None):
    """Upsert one Spark partition of cards into the card DB ``card`` table.

    Meant for ``DataFrame.foreachPartition``; each row needs ``card_id``, ``card_type``,
    ``REWARD_TYPE``, ``TENANT`` and ``id`` (the owning customer). On a duplicate key the card's
    type, reward type and customer are updated. Each row is committed on its own; rows that
    fail are buffered as CSV and written to a local error file (the S3 upload is disabled).

    Args:
        partitionData: Iterator of Spark ``Row`` objects for this partition.
        db1: ``DB_Connection`` for the card database; (re)connected on demand.
        bucket: S3 bucket for the error file (unused while the upload is commented out).
        outputfolder: Prefix for the reported error-file name; ``None`` is treated as ``''``.
    """
    output = io.StringIO()
    errorExist = False
    db1_cursor = None
    try:
        for row in partitionData:
            # (Re)connect when there is no live connection; a new connection needs a new cursor.
            if db1.conn is None or not db1.conn.is_connected():
                db1.get_conn(dbType = 'mysql')
                db1_cursor = None
            # One cursor per connection instead of an unclosed cursor per row.
            if db1_cursor is None:
                db1_cursor = db1.conn.cursor()

            row = row.asDict()
            try:
                mapping = {'CARD_ID': row['card_id'], 'CARD_TYPE': row['card_type'], 'REWARD_TYPE': row['REWARD_TYPE'], 'TENANT': row['TENANT'], 'CUSTOMER_ID': row['id']}
                db1_cursor.execute("""
                    INSERT INTO card(card_id, card_type, reward_type, tenant, customer_id)
                    VALUES (%(CARD_ID)s,%(CARD_TYPE)s,%(REWARD_TYPE)s,%(TENANT)s,%(CUSTOMER_ID)s)
                    ON DUPLICATE KEY UPDATE card_type = %(CARD_TYPE)s, reward_type = %(REWARD_TYPE)s, customer_id = %(CUSTOMER_ID)s;""", mapping)
                db1.conn.commit()
            except Exception as e:
                # Best effort: record the failing row and carry on with the partition.
                print(e)
                errorExist = True
                csv.DictWriter(output, row.keys()).writerow(row)
    finally:
        # Release the cursor and connection even if connecting failed mid-partition.
        if db1_cursor is not None:
            db1_cursor.close()
        db1.close_conn()
    if errorExist:
        # NOTE(review): all partition writers use this same local file name and write_to_csv
        # opens it with 'w', so later partitions/tables overwrite earlier errors.
        write_to_csv(output, 'error1.csv') # local testing
        print((outputfolder or '') + 'card_card_error.csv')
        # s3_resource = boto3.resource('s3')
        # s3_resource.Object(bucket, outputfolder + 'card_card_error.csv').put(Body=output.getvalue())
    print("DONE - db1_card_f_partition")

def db2_cardtype_f_partition(partitionData, db1, bucket = None, outputfolder = None):
    """Insert-or-update one Spark partition of card types in the campaign DB ``card_type`` table.

    Meant for ``DataFrame.foreachPartition``; each row needs ``card_type`` and ``TENANT``.
    A (name, tenant) pair that already exists is re-written with an UPDATE, otherwise it is
    INSERTed. Each row is committed on its own; rows that fail are buffered as CSV and written
    to a local error file (the S3 upload is disabled).

    Args:
        partitionData: Iterator of Spark ``Row`` objects for this partition.
        db1: ``DB_Connection`` for the campaign database; (re)connected on demand.
        bucket: S3 bucket for the error file (unused while the upload is commented out).
        outputfolder: Prefix for the reported error-file name; ``None`` is treated as ``''``.
    """
    output = io.StringIO()
    errorExist = False
    db1_cursor = None
    try:
        for row in partitionData:
            # (Re)connect when there is no live connection; a new connection needs a new cursor.
            if db1.conn is None or not db1.conn.is_connected():
                db1.get_conn(dbType = 'mysql')
                db1_cursor = None
            # Buffered so `rowcount` is populated after the SELECT; one cursor per connection.
            if db1_cursor is None:
                db1_cursor = db1.conn.cursor(buffered=True)

            row = row.asDict()
            try:
                mapping = {'card_type': row['card_type'], 'tenant': row['TENANT']}
                db1_cursor.execute("""SELECT * FROM card_type where name=%(card_type)s AND tenant=%(tenant)s """, mapping)
                if db1_cursor.rowcount > 0 :
                    db1_cursor.execute("""UPDATE card_type SET name=%(card_type)s , tenant=%(tenant)s WHERE name=%(card_type)s AND tenant=%(tenant)s""", mapping)
                else:
                    db1_cursor.execute("""INSERT INTO card_type (name,tenant) VALUES (%(card_type)s , %(tenant)s )""", mapping )
                db1.conn.commit()
            except Exception as e:
                # Best effort: record the failing row and carry on with the partition.
                print(e)
                errorExist = True
                csv.DictWriter(output, row.keys()).writerow(row)
    finally:
        # Release the cursor and connection even if connecting failed mid-partition.
        if db1_cursor is not None:
            db1_cursor.close()
        db1.close_conn()
    if errorExist:
        # NOTE(review): all partition writers use this same local file name and write_to_csv
        # opens it with 'w', so later partitions/tables overwrite earlier errors.
        write_to_csv(output, 'error1.csv') # local testing
        print((outputfolder or '') + 'campaign_cardType_error.csv')
        # s3_resource = boto3.resource('s3')
        # s3_resource.Object(bucket, outputfolder + 'campaign_cardType_error.csv').put(Body=output.getvalue())
    print("DONE - db2_cardtype_f_partition")

def db2_customer_f_partition(partitionData, db1, bucket = None, outputfolder = None):
    """Insert-or-update one Spark partition of customers in the campaign DB ``customer`` table.

    Meant for ``DataFrame.foreachPartition``; each row needs ``email``, ``name``, ``phone``,
    ``card_type`` and ``id`` (the card_type id from the join). A row without ``id`` is treated as
    an error. An existing (card_type_id, name) customer gets its email/phone updated, otherwise a
    new customer is inserted. Each row is committed on its own; only rows that FAIL are buffered as
    CSV and written to a local error file (the S3 upload is disabled).

    Args:
        partitionData: Iterator of Spark ``Row`` objects for this partition.
        db1: ``DB_Connection`` for the campaign database; (re)connected on demand.
        bucket: S3 bucket for the error file (unused while the upload is commented out).
        outputfolder: Prefix for the reported error-file name; ``None`` is treated as ``''``.
    """
    output = io.StringIO()
    errorExist = False
    db1_cursor = None
    try:
        for row in partitionData:
            # (Re)connect when there is no live connection; a new connection needs a new cursor.
            if db1.conn is None or not db1.conn.is_connected():
                db1.get_conn(dbType = 'mysql')
                db1_cursor = None
            # Buffered so `rowcount` is populated after the SELECT; one cursor per connection.
            if db1_cursor is None:
                db1_cursor = db1.conn.cursor(buffered=True)

            row = row.asDict()
            try:
                mapping = {'email': row['email'], 'name': row['name'], 'phone_number': row['phone'], 'card_type_id': row['id'], 'card_type': row['card_type']}
                if mapping["card_type_id"] is None:
                    raise Exception ("No card type ID detected!")
                db1_cursor.execute("""SELECT * FROM customer where card_type_id=%(card_type_id)s AND name=%(name)s """, mapping)
                if db1_cursor.rowcount > 0 :
                    db1_cursor.execute("""UPDATE customer SET email=%(email)s, phone_number=%(phone_number)s WHERE card_type_id=%(card_type_id)s AND name=%(name)s""", mapping)
                else:
                    db1_cursor.execute("""INSERT INTO customer (email,name,phone_number,card_type_id) VALUES (%(email)s , %(name)s , %(phone_number)s, %(card_type_id)s)""", mapping )
                db1.conn.commit()
                # Successful rows are no longer written to the error buffer.
            except Exception as e:
                # Best effort: record the failing row and carry on with the partition.
                print(e)
                errorExist = True
                csv.DictWriter(output, row.keys()).writerow(row)
    finally:
        # Release the cursor and connection even if connecting failed mid-partition.
        if db1_cursor is not None:
            db1_cursor.close()
        db1.close_conn()
    if errorExist:
        # NOTE(review): all partition writers use this same local file name and write_to_csv
        # opens it with 'w', so later partitions/tables overwrite earlier errors.
        write_to_csv(output, 'error1.csv') # local testing
        print((outputfolder or '') + 'campaign_customer_error.csv')
        # s3_resource = boto3.resource('s3')
        # s3_resource.Object(bucket, outputfolder + 'campaign_customer_error.csv').put(Body=output.getvalue())
    print("DONE - db2_customer_f_partition")

In [7]:
# Maps the input CSV's card_type values to the reward type stored with each card.
REWARD_TYPE_MAPPING = {"scis_shopping": "cashback", "scis_freedom": "points", "scis_platinummiles": "miles", "scis_premiummiles": "miles"}
TENANT = 'scis'
# Local-testing stand-ins for the Glue job arguments (S3PATH / BUCKET / OUTPUTFOLDER).
# OUTPUTFOLDER is used as a plain string prefix for error-file names, so it must end with '/':
# '.' produced hidden names such as '.card_customer_error.csv'.
OUTPUTFOLDER = './'
# print(s3_path, TENANT, BUCKET, OUTPUTFOLDER)

In [8]:
import os

# Connection settings for the local MySQL instance reached from the Docker container.
# Override them through environment variables rather than editing credentials into the
# notebook; the defaults reproduce the original local-only setup.
_LOCAL_DB_SETTINGS = dict(
    hostname=os.environ.get('DB_HOST', 'host.docker.internal'),
    user=os.environ.get('DB_USER', 'root'),
    password=os.environ.get('DB_PASSWORD', 'admin'),
    driver_type="mysql",
    driver_port=int(os.environ.get('DB_PORT', '3306')),
)
card_db1_c = DB_Connection(db_name = 'card_db', **_LOCAL_DB_SETTINGS)
campaign_db2_c = DB_Connection(db_name = 'campaign_db', **_LOCAL_DB_SETTINGS)


In [9]:
# Source extract: users.csv with a header row (no schema inference, so columns are strings).
s3_df = spark.read.csv('./users.csv', header=True)

In [10]:
# Reshape the raw extract once: drop columns no target table receives, build a single `name`
# from first/last name, stamp every row with TENANT, and translate card_type into REWARD_TYPE
# via REWARD_TYPE_MAPPING (card types missing from the mapping yield null).
mapping_expr = create_map([lit(x) for x in chain.from_iterable(REWARD_TYPE_MAPPING.items())])
s3_df = (
    s3_df
    .drop("created_at", "card_pan")
    .withColumn('name', concat(col("first_name"), lit(" "), col("last_name")))
    .withColumn('TENANT', lit(TENANT))
    .drop("first_name", "last_name")
    .withColumn('REWARD_TYPE', mapping_expr[col("card_type")])
)

# One de-duplicated frame per target table:
# card_db.customer, card_db.card, campaign_db.card_type, campaign_db.customer.
CUSTOMER_db1_df = s3_df.drop_duplicates(subset=['id', 'TENANT']).select("id", "email", "TENANT")
CARD_db1_df = s3_df.drop_duplicates(subset=['card_id', 'TENANT']).select("card_id", "card_type", "REWARD_TYPE", "TENANT", "id")
CARDTYPE_db2_df = s3_df.drop_duplicates(subset=['card_type', 'TENANT']).select("card_type", "TENANT")
CUSTOMER_db2_df = s3_df.drop_duplicates(subset=['name', 'card_type']).select("email", "name", "phone", "card_type")

In [12]:
# Attach each campaign customer's card_type id (`id`) by joining with the campaign DB card_type table.
# The JDBC data read is lazy: rows are fetched when CUSTOMER_db2_df is acted on (the
# foreachPartition in the insert cell), i.e. after the card_type inserts have run.
# NOTE: not idempotent — re-running this cell joins CUSTOMER_db2_df a second time.
db2_CARDTYPE_df = (
    spark.read.format("jdbc")
    .option('driver', 'com.mysql.jdbc.Driver')
    .option("url", campaign_db2_c.jdbc_url)
    .option("user", campaign_db2_c.user)
    .option("password", campaign_db2_c.password)
    .option("dbtable", "card_type")
    .load()
    # card_type rows are looked up by (name, tenant) when inserted; keep only this tenant's rows
    # so a name shared with another tenant cannot duplicate customers or attach a wrong id.
    .filter(col("tenant") == TENANT)
    .withColumnRenamed("name", "name_cardType")
)
CUSTOMER_db2_df = CUSTOMER_db2_df.join(db2_CARDTYPE_df, CUSTOMER_db2_df.card_type == db2_CARDTYPE_df.name_cardType, "left")

In [15]:
# No S3 bucket while testing locally (the S3 upload in the partition writers is commented out).
BUCKET = None

In [16]:
def _run_insert(frame, writer, connection, done_message):
    """Run ``writer`` over every partition of ``frame`` against ``connection``, then report."""
    # Clear any driver-side connection before the partial (which captures `connection`)
    # is serialized and shipped to the executors, where the writer opens its own connection.
    connection.conn = None
    frame.foreachPartition(partial(writer, db1=connection, bucket=BUCKET, outputfolder=OUTPUTFOLDER))
    connection.close_conn()
    print(done_message)


print(f"Inserting to databases.... ERROR FOLDER: {OUTPUTFOLDER}")
# Card database: customer table, then card table.
_run_insert(CUSTOMER_db1_df, db1_customer_f_partition, card_db1_c, "Insert CARD DB, CUSTOMER TABLE!")
_run_insert(CARD_db1_df, db1_card_f_partition, card_db1_c, "Insert CARD DB, CARD TABLE!")
# Campaign database: card types first, because CUSTOMER_db2_df resolves card_type ids through
# a lazy JDBC read of that table.
_run_insert(CARDTYPE_db2_df, db2_cardtype_f_partition, campaign_db2_c, "Insert CAMPAIGN DB, cardtype TABLE!")
_run_insert(CUSTOMER_db2_df, db2_customer_f_partition, campaign_db2_c, "Insert CAMPAIGN DB, customer TABLE!")

print("DONE")

Inserting to databases.... ERROR FOLDER: .
Insert CARD DB, CUSTOMER TABLE!
Insert CARD DB, CARD TABLE!
Insert CAMPAIGN DB, cardtype TABLE!
Insert CAMPAIGN DB, customer TABLE!
DONE
