In [3]:
from pyspark.sql import SparkSession

# Initialize Spark session
spark = SparkSession.builder \
    .appName("TestStockSymbols") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .getOrCreate()

# Define the schema for the table
schema = "symbol STRING, name STRING, price DOUBLE, volume INT"

# Create a DataFrame with sample data
data = [
    ("AAPL", "Apple Inc.", 175.64, 3000000),
    ("MSFT", "Microsoft Corp.", 341.07, 2500000),
    ("GOOGL", "Alphabet Inc.", 2724.34, 1800000)
]

# Create DataFrame
df = spark.createDataFrame(data, schema=schema)

# Write DataFrame to Delta table in HDFS
df.write.format("delta").mode("overwrite").save("hdfs://namenode:8020/testlakehouse/TestStockSymbols")

# Drop the existing table if it exists
spark.sql("DROP TABLE IF EXISTS TestStockSymbols")

# Register the Delta table in Spark SQL catalog
spark.sql("""
    CREATE TABLE TestStockSymbols
    USING DELTA
    LOCATION 'hdfs://namenode:8020/testlakehouse/TestStockSymbols'
""")


                                                                                

24/07/23 19:29:28 WARN HiveExternalCatalog: Couldn't find corresponding Hive SerDe for data source provider delta. Persisting data source table `default`.`teststocksymbols` into Hive metastore in Spark SQL specific format, which is NOT compatible with Hive.


DataFrame[]

In [4]:
# Show tables in the current database

spark.sql("SHOW TABLES").show()

+---------+----------------+-----------+
|namespace|       tableName|isTemporary|
+---------+----------------+-----------+
|  default|teststocksymbols|      false|
+---------+----------------+-----------+



In [5]:
# Query to select all records from the test_table
result = spark.sql("SELECT * FROM teststocksymbols")

# Show the results
result.show()

+------+---------------+-------+-------+
|symbol|           name|  price| volume|
+------+---------------+-------+-------+
|  MSFT|Microsoft Corp.| 341.07|2500000|
| GOOGL|  Alphabet Inc.|2724.34|1800000|
|  AAPL|     Apple Inc.| 175.64|3000000|
+------+---------------+-------+-------+



In [6]:
# Create a DataFrame with additional sample data
additional_data = [
    ("TSLA", "Tesla Inc.", 890.10, 2000000),
    ("AMZN", "Amazon.com Inc.", 139.68, 2200000),
    ("NVDA", "NVIDIA Corporation", 585.54, 1500000)
]

# Create DataFrame
additional_df = spark.createDataFrame(additional_data, schema=schema)

# Append data to the Delta table in HDFS
additional_df.write.format("delta").mode("append").save("hdfs://namenode:8020/testlakehouse/TestStockSymbols")


                                                                                

In [7]:
# Query to select all records from the test_table
result = spark.sql("SELECT * FROM teststocksymbols")

# Show the results
result.show()

+------+------------------+-------+-------+
|symbol|              name|  price| volume|
+------+------------------+-------+-------+
|  NVDA|NVIDIA Corporation| 585.54|1500000|
|  AMZN|   Amazon.com Inc.| 139.68|2200000|
|  MSFT|   Microsoft Corp.| 341.07|2500000|
| GOOGL|     Alphabet Inc.|2724.34|1800000|
|  AAPL|        Apple Inc.| 175.64|3000000|
|  TSLA|        Tesla Inc.|  890.1|2000000|
+------+------------------+-------+-------+



In [8]:
# Describe the table using Spark SQL
spark.sql("DESCRIBE teststocksymbols").show()

+---------------+---------+-------+
|       col_name|data_type|comment|
+---------------+---------+-------+
|         symbol|   string|       |
|           name|   string|       |
|          price|   double|       |
|         volume|      int|       |
|               |         |       |
| # Partitioning|         |       |
|Not partitioned|         |       |
+---------------+---------+-------+



In [9]:
import pyspark.sql.functions as F

# Read Delta table
df = spark.read.format("delta").load("hdfs://namenode:8020/testlakehouse/TestStockSymbols")

# Generate random values for 'price' and 'volume'
df_with_random_values = df \
    .withColumn("price", F.round(F.rand() * 1000, 2)) \
    .withColumn("volume", F.round(F.rand() * 5000000).cast("int"))

# Overwrite the Delta table with new random values
df_with_random_values.write.format("delta").mode("overwrite").save("hdfs://namenode:8020/testlakehouse/TestStockSymbols")

# Register the Delta table in Spark SQL catalog again
spark.sql("DROP TABLE IF EXISTS TestStockSymbols")
spark.sql("""
    CREATE TABLE TestStockSymbols
    USING DELTA
    LOCATION 'hdfs://namenode:8020/testlakehouse/TestStockSymbols'
""")

# Show the results
result = spark.sql("SELECT * FROM TestStockSymbols ORDER BY symbol ASC")
result.show()

                                                                                

24/07/23 19:30:33 WARN HiveExternalCatalog: Couldn't find corresponding Hive SerDe for data source provider delta. Persisting data source table `default`.`teststocksymbols` into Hive metastore in Spark SQL specific format, which is NOT compatible with Hive.
+------+------------------+------+-------+
|symbol|              name| price| volume|
+------+------------------+------+-------+
|  AAPL|        Apple Inc.| 74.05|  31001|
|  AMZN|   Amazon.com Inc.|224.37|2636527|
| GOOGL|     Alphabet Inc.|473.95|1672081|
|  MSFT|   Microsoft Corp.|889.79| 896824|
|  NVDA|NVIDIA Corporation|693.94|4694680|
|  TSLA|        Tesla Inc.| 867.6| 344625|
+------+------------------+------+-------+



In [10]:
# Show all tables in the current database
tables = spark.sql("SHOW TABLES").select("tableName").rdd.flatMap(lambda x: x).collect()

# Describe each table
for table in tables:
    print(f"Describing table: {table}")
    description = spark.sql(f"DESCRIBE {table}")
    description.show(truncate=False)

Describing table: teststocksymbols
+---------------+---------+-------+
|col_name       |data_type|comment|
+---------------+---------+-------+
|symbol         |string   |       |
|name           |string   |       |
|price          |double   |       |
|volume         |int      |       |
|               |         |       |
|# Partitioning |         |       |
|Not partitioned|         |       |
+---------------+---------+-------+



In [11]:
current_database = spark.sql("SELECT current_database()").collect()[0][0]
print(f"Current database: {current_database}")


Current database: default


In [12]:
# Show the results
result = spark.sql("SELECT * FROM TestStockSymbols ORDER BY symbol ASC")
result.show()

+------+------------------+------+-------+
|symbol|              name| price| volume|
+------+------------------+------+-------+
|  AAPL|        Apple Inc.| 74.05|  31001|
|  AMZN|   Amazon.com Inc.|224.37|2636527|
| GOOGL|     Alphabet Inc.|473.95|1672081|
|  MSFT|   Microsoft Corp.|889.79| 896824|
|  NVDA|NVIDIA Corporation|693.94|4694680|
|  TSLA|        Tesla Inc.| 867.6| 344625|
+------+------------------+------+-------+



In [36]:
!pip install kafka-python
!pip show kafka-python
# Location: /home/NBuser/.local/lib/python3.9/site-packages

import sys

# Print current sys.path
print("Current sys.path:")
for path in sys.path:
    print(path)

# Path to append
new_path = '/home/NBuser/.local/lib/python3.9/site-packages'

# Append path if it's not already present
if new_path not in sys.path:
    sys.path.append(new_path)
    print(f"Added '{new_path}' to sys.path")
else:
    print(f"'{new_path}' is already in sys.path")

# Verify updated sys.path
print("Updated sys.path:")
for path in sys.path:
    print(path)


Defaulting to user installation because normal site-packages is not writeable

[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m23.0[0m[39;49m -> [0m[32;49m24.1.2[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpython3 -m pip install --upgrade pip[0m
Name: kafka-python
Version: 2.0.2
Summary: Pure Python client for Apache Kafka
Home-page: https://github.com/dpkp/kafka-python
Author: Dana Powers
Author-email: dana.powers@gmail.com
License: Apache License 2.0
Location: /home/NBuser/.local/lib/python3.9/site-packages
Requires: 
Required-by: 
Current sys.path:
/opt/spark/work-dir/volume/testsample
/tmp/spark-dcbe9d73-eae7-4d31-8a2c-1fc4e910ff77/userFiles-16e66e1c-b01f-48c3-aae5-72f5dfecb858/org.antlr_antlr4-runtime-4.8.jar
/tmp/spark-dcbe9d73-eae7-4d31-8a2c-1fc4e910ff77/userFiles-16e66e1c-b01f-48c3-aae5-72f5dfecb858/io.delta_delta-storage-2.3.0.jar
/tmp/spark-dcbe9d73-eae7-4d31-8a2c-1fc4e910ff77/userFi

In [43]:
!curl -X GET http://172.18.0.99:9092

curl: (52) Empty reply from server


In [46]:
!kafka-topics.sh --list --bootstrap-server 172.18.0.99:9092

/bin/sh: line 1: kafka-topics.sh: command not found


In [45]:
from kafka import KafkaAdminClient, KafkaConsumer

# Define Kafka parameters
kafka_bootstrap_servers = "localhost:9092"  # Adjust this to your Kafka broker's address

# Initialize Kafka admin client
admin_client = KafkaAdminClient(bootstrap_servers=kafka_bootstrap_servers)

# Get the list of topics
try:
    topics = admin_client.list_topics()
    print(f"Topics available: {topics}")
except Exception as e:
    print(f"Failed to connect to Kafka: {e}")

# Initialize Kafka consumer to check topic data (optional)
consumer = KafkaConsumer(bootstrap_servers=kafka_bootstrap_servers)

# Check if specific topic exists
topic = "phongdinhcstest"  # Replace with your topic
if topic in consumer.topics():
    print(f"Topic '{topic}' exists.")
else:
    print(f"Topic '{topic}' does not exist.")


ERROR:kafka.conn:Connect attempt to <BrokerConnection node_id=bootstrap-0 host=localhost:9092 <connecting> [IPv4 ('127.0.0.1', 9092)]> returned error 111. Disconnecting.
ERROR:kafka.conn:Connect attempt to <BrokerConnection node_id=bootstrap-0 host=localhost:9092 <connecting> [IPv4 ('127.0.0.1', 9092)]> returned error 111. Disconnecting.
ERROR:kafka.conn:Connect attempt to <BrokerConnection node_id=bootstrap-0 host=localhost:9092 <connecting> [IPv6 ('::1', 9092, 0, 0)]> returned error 99. Disconnecting.


NoBrokersAvailable: NoBrokersAvailable

In [None]:
from kafka import KafkaProducer

# Initialize Kafka producer
producer = KafkaProducer(bootstrap_servers=kafka_bootstrap_servers)

# Send a test message
try:
    producer.send('phongdinhcstest', key=b'key', value=b'test message')
    producer.flush()
    print("Test message sent successfully.")
except Exception as e:
    print(f"Failed to send message: {e}")


In [None]:
from kafka import KafkaConsumer

# Initialize Kafka consumer
consumer = KafkaConsumer(
    'phongdinhcstest',
    bootstrap_servers=kafka_bootstrap_servers,
    auto_offset_reset='earliest'
)

# Poll for messages
try:
    for message in consumer:
        print(f"Received message: {message.value.decode('utf-8')}")
        break  # Stop after receiving the first message
except Exception as e:
    print(f"Failed to receive message: {e}")


In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

# Initialize Spark session
spark = SparkSession.builder \
    .appName("KafkaSparkIntegration") \
    .getOrCreate()

# Set Kafka parameters
kafka_bootstrap_servers = "kafka:9092"
topic = "phongdinhcstest"

# Read data from Kafka topic
df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", kafka_bootstrap_servers) \
    .option("subscribe", topic) \
    .load()

# Extract the value column and convert it to string
df = df.selectExpr("CAST(value AS STRING)")

# Write the data to console
query = df \
    .writeStream \
    .outputMode("append") \
    .format("console") \
    .start()

query.awaitTermination()
