In [1]:
!pip install psycopg2-binary

Collecting psycopg2-binary
  Using cached psycopg2_binary-2.9.9-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (3.0 MB)
Installing collected packages: psycopg2-binary
Successfully installed psycopg2-binary-2.9.9


In [15]:
# Create the Spark Session
from pyspark.sql import SparkSession
from pyspark.sql.types import StringType, StructField, StructType
from pyspark.sql.functions import from_json, col, regexp_replace, expr
import psycopg2
import logging
# Build (or reuse) a local SparkSession for the streaming job.
# spark.jars.packages lists the Maven coordinates of the Kafka source
# connector and the PostgreSQL JDBC driver.
spark = SparkSession.builder \
    .appName("Streaming from Kafka into Postgresql") \
    .config("spark.streaming.stopGracefullyOnShutdown", True) \
    .config('spark.jars.packages', "org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0,org.postgresql:postgresql:42.7.3") \
    .config("spark.sql.shuffle.partitions", 4) \
    .master("local[*]") \
    .getOrCreate()

# Bare expression: the notebook displays the session as the cell output.
spark

In [16]:
# Streaming source: subscribe to the "shopify.items" Kafka topic on the
# kafka2:29093 broker and start from the earliest available offset.
kafka_df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafka2:29093") \
    .option("subscribe", "shopify.items") \
    .option("startingOffsets", "earliest") \
    .load()

In [17]:
# Inspect the columns exposed by the raw Kafka source before any transformation.
kafka_df.printSchema()
# NOTE: kafka_df is built from spark.readStream, so kafka_df.show() cannot be used here;
# streaming rows are inspected through writeStream (e.g. the console sink) instead.

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)



In [18]:
# Decode the binary Kafka key/value columns to text, then delete every backslash
# from both. "\\\\" is the Python literal for the two-character regex \\,
# which matches one literal backslash.
# NOTE(review): this also strips backslashes that belong to legitimate JSON
# escapes inside string values — confirm upstream payloads never contain any.
kafka_df = (
    kafka_df
    .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
    .withColumn("key", regexp_replace(col("key"), "\\\\", ""))
    .withColumn("value", regexp_replace(col("value"), "\\\\", ""))
)

In [19]:
# Confirm that key and value are now string columns.
kafka_df.printSchema()
# NOTE: kafka_df is a streaming DataFrame, so kafka_df.show(truncate=False) cannot be
# used here; streaming rows are inspected through writeStream (e.g. the console sink).

root
 |-- key: string (nullable = true)
 |-- value: string (nullable = true)



In [20]:
# Cut the payload JSON out of the value string by character position, leaving
# the `schema` part of the message behind.
# Start:  instr(value, 'payload') + 10 skips the 7 characters of the word `payload`
#         plus 3 more — presumably the `":"` that follows it when the payload is a
#         quoted JSON string; verify against a sample message.
# Length: length(value) - instr(value, 'payload') - 11 makes the slice stop two
#         characters before the end of value — presumably the closing `"}` of the
#         envelope; verify against a sample message.
# NOTE(review): the FIRST occurrence of the text 'payload' anywhere in value is used,
# so a match inside the `schema` part would shift the slice.
value_payload_df = kafka_df.withColumn("payload", expr("substring(value, instr(value, 'payload') + 10, length(value) - instr(value, 'payload') - 11)"))

In [21]:
# Schema of the payload document: a nested `_id.$oid` struct followed by flat,
# nullable string fields (every value, including price and instock, is read as text).
payload_schema = StructType(
    [StructField("_id", StructType([StructField("$oid", StringType(), True)]), True)]
    + [
        StructField(field_name, StringType(), True)
        for field_name in (
            "name", "price", "category", "instock",
            "tags", "description", "filename",
        )
    ]
)

In [22]:
# Parse the payload JSON string into a struct column named "data", shaped by payload_schema.
# The key, value and payload columns are kept alongside it.
# NOTE(review): from_json is expected to produce nulls rather than raise when the
# payload text is not valid JSON — TODO confirm, and check for null rows when debugging.
parsed_df = value_payload_df.withColumn("data", from_json(col("payload"), payload_schema))

In [23]:
# Confirm the nested layout of the parsed "data" struct.
parsed_df.printSchema()
# NOTE: parsed_df is a streaming DataFrame, so parsed_df.show(truncate=False) cannot be
# used here; streaming rows are inspected through writeStream (e.g. the console sink).

root
 |-- key: string (nullable = true)
 |-- value: string (nullable = true)
 |-- payload: string (nullable = true)
 |-- data: struct (nullable = true)
 |    |-- _id: struct (nullable = true)
 |    |    |-- $oid: string (nullable = true)
 |    |-- name: string (nullable = true)
 |    |-- price: string (nullable = true)
 |    |-- category: string (nullable = true)
 |    |-- instock: string (nullable = true)
 |    |-- tags: string (nullable = true)
 |    |-- description: string (nullable = true)
 |    |-- filename: string (nullable = true)



In [24]:
# Flatten the parsed struct: lift `data._id.$oid` to a top-level `_id` column and
# expose every other payload field under its own name.
final_df = parsed_df.select(
    col("data._id.$oid").alias("_id"),
    *[
        col(f"data.{field_name}").alias(field_name)
        for field_name in (
            "name", "price", "category", "instock",
            "tags", "description", "filename",
        )
    ]
)

In [25]:
# Confirm the flat, all-string layout that will be written to the sink.
final_df.printSchema()
# NOTE: final_df is a streaming DataFrame, so final_df.show(truncate=False) cannot be
# used here; streaming rows are inspected through writeStream (e.g. the console sink).

root
 |-- _id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- price: string (nullable = true)
 |-- category: string (nullable = true)
 |-- instock: string (nullable = true)
 |-- tags: string (nullable = true)
 |-- description: string (nullable = true)
 |-- filename: string (nullable = true)



In [26]:
# Debug sink: start a streaming query that writes each appended micro-batch
# of final_df to the console.
query = (
    final_df.writeStream
    .outputMode("append")
    .format("console")
    .start()
)

In [None]:
# Let the console debug query run for a bounded time, then stop it.
# An unbounded awaitTermination() blocks this cell indefinitely, so the cells
# below never run on "Run All", and interrupting the kernel to get past it
# leaves the query running in the background.
query.awaitTermination(30)  # timeout in seconds
query.stop()

In [201]:
def create_table(connection):
    """Create the ``items`` table on ``connection`` if it does not exist yet.

    Args:
        connection: a database connection exposing ``cursor()`` (usable as a
            context manager) and ``commit()``.

    The DDL is executed through a cursor, the transaction is committed, and a
    confirmation line is printed. Returns None.
    """
    ddl = """
        CREATE TABLE IF NOT EXISTS items (
            id serial PRIMARY KEY,
            _id varchar(100) NOT NULL,
            name varchar(100) NOT NULL,
            price varchar(20) NOT NULL,
            category varchar(100) NOT NULL,
            instock varchar(10) NOT NULL,
            tags varchar(100) NOT NULL,
            description text,
            filename varchar(200) NOT NULL
        );
        """
    with connection.cursor() as cur:
        cur.execute(ddl)
        # Commit while the cursor is still open, then report success.
        connection.commit()
        print("Table created successfully!")

In [202]:
def insert_data(connection, **kwargs):
    """Insert one row into the ``items`` table.

    Args:
        connection: a database connection exposing ``cursor()`` (usable as a
            context manager), ``commit()`` and ``rollback()``.
        **kwargs: column values keyed by ``_id``, ``name``, ``price``,
            ``category``, ``instock``, ``tags``, ``description`` and
            ``filename``. A missing key is sent as ``None``; other keys are
            ignored.

    Any exception raised while inserting is logged and the transaction is
    rolled back; nothing is re-raised. Returns None.
    """
    print("Inserting data...")

    columns = ('_id', 'name', 'price', 'category',
               'instock', 'tags', 'description', 'filename')
    values = tuple(kwargs.get(column) for column in columns)
    _id, name, *_ = values  # only used for the success log line

    statement = ('insert into items (_id,name,price,category,instock,tags,description,filename)'
                 'VALUES (%s,%s,%s,%s,%s,%s,%s,%s)')

    try:
        with connection.cursor() as cur:
            cur.execute(statement, values)
            connection.commit()
            logging.info(f"Data inserted for {_id} {name}")
    except Exception as err:
        logging.error(f'Could not insert data due to {err}')
        connection.rollback()

In [203]:
def create_postgres_connection():
    """Open a psycopg2 connection to the PostgreSQL database.

    Connection settings are read from the environment variables
    ``POSTGRES_DB``, ``POSTGRES_USER``, ``POSTGRES_PASSWORD``,
    ``POSTGRES_HOST`` and ``POSTGRES_PORT``. When a variable is not set, the
    value this notebook previously hardcoded is used, so existing setups keep
    working unchanged.

    Returns:
        The open connection, or ``None`` when connecting fails (the error is
        logged instead of raised).
    """
    import os  # local import keeps this cell runnable on its own

    settings = {
        "dbname": os.environ.get("POSTGRES_DB", "postgres_dw"),
        "user": os.environ.get("POSTGRES_USER", "gauss"),
        # NOTE: the fallback password is a development default only; set
        # POSTGRES_PASSWORD instead of relying on it outside a local sandbox.
        "password": os.environ.get("POSTGRES_PASSWORD", "root"),
        "host": os.environ.get("POSTGRES_HOST", "postgres_db"),
        "port": os.environ.get("POSTGRES_PORT", "5432"),
    }

    try:
        # Connecting to the PostgreSQL database
        return psycopg2.connect(**settings)
    except Exception as e:
        logging.error(f"Could not create PostgreSQL connection due to {e}")
        return None

In [None]:
# Create PostgreSQL connection and stream final_df into the items table.
connection = create_postgres_connection()
if connection is not None:

    def write_to_postgres(batch_df, batch_id):
        """Insert every row of one micro-batch into PostgreSQL.

        Called by foreachBatch with the micro-batch DataFrame and its id.
        Rows are collected to the driver and inserted one at a time; the Row
        field names are passed to insert_data as keyword arguments.
        """
        for row in batch_df.collect():
            insert_data(connection, **row.asDict())

    # Keep a handle on the running query so it can be stopped during cleanup.
    streaming_query = None
    try:
        create_table(connection)

        logging.info("Streaming is being started...")

        # NOTE(review): no checkpointLocation is set and the source reads from the
        # earliest offset, so restarting this cell re-inserts rows that were already
        # written — consider a checkpoint directory or an upsert keyed on _id.
        streaming_query = (final_df.writeStream
                           .foreachBatch(write_to_postgres)
                           .outputMode("append")
                           .start())
        streaming_query.awaitTermination()
    finally:
        # Runs on normal termination, on a streaming error and on a kernel
        # interrupt: without it the query keeps running in the background and
        # keeps writing through this connection on the next run of the cell.
        if streaming_query is not None:
            streaming_query.stop()
        connection.close()

Table created successfully!
Inserting data...
Inserting data...
Inserting data...
Inserting data...


ERROR:root:Could not insert data due to null value in column "price" of relation "items" violates not-null constraint
DETAIL:  Failing row contains (17, 666d9507d06a80d7d8962131, robe, null, femme, t, version3, summer, product.jpg).

ERROR:root:Could not insert data due to can't adapt type 'Column'
ERROR:root:Could not insert data due to null value in column "price" of relation "items" violates not-null constraint
DETAIL:  Failing row contains (20, 666d9507d06a80d7d8962131, robe, null, femme, t, version3, summer, product.jpg).

ERROR:root:Could not insert data due to can't adapt type 'Column'


Inserting data...
Inserting data...
Inserting data...
Inserting data...
Inserting data...
Inserting data...
