In [None]:
#Part 1: focus is on building the database, schemas, tables, configuration tables and columns needed in order to load the data

In [None]:
#Import libraries
import gc
import glob
import os
import sys
import urllib as lib
import urllib.parse  # `import urllib` alone does not load the `parse` submodule used via lib.parse

import sqlalchemy as alch
from pyspark.sql import SparkSession
from pyspark.sql.functions import when

In [None]:
#Define OS system variables
#Point both the PySpark worker and driver Python at the interpreter running this kernel
for sparkPythonVar in ('PYSPARK_PYTHON', 'PYSPARK_DRIVER_PYTHON'):
    os.environ[sparkPythonVar] = sys.executable

In [None]:
#Create (or reuse) the SparkSession used for the JDBC reads and Spark SQL below
spark = (
    SparkSession.builder
    .appName('ReplicaSource_Practice')
    .getOrCreate()
)

In [None]:
#Define connections for initialDB - SQLAlchemy
#Server-level connection (no DATABASE in the ODBC string) using Windows integrated authentication
targetServer = 'localhost'
mssqlDriverSQLAlchemy = 'SQL SERVER NATIVE CLIENT 11.0'

initialDbOdbcParts = ['DRIVER=' + mssqlDriverSQLAlchemy, 'SERVER=' + targetServer, 'Trusted_Connection=yes']
quotedInitalDB = lib.parse.quote_plus(';'.join(initialDbOdbcParts))
engineInitalDB = alch.create_engine(
    f'mssql+pyodbc:///?odbc_connect={quotedInitalDB}',
    future=True,
    connect_args={'autocommit': True},
)

In [None]:
#Create the ReplicaSource database if it is missing.
#The connection uses pyodbc autocommit, which CREATE DATABASE needs (it is not allowed inside a transaction).
with engineInitalDB.connect() as initialDbCheck:
    createDbIfNotExists = (
        "\n"
        "        IF NOT EXISTS(SELECT * FROM sys.databases WHERE name = 'ReplicaSource')\n"
        "    \t    CREATE DATABASE ReplicaSource\n"
        "    "
    )
    initialDbCheck.execute(alch.text(createDbIfNotExists))

In [None]:
#Define connections for initialDBSchemas - SQLAlchemy
#Same server and driver as above, but scoped to the ReplicaSource database
targetDatabase = 'ReplicaSource'

initialSchemaOdbcParts = [
    'DRIVER=' + mssqlDriverSQLAlchemy,
    'SERVER=' + targetServer,
    'DATABASE=' + targetDatabase,
    'Trusted_Connection=yes',
]
quotedInitalSchema = lib.parse.quote_plus(';'.join(initialSchemaOdbcParts))
engineInitialSchema = alch.create_engine(
    f'mssql+pyodbc:///?odbc_connect={quotedInitalSchema}',
    future=True,
    connect_args={'autocommit': True},
)

In [None]:
#Create the Import and Helper schemas (owned by dbo) if they do not exist yet.
#CREATE SCHEMA is wrapped in EXEC(...) because it must be the only statement in its batch.
with engineInitialSchema.connect() as initialSchemaCheck:
    createSchemaIfNotExists = (
        "\n"
        "        IF NOT EXISTS(SELECT * FROM sys.schemas WHERE name = 'Import')\n"
        "            EXEC('CREATE SCHEMA Import AUTHORIZATION [dbo]');\n"
        "\n"
        "\t\tIF NOT EXISTS(SELECT * FROM sys.schemas WHERE name = 'Helper')\n"
        "\t\t\tEXEC('CREATE SCHEMA Helper AUTHORIZATION [dbo]')\n"
        "    "
    )
    initialSchemaCheck.execute(alch.text(createSchemaIfNotExists))

In [None]:
#Get Data from Source1 - Spark
#Connection details come from environment variables so credentials are not saved in the notebook.
#Each one falls back to an empty string, which matches the previous placeholders.
source1Host = os.environ.get('SOURCE1_HOST', '')
source1DB = os.environ.get('SOURCE1_DB', '')
source1Port = os.environ.get('SOURCE1_PORT', '')

#Spark's JDBC source takes the driver class from the 'driver' key. The former 'mysqlDriver' key
#is not a Spark option, so the driver class was never actually set.
source1Properties = {
    "user": os.environ.get('SOURCE1_USER', ''),
    "password": os.environ.get('SOURCE1_PASSWORD', ''),
    "driver": "com.mysql.jdbc.Driver"
}

#tinyInt1isBit=false keeps TINYINT(1) numeric; zero dates are read as NULL
source1URL = f'jdbc:mysql://{source1Host}:{source1Port}/{source1DB}?tinyInt1isBit=false&zeroDateTimeBehavior=convertToNull'

In [None]:
#JDBC subquery that should return Source1's column metadata.
#NOTE(review): the query body is still an empty placeholder, and the parentheses are unbalanced
#(one '(' but two ')AS' closers), so spark.read.jdbc below will fail until this is filled in.
#Downstream cells read the columns Source, TABLE_SCHEMA, TABLE_NAME, COLUMN_NAME and ColumnDefinition.
#ColumnDefinition is later pasted verbatim as a SQL Server column type in ALTER TABLE ... ADD.
source1InfoSchema = """(

)AS t
)AS source1
"""

In [None]:
#Pull the Source1 metadata result set into Spark over JDBC
df_source1InfoSchema = spark.read.jdbc(source1URL, source1InfoSchema, properties=source1Properties)

In [None]:
#Register the metadata as a temp view so it can be queried with Spark SQL
df_source1InfoSchema.createOrReplaceTempView(name='Source1InfoSchemaView')

In [None]:
#Drop exact duplicate metadata rows
distinctInfoSchemaQuery = 'SELECT DISTINCT * FROM Source1InfoSchemaView'
source1InfoSchemaView = spark.sql(distinctInfoSchemaQuery)

In [None]:
#Peek at the first metadata row
source1InfoSchemaView.head(n=None)

In [None]:
#Combine the metadata of every source into one DataFrame (mergeAllInfoSchemas).
#It is displayed and written to the metadata table further down, so it must always be defined.
#With only Source1 it is simply source1InfoSchemaView. For several sources, union them instead, e.g.
#mergeAllInfoSchemas = source1InfoSchemaView.unionByName(source2InfoSchemaView).unionByName(source3InfoSchemaView)
#(union() matches columns by position; unionByName() matches them by name.)
mergeAllInfoSchemas = source1InfoSchemaView

In [None]:
#Peek at the first combined metadata row
mergeAllInfoSchemas.head(n=None)

In [None]:
#Delete metadata rows for columns that have more than one definition in the sources' Information_Schema.
#Within each Source/schema/table/column, DENSE_RANK orders the distinct ColumnDefinition values,
#and every definition ranked after the first is removed.
#This cell only builds the statement; it is executed after the metadata table has been written.
cleanupMeta = (
    "\n"
    "    DELETE FROM Import.SourceMetaData\n"
    "\tWHERE CONCAT(Source,'.', TABLE_SCHEMA,'.', TABLE_NAME,'.', COLUMN_NAME,'.', ColumnDefinition) IN\n"
    "\t(\n"
    "\t\tSELECT DISTINCT CONCAT(Source,'.', TABLE_SCHEMA,'.', TABLE_NAME,'.', COLUMN_NAME,'.', ColumnDefinition)\n"
    "\t\tFROM\n"
    "\t\t(\n"
    "\t\t\tSELECT Source, TABLE_SCHEMA, TABLE_NAME, COLUMN_NAME, ColumnDefinition, DENSE_RANK() OVER(PARTITION BY Source, TABLE_SCHEMA, TABLE_NAME, COLUMN_NAME ORDER BY ColumnDefinition) AS RankColDefinition\n"
    "\t\t\tFROM Import.SourceMetaData AS  d\n"
    "\t\t)AS t\n"
    "\t\tWHERE RankColDefinition > 1\n"
    "\t)\n"
    " "
)

In [None]:
#Define connection with SQLAlchemy for Config file
#Both the config table (which tables to load) and the metadata table live in the Import schema
mssqlDriverSQLAlchemy = 'SQL SERVER NATIVE CLIENT 11.0'
targetSchema = 'Import'
configTable = f'{targetSchema}.TablesToLoad'
metaData = f'{targetSchema}.SourceMetaData'

configOdbcParts = [
    'DRIVER=' + mssqlDriverSQLAlchemy,
    'SERVER=' + targetServer,
    'DATABASE=' + targetDatabase,
    'Trusted_Connection=yes',
]
quotedConfig = lib.parse.quote_plus(';'.join(configOdbcParts))
engineConfig = alch.create_engine(
    f'mssql+pyodbc:///?odbc_connect={quotedConfig}',
    future=True,
    connect_args={'autocommit': True},
)

In [None]:
#Define connections for Import Table - Spark
#Reuse targetServer (the host the SQLAlchemy engines use) instead of hard-coding 'localhost',
#so Spark and SQLAlchemy always talk to the same server.
mssqlDriverSpark = "com.microsoft.sqlserver.jdbc.SQLServerDriver"
targetPort = 1433
targetURL = f"jdbc:sqlserver://{targetServer}:{targetPort};databaseName={targetDatabase};integratedSecurity=true;encrypt=false"

In [None]:
#Write import table to Replica DB, this holds the table schemas and names - Spark
#overwrite + truncate replaces the rows of the existing metadata table rather than recreating it
metaDataWriter = mergeAllInfoSchemas.write.format('jdbc').mode('overwrite')
for optionName, optionValue in (
    ('truncate', 'true'),
    ('url', targetURL),
    ('driver', mssqlDriverSpark),
    ('dbtable', metaData),
):
    metaDataWriter = metaDataWriter.option(optionName, optionValue)
metaDataWriter.save()

In [None]:
#Run the cleanup built earlier: remove metadata rows for columns whose definition differs across
#duplicates in the source DBs. Every ColumnDefinition other than the first in sort order is deleted.
#engine.begin() commits when the block exits without error. The previous commit() after the block
#ran on an already-closed connection and had no effect, so it is gone.
with engineConfig.begin() as transactionsCleanup:
    transactionsCleanup.execute(alch.text(cleanupMeta))

In [None]:
#Peek at the first combined metadata row again
mergeAllInfoSchemas.head(n=None)

In [None]:
#Creates the Config table if it doesn't already exist.
#OBJECT_ID checks the same schema-qualified name that CREATE TABLE uses, so the check and the
#create stay in sync if targetSchema changes. The old check hard-coded 'Import'/'TablesToLoad'.
#ToLoad is CHAR(1) but later queries compare it with the integer 1. SQL Server converts it to int
#for that comparison, so it should hold digits such as '0'/'1'.
#engine.begin() commits on a clean exit.
with engineConfig.begin() as config:
    createTableIfNotExists = f"""
    IF OBJECT_ID(N'{configTable}', N'U') IS NULL
    BEGIN
        CREATE TABLE {configTable}
        (
            Source VARCHAR(50),
            DbName VARCHAR(50),
            TableName NVARCHAR(200),
            ToLoad CHAR(1),
            Helper NVARCHAR(MAX)
        )
    END
    """
    config.execute(alch.text(createTableIfNotExists))

In [None]:
#Add every (Source, schema, table) found in the metadata to the config table if it is not listed yet.
#The match compares the key columns directly. The previous CONCAT dropped the '.' between Source
#and Helper, so it never matched and every run re-inserted all tables as duplicates.
#Brackets are stripped from the stored TableName before comparing, mirroring how later cells read it.
#This insert leaves ToLoad unset; mark the tables to load manually afterwards.
with engineConfig.begin() as transaction:
    mergeIntoStatement = f"""
    MERGE {configTable} AS Target
    USING (
        SELECT DISTINCT Source, TABLE_SCHEMA, TABLE_NAME, CONCAT(TABLE_SCHEMA, '.', TABLE_NAME) AS Helper
        FROM {metaData}
    ) AS Src
    ON (
        Target.Source = Src.Source
        AND Target.DbName = Src.TABLE_SCHEMA
        AND REPLACE(REPLACE(Target.TableName, '[', ''), ']', '') = Src.TABLE_NAME
    )
    WHEN NOT MATCHED THEN
    INSERT (Source, DbName, TableName, Helper)
    VALUES (Src.Source, Src.TABLE_SCHEMA, Src.TABLE_NAME, Src.Helper);
    """
    transaction.execute(alch.text(mergeIntoStatement))

In [None]:
#Read from config file to see which tables should be loaded.
#All queries use the configured table names (configTable, metaData, targetSchema) instead of
#hard-coded 'Import...' names, so they stay consistent with the rest of the notebook.

#Schemas of ToLoad = 1 tables that do not exist yet in sys.schemas -> (SchemaName)
schemasStructToCreate = f"""(
    SELECT DISTINCT DbName AS SchemaName
    FROM {configTable} AS t
    LEFT JOIN sys.schemas AS schem
    ON(t.DbName = schem.name)
    WHERE t.ToLoad = 1 AND schem.name IS NULL
    )"""

#Every ToLoad = 1 table that has metadata -> (SchemaName, TableName).
#The "not yet existing" filter is commented out, so existing tables are listed too.
tablesStructToCreate = f"""(
    SELECT DISTINCT SchemaName, TableName
    FROM 
    (
    SELECT conf.Source, conf.DbName AS SchemaName, conf.TableName, meta.COLUMN_NAME AS ColumnName, ColumnDefinition AS ColumnType
    , REPLACE(REPLACE(conf.Helper, '[', ''), ']', '') AS Helper
    FROM {configTable} AS conf
    INNER JOIN {metaData} AS meta
    ON(conf.Source = meta.Source AND conf.DbName = meta.TABLE_SCHEMA AND conf.TableName = meta.TABLE_NAME)
    WHERE conf.ToLoad = 1 
    )AS t
    LEFT JOIN INFORMATION_SCHEMA.TABLES AS infoTab
    ON(Helper = CONCAT(infoTab.TABLE_SCHEMA, '.', infoTab.TABLE_NAME))
    --WHERE TABLE_CATALOG IS NULL
  )"""

#Every column of every ToLoad = 1 table -> (SchemaName, TableName, ColumnName, ColumnType).
#The "not yet existing" filter is commented out here as well.
columnsStructToCreate = f"""(
	SELECT DISTINCT SchemaName, TableName, ColumnName, ColumnType
    FROM 
    (
    SELECT conf.Source, conf.DbName AS SchemaName, conf.TableName, meta.COLUMN_NAME AS ColumnName, ColumnDefinition AS ColumnType
    , REPLACE(REPLACE(conf.Helper, '[', ''), ']', '') AS Helper, meta.COLUMN_NAME
    FROM {configTable} AS conf
    INNER JOIN {metaData} AS meta
    ON(conf.Source = meta.Source AND conf.DbName = meta.TABLE_SCHEMA AND conf.TableName = meta.TABLE_NAME)
    WHERE conf.ToLoad = 1 
    )AS t
    LEFT JOIN INFORMATION_SCHEMA.COLUMNS AS infoCol
    ON(Helper = CONCAT(infoCol.TABLE_SCHEMA, '.', infoCol.TABLE_NAME)  AND ColumnName = infoCol.COLUMN_NAME)
    --WHERE infoCol.TABLE_CATALOG IS NULL
)"""

#Every table in the target DB outside the config schema -> (SchemaName, TableName); these get dropped
dropAllTablesExceptImport = f"""(
	SELECT DISTINCT TABLE_SCHEMA AS SchemaName, Table_Name AS TableName
	FROM INFORMATION_SCHEMA.TABLES AS tab
	WHERE TABLE_SCHEMA != '{targetSchema}'
 )"""

In [None]:
#Define connections for RequiredDBSchemas
#Same server/database/driver as the config engine
requiredStructsOdbcParts = [
    'DRIVER=' + mssqlDriverSQLAlchemy,
    'SERVER=' + targetServer,
    'DATABASE=' + targetDatabase,
    'Trusted_Connection=yes',
]
quotedRequiredStructs = lib.parse.quote_plus(';'.join(requiredStructsOdbcParts))
engineRequiredStructs = alch.create_engine(
    f'mssql+pyodbc:///?odbc_connect={quotedRequiredStructs}',
    future=True,
    connect_args={'autocommit': True},
)

In [None]:
#Create Schemas if they are required and don't currently exist
#Step 1: fetch the configured (ToLoad = 1) schemas that are missing from sys.schemas
with engineRequiredStructs.begin() as transactionsList:
    schemaQueryResult = transactionsList.execute(alch.text(schemasStructToCreate))
    listOfSchemasCmd = schemaQueryResult.fetchall()
    transactionsList.commit()

#Step 2: create each missing schema (name used unquoted, exactly as stored in the config table)
with engineRequiredStructs.begin() as transactionsCreate:
    for missingSchema in listOfSchemasCmd:
        transactionsCreate.execute(alch.text(f"CREATE SCHEMA {missingSchema[0]}"))
    transactionsCreate.commit()

In [None]:
#Create Tables if they are required.
#Fetch the configured (ToLoad = 1) tables first, before anything is dropped.
#engine.begin() commits each block on a clean exit.
with engineRequiredStructs.begin() as transactionsList:
    listOfTablesCmd = transactionsList.execute(alch.text(tablesStructToCreate)).fetchall()

#Drop all tables, save for Import Schema and then create them again.
#Note: this drops every table outside the config schema, including tables not listed in the config.
with engineRequiredStructs.begin() as transactionsDropList:
    listOfTablesToDropCmd = transactionsDropList.execute(alch.text(dropAllTablesExceptImport)).fetchall()

#These names come straight from INFORMATION_SCHEMA, so both parts are bracket-quoted with ']' doubled.
#Previously only the table was bracketed and ']' was not escaped.
with engineRequiredStructs.begin() as transactionsDropTables:
    for row in listOfTablesToDropCmd:
        schemaIdent = '[' + row.SchemaName.replace(']', ']]') + ']'
        tableIdent = '[' + row.TableName.replace(']', ']]') + ']'
        transactionsDropTables.execute(alch.text(f"""DROP TABLE {schemaIdent}.{tableIdent}"""))

#Recreate each configured table with a placeholder "Dummy" column, because SQL Server cannot create
#a table without columns. The real columns are added and Dummy is dropped in the next cell.
#SchemaName/TableName stay unquoted: config names appear to be allowed to carry their own brackets.
with engineRequiredStructs.begin() as transactionsCreate:
    for row in listOfTablesCmd:
        createTable = f"""CREATE TABLE {row.SchemaName}.{row.TableName}"""
        createTableWithDummy = createTable + '(Dummy int)'
        transactionsCreate.execute(alch.text(createTableWithDummy))

In [None]:
#TODO: fix the data types of specific tables (e.g. events)

In [None]:
#Create Columns if they are required and don't currently exist.
#Fetch every (SchemaName, TableName, ColumnName, ColumnType) for the ToLoad = 1 tables.
#engine.begin() commits each block on a clean exit.
with engineRequiredStructs.begin() as transactionsList:
    listOfColumnsCmd = transactionsList.execute(alch.text(columnsStructToCreate)).fetchall()

#Add columns to the tables.
#Column names come verbatim from the source metadata, so they are bracket-quoted (']' doubled) to
#survive reserved words or special characters. ColumnType is used as-is as a SQL Server type.
#NOTE(review): a source column literally named "Dummy" would clash with the placeholder column.
with engineRequiredStructs.begin() as transactionsAddColumns:
    for row in listOfColumnsCmd:
        columnIdent = '[' + row.ColumnName.replace(']', ']]') + ']'
        addColumns = f"""ALTER TABLE {row.SchemaName}.{row.TableName} ADD {columnIdent} {row.ColumnType}"""
        transactionsAddColumns.execute(alch.text(addColumns))

#Remove dummy columns now that the real columns exist.
#The former commit() after this block ran on an already-closed connection and had no effect.
with engineRequiredStructs.begin() as transactionsRemoveDummy:
    for row in listOfTablesCmd:
        deleteDummyCol = f"""ALTER TABLE {row.SchemaName}.{row.TableName} DROP COLUMN Dummy"""
        transactionsRemoveDummy.execute(alch.text(deleteDummyCol))

In [None]:
#Part 2: now that the infrastructure has been built, we can safely load the data from each SourceDB

In [None]:
#Load all relevant tables to load in general.
#SourceHelper is the bracket-free "schema.table" used when reading from the source;
#TargetHelper keeps the Helper value as stored and is used as the write target.
tablesToLoadQuery = f"""(
SELECT DISTINCT Source, DbName, REPLACE(REPLACE(TableName, '[', ''), ']', '') AS TableName, REPLACE(REPLACE(Helper, '[', ''), ']', '') AS SourceHelper, Helper AS TargetHelper
FROM {configTable} AS conf
WHERE ToLoad = 1
)AS TablesToLoad
"""
tablesToLoadReader = spark.read.format('jdbc')
for optionName, optionValue in (
    ('url', targetURL),
    ('driver', mssqlDriverSpark),
    ('dbtable', tablesToLoadQuery),
):
    tablesToLoadReader = tablesToLoadReader.option(optionName, optionValue)
df_tablesToLoad = tablesToLoadReader.load()

In [None]:
#Show up to 20 configured tables
df_tablesToLoad.show(n=20)

In [None]:
#Separate the tables to load by source

In [None]:
#Gather data from source1 Databases
#NOTE(review): the Source and DbName literals are placeholders - fill in the source label and the schemas to skip
df_source1 = df_tablesToLoad.where("Source == '' AND DbName NOT IN('', '', '')")

In [None]:
#Expose the Source1 subset to Spark SQL
df_source1.createOrReplaceTempView(name='Source1View')

In [None]:
#One row per (source table, target table) pair
source1HelpersQuery = 'SELECT DISTINCT SourceHelper, TargetHelper FROM Source1View'
source1View = spark.sql(source1HelpersQuery)

In [None]:
#Show up to 20 source/target pairs
source1View.show(n=20)

In [None]:
#Bring the Source1 (SourceHelper, TargetHelper) Rows to the driver as a Python list
source1TablesList = list(source1View.collect())

In [None]:
#Copy every selected Source1 table into its target table in the replica DB.
#The read used the undefined name `sourceSource1URL`; the Source1 JDBC URL is `source1URL`.
#With mode('overwrite'), truncate=true makes Spark truncate the existing target table instead of
#dropping and recreating it, so the column layout built in Part 1 is kept.
for row in source1TablesList:
    readTableQuery = f"""(
    SELECT DISTINCT * FROM {row.SourceHelper} 
    )AS ReadTable
    """
    df_readTable = spark.read.jdbc(url = source1URL, table = readTableQuery, properties = source1Properties)
    df_readTable.write \
                .format('jdbc') \
                .mode('overwrite') \
                .option('truncate', 'true') \
                .option('url', targetURL) \
                .option('driver', mssqlDriverSpark) \
                .option('dbtable', row.TargetHelper) \
                .save()