# 4.2 用 Pyspark 建立第一個RDD

In [1]:
from __future__ import print_function, division

## import pyspark

In [2]:
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession

## 啟動 spark

In [3]:
# Build (or fetch the already-running) SparkSession: local master, app name
# "test", Hive support switched on.
spark = (
    SparkSession.builder
    .master("local")
    .appName("test")
    .enableHiveSupport()
    .getOrCreate()
)
# Keep a handle on the session's SparkContext for the RDD API calls.
sc = spark.sparkContext

## Part1. Create an RDD from SparkContext

In [4]:
# Turn a small in-memory Python list into an RDD split over 4 partitions.
wordsList = ['cat', 'elephant', 'rat', 'rat', 'cat']
wordsRDD = sc.parallelize(wordsList, numSlices=4)
# Show which class parallelize() handed back
print(type(wordsRDD))


<class 'pyspark.rdd.RDD'>


In [5]:
# Return every element of wordsRDD as a Python list.
wordsRDD.collect()

['cat', 'elephant', 'rat', 'rat', 'cat']

In [10]:
# Return the first 5 elements of wordsRDD as a list.
wordsRDD.take(5)

['cat', 'elephant', 'rat', 'rat', 'cat']

## Part2. Create a DataFrame from HDFS

## put data into HDFS

In [7]:
# Shell command: list the sample data files available in ../data.
!ls ../data

[31mNASA_access_log_Jul95[m[m     [31mkmeans_data.txt[m[m           [31mshakespear.txt[m[m
[31mNASA_access_log_Jul95_100[m[m [31mratings.csv[m[m               [31mtitanic_test.csv[m[m
[31mjson_example.json[m[m         [31msample_libsvm_data.txt[m[m    [31mtitanic_train.csv[m[m


In [8]:
# Shell command: preview the first lines of the local NASA access-log sample.
!head ../data/NASA_access_log_Jul95_100

199.72.81.55 - - [01/Jul/1995:00:00:01 -0400] "GET /history/apollo/ HTTP/1.0" 200 6245
unicomp6.unicomp.net - - [01/Jul/1995:00:00:06 -0400] "GET /shuttle/countdown/ HTTP/1.0" 200 3985
199.120.110.21 - - [01/Jul/1995:00:00:09 -0400] "GET /shuttle/missions/sts-73/mission-sts-73.html HTTP/1.0" 200 4085
burger.letters.com - - [01/Jul/1995:00:00:11 -0400] "GET /shuttle/countdown/liftoff.html HTTP/1.0" 304 0
199.120.110.21 - - [01/Jul/1995:00:00:11 -0400] "GET /shuttle/missions/sts-73/sts-73-patch-small.gif HTTP/1.0" 200 4179
burger.letters.com - - [01/Jul/1995:00:00:12 -0400] "GET /images/NASA-logosmall.gif HTTP/1.0" 304 0
burger.letters.com - - [01/Jul/1995:00:00:12 -0400] "GET /shuttle/countdown/video/livevideo.gif HTTP/1.0" 200 0
205.212.115.106 - - [01/Jul/1995:00:00:12 -0400] "GET /shuttle/countdown/countdown.html HTTP/1.0" 200 3985
d104.aa.net - - [01/Jul/1995:00:00:13 -0400] "GET /shuttle/countdown/ HTTP/1.0" 200 3985
129.94.144.152 - - [01/Jul/1995:00:00:13 -0400] "GET / H

In [9]:
# Shell command: list the HDFS root directory.
# Requires the `hadoop` CLI on PATH; otherwise the shell reports "command not found".
!hadoop fs -ls /

/bin/sh: hadoop: command not found


In [18]:
!hadoop fs -put ../data/NASA_access_log_Jul95_100 /tmp

put: `/tmp/NASA_access_log_Jul95_100': File exists


In [19]:
# Shell command: list /tmp on HDFS to confirm the uploaded files are present.
!hadoop fs -ls /tmp

Found 9 items
-rw-r--r--   3 root supergroup      10851 2018-11-05 12:51 /tmp/NASA_access_log_Jul95_100
drwx-wx-wx   - root supergroup          0 2018-10-31 11:04 /tmp/hive
-rw-r--r--   3 root supergroup        188 2018-11-05 12:51 /tmp/json_example.json
-rw-r--r--   3 root supergroup       4550 2018-11-05 12:51 /tmp/kmeans_data.txt
-rw-r--r--   3 root supergroup        561 2018-11-05 12:51 /tmp/ratings.csv
-rw-r--r--   3 root supergroup     104736 2018-11-05 12:51 /tmp/sample_libsvm_data.txt
-rw-r--r--   3 root supergroup       2700 2018-11-05 12:51 /tmp/shakespear.txt
-rw-r--r--   3 root supergroup      28629 2018-11-05 12:51 /tmp/titanic_test.csv
-rw-r--r--   3 root supergroup      61194 2018-11-05 12:51 /tmp/titanic_train.csv


### 從 HDFS 中讀取資料

In [None]:
# Read the log file from HDFS with the text reader: one row per line, held in
# a single string column named `value`.
# NOTE(review): the NameNode address is hard-coded — adjust it for your cluster.
textFromHDFS = spark.read.text("hdfs://10.211.55.100:9000/tmp/NASA_access_log_Jul95_100")

In [22]:
# Confirm that spark.read.text returned a DataFrame.
print(type(textFromHDFS))

<class 'pyspark.sql.dataframe.DataFrame'>


In [24]:
# Return the first 10 rows as a list of Row objects.
textFromHDFS.head(10)

[Row(value=u'199.72.81.55 - - [01/Jul/1995:00:00:01 -0400] "GET /history/apollo/ HTTP/1.0" 200 6245'),
 Row(value=u'unicomp6.unicomp.net - - [01/Jul/1995:00:00:06 -0400] "GET /shuttle/countdown/ HTTP/1.0" 200 3985'),
 Row(value=u'199.120.110.21 - - [01/Jul/1995:00:00:09 -0400] "GET /shuttle/missions/sts-73/mission-sts-73.html HTTP/1.0" 200 4085'),
 Row(value=u'burger.letters.com - - [01/Jul/1995:00:00:11 -0400] "GET /shuttle/countdown/liftoff.html HTTP/1.0" 304 0'),
 Row(value=u'199.120.110.21 - - [01/Jul/1995:00:00:11 -0400] "GET /shuttle/missions/sts-73/sts-73-patch-small.gif HTTP/1.0" 200 4179'),
 Row(value=u'burger.letters.com - - [01/Jul/1995:00:00:12 -0400] "GET /images/NASA-logosmall.gif HTTP/1.0" 304 0'),
 Row(value=u'burger.letters.com - - [01/Jul/1995:00:00:12 -0400] "GET /shuttle/countdown/video/livevideo.gif HTTP/1.0" 200 0'),
 Row(value=u'205.212.115.106 - - [01/Jul/1995:00:00:12 -0400] "GET /shuttle/countdown/countdown.html HTTP/1.0" 200 3985'),
 Row(value=u'd104.aa.net -

In [25]:
# Print the DataFrame as a table; show() prints the top 20 rows and truncates long values.
textFromHDFS.show()

+--------------------+
|               value|
+--------------------+
|199.72.81.55 - - ...|
|unicomp6.unicomp....|
|199.120.110.21 - ...|
|burger.letters.co...|
|199.120.110.21 - ...|
|burger.letters.co...|
|burger.letters.co...|
|205.212.115.106 -...|
|d104.aa.net - - [...|
|129.94.144.152 - ...|
|unicomp6.unicomp....|
|unicomp6.unicomp....|
|unicomp6.unicomp....|
|d104.aa.net - - [...|
|d104.aa.net - - [...|
|d104.aa.net - - [...|
|129.94.144.152 - ...|
|199.120.110.21 - ...|
|ppptky391.asahi-n...|
|net-1-141.eden.co...|
+--------------------+
only showing top 20 rows



In [13]:
# Same text reader, but given a relative path with no hdfs:// scheme.
# NOTE(review): despite the variable name, this presumably reads the local
# ../data copy (the path resolves against Spark's default filesystem) — confirm.
textFromHDFS1 = spark.read.text("../data/NASA_access_log_Jul95_100")

In [14]:
# Print the top 20 rows for comparison with the hdfs:// read.
textFromHDFS1.show()

+--------------------+
|               value|
+--------------------+
|199.72.81.55 - - ...|
|unicomp6.unicomp....|
|199.120.110.21 - ...|
|burger.letters.co...|
|199.120.110.21 - ...|
|burger.letters.co...|
|burger.letters.co...|
|205.212.115.106 -...|
|d104.aa.net - - [...|
|129.94.144.152 - ...|
|unicomp6.unicomp....|
|unicomp6.unicomp....|
|unicomp6.unicomp....|
|d104.aa.net - - [...|
|d104.aa.net - - [...|
|d104.aa.net - - [...|
|129.94.144.152 - ...|
|199.120.110.21 - ...|
|ppptky391.asahi-n...|
|net-1-141.eden.co...|
+--------------------+
only showing top 20 rows



## Part3. Read CSV format

In [26]:
# Shell command: print the tail of ratings.csv on HDFS to inspect its raw
# contents (header line, comma-separated fields, some empty values).
!hadoop fs -tail /tmp/ratings.csv

userid,movieid,rating,ts
3,6539,5,1133571238
3,7153,4,1133571171
3,7155,3.5,1164885564
3,8529,4,1136075616
3,8533,4.5,1136418593
3,8783,5,1136075857
3,27821,4.5,1136418616
3,33750,3.5,1164885688
3,33750,3.5,1164887688
3,344,,844416742
4,21,3,844416980
4,34,5,844416936
4,39,3,844417037
4,110,5,844416866
4,150,5,844416656
4,153,5,844416699
4,161,5,844416835
4,165,5,844416699
4,208,3,844416866
4,231,1,844416742
4,253,3,844416834
4,266,5,844417070
4,292,3,844416796
4,316,5,844416742
4,317,5,844417037
4,329,5,844416796
4,344,2,844416699
4,349,3,844416699
4,,,


In [27]:
# Options handed to the CSV reader for the ratings file on HDFS.
path = "hdfs://10.211.55.100:9000/tmp/ratings.csv"
header = True        # the first line of the file carries the column names
schema = sep = None  # no explicit schema or separator is supplied

In [28]:
# Load the ratings CSV into a DataFrame using the path/schema/sep/header
# options set in the preceding cell.
csvDF = spark.read.csv(
    path=path,
    schema=schema,
    sep=sep,
    header=header,
)

In [29]:
# Confirm that spark.read.csv returned a DataFrame.
print(type(csvDF))

<class 'pyspark.sql.dataframe.DataFrame'>


In [30]:
# Printing a DataFrame shows its column names and types; with no schema
# supplied, every column comes back as string.
print(csvDF)

DataFrame[userid: string, movieid: string, rating: string, ts: string]


In [33]:
# First 5 rows as Row objects; every field is still a string.
csvDF.head(5)

[Row(userid=u'3', movieid=u'6539', rating=u'5', ts=u'1133571238'),
 Row(userid=u'3', movieid=u'7153', rating=u'4', ts=u'1133571171'),
 Row(userid=u'3', movieid=u'7155', rating=u'3.5', ts=u'1164885564'),
 Row(userid=u'3', movieid=u'8529', rating=u'4', ts=u'1136075616'),
 Row(userid=u'3', movieid=u'8533', rating=u'4.5', ts=u'1136418593')]

In [34]:
# Tabular preview of the top 20 rows; empty CSV fields are displayed as null.
csvDF.show()

+------+-------+------+----------+
|userid|movieid|rating|        ts|
+------+-------+------+----------+
|     3|   6539|     5|1133571238|
|     3|   7153|     4|1133571171|
|     3|   7155|   3.5|1164885564|
|     3|   8529|     4|1136075616|
|     3|   8533|   4.5|1136418593|
|     3|   8783|     5|1136075857|
|     3|  27821|   4.5|1136418616|
|     3|  33750|   3.5|1164885688|
|     3|  33750|   3.5|1164887688|
|     3|    344|  null| 844416742|
|     4|     21|     3| 844416980|
|     4|     34|     5| 844416936|
|     4|     39|     3| 844417037|
|     4|    110|     5| 844416866|
|     4|    150|     5| 844416656|
|     4|    153|     5| 844416699|
|     4|    161|     5| 844416835|
|     4|    165|     5| 844416699|
|     4|    208|     3| 844416866|
|     4|    231|     1| 844416742|
+------+-------+------+----------+
only showing top 20 rows



In [18]:
# Read the same ratings file through a relative path, passing the reader
# options inline this time.
csvDF1 = spark.read.csv(
    "../data/ratings.csv",
    schema=None,
    sep=None,
    header=True,
)

In [19]:
# Tabular preview of the CSV read from the relative path.
csvDF1.show()

+------+-------+------+----------+
|userid|movieid|rating|        ts|
+------+-------+------+----------+
|     3|   6539|     5|1133571238|
|     3|   7153|     4|1133571171|
|     3|   7155|   3.5|1164885564|
|     3|   8529|     4|1136075616|
|     3|   8533|   4.5|1136418593|
|     3|   8783|     5|1136075857|
|     3|  27821|   4.5|1136418616|
|     3|  33750|   3.5|1164885688|
|     3|  33750|   3.5|1164887688|
|     3|    344|  null| 844416742|
|     4|     21|     3| 844416980|
|     4|     34|     5| 844416936|
|     4|     39|     3| 844417037|
|     4|    110|     5| 844416866|
|     4|    150|     5| 844416656|
|     4|    153|     5| 844416699|
|     4|    161|     5| 844416835|
|     4|    165|     5| 844416699|
|     4|    208|     3| 844416866|
|     4|    231|     1| 844416742|
+------+-------+------+----------+
only showing top 20 rows



### compare with read.text

In [35]:
# Read the same ratings.csv (`path` was set in the CSV options cell) through
# the plain-text reader, to contrast with spark.read.csv.
textDF = spark.read.text(paths = path)

In [36]:
# Only one string column named `value`: the text reader does not split fields.
textDF

DataFrame[value: string]

In [38]:
# head() with no argument returns a single Row; here it is the CSV header
# line, which the text reader keeps as ordinary data.
textDF.head()

Row(value=u'userid,movieid,rating,ts')

In [39]:
# Each row is one unparsed line of the file, header included.
textDF.show()

+--------------------+
|               value|
+--------------------+
|userid,movieid,ra...|
| 3,6539,5,1133571238|
| 3,7153,4,1133571171|
|3,7155,3.5,116488...|
| 3,8529,4,1136075616|
|3,8533,4.5,113641...|
| 3,8783,5,1136075857|
|3,27821,4.5,11364...|
|3,33750,3.5,11648...|
|3,33750,3.5,11648...|
|    3,344,,844416742|
|    4,21,3,844416980|
|    4,34,5,844416936|
|    4,39,3,844417037|
|   4,110,5,844416866|
|   4,150,5,844416656|
|   4,153,5,844416699|
|   4,161,5,844416835|
|   4,165,5,844416699|
|   4,208,3,844416866|
+--------------------+
only showing top 20 rows



## Part4. Read JSON file

In [46]:
# Shell command: preview the local JSON sample (one JSON object per line).
!head ../data/json_example.json

{"userid": '1', "rating": 4, "movieid": '001'}
{"userid": '1', "rating": 3, "movieid": '002'}
{"userid": '2', "movieid": '001', "rating": 4}
{"userid": '2', "movieid": '003', "rating": 2}


In [40]:
# Read the line-delimited JSON file from HDFS into a DataFrame.
# NOTE(review): the NameNode address is hard-coded — adjust it for your cluster.
jsonDF = spark.read.json('hdfs://10.211.55.100:9000/tmp/json_example.json')

In [41]:
# Show the schema: columns appear in alphabetical order and the numeric
# `rating` field is typed bigint while the quoted ids are strings.
jsonDF

DataFrame[movieid: string, rating: bigint, userid: string]

In [44]:
# Ask for up to 5 rows; only 4 come back because the file holds 4 records.
jsonDF.head(5)

[Row(movieid=u'001', rating=4, userid=u'1'),
 Row(movieid=u'002', rating=3, userid=u'1'),
 Row(movieid=u'001', rating=4, userid=u'2'),
 Row(movieid=u'003', rating=2, userid=u'2')]

In [45]:
# Tabular view of the parsed JSON records.
jsonDF.show()

+-------+------+------+
|movieid|rating|userid|
+-------+------+------+
|    001|     4|     1|
|    002|     3|     1|
|    001|     4|     2|
|    003|     2|     2|
+-------+------+------+



## Part5. RDD 與 DataFrame 的轉換

In [47]:
# DataFrame -> RDD: the .rdd attribute exposes the data as an RDD of Row objects.
jsonRDD = jsonDF.rdd

In [48]:
# Print the class of the DataFrame and of the RDD derived from it, one per line.
print(type(jsonDF), type(jsonRDD), sep="\n")

<class 'pyspark.sql.dataframe.DataFrame'>
<class 'pyspark.rdd.RDD'>


In [49]:
# RDDs do not provide head() (it belongs to the DataFrame API). The failure is
# the point of this cell, so catch the expected AttributeError and print it
# rather than letting it abort "Restart & Run All".
try:
    jsonRDD.head()
except AttributeError as err:
    print("AttributeError:", err)

AttributeError: 'RDD' object has no attribute 'head'

In [51]:
# take(n) is how an RDD returns its first elements; the result is a list of Row objects.
jsonRDD.take(5)

[Row(movieid=u'001', rating=4, userid=u'1'),
 Row(movieid=u'002', rating=3, userid=u'1'),
 Row(movieid=u'001', rating=4, userid=u'2'),
 Row(movieid=u'003', rating=2, userid=u'2')]

In [52]:
# RDD -> DataFrame: rebuild a DataFrame from the RDD of Row objects.
jsonDF2 = spark.createDataFrame(jsonRDD)

In [53]:
# Verify that the result is a DataFrame again.
type(jsonDF2)

pyspark.sql.dataframe.DataFrame

In [54]:
# First row of the rebuilt DataFrame.
jsonDF2.head()

Row(movieid=u'001', rating=4, userid=u'1')

In [55]:
# The rebuilt DataFrame displays the same columns and rows as jsonDF.
jsonDF2.show()

+-------+------+------+
|movieid|rating|userid|
+-------+------+------+
|    001|     4|     1|
|    002|     3|     1|
|    001|     4|     2|
|    003|     2|     2|
+-------+------+------+



In [56]:
# Reminder of what wordsRDD holds: a flat sequence of plain strings.
wordsRDD.collect()

['cat', 'elephant', 'rat', 'rat', 'cat']

In [57]:
# This conversion cannot work: a DataFrame needs row/column-structured
# records, and an RDD of bare strings gives createDataFrame no fields to infer
# a schema from. Catch and print the expected TypeError so the demonstration
# does not abort "Restart & Run All"; wordsDF stays undefined afterwards.
try:
    wordsDF = spark.createDataFrame(wordsRDD)
except TypeError as err:
    print("TypeError:", err)

TypeError: Can not infer schema for type: <type 'str'>

In [58]:
# Wrap the whole word list in an outer list so the RDD holds a single element:
# one record made of five values.
wordsRDD2 = sc.parallelize([wordsList])

In [59]:
# One element: the entire word list nested inside the outer list.
wordsRDD2.collect()

[['cat', 'elephant', 'rat', 'rat', 'cat']]

In [60]:
# The single inner list is treated as one row, so each word lands in its own column.
wordsDF2 = spark.createDataFrame(wordsRDD2)

In [62]:
# Inspect the first row; the columns are auto-named _1 .. _5.
wordsDF2.head()

[Row(_1=u'cat', _2=u'elephant', _3=u'rat', _4=u'rat', _5=u'cat')]

In [63]:
# One row with five columns.
wordsDF2.show()

+---+--------+---+---+---+
| _1|      _2| _3| _4| _5|
+---+--------+---+---+---+
|cat|elephant|rat|rat|cat|
+---+--------+---+---+---+



In [64]:
# Put each word in its own one-item list so every word becomes a separate record.
wordsRDD3 = sc.parallelize([[word] for word in wordsList])

In [68]:
# Five elements, each a one-item list.
wordsRDD3.collect()

[['cat'], ['elephant'], ['rat'], ['rat'], ['cat']]

In [66]:
# Each one-item list becomes a row with a single column.
wordsDF3 = spark.createDataFrame(wordsRDD3)

In [67]:
# Five rows in a single auto-named column `_1`.
wordsDF3.show()

+--------+
|      _1|
+--------+
|     cat|
|elephant|
|     rat|
|     rat|
|     cat|
+--------+

