In [27]:
# SQL joining `story` to a filtered view of `release` (CTE r1: wire_id > 0,
# not in a fixed exclusion list, toa after 2023-01-01), plus the story/release
# type lookup tables, restricted to rows where both languages are 'en'.
# The triple-quoted text is split on newlines and re-joined with single spaces,
# so q holds the whole query on one line (runs of indentation spaces remain).
q = ' '.join("""    with r1 as (
            SELECT * from release 
             WHERE wire_id > 0 and wire_id NOT IN (25,2345,96,3543,584,474,1719,3447,586,587,97,2640)
                AND CAST(toa AS DATE) > CAST('2023-01-01' AS DATE)
    )
        SELECT 
            S.suid AS story_suid,
            R.suid AS release_suid,
            R.bundle_id AS release_bundle_id,
            S.release_id AS release_id,
            ST.name AS story_type,
            RT.name AS release_type,
            S.toa AS story_toa,
            R.toa AS release_toa,
            S.headline AS story_headline,
            S.wire AS story_wire,
            S.wire_class AS story_wire_class,
            R.wire_id AS release_wire,
            R.wire_class AS release_wire_class,
            R.web_url AS release_web_url,
            R.subject AS release_subject, 
            R.body AS release_body
            FROM story S
            JOIN r1 R ON S.release_id = R.id
            JOIN release_type RT ON R.release_type = RT.release_type
            JOIN story_type ST ON S.story_type = ST.story_type
            WHERE S.language = 'en'
            AND R.language = 'en'""".split('\n'))

In [28]:
import re 

In [30]:
# Display q with every run of spaces collapsed to one and the ends trimmed.
re.compile(' +').sub(' ', q).strip()

"with r1 as ( SELECT * from release WHERE wire_id > 0 and wire_id NOT IN (25,2345,96,3543,584,474,1719,3447,586,587,97,2640) AND CAST(toa AS DATE) > CAST('2023-01-01' AS DATE) ) SELECT S.suid AS story_suid, R.suid AS release_suid, R.bundle_id AS release_bundle_id, S.release_id AS release_id, ST.name AS story_type, RT.name AS release_type, S.toa AS story_toa, R.toa AS release_toa, S.headline AS story_headline, S.wire AS story_wire, S.wire_class AS story_wire_class, R.wire_id AS release_wire, R.wire_class AS release_wire_class, R.web_url AS release_web_url, R.subject AS release_subject, R.body AS release_body FROM story S JOIN r1 R ON S.release_id = R.id JOIN release_type RT ON R.release_type = RT.release_type JOIN story_type ST ON S.story_type = ST.story_type WHERE S.language = 'en' AND R.language = 'en'"

In [1]:
from bloomberg.ai.librarian import Librarian, get_config
# Start from the configuration returned by get_config() and override the
# Spark sizing properties before building the Librarian client.
config = get_config()
for _spark_key, _spark_value in (
    ("spark.executor.instances", 20),
    ("spark.executor.memory", "4G"),
    ("spark.driver.memory", "6G"),
):
    config.spark.properties[_spark_key] = _spark_value

librarian = Librarian(cfg=config)
# Last expression of the cell: touch the session attribute.
# NOTE(review): presumably this access is what starts the Spark session
# (the cell output shows Spark start-up logs) — confirm.
librarian.spark_session

SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/layers/com.bloomberg.ds.buildpacks.spark/spark/jars/slf4j-log4j12-1.7.30.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/layers/com.bloomberg.ds.buildpacks.hadoop/hadoop/share/hadoop/common/lib/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]


:: loading settings :: file = /layers/com.bloomberg.ds.buildpacks.spark/spark/conf/ivysettings.xml


23/06/26 21:07:09 INFO kns= appId= execId= DelegationTokenUtil: Logging in user from delegation token
23/06/26 21:07:09 INFO kns= appId= execId= DelegationTokenUtil: hadoop token File found
23/06/26 21:07:09 INFO kns= appId= execId= DelegationTokenUtil: Loaded 2 tokens
Setting spark.hadoop.yarn.resourcemanager.principal to job
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [3]:
# Wire mnemonics to keep. The two literals overlap (GO1 and CO5 appear in
# both), so the names pass through a set to drop repeats; the resulting list
# order is therefore not meaningful.
wire_names = list(
    set(
        ''.join(
            [
                'GO1, CO1, CO5, CO6, GO6, GO5, CO2, CO3, GO3, GO8, CO4, CO8, CO7, GO2, GO9, GO7, GO4',
                ', EDG, GO1, PRN, BUS, CO5, PZM, RNS, ONE, DBF, CIS',
            ]
        ).split(', ')
    )
)

In [11]:
import pyspark.sql.functions as F
from pyspark.sql.window import Window
from pyspark.sql.functions import lit
# Describe the news-archive request first, then fetch it into df.
# The window runs from begin(2021, 1, 1) to end(2023, 4, 30).
# NOTE(review): the meaning of the pvf_newsldap_4 / pvf_newsldap_6 selectors is
# not visible in this notebook — confirm against the Librarian documentation.
newsarchive_request = (
    librarian.datasets.newsarchive
    .prod()
    .pvf_newsldap_4()
    .pvf_newsldap_6()
    .begin(2021, 1, 1)
    .end(2023, 4, 30)
)
df = newsarchive_request.fetch()

                                                                                

In [13]:
# Keep only rows from the wires listed in wire_names, narrow to the columns
# used further down, and append an empty-string `body` column.
df = (
    df
    .where(df["wire_mnemonic"].isin(wire_names))
    .select(
        'suid',
        'wire_mnemonic',
        'wire',
        'headline',
        'bloom_lang',
        'timeofarrival',
        'bunch_id',
    )
    .withColumn('body', lit(''))
)
# Show the first row as a sanity check.
df.head()

                                                                                

Row(suid='QMKJ31MB2SJR', wire_mnemonic='GO1', wire=1883, headline='FBO:Veteran Aff: Renovate Building 213 – Tech Training Dorm', bloom_lang=1, timeofarrival='2021-01-07T14:33:01.332', bunch_id=None, body='')

In [17]:
# Total number of rows currently in df.
df.count()

                                                                                

12467613

In [18]:
# Rows whose bloom_lang equals 1.
# NOTE(review): presumably language code 1 means English, as the variable name
# suggests — confirm.
english_df = df.where("`bloom_lang` == '1'")

# bunch_id values that occur on two or more of those rows.
# NOTE(review): rows with a null bunch_id are grouped together as well; confirm
# whether that group should be excluded here.
bunch_ids_with_at_least_two_rows = (
    english_df
    .groupBy("bunch_id")
    .count()
    .where("`count` >= 2")
    .select("bunch_id")
)

In [19]:
# Pull the qualifying bunch_id values into a plain Python list.
# NOTE: this rebinds the name from a DataFrame to a list, so re-running this
# cell requires re-running the cell that builds the DataFrame first.
bunch_ids_with_at_least_two_rows = [
    row[0] for row in bunch_ids_with_at_least_two_rows.collect()
]
print(f"len(bunch_ids_with_at_least_two_rows) = {len(bunch_ids_with_at_least_two_rows)}")



len(bunch_ids_with_at_least_two_rows) = 93


                                                                                

In [20]:
# Rows whose bunch_id is one of the collected ids, ordered by bunch_id, with
# `body` placed right after `headline`.
# NOTE(review): this filters `df`, not `english_df`, so non-English rows that
# share one of these bunch_ids are kept too — confirm that is intended.
df_filted_by_bunch_id = (
    df
    .where(df["bunch_id"].isin(bunch_ids_with_at_least_two_rows))
    .orderBy('bunch_id')
    .select(
        'suid',
        'wire_mnemonic',
        'wire',
        'headline',
        'body',
        'bloom_lang',
        'timeofarrival',
        'bunch_id',
    )
)

In [23]:
# Number of rows in the bunch-filtered frame.
df_filted_by_bunch_id.count()

                                                                                

192

In [24]:
# Print a sample of the bunch-filtered rows.
df_filted_by_bunch_id.show()



+------------+-------------+----+--------------------+----+----------+--------------------+------------+
|        suid|wire_mnemonic|wire|            headline|body|bloom_lang|       timeofarrival|    bunch_id|
+------------+-------------+----+--------------------+----+----------+--------------------+------------+
|QMJGLKMB2SJQ|          GO1|1883|FDA: Abbreviated ...|    |         1|2021-01-07T00:41:...|QMJGLKMB2SJQ|
|QMJGLKMB2SJQ|          GO1|1883|FDA: Abbreviated ...|    |         1|2021-01-07T00:41:...|QMJGLKMB2SJQ|
|QMJU2BMB2SJW|          GO6|1884|India Res Bank: U...|    |         1|2021-01-07T05:32:...|QMJU2BMB2SJW|
|QMJU2BMB2SJW|          GO6|1884|India Res Bank: U...|    |         1|2021-01-07T05:32:...|QMJU2BMB2SJW|
|QMJY46MB2SJU|          GO5|1886|Germany Stats: Pr...|    |         1|2021-01-07T07:00:...|QMJY46MB2SJU|
|QMJY46MB2SJU|          GO5|1886|Germany Stats: Pr...|    |         1|2021-01-07T07:00:...|QMJY46MB2SJU|
|QMK0XCMB2SJZ|          GO5|1886|Stats Austria: Wh...| 

                                                                                

In [None]:
# Deduplicate by headline: keep exactly one row per distinct headline, then
# order the result by bunch_id.
#
# The window used to be ordered by `headline`, which is also the partition key
# and therefore constant inside every partition; row_number() == 1 then picked
# an arbitrary row and the result could differ between runs. Ordering by
# arrival time, with suid as a tie-breaker, makes the surviving row
# deterministic: the earliest-arriving row for each headline.
# NOTE(review): timeofarrival is compared as a string; this assumes every value
# uses the same ISO-8601 layout seen in the sample row — confirm.
headline_window = (
    Window
    .partitionBy("headline")
    .orderBy(F.col("timeofarrival").asc(), F.col("suid").asc())
)
final_df = (
    df_filted_by_bunch_id
    .withColumn("row_number", F.row_number().over(headline_window))
    .filter(F.col("row_number") == 1)
    .drop("row_number")
    .sort("bunch_id")
)