# Spark DataFrame

Spark DataFrame:

•  Each row of a DataFrame is a Row object

•  The fields in a Row can be accessed like attributes

In [124]:
from __future__ import unicode_literals
import pandas as pd

from pyspark import SparkContext
from pyspark.sql import HiveContext
from pyspark.sql import Row
from pyspark.sql.functions import col, udf
from pyspark.sql.types import StructField, StructType, StringType, IntegerType

In [16]:
sc = SparkContext() if sc is None else sc
hc = HiveContext(sc)

In [336]:
sqlContext

<pyspark.sql.context.SQLContext at 0x105101410>

# 1. Spark DataFrame Creation

Spark DF建立時必須定義schema，定義每一個欄位名稱與資料型態

You construct DataFrames:

•  by parallelizing existing Python collections (lists) 

•  by transforming an existing Spark RDD or pandas DFs 

•  from files in HDFS or any other storage system

## 1.1 Create DataFrames from Python collections (lists)

In [95]:
data = [('Alice', 18), ('Bob', 20)]  # list

In [168]:
#df = hc.createDataFrame(data)
df = sqlContext.createDataFrame(data)

In [169]:
type(df)

pyspark.sql.dataframe.DataFrame

In [170]:
df

DataFrame[_1: string, _2: string, _3: bigint]

In [171]:
df2 = sqlContext.createDataFrame(data, ['name', 'age'])

In [172]:
df2

DataFrame[name: string, age: string, _3: bigint]

## 1.2 Creating DataFrame from pandas

In [97]:
pandas_df = pd.read_csv("data/user.csv", sep=",", names=["userid", "age", "gender", "occupation", "zipcode"])

In [98]:
type(pandas_df)

pandas.core.frame.DataFrame

In [102]:
#pandas_df

In [100]:
spark_df = sqlContext.createDataFrame(pandas_df)

In [103]:
#spark_df.show()

## 1.3 From HDFS, text files, csv file, parquet file...

### (1). 讀取 text file 成為 DF

In [120]:
# Loads text file and returns a DataFrame with a single string column named "value" Each line in text file is a row
df = sqlContext.read.text("data/user.csv")

In [121]:
df

DataFrame[value: string]

In [130]:
#df.show(truncate=False)

### (2). 讀取 csv file 成為 DF

In [126]:
schema = StructType([
        StructField("userid", IntegerType(), False),
        StructField("age", IntegerType(), False),
        StructField("gender", StringType(), False),
        StructField("occupation", StringType(), False),
        StructField("zipcode", StringType(), False)
        ])

In [127]:
df = sqlContext.read.csv("data/user.csv", schema)

In [128]:
df

DataFrame[userid: int, age: int, gender: string, occupation: string, zipcode: string]

In [131]:
#df.show()

### (3). 讀取 parquet file 成為 DF

將文件（text file）轉換為 Parquet file，可以加速查詢、節省硬碟儲存空間

In [143]:
df = sqlContext.read.parquet("data/ptt.snappy.parquet")  # pyspark.sql.dataframe.DataFrame

In [144]:
df.count()

4522

In [147]:
#df.printSchema()

In [150]:
#df.show()

## 1.4 RDD -->  DataFrame

pickle file: 物件序列化、存入硬碟後的檔案

In [174]:
# 讀取 pickle file
rdd = sc.pickleFile("data/ptt.pickle")

In [175]:
rdd.count()

4522

In [176]:
rdd.take(1)

[(u'ptt',
  u'PTTcreditcard',
  datetime.date(2017, 5, 3),
  1493747939,
  u'',
  u'5%\u55ce\uff1f\u722c\u4e86\u597d\u591a\u56de\u994b\u6587\u90fd\u6c92\u63d0\u5230\u9019\u584a\uff5eQQ',
  u'',
  u'chun65455',
  2,
  u'M.1493746890.A.5F2',
  u'M.1493746890.A.5F2_chun65455_2017-05-03T01:58:59_6',
  u'\u2192')]

In [177]:
rdd_row = rdd.map(lambda x:
        Row(
            website = x[0],
            channel = x[1],
            time = x[2],
            sort_time = x[3],
            title = x[4],
            content = x[5],
            url = x[6],
            writer_id = x[7],
            level = x[8],
            article_id = x[9],
            r_article_id = x[10],
            mood = x[11]
            )
       )

In [178]:
rdd_row.take(1)

[Row(article_id=u'M.1493746890.A.5F2', channel=u'PTTcreditcard', content=u'5%\u55ce\uff1f\u722c\u4e86\u597d\u591a\u56de\u994b\u6587\u90fd\u6c92\u63d0\u5230\u9019\u584a\uff5eQQ', level=2, mood=u'\u2192', r_article_id=u'M.1493746890.A.5F2_chun65455_2017-05-03T01:58:59_6', sort_time=1493747939, time=datetime.date(2017, 5, 3), title=u'', url=u'', website=u'ptt', writer_id=u'chun65455')]

In [179]:
df = rdd_row.toDF()

In [180]:
#df.printSchema()

In [181]:
#df.show()

In [182]:
df2 = sqlContext.createDataFrame(rdd_row)

In [183]:
#df2.printSchema()

In [184]:
#df2.show()

# 2.  Spark DataFrame Transformations

## 2.1 Select Column

In [222]:
df = sqlContext.read.csv("data/user.csv", schema)

In [235]:
df.count()

943

In [223]:
df

DataFrame[userid: int, age: int, gender: string, occupation: string, zipcode: string]

In [224]:
df.age

Column<age>

In [225]:
# * selects all the columns
df.select('*')

DataFrame[userid: int, age: int, gender: string, occupation: string, zipcode: string]

In [226]:
# 選取欄位的三種方式
# selects the userid and age columns
df.select('userid', 'age') #.show()

DataFrame[userid: int, age: int]

In [227]:
df.select(df.userid, df.age) #.show()

DataFrame[userid: int, age: int]

In [228]:
df.select(df["userid"], df["age"]) #.show()

DataFrame[userid: int, age: int]

## 2.2 Add Column

In [231]:
# 增加計算欄位
#df = df.select('*', (2017-df.age).alias("birthyear"))
df = df.withColumn("birthyear", 2017-df.age)

In [232]:
df.show()

+------+---+------+-------------+-------+---------+
|userid|age|gender|   occupation|zipcode|birthyear|
+------+---+------+-------------+-------+---------+
|     1| 24|     M|   technician|  85711|     1993|
|     2| 53|     F|        other|  94043|     1964|
|     3| 23|     M|       writer|  32067|     1994|
|     4| 24|     M|   technician|  43537|     1993|
|     5| 33|     F|        other|  15213|     1984|
|     6| 42|     M|    executive|  98101|     1975|
|     7| 57|     M|administrator|  91344|     1960|
|     8| 36|     M|administrator|  05201|     1981|
|     9| 29|     M|      student|  01002|     1988|
|    10| 53|     M|       lawyer|  90703|     1964|
|    11| 39|     F|        other|  30329|     1978|
|    12| 28|     F|        other|  06405|     1989|
|    13| 47|     M|     educator|  29206|     1970|
|    14| 45|     M|    scientist|  55106|     1972|
|    15| 49|     F|     educator|  97301|     1968|
|    16| 21|     M|entertainment|  10309|     1996|
|    17| 30|

## 2.3 Filter

＊寫法一

In [233]:
# 單一條件
df.filter('gender = "M"').count()

670

In [239]:
# 多個條件
df.filter('gender = "M" and occupation = "technician"').count()

26

In [240]:
df.filter('gender = "M"').filter('occupation = "technician"').count()

26

＊寫法二：

必須使用 &

必須使用 ==

In [241]:
# 單一條件
df.filter(df.gender == 'M').count()

670

In [243]:
# 多個條件
df.filter((df.gender == 'M') & (df.occupation == 'technician')).count()

26

In [244]:
df.filter(df.gender == 'M').filter(df.occupation == 'technician').count()

26

## 2.4 Order By

＊預設為升冪排序

In [246]:
# 預設為升冪排序
df.orderBy("age").show()

+------+---+------+-------------+-------+---------+
|userid|age|gender|   occupation|zipcode|birthyear|
+------+---+------+-------------+-------+---------+
|    30|  7|     M|      student|  55436|     2010|
|   471| 10|     M|      student|  77459|     2007|
|   289| 11|     M|         none|  94619|     2006|
|   142| 13|     M|        other|  48118|     2004|
|   628| 13|     M|         none|  94306|     2004|
|   674| 13|     F|      student|  55337|     2004|
|   609| 13|     F|      student|  55106|     2004|
|   880| 13|     M|      student|  83702|     2004|
|   206| 14|     F|      student|  53115|     2003|
|   813| 14|     F|      student|  02136|     2003|
|   887| 14|     F|      student|  27249|     2003|
|   179| 15|     M|entertainment|  20755|     2002|
|   101| 15|     M|      student|  05146|     2002|
|   281| 15|     F|      student|  06059|     2002|
|   618| 15|     F|      student|  44212|     2002|
|   461| 15|     M|      student|  98102|     2002|
|   849| 15|

In [247]:
df.orderBy(df.age).show()

+------+---+------+-------------+-------+---------+
|userid|age|gender|   occupation|zipcode|birthyear|
+------+---+------+-------------+-------+---------+
|    30|  7|     M|      student|  55436|     2010|
|   471| 10|     M|      student|  77459|     2007|
|   289| 11|     M|         none|  94619|     2006|
|   142| 13|     M|        other|  48118|     2004|
|   628| 13|     M|         none|  94306|     2004|
|   674| 13|     F|      student|  55337|     2004|
|   609| 13|     F|      student|  55106|     2004|
|   880| 13|     M|      student|  83702|     2004|
|   206| 14|     F|      student|  53115|     2003|
|   813| 14|     F|      student|  02136|     2003|
|   887| 14|     F|      student|  27249|     2003|
|   179| 15|     M|entertainment|  20755|     2002|
|   101| 15|     M|      student|  05146|     2002|
|   281| 15|     F|      student|  06059|     2002|
|   618| 15|     F|      student|  44212|     2002|
|   461| 15|     M|      student|  98102|     2002|
|   849| 15|

＊降冪排序

In [249]:
df.orderBy("age", ascending=0).show()

+------+---+------+-------------+-------+---------+
|userid|age|gender|   occupation|zipcode|birthyear|
+------+---+------+-------------+-------+---------+
|   481| 73|     M|      retired|  37771|     1944|
|   803| 70|     M|administrator|  78212|     1947|
|   767| 70|     M|     engineer|  00000|     1947|
|   860| 70|     F|      retired|  48322|     1947|
|   559| 69|     M|    executive|  10022|     1948|
|   585| 69|     M|    librarian|  98501|     1948|
|   349| 68|     M|      retired|  61455|     1949|
|   573| 68|     M|      retired|  48911|     1949|
|   211| 66|     M|     salesman|  32605|     1951|
|   651| 65|     M|      retired|  02903|     1952|
|   564| 65|     M|      retired|  94591|     1952|
|   318| 65|     M|      retired|  06518|     1952|
|   423| 64|     M|        other|  91606|     1953|
|   845| 64|     M|       doctor|  97405|     1953|
|   364| 63|     M|     engineer|  01810|     1954|
|   777| 63|     M|   programmer|  01810|     1954|
|   858| 63|

In [248]:
df.orderBy(df.age.desc()).show()

+------+---+------+-------------+-------+---------+
|userid|age|gender|   occupation|zipcode|birthyear|
+------+---+------+-------------+-------+---------+
|   481| 73|     M|      retired|  37771|     1944|
|   803| 70|     M|administrator|  78212|     1947|
|   767| 70|     M|     engineer|  00000|     1947|
|   860| 70|     F|      retired|  48322|     1947|
|   559| 69|     M|    executive|  10022|     1948|
|   585| 69|     M|    librarian|  98501|     1948|
|   349| 68|     M|      retired|  61455|     1949|
|   573| 68|     M|      retired|  48911|     1949|
|   211| 66|     M|     salesman|  32605|     1951|
|   651| 65|     M|      retired|  02903|     1952|
|   564| 65|     M|      retired|  94591|     1952|
|   318| 65|     M|      retired|  06518|     1952|
|   423| 64|     M|        other|  91606|     1953|
|   845| 64|     M|       doctor|  97405|     1953|
|   364| 63|     M|     engineer|  01810|     1954|
|   777| 63|     M|   programmer|  01810|     1954|
|   858| 63|

＊多欄位排序

In [251]:
df.orderBy(["age", "gender"], ascending=[0,1]).show()

+------+---+------+-------------+-------+---------+
|userid|age|gender|   occupation|zipcode|birthyear|
+------+---+------+-------------+-------+---------+
|   481| 73|     M|      retired|  37771|     1944|
|   860| 70|     F|      retired|  48322|     1947|
|   767| 70|     M|     engineer|  00000|     1947|
|   803| 70|     M|administrator|  78212|     1947|
|   559| 69|     M|    executive|  10022|     1948|
|   585| 69|     M|    librarian|  98501|     1948|
|   349| 68|     M|      retired|  61455|     1949|
|   573| 68|     M|      retired|  48911|     1949|
|   211| 66|     M|     salesman|  32605|     1951|
|   651| 65|     M|      retired|  02903|     1952|
|   564| 65|     M|      retired|  94591|     1952|
|   318| 65|     M|      retired|  06518|     1952|
|   423| 64|     M|        other|  91606|     1953|
|   845| 64|     M|       doctor|  97405|     1953|
|   364| 63|     M|     engineer|  01810|     1954|
|   777| 63|     M|   programmer|  01810|     1954|
|   858| 63|

In [252]:
df.orderBy(df.age.desc(), df.gender).show()

+------+---+------+-------------+-------+---------+
|userid|age|gender|   occupation|zipcode|birthyear|
+------+---+------+-------------+-------+---------+
|   481| 73|     M|      retired|  37771|     1944|
|   860| 70|     F|      retired|  48322|     1947|
|   767| 70|     M|     engineer|  00000|     1947|
|   803| 70|     M|administrator|  78212|     1947|
|   559| 69|     M|    executive|  10022|     1948|
|   585| 69|     M|    librarian|  98501|     1948|
|   349| 68|     M|      retired|  61455|     1949|
|   573| 68|     M|      retired|  48911|     1949|
|   211| 66|     M|     salesman|  32605|     1951|
|   651| 65|     M|      retired|  02903|     1952|
|   564| 65|     M|      retired|  94591|     1952|
|   318| 65|     M|      retired|  06518|     1952|
|   423| 64|     M|        other|  91606|     1953|
|   845| 64|     M|       doctor|  97405|     1953|
|   364| 63|     M|     engineer|  01810|     1954|
|   777| 63|     M|   programmer|  01810|     1954|
|   858| 63|

## 2.5 Distinct

In [253]:
df.select("occupation").distinct().show()

+-------------+
|   occupation|
+-------------+
|    librarian|
|      retired|
|       lawyer|
|         none|
|       writer|
|   programmer|
|    marketing|
|        other|
|    executive|
|    scientist|
|      student|
|     salesman|
|       artist|
|   technician|
|administrator|
|     engineer|
|   healthcare|
|     educator|
|entertainment|
|    homemaker|
+-------------+
only showing top 20 rows



In [254]:
df.select(df.occupation).distinct().show()

+-------------+
|   occupation|
+-------------+
|    librarian|
|      retired|
|       lawyer|
|         none|
|       writer|
|   programmer|
|    marketing|
|        other|
|    executive|
|    scientist|
|      student|
|     salesman|
|       artist|
|   technician|
|administrator|
|     engineer|
|   healthcare|
|     educator|
|entertainment|
|    homemaker|
+-------------+
only showing top 20 rows



In [257]:
df.select("gender", "occupation").distinct().show()

+------+-------------+
|gender|   occupation|
+------+-------------+
|     M|    executive|
|     M|     educator|
|     F|         none|
|     F|entertainment|
|     F|      retired|
|     F|       artist|
|     F|    librarian|
|     F|     engineer|
|     F|   healthcare|
|     F|administrator|
|     M|        other|
|     M|    homemaker|
|     F|       lawyer|
|     M|   programmer|
|     M|     salesman|
|     M|         none|
|     M|entertainment|
|     M|    marketing|
|     M|   technician|
|     M|administrator|
+------+-------------+
only showing top 20 rows



## 2.6 Group By

In [269]:
#df.groupby("gender").count().show()
#df.groupby(df.gender).count().show()

In [263]:
df.groupBy("gender").count().show()
#df.groupBy(df.gender).count().show()

+------+-----+
|gender|count|
+------+-----+
|     F|  273|
|     M|  670|
+------+-----+



In [268]:
df.groupBy("gender", "occupation")\
    .count()\
    .orderBy("occupation", "gender")\
    .show()

+------+-------------+-----+
|gender|   occupation|count|
+------+-------------+-----+
|     F|administrator|   36|
|     M|administrator|   43|
|     F|       artist|   13|
|     M|       artist|   15|
|     M|       doctor|    7|
|     F|     educator|   26|
|     M|     educator|   69|
|     F|     engineer|    2|
|     M|     engineer|   65|
|     F|entertainment|    2|
|     M|entertainment|   16|
|     F|    executive|    3|
|     M|    executive|   29|
|     F|   healthcare|   11|
|     M|   healthcare|    5|
|     F|    homemaker|    6|
|     M|    homemaker|    1|
|     F|       lawyer|    2|
|     M|       lawyer|   10|
|     F|    librarian|   29|
+------+-------------+-----+
only showing top 20 rows



In [272]:
df.select("occupation", "age")\
    .groupBy("occupation")\
    .avg()\
    .show()

+-------------+------------------+
|   occupation|          avg(age)|
+-------------+------------------+
|    librarian|              40.0|
|      retired| 63.07142857142857|
|       lawyer|             36.75|
|         none|26.555555555555557|
|       writer| 36.31111111111111|
|   programmer|33.121212121212125|
|    marketing| 37.61538461538461|
|        other|34.523809523809526|
|    executive|          38.71875|
|    scientist| 35.54838709677419|
|      student|22.081632653061224|
|     salesman|35.666666666666664|
|       artist|31.392857142857142|
|   technician|33.148148148148145|
|administrator| 38.74683544303797|
|     engineer| 36.38805970149254|
|   healthcare|           41.5625|
|     educator| 42.01052631578948|
|entertainment| 29.22222222222222|
|    homemaker| 32.57142857142857|
+-------------+------------------+
only showing top 20 rows



## 2.7 Join

In [295]:
zipcode_df = pd.read_csv("data/free-zipcode-database-Primary.csv")

In [300]:
zipcode_df = zipcode_df[["City", "State", "ZipCodeType", "Zipcode"]]

In [301]:
zipcode = sqlContext.createDataFrame(zipcode_df)

In [303]:
zipcode.printSchema()

root
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- ZipCodeType: string (nullable = true)
 |-- Zipcode: long (nullable = true)



In [311]:
df_join = df.join(zipcode, "zipcode", "left_outer")

In [314]:
#df_join.printSchema()

In [315]:
#df_join.show()

# 3.  Spark DataFrame Actions

In [317]:
df_join.show()

+-------+------+---+------+-------------+---------+-------------+-----+-----------+
|zipcode|userid|age|gender|   occupation|birthyear|         City|State|ZipCodeType|
+-------+------+---+------+-------------+---------+-------------+-----+-----------+
|  21911|   175| 26|     F|    scientist|     1991|   RISING SUN|   MD|   STANDARD|
|  80521|    66| 23|     M|      student|     1994| FORT COLLINS|   CO|   STANDARD|
|  12550|    48| 45|     M|administrator|     1972|     NEWBURGH|   NY|   STANDARD|
|  48043|   636| 47|     M|     educator|     1970|MOUNT CLEMENS|   MI|   STANDARD|
|  91505|   896| 28|     M|       writer|     1989|      BURBANK|   CA|   STANDARD|
|  98405|   732| 28|     F|        other|     1989|       TACOMA|   WA|   STANDARD|
|  10960|   204| 52|     F|    librarian|     1965|        NYACK|   NY|   STANDARD|
|  10960|   766| 42|     M|        other|     1975|        NYACK|   NY|   STANDARD|
|  14211|   760| 35|     F|        other|     1982|      BUFFALO|   NY|   ST

In [318]:
df_join.take(1)

[Row(zipcode=u'21911', userid=175, age=26, gender=u'F', occupation=u'scientist', birthyear=1991, City=u'RISING SUN', State=u'MD', ZipCodeType=u'STANDARD')]

In [321]:
#df_join.collect()

In [320]:
df_join.count()

943

# 4.  Persist 持久化

In [322]:
# The default storage level has changed to MEMORY_AND_DISK to match Scala in 2.0.
df_join.persist()

DataFrame[zipcode: string, userid: int, age: int, gender: string, occupation: string, birthyear: int, City: string, State: string, ZipCodeType: string]

In [337]:
# Persists the DataFrame with the default storage level (MEMORY_AND_DISK).
df_join.cache()

DataFrame[zipcode: string, userid: int, age: int, gender: string, occupation: string, birthyear: int, City: string, State: string, ZipCodeType: string]

In [323]:
# 取消持久化
df_join.unpersist()

DataFrame[zipcode: string, userid: int, age: int, gender: string, occupation: string, birthyear: int, City: string, State: string, ZipCodeType: string]

# 5.  Save Data

In [325]:
# parquet file
df_join.repartition(1).write.save("result/join_parquet")

In [326]:
# csv file
df_join.repartition(1).write.csv("result/join_csv")

In [328]:
# json file
df_join.repartition(1).write.json("result/join_json")

In [330]:
# Hive table
df_join.repartition(1).write.partitionBy("occupation").mode("overwrite").format("parquet").saveAsTable('DB.table')

In [331]:
# 將 spark DF 轉為 pandas DF
df_join.toPandas()

Unnamed: 0,zipcode,userid,age,gender,occupation,birthyear,City,State,ZipCodeType
0,21911,175,26,F,scientist,1991,RISING SUN,MD,STANDARD
1,80521,66,23,M,student,1994,FORT COLLINS,CO,STANDARD
2,12550,48,45,M,administrator,1972,NEWBURGH,NY,STANDARD
3,48043,636,47,M,educator,1970,MOUNT CLEMENS,MI,STANDARD
4,91505,896,28,M,writer,1989,BURBANK,CA,STANDARD
5,98405,732,28,F,other,1989,TACOMA,WA,STANDARD
6,10960,204,52,F,librarian,1965,NYACK,NY,STANDARD
7,10960,766,42,M,other,1975,NYACK,NY,STANDARD
8,14211,760,35,F,other,1982,BUFFALO,NY,STANDARD
9,62903,764,27,F,educator,1990,CARBONDALE,IL,STANDARD


# 6.  Register Temp Table

## Spark SQL

Spark SQL是由Spark DF衍生而來的，必須先建立Spark DF，然後透過登錄Spark SQL temp table，就可以使用Spark SQL語法。

In [332]:
df_join.registerTempTable("join_table")

In [333]:
sqlContext.sql("""
SELECT  *
FROM    join_table
LIMIT   5
""").show()

+-------+------+---+------+-------------+---------+-------------+-----+-----------+
|zipcode|userid|age|gender|   occupation|birthyear|         City|State|ZipCodeType|
+-------+------+---+------+-------------+---------+-------------+-----+-----------+
|  21911|   175| 26|     F|    scientist|     1991|   RISING SUN|   MD|   STANDARD|
|  80521|    66| 23|     M|      student|     1994| FORT COLLINS|   CO|   STANDARD|
|  12550|    48| 45|     M|administrator|     1972|     NEWBURGH|   NY|   STANDARD|
|  48043|   636| 47|     M|     educator|     1970|MOUNT CLEMENS|   MI|   STANDARD|
|  91505|   896| 28|     M|       writer|     1989|      BURBANK|   CA|   STANDARD|
+-------+------+---+------+-------------+---------+-------------+-----+-----------+



In [334]:
query_result = sqlContext.sql("""
                                SELECT  *
                                FROM    join_table
                                LIMIT   5
                              """)

In [335]:
type(query_result)

pyspark.sql.dataframe.DataFrame