In [30]:
import os
from datetime import datetime

from pyspark.sql import SparkSession
from pyspark.sql import SQLContext


In [31]:
# https://stackoverflow.com/questions/51772350/how-to-specify-driver-class-path-when-using-pyspark-within-a-jupyter-notebook

# Build (or reuse) the SparkSession, putting the PostgreSQL JDBC driver jar
# on the driver classpath. The jar path is relative to the working directory.
scSpark = (
    SparkSession.builder
    .appName("dvdrental ingestion")
    .config("spark.driver.extraClassPath", "postgresql-42.2.14.jar")
    .getOrCreate()
)


In [56]:
# Load the `customer` table from the source database through Spark's
# generic JDBC data source, naming the PostgreSQL driver class explicitly.
df = (
    scSpark.read
    .format("jdbc")
    .option("url", "jdbc:postgresql://source-db-container:5432/dvdrental")
    .option("dbtable", "customer")
    .option("user", "postgres")
    .option("password", "postgres")
    .option("driver", "org.postgresql.Driver")
    .load()
)

# Show the column names and types Spark derived for the table.
df.printSchema()

root
 |-- customer_id: integer (nullable = true)
 |-- store_id: short (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- email: string (nullable = true)
 |-- address_id: short (nullable = true)
 |-- activebool: boolean (nullable = true)
 |-- create_date: date (nullable = true)
 |-- last_update: timestamp (nullable = true)
 |-- active: integer (nullable = true)



In [60]:
# Read the `actor` table from the source dvdrental database via the
# DataFrameReader.jdbc() shortcut and preview its first rows.

# The JVM does not expand "~" in classpath entries (only a shell does), so
# resolve the home directory in Python before handing the path to Spark.
jardrv = os.path.expanduser("~/drivers/postgresql-42.2.14.jar")

# NOTE: spark.driver.extraClassPath is only honoured when the driver JVM is
# launched. If a session already exists in this kernel, getOrCreate() returns
# that session and this classpath setting has no effect.
spark = SparkSession.builder.config('spark.driver.extraClassPath', jardrv).getOrCreate()

url = 'jdbc:postgresql://source-db-container:5432/dvdrental'
properties = {'user': 'postgres', 'password': 'postgres'}

df = spark.read.jdbc(url=url, table='actor', properties=properties)
df.show()

+--------+----------+------------+--------------------+
|actor_id|first_name|   last_name|         last_update|
+--------+----------+------------+--------------------+
|       1|  Penelope|     Guiness|2013-05-26 14:47:...|
|       2|      Nick|    Wahlberg|2013-05-26 14:47:...|
|       3|        Ed|       Chase|2013-05-26 14:47:...|
|       4|  Jennifer|       Davis|2013-05-26 14:47:...|
|       5|    Johnny|Lollobrigida|2013-05-26 14:47:...|
|       6|     Bette|   Nicholson|2013-05-26 14:47:...|
|       7|     Grace|      Mostel|2013-05-26 14:47:...|
|       8|   Matthew|   Johansson|2013-05-26 14:47:...|
|       9|       Joe|       Swank|2013-05-26 14:47:...|
|      10| Christian|       Gable|2013-05-26 14:47:...|
|      11|      Zero|        Cage|2013-05-26 14:47:...|
|      12|      Karl|       Berry|2013-05-26 14:47:...|
|      13|       Uma|        Wood|2013-05-26 14:47:...|
|      14|    Vivien|      Bergen|2013-05-26 14:47:...|
|      15|      Cuba|     Olivier|2013-05-26 14:

In [58]:
# Confirm that the JDBC read produced a Spark DataFrame.
print(type(df))

<class 'pyspark.sql.dataframe.DataFrame'>


In [35]:
# groupBy() on its own yields a GroupedData handle, not rows; an aggregation
# would have to be applied to it to get a DataFrame back.
display(df.groupBy("actor_id"))

<pyspark.sql.group.GroupedData at 0x7f5994317a90>

In [54]:
# Create a new table in the destination warehouse database.
url_target = "jdbc:postgresql://dest-db-container:5432/dvdrental_dwh"
properties = dict(user="postgres", password="postgres")

# Keep only the actors named "Bob" and write them with mode "overwrite".
(
    df.filter(df.first_name == "Bob")
    .write
    .mode("overwrite")
    .jdbc(url=url_target, table="public.actor2", properties=properties)
)


In [64]:
# Test SERIAL columns: read the `customer` table from the source database
# and preview its first rows.

# The JVM does not expand "~" in classpath entries (only a shell does), so
# resolve the home directory in Python before handing the path to Spark.
jardrv = os.path.expanduser("~/drivers/postgresql-42.2.14.jar")

# NOTE: spark.driver.extraClassPath is only honoured when the driver JVM is
# launched. If a session already exists in this kernel, getOrCreate() returns
# that session and this classpath setting has no effect.
spark = SparkSession.builder.config('spark.driver.extraClassPath', jardrv).getOrCreate()

url = 'jdbc:postgresql://source-db-container:5432/dvdrental'
properties = {'user': 'postgres', 'password': 'postgres'}

df_customer = spark.read.jdbc(url=url, table='customer', properties=properties)
df_customer.show()

+-----------+--------+----------+---------+--------------------+----------+----------+-----------+--------------------+------+
|customer_id|store_id|first_name|last_name|               email|address_id|activebool|create_date|         last_update|active|
+-----------+--------+----------+---------+--------------------+----------+----------+-----------+--------------------+------+
|        524|       1|     Jared|      Ely|jared.ely@sakilac...|       530|      true| 2006-02-14|2013-05-26 14:49:...|     1|
|          1|       1|      Mary|    Smith|mary.smith@sakila...|         5|      true| 2006-02-14|2013-05-26 14:49:...|     1|
|          2|       1|  Patricia|  Johnson|patricia.johnson@...|         6|      true| 2006-02-14|2013-05-26 14:49:...|     1|
|          3|       1|     Linda| Williams|linda.williams@sa...|         7|      true| 2006-02-14|2013-05-26 14:49:...|     1|
|          4|       2|   Barbara|    Jones|barbara.jones@sak...|         8|      true| 2006-02-14|2013-05-26 14

In [67]:
# Insert records into an existing table in the warehouse database.
url_target = "jdbc:postgresql://dest-db-container:5432/dvdrental_dwh"
properties = dict(user="postgres", password="postgres")

# Remove the address_id column before writing.
df_customer = df_customer.drop("address_id")

# Append only the rows whose store_id matches "1" to the target table.
(
    df_customer.filter(df_customer.store_id == "1")
    .write
    .mode("append")
    .jdbc(url=url_target, table="datalake_raw.customer_dim", properties=properties)
)


In [70]:
# Insert records into an existing table in the staging database, for ELT.
url_target = "jdbc:postgresql://dest-db-container:5432/dvdrental_staging"
properties = dict(user="postgres", password="postgres")

# Remove the address_id column before writing.
df_customer = df_customer.drop("address_id")

# Append only the rows whose store_id matches "1" to the staging table.
(
    df_customer.filter(df_customer.store_id == "1")
    .write
    .mode("append")
    .jdbc(url=url_target, table="public.customer_dim_raw", properties=properties)
)


In [50]:
# Show the version string of the Spark runtime behind the active session.
spark.version

'3.0.0'