In [1]:
import os
import sys

home=os.path.expanduser("~")
os.environ["SPARK_HOME"]=os.path.join(home, 'spark-2.0.0-bin-hadoop2.7')
os.environ["PYLIB"]=os.path.join(os.environ["SPARK_HOME"],'python','lib')
sys.path.insert(0,os.path.join(os.environ["PYLIB"],'py4j-0.10.1-src.zip'))
sys.path.insert(0,os.path.join(os.environ["PYLIB"],'pyspark.zip'))

In [2]:
import pyspark
myConf=pyspark.SparkConf()
spark = pyspark.sql.SparkSession.builder.master("local").appName("myApp").config(conf=myConf).getOrCreate()

# Dataframe 읽기
* 외부 => spark.createDataFrame() 또는 spark.read()
* 내부 => spark.createDataFrame()

In [4]:
myList=[('1','kim, js',170),
        ('1','lee, sm', 175),
        ('2','lim, yg',180),
        ('2','lee',170)]
myDf=spark.createDataFrame(myList)

myDf.printSchema()
print myDf.take(1)

root
 |-- _1: string (nullable = true)
 |-- _2: string (nullable = true)
 |-- _3: long (nullable = true)

[Row(_1=u'1', _2=u'kim, js', _3=170)]


# 컬럼명 설정

In [5]:
myDf2=spark.createDataFrame(myList, ['year','name','height'])
myDf2.printSchema()
print myDf2.take(1)

root
 |-- year: string (nullable = true)
 |-- name: string (nullable = true)
 |-- height: long (nullable = true)

[Row(year=u'1', name=u'kim, js', height=170)]


In [6]:
names = ["kim","lee","lee","lim"]
items = ["espresso","latte","americano","affocato","long black","macciato"]
df = spark.createDataFrame([(names[i%4], items[i%6]) for i in range(100)],\
                           ["name","item"])
df.show(10)

+----+----------+
|name|      item|
+----+----------+
| kim|  espresso|
| lee|     latte|
| lee| americano|
| lim|  affocato|
| kim|long black|
| lee|  macciato|
| lee|  espresso|
| lim|     latte|
| kim| americano|
| lee|  affocato|
+----+----------+
only showing top 10 rows



# select() 컬럼 골라내기

* item 열의 substr으로 1,3문자를 선택
* alias로 컬럼명을 정할 수 있다.

In [21]:
df.select(df.item.substr(1, 3).alias('short name')).show(10)

+----------+
|short name|
+----------+
|       esp|
|       lat|
|       ame|
|       aff|
|       lon|
|       mac|
|       esp|
|       lat|
|       ame|
|       aff|
+----------+
only showing top 10 rows



# Row 객체를 사용해서 생성

In [23]:
from pyspark.sql import Row
Person = Row('year','name', 'height')
row1=Person('1','kim, js',170)

In [24]:
myRows = [row1,
          Person('1','lee, sm', 175),
          Person('2','lim, yg',180),
          Person('2','lee',170)]
myDf=spark.createDataFrame(myRows)
print myDf.printSchema()
myDf.show()

root
 |-- year: string (nullable = true)
 |-- name: string (nullable = true)
 |-- height: long (nullable = true)

None
+----+-------+------+
|year|   name|height|
+----+-------+------+
|   1|kim, js|   170|
|   1|lee, sm|   175|
|   2|lim, yg|   180|
|   2|    lee|   170|
+----+-------+------+



# schema를 정의하고 생성

In [26]:
from pyspark.sql.types import StructType, StructField
from pyspark.sql.types import StringType, IntegerType
mySchema=StructType([
    StructField("year", StringType(), True),
    StructField("name", StringType(), True),
    StructField("height", IntegerType(), True)
])

myDf=spark.createDataFrame(myRows, mySchema)
myDf.printSchema()
myDf.show()

root
 |-- year: string (nullable = true)
 |-- name: string (nullable = true)
 |-- height: integer (nullable = true)

+----+-------+------+
|year|   name|height|
+----+-------+------+
|   1|kim, js|   170|
|   1|lee, sm|   175|
|   2|lim, yg|   180|
|   2|    lee|   170|
+----+-------+------+



# RDD에서 Dataframe 생성하기

In [31]:
from pyspark.sql import Row

myList=[('1','kim, js',170),('1','lee, sm', 175),('2','lim, yg',180),('2','lee',170)]
myRdd = spark.sparkContext.parallelize(myList)

rddDf=myRdd.toDF()
rddDf.printSchema()

root
 |-- _1: string (nullable = true)
 |-- _2: string (nullable = true)
 |-- _3: long (nullable = true)



In [32]:
rddDf2=spark.createDataFrame(myRdd, ["number", "name", "height"])
rddDf2.printSchema()

root
 |-- number: string (nullable = true)
 |-- name: string (nullable = true)
 |-- height: long (nullable = true)



In [34]:
rddDf.where(rddDf._3 < 175)\
    .select([rddDf._1, rddDf._2])\
    .show()

rddDf.groupby(rddDf._1).max().show()

+---+-------+
| _1|     _2|
+---+-------+
|  1|kim, js|
|  2|    lee|
+---+-------+

+---+-------+
| _1|max(_3)|
+---+-------+
|  1|    175|
|  2|    180|
+---+-------+



# RDD => Dataframe => scheme변경 (Row이용)

In [35]:
_myRdd=myRdd.map(lambda x:Row(year=int(x[0]), name=x[1], height=int(x[2])))
_myDf=spark.createDataFrame(_myRdd)
_myDf.printSchema()
_myDf.take(1)

root
 |-- height: long (nullable = true)
 |-- name: string (nullable = true)
 |-- year: long (nullable = true)



[Row(height=170, name=u'kim, js', year=1)]

# scheme변경 (StructType 이용)

In [36]:
from pyspark.sql import Row
from pyspark.sql.types import StructType, StructField
from pyspark.sql.types import StringType, IntegerType, TimestampType
r1=Row(name="js1",age=10)
r2=Row(name="js2",age=20)
_myRdd=spark.sparkContext.parallelize([r1,r2])

In [37]:
schema=StructType([
    StructField("age", IntegerType(), True),
    StructField("name", StringType(), True),
    #StructField("created", TimestampType(), True)
])
_myDf=spark.createDataFrame(_myRdd,schema)
_myDf.printSchema()
_myDf.show()

root
 |-- age: integer (nullable = true)
 |-- name: string (nullable = true)

+---+----+
|age|name|
+---+----+
| 10| js1|
| 20| js2|
+---+----+



In [38]:
from pyspark.sql.types import *
myRdd=spark.sparkContext.parallelize([(1, 'kim', 50.0), (2, 'lee', 60.0), (3, 'park', 70.0)])
schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("name", StringType(), True),
    StructField("height", DoubleType(), True)
])
_myDf = spark.createDataFrame(myRdd, schema)
_myDf.printSchema()
_myDf.show()

root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- height: double (nullable = true)

+---+----+------+
| id|name|height|
+---+----+------+
|  1| kim|  50.0|
|  2| lee|  60.0|
|  3|park|  70.0|
+---+----+------+



# CSV파일 읽기 (RDD로 읽기)

In [40]:
from pyspark.sql import Row
cfile= os.path.join(os.environ["SPARK_HOME"],\
           "examples/src/main/resources/people.txt")
# RDD로 읽기
lines = spark.sparkContext.textFile(cfile)

parts = lines.map(lambda l: l.split(","))
people = parts.map(lambda p: Row(name=p[0], age=int(p[1].strip())))

_myDf = spark.createDataFrame(people)
_myDf.printSchema()
_myDf.collect()

root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)



[Row(age=29, name=u'Michael'),
 Row(age=30, name=u'Andy'),
 Row(age=19, name=u'Justin')]

# CSV 직접 읽기

In [41]:
df = spark.read.format('com.databricks.spark.csv')\
    .options(header='true', inferschema='true').load('data/ds_spark.csv')
df.show()

+---+---+---+---+
|  1|  2|  3|  4|
+---+---+---+---+
| 11| 22| 33| 44|
|111|222|333|444|
+---+---+---+---+



# 예제

In [49]:
_bicycle = spark.read.format('com.databricks.spark.csv')\
    .options(header='true', inferschema='true').load('data/seoul_bicycle.csv')

# 컬럼 변경
bicycle=_bicycle\
    .withColumnRenamed("date", "Date")\
    .withColumnRenamed("count", "Count")

# 컬럼 만들기
bicycle=bicycle.withColumn("year",bicycle.Date.substr(1, 4))
bicycle=bicycle.withColumn("month",bicycle.Date.substr(6, 2))

bicycle.printSchema()
bicycle.groupBy('year').agg({"count":"sum"}).show()
bicycle.groupBy('year').pivot('month').agg({"count":"sum"}).show()

root
 |-- Date: timestamp (nullable = true)
 |-- Count: integer (nullable = true)
 |-- year: string (nullable = true)
 |-- month: string (nullable = true)

+----+----------+
|year|sum(count)|
+----+----------+
|2019|   1871935|
|2018|  10124874|
+----+----------+

+----+------+------+------+------+------+-------+-------+-------+-------+-------+------+------+
|year|    01|    02|    03|    04|    05|     06|     07|     08|     09|     10|    11|    12|
+----+------+------+------+------+------+-------+-------+-------+-------+-------+------+------+
|2019|495573|471543|904819|  null|  null|   null|   null|   null|   null|   null|  null|  null|
|2018|164367|168741|462661|687885|965609|1207123|1100015|1037505|1447993|1420621|961532|500822|
+----+------+------+------+------+------+-------+-------+-------+-------+-------+------+------+



In [50]:
_bicycle = spark.read.format('com.databricks.spark.csv')\
    .options(header='true', inferschema='true').load('data/seoul_bicycle.csv')

# 컬럼 변경
bicycle=_bicycle\
    .withColumnRenamed("date", "Date")\
    .withColumnRenamed("count", "Count")

# F함수를 이용하여 컬럼 만들기
import pyspark.sql.functions as F
bicycle = bicycle\
    .withColumn('year', F.year('date'))\
    .withColumn('month', F.month('date'))

bicycle.printSchema()
bicycle.groupBy('year').agg({"count":"sum"}).show()
bicycle.groupBy('year').pivot('month').agg({"count":"sum"}).show()

root
 |-- Date: timestamp (nullable = true)
 |-- Count: integer (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)

+----+----------+
|year|sum(count)|
+----+----------+
|2018|  10124874|
|2019|   1871935|
+----+----------+

+----+------+------+------+------+------+-------+-------+-------+-------+-------+------+------+
|year|     1|     2|     3|     4|     5|      6|      7|      8|      9|     10|    11|    12|
+----+------+------+------+------+------+-------+-------+-------+-------+-------+------+------+
|2018|164367|168741|462661|687885|965609|1207123|1100015|1037505|1447993|1420621|961532|500822|
|2019|495573|471543|904819|  null|  null|   null|   null|   null|   null|   null|  null|  null|
+----+------+------+------+------+------+-------+-------+-------+-------+-------+------+------+

