# 4.2 用 PySpark 建立第一個 RDD

In [1]:
from __future__ import print_function, division

## Import PySpark

In [2]:
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession

## 啟動 Spark

In [3]:
# Build (or reuse) a local SparkSession with Hive support enabled on the builder.
spark = (
    SparkSession.builder
    .master("local")
    .appName("test")
    .enableHiveSupport()
    .getOrCreate()
)

# The SparkContext is the entry point for the RDD API used in Part 1.
sc = spark.sparkContext

## Part 1. Create an RDD from SparkContext

In [112]:
wordsList = ['cat', 'elephant', 'rat', 'rat', 'cat']
# Distribute the local Python list over 4 partitions to obtain an RDD.
wordsRDD = sc.parallelize(wordsList, numSlices=4)
# Confirm what parallelize returned.
print(type(wordsRDD))


<class 'pyspark.rdd.RDD'>


## Part 2. Create a DataFrame from HDFS

## Put data into HDFS

In [2]:
# List the local data directory to confirm the log file exists before uploading it.
!ls ../data

NASA_access_log_Jul95_100


In [5]:
# Peek at the raw access log; `head` prints the first 10 lines by default.
!head ../data/NASA_access_log_Jul95_100

199.72.81.55 - - [01/Jul/1995:00:00:01 -0400] "GET /history/apollo/ HTTP/1.0" 200 6245
unicomp6.unicomp.net - - [01/Jul/1995:00:00:06 -0400] "GET /shuttle/countdown/ HTTP/1.0" 200 3985
199.120.110.21 - - [01/Jul/1995:00:00:09 -0400] "GET /shuttle/missions/sts-73/mission-sts-73.html HTTP/1.0" 200 4085
burger.letters.com - - [01/Jul/1995:00:00:11 -0400] "GET /shuttle/countdown/liftoff.html HTTP/1.0" 304 0
199.120.110.21 - - [01/Jul/1995:00:00:11 -0400] "GET /shuttle/missions/sts-73/sts-73-patch-small.gif HTTP/1.0" 200 4179
burger.letters.com - - [01/Jul/1995:00:00:12 -0400] "GET /images/NASA-logosmall.gif HTTP/1.0" 304 0
burger.letters.com - - [01/Jul/1995:00:00:12 -0400] "GET /shuttle/countdown/video/livevideo.gif HTTP/1.0" 200 0
205.212.115.106 - - [01/Jul/1995:00:00:12 -0400] "GET /shuttle/countdown/countdown.html HTTP/1.0" 200 3985
d104.aa.net - - [01/Jul/1995:00:00:13 -0400] "GET /shuttle/countdown/ HTTP/1.0" 200 3985
129.94.144.152 - - [01/Jul/1995:00:00:13 -0400] "GET / H

In [6]:
# Inspect the HDFS root directory before uploading anything.
!hadoop fs -ls /

Found 2 items
drwx-wx-wx   - vagrant supergroup          0 2017-08-25 08:22 /tmp
drwxr-xr-x   - vagrant supergroup          0 2017-08-25 08:22 /user


In [9]:
!hadoop fs -put ../data/NASA_access_log_Jul95_100 /tmp

put: `/tmp/NASA_access_log_Jul95_100': File exists


In [16]:
# Verify the log file now sits under /tmp on HDFS.
!hadoop fs -ls /tmp

Found 2 items
-rw-r--r--   3 vagrant supergroup      10851 2017-09-25 03:44 /tmp/NASA_access_log_Jul95_100
drwx-wx-wx   - vagrant supergroup          0 2017-08-25 08:22 /tmp/hive


### 從 HDFS 中讀取資料

In [96]:
# Read the log from HDFS as plain text: one row per line in a single string column.
textFromHDFS = spark.read.text(paths="hdfs:///tmp/NASA_access_log_Jul95_100")

In [95]:
# spark.read returns a DataFrame, not an RDD.
print(textFromHDFS.__class__)

<class 'pyspark.sql.dataframe.DataFrame'>


In [97]:
# Show the first Row; its `value` field holds one raw log line.
textFromHDFS.first()

Row(value=u'199.72.81.55 - - [01/Jul/1995:00:00:01 -0400] "GET /history/apollo/ HTTP/1.0" 200 6245')

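## Part 3. Read CSV format

This part assumes `ratings.csv` has already been uploaded to `/tmp` on HDFS; the upload step is not shown in this notebook.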

In [26]:
# Show the end of the CSV stored on HDFS (`-tail` prints the file's last kilobyte).
# NOTE(review): this notebook never uploads ratings.csv to /tmp -- it is assumed
# to have been put there beforehand; confirm before running on a fresh cluster.
!hadoop fs -tail /tmp/ratings.csv

userid,movieid,rating,ts
3,6539,5,1133571238
3,7153,4,1133571171
3,7155,3.5,1164885564
3,8529,4,1136075616
3,8533,4.5,1136418593
3,8783,5,1136075857
3,27821,4.5,1136418616
3,33750,3.5,1164885688
4,21,3,844416980
4,34,5,844416936
4,39,3,844417037
4,110,5,844416866
4,150,5,844416656
4,153,5,844416699
4,161,5,844416835
4,165,5,844416699
4,208,3,844416866
4,231,1,844416742
4,253,3,844416834
4,266,5,844417070
4,292,3,844416796
4,316,5,844416742
4,317,5,844417037
4,329,5,844416796
4,344,2,844416699
4,349,3,844416699


In [98]:
# Options forwarded to spark.read.csv in the next cell.
path = "hdfs:///tmp/ratings.csv"
header = True   # first line of the file holds the column names
schema = None   # no explicit schema is supplied
sep = None      # None leaves Spark's default separator in effect

In [99]:
# Parse the CSV into a DataFrame using the options defined in the previous cell.
csvDF = spark.read.csv(path, schema=schema, sep=sep, header=header)

In [100]:
# read.csv also yields a DataFrame.
print(csvDF.__class__)

<class 'pyspark.sql.dataframe.DataFrame'>


In [101]:
# The string form lists the columns and their types. Every column is a string
# here because no schema was given and no inferSchema option was passed.
print(csvDF)

DataFrame[userid: string, movieid: string, rating: string, ts: string]


In [102]:
# First data row; the header line was consumed as column names (header=True).
csvDF.first()

Row(userid=u'3', movieid=u'6539', rating=u'5', ts=u'1133571238')

### Compare with read.text

In [104]:
# Read the same CSV file as plain text to contrast with read.csv.
textDF = spark.read.text(path)

In [105]:
# read.text yields a single string column named "value" -- no CSV parsing happens.
textDF

DataFrame[value: string]

In [106]:
# Unlike read.csv with header=True, the header line comes back as an ordinary row.
textDF.first()

Row(value=u'userid,movieid,rating,ts')

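## Part 4. Read a JSON file

This part assumes `json_example.json` has already been uploaded to `/tmp` on HDFS; the upload step is not shown in this notebook.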

In [108]:
# Load the JSON file from HDFS; no schema is passed, so Spark infers one.
# NOTE(review): the upload of json_example.json is not shown in this notebook.
jsonDF = spark.read.json(path='hdfs:///tmp/json_example.json')

In [109]:
# Display the repr: the column names and the types Spark inferred from the JSON.
jsonDF

DataFrame[movieid: string, rating: bigint, userid: string]

In [110]:
# First record of the JSON-backed DataFrame.
jsonDF.first()

Row(movieid=u'001', rating=4, userid=u'1')

## Part 5. RDD 與 DataFrame 的轉換

In [116]:
# DataFrame -> RDD: `.rdd` exposes the underlying RDD, whose elements are Row objects.
jsonRDD = jsonDF.rdd

In [124]:
# Same data, two APIs: the DataFrame and the RDD obtained from it (one type per line).
print(type(jsonDF), type(jsonRDD), sep="\n")

<class 'pyspark.sql.dataframe.DataFrame'>
<class 'pyspark.rdd.RDD'>


In [125]:
# RDDs have no .head() (that is a DataFrame method); use .take(n) or .first().
# The error is caught and shown so "Restart & Run All" does not stop here.
try:
    jsonRDD.head()
except AttributeError as err:
    print("{}: {}".format(type(err).__name__, err))

AttributeError: 'RDD' object has no attribute 'head'

In [126]:
# The RDD equivalent of head(): a list holding the first element.
jsonRDD.take(num=1)

[Row(movieid=u'001', rating=4, userid=u'1')]

In [129]:
# RDD -> DataFrame. Passing the original schema keeps the column types identical
# to jsonDF and skips Spark's schema inference over the RDD's rows.
jsonDF2 = spark.createDataFrame(jsonRDD, schema=jsonDF.schema)

In [123]:
# The round-tripped object is a DataFrame again.
# NOTE(review): this cell's execution count (123) is lower than the
# createDataFrame cell's (129), so the output shown may come from an earlier
# jsonDF2; re-run the notebook top to bottom to confirm.
type(jsonDF2)

pyspark.sql.dataframe.DataFrame

In [130]:
# The first row matches jsonDF's first row.
jsonDF2.first()

Row(movieid=u'001', rating=4, userid=u'1')