# Spark Streaming
- 지정한 시간 간격(batch interval)마다 들어온 데이터를 RDD로 가져온다.
- 스트리밍을 쓰는 이유: 들어오는 데이터를 데이터 프레임처럼 다룰 수 있어서, 바로 머신러닝이나 SQL로 분석할 수 있다.

In [1]:
import findspark
findspark.init()  # locate the local Spark installation so pyspark can be imported

from pyspark import SparkContext
from pyspark.sql import SparkSession

# Reuse (or create) the session; its SparkContext is the one active context.
spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext

In [2]:
from pyspark.streaming import StreamingContext

In [3]:
# Batch interval in seconds: the input stream is cut into one RDD every 5 seconds.
streaming = StreamingContext(sc, batchDuration=5)

## 입력 소스(socket)로부터 stream을 받아온다.
- streaming.socketTextStream

In [4]:
# Receive lines of text from a TCP socket on localhost:9999; every 5-second
# batch holds the lines that arrived during that interval.
lines = streaming.socketTextStream(hostname="localhost", port=9999)

In [5]:
# Split each received line on whitespace and flatten into a stream of words.
words = lines.flatMap(lambda text: text.split())

In [6]:
# Turn every word into a (word, 1) tuple.
pairs = words.map(lambda word: (word, 1))

In [7]:
# Add up the 1s of identical words within a batch -> per-batch count of each word.
counts = pairs.reduceByKey(lambda left, right: left + right)

## nc -lk 9999
- localhost의 9999 포트에서 연결을 기다리고(listen, `-l`), 연결이 끝나도 종료하지 않고 계속 대기하도록(`-k`) 하는 명령어를 실행한다.
- 즉 netcat으로 간단한 텍스트 서버를 띄운 것이고, 이 터미널에 입력한 줄이 stream으로 전송된다.

In [8]:
# Print the first 10 (word, count) pairs of every batch.
counts.pprint(num=10)

In [11]:
# Start the DStream job and block this cell for at most 20 seconds.
streaming.start()
streaming.awaitTermination(timeout=20)
# awaitTermination(timeout) only waits; the job and its socket receiver would keep
# running in the background. Stop it explicitly but keep the SparkContext for later
# cells (a stopped StreamingContext cannot be restarted).
streaming.stop(stopSparkContext=False)

-------------------------------------------
Time: 2018-08-06 14:36:35
-------------------------------------------

-------------------------------------------
Time: 2018-08-06 14:36:40
-------------------------------------------

-------------------------------------------
Time: 2018-08-06 14:36:45
-------------------------------------------
('apple', 1)

-------------------------------------------
Time: 2018-08-06 14:36:50
-------------------------------------------
('orange', 1)

-------------------------------------------
Time: 2018-08-06 14:36:55
-------------------------------------------
('blueberrt', 1)

-------------------------------------------
Time: 2018-08-06 14:37:00
-------------------------------------------

-------------------------------------------
Time: 2018-08-06 14:37:05
-------------------------------------------
('blueberry', 1)

-------------------------------------------
Time: 2018-08-06 14:37:10
-------------------------------------------
('wow', 1)

--------

## Structured Streaming으로 stream을 읽어온다.
- spark.readStream.format

In [3]:
# Structured Streaming socket source: an unbounded DataFrame whose `value` column
# holds one line of text received on localhost:9999 per row.
lines = (
    spark.readStream
    .format("socket")
    .option("host", "localhost")
    .option("port", 9999)
    .load()
)

In [4]:
from pyspark.sql.functions import explode, split

In [5]:
# Split each line on single spaces and explode the resulting array so that every
# word becomes its own row in a one-column table named `word`.
tokens = split(lines.value, " ")
words = lines.select(explode(tokens).alias("word"))

In [6]:
# Group identical words together and count the rows in each group.
wordCount = (
    words
    .groupBy("word")
    .count()
)

### console sink의 결과는 notebook 셀이 아니라 Jupyter를 실행한 터미널(console) 창에 출력된다.

In [12]:
# "complete" output mode re-emits the whole aggregated result table on every trigger.
# The console sink is printed by the driver JVM to its stdout, so the result shows up
# in the terminal running Jupyter rather than in this cell's output.
stream = (
    wordCount.writeStream
    .outputMode("complete")
    .format("console")
    .start()
)
# Returns False if the query is still running when the 20 s timeout expires.
finished = stream.awaitTermination(timeout=20)
# Waiting does not stop the query; stop it so it no longer consumes the socket.
stream.stop()
finished

False

![Image](readstreamformat.png)

# Data Modeling

In [1]:
from pyspark.sql import SparkSession

# Reuse the active SparkSession if there is one, otherwise create it.
spark = (
    SparkSession.builder
    .getOrCreate()
)

In [16]:
# Toy dataset: the id=3 row appears twice verbatim, ids 1 and 4 share every
# non-id value, and id 5 is reused for two different records.
df = spark.createDataFrame(
    data=[
        (1, 144.5, 5.9, 33, "M"),
        (2, 167.2, 5.4, 45, "M"),
        (3, 124.1, 5.2, 23, "F"),
        (4, 144.5, 5.9, 33, "M"),
        (5, 133.2, 5.7, 54, "F"),
        (3, 124.1, 5.2, 23, "F"),
        (5, 129.2, 5.3, 42, "M"),
    ],
    schema=['id', 'weight', 'height', 'age', 'gender'],
)

## 중복 값 제거
- 모든 열의 값이 완전히 동일한 행(id = 3)이 두 번 들어 있다.

In [17]:
# Total rows vs. fully distinct rows; the gap is the number of exact duplicates.
(df.count(), df.distinct().count())

(7, 6)

In [18]:
# Remove rows that are identical in every column, then display them ordered by id.
df = df.drop_duplicates()
df.sort("id").show()

+---+------+------+---+------+
| id|weight|height|age|gender|
+---+------+------+---+------+
|  1| 144.5|   5.9| 33|     M|
|  2| 167.2|   5.4| 45|     M|
|  3| 124.1|   5.2| 23|     F|
|  4| 144.5|   5.9| 33|     M|
|  5| 129.2|   5.3| 42|     M|
|  5| 133.2|   5.7| 54|     F|
+---+------+------+---+------+



## id는 다르지만 동일한 data 제거
- id를 제외한 나머지 값이 모두 같은 행을 제거 (id = 1과 id = 4)

In [20]:
# Drop rows that repeat every column except `id` (ids 1 and 4 here). Spark does not
# document which of the duplicate rows is kept. No extra .distinct() is needed:
# rows identical in every column are also identical on this subset.
df = df.dropDuplicates([c for c in df.columns if c != "id"])

In [21]:
# Display the de-duplicated DataFrame (no orderBy, so row order is not sorted by id).
df.show()

+---+------+------+---+------+
| id|weight|height|age|gender|
+---+------+------+---+------+
|  5| 133.2|   5.7| 54|     F|
|  1| 144.5|   5.9| 33|     M|
|  2| 167.2|   5.4| 45|     M|
|  3| 124.1|   5.2| 23|     F|
|  5| 129.2|   5.3| 42|     M|
+---+------+------+---+------+



## id만 같을 뿐 데이터가 다를 경우
- 이 경우에는 행을 지우면 데이터가 사라지므로, 새로운 id를 생성한다.

In [23]:
# Preview keeping a single row per id (df is not reassigned). For an id shared by
# different records this discards real data, which is why new ids are generated below.
# Rows are already unique by id after this step, so no .distinct() is needed.
df.dropDuplicates(["id"]).show()

+---+------+------+---+------+
| id|weight|height|age|gender|
+---+------+------+---+------+
|  5| 133.2|   5.7| 54|     F|
|  1| 144.5|   5.9| 33|     M|
|  3| 124.1|   5.2| 23|     F|
|  2| 167.2|   5.4| 45|     M|
+---+------+------+---+------+



In [24]:
# df itself was not reassigned, so both records with id 5 are still present.
df.show()

+---+------+------+---+------+
| id|weight|height|age|gender|
+---+------+------+---+------+
|  5| 133.2|   5.7| 54|     F|
|  1| 144.5|   5.9| 33|     M|
|  2| 167.2|   5.4| 45|     M|
|  3| 124.1|   5.2| 23|     F|
|  5| 129.2|   5.3| 42|     M|
+---+------+------+---+------+



In [25]:
from pyspark.sql import functions as f

In [28]:
# Compare the number of non-null ids with the number of distinct ids;
# a gap means some id is shared by more than one row.
id_total = f.count("id").alias("count")
id_unique = f.countDistinct("id").alias("distinct")
df.agg(id_total, id_unique).show()

+-----+--------+
|count|distinct|
+-----+--------+
|    5|       4|
+-----+--------+



In [30]:
# Keep a copy of the original id in `oldId` before `id` is overwritten.
df = df.withColumn("oldId", df["id"])

In [31]:
# Display df with the new oldId column.
df.show()

+---+------+------+---+------+-----+
| id|weight|height|age|gender|oldId|
+---+------+------+---+------+-----+
|  5| 133.2|   5.7| 54|     F|    5|
|  1| 144.5|   5.9| 33|     M|    1|
|  2| 167.2|   5.4| 45|     M|    2|
|  3| 124.1|   5.2| 23|     F|    3|
|  5| 129.2|   5.3| 42|     M|    5|
+---+------+------+---+------+-----+



In [33]:
# Overwrite `id` with Spark-generated values that are unique and increasing but NOT
# consecutive (the partition id is encoded in the upper bits, hence the large gaps).
new_id = f.monotonically_increasing_id()
df = df.withColumn("id", new_id)

In [36]:
# Display df with the regenerated ids alongside oldId.
df.show()

+-------------+------+------+---+------+-----+
|           id|weight|height|age|gender|oldId|
+-------------+------+------+---+------+-----+
|  25769803776| 133.2|   5.7| 54|     F|    5|
| 171798691840| 144.5|   5.9| 33|     M|    1|
| 592705486848| 167.2|   5.4| 45|     M|    2|
|1236950581248| 124.1|   5.2| 23|     F|    3|
|1365799600128| 129.2|   5.3| 42|     M|    5|
+-------------+------+------+---+------+-----+



## Missing Data

In [43]:
# Dataset with missing values: row id=3 is mostly null and income is sparsely filled.
df = spark.createDataFrame(
    data=[
        (1, 144.5, 5.9, 33, "M", 100000),
        (2, 167.2, 5.4, 45, "M", None),
        (3, None, 5.2, None, None, None),
        (4, 144.5, 5.9, 33, "M", None),
        (5, 133.2, 5.7, 54, "F", None),
        (6, 124.1, 5.2, None, "F", None),
        (7, 129.2, 5.3, 42, "M", 76000),
    ],
    schema=['id', 'weight', 'height', 'age', 'gender', 'income'],
)

In [44]:
# Count the missing (None) fields in each row, returned as (id, missing_count) pairs.
# Identity comparison (`is None`) is the correct test for None.
df.rdd.map(
    lambda row: (row["id"], sum(value is None for value in row))
).collect()

[(1, 0), (2, 1), (3, 4), (4, 1), (5, 1), (6, 2), (7, 0)]

In [45]:
# Inspect the row with the most missing values.
df.filter("id = 3").show()

+---+------+------+----+------+------+
| id|weight|height| age|gender|income|
+---+------+------+----+------+------+
|  3|  null|   5.2|null|  null|  null|
+---+------+------+----+------+------+



In [50]:
# Number of rows whose weight is missing. Note that df.agg(...) yields a one-row
# DataFrame, so calling .count() on an aggregate always returns 1 regardless of nulls.
df.where(f.col("weight").isNull()).count()

1

## missing 의 비율 확인

In [58]:
# Fraction of missing values per column: 1 - (non-null count / total row count).
missing_rates = [
    (1 - f.count(col_name) / f.count("*")).alias(col_name + " Rate")
    for col_name in df.columns
]
df.agg(*missing_rates).show()
# income is missing in most rows.

+-------+------------------+-----------+------------------+------------------+------------------+
|id Rate|       weight Rate|height Rate|          age Rate|       gender Rate|       income Rate|
+-------+------------------+-----------+------------------+------------------+------------------+
|    0.0|0.1428571428571429|        0.0|0.2857142857142857|0.1428571428571429|0.7142857142857143|
+-------+------------------+-----------+------------------+------------------+------------------+



## missing이 많은 열을 제거

In [59]:
# Drop the mostly-missing income column; the remaining columns keep their order.
df = df.drop("income")

In [60]:
# Display df without the income column.
df.show()

+---+------+------+----+------+
| id|weight|height| age|gender|
+---+------+------+----+------+
|  1| 144.5|   5.9|  33|     M|
|  2| 167.2|   5.4|  45|     M|
|  3|  null|   5.2|null|  null|
|  4| 144.5|   5.9|  33|     M|
|  5| 133.2|   5.7|  54|     F|
|  6| 124.1|   5.2|null|     F|
|  7| 129.2|   5.3|  42|     M|
+---+------+------+----+------+



In [62]:
# thresh=3 keeps only rows that have at least 3 non-null values and drops the rest.
# Row id=3 has just 2 non-null values (id, height), so it is the only row removed.
df.dropna(thresh=3).show()

+---+------+------+----+------+
| id|weight|height| age|gender|
+---+------+------+----+------+
|  1| 144.5|   5.9|  33|     M|
|  2| 167.2|   5.4|  45|     M|
|  4| 144.5|   5.9|  33|     M|
|  5| 133.2|   5.7|  54|     F|
|  6| 124.1|   5.2|null|     F|
|  7| 129.2|   5.3|  42|     M|
+---+------+------+----+------+



In [70]:
# Mean of every non-string column (nulls are ignored by mean); used later to fill nulls.
numeric_cols = [c for c in df.columns if c != "gender"]
means = df.agg(*[f.mean(c).alias(c) for c in numeric_cols])

In [68]:
# Display the one-row table of column means.
means.show()

+---+------------------+-----------------+----+
| id|            weight|           height| age|
+---+------------------+-----------------+----+
|4.0|140.45000000000002|5.514285714285713|41.4|
+---+------------------+-----------------+----+



## pandas DataFrame으로 변환한 다음 dict로 바꾼다
- 이 dict를 fillna에 넘겨, 값이 null인 곳을 채운다.

In [71]:
# Collect the one-row means DataFrame to the driver as a pandas DataFrame.
mpd = means.toPandas()

In [72]:
# Confirm the collected result is a pandas DataFrame.
type(mpd)

pandas.core.frame.DataFrame

In [73]:
# One dict per row: a list holding a single {column: mean} dict.
mpd.to_dict(orient="records")

[{'id': 4.0,
  'weight': 140.45000000000002,
  'height': 5.5142857142857133,
  'age': 41.399999999999999}]

In [77]:
# Keep just the single record as a plain {column: mean} dict.
records = mpd.to_dict(orient="records")
mpd = records[0]

In [78]:
# Display the dict of fill values.
mpd

{'id': 4.0,
 'weight': 140.45000000000002,
 'height': 5.5142857142857133,
 'age': 41.399999999999999}

In [81]:
mpd.update(gender="X")  # placeholder value for missing gender

In [80]:
# Display the fill-value dict, now including gender.
mpd

{'id': 4.0,
 'weight': 140.45000000000002,
 'height': 5.5142857142857133,
 'age': 41.399999999999999,
 'gender': 'X'}

## missing data 정제 

In [82]:
# Replace nulls per column using the dict (numeric means plus "X" for gender).
# Fill values are cast to each column's type, so the age mean 41.4 is stored as 41.
df = df.na.fill(mpd)
df.show()

+---+------------------+------+---+------+
| id|            weight|height|age|gender|
+---+------------------+------+---+------+
|  1|             144.5|   5.9| 33|     M|
|  2|             167.2|   5.4| 45|     M|
|  3|140.45000000000002|   5.2| 41|     X|
|  4|             144.5|   5.9| 33|     M|
|  5|             133.2|   5.7| 54|     F|
|  6|             124.1|   5.2| 41|     F|
|  7|             129.2|   5.3| 42|     M|
+---+------------------+------+---+------+



## Outlier

In [83]:
# Dataset with an obvious outlier row (id=3: weight 342.3, age 99).
df = spark.createDataFrame(
    data=[
        (1, 143.5, 5.3, 28),
        (2, 154.2, 5.5, 45),
        (3, 342.3, 5.1, 99),
        (4, 144.5, 5.5, 33),
        (5, 133.2, 5.4, 54),
        (6, 124.1, 5.1, 21),
        (7, 129.2, 5.3, 42),
    ],
    schema=['id', 'weight', 'height', 'age'],
)

In [85]:
# Approximate Q1 and Q3 of weight with a 5% relative error.
quantile = df.approxQuantile(col="weight", probabilities=[0.25, 0.75], relativeError=0.05)

In [87]:
# Display [Q1, Q3] of weight.
quantile

[129.2, 154.2]

In [88]:
# Interquartile range of weight: Q3 - Q1.
q1, q3 = quantile
IQR = q3 - q1

In [89]:
# Display the interquartile range of weight.
IQR

25.0

In [90]:
# Tukey fences for weight: values below Q1 - 1.5*IQR or above Q3 + 1.5*IQR are outliers.
fence = 1.5 * IQR
bounds = [quantile[0] - fence, quantile[1] + fence]

In [98]:
# Tukey fences [Q1 - 1.5*IQR, Q3 + 1.5*IQR] for every numeric column, keyed by column name.
cols = ["weight", "height", "age"]
bounds = {}

for c in cols:
    quantiles = df.approxQuantile(c, [0.25, 0.75], 0.05)
    low_q, high_q = quantiles
    IQR = high_q - low_q
    fence = 1.5 * IQR
    bounds[c] = [low_q - fence, high_q + fence]

In [99]:
# Display the per-column [lower, upper] fences.
bounds

{'weight': [91.69999999999999, 191.7],
 'height': [4.499999999999999, 6.1000000000000005],
 'age': [-11.0, 93.0]}

In [100]:
# One boolean per row: weight below the lower fence, then weight above the upper fence.
lower_w, upper_w = bounds["weight"]
df.select(df["weight"] < lower_w).show()
df.select(df["weight"] > upper_w).show()

+----------------------------+
|(weight < 91.69999999999999)|
+----------------------------+
|                       false|
|                       false|
|                       false|
|                       false|
|                       false|
|                       false|
|                       false|
+----------------------------+

+----------------+
|(weight > 191.7)|
+----------------+
|           false|
|           false|
|            true|
|           false|
|           false|
|           false|
|           false|
+----------------+



In [103]:
# Per-id table with one boolean column per feature: True when the value lies
# outside that feature's fences.
outlier_flags = [
    ((df[c] < bounds[c][0]) | (df[c] > bounds[c][1])).alias(c + "_Outlier")
    for c in cols
]
outlier = df.select("id", *outlier_flags)

In [109]:
# Display the outlier flags per id.
outlier.show()

+---+--------------+--------------+-----------+
| id|weight_Outlier|height_Outlier|age_Outlier|
+---+--------------+--------------+-----------+
|  1|         false|         false|      false|
|  2|         false|         false|      false|
|  3|          true|         false|       true|
|  4|         false|         false|      false|
|  5|         false|         false|      false|
|  6|         false|         false|      false|
|  7|         false|         false|      false|
+---+--------------+--------------+-----------+



In [106]:
# Attach the outlier flags to the original values, ordered by id.
joinTable = df.join(outlier, "id").sort("id")

In [107]:
# Display values and outlier flags side by side.
joinTable.show()

+---+------+------+---+--------------+--------------+-----------+
| id|weight|height|age|weight_Outlier|height_Outlier|age_Outlier|
+---+------+------+---+--------------+--------------+-----------+
|  1| 143.5|   5.3| 28|         false|         false|      false|
|  2| 154.2|   5.5| 45|         false|         false|      false|
|  3| 342.3|   5.1| 99|          true|         false|       true|
|  4| 144.5|   5.5| 33|         false|         false|      false|
|  5| 133.2|   5.4| 54|         false|         false|      false|
|  6| 124.1|   5.1| 21|         false|         false|      false|
|  7| 129.2|   5.3| 42|         false|         false|      false|
+---+------+------+---+--------------+--------------+-----------+



In [110]:
# Rows whose weight lies outside the fences.
joinTable.where(joinTable["weight_Outlier"]).select("id", "weight").show()

+---+------+
| id|weight|
+---+------+
|  3| 342.3|
+---+------+



In [111]:
# Rows whose weight lies within the fences.
joinTable.where(~joinTable["weight_Outlier"]).select("id", "weight").show()

+---+------+
| id|weight|
+---+------+
|  1| 143.5|
|  2| 154.2|
|  4| 144.5|
|  5| 133.2|
|  6| 124.1|
|  7| 129.2|
+---+------+



In [115]:
# Keep rows that are outliers in neither weight nor age, showing only the raw values.
inliers = joinTable.where(~joinTable["weight_Outlier"] & ~joinTable["age_Outlier"])
inliers.select("id", "weight", "height", "age").show()

+---+------+------+---+
| id|weight|height|age|
+---+------+------+---+
|  1| 143.5|   5.3| 28|
|  2| 154.2|   5.5| 45|
|  4| 144.5|   5.5| 33|
|  5| 133.2|   5.4| 54|
|  6| 124.1|   5.1| 21|
|  7| 129.2|   5.3| 42|
+---+------+------+---+

