In [1]:
# Only if using Jupyter + PySpark
import os
os.environ["PYSPARK_SUBMIT_ARGS"] = "--packages org.apache.hadoop:hadoop-aws:3.3.6,com.amazonaws:aws-java-sdk-bundle:1.12.374 pyspark-shell"


In [3]:
import findspark
findspark.init("/opt/spark")

In [54]:
from pyspark.sql import SparkSession

# Stop any old session first
try:
    spark.stop()
except:
    pass

spark = (
    SparkSession.builder
    .appName("MinioTest")
    .config("spark.master", "spark://spark-master:7077")

    # S3A core config
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    .config("spark.hadoop.fs.s3a.endpoint", "http://minio:9000")
    .config("spark.hadoop.fs.s3a.access.key", "minio")
    .config("spark.hadoop.fs.s3a.secret.key", "minio123")
    .config("spark.hadoop.fs.s3a.path.style.access", "true")
    .config("spark.hadoop.fs.s3a.connection.ssl.enabled", "false")  # <-- important for HTTP MinIO

    # Credentials providers (no AWS v2 class)
    .config(
        "spark.hadoop.fs.s3a.aws.credentials.provider",
        "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider,"
        "org.apache.hadoop.fs.s3a.TemporaryAWSCredentialsProvider"
    )

    # Multipart cleanup: override "24h" etc with numeric ms
    .config("spark.hadoop.fs.s3a.multipart.purge.age", "86400000")      # 24h
    .config("spark.hadoop.fs.s3a.multipart.purge.interval", "3600000")  # 1h

    # --- S3A timeouts (ms) ---
    .config("spark.hadoop.fs.s3a.connection.timeout", "60000")
    .config("spark.hadoop.fs.s3a.connection.request.timeout", "60000")
    .config("spark.hadoop.fs.s3a.connection.establish.timeout", "60000")
    .config("spark.hadoop.fs.s3a.connection.acquisition.timeout", "60000")
    .config("spark.hadoop.fs.s3a.connection.idle.time", "60000")
    .config("spark.hadoop.fs.s3a.connection.ttl", "300000")

    # --- S3A thread pool / connection pool (ALL numeric) ---
    .config("spark.hadoop.fs.s3a.threads.max", "96")
    .config("spark.hadoop.fs.s3a.threads.keepalivetime", "60000")  # <--- key that defaulted to "60s"
    .config("spark.hadoop.fs.s3a.connection.maximum", "200")
    .config("spark.hadoop.fs.s3a.max.total.tasks", "1000")
    .config("spark.hadoop.fs.s3a.max.metadata.tasks", "100")
    .config("spark.hadoop.fs.s3a.max.total.connections", "200")

    # --- Spark-level timeouts as numbers ---
    .config("spark.files.fetchTimeout", "60000")
    .config("spark.network.timeout", "600000")
    .config("spark.rpc.askTimeout", "600000")
    
    # (plus the timeouts / thread configs you already set)

    .config("spark.submit.pyFiles", "/home/spark/.local/lib/python3.10/site-packages/vaderSentiment")
    .getOrCreate()
)





  /opt/spark/python
  /tmp/spark-ee5edadb-6979-49a8-922e-ce3393a41c21/userFiles-7b551373-f8cb-4b9c-96a3-5a22b29d56d1
  /tmp/spark-ee5edadb-6979-49a8-922e-ce3393a41c21/userFiles-1ea7bc59-467f-4e94-8256-fe4ab620bc78
  /opt/spark/python/lib/py4j-0.10.9.9-src.zip
  /opt/spark/python
  /opt/spark/python/lib/py4j-0.10.9.9-src.zip
  /usr/lib/python310.zip
  /usr/lib/python3.10
  /usr/lib/python3.10/lib-dynload
  
  /usr/local/lib/python3.10/dist-packages
  /usr/lib/python3/dist-packages


In [59]:
import sys
sys.path.append("/home/spark/.local/lib/python3.10/site-packages")



In [60]:
!pip install vaderSentiment

Defaulting to user installation because normal site-packages is not writeable


In [95]:
from pyspark.sql.functions import length,  regexp_replace, trim, col, to_timestamp, to_date, udf,size, split, pandas_udf, PandasUDFType
from pyspark.sql.types import BooleanType, DoubleType
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from urllib.parse import urlparse
from vaderSentiment.vaderSentiment import SentimentIntensityAnalyzer
import pandas as pd

In [62]:
df = spark.read.json("s3a://test/")
df.show(5, truncate=False)


                                                                                

+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [6]:
df

DataFrame[content: string, url: string]

In [11]:
df.printSchema()

root
 |-- content: string (nullable = true)
 |-- pub_date: string (nullable = true)
 |-- url: string (nullable = true)



# Cleaning

In [75]:


df_columnar = df.withColumn(
    "timestamp", to_timestamp(col("pub_date"))
).withColumn(
    "date", to_date(col("timestamp"))
)


In [76]:


df_columnar = df_columnar.withColumn(
    "clean_text",
    regexp_replace(col("content"), r"[\n\r\t]", " ")  # remove new lines
)

# Remove HTML tags
df_columnar = df_columnar.withColumn(
    "clean_text",
    regexp_replace(col("clean_text"), r"<.*?>", "")
)

# Remove long sequences of --- +++ === ___ *** etc.
df_columnar = df_columnar.withColumn(
    "clean_text",
    regexp_replace(col("clean_text"), r"([-=_+*]{3,})", " ")
)

# Normalize multiple spaces
df_columnar = df_columnar.withColumn(
    "clean_text",
    regexp_replace(col("clean_text"), r"\s{2,}", " ")
)

df_columnar = df_columnar.withColumn("clean_text", trim(col("clean_text")))


In [77]:
df_columnar = df_columnar.drop("content")
df_columnar.show(1, truncate=False)

[Stage 2:>                                                          (0 + 1) / 1]

+----------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------------+----------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

                                                                                

In [78]:
#remove empty articles
df_columnar = df_columnar.filter(col("clean_text").isNotNull() & (col("clean_text") != ""))

In [79]:
#word count
df_columnar = df_columnar.withColumn(
    "word_count",
    size(split(col("clean_text"), r"\s+"))
)


In [80]:
def get_domain(url):
    try:
        return urlparse(url).netloc
    except:
        return None

extract_domain_udf = udf(get_domain)

df_columnar = df_columnar.withColumn("domain", extract_domain_udf(col("url")))

In [81]:
df_columnar.select("timestamp").show(20, truncate=False)


[Stage 3:>                                                          (0 + 1) / 1]

+-------------------+
|timestamp          |
+-------------------+
|2025-11-10 00:00:00|
|2025-10-14 00:00:00|
|2025-10-22 00:00:00|
|2025-10-29 00:00:00|
|2025-10-23 00:00:00|
|2025-10-28 00:00:00|
|2025-10-28 00:00:00|
|2025-10-18 00:00:00|
|2025-10-28 00:00:00|
|2025-11-03 00:00:00|
|2025-11-03 00:00:00|
|2025-11-05 00:00:00|
|2025-10-22 00:00:00|
|2025-10-19 00:00:00|
|2025-10-18 00:00:00|
|2025-10-27 00:00:00|
|2025-10-18 00:00:00|
|2025-10-22 00:00:00|
|2025-10-22 00:00:00|
|2025-05-21 00:00:00|
+-------------------+
only showing top 20 rows


                                                                                

In [82]:
df_columnar.printSchema()

root
 |-- pub_date: string (nullable = true)
 |-- url: string (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- date: date (nullable = true)
 |-- clean_text: string (nullable = true)
 |-- word_count: integer (nullable = true)
 |-- domain: string (nullable = true)



In [83]:
columns_to_drop = ["timestamp", "pub_date", "url"]
df_columnar = df_columnar.drop(*columns_to_drop)

# Extract Company/Events features

In [99]:
cosmo_keywords = ["Cosmo Pharmaceuticals", "Cosmo Pharma"]
competitor_keywords = ["Norgine", "TRB Chemedica", "EndoChoice", "PhaseBio", "Alkermes", "Genmab"]
industry_keywords = ["pharma", "biotech", "drug", "healthcare", "GI", "dermatology"]
def contains_keyword(text, keywords):
    text_lower = text.lower()
    for kw in keywords:
        if kw.lower() in text_lower:
            return True
    return False

is_cosmo_udf = udf(lambda x: contains_keyword(x, cosmo_keywords), BooleanType())
is_competitor_udf = udf(lambda x: contains_keyword(x, competitor_keywords), BooleanType())
is_industry_udf = udf(lambda x: contains_keyword(x, competitor_keywords), BooleanType())

df_columnar = df_columnar.withColumn("is_cosmo", is_cosmo_udf("clean_text")) \
                         .withColumn("is_competitor", is_competitor_udf("clean_text"))\
                         .withColumn("is_industry", is_industry_udf("clean_text"))


In [105]:
clinical_keywords = ["phase 1", "phase 2", "phase 3", "clinical trial", "efficacy", "trial results"]
regulatory_keywords = ["FDA approval", "EMA approval", "rejection", "CRL", "complete response letter"]
patent_keywords = ["patent", "intellectual property", "IP", "exclusivity"]
executive_keywords = ["CEO", "CFO", "CBO", "board", "management change"]
mna_keywords = ["merger", "acquisition", "licensing", "deal", "partnership"]
prodcut_keywords = ["release", "launch", "commercialization", "approval"]

def detect_event(text, keywords):
    text_lower = text.lower()
    for kw in keywords:
        if kw.lower() in text_lower:
            return True
    return False

df_columnar = df_columnar.withColumn("clinical_event", udf(lambda x: detect_event(x, clinical_keywords), BooleanType())("clean_text")) \
                         .withColumn("regulatory_event", udf(lambda x: detect_event(x, regulatory_keywords), BooleanType())("clean_text")) \
                         .withColumn("patent_event", udf(lambda x: detect_event(x, patent_keywords), BooleanType())("clean_text")) \
                         .withColumn("executive_event", udf(lambda x: detect_event(x, executive_keywords), BooleanType())("clean_text")) \
                         .withColumn("mna_event", udf(lambda x: detect_event(x, mna_keywords), BooleanType())("clean_text"))\
                         .withColumn("product_event", udf(lambda x: detect_event(x, prodcut_keywords), BooleanType())("clean_text"))


In [90]:
analyzer = SentimentIntensityAnalyzer()

def vader_sentiment(text):
    if not text:
        return 0.0
    return float(analyzer.polarity_scores(text)["compound"])

# Convert to Spark UDF
vader_udf = udf(vader_sentiment, DoubleType())

# Apply UDF
df_columnar = df_columnar.withColumn("sentiment_score", vader_udf("clean_text"))


In [92]:
from pyspark.sql.functions import size, split
keywords = ["phase 1", "phase 2", "phase 3", "clinical trial", "efficacy", "trial results","FDA approval", "EMA approval", "rejection", "CRL", "complete response letter","patent", "intellectual property", "IP", "exclusivity","CEO", "CFO", "CBO", "board", "management change","new drug"]
def keyword_count(text):
    text_lower = text.lower()
    return sum(text_lower.count(kw.lower()) for kw in keywords)

df_columnar = df_columnar.withColumn(
    "keyword_count",
    udf(lambda x: keyword_count(x, clinical_keywords), "int")("clean_text")
)


In [93]:
df_columnar.printSchema()

root
 |-- date: date (nullable = true)
 |-- clean_text: string (nullable = true)
 |-- word_count: integer (nullable = true)
 |-- domain: string (nullable = true)
 |-- is_cosmo: boolean (nullable = true)
 |-- is_competitor: boolean (nullable = true)
 |-- clinical_event: boolean (nullable = true)
 |-- regulatory_event: boolean (nullable = true)
 |-- patent_event: boolean (nullable = true)
 |-- executive_event: boolean (nullable = true)
 |-- mna_event: boolean (nullable = true)
 |-- sentiment_score: double (nullable = true)
 |-- keyword_count: integer (nullable = true)



In [106]:
df_daily = df_columnar.groupBy("date").agg(
    F.count("*").alias("num_articles"),
    F.sum(F.col("is_cosmo").cast("int")).alias("num_cosmo_articles"),
    F.sum(F.col("is_competitor").cast("int")).alias("num_competitor_articles"),
    F.sum(F.col("is_industry").cast("int")).alias("num_industry_articles"),
    F.avg("sentiment_score").alias("avg_sentiment"),
    F.sum(F.col("is_cosmo").cast("int")).alias("num_cosmo_articles"),
    F.sum(F.col("is_competitor").cast("int")).alias("num_competitor_articles"),
    F.sum(F.col("is_industry").cast("int")).alias("num_industry_articles"),
    F.max("sentiment_score").alias("max_sentiment"),
    F.min("sentiment_score").alias("min_sentiment"),
    F.sum(F.col("clinical_event").cast("int")).alias("num_clinical_events"),
    F.sum(F.col("regulatory_event").cast("int")).alias("num_regulatory_events"),
    F.sum(F.col("patent_event").cast("int")).alias("num_patent_events"),
    F.sum(F.col("mna_event").cast("int")).alias("num_mna_events"),
    F.sum(F.col("product_event").cast("int")).alias("num_product_events"),
    F.sum(F.when(F.col("sentiment_score") < 0, 1).otherwise(0)).alias("num_negative_events")
)

In [107]:
df_daily.printSchema()

root
 |-- date: date (nullable = true)
 |-- num_articles: long (nullable = false)
 |-- num_cosmo_articles: long (nullable = true)
 |-- num_competitor_articles: long (nullable = true)
 |-- num_industry_articles: long (nullable = true)
 |-- avg_sentiment: double (nullable = true)
 |-- num_cosmo_articles: long (nullable = true)
 |-- num_competitor_articles: long (nullable = true)
 |-- num_industry_articles: long (nullable = true)
 |-- max_sentiment: double (nullable = true)
 |-- min_sentiment: double (nullable = true)
 |-- num_clinical_events: long (nullable = true)
 |-- num_regulatory_events: long (nullable = true)
 |-- num_patent_events: long (nullable = true)
 |-- num_mna_events: long (nullable = true)
 |-- num_product_events: long (nullable = true)
 |-- num_negative_events: long (nullable = true)



In [108]:
df_daily.show(20, truncate=False)



+----------+------------+------------------+-----------------------+---------------------+-------------------+------------------+-----------------------+---------------------+-------------+-------------+-------------------+---------------------+-----------------+--------------+------------------+-------------------+
|date      |num_articles|num_cosmo_articles|num_competitor_articles|num_industry_articles|avg_sentiment      |num_cosmo_articles|num_competitor_articles|num_industry_articles|max_sentiment|min_sentiment|num_clinical_events|num_regulatory_events|num_patent_events|num_mna_events|num_product_events|num_negative_events|
+----------+------------+------------------+-----------------------+---------------------+-------------------+------------------+-----------------------+---------------------+-------------+-------------+-------------------+---------------------+-----------------+--------------+------------------+-------------------+
|2025-10-27|1           |0                 |1 

                                                                                

In [109]:
rows = df_daily.count()
cols = len(df_daily.columns)
print(f"DataFrame dimensions: {rows} rows x {cols} columns")

[Stage 14:>                                                         (0 + 1) / 1]

DataFrame dimensions: 85 rows x 17 columns


                                                                                

In [110]:
df_daily.select(
    F.min("date").alias("min_date"),
    F.max("date").alias("max_date")
).show()



+----------+----------+
|  min_date|  max_date|
+----------+----------+
|2024-12-20|2025-12-11|
+----------+----------+



                                                                                