# In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
import json

# Stock analysis job: load per-company CSV files from HDFS, compute three
# aggregates on the "Close" column, and export them to a local JSON file.
#
# NOTE(review): the file-level `from pyspark.sql.functions import *` shadows
# several Python builtins (e.g. sum/min/max/round); this block deliberately
# uses only int/float/open/print from the builtins.

# 1. Initialise Spark
spark = SparkSession.builder.appName("Stock Analysis").getOrCreate()

try:
    # 2. Read the CSV files from HDFS
    df = spark.read.csv(
        "hdfs://namenode:9000/datack/*.csv", header=True, inferSchema=True
    )
    # The company ticker is the run of capital letters that precedes "_" in
    # the input file path; regexp_extract yields "" when nothing matches.
    df = df.withColumn(
        "Company", regexp_extract(input_file_name(), r"([A-Z]+)_", 1)
    )
    # to_date() gives NULL for unparsable/missing dates, so Year may be NULL.
    df = df.withColumn("Year", year(to_date(col("Date"))))

    # 3. Analysis 1: average closing price per year
    # Rows without a usable Year (and groups without any non-null Close) are
    # dropped: they would otherwise reach int(None)/float(None) below.
    avg_year = (
        df.filter(col("Year").isNotNull())
        .groupBy("Year")
        .agg(avg("Close").alias("avg"))
        .filter(col("avg").isNotNull())
        .orderBy("Year")
        .collect()
    )

    # 4. Analysis 2: top 10 most volatile companies (stddev of Close)
    # The sample stddev of a single-row group is NULL (NaN on older Spark
    # releases). NaN would sort first under desc() and is not valid JSON,
    # so both are removed before ranking.
    volatile = (
        df.groupBy("Company")
        .agg(stddev("Close").alias("vol"))
        .filter(col("vol").isNotNull() & ~isnan(col("vol")))
        .orderBy(col("vol").desc())
        .limit(10)
        .collect()
    )

    # 5. Analysis 3: compare the major companies
    major = (
        df.filter(col("Company").isin(["AAPL", "GOOG", "AMZN", "MSFT"]))
        .groupBy("Company")
        .agg(avg("Close").alias("avg"))
        .filter(col("avg").isNotNull())
        .collect()
    )

    # 6. Export to JSON
    results = {
        "avg_year": [{"year": int(r.Year), "avg": float(r.avg)} for r in avg_year],
        "volatile": [{"company": r.Company, "vol": float(r.vol)} for r in volatile],
        "major": [{"company": r.Company, "avg": float(r.avg)} for r in major],
    }

    with open('/home/jovyan/work/results.json', 'w', encoding='utf-8') as f:
        json.dump(results, f)

    print("✅ Done!")
finally:
    # Always release the Spark session, even when a step above fails.
    spark.stop()