# hive sql

- https://cwiki.apache.org/confluence/display/hive/languagemanual+udf

In [1]:
from pyspark import SparkContext
from pyspark.sql import SQLContext, Row
import pandas as pd

from pyspark.sql.functions import when, udf, col, regexp_replace
from pyspark.sql.types import DoubleType,IntegerType, StringType
import pyspark.sql.functions as f

In [2]:
sc = SparkContext('local')
sqlCtx = SQLContext(sc)

In [4]:
df = sqlCtx.read.csv( 'c:/Users/FX506HM/Dropbox/pythonedu/data/crime_in_seoul.csv',encoding='euc-kr',
                    header=True, inferSchema=True) # infer schema를 쓰지 않으면 전부 strtfh dlstlrgksek. 

df

DataFrame[관서명: string, 살인 발생: int, 살인 검거: int, 강도 발생: int, 강도 검거: int, 강간 발생: int, 강간 검거: int, 절도 발생: string, 절도 검거: string, 폭력 발생: string, 폭력 검거: string]

In [5]:
df.select('관서명').show() 

+--------+
|  관서명|
+--------+
|  중부서|
|  종로서|
|남대문서|
|서대문서|
|  혜화서|
|  용산서|
|  성북서|
|동대문서|
|  마포서|
|영등포서|
|  성동서|
|  동작서|
|  광진서|
|  서부서|
|  강북서|
|  금천서|
|  중랑서|
|  강남서|
|  관악서|
|  강서서|
+--------+
only showing top 20 rows



# alias
- column의 이름을 바꿔준다.

In [6]:
df.select(df['관서명'].alias('kw')).show()

+--------+
|      kw|
+--------+
|  중부서|
|  종로서|
|남대문서|
|서대문서|
|  혜화서|
|  용산서|
|  성북서|
|동대문서|
|  마포서|
|영등포서|
|  성동서|
|  동작서|
|  광진서|
|  서부서|
|  강북서|
|  금천서|
|  중랑서|
|  강남서|
|  관악서|
|  강서서|
+--------+
only showing top 20 rows



# 기술통계함수
- max: 해당 column의 가장 큰 객체
- avg: 해당 column의 평균

In [7]:
df.select(f.avg(df['살인 발생']), f.max(df['살인 발생'])).show()

+-----------------+--------------+
|   avg(살인 발생)|max(살인 발생)|
+-----------------+--------------+
|5.258064516129032|            14|
+-----------------+--------------+



# agg

- selct대신 agg 함수를 통해서 구하는 것이 더 쉽다.
- 주의: dictionary 형태는 동일한 column에 대해서는 안된다.

In [8]:
df.agg(f.avg(df['살인 발생']), f.max(df['살인 발생'])).show()

+-----------------+--------------+
|   avg(살인 발생)|max(살인 발생)|
+-----------------+--------------+
|5.258064516129032|            14|
+-----------------+--------------+



In [9]:
df.agg({ '살인 발생': 'max', '살인 검거': 'avg'}).show()

+--------------+-----------------+
|max(살인 발생)|   avg(살인 검거)|
+--------------+-----------------+
|            14|4.935483870967742|
+--------------+-----------------+



In [10]:
gdf = df.agg({'살인 발생': 'max', '살인 검거': 'avg'})
gdf.show()

+--------------+-----------------+
|max(살인 발생)|   avg(살인 검거)|
+--------------+-----------------+
|            14|4.935483870967742|
+--------------+-----------------+



# rdd function은 rdd로 형태를 변형한다.

- rdd 객체를 살펴보면, row 객체로 들어가 있음.

In [11]:
gdf.rdd.collect()

[Row(max(살인 발생)=14, avg(살인 검거)=4.935483870967742)]

In [12]:
df.select(f.round(f.avg('살인 발생'), 2).alias('평균'), 
          f.max(df['살인 발생']).alias('총합')).show()

+----+----+
|평균|총합|
+----+----+
|5.26|  14|
+----+----+



# row

In [13]:
rw = Row(a = 1, b = 2, c = 3)
rw

Row(a=1, b=2, c=3)

In [14]:
type(rw)

pyspark.sql.types.Row

In [15]:
rw['a']

1

In [16]:
# asDict()를 사용하면 row를 dictinary로 바꿀 수 있다.
rw.asDict()

{'a': 1, 'b': 2, 'c': 3}

## map을 lambda를 통해 다룰 수 있다.

In [17]:
my = [10,20,30,40,50]
nRdd = sc.parallelize(my)
nRdd

ParallelCollectionRDD[60] at readRDDFromFile at PythonRDD.scala:262

In [18]:
nRdd.collect()

[10, 20, 30, 40, 50]

In [19]:
nRdd.map(lambda v: v+1).collect()

[11, 21, 31, 41, 51]

In [20]:
ddf = df.select('관서명', '살인 발생')

In [21]:
for r in ddf.head(5):
    print(r['관서명'], r['살인 발생'])

중부서 2
종로서 3
남대문서 1
서대문서 2
혜화서 3


In [22]:
ddf.rdd.map(lambda v:v['살인 발생']).sum()  # 살인 발생의 전체 총합을 구할수 있다.

163

In [23]:
sc.stop()