# NoteBook で PySpark を使用したデータ操作

 HDFS の「/clickstream_data」の CSV を読み込み、データフレームを作成する

In [3]:
results = spark.read\
            .option("inferSchema", "true") \
            .option("header", "true") \
            .csv('/clickstream_data') \
            .toDF(
            "wcs_click_date_sk", "wcs_click_time_sk", "wcs_sales_sk", "wcs_item_sk", "wcs_web_page_sk", "wcs_user_sk"
            )

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
0,application_1558243736434_0002,pyspark3,idle,Link,Link,✔


SparkSession available as 'spark'.


実行結果のデータ等の情報を出力

In [4]:
print(type(results))

results.printSchema()
results.show()

<class 'pyspark.sql.dataframe.DataFrame'>
root
 |-- wcs_click_date_sk: integer (nullable = true)
 |-- wcs_click_time_sk: integer (nullable = true)
 |-- wcs_sales_sk: string (nullable = true)
 |-- wcs_item_sk: integer (nullable = true)
 |-- wcs_web_page_sk: integer (nullable = true)
 |-- wcs_user_sk: string (nullable = true)

+-----------------+-----------------+------------+-----------+---------------+-----------+
|wcs_click_date_sk|wcs_click_time_sk|wcs_sales_sk|wcs_item_sk|wcs_web_page_sk|wcs_user_sk|
+-----------------+-----------------+------------+-----------+---------------+-----------+
|            38569|             4250|        null|       7840|             18|       null|
|            38569|            85106|        null|      11130|             18|       null|
|            38569|            52655|        null|       3716|             18|       null|
|            38569|            70934|        null|      13243|             18|       null|
|            38569|            40166

Hive テーブル作成前の設定 / 情報確認

In [5]:
# SUCCESS ファイル保存の無効化
sc._jsc.hadoopConfiguration().set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false") 

# Parquet ファイルが保存される、Warehouse ディレクトリを確認
print(spark.conf.get("spark.sql.warehouse.dir"))

hdfs:///user/hive/warehouse

実行結果を Parquet ファイルとして Hive テーブルに保存

In [6]:
results.write.format("parquet").mode("overwrite").saveAsTable("web_clickstreams")

```
kubectl exec -n mssql-cluster -it master-0 -c hadoop /bin/bash
hive -e "desc web_clickstreams"
```

Hive テーブルに対して、Spark SQL の実行

In [7]:
sqlDF = spark.sql("SELECT * FROM web_clickstreams LIMIT 10")
sqlDF.show()

+-----------------+-----------------+------------+-----------+---------------+-----------+
|wcs_click_date_sk|wcs_click_time_sk|wcs_sales_sk|wcs_item_sk|wcs_web_page_sk|wcs_user_sk|
+-----------------+-----------------+------------+-----------+---------------+-----------+
|            38569|             4250|        null|       7840|             18|       null|
|            38569|            85106|        null|      11130|             18|       null|
|            38569|            52655|        null|       3716|             18|       null|
|            38569|            70934|        null|      13243|             18|       null|
|            38569|            40166|        null|       5389|             18|       null|
|            38570|            73271|        null|       3331|             18|       null|
|            38570|            24651|        null|      10049|             18|       null|
|            38570|            23805|        null|        921|             18|       null|

In [8]:
sqlDF = spark.sql("SELECT wcs_click_date_sk, COUNT(*) AS cnt\
                     FROM web_clickstreams\
                   GROUP BY wcs_click_date_sk\
                   ORDER BY COUNT(*) DESC")
sqlDF.show()

+-----------------+---+
|wcs_click_date_sk|cnt|
+-----------------+---+
|            38577|108|
|            38587| 93|
|            38573| 93|
|            38580| 84|
|            38586| 81|
|            38581| 81|
|            38582| 81|
|            38584| 78|
|            38576| 78|
|            38589| 72|
|            38579| 72|
|            38585| 66|
|            38578| 66|
|            38583| 63|
|            38570| 63|
|            38575| 63|
|            38588| 63|
|            38574| 60|
|            38571| 42|
|            38572| 42|
+-----------------+---+
only showing top 20 rows

データフレームの情報を確認

In [9]:
sqlDF.dtypes

[('wcs_click_date_sk', 'int'), ('cnt', 'bigint')]

In [10]:
type(sqlDF)

<class 'pyspark.sql.dataframe.DataFrame'>

データフレームの内容を CSV として出力

In [11]:
sqlDF.repartition(1)\
    .write \
    .format("csv") \
    .option("header",'true') \
    .mode("overwrite") \
    .save("/output")