In [1]:
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-streaming-kafka-0-10_2.12:3.5.0,org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0 pyspark-shell'

#### Words count

In [2]:
from pyspark import SparkContext
sc = SparkContext()

In [4]:
data = [1,2,3,4,5]
distData = sc.parallelize(data,8)
s = distData.reduce(lambda a,b : a*b)  # reduce表示进行聚合操作
print(s)

120


In [6]:
file = ["line1", "This is line2"]
lines = sc.parallelize(file)
print(lines.collect())

['line1', 'This is line2']


In [7]:
print(lines.collect())

['line1', 'This is line2']


In [11]:
linelengthes = lines.map(lambda line:len(line))
print(linelengthes.collect())

[5, 13]


In [12]:
def reduceFunc(a,b):
    return a+b

In [13]:
totalLength = linelengthes.reduce(reduceFunc)
print(totalLength)

18


In [14]:
linelengthes.persist()

PythonRDD[6] at collect at /tmp/ipykernel_41/1028932533.py:2

In [15]:
words = ['hello this is line one', 'hello this is line two']
words_rdd = sc.parallelize(words)
print(words_rdd.collect())

['hello this is line one', 'hello this is line two']


In [17]:
words_rdd = words_rdd.flatMap(lambda line: line.split(" "))
print(words_rdd.collect())

['hello', 'this', 'is', 'line', 'one', 'hello', 'this', 'is', 'line', 'two']


In [19]:
paris = words_rdd.map(lambda s:(s,1))
print(paris.collect())

[('hello', 1), ('this', 1), ('is', 1), ('line', 1), ('one', 1), ('hello', 1), ('this', 1), ('is', 1), ('line', 1), ('two', 1)]


In [21]:
counts = paris.reduceByKey(lambda a,b:a + b)  #对键进行聚合操作
print(counts.collect())

[('two', 1), ('hello', 2), ('this', 2), ('line', 2), ('is', 2), ('one', 1)]


#### Pi estimation

In [22]:
import random

def inside(p):
    x,y = random.random(), random.random()
    return x*x + y*y <1

In [25]:
NUM_SAMPLES = 1000000
count = sc.parallelize(range(0, NUM_SAMPLES))\
        .filter(inside).count()
print("Pi is roughly %f" % (4.0 *count / NUM_SAMPLES))  # pi*r^2 / (2r)^2 = pi/4

Pi is roughly 3.141820


#### DataFrame API

In [29]:
from pyspark.sql import SparkSession

spark = SparkSession.builder\
    .appName("DataFrame Example")\
    .getOrCreate()

df = spark.createDataFrame([
    ("TOM", "M", 30), 
    ("SAY", "F", 24), 
    ("Marry", "M", 43)
], ["name", "Sex", "age"])

# df = sc.parallelize([("TOM", "M", 30), ("SAY", "F", 24), ("Marry", "M", 43)])\
#     .toDF(["name", "Sex", "age"])
df.show()

+-----+---+---+
| name|Sex|age|
+-----+---+---+
|  TOM|  M| 30|
|  SAY|  F| 24|
|Marry|  M| 43|
+-----+---+---+



In [31]:
countByCol = df.groupBy("age").count()
countByCol.show()

+---+-----+
|age|count|
+---+-----+
| 30|    1|
| 24|    1|
| 43|    1|
+---+-----+



In [32]:
df.select("Sex").show()

+---+
|Sex|
+---+
|  M|
|  F|
|  M|
+---+



In [35]:
from pyspark.sql.functions import col

errors = df.filter(col('age') >25)
errors.show()

+-----+---+---+
| name|Sex|age|
+-----+---+---+
|  TOM|  M| 30|
|Marry|  M| 43|
+-----+---+---+

