# **Installing livyc** 🔧

In [None]:
pip install livyc==0.0.13

Collecting livyc==0.0.13Note: you may need to restart the kernel to use updated packages.
  Downloading livyc-0.0.13-py3-none-any.whl (4.9 kB)
Installing collected packages: livyc
  Attempting uninstall: livyc
    Found existing installation: livyc 0.0.12
    Uninstalling livyc-0.0.12:
      Successfully uninstalled livyc-0.0.12


You should consider upgrading via the 'c:\python\python37\python.exe -m pip install --upgrade pip' command.



Successfully installed livyc-0.0.13


In [84]:
pip install psycopg2

Note: you may need to restart the kernel to use updated packages.


You should consider upgrading via the 'c:\python\python37\python.exe -m pip install --upgrade pip' command.


# **Importing livyc library** ⚡

In [86]:
from livyc import livyc

# **Setting livy configuration** ✍

In [88]:
# Settings handed to the livyc client: where the Livy server listens, plus the
# Maven coordinates of the PostgreSQL JDBC driver listed under "jars".
data_livy = dict(
    livy_server_url="localhost",
    port="8998",
    jars=["org.postgresql:postgresql:42.3.1"],
)

# **Populate PostgreSQL DB with data** 🗄

In [136]:
# SQL statements for the single "staging" table used in this example.

# DDL for the staging table: "id" is a serial primary key, every other column
# is an unconstrained varchar.
staging_table_create = """
CREATE TABLE IF NOT EXISTS staging(
        id serial PRIMARY KEY NOT NULL,
        first_name varchar,
        last_name varchar,
        company_name varchar,
        address varchar,
        city varchar,
        state varchar,
        zip varchar,
        phone1 varchar,
        phone2 varchar,
        email varchar,
        department varchar
);
"""

# Statement that removes the staging table when it exists.
staging_table_drop = "DROP TABLE IF EXISTS staging"

# Statement lists iterated by drop_tables() and create_tables().
drop_table_queries = [staging_table_drop]
create_table_queries = [staging_table_create]


import psycopg2
import pandas as pd
import os


def create_connection(params):
    """
    Open a connection to the PostgreSQL database in autocommit mode and
    print the server version as a connectivity check.

    :param params: dict of keyword arguments forwarded to psycopg2.connect
    :return: tuple ``(cur, conn)`` with an open cursor and its connection
    :raises Exception: any error raised while connecting or querying is
        printed and then re-raised
    """
    conn = None

    try:
        print('Connecting to the PostgreSQL database')
        conn = psycopg2.connect(**params)
        conn.set_session(autocommit=True)

        cur = conn.cursor()

        print('PostgreSQL database version:')
        cur.execute('SELECT version()')

        db_version = cur.fetchone()
        print(db_version)
        return cur, conn
    except Exception as error:
        print(error)
        # Do not leak a half-initialised connection, and do not return None:
        # callers unpack the result, so swallowing the error here would only
        # surface later as an unrelated "cannot unpack" TypeError.
        if conn is not None:
            conn.close()
        raise


def close_connection(cur, conn):
    """
    Close the cursor and then the connection to the PostgreSQL database.

    The two are closed independently, so a missing or failing cursor never
    leaves the connection open. Errors are printed, not raised.

    :param cur: cursor to close (may be None)
    :param conn: connection object to close (may be None)
    """
    try:
        if cur is not None:
            cur.close()
    except Exception as error:
        print(error)

    try:
        if conn is not None:
            conn.close()
            print('Database connection closed')
    except Exception as error:
        print(error)


def drop_tables(cur, conn):
    """
    Execute every statement in ``drop_table_queries``, committing after
    each one.

    :param cur: cursor used to execute the statements
    :param conn: connection object used to commit
    """
    print("Dropping tables")
    for statement in drop_table_queries:
        cur.execute(statement)
        conn.commit()
    print("Tables dropped")


def create_tables(cur, conn):
    """
    Execute every statement in ``create_table_queries``, committing after
    each one.

    :param cur: cursor used to execute the statements
    :param conn: connection object used to commit
    """
    # The progress message previously read "Creating created".
    print("Creating tables")
    for query in create_table_queries:
        cur.execute(query)
        conn.commit()
    print("Tables created")


def check_data(cur, conn, tables):
    """
    Count the records in each of the given tables.

    :param cur: cursor (not used; a short-lived cursor is opened from
        ``conn`` for every query. Kept so existing callers keep working)
    :param conn: connection object used to open the cursors
    :param tables: iterable of table names to check
    :return: dict mapping each table name to its record count
    :raises Exception: any error raised by a query is printed and re-raised
    """

    count_values = {}

    for table in tables:
        # The table name is interpolated straight into the SQL text, so it
        # must come from trusted code and never from user input.
        query_count = "SELECT COUNT(*) FROM {0}".format(table)

        try:
            # The with-block closes the per-query cursor; previously a new
            # cursor was opened for every table and never closed.
            with conn.cursor() as count_cur:
                count_cur.execute(query_count)
                count_values[table] = count_cur.fetchone()[0]
        except Exception as error:
            print("Error: %s" % error)
            raise

    return count_values

def set_staging(cur, conn, staging_file, columns):
    """
    Bulk-load a CSV file into the ``staging`` table with a COPY command.

    The first line of the file is treated as a header and skipped.
    psycopg2 errors are printed, not raised.

    :param cur: cursor used to run the COPY command
    :param conn: connection object used to commit
    :param staging_file: path of the CSV file to load
    :param columns: column names, in the order the values appear in the file
    """

    print("Copying data from .csv to staging zone")

    try:
        # "COPY ... FROM STDIN" is the documented form for loading data that
        # the client streams to the server.
        copy_cmd = f"copy staging({','.join(columns)}) from stdin (format csv)"
        with open(staging_file, 'r') as f:
            # Skip the header line; the default keeps an empty file from
            # raising StopIteration.
            next(f, None)
            cur.copy_expert(copy_cmd, f)
        conn.commit()
        print("Staging ready")
    except (psycopg2.Error) as e:
        print(e)

        
        
class Pipeline:
    """
    Rebuild the ``staging`` table and load it from a CSV file.
    """

    def __init__(self, params, staging_file):
        """
        :param params: connection parameters passed to create_connection
        :param staging_file: path of the CSV file loaded into staging
        """
        self.params = params
        self.staging_file = staging_file

    def run(self):
        """
        Connect, drop and recreate the tables, load the CSV file and print
        the record count of every table. The connection is closed even when
        one of the steps raises.
        """
        tables = ['staging']
        columns_staging = ['first_name','last_name','company_name','address','city','state','zip','phone1','phone2','email','department']
        cur, conn = create_connection(self.params)
        try:
            drop_tables(cur, conn)
            create_tables(cur, conn)
            set_staging(cur, conn, self.staging_file, columns_staging)
            count_tables = check_data(cur, conn, tables)
            for k, v in count_tables.items():
                print("Table {0} has {1} records".format(k, v))
        finally:
            # Always release the cursor and connection, including on failure.
            close_connection(cur, conn)


# Connection settings for the local PostgreSQL instance. The user name and
# password can be overridden through the PGUSER / PGPASSWORD environment
# variables so real credentials never have to be written into the notebook;
# the literals are only the defaults used when those variables are unset.
params = {
    "host": "localhost",
    "port": "5432",
    "database": "db",
    "user": os.environ.get("PGUSER", "postgres"),
    "password": os.environ.get("PGPASSWORD", "pg12345"),
}


# CSV file loaded into the staging table; the path is relative to the
# notebook's working directory.
staging_file = "./Documents/sample.csv"
pipeline = Pipeline(params, staging_file)
pipeline.run()


Connecting to the PostgreSQL database
PostgreSQL database version:
('PostgreSQL 14.3 (Debian 14.3-1.pgdg110+1) on x86_64-pc-linux-gnu, compiled by gcc (Debian 10.2.1-6) 10.2.1 20210110, 64-bit',)
Dropping tables
Tables dropped
Creating created
Tables created
Copying data from .csv to staging zone
Staging ready
Table staging has 106 records
Database connection closed


# **Let's try launching a PySpark script on the Apache Livy server** 🤓

In [137]:

# Add the table to read so it can be substituted into the template below.
params.update(table="staging")

# Template of the PySpark script. The literal is not a raw string, so each
# backslash-newline is a line continuation: the chained .option() calls end up
# on one line in the resulting text. The JDBC host "pg_container" is fixed in
# the template; only {port}, {database}, {table}, {user} and {password} are
# filled in from params.
pyspark_script = """

    from pyspark.sql.functions import udf, col, explode
    from pyspark.sql.types import StructType, StructField, IntegerType, StringType, ArrayType
    from pyspark.sql import Row
    from pyspark.sql import SparkSession


    df = spark.read.format("jdbc") \
        .option("url", "jdbc:postgresql://pg_container:{port}/{database}") \
        .option("driver", "org.postgresql.Driver") \
        .option("dbtable", "{table}") \
        .option("user", "{user}") \
        .option("password", "{password}") \
        .load()
        
    n_rows = df.count()

    spark.stop()
"""

# Substitute the placeholders. The bare expression on the last line echoes the
# final script as the cell output, and that text includes the database password.
pyspark_script = pyspark_script.format_map(params)
pyspark_script


'\n\n    from pyspark.sql.functions import udf, col, explode\n    from pyspark.sql.types import StructType, StructField, IntegerType, StringType, ArrayType\n    from pyspark.sql import Row\n    from pyspark.sql import SparkSession\n\n\n    df = spark.read.format("jdbc")         .option("url", "jdbc:postgresql://pg_container:5432/db")         .option("driver", "org.postgresql.Driver")         .option("dbtable", "staging")         .option("user", "postgres")         .option("password", "pg12345")         .load()\n        \n    n_rows = df.count()\n\n    spark.stop()\n'

In [157]:
# Build the livyc client from the data_livy settings defined earlier.
lvy = livyc.LivyC(data_livy)

In [161]:
# Create a session through the client; the returned handle is passed to the
# run_script / read_variable calls that follow.
# NOTE(review): no visible cell closes this session; confirm whether livyc
# offers a cleanup call and add it at the end of the notebook.
session = lvy.create_session()

In [162]:
# Submit the formatted PySpark script to the session (presumably executed
# remotely by the Livy server; confirm against the livyc documentation).
lvy.run_script(session, pyspark_script)

''

In [163]:
# Read back the n_rows variable assigned by the submitted script.
lvy.read_variable(session, "n_rows")

106