In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col, year, month
from pyspark.sql.types import StructType, StringType, IntegerType, FloatType, TimestampType, MapType
from pyspark.sql.types import DoubleType
import happybase


In [11]:
spark.stop()

In [2]:
spark = SparkSession.builder \
    .appName("KafkaStreamingExample") \
    .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.8") \
    .getOrCreate()


In [3]:
bootstrap_servers = ""
kafka_topic = ""
kafka_username = ""
kafka_password = ""

In [4]:
schema = StructType() \
    .add("eventType", StringType()) \
    .add("customerId", StringType()) \
    .add("productId", StringType()) \
    .add("timestamp", TimestampType()) \
    .add("metadata", StructType()
        .add("category", StringType())
        .add("source", StringType())
    ) \
    .add("quantity", IntegerType()) \
    .add("totalAmount", DoubleType()) \
    .add("paymentMethod", StringType()) \
    .add("recommendedProductId", StringType()) \
    .add("algorithm", StringType())

In [5]:
df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", bootstrap_servers) \
    .option("subscribe", kafka_topic) \
    .option("startingOffsets", "earliest") \
    .option("kafka.security.protocol", "SASL_SSL") \
    .option("kafka.sasl.mechanism", "PLAIN") \
    .option("kafka.sasl.jaas.config",
            f'org.apache.kafka.common.security.plain.PlainLoginModule required username="{kafka_username}" password="{kafka_password}";') \
    .load()

In [6]:
json_df = df.selectExpr("CAST(value AS STRING)").select(from_json("value", schema).alias("data")).select("data.*")


In [7]:
transformed_df = json_df \
    .withColumn("timestamp", col("timestamp").cast(TimestampType())) \
    .withColumn("year", year(col("timestamp"))) \
    .withColumn("month", month(col("timestamp"))) \
    .withColumn("category", col("metadata").getItem("category")) \
    .withColumn("source", col("metadata").getItem("source"))


In [8]:
def write_to_hbase(df, namespace, table_name, thrift_server, port=9090, batch_size=10):
    print(f"Starting to write data to HBase table: {namespace}:{table_name}")
    
    try:
        connection = happybase.Connection(thrift_server, port=port)
        print(f"connected to HBase at {thrift_server}:{port}")
        
        table = connection.table(f'{namespace}:{table_name}')
        print(f"Accessing table: {namespace}:{table_name}")
        
        tables = connection.tables()
        print(f"Available tables: {tables}")
        
        batch = table.batch(batch_size=batch_size)
        row_count = 0
        
        for row in df.collect():
            try:
                row_key = str(row['customerId'])
                
                
                event_data = {
                    'event_info:eventType': str(row['eventType']),
                    'event_info:timestamp': str(row['timestamp'])
                }
                
                transaction_data = {
                    'transaction_data:productId': str(row['productId']) if row['productId'] is not None else '',
                    'transaction_data:quantity': str(row['quantity']) if row['quantity'] is not None else '',
                    'transaction_data:totalAmount': str(row['totalAmount']) if row['totalAmount'] is not None else '',
                    'transaction_data:paymentMethod': str(row['paymentMethod']) if row['paymentMethod'] is not None else '',
                    'transaction_data:recommendedProductId': str(row['recommendedProductId']) if row['recommendedProductId'] is not None else '',
                    'transaction_data:algorithm': str(row['algorithm']) if row['algorithm'] is not None else ''
                }
                
                
                all_data = {**event_data, **{k: v for k, v in transaction_data.items() if v}}
                
                
                batch.put(row_key, all_data)
                
                row_count += 1
                if row_count % batch_size == 0:
                    batch.send()
                    print(f"sent batch of {batch_size} rows. total rows: {row_count}")
                    batch = table.batch(batch_size=batch_size)
            
            except Exception as row_error:
                print(f"processing row: {str(row_error)}")
                print(f"error: {row}")
        
        
        if row_count % batch_size != 0:
            batch.send()
            print(f"Sent final batch. Total rows written: {row_count}")
        
        return row_count
    
    except Exception as e:
        print(f"Error writing to HBase: {str(e)}")
        return 0

In [9]:
 def process_batch(df, epoch_id):
     try:
         rows_written = write_to_hbase(df, 'streaming','events', 'hbase-thrift')
         print(f"Batch processed. Epoch ID: {epoch_id}, Rows written: {rows_written}")
     except Exception as e:
         print(f"Error processing batch: {str(e)}")

In [10]:
 query = transformed_df \
     .writeStream \
     .foreachBatch(process_batch) \
     .outputMode("append") \
     .option("checkpointLocation", "hdfs://localhost:9000//user/streaming_check") \
     .start()
query.awaitTermination()

Starting to write data to HBase table: streaming:events
Successfully connected to HBase at hbase-thrift:9090
Accessing table: streaming:events
Available tables: [b'streaming:events']
Sent batch of 10 rows. Total rows so far: 10
Sent batch of 10 rows. Total rows so far: 20
Batch processed. Epoch ID: 493, Rows written: 20
Starting to write data to HBase table: streaming:events
Successfully connected to HBase at hbase-thrift:9090
Accessing table: streaming:events
Available tables: [b'streaming:events']
Sent final batch. Total rows written: 5
Batch processed. Epoch ID: 494, Rows written: 5
Starting to write data to HBase table: streaming:events
Successfully connected to HBase at hbase-thrift:9090
Accessing table: streaming:events
Available tables: [b'streaming:events']
Sent final batch. Total rows written: 1
Batch processed. Epoch ID: 495, Rows written: 1
Starting to write data to HBase table: streaming:events
Successfully connected to HBase at hbase-thrift:9090
Accessing table: streaming:

KeyboardInterrupt: 

Starting to write data to HBase table: streaming:events
Successfully connected to HBase at hbase-thrift:9090
Accessing table: streaming:events
Available tables: [b'streaming:events']
Sent final batch. Total rows written: 1
Batch processed. Epoch ID: 512, Rows written: 1
Starting to write data to HBase table: streaming:events
Successfully connected to HBase at hbase-thrift:9090
Accessing table: streaming:events
Available tables: [b'streaming:events']
Sent final batch. Total rows written: 1
Batch processed. Epoch ID: 513, Rows written: 1
Starting to write data to HBase table: streaming:events
Successfully connected to HBase at hbase-thrift:9090
Accessing table: streaming:events
Available tables: [b'streaming:events']
Sent final batch. Total rows written: 1
Batch processed. Epoch ID: 514, Rows written: 1
Starting to write data to HBase table: streaming:events
Successfully connected to HBase at hbase-thrift:9090
Accessing table: streaming:events
Available tables: [b'streaming:events']
Sent

In [10]:
query = transformed_df \
    .writeStream \
    .outputMode("append") \
    .format("parquet") \
    .option("path", "hdfs://localhost:9000//user/streaming") \
    .option("checkpointLocation", "hdfs://localhost:9000//user/streaming_check") \
    .start()

#query.awaitTermination()

In [13]:
streaming_df = spark.read.parquet("/user/streaming/*.parquet")

In [14]:
streaming_df.show()

+-------------------+----------+---------+-------------------+--------------------+--------+-----------+-------------+--------------------+--------------------+----+-----+--------------+------+
|          eventType|customerId|productId|          timestamp|            metadata|quantity|totalAmount|paymentMethod|recommendedProductId|           algorithm|year|month|      category|source|
+-------------------+----------+---------+-------------------+--------------------+--------+-----------+-------------+--------------------+--------------------+----+-----+--------------+------+
|        productView|     63267|     4231|2024-07-28 05:27:45|     [Books, Direct]|    null|       null|         null|                null|                null|2024|    7|         Books|Direct|
|          addToCart|     45129|     8421|2024-07-28 05:27:47|                 [,]|       1|       null|         null|                null|                null|2024|    7|          null|  null|
|recommendationClick|     9326

In [None]:
SELECT 
    productId, 
    SUM(totalAmount) as total_sales 
FROM 
    streaming_data 
GROUP BY 
    productId 
ORDER BY 
    total_sales DESC 
LIMIT 5;


In [None]:
SELECT 
    metadata.category, 
    YEAR('timestamp') as year, 
    MONTH('timestamp') as month, 
    SUM(totalAmount) as total_sales, 
    SUM(quantity) as total_quantity 
FROM 
    streaming_data 
GROUP BY 
    metadata.category, 
    YEAR('timestamp'), 
    MONTH('timestamp') 
ORDER BY 
    year, 
    month, 
    metadata.category;


In [None]:
streaming_df.write\
    .mode("overwrite")\
    .option("path","/user/streaming/*.parquet")\
    .saveAsTable("default.streaming_table")

In [43]:
spark.stop()