# SparkEnv_Learning_01

빅데이터 - 스칼라(scala), 스파크(spark)로 시작하기: <https://wikidocs.net/28387>     
\[SPARK\]Tutorial(pyspark) : <https://yujuwon.tistory.com/entry/spark-tutorial>

In [38]:
import pyspark    # pyspark
import findspark  # findspark

# findspark

- findspark 팩키지를 통해서 스파크를 찾아내고 pyspark.SparkContext 명령어로 스파크 접속지점을 특정

# RDD

- RDD는 외부데이터를 읽어서 처리하거나 자체적으로 컬렉션 데이터를 생성하여 처리할 수 있다. 
- 데이터 처리는 파티션 단위로 분리해서 작업을 처리한다. 


- RDD 타입 
    - 트랜스포메이션(transformation)
        - 필터링 같은 작업으로 RDD에서 새로운 RDD를 반환 
    - 액션(action) 
        - RDD로 작업을 처리하여 결과를 반환
        - 실행될 때마다 새로운 연산을 처리 
            - 만약 작업의 처리 결과를 재사용하고 싶으면 persist() 메소드를 사용하여 결과를 메모리에 유지할 수 있다. 
            
            
- RDD는 SparkContext객체를 이용하여 생성이 가능하다. 
    - SparkContext
        - SparkConf 객체를 이용해서 파라메터값을 설정 혹은 생성한다. 
        - 초기화도 가능하다. 


## RDD 데이터 이용

1. 내부 데이터를 이용하는 방법(Parallelized Collections)
    - parallelize() 메소드를 이용
        - 연산 : map(), reduce(), filter() 등의 RDD 연산을 이용해서 처리한다. 


2. 외부데이터 이용 
    - textFile() 메소드를 이용


In [2]:
findspark.init()
findspark.find() 

'C:\\Bigdata\\spark-2.4.5-bin-hadoop2.7'

## spark 세션을 생성해주기위해서 다음과 같이 컴파일을 진행해준다.  

In [4]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession

In [5]:
conf = pyspark.SparkConf().setAppName('appName').setMaster('local[2]')
# sc = SparkContext(master='local[2]', appName='appName')

In [6]:
sc = pyspark.SparkContext(conf=conf)

In [7]:
spark = SparkSession(sc)

### 만약 세션이 끝난다면 다음과 같이 코드를 실행한다. 

### RDD만들기

RDD를 생성하기 위해서는 sc.parallelize()를 사용해야한다. 


In [8]:
# 리스트에서 RDD 생성 
data = list(range(1,6))
data

[1, 2, 3, 4, 5]

In [9]:
# inputdata의 새로운 집합을 생성하기위함 
rdd = sc.parallelize(data, 4) # data를 메모리에 저장될때 4조각으로 쪼개서 메모리에 저장 
rdd

ParallelCollectionRDD[0] at parallelize at PythonRDD.scala:195

In [20]:
# 
sc.defaultParallelism

2

In [21]:
rdd1 = rdd.map(lambda x: x * 2)
# map() : 데이터를 가공한다. 반환타입이 같지 않아도 된다.

In [22]:
# collect()는 액션이며 실제로 collect()가 호출되면 RDD가 메모리에 올려져 계산이 이루어진다. 
# 각 테스크의 엔트리들을 수집한후 그결과를 다시 SparkContext전송한다.
rdd1.collect()

[2, 4, 6, 8, 10]

In [23]:
rdd2 = rdd.filter(lambda x: x % 2 == 0)
# filter() : 함수의 결과가 참인경우에만 요소들을 통과시키는 함수이다. 결과로 새로운 RDD를 생성한다 
# 액션은 아니다. 

In [24]:
rdd2.collect()

[2, 4]

### start tset
---

In [None]:
# filter를 테스트 해보자 

In [27]:
def ten(val):
    if(val<10):
        return True
    else:
        return False
    

Filter_test = rdd1.filter(ten)
Filter_test.collect()

[2, 4, 6, 8]

In [None]:
# 주어진 조건에 해당하는 데이터만 선별해 오는 것을 알 수 있다.

### End test
---

In [34]:
rdd3 = sc.parallelize([1, 3, 2, 3, 4])
rdd3.distinct().collect()

[2, 4, 1, 3]

In [28]:
rdd3 = sc.parallelize([1, 4, 2, 2, 3])
rdd3.distinct().collect()
# distinct() : 중복을 제거한 RDD를 반환한다. 

# 의문 : 왜 순서가 바뀌었을까? - 포기

[4, 2, 1, 3]

In [29]:
rdd4 = sc.parallelize([1, 2, 3])
rdd4.map(lambda x: [x, x+5]).collect()

[[1, 6], [2, 7], [3, 8]]

In [30]:
rdd4.flatMap(lambda x: [x, x+5]).collect()
# 차원 변경 ?
# iterator 안에 포함된 값으로 RDD를 구성하기 원할 경우에 flatmap()을 사용

[1, 6, 2, 7, 3, 8]

### Action

- reduce(func)  
- take(n)  
- collect()  
- takeOrdered(n, key=func) 

In [31]:
rdd = sc.parallelize([1,2,3])
rdd.reduce(lambda a, b : a * b)

# reduce(func): 계산된 값을 하나로 합쳐준다. 
# reduce은 파티션 레벨 단위로 적용된다.

6

In [32]:
rdd.take(2)
# take(): RDD에서 해당 개수만큼 데이터를 가져온다. 

[1, 2]

In [33]:
rdd.collect()

[1, 2, 3]

In [35]:
rdd5 = sc.parallelize([5, 3, 1, 2])
rdd5.takeOrdered(3, lambda s: -1 * s)
# takeOrdered() : 해당 개수만큼 데이터를 가져오는데 정렬해서 가져온다.(오름차순, 내림차순)

[5, 3, 2]

In [36]:
rdd5

ParallelCollectionRDD[27] at parallelize at PythonRDD.scala:195

### start tset
---

In [None]:
# 데이터 세트를 넘겨주고 RDD를 생성한다. 

In [12]:
# 리스트를 생성
# tmp_data = list(range(1,10001))
tmp_data = range(1,10001)

# 담자 
plc_RDD = sc.parallelize(data,10) # 파티션은 제한이 없는것인가? 
print('type of plc_RDD: {0}'.format(type(plc_RDD)))


type of plc_RDD: <class 'pyspark.rdd.RDD'>


In [13]:
# 해당 RDD의 파티션 숫자를 확인
plc_RDD.getNumPartitions()  

10

In [14]:
print(plc_RDD.toDebugString())  

b'(10) ParallelCollectionRDD[2] at parallelize at PythonRDD.scala:195 []'


In [15]:
print('plc_RDD id: {0}'.format(plc_RDD.id())) # RDD id 확인 

plc_RDD id: 2


### End test
---