In [1]:
import os
import subprocess
def module(*args):        
    if isinstance(args[0], list):        
        args = args[0]        
    else:        
        args = list(args)        
    (output, error) = subprocess.Popen(['/usr/bin/modulecmd', 'python'] + args, stdout=subprocess.PIPE).communicate()
    exec(output)    
module('load', 'apps/java/jdk1.8.0_102/binary')    
os.environ['PYSPARK_PYTHON'] = os.environ['HOME'] + '/.conda/envs/jupyter-spark/bin/python'

In [2]:
import pyspark

from pyspark.sql import SparkSession
# run Spark locally with K worker threads (ideally set to number of cores)
spark = SparkSession.builder.master("local[2]").appName("COM6012 Spark Intro").getOrCreate()

sc = spark.sparkContext

In [3]:
sc

## Spark:

## 1. 结构光谱：

<img src="image_github/data_spectrum.png" width="400" height="400">

* 结构性数据：数据库

* 半结构数据：XML，JSON

* 无结构数据：语言等

## 2. Spark：

### 2.1 Spark 与 Hadoop 区别和联系：

首先，两者均为大数据框架，但是解决问题的层面不同。Hadoop更类似于分布式基础设施，而spark更像是一种处理大数据的软件（类似于计算机与操作系统的关系）。因此，可以说spark基于Hadoop。当然，spark也可以运行在除此之外的数据系统平台。其次，spark处理数据速度更快。Spark多个任务之间数据通信是基于内存，而Hadoop是基于磁盘。

### 2.2 RDD 与 Dataframe：

如图所示，左侧为RDD结构右侧为Dataframe结构。相比于RDD，Dataframe为高层次的抽象，带有schema，并且dataframe带有更丰富的sql函数可以简化操作。但是，有些时候也需要将Dataframe转换为RDD。

<img src="image_github/RDD_DF.png" width="400" height="400">

### 2.3 Spark Components：

spark项目首先创建SparkContext，通过其与Spark集群相连，可以理解为一个SparkContext就是一个spark application的生命周期，SparkContext连接Cluster manager，其分配资源给Spark executor，从而进行计算，获取数据等操作。
<img src="image_github/manager.png" width="500" height="500">

### 2.4 Spark 集群角色介绍：

**Master和Worker是Spark的守护进程（系统进程），即Spark在特定模式下正常运行所必须的进程。Driver和Executor是临时进程（用户进程），当有具体任务提交到Spark集群才会开启的进程。**

* Master：Spark特有资源调度系统的Leader。掌管着整个集群的资源信息。Master用来监听并管理所有的Worker，看Worker是否正常工作。


* Worker：Spark特有资源调度系统的Slave，有多个。每个Slave掌管着所在节点的资源信息。


* Driver：Spark的驱动器是执行开发程序中的main方法的进程。它负责开发人员编写的用来创建SparkContext、创建RDD，以及进行RDD的转化操作和行动操作代码的执行。（用户进程）


* Executor：Spark Executor是一个工作进程，负责在 Spark 作业中运行任务，任务间相互独立。Spark 应用启动时，Executor节点被同时启动，并且始终伴随着整个 Spark 应用的生命周期而存在。如果有Executor节点发生了故障或崩溃，Spark 应用也可以继续执行，会将出错节点上的任务调度到其他Executor节点上继续运行。

<img src="image_github/cluster_example.png" width="400" height="400">

### 2.5 Spark三种分布式部署模式：

* Standalone：独立模式，Spark 原生的简单集群管理器， 自带完整的服务， 可单独部署到一个集群中，无需依赖任何其他资源管理系统， 使用 Standalone 可以很方便地搭建一个集群，一般在公司内部没有搭建其他资源管理框架的时候才会使用。
<img src="image_github/standalone.png" width="600" height="600">

* YARN：统一的资源管理机制， 在上面可以运行多套计算框架， 如map reduce、storm 等， 根据 driver 在集群中的位置不同，分为 yarn client 和 yarn cluster。**yarn client：Driver程序运行在客户端，适用于交互、调试，希望立即看到app的输出。yarn cluster：Driver 程序运行在由ResourceManager 启动的 APPMaster 适用于生产环境。分担压力不会拖垮某个节点。**

<img src="image_github/yarn.png" width="600" height="600">

提交任务，App Submit； RM选择一个NM启动AM，AM来启动Driver（即初始化sc），yarn的cluster模式SparkAppMaster（用来申请资源，启动driver）和SparkContext在一个进程里边。AM（SparkAppMaster）向RM申请启动Executor；（默认情况下一个节点启一个executor这样子负载比较均衡，也可以启两个）。

* Mesos：

## 3. RDD：

### 3.1 Resilient Distributed Datasets 弹性分布式数据集：

RDD 是 Spark 提供的最重要的抽象概念，它是一种有容错机制的特殊数据集合，可以分布在集群的结点上，以函数式操作集合的方式进行各种并行操作。
<img src="image_github/RDD.png" width="400" height="400">

RDD 特性：

1. 基于内存 In-Memory：可以全部或部分缓存在内存中，在多次计算间重用。

2. 只读 Immutable：不能修改，只能通过转换操作生成新的 RDD。

3. 分区 Partition：一个RDD分为多个分区，每个分区可存储在相同或不同的节点中，如上图所示。**（分布式）**

4. 容错 Fault-tolerant：RDD的每次转换都会生成一个新的RDD，所以RDD之间就会形成类似于流水线一样的前后依赖关系。在部分分区数据丢失时，Spark可以通过这个依赖关系重新计算丢失的分区数据，而不是对RDD的所有分区进行重新计算。**（弹性）**

### 3.2 RDD 操作：

**transformation（lazy evaluation）and action：创建新的数据集但是不会进行计算，记住中间转换的步骤，当遇到action时，才会真正计算。**

<img src="image_github/transformation.png" width="500" height="500">
<img src="image_github/action.png" width="500" height="500">
<img src="image_github/RDD_step.png" width="500" height="500">

**persistence/cache：将数据存储到内存中。**

**key-values RDD：**
<img src="image_github/RDD_key_value.png" width="500" height="500">

### 3.3 共享变量：

* 广播变量 Broadcast variables
* 累加器 Accumulator

## 4. Dataframe：

<img src="image_github/speed.png" width="500" height="500">

## 5. Python：

### 5.1 创建 RDD 与 Dataframe：

#### 创建 RDD：

1. 并行集合(Parallelized Collections): 从内存里直接读取数据。
2. 文件系统数据集: Hadoop Datasets 或文本文件,比如通过SparkContext.textFile()读取的数据。

#### 创建 Dataframe：spark.read.text()  与 spark.read.load()

NASA Aug95 数据格式：

<img src="image_github/NASA-format.png" width="500" height="400">

Advertising 数据格式：

<img src="image_github/ad_format.png" width="300" height="200">

In [4]:
# create RDD.
nums = sc.parallelize([(1,2,3),(4,5,6),(7,8,9)])
print(type(nums))
# create RDD.
logFile_rdd=sc.textFile("../Data/NASA_Aug95_100.txt")
print(type(logFile_rdd))

<class 'pyspark.rdd.RDD'>
<class 'pyspark.rdd.RDD'>


In [5]:
# create DataFrame.
logFile_df=spark.read.text("../Data/NASA_Aug95_100.txt") # from .txt
advertise_df = spark.read.load("../Data/Advertising.csv", format="csv", inferSchema="true", header="true") # from .csv
print(type(logFile_df))
print(type(advertise_df))

<class 'pyspark.sql.dataframe.DataFrame'>
<class 'pyspark.sql.dataframe.DataFrame'>


### 5.2 展示 RDD 与 Dataframe：

#### RDD中collect函数：

**此方法可以将RDD类型的数据转化为数组**，同时会从远程集群是拉取数据到driver端。若放在分布式环境下运行，一次collect操作会将分布式各个节点上的数据汇聚到一个driver节点上，而这么一来，后续所执行的运算和操作就会脱离这个分布式环境而相当于单机环境下运行。并且，**将大量数据汇集到一个driver节点上，将数据用数组存放，占用了jvm堆内存，会导致内存溢出。可以使用take(n)。**

#### Dataframe中show()与describe().show()

In [6]:
logFile_rdd.take(3)

['in24.inetnebr.com - - [01/Aug/1995:00:00:01 -0400] "GET /shuttle/missions/sts-68/news/sts-68-mcc-05.txt HTTP/1.0" 200 1839',
 'uplherc.upl.com - - [01/Aug/1995:00:00:07 -0400] "GET / HTTP/1.0" 304 0',
 'uplherc.upl.com - - [01/Aug/1995:00:00:08 -0400] "GET /images/ksclogo-medium.gif HTTP/1.0" 304 0']

In [7]:
logFile_df.show(10)

+--------------------+
|               value|
+--------------------+
|in24.inetnebr.com...|
|uplherc.upl.com -...|
|uplherc.upl.com -...|
|uplherc.upl.com -...|
|uplherc.upl.com -...|
|ix-esc-ca2-07.ix....|
|uplherc.upl.com -...|
|slppp6.intermind....|
|piweba4y.prodigy....|
|slppp6.intermind....|
+--------------------+
only showing top 10 rows



In [8]:
advertise_df.describe().show()

+-------+------------------+-----------------+------------------+------------------+------------------+
|summary|               _c0|               TV|             radio|         newspaper|             sales|
+-------+------------------+-----------------+------------------+------------------+------------------+
|  count|               200|              200|               200|               200|               200|
|   mean|             100.5|         147.0425|23.264000000000024|30.553999999999995|14.022500000000003|
| stddev|57.879184513951124|85.85423631490805|14.846809176168728| 21.77862083852283| 5.217456565710477|
|    min|                 1|              0.7|               0.0|               0.3|               1.6|
|    max|               200|            296.4|              49.6|             114.0|              27.0|
+-------+------------------+-----------------+------------------+------------------+------------------+



### 5.3 RDD 与 Dataframe 相互转换：.toDF() 与 .rdd

In [9]:
# RDD to DF.
df1 = nums.toDF(["a", "b", "c"])
df1.show()

+---+---+---+
|  a|  b|  c|
+---+---+---+
|  1|  2|  3|
|  4|  5|  6|
|  7|  8|  9|
+---+---+---+



In [10]:
df1.printSchema()

root
 |-- a: long (nullable = true)
 |-- b: long (nullable = true)
 |-- c: long (nullable = true)



In [11]:
# DF to RDD.
rdd1 = logFile_df.rdd
type(rdd1)

pyspark.rdd.RDD

## 6. 练习：

**Note**: Jupyther notebook is primarily for interactive learning that is more suitable on small scale data. For large-scale (big) data, stand-alone programs or HPC batch jobs are encouraged. 
* Load the NASA access log Aug95 data in Session 1 and create a DataFrame with FIVE columns by **specifying** the schema 
* Perform some more challenging mining tasks in Session 1 using *as many DataFrame functions as possible*
   * How many **unique** hosts on a particular day (e.g., 15th August)?
   * How many **unique** hosts in total (i.e., in August 1995)?
   * Which host is the most frequent visitor?
   * How many different types of return codes?
   * How many requests per day on average?
   * How many requests per post on average?
   * Any other question that you are interested in.
* Explore more CSV data of your interest via Google or at [Sample CSV data](https://support.spatialkey.com/spatialkey-sample-csv-data/), including insurance, real estate, and sales transactions.

In [12]:
logFile=spark.read.text("../Data/NASA_access_log_Aug95.gz")

In [13]:
logFile.show(5, False)

+--------------------------------------------------------------------------------------------------------------------------+
|value                                                                                                                     |
+--------------------------------------------------------------------------------------------------------------------------+
|in24.inetnebr.com - - [01/Aug/1995:00:00:01 -0400] "GET /shuttle/missions/sts-68/news/sts-68-mcc-05.txt HTTP/1.0" 200 1839|
|uplherc.upl.com - - [01/Aug/1995:00:00:07 -0400] "GET / HTTP/1.0" 304 0                                                   |
|uplherc.upl.com - - [01/Aug/1995:00:00:08 -0400] "GET /images/ksclogo-medium.gif HTTP/1.0" 304 0                          |
|uplherc.upl.com - - [01/Aug/1995:00:00:08 -0400] "GET /images/MOSAIC-logosmall.gif HTTP/1.0" 304 0                        |
|uplherc.upl.com - - [01/Aug/1995:00:00:08 -0400] "GET /images/USA-logosmall.gif HTTP/1.0" 304 0                           |


**正则表达式-分组或特征标群：**
<img src="image_github/regex.png" width="600" height="500">

In [14]:
sample_logs = [item['value'] for item in logFile.take(15)]

In [16]:
import re
method_uri_protocol_pattern = r'\"(\S+)\s(\S+)\s*(\S*)\"'
method_uri_protocol = [re.search(method_uri_protocol_pattern, item).groups() for item in sample_logs]
method_uri_protocol

[('GET', '/shuttle/missions/sts-68/news/sts-68-mcc-05.txt', 'HTTP/1.0'),
 ('GET', '/', 'HTTP/1.0'),
 ('GET', '/images/ksclogo-medium.gif', 'HTTP/1.0'),
 ('GET', '/images/MOSAIC-logosmall.gif', 'HTTP/1.0'),
 ('GET', '/images/USA-logosmall.gif', 'HTTP/1.0'),
 ('GET', '/images/launch-logo.gif', 'HTTP/1.0'),
 ('GET', '/images/WORLD-logosmall.gif', 'HTTP/1.0'),
 ('GET', '/history/skylab/skylab.html', 'HTTP/1.0'),
 ('GET', '/images/launchmedium.gif', 'HTTP/1.0'),
 ('GET', '/history/skylab/skylab-small.gif', 'HTTP/1.0'),
 ('GET', '/images/ksclogosmall.gif', 'HTTP/1.0'),
 ('GET', '/history/apollo/images/apollo-logo1.gif', 'HTTP/1.0'),
 ('GET', '/history/apollo/images/apollo-logo.gif', 'HTTP/1.0'),
 ('GET', '/images/NASA-logosmall.gif', 'HTTP/1.0'),
 ('GET', '/shuttle/missions/sts-69/mission-sts-69.html', 'HTTP/1.0')]

In [17]:
from pyspark.sql.functions import regexp_extract

host_pattern = r'(^\S+\.[\S+\.]+\S+)\s' # "\" escape character. "S" match everyting rather than space.
ts_date_pattern = r'(\d{2}/\w{3}/\d{4})' # \d number.\w word. {} times.
ts_time_pattern = r'(:\d{2}:\d{2}:\d{2})'
method_url_protocol_pattern = r'\"(\S+)\s(\S+)\s*(\S*)\"'
status_pattern = r'\s(\d{3})\s'
content_size_pattern = r'\s(\d+)$'

df = logFile.select(regexp_extract('value', host_pattern, 1).alias('host'),
               regexp_extract('value', ts_date_pattern, 1).alias('date'),  
               regexp_extract('value', ts_time_pattern, 1).alias('time_str'),  
               regexp_extract('value', method_url_protocol_pattern, 1).alias('method'),
               regexp_extract('value', method_url_protocol_pattern, 2).alias('address'),
               regexp_extract('value', method_url_protocol_pattern, 3).alias('protocol'),
               regexp_extract('value', status_pattern, 1).cast('integer').alias('status'),
               regexp_extract('value', content_size_pattern, 1).cast('integer').alias('size')
              )

In [18]:
df.show()

+--------------------+-----------+---------+------+--------------------+--------+------+-----+
|                host|       date| time_str|method|             address|protocol|status| size|
+--------------------+-----------+---------+------+--------------------+--------+------+-----+
|   in24.inetnebr.com|01/Aug/1995|:00:00:01|   GET|/shuttle/missions...|HTTP/1.0|   200| 1839|
|     uplherc.upl.com|01/Aug/1995|:00:00:07|   GET|                   /|HTTP/1.0|   304|    0|
|     uplherc.upl.com|01/Aug/1995|:00:00:08|   GET|/images/ksclogo-m...|HTTP/1.0|   304|    0|
|     uplherc.upl.com|01/Aug/1995|:00:00:08|   GET|/images/MOSAIC-lo...|HTTP/1.0|   304|    0|
|     uplherc.upl.com|01/Aug/1995|:00:00:08|   GET|/images/USA-logos...|HTTP/1.0|   304|    0|
|ix-esc-ca2-07.ix....|01/Aug/1995|:00:00:09|   GET|/images/launch-lo...|HTTP/1.0|   200| 1713|
|     uplherc.upl.com|01/Aug/1995|:00:00:10|   GET|/images/WORLD-log...|HTTP/1.0|   304|    0|
|slppp6.intermind.net|01/Aug/1995|:00:00:10|   GET

#### 去重：df.select(df.index).distinct().count() 或 df.groupBy('index').count().count()

In [19]:
df1 = df.filter(df['date'] == '15/Aug/1995')

df2 = df1.groupBy('host').count() 

df2.count() 

4211

In [20]:
df3 = df.groupBy('host').count()

df3.count()

75029

In [21]:
df3.sort('count',ascending=False).show()

+--------------------+-----+
|                host|count|
+--------------------+-----+
|  edams.ksc.nasa.gov| 6530|
|piweba4y.prodigy.com| 4846|
|        163.206.89.4| 4791|
|piweba5y.prodigy.com| 4607|
|piweba3y.prodigy.com| 4416|
|www-d1.proxy.aol.com| 3889|
|www-b2.proxy.aol.com| 3534|
|www-b3.proxy.aol.com| 3463|
|www-c5.proxy.aol.com| 3423|
|www-b5.proxy.aol.com| 3411|
|www-c2.proxy.aol.com| 3407|
|www-d2.proxy.aol.com| 3404|
|www-a2.proxy.aol.com| 3337|
|         news.ti.com| 3298|
|www-d3.proxy.aol.com| 3296|
|www-b4.proxy.aol.com| 3293|
|www-c3.proxy.aol.com| 3272|
|www-d4.proxy.aol.com| 3234|
|www-c1.proxy.aol.com| 3177|
|www-c4.proxy.aol.com| 3134|
+--------------------+-----+
only showing top 20 rows



In [22]:
code = df.groupBy('status').count()
code.show()

print(code.count())

+------+-------+
|status|  count|
+------+-------+
|   501|     27|
|   500|      3|
|   400|     10|
|   403|    171|
|   404|  10056|
|   200|1398988|
|   304| 134146|
|   302|  26497|
+------+-------+

8


In [23]:
df4 = df.select('host', 'date')

In [24]:
df4.show()

+--------------------+-----------+
|                host|       date|
+--------------------+-----------+
|   in24.inetnebr.com|01/Aug/1995|
|     uplherc.upl.com|01/Aug/1995|
|     uplherc.upl.com|01/Aug/1995|
|     uplherc.upl.com|01/Aug/1995|
|     uplherc.upl.com|01/Aug/1995|
|ix-esc-ca2-07.ix....|01/Aug/1995|
|     uplherc.upl.com|01/Aug/1995|
|slppp6.intermind.net|01/Aug/1995|
|piweba4y.prodigy.com|01/Aug/1995|
|slppp6.intermind.net|01/Aug/1995|
|slppp6.intermind.net|01/Aug/1995|
|ix-esc-ca2-07.ix....|01/Aug/1995|
|slppp6.intermind.net|01/Aug/1995|
|     uplherc.upl.com|01/Aug/1995|
|        133.43.96.45|01/Aug/1995|
|kgtyk4.kj.yamagat...|01/Aug/1995|
|kgtyk4.kj.yamagat...|01/Aug/1995|
|     d0ucr6.fnal.gov|01/Aug/1995|
|ix-esc-ca2-07.ix....|01/Aug/1995|
|     d0ucr6.fnal.gov|01/Aug/1995|
+--------------------+-----------+
only showing top 20 rows



In [42]:
from pyspark.sql.functions import mean

day = r'^(\d{2})'
df5 = df4.withColumn('day', regexp_extract(df4['date'], day, 1))

df6 = df5.groupBy('day').count()

df6.show()

+---+-----+
|day|count|
+---+-----+
| 07|57362|
| 15|58847|
| 11|61246|
| 29|67988|
| 30|80641|
| 01|33996|
| 22|57762|
| 28|55496|
| 16|56653|
| 31|90125|
| 18|56246|
| 27|32823|
| 17|58988|
| 26|31608|
| 09|60458|
| 05|31893|
| 19|32094|
| 23|58097|
| 08|60157|
| 03|41388|
+---+-----+
only showing top 20 rows



In [45]:
df6.select(mean('count')).show()

+------------------+
|        avg(count)|
+------------------+
|52329.933333333334|
+------------------+

