# Prepare driver

In [1]:
! wget -P /home/jovyan https://repo1.maven.org/maven2/org/postgresql/postgresql/42.2.5/postgresql-42.2.5.jar

--2020-02-06 16:40:34--  https://repo1.maven.org/maven2/org/postgresql/postgresql/42.2.5/postgresql-42.2.5.jar
Resolving repo1.maven.org (repo1.maven.org)... 151.101.248.209
Connecting to repo1.maven.org (repo1.maven.org)|151.101.248.209|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 825943 (807K) [application/java-archive]
Saving to: ‘/home/jovyan/postgresql-42.2.5.jar.1’


2020-02-06 16:40:34 (64.6 MB/s) - ‘/home/jovyan/postgresql-42.2.5.jar.1’ saved [825943/825943]



# Start spark application

In [2]:
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--driver-class-path /home/jovyan/postgresql-42.2.5.jar --jars /home/jovyan/postgresql-42.2.5.jar pyspark-shell'

import pyspark

spark = pyspark.sql.SparkSession.builder \
        .master("local[1]") \
        .appName("snapshot") \
        .getOrCreate()

print("Application started")

Application started


# Warm-up spark

In [3]:
spark.sparkContext.range(1000).sum()
print("Spark application is ready for work")

Spark application is ready for work


# Read some PostgreSQL Data

In [4]:
customers = spark.read.format('jdbc').options(
        url = "jdbc:postgresql://postgres:5432/postgres?user=postgres&password=postgres&currentSchema=inventory",
        database='postgres',
        dbtable='customers'
    ).load()

products = spark.read.format('jdbc').options(
        url = "jdbc:postgresql://postgres:5432/postgres?user=postgres&password=postgres&currentSchema=inventory",
        database='postgres',
        dbtable='products'
    ).load()

orders = spark.read.format('jdbc').options(
        url = "jdbc:postgresql://postgres:5432/postgres?user=postgres&password=postgres&currentSchema=inventory",
        database='postgres',
        dbtable='orders'
    ).load()

customers.registerTempTable("customers")
products.registerTempTable("products")
orders.registerTempTable("orders")

print("Customers table")
customers.orderBy('id').show(100)
print("Orders table")
orders.orderBy('id').show(100)
print("Products table")
products.orderBy('id').show(100)


Customers table
+----+----------+---------+--------------------+
|  id|first_name|last_name|               email|
+----+----------+---------+--------------------+
|1001|     Sally|   Thomas|danielburns@gmail...|
|1002|    George|   Bailey|jeremiah02@brown.com|
|1003|    Edward|   Walker|welchgabriel@mart...|
|1004|      Anne|Kretchmar|derrick91@phillip...|
|1006|      Mary|    Crane| jamesgill@gmail.com|
|1007|       Amy|   Warner|robertsonvalerie@...|
|1009|    Jeremy|    Brown|kylerichardson@ho...|
|1013|      Todd|    Logan| jeffery76@glenn.biz|
|1014|      Mary|  Roberts|kimberly21@yahoo.com|
|1018|     Betty|  Padilla|   ray80@hotmail.com|
|1019|    Thomas|     Hunt|mallory61@guzman.biz|
|1020|      Ryan|    Jones|  fmorales@lynch.biz|
|1021|   Anthony|    Clark|costaroberto@hotm...|
|1023|  Benjamin|   Thomas|   dcampos@gmail.com|
|1026|    Brandy|     Rice|michael37@garcia-...|
|1027|   Barbara|  Miranda|nmeza@maxwell-eat...|
|1028|     Jacob|    Boyer|   fcombs@turner.com|
|102

# Read and join the data

In [14]:
# please write your query here
query = """
select 
  c.id, c.first_name, c.last_name, 
  sum(p.weight) as total_weight,
  current_timestamp as load_dttm
from customers as c
join orders o
  on o.purchaser = c.id
join products p
  on p.id = o.product_id
where c.id <= 1005  
group by c.id, c.first_name, c.last_name
"""
result = spark.sql(query)
# result.write.format("parquet").mode('overwrite').save("/home/jovyan/weight_report")

In [15]:
result.show()

+----+----------+---------+------------+--------------------+
|  id|first_name|last_name|total_weight|           load_dttm|
+----+----------+---------+------------+--------------------+
|1003|    Edward|   Walker|         5.3|2020-02-06 16:49:...|
|1001|     Sally|   Thomas|         8.1|2020-02-06 16:49:...|
|1002|    George|   Bailey|       1.875|2020-02-06 16:49:...|
+----+----------+---------+------------+--------------------+

