In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *

In [2]:
# Build (or reuse) the notebook's Spark session. The PostgreSQL JDBC jar is
# registered both on the driver classpath and via spark.jars so the
# org.postgresql.Driver class used by the JDBC helpers can be loaded.
spark = (
    SparkSession.builder
    .appName("ReadData")
    .config('spark.driver.extraClassPath', "/home/jovyan/work/jars/postgresql-42.2.24.jar")
    .config("spark.jars", "/home/jovyan/work/jars/postgresql-42.2.24.jar")
    .getOrCreate()
)

21/10/28 03:45:35 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [3]:
# Display the version of the running Spark session as the cell output.
spark.version

'3.2.0'

In [4]:
def read_psql(db_name, db_username, db_password, db_table,
              db_host="postgres-data", db_port=5432, spark_session=None):
    """Read a PostgreSQL table into a DataFrame through Spark's JDBC source.

    Args:
        db_name: Database name; becomes the path component of the JDBC URL.
        db_username: Value passed as the JDBC ``user`` option.
        db_password: Value passed as the JDBC ``password`` option.
        db_table: Value passed as the JDBC ``dbtable`` option.
        db_host: PostgreSQL host name. Defaults to ``"postgres-data"``, the
            host that was previously hard-coded in the URL.
        db_port: PostgreSQL port. Defaults to ``5432``, the port that was
            previously hard-coded in the URL.
        spark_session: Session whose ``read`` interface is used. When ``None``
            (the default) the notebook-level ``spark`` session is used.

    Returns:
        The object returned by ``.load()`` on the configured JDBC reader.
    """
    # Fall back to the notebook-global session only when none is injected, so
    # existing four-argument calls behave exactly as before.
    session = spark if spark_session is None else spark_session
    jdbc_url = "jdbc:postgresql://{}:{}/{}".format(db_host, db_port, db_name)
    return (
        session.read
        .format("jdbc")
        .option("url", jdbc_url)
        .option("dbtable", db_table)
        .option("user", db_username)
        .option("password", db_password)
        .option("driver", "org.postgresql.Driver")
        .load()
    )

In [5]:
def write_psql(db_name, db_username, db_password, write_mode, db_table, df,
               db_host='postgres-data', db_port=5432):
    """Write a DataFrame to a PostgreSQL table through Spark's JDBC sink.

    Args:
        db_name: Database name; becomes the path component of the JDBC URL.
        db_username: Value passed as the JDBC ``user`` option.
        db_password: Value passed as the JDBC ``password`` option.
        write_mode: Save mode handed to ``.mode()`` (the notebook uses
            ``'overwrite'``).
        db_table: Value passed as the JDBC ``dbtable`` option.
        df: DataFrame whose ``write`` interface is used.
        db_host: PostgreSQL host name. Defaults to ``'postgres-data'``, the
            host that was previously hard-coded in the URL.
        db_port: PostgreSQL port. Defaults to ``5432``, the port that was
            previously hard-coded in the URL.

    Returns:
        None. The function is called for its side effect of saving ``df``.
    """
    jdbc_url = 'jdbc:postgresql://{}:{}/{}'.format(db_host, db_port, db_name)
    (
        df.write
        .format('jdbc')
        .option('url', jdbc_url)
        .option('dbtable', db_table)
        .option('user', db_username)
        .option('password', db_password)
        .option('driver', 'org.postgresql.Driver')
        .mode(write_mode)
        .save()
    )
    return None

In [6]:
def read_warehouse(db_table):
    """Read ``db_table`` from the warehouse database via ``read_psql``.

    Connection settings are taken from the environment variables
    ``WAREHOUSE_DB_NAME``, ``WAREHOUSE_DB_USER`` and ``WAREHOUSE_DB_PASSWORD``
    when they are set. Otherwise the values previously hard-coded here
    (``'dvdrental'`` / ``'postgres'`` / ``'postgres'``) are used, so existing
    behaviour is unchanged when no variables are defined.

    Args:
        db_table: Table name forwarded to ``read_psql``.

    Returns:
        Whatever ``read_psql`` returns for the requested table.
    """
    # Imported locally so this cell does not depend on the notebook's import cell.
    import os

    db_name = os.environ.get('WAREHOUSE_DB_NAME', 'dvdrental')
    db_username = os.environ.get('WAREHOUSE_DB_USER', 'postgres')
    db_password = os.environ.get('WAREHOUSE_DB_PASSWORD', 'postgres')
    return read_psql(db_name, db_username, db_password, db_table)

In [7]:
def write_warehouse(write_mode, db_table, df):
    """Write ``df`` to ``db_table`` in the warehouse database via ``write_psql``.

    Connection settings are taken from the environment variables
    ``WAREHOUSE_DB_NAME``, ``WAREHOUSE_DB_USER`` and ``WAREHOUSE_DB_PASSWORD``
    when they are set. Otherwise the values previously hard-coded here
    (``'dvdrental'`` / ``'postgres'`` / ``'postgres'``) are used, so existing
    behaviour is unchanged when no variables are defined.

    Args:
        write_mode: Save mode forwarded to ``write_psql``.
        db_table: Target table name forwarded to ``write_psql``.
        df: DataFrame to persist.

    Returns:
        The return value of ``write_psql`` (``None``).
    """
    # Imported locally so this cell does not depend on the notebook's import cell.
    import os

    db_name = os.environ.get('WAREHOUSE_DB_NAME', 'dvdrental')
    db_username = os.environ.get('WAREHOUSE_DB_USER', 'postgres')
    db_password = os.environ.get('WAREHOUSE_DB_PASSWORD', 'postgres')
    return write_psql(db_name, db_username, db_password, write_mode, db_table, df)

In [8]:
# Load the 'payment' table from the warehouse database into the `payment`
# DataFrame and print its schema to check the inferred column types.
db_table='payment'
payment = read_warehouse(db_table)
payment.printSchema()

root
 |-- payment_id: integer (nullable = true)
 |-- customer_id: short (nullable = true)
 |-- staff_id: short (nullable = true)
 |-- rental_id: integer (nullable = true)
 |-- amount: decimal(5,2) (nullable = true)
 |-- payment_date: timestamp (nullable = true)



In [25]:
# Load the 'customer' table from the warehouse database into the `customer`
# DataFrame and print its schema to check the inferred column types.
db_table='customer'
customer = read_warehouse(db_table)
customer.printSchema()

root
 |-- customer_id: integer (nullable = true)
 |-- store_id: short (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- email: string (nullable = true)
 |-- address_id: short (nullable = true)
 |-- activebool: boolean (nullable = true)
 |-- create_date: date (nullable = true)
 |-- last_update: timestamp (nullable = true)
 |-- active: integer (nullable = true)



In [31]:
# Attach customer attributes to every payment row. A left outer join keeps
# payments even when no matching customer row exists.
# NOTE(review): none of the customer columns are selected below, so the join
# does not influence the aggregated result; confirm whether it is still needed.
joined = payment.join(customer, 'customer_id', 'left_outer')
joined.printSchema()

# DataFrames are immutable: dropDuplicates() returns a new DataFrame, so its
# result must be kept. The original call discarded it and was a no-op.
deduped = joined.dropDuplicates()

# Derive calendar attributes from the payment timestamp and keep only the
# columns relevant for the aggregation.
# NOTE(review): week_year and quarter are not grouping keys below, so they do
# not reach the aggregated output; confirm whether grouping by them was intended.
enriched = (
    deduped
    .withColumn("week_year", weekofyear(col("payment_date")))
    .withColumn("quarter", quarter(col("payment_date")))
    .select(
        col('customer_id'),
        col('amount'),
        col('week_year'),
        col('quarter'),
        col('payment_date'),
    )
)

# Number of partitions Spark uses for the shuffle triggered by groupBy.
spark.conf.set("spark.sql.shuffle.partitions",100)

# Total amount per (customer_id, payment_date). The final result stays bound
# to `df` because the write cell later in the notebook reads that name.
df = (
    enriched
    .groupBy('customer_id','payment_date')
    .sum('amount')
    .withColumnRenamed('sum(amount)','total')
)
df.show(5)
df.rdd.getNumPartitions()

root
 |-- customer_id: short (nullable = true)
 |-- payment_id: integer (nullable = true)
 |-- staff_id: short (nullable = true)
 |-- rental_id: integer (nullable = true)
 |-- amount: decimal(5,2) (nullable = true)
 |-- payment_date: timestamp (nullable = true)
 |-- store_id: short (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- email: string (nullable = true)
 |-- address_id: short (nullable = true)
 |-- activebool: boolean (nullable = true)
 |-- create_date: date (nullable = true)
 |-- last_update: timestamp (nullable = true)
 |-- active: integer (nullable = true)

+-----------+--------------------+-----+
|customer_id|        payment_date|total|
+-----------+--------------------+-----+
|        148|2007-02-19 21:22:...| 5.99|
|        220|2007-03-20 19:20:...| 4.99|
|        114|2007-04-06 19:06:...| 2.99|
|        516|2007-03-17 14:53:...| 0.99|
|          8|2007-04-30 20:36:...| 0.99|
+-----------+--------------------+-----+

[Stage 96:>                                                         (0 + 1) / 1]                                                                                

1

In [149]:
# Write the DataFrame currently bound to `df` to the 'customer3' table using
# the 'overwrite' save mode.
db_table = 'customer3'
write_mode = 'overwrite'

# write_psql ends with `return None`, so the value printed here is always None.
wr = write_warehouse(write_mode,db_table,df)
print(wr)

None


[Stage 374:>                                                        (0 + 1) / 1]                                                                                