In [1]:
import pandas as pd
pd.set_option('display.max_colwidth', None)

In [2]:
import numpy as np
np.bool = np.bool_

In [3]:
#current notebook name
notebook_name = __session__.replace('.ipynb','')[__session__.rfind('/')+1:] 

In [4]:
# HDFS base paths
hdfs_lakehouse_base_path = 'hdfs://localhost:9000/lakehouse/'
hdfs_warehouse_base_path = 'hdfs://localhost:9000/warehouse'

In [5]:
import os
dependencies = ["org.apache.spark:spark-avro_2.12:3.5.0",
                "io.delta:delta-iceberg_2.12:3.0.0","org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0"]
os.environ['PYSPARK_SUBMIT_ARGS']= f"--packages {','.join(dependencies)} pyspark-shell"
os.environ['PYARROW_IGNORE_TIMEZONE'] = 'true'

In [6]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json, udf
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, BooleanType
from vaderSentiment.vaderSentiment import SentimentIntensityAnalyzer

# Initialize Spark Session with Kafka Support
spark = (SparkSession.builder
    .appName(notebook_name)
    .config("spark.log.level", "ERROR")
    .config("spark.sql.warehouse.dir", hdfs_warehouse_base_path)
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .config("spark.sql.streaming.schemaInference", "true")  # Enable schema inference for streaming data
    .config("spark.jars.packages", 
            "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0,"  # Kafka integration
            "io.delta:delta-core_2.12:2.4.0")  # Delta Lake support
    .enableHiveSupport()
    .getOrCreate()
)

25/03/16 14:29:37 WARN Utils: Your hostname, osbdet resolves to a loopback address: 127.0.0.1; using 10.0.2.15 instead (on interface enp0s1)
25/03/16 14:29:37 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Ivy Default Cache set to: /home/osbdet/.ivy2/cache
The jars for the packages stored in: /home/osbdet/.ivy2/jars
org.apache.spark#spark-avro_2.12 added as a dependency
io.delta#delta-iceberg_2.12 added as a dependency
org.apache.spark#spark-sql-kafka-0-10_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-7c747dc9-5c2f-488a-a53a-a9cc32fe5e77;1.0
	confs: [default]


:: loading settings :: url = jar:file:/home/osbdet/.jupyter_venv/lib/python3.11/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


	found org.apache.spark#spark-avro_2.12;3.5.0 in central
	found org.tukaani#xz;1.9 in central
	found io.delta#delta-iceberg_2.12;3.0.0 in central
	found io.delta#delta-spark_2.12;3.0.0 in central
	found io.delta#delta-storage;3.0.0 in central
	found org.antlr#antlr4-runtime;4.9.3 in central
	found org.scala-lang.modules#scala-collection-compat_2.12;2.1.1 in central
	found com.github.ben-manes.caffeine#caffeine;2.9.3 in central
	found org.checkerframework#checker-qual;3.19.0 in central
	found com.google.errorprone#error_prone_annotations;2.10.0 in central
	found org.apache.spark#spark-sql-kafka-0-10_2.12;3.5.0 in central
	found org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.5.0 in central
	found org.apache.kafka#kafka-clients;3.4.1 in central
	found org.lz4#lz4-java;1.8.0 in central
	found org.xerial.snappy#snappy-java;1.1.10.3 in central
	found org.slf4j#slf4j-api;2.0.7 in central
	found org.apache.hadoop#hadoop-client-runtime;3.3.4 in central
	found org.apache.hadoop#hadoop-c

In [7]:
btc_price = spark.read.format("delta").load(f"{hdfs_lakehouse_base_path}/silver/trump_btc/BTC/").cache()
btc_stream_df = spark.readStream.format("delta").load(f"{hdfs_lakehouse_base_path}/silver/trump_btc/BTC/")

fg_df = spark.read.format("delta").load(f"{hdfs_lakehouse_base_path}/silver/trump_btc/fear_greed_index").cache()

                                                                                

In [9]:
btc_stream_df = btc_stream_df.withColumn("date_minute", col("date").cast("string"))

# Print schema to confirm
btc_stream_df.printSchema()

root
 |-- date: string (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Volume: double (nullable = true)
 |-- date_minute: string (nullable = true)



In [10]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json, udf
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, BooleanType
from vaderSentiment.vaderSentiment import SentimentIntensityAnalyzer

# Define schema for incoming Twitter data
twitter_schema = StructType([
    StructField("date", StringType(), True),
    StructField("favorites", DoubleType(), True),
    StructField("id", StringType(), True),
    StructField("isRetweet", BooleanType(), True),
    StructField("retweets", DoubleType(), True),
    StructField("text", StringType(), True)
])


twitter_stream = (spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "btc_price")
    .option("startingOffsets", "latest")
    .load()
)


In [None]:
from pyspark.sql.functions import col
from vaderSentiment.vaderSentiment import SentimentIntensityAnalyzer

# Extract JSON from Kafka's `value` column
twitter_df = twitter_stream.withColumn(
    "parsed_value", from_json(col("value").cast("string"), twitter_schema)
).select("parsed_value.*")

# Initialize Vader Sentiment Analyzer
analyzer = SentimentIntensityAnalyzer()

def analyze_sentiment(text):
    if text:  
        return analyzer.polarity_scores(text)["compound"]
    return 0.0  


# Define a function to process each micro-batch
#def process_batch(batch_df, batch_id):
#    if not batch_df.isEmpty():
#        # Convert to Pandas DataFrame
#        pdf = batch_df.toPandas()
#
#        # Apply sentiment analysis using Pandas
#        pdf["sentiment_score"] = pdf["text"].apply(lambda text: analyzer.polarity_scores(text)["compound"] if text else 0.0)

#        # Convert back to Spark DataFrame
#        spark_df = spark.createDataFrame(pdf)
#
#        # Write to Console (for debugging)
#        spark_df.show(truncate=False)


def process_batch(batch_df, batch_id):
    if not batch_df.isEmpty():
        # Extract unique 'date_minute' values from the batch
        unique_dates = batch_df.select("date_minute").distinct()

        # Filter BTC data to include only relevant timestamps
        filtered_btc_df = btc_stream_df.join(unique_dates, on="date_minute", how="inner")

        # Perform join between batch tweets and BTC prices
        enriched_df = batch_df.join(filtered_btc_df, on="date_minute", how="inner")

        # Convert to Pandas DataFrame for sentiment analysis
        pdf = enriched_df.toPandas()

        # Apply sentiment analysis using Pandas
        pdf["sentiment_score"] = pdf["text"].apply(lambda text: analyzer.polarity_scores(text)["compound"] if text else 0.0)

        # Convert back to Spark DataFrame
        spark_df = spark.createDataFrame(pdf)

        # Select required columns and rename 'Close' as 'btc_price'
        final_df = spark_df.select("date_minute", "favorites", "retweets", "text", "sentiment_score", col("Close").alias("btc_price"))

        # Write to console (or save to HDFS/Kafka if needed)
        final_df.show(truncate=False)






                                                     


In [11]:
from pyspark.sql.functions import col
from vaderSentiment.vaderSentiment import SentimentIntensityAnalyzer

# Extract JSON from Kafka's `value` column
twitter_df = twitter_stream.withColumn(
    "parsed_value", from_json(col("value").cast("string"), twitter_schema)
).select("parsed_value.*")

# Initialize Vader Sentiment Analyzer
analyzer = SentimentIntensityAnalyzer()

def analyze_sentiment(text):
    if text:  
        return analyzer.polarity_scores(text)["compound"]
    return 0.0  

from pyspark.sql.functions import date_format, col
from vaderSentiment.vaderSentiment import SentimentIntensityAnalyzer

# Initialize VADER Sentiment Analyzer
analyzer = SentimentIntensityAnalyzer()

def process_batch(batch_df, batch_id):
    if not batch_df.isEmpty():
        print(f"Processing Batch {batch_id}")

        # Ensure 'date_minute' exists before using it
        if "date_minute" not in batch_df.columns:
            print("Warning: 'date_minute' column missing in batch. Creating it dynamically...")
            batch_df = batch_df.withColumn("date_minute", date_format(col("date"), "yyyy-MM-dd HH:mm"))

        # Extract unique 'date_minute' values from batch
        unique_dates = batch_df.select("date_minute").distinct()

        # Ensure BTC data is also in correct format
        btc_filtered_df = btc_stream_df.withColumn("date_minute", col("date_minute").cast("string"))

        # Perform join between batch tweets and BTC prices
        enriched_df = batch_df.join(btc_filtered_df, on="date_minute", how="inner")

        # Apply Sentiment Analysis using Pandas
        pdf = enriched_df.toPandas()
        pdf["sentiment_score"] = pdf["text"].apply(lambda text: analyzer.polarity_scores(text)["compound"] if text else 0.0)

        # Convert back to Spark DataFrame
        spark_df = spark.createDataFrame(pdf)

        # Select required columns and rename 'Close' as 'btc_price'
        final_df = spark_df.select("date_minute", "favorites", "retweets", "text", "sentiment_score", col("Close").alias("btc_price"))

        # Show results
        final_df.show(truncate=False)


In [12]:
query = twitter_df.writeStream \
    .foreachBatch(process_batch) \
    .start()

query.awaitTermination()

                                                                                

Processing Batch 1


25/03/16 14:30:01 ERROR MicroBatchExecution: Query [id = 36a9c1fb-ec03-498a-888c-880801b13e4e, runId = d4cd1802-c1b6-4294-8c43-3c17628a7ac9] terminated with error
py4j.Py4JException: An exception was raised by the Python Proxy. Return Message: Traceback (most recent call last):
  File "/home/osbdet/.jupyter_venv/lib/python3.11/site-packages/py4j/clientserver.py", line 617, in _call_proxy
    return_value = getattr(self.pool[obj_id], method)(*params)
                   ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/osbdet/.jupyter_venv/lib/python3.11/site-packages/pyspark/sql/utils.py", line 120, in call
    raise e
  File "/home/osbdet/.jupyter_venv/lib/python3.11/site-packages/pyspark/sql/utils.py", line 117, in call
    self.func(DataFrame(jdf, wrapped_session_jdf), batch_id)
  File "/tmp/ipykernel_29124/1230733154.py", line 42, in process_batch
    pdf = enriched_df.toPandas()
          ^^^^^^^^^^^^^^^^^^^^^^
  File "/home/osbdet/.jupyter_venv/lib/python3.11/site-packages

StreamingQueryException: [STREAM_FAILED] Query [id = 36a9c1fb-ec03-498a-888c-880801b13e4e, runId = d4cd1802-c1b6-4294-8c43-3c17628a7ac9] terminated with exception: An exception was raised by the Python Proxy. Return Message: Traceback (most recent call last):
  File "/home/osbdet/.jupyter_venv/lib/python3.11/site-packages/py4j/clientserver.py", line 617, in _call_proxy
    return_value = getattr(self.pool[obj_id], method)(*params)
                   ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/osbdet/.jupyter_venv/lib/python3.11/site-packages/pyspark/sql/utils.py", line 120, in call
    raise e
  File "/home/osbdet/.jupyter_venv/lib/python3.11/site-packages/pyspark/sql/utils.py", line 117, in call
    self.func(DataFrame(jdf, wrapped_session_jdf), batch_id)
  File "/tmp/ipykernel_29124/1230733154.py", line 42, in process_batch
    pdf = enriched_df.toPandas()
          ^^^^^^^^^^^^^^^^^^^^^^
  File "/home/osbdet/.jupyter_venv/lib/python3.11/site-packages/pyspark/sql/pandas/conversion.py", line 202, in toPandas
    rows = self.collect()
           ^^^^^^^^^^^^^^
  File "/home/osbdet/.jupyter_venv/lib/python3.11/site-packages/pyspark/sql/dataframe.py", line 1257, in collect
    sock_info = self._jdf.collectToPython()
                ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/osbdet/.jupyter_venv/lib/python3.11/site-packages/py4j/java_gateway.py", line 1322, in __call__
    return_value = get_return_value(
                   ^^^^^^^^^^^^^^^^^
  File "/home/osbdet/.jupyter_venv/lib/python3.11/site-packages/pyspark/errors/exceptions/captured.py", line 185, in deco
    raise converted from None
pyspark.errors.exceptions.captured.AnalysisException: Queries with streaming sources must be executed with writeStream.start();
delta
