In [4]:
import pandas as pd
from pyspark import SparkContext
from pyspark.sql import SQLContext, Row

from pyspark.sql.functions import when, udf, col, regexp_replace
from pyspark.sql.types import DoubleType,IntegerType, StringType
import pyspark.sql.functions as f

In [2]:
sc = SparkContext( 'local' )
sqlCtx = SQLContext( sc )

In [3]:
df = sqlCtx.read.csv( 'data/crime_in_seoul.csv',encoding='euc-kr',
                    header=True, inferSchema=True)
df

DataFrame[관서명: string, 살인 발생: int, 살인 검거: int, 강도 발생: int, 강도 검거: int, 강간 발생: int, 강간 검거: int, 절도 발생: string, 절도 검거: string, 폭력 발생: string, 폭력 검거: string]

In [5]:
df.select('관서명').show()

+--------+
|  관서명|
+--------+
|  중부서|
|  종로서|
|남대문서|
|서대문서|
|  혜화서|
|  용산서|
|  성북서|
|동대문서|
|  마포서|
|영등포서|
|  성동서|
|  동작서|
|  광진서|
|  서부서|
|  강북서|
|  금천서|
|  중랑서|
|  강남서|
|  관악서|
|  강서서|
+--------+
only showing top 20 rows



In [6]:
df.select( df['관서명'] ).show()

+--------+
|  관서명|
+--------+
|  중부서|
|  종로서|
|남대문서|
|서대문서|
|  혜화서|
|  용산서|
|  성북서|
|동대문서|
|  마포서|
|영등포서|
|  성동서|
|  동작서|
|  광진서|
|  서부서|
|  강북서|
|  금천서|
|  중랑서|
|  강남서|
|  관악서|
|  강서서|
+--------+
only showing top 20 rows



In [7]:
df.select( df['관서명'].alias('kw') ).show()

+--------+
|      kw|
+--------+
|  중부서|
|  종로서|
|남대문서|
|서대문서|
|  혜화서|
|  용산서|
|  성북서|
|동대문서|
|  마포서|
|영등포서|
|  성동서|
|  동작서|
|  광진서|
|  서부서|
|  강북서|
|  금천서|
|  중랑서|
|  강남서|
|  관악서|
|  강서서|
+--------+
only showing top 20 rows



In [10]:
df.select( f.max( df['살인 발생'] ) ).show()

+--------------+
|max(살인 발생)|
+--------------+
|            14|
+--------------+



In [11]:
df.select( f.avg( df['살인 발생'] ) ).show()

+-----------------+
|   avg(살인 발생)|
+-----------------+
|5.258064516129032|
+-----------------+



In [12]:
df.select( f.max( df['살인 발생'] ), f.avg( df['살인 발생'] ) ).show()

+--------------+-----------------+
|max(살인 발생)|   avg(살인 발생)|
+--------------+-----------------+
|            14|5.258064516129032|
+--------------+-----------------+



In [14]:
df.agg( f.avg( df['살인 발생'] ) ).show()

+-----------------+
|   avg(살인 발생)|
+-----------------+
|5.258064516129032|
+-----------------+



In [17]:
df.agg( {'살인 발생' : 'max', '살인 검거' : 'avg'} ).show() #딕셔너리 형식에선 동일한 컬럼 2개 이상 출력 불가

+--------------+-----------------+
|max(살인 발생)|   avg(살인 검거)|
+--------------+-----------------+
|            14|4.935483870967742|
+--------------+-----------------+



In [18]:
gdf = df.agg( {'살인 발생' : 'max', '살인 검거' : 'avg'} )
gdf.show()

+--------------+-----------------+
|max(살인 발생)|   avg(살인 검거)|
+--------------+-----------------+
|            14|4.935483870967742|
+--------------+-----------------+



In [20]:
gdf.rdd.collect() #row 객체

[Row(max(살인 발생)=14, avg(살인 검거)=4.935483870967742)]

In [22]:
df.select( f.max('살인 발생').alias('최고'), f.round( f.avg( '살인 발생' ), 2 ).alias('평균') ).show()

+----+----+
|최고|평균|
+----+----+
|  14|5.26|
+----+----+



In [23]:
rw = Row(a=1, b=2, c=3) #딕셔너리는 아니지만 딕셔너리처럼 키:값 으로 이루어짐
rw

Row(a=1, b=2, c=3)

In [24]:
type(rw)

pyspark.sql.types.Row

In [25]:
rw['a']

1

In [26]:
rw.asDict()

{'a': 1, 'b': 2, 'c': 3}

In [30]:
gdf = df.select( f.max('살인 발생').alias('최고'), 
                f.round( f.avg( '살인 발생' ), 2 ).alias('평균') )

In [31]:
gdf.rdd.collect() #row로 구성된 분산 리스트 반환

[Row(최고=14, 평균=5.26)]

In [36]:
gdf.rdd.map(lambda v: v['평균']).collect()[0]

5.26

In [32]:
my = [10,20,30,40,50]
nRdd = sc.parallelize(my)
nRdd

ParallelCollectionRDD[121] at readRDDFromFile at PythonRDD.scala:262

In [34]:
nRdd.map(lambda v: v+1).collect()

[11, 21, 31, 41, 51]

In [37]:
ddf = df.select('관서명', '살인 발생')

In [39]:
for r in ddf.head(5):
    print( r['관서명'], r['살인 발생'] )

중부서 2
종로서 3
남대문서 1
서대문서 2
혜화서 3


In [42]:
ddf.rdd.map(lambda v: v['살인 발생']).sum()

163

In [43]:
sc.stop()