- RDD는 두가지의 연산으로 이루어져 있다
- Transformation
- Action

- Transformation -> Lazy Execution 또는 Lazy Loading
- 트랜스포메이션이 행해지면 RDD가 수행되는 것이 아니라, 새로운 RDD를 만들어 내고 
- 그 새로운 RDD에 수행결과를 저장하게 된다

- Action
- 메서드로 이루어진 실행작업이며, 트랜스포메이션이 행해지고 나서 이루어지는 Evaluation작업

In [2]:
from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext

In [3]:
conf = SparkConf().setMaster('local').setAppName('sparkApp')
spark = SparkContext(conf=conf)
spark

In [5]:
rdd = spark.textFile('./data/test.txt') # transformation
rdd

./data/test.txt MapPartitionsRDD[3] at textFile at NativeMethodAccessorImpl.java:0

In [6]:
lines = rdd.filter(lambda x :'spark' in x) # action
lines

PythonRDD[4] at RDD at PythonRDD.scala:53

- RDD 생성
- 데이터를 직접 만든는 방법(parallelize), 외부 데이터를 로드 방법으로

In [7]:
sample_rdd = spark.parallelize(['test','this is a test rdd']) # rdd하나가 rom?에 저장
sample_rdd

ParallelCollectionRDD[5] at readRDDFromFile at PythonRDD.scala:262

In [9]:
sample_rdd.collect() # 연산을 수행하는 함수 

['test', 'this is a test rdd']

- RDD 자주 쓰는 연산 함수
- collect() : RDD에 트랜스포메이션된 결과를 리턴하는 함수
- map() : 연산을 수행하고 싶을때 사용하는 함수

In [10]:
numbers = spark.parallelize(list(range(5)))
numbers

ParallelCollectionRDD[6] at readRDDFromFile at PythonRDD.scala:262

In [12]:
s=numbers.map(lambda x : x * x).collect()
s

[0, 1, 4, 9, 16]

- flatmap() : 리스트들의 원소를 하나의 리스트로 flatten해서 리턴하는 함수

In [14]:
strings = spark.parallelize(['hi spark', 'hi python', 'hi sklearn','hi django'])
unique_string = strings.flatMap(lambda x : x.split(' ')).collect()
unique_string

['hi', 'spark', 'hi', 'python', 'hi', 'sklearn', 'hi', 'django']

- filter() : 조건으로 필터링하는 연산자

In [15]:
num = spark.parallelize(list(range(1,30,3)))
num

ParallelCollectionRDD[13] at readRDDFromFile at PythonRDD.scala:262

In [16]:
result = num.filter(lambda x : x % 2 == 0).collect()
result

[4, 10, 16, 22, 28]

### Pair RDD

- pair rdd란 key-value 쌍으로 이루어진 RDD
- python tuple를 의미

In [17]:
# pair RDD 생성
pairRDD = spark.parallelize([(1,3),(1,5),(2,4),(3,3)])
pairRDD

ParallelCollectionRDD[15] at readRDDFromFile at PythonRDD.scala:262

- reduceByKey()

In [18]:
{
    i:j
    for i, j in pairRDD.reduceByKey(lambda x,y : x+y).collect()
}

{1: 8, 2: 4, 3: 3}

In [19]:
{
    i:j
    for i, j in pairRDD.mapValues(lambda x : x**2).collect()
}

{1: 25, 2: 16, 3: 9}

In [20]:
pairRDD.keys().collect()

[1, 1, 2, 3]

In [21]:
pairRDD.groupByKey().collect()

[(1, <pyspark.resultiterable.ResultIterable at 0x278dd6045c0>),
 (2, <pyspark.resultiterable.ResultIterable at 0x278dd604630>),
 (3, <pyspark.resultiterable.ResultIterable at 0x278dd6046a0>)]

In [22]:
pairRDD.values().collect()

[3, 5, 4, 3]

In [24]:
pairRDD.sortByKey().collect()

[(1, 3), (1, 5), (2, 4), (3, 3)]

### 외부 데이터를 로드해서 RDD 생성하는 방법

In [25]:
customerRDD = spark.textFile('./data/spark-rdd-name-customers.csv')
customerRDD # 불변이라 건들지를 못함 rdd라는 새로운 객체 생성

./data/spark-rdd-name-customers.csv MapPartitionsRDD[32] at textFile at NativeMethodAccessorImpl.java:0

In [26]:
customerRDD.first()

'Alfreds Futterkiste,Germany'

In [30]:
# map연산자를 이용해서 콤마로 split하고 튜플로 리턴하는 구문을 작성해보자
cusPairs = customerRDD.map(lambda x : (x.split(',')[1],x.split(',')[0]))
cusPairs.collect()

[('Germany', 'Alfreds Futterkiste'),
 ('Mexico', 'Ana Trujillo Emparedados y helados'),
 ('Mexico', 'Antonio Moreno Taqueria'),
 ('UK', 'Around the Horn'),
 ('Sweden', 'Berglunds snabbkop'),
 ('Germany', 'Blauer See Delikatessen'),
 ('France', 'Blondel pere et fils'),
 ('Spain', 'Bolido Comidas preparadas'),
 ('France', "Bon app'"),
 ('Canada', 'Bottom-Dollar Marketse'),
 ('UK', "B's Beverages"),
 ('Argentina', 'Cactus Comidas para llevar'),
 ('Mexico', 'Centro comercial Moctezuma'),
 ('Switzerland', 'Chop-suey Chinese'),
 ('Brazil', 'Comercio Mineiro'),
 ('UK', 'Consolidated Holdings'),
 ('Germany', 'Drachenblut Delikatessend'),
 ('France', 'Du monde entier'),
 ('UK', 'Eastern Connection'),
 ('Austria', 'Ernst Handel'),
 ('Brazil', 'Familia Arquibaldo'),
 ('Spain', 'FISSA Fabrica Inter. Salchichas S.A.'),
 ('France', 'Folies gourmandes'),
 ('Sweden', 'Folk och fa HB'),
 ('Germany', 'Frankenversand'),
 ('France', 'France restauration'),
 ('Italy', 'Franchi S.p.A.'),
 ('Portugal', 'Furi

In [32]:
# groupByKey() : 키값을 리스트 형태로 리턴하는 함수 - 키를 그룹잡아서 리턴 (중복도 없애줌)
cusPairs.groupByKey().collect()

[('Germany', <pyspark.resultiterable.ResultIterable at 0x278dd652b00>),
 ('Mexico', <pyspark.resultiterable.ResultIterable at 0x278dd652ac8>),
 ('UK', <pyspark.resultiterable.ResultIterable at 0x278dd652c50>),
 ('Sweden', <pyspark.resultiterable.ResultIterable at 0x278dd652b70>),
 ('France', <pyspark.resultiterable.ResultIterable at 0x278dd652d30>),
 ('Spain', <pyspark.resultiterable.ResultIterable at 0x278dd652da0>),
 ('Canada', <pyspark.resultiterable.ResultIterable at 0x278dd652e10>),
 ('Argentina', <pyspark.resultiterable.ResultIterable at 0x278dd652e48>),
 ('Switzerland', <pyspark.resultiterable.ResultIterable at 0x278dd652e80>),
 ('Brazil', <pyspark.resultiterable.ResultIterable at 0x278dd652ef0>),
 ('Austria', <pyspark.resultiterable.ResultIterable at 0x278dd652f60>),
 ('Italy', <pyspark.resultiterable.ResultIterable at 0x278dd652fd0>),
 ('Portugal', <pyspark.resultiterable.ResultIterable at 0x278dd64f048>),
 ('USA', <pyspark.resultiterable.ResultIterable at 0x278dd64f0b8>),
 ('

In [34]:
# uk에 사는 고객이름만 출력한다면? (dict 형식으로 만들어서 표현)
groupKey = cusPairs.groupByKey().collect()

# python 코드 
# for country, name in groupkey :
#     if country =='UK':
#         for name in names :
#             print(name)

coustomerDict = {
    country : [name for name in names] for country,names in groupKey
    
}

coustomerDict['UK']

['Around the Horn',
 "B's Beverages",
 'Consolidated Holdings',
 'Eastern Connection',
 'Island Trading',
 'North/South',
 'Seven Seas Imports']

In [35]:
# sortByKey : Key오름차순으로 정렬을 해보고 상위 10개만 뽑는다면
cusPairs.sortByKey().keys().collect()[:10]

['Argentina',
 'Argentina',
 'Argentina',
 'Austria',
 'Austria',
 'Belgium',
 'Belgium',
 'Brazil',
 'Brazil',
 'Brazil']

In [38]:
# 나라별 고객이 몇명인지를 카운트 해본다면?  # 딕셔너리 형식
#cusPairs.collect()
mapR = cusPairs.mapValues(lambda x : 1).reduceByKey(lambda x,y :x+y)
# mapR

{
    i:j
    for i, j in mapR.collect()
}


# mapR.collect() # 위 값과 동일하지만 이렇게 하면 튜플형식의 리스트로 가져옴 

{'Germany': 11,
 'Mexico': 5,
 'UK': 7,
 'Sweden': 2,
 'France': 11,
 'Spain': 5,
 'Canada': 3,
 'Argentina': 3,
 'Switzerland': 2,
 'Brazil': 9,
 'Austria': 2,
 'Italy': 3,
 'Portugal': 2,
 'USA': 13,
 'Venezuela': 4,
 'Ireland': 1,
 'Belgium': 2,
 'Norway': 1,
 'Denmark': 2,
 'Finland': 2,
 'Poland': 1}

### Spark DataFrame

- spark dataframe은 RDD의 확장된 구조이다
- 행, 열로 이루어진 내장 RDD

- 생성
- 스파크 세션을 이용한 생성
- SQL 컨텍스트의 테이블을 통한 생성

In [43]:
from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext
import json

In [44]:
sqlCtx = SQLContext(spark)
sqlCtx

<pyspark.sql.context.SQLContext at 0x278dd66e080>

In [49]:
# json 파일
# json -> RDD -> DataFrame
sample_json = spark.textFile('./data/cars.json')
cars_df=sqlCtx.createDataFrame(sample_json.map(lambda x : json.loads(x)))
cars_df.collect()

[Row(brand='Ford', models={'name': 'Fiesta', 'price': '14260'}),
 Row(brand='Ford', models={'name': 'Focus', 'price': '18825'}),
 Row(brand='Ford', models={'name': 'Mustang', 'price': '26670'}),
 Row(brand='BMW', models={'name': '320', 'price': '40250'}),
 Row(brand='BMW', models={'name': 'X3', 'price': '41000'}),
 Row(brand='BMW', models={'name': 'X5', 'price': '60700'}),
 Row(brand='Fiat', models={'name': '500', 'price': '16495'})]

In [50]:
cars_df.printSchema() # 테이블에 대한 요약정보

root
 |-- brand: string (nullable = true)
 |-- models: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)



In [51]:
cars_df.show() # 2차원의 행렬구조를 가지는 테이블 형식을 볼 수 있다

+-----+--------------------+
|brand|              models|
+-----+--------------------+
| Ford|[name -> Fiesta, ...|
| Ford|[name -> Focus, p...|
| Ford|[name -> Mustang,...|
|  BMW|[name -> 320, pri...|
|  BMW|[name -> X3, pric...|
|  BMW|[name -> X5, pric...|
| Fiat|[name -> 500, pri...|
+-----+--------------------+



In [52]:
cars_df.first() # 첫번째 row의 값들을 가져올 수 있다

Row(brand='Ford', models={'name': 'Fiesta', 'price': '14260'})

In [53]:
# 데이터 프레임에 대한 연산
# select()

cars_df.select('brand').show()

+-----+
|brand|
+-----+
| Ford|
| Ford|
| Ford|
|  BMW|
|  BMW|
|  BMW|
| Fiat|
+-----+



In [55]:
type(cars_df.select('models.price').show())

+-----+
|price|
+-----+
|14260|
|18825|
|26670|
|40250|
|41000|
|60700|
|16495|
+-----+



NoneType

In [56]:
# 컬럼의 타입 변환
from pyspark.sql.types import IntegerType

In [70]:
cars_price_type = cars_df.select('brand', 'models.name', 'models.price')
cars_price_type = car_price_type.withColumn('price', car_price_type['price'].cast(IntegerType()))
cars_price_type.show()

+-----+-------+-----+
|brand|   name|price|
+-----+-------+-----+
| Ford| Fiesta|14260|
| Ford|  Focus|18825|
| Ford|Mustang|26670|
|  BMW|    320|40250|
|  BMW|     X3|41000|
|  BMW|     X5|60700|
| Fiat|    500|16495|
+-----+-------+-----+



In [71]:
cars_price_type.first()

Row(brand='Ford', name='Fiesta', price=14260)

In [72]:
# 비교연산
cars_price_type.filter(cars_price_type['price']>20000).show()

+-----+-------+-----+
|brand|   name|price|
+-----+-------+-----+
| Ford|Mustang|26670|
|  BMW|    320|40250|
|  BMW|     X3|41000|
|  BMW|     X5|60700|
+-----+-------+-----+



In [73]:
# 그룹핑
cars_price_type.groupBy('brand').count().show()

+-----+-----+
|brand|count|
+-----+-----+
|  BMW|    3|
| Fiat|    1|
| Ford|    3|
+-----+-----+

