In [1]:
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
PySpark Hello World サンプルコード
基本的なSparkアプリケーションの例
"""

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit

def main():
    # SparkSessionの作成
    print("PySpark Hello World アプリケーションを開始します...")
    
    spark = SparkSession.builder \
        .appName("PySparkHelloWorld") \
        .config("spark.sql.adaptive.enabled", "true") \
        .getOrCreate()
    
    print(f"Spark バージョン: {spark.version}")
    print(f"アプリケーション名: {spark.conf.get('spark.app.name')}")
    
    # 1. 基本的なRDD操作の例
    print("\n=== RDD操作の例 ===")
    
    # 簡単なリストからRDDを作成
    data = ["Hello", "World", "from", "PySpark", "!"]
    rdd = spark.sparkContext.parallelize(data)
    
    print("元のデータ:")
    print(rdd.collect())
    
    # 文字数を数える
    word_lengths = rdd.map(lambda word: (word, len(word)))
    print("\n各単語の文字数:")
    print(word_lengths.collect())
    
    # 2. DataFrame操作の例
    print("\n=== DataFrame操作の例 ===")
    
    # サンプルデータの作成
    sample_data = [
        ("Alice", 25, "エンジニア"),
        ("Bob", 30, "デザイナー"),
        ("Charlie", 35, "マネージャー"),
        ("Diana", 28, "エンジニア"),
        ("Eve", 32, "デザイナー")
    ]
    
    # DataFrameの作成（英語のカラム名を使用）
    df = spark.createDataFrame(sample_data, ["name", "age", "job"])
    
    print("元のDataFrame:")
    df.show()
    
    # 基本的なフィルタリング
    print("\n30歳以上の従業員:")
    df.filter(col("age") >= 30).show()
    
    # グループ化と集計
    print("\n職種別の平均年齢:")
    df.groupBy("job").agg({"age": "avg"}).show()
    
    # 3. SQL操作の例
    print("\n=== SQL操作の例 ===")
    
    # DataFrameを一時ビューとして登録
    df.createOrReplaceTempView("employees")
    
    # SQLクエリの実行（英語のカラム名を使用）
    result = spark.sql("""
        SELECT job, 
               COUNT(*) as count,
               AVG(age) as avg_age
        FROM employees 
        GROUP BY job
        ORDER BY count DESC
    """)
    
    print("SQLクエリの結果:")
    result.show()
    
    # 4. 簡単な計算例
    print("\n=== 計算例 ===")
    
    # 1から10までの数字の二乗を計算
    numbers = spark.range(1, 11)
    squared = numbers.select(col("id"), (col("id") ** 2).alias("squared"))
    
    print("1から10までの数字とその二乗:")
    squared.show()
    
    # 合計の計算
    total_squared = squared.agg({"squared": "sum"}).collect()[0][0]
    print(f"\n二乗の合計: {total_squared}")
    
    print("\n=== PySpark Hello World アプリケーションが正常に完了しました！ ===")
    
    # SparkSessionの停止
    spark.stop()

if __name__ == "__main__":
    main() 

PySpark Hello World アプリケーションを開始します...


Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/06/29 05:38:57 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Spark バージョン: 4.0.0
アプリケーション名: PySparkHelloWorld

=== RDD操作の例 ===
元のデータ:
['Hello', 'World', 'from', 'PySpark', '!']

各単語の文字数:


                                                                                

[('Hello', 5), ('World', 5), ('from', 4), ('PySpark', 7), ('!', 1)]

=== DataFrame操作の例 ===
元のDataFrame:
+-------+---+------------+
|   name|age|         job|
+-------+---+------------+
|  Alice| 25|  エンジニア|
|    Bob| 30|  デザイナー|
|Charlie| 35|マネージャー|
|  Diana| 28|  エンジニア|
|    Eve| 32|  デザイナー|
+-------+---+------------+


30歳以上の従業員:
+-------+---+------------+
|   name|age|         job|
+-------+---+------------+
|    Bob| 30|  デザイナー|
|Charlie| 35|マネージャー|
|    Eve| 32|  デザイナー|
+-------+---+------------+


職種別の平均年齢:
+------------+--------+
|         job|avg(age)|
+------------+--------+
|  エンジニア|    26.5|
|  デザイナー|    31.0|
|マネージャー|    35.0|
+------------+--------+


=== SQL操作の例 ===
SQLクエリの結果:
+------------+-----+-------+
|         job|count|avg_age|
+------------+-----+-------+
|  エンジニア|    2|   26.5|
|  デザイナー|    2|   31.0|
|マネージャー|    1|   35.0|
+------------+-----+-------+


=== 計算例 ===
1から10までの数字とその二乗:
+---+-------+
| id|squared|
+---+-------+
|  1|    1.0|
|  2|    4.0|
|  3|    9.0

In [2]:
spark = SparkSession.builder \
        .appName("ETL_TEST") \
        .config("spark.sql.adaptive.enabled", "true") \
        .getOrCreate()

In [15]:
from pyspark.sql.window import Window
from pyspark.sql import functions as f
from pyspark.sql import SparkSession

df = spark.read.csv("./workspace/sampledata/input.csv", header=True, inferSchema=True)
df_new = df.orderBy("Date")

#df_new = df.withColumn("movingAverage", f.avg(df["Adj Close"]).over(Window.partitionBy("Code").orderBy("Date").rowsBetween(-5,0)))

df_new.show()




+----+------------+------+------+------+------+---------+----------+
|Code|        Date|  Open|  High|   Low| Close|Adj Close|    Volume|
+----+------------+------+------+------+------+---------+----------+
|AMZN| Apr 1, 2024|180.79| 183.0|179.95|180.97|   180.97|29,174,500|
|AMZN| Apr 1, 2025|187.86|193.93| 187.2|192.17|   192.17|41,267,300|
|AMZN|Apr 10, 2024|182.77|186.27|182.67|185.95|   185.95|35,879,200|
|AMZN|Apr 10, 2025|185.44|186.87|175.85|181.22|   181.22|68,302,000|
|AMZN|Apr 11, 2024|186.74|189.77|185.51|189.05|   189.05|40,020,700|
|AMZN|Apr 11, 2025|179.93|185.86| 178.0|184.87|   184.87|50,594,300|
|AMZN|Apr 12, 2024|187.72|188.38|185.08|186.13|   186.13|38,554,300|
|AMZN|Apr 15, 2024|187.43|188.69| 183.0|183.62|   183.62|48,052,400|
|AMZN|Apr 16, 2024|183.27|184.83|182.26|183.32|   183.32|32,891,300|
|AMZN|Apr 17, 2024|184.31|184.57|179.82|181.28|   181.28|31,359,700|
|AMZN|Apr 18, 2024|181.47|182.39|178.65|179.22|   179.22|30,723,800|
|AMZN|Apr 19, 2024|178.74| 179.0|1

In [19]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col
from pyspark.sql.types import DateType
from datetime import datetime

def parse_date_string(date_str):
    """
    'Apr 26, 2024'のような文字列をDate型に変換する関数
    
    Args:
        date_str (str): 'MMM dd, yyyy'形式の日付文字列
        
    Returns:
        datetime.date: パースされた日付、パースできない場合はNone
    """
    if not date_str:
        return None
    
    try:
        # 月の略称を数字にマッピング
        month_map = {
            'Jan': 1, 'Feb': 2, 'Mar': 3, 'Apr': 4, 'May': 5, 'Jun': 6,
            'Jul': 7, 'Aug': 8, 'Sep': 9, 'Oct': 10, 'Nov': 11, 'Dec': 12
        }
        
        # 正規表現でパース
        pattern = r'^([A-Za-z]{3})\s+(\d{1,2}),\s+(\d{4})$'
        match = re.match(pattern, date_str.strip())
        
        if match:
            month_str, day_str, year_str = match.groups()
            month = month_map.get(month_str)
            day = int(day_str)
            year = int(year_str)
            
            if month and 1 <= day <= 31 and 1900 <= year <= 2100:
                return datetime(year, month, day).date()
        
        return None
        
    except Exception:
        return None

# UDFとして登録
parse_date_udf = udf(parse_date_string, DateType())


In [22]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col

# SparkSessionの作成
spark = SparkSession.builder \
    .appName("DateParseTest") \
    .getOrCreate()

# テスト用のデータ
test_dates = [
    "Apr 26, 2024",
    "Jan 1, 2025", 
    "Dec 31, 2023",
    "Mar 15, 2024",
    "Feb 29, 2024",  # うるう年
    "Invalid date",
    "Apr 32, 2024",  # 無効な日付
    ""
]

# テストデータのDataFrameを作成
test_df = spark.createDataFrame([(date,) for date in test_dates], ["date_string"])

# パース関数を適用
result_df = test_df.withColumn("parsed_date", parse_date_udf(col("date_string")))

print("=== 日付パースのテスト ===")
result_df.show(truncate=False)

# 実際のCSVファイルに適用
print("\n=== 実際のCSVファイルに適用 ===")

# CSVファイルを読み込み
df = spark.read.csv("./workspace/sampledata/input.csv", header=True, inferSchema=True)

# 日付をパース
df_with_parsed_date = df.withColumn("Date", parse_date_udf(col("Date")))

# パースされた日付でソート
df_sorted = df_with_parsed_date.orderBy("Date")

# 結果を表示
df_sorted.select("Code", "Date", "Close").show(10)

# データ型を確認
print("\n=== スキーマ ===")
df_sorted.printSchema()

=== 日付パースのテスト ===
+------------+-----------+
|date_string |parsed_date|
+------------+-----------+
|Apr 26, 2024|2024-04-26 |
|Jan 1, 2025 |2025-01-01 |
|Dec 31, 2023|2023-12-31 |
|Mar 15, 2024|2024-03-15 |
|Feb 29, 2024|2024-02-29 |
|Invalid date|NULL       |
|Apr 32, 2024|NULL       |
|            |NULL       |
+------------+-----------+


=== 実際のCSVファイルに適用 ===
+----+----------+------+
|Code|      Date| Close|
+----+----------+------+
|AMZN|2024-04-01|180.97|
|AMZN|2024-04-02|180.69|
|AMZN|2024-04-03|182.41|
|AMZN|2024-04-04| 180.0|
|AMZN|2024-04-05|185.07|
|AMZN|2024-04-08|185.19|
|AMZN|2024-04-09|185.67|
|AMZN|2024-04-10|185.95|
|AMZN|2024-04-11|189.05|
|AMZN|2024-04-12|186.13|
+----+----------+------+
only showing top 10 rows

=== スキーマ ===
root
 |-- Code: string (nullable = true)
 |-- Date: date (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Adj Close: double (nu

In [5]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as f # avgを直接インポート
from pyspark.sql.types import DateType
from pyspark.sql.window import Window
from datetime import datetime
import re


def parse_date_string(date_str):
    """
    'Apr 26, 2024'のような文字列をDate型に変換する関数
    
    Args:
        date_str (str): 'MMM dd, yyyy'形式の日付文字列
        
    Returns:
        datetime.date: パースされた日付、パースできない場合はNone
    """
    if not date_str:
        return None
    
    try:
        # 月の略称を数字にマッピング
        month_map = {
            'Jan': 1, 'Feb': 2, 'Mar': 3, 'Apr': 4, 'May': 5, 'Jun': 6,
            'Jul': 7, 'Aug': 8, 'Sep': 9, 'Oct': 10, 'Nov': 11, 'Dec': 12
        }
        
        # 正規表現でパース
        pattern = r'^([A-Za-z]{3})\s+(\d{1,2}),\s+(\d{4})$'
        match = re.match(pattern, date_str.strip())
        
        if match:
            month_str, day_str, year_str = match.groups()
            month = month_map.get(month_str)
            day = int(day_str)
            year = int(year_str)
            
            if month and 1 <= day <= 31 and 1900 <= year <= 2100:
                return datetime(year, month, day).date()
        
        return None
        
    except Exception:
        return None

# UDFとして登録
parse_date_udf = udf(parse_date_string, DateType())

# SparkSessionの作成
spark = SparkSession.builder \
    .appName("DateParseTest") \
    .getOrCreate()

# CSVファイルを読み込み
df = spark.read.csv("./workspace/sampledata/input.csv", header=True, inferSchema=True)

df_new = df.withColumn(
            "Date", parse_date_udf(col("Date"))).withColumn(
                "movingAverage", f.avg(df["Adj Close"]).over(
                    Window.partitionBy("Code").orderBy("Date").rowsBetween(-5,0)))
            

df_new.printSchema()

df_new.coalesce(1).write.option("header", "true").mode("overwrite").csv("./workspace/sampledata/output.csv")


root
 |-- Code: string (nullable = true)
 |-- Date: date (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Adj Close: double (nullable = true)
 |-- Volume: string (nullable = true)
 |-- movingAverage: double (nullable = true)



                                                                                