In [1]:
import yfinance as yf
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lag, round as spark_round
from pyspark.sql.window import Window
import os

os.environ["JAVA_HOME"] = r"C:\\Users\\Pupi\\AppData\\Local\\Programs\\Eclipse Adoptium\\jdk-8.0.452.9-hotspot"
os.environ["HADOOP_HOME"] = r"C:\\hadoop"
os.environ["SPARK_HOME"] = r"C:\\spark\\spark-3.3.2-bin-hadoop3"
os.environ["PYSPARK_PYTHON"] = r"C:\\Users\\Pupi\\Desktop\\Git project\\stock-trend-etl-spark\\venv\\Scripts\\python.exe"
os.environ["HADOOP_OPTS"] = r"-Djava.library.path=C:\hadoop\bin"
os.environ["PATH"] += r";C:\hadoop\bin"

In [2]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("Test") \
    .master("local[*]") \
    .getOrCreate()

print(spark)

<pyspark.sql.session.SparkSession object at 0x0000021FAE2678E0>


In [3]:
# 銘柄設定
TICKERS = ["VOD.L", "BP.L", "HSBA.L"]
START_DATE = "2018-01-01"
END_DATE = "2024-12-31"

df = yf.download("VOD.L", start=START_DATE, end=END_DATE, auto_adjust=False)
df = df.reset_index()
display([column for column in df.columns])
display(df)

[*********************100%***********************]  1 of 1 completed


[('Date', ''),
 ('Adj Close', 'VOD.L'),
 ('Close', 'VOD.L'),
 ('High', 'VOD.L'),
 ('Low', 'VOD.L'),
 ('Open', 'VOD.L'),
 ('Volume', 'VOD.L')]

Price,Date,Adj Close,Close,High,Low,Open,Volume
Ticker,Unnamed: 1_level_1,VOD.L,VOD.L,VOD.L,VOD.L,VOD.L,VOD.L
0,2018-01-02,232.580841,233.800003,235.300003,233.100006,235.149994,28518111
1,2018-01-03,232.232651,233.449997,234.399994,232.050003,234.399994,36335155
2,2018-01-04,234.073013,235.300003,235.300003,233.000000,234.250000,65362950
3,2018-01-05,235.565216,236.800003,236.899994,234.699997,236.000000,69338309
4,2018-01-08,236.410751,237.649994,238.800003,236.250000,236.600006,58835938
...,...,...,...,...,...,...,...
1762,2024-12-20,66.480278,66.500000,66.940002,66.120003,66.839996,153387989
1763,2024-12-23,66.460289,66.480003,66.919998,66.279999,66.919998,63293094
1764,2024-12-24,67.739906,67.760002,67.940002,66.620003,66.800003,27798312
1765,2024-12-27,67.320030,67.339996,67.959999,66.800003,67.519997,60270857


In [4]:
# 対象銘柄（イギリス株）
TICKERS = ["VOD.L", "BP.L", "HSBA.L"]
START_DATE = "2018-01-01"
END_DATE = "2024-12-31"

def fetch_stock_data(ticker):
    df = yf.download(ticker, start=START_DATE, end=END_DATE, auto_adjust=False)
    df = df.reset_index()
    df.columns = [f"{col[0]}" for col in df.columns]
    df["Ticker"] = ticker
    return df[["Date", "Ticker", "Open", "High", "Low", "Close", "Adj Close", "Volume"]]


# 全銘柄のデータを結合
all_data = pd.concat([fetch_stock_data(ticker) for ticker in TICKERS])
print(all_data)

[*********************100%***********************]  1 of 1 completed
[*********************100%***********************]  1 of 1 completed
[*********************100%***********************]  1 of 1 completed

           Date  Ticker        Open        High         Low       Close  \
0    2018-01-02   VOD.L  235.149994  235.300003  233.100006  233.800003   
1    2018-01-03   VOD.L  234.399994  234.399994  232.050003  233.449997   
2    2018-01-04   VOD.L  234.250000  235.300003  233.000000  235.300003   
3    2018-01-05   VOD.L  236.000000  236.899994  234.699997  236.800003   
4    2018-01-08   VOD.L  236.600006  238.800003  236.250000  237.649994   
...         ...     ...         ...         ...         ...         ...   
1762 2024-12-20  HSBA.L  764.400024  765.599976  749.599976  760.799988   
1763 2024-12-23  HSBA.L  767.400024  770.000000  761.599976  765.099976   
1764 2024-12-24  HSBA.L  768.900024  772.799988  767.099976  772.000000   
1765 2024-12-27  HSBA.L  773.799988  782.500000  771.500000  777.099976   
1766 2024-12-30  HSBA.L  775.900024  781.700012  773.500000  781.700012   

       Adj Close    Volume  
0     232.580841  28518111  
1     232.232651  36335155  
2     234.07




In [5]:
# pandas → Sparkへ変換
spark_df = spark.createDataFrame(all_data)

# 欠損除去
spark_df = spark_df.dropna()

# 日次リターン（%）を計算
window_spec = Window.partitionBy("Ticker").orderBy("Date")
spark_df = spark_df.withColumn("Prev_Close", lag("Close").over(window_spec))
spark_df = spark_df.withColumn(
    "Return(%)",
    spark_round((col("Close") - col("Prev_Close")) / col("Prev_Close") * 100, 2)
)


  for column, series in pdf.iteritems():
  for column, series in pdf.iteritems():


In [6]:
print(os.getcwd())

c:\Users\Pupi\Desktop\Git project\stock-trend-etl-spark\spark_jobs


In [7]:
# 出力パス
# output_path = "../data/processed/stock_prices.parquet"
output_path = r"C:\Users\Pupi\Desktop\Git project\stock-trend-etl-spark\data\processed\stock_prices.parquet"

# 保存（Parquet形式）
spark_df.write.mode("overwrite").parquet(output_path)

print(f"✅ データ保存完了: {output_path}")

spark.stop()

✅ データ保存完了: C:\Users\Pupi\Desktop\Git project\stock-trend-etl-spark\data\processed\stock_prices.parquet
