Let's run a simple data analysis on Apple stock data. Every answer must be computed with PySpark.

First, install PySpark and Py4J.

In [None]:
!pip install pyspark==3.0.1 py4j==0.10.9 

Collecting pyspark==3.0.1
  Downloading pyspark-3.0.1.tar.gz (204.2 MB)
     |████████████████████████████████| 204.2 MB 25 kB/s
Collecting py4j==0.10.9
  Downloading py4j-0.10.9-py2.py3-none-any.whl (198 kB)
     |████████████████████████████████| 198 kB 54.5 MB/s
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.0.1-py2.py3-none-any.whl size=204612243 sha256=3f91b34e28284f3ac747eb2ccaea084251a1bda324f01835647d608c4a603e50
  Stored in directory: /root/.cache/pip/wheels/5e/34/fa/b37b5cef503fc5148b478b2495043ba61b079120b7ff379f9b
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9 pyspark-3.0.1


In [None]:
!ls

sample_data


In [None]:
!ls -tl sample_data

total 55504
-rw-r--r-- 1 root root 18289443 Feb 18 14:33 mnist_test.csv
-rw-r--r-- 1 root root   301141 Feb 18 14:33 california_housing_test.csv
-rw-r--r-- 1 root root  1706430 Feb 18 14:33 california_housing_train.csv
-rw-r--r-- 1 root root 36523880 Feb 18 14:33 mnist_train_small.csv
-rwxr-xr-x 1 root root     1697 Jan  1  2000 anscombe.json
-rwxr-xr-x 1 root root      930 Jan  1  2000 README.md


In [None]:
from pyspark.sql import SparkSession

# "local[*]" runs Spark locally with one worker thread per CPU core
spark = SparkSession.builder\
        .master("local[*]")\
        .appName("PySpark_Tutorial")\
        .getOrCreate()

In [None]:
spark

### **Converting Python objects into an RDD**

#### **1> Creating a Python list**

In [None]:
name_list_json = ['{"name": "keeyong"}', '{"name": "benjamin"}',  '{"name": "claire"}']

In [None]:
for i in name_list_json:
  print(i)

{"name": "keeyong"}
{"name": "benjamin"}
{"name": "claire"}


In [None]:
import json

for name in name_list_json:
  json_name_dict = json.loads(name)
  print(json_name_dict["name"])


keeyong
benjamin
claire


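#### **2> Converting the Python list into an RDD. The data is split into partitions that are distributed across the servers of the Spark cluster, and RDD transformations are evaluated lazily: nothing is computed until an action such as count() or collect() runs**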

In [None]:
rdd = spark.sparkContext.parallelize(name_list_json) # split the list into partitions distributed across the cluster

In [None]:
rdd

ParallelCollectionRDD[0] at readRDDFromFile at PythonRDD.scala:262

In [None]:
rdd.count() # number of elements in the distributed dataset (an action, so it triggers a job)

3

In [None]:
rdd.collect()

['{"name": "keeyong"}', '{"name": "benjamin"}', '{"name": "claire"}']

In [None]:
parsed_rdd = rdd.map(lambda el: json.loads(el)) # parse each JSON string into a Python dict

In [None]:
parsed_rdd

PythonRDD[2] at RDD at PythonRDD.scala:53

In [None]:
parsed_rdd.count()

3

In [None]:
parsed_rdd.collect()

[{'name': 'keeyong'}, {'name': 'benjamin'}, {'name': 'claire'}]

In [None]:
parsed_name_rdd = rdd.map(lambda el:json.loads(el)["name"])

In [None]:
parsed_name_rdd.collect()

['keeyong', 'benjamin', 'claire']

#### Creating a Spark Session

In [None]:
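from pyspark.sql import SparkSession

# getOrCreate() returns the session created earlier if one is already running,
# so the existing application keeps its original name
spark = SparkSession \
    .builder \
    .appName("Python Spark Dataframe basic example") \
    .getOrCreate()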

#### Converting a Python list into a DataFrame

In [None]:
from pyspark.sql.types import StringType

df = spark.createDataFrame(name_list_json, StringType())

In [None]:
df.count()

3

In [None]:
df.printSchema()

root
 |-- value: string (nullable = true)



In [None]:
df.select('*').collect()

[Row(value='{"name": "keeyong"}'),
 Row(value='{"name": "benjamin"}'),
 Row(value='{"name": "claire"}')]

In [None]:
from pyspark.sql import Row

row = Row("name")
df_name = parsed_name_rdd.map(row).toDF()

In [None]:
parsed_name_rdd.printSchema() # RDDs have no schema, so this call fails

AttributeError

In [None]:
df_name.printSchema()

In [None]:
df_name.select("name").collect()

#### Loading the Apple stock CSV file: https://pyspark-test-sj.s3-us-west-2.amazonaws.com/appl_stock.csv
First load it into a pandas DataFrame, then convert that into a Spark DataFrame.

In [None]:
import pandas as pd

apple_pandas_df = pd.read_csv("https://pyspark-test-sj.s3-us-west-2.amazonaws.com/appl_stock.csv")
apple_spark_df = spark.createDataFrame(apple_pandas_df)

#### 1> What columns are there?

In [None]:
apple_spark_df.columns

['Date', 'Open', 'High', 'Low', 'Close', 'Volume', 'Adj Close']

#### 2> Print the schema

In [None]:
apple_spark_df.printSchema()

root
 |-- Date: string (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Volume: long (nullable = true)
 |-- Adj Close: double (nullable = true)



#### 3> Show the first 5 records

In [None]:
apple_spark_df.show(n=5)

+----------+----------+----------+----------+----------+---------+------------------+
|      Date|      Open|      High|       Low|     Close|   Volume|         Adj Close|
+----------+----------+----------+----------+----------+---------+------------------+
|2010-01-04|213.429998|214.499996|212.380001|214.009998|123432400|         27.727039|
|2010-01-05|214.599998|215.589994|213.249994|214.379993|150476200|         27.774976|
|2010-01-06|214.379993|    215.23|210.750004|210.969995|138040000|27.333178000000004|
|2010-01-07|    211.75|212.000006|209.050005|    210.58|119282800|          27.28265|
|2010-01-08|210.299994|212.000006|209.060005|211.980005|111902700|         27.464034|
+----------+----------+----------+----------+----------+---------+------------------+
only showing top 5 rows



#### 4> Use describe to view per-column statistics of the DataFrame

In [None]:
apple_spark_df.describe().show()

+-------+----------+------------------+------------------+------------------+-----------------+-------------------+-----------------+
|summary|      Date|              Open|              High|               Low|            Close|             Volume|        Adj Close|
+-------+----------+------------------+------------------+------------------+-----------------+-------------------+-----------------+
|  count|      1762|              1762|              1762|              1762|             1762|               1762|             1762|
|   mean|      null| 313.0763111589103| 315.9112880164581| 309.8282405079457|312.9270656379113|9.422577587968218E7|75.00174115607275|
| stddev|      null|185.29946803981522|186.89817686485767|183.38391664371008|185.1471036170943|6.020518776592709E7|28.57492972179906|
|    min|2010-01-04|              90.0|         90.699997|         89.470001|        90.279999|           11475900|        24.881912|
|    max|2016-12-30|        702.409988|        705.070023|    

#### 5> What is the mean of the Close column?

In [None]:
from pyspark.sql.functions import mean

apple_spark_df.select(mean("Close")).show()

+-----------------+
|       avg(Close)|
+-----------------+
|312.9270656379113|
+-----------------+



#### 6> What are the maximum and minimum of the Volume column?

In [None]:
from pyspark.sql import functions as F  # avoids shadowing Python's built-in min and max

apple_spark_df.select(F.max("Volume"), F.min("Volume")).show()

+-----------+-----------+
|max(Volume)|min(Volume)|
+-----------+-----------+
|  470249500|   11475900|
+-----------+-----------+



#### Bonus question: Create a DataFrame with a new column named HV ratio, whose value is High/Volume

In [None]:
apple_spark_df_with_hv = apple_spark_df.withColumn("HV ratio", apple_spark_df.High / apple_spark_df.Volume)

In [None]:
apple_spark_df_with_hv.show(5)

#### Bonus question: What is the mean of the Close column for each month?

In [None]:
from pyspark.sql.functions import month

monthdf = apple_spark_df.withColumn("Month", month("Date"))

In [None]:
monthavgdf = monthdf.select(["Month", "Close"]).groupBy("Month").mean("Close")

In [None]:
monthavgdf.show()

+-----+------------------+
|Month|        avg(Close)|
+-----+------------------+
|   12|302.35053626845644|
|    1|322.20971425714276|
|    6|      288.12546566|
|    3|332.91156731372547|
|    5|351.62102085714304|
|    9| 301.0763195902777|
|    4|340.51041081506827|
|    8| 300.4385809612901|
|    7|281.72216211486483|
|   10|308.30552563157886|
|   11| 306.2725174895104|
|    2| 321.3595563037038|
+-----+------------------+



In [None]:
monthavgdf.select(["Month", "avg(Close)"]).orderBy("Month").show()

+-----+------------------+
|Month|        avg(Close)|
+-----+------------------+
|    1|322.20971425714276|
|    2| 321.3595563037038|
|    3|332.91156731372547|
|    4|340.51041081506827|
|    5|351.62102085714304|
|    6|      288.12546566|
|    7|281.72216211486483|
|    8| 300.4385809612901|
|    9| 301.0763195902777|
|   10|308.30552563157886|
|   11| 306.2725174895104|
|   12|302.35053626845644|
+-----+------------------+

