# Prepare the PostgreSQL JDBC driver

In [None]:
! wget -P /home/jovyan https://repo1.maven.org/maven2/org/postgresql/postgresql/42.2.5/postgresql-42.2.5.jar

# Start the Spark application

In [None]:
import os

import pyspark

# PostgreSQL JDBC driver jar downloaded earlier in this notebook.
jdbc_jar_path = '/home/jovyan/postgresql-42.2.5.jar'

# Put the jar on the driver classpath and also pass it via --jars.
# The variable is set before the session is created below, so the
# spark-submit arguments are in place when PySpark launches.
submit_args = [
    '--driver-class-path', jdbc_jar_path,
    '--jars', jdbc_jar_path,
    'pyspark-shell',
]
os.environ['PYSPARK_SUBMIT_ARGS'] = ' '.join(submit_args)

# Single-threaded local session named "snapshot".
spark = (
    pyspark.sql.SparkSession.builder
    .master("local[1]")
    .appName("snapshot")
    .getOrCreate()
)

print("Application started")

# Warm up Spark

In [None]:
# Run a tiny throwaway job (sum of 0..999) so the first real query does not
# pay the start-up cost; the result itself is intentionally discarded.
warmup_rdd = spark.sparkContext.range(1000)
warmup_rdd.sum()
print("Spark application is ready for work")

# Read some PostgreSQL data

In [None]:
# JDBC connection string for the `postgres` database. The user, password and
# default schema (`inventory`) are carried as URL parameters.
# NOTE(review): the credentials are hard-coded in the URL. That is acceptable
# for a local demo database, but they should come from the environment before
# this notebook is pointed at anything real.
JDBC_URL = "jdbc:postgresql://postgres:5432/postgres?user=postgres&password=postgres&currentSchema=inventory"


def read_jdbc_table(table_name):
    """Load one table over JDBC from ``JDBC_URL`` as a Spark DataFrame.

    Args:
        table_name: Name of the table to read; passed as the ``dbtable`` option.

    Returns:
        The DataFrame returned by the JDBC reader's ``load()``.
    """
    return spark.read.format('jdbc').options(
        url=JDBC_URL,
        # Kept exactly as in the original reads.
        # NOTE(review): the database is already named in the URL, so this
        # option looks redundant -- confirm before removing it.
        database='postgres',
        dbtable=table_name,
    ).load()


customers = read_jdbc_table('customers')
products = read_jdbc_table('products')
orders = read_jdbc_table('orders')

# Expose the DataFrames to Spark SQL under their table names.
# createOrReplaceTempView replaces registerTempTable, which has been
# deprecated since Spark 2.0; it also makes re-running this cell safe.
customers.createOrReplaceTempView("customers")
products.createOrReplaceTempView("products")
orders.createOrReplaceTempView("orders")

# Preview the first five rows of each table.
print("Customers table")
customers.show(5)
print("Orders table")
orders.show(5)
print("Products table")
products.show(5)


# Read and join the data

In [None]:
# Weight report: total ordered weight (quantity * product weight) per customer.
# The LEFT JOINs keep customers who have not made any purchase; for them
# total_weight is NULL because there is nothing to sum.
# The c.id filter is a plain row filter, so it lives in WHERE rather than
# HAVING (HAVING is meant for conditions on aggregates); the result is the same.
query = """SELECT c.id, c.first_name, c.last_name, Sum(o.quantity*p.weight) AS total_weight, current_timestamp() AS load_dttm
FROM (customers AS c LEFT JOIN orders AS o ON c.id = o.purchaser) LEFT JOIN products AS p ON o.product_id = p.id
WHERE c.id <= 1005
GROUP BY c.id, c.first_name, c.last_name
ORDER BY c.id"""

result = spark.sql(query)

# repartition(1) performs a shuffle, which does not preserve the ORDER BY of
# the query. Sorting again inside the single resulting partition guarantees
# that the one output file is actually ordered by customer id.
(
    result
    .repartition(1)
    .sortWithinPartitions("id")
    .write
    .mode("OVERWRITE")
    .format("parquet")
    .save("/home/jovyan/weight_report")
)

In [None]:
# Read the report back from disk and print up to 20 rows without truncating
# long column values, to verify what was actually written.
weight_report = spark.read.format("parquet").load("/home/jovyan/weight_report")
weight_report.show(n=20, truncate=False)

In [None]:
# Shut down the Spark session created at the start of the notebook.
spark.stop()