In [None]:
# =============================================================================
# PHÂN TÍCH DỮ LIỆU CHỨNG KHOÁN VỚI PYSPARK
# Stock Price Big Data Analysis
# =============================================================================

# %% [markdown]
# ## 1. Import thư viện và khởi tạo Spark Session

# %%
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.window import Window
import matplotlib.pyplot as plt
import seaborn as sns
import pandas as pd
import numpy as np

# Configure plotting defaults. The seaborn palette is set after the matplotlib
# style sheet is loaded; keep this order, since both touch the colour settings.
plt.style.use('seaborn-v0_8-darkgrid')
sns.set_palette("husl")

# User-facing message restored from mis-encoded (mojibake) text.
print("✅ Đã import các thư viện thành công")

# %%
# Create (or reuse) the Spark session attached to the standalone Spark master.
spark = (
    SparkSession.builder
    .appName("Stock Price Big Data Analysis")
    .master("spark://spark-master:7077")
    # Per-executor resources and the application-wide core cap.
    .config("spark.executor.memory", "2g")
    .config("spark.executor.cores", "2")
    .config("spark.cores.max", "8")
    # Default file system for Hadoop paths: the HDFS namenode.
    .config("spark.hadoop.fs.defaultFS", "hdfs://namenode:9000")
    .getOrCreate()
)

# Status messages restored from mis-encoded (mojibake) text.
print("✅ Spark Session đã được khởi tạo")
print(f"📊 Spark Version: {spark.version}")
print(f"🎯 Application ID: {spark.sparkContext.applicationId}")
print(f"🔗 Master: {spark.sparkContext.master}")

# %% [markdown]
# ## 2. Đọc dữ liệu từ HDFS

# %%
# Explicit schema for the CSV price files. Date is read as a string here and
# parsed into a date column further down.
stock_schema = StructType([
    StructField("Date", StringType(), True),
    StructField("Open", DoubleType(), True),
    StructField("High", DoubleType(), True),
    StructField("Low", DoubleType(), True),
    StructField("Close", DoubleType(), True),
    StructField("Volume", LongType(), True)
])

# Read every CSV file under the HDFS data directory.
hdfs_path = "hdfs://namenode:9000/datack/*.csv"
print(f"📂 Đọc dữ liệu từ: {hdfs_path}")

df = (
    spark.read
    .option("header", "true")
    .schema(stock_schema)
    .csv(hdfs_path)
    # Keep the source file path so the ticker can be derived from it.
    .withColumn("filename", input_file_name())
    # The ticker is the text between "data-" and "_" in the file name.
    # NOTE(review): `\w+` is greedy and also matches "_", and files that do not
    # match the pattern get an empty Symbol - confirm against real file names.
    .withColumn("Symbol", regexp_extract(col("filename"), r"data-(\w+)_", 1))
    # Parse the textual date with to_date (the original comment said "timestamp").
    .withColumn("Date", to_date(col("Date"), "yyyy-MM-dd"))
)

# Cache the frame because many later cells re-scan it.
df.cache()

# Summary messages restored from mis-encoded (mojibake) text.
print(f"\n✅ Đã đọc thành công {df.count():,} dòng dữ liệu")
print(f"📊 Số lượng cổ phiếu: {df.select('Symbol').distinct().count()}")

# %%
# Show the schema and a small sample of the loaded data.
# Section titles restored from mis-encoded (mojibake) text.
print("\n📋 CẤU TRÚC DỮ LIỆU:")
print("=" * 80)
df.printSchema()

print("\n📋 DỮ LIỆU MẪU:")
print("=" * 80)
df.show(10)

# %% [markdown]
# ## 3. Thống kê mô tả

# %%
# Summary statistics for the numeric price/volume columns.
# Section title restored from mis-encoded (mojibake) text.
print("\n📊 THỐNG KÊ MÔ TẢ")
print("=" * 80)
df.select("Open", "High", "Low", "Close", "Volume").describe().show()

# %%
# Time span covered by the data.
# `min`/`max` here are the Spark column aggregates brought in by
# `from pyspark.sql.functions import *`, not the Python builtins.
date_range = df.agg(
    min("Date").alias("Start_Date"),
    max("Date").alias("End_Date")
).collect()[0]

# Messages restored from mis-encoded (mojibake) text.
print("\n📅 Phạm vi thời gian:")
print(f"   Từ: {date_range['Start_Date']}")
print(f"   Đến: {date_range['End_Date']}")

# %% [markdown]
# ## 4. Phân tích biến động giá

# %%
# Per-row price indicators.
# NOTE(review): "Daily_Return" is the open-to-close change of the same row in
# percent, not a close-to-close return - confirm this is the intended definition.
high_low_spread = col("High") - col("Low")
df_analysis = (
    df
    .withColumn("Daily_Return", (col("Close") - col("Open")) / col("Open") * 100)
    .withColumn("Price_Range", high_low_spread)
    # High-low spread expressed as a percentage of the opening price.
    .withColumn("Volatility", high_low_spread / col("Open") * 100)
)

# Trailing moving averages over the current row plus the previous 6 / 29 rows
# of the same symbol. The frames count rows, not calendar days, and are
# truncated at the start of each symbol's history.
symbol_by_date = Window.partitionBy("Symbol").orderBy("Date")
window_7 = symbol_by_date.rowsBetween(-6, Window.currentRow)
window_30 = symbol_by_date.rowsBetween(-29, Window.currentRow)

df_analysis = (
    df_analysis
    .withColumn("MA_7", avg("Close").over(window_7))
    .withColumn("MA_30", avg("Close").over(window_30))
)

# Messages restored from mis-encoded (mojibake) text.
print("\n✅ Đã tính toán xong các chỉ số kỹ thuật")
print("\n📊 DỮ LIỆU SAU KHI PHÂN TÍCH:")
print("=" * 80)
df_analysis.select("Symbol", "Date", "Close", "Daily_Return", "MA_7", "MA_30").show(10)

# %%
# Top 10 symbols by highest closing price ever recorded.
# Section title restored from mis-encoded (mojibake) text.
print("\n💰 TOP 10 CỔ PHIẾU GIÁ CAO NHẤT")
print("=" * 80)
top_expensive = (
    df.groupBy("Symbol")
    # `max` is the Spark aggregate from the wildcard import, not the builtin.
    .agg(max("Close").alias("Max_Price"))
    .orderBy(desc("Max_Price"))
    .limit(10)
)
top_expensive.show()

# %%
# Top 10 symbols by total traded volume over the whole data set.
# Section title restored from mis-encoded (mojibake) text.
print("\n📈 TOP 10 CỔ PHIẾU KHỐI LƯỢNG GIAO DỊCH CAO NHẤT")
print("=" * 80)
top_volume = (
    df.groupBy("Symbol")
    # `sum` is the Spark aggregate from the wildcard import, not the builtin.
    .agg(sum("Volume").alias("Total_Volume"))
    .orderBy(desc("Total_Volume"))
    .limit(10)
)
top_volume.show()

# %% [markdown]
# ## 5. Phân tích xu hướng thị trường

# %%
# Yearly totals across all symbols: traded value (Close * Volume), traded
# volume, and the average closing price.
df_yearly = (
    df
    .withColumn("Year", year("Date"))
    .withColumn("Trade_Value", col("Close") * col("Volume"))
    .groupBy("Year")
    .agg(
        sum("Trade_Value").alias("Total_Trade_Value"),
        sum("Volume").alias("Total_Volume"),
        avg("Close").alias("Avg_Price")
    )
    .orderBy("Year")
)

# Section title restored from mis-encoded (mojibake) text.
print("\n📊 PHÂN TÍCH THEO NĂM")
print("=" * 80)
df_yearly.show()

# %%
# Visualise the yearly trends. The yearly aggregate is collected to the driver
# as a pandas frame for plotting.
yearly_pd = df_yearly.toPandas()

# All titles and labels below are restored from mis-encoded (mojibake) text.
fig, axes = plt.subplots(2, 2, figsize=(16, 10))
fig.suptitle('PHÂN TÍCH XU HƯỚNG THỊ TRƯỜNG THEO NĂM', fontsize=16, fontweight='bold')

# The three line panels differ only in column, labels and line styling:
# (axes, column, title, y-label, extra plot kwargs).
line_panels = [
    # Traded value
    (axes[0, 0], 'Total_Trade_Value', 'Tổng Giá Trị Giao Dịch', 'Giá trị (USD)',
     {'marker': 'o'}),
    # Traded volume
    (axes[0, 1], 'Total_Volume', 'Tổng Khối Lượng Giao Dịch', 'Khối lượng',
     {'marker': 's', 'color': 'green'}),
    # Average price
    (axes[1, 0], 'Avg_Price', 'Giá Trung Bình', 'Giá (USD)',
     {'marker': '^', 'color': 'orange'}),
]
for panel_ax, column, title, ylabel, line_kwargs in line_panels:
    panel_ax.plot(yearly_pd['Year'], yearly_pd[column], linewidth=2, **line_kwargs)
    panel_ax.set_title(title, fontsize=12, fontweight='bold')
    panel_ax.set_xlabel('Năm')
    panel_ax.set_ylabel(ylabel)
    panel_ax.grid(True, alpha=0.3)

# Bar-chart view of the traded value (horizontal grid lines only).
axes[1, 1].bar(yearly_pd['Year'], yearly_pd['Total_Trade_Value'], alpha=0.7)
axes[1, 1].set_title('Biểu Đồ Cột Giá Trị Giao Dịch', fontsize=12, fontweight='bold')
axes[1, 1].set_xlabel('Năm')
axes[1, 1].set_ylabel('Giá trị (USD)')
axes[1, 1].grid(True, alpha=0.3, axis='y')

plt.tight_layout()
plt.show()

print("✅ Đã vẽ biểu đồ phân tích theo năm")

# %% [markdown]
# ## 6. Phân tích cổ phiếu cụ thể

# %%
# Tickers picked for the detailed per-stock analysis.
selected_stocks = ['AAPL', 'GOOGL', 'MSFT', 'AMZN', 'TSLA']

# Keep only the selected tickers, ordered by symbol then date.
df_selected = (
    df_analysis
    .filter(col("Symbol").isin(selected_stocks))
    .orderBy("Symbol", "Date")
)

# Messages restored from mis-encoded (mojibake) text.
print(f"\n📌 Phân tích {len(selected_stocks)} cổ phiếu: {', '.join(selected_stocks)}")
print(f"📊 Tổng số dòng dữ liệu: {df_selected.count():,}")

# %%
# Collect only the plotted columns to the driver as a pandas frame.
df_selected_pd = df_selected.select("Symbol", "Date", "Close", "MA_7", "MA_30").toPandas()

# One stacked panel per selected ticker: close price with both moving averages.
# Titles and labels are restored from mis-encoded (mojibake) text.
fig, axes = plt.subplots(len(selected_stocks), 1, figsize=(16, 4*len(selected_stocks)))
fig.suptitle('BIỂU ĐỒ GIÁ CỔ PHIẾU VÀ MOVING AVERAGE', fontsize=16, fontweight='bold')

# plt.subplots returns a bare Axes (not an array) when there is a single row;
# np.atleast_1d makes both cases iterable in the same way.
for ax, stock in zip(np.atleast_1d(axes), selected_stocks):
    stock_data = df_selected_pd[df_selected_pd['Symbol'] == stock].sort_values('Date')

    # A ticker with no rows in the data leaves its panel blank.
    if stock_data.empty:
        continue

    ax.plot(stock_data['Date'], stock_data['Close'], label='Close Price', linewidth=1.5, alpha=0.8)
    ax.plot(stock_data['Date'], stock_data['MA_7'], label='MA 7', linewidth=1, alpha=0.7)
    ax.plot(stock_data['Date'], stock_data['MA_30'], label='MA 30', linewidth=1, alpha=0.7)

    ax.set_title(f'{stock} - Giá Đóng Cửa và Moving Average', fontsize=12, fontweight='bold')
    ax.set_xlabel('Ngày')
    ax.set_ylabel('Giá (USD)')
    ax.legend()
    ax.grid(True, alpha=0.3)

plt.tight_layout()
plt.show()

print("✅ Đã vẽ biểu đồ giá cổ phiếu")

# %% [markdown]
# ## 7. Phân tích tương quan

# %%
# Correlation matrix between the selected stocks.
# Pivot so each row is a date and each column holds one symbol's closing price.
df_pivot = df_selected.groupBy("Date").pivot("Symbol").agg(first("Close"))

# Correlation is computed in pandas on the pivoted frame.
# NOTE(review): this correlates closing-price levels, not returns - confirm
# that this is the intended measure.
df_corr_pd = df_pivot.toPandas().set_index('Date')
correlation_matrix = df_corr_pd.corr()

# Section title restored from mis-encoded (mojibake) text.
print("\n🔗 MA TRẬN TƯƠNG QUAN GIỮA CÁC CỔ PHIẾU")
print("=" * 80)
print(correlation_matrix)

# %%
# Draw the correlation matrix as an annotated heatmap; the colour scale is
# centred on 0.
plt.figure(figsize=(10, 8))
sns.heatmap(correlation_matrix, annot=True, cmap='coolwarm', center=0,
            square=True, linewidths=1, cbar_kws={"shrink": 0.8})
# Title and message restored from mis-encoded (mojibake) text.
plt.title('MA TRẬN TƯƠNG QUAN GIỮA CÁC CỔ PHIẾU', fontsize=14, fontweight='bold')
plt.tight_layout()
plt.show()

print("✅ Đã vẽ heatmap tương quan")

# %% [markdown]
# ## 8. Phân tích rủi ro và lợi nhuận

# %%
# Per-symbol return/risk summary built from the Daily_Return column:
# mean, standard deviation, and the extreme values.
# `min`/`max` are the Spark aggregates from the wildcard import.
risk_return = (
    df_analysis.groupBy("Symbol")
    .agg(
        avg("Daily_Return").alias("Avg_Return"),
        stddev("Daily_Return").alias("Risk_StdDev"),
        min("Daily_Return").alias("Min_Return"),
        max("Daily_Return").alias("Max_Return")
    )
    .orderBy(desc("Avg_Return"))
)

# Section title restored from mis-encoded (mojibake) text.
print("\n⚖️ PHÂN TÍCH RỦI RO VÀ LỢI NHUẬN")
print("=" * 80)
risk_return.show(20)

# %%
# Risk/return scatter plot: one point per symbol, coloured by row position.
risk_return_pd = risk_return.toPandas()

plt.figure(figsize=(12, 8))
plt.scatter(risk_return_pd['Risk_StdDev'], risk_return_pd['Avg_Return'],
            s=100, alpha=0.6, c=range(len(risk_return_pd)), cmap='viridis')

# Label the first ten rows (risk_return is ordered by Avg_Return descending).
# BUG FIX: the original used range(min(10, len(...))), but
# `from pyspark.sql.functions import *` rebinds `min` to the Spark column
# aggregate, which accepts a single column argument, so that call raised
# TypeError. head(10) caps the row count without touching the shadowed name.
for row in risk_return_pd.head(10).itertuples(index=False):
    plt.annotate(row.Symbol,
                 (row.Risk_StdDev, row.Avg_Return),
                 fontsize=8, alpha=0.7)

# Labels, title and message restored from mis-encoded (mojibake) text.
plt.xlabel('Rủi Ro (Độ lệch chuẩn)', fontsize=12)
plt.ylabel('Lợi Nhuận Trung Bình (%)', fontsize=12)
plt.title('BIỂU ĐỒ RỦI RO - LỢI NHUẬN', fontsize=14, fontweight='bold')
plt.grid(True, alpha=0.3)
# Zero-return reference line.
plt.axhline(y=0, color='r', linestyle='--', alpha=0.3)
plt.tight_layout()
plt.show()

print("✅ Đã vẽ biểu đồ rủi ro-lợi nhuận")

# %% [markdown]
# ## 9. Phân tích khối lượng giao dịch

# %%
# Monthly traded volume and average closing price across all symbols.
df_monthly = (
    df
    .withColumn("Year", year("Date"))
    .withColumn("Month", month("Date"))
    .groupBy("Year", "Month")
    .agg(
        sum("Volume").alias("Total_Volume"),
        avg("Close").alias("Avg_Price"),
        # FIX: count("*") counted one row per symbol per day, so the value
        # labelled "Trading_Days" was the number of records, not days.
        # Counting distinct dates gives the actual trading days in the month.
        countDistinct("Date").alias("Trading_Days")
    )
    .orderBy("Year", "Month")
)

# Section title restored from mis-encoded (mojibake) text.
print("\n📊 PHÂN TÍCH THEO THÁNG")
print("=" * 80)
df_monthly.show(24)

# %% [markdown]
# ## 10. Lưu kết quả phân tích

# %%
# Persist the analysis results to HDFS as Parquet, overwriting previous runs.
output_path = "hdfs://namenode:9000/analysis_results"

# All messages below are restored from mis-encoded (mojibake) text.
print("\n💾 ĐANG LƯU KẾT QUẢ PHÂN TÍCH...")
print("=" * 80)

# (frame, sub-directory under output_path, confirmation message), written in order.
results_to_save = [
    # Detailed per-row analysis
    (df_analysis, "stock_analysis", "✅ Đã lưu dữ liệu phân tích chi tiết"),
    # Yearly aggregates
    (df_yearly, "yearly_analysis", "✅ Đã lưu phân tích theo năm"),
    # Per-symbol risk/return summary
    (risk_return, "risk_return_analysis", "✅ Đã lưu phân tích rủi ro-lợi nhuận"),
]
for result_df, subdir, done_message in results_to_save:
    result_df.write.mode("overwrite").parquet(f"{output_path}/{subdir}")
    print(done_message)

print("\n🎉 HOÀN THÀNH PHÂN TÍCH!")

# %% [markdown]
# ## 11. Kết luận
# 
# ### Tóm tắt kết quả:
# 
# 1. **Hệ thống Big Data**: Đã triển khai thành công hệ thống phân tích dữ liệu lớn với Hadoop HDFS và Apache Spark
# 
# 2. **Xử lý dữ liệu**: Xử lý hàng triệu dòng dữ liệu chứng khoán từ nhiều công ty
# 
# 3. **Phân tích**: Thực hiện phân tích xu hướng, tương quan, rủi ro và lợi nhuận
# 
# 4. **Trực quan hóa**: Tạo các biểu đồ trực quan để hỗ trợ ra quyết định đầu tư
# 
# ### Ứng dụng thực tế:
# - Giúp nhà đầu tư đưa ra quyết định thông minh
# - Phát hiện xu hướng và mẫu hình trong thị trường
# - Đánh giá rủi ro và cơ hội đầu tư
# - Tối ưu hóa danh mục đầu tư

# %%
# Stop the Spark session. Cells above need a new session to run again afterwards.
spark.stop()
# Message restored from mis-encoded (mojibake) text.
print("\n✅ Đã dừng Spark Session")