In [4]:
import psycopg2

from pyspark.sql import SparkSession
from pyspark.sql.types import StructField, StructType, StringType, LongType, TimestampType, ShortType, DateType, DecimalType, FloatType
from pyspark.sql.functions import col

def main():

    # establish a connection to the db
    conn = psycopg2.connect(
        host = "localhost",
        database = "usedcars",
        user = "postgres",
        password = "postgres")

    print("Connection to PostgreSQL created", "\n")

    # create a cursor out of a connection; a cursor allows you to communicate with Postgres and execute commands
    cur = conn.cursor()

    spark = initialize_Spark()

    df = loadDFWithSchema(spark)

    create_table(cur)

    insert_query, cars_seq = write_postgresql(df)

    cur.execute(insert_query, cars_seq)

    print("Data inserted into PostgreSQL", "\n")

    get_insterted_data(cur)

    cur.close()


    print("Commiting changes to database", "\n")
    # make sure that your changes are shown in the db
    conn.commit()

    print("Closing connection", "\n")

    # close the connection
    conn.close()

    print("Done!", "\n")


def initialize_Spark():

    spark = SparkSession.builder \
        .master("local[*]") \
        .appName("Simple etl job") \
        .getOrCreate()

    print("Spark Initialized", "\n")

    return spark

def loadDFWithoutSchema(spark):

    df = spark.read.format("csv").option("header", "true").load('Iris.csv')

    return df

def loadDFWithSchema(spark):

    schema = StructType([
        StructField("sepal_length", DecimalType(), True),
        StructField("sepal_width", DecimalType(), True),
        StructField("petal_length", DecimalType(), True),
        StructField("petal_width", DecimalType(), True),
        StructField("class", StringType(), True)
    ])

    df = spark \
        .read \
        .format("csv") \
        .schema(schema)         \
        .option("header", "true") \
        .load("Iris.csv")

    print("Data loaded into PySpark", "\n")

    return df

def create_table(cursor):

    try:
        cursor.execute("CREATE TABLE IF NOT EXISTS iris \
       (sepal_length NUMERIC  NULL, \
        sepal_width NUMERIC  NULL, \
        petal_length NUMERIC NULL, \
        petal_width NUMERIC NULL, \
        class VARCHAR (40));")

        print("Created table in PostgreSQL", "\n")
    except:
        print("Something went wrong when creating the table", "\n")


def write_postgresql(df):

    cars_seq = [tuple(x) for x in df.collect()]

    records_list_template = ','.join(['%s'] * len(cars_seq))

    insert_query = "INSERT INTO iris (sepal_length, sepal_width, petal_length, petal_width, class\
                           ) VALUES {}".format(records_list_template)

    print("Inserting data into PostgreSQL...", "\n")

    return insert_query, cars_seq

def get_insterted_data(cursor):

    postgreSQL_select_Query = "select sepal_length, sepal_width, petal_length, petal_width, class from iris"

    cursor.execute(postgreSQL_select_Query)

    cars_records = cursor.fetchmany(2)

    print("Printing 2 rows")
    for row in cars_records:
        print("sepal_length = ", row[0])
        print("sepal_width = ", row[1])
        print("petal_length = ", row[2])
        print("petal_width = ", row[3])
        print("class = ", row[4], "\n")

if __name__ == '__main__':
    main()

Connection to PostgreSQL created 

Spark Initialized 

Data loaded into PySpark 

Created table in PostgreSQL 

Inserting data into PostgreSQL... 

Data inserted into PostgreSQL 

Printing 2 rows
sepal_length =  5
sepal_width =  4
petal_length =  1
petal_width =  0
class =  Iris-setosa 

sepal_length =  5
sepal_width =  3
petal_length =  1
petal_width =  0
class =  Iris-setosa 

Commiting changes to database 

Closing connection 

Done! 

