In [47]:
from pyspark.sql import SparkSession
from IPython.core.interactiveshell import InteractiveShell

InteractiveShell.ast_node_interactivity = "all"

spark = SparkSession.builder.appName("lecture6").getOrCreate()

sc = spark.sparkContext

# 原始数据
simpleData = [
    ("James", "Sales", "NY", 90000, 34, 10000),
    ("Michael", "Sales", "NV", 86000, 56, 20000),
    ("Robert", "Sales", "CA", 81000, 30, 23000),
    ("Maria", "Finance", "CA", 90000, 24, 23000),
    ("Raman", "Finance", "DE", 99000, 40, 24000),
    ("Scott", "Finance", "NY", 83000, 36, 19000),
    ("Jen", "Finance", "NY", 79000, 53, 15000),
    ("Jeff", "Marketing", "NV", 80000, 25, 18000),
    ("Kumar", "Marketing", "NJ", 91000, 50, 21000),
]

# 数据表头
schema = ["employee_name", "department", "state", "salary", "age", "bonus"]
# 从原始数据和表头创建DataFrame
df = spark.createDataFrame(data=simpleData, schema=schema)
# 打印DataFrame的schema
df.printSchema()
df.show(truncate=False)

root
 |-- employee_name: string (nullable = true)
 |-- department: string (nullable = true)
 |-- state: string (nullable = true)
 |-- salary: long (nullable = true)
 |-- age: long (nullable = true)
 |-- bonus: long (nullable = true)

+-------------+----------+-----+------+---+-----+
|employee_name|department|state|salary|age|bonus|
+-------------+----------+-----+------+---+-----+
|James        |Sales     |NY   |90000 |34 |10000|
|Michael      |Sales     |NV   |86000 |56 |20000|
|Robert       |Sales     |CA   |81000 |30 |23000|
|Maria        |Finance   |CA   |90000 |24 |23000|
|Raman        |Finance   |DE   |99000 |40 |24000|
|Scott        |Finance   |NY   |83000 |36 |19000|
|Jen          |Finance   |NY   |79000 |53 |15000|
|Jeff         |Marketing |NV   |80000 |25 |18000|
|Kumar        |Marketing |NJ   |91000 |50 |21000|
+-------------+----------+-----+------+---+-----+



23/05/14 20:17:52 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [49]:
from pyspark.sql.functions import sum, col, desc

# 按照州分组，计算每个州的总工资，过滤掉总工资小于100000的部门，按照总工资降序排序
df.groupBy("state").agg(sum("salary").alias("sum_salary")).filter(
    col("sum_salary") > 100000
).sort(desc("sum_salary")).show()

+-----+----------+
|state|sum_salary|
+-----+----------+
|   NY|    252000|
|   CA|    171000|
|   NV|    166000|
+-----+----------+



In [48]:
# 使用SQL语句实现同样的功能
df.createOrReplaceTempView("EMP")
spark.sql(
    "select state, sum(salary) as sum_salary from EMP "
    + "group by state having sum_salary > 100000 "
    + "order by sum_salary desc"
).show()

+-----+----------+
|state|sum_salary|
+-----+----------+
|   NY|    252000|
|   CA|    171000|
|   NV|    166000|
+-----+----------+



In [16]:
# 拆解
df.groupBy("state").sum("salary").show()

+-----+-----------+
|state|sum(salary)|
+-----+-----------+
|   NY|     252000|
|   NV|     166000|
|   CA|     171000|
|   DE|      99000|
|   NJ|      91000|
+-----+-----------+



In [18]:
dfGroup = df.groupBy("state").agg(sum("salary").alias("sum_salary"))
dfGroup.show()

+-----+----------+
|state|sum_salary|
+-----+----------+
|   NY|    252000|
|   NV|    166000|
|   CA|    171000|
|   DE|     99000|
|   NJ|     91000|
+-----+----------+



In [19]:
dfFilter = dfGroup.filter(dfGroup.sum_salary > 100000)
dfFilter.show()

+-----+----------+
|state|sum_salary|
+-----+----------+
|   NY|    252000|
|   NV|    166000|
|   CA|    171000|
+-----+----------+



In [22]:
dfFilter.sort(desc("sum_salary")).show()

+-----+----------+
|state|sum_salary|
+-----+----------+
|   NY|    252000|
|   CA|    171000|
|   NV|    166000|
+-----+----------+



In [1]:
import os
from IPython.core.interactiveshell import InteractiveShell

InteractiveShell.ast_node_interactivity = "all"

# 文件路径存在会报错， mode=0o755表示创建目录的权限为755 (rwxr-xr-x)
os.mkdir("output", mode=0o755)

FileExistsError: [Errno 17] File exists: 'outpput'

In [10]:
for root, dirs, files in os.walk(".", topdown=True):
    for name in files:
        print(os.path.join(root, name))
    print("------")
    for name in dirs:
        print(os.path.join(root, name))

./lecture6.ipynb
./.DS_Store
./~$数据分析第一期-6.pptx
./数据分析第一期-6.pptx
------
./outpput
./outpput/abc.txt
------


In [23]:
a = ["a", "b", "c", "d"]
str(a)

"['a', 'b', 'c', 'd']"

In [57]:
from functools import reduce


a = [1, 2, 3, 4]
reduce(lambda x, y: x + y, a)

type(map(lambda x: x + 1, a))

10

map

In [56]:
# map 只能用一次
a = map(lambda x: x + 1, a)
print(a)
for v in a:
    print(v)

for v in a:
    print(v)

[2, 3, 4, 5]
2
3
4
5
2
3
4
5


In [66]:
a = [1, 2, 3, 4]
a = iter(a)

while True:
    try:
        print(next(a))
    except StopIteration as e:
        print(f'error happened {e}')
        break

1
2
3
4
error happened 


In [35]:
def fib(count):
    m = 1
    n = 1
    a = 0
    ret = []
    while a < count:
        ret.append(n)
        n, m = m, m + n
        a += 1
    return ret


for item in fib(6):
    print(item)

1
1
2
3
5
8


In [37]:
# 使用 yield 实现斐波那契数列
def fib2(count):
    m = 1
    n = 1
    a = 0
    while a < count:
        yield n
        n, m = m, m + n
        a += 1


fib2 = fib2(6)
while True:
    try:
        print(next(fib2))
    except StopIteration:
        break

1
1
2
3
5
8


In [46]:
# try except 处理异常
try:
    # int("a")
    os.mkdir('output')
except ValueError as e:
    print(e)
else:
    print("no error")

print("end")

FileExistsError: [Errno 17] File exists: 'output'

In [74]:
# 读写文件
f = open('test.txt', 'a')
f.writelines(['hello world\n', 'hello python\n'])
f.close()

In [78]:
f = open('test.txt', 'r')
for line in f:
    print(line.strip())
f.close()

hello world
hello python
hello world
hello python


In [None]:
f = open('test.txt', 'a')
f.write('hello world\n')
f.flush() # 将缓存中的数据写入文件，而不是等到文件关闭时才写入
f.close()