In [202]:
import yfinance as yf
import requests
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import to_date, col, year, month, sum as spark_sum, avg as spark_avg

In [192]:
# Function Libs
def download_yfinance_data(ticker, start_date, end_date):
    data = yf.download(ticker, start=start_date, end=end_date).reset_index()
    data.columns = data.columns.droplevel(-1)
    data['Stock'] = ticker
    data['Source'] = 'yfinance'
    return data

def download_alpha_vantage_data(ticker, api_key):
    #url = f"https://www.alphavantage.co/query?function=TIME_SERIES_DAILY_ADJUSTED&symbol={ticker}&apikey={api_key}&outputsize=full"
    url = "https://www.alphavantage.co/query?function=TIME_SERIES_DAILY_ADJUSTED&symbol=IBM&outputsize=full&apikey=demo"
    response = requests.get(url)
    
    data = pd.DataFrame.from_dict(response.json()['Time Series (Daily)'], orient= 'index').sort_index(axis=1)
    data.rename(columns={
        'timestamp': 'Date',
        '1. open': 'Open',
        '2. high': 'High',
        '3. low': 'Low',
        '4. close': 'Close',
        '5. adjusted close': 'Adj Close',
        '6. volume': 'Volume'
    }, inplace=True)
    data['Stock'] = "NVDA"
    data['Source'] = 'alpha vantage'
    data['Date'] = data.index
    data = data[['Date', 'Open', 'High', 'Low', 'Close', 'Adj Close', 'Volume', 'Stock', 'Source']].reset_index(drop=True)
    
    return data

In [215]:
# Configuration
from secret import credential
import os
import sys

os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable

start_date = "2023-01-01"
end_date = "2024-01-01"
tickers = ['IBM']
alpha_vantage_api_key = credential.ALPHA_VANTAGE_API_KEY

### Data Preparation

In [194]:
# Download stock data from yfinance and alpha_vantage
yfinance_data = pd.concat([download_yfinance_data(ticker, start_date, end_date) for ticker in tickers])
alpha_vantage_data = pd.concat([download_alpha_vantage_data(ticker, alpha_vantage_api_key) for ticker in tickers])

[*********************100%***********************]  1 of 1 completed


In [195]:
alpha_vantage_data

Unnamed: 0,Date,Open,High,Low,Close,Adj Close,Volume,Stock,Source
0,2024-11-15,207.46,208.49,204.07,204.99,204.99,3986460,NVDA,alpha vantage
1,2024-11-14,210.0,210.4999,206.35,208.99,208.99,6372853,NVDA,alpha vantage
2,2024-11-13,209.5,211.41,209.0701,210.92,210.92,3247830,NVDA,alpha vantage
3,2024-11-12,211.9,213.03,209.06,210.86,210.86,2818216,NVDA,alpha vantage
4,2024-11-11,214.4,215.41,213.48,213.57,211.891827977227,3012987,NVDA,alpha vantage
...,...,...,...,...,...,...,...,...,...
6297,1999-11-05,92.75,92.94,90.19,90.25,45.7112254831793,13737600,NVDA,alpha vantage
6298,1999-11-04,94.44,94.44,90.0,91.56,46.3747346840985,16697600,NVDA,alpha vantage
6299,1999-11-03,95.87,95.94,93.5,94.37,47.7979872448491,10369100,NVDA,alpha vantage
6300,1999-11-02,96.75,96.81,93.69,94.81,48.0208452970662,11105400,NVDA,alpha vantage


In [196]:
yfinance_data

Price,Date,Adj Close,Close,High,Low,Open,Volume,Stock,Source
0,2023-01-03 00:00:00+00:00,130.147705,141.550003,141.899994,140.479996,141.100006,3338600,IBM,yfinance
1,2023-01-04 00:00:00+00:00,131.113129,142.600006,143.619995,141.369995,142.070007,3869200,IBM,yfinance
2,2023-01-05 00:00:00+00:00,129.743164,141.110001,142.500000,140.009995,142.440002,2866600,IBM,yfinance
3,2023-01-06 00:00:00+00:00,132.124512,143.699997,144.250000,141.580002,142.380005,3574000,IBM,yfinance
4,2023-01-09 00:00:00+00:00,131.986603,143.550003,145.470001,143.399994,144.080002,3987700,IBM,yfinance
...,...,...,...,...,...,...,...,...,...
245,2023-12-22 00:00:00+00:00,156.483139,162.139999,162.410004,161.000000,161.100006,2439800,IBM,yfinance
246,2023-12-26 00:00:00+00:00,157.515808,163.210007,163.309998,162.050003,162.229996,1772400,IBM,yfinance
247,2023-12-27 00:00:00+00:00,157.757095,163.460007,163.639999,162.679993,163.139999,3234600,IBM,yfinance
248,2023-12-28 00:00:00+00:00,158.036957,163.750000,163.960007,163.399994,163.960007,2071300,IBM,yfinance


In [197]:
# Load Data to PySpark
spark = SparkSession.builder.appName("Stock Data Preparation").getOrCreate()

yfinance_df = spark.createDataFrame(yfinance_data)
alpha_vantage_df = spark.createDataFrame(alpha_vantage_data)

In [207]:
yfinance_df.printSchema()
alpha_vantage_df.printSchema()

root
 |-- Date: date (nullable = true)
 |-- Adj Close: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Open: double (nullable = true)
 |-- Volume: long (nullable = true)
 |-- Stock: string (nullable = true)
 |-- Source: string (nullable = true)

root
 |-- Date: date (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Adj Close: double (nullable = true)
 |-- Volume: long (nullable = true)
 |-- Stock: string (nullable = true)
 |-- Source: string (nullable = true)



In [206]:
# Align the data type of the two PySpark Dataframes
yfinance_df = yfinance_df.withColumn("Date", to_date(col("Date"), "yyyyMMdd"))
alpha_vantage_df = alpha_vantage_df.withColumn("Date", to_date(col("Date"), "yyyyMMdd"))

double_cols = ['Open', 'High', 'Low', 'Close', 'Adj Close']
for double_col in double_cols:
    alpha_vantage_df = alpha_vantage_df.withColumn(double_col, col(double_col).cast("double"))

alpha_vantage_df = alpha_vantage_df.withColumn("Volume", col("Volume").cast("long"))

In [212]:
py_df = yfinance_df.union(alpha_vantage_df)

root
 |-- Date: date (nullable = true)
 |-- Adj Close: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Open: double (nullable = true)
 |-- Volume: long (nullable = true)
 |-- Stock: string (nullable = true)
 |-- Source: string (nullable = true)



In [213]:
py_df.show()

Py4JJavaError: An error occurred while calling o135.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 6 in stage 4.0 failed 1 times, most recent failure: Lost task 6.0 in stage 4.0 (TID 24) (DESKTOP-FA6QR84 executor driver): java.net.SocketException: Connection reset
	at java.base/java.net.SocketInputStream.read(SocketInputStream.java:186)
	at java.base/java.net.SocketInputStream.read(SocketInputStream.java:140)
	at java.base/java.io.BufferedInputStream.fill(BufferedInputStream.java:252)
	at java.base/java.io.BufferedInputStream.read(BufferedInputStream.java:271)
	at java.base/java.io.DataInputStream.readInt(DataInputStream.java:392)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:774)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:766)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:525)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.hashAgg_doAggregateWithKeys_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:140)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:104)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:54)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:834)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2856)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2792)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2791)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2791)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1247)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1247)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1247)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3060)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2994)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2983)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
Caused by: java.net.SocketException: Connection reset
	at java.base/java.net.SocketInputStream.read(SocketInputStream.java:186)
	at java.base/java.net.SocketInputStream.read(SocketInputStream.java:140)
	at java.base/java.io.BufferedInputStream.fill(BufferedInputStream.java:252)
	at java.base/java.io.BufferedInputStream.read(BufferedInputStream.java:271)
	at java.base/java.io.DataInputStream.readInt(DataInputStream.java:392)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:774)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:766)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:525)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.hashAgg_doAggregateWithKeys_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:140)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:104)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:54)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:834)


In [199]:
# Remove na and duplicate values
yfinance_df = yfinance_df.na.drop().dropDuplicates()
alpha_vantage_df = alpha_vantage_df.na.drop().dropDuplicates()

In [None]:
# Adjust date and create week/month/year columns
for df in [yfinance_df, alpha_vantage_df]:
    df = df.withColumn("Year", year(col("Date")))
    df = df.withColumn("Month", month(col("Date")))
    df = df.withColumn("Week", weekofyear(col("Date")))

# Step 8: Validate data between the two sources (simple join example)
joined_df = yfinance_df.join(alpha_vantage_df, ["Date", "Stock"], "inner")
joined_df.show(5)

# Step 9: Analysis - Add average/sum by group
result_df = yfinance_df.groupBy("Stock", "Year", "Month").agg(
    spark_avg("Close").alias("Avg_Close"),
    spark_sum("Volume").alias("Total_Volume")
)
result_df.show(5)

# Step 10: Spark SQL
# Register the DataFrame as a SQL temporary view
result_df.createOrReplaceTempView("stock_analysis")

# Define and execute an SQL query
query = """
SELECT Stock, Year, Month, Avg_Close, Total_Volume
FROM stock_analysis
WHERE Total_Volume > 1000000
ORDER BY Year, Month, Stock
"""
query_result = spark.sql(query)
query_result.show(5)

# Step 11: Write the output to Parquet
query_result.write.mode("overwrite").parquet("output/stock_analysis_result.parquet")

# Stop the Spark session
spark.stop()

### Data wangling
1. Study table info, head, schema
2. Check na and remove duplicates
3. Check abnormal values
4. Adjust Date, generate week/month/year
5. Convert data format
6. validate data from 2 sources

### Analysis
1. add average/sum by group

### Spark SQL
1. define schema
2. execute query
3. Write the output query to parquet

### Optional
Put the process to streaming, for real-time process