In [1]:
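# Some people write `from pyspark.sql.functions import *`, but I prefer
# keeping the functions behind the explicit `F` namespace because it is
# easier to see where each name comes from.
# (Note: the uppercase alias `F` does technically violate PEP 8 naming.)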
from pyspark.sql import functions as F
from pyspark.sql.types import FloatType, TimestampType, StringType
from pyspark.sql.window import Window
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.utils import AnalysisException
import shutil

In [2]:
# Spark entry point for this notebook: getOrCreate() returns the active
# session if there is one, otherwise builds a new one named "gamedata".
spark = (
    SparkSession.builder
    .appName("gamedata")
    .getOrCreate()
)

In [3]:
# Load ../hard_weekly.csv (first row is the header) and keep four columns,
# casting begin_date/end_date to dates and units to an integer; hw is taken
# as read from the file.
df = (
    spark.read
    .csv("../hard_weekly.csv", header=True)
    .select(
        F.col("begin_date").cast("date"),
        F.col("end_date").cast("date"),
        F.col("hw"),
        F.col("units").cast("int"),
    )
)

In [5]:
# Print df's column names, types and nullability to confirm the casts applied.
df.printSchema()

root
 |-- begin_date: date (nullable = true)
 |-- end_date: date (nullable = true)
 |-- hw: string (nullable = true)
 |-- units: integer (nullable = true)



In [6]:
# Preview df; show() prints only the first 20 rows by default.
df.show()

+----------+----------+-------+------+
|begin_date|  end_date|     hw| units|
+----------+----------+-------+------+
|1998-10-12|1998-10-18|     PS| 20918|
|1998-10-12|1998-10-18|     GB| 10867|
|1998-10-12|1998-10-18|    N64|  2965|
|1998-10-12|1998-10-18| SATURN|   858|
|1998-10-19|1998-10-25|     GB|163017|
|1998-10-19|1998-10-25|     PS| 21151|
|1998-10-19|1998-10-25|    N64|  3813|
|1998-10-19|1998-10-25| SATURN|  1061|
|1998-10-26|1998-11-01|     GB| 71679|
|1998-10-26|1998-11-01|NeoGeoP| 21471|
|1998-10-26|1998-11-01|     PS| 20128|
|1998-10-26|1998-11-01|    N64|  3787|
|1998-10-26|1998-11-01| SATURN|  1027|
|1998-11-02|1998-11-08|     GB| 52586|
|1998-11-02|1998-11-08|     PS| 27171|
|1998-11-02|1998-11-08|    N64|  4962|
|1998-11-02|1998-11-08|NeoGeoP|  4688|
|1998-11-02|1998-11-08| SATURN|   824|
|1998-11-09|1998-11-15|     GB| 37727|
|1998-11-09|1998-11-15|     PS| 23575|
+----------+----------+-------+------+
only showing top 20 rows



In [8]:
# Persist the typed frame as a Parquet dataset under "hard_weekly".
# mode("overwrite") lets Spark replace any previous output itself. Unlike an
# unconditional shutil.rmtree("hard_weekly"), this does not raise
# FileNotFoundError when the directory does not exist yet, so the cell also
# works on a fresh checkout / Restart & Run All.
df.write.mode("overwrite").parquet("hard_weekly")

                                                                                

In [9]:
# Read the "hard_weekly" Parquet dataset back into a separate DataFrame so the
# round trip can be inspected.
df2 = spark.read.parquet("hard_weekly")

In [10]:
# Preview the reloaded data; show() prints only the first 20 rows by default.
df2.show()

+----------+----------+-------+------+
|begin_date|  end_date|     hw| units|
+----------+----------+-------+------+
|1998-10-12|1998-10-18|     PS| 20918|
|1998-10-12|1998-10-18|     GB| 10867|
|1998-10-12|1998-10-18|    N64|  2965|
|1998-10-12|1998-10-18| SATURN|   858|
|1998-10-19|1998-10-25|     GB|163017|
|1998-10-19|1998-10-25|     PS| 21151|
|1998-10-19|1998-10-25|    N64|  3813|
|1998-10-19|1998-10-25| SATURN|  1061|
|1998-10-26|1998-11-01|     GB| 71679|
|1998-10-26|1998-11-01|NeoGeoP| 21471|
|1998-10-26|1998-11-01|     PS| 20128|
|1998-10-26|1998-11-01|    N64|  3787|
|1998-10-26|1998-11-01| SATURN|  1027|
|1998-11-02|1998-11-08|     GB| 52586|
|1998-11-02|1998-11-08|     PS| 27171|
|1998-11-02|1998-11-08|    N64|  4962|
|1998-11-02|1998-11-08|NeoGeoP|  4688|
|1998-11-02|1998-11-08| SATURN|   824|
|1998-11-09|1998-11-15|     GB| 37727|
|1998-11-09|1998-11-15|     PS| 23575|
+----------+----------+-------+------+
only showing top 20 rows



In [16]:
# Load ../hard_info.csv (first row is the header), keep four columns with
# launch_day cast to a date, and order the rows by maker.
hwinfo = (
    spark.read
    .csv("../hard_info.csv", header=True)
    .select(
        F.col("hw"),
        F.col("launch_day").cast("date"),
        F.col("maker"),
        F.col("full_name"),
    )
    .sort("maker")
)

In [20]:
# Persist hwinfo as a Parquet dataset under "hard_info".
# mode("overwrite") lets Spark replace any previous output itself. Unlike an
# unconditional shutil.rmtree("hard_info"), this does not raise
# FileNotFoundError when the directory does not exist yet, so the cell also
# works on a fresh checkout / Restart & Run All.
hwinfo.write.mode("overwrite").parquet("hard_info")

In [22]:
# Print hwinfo's column names, types and nullability.
hwinfo.printSchema()

root
 |-- hw: string (nullable = true)
 |-- launch_day: date (nullable = true)
 |-- maker: string (nullable = true)
 |-- full_name: string (nullable = true)

