In [6]:
import os

from pyspark.sql import SparkSession

# Location of the PostgreSQL JDBC driver jar. It can be overridden through the
# PG_JDBC_DRIVER_PATH environment variable instead of editing this cell; the
# default is the path this notebook was originally run with.
driver_path = os.environ.get(
    "PG_JDBC_DRIVER_PATH",
    "/home/working_dir/driver_jdbc/postgresql-42.2.27.jre7.jar",
)

# PySpark reads PYSPARK_SUBMIT_ARGS when it launches the JVM, so this must be set
# before the first SparkSession of the kernel is created (getOrCreate() below
# reuses an existing session otherwise).
# The legacy SPARK_CLASSPATH variable is intentionally not set: Spark deprecated
# it in favour of --driver-class-path / spark.executor.extraClassPath, both of
# which are configured here.
os.environ['PYSPARK_SUBMIT_ARGS'] = f'--driver-class-path {driver_path} --jars {driver_path} pyspark-shell'

# Create (or reuse) a local SparkSession with the JDBC driver on the classpath.
spark = (
    SparkSession.builder
    .master("local")
    .appName("Conexion entre Pyspark y Postgres")
    .config("spark.jars", driver_path)
    .config("spark.executor.extraClassPath", driver_path)
    .getOrCreate()
)

In [7]:
# Show the SparkSession object as the cell's output to confirm it was created.
spark

Creamos un DataFrame de ejemplo, que más adelante escribiremos en nuestra base de datos (BD).

In [20]:
# Example adapted from:
# https://sparkbyexamples.com/pyspark/different-ways-to-create-dataframe-in-pyspark/

from pyspark.sql.types import IntegerType, StringType, StructField, StructType

# One tuple per person: (firstname, middlename, lastname, id, gender, salary).
data2 = [
    ("James", "", "Smith", "36636", "M", 3000),
    ("Michael", "Rose", "", "40288", "M", 4000),
    ("Robert", "", "Williams", "42114", "M", 4000),
    ("Maria", "Anne", "Jones", "39192", "F", 4000),
    ("Jen", "Mary", "Brown", "", "F", -1),
]

# Every column is nullable; all of them are strings (the id included)
# except "salary", which is an integer.
schema = StructType(
    [
        StructField(column, StringType(), True)
        for column in ("firstname", "middlename", "lastname", "id", "gender")
    ]
    + [StructField("salary", IntegerType(), True)]
)

df = spark.createDataFrame(data=data2, schema=schema)
df.printSchema()
df.show(truncate=False)

root
 |-- firstname: string (nullable = true)
 |-- middlename: string (nullable = true)
 |-- lastname: string (nullable = true)
 |-- id: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- salary: integer (nullable = true)

+---------+----------+--------+-----+------+------+
|firstname|middlename|lastname|id   |gender|salary|
+---------+----------+--------+-----+------+------+
|James    |          |Smith   |36636|M     |3000  |
|Michael  |Rose      |        |40288|M     |4000  |
|Robert   |          |Williams|42114|M     |4000  |
|Maria    |Anne      |Jones   |39192|F     |4000  |
|Jen      |Mary      |Brown   |     |F     |-1    |
+---------+----------+--------+-----+------+------+



## Conexión a la base de datos local (PostgreSQL) y creación de la base de datos

In [19]:
!pip install psycopg2-binary

Collecting psycopg2-binary
  Downloading psycopg2_binary-2.9.7-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (3.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m3.0/3.0 MB[0m [31m14.2 MB/s[0m eta [36m0:00:00[0m00:01[0m00:01[0m
[?25hInstalling collected packages: psycopg2-binary
Successfully installed psycopg2-binary-2.9.7


In [9]:
# EXTRACTO DE: https://kb.objectrocket.com/postgresql/create-a-postgresql-database-using-the-psycopg2-python-library-755

# import the psycopg2 database adapter for PostgreSQL
from psycopg2 import connect, extensions, sql

In [13]:
# Open a psycopg2 connection to the PostgreSQL server. No dbname is passed, so the
# server's default database for this user is used (presumably "postgres" — confirm).
# Connection settings are read from the environment when available so that
# credentials do not have to live in the notebook; the fallbacks are the original
# local values.
conn = connect(
    user=os.environ.get("PG_USER", "postgres"),
    host=os.environ.get("PG_HOST", "postgres_db"),
    password=os.environ.get("PG_PASSWORD", "postgres"),
    port=os.environ.get("PG_PORT", "5435"),
)

# object type: psycopg2.extensions.connection
print("\ntype(conn):", type(conn))


type(conn): <class 'psycopg2.extensions.connection'>


In [16]:
# string for the new database name to be created
# Name of the new PostgreSQL database to be created by the CREATE DATABASE cell.
DB_NAME = "ejemplo_db"

In [15]:
# psycopg2 exposes autocommit as a pseudo isolation level. Switching the
# connection to it is required before creating a database: per psycopg2,
# CREATE DATABASE would otherwise raise an ActiveSqlTransaction exception.
autocommit = extensions.ISOLATION_LEVEL_AUTOCOMMIT
print("ISOLATION_LEVEL_AUTOCOMMIT:", autocommit)
conn.set_isolation_level(autocommit)

ISOLATION_LEVEL_AUTOCOMMIT: 0


Creamos la base de datos

In [17]:
# Create the database DB_NAME only if it does not exist yet, so the notebook
# can be re-run without failing on an already-existing database.
# The cursor is closed automatically when the `with` block ends.
with conn.cursor() as cursor:
    cursor.execute("SELECT 1 FROM pg_database WHERE datname = %s", (DB_NAME,))
    if cursor.fetchone() is None:
        # Identifiers cannot be sent as query parameters; sql.Identifier quotes
        # the name safely instead of concatenating it into the SQL string.
        cursor.execute(
            sql.SQL("CREATE DATABASE {}").format(sql.Identifier(DB_NAME))
        )

## Escritura en la base de datos (PostgreSQL) usando PySpark

In [18]:
# Postgres JDBC connection settings for Spark.
# Values are taken from the environment when set, keeping credentials out of the
# notebook; the fallbacks are the original local values.
pg_host = os.environ.get("PG_HOST", "postgres_db")
pg_port = os.environ.get("PG_PORT", "5435")
# Base URL without a database name; the database is appended when reading/writing.
pg_url = f"jdbc:postgresql://{pg_host}:{pg_port}/"
pg_user = os.environ.get("PG_USER", "postgres")
pg_password = os.environ.get("PG_PASSWORD", "postgres")
pg_driver = "org.postgresql.Driver"

In [21]:
# Write the sample DataFrame to table "salarios" in the database DB_NAME.
# Using DB_NAME (instead of repeating the literal name) keeps this cell in sync
# with the database actually created above.
# mode('overwrite') replaces any existing table, so the cell can be re-run.
(
    df.write
    .mode('overwrite')
    .format("jdbc")
    .option("url", pg_url + DB_NAME)
    .option("dbtable", "salarios")
    .option("user", pg_user)
    .option("password", pg_password)
    .option("driver", pg_driver)
    .save()
)

## Lectura de la tabla desde la BD a un DataFrame de PySpark

In [22]:
# Read table "salarios" from the database DB_NAME back into a Spark DataFrame,
# using the same JDBC settings as the write cell.
df_lectura = (
    spark.read
    .format("jdbc")
    .option("url", pg_url + DB_NAME)
    .option("dbtable", "salarios")
    .option("user", pg_user)
    .option("password", pg_password)
    .option("driver", pg_driver)
    .load()
)

In [24]:
# Print the schema of the table read back, to compare with the schema of the written DataFrame.
df_lectura.printSchema()

root
 |-- firstname: string (nullable = true)
 |-- middlename: string (nullable = true)
 |-- lastname: string (nullable = true)
 |-- id: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- salary: integer (nullable = true)



In [25]:
# Show the rows read back from PostgreSQL (default show() settings).
df_lectura.show()

+---------+----------+--------+-----+------+------+
|firstname|middlename|lastname|   id|gender|salary|
+---------+----------+--------+-----+------+------+
|    James|          |   Smith|36636|     M|  3000|
|  Michael|      Rose|        |40288|     M|  4000|
|   Robert|          |Williams|42114|     M|  4000|
|    Maria|      Anne|   Jones|39192|     F|  4000|
|      Jen|      Mary|   Brown|     |     F|    -1|
+---------+----------+--------+-----+------+------+

