In [21]:
import json
import os

import requests

# Download TSLA news-sentiment data from Alpha Vantage and save the raw JSON
# response to disk for the Spark cells below.
#
# The API key is read from the environment so the credential is never stored
# in the notebook. NOTE(review): the key that used to be hard-coded here
# should be treated as leaked and rotated.
api_key = os.environ.get('ALPHAVANTAGE_API_KEY')
if not api_key:
    raise RuntimeError(
        'Set the ALPHAVANTAGE_API_KEY environment variable before running this cell.'
    )

url = (
    'https://www.alphavantage.co/query'
    '?function=NEWS_SENTIMENT&tickers=TSLA&time_from=20000101T0130&limit=10000'
    f'&apikey={api_key}'
)

# The timeout keeps the cell from hanging indefinitely; raise_for_status()
# turns an HTTP error into an exception instead of saving an error body.
r = requests.get(url, timeout=30)
r.raise_for_status()
data = r.json()

# Later cells explode the 'feed' array, so do not overwrite the raw file with
# a payload that lacks it.
if 'feed' not in data:
    raise ValueError(f'Unexpected API response (no "feed" key): {str(data)[:300]}')

file_path = '/home/raw_data.json'

with open(file_path, 'w') as file:
    json.dump(data, file)

print(f'Data saved to: {file_path}')

Data saved to: /home/raw_data.json


In [None]:
# Import necessary modules
from pyspark.sql import SparkSession

# Build the notebook's Spark session, reusing an existing one if present.
spark = (
    SparkSession.builder
    .appName("GoogleColabSparkSession")
    .getOrCreate()
)

# Printing the version confirms that the session object is usable.
print(spark.version)

In [22]:
# Load the raw API response saved earlier. The file was written with
# json.dump without indentation, so the whole document sits on one line and
# the default JSON reader parses it as a single record.
# The former 'header' option was dropped: it is a CSV option and has no
# meaning for the JSON source.
tesla_df = (
    spark.read
    .format('json')
    .load('/home/raw_data.json')
)

In [23]:
# Print the DataFrame schema to inspect the nested structure of the loaded JSON.
tesla_df.printSchema()

root
 |-- feed: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- authors: array (nullable = true)
 |    |    |    |-- element: string (containsNull = true)
 |    |    |-- banner_image: string (nullable = true)
 |    |    |-- category_within_source: string (nullable = true)
 |    |    |-- overall_sentiment_label: string (nullable = true)
 |    |    |-- overall_sentiment_score: double (nullable = true)
 |    |    |-- source: string (nullable = true)
 |    |    |-- source_domain: string (nullable = true)
 |    |    |-- summary: string (nullable = true)
 |    |    |-- ticker_sentiment: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- relevance_score: string (nullable = true)
 |    |    |    |    |-- ticker: string (nullable = true)
 |    |    |    |    |-- ticker_sentiment_label: string (nullable = true)
 |    |    |    |    |-- ticker_sentiment_score: string (nullable = true)
 |    |    |-- tim

In [24]:
from pyspark.sql.functions import col, explode

# Flatten the nested response into one row per (article, ticker, topic).
# The column groups below are shared by every select stage so that the
# output column order is stated once instead of being repeated by hand.
response_fields = [
    "items",
    "relevance_score_definition",
    "sentiment_score_definition",
]
article_fields = [
    "authors",
    "banner_image",
    "category_within_source",
    "overall_sentiment_label",
    "overall_sentiment_score",
    "source",
    "source_domain",
    "summary",
]
ticker_fields = [
    "ticker_relevance_score",
    "ticker",
    "ticker_sentiment_label",
    "ticker_sentiment_score",
]

response_cols = [col(name) for name in response_fields]
article_cols = [col(name) for name in article_fields]
ticker_cols = [col(name) for name in ticker_fields]

# Stage 1: explode the 'feed' array column into one row per feed element.
exploded_df = tesla_df.select(
    *response_cols,
    explode("feed").alias("feed_exploded"),
)

# Stage 2: lift the fields of the exploded 'feed' struct to top-level columns.
feed_struct_fields = article_fields + [
    "ticker_sentiment",
    "time_published",
    "title",
    "topics",
    "url",
]
flattened_df = exploded_df.select(
    *response_cols,
    *[col("feed_exploded." + name) for name in feed_struct_fields],
)

# Stage 3: explode the 'ticker_sentiment' array column.
exploded_ticker_sentiment_df = flattened_df.select(
    *response_cols,
    *article_cols,
    explode("ticker_sentiment").alias("ticker_sentiment_exploded"),
    col("time_published"),
    col("title"),
    col("topics"),
    col("url"),
)

# Stage 4: pull the individual fields out of the exploded ticker struct.
ticker_struct_aliases = [
    ("relevance_score", "ticker_relevance_score"),
    ("ticker", "ticker"),
    ("ticker_sentiment_label", "ticker_sentiment_label"),
    ("ticker_sentiment_score", "ticker_sentiment_score"),
]
final_ticker_sentiment_df = exploded_ticker_sentiment_df.select(
    *response_cols,
    *article_cols,
    *[
        col("ticker_sentiment_exploded." + field).alias(alias)
        for field, alias in ticker_struct_aliases
    ],
    col("time_published"),
    col("title"),
    col("topics"),
    col("url"),
)

# Stage 5: explode the 'topics' array column.
exploded_topics_df = final_ticker_sentiment_df.select(
    *response_cols,
    *article_cols,
    *ticker_cols,
    col("time_published"),
    col("title"),
    explode("topics").alias("topics_exploded"),
    col("url"),
)

# Stage 6: pull the individual fields out of the exploded topic struct.
proc_df = exploded_topics_df.select(
    *response_cols,
    *article_cols,
    *ticker_cols,
    col("time_published"),
    col("title"),
    col("topics_exploded.relevance_score").alias("topic_relevance_score"),
    col("topics_exploded.topic").alias("topic"),
    col("url"),
)

# Show the final DataFrame
proc_df.show(truncate=False)

+-----+------------------------------------------------------------+--------------------------------------------------------------------------------------------------------------------------------------------+------------------+-------------------------------------------------------------------------------------------------------------------------------+----------------------+-----------------------+-----------------------+-----------------+-------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+----------------------+------+----------------------+----------------------+---------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------+---------

In [25]:
from pyspark.sql.functions import concat_ws

# Replace the 'authors' array with a comma-joined string column so the frame
# holds only flat values before it is written out as CSV.
# The two steps are chained on purpose: previously the drop restarted from
# proc_df, which silently discarded the freshly added 'authors_str' column.
tesladata_df = (
    proc_df
    .withColumn("authors_str", concat_ws(",", col("authors")))
    .drop("authors")
)

# Show the modified DataFrame
tesladata_df.show(truncate=False)

+-----+------------------------------------------------------------+--------------------------------------------------------------------------------------------------------------------------------------------+-------------------------------------------------------------------------------------------------------------------------------+----------------------+-----------------------+-----------------------+-----------------+-------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+----------------------+------+----------------------+----------------------+---------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------+---------------------+------

In [26]:
# Persist the flattened data as CSV with a header row, overwriting any
# previous output at this path.
tesladata_df.write.csv("/home/tsladataoutput.csv", header=True, mode="overwrite")