# News Article Data Request

In [29]:
import os

import boto3
import pyspark.sql.functions as F
from IPython.display import display
from pyspark.sql import SparkSession, Window
from pyspark.sql.types import TimestampType

from utils.s3_helper import S3Helper

### Set up and configure Spark

In [2]:
#user_bucket_name = os.environ['BQUANT_SANDBOX_USER_BUCKET']
#bqnt_username = os.environ['BQUANT_USERNAME']

def get_spark_session(
    executors="10",
    executor_memory="8g",
    driver_memory="32g",
    executor_cores="2",
    driver_max_result_size="1024M",
    executor_memory_overhead="2g",
    task_cpus="1",
):
    """Configure a SparkSession, display it in the notebook and return it.

    Every argument is forwarded verbatim as the value of the Spark setting
    named below; the remaining shuffle / compression / window-spill settings
    are fixed inside the function.

    Args:
        executors: value for ``spark.executor.instances``.
        executor_memory: value for ``spark.executor.memory``.
        driver_memory: value for ``spark.driver.memory``.
        executor_cores: value for ``spark.executor.cores``.
        driver_max_result_size: value for ``spark.driver.maxResultSize``.
        executor_memory_overhead: value for ``spark.executor.memoryOverhead``.
        task_cpus: value for ``spark.task.cpus``.

    Returns:
        The session produced by ``SparkSession.builder...getOrCreate()``.
    """
    # Caller-tunable sizing options.
    sizing = {
        "spark.driver.memory": driver_memory,
        "spark.driver.maxResultSize": driver_max_result_size,
        "spark.executor.memoryOverhead": executor_memory_overhead,
        "spark.executor.instances": executors,
        "spark.executor.memory": executor_memory,
        "spark.executor.cores": executor_cores,
        "spark.task.cpus": task_cpus,
    }

    # Fixed tuning options, applied after the sizing options in this order.
    tuning = {
        "spark.sql.execution.arrow.enabled": "true",
        "spark.shuffle.file.buffer": "1m",
        "spark.file.transferTo": "False",
        "spark.shuffle.unsafe.file.output.buffer": "1m",
        "spark.io.compression.lz4.blockSize": "512k",
        "spark.shuffle.service.index.cache.size": "1g",
        "spark.shuffle.registration.timeout": "120000ms",
        "spark.shuffle.registration.maxAttempts": "3",
        "spark.sql.windowExec.buffer.spill.threshold": "1000000",
        "spark.sql.windowExec.buffer.in.memory.threshold": "1000000",
    }

    builder = SparkSession.builder
    for option_name, option_value in {**sizing, **tuning}.items():
        builder = builder.config(option_name, option_value)
    session = builder.getOrCreate()

    # Render the session summary in the notebook output.
    display(session)

    return session

In [3]:
# Create the Spark session used by the rest of the notebook.
# Only `executors` differs from the get_spark_session defaults (100 vs 10);
# executor_memory and executor_cores are passed explicitly but equal the defaults.
spark = get_spark_session(executors="100", executor_memory="8g", executor_cores="2")

### Request News Headlines datasets 

In [4]:
%%time
bucket_name = "bquant-data-textual-analytics-tier-1"
bucket = boto3.resource("s3").Bucket(bucket_name)

# Collect the key of every object in the bucket (a full, unfiltered listing).
files = [s3_object.key for s3_object in bucket.objects.all()]

# Keep only keys containing both "EID80001" and "csv", expressed as s3:// URIs.
files_csv = [
    f"s3://{bucket_name}/{key}"
    for key in files
    if all(token in key for token in ("EID80001", "csv"))
]

# Read all matching files as one DataFrame: first row is the header and
# records may span several lines; the escape option is set to an empty string.
df = spark.read.options(header="true", multiLine="true", escape="").csv(files_csv)

CPU times: user 2min 28s, sys: 502 ms, total: 2min 29s
Wall time: 6min 42s


In [5]:
# Identifiers matched against the "Assigned_ID_BB_GLOBAL" column when filtering articles.
# NOTE(review): presumably the constituents of the index under study — confirm the source list.
index_members = [
    "BBG000B9XRY4",
    "BBG000BBJQV0",
    "BBG000BBS2Y0",
    "BBG000BCQZS4",
    "BBG000BCSST7",
    "BBG000BF0K17",
    "BBG000BH4R78",
    "BBG000BJ81C1",
    "BBG000BKZB36",
    "BBG000BLNNH6",
    "BBG000BMHYD1",
    "BBG000BMX289",
    "BBG000BN2DC2",
    "BBG000BNSZP1",
    "BBG000BP52R2",
    "BBG000BPD168",
    "BBG000BPH459",
    "BBG000BR2B91",
    "BBG000BR2TH3",
    "BBG000BSXQV7",
    "BBG000BVPV84",
    "BBG000BW8S60",
    "BBG000BWLMJ4",
    "BBG000BWXBC2",
    "BBG000C0G1D1",
    "BBG000C3J3C9",
    "BBG000C5HS04",
    "BBG000C6CFJ5",
    "BBG000CH5208",
    "BBG000DMBXR2",
    "BBG000GZQ728",
    "BBG000H556T9",
    "BBG000HS77T5",
    "BBG000K4ND22",
    "BBG000PSKYX7",
    "BBG00BN96922",
]

# Inclusive lower bound applied to "TimeOfArrival" when filtering articles.
start_date = "2019-01-01"

In [19]:
%%time
from functools import reduce

# Convert the arrival time to a timestamp column; the date condition below is
# evaluated lazily against this converted column when the filter is applied.
df = df.withColumn("TimeOfArrival", F.col("TimeOfArrival").cast(TimestampType()))

# Restrict to the "BN" and "BFW" wires (drop this condition to include all wires).
wire_filter = F.col("WireName").isin("BN", "BFW")

# Remaining row-level conditions, named individually for readability.
is_english = F.col("LanguageString") == "ENGLISH"
long_enough = F.length(F.col("Headline")) > 25
on_or_after_start = F.col("TimeOfArrival") >= start_date
in_universe = F.col("Assigned_ID_BB_GLOBAL").isin(index_members)
# Optional extra condition, currently disabled: F.col("Headline").startswith("*")

filters = wire_filter & is_english & long_enough & on_or_after_start & in_universe

# Cache the filtered frame and force evaluation with a count (shown as the cell output).
df1 = df.filter(filters).cache()
df1.count()

CPU times: user 24.3 ms, sys: 85 µs, total: 24.4 ms
Wall time: 25.7 s


968397

### Drop Duplicates

In [20]:
def _first_row_per_partition(frame, window_spec):
    """Keep only the first row of each partition of `window_spec`, in its ordering."""
    ranked = frame.withColumn("row", F.row_number().over(window_spec))
    return ranked.filter(F.col("row") == 1).drop("row")


# Pass 1: one row per SUID, keeping the earliest arrival.
window = Window.partitionBy("SUID").orderBy(F.col("TimeOfArrival").asc())
df2 = _first_row_per_partition(df1, window)

# Pass 2: one row per (calendar day, headline text), keeping the earliest arrival.
# The helper "day" column exists only for this pass and is dropped afterwards.
window = Window.partitionBy("day", "Headline").orderBy(F.col("TimeOfArrival").asc())
df2 = _first_row_per_partition(
    df2.withColumn("day", F.to_date(F.col("TimeOfArrival"))),
    window,
).drop("day")

# Cache the de-duplicated frame and force evaluation with a count (shown as the cell output).
df2 = df2.cache()
df2.count()

104160

In [23]:
def sort_headlines(df2):
    """Collect headline columns to pandas, ordered by arrival time.

    Selects "SUID", "Headline", "TimeOfArrival" and "Assigned_ID_BB_GLOBAL"
    from `df2`, converts the result with ``toPandas()``, sorts ascending by
    "TimeOfArrival", resets the index to 0..n-1 and lower-cases "Headline".

    Args:
        df2: frame exposing ``select(...).toPandas()`` with the four columns above.

    Returns:
        A pandas DataFrame with the four selected columns.
    """
    wanted_columns = ("SUID", "Headline", "TimeOfArrival", "Assigned_ID_BB_GLOBAL")

    collected = df2.select(*wanted_columns).toPandas()
    ordered = collected.sort_values(by="TimeOfArrival").reset_index(drop=True).copy()

    # Normalise case on the collected copy only.
    ordered["Headline"] = ordered["Headline"].str.lower()
    return ordered

# Collect the de-duplicated Spark frame into a pandas frame, sorted by
# TimeOfArrival with lower-cased headlines.
headlines = sort_headlines(df2)

In [24]:

# Peek at the headline with index label 0 (the earliest row after the index reset).
headlines.loc[0, "Headline"]

'*jpmorgan\xa0rehires ling zhang from bgi genomics'

### Output Headlines to S3

In [30]:
# NOTE(review): S3Helper is imported from utils.s3_helper; the meaning of the
# 'tmp/fs' argument is not visible in this notebook — presumably a base
# path/prefix for uploads; confirm against the helper.
s3_helper = S3Helper('tmp/fs')

In [32]:
# Create the local staging directory. exist_ok=True keeps the cell re-runnable:
# plain os.mkdir raises FileExistsError once the directory already exists.
os.makedirs('/tmp/headlines', exist_ok=True)

# Write the collected headlines to a local parquet file.
headlines.to_parquet('/tmp/headlines/dow_headlines.parquet')

In [35]:
# Hand the locally written parquet file to the S3 helper, targeting the 'news' folder.
s3_helper.add_file(
    s3_folder='news',
    local_filename='/tmp/headlines/dow_headlines.parquet',
)

### Stop Spark

In [36]:
# Shut down the Spark session. `headlines` was already collected with
# toPandas(), so it remains usable after this point.
spark.stop()

In [25]:
import pandas as pd
import plotly.express as px

In [28]:
# Display the collected pandas frame (the notebook repr elides the middle rows).
headlines

Unnamed: 0,SUID,Headline,TimeOfArrival,Assigned_ID_BB_GLOBAL
0,PKP0NPDWRGG0,*jpmorgan rehires ling zhang from bgi genomics,2019-01-02 07:26:13.641,BBG000DMBXR2
1,PKP0NQDWRGG0,*jpmorgan names zhang china healthcare investment banking head,2019-01-02 07:26:14.600,BBG000DMBXR2
2,PKP0PY6KLVR4,apple remains core tech holding in ‘risk-off’ environment: rbc,2019-01-02 07:27:34.229,BBG000B9XRY4
3,PKP4IB6K50XT,refinery outages: exxon beaumont; pes philadelphia; valero mckee,2019-01-02 08:49:23.087,BBG000GZQ728
4,PKP72M6K50XU,taiwan walks tightrope between china and not china: quicktake,2019-01-02 09:44:46.220,BBG000B9XRY4
...,...,...,...,...
104155,SUWXD7DWX2PS,nvidia ceo jensen huang meets mayor of shanghai friday,2025-04-18 12:11:40.996,BBG000BBJQV0
104156,SV1SUNGFWR28,salesforce inc raised to neutral at guggenheim,2025-04-21 03:11:11.105,BBG000BN2DC2
104157,SV22ZKD2677K,*nvidia's huang meeting japanese prime minister shigeru ishiba,2025-04-21 06:50:08.710,BBG000BBJQV0
104158,SV243ONQFBWG,"*nvidia ceo discussed ai robotics, ai energy needs with japan pm",2025-04-21 07:14:12.151,BBG000BBJQV0


In [27]:
# Allow up to 350 characters per column when pandas renders a frame,
# so long headlines are shown in full.
pd.options.display.max_colwidth = 350