# Explode

In [1]:
import os
import sys
import site

# Report the runtime environment (working directory, interpreter version and
# location, site-packages directories) in a single print call.
cwd = os.getcwd()
print(
    f"Current directory: {cwd}",
    f"Current Python version: {sys.version_info.major}.{sys.version_info.minor}.{sys.version_info.micro}",
    f"Current Python interpreter: {sys.executable}",
    f"Current site-packages: {site.getsitepackages()}",
    sep="\n",
)

# Append the ``site-packages`` directory under the working directory to the
# import search path.
sys.path.append(os.path.join(cwd, "site-packages"))

Current directory: /home/jovyan/docs/source/00-New/01-Create-DataFrame
Current Python version: 3.10.6
Current Python interpreter: /opt/conda/bin/python
Current site-packages: ['/opt/conda/lib/python3.10/site-packages']


In [2]:
# First, create a Spark session (``getOrCreate`` hands back the session
# produced by the builder; the bare ``spark`` below displays it in the notebook).
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
spark

## Create DataFrame from Python List of Tuple

```
data = [
    (v11, v12, ...),
    (v21, v22, ...),
    ...
]
```

In [7]:
# Build a small DataFrame whose ``values`` column holds a list per row.
# No types are declared, so Spark infers the schema from the Python data.
pdf = spark.createDataFrame(
    data=[(1, [1, 2]), (2, [3, 4, 5]), (3, [6, 7])],
    schema=["id", "values"],
)
pdf.printSchema()
pdf.show()

root
 |-- id: long (nullable = true)
 |-- values: array (nullable = true)
 |    |-- element: long (containsNull = true)

+---+---------+
| id|   values|
+---+---------+
|  1|   [1, 2]|
|  2|[3, 4, 5]|
|  3|   [6, 7]|
+---+---------+



In [8]:
from pyspark.sql import functions as F

# ``explode`` emits one output row per array element; ``id`` is repeated for
# every element that came from the same input row.
pdf.select(
    pdf["id"],
    F.explode(pdf["values"]).alias("value"),
).show()


+---+-----+
| id|value|
+---+-----+
|  1|    1|
|  1|    2|
|  2|    3|
|  2|    4|
|  2|    5|
|  3|    6|
|  3|    7|
+---+-----+



In [9]:
# ``withColumn`` keeps the existing columns and assigns the new column's name
# itself, so no extra ``.alias(...)`` is needed on the exploded expression.
# The result has the original ``values`` array next to each exploded ``value``.
pdf.withColumn("value", F.explode(pdf["values"])).show()

+---+---------+-----+
| id|   values|value|
+---+---------+-----+
|  1|   [1, 2]|    1|
|  1|   [1, 2]|    2|
|  2|[3, 4, 5]|    3|
|  2|[3, 4, 5]|    4|
|  2|[3, 4, 5]|    5|
|  3|   [6, 7]|    6|
|  3|   [6, 7]|    7|
+---+---------+-----+



In [47]:
# Two-column DataFrame from a list of tuples; only column names are given,
# so the column types are inferred.
pdf = spark.createDataFrame(
    data=[(1, "alice"), (2, "bob")],
    schema=["id", "name"],
)
pdf.printSchema()
pdf.show()

root
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)

+---+-----+
| id| name|
+---+-----+
|  1|alice|
|  2|  bob|
+---+-----+



In [10]:
# First input: a single row keyed by (c1, c2).
pdf1 = spark.createDataFrame(
    data=[(1, "a")],
    schema=["c1", "c2"],
)
pdf1.printSchema()
pdf1.show()

# Second input: two rows that share the same (c11, c22) key pair.
pdf2 = spark.createDataFrame(
    data=[(1, "a", "alice"), (1, "a", "bob")],
    schema=["c11", "c22", "c33"],
)
pdf2.printSchema()
pdf2.show()

root
 |-- c1: long (nullable = true)
 |-- c2: string (nullable = true)

+---+---+
| c1| c2|
+---+---+
|  1|  a|
+---+---+

root
 |-- c11: long (nullable = true)
 |-- c22: string (nullable = true)
 |-- c33: string (nullable = true)

+---+---+-----+
|c11|c22|  c33|
+---+---+-----+
|  1|  a|alice|
|  1|  a|  bob|
+---+---+-----+



In [11]:
# Left join on a compound condition: both key columns have to match.
pdf = pdf1.join(
    other=pdf2,
    on=(pdf1["c1"] == pdf2["c11"]) & (pdf1["c2"] == pdf2["c22"]),
    how="left",
)
pdf.show()

+---+---+---+---+-----+
| c1| c2|c11|c22|  c33|
+---+---+---+---+-----+
|  1|  a|  1|  a|  bob|
|  1|  a|  1|  a|alice|
+---+---+---+---+-----+



## Create DataFrame from Python List of Dict

```
data = [
    {"col1": v11, "col2": v12, ...},
    {"col1": v21, "col2": v22, ...},
    ...
]
```

In [48]:
# Each dict is one row. No schema is passed, so the column names come from
# the dict keys and the types are inferred.
pdf = spark.createDataFrame(
    data=[
        {"id": 1, "name": "alice"},
        {"id": 2, "name": "bob"},
    ]
)
pdf.printSchema()
pdf.show()

root
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)

+---+-----+
| id| name|
+---+-----+
|  1|alice|
|  2|  bob|
+---+-----+



## Explicitly Define the Schema

创建 DataFrame 的时候, Spark 支持自动推导类型, 也支持显式指定 Schema. [pyspark.sql.types](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/data_types.html) 这个模块定义了 Spark 中的所有数据类型. 一个 Schema 本质就是一个 Struct 结构体 (key value pair), 里面可以 embed 其他的结构体以描述一个 column 是一个复杂对象的情况. 里面也可以 embed 一个 Array, 其中 Array 里面的元素的类型也需要指定. 用 Struct + Array + 基本类型 (primitive type, 例如 Integer, String) 基本上就可以构建出任意复杂的 Schema.

下面我们来手动定义一个 Schema.

In [31]:
# 先 import 需要的类
from pyspark.sql.types import ( 
    IntegerType,
    StringType, 
    DoubleType,    
    BooleanType,
    TimestampType, # datetime.datetime
    DateType, # datetime.date
    ArrayType, 
    StructField,
    StructType,
)


# ``createDataFrame`` on the Spark session builds a DataFrame directly from
# in-memory data. Production jobs usually read from a data source, but during
# development it is handy for creating small DataFrames to try out an API.

# Struct type of the nested ``profile`` column.
profile_type = StructType([
    StructField("ssn", StringType(), nullable=True),
    StructField("aliases", ArrayType(StringType()), nullable=True),
])

# Struct type of a single element of the ``tags`` array.
tag_type = StructType([
    StructField("key", StringType(), nullable=False),
    StructField("value", StringType(), nullable=False),
])

pdf = spark.createDataFrame(
    # Each list element is one Row. A Row can be given as a tuple or a dict;
    # tuples are used here, so the values must follow the schema's field order.
    data=[
        (
            1,
            "Alice",
            {"ssn": "111-222-333", "aliases": ["alice", "ali"]},
            [1, 2, 3],
            [{"key": "profession", "value": "banker"}],
        ),
        (
            2,
            "Bob",
            {"ssn": "444-555-666", "aliases": ["bob", "bb"]},
            [4, 5, 6],
            [{"key": "profession", "value": "teacher"}],
        ),
    ],
    # Pass the schema struct explicitly instead of relying on type inference.
    schema=StructType([
        StructField("id", IntegerType(), nullable=True),
        StructField("name", StringType(), nullable=True),
        StructField("profile", profile_type, nullable=True),
        StructField("numbers", ArrayType(IntegerType()), nullable=True),
        StructField("tags", ArrayType(tag_type), nullable=True),
    ]),
)
# ``vertical=True`` prints each record as its own block, one line per column;
# ``truncate=False`` shows full strings instead of cutting them off.
pdf.show(vertical=True, truncate=False)

+---+-----+---------------------------+---------+-----------------------+
|id |name |profile                    |numbers  |tags                   |
+---+-----+---------------------------+---------+-----------------------+
|1  |Alice|{111-222-333, [alice, ali]}|[1, 2, 3]|[{profession, banker}] |
|2  |Bob  |{444-555-666, [bob, bb]}   |[4, 5, 6]|[{profession, teacher}]|
+---+-----+---------------------------+---------+-----------------------+



In [32]:
# Project only the two top-level scalar columns.
pdf.select(pdf["id"], pdf["name"]).show(vertical=True, truncate=False)

-RECORD 0-----
 id   | 1     
 name | Alice 
-RECORD 1-----
 id   | 2     
 name | Bob   



In [33]:
# Select the whole ``profile`` struct column.
pdf.select(pdf["profile"]).show(vertical=True, truncate=False)

-RECORD 0------------------------------
 profile | {111-222-333, [alice, ali]} 
-RECORD 1------------------------------
 profile | {444-555-666, [bob, bb]}    



In [11]:
# Reach into the ``profile`` struct and select a single field of it.
pdf.select(pdf["profile"]["ssn"]).show(vertical=True, truncate=False)

-RECORD 0------------------
 profile.ssn | 111-222-333 
-RECORD 1------------------
 profile.ssn | 444-555-666 



In [15]:
# A struct field can itself be an array; select it like any other field.
pdf.select(pdf["profile"]["aliases"]).show(vertical=True, truncate=False)

-RECORD 0-----------------------
 profile.aliases | [alice, ali] 
-RECORD 1-----------------------
 profile.aliases | [bob, bb]    



In [20]:
# Select the whole ``numbers`` array column.
pdf.select(pdf["numbers"]).show(vertical=True, truncate=False)

-RECORD 0------------
 numbers | [1, 2, 3] 
-RECORD 1------------
 numbers | [4, 5, 6] 



In [30]:
# Index into the array column to take its first element, under a new name.
first_number = pdf["numbers"][0]
pdf.select(first_number.alias("number0")).show(vertical=True, truncate=False)

-RECORD 0------
 number0 | 1   
-RECORD 1------
 number0 | 4   



In [26]:
# Select the ``tags`` column, an array of structs.
pdf.select(pdf["tags"]).show(vertical=True, truncate=False)

-RECORD 0-----------------------
 tags | [{profession, banker}]  
-RECORD 1-----------------------
 tags | [{profession, teacher}] 



In [29]:
# Combine array indexing and struct field access: the ``key`` field of the
# first element in ``tags``.
first_tag_key = pdf["tags"][0]["key"]
pdf.select(first_tag_key.alias("tags0.key")).show(vertical=True, truncate=False)

-RECORD 0---------------
 tags0.key | profession 
-RECORD 1---------------
 tags0.key | profession 



## Read CSV


``SparkSession.read.csv`` 是用来从 CSV 读数据的函数.

Ref: https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrameReader.csv.html#pyspark.sql.DataFrameReader.csv


In [54]:
# By default Spark treats the first line as data rather than as a header, so
# a CSV file that has a header row must say so explicitly.
pdf = spark.read.options(header=True, sep=",").csv("./users.csv")
pdf.printSchema()
pdf.show()

root
 |-- id: string (nullable = true)
 |-- name: string (nullable = true)

+---+-----+
| id| name|
+---+-----+
|  1|alice|
|  2|  bob|
|  3|cathy|
+---+-----+



In [53]:
# A CSV file is plain text, so when reading it there is often the question of
# whether a number should be parsed as an integer or kept as a string.
# Passing a schema while reading tells Spark explicitly which columns are
# integers and which are strings.
pdf = (
    spark.read
    .schema(
        StructType([
            StructField("id", IntegerType(), nullable=True),
            StructField("name", StringType(), nullable=True),
        ])
    )
    .options(header=True, sep=",")
    .csv("./users.csv")
)
pdf.printSchema()
pdf.show()

root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)

+---+-----+
| id| name|
+---+-----+
|  1|alice|
|  2|  bob|
|  3|cathy|
+---+-----+



In [59]:
# In Python it is common to parse CSV data that is already held in a string
# rather than stored at a path; in pandas and similar frameworks this is
# usually done with an io buffer.
# In Spark, split the string into lines (one record per line), turn the lines
# into a distributed collection with ``spark.sparkContext.parallelize(lines)``
# (Spark is a distributed, parallel framework), and hand that collection to
# ``spark.read.csv``.

import io

# Read the text with an explicit encoding and close the file before any Spark
# work starts; only the in-memory lines are needed from here on.
with open("./users.csv", "r", encoding="utf-8") as f:
    lines = f.read().splitlines()

pdf = spark.read.csv(spark.sparkContext.parallelize(lines), header=True, sep=",")
pdf.printSchema()
pdf.show()

root
 |-- id: string (nullable = true)
 |-- name: string (nullable = true)

+---+-----+
| id| name|
+---+-----+
|  1|alice|
|  2|  bob|
|  3|cathy|
+---+-----+



## Select A Column

对 DataFrame 进行向量化的列操作是大数据处理中最简单的操作之一. 无论多么复杂的操作都是基于此, 要么对一个列的计算过程更复杂, 要么涉及到多个列的计算. 本节我们来看看如何选取一个 Column 并对其进行计算.

In [61]:
# Small two-column DataFrame used by the column-selection examples below.
pdf = spark.createDataFrame(
    data=[(1, "alice"), (2, "bob")],
    schema=["id", "name"],
)
pdf.show()

+---+-----+
| id| name|
+---+-----+
|  1|alice|
|  2|  bob|
+---+-----+



In [35]:
from pyspark.sql import functions as F

# ``F.col`` refers to a column by name without going through the DataFrame object.
pdf.select(F.col("id")).show()

+---+
| id|
+---+
|  1|
|  2|
+---+



In [43]:
# Drop the first and the last character of ``name`` with a SQL expression:
# start at position 2 and keep ``length(name) - 2`` characters.
pdf.select(
    F.expr("substring(name, 2, length(name)-2)").alias("name_substring"),
).show()

+--------------+
|name_substring|
+--------------+
|           lic|
|             o|
+--------------+



In [64]:
@F.udf(IntegerType())
def double_the_integer(v: int | None) -> int | None:
    """Return ``v`` multiplied by two, passing nulls through unchanged.

    Registered as a Spark UDF with a declared return type of ``IntegerType``.

    :param v: the column value for one row; ``None`` when the cell is null.
    :return: ``v * 2``, or ``None`` when ``v`` is ``None``.
    """
    # A null cell arrives as None; multiplying it would raise TypeError and
    # fail the whole job, so propagate the null instead.
    if v is None:
        return None
    # NOTE(review): the declared return type is a 32-bit integer while ids
    # inferred from Python ints are ``long`` - confirm results stay in range.
    return v * 2

# Apply the UDF to the ``id`` column and keep the original column name.
doubled_id = double_the_integer(pdf["id"])
pdf.select(doubled_id.alias("id")).show()

+---+
| id|
+---+
|  2|
|  4|
+---+



## Next