# 26-7. PySpark 설치하기

In [3]:
import pyspark
pyspark.__version__

'3.1.1'

# 26-8. SparkContext를 통한 스파크 초기화

In [4]:
from pyspark import SparkConf, SparkContext

sc = SparkContext()
sc

In [5]:
type(sc)

pyspark.context.SparkContext

In [5]:
#에러 발생을 확인하세요!
new_sc = SparkContext()

ValueError: Cannot run multiple SparkContexts at once; existing SparkContext(app=pyspark-shell, master=local[*]) created by __init__ at <ipython-input-3-09e2fe521565>:3 

In [7]:
sc.stop()

In [8]:
sc = SparkContext(master='local', appName='PySpark Basic')
sc

In [9]:
sc.getConf().getAll()

[('spark.master', 'local'),
 ('spark.driver.host', '192.168.0.12'),
 ('spark.app.name', 'PySpark Basic'),
 ('spark.app.startTime', '1614913607319'),
 ('spark.rdd.compress', 'True'),
 ('spark.serializer.objectStreamReset', '100'),
 ('spark.submit.pyFiles', ''),
 ('spark.executor.id', 'driver'),
 ('spark.submit.deployMode', 'client'),
 ('spark.driver.port', '35367'),
 ('spark.app.id', 'local-1614913607379'),
 ('spark.ui.showConsoleProgress', 'true')]

In [10]:
sc.master

'local'

In [11]:
sc.appName

'PySpark Basic'

In [12]:
sc.stop()

In [13]:
conf = SparkConf().setAppName('PySpark Basic').setMaster('local')
sc = SparkContext(conf=conf)
sc

# 26-9. RDD Creation

In [6]:
rdd = sc.parallelize([1,2,3])
rdd

ParallelCollectionRDD[0] at readRDDFromFile at PythonRDD.scala:274

In [7]:
type(rdd)

pyspark.rdd.RDD

In [8]:
rdd.take(3)

[1, 2, 3]

# 26-10. RDD Operation (1) Transformations

In [9]:
x = sc.parallelize(["b", "a", "c", "d"])
y = x.map(lambda z: (z, 1))
print(x.collect()) #collect()는 actions입니다. 
print(y.collect())

['b', 'a', 'c', 'd']
[('b', 1), ('a', 1), ('c', 1), ('d', 1)]


In [10]:
nums = sc.parallelize([1, 2, 3])
squares = nums.map(lambda x: x*x)
print(squares.collect())

[1, 4, 9]


In [11]:
x = sc.parallelize([1,2,3,4,5])
y = x.filter(lambda x: x%2 == 0) 
print(x.collect())
print(y.collect())

[1, 2, 3, 4, 5]
[2, 4]


In [12]:
text = sc.parallelize(['a', 'b', 'c', 'd'])
capital = text.map(lambda x: x.upper())
A = capital.filter(lambda x: 'A' in x)
print(text.collect())
print(A.collect())

['a', 'b', 'c', 'd']
['A']


In [15]:
x = sc.parallelize([1,2,3])
y = x.flatMap(lambda x: (x, x*10, 30))
print(x.collect())
print(y.collect())

[1, 2, 3]
[1, 10, 30, 2, 20, 30, 3, 30, 30]


In [16]:
wordsDataset = sc.parallelize(["Spark is funny", "It is beautiful", "And also It is implemented by python"])
result = wordsDataset.flatMap(lambda x: x.split()).filter(lambda x: x != " ").map(lambda x: x.lower())
# 공백은 제거합니다.
# 단어를 공백기준으로 split 합니다. 
result.collect()

['spark',
 'is',
 'funny',
 'it',
 'is',
 'beautiful',
 'and',
 'also',
 'it',
 'is',
 'implemented',
 'by',
 'python']

In [18]:
import os
csv_path = 'bigdata_ecosystem/train.csv'
csv_data_0 = sc.textFile(csv_path)
csv_data_0.take(5)

['survived,sex,age,n_siblings_spouses,parch,fare,class,deck,embark_town,alone',
 '0,male,22.0,1,0,7.25,Third,unknown,Southampton,n',
 '1,female,38.0,1,0,71.2833,First,C,Cherbourg,n',
 '1,female,26.0,0,0,7.925,Third,unknown,Southampton,y',
 '1,female,35.0,1,0,53.1,First,C,Southampton,n']

In [19]:
# 비어있는 라인은 제외하고, delimeter인 ,로 line을 분리해 줍니다. 
csv_data_1 = csv_data_0.filter(lambda line: len(line)>1).map(lambda line: line.split(","))   
csv_data_1.take(5)

[['survived',
  'sex',
  'age',
  'n_siblings_spouses',
  'parch',
  'fare',
  'class',
  'deck',
  'embark_town',
  'alone'],
 ['0',
  'male',
  '22.0',
  '1',
  '0',
  '7.25',
  'Third',
  'unknown',
  'Southampton',
  'n'],
 ['1', 'female', '38.0', '1', '0', '71.2833', 'First', 'C', 'Cherbourg', 'n'],
 ['1',
  'female',
  '26.0',
  '0',
  '0',
  '7.925',
  'Third',
  'unknown',
  'Southampton',
  'y'],
 ['1', 'female', '35.0', '1', '0', '53.1', 'First', 'C', 'Southampton', 'n']]

In [20]:
columns = csv_data_1.take(1)
columns

[['survived',
  'sex',
  'age',
  'n_siblings_spouses',
  'parch',
  'fare',
  'class',
  'deck',
  'embark_town',
  'alone']]

In [21]:
csv_data_2 = csv_data_1.filter(lambda line: line[0].isdecimal())  # 첫 번째 컬럼이 숫자인 것만 필터링
csv_data_2.take(5)

[['0',
  'male',
  '22.0',
  '1',
  '0',
  '7.25',
  'Third',
  'unknown',
  'Southampton',
  'n'],
 ['1', 'female', '38.0', '1', '0', '71.2833', 'First', 'C', 'Cherbourg', 'n'],
 ['1',
  'female',
  '26.0',
  '0',
  '0',
  '7.925',
  'Third',
  'unknown',
  'Southampton',
  'y'],
 ['1', 'female', '35.0', '1', '0', '53.1', 'First', 'C', 'Southampton', 'n'],
 ['0',
  'male',
  '28.0',
  '0',
  '0',
  '8.4583',
  'Third',
  'unknown',
  'Queenstown',
  'y']]

In [22]:
csv_data_3 = csv_data_2.map(lambda line: [(columns[0][i], linedata) for i, linedata in enumerate(line)])
csv_data_3.take(5)

[[('survived', '0'),
  ('sex', 'male'),
  ('age', '22.0'),
  ('n_siblings_spouses', '1'),
  ('parch', '0'),
  ('fare', '7.25'),
  ('class', 'Third'),
  ('deck', 'unknown'),
  ('embark_town', 'Southampton'),
  ('alone', 'n')],
 [('survived', '1'),
  ('sex', 'female'),
  ('age', '38.0'),
  ('n_siblings_spouses', '1'),
  ('parch', '0'),
  ('fare', '71.2833'),
  ('class', 'First'),
  ('deck', 'C'),
  ('embark_town', 'Cherbourg'),
  ('alone', 'n')],
 [('survived', '1'),
  ('sex', 'female'),
  ('age', '26.0'),
  ('n_siblings_spouses', '0'),
  ('parch', '0'),
  ('fare', '7.925'),
  ('class', 'Third'),
  ('deck', 'unknown'),
  ('embark_town', 'Southampton'),
  ('alone', 'y')],
 [('survived', '1'),
  ('sex', 'female'),
  ('age', '35.0'),
  ('n_siblings_spouses', '1'),
  ('parch', '0'),
  ('fare', '53.1'),
  ('class', 'First'),
  ('deck', 'C'),
  ('embark_town', 'Southampton'),
  ('alone', 'n')],
 [('survived', '0'),
  ('sex', 'male'),
  ('age', '28.0'),
  ('n_siblings_spouses', '0'),
  ('parch'

In [23]:
from pyspark import SparkConf, SparkContext, SQLContext

url = 'https://storage.googleapis.com/tf-datasets/titanic/train.csv'
from pyspark import SparkFiles
sc.addFile(url)
sqlContext = SQLContext(sc)

df = sqlContext.read.csv(SparkFiles.get("train.csv"), header=True, inferSchema= True)
df.show(5, truncate = False)

+--------+------+----+------------------+-----+-------+-----+-------+-----------+-----+
|survived|sex   |age |n_siblings_spouses|parch|fare   |class|deck   |embark_town|alone|
+--------+------+----+------------------+-----+-------+-----+-------+-----------+-----+
|0       |male  |22.0|1                 |0    |7.25   |Third|unknown|Southampton|n    |
|1       |female|38.0|1                 |0    |71.2833|First|C      |Cherbourg  |n    |
|1       |female|26.0|0                 |0    |7.925  |Third|unknown|Southampton|y    |
|1       |female|35.0|1                 |0    |53.1   |First|C      |Southampton|n    |
|0       |male  |28.0|0                 |0    |8.4583 |Third|unknown|Queenstown |y    |
+--------+------+----+------------------+-----+-------+-----+-------+-----------+-----+
only showing top 5 rows



In [24]:
# 위에서 얻은 데이터에서 40세 이상인 사람들의 데이터만 필터링해 봅시다. 
df2 = df[df['age']>40]
df2.show(5, truncate = False)

+--------+------+----+------------------+-----+-------+------+-------+-----------+-----+
|survived|sex   |age |n_siblings_spouses|parch|fare   |class |deck   |embark_town|alone|
+--------+------+----+------------------+-----+-------+------+-------+-----------+-----+
|0       |male  |66.0|0                 |0    |10.5   |Second|unknown|Southampton|y    |
|0       |male  |42.0|1                 |0    |52.0   |First |unknown|Southampton|n    |
|1       |female|49.0|1                 |0    |76.7292|First |D      |Cherbourg  |n    |
|0       |male  |65.0|0                 |1    |61.9792|First |B      |Cherbourg  |n    |
|0       |male  |45.0|1                 |0    |83.475 |First |C      |Southampton|n    |
+--------+------+----+------------------+-----+-------+------+-------+-----------+-----+
only showing top 5 rows



# 26-11. RDD Operation (2) Actions

In [25]:
nums = sc.parallelize(list(range(10)))
nums.collect()

[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

In [26]:
nums.take(3)

[0, 1, 2]

In [27]:
nums.count()

10

In [37]:
nums.reduce(lambda x, y: x+y)

45

In [38]:
file_path = 'bigdata_ecosystem/file.txt'
nums.saveAsTextFile(file_path)

!ls -l ~/aiffel/bigdata_ecosystem

합계 32
-rw-r--r-- 1 thuban thuban 30874  2월 21  2019 train.csv


In [41]:
# RDD 생성
rdd = sc.parallelize(range(1,100))

# RDD Transformation 
rdd2 = rdd.map(lambda x: 0.5*x - 10).filter(lambda x: x > 0)

print(rdd2.collect())

# RDD Action 
rdd2.reduce(lambda x, y: x + y)

[0.5, 1.0, 1.5, 2.0, 2.5, 3.0, 3.5, 4.0, 4.5, 5.0, 5.5, 6.0, 6.5, 7.0, 7.5, 8.0, 8.5, 9.0, 9.5, 10.0, 10.5, 11.0, 11.5, 12.0, 12.5, 13.0, 13.5, 14.0, 14.5, 15.0, 15.5, 16.0, 16.5, 17.0, 17.5, 18.0, 18.5, 19.0, 19.5, 20.0, 20.5, 21.0, 21.5, 22.0, 22.5, 23.0, 23.5, 24.0, 24.5, 25.0, 25.5, 26.0, 26.5, 27.0, 27.5, 28.0, 28.5, 29.0, 29.5, 30.0, 30.5, 31.0, 31.5, 32.0, 32.5, 33.0, 33.5, 34.0, 34.5, 35.0, 35.5, 36.0, 36.5, 37.0, 37.5, 38.0, 38.5, 39.0, 39.5]


1580.0

# 26-12. RDD Operation (3) 실습:MapReduce

In [44]:
text = sc.parallelize('hello python')

# map 함수를 적용한 RDD 구하기
text_1 = text.filter(lambda x: x != " ")
text_2 = text_1.map(lambda x:(x, 1))

#reduceByKey 함수를 적용한 Word Counter 출력
word_count = text_2.reduceByKey(lambda accum, n: accum + n)  
word_count.collect()

[('l', 2),
 ('o', 2),
 ('t', 1),
 ('n', 1),
 ('y', 1),
 ('e', 1),
 ('p', 1),
 ('h', 2)]

In [45]:
word_count2 = text_2.reduceByKey(lambda accum, n: accum + 5)  
word_count2.collect()

[('l', 6),
 ('o', 6),
 ('t', 1),
 ('n', 1),
 ('y', 1),
 ('e', 1),
 ('p', 1),
 ('h', 6)]

In [58]:
text_2.take(3)

[('h', 1), ('e', 1), ('l', 1)]

2) Titanic 데이터 분석하기


In [52]:
# 이전 스텝에서 CSV 파일을 로딩했던 내역입니다. 
csv_path = 'bigdata_ecosystem/train.csv'
csv_data_0 = sc.textFile(csv_path)
csv_data_1 = csv_data_0.filter(lambda line: len(line)>1).map(lambda line: line.split(","))   
columns = csv_data_1.take(1)
csv_data_2 = csv_data_1.filter(lambda line: line[0].isdecimal())
csv_data_3 = csv_data_2.map(lambda line: [(columns[0][i], linedata) for i, linedata in enumerate(line)])

csv_data_3.take(3)

[[('survived', '0'),
  ('sex', 'male'),
  ('age', '22.0'),
  ('n_siblings_spouses', '1'),
  ('parch', '0'),
  ('fare', '7.25'),
  ('class', 'Third'),
  ('deck', 'unknown'),
  ('embark_town', 'Southampton'),
  ('alone', 'n')],
 [('survived', '1'),
  ('sex', 'female'),
  ('age', '38.0'),
  ('n_siblings_spouses', '1'),
  ('parch', '0'),
  ('fare', '71.2833'),
  ('class', 'First'),
  ('deck', 'C'),
  ('embark_town', 'Cherbourg'),
  ('alone', 'n')],
 [('survived', '1'),
  ('sex', 'female'),
  ('age', '26.0'),
  ('n_siblings_spouses', '0'),
  ('parch', '0'),
  ('fare', '7.925'),
  ('class', 'Third'),
  ('deck', 'unknown'),
  ('embark_town', 'Southampton'),
  ('alone', 'y')]]

In [54]:
# 생존자와 사망자의 연령 총합 구하기
csv_data_4 = csv_data_3.map(lambda line:(line[0][1], line[2][1]))   # (생존여부, 연령)
age_sum_data = csv_data_4.reduceByKey(lambda accum, age: float(accum) + float(age))  
age_sum = age_sum_data.collect()

# 생존자와 사망자의 사람 수 구하기
csv_data_5 = csv_data_3.map(lambda line:(line[0][1], 1))
survived_data = csv_data_5.reduceByKey(lambda accum, count: int(accum) + int(count)) 
survived_count = survived_data.collect()

age_sum_dict = dict(age_sum)
survived_dict = dict(survived_count)
avg_age_survived = age_sum_dict['1']/survived_dict['1']
print('생존자 평균 연령:' ,avg_age_survived)
avg_age_died = age_sum_dict['0']/survived_dict['0']
print('사망자 평균 연령:' ,avg_age_died)

생존자 평균 연령: 29.110411522633743
사망자 평균 연령: 29.9609375


In [56]:
csv_data_4.take(3)

[('0', '22.0'), ('1', '38.0'), ('1', '26.0')]

In [61]:
age_sum

[('0', 11505.0), ('1', 7073.83)]

In [62]:
survived_count

[('0', 384), ('1', 243)]