## pyspark demo

* 实例化spark并输出其版本号

In [4]:
from pyspark import SparkConf, SparkContext

# Stop a SparkContext left over from an earlier run of this cell (only one
# SparkContext may be active per JVM). On a fresh kernel `sc` does not exist
# yet, so an unconditional `sc.stop()` would raise NameError.
try:
    sc.stop()
except NameError:
    pass

# Instantiate Spark: run locally with 2 worker threads ('local[2]').
appName = 'pysparkdemo'
conf = SparkConf().setAppName(appName).setMaster('local[2]')
sc = SparkContext(conf=conf)
print(sc.version)

2.2.0


### map 
 * Return a new RDD by applying a function to each element of this RDD.

In [3]:
# map: turn every element n into the pair (n, n squared).
x = sc.parallelize([1, 2, 3])
y = x.map(lambda n: (n, n ** 2))

print(x.collect())
print(y.collect())

[1, 2, 3]
[(1, 1), (2, 4), (3, 9)]


### flatMap

In [3]:
# flatMap: each element n expands to the three values (n, 100*n, n squared),
# and all of those values end up in one flat RDD.
x = sc.parallelize([1, 2, 3])
y = x.flatMap(lambda n: (n, 100 * n, n ** 2))
print('x: {}'.format(x.collect()))
print('y: {}'.format(y.collect()))

x: [1, 2, 3]
y: [1, 100, 1, 2, 200, 4, 3, 300, 9]


### mapPartitions(f, preservesPartitioning=False)
> Return a new RDD by applying a function to each partition of this RDD.

In [4]:
# mapPartitions: the function receives an iterator over one whole partition
# and may yield any number of output elements for it.
x = sc.parallelize([1, 2, 3], 2)


def f(iterator):
    """Yield a single value per partition: the sum of its elements."""
    yield sum(iterator)


y = x.mapPartitions(f)
# glom() does NOT flatten: it gathers the elements of each partition into a
# list, so the printed result shows how the data is split across partitions.
print('x: {}'.format(x.glom().collect()))
print('y: {}'.format(y.glom().collect()))

x: [[1], [2, 3]]
y: [[1], [5]]


### mapPartitionsWithIndex
 * Return a new RDD by applying a function to each partition of this RDD, while tracking the index of the original partition.

In [6]:
# mapPartitionsWithIndex: like mapPartitions, but the function also receives
# the index of the partition it is processing.
x = sc.parallelize([1, 2, 3], 3)


def f(partitionIndex, iterator):
    """Yield one (partition index, sum of the partition's elements) pair."""
    yield (partitionIndex, sum(iterator))


y = x.mapPartitionsWithIndex(f)

# glom() does NOT flatten: it gathers the elements of each partition into a
# list, so the printed result shows how the data is split across partitions.
print(x.glom().collect())
print(y.glom().collect())

[[1], [2], [3]]
[[(0, 1)], [(1, 2)], [(2, 3)]]


### getNumPartitions 
 * Return the number of partitions in this RDD.

In [7]:
# getNumPartitions: five elements are explicitly split into 2 partitions.
x = sc.parallelize([1, 2, 3, 4, 5], 2)
y = x.getNumPartitions()
print(x.glom().collect())  # one inner list per partition
print(y)

[[1, 2], [3, 4, 5]]
2


### filter(f)
* Return a new RDD containing only the elements that satisfy a predicate.

In [8]:
# filter: keep only the elements for which the predicate is True, i.e. the
# odd numbers (even elements are dropped).
x = sc.parallelize([1, 2, 3])
y = x.filter(lambda n: n % 2 == 1)
print(x.collect())
print(y.collect())

[1, 2, 3]
[1, 3]


### distinct(numPartitions=None)
* Return a new RDD containing the distinct elements in this RDD.

In [9]:
# distinct: remove duplicate elements. The result is not guaranteed to keep
# the input order.
x = sc.parallelize(['A', 'A', 'B'])
y = x.distinct()
print(x.collect())
print(y.collect())

['A', 'A', 'B']
['B', 'A']


### sample(withReplacement, fraction, seed=None)
* Return a sampled subset of this RDD.

> Parameters:	
* withReplacement – can elements be sampled multiple times (replaced when sampled out)
* fraction – expected size of the sample as a fraction of this RDD’s size:
  * without replacement: the probability that each element is chosen; fraction must be in [0, 1]
  * with replacement: the expected number of times each element is chosen; fraction must be >= 0
* seed – seed for the random number generator

In [18]:
# sample: call sample() 5 times without replacement. Each element is kept
# with probability 0.5, so the sample size varies. No seed is passed, so the
# results differ on every run.
x = sc.parallelize(range(7))
ylist = [x.sample(withReplacement=False, fraction=0.5) for _ in range(5)]

print('x = {}'.format(x.collect()))

# Each sample is itself a lazy RDD, so it must be collected to be printed.
for cnt, y in enumerate(ylist):
    print('sample:{} y = {}'.format(cnt, y.collect()))

x = [0, 1, 2, 3, 4, 5, 6]
sample:0 y = [2, 3, 6]
sample:1 y = [0, 2, 3, 4, 5, 6]
sample:2 y = [0, 3, 5]
sample:3 y = [0, 3, 5, 6]
sample:4 y = [0, 1, 2, 4]


### takeSample(withReplacement, num, seed=None)
* Return a fixed-size sampled subset of this RDD.

In [19]:
# takeSample: call takeSample() 5 times. Unlike sample(), it returns exactly
# `num` elements as a plain Python list collected to the driver, so it should
# only be used when the result is small. No seed is passed, so the draws
# differ between calls and between runs.
x = sc.parallelize(range(7))
ylist = [x.takeSample(withReplacement=False, num=3) for _ in range(5)]
print('x = ' + str(x.collect()))
for cnt, y in enumerate(ylist):
    print('sample:{} y = {}'.format(cnt, y))  # y is already a list: no collect()

x = [0, 1, 2, 3, 4, 5, 6]
sample:0 y = [1, 4, 5]
sample:1 y = [3, 5, 1]
sample:2 y = [2, 5, 1]
sample:3 y = [3, 0, 5]
sample:4 y = [2, 0, 3]


### histogram(buckets)
* Compute a histogram using the provided buckets. The buckets are all open to the right except for the last which is closed. e.g. [1,10,20,50] means the buckets are [1,10) [10,20) [20,50], which means 1<=x<10, 10<=x<20, 20<=x<=50. And on the input of 1 and 50 we would have a histogram of 1,0,1.

In [21]:
x = sc.parallelize(range(51))  # the integers 0..50

# When `buckets` is a number, Spark generates that many evenly spaced
# buckets between the RDD's minimum and maximum.
y = x.histogram(2)
print(y)
# -> ([0, 25, 50], [25, 26])
# With buckets=2 the result has two parts: [0, 25, 50] are the bucket
# boundaries ([0, 25) and [25, 50]), and [25, 26] is the count of elements
# falling into each bucket.

# When `buckets` is a list, it is used directly as the bucket boundaries.
yl = x.histogram([0, 5, 25, 50])
print(yl)
# -> ([0, 5, 25, 50], [5, 20, 26])
# Buckets are [0, 5), [5, 25) and [25, 50] (the last one is closed), holding
# 5, 20 and 26 elements respectively.

([0, 25, 50], [25, 26])
([0, 5, 25, 50], [5, 20, 26])


### DataFrame

In [3]:
from pyspark.sql import SparkSession

# Get the existing SparkSession or create a new one; it is the entry point
# for the DataFrame API.
spark = (
    SparkSession.builder
    .appName("Python Spark SQL basic example")
    # NOTE(review): example placeholder key, not a real Spark setting;
    # replace it with a real option or drop it.
    .config("spark.some.config.option", "some-value")
    .getOrCreate()
)

# By default spark.read.json expects JSON Lines: one self-contained JSON
# object per line, not a single pretty-printed JSON document.
df = spark.read.json("test.json")
# Display the content of the DataFrame on stdout.
df.show()

# Expected output for the sample test.json:
# +----+-------+
# | age|   name|
# +----+-------+
# |null|Michael|
# |  30|   Andy|
# |  19| Justin|
# +----+-------+