# 使用 Synapse Spark 浏览和修复数据

在此任务中，你将使用 Synapse Spark 笔记本浏览 Data Lake `wwi-02/sale-poc` 文件夹中的部分文件。还将使用 Python 代码修复 `sale-20170502.csv` 文件的问题，这样目录中的所有文件都可以在稍后在本实验室使用 Synapse 管道来引入。

我们需要做的第一件事是在笔记本中设置一个变量，提供 Data Lake Storage 主帐户的名称。在执行下面的单元格之前，需要将 `[YOUR-DATA-LAKE-ACCOUNT-NAME]` 替换为与 Synpse 工作区关联的 Data Lake Storage 主帐户的名称。

可以通过导航到 Synapse Studio 中的 **“数据”** 中心，选择 **“链接”** 选项卡，然后在 **Azure Data Lake Storage Gen2** 下找到以 **asadatalake** 开头的存储帐户名称，来查找 Data Lake Storage 帐户的名称。

![Data Lake Storage 主帐户在数据中心的“链接”选项卡上突出显示。](https://solliancepublicdata.blob.core.windows.net/images/synapse/data-hub-primary-data-lake-storage-account.png "Primary ADLS Gen2 Account")

1. 复制 Data Lake Storage 帐户的名称并将其粘贴到下面单元格中的 `[YOUR-DATA-LAKE-ACCOUNT-NAME]` 的位置，然后通过选择 **“运行单元格”** 按钮来执行该单元格（选择单元格时，该按钮可见）。

    ![Data Lake Storage 主帐户在数据中心的“链接”选项卡上突出显示。](https://solliancepublicdata.blob.core.windows.net/images/synapse/synapse-notebook-run-cell.png "Primary ADLS Gen2 Account")



In [None]:
adls_account_name = '[YOUR-DATA-LAKE-ACCOUNT-NAME]'

## 使用 Spark 浏览文件

1. 使用 Synapse Spark 浏览数据的第一步是从 Data Lake 加载文件。为此，我们可以使用 `SparkSession` 的 `spark.read.load()` 方法。

2. 在 Spark 中，可以将文件中的数据加载到[DataFrame](https://spark.apache.org/docs/2.2.0/sql-programming-guide.html#datasets-and-dataframes)中，DataFrame 是一个抽象的概念，允许在命名列中构建数据。执行下面的单元格，将 `sale-20170501.csv` 文件中的数据加载到数据帧中。可以通过将鼠标悬停在单元格的左侧，然后选择蓝色的 **“运行单元格”** 按钮来运行单元格。

    ![“运行单元格”按钮突出显示在要执行的单元格内容的左侧。](https://solliancepublicdata.blob.core.windows.net/images/synapse/synapse-notebook-run-cell-load-sale-20170501-csv.png "Run cell")

In [None]:
# 首先，加载 `sale-20170501.csv` 文件，从之前的探索中可以得知，该文件的格式是正确的。
# 请注意 `header` 和 `inferSchema` 参数的使用。Header 指示文件中的第一行包含列标头，
# `inferSchema` 指示 Spark 使用文件中的数据来推断数据类型。
df = spark.read.load(f'abfss://wwi-02@{adls_account_name}.dfs.core.windows.net/sale-poc/sale-20170501.csv', format='csv', header=True, inferSchema=True)

## 查看 DataFrame 的内容

将 `sale-20170501.csv` 文件中的数据加载到数据帧中之后，现在可以使用数据帧的各种方法来探索数据的属性。

1. 首先，让我们看一下导入的数据。执行下面的单元格，查看并检查数据帧中的数据。

In [None]:
display(df.limit(10))

2. 正如在使用 Azure Synapse 的按需 SQL 功能进行探索时所见，通过 Spark 可以查看和查询文件中包含的数据。 

3. 现在，使用数据帧的 `printSchema()` 方法来查看创建数据帧时使用 `inferSchema` 参数的结果。执行下面的单元格并观察输出。

In [None]:
# 现在，输出推断出来的模式。我们将需要下面这些信息来帮助解决 2017 年 5 月 2 日文件中缺少的标头。
df.printSchema()

4. `printSchema` 方法同时输出字段名和数据类型，这些字段名和数据类型基于 Spark 引擎对每个字段中包含的数据的评估。

    > 稍后可以使用此信息来帮助定义格式不佳的 `sale-20170502.csv` 文件的架构。除了字段名称和数据类型，我们还应注意文件中包含的功能或列的数量。在此示例中，请注意有 11 个字段。此信息将用于确定在何处拆分单行数据。

5. 作为进一步探索的示例，运行下面的单元格以创建并显示新的数据帧，其中包含不同 Customer 和 Product ID 配对的有序列表。我们可以使用这些类型的函数在目标字段中快速查找无效值或空值。

In [None]:
# 创建新的 DataFrame，其中包含按 CustomerId 降序排列的不同 CustomerId 和 ProductId 值的列表。
df_distinct_products = df.select('CustomerId', 'ProductId').distinct().orderBy('CustomerId')

# 显示生成的 DataFrame 的前 100 行。
display(df_distinct_products.limit(100))

6. 接下来，像我们之前那样，尝试使用 `load()` 方法打开并浏览 `sale-20170502.csv` 文件。

In [None]:
# 接下来，尝试使用与第一个文件相同的 `load()` 方法读取 2017 年 5 月 2 日的文件。
df = spark.read.load(f'abfss://wwi-02@{adls_account_name}.dfs.core.windows.net/sale-poc/sale-20170502.csv', format='csv')
display(df.limit(10))

7. 正如在 T-SQL 中所见，我们在 Spark 中收到类似的错误，即处理的列数可能已超过 20480 列的限制。要处理此文件中的数据，需要使用更高级的方法，可在下一节中看到此方法。


## 处理和修复格式不佳的 CSV 文件

> 以下步骤提供了修复格式不佳的 CSV 文件 `sale-20170502.csv` 的示例代码，我们在浏览 `wwi-02/sale-poc` 文件夹中的文件时发现该文件。这只是使用 Spark 处理“修复”格式不佳的 CSV 文件的众多方法之一。

1. 要“修复”格式不佳的文件，需要采用编程方法，使用 Python 读入文件内容，然后对其进行解析，来调整其格式。

    > 要处理单行中的数据，可以使用 `SparkContext` 的 `textFile()` 方法将文件作为行的集合读取到可复原分布式数据集 (RDD) 中。这使我们能够应对围绕列数的错误，因为本质上，我们是获取存储在单列中的单个字符串值。

2. 执行下面的单元格，用文件中的数据加载 RDD。

In [None]:
# 导入 NumPy 库。NumPy 是用于处理数组的 Python 库。
import numpy as np

# 将 CSV 文件作为文本文件读入可复原分布式数据集 (RDD)。这会将文件的每一行读入 RDD 中的行。
rdd = sc.textFile(f'abfss://wwi-02@{adls_account_name}.dfs.core.windows.net/sale-poc/sale-20170502.csv')

3. 数据存储在 RDD 中之后，现在我们可以访问 RDD 中的第一行（也是唯一填充行），然后将其拆分为单个字段。通过在 Notepad++ 中对文件的检查，我们可以得知所有字段都由逗号 (,) 分隔，所以让我们从逗号开始拆分，创建字段值数组。执行下面的单元格，创建数据数组。

In [None]:
# 由于我们知道只有一行，因此抓取 RDD 的第一行，并从字段分隔符（逗号）进行拆分。
data = rdd.first().split(',')

field_count = len(data)
# 输出读入数组的字段计数。
print(field_count)

4. 通过从字段分隔符拆分行，我们创建了文件中所有单个字段值的数组，可以在上面看到其计数。

5. 现在，运行下面的单元格，快速计算通过将每 11 个字段解析为一行而生成的预期行数。

In [None]:
import math

expected_row_count = math.floor(field_count / 11)
print(f'The expected row count is: {expected_row_count}')

6. 接下来，让我们创建数组来存储与每一行相关联的数据。

    > 将 max_index 设置为每行中预期的列数。通过浏览 `wwi-02/sale-poc` 文件夹中的其他文件，可以得知其包含 11 列，因此我们也将值设置为 11。

7. 除了设置变量，我们还将使用下面的单元格循环遍历 `data` 数组并将每一行的值指定为 11。通过这样做，可以将曾经是单行的数据“拆分”为适当的行，其中包含文件中正确的数据和列。

8. 执行下面的单元格，从文件数据中创建一个行数组。

In [None]:
# 创建数组来存储与每一行相关联的数据。将 max_index 设置为每行中的列数。这里为 11，我们在上面查看 5 月 1 日文件的架构时注意到了这一点。
row_list = []
max_index = 11

# 现在，我们将循环遍历从文件的单行中提取的值数组，并生成由 11 列组成的行。
while max_index <= len(data):
    row = [data[i] for i in np.arange(max_index-11, max_index)]
    row_list.append(row)

    max_index += 11

print(f'The row array contains {len(row_list)} rows. The expected number of rows was {expected_row_count}.')

9. 为了能够将文件数据作为行处理，我们需要做的最后一件事是将其读入 Spark DataFrame。在下面的单元格中，使用 `createDataFrame()` 方法将 `row_list` 数组转换为 DataFrame，该 DataFrame 还为列添加名称。列名基于在 `wwi-02/sale-poc` 目录中格式良好的文件里观察到的架构。

10. 执行下面的单元格，创建包含文件中行数据的数据帧，然后显示前 10 行。

In [None]:
# 最后，可以使用上面创建的 row_list 来创建一个 DataFrame。我们可以向其中添加一个架构参数，其中包含在第一个文件架构中看到的列名称。
df_fixed = spark.createDataFrame(row_list,schema=['TransactionId', 'CustomerId', 'ProductId', 'Quantity', 'Price', 'TotalAmount', 'TransactionDateId', 'ProfitAmount', 'Hour', 'Minute', 'StoreId'])
display(df_fixed.limit(10))

## 将已修复文件写入 Data Lake 中

1. 浏览和修复文件过程中的最后一步是将数据写回 Data Lake，因此可以按照与 `wwi-02/sale-poc` 文件夹中的其他文件相同的过程引入数据。

2. 执行下面的单元格，将数据帧保存到 Data Lake 中名为 `sale-20170502-fixed` 的文件夹中的一系列文件。

    > 备注：Spark 跨工作节点并行化工作负载，因此保存文件时，你会注意到它们被保存为“部分”文件的集合，而不是单个文件。虽然可以使用一些库来创建单个文件，但习惯于处理通过 Spark 笔记本生成的文件是有帮助的，因为它们是在本地创建的。


In [None]:
df.write.format('csv').option('header',True).mode('overwrite').option('sep',',').save(f'abfss://wwi-02@{adls_account_name}.dfs.core.windows.net/sale-poc/sale-20170502-fixed')

## 检查 Data Lake 中的已修复文件

1. 将已修复文件写入 Data Lake 后，可以快速对其进行检查，以验证文件现在的格式是否正确。选择上面的 `wwi-02` 选项卡，然后双击 `sale-20170502-fixed` 文件夹。

    ![](https://solliancepublicdata.blob.core.windows.net/images/synapse/wwi-02-sale-poc-sale-20170502-fixed.png)

2. 在 `sale-20170502-fixed` 文件夹中，右键单击名称以 `part` 开头且扩展名为 `.csv` 的第一个文件，然后从上下文菜单中选择 **“预览”**。

    ![](https://solliancepublicdata.blob.core.windows.net/images/synapse/wwi-02-sale-poc-sale-20170502-fixed-content.png)

3. 在 **“预览”** 对话框中，确认看到了正确的列，并且每个字段中的数据看起来都有效。

    ![](https://solliancepublicdata.blob.core.windows.net/images/synapse/sale-20170502-fixed-preview.png)

## 总结

在本练习中，你使用了 Spark 笔记本来探索存储在 Data Lake 文件中的数据。你使用 Python 代码从格式错误的 CSV 文件中提取数据，将该文件中的数据汇编成正确的行，然后将已修复文件写回 Data Lake。

现在，你可以返回到实验室指南，继续实验室 2 接下来的部分。
