In [4]:
import psycopg2

def create_database():
    # Connect to the default database to create the new database
    conn = psycopg2.connect(
        dbname='postgres',  # Connect to the default database
        user='admin',
        password='admin',
        host='localhost',
        port='5432'
    )
    conn.autocommit = True  # Enable autocommit to create database
    cur = conn.cursor()
    
    try:
        # Create the goodread database if it doesn't exist
        cur.execute("SELECT 1 FROM pg_catalog.pg_database WHERE datname = 'goodread'")
        exists = cur.fetchone()
        if not exists:
            cur.execute("CREATE DATABASE goodread;")
            print("Database 'goodread' has been created successfully.")
        else:
            print("Database 'goodread' already exists.")
    except Exception as e:
        print(f"Error occurred while creating database: {e}")
    finally:
        cur.close()
        conn.close()

def create_tables():
    conn = psycopg2.connect(
        dbname='goodread',
        user='admin',
        password='admin',
        host='localhost',
        port='5432'
    )
    
    try:
        cur = conn.cursor()
        
        # Create Authors table
        create_authors_table = """
        CREATE TABLE IF NOT EXISTS authors (
            author_id SERIAL PRIMARY KEY,
            author_name VARCHAR(255) UNIQUE
        );
        """
        
        # Create Ratings table
        create_ratings_table = """
        CREATE TABLE IF NOT EXISTS ratings (
            rating_id SERIAL PRIMARY KEY,
            rating FLOAT,
            fivestars INT,
            fourstars INT,
            threestars INT,
            twostars INT,
            onestar INT
        );
        """
        
        # Create Books table
        create_books_table = """
        CREATE TABLE IF NOT EXISTS books (
            book_id SERIAL PRIMARY KEY,
            rating_id INT,
            author_id INT,
            bookname VARCHAR(255),
            publish DATE,
            prices FLOAT,
            rating FLOAT,
            rating_count INT,
            reviews INT,
            pages_n INT,
            cover VARCHAR(50),
            bookUrl VARCHAR(255),
            FOREIGN KEY (author_id) REFERENCES authors(author_id),
            FOREIGN KEY (rating_id) REFERENCES ratings(rating_id)
        );
        """
        
        # Create Details table
        create_details_table = """
        CREATE TABLE IF NOT EXISTS details (
            book_id INT PRIMARY KEY,
            author_id INT,
            describe TEXT,
            book_title VARCHAR(255),
            FOREIGN KEY (book_id) REFERENCES books(book_id),
            FOREIGN KEY (author_id) REFERENCES authors(author_id)
        );
        """
        
        # Execute table creation commands
        cur.execute(create_authors_table)
        cur.execute(create_ratings_table)
        cur.execute(create_books_table)
        cur.execute(create_details_table)
        conn.commit()

        print("Tables have been created successfully.")
        
    except Exception as e:
        print(f"Error occurred while creating tables: {e}")
    finally:
        cur.close()
        conn.close()

# Create the database and tables



In [2]:
create_database()


Database 'goodread' has been created successfully.
Tables have been created successfully.


In [5]:
create_tables()

Tables have been created successfully.


In [6]:
import psycopg2

def create_connection():
    return psycopg2.connect(
        dbname='goodread',
        user='admin',
        password='admin',
        host='localhost',
        port='5432'
    )


In [7]:
def insert_authors():
    authors_data = [
        ('J.K. Rowling',),
        ('George Orwell',),
        ('J.R.R. Tolkien',)
    ]
    
    conn = create_connection()
    cur = conn.cursor()

    try:
        for author in authors_data:
            cur.execute("INSERT INTO authors (author_name) VALUES (%s) ON CONFLICT (author_name) DO NOTHING;", author)
        
        conn.commit()
        print("Authors data inserted successfully.")
    except Exception as e:
        print(f"Error inserting authors: {e}")
        conn.rollback()
    finally:
        cur.close()
        conn.close()


In [9]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import regexp_replace, col, regexp_extract, when, date_format, to_date
import psycopg2

# Connect to Kafka
def connect_kafka():
    spark = SparkSession.builder \
        .appName('SparkKafkaToPostgres') \
        .config('spark.jars.packages', "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.3,"
                                        "org.postgresql:postgresql:42.5.0") \
        .getOrCreate()
    return spark

# Load data from Kafka
def load_data(spark):
    df = spark.readStream.format("kafka") \
        .option("kafka.bootstrap.servers", "localhost:9092") \
        .option("subscribe", "goodread") \
        .load()
    return df

# Format data
def format_data(df):
    df = df.selectExpr("CAST(value AS STRING)") \
        .selectExpr("from_json(value, 'author STRING, bookUrl STRING, bookname STRING, describe STRING, prices STRING, publish STRING, rating STRING, ratingcount STRING, reviews STRING, fivestars STRING, fourstars STRING, threestars STRING, twostars STRING, onestar STRING, pages STRING') as jsonData") \
        .select("jsonData.*")

    # Processing string data
    df = df.withColumn("onestar", regexp_replace(col("onestar"), r"[^\d]", ""))
    df = df.withColumn("twostars", regexp_replace(col("twostars"), r"[^\d]", ""))
    df = df.withColumn("threestars", regexp_replace(col("threestars"), r"[^\d]", ""))
    df = df.withColumn("fourstars", regexp_replace(col("fourstars"), r"[^\d]", ""))
    df = df.withColumn("fivestars", regexp_replace(col("fivestars"), r"[^\d]", ""))
    df = df.withColumn("pages_n", regexp_extract(col("pages"), r"(\d+)", 1))
    df = df.withColumn("cover", regexp_extract(col("pages"), r",\s*(.*)", 1))
    df = df.withColumn("prices", when(df["prices"].like("Kindle%"), regexp_extract(col("prices"), r"\$(\d+\.\d{2})", 1)).otherwise(0))
    df = df.withColumn("publish", regexp_extract(col("publish"), r"(\w+ \d{1,2}, \d{4})", 1))
    df = df.withColumn("publish", to_date(col("publish"), "MMMM d, yyyy"))
    df = df.withColumn("publish", date_format(col("publish"), "dd/MM/yyyy"))
    df = df.withColumn("ratingcount", regexp_replace(col("ratingcount"), ",", ""))
    df = df.withColumn("reviews", regexp_replace(col("reviews"), ",", ""))
    df = df.drop("pages")

    return df

# Convert data types
def convert(df):
    df = df.withColumn("pages_n", df["pages_n"].cast("int")) \
           .withColumn("prices", df["prices"].cast("float")) \
           .withColumn("onestar", df["onestar"].cast("int")) \
           .withColumn("twostars", df["twostars"].cast("int")) \
           .withColumn("threestars", df["threestars"].cast("int")) \
           .withColumn("fourstars", df["fourstars"].cast("int")) \
           .withColumn("fivestars", df["fivestars"].cast("int")) \
           .withColumn("publish", df["publish"].cast("date")) \
           .withColumn("rating", df["rating"].cast("float")) \
           .withColumn("ratingcount", df["ratingcount"].cast("int")) \
           .withColumn("reviews", df["reviews"].cast("int"))
    

    return df


In [10]:
spark = connect_kafka()
df = load_data(spark)
df = format_data(df)
df = convert(df)

In [12]:
df = spark.readStream.format("kafka") \
        .option("kafka.bootstrap.servers", "localhost:9092") \
        .option("subscribe", "goodread") \
        .load()

In [13]:
df.printSchema()

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)



In [14]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType, DateType

In [17]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, FloatType, IntegerType, DateType

# Khởi tạo Spark session
spark = SparkSession.builder \
    .appName("Kafka Book Data Processing") \
    .getOrCreate()

# Định nghĩa schema cho DataFrame
schema = StructType([
    StructField("author", StringType(), True),
    StructField("bookUrl", StringType(), True),
    StructField("bookname", StringType(), True),
    StructField("describe", StringType(), True),
    StructField("prices", StringType(), True),
    StructField("publish", StringType(), True),
    StructField("rating", StringType(), True),
    StructField("ratingcount", StringType(), True),
    StructField("reviews", StringType(), True),
    StructField("fivestars", StringType(), True),
    StructField("fourstars", StringType(), True),
    StructField("threestars", StringType(), True),
    StructField("twostars", StringType(), True),
    StructField("onestar", StringType(), True),
    StructField("pages", StringType(), True)
])

# Giả sử bạn đọc dữ liệu từ Kafka vào một DataFrame
def read_from_kafka(spark):
    # Cấu hình đọc từ Kafka (cập nhật với cấu hình của bạn)
    kafka_df = spark.read \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "localhost:9092") \
        .option("subscribe", "goodread") \
        .load()

    # Chọn cột giá trị từ Kafka và chuyển đổi nó thành chuỗi
    kafka_df = kafka_df.selectExpr("CAST(value AS STRING)")
    return kafka_df

# Hàm chuyển đổi kiểu dữ liệu
def convert(df):
    df = df.withColumn("pages_n", df["pages"].cast("int")) \
           .withColumn("prices", df["prices"].cast("float")) \
           .withColumn("onestar", df["onestar"].cast("int")) \
           .withColumn("twostars", df["twostars"].cast("int")) \
           .withColumn("threestars", df["threestars"].cast("int")) \
           .withColumn("fourstars", df["fourstars"].cast("int")) \
           .withColumn("fivestars", df["fivestars"].cast("int")) \
           .withColumn("publish", df["publish"].cast("date")) \
           .withColumn("rating", df["rating"].cast("float")) \
           .withColumn("ratingcount", df["ratingcount"].cast("int")) \
           .withColumn("reviews", df["reviews"].cast("int"))

    return df

# Hàm chính
def main(spark):
    # Đọc dữ liệu từ Kafka
    kafka_df = read_from_kafka(spark)

    # Chuyển đổi DataFrame từ JSON string sang DataFrame với schema đã định nghĩa
    book_df = kafka_df.selectExpr("from_json(value, schema) as data") \
                      .select("data.*")

    # Chuyển đổi kiểu dữ liệu
    converted_df = convert(book_df)

    # Hiển thị DataFrame sau khi chuyển đổi
    converted_df.show()

# Gọi hàm chính
if __name__ == "__main__":
    main(spark)


AnalysisException: [UNRESOLVED_COLUMN.WITH_SUGGESTION] A column or function parameter with name `schema` cannot be resolved. Did you mean one of the following? [`value`].; line 1 pos 17;
'Project ['from_json(value#533, 'schema) AS data#535]
+- Project [cast(value#520 as string) AS value#533]
   +- Relation [key#519,value#520,topic#521,partition#522,offset#523L,timestamp#524,timestampType#525] KafkaRelation(strategy=Subscribe[goodread], start=EarliestOffsetRangeLimit, end=LatestOffsetRangeLimit)


In [2]:
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    'goodread',
    bootstrap_servers=['localhost:9092'],
    auto_offset_reset='earliest',
    enable_auto_commit=True,
    group_id='my-group'
)

for message in consumer:
    print(f"Key: {message.key}, Value: {message.value.decode('utf-8')}")


KeyboardInterrupt: 