In [6]:
import io
from pyspark.sql.types import IntegerType, StructField, StructType, TimestampType
# Transaction schema needed to be explicitly stated. Not necessary for customer data. 
schema = StructType([
    # Define the name field
      StructField('transaction_id', IntegerType(), True),
    # Add the age field
      StructField('customer_id', IntegerType(), True),
      StructField('transaction_amount', IntegerType(), True),
    # Add the city field
      StructField('transaction_date', TimestampType(), True)
    ])

transaction_data = spark.read.parquet("/content/drive/MyDrive/data_engineer_assessment/refined_zone/Refined_Parquet_Files/transaction_data_*.parquet", schema=schema)
customer_data = spark.read.parquet("/content/drive/MyDrive/data_engineer_assessment/refined_zone/Refined_Parquet_Files/customer_data_*.parquet")

In [7]:
from pyspark.sql.functions import *
customer_data = customer_data.withColumn("birthdate", substring(col("birthdate"),0,10)) # Rearrange customer birthdate by removing unnecessary 00:00:00 time.

In [12]:
# Tried to convert modify transaction date into correct format however only null values were created. Debugging required. 


#transaction_data = transaction_data.withColumn("transaction_date", col("transaction_date").cast('timestamp').alias('transaction_time'))
#transaction_data = transaction_data.withColumn("transaction_date", date_format(col("transaction_date"), "yyyy-MM-dd HH:mm:ss")).collect()

In [8]:
transaction_data.createOrReplaceTempView("transactions") # Create a temporary view of transcations data
transaction_data

DataFrame[customer_id: int, transaction_amount: double, transaction_date: string, transaction_id: int]

In [9]:
customer_data.createOrReplaceTempView("customers") # Create a temporary view of customers data
customer_data

DataFrame[birthdate: string, bank_account_type: string, bank_name: string, employment_status: string, education_level: string, customerid: int]

In [10]:
import sqlite3 # Create a local database for simplicity. 
  
connection = sqlite3.connect("/content/drive/MyDrive/data_engineer_assessment/refined_zone/CustomerTransactions.db") # connecting to the database

crsr = connection.cursor() # cursor creation for connection to database
  
# SQL command to create a dimension table in the database
customers_sql_command = """CREATE TABLE IF NOT EXISTS dimCustomers (
  customerid integer not null PRIMARY KEY,
  birthdate timestamp not null,
  bank_account_type varchar(25) not null,
  bank_name varchar(25) not null,
  employment_status varchar(25),
  education_level varchar(25)
);"""
  
crsr.execute(customers_sql_command) # execute the statement

# SQL command to create a fact table in the database
transaction_sql_command = """CREATE TABLE IF NOT EXISTS factTransactions (
  
  customer_id integer not null,
  transaction_amount integer not null,
  transaction_date timestamp not null,
  transaction_id integer PRIMARY KEY AUTOINCREMENT,
  FOREIGN KEY (customer_id) REFERENCES dimCustomers(customerid)
);"""

crsr.execute(transaction_sql_command) # execute the statement

transaction_records = spark.sql('SELECT * from transactions').collect() # Retrieve all data in transactions

SQL = """INSERT INTO factTransactions(customer_id, transaction_amount, transaction_date, transaction_id) VALUES (?,?,?,?)""" # Insert data into database
for row in transaction_records:
  row = tuple(row)
  crsr.execute(SQL, row)

customer_records = spark.sql('SELECT * from customers').collect() # Retrieve all data in customers

SQL_2 = """INSERT INTO dimCustomers(birthdate, bank_account_type, bank_name, employment_status, education_level, customerid) VALUES (?,?,?,?,?,?)""" # Insert data into database
for row in customer_records:
  row = tuple(row)
  crsr.execute(SQL_2, row)

connection.commit() # Commit changes to database
connection.close() # close the connection

[Row(birthdate='1983-01-19', bank_account_type='Other', bank_name='ABSA Bank', employment_status='Self-Employed', education_level='Post-Graduate', customerid=1042), Row(birthdate='1992-02-24', bank_account_type='Other', bank_name='Capitec Bank', employment_status='Permanent', education_level='Post-Graduate', customerid=1100), Row(birthdate='1978-04-16', bank_account_type='Other', bank_name='Access Bank', employment_status='Permanent', education_level='Post-Graduate', customerid=1070), Row(birthdate='1989-07-09', bank_account_type='Savings', bank_name='Capitec Bank', employment_status='Permanent', education_level='Graduate', customerid=1056), Row(birthdate='1985-02-25', bank_account_type='Other', bank_name='Stanbic Bank', employment_status='Unemployed', education_level='Graduate', customerid=1047), Row(birthdate='1990-07-03', bank_account_type='Other', bank_name='ABSA Bank', employment_status='Self-Employed', education_level='Graduate', customerid=1020), Row(birthdate='1979-03-25', bank

In [11]:

conn = sqlite3.connect('/content/drive/MyDrive/data_engineer_assessment/refined_zone/CustomerTransactions.db')  # Connect to database

with io.open('/content/drive/MyDrive/data_engineer_assessment/refined_zone/backupdatabase.sql', 'w') as p: #Create SQL Script for database backup.
  
    for line in conn.iterdump(): 
        p.write('%s\n' % line)
      
print('Backup performed successfully!') # Print if successful
print('Data Saved as backupdatabase.sql')
  
conn.close() # Close connection

 Backup performed successfully!
 Data Saved as backupdatabase.sql
