# Spark - Data Processing

In [1]:
import findspark
findspark.init()

In [2]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

In [3]:
spark

In [4]:
from pyspark import SparkContext
sc = SparkContext.getOrCreate()

In [5]:
sc

In [6]:
df = spark.read.csv('Desktop/sample/ages.csv')

csv 파일을 불러와 dataframe 생성.

In [7]:
df.dtypes

[('_c0', 'string'), ('_c1', 'string')]

dtypes: csv 파일의 스키마 구조를 자동으로 만들어 준다.(column명이 명시되어 있지 않기 때문에 기본값으로 설정됨)

In [8]:
df = spark.read.format('json').load('Desktop/sample/people.json')
df.dtypes

[('age', 'bigint'), ('name', 'string')]

format()을 통해 어떤 형식의 파일을 읽어올지 명시할 수 있다.(json으로 명시하여 json 파일을 읽어왔다)

In [9]:
rdd = sc.textFile('Desktop/sample/people.json')
rdd.collect()

['{"name":"Michael"}',
 '{"name":"Andy", "age":30}',
 '{"name":"Justin", "age":19}']

RDD로 가져왔으므로 별도의 스키마 없이 데이터를 저장된 그대로 읽어왔다.

In [10]:
df = spark.read.json(rdd)
df.dtypes

[('age', 'bigint'), ('name', 'string')]

RDD를 read로 읽어 dataframe으로 만드는 방법으로 스키마를 설정하는 것도 가능하다.

In [11]:
rdd = sc.parallelize((
    """
        {
            "id": "123",
            "name": "Katie",
            "age": 19,
            "eyeColor": "brown"
        }
    """,
    """
        {
            "id": "234",
            "name": "Michael",
            "age": 22,
            "eyeColor": "green"
        }
    """,
    """
        {
            "id": "345",
            "name": "Simone",
            "age": 23,
            "eyeColor": "blue"
        }
    """
))

In [12]:
rdd.collect()

['\n        {\n            "id": "123",\n            "name": "Katie",\n            "age": 19,\n            "eyeColor": "brown"\n        }\n    ',
 '\n        {\n            "id": "234",\n            "name": "Michael",\n            "age": 22,\n            "eyeColor": "green"\n        }\n    ',
 '\n        {\n            "id": "345",\n            "name": "Simone",\n            "age": 23,\n            "eyeColor": "blue"\n        }\n    ']

RDD로 json type의 데이터를 만들었다.

In [13]:
df = spark.read.json(rdd)
df

DataFrame[age: bigint, eyeColor: string, id: string, name: string]

RDD에 저장된 json type의 데이터를 읽어 dataframe으로 만들었다. => 자동으로 데이터 형식을 추론하여 type을 맞춰 저장.

In [14]:
df.createOrReplaceTempView("test")
df.show()

+---+--------+---+-------+
|age|eyeColor| id|   name|
+---+--------+---+-------+
| 19|   brown|123|  Katie|
| 22|   green|234|Michael|
| 23|    blue|345| Simone|
+---+--------+---+-------+



가지고 있는 스키마 구조에 맞춰 데이터를 보여준다.

In [15]:
spark.sql("SELECT * FROM test").collect()

[Row(age=19, eyeColor='brown', id='123', name='Katie'),
 Row(age=22, eyeColor='green', id='234', name='Michael'),
 Row(age=23, eyeColor='blue', id='345', name='Simone')]

view에 대한 sql문을 실행하고 결과를 Row type으로 가져온다.

In [16]:
df.printSchema()

root
 |-- age: long (nullable = true)
 |-- eyeColor: string (nullable = true)
 |-- id: string (nullable = true)
 |-- name: string (nullable = true)



스키마에 대한 더 자세한 정보 출력.

In [17]:
rdd = sc.parallelize(
    [
        (123, "Katie", 19, "brown"),
        (234, "Michael", 22, "green"),
        (345, "Simone", 23, "blue")
    ]
)

In [18]:
from pyspark.sql.types import *

In [19]:
scheme = StructType(
    [
        StructField("id", LongType(), True),
        StructField("name", StringType(), True),
        StructField("age", LongType(), True),
        StructField("eyeColor", StringType(), True)
    ]
)

In [20]:
df = spark.createDataFrame(rdd, scheme)

In [21]:
df.printSchema()

root
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- age: long (nullable = true)
 |-- eyeColor: string (nullable = true)



명시된 조건에 맞게 RDD의 스키마를 설정한다.

In [22]:
spark.sql('SELECT COUNT(*) FROM test').show()

+--------+
|count(1)|
+--------+
|       3|
+--------+



In [23]:
df.count()

3

In [24]:
df.select('id', 'age').show()

+---+---+
| id|age|
+---+---+
|123| 19|
|234| 22|
|345| 23|
+---+---+



In [25]:
spark.sql('SELECT id, age FROM test').show()

+---+---+
| id|age|
+---+---+
|123| 19|
|234| 22|
|345| 23|
+---+---+



In [26]:
df.select('id', 'age').filter('age=22').show()

+---+---+
| id|age|
+---+---+
|234| 22|
+---+---+



In [27]:
spark.sql('SELECT id, age FROM test WHERE age=22').show()

+---+---+
| id|age|
+---+---+
|234| 22|
+---+---+



dataframe에 정의된 함수를 사용해도 sql문을 사용한 결과와 같다.

In [28]:
df.select(df.id, df.age).filter(df.age == 22).show()

+---+---+
| id|age|
+---+---+
|234| 22|
+---+---+



파이썬 방식으로도 같은 결과를 만들 수 있다.

- dataframe 방식: sql문과 파이썬의 혼합 or 순수 파이썬 문법
- spark.sql 방식: 순수 sql문

___

### Join

In [29]:
flightPath = 'Desktop/sample/departuredelays.csv'
airportPath = 'Desktop/sample/airport-codes-na.txt'

In [30]:
flight = spark.read.csv(flightPath, header=True)
flight.take(1)

[Row(date='01011245', delay='6', distance='602', origin='ABE', destination='ATL')]

In [31]:
flight.printSchema()

root
 |-- date: string (nullable = true)
 |-- delay: string (nullable = true)
 |-- distance: string (nullable = true)
 |-- origin: string (nullable = true)
 |-- destination: string (nullable = true)



In [32]:
airport = spark.read.csv(airportPath, header=True, inferSchema=True, sep='\t')
airport.take(1)

[Row(City='Abbotsford', State='BC', Country='Canada', IATA='YXX')]

In [33]:
airport.printSchema()

root
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- IATA: string (nullable = true)



In [34]:
flight.createOrReplaceTempView('flight')
airport.createOrReplaceTempView('airport')

두 dataframe을 join하기 위해 view 생성.

In [35]:
spark.sql(
    """
        SELECT a.City, f.origin, SUM(f.delay) as delay 
        FROM flight f JOIN airport a ON a.IATA = f.origin
        WHERE a.State = "WA" 
        GROUP BY a.City, f.origin
        ORDER BY delay DESC
    """
).show()

+-------+------+--------+
|   City|origin|   delay|
+-------+------+--------+
|Seattle|   SEA|159086.0|
|Spokane|   GEG| 12404.0|
|  Pasco|   PSC|   949.0|
+-------+------+--------+



In [36]:
from pyspark.sql import functions as F

airport.join(flight, airport.IATA == flight.origin)\
        .where(airport.State == "WA")\
        .select(airport.City, flight.origin, flight.delay)\
        .groupBy(airport.City, flight.origin)\
        .agg(F.sum(flight.delay))\
        .orderBy('sum(delay)', ascending=False)\
        .show()

+-------+------+----------+
|   City|origin|sum(delay)|
+-------+------+----------+
|Seattle|   SEA|  159086.0|
|Spokane|   GEG|   12404.0|
|  Pasco|   PSC|     949.0|
+-------+------+----------+



dataframe 방식, spark.sql 방식으로 같은 결과를 만들었다.

Spark의 Catalyst Optimization을 통해 실행시간을 최적화하므로 어떤 방법을 사용해도 성능 차이는 없다.

___

## Spark - Streaming

In [51]:
from pyspark.sql import functions as F
from pyspark.streaming import StreamingContext

streaming = StreamingContext(sc, 5)

lines = streaming.socketTextStream('localhost', 9999)
words = lines.flatMap(lambda line: line.split())
pairs = words.map(lambda w:(w, 1))
counts = pairs.reduceByKey(lambda x, y: x+y)
counts.pprint()

streaming.start()
streaming.awaitTermination()

-------------------------------------------
Time: 2018-08-08 21:19:45
-------------------------------------------

-------------------------------------------
Time: 2018-08-08 21:19:50
-------------------------------------------
('hi', 2)

-------------------------------------------
Time: 2018-08-08 21:19:55
-------------------------------------------
('hi', 2)
('hello', 1)

-------------------------------------------
Time: 2018-08-08 21:20:00
-------------------------------------------
('are', 1)
('wo', 1)
('who', 1)
('you', 1)

-------------------------------------------
Time: 2018-08-08 21:20:05
-------------------------------------------

-------------------------------------------
Time: 2018-08-08 21:20:10
-------------------------------------------

-------------------------------------------
Time: 2018-08-08 21:20:15
-------------------------------------------
('yo', 3)

-------------------------------------------
Time: 2018-08-08 21:20:20
---------------------------------------

KeyboardInterrupt: 

-------------------------------------------
Time: 2018-08-08 21:20:25
-------------------------------------------

-------------------------------------------
Time: 2018-08-08 21:20:30
-------------------------------------------

-------------------------------------------
Time: 2018-08-08 21:20:35
-------------------------------------------



쉘에서 nc -lk 9999 입력한 후 메시지 입력하면 5초마다 메시지 읽어 출력. - jupyter notebook에서 실행시 키보드 interrupt 줘도 java쪽에서 무한히 실행되는 것을 막을 수 없으므로 실행하지 말것.

___

### Dataframe을 통한 구조적 streaming

In [37]:
lines = spark.readStream.format("socket")\
    .option("host", "localhost")\
    .option("port", 9999)\
    .load()

In [38]:
from pyspark.sql.functions import explode, split

In [39]:
type(lines)

pyspark.sql.dataframe.DataFrame

In [40]:
words = lines.select(
    explode(split(lines.value, " ")).alias("word")
)
wordCount = words.groupBy("word").count()

In [42]:
stream = wordCount.writeStream\
        .outputMode("complete")\
        .format("console")\
        .start()

stream.awaitTermination()

KeyboardInterrupt: 

shell에서 라인 단위로 입력이 끝나면 받아서 dataframe으로 바꿔주고 shell에 출력한다.

shell에서 ctrl + c로 종료하면 즉시 종료됨.

___

## 데이터 전처리

Preprocessing: 60%의 시간을 여기서 사용.
- Raw data => Structure data.
- 데이터가 완전하지 않기에 필요.

___

### 중복 처리

In [43]:
from pyspark.sql import SparkSession

In [44]:
spark = SparkSession.builder.getOrCreate()

df = spark.createDataFrame(
    [
        (1, 144.5, 5.9, 33, "M"),
        (2, 167.2, 5.4, 45, "M"),
        (3, 124.1, 5.2, 23, "F"),
        (4, 144.5, 5.9, 33, "M"),
        (5, 133.2, 5.7, 54, "F"),
        (3, 124.1, 5.2, 23, "F"),
        (5, 129.2, 5.3, 42, "M")
    ],
    ["id", "weight", "height", "age", "gender"]
)

In [45]:
df.count(), df.distinct().count()

(7, 6)

dataframe 내에 중복되는 데이터가 존재함을 알 수 있다.

In [46]:
df = df.dropDuplicates()
df.orderBy('id').show()

+---+------+------+---+------+
| id|weight|height|age|gender|
+---+------+------+---+------+
|  1| 144.5|   5.9| 33|     M|
|  2| 167.2|   5.4| 45|     M|
|  3| 124.1|   5.2| 23|     F|
|  4| 144.5|   5.9| 33|     M|
|  5| 133.2|   5.7| 54|     F|
|  5| 129.2|   5.3| 42|     M|
+---+------+------+---+------+



모든 column에 대해 완벽히 중복되는 row를 삭제하였다.

In [47]:
df.dropDuplicates([c for c in df.columns if c != "id"]).distinct().count()

5

id를 제외한 모든 column에 대해 중복되는 row가 존재함을 알 수 있다.

In [48]:
df = df.dropDuplicates([c for c in df.columns if c != "id"])
df.count()

5

In [49]:
df.show()

+---+------+------+---+------+
| id|weight|height|age|gender|
+---+------+------+---+------+
|  5| 133.2|   5.7| 54|     F|
|  1| 144.5|   5.9| 33|     M|
|  2| 167.2|   5.4| 45|     M|
|  3| 124.1|   5.2| 23|     F|
|  5| 129.2|   5.3| 42|     M|
+---+------+------+---+------+



id를 제외한 모든 column에 대해 중복되는 row를 삭제하였다.

In [50]:
df.dropDuplicates([c for c in df.columns if c == "id"]).distinct().count()

4

id column의 값이 중복되는 데이터가 존재함을 알 수 있다.

In [51]:
df.show()

+---+------+------+---+------+
| id|weight|height|age|gender|
+---+------+------+---+------+
|  5| 133.2|   5.7| 54|     F|
|  1| 144.5|   5.9| 33|     M|
|  2| 167.2|   5.4| 45|     M|
|  3| 124.1|   5.2| 23|     F|
|  5| 129.2|   5.3| 42|     M|
+---+------+------+---+------+



id는 중복되지만 나머지 column의 값들은 중복되지 않기에 삭제하지 않았다.

In [52]:
from pyspark.sql import functions as f

In [53]:
df.agg(
    f.count("id").alias("Count"),
    f.countDistinct("id").alias("DCount")
).show()

+-----+------+
|Count|DCount|
+-----+------+
|    5|     4|
+-----+------+



aggregation function을 통해 id가 중복된 데이터가 있음을 확인하였다.

In [54]:
df.withColumn("nid", f.monotonically_increasing_id()).show()

+---+------+------+---+------+-------------+
| id|weight|height|age|gender|          nid|
+---+------+------+---+------+-------------+
|  5| 133.2|   5.7| 54|     F|  25769803776|
|  1| 144.5|   5.9| 33|     M| 171798691840|
|  2| 167.2|   5.4| 45|     M| 592705486848|
|  3| 124.1|   5.2| 23|     F|1236950581248|
|  5| 129.2|   5.3| 42|     M|1365799600128|
+---+------+------+---+------+-------------+



id값이 서로 겹치지 않도록 column을 추가하여 새 id값을 부여하였다.

In [55]:
df.withColumn("oid", df.id).show()

+---+------+------+---+------+---+
| id|weight|height|age|gender|oid|
+---+------+------+---+------+---+
|  5| 133.2|   5.7| 54|     F|  5|
|  1| 144.5|   5.9| 33|     M|  1|
|  2| 167.2|   5.4| 45|     M|  2|
|  3| 124.1|   5.2| 23|     F|  3|
|  5| 129.2|   5.3| 42|     M|  5|
+---+------+------+---+------+---+



예전 id column 백업.

In [56]:
df.withColumn("id", f.monotonically_increasing_id()).show()

+-------------+------+------+---+------+
|           id|weight|height|age|gender|
+-------------+------+------+---+------+
|  25769803776| 133.2|   5.7| 54|     F|
| 171798691840| 144.5|   5.9| 33|     M|
| 592705486848| 167.2|   5.4| 45|     M|
|1236950581248| 124.1|   5.2| 23|     F|
|1365799600128| 129.2|   5.3| 42|     M|
+-------------+------+------+---+------+



In [57]:
df = df.withColumn("oid", df.id)
df = df.withColumn("id", f.monotonically_increasing_id())
df.orderBy("oid").show()

+-------------+------+------+---+------+---+
|           id|weight|height|age|gender|oid|
+-------------+------+------+---+------+---+
| 171798691840| 144.5|   5.9| 33|     M|  1|
| 592705486848| 167.2|   5.4| 45|     M|  2|
|1236950581248| 124.1|   5.2| 23|     F|  3|
|  25769803776| 133.2|   5.7| 54|     F|  5|
|1365799600128| 129.2|   5.3| 42|     M|  5|
+-------------+------+------+---+------+---+



새로운 id값을 부여하고 예전 id값은 백업하여 새로운 column으로 추가하였다.

___

### Missing Value 처리

In [58]:
df = spark.createDataFrame(
    [
        (1, 143.5, 5.6, 28, "M", 100000),
        (2, 167.2, 5.4, 45, "M", None),
        (3, None, 5.2, None, None, None),
        (4, 144.5, 5.9, 33, "M", None),
        (5, 133.2, 5.7, 54, "F", None),
        (6, 124.1, 5.2, None, "F", None),
        (7, 129.2, 5.3, 42, "M", 76000)
    ],
    ['id', 'weight', 'height', 'age', 'gender', 'income']
)

In [59]:
df.rdd.map(
    lambda row: (row["id"], sum([c == None for c in row]))
).collect()

[(1, 0), (2, 1), (3, 4), (4, 1), (5, 1), (6, 2), (7, 0)]

각 row 내에 몇개의 missing value가 존재하는지 확인.

In [60]:
df.where('id=3').show()

+---+------+------+----+------+------+
| id|weight|height| age|gender|income|
+---+------+------+----+------+------+
|  3|  null|   5.2|null|  null|  null|
+---+------+------+----+------+------+



missing value가 가장 많은 id=3인 row 출력.

In [61]:
df.agg(
    1 - f.count("weight") / f.count("*")
).show()

+--------------------------------+
|(1 - (count(weight) / count(1)))|
+--------------------------------+
|              0.1428571428571429|
+--------------------------------+



weight column 내 missing value 비율 출력.

In [62]:
df.agg(
    *[(1 - f.count(c) / f.count('*')).alias(c+"Rate") for c in df.columns]
).show()

+------+------------------+----------+------------------+------------------+------------------+
|idRate|        weightRate|heightRate|           ageRate|        genderRate|        incomeRate|
+------+------------------+----------+------------------+------------------+------------------+
|   0.0|0.1428571428571429|       0.0|0.2857142857142857|0.1428571428571429|0.7142857142857143|
+------+------------------+----------+------------------+------------------+------------------+



column별 missing value 비율 출력. => income column의 missing value가 지나치게 많다.

In [63]:
df = df.select([c for c in df.columns if c != "income"])
df.show()

+---+------+------+----+------+
| id|weight|height| age|gender|
+---+------+------+----+------+
|  1| 143.5|   5.6|  28|     M|
|  2| 167.2|   5.4|  45|     M|
|  3|  null|   5.2|null|  null|
|  4| 144.5|   5.9|  33|     M|
|  5| 133.2|   5.7|  54|     F|
|  6| 124.1|   5.2|null|     F|
|  7| 129.2|   5.3|  42|     M|
+---+------+------+----+------+



missing value가 지나치게 많아 데이터로서의 의미가 없는 income column 삭제.

In [64]:
df.dropna(thresh = 3).show()

+---+------+------+----+------+
| id|weight|height| age|gender|
+---+------+------+----+------+
|  1| 143.5|   5.6|  28|     M|
|  2| 167.2|   5.4|  45|     M|
|  4| 144.5|   5.9|  33|     M|
|  5| 133.2|   5.7|  54|     F|
|  6| 124.1|   5.2|null|     F|
|  7| 129.2|   5.3|  42|     M|
+---+------+------+----+------+



missing value가 3개 이상 존재하는 row 삭제.

In [65]:
means = df.agg(
    *[f.mean(c).alias(c) for c in df.columns if c != "gender"]
)

means.show()

+---+------------------+-----------------+----+
| id|            weight|           height| age|
+---+------------------+-----------------+----+
|4.0|140.28333333333333|5.471428571428572|40.4|
+---+------------------+-----------------+----+



missing value는 자동으로 제외하여 각 column의 평균값을 구한다.(gender column은 categorical data이므로 산출 제외)

In [66]:
mpd = means.toPandas()
type(mpd)

pandas.core.frame.DataFrame

Spark의 특징: immutable => 데이터를 수정하기 위해 pandas로 작업.

In [67]:
mpd = mpd.to_dict('records')[0]
mpd

{'id': 4.0,
 'weight': 140.28333333333333,
 'height': 5.471428571428572,
 'age': 40.4}

In [68]:
mpd['gender'] = 'X'
mpd

{'id': 4.0,
 'weight': 140.28333333333333,
 'height': 5.471428571428572,
 'age': 40.4,
 'gender': 'X'}

gender는 평균값이 없으므로 평균값 대신 임의의 문자 사용.

In [69]:
df = df.fillna(mpd)
df.show()

+---+------------------+------+---+------+
| id|            weight|height|age|gender|
+---+------------------+------+---+------+
|  1|             143.5|   5.6| 28|     M|
|  2|             167.2|   5.4| 45|     M|
|  3|140.28333333333333|   5.2| 40|     X|
|  4|             144.5|   5.9| 33|     M|
|  5|             133.2|   5.7| 54|     F|
|  6|             124.1|   5.2| 40|     F|
|  7|             129.2|   5.3| 42|     M|
+---+------------------+------+---+------+



missing value들을 평균값으로 대체하였다.

___

### Outliers 처리

In [70]:
df = spark.createDataFrame(
    [
        (1, 143.5, 5.3, 28),
        (2, 154.2, 5.5, 45),
        (3, 342.3, 5.1, 99),
        (4, 144.5, 5.5, 33),
        (5, 133.2, 5.4, 54),
        (6, 124.1, 5.1, 21),
        (7, 129.2, 5.3, 42)
    ],
    ['id', 'weight', 'height', 'age']
)

In [71]:
col = ["weight", "height", "age"]
bounds = {}

for c in col:
    quantiles = df.approxQuantile(c, [0.25, 0.75], 0.05)
    IQR = quantiles[1] - quantiles[0]
    bounds[c] = [quantiles[0] - 1.5*IQR, quantiles[1] + 1.5*IQR]

In [72]:
bounds

{'weight': [91.69999999999999, 191.7],
 'height': [4.499999999999999, 6.1000000000000005],
 'age': [-11.0, 93.0]}

각 column에서 outlier가 거의 없어 데이터로서 유의미한 범위를 구하였다.

In [73]:
outlier = df.select(
    *["id"]+[((df[c] < bounds[c][0]) | (df[c] > bounds[c][1])).alias(c+'_O') for c in col]
)
outlier.show()

+---+--------+--------+-----+
| id|weight_O|height_O|age_O|
+---+--------+--------+-----+
|  1|   false|   false|false|
|  2|   false|   false|false|
|  3|    true|   false| true|
|  4|   false|   false|false|
|  5|   false|   false|false|
|  6|   false|   false|false|
|  7|   false|   false|false|
+---+--------+--------+-----+



true: 각 column에 대한 outlier.

In [74]:
joinTable = df.join(outlier, on='id')
joinTable.filter('weight_O').select("id", "weight").show()

+---+------+
| id|weight|
+---+------+
|  3| 342.3|
+---+------+



weight_O가 true인 row의 id, weight 출력.(outlier 출력)

In [75]:
joinTable.filter('!weight_O').select("id", "weight", "height", "age").show()

+---+------+------+---+
| id|weight|height|age|
+---+------+------+---+
|  7| 129.2|   5.3| 42|
|  6| 124.1|   5.1| 21|
|  5| 133.2|   5.4| 54|
|  1| 143.5|   5.3| 28|
|  2| 154.2|   5.5| 45|
|  4| 144.5|   5.5| 33|
+---+------+------+---+



outlier가 존재하는 row는 삭제하였다.

In [76]:
sc.stop()