In [8]:
%%sql
-- Create the dev_curated_ecommerce database unless it already exists,
-- so re-running this cell does not raise an "already exists" error.
CREATE DATABASE IF NOT EXISTS dev_curated_ecommerce;

In [9]:
%%sql
-- Customers table in dev_curated_ecommerce, stored as an Iceberg table and
-- partitioned by the day of registration_date. IF NOT EXISTS makes the
-- statement a no-op when the table is already present.
CREATE TABLE IF NOT EXISTS dev_curated_ecommerce.customers (
    customer_id            BIGINT    NOT NULL COMMENT 'unique id',
    name                   STRING,
    gender                 STRING,
    email                  STRING,
    phone                  STRING,
    country                STRING,
    registration_date      TIMESTAMP NOT NULL,
    acquisition_channel_id INT
)
USING iceberg
PARTITIONED BY (days(registration_date));

In [3]:
from datetime import datetime

# Reuse the schema of the existing table so the new rows line up with it.
schema = spark.table("climate.weather").schema

# All three observations share the same coordinates.
station_lat, station_long = 40.951908, -74.075272

# One tuple per day, in table column order:
# (datetime, temp, lat, long, cloud_coverage, precip, wind_speed)
data = [
    (datetime(2023, 8, day), temp, station_lat, station_long, sky, precip, wind)
    for day, temp, sky, precip, wind in (
        (16, 76.2, "Partially sunny", 0.0, 3.5),
        (17, 82.5, "Sunny", 0.0, 1.2),
        (18, 70.9, "Cloudy", 0.5, 5.2),
    )
]

df = spark.createDataFrame(data, schema)
# append() adds these rows to whatever climate.weather already contains.
df.writeTo("climate.weather").append()

                                                                                

In [4]:
# Print the contents of `df` as a text table to check the rows that were built.
df.show()

+-------------------+----+---------+----------+---------------+------+----------+
|           datetime|temp|      lat|      long| cloud_coverage|precip|wind_speed|
+-------------------+----+---------+----------+---------------+------+----------+
|2023-08-16 00:00:00|76.2|40.951908|-74.075272|Partially sunny|   0.0|       3.5|
|2023-08-17 00:00:00|82.5|40.951908|-74.075272|          Sunny|   0.0|       1.2|
|2023-08-18 00:00:00|70.9|40.951908|-74.075272|         Cloudy|   0.5|       5.2|
+-------------------+----+---------+----------+---------------+------+----------+



In [8]:
%%sql
-- Average wind speed per month number, using only observations whose
-- cloud_coverage text contains "sun" (lower() makes the match case-insensitive).
-- month() returns just the month number, so equal months of different years
-- fall into the same group.
SELECT
    month(datetime),
    avg(wind_speed)
FROM climate.weather
WHERE lower(cloud_coverage) LIKE '%sun%'
GROUP BY month(datetime)

month(datetime),avg(wind_speed)
8,2.35


### Test Postgres
---

In [14]:
import os

# PostgreSQL tables to read, in the order they are processed.
tables = [
    "customer_acquisition_channels",
    "customers",
    "inventory",
    "order_items",
    "orders",
    "product_categories",
    "products",
]

# JDBC endpoint: host `postgres`, database `ecommerce`.
jdbc_url = "jdbc:postgresql://postgres/ecommerce"

# Connection properties handed to spark.read.jdbc().
# Credentials are taken from the environment when POSTGRES_USER /
# POSTGRES_PASSWORD are set, so real secrets never have to be saved in the
# notebook; the fallbacks are the values this notebook used before.
jdbc_properties = {
    "user": os.environ.get("POSTGRES_USER", "postgres"),
    "password": os.environ.get("POSTGRES_PASSWORD", "postgres"),
    "driver": "org.postgresql.Driver",
}
def get_df(jdbc_url, jdbc_prop, table):
    """Read one PostgreSQL table over JDBC and return it as a Spark DataFrame.

    Args:
        jdbc_url: JDBC connection URL of the PostgreSQL database.
        jdbc_prop: Connection properties passed to ``spark.read.jdbc``
            (user, password, driver).
        table: Name of the table to read.

    Returns:
        The DataFrame produced by ``spark.read.jdbc`` for ``table``.
    """
    print(f"Reading data from PostgreSQL, table: {table}...")
    postgres_df = spark.read.jdbc(url=jdbc_url, table=table, properties=jdbc_prop)
    # Printing a DataFrame shows its column names and types, not its rows.
    print(postgres_df)
    # Return the DataFrame so callers can keep it instead of losing it
    # when this function's local scope ends.
    return postgres_df


# Keep every table's DataFrame, keyed by table name, rather than discarding
# the results of the reads.
postgres_dfs = {}
for table in tables:
    postgres_dfs[table] = get_df(jdbc_url, jdbc_properties, table)

# Later cells call postgres_df.show(5) and write postgres_df to the
# demo.ecommerce.customers Iceberg table, so bind that name to the
# customers DataFrame explicitly instead of relying on leftover kernel state.
postgres_df = postgres_dfs["customers"]

Reading data from PostgreSQL, table: customer_acquisition_channels...
DataFrame[channel_id: int, category: string, channel_name: string, description: string]
Reading data from PostgreSQL, table: customers...
DataFrame[customer_id: int, name: string, gender: string, email: string, phone: string, country: string, registration_date: timestamp, acquisition_channel_id: int]
Reading data from PostgreSQL, table: inventory...
DataFrame[inventory_id: int, product_id: int, quantity: int, warehouse_location: string, updated_at: timestamp]
Reading data from PostgreSQL, table: order_items...
DataFrame[order_item_id: int, order_id: int, product_id: int, quantity: int, price: decimal(16,2)]
Reading data from PostgreSQL, table: orders...
DataFrame[order_id: int, customer_id: int, order_date: timestamp, order_status: string, total_amount: decimal(10,2), payment_method: string]
Reading data from PostgreSQL, table: product_categories...
DataFrame[category_id: int, category_name: string, parent_category_i

DataFrame[customer_id: int, name: string, gender: string, email: string, phone: string, country: string, registration_date: timestamp, acquisition_channel_id: int]

In [3]:
# Preview the first five rows of `postgres_df` as a text table.
postgres_df.show(5)

+-----------+-------------+------+--------------------+-------------+-------+-------------------+----------------------+
|customer_id|         name|gender|               email|        phone|country|  registration_date|acquisition_channel_id|
+-----------+-------------+------+--------------------+-------------+-------+-------------------+----------------------+
|          1|     John Doe|     M|   John.Doe@mail.com| +11239879877|    USA|2020-01-01 07:16:37|                    11|
|          2|     Jane Doe|     F|   Jane.Doe@mail.com| +11239879871|    USA|2020-01-02 00:21:00|                     7|
|          3|    Elon Musk|     M|  Elon.Musk@mail.com| +11239879872|    USA|2020-01-01 09:46:43|                    10|
|          4|   Bill Gates|     M| Bill.Gates@mail.com| +11239879873|    USA|2020-01-02 16:26:29|                     1|
|          5|Eleanor Rigby|     M|Eleanor.Rigby@mai...|+441234567891|    GBR|2020-01-01 18:34:50|                     5|
+-----------+-------------+-----

In [4]:
# Destination Iceberg table for the rows held in `postgres_df`.
iceberg_table = "demo.ecommerce.customers"

print(f"Writing data to Iceberg table: {iceberg_table}")

# createOrReplace() creates the table when it is missing and otherwise
# replaces it with the contents of the DataFrame.
table_writer = postgres_df.writeTo(iceberg_table)
table_writer.createOrReplace()

# Only reached when the write above did not raise.
print("Data migration completed successfully!")

Writing data to Iceberg table: demo.ecommerce.customers


                                                                                

Data migration completed successfully!


In [16]:
%%sql
-- Return every column and row of the raw customers table.
SELECT *
FROM dev_raw_ecommerce.customers

customer_id,name,gender,email,phone,country,registration_date,acquisition_channel_id
11,Michael Su,F,ms@mail.com,123321123321,CN,2020-01-15 03:23:48,2
6,Muhammad Abdullah,M,Muhammad.Abdullah@mail.com,966123456789,SA,2020-01-03 23:00:14,12
8,Wang Yichen,M,Wang.Yichen@mail.com,86123456789,CN,2020-01-03 23:50:55,2
9,Sanjaya Putra,M,Sanjaya.Putra@mail.com,628123456789,ID,2020-01-04 18:04:12,1
1,John Doe,M,John.Doe@mail.com,11239879877,US,2020-01-01 07:16:37,11
3,Elon Musk,M,Elon.Musk@mail.com,11239879872,US,2020-01-01 09:46:43,10
5,Eleanor Rigby,M,Eleanor.Rigby@mail.com,441234567891,GB,2020-01-01 18:34:50,5
7,Eiichiro Oda,M,Eiichiro.Oda@mail.com,81123456789,JP,2020-01-01 23:04:31,4
2,Jane Doe,F,Jane.Doe@mail.com,11239879871,US,2020-01-02 00:21:00,7
4,Bill Gates,M,Bill.Gates@mail.com,11239879873,US,2020-01-02 16:26:29,1


In [15]:
%%sql
-- Return every column and row of the curated customers table.
SELECT *
FROM dev_curated_ecommerce.customers

customer_id,name,gender,email,phone,country,registration_date,acquisition_channel_id


In [6]:
%%sql
-- Add one customer row to the raw customers table.
-- INSERT INTO appends, so running this cell again adds the same row again.
-- registration_date is written as a typed TIMESTAMP literal; the earlier
-- cast(date_format(<string>, ...) as timestamp) turned the string into a
-- timestamp, back into the same string, and into a timestamp again.
INSERT INTO dev_raw_ecommerce.customers
    (customer_id, name, gender, email, phone, country, registration_date, acquisition_channel_id)
VALUES
    (11, 'Michael Su', 'F', 'ms@mail.com', '123321123321', 'CN', TIMESTAMP '2020-01-15 03:23:48', 2)

In [8]:
# Build (or reuse) the SparkSession bound to `spark`. getOrCreate() hands back
# an already-running session when there is one instead of starting a new one.
session_builder = SparkSession.builder.appName(
    "PostgreSQL to MinIO with Iceberg DataLakehouse"
)
spark = session_builder.getOrCreate()

24/12/03 02:28:29 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [9]:
# Display the active SparkSession object as this cell's output.
spark