In [1]:
from pyspark.sql import SparkSession
import psycopg2
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--jars /usr/local/spark/jars/postgresql-42.7.3.jar pyspark-shell'

In [2]:
spark = SparkSession.builder \
    .appName("PostgreSQL JDBC Example") \
    .getOrCreate()

In [3]:
df = spark.read \
    .format("csv") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .load("./data/Online_Retail.csv")

In [4]:
df.show()

+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
|InvoiceNo|StockCode|         Description|Quantity|        InvoiceDate|UnitPrice|CustomerID|       Country|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
|   536365|   85123A|WHITE HANGING HEA...|       6|2010-12-01 08:26:00|     2.55|     17850|United Kingdom|
|   536365|    71053| WHITE METAL LANTERN|       6|2010-12-01 08:26:00|     3.39|     17850|United Kingdom|
|   536365|   84406B|CREAM CUPID HEART...|       8|2010-12-01 08:26:00|     2.75|     17850|United Kingdom|
|   536365|   84029G|KNITTED UNION FLA...|       6|2010-12-01 08:26:00|     3.39|     17850|United Kingdom|
|   536365|   84029E|RED WOOLLY HOTTIE...|       6|2010-12-01 08:26:00|     3.39|     17850|United Kingdom|
|   536365|    22752|SET 7 BABUSHKA NE...|       2|2010-12-01 08:26:00|     7.65|     17850|United Kingdom|
|   536365|    21730|GLASS S

In [5]:
df = df.withColumn("Amount", df['Quantity'] * df['UnitPrice'])
df.show()

+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+------------------+
|InvoiceNo|StockCode|         Description|Quantity|        InvoiceDate|UnitPrice|CustomerID|       Country|            Amount|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+------------------+
|   536365|   85123A|WHITE HANGING HEA...|       6|2010-12-01 08:26:00|     2.55|     17850|United Kingdom|15.299999999999999|
|   536365|    71053| WHITE METAL LANTERN|       6|2010-12-01 08:26:00|     3.39|     17850|United Kingdom|             20.34|
|   536365|   84406B|CREAM CUPID HEART...|       8|2010-12-01 08:26:00|     2.75|     17850|United Kingdom|              22.0|
|   536365|   84029G|KNITTED UNION FLA...|       6|2010-12-01 08:26:00|     3.39|     17850|United Kingdom|             20.34|
|   536365|   84029E|RED WOOLLY HOTTIE...|       6|2010-12-01 08:26:00|     3.39|     17850|United Kingdom|    

In [6]:
conn = psycopg2.connect(
    host="postgres",
    database="my_database",
    user="my_username",
    password="my_password"
)

In [7]:
cur = conn.cursor()

In [8]:
cur.execute("""
   CREATE TABLE IF NOT EXISTS sales (
       InvoiceNo VARCHAR(255),
       StockCode VARCHAR(255),
       Description VARCHAR(255),
       Quantity INTEGER,
       InvoiceDate DATE,
       UnitPrice FLOAT,
       CustomerID INTEGER,
       Country VARCHAR(255),
       Amount FLOAT
   ) 
""")

In [9]:
conn.commit()
cur.close()
conn.close()

In [10]:
def insert_data(df, table_name):
    df.write \
        .format("jdbc") \
        .option("url", "jdbc:postgresql://postgres:5432/my_database") \
        .option("dbtable", table_name) \
        .option("user", "my_username") \
        .option("password", "my_password") \
        .mode("append") \
        .save()

In [11]:
insert_data(df, "sales")

In [12]:
query = "SELECT * FROM sales"

In [13]:
df = spark.read \
    .format("jdbc") \
    .option("url", "jdbc:postgresql://postgres:5432/my_database") \
    .option("dbtable", f"({query}) as tmp") \
    .option("user", "my_username") \
    .option("password", "my_password") \
    .load()

In [14]:
df.show(truncate=False)

+---------+---------+----------------------------------+--------+-----------+---------+----------+--------------+------------------+
|invoiceno|stockcode|description                       |quantity|invoicedate|unitprice|customerid|country       |amount            |
+---------+---------+----------------------------------+--------+-----------+---------+----------+--------------+------------------+
|562847   |21114    |LAVENDER SCENTED FABRIC HEART     |10      |2011-08-10 |1.25     |16245     |United Kingdom|12.5              |
|562847   |21533    |RETROSPOT LARGE MILK JUG          |3       |2011-08-10 |4.95     |16245     |United Kingdom|14.850000000000001|
|562847   |23015    |CORDIAL GLASS JUG                 |2       |2011-08-10 |8.25     |16245     |United Kingdom|16.5              |
|562847   |22759    |SET OF 3 NOTEBOOKS IN PARCEL      |12      |2011-08-10 |1.65     |16245     |United Kingdom|19.799999999999997|
|562847   |20983    |12 PENCILS TALL TUBE RED RETROSPOT|12      |2011

In [15]:
from pyspark.sql.types import DateType
from pyspark.sql.functions import to_date

# 例：新しいデータを保存
new_data = [("536380", "85123A", "WHITE HANGING HEART T-LIGHT HOLDER", 12, "2010-12-02", 2.55, 17850, "United Kingdom", 30.6)]
new_df = spark.createDataFrame(new_data, ["InvoiceNo", "StockCode", "Description", "Quantity", "InvoiceDate", "UnitPrice", "CustomerID", "Country", "Amount"])

# InvoiceDateカラムをdate型に変換
new_df = new_df.withColumn("InvoiceDate", to_date(new_df["InvoiceDate"], "yyyy-MM-dd"))

insert_data(new_df, "sales")

In [16]:
# 検索（SELECT）
def select_data(table_name, condition=None):
    if condition:
        query = f"(SELECT * FROM {table_name} WHERE {condition}) AS tmp"
    else:
        query = f"(SELECT * FROM {table_name}) AS tmp"
    
    return spark.read \
        .format("jdbc") \
        .option("url", "jdbc:postgresql://postgres:5432/my_database") \
        .option("dbtable", query) \
        .option("user", "my_username") \
        .option("password", "my_password") \
        .load()

In [19]:
search_condition = "InvoiceNo = '536380'"
result_df = select_data("sales", search_condition)
result_df.show(truncate=False)

+---------+---------+----------------------------------+--------+-----------+---------+----------+--------------+------+
|invoiceno|stockcode|description                       |quantity|invoicedate|unitprice|customerid|country       |amount|
+---------+---------+----------------------------------+--------+-----------+---------+----------+--------------+------+
|536380   |22961    |JAM MAKING SET PRINTED            |24      |2010-12-01 |1.45     |17809     |United Kingdom|34.8  |
|536380   |85123A   |WHITE HANGING HEART T-LIGHT HOLDER|12      |2010-12-02 |2.55     |17850     |United Kingdom|30.6  |
|536380   |85123A   |WHITE HANGING HEART T-LIGHT HOLDER|12      |2010-12-02 |2.55     |17850     |United Kingdom|30.6  |
|536380   |22961    |JAM MAKING SET PRINTED            |24      |2010-12-01 |1.45     |17809     |United Kingdom|34.8  |
|536380   |22961    |JAM MAKING SET PRINTED            |24      |2010-12-01 |1.45     |17809     |United Kingdom|34.8  |
+---------+---------+-----------