# Spark 動作テスト

In [1]:
import pandas as pd
import numpy as np
from pyspark.sql import SparkSession, functions as F
spark = SparkSession.builder.config('spark.sql.session.timeZone', 'UTC').getOrCreate()

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [3]:
df = pd.DataFrame(
    {'a': [1, 2, 3, 4, 5, 6],
     'b': [100, 200, 300, 400, 500, 600],
     'c': ["one", "two", "three", "four", "five", "six"]},
    index=[10, 20, 30, 40, 50, 60])
df

Unnamed: 0,a,b,c
10,1,100,one
20,2,200,two
30,3,300,three
40,4,400,four
50,5,500,five
60,6,600,six


In [4]:
sdf = spark.createDataFrame(df)
type(sdf)

pyspark.sql.dataframe.DataFrame

In [5]:
sdf.show()

                                                                                

+---+---+-----+
|  a|  b|    c|
+---+---+-----+
|  1|100|  one|
|  2|200|  two|
|  3|300|three|
|  4|400| four|
|  5|500| five|
|  6|600|  six|
+---+---+-----+



In [6]:
sdf2 = sdf \
	.withColumn("d", sdf["b"] * 2) \
	.withColumn("f", F.substring("c", 0, 2))
sdf2.show()

+---+---+-----+----+---+
|  a|  b|    c|   d|  f|
+---+---+-----+----+---+
|  1|100|  one| 200| on|
|  2|200|  two| 400| tw|
|  3|300|three| 600| th|
|  4|400| four| 800| fo|
|  5|500| five|1000| fi|
|  6|600|  six|1200| si|
+---+---+-----+----+---+



In [7]:
# ファイル出力する際、内部的にパーティション分割されたデータのパーティション数にファイルが分かれて保存される
# coalesceを使うことでパーティション数を減らすことができる
# （ただし、そのための負荷はある。パーティションを減らした結果、1つのノードのメモリに収まらないデータ量になったらメモリエラー）
# repartitionでもパーティション数をコントロールできるが、前者は減らす専用に最適化された関数
sdf2.coalesce(1).write.parquet("test", mode="overwrite")

                                                                                

In [8]:
sdf2 = spark.read.parquet("test/*.parquet")
sdf2.show()

+---+---+-----+----+---+
|  a|  b|    c|   d|  f|
+---+---+-----+----+---+
|  1|100|  one| 200| on|
|  2|200|  two| 400| tw|
|  3|300|three| 600| th|
|  4|400| four| 800| fo|
|  5|500| five|1000| fi|
|  6|600|  six|1200| si|
+---+---+-----+----+---+



In [9]:
df = sdf2.toPandas()
df

Unnamed: 0,a,b,c,d,f
0,1,100,one,200,on
1,2,200,two,400,tw
2,3,300,three,600,th
3,4,400,four,800,fo
4,5,500,five,1000,fi
5,6,600,six,1200,si
