# Prepare the PostgreSQL JDBC driver

In [1]:
! wget -P /home/jovyan https://repo1.maven.org/maven2/org/postgresql/postgresql/42.2.5/postgresql-42.2.5.jar

--2019-10-21 20:26:50--  https://repo1.maven.org/maven2/org/postgresql/postgresql/42.2.5/postgresql-42.2.5.jar
Resolving repo1.maven.org (repo1.maven.org)... 151.101.112.209
Connecting to repo1.maven.org (repo1.maven.org)|151.101.112.209|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 825943 (807K) [application/java-archive]
Saving to: ‘/home/jovyan/postgresql-42.2.5.jar.1’


2019-10-21 20:26:51 (1.46 MB/s) - ‘/home/jovyan/postgresql-42.2.5.jar.1’ saved [825943/825943]



# Start the Spark application

In [2]:
import os

# Put the PostgreSQL JDBC jar on the driver classpath and in the --jars list.
# This is set before the session is created below so that the JVM launch can
# see it; NOTE(review): it has no effect if a session already exists in this kernel.
os.environ['PYSPARK_SUBMIT_ARGS'] = '--driver-class-path /home/jovyan/postgresql-42.2.5.jar --jars /home/jovyan/postgresql-42.2.5.jar pyspark-shell'

import pyspark

# Local session with a single worker thread (local[1]).
spark = (
    pyspark.sql.SparkSession.builder
    .master("local[1]")
    .appName("snapshot")
    .getOrCreate()
)

print("Application started")

Application started


# Warm up Spark

In [3]:
# Run one trivial job so start-up overhead is paid here rather than in the
# first real query; the computed sum itself is not used.
warmup_total = spark.sparkContext.range(1000).sum()
print("Spark application is ready for work")

Spark application is ready for work


In [4]:
# Join executed on the database side: the SQL passed via the `query` option
# runs in PostgreSQL, and the result is loaded into Spark.
# NOTE(review): this query is a LEFT JOIN that returns only the customer
# columns, while the Spark-side join in the next cell is an INNER JOIN over the
# columns of both tables, so the two approaches do not return the same data.
# Confirm which semantics the comparison is meant to show.
# NOTE(review): `database` is not a documented Spark JDBC option (the URL path
# already selects the database), and the credentials are hard-coded in the URL.
pushdown_query = 'select c.* from customers c left join orders o on (c.id = o.purchaser)'

join_result = (
    spark.read.format('jdbc')
    .option('url', "jdbc:postgresql://postgres:5432/postgres?user=postgres&password=postgres&currentSchema=inventory")
    .option('database', 'postgres')
    .option('query', pushdown_query)
    .load()
)

join_result.printSchema()

root
 |-- id: integer (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- email: string (nullable = true)



In [5]:
# Load both tables as-is over JDBC and compute the join in Spark.
jdbc_url = "jdbc:postgresql://postgres:5432/postgres?user=postgres&password=postgres&currentSchema=inventory"

customers, orders = (
    spark.read.format('jdbc')
    .options(url=jdbc_url, database='postgres', dbtable=table_name)
    .load()
    for table_name in ('customers', 'orders')
)

# No `how` is given, so this is an inner join: customers without orders are
# dropped. The result has two `id` columns (customer id and order id), so refer
# to them through the parent frames (customers["id"], orders["id"]), not by bare name.
join_result = customers.join(orders, customers["id"] == orders["purchaser"])
join_result.printSchema()

root
 |-- id: integer (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- email: string (nullable = true)
 |-- id: integer (nullable = true)
 |-- order_date: date (nullable = true)
 |-- purchaser: integer (nullable = true)
 |-- quantity: integer (nullable = true)
 |-- product_id: integer (nullable = true)



# Read some PostgreSQL Data

In [6]:
# Load the three inventory tables over JDBC and expose them to Spark SQL by name.

PG_JDBC_URL = "jdbc:postgresql://postgres:5432/postgres?user=postgres&password=postgres&currentSchema=inventory"


def read_pg_table(table_name):
    """Return a Spark DataFrame reading ``table_name`` from PostgreSQL over JDBC.

    The table is resolved in the ``inventory`` schema selected by
    ``currentSchema`` in the JDBC URL.

    NOTE(review): credentials are embedded in the URL; read them from the
    environment for anything beyond this local demo.
    NOTE(review): ``database`` is not a documented Spark JDBC option (the URL
    path already selects the database); it is kept only to match the other cells.
    """
    return spark.read.format('jdbc').options(
        url=PG_JDBC_URL,
        database='postgres',
        dbtable=table_name,
    ).load()


customers = read_pg_table('customers')
products = read_pg_table('products')
orders = read_pg_table('orders')

# Register session-scoped views so the SQL below can reference the tables by
# name. createOrReplaceTempView replaces registerTempTable, which has been
# deprecated since Spark 2.0.
customers.createOrReplaceTempView("customers")
products.createOrReplaceTempView("products")
orders.createOrReplaceTempView("orders")

print("Customers table")
customers.show(5)
print("Orders table")
orders.show(5)
print("Products table")
products.show(5)


Customers table
+----+----------+----------+--------------------+
|  id|first_name| last_name|               email|
+----+----------+----------+--------------------+
|1176|   Matthew|     Jones|michaelwilson@fie...|
|1177|       Tom|     Craig|parkerrandall@dye...|
|1890|   Michael|Livingston|oneilljames@gmail...|
|1891|   Jackson|    Meyers|aanderson@hotmail...|
|1180|    Laurie|   Jackson|leepatrick@jones-...|
+----+----------+----------+--------------------+
only showing top 5 rows

Orders table
+-----+----------+---------+--------+----------+
|   id|order_date|purchaser|quantity|product_id|
+-----+----------+---------+--------+----------+
|10001|2016-01-16|     1001|       1|       102|
|10002|2016-01-17|     1002|       2|       105|
|10003|2016-02-19|     1002|       2|       106|
|10004|2016-02-21|     1003|       1|       107|
+-----+----------+---------+--------+----------+

Products table
+---+------------------+--------------------+------+
| id|              name|         de

# Read and join the data

Напишите select-запрос, который выводит следующие данные (доступные таблицы - customers, products, orders):

id - id пользователя из таблицы customers
first_name - first_name пользователя из таблицы customers
last_name - last_name пользователя из таблицы customers
total_weight - суммарный размер всех продуктов данного пользователя (поле weight в таблице products)
load_dttm - дата/время выполнения команды загрузки

Дополнительные ограничения запроса - необходимо выбрать только тех пользователей, у которых id<=1005.

In [39]:
# please write your query here
# Per-customer total weight of ordered products, for customers with id <= 1005.
# The LEFT JOINs keep customers without orders; their total_weight is NULL
# (SUM over no rows). CURRENT_TIMESTAMP() gives the same load timestamp for
# every row of the query.
# NOTE(review): SUM(p.weight) counts each order line once and ignores
# o.quantity; if "total weight" should account for quantities, use
# SUM(p.weight * o.quantity). Confirm against the task definition.
query = "SELECT c.id, c.first_name, c.last_name, \
        SUM(p.weight) AS total_weight, \
        CURRENT_TIMESTAMP() as load_dttm \
        FROM customers as c \
        LEFT JOIN orders AS o ON c.id = o.purchaser \
        LEFT JOIN products AS p ON o.product_id = p.id \
        WHERE c.id <= 1005 \
        GROUP BY c.id, c.first_name, c.last_name"

result = spark.sql(query)
# The default save mode ("errorifexists") fails once the output directory
# exists, which made this cell non-re-runnable. Overwrite replaces the previous report.
result.write.format("parquet").mode("overwrite").save("/home/jovyan/weight_report")

In [40]:
# Read the Parquet report back to check what the previous cell wrote.
weight_report = spark.read.format("parquet").load("/home/jovyan/weight_report")
weight_report.show()

+----+----------+---------+------------+--------------------+
|  id|first_name|last_name|total_weight|           load_dttm|
+----+----------+---------+------------+--------------------+
|1002|    George|   Bailey|       1.875|2019-10-21 21:16:...|
|1003|    Edward|   Walker|         5.3|2019-10-21 21:16:...|
|1001|     Sally|   Thomas|         8.1|2019-10-21 21:16:...|
|1004|      Anne|Kretchmar|        null|2019-10-21 21:16:...|
+----+----------+---------+------------+--------------------+

