In [3]:
!pip3 install matplotlib
!pip3 install seaborn

[0mDefaulting to user installation because normal site-packages is not writeable
[33mDEPRECATION: Loading egg at /opt/bitnami/python/lib/python3.11/site-packages/pip-23.3.1-py3.11.egg is deprecated. pip 24.3 will enforce this behaviour change. A possible replacement is to use pip for package installation.. Discussion can be found at https://github.com/pypa/pip/issues/12330[0m[33m

[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m23.3.1[0m[39;49m -> [0m[32;49m24.2[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpip install --upgrade pip[0m


In [9]:
import os
from datetime import datetime

import matplotlib.pyplot as plt
import pandas as pd
import seaborn as sns
from pyspark.sql import SparkSession
from pyspark.sql.functions import avg, col, corr, count, lag, lit, when
from pyspark.sql.window import Window

# Start (or reuse) the Spark session on the standalone cluster and point the S3A
# connector at the MinIO service.
# Endpoint and credentials are read from the environment so secrets need not be
# committed with the notebook; the fallbacks are the values previously hard-coded here.
MINIO_ENDPOINT = os.environ.get("MINIO_ENDPOINT", "http://minio:9000")
MINIO_ACCESS_KEY = os.environ.get("MINIO_ACCESS_KEY", "minioadmin")
MINIO_SECRET_KEY = os.environ.get("MINIO_SECRET_KEY", "minioadmin")

# NOTE: getOrCreate() returns the existing session if this kernel already has one;
# static settings such as spark.jars and executor sizing only take effect when the
# session is first created.
spark = (
    SparkSession.builder
    .appName("MinIO Integration")
    .master("spark://spark-master:7077")
    # The driver runs inside the notebook container; this is the hostname it advertises.
    .config("spark.driver.host", "jupyter-notebook")
    .config("spark.driver.bindAddress", "0.0.0.0")
    # hadoop-aws + the AWS SDK bundle provide the s3a:// filesystem implementation.
    .config("spark.jars", "/opt/spark/jars/hadoop-aws-3.3.1.jar,/opt/spark/jars/aws-java-sdk-bundle-1.12.262.jar")
    .config("spark.hadoop.fs.s3a.endpoint", MINIO_ENDPOINT)
    .config("spark.hadoop.fs.s3a.access.key", MINIO_ACCESS_KEY)
    .config("spark.hadoop.fs.s3a.secret.key", MINIO_SECRET_KEY)
    # Path-style addressing (http://host/bucket/key) over plain HTTP for MinIO.
    .config("spark.hadoop.fs.s3a.path.style.access", "true")
    .config("spark.hadoop.fs.s3a.connection.ssl.enabled", "false")
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    .config("spark.hadoop.fs.s3a.connection.maximum", "50")
    .config("spark.hadoop.fs.s3a.threads.core", "20")
    .config("spark.hadoop.fs.s3a.connection.timeout", "5000")
    .config("spark.hadoop.fs.s3a.retry.limit", "10")
    .config("spark.hadoop.fs.s3a.attempts.maximum", "10")
    .config("spark.hadoop.fs.s3a.multipart.size", "104857600")  # 100 MiB
    .config("spark.default.parallelism", "8")
    .config("spark.sql.shuffle.partitions", "8")
    .config("spark.executor.memory", "2g")
    .config("spark.executor.cores", "1")
    .getOrCreate()
)


In [None]:

# Load one CSV of price history per company from MinIO through the S3A connector.
# NOTE(review): 'your-bucket' looks like a placeholder — set PRICES_BUCKET to the real bucket.
PRICES_BUCKET = 'your-bucket'


def load_prices(ticker):
    """Read ``s3a://<PRICES_BUCKET>/<ticker>.csv`` with a header row and an inferred schema."""
    return spark.read.csv(f's3a://{PRICES_BUCKET}/{ticker}.csv', header=True, inferSchema=True)


df_AAPL = load_prices('AAPL')
df_AMZN = load_prices('AMZN')
df_GOOG = load_prices('GOOG')
df_MSFT = load_prices('MSFT')

def calculate_metrics(df, company_name, ma_window=50):
    """Add moving-average, daily-return and company columns to one company's prices.

    Parameters
    ----------
    df : pyspark.sql.DataFrame
        Price history for a single company; must contain ``Date`` and ``Close``.
    company_name : str
        Label written into the new ``Company`` column.
    ma_window : int, default 50
        Number of rows in the trailing moving average. The output column is
        named ``f"{ma_window}_MA"`` (``50_MA`` with the default).

    Returns
    -------
    pyspark.sql.DataFrame
        ``df`` with ``Date`` cast to date, plus ``<ma_window>_MA``,
        ``Prev_Close``, ``Daily_Return`` and ``Company`` columns.

    Raises
    ------
    ValueError
        If ``ma_window`` is smaller than 1.
    """
    if ma_window < 1:
        raise ValueError(f"ma_window must be >= 1, got {ma_window}")

    # NOTE(review): assumes Date values are castable to date (e.g. yyyy-MM-dd); confirm.
    df = df.withColumn('Date', col('Date').cast('date'))

    # No partitionBy: each input frame holds a single company, so ordering the
    # whole frame by Date is what we want.
    by_date = Window.orderBy('Date')
    trailing = by_date.rowsBetween(-(ma_window - 1), Window.currentRow)

    # Emit the moving average only once a full window of rows is available;
    # otherwise the earliest rows would hold shorter-period averages under the
    # <ma_window>_MA label. `when` without `otherwise` yields null.
    has_full_window = count(lit(1)).over(trailing) == ma_window
    df = df.withColumn(f'{ma_window}_MA', when(has_full_window, avg(col('Close')).over(trailing)))

    # Simple daily return vs. the previous row's close; the first row has no
    # previous close, so lag() gives null and so does Daily_Return.
    df = df.withColumn('Prev_Close', lag(col('Close'), 1).over(by_date))
    df = df.withColumn('Daily_Return', (col('Close') - col('Prev_Close')) / col('Prev_Close'))

    df = df.withColumn('Company', lit(company_name))

    return df

# Derive the per-company metrics.
df_AAPL_metrics = calculate_metrics(df_AAPL, 'AAPL')
df_AMZN_metrics = calculate_metrics(df_AMZN, 'AMZN')
df_GOOG_metrics = calculate_metrics(df_GOOG, 'GOOG')
df_MSFT_metrics = calculate_metrics(df_MSFT, 'MSFT')

# Stack all companies into one long frame. unionByName matches columns by name
# rather than by position, so a CSV whose columns are ordered differently fails
# loudly instead of silently mixing e.g. Open and Close values.
combined_df = (
    df_AAPL_metrics
    .unionByName(df_AMZN_metrics)
    .unionByName(df_GOOG_metrics)
    .unionByName(df_MSFT_metrics)
)

combined_df.show()


In [23]:
# Correlation of daily returns between the four companies.
TICKERS = ['AAPL', 'AMZN', 'GOOG', 'MSFT']

# Rename each company's Daily_Return column so the joined frame has unique column names.
df_AAPL_returns = df_AAPL_metrics.select('Date', col('Daily_Return').alias('AAPL_Daily_Return'))
df_AMZN_returns = df_AMZN_metrics.select('Date', col('Daily_Return').alias('AMZN_Daily_Return'))
df_GOOG_returns = df_GOOG_metrics.select('Date', col('Daily_Return').alias('GOOG_Daily_Return'))
df_MSFT_returns = df_MSFT_metrics.select('Date', col('Daily_Return').alias('MSFT_Daily_Return'))

# Inner-join on Date so only dates present for every company are compared.
df_joined = (
    df_AAPL_returns
    .join(df_AMZN_returns, 'Date', 'inner')
    .join(df_GOOG_returns, 'Date', 'inner')
    .join(df_MSFT_returns, 'Date', 'inner')
)

# Compute every pairwise Pearson correlation in a single aggregation (one Spark
# job instead of six DataFrame.stat.corr() calls that each re-run the joins).
# corr() skips rows where either input is null, e.g. the earliest date, whose
# Daily_Return is null because lag() has no previous close.
pairs = [(a, b) for i, a in enumerate(TICKERS) for b in TICKERS[i + 1:]]
corr_row = df_joined.agg(
    *[corr(f'{a}_Daily_Return', f'{b}_Daily_Return').alias(f'{a}_{b}') for a, b in pairs]
).first()
pair_corr = {(a, b): corr_row[f'{a}_{b}'] for a, b in pairs}

correlation_AAPL_AMZN = pair_corr[('AAPL', 'AMZN')]
correlation_AAPL_GOOG = pair_corr[('AAPL', 'GOOG')]
correlation_AAPL_MSFT = pair_corr[('AAPL', 'MSFT')]
correlation_AMZN_GOOG = pair_corr[('AMZN', 'GOOG')]
correlation_AMZN_MSFT = pair_corr[('AMZN', 'MSFT')]
correlation_GOOG_MSFT = pair_corr[('GOOG', 'MSFT')]

# Symmetric matrix: column `a`, row `b` holds corr(a, b); the diagonal is 1.
correlation_data = {
    a: [1.0 if a == b else pair_corr.get((a, b), pair_corr.get((b, a))) for b in TICKERS]
    for a in TICKERS
}
correlation_df = pd.DataFrame(correlation_data, index=TICKERS)

# Pin the colour scale to the full correlation range [-1, 1]; otherwise coolwarm
# stretches over the observed range and weak positive correlations render blue.
fig, ax = plt.subplots(figsize=(8, 6))
sns.heatmap(
    correlation_df,
    annot=True,
    fmt='.2f',
    cmap='coolwarm',
    vmin=-1,
    vmax=1,
    linewidths=0.5,
    ax=ax,
)
ax.set_title('Correlation Heatmap of Daily Returns')
plt.show()


2024-10-26 13:54:12.006404
2024-10-26 13:54:12.102443
declair_time_spend : 0:00:00.096039
total process data count: 187
total time spend after count : 0:00:00.172128


24/10/26 13:54:12 INFO FileSourceStrategy: Pushed Filters: 
24/10/26 13:54:12 INFO FileSourceStrategy: Post-Scan Filters: 
24/10/26 13:54:12 INFO MemoryStore: Block broadcast_114 stored as values in memory (estimated size 214.9 KiB, free 432.4 MiB)
24/10/26 13:54:12 INFO MemoryStore: Block broadcast_114_piece0 stored as bytes in memory (estimated size 35.6 KiB, free 432.4 MiB)
24/10/26 13:54:12 INFO BlockManagerInfo: Added broadcast_114_piece0 in memory on jupyter-notebook:34427 (size: 35.6 KiB, free: 434.1 MiB)
24/10/26 13:54:12 INFO SparkContext: Created broadcast 114 from count at NativeMethodAccessorImpl.java:0
24/10/26 13:54:12 INFO FileSourceScanExec: Planning scan with bin packing, max size: 4194304 bytes, open cost is considered as scanning 4194304 bytes.
24/10/26 13:54:12 INFO DAGScheduler: Registering RDD 277 (count at NativeMethodAccessorImpl.java:0) as input to shuffle 34
24/10/26 13:54:12 INFO DAGScheduler: Got map stage job 74 (count at NativeMethodAccessorImpl.java:0) wi

total process data count: 187
total process data count: 187
total process data count: 187


24/10/26 13:54:12 INFO FileSourceStrategy: Pushed Filters: 
24/10/26 13:54:12 INFO FileSourceStrategy: Post-Scan Filters: 
24/10/26 13:54:12 INFO MemoryStore: Block broadcast_123 stored as values in memory (estimated size 214.9 KiB, free 432.4 MiB)
24/10/26 13:54:12 INFO MemoryStore: Block broadcast_123_piece0 stored as bytes in memory (estimated size 35.6 KiB, free 432.4 MiB)
24/10/26 13:54:12 INFO BlockManagerInfo: Added broadcast_123_piece0 in memory on jupyter-notebook:34427 (size: 35.6 KiB, free: 434.1 MiB)
24/10/26 13:54:12 INFO SparkContext: Created broadcast 123 from count at <unknown>:0
24/10/26 13:54:12 INFO FileSourceScanExec: Planning scan with bin packing, max size: 4194304 bytes, open cost is considered as scanning 4194304 bytes.
24/10/26 13:54:12 INFO DAGScheduler: Registering RDD 298 (count at <unknown>:0) as input to shuffle 37
24/10/26 13:54:12 INFO DAGScheduler: Got map stage job 80 (count at <unknown>:0) with 1 output partitions
24/10/26 13:54:12 INFO DAGScheduler: 

total process data count: 187
total process data count: 187
total process data count: 187
total time spend after count : 0:00:00.611757


24/10/26 13:54:12 INFO MapOutputTrackerMasterEndpoint: Asked to send map output locations for shuffle 39 to 172.18.0.7:37740
24/10/26 13:54:12 INFO BlockManagerInfo: Removed broadcast_126_piece0 on jupyter-notebook:34427 in memory (size: 35.6 KiB, free: 434.1 MiB)
24/10/26 13:54:12 INFO BlockManagerInfo: Removed broadcast_126_piece0 on 172.18.0.7:33187 in memory (size: 35.6 KiB, free: 1048.5 MiB)
24/10/26 13:54:12 INFO TaskSetManager: Finished task 0.0 in stage 125.0 (TID 85) in 15 ms on 172.18.0.7 (executor 0) (1/1)
24/10/26 13:54:12 INFO TaskSchedulerImpl: Removed TaskSet 125.0, whose tasks have all completed, from pool 
24/10/26 13:54:12 INFO DAGScheduler: ResultStage 125 (count at <unknown>:0) finished in 0.022 s
24/10/26 13:54:12 INFO BlockManagerInfo: Removed broadcast_128_piece0 on jupyter-notebook:34427 in memory (size: 5.8 KiB, free: 434.1 MiB)
24/10/26 13:54:12 INFO DAGScheduler: Job 85 is finished. Cancelling potential speculative or zombie tasks for this job
24/10/26 13:54: