In [None]:
####NOTEBOOK FOR EXPERIMENT-3-ESG INVESTMENT ANALYSIS USING DATA ANALYTICS
As its internal company data , this is relatively small. Although our data set is relatively small, we show how one could distribute the scraping process using a user defined function (UDF), assuming the third-party library `PyPDF2` is available across your Spark environment.
import requests
import PyPDF2
import io

@udf('string')
def extract_content(url):

    # retrieve PDF binary stream
    response = requests.get(url)
    open_pdf_file = io.BytesIO(response.content)
    pdf = PyPDF2.PdfFileReader(open_pdf_file)

    # return concatenated content
    text = [pdf.getPage(i).extractText() for i in range(0, pdf.getNumPages())]
    return "\n".join(text)
Beyond regular expressions and fairly complex data cleansing  (reported in the attached notebooks), we also want to leverage more advanced NLP capabilities to tokenise content into grammatically valid sentences. Given the time it takes to load trained NLP pipelines in memory (such as the `spacy` library below), we ensure our model is loaded only once per Spark executor using a PandasUDF strategy as follows.
import gensim
import spacy
from pyspark.sql.functions import pandas_udf, PandasUDFType

@pandas_udf('array', PandasUDFType.SCALAR_ITER)
def extract_statements(content_series_iter):

    # load spacy model for english only once
    spacy.cli.download("en_core_web_sm")
    nlp = spacy.load("en_core_web_sm")

    # provide process_text function with our loaded NLP model
    # clean and tokenize a batch of PDF content
    for content_series in content_series_iter:

    yield content_series.map(lambda x: process_text(nlp, x))

      The 4 banks data which will be fed to NLP and LDA Model and hyperparameters
We compute our term frequencies and capture our LDA model and hyperparameters using MLflow experiments tracking.
from sklearn.feature_extraction.text import CountVectorizer
from sklearn.decomposition import LatentDirichletAllocation as LDA
import mlflow

# compute word frequencies
# stop words are common english words + banking related buzzwords
word_tf_vectorizer = CountVectorizer(stop_words=stop_words, ngram_range=(1,1))
word_tf = word_tf_vectorizer.fit_transform(esg['lemma'])

# track experiment on ml-flow
with mlflow.start_run(run_name='topic_modeling'):

    # Train a LDA model with 9 topics
    lda = LDA(random_state = 42, n_components = 9, learning_decay = .3)
    lda.fit(word_tf)

    # Log model
    mlflow.sklearn.log_model(lda, "model")
    mlflow.log_param('n_components', '9')
    mlflow.log_param('learning_decay', '.3')
    mlflow.log_metric('perplexity', lda.perplexity(word_tf))



Using seaborn visualisation, we can easily flag key differences across our companies (organisations' names redacted). When some organisations would put more focus on valuing employees and promoting diversity and inclusion (such as ORG-21), some seem to be more focused towards ethical investments (ORG-14). As the output of LDA is a probability distribution across our 9 topics instead of one specific theme, we easily unveil the most descriptive ESG initiative for any given organisation using a simple SQL statement and a partitioning function that captures the highest probability for each theme.
WITH ranked (
    SELECT
        e.topic,
        e.statement,
        e.company,
        dense_rank() OVER (
            PARTITION BY e.company, e.topic ORDER BY e.probability DESC
        ) as rank
    FROM esg_reports e
)

SELECT
    t.topic,
    t.statement
FROM ranked t
WHERE t.company = 'goldman sachs'
AND t.rank = 1
This SQL statement provides us with a NLP generated executive summary for Goldman Sachs (see original report), summarising a complex 70+ pages long document into 9 ESG initiatives / actions

we easily unveil the most descriptive ESG initiative for any given organisation using a simple SQL statement and a partitioning function that captures the highest probability for each theme.
Sample Snippets are given below.
WITH ranked (
    SELECT
        e.topic,
        e.statement,
        e.company,
        dense_rank() OVER (
            PARTITION BY e.company, e.topic ORDER BY e.probability DESC
        ) as rank
    FROM esg_reports e
)

SELECT
    t.topic,
    t.statement
FROM ranked t
WHERE t.company = 'goldman sachs'
AND t.rank = 1
This SQL statement provides us with a NLP generated executive summary for Goldman Sachs. summarising a complex 70+ pages long document into 9 ESG initiatives / actions.


{{{{ Sample Snippets are given below.
WITH ranked (
    SELECT
        e.topic,
        e.statement,
        e.company,
        dense_rank() OVER (
            PARTITION BY e.company, e.topic ORDER BY e.probability DESC
        ) as rank
    FROM esg_reports e
)

SELECT
    t.topic,
    t.statement
FROM ranked t
WHERE t.company = 'goldman sachs'
AND t.rank = 1
}}}}}

Using seaborn visualisation, we can easily flag key differences across our companies (organisations' names redacted). When some organisations would put more focus on valuing employees and promoting diversity and inclusion (such as ORG-21), some seem to be more focused towards ethical investments (ORG-14). As the output of LDA is a probability distribution across our 9 topics instead of one specific theme, we easily unveil the most descriptive ESG initiative for any given organisation using a simple SQL statement and a partitioning function that captures the highest probability for each theme.
WITH ranked (
    SELECT
        e.topic,
        e.statement,
        e.company,
        dense_rank() OVER (
            PARTITION BY e.company, e.topic ORDER BY e.probability DESC
        ) as rank
    FROM esg_reports e
)

SELECT
    t.topic,
    t.statement
FROM ranked t
WHERE t.company = 'goldman sachs'
AND t.rank = 1
This SQL statement provides us with a NLP generated executive summary for Goldman Sachs (see original report), summarising a complex 70+ pages long document into 9 ESG initiatives / actions

GDELT
Given the volume of data available in GDELT (100 million records for the last 18 months only), we leverage the lakehouse paradigm by moving data from raw, to filtered and enriched, respectively from Bronze, to Silver and Gold layers, and extend our process to operate in near real time (GDELT files are published every 15mn). For that purpose, we use a Structured Streaming approach that we `trigger` in batch mode with each batch operating on data increment only. By unifying Streaming and Batch, Spark is the de-facto standard for data manipulation and ETL processes in modern data lake infrastructures.
gdelt_stream_df = spark \
    .readStream \
    .format("delta") \
    .table("esg_gdelt_bronze") \
    .withColumn("themes", filter_themes(F.col("themes"))) \
    .withColumn("organisation", F.explode(F.col("organisations"))) \
    .select(
        F.col("publishDate"),
        F.col("organisation"),
        F.col("documentIdentifier").alias("url"),
        F.col("themes"),
        F.col("tone.tone")
    )

gdelt_stream_df \
    .writeStream \
    .trigger(Trigger.Once) \
    .option("checkpointLocation", "/tmp/gdelt_esg") \
    .format("delta") \
    .table("esg_gdelt_silver")
From the variety of dimensions available in GDELT, we only focus on sentiment analysis (using the tone variable) for financial news related articles only. We assume financial news articles to be well captured by the GDELT taxonomy starting with ECON_*. Furthermore, we assume all environmental articles to be captured as ENV_* and social articles to be captured by UNGP_* taxonomies (UN guiding principles on human rights


Given the volume of data available in GDELT (100 million records for the last 18 months only), we leverage the lakehouse paradigm by moving data from raw, to filtered and enriched, respectively from Bronze, to Silver and Gold layers, and extend our process to operate in near real time (GDELT files are published every 15mn). For that purpose, we use a Structured Streaming approach that we `trigger` in batch mode with each batch operating on data increment only. By unifying Streaming and Batch, Spark is the de-facto standard for data manipulation and ETL processes in modern data lake infrastructures.
gdelt_stream_df = spark \
    .readStream \
    .format("delta") \
    .table("esg_gdelt_bronze") \
    .withColumn("themes", filter_themes(F.col("themes"))) \
    .withColumn("organisation", F.explode(F.col("organisations"))) \
    .select(
        F.col("publishDate"),
        F.col("organisation"),
        F.col("documentIdentifier").alias("url"),
        F.col("themes"),
        F.col("tone.tone")
    )

gdelt_stream_df \
    .writeStream \
    .trigger(Trigger.Once) \
    .option("checkpointLocation", "/tmp/gdelt_esg") \
    .format("delta") \
    .table("esg_gdelt_silver")
From the variety of dimensions available in GDELT, we only focus on sentiment analysis (using the tone variable) for financial news related articles only. We assume financial news articles to be well captured by the GDELT taxonomy starting with ECON_*. Furthermore, we assume all environmental articles to be captured as ENV_* and social articles to be captured by UNGP_* taxonomies (UN guiding principles on human rights

data lake infrastructures.
gdelt_stream_df = spark \
    .readStream \
    .format("delta") \
    .table("esg_gdelt_bronze") \
    .withColumn("themes", filter_themes(F.col("themes"))) \
    .withColumn("organisation", F.explode(F.col("organisations"))) \
    .select(
        F.col("publishDate"),
        F.col("organisation"),
        F.col("documentIdentifier").alias("url"),
        F.col("themes"),
        F.col("tone.tone")
    )

gdelt_stream_df \
    .writeStream \
    .trigger(Trigger.Once) \
    .option("checkpointLocation", "/tmp/gdelt_esg") \
    .format("delta") \
    .table("esg_gdelt_silver")

Graphframes


Using seaborn visualisation, we can easily flag key differences across our companies (organisations' names redacted). When some organisations would put more focus on valuing employees and promoting diversity and inclusion (such as ORG-21), some seem to be more focused towards ethical investments (ORG-14). As the output of LDA is a probability distribution across our 9 topics instead of one specific theme, we easily unveil the most descriptive ESG initiative for any given organisation using a simple SQL statement and a partitioning function that captures the highest probability for each theme.
Sample Snippets are given below.
WITH ranked (
    SELECT
        e.topic,
        e.statement,
        e.company,
        dense_rank() OVER (
            PARTITION BY e.company, e.topic ORDER BY e.probability DESC
        ) as rank
    FROM esg_reports e
)

SELECT
    t.topic,
    t.statement
FROM ranked t
WHERE t.company = 'goldman sachs'
AND t.rank = 1
This SQL statement provides us with a NLP generated executive summary for Goldman Sachs. summarising a complex 70+ pages long document into 9 ESG initiatives / actions.

3.1 Data acquisition
Given the volume of data available in GDELT (100 million records for the last 18 months only), we leverage the lakehouse paradigm by moving data from raw, to filtered and enriched, respectively from Bronze, to Silver and Gold layers, and extend our process to operate in near real time (GDELT files are published every 15mn). For that purpose, we use a Structured Streaming approach that we `trigger` in batch mode with each batch operating on data increment only. By unifying Streaming and Batch, Spark is the de-facto standard for data manipulation and ETL processes in modern data lake infrastructures.
gdelt_stream_df = spark \
    .readStream \
    .format("delta") \
    .table("esg_gdelt_bronze") \
    .withColumn("themes", filter_themes(F.col("themes"))) \
    .withColumn("organisation", F.explode(F.col("organisations"))) \
    .select(
        F.col("publishDate"),
        F.col("organisation"),
        F.col("documentIdentifier").alias("url"),
        F.col("themes"),
        F.col("tone.tone")
    )

gdelt_stream_df \
    .writeStream \
    .trigger(Trigger.Once) \
    .option("checkpointLocation", "/tmp/gdelt_esg") \
    .format("delta") \
    .table("esg_gdelt_silver")
From the variety of dimensions available in GDELT, we only focus on sentiment analysis (using the tone variable) for financial news related articles only. We assume financial news articles to be well captured by the GDELT taxonomy starting with ECON_*. Furthermore, we assume all environmental articles to be captured as ENV_* and social articles to be captured by UNGP_* taxonomies {Citation:(UN guiding principles on human rights).}



3.1 Data acquisition
Given the volume of data available in GDELT (100 million records for the last 18 months only), we leverage the lakehouse paradigm by moving data from raw, to filtered and enriched, respectively from Bronze, to Silver and Gold layers, and extend our process to operate in near real time (GDELT files are published every 15mn). For that purpose, we use a Structured Streaming approach that we `trigger` in batch mode with each batch operating on data increment only. By unifying Streaming and Batch, Spark is the de-facto standard for data manipulation and ETL processes in modern data lake infrastructures.
gdelt_stream_df = spark \
    .readStream \
    .format("delta") \
    .table("esg_gdelt_bronze") \
    .withColumn("themes", filter_themes(F.col("themes"))) \
    .withColumn("organisation", F.explode(F.col("organisations"))) \
    .select(
        F.col("publishDate"),
        F.col("organisation"),
        F.col("documentIdentifier").alias("url"),
        F.col("themes"),
        F.col("tone.tone")
    )

gdelt_stream_df \
    .writeStream \
    .trigger(Trigger.Once) \
    .option("checkpointLocation", "/tmp/gdelt_esg") \
    .format("delta") \
    .table("esg_gdelt_silver")
From the variety of dimensions available in GDELT, we only focus on sentiment analysis (using the tone variable) for financial news related articles only. We assume financial news articles to be well captured by the GDELT taxonomy starting with ECON_*. Furthermore, we assume all environmental articles to be captured as ENV_* and social articles to be captured by UNGP_* taxonomies {Citation:(UN guiding principles on human rights).}
3.2.Sentiment analysis as proxy for ESG
Without any industry standard nor existing models to define environmental, social and governance metrics, and without any ground truth availability at the time of this study, its  assumed that the overall tone captured from financial news articles is a good proxy for companies' ESG scores.


Using Graphframes, [Citation:]we can easily create a network of companies sharing a common media coverage. Our assumption is that the more companies are mentioned together in news articles, the stronger their link will be (edge weight). Although this assumption may also infer wrong connections because of random co-occurrence in news articles, this undirected weighted graph will help us find companies' importance relative to our core FSIs we would like to assess.
Sample code frame:
val buildTuples = udf((organisations: Seq[String]) => {
    // as undirected, we create both IN and OUT connections
    organisations.flatMap(x1 => {
        organisations.map(x2 => {
        (x1, x2)
        })
    }).toSeq.filter({ case (x1, x2) =>
        x1 != x2 // remove self edges
    })
})

val edges = spark.read.table("esg_gdelt")
    .groupBy("url")
    .agg(collect_list(col("organisation")).as("organisations"))
    .withColumn("tuples", buildTuples(col("organisations")))
    .withColumn("tuple", explode(col("tuples")))
    .withColumn("src", col("tuple._1"))
    .withColumn("dst", col("tuple._2"))
    .groupBy("src", "dst")
    .count()

import org.graphframes.GraphFrame
val esgGraph = GraphFrame(nodes, edges)

The depth of a graph is the maximum of every shortest path possible, or the number of connections it takes for any random node to reach any others (the smaller the depth is, denser is our network).
Sample code frame:
val shortestPaths = esgGraph.shortestPaths.landmarks(landmarks).run()
val filterDepth = udf((distances: Map[String, Int]) => {
    distances.values.exists(_ )

We filter the  graph under consideration, to have a maximum depth of 4. This process reduces our graph further down to 2,300 businesses and 54,000 connections, allowing to run Page Rank algorithm more extensively with more iterations in order to better capture industry influence.

val prNodes = esgDenseGraph .parallelPersonalizedPageRank .maxIter(100) .sourceIds(landmarks) .run()

Fig:3: ESG Score variations with industry
Using Graphframes, [Citation:]we can easily create a network of companies sharing a common media coverage. Our assumption is that the more companies are mentioned together in news articles, the stronger their link will be (edge weight). Although this assumption may also infer wrong connections because of random co-occurrence in news articles, this undirected weighted graph will help us find companies' importance relative to our core FSIs we would like to assess.
Sample code frame:
val buildTuples = udf((organisations: Seq[String]) => {
    // as undirected, we create both IN and OUT connections
    organisations.flatMap(x1 => {
        organisations.map(x2 => {
        (x1, x2)
        })
    }).toSeq.filter({ case (x1, x2) =>
        x1 != x2 // remove self edges
    })
})

val edges = spark.read.table("esg_gdelt")
    .groupBy("url")
    .agg(collect_list(col("organisation")).as("organisations"))
    .withColumn("tuples", buildTuples(col("organisations")))
    .withColumn("tuple", explode(col("tuples")))
    .withColumn("src", col("tuple._1"))
    .withColumn("dst", col("tuple._2"))
    .groupBy("src", "dst")
    .count()

import org.graphframes.GraphFrame
val esgGraph = GraphFrame(nodes, edges)
By studying this graph further, we observe a power of law distribution of its edge weights: 90% of the connected businesses share a very few connections. We drastically reduce the graph size from 51,679,930 down to 61,143 connections by filtering edges for a weight of 200 or above (empirically led threshold). Prior to running Page Rank, we also optimise our graph by further reducing the number of connections through a Shortest Path[Citation] algorithm and compute the maximum number of hops a node needs to follow to reach any of our core FSIs vertices (captured in `landmarks` array). The depth of a graph is the maximum of every shortest path possible, or the number of connections it takes for any random node to reach any others (the smaller the depth is, denser is our network).
Sample code frame:
val shortestPaths = esgGraph.shortestPaths.landmarks(landmarks).run()
val filterDepth = udf((distances: Map[String, Int]) => {
    distances.values.exists(_ )
