# DataFrame 마무리 & Covariance Matrix(1)
- - -

#### 1. Performing a `Map` command in DataFrame
* In order to perform a map on a dataframe, you first need to transform it into an RDD!
* Not the recommended way. Better to use built-in sparkSQL functions

<br>

#### 2. Covariance Matrix

* Calculating the mean of Sample Vectors

* Outer product of sample vectors

* Covariance Matrix
- - -

### 1. Performing a `Map` command in DataFrame

#### pyspark import & SparkContext 생성

In [2]:
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.types import Row, StructField, StructType, StringType, IntegerType

sc = SparkContext(master="local[*]")
print(sc)

# Just like using Spark requires having a SparkContext, using SQL requires an SQLContext
sqlContext = SQLContext(sc)
print(sqlContext)

<SparkContext master=local[*] appName=pyspark-shell>
<pyspark.sql.context.SQLContext object at 0x000001636DC895C8>


In [3]:
!pip install GoogleDriveDownloader



In [4]:
#### 예제 파일 다운로드
from os.path import exists
from google_drive_downloader import GoogleDriveDownloader as gdd
import tarfile

if exists("./NY.tgz"):
    !rm -rf ./NY.tgz
if exists("./NY.parquet"):
    !rm -rf ./NY.parquet
    
gdd.download_file_from_google_drive(file_id='1hAHV6vC6FvVgrYnoN-lR-IfH488-H121',
                                   dest_path = './NY.tgz')
!tar -xzvf NY.tgz
df = sqlContext.read.load("NY.parquet")

'rm'은(는) 내부 또는 외부 명령, 실행할 수 있는 프로그램, 또는
배치 파일이 아닙니다.
'rm'은(는) 내부 또는 외부 명령, 실행할 수 있는 프로그램, 또는
배치 파일이 아닙니다.
x NY.parquet/
x NY.parquet/_SUCCESS
x NY.parquet/part-00022-89caf7c0-9733-40ec-a650-7f368529dd01-c000.snappy.parquet
x NY.parquet/part-00000-89caf7c0-9733-40ec-a650-7f368529dd01-c000.snappy.parquet
x NY.parquet/part-00021-89caf7c0-9733-40ec-a650-7f368529dd01-c000.snappy.parquet
x NY.parquet/part-00001-89caf7c0-9733-40ec-a650-7f368529dd01-c000.snappy.parquet
x NY.parquet/part-00023-89caf7c0-9733-40ec-a650-7f368529dd01-c000.snappy.parquet
x NY.parquet/part-00002-89caf7c0-9733-40ec-a650-7f368529dd01-c000.snappy.parquet
x NY.parquet/part-00024-89caf7c0-9733-40ec-a650-7f368529dd01-c000.snappy.parquet
x NY.parquet/part-00003-89caf7c0-9733-40ec-a650-7f368529dd01-c000.snappy.parquet
x NY.parquet/part-00025-89caf7c0-9733-40ec-a650-7f368529dd01-c000.snappy.parquet
x NY.parquet/part-00004-89caf7c0-9733-40ec-a650-7f368529dd01-c000.snappy.parquet
x NY.parquet/part-00027-89caf7c0-9733-40ec-a650-

In [5]:
# printSchema()와 sample를 이용한 데이터 확인
df.printSchema()
# sample 사용법 참조
# https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.sample
df.sample(False, 0.01).show(3)

root
 |-- Station: string (nullable = true)
 |-- Measurement: string (nullable = true)
 |-- Year: long (nullable = true)
 |-- Values: binary (nullable = true)
 |-- dist_coast: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)
 |-- elevation: double (nullable = true)
 |-- state: string (nullable = true)
 |-- name: string (nullable = true)

+-----------+-----------+----+--------------------+-----------------+-----------------+------------------+------------------+-----+-----------+
|    Station|Measurement|Year|              Values|       dist_coast|         latitude|         longitude|         elevation|state|       name|
+-----------+-----------+----+--------------------+-----------------+-----------------+------------------+------------------+-----+-----------+
|USW00014743|   TMIN_s20|1923|[2B D8 3D D8 4D D...|388.9079895019531|44.57720184326172|-75.10970306396484|136.60000610351562|   NY|CANTON 4 SE|
|USW00014743|   TMIN_s20|19

* DataFrame to RDD : `[DataFrame].rdd`, 각각의 요소가 `Row`인 RDD 생성

* RDD to DataFrame : `sqlContext.createDataFrame([RDD], schema)`

단, RDD에서 DataFrame으로 변환시, schema를 꼭 정의해줘야 한다!

In [6]:
some_rdd = df.rdd.takeSample(False, 1)
some_rdd

[Row(Station='USC00309072', Measurement='SNWD_s20', Year=2005, Values=bytearray(b"\x0cT/TQTsT\x95T\xb7T\xd8T\xf8T\x17U7UVUtU\x90U\xacU\xc6U\xdeU\xf4U\tV\x1dV/V?VNVZVeVoVvV|V\x80V\x83V\x83V\x83V\x80V~VwVqVhV_VUVKV?V2V$V\x17V\tV\xf9U\xe9U\xd7U\xc5U\xb3U\xa1U\x8eUzUfUQU;U%U\x0cU\xf4T\xdbT\xc2T\xa8T\x8eTsTYT?T%T\nT\xdcS\xa4SkS/S\xf4R\xbcR~R@R\x03R\xc6Q\x8bQPQ\x16Q\xdeP\xa9PuPBP\x10P\xbcO[O\xfeN\xa4NMN\xfbM\xabM]M\x12M\xc9L\x83LAL\x01L\x89K\x12K\x9cJ+J\xbcIUI\xf1H\x94H9H\xcdG2G\xa1F\x0eF\x81E\x07E\x96D,D\x9dC\xf8BrB\xf3A|A\x0bA\xa1@=@\xbe?\x0e?~>\x0b>\x9d=4=\xd1<s<\x1a<\x8c;I:\xca9S9\xe28y8\x178v7\xcc6.6\x9b5\x1e4\xe90\x890.0\xaf/\x0b/q.\xe1-Y-\xdb,f,\xf1+(+o*\xc4)))\x9a(\x19(F\'q&\xb0%\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x

* Lec 6의 예제

    1. 주어진 DataFrame에서 `Year가 1900 미만인 경우 '19th'`, `2000 미만인 경우 '20th'`, `2010 미만인 경우 '21st'`, `모두 아닐 경우 'possibly_bad_data'로 값을 치환`하여라

 여기서 `map`에 들어가는 `input`이 무엇인지 반드시 숙지하여야 한다.
    


In [7]:
def find_century(row):
    if row.Year < 1900:
        return "19th"
    elif row.Year < 2000:
        return "20th"
    elif row.Year < 2010:
        return "21st"
    else:
        return "possibly_bad_data"
    
df.rdd.map(find_century).take(5) # map에 들어가는 input값은 rdd list가 되는게 맞는지 ? 

['20th', '20th', '20th', '20th', '20th']

* Lec 6의 예제

    2. 주어진 DataFrame에서 각각의 요소 중 `longitude`와 `latitude`를 추출하여 (longitude, latitude)의 형태로 값을 출력하여라.

In [8]:
df.rdd.map(lambda row : (row.longitude, row.latitude)).take(5) # 데이터 프레임의 필드의 종류인 위도와 경도를 rdd 코드로 row 출력

[(-77.71330261230469, 42.57080078125),
 (-77.71330261230469, 42.57080078125),
 (-77.71330261230469, 42.57080078125),
 (-77.71330261230469, 42.57080078125),
 (-77.71330261230469, 42.57080078125)]

### Excercise 1 - RDD function in DataFrame (60 point)

- - -
task 별로 ``위에서 사용한 DataFrame(df)``에서 ``RDD``로 변환 후 ``RDD function``을 적용하여 해결합니다

(task 당 20 point)

---

**task**

* 1 : ``df``에서 ``name``별 가장 최근 ``Year``를 **map과 reduce, 또는 reduceByKey 등**을 활용하여 구한 후, take(10)을 통하여 출력합니다.(20 point)

<br>

* 2 : ``df``에서 ``Year``가 ``2000 이상``인 결과에 대해, ``name``별 ``Year``, ``dist_coast``, ``elevation``의 평균을 구하고 ``name``를 기준으로 정렬(Z->A)한 후, take(5)을 통하여 출력합니다.(20 point)

<br>

* 3 : ``df``에서 ``Measurement`` 별 `Values`의 1일부터 10일까지(``np.frombuffer(row.Values[:20], dtype='float16')`` 또는 ``np.frombuffer(row.Values, dtype = 'float16')[:10])의 합이 가장 큰 ``Year``와 ``그 값을 구한 후``, ``Measurement``를 기준으로 정렬(A->Z)합니다. 마지막으로 collect를 하여 출력합니다.(20 point) 

- - -

**task**

* 1 : ``df``에서 ``name``별 가장 최근 ``Year``를 **map과 reduce, 또는 reduceByKey 등**을 활용하여 구한 후, take(10)을 통하여 출력합니다.(20 point)

**★ DataFrame이 아닌 RDD로 작업할 것**

```
#task1 output
[('DANSVILLE MUNI AP', 2013),
 ('BRIDGEHAMPTON', 2013),
 ('MIDDLETOWN 2 NW', 2011),
 ('BERLIN 5 S', 2000),
 ('ELMIRA CORNING RGNL AP', 2013),
 ('UNADILLA 2 N', 2013),
 ('SUFFERN 2 E', 1955),
 ('ROXBURY', 1972),
 ('LOWVILLE', 2013),
 ('GABRIELS', 1978)]

```

In [9]:
# 1-1 답 작성
# map을 이용하여 (name, Year) 형태로 mapping 하고
# reduceByKey에 가장 최근, 즉 가장 큰값을 찾기위해 
# max 함수를 인자로 넘겨주어 최대값을 찾음
# 마지막으로 take(10)으로 10개를 보여줌
df.rdd.map(lambda row : (row.name, row.Year)).reduceByKey(max).take(10)

[('DANSVILLE MUNI AP', 2013),
 ('BRIDGEHAMPTON', 2013),
 ('MIDDLETOWN 2 NW', 2011),
 ('BERLIN 5 S', 2000),
 ('ELMIRA CORNING RGNL AP', 2013),
 ('UNADILLA 2 N', 2013),
 ('SUFFERN 2 E', 1955),
 ('ROXBURY', 1972),
 ('LOWVILLE', 2013),
 ('GABRIELS', 1978)]

**task**

* 2 : ``df``에서 ``Year``가 ``2000 이상``인 결과에 대해, ``name``별 ``Year``, ``dist_coast``, ``elevation``의 평균을 구하고 ``name``를 기준으로 정렬(Z->A)한 후, take(10)을 통하여 출력합니다.(20 point)

**★ DataFrame이 아닌 RDD로 작업할 것**

```
# task2 output
[('YOUNGSTOWN 2 NE', (2008.5, 476.80999755859375, 85.30000305175781)),
 ('YORKTOWN HEIGHTS 1W',
  (2006.421686746988, 28.945999145507812, 204.1999969482422)),
 ('WINDHAM 3 E', (2004.8387096774193, 147.28700256347656, 512.0999755859375)),
 ('WILLSBORO 1 N',
  (2005.360655737705, 255.24200439453125, 54.900001525878906)),
 ('WHITNEY POINT DAM', (2009.2826086956522, 235.35800170898438, 317.0))]
 ```

In [10]:
# 1-2 답 작성
# name을 key로 하여 Year, dist_coast, elevation을 value로 가지는 rdd를 3개 mapping함
# map에는 2000>=Year 경우만 처리하는 함수를 인자로 넘겨줌
# filter를 통해 None을 제거해줌
# 그리고 groupByKey를 통해 같은 key끼리 grouping 해줌
# 만들어진 각각 rdd를 mapValues와 적절한 lambda로 평균을 구함
# 그후에 join을 통하여 합침
# 합치고 map을 통해 적절한 형태로 mapping 해줌
def func1(row):
    if row.Year>=2000:
        return row.name, row.Year
def func2(row):
    if row.Year>=2000:
        return row.name, row.dist_coast
def func3(row):
    if row.Year>=2000:
        return row.name, row.elevation
temp1 = df.rdd.map(func1).filter(lambda x: x is not None).groupByKey().mapValues(lambda row : sum(row)/len(row))
temp2 = df.rdd.map(func2).filter(lambda x: x is not None).groupByKey().mapValues(lambda row : sum(row)/len(row))
temp3 = df.rdd.map(func3).filter(lambda x: x is not None).groupByKey().mapValues(lambda row : sum(row)/len(row))
ret = temp1.join(temp2).join(temp3).map(lambda x: (x[0], (x[1][0][0], x[1][0][1], x[1][1]) ) ).sortByKey(ascending=False)
ret.take(10)

[('YOUNGSTOWN 2 NE', (2008.5, 476.80999755859375, 85.30000305175781)),
 ('YORKTOWN HEIGHTS 1W',
  (2006.421686746988, 28.945999145507812, 204.1999969482422)),
 ('WINDHAM 3 E', (2004.8387096774193, 147.28700256347656, 512.0999755859375)),
 ('WILLSBORO 1 N',
  (2005.360655737705, 255.24200439453125, 54.900001525878906)),
 ('WHITNEY POINT DAM', (2009.2826086956522, 235.35800170898438, 317.0)),
 ('WHITEHALL', (2006.421686746988, 220.0189971923828, 36.29999923706055)),
 ('WESTHAMPTN GABRESKI AP',
  (2006.1333333333334, 4.3188700675964355, 20.399999618530273)),
 ('WESTFIELD 2 SSE', (2001.5, 418.3210144042969, 215.5)),
 ('WESTCHESTER CO AP', (2005.734693877551, 7.586299896240234, 115.5)),
 ('WEST POINT', (2006.0, 47.6151008605957, 97.5))]

**task**

* 3 : ``df``에서 ``Measurement`` 별 `Values`의 1일부터 10일까지(``np.frombuffer(row.Values[:20], dtype='float16')`` 또는 ``np.frombuffer(row.Values, dtype = 'float16')[:10])의 합이 가장 큰 ``Year``와 ``그 값을 구한 후``, ``Measurement``를 기준으로 정렬(A->Z)합니다. 마지막으로 collect를 하여 출력합니다.(20 point) 

<br>

**★ DataFrame이 아닌 RDD로 작업할 것**

★★여기서 ``Values``는 ``bytearray`` type입니다. ★★

``numpy``의 **frombuffer**를 이용하여 ``float16``으로 바꿉니다.
자세한 사용법은 [여기](https://docs.scipy.org/doc/numpy/reference/generated/numpy.frombuffer.html)를 참고합니다

★★또한, ``numpy``의 ``nansum``을 이용하여 값이 **nan**이 아닌 Values의 합을 구합니다.★★

```
# task3 output
[('PRCP', (12824.0, 1946)),
 ('PRCP_s20', (4264.0, 1983)),
 ('SNOW', (2912.0, 1954)),
 ('SNOW_s20', (4140.0, 1895)),
 ('SNWD', (16590.0, 1970)),
 ('SNWD_s20', (12320.0, 1976)),
 ('TMAX', (2216.0, 1897)),
 ('TMAX_s20', (2050.0, 1897)),
 ('TMIN', (623.0, 2007)),
 ('TMIN_s20', (21360.0, 1987)),
 ('TOBS', (1000.0, 1998)),
 ('TOBS_s20', (969.5, 1992))]
 ```

In [36]:
import numpy as np

# 1-3 답 작성
#np.frombuffer(row.Values[:20], dtype='float16') # 또는 
#np.frombuffer(row.Values, dtype = 'float16')[:10]
# 먼저 2개의 (Measurement, sum of Values), (sum of Values, Year) 형태의 2개의 rdd를 만듬
# 그리고 전자에 reduceByKey에 max를 인자로 넘겨주어 sum of Values의 최대값으로 reduce함
# 그리고 map을 통해 적절한 형태로 mapping 해줌
# join을 통해 합침
# Measurement와 sum of values은 동일하나 Year 다른 경우가 존재함
# 때문에 다시한번 reduceByKey에 min을 인자로 넘겨주어 가장 작은 Year를 구함
# 다시한번 map을 통해 적절한 형태로 mapping 해주고
# sortByKey로 정렬해주고
# collect 해줌
temp1 = df.rdd.map(lambda row: (row.Measurement, np.nansum(np.frombuffer(row.Values, dtype = 'float16')[:10]) ))
temp2 = df.rdd.map(lambda row: (np.nansum(np.frombuffer(row.Values, dtype = 'float16')[:10]), row.Year))
temp1 = temp1.reduceByKey(max).map(lambda x: (x[1], x[0]))
ret = temp1.join(temp2).map(lambda x: ((x[1][0], x[0]), x[1][1])).reduceByKey(min).map(lambda x: (x[0][0], (x[0][1], x[1]))).sortByKey().collect()
ret

[('PRCP', (12824.0, 1946)),
 ('PRCP_s20', (4264.0, 1964)),
 ('SNOW', (2912.0, 1904)),
 ('SNOW_s20', (4140.0, 1895)),
 ('SNWD', (16590.0, 1970)),
 ('SNWD_s20', (12320.0, 1976)),
 ('TMAX', (2216.0, 1897)),
 ('TMAX_s20', (2050.0, 1897)),
 ('TMIN', (623.0, 1898)),
 ('TMIN_s20', (21360.0, 1987)),
 ('TOBS', (1000.0, 1902)),
 ('TOBS_s20', (969.5, 1992))]

### 2. Spark와 Numpy를 사용하여 Covariance Matrix 구해보자!

#### (1) numpy 기초 
아래 참고자료를 활용하여 numpy 기초 학습

* 참고자료 
  * [참고자료 1](http://taewan.kim/post/numpy_cheat_sheet/)
  * [참고자료 2](https://docs.scipy.org/doc/numpy/user/quickstart.html)
  * [참고자료 3](https://scipy-lectures.org/intro/numpy/array_object.html#what-are-numpy-and-numpy-arrays)
  * [참고자료 4](https://doorbw.tistory.com/171)
  * [참고자료 5](https://datascienceschool.net/view-notebook/17608f897087478bbeac096438c716f6/)

* 위 자료에서중에서도 
  * ndarray 생성법
  * vector, matrix 연산
  * 인덱싱 (slicing)
  * 행렬 합치기 (vstack, dstack, hstack)
  * sum, mean
  * np.nan 자료형
  * reshape
  * matmul


#### (2)  Calculating the mean of Sample Vectors 
다음 벡터들을 샘플 벡터로 가정합니다.
* $n$ 은 샘플의 수 이고
* $d$는 각 데이터 벡터의 길이 입니다 (예를 들어서 날씨데이터의 경우 $d=365$)
$$
\mathbf{x}_i = \left[\begin{array}{cccc}
x_{i1} & x_{i2}& \ldots & x_{id}, 
\end{array}\right], \quad i=1,\ldots, n
$$
Sample vector 들의 mean (평균) 벡터 $\bar{\mathbf{x}}$는 다음과 같이 구합니다 
$$
\bar{\mathbf{x}} = \frac{1}{n}\sum_{i=1}^n \mathbf{x}_i
$$


In [15]:
# 평균 벡터 구하는 문제 (sum, mean 활용 둘다 해도 괜찮습니다))

In [16]:
# in python
import numpy as np

sample1 = np.array([1,2,3])
sample2 = np.array([4,5,6])
sample3 = np.array([7,8,9])
print("각 sample의 값 : ", sample1, sample2, sample3)
print("sample vector sum : ", sample1 + sample2 + sample3)
print("sample vector mean : ", (sample1 + sample2 + sample3) / 3)

각 sample의 값 :  [1 2 3] [4 5 6] [7 8 9]
sample vector sum :  [12 15 18]
sample vector mean :  [4. 5. 6.]


In [17]:
# in spark
vector_list = sc.parallelize([np.array([1,2,3]),np.array([4,5,6]),np.array([7,8,9])])
print("각 sample의 값 : ", vector_list.collect())
print("sample vector sum : ", vector_list.reduce(lambda ndarr1, ndarr2 : ndarr1 + ndarr2))
print("sample vector mean : ",
      vector_list.reduce(lambda ndarr1, ndarr2 : ndarr1 + ndarr2) / vector_list.count())

각 sample의 값 :  [array([1, 2, 3]), array([4, 5, 6]), array([7, 8, 9])]
sample vector sum :  [12 15 18]
sample vector mean :  [4. 5. 6.]


### Excercise 2 - Calculating the mean of Sample Vectors (40 point)

- - -
 
다음 데이터에 대하여 다음 과제를 수행하세요.

- regular.csv : KBO에서 활약한 타자들의 역대 정규시즌 성적을 포함하여 몸무게, 키 ,생년월일 등의 기본정보

**위의 두 데이터는 모두 `,`로 구분되어 있습니다.**

 - **데이터의 자세한 설명은 다음의 링크를 참조해주세요.([여기를 눌러서 12. 데이터 설명 참고](https://dacon.io/cpt6/62885))**
 - 또한 regular.csv를 직접 열어서 데이터가 어떻게 저장되어 있는지 확인해주세요.

- - -

**task**

- 1. filter를 사용하여 팀 이름이 ``두산``인 선수에 대해, ``(batter_id, np.array([G,R,H,RBI,BB]))``의 형태로 Key/Value RDD를 생성합니다. (20 point)

    1. G R H RBI BB의 경우 초기 설정값이 ``stirng``.  이 값들을 ``float64``으로 변경할 것. ex) ``np.array([1,2,3], dtype = 'float64')``
    2. ★ 각 값이 `' '`일 경우, 0 으로 변경할 것. 
    3. ``map``에서 바로 적용 또는 그러한 함수를 작성
    

<br>

- 2. (20 point)
    1. ``reduceByKey``를 사용하여 ``batter_id``(Key)가 동일한 선수의 ``G, R, H, RBI, BB``(Value)를 각각 더해준 후 ``batter_id``(Key)를 기준으로 ``sortByKey``를 적용합니다. 그 후, ``map``을 사용하여 ``G, R, H, RBI, BB``(Value)만 선택 후 새로운 RDD로 만듭니다.
    2. 위에서 생성된 RDD에 대해, 중복되지 않는 sample의 수를 ``count``를 이용하여 구합니다.
    3. ``reduce``를 이용하여 ``G, R, H, RBI, BB``(Value)를 모두 더한 후 위에서 구한 sample의 수(`count`)로 나누어서 sample vector의 평균을 구합니다.



---

In [38]:
import urllib.request
import re

f = urllib.request.urlretrieve ("https://docs.google.com/uc?export=download&id=1b_L-rJYJC9Oqga0fQ2zh2M763CTM8jzR", "regular.csv")
regular = sc.textFile("./regular.csv").map(lambda x : x.split(","))

**task**

- 1. filter를 사용하여 팀 이름이 ``두산``인 선수에 대해, ``(batter_id, np.array([G,R,H,RBI,BB]))``의 형태로 Key/Value RDD를 생성합니다. (20 point)

    1. G R H RBI BB의 경우 초기 설정값이 ``stirng``.  이 값들을 ``float64``으로 변경할 것. ex) ``np.array([1,2,3], dtype = 'float64')``
    2. ★ 각 값이 `' '`일 경우, 0 으로 변경할 것. 
    3. ``map``에서 바로 적용 또는 그러한 함수를 작성
    
```
#output
[(7, array([1., 0., 0., 0., 0.])),
 (7, array([64., 16., 21., 11.,  6.])),
 (7, array([93., 30., 40., 25., 16.])),
 (7, array([6., 3., 3., 3., 2.])),
 (7, array([40., 14., 17.,  5., 10.])),
 (7, array([48., 12., 24., 10., 18.])),
 (17, array([16.,  1.,  1.,  1.,  0.])),
 (17, array([32.,  2.,  3.,  0.,  0.])),
 (17, array([16.,  2.,  2.,  0.,  0.])),
 (17, array([116.,  38.,  85.,  29.,  24.]))]
 ```

In [45]:
def stringToFloat(element): # G R H RBI BB
    # 함수를 작성해서 사용해도 되고, 안 사용해도 됩니다.
    return (int(element[0]), np.array(element[1:], dtype = 'float64'))

# output
# 먼저 데이터를 보면 row의 3번째에 이름이 온다는 것을 알 수 있음
# 그리고 G R H RBI BB는 각각 5, 7, 8 ,13, 16번째에 온다는 것을 알 수 있음
# 그러면 filter로 이름이 두산인 경우만 filtering 해주고
# map을 통해 G R H RBI BB를 적절히 mapping 시켜줌
# 또한 원래는 string 형태이지만 함수를 통해 G는 int로, 나머지는 float64로 변경시켜줌
task1 = regular.filter(lambda row : row[3] == "두산").map(lambda row : stringToFloat([row[0], row[5], row[7], row[8], row[13], row[16]]))
task1.take(10)

[(7, array([1., 0., 0., 0., 0.])),
 (7, array([64., 16., 21., 11.,  6.])),
 (7, array([93., 30., 40., 25., 16.])),
 (7, array([6., 3., 3., 3., 2.])),
 (7, array([40., 14., 17.,  5., 10.])),
 (7, array([48., 12., 24., 10., 18.])),
 (17, array([16.,  1.,  1.,  1.,  0.])),
 (17, array([32.,  2.,  3.,  0.,  0.])),
 (17, array([16.,  2.,  2.,  0.,  0.])),
 (17, array([116.,  38.,  85.,  29.,  24.]))]

**task**

- 2. (20 point)
    1. ``reduceByKey``를 사용하여 ``batter_id``(Key)가 동일한 선수의 ``G, R, H, RBI, BB``(Value)를 각각 더해준 후 ``batter_id``(Key)를 기준으로 ``sortByKey``를 적용합니다. 그 후, ``map``을 사용하여 ``G, R, H, RBI, BB``(Value)만 선택 후 새로운 RDD로 만듭니다.
    2. 위에서 생성된 RDD에 대해, 중복되지 않는 sample의 수를 ``count``를 이용하여 구합니다.
    3. ``reduce``를 이용하여 ``G, R, H, RBI, BB``(Value)를 모두 더한 후 위에서 구한 sample의 수(`count`)로 나누어서 sample vector의 평균을 구합니다.

```
# output
[G, R, H, RBI, BB] mean :  [427.8846  186.32692 344.05768 178.0577  129.48077]
```

In [46]:
# reduceByKey로 batter_id가 동일한 선수의 G R H RBI BB를 더해줌
# sortByKey로 정렬
# map을 통해 G R H RBI BB만 mapping 시켜주고 task2로 변수 지정
# count를 통해 task2의 갯수를 구해주고 cnt로 변수 지정
# reduce에 sum을 인자로 넘겨주어 총 합을 구하고 cnt로 나누어줌
# 그리고 이 값을 task2_mean_vector로 지정함
# output 예시에는 float32로 주어졌을때의 결과로 보여지지만
# 위의 task1에서는 float64로 계산하라고 하였기 때문에 상이한 결과가 나타날 수 있음
task2 = task1.reduceByKey(lambda x,y : np.asarray([x[i]+y[i] for i in range(len(x))])).sortByKey().map(lambda row : row[1:])
cnt = task2.count()
task2_mean_vector = (task2.reduce(sum)/cnt)[0]
#output
print("[G, R, H, RBI, BB] mean : ", task2_mean_vector)

[G, R, H, RBI, BB] mean :  [427.88461538 186.32692308 344.05769231 178.05769231 129.48076923]
