In [27]:
"""A simple Spark app in Python"""
import os
import sys
from pyspark import SparkContext
from pyspark import SparkConf

from pyspark.sql import SparkSession

# A new SparkContext cannot be created while another one is still active, so
# stop any context left over from a previous run of this notebook first.
try:
    sc.stop()
except NameError:
    # Fresh kernel: `sc` has not been defined yet, so there is nothing to stop.
    pass
except Exception as exc:
    # Best-effort cleanup: report the failure instead of hiding it, then carry on.
    # (Unlike a bare `except:`, this does not swallow KeyboardInterrupt/SystemExit.)
    print("Could not stop the previous SparkContext: {!r}".format(exc))

In [28]:
# SparkContext is the main entry point to Spark functionality.
# It is initialised from a SparkConf instance, which holds the cluster
# configuration parameters, such as the master URL.
conf = SparkConf().setAppName("Spark App For Loading Json").setMaster("local")
# getOrCreate() reuses an already-active context instead of raising
# "Cannot run multiple SparkContexts at once" when this cell is run again.
# NOTE: when an existing context is reused, `conf` is not applied to it.
sc = SparkContext.getOrCreate(conf=conf)
# Wrap the context in a SparkSession to get access to the DataFrame API.
spark = SparkSession(sc)

In [29]:
# The resilient distributed dataset (RDD) is Spark's core abstraction.
# Way 1: build an RDD from an in-memory Python collection.
collection = list("abcd")  # ["a", "b", "c", "d"]
rdd_from_collection = sc.parallelize(collection)
type(rdd_from_collection)

pyspark.rdd.RDD

In [30]:
# Way 2: build an RDD from an external input source (one element per text line).
# The datasets directory can be overridden with the SPARK_ML_DATASETS_DIR
# environment variable (must be an absolute path); the default is the original
# author's local checkout, so behaviour is unchanged when the variable is unset.
DATASETS_DIR = os.environ.get(
    "SPARK_ML_DATASETS_DIR",
    "/Users/a2017148/Documents/programming/pylab/pylab/books/Spark机器学习-第二版/datasets",
)
# The explicit "file:" scheme makes Spark read from the local filesystem.
rdd_from_file = sc.textFile("file:" + os.path.join(DATASETS_DIR, "UserPurchaseHistory.csv"))
type(rdd_from_file)

pyspark.rdd.RDD

In [31]:
# Split every CSV line on commas and keep its first three fields as a tuple.
data = (
    rdd_from_file
    .map(lambda row: row.split(","))
    .map(lambda fields: (fields[0], fields[1], fields[2]))
)
type(data)

pyspark.rdd.PipelinedRDD

In [32]:
# Count the parsed records; each record is one line of the input file.
numPurchases = data.count()
# Display the count as the cell output.
numPurchases

5

In [33]:
# Pull every record back to the driver as a Python list for display.
data.collect()

[('John', 'iPhone Cover', '9.99'),
 ('John', 'Headphones', '5.49'),
 ('Jack', 'iPhone Cover', '9.99'),
 ('Jill', 'Samsung Galaxy Cover', '8.95'),
 ('Bob', 'iPad Cover', '5.49')]

In [34]:
# De-duplicate: take the first field of every record and keep distinct values.
unique_users = (
    data
    .map(lambda purchase: purchase[0])
    .distinct()
)
type(unique_users)

pyspark.rdd.PipelinedRDD

In [35]:
# Show the number of distinct users together with the distinct values themselves.
unique_users.count(), unique_users.collect()

(4, ['John', 'Jack', 'Jill', 'Bob'])

In [36]:
# Count occurrences of each value in the second field: emit (field, 1.0) pairs,
# sum the values per key, then collect the result to the driver as a list.
products = (
    data
    .map(lambda purchase: (purchase[1], 1.0))
    .reduceByKey(lambda left, right: left + right)
    .collect()
)
products

[('iPhone Cover', 2.0),
 ('Headphones', 1.0),
 ('Samsung Galaxy Cover', 1.0),
 ('iPad Cover', 1.0)]

In [37]:
# Order the (product, count) pairs by count, highest first; `products` itself
# is left unmodified because sorted() returns a new list.
sorted(
    products,
    key=lambda product_count: product_count[1],
    reverse=True,
)

[('iPhone Cover', 2.0),
 ('Headphones', 1.0),
 ('Samsung Galaxy Cover', 1.0),
 ('iPad Cover', 1.0)]

In [42]:
# Read the same file through the DataFrame API: every line becomes a row with a
# single string column, and the DataFrame is marked for caching.
log_data = (
    spark.read
    .text('/Users/a2017148/Documents/programming/pylab/pylab/books/Spark机器学习-第二版/datasets/UserPurchaseHistory.csv')
    .cache()
)
log_data, type(log_data)

(DataFrame[value: string], pyspark.sql.dataframe.DataFrame)

In [None]:
# Stop the SparkSession. This also stops its underlying SparkContext (`sc`),
# and, unlike calling sc.stop() alone, does not leave the session singleton
# pointing at a stopped context for later code in the same kernel.
spark.stop()