In [None]:
%pip install databricks-genai-inference langchain langchain_openai mlflow
dbutils.library.restartPython()

[43mNote: you may need to restart the kernel using dbutils.library.restartPython() to use updated packages.[0m
Collecting databricks-genai-inference
  Using cached databricks_genai_inference-0.2.3-py3-none-any.whl (17 kB)
Collecting langchain
  Downloading langchain-0.1.17-py3-none-any.whl (867 kB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 867.6/867.6 kB 7.4 MB/s eta 0:00:00
Collecting langchain_openai
  Downloading langchain_openai-0.1.6-py3-none-any.whl (34 kB)
Collecting mlflow
  Downloading mlflow-2.12.1-py3-none-any.whl (20.2 MB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 20.2/20.2 MB 53.5 MB/s eta 0:00:00
Collecting tenacity==8.2.3
  Using cached tenacity-8.2.3-py3-none-any.whl (24 kB)
Collecting databricks-sdk==0.19.1
  Using cached databricks_sdk-0.19.1-py3-none-any.whl (447 kB)
Collecting typing-extensions>=4.7.1
  Using cached typing_extensions-4.11.0-py3-none-any.whl (34 kB)
Collecting httpx<1,>=0.23.0
  Using cached httpx-0.27.0-py3-none-any.whl (75 kB)
Collecting p

In [None]:
# Read the whole articles table from the metastore and materialize it as a local
# pandas frame; the scoring cells below work on this in-memory copy.
result = spark.table("hackathon_schema.articles")
df = result.toPandas()
print(df.head(5))

                                                 url  ... sentiment
0  https://www.msn.com/en-us/news/technology/tesl...  ...      None
1  https://www.wired.com/story/zhidou-rainbow-ev-...  ...      None
2  https://www.forbes.com/sites/brookecrothers/20...  ...      None
3  https://www.msn.com/en-us/autos/news/tesla-is-...  ...      None
4  https://www.msn.com/en-us/news/technology/tesl...  ...      None

[5 rows x 5 columns]


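Extract per-topic sentiment from articles

Each article is sent to the LLM together with a fixed list of business topics. The model returns a score between -1 (bad for the company) and 1 (good), or null if the topic is not mentioned. The resulting JSON is stored in the `sentiment` column.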

In [None]:
# Inspect which columns the loaded articles frame exposes.
article_columns = df.columns
article_columns

Index(['url', 'content', 'company_name', 'published_date', 'sentiment'], dtype='object')

In [None]:
import time
from functools import lru_cache
from typing import Dict, Optional

import pandas as pd
from langchain.chat_models import ChatDatabricks
from langchain.output_parsers import PydanticOutputParser
from langchain_core.prompts import PromptTemplate
from langchain_core.pydantic_v1 import BaseModel, Field, validator
from langchain_openai import ChatOpenAI

# Name of the Databricks model-serving endpoint the chat model talks to; kept as
# a named constant so switching models is a one-line config change.
LLM_ENDPOINT = "databricks-dbrx-instruct"
model = ChatDatabricks(endpoint=LLM_ENDPOINT)

# Define schema with topics
class TopicSentimentSchema(BaseModel):
    """Per-topic sentiment scores the LLM fills in for a single article.

    Each field is an optional float that should lie in [-1, 1]; null means the
    topic is not mentioned. The field descriptions are what the output parser
    turns into format instructions for the prompt, so editing them changes what
    the LLM is told.
    """

    layoffs: Optional[float] = Field(description="Sentiment score for layoffs topic (-1 to 1 or null if not mentioned)")
    restructuring: Optional[float] = Field(description="Sentiment score for org restructuring topic (-1 to 1 or null if not mentioned)")
    board_changes: Optional[float] = Field(description="Sentiment score for board member departures or appointments topic (-1 to 1 or null if not mentioned)")
    mergers: Optional[float] = Field(description="Sentiment score for mergers or acquisitions topic (-1 to 1 or null if not mentioned)")
    investor_activity: Optional[float] = Field(description="Sentiment score for investor activity topic (-1 to 1 or null if not mentioned)")
    esg: Optional[float] = Field(description="Sentiment score for environmental, social, or governance issues (-1 to 1 or null if not mentioned)")
    revenue_growth: Optional[float] = Field(description="Sentiment score for revenue growth topic (-1 to 1 or null if not mentioned)")
    product_launches: Optional[float] = Field(description="Sentiment score for product launches topic (-1 to 1 or null if not mentioned)")
    expansion: Optional[float] = Field(description="Sentiment score for market expansion or contraction topic (-1 to 1 or null if not mentioned)")
    disputes: Optional[float] = Field(description="Sentiment score for legal disputes topic (-1 to 1 or null if not mentioned)")
    geo_political: Optional[float] = Field(description="Sentiment score for geo-political events topic (-1 to 1 or null if not mentioned)")
    macro_economic: Optional[float] = Field(description="Sentiment score for macro-economic events topic (-1 to 1 or null if not mentioned)")
    partnerships: Optional[float] = Field(description="Sentiment score for partnerships, contracts and deals topic (-1 to 1 or null if not mentioned)")
    cyber_security: Optional[float] = Field(description="Sentiment score for cyber security topic (-1 to 1 or null if not mentioned)")
    supply_chain: Optional[float] = Field(description="Sentiment score for supply chain topic (-1 to 1 or null if not mentioned)")
    labor_issues: Optional[float] = Field(description="Sentiment score for labor issues topic (-1 to 1 or null if not mentioned)")
    product_recalls: Optional[float] = Field(description="Sentiment score for product recalls topic (-1 to 1 or null if not mentioned)")
    overall_sentiment: Optional[float] = Field(description="Overall sentiment score for the article (-1 to 1 or null if not mentioned)")

    @validator("*")
    def clamp_to_unit_range(cls, value):
        """Force every parsed score into the documented [-1, 1] range.

        The range is only requested in the prompt text, so the LLM can still
        return e.g. 1.5. Clamping keeps stored scores comparable across articles
        instead of silently persisting out-of-range values. NaN carries no usable
        score and is mapped to null ("not mentioned").
        """
        if value is None or value != value:  # `value != value` is True only for NaN
            return None
        return max(-1.0, min(1.0, value))


@lru_cache(maxsize=None)
def _build_sentiment_prompt(schema):
    """Return ``(prompt, parser)`` for ``schema``, constructed once per schema class.

    Neither object depends on the article being scored, so memoizing them avoids
    regenerating the parser's format instructions on every ``get_sentiment`` call.
    """
    # Set up a parser + inject instructions into the prompt template.
    parser = PydanticOutputParser(pydantic_object=schema)
    prompt = PromptTemplate(
        template="Answer the user query.\n{format_instructions}\n{query}\n{text}\n",
        input_variables=["query", "text"],
        partial_variables={"format_instructions": parser.get_format_instructions()},
    )
    return prompt, parser


def get_sentiment(schema, company, text, llm=None):
    """Ask the LLM to score one article's sentiment for every topic in ``schema``.

    Args:
        schema: Pydantic model class whose fields are the topics to score; the
            parser derives the prompt's format instructions from it.
        company: Company name interpolated into the instruction text.
        text: Article body appended to the prompt.
        llm: Chat model to run the prompt through. Defaults to the
            notebook-level ``model`` when omitted.

    Returns:
        The parsed ``schema`` instance serialized as a JSON string.

    Errors raised by the model call or by parsing its output are not caught
    here; they propagate to the caller.
    """
    # Instruction text passed as the {query} prompt variable.
    query = f"""
    Analyze the news about {company} and return sentiment values for provided topics.
    The sentiment should be defined based on whether it's good for a company and its shareholders (positive) or bad (negative).
    The values should be between -1 for most negative sentiment and 1 for most positive sentiment.
    0 for neutral sentiment. If a topic is not mentioned, the value should be null.
    """

    prompt, parser = _build_sentiment_prompt(schema)
    chain = prompt | (model if llm is None else llm) | parser
    response = chain.invoke({"query": query, "text": text})

    return response.json()


# Score every article. A model call can fail (e.g. a transient endpoint error)
# or return output the parser rejects. Retry each row a few times with a short
# backoff; if it still fails, record the error and move on instead of aborting
# the loop and losing the scores already computed for earlier rows.
MAX_ATTEMPTS = 3
failed_rows = []

for index, row in df.iterrows():
    for attempt in range(1, MAX_ATTEMPTS + 1):
        try:
            row_sentiment = get_sentiment(schema=TopicSentimentSchema, company=row['company_name'], text=row['content'])
        except Exception as exc:  # any failure is retried, then reported per row below
            if attempt == MAX_ATTEMPTS:
                failed_rows.append((index, repr(exc)))
            else:
                time.sleep(2 ** attempt)  # exponential backoff: 2s, 4s, ...
        else:
            # dump all content of row sentiment to new column in df
            df.at[index, "sentiment"] = row_sentiment
            break

if failed_rows:
    print(f"{len(failed_rows)} of {len(df)} articles could not be scored:")
    for failed_index, error in failed_rows:
        print(f"  row {failed_index}: {error}")

print(df['sentiment'].head(5))

0    {"layoffs": null, "restructuring": null, "boar...
1    {"layoffs": null, "restructuring": 0.2, "board...
2    {"layoffs": null, "restructuring": null, "boar...
3    {"layoffs": null, "restructuring": null, "boar...
4    {"layoffs": null, "restructuring": null, "boar...
Name: sentiment, dtype: object


In [None]:
# Spot-check the stored JSON strings; rendering the whole column floods the
# notebook output (and gets truncated anyway).
df["sentiment"].head(20)

0     {"layoffs": null, "restructuring": null, "boar...
1     {"layoffs": null, "restructuring": 0.2, "board...
2     {"layoffs": null, "restructuring": null, "boar...
3     {"layoffs": null, "restructuring": null, "boar...
4     {"layoffs": null, "restructuring": null, "boar...
5     {"layoffs": null, "restructuring": null, "boar...
6     {"layoffs": -0.8, "restructuring": null, "boar...
7     {"layoffs": null, "restructuring": null, "boar...
8     {"layoffs": -0.8, "restructuring": null, "boar...
9     {"layoffs": null, "restructuring": 0.5, "board...
10    {"layoffs": null, "restructuring": null, "boar...
11    {"layoffs": -0.8, "restructuring": null, "boar...
12    {"layoffs": null, "restructuring": null, "boar...
13    {"layoffs": null, "restructuring": null, "boar...
14    {"layoffs": null, "restructuring": null, "boar...
15    {"layoffs": -0.8, "restructuring": null, "boar...
16    {"layoffs": -0.8, "restructuring": -0.7, "boar...
17    {"layoffs": null, "restructuring": null, "

In [None]:
# Make sure the target table has a `sentiment` column before results are merged
# into it. Checking for the column first makes this cell idempotent, so it works
# both on a fresh table and on re-runs, with no commented-out DDL to toggle.
# Spark column names are case-insensitive by default, so compare lower-cased.
existing_columns = {name.lower() for name in spark.table("hackathon_schema.articles").columns}
if "sentiment" not in existing_columns:
    spark.sql("ALTER TABLE hackathon_schema.articles ADD COLUMNS (sentiment STRING)")

# verify
describe_table_query = "DESCRIBE hackathon_schema.articles"
result = spark.sql(describe_table_query)
result.show()

DataFrame[]

In [None]:
# update articles table with sentiment col
# Only the join key and the new column are shipped to Spark:
#  - avoids serializing the large `content` column for nothing;
#  - an explicit schema avoids type inference, which fails when every
#    `sentiment` value is null;
#  - URLs are de-duplicated because MERGE rejects multiple source rows
#    matching the same target row.
spark_df = spark.createDataFrame(
    df[["url", "sentiment"]].drop_duplicates(subset="url"),
    schema="url STRING, sentiment STRING",
)
spark_df.createOrReplaceTempView("temp_articles")

# Run the merge query
merge_query = """
MERGE INTO hackathon_schema.articles AS target
USING temp_articles AS source
ON target.url = source.url
WHEN MATCHED THEN
  UPDATE SET
    target.sentiment = source.sentiment
"""
spark.sql(merge_query)

# Verify the update
result = spark.sql("SELECT * FROM hackathon_schema.articles")
result.show()

+--------------------+--------------------+------------+--------------+--------------------+
|                 url|             content|company_name|published_date|           sentiment|
+--------------------+--------------------+------------+--------------+--------------------+
|https://www.msn.c...|Tesla's 'apocalyp...|       Tesla|    2024-05-05|{"layoffs": null,...|
|https://www.wired...|As Elon Musk Aban...|       Tesla|    2024-05-05|{"layoffs": null,...|
|https://www.forbe...|Longer-Range Tesl...|       Tesla|    2024-05-05|{"layoffs": null,...|
|https://www.msn.c...|Tesla is facing m...|       Tesla|    2024-05-05|{"layoffs": null,...|
|https://www.msn.c...|Tesla's Optimus v...|       Tesla|    2024-05-05|{"layoffs": null,...|
|https://www.msn.c...|Tesla plans to ch...|       Tesla|    2024-05-05|{"layoffs": null,...|
|https://news.yaho...|Hyundai antes up ...|       Tesla|    2024-05-05|{"layoffs": -0.8,...|
|https://www.msn.c...|These are the kin...|       Tesla|    2024-05-05