### Import libraries

In [1]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession, SQLContext
import os
import sys
from pyspark.sql.functions import *

### Set Java Home & Vars

In [2]:
# set Java home
# os.environ["JAVA_HOME"] = "C:\Program Files\Java\jdk-18.0.2.1"
os.environ["JAVA_HOME"] = "C:\Program Files\Eclipse Adoptium\jdk-11.0.18.10-hotspot"


### Set Spark configs details

In [3]:
conf = SparkConf() \
    .setAppName("ETLPipeline") \
    .setMaster("local") \
    .set("spark.driver.extraClassPath","C:/pyspark/*")

### Initiate Spark Session

In [4]:
sc = SparkContext.getOrCreate(conf=conf)
etl = SparkSession(sc)

### Set Database details

In [5]:
#get password from environmnet var
# pwd = os.environ['PGPASS']
pwd = 'demopass'
# uid = os.environ['PGUID']
uid = "LAPTOP-7L39KNKT\gavin"
#sql db details
server = "localhost"
src_db = "AdventureWorksDW2019"
target_db = "AdventureWorks"
src_driver = "com.microsoft.sqlserver.jdbc.SQLServerDriver"
target_driver = "org.postgresql.Driver"

In [6]:
# source connection
src_url = f"jdbc:sqlserver://{server}:1433;databaseName={src_db};user={uid};password={pwd};"
# target connection
target_url = f"jdbc:postgresql://{server}:5432/{target_db}?user={uid}&password={pwd}"

### SQL Statement and test

In [7]:
sql = """select  t.name as table_name from sys.tables t 
where t.name in ('DimProduct','DimProductSubcategory','DimProductCategory','DimSalesTerritory','FactInternetSales') """

In [8]:
# Let's test our connection
dfs=etl.read. \
    format("jdbc"). \
    options(driver=src_driver, user=uid, password=pwd, url=src_url, query=sql). \
    load()
dfs.show()

Py4JJavaError: An error occurred while calling o34.load.
: java.lang.ClassNotFoundException: com.microsoft.sqlserver.jdbc.SQLServerDriver
	at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:476)
	at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:589)
	at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:522)
	at org.apache.spark.sql.execution.datasources.jdbc.DriverRegistry$.register(DriverRegistry.scala:46)
	at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions.$anonfun$driverClass$1(JDBCOptions.scala:103)
	at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions.$anonfun$driverClass$1$adapted(JDBCOptions.scala:103)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions.<init>(JDBCOptions.scala:103)
	at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions.<init>(JDBCOptions.scala:41)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.createRelation(JdbcRelationProvider.scala:34)
	at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:346)
	at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:229)
	at org.apache.spark.sql.DataFrameReader.$anonfun$load$2(DataFrameReader.scala:211)
	at scala.Option.getOrElse(Option.scala:189)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:211)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:172)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:829)


In [9]:
data_collect = dfs.collect()
# looping thorough each row of the dataframe
for row in data_collect:
    # while looping through each
    # row printing the data of table_name
    print(row["table_name"])

NameError: name 'dfs' is not defined

### Function to extract source system data

In [10]:
def extract():
    try:
        dfs=etl.read. \
            format("jdbc"). \
            options(driver=src_driver,user=uid, password=pwd,url=src_url,query=sql). \
            load()
        # get table names
        data_collect = dfs.collect()
        # looping thorough each row of the dataframe
        for row in data_collect:
        # while looping through each
        # row printing the data of table_name
            print(row["table_name"])
            tbl_name = row["table_name"]
            df = etl.read \
            .format("jdbc") \
            .option("driver", src_driver) \
            .option("user", uid) \
            .option("password", pwd) \
            .option("url", src_url) \
            .option("dbtable", f"dbo.{tbl_name}") \
            .load()
            #print(df.show(10))
            load(df, tbl_name)
            print("Data loaded successfully")
    except Exception as e:
        print("Data extract error: " + str(e))

### Function to persist data in target db

In [11]:
def load(df, tbl):
    try:
        rows_imported = 0
        print(f'importing rows {rows_imported} to {rows_imported + df.count()}... for table {tbl}')
        df.write.mode("overwrite") \
        .format("jdbc") \
        .option("url", target_url) \
        .option("user", uid) \
        .option("password", pwd) \
        .option("driver", target_driver) \
        .option("dbtable", "src_" + tbl) \
        .save()
        print("Data imported successful")
        rows_imported += df.count()
    except Exception as e:
        print("Data load error: " + str(e))

In [12]:
# Function Call
extract()

DimProduct
importing rows 0 to 606... for table DimProduct
Data imported successful
Data loaded successfully
DimProductCategory
importing rows 0 to 4... for table DimProductCategory
Data imported successful
Data loaded successfully
DimProductSubcategory
importing rows 0 to 37... for table DimProductSubcategory
Data imported successful
Data loaded successfully
DimSalesTerritory
importing rows 0 to 11... for table DimSalesTerritory
Data imported successful
Data loaded successfully
FactInternetSales
importing rows 0 to 60398... for table FactInternetSales
Data imported successful
Data loaded successfully
