In [18]:
import json
import os

import psycopg2
import pyspark
from pyspark.sql import Row
from pyspark.sql import SparkSession
from pyspark.sql.functions import * 
from pyspark.sql.window import Window

In [19]:
# Build (or reuse) the SparkSession. The spark.jars setting ships the PostgreSQL
# JDBC driver jar so Spark can load org.postgresql.Driver for the JDBC writes.
spark = (
    SparkSession.builder
    .appName("Python Spark SQL basic example")
    .config("spark.jars", "/opt/spark/jars/postgresql-42.2.6.jar")
    .getOrCreate()
)

In [20]:
# Open a connection to the source database `origem_pizzaria`.
# Connection settings come from the standard libpq environment variables
# (PGUSER / PGHOST / PGPASSWORD / PGPORT) when they are set. Otherwise they fall
# back to the previous local defaults, so credentials need not live in the notebook.
conn = psycopg2.connect(database="origem_pizzaria",
                        user=os.environ.get("PGUSER", "postgres"),
                        host=os.environ.get("PGHOST", "localhost"),
                        password=os.environ.get("PGPASSWORD", "postgres"),
                        port=int(os.environ.get("PGPORT", "5432")))
cur = conn.cursor()

In [21]:
# Fetch every pizza variant from the source DB together with its type attributes
# (name, category, ingredients). The rows come back in SELECT-column order.
stg_pizzas_query = """select 
            pz.pizza_id, 
            pt.name, 
            pt.category, 
            pz.size, 
            pz.price, 
            pt.ingredients
from pizzas pz
join pizza_types pt on pz.pizza_type_id = pt.pizza_type_id 
            """
cur.execute(stg_pizzas_query)
stg_pizzas = cur.fetchall()

In [22]:
# Fetch one row per order line: pizza id, order id, order date, order time,
# quantity, and price * quantity (aliased price_total).
# NOTE(review): pizza_types (pt) is joined but none of its columns are selected,
# so it only acts as an inner-join filter. Confirm it is still needed.
stg_fato_orders_query = """select pz.pizza_id, 
os.order_id, 
os.date, 
os.time, 
od.quantity, 
(price * quantity) as price_total
from orders_details od
join orders os on od.order_id = os.order_id
join pizzas pz on pz.pizza_id = od.pizza_id
join pizza_types pt on pt.pizza_type_id = pz.pizza_type_id
"""
cur.execute(stg_fato_orders_query)
stg_fato_orders = cur.fetchall()

In [23]:
# Both source result sets are already in Python lists, so release the source
# database resources. Close the cursor explicitly first, then the connection.
cur.close()
conn.close()

In [24]:
# Open a connection to the staging database `stage_pizzaria`, where the staging
# tables are created.
# Connection settings come from the standard libpq environment variables
# (PGUSER / PGHOST / PGPASSWORD / PGPORT) when they are set. Otherwise they fall
# back to the previous local defaults.
conn = psycopg2.connect(database="stage_pizzaria",
                        user=os.environ.get("PGUSER", "postgres"),
                        host=os.environ.get("PGHOST", "localhost"),
                        password=os.environ.get("PGPASSWORD", "postgres"),
                        port=int(os.environ.get("PGPORT", "5432")))
cur = conn.cursor()

In [25]:
# Declare the STG_PIZZAS staging table if it does not exist yet.
# NOTE(review): the pizza DataFrame written later names this column `name`, not
# `pizza_name`. With Spark's default JDBC overwrite (drop + recreate), this
# declared schema is probably not the final one. Confirm which is intended.
stg_pizzas_ddl = """CREATE TABLE IF NOT EXISTS STG_PIZZAS (
    pizza_id INT,
    pizza_name varchar,
    category varchar,
    size varchar(1),
    price NUMERIC(10, 4),
    ingredients varchar
)
"""
cur.execute(stg_pizzas_ddl)

In [26]:
# Declare the STG_FATO_ORDERS staging table (one row per order line) if it does
# not exist yet.
# The `price` column receives price * quantity (price_total in the source query).
stg_fato_orders_ddl = """CREATE TABLE IF NOT EXISTS STG_FATO_ORDERS (
    pizza_id INT,
    order_id INT,
    date DATE,
    time TIME,
    quantity INT,
    price NUMERIC(10, 4)
)
"""
cur.execute(stg_fato_orders_ddl)

In [27]:
# psycopg2 runs statements inside a transaction (autocommit is off by default).
# close() discards uncommitted work, so commit the CREATE TABLE statements first.
# Without the commit, the staging tables were silently rolled back.
conn.commit()
cur.close()
conn.close()

In [28]:
# Build the pizza staging DataFrame from the rows fetched from origem_pizzaria.
# Column names follow the SELECT order. price is cast to DECIMAL(10,2) before
# the DataFrame is written to the staging database.
#
# TODO: surrogate-key / validity columns (sk_pk_id via row_number() over
# window_spec, effective_date, expiration_date = EOW_DATE, current_flag) were
# sketched here as commented-out code but are not implemented. window_spec,
# EOW_DATE and DATE_FORMAT are kept for that work and are currently unused.
# NOTE(review): window_spec orders by "pd_id", which is not a column of
# stage_pizzas_df. Fix the column name before using it.
window_spec = Window.orderBy("pd_id")
EOW_DATE = "9999-12-31"
DATE_FORMAT = "yyyy-MM-dd"

columns = ['pizza_id', 'name', 'category', 'size', 'price', 'ingredients']
stage_pizzas_df = spark.createDataFrame(stg_pizzas, columns).selectExpr(
    'pizza_id',
    'name',
    'category',
    'size',
    'cast(price as decimal(10,2))',
    'ingredients',
)

In [29]:
# Render the order time (tuple index 3) as an 'HH:MM:SS' string before handing
# the rows to Spark. This is presumably done because Spark has no TIME type.
# TODO confirm.
def _time_to_str(value):
    """Format a time-like value as 'HH:MM:SS'.

    Values that are None or already str are returned unchanged. This keeps the
    cell idempotent: re-running it must not call strftime on a str, and a NULL
    order time must not crash.
    """
    if value is None or isinstance(value, str):
        return value
    return value.strftime('%H:%M:%S')


stg_fato_orders = [
    (row[0], row[1], row[2], _time_to_str(row[3]), row[4], row[5])
    for row in stg_fato_orders
]

# Define column names (`price` holds price * quantity from the source query)
columns = ['pizza_id', 'order_id', 'date', 'time', 'quantity', 'price']

# Create DataFrame
stg_fato_orders_df = spark.createDataFrame(stg_fato_orders, columns)

In [30]:
columns = ['pizza_id', 'order_id', 'date', 'time', 'quantity', 'price']
# Rebuild the order-line DataFrame, casting date and time to text and the line
# total (`price` = price * quantity) to a fixed-scale decimal.
stg_fato_orders_df = (
    spark.createDataFrame(stg_fato_orders, columns)
    .selectExpr(
        'pizza_id',
        'order_id',
        'cast(date as string)',
        'cast(time as string)',
        'quantity',
        'cast(price as decimal(10,2))',
    )
)

In [31]:
# Write the order-line staging rows to stage_pizzaria.STG_FATO_ORDERS over JDBC.
# Host, port, user and password come from the standard PG* environment variables
# when they are set, with the previous local values as fallbacks.
# NOTE(review): with Spark's default JDBC overwrite (truncate=false), the table
# is dropped and recreated, so column types follow the DataFrame rather than the
# CREATE TABLE issued earlier. Confirm this is intended.
stg_fato_orders_df.write.format('jdbc').options(
    url='jdbc:postgresql://{}:{}/stage_pizzaria'.format(
        os.environ.get('PGHOST', 'localhost'),
        os.environ.get('PGPORT', '5432'),
    ),
    driver='org.postgresql.Driver',
    dbtable='STG_FATO_ORDERS',
    user=os.environ.get('PGUSER', 'postgres'),
    password=os.environ.get('PGPASSWORD', 'postgres'),
).mode('overwrite').save()

                                                                                

In [32]:
# Write the pizza staging rows to stage_pizzaria.STG_PIZZAS over JDBC.
# Host, port, user and password come from the standard PG* environment variables
# when they are set, with the previous local values as fallbacks.
# NOTE(review): the DataFrame column is `name` while the earlier DDL declares
# `pizza_name`. With Spark's default JDBC overwrite (drop + recreate), the
# DataFrame schema wins. Confirm this is intended.
stage_pizzas_df.write.format('jdbc').options(
    url='jdbc:postgresql://{}:{}/stage_pizzaria'.format(
        os.environ.get('PGHOST', 'localhost'),
        os.environ.get('PGPORT', '5432'),
    ),
    driver='org.postgresql.Driver',
    dbtable='STG_PIZZAS',
    user=os.environ.get('PGUSER', 'postgres'),
    password=os.environ.get('PGPASSWORD', 'postgres'),
).mode('overwrite').save()

                                                                                

In [33]:
# Stop the SparkSession created at the top of the notebook; no Spark work follows.
spark.stop()