In [1]:
import re
import json
from pyspark.sql import SparkSession
from operator import add
from pyspark.sql.types import StructType, StructField, StringType, LongType
from pyspark.sql.functions import udf, col, size
from pyspark.sql.types import FloatType, ArrayType
import re

# Build (or reuse) the session against the standalone master.
# Dynamic allocation is enabled with shuffle tracking, and the external
# shuffle service is explicitly disabled.
spark_session = (
    SparkSession.builder
    .master("spark://spark-master:7077")
    .appName("test_run")
    .config("spark.dynamicAllocation.enabled", True)
    .config("spark.dynamicAllocation.shuffleTracking.enabled", True)
    .config("spark.shuffle.service.enabled", False)
    .config("spark.dynamicAllocation.executorIdleTimeout", "30s")
    .config("spark.executor.cores", 16)
    .config("spark.driver.port", 9999)
    .config("spark.blockManager.port", 10005)
    .getOrCreate()
)

# Handle for the RDD API; only ERROR-level log messages are kept.
spark_context = spark_session.sparkContext
spark_context.setLogLevel("ERROR")

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/03/10 13:46:55 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [2]:
# Path of the input file, relative to the HDFS root used below.
DATA_PATH = 'data/reddit.json'
# Read the file from HDFS as an RDD of text lines (one element per line).
lines = spark_context.textFile(f"hdfs://spark-master:9000/{DATA_PATH}")
#lines.take(1)

In [3]:
# Decode every input line as a standalone JSON document (one Python dict per line)
parsed_rdd = lines.map(json.loads)

#parsed_rdd.take(1)

In [4]:
# Keep only the four string fields needed downstream; all are nullable.
custom_schema = StructType([
    StructField("content", StringType(), True),          # Comment text
    StructField("summary", StringType(), True),     # Summary text (not a timestamp)
    StructField("author", StringType(), True),        # Username
    StructField("subreddit", StringType(), True)      # Subreddit name
])

# Build the DataFrame from the parsed dicts using the explicit schema above.
df = spark_session.createDataFrame(parsed_rdd, schema=custom_schema)

#df.printSchema()
#df.show()

# Count total number of rows
#print(f"Total Rows: {df.count()}")

In [5]:
# Ticker symbols of the technology companies searched for in the text.
tech_tickers = [
    "AAPL",   # Apple Inc.
    "MSFT",   # Microsoft Corporation
    "GOOGL",  # Alphabet Inc. (Google)
    "GOOG",   # Alphabet Inc. (Google), second symbol
    "AMZN",   # Amazon.com Inc.
    "META",   # Meta Platforms Inc. (Facebook)
    "NVDA",   # NVIDIA Corporation
    "TSLA",   # Tesla Inc.
    "INTC",   # Intel Corporation
    "AMD",    # Advanced Micro Devices Inc.
    "IBM",    # International Business Machines
    "CSCO",   # Cisco Systems Inc.
    "ORCL",   # Oracle Corporation
    "NFLX",   # Netflix Inc.
    "ADBE",   # Adobe Inc.
    "CRM",    # Salesforce Inc.
    "PLTR",   # Palantir Technologies Inc.
]

In [6]:
# Set view of tech_tickers: constant-time membership tests inside the UDF below
tech_tickers_set = set(tech_tickers)

# Define the function that backs the ticker-extraction UDF
def extract_tech_tickers(text, tickers=None):
    """Return the distinct ticker symbols mentioned in ``text``.

    The text is upper-cased and split into ``\\w+`` tokens; every character
    other than ``A``-``Z`` is then removed from each token before it is
    compared with the ticker collection. Matching is therefore
    case-insensitive and ignores digits/underscores (``"AMD64"`` -> ``"AMD"``).

    NOTE(review): because of the upper-casing, ordinary words that happen to
    spell a ticker (e.g. "meta" -> META) are reported as mentions. Confirm
    whether that is acceptable for the analysis.

    Args:
        text: The text to scan. ``None`` or an empty string yields ``[]``.
        tickers: Optional collection of upper-case symbols to look for.
            Defaults to the notebook-level ``tech_tickers_set``.

    Returns:
        A list of the matched symbols without duplicates, sorted
        alphabetically so the result is deterministic.
    """
    import re  # kept local, as in the original definition
    if not text:
        return []
    if tickers is None:
        # Resolved at call time so the default follows the notebook's set.
        tickers = tech_tickers_set
    # Split into word tokens, ignoring punctuation such as "$" or ","
    words = re.findall(r'\b\w+\b', text.upper())
    # Drop non-alphabetic characters from each token; the set removes duplicates
    cleaned_words = {re.sub(r'[^A-Z]', '', word) for word in words}
    # sorted() gives a stable order instead of arbitrary set iteration order
    return sorted(cleaned_words.intersection(tickers))

# Wrap the function as a Spark UDF whose result column is an array of strings
extract_tickers_udf = udf(extract_tech_tickers, ArrayType(StringType()))

In [7]:
# Add a "tickers" column computed from "content", then keep only the rows
# in which at least one ticker was found
df = (
    df.withColumn("tickers", extract_tickers_udf(col("content")))
    .filter(size(col("tickers")) > 0)
)

#df.show(5)

In [8]:
# Lower-case words counted as positive signals by the sentiment score
positive_words = [
    "bullish",
    "moon",
    "buy",
    "growth",
    "strong",
    "profit",
    "win",
]
# Lower-case words counted as negative signals by the sentiment score
negative_words = [
    "bearish",
    "crash",
    "sell",
    "loss",
    "weak",
    "fraud",
    "dump",
]

In [9]:
def custom_sentiment(text, positive_lexicon=None, negative_lexicon=None):
    """Score ``text`` as (positive hits - negative hits) / number of tokens.

    The text is lower-cased and split on whitespace. Leading and trailing
    ASCII punctuation is stripped from each token before the lexicon lookup,
    so "buy!" and "crash." count as hits. The denominator is still the number
    of whitespace-separated tokens.

    Args:
        text: The text to score. ``None`` or an empty string yields ``0.0``.
        positive_lexicon: Optional iterable of lower-case positive words.
            Defaults to the notebook-level ``positive_words``.
        negative_lexicon: Optional iterable of lower-case negative words.
            Defaults to the notebook-level ``negative_words``.

    Returns:
        A float in ``[-1.0, 1.0]``; ``0.0`` when there are no hits or no text.
    """
    import string  # local import: the notebook does not import it at the top
    if not text:
        return 0.0
    # Sets give constant-time membership tests; defaults are resolved at call time
    positive_set = set(positive_words if positive_lexicon is None else positive_lexicon)
    negative_set = set(negative_words if negative_lexicon is None else negative_lexicon)
    words = text.lower().split()
    # Only ASCII punctuation is removed; characters inside a token are untouched
    stripped_words = [word.strip(string.punctuation) for word in words]
    positive = sum(1 for word in stripped_words if word in positive_set)
    negative = sum(1 for word in stripped_words if word in negative_set)
    return (positive - negative) / max(len(words), 1)  # Normalize by text length

# Wrap custom_sentiment as a Spark UDF whose result column is a float
sentiment_udf = udf(custom_sentiment, FloatType())

In [10]:
# Add a "sentiment" column holding the lexicon score of each row's "content"
df = df.withColumn("sentiment", sentiment_udf(col("content")))

#df.show(5)

In [11]:
# Trigger the job and inspect a bounded sample. collect() would ship every
# matching row to the driver and dump it into the notebook output; take()
# returns the same list-of-Row type but only the first 10 rows.
df.take(10)

                                                                                

[Row(content='Yeah, but at least they are absolutely terrible. You\'ll catch up eventually, just ignore them. \n Real smurfs are only there to play with their friends, and will rarely rage at you because they actually know what they are doing and will not rely on team mates to do the same. They feel bad when they stomp too hard, and will usually apologize or try hard not to rub it in people\'s faces or go out of their way to steal kills from team mates \n Bad smurfs will get angry at every mistake you do, use terms like "meta" or "counter jungle" to try to confuse you and to make themselves look cool, and then get angry when you couldn\'t tryhard as hard as them. They will get angry at team mates saying "gg noob team" at end of the game. They will usually lose because they are unable to adapt to lower level playstyles and will get stomped because they try to conform to "high elo play tactics" while playing againts level 5\'s.', summary='all those people calling you bad are bad. Just ke

In [12]:
# Shut down the Spark session now that the notebook is finished with it
spark_session.stop()