ให้อ่านไฟล์ csv เพื่อสร้างและจัดการข้อมูลโดยใช้ pyspark dataframe และเปรียบเทียบการทำงานกับการใช้ pandas dataframe

Install Java 8, Spark 3.1.1 and findspark (required to run PySpark in Colab)

In [1]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://archive.apache.org/dist/spark/spark-3.1.1/spark-3.1.1-bin-hadoop3.2.tgz
!tar xf spark-3.1.1-bin-hadoop3.2.tgz
!pip install -q findspark

Upload the file stock.csv

In [2]:
import google.colab.files as files

# Open Colab's file picker; the chosen file is saved into the working directory
# (the output below shows "Saving stock.csv to stock.csv").
uploaded = files.upload()

Saving stock.csv to stock.csv


1. สร้าง Spark session

In [3]:
import os

# Tell findspark / PySpark where the JDK and the unpacked Spark distribution live.
os.environ.update(
    JAVA_HOME="/usr/lib/jvm/java-8-openjdk-amd64",
    SPARK_HOME="/content/spark-3.1.1-bin-hadoop3.2",
)

In [4]:
import findspark

# Make the Spark distribution referenced by SPARK_HOME importable before pyspark is used.
findspark.init()

from pyspark.sql import SparkSession

# Local session that uses all available cores.
spark = (
    SparkSession.builder
    .master("local[*]")
    .getOrCreate()
)
# Render DataFrames as formatted tables when they are the last expression of a cell.
spark.conf.set("spark.sql.repl.eagerEval.enabled", True)
spark

2.1 อ่านไฟล์ stock.csv มาสร้าง pandas DataFrame (df) — ในโค้ดใช้ชื่อตัวแปร pd_stock

In [5]:
import pandas as pd

# Parse Date into datetime64 up front so the .dt accessors used later work directly.
pd_stock = pd.read_csv(
    filepath_or_buffer='stock.csv',
    parse_dates=['Date'],
)
pd_stock

Unnamed: 0,Date,Open,High,Low,Close,Volume,Adj Close
0,2012-01-03,59.970001,61.060001,59.869999,60.330002,12668800,52.619235
1,2012-01-04,60.209999,60.349998,59.470001,59.709999,9593300,52.078475
2,2012-01-05,59.349998,59.619999,58.369999,59.419998,12768200,51.825539
3,2012-01-06,59.419998,59.450001,58.869999,59.000000,8069400,51.459220
4,2012-01-09,59.029999,59.549999,58.919998,59.180000,6679300,51.616215
...,...,...,...,...,...,...,...
1253,2016-12-23,69.430000,69.750000,69.360001,69.540001,4803900,69.032411
1254,2016-12-27,69.300003,69.820000,69.250000,69.699997,4435700,69.191240
1255,2016-12-28,69.940002,70.000000,69.260002,69.309998,4875700,68.804087
1256,2016-12-29,69.209999,69.519997,69.120003,69.260002,4298400,68.754456


2.2 อ่านไฟล์ stock.csv มาสร้าง pyspark DataFrame (pf) โดยกำหนดให้มีการ inferSchema จากข้อมูล — ในโค้ดใช้ชื่อตัวแปร sp_stock

In [98]:
# The first CSV row supplies the column names. inferSchema makes Spark scan the data to
# choose column types; Date still comes back as a string, as the schema below shows.
sp_stock = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv('stock.csv')
)
sp_stock.show(truncate=False)
sp_stock.printSchema()

+----------+------------------+------------------+------------------+------------------+--------+------------------+
|Date      |Open              |High              |Low               |Close             |Volume  |Adj Close         |
+----------+------------------+------------------+------------------+------------------+--------+------------------+
|2012-01-03|59.970001         |61.060001         |59.869999         |60.330002         |12668800|52.619234999999996|
|2012-01-04|60.209998999999996|60.349998         |59.470001         |59.709998999999996|9593300 |52.078475         |
|2012-01-05|59.349998         |59.619999         |58.369999         |59.419998         |12768200|51.825539         |
|2012-01-06|59.419998         |59.450001         |58.869999         |59.0              |8069400 |51.45922          |
|2012-01-09|59.029999         |59.549999         |58.919998         |59.18             |6679300 |51.616215000000004|
|2012-01-10|59.43             |59.709998999999996|58.98         

3.1 df มีคอลัมน์อะไรบ้าง

In [99]:
# DataFrame.keys() returns the column Index (the same object as pd_stock.columns).
pd_stock.keys()

Index(['Date', 'Open', 'High', 'Low', 'Close', 'Volume', 'Adj Close'], dtype='object')

3.2 pf มีคอลัมน์อะไรบ้าง

In [100]:
# Column names, read straight from the schema inferred when the CSV was loaded.
sp_stock.schema.names

['Date', 'Open', 'High', 'Low', 'Close', 'Volume', 'Adj Close']

4.1 ให้แสดง datatype ของคอลัมน์ใน df

In [101]:
# One dtype per column; Date is datetime64 because read_csv was given parse_dates=['Date'].
column_dtypes = pd_stock.dtypes
column_dtypes

Date         datetime64[ns]
Open                float64
High                float64
Low                 float64
Close               float64
Volume                int64
Adj Close           float64
dtype: object

4.2 ให้แสดง datatype ของคอลัมน์ใน pf

In [102]:
# Print the schema tree (column name, Spark type, nullability). Unlike the datetime64
# column pandas produced via parse_dates, inferSchema left Date as a string here.
sp_stock.printSchema()

root
 |-- Date: string (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Volume: integer (nullable = true)
 |-- Adj Close: double (nullable = true)



5.1 แสดงข้อมูล 5 อันดับแรกของ df

In [103]:
# Positional slice of the first five rows (what DataFrame.head(5) does).
pd_stock.iloc[:5]

Unnamed: 0,Date,Open,High,Low,Close,Volume,Adj Close
0,2012-01-03,59.970001,61.060001,59.869999,60.330002,12668800,52.619235
1,2012-01-04,60.209999,60.349998,59.470001,59.709999,9593300,52.078475
2,2012-01-05,59.349998,59.619999,58.369999,59.419998,12768200,51.825539
3,2012-01-06,59.419998,59.450001,58.869999,59.0,8069400,51.45922
4,2012-01-09,59.029999,59.549999,58.919998,59.18,6679300,51.616215


5.2 แสดงข้อมูล 5 อันดับแรกของ pf

In [104]:
# Print the first five rows as an ASCII table, with long cell values truncated.
sp_stock.show(n=5, truncate=True)

+----------+------------------+---------+---------+------------------+--------+------------------+
|      Date|              Open|     High|      Low|             Close|  Volume|         Adj Close|
+----------+------------------+---------+---------+------------------+--------+------------------+
|2012-01-03|         59.970001|61.060001|59.869999|         60.330002|12668800|52.619234999999996|
|2012-01-04|60.209998999999996|60.349998|59.470001|59.709998999999996| 9593300|         52.078475|
|2012-01-05|         59.349998|59.619999|58.369999|         59.419998|12768200|         51.825539|
|2012-01-06|         59.419998|59.450001|58.869999|              59.0| 8069400|          51.45922|
|2012-01-09|         59.029999|59.549999|58.919998|             59.18| 6679300|51.616215000000004|
+----------+------------------+---------+---------+------------------+--------+------------------+
only showing top 5 rows



6.1 สร้าง df2 จาก df โดยเพิ่มคอลัมน์ HV Ratio ซึ่งคำนวณจากราคา High หารด้วย Volume ของสต็อกในแต่ละวัน

In [105]:
# Per-row ratio of the High price to traded Volume. assign() works on a copy,
# so pd_stock itself is left untouched.
df2 = pd_stock.assign(**{"HV Ratio": pd_stock["High"] / pd_stock["Volume"]})
df2

Unnamed: 0,Date,Open,High,Low,Close,Volume,Adj Close,HV Ratio
0,2012-01-03,59.970001,61.060001,59.869999,60.330002,12668800,52.619235,0.000005
1,2012-01-04,60.209999,60.349998,59.470001,59.709999,9593300,52.078475,0.000006
2,2012-01-05,59.349998,59.619999,58.369999,59.419998,12768200,51.825539,0.000005
3,2012-01-06,59.419998,59.450001,58.869999,59.000000,8069400,51.459220,0.000007
4,2012-01-09,59.029999,59.549999,58.919998,59.180000,6679300,51.616215,0.000009
...,...,...,...,...,...,...,...,...
1253,2016-12-23,69.430000,69.750000,69.360001,69.540001,4803900,69.032411,0.000015
1254,2016-12-27,69.300003,69.820000,69.250000,69.699997,4435700,69.191240,0.000016
1255,2016-12-28,69.940002,70.000000,69.260002,69.309998,4875700,68.804087,0.000014
1256,2016-12-29,69.209999,69.519997,69.120003,69.260002,4298400,68.754456,0.000016


6.2 สร้าง pf2 จาก pf โดยเพิ่มคอลัมน์ HV Ratio ซึ่งคำนวณจากราคา High หารด้วย Volume ของสต็อกในแต่ละวัน

In [106]:
# Add the per-day ratio of the High price to traded Volume.
# Keep the DataFrame in pf2 and call show() as a separate step: show() only prints
# and returns None, so chaining it onto the assignment would leave pf2 as None.
pf2 = sp_stock.withColumn('HV Ratio', sp_stock.High / sp_stock.Volume)
pf2.show()

+----------+------------------+------------------+------------------+------------------+--------+------------------+--------------------+
|      Date|              Open|              High|               Low|             Close|  Volume|         Adj Close|            HV Ratio|
+----------+------------------+------------------+------------------+------------------+--------+------------------+--------------------+
|2012-01-03|         59.970001|         61.060001|         59.869999|         60.330002|12668800|52.619234999999996|4.819714653321546E-6|
|2012-01-04|60.209998999999996|         60.349998|         59.470001|59.709998999999996| 9593300|         52.078475|6.290848613094555E-6|
|2012-01-05|         59.349998|         59.619999|         58.369999|         59.419998|12768200|         51.825539|4.669412994783916E-6|
|2012-01-06|         59.419998|         59.450001|         58.869999|              59.0| 8069400|          51.45922|7.367338463826307E-6|
|2012-01-09|         59.029999|   

7.1 จากข้อมูล df วันที่ใดมีราคา High สูงที่สุด

In [107]:
# The boolean mask keeps every row whose High equals the column maximum, so ties are included.
peak_high = pd_stock['High'].max()
pd_stock.loc[pd_stock['High'].eq(peak_high)]

Unnamed: 0,Date,Open,High,Low,Close,Volume,Adj Close
761,2015-01-13,90.800003,90.970001,88.93,89.309998,8215400,83.825448


7.2 จากข้อมูล pf วันที่ใดมีราคา High สูงที่สุด

In [108]:
from pyspark.sql import functions as F
from pyspark.sql.functions import col

# Find the highest High over the whole dataset with one small aggregation.
max_high = sp_stock.agg(F.max('High')).first()[0]
# Keep every row that reaches the maximum, so tied days are all reported. This matches
# the pandas boolean mask in 7.1. An unpartitioned Window + row_number would instead
# move every row into a single partition and keep only one of any tied rows.
sp_stock.filter(col('High') == max_high).show()

+----------+---------+---------+-----+---------+-------+---------+
|      Date|     Open|     High|  Low|    Close| Volume|Adj Close|
+----------+---------+---------+-----+---------+-------+---------+
|2015-01-13|90.800003|90.970001|88.93|89.309998|8215400|83.825448|
+----------+---------+---------+-----+---------+-------+---------+



8.1 ให้แสดงข้อมูล min และ max ของ Volume ใน df

In [109]:
# Pick exactly one row for each extreme of Volume. idxmax/idxmin return the label of
# the first occurrence. With equality masks, a tie would yield more than two rows, and
# the two-label index assignment below would then fail with a length mismatch.
volume = pd_stock['Volume']
max_pd = pd_stock.loc[[volume.idxmax()]]
min_pd = pd_stock.loc[[volume.idxmin()]]
max_min_pd = pd.concat([max_pd, min_pd])
# Label the two rows by which extreme they represent.
max_min_pd.index = ['max', 'min']
max_min_pd

Unnamed: 0,Date,Open,High,Low,Close,Volume,Adj Close
max,2015-10-14,66.610001,67.949997,60.02,60.029999,80898100,57.429177
min,2013-12-24,77.949997,78.239998,77.629997,78.010002,2094900,71.430138


8.2 ให้แสดงข้อมูล min และ max ของ Volume ใน pf

In [110]:
from pyspark.sql import functions as F

# Use the namespaced F.min / F.max. `from pyspark.sql.functions import min, max` would
# replace Python's built-in min()/max() for the rest of the notebook session.
# Result columns are still named min(Volume) and max(Volume).
sp_stock.select(F.min('Volume'), F.max('Volume')).show()

+-----------+-----------+
|min(Volume)|max(Volume)|
+-----------+-----------+
|    2094900|   80898100|
+-----------+-----------+



9.1 ใน df จำนวนวันที่มีค่า High มากกว่า 80 คิดเป็นกี่เปอร์เซ็นต์ของข้อมูลทั้งหมด

In [78]:
# Days with High above 80, expressed as a percentage of all rows.
high_pd = pd_stock.loc[pd_stock['High'] > 80]
count_high = high_pd.shape[0]
total_rows = pd_stock.shape[0]
avg_pd = count_high / total_rows * 100
print(avg_pd, "%")

9.141494435612083 %


9.2 ใน pf จำนวนวันที่มีค่า High มากกว่า 80 คิดเป็นกี่เปอร์เซ็นต์ของข้อมูลทั้งหมด

In [111]:

# Register the DataFrame as a SQL view and count the days with High above 80.
sp_stock.createOrReplaceTempView("temp")
high_sp = spark.sql("select count(*) as c_high from temp where High > 80")
total_rows = sp_stock.count()
# The query returns a single row; pull the count out by column name.
dataCollect = high_sp.first()["c_high"]
avg_sp = dataCollect / total_rows * 100
print(avg_sp, "%")

9.141494435612083 %


10.1 ใน df ให้แสดงวิธีหาค่าเฉลี่ยของราคา High ของแต่ละปี

In [112]:
df3 = pd_stock.copy()
# Collapse each timestamp to its calendar year, as a yearly pandas Period.
df3['Date'] = df3['Date'].dt.to_period('Y')
# Select High before aggregating, so only that column is averaged. The result is
# otherwise identical to averaging every column and then keeping High.
df3.groupby(['Date'])[['High']].mean()


Unnamed: 0_level_0,High
Date,Unnamed: 1_level_1
2012,67.60212
2013,75.729405
2014,77.74004
2015,73.064167
2016,70.019643


10.2 ใน pf ให้แสดงวิธีหาค่าเฉลี่ยของราคา High ของแต่ละปี

In [150]:
# Import only the functions that are needed. `from pyspark.sql.functions import *`
# would shadow Python built-ins such as sum, min, max, round and abs.
# `month` is imported here as well because the per-month cell (11.2) uses it
# without importing it itself.
from pyspark.sql.functions import mean, month, to_timestamp, year

# Date was inferred as a string; convert it to a timestamp so year() can be applied.
sp_stock1 = sp_stock.withColumn('Date', to_timestamp(sp_stock.Date, 'yyyy-MM-dd'))
# Average High per calendar year, ordered chronologically.
sp_stock1.groupBy(year("Date").alias("Year")).agg(mean("High").alias("High")).orderBy('Year').show()


+----+-----------------+
|Year|             High|
+----+-----------------+
|2012|67.60211992799995|
|2013|75.72940490079367|
|2014|77.74004000396819|
|2015| 73.0641670238095|
|2016| 70.0196429047619|
+----+-----------------+



11.1 ใน df ให้แสดงค่าเฉลี่ยของราคา close ของแต่ละเดือน (รวมทุกปีเข้าด้วยกัน)

In [149]:
df4 = pd_stock.copy()
# Calendar month number (1-12), pooling the same month across all years.
df4['Month'] = df4['Date'].dt.month
# Average only Close. Calling .mean() on the whole group would also try to aggregate
# the datetime64 Date column, whose handling varies between pandas versions
# (warning, extra column or error).
df4.groupby(["Month"])[["Close"]].mean()


Unnamed: 0_level_0,Close
Month,Unnamed: 1_level_1
1,71.44802
2,71.306804
3,71.777944
4,72.973619
5,72.309717
6,72.495377
7,74.439719
8,73.029819
9,72.184118
10,71.578545


11.2 ใน pf ให้แสดงค่าเฉลี่ยของราคา close ของแต่ละเดือน (รวมทุกปีเข้าด้วยกัน)

In [161]:
# Convert the string Date to a timestamp, then average Close per calendar month
# (pooled across all years), ordered from 1 to 12.
by_month = month("Date").alias("Month")
avg_close = mean("Close").alias("Close")
sp_stock2 = sp_stock.withColumn('Date', to_timestamp(sp_stock['Date'], 'yyyy-MM-dd'))
sp_stock2.groupBy(by_month).agg(avg_close).orderBy('Month').show()

+-----+-----------------+
|Month|            Close|
+-----+-----------------+
|    1|71.44801958415842|
|    2|  71.306804443299|
|    3|71.77794377570092|
|    4|72.97361900952382|
|    5|72.30971688679247|
|    6| 72.4953774245283|
|    7|74.43971943925233|
|    8|73.02981855454546|
|    9|72.18411785294116|
|   10|71.57854545454543|
|   11| 72.1110893069307|
|   12|72.84792478301885|
+-----+-----------------+

