<h4>Data Sources</h4>

- Spark's core data sources are:
    - CSV, JSON, Parquet, ORC, JDBC/ODBC connections, and plain text files

<h4>Structure of the Data Source API (Read API Structure)</h4>

- Before covering reads and writes for specific formats, first look at the Data Source API
- Read API structure
    - DataFrameReader.format(...).option("key","value").schema(...).load()

In [3]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Python").getOrCreate()

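<h4>Basics of Reading Data</h4>

- Spark reads data through a DataFrameReader, obtained from the read attribute of the SparkSession, i.e. spark.read
- Once you have a DataFrameReader, you specify several values
    - format
    - schema
    - read mode
    - a series of options
- The following example shows the overall structure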

spark.read.format("csv")<br>
.option("mode", "FAILFAST")<br>
.option("inferSchema", "true")<br>
.option("path", "path/to/file")<br>
.schema(someSchema)<br>
.load()

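<h4>Read Modes</h4>

- Reading from an external source can easily run into malformed data, especially with semi-structured data
- The read mode specifies what Spark should do when it encounters a malformed record

|Read mode|Description
|:----|:----|
|permissive|Sets all fields to null when a malformed record is encountered and places the malformed record in a string column named _corrupt_record
|dropMalformed|Drops rows that contain malformed records
|failFast|Fails immediately upon encountering a malformed record

The default is permissive.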
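
To make the three behaviors concrete, here is a minimal pure-Python sketch that emulates them on JSON lines. It is not Spark's implementation: `read_records`, `FIELDS`, and the sample lines are hypothetical names and data chosen for illustration, and only the `_corrupt_record` column name comes from the table above.

```python
import json

# Hypothetical fixed schema for this illustration.
FIELDS = ("name", "count")

def read_records(lines, mode="permissive"):
    """Parse JSON lines, treating malformed lines according to `mode`."""
    mode = mode.lower()
    if mode not in {"permissive", "dropmalformed", "failfast"}:
        raise ValueError(f"unknown read mode: {mode}")
    rows = []
    for line in lines:
        try:
            record = json.loads(line)
            if not isinstance(record, dict):
                raise ValueError("expected a JSON object")
        except ValueError as err:  # JSONDecodeError is a ValueError subclass
            if mode == "failfast":
                # failFast: stop at the first malformed record.
                raise ValueError(f"malformed record: {line!r}") from err
            if mode == "permissive":
                # permissive: null out every field, keep the raw text.
                row = {field: None for field in FIELDS}
                row["_corrupt_record"] = line
                rows.append(row)
            # dropMalformed: skip the line entirely.
            continue
        row = {field: record.get(field) for field in FIELDS}
        row["_corrupt_record"] = None
        rows.append(row)
    return rows

lines = ['{"name": "Romania", "count": 1}', '{"name": "Ireland", "count": ']
permissive_rows = read_records(lines, "permissive")
dropped_rows = read_records(lines, "dropMalformed")
print(permissive_rows)
print(dropped_rows)
```

In this sketch the second line is truncated, so permissive keeps it as a row of nulls plus the raw text, dropMalformed returns only the first row, and failFast would raise on it.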

<h4>Write API Structure</h4>

- The core structure for writing data is as follows
- DataFrameWriter.format(...).option(...).partitionBy(...).bucketBy(...).sortBy(...).save()

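<h4>Basics of Writing Data</h4>

- Writing data is very similar to reading it, except that you use a DataFrameWriter instead of a DataFrameReader
- Use the write attribute of a DataFrame to obtain a DataFrameWriter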

dataframe.write.format("csv")<br>
.mode("overwrite")<br>
.option("dateFormat","yyyy-MM-dd")<br>
.option("path","path/to/file")<br>
.save()

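<h4>Save Modes</h4>

- The save mode specifies what Spark should do if it finds existing data at the target path

|Save mode|Description
|:----|:----|
|append|Appends the output files to the list of files that already exist at the target path
|overwrite|Completely overwrites any data that already exists at the target path
|errorIfExists|Throws an error and fails the write if data or files already exist at the target path
|ignore|Does nothing if data or files already exist at the target path

The default is errorIfExists: if data already exists at the target path, the write fails immediately.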
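
The decision logic behind the four modes can be sketched in plain Python. This is only an emulation under simplifying assumptions: Spark writes a directory of part files, whereas the hypothetical helper `save_text` below works on a single text file in a temporary directory.

```python
import os
import tempfile

def save_text(path, text, mode="errorifexists"):
    """Write `text` to `path`, handling existing data according to `mode`.

    Returns True if something was written, False if the write was skipped.
    """
    mode = mode.lower()
    if mode not in {"append", "overwrite", "errorifexists", "ignore"}:
        raise ValueError(f"unknown save mode: {mode}")
    exists = os.path.exists(path)
    if exists and mode == "errorifexists":
        raise FileExistsError(f"path already exists: {path}")
    if exists and mode == "ignore":
        return False  # leave the existing data untouched
    # append adds to existing data; every other case starts from scratch.
    with open(path, "a" if mode == "append" else "w", encoding="utf-8") as f:
        f.write(text)
    return True

def read_text(path):
    with open(path, encoding="utf-8") as f:
        return f.read()

target = os.path.join(tempfile.mkdtemp(), "out.txt")
save_text(target, "first\n")                   # default mode, target is new
save_text(target, "second\n", mode="append")
after_append = read_text(target)
ignored = save_text(target, "skipped\n", mode="ignore")
save_text(target, "third\n", mode="overwrite")
after_overwrite = read_text(target)
```

Calling `save_text(target, ...)` again with the default mode would now raise `FileExistsError`, which mirrors the errorIfExists default described above.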

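<h4>CSV Data Source Options</h4>

|Read/Write|Key|Potential values|Default|Description
|:----|:----|:----|:----|:----|
|Both|sep|Any single character|,|The single character used as the separator for each field and value
|Both|header|true, false|false|Whether the first line of the file contains the column names
|Both|escape|Any string character|\\|The character used for escaping
|Both|inferSchema|true, false|false|Whether Spark should infer column types automatically when reading the file
|Both|ignoreLeadingWhiteSpace|true, false|false|Whether leading whitespace in values should be skipped
|Both|ignoreTrailingWhiteSpace|true, false|false|Whether trailing whitespace in values should be skipped
|Both|nullValue|Any string|""|Which string represents a null value in the file
|Both|nanValue|Any string|NaN|Which string represents NaN or a missing value in the CSV file
|Both|positiveInf|Any string|Inf|Which string represents positive infinity
|Both|negativeInf|Any string|-Inf|Which string represents negative infinity
|Both|compression or codec|none, uncompressed, bzip2, deflate, gzip, lz4, or snappy|none|Which compression codec Spark should use to read or write the file
|Both|dateFormat|Any string that conforms to Java's SimpleDateFormat|yyyy-MM-dd|The date format for date-type columns
|Both|timestampFormat|Any string that conforms to Java's SimpleDateFormat||The format for timestamp-type columns
|Read|maxColumns|Any integer|20480|The maximum number of columns in the file
|Read|maxCharsPerColumn|Any integer|1000000|The maximum number of characters in a column
|Read|escapeQuotes|true, false|true|Whether Spark should escape quotes found in lines
|Read|multiLine|true, false|false|Allows reading multiline CSV files, where each logical row may span multiple lines in the file itself
|Write|quoteAll|true, false|false|Whether all values are enclosed in quotes, rather than only escaping values that contain a quote character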

<h4>A Quick Primer on StructField</h4>

StructField: a field inside a StructType<br>
name: The name of this field.<br>
dataType: The data type of this field.<br>
nullable: Indicates if values of this field can be null values.<br>
metadata: The metadata of this field. The metadata should be preserved during transformation if the content of the column is not modified, e.g., in a selection.

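<h4>An Example of Reading a CSV File</h4>

- If a column's type does not match the schema, Spark reports the error only when it actually reads the data, not when the DataFrame is defined, because reads are evaluated lazily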

spark.read.format("csv")<br>
.option("header", "true")<br>
.option("mode","FAILFAST")<br>
.schema(myManualSchema)<br>
.load("./data/flight-data/csv/2010-summary.csv").take(5)

<h4>Writing CSV Files</h4>

- The following reads the contents of a CSV file and writes them out as TSV

In [5]:
csvFile = spark.read.format("csv")\
.option("header", "true")\
.option("mode", "FAILFAST")\
.option("inferSchema", "true")\
.load("./data/flight-data/csv/2010-summary.csv")

In [9]:
csvFile.take(2)

[Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Romania', count=1),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Ireland', count=264)]

In [10]:
csvFile.write.format("csv").mode("overwrite").option("sep", "\t").save("./tmp/my-tsv-file.tsv")

Py4JJavaError: An error occurred while calling o54.save.
: java.lang.RuntimeException: java.io.FileNotFoundException: java.io.FileNotFoundException: HADOOP_HOME and hadoop.home.dir are unset. -see https://wiki.apache.org/hadoop/WindowsProblems
	at org.apache.hadoop.util.Shell.getWinUtilsPath(Shell.java:736)
	at org.apache.hadoop.util.Shell.getSetPermissionCommand(Shell.java:271)
	at org.apache.hadoop.util.Shell.getSetPermissionCommand(Shell.java:287)
	at org.apache.hadoop.fs.RawLocalFileSystem.setPermission(RawLocalFileSystem.java:865)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkOneDirWithMode(RawLocalFileSystem.java:547)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirsWithOptionalPermission(RawLocalFileSystem.java:587)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirs(RawLocalFileSystem.java:559)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirsWithOptionalPermission(RawLocalFileSystem.java:586)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirs(RawLocalFileSystem.java:559)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirsWithOptionalPermission(RawLocalFileSystem.java:586)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirs(RawLocalFileSystem.java:559)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirsWithOptionalPermission(RawLocalFileSystem.java:586)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirs(RawLocalFileSystem.java:559)
	at org.apache.hadoop.fs.ChecksumFileSystem.mkdirs(ChecksumFileSystem.java:705)
	at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.setupJob(FileOutputCommitter.java:354)
	at org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.setupJob(HadoopMapReduceCommitProtocol.scala:178)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:173)
	at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:188)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:108)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:106)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:131)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:180)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:218)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:215)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:176)
	at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:132)
	at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:131)
	at org.apache.spark.sql.DataFrameWriter.$anonfun$runCommand$1(DataFrameWriter.scala:989)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:989)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:438)
	at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:415)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:293)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:64)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:564)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.base/java.lang.Thread.run(Thread.java:832)
Caused by: java.io.FileNotFoundException: java.io.FileNotFoundException: HADOOP_HOME and hadoop.home.dir are unset. -see https://wiki.apache.org/hadoop/WindowsProblems
	at org.apache.hadoop.util.Shell.fileNotFoundException(Shell.java:548)
	at org.apache.hadoop.util.Shell.getHadoopHomeDir(Shell.java:569)
	at org.apache.hadoop.util.Shell.getQualifiedBin(Shell.java:592)
	at org.apache.hadoop.util.Shell.<clinit>(Shell.java:689)
	at org.apache.hadoop.util.StringUtils.<clinit>(StringUtils.java:78)
	at org.apache.hadoop.conf.Configuration.getTimeDurationHelper(Configuration.java:1814)
	at org.apache.hadoop.conf.Configuration.getTimeDuration(Configuration.java:1791)
	at org.apache.hadoop.util.ShutdownHookManager.getShutdownTimeout(ShutdownHookManager.java:183)
	at org.apache.hadoop.util.ShutdownHookManager$HookEntry.<init>(ShutdownHookManager.java:207)
	at org.apache.hadoop.util.ShutdownHookManager.addShutdownHook(ShutdownHookManager.java:302)
	at org.apache.spark.util.SparkShutdownHookManager.install(ShutdownHookManager.scala:181)
	at org.apache.spark.util.ShutdownHookManager$.shutdownHooks$lzycompute(ShutdownHookManager.scala:50)
	at org.apache.spark.util.ShutdownHookManager$.shutdownHooks(ShutdownHookManager.scala:48)
	at org.apache.spark.util.ShutdownHookManager$.addShutdownHook(ShutdownHookManager.scala:153)
	at org.apache.spark.util.ShutdownHookManager$.<init>(ShutdownHookManager.scala:58)
	at org.apache.spark.util.ShutdownHookManager$.<clinit>(ShutdownHookManager.scala)
	at org.apache.spark.util.Utils$.createTempDir(Utils.scala:326)
	at org.apache.spark.deploy.SparkSubmit.prepareSubmitEnvironment(SparkSubmit.scala:343)
	at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:894)
	at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180)
	at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203)
	at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90)
	at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1039)
	at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1048)
	at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.io.FileNotFoundException: HADOOP_HOME and hadoop.home.dir are unset.
	at org.apache.hadoop.util.Shell.checkHadoopHomeInner(Shell.java:468)
	at org.apache.hadoop.util.Shell.checkHadoopHome(Shell.java:439)
	at org.apache.hadoop.util.Shell.<clinit>(Shell.java:516)
	... 21 more
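
The traceback above says that `HADOOP_HOME` and `hadoop.home.dir` are unset, and the Hadoop code path involved (`getWinUtilsPath`) is specific to Windows. A commonly used workaround, sketched here under assumptions, is to point `HADOOP_HOME` at a directory whose `bin` folder contains `winutils.exe` before the SparkSession is created, so that the JVM inherits the variable. The path `C:\hadoop` is hypothetical.

```python
import os

# Hypothetical install location; it is assumed to contain bin\winutils.exe.
hadoop_home = r"C:\hadoop"

# Keep any value the environment already provides; otherwise use ours.
# This must run before SparkSession.builder...getOrCreate() starts the JVM.
os.environ.setdefault("HADOOP_HOME", hadoop_home)

print(os.environ["HADOOP_HOME"])
```

After setting the variable, the kernel has to be restarted and the session recreated, since an already running JVM does not pick up the change.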


<h4>JSON Files</h4>

- In Spark, a JSON file means newline-delimited JSON
- That is, each line must contain one separate, self-contained, valid JSON object
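
A short standard-library sketch shows what "newline-delimited" means in practice: every line parses on its own, while the file as a whole is not a single JSON document. The two records reuse the flight-data values that appear in this notebook's output; the variable names are illustrative.

```python
import json

text = (
    '{"DEST_COUNTRY_NAME": "United States", "ORIGIN_COUNTRY_NAME": "Romania", "count": 1}\n'
    '{"DEST_COUNTRY_NAME": "United States", "ORIGIN_COUNTRY_NAME": "Ireland", "count": 264}\n'
)

# Each non-empty line is an independent JSON object.
records = [json.loads(line) for line in text.splitlines() if line.strip()]

# Parsing the whole text at once fails: two top-level objects are not one document.
try:
    json.loads(text)
    whole_file_is_json = True
except json.JSONDecodeError:
    whole_file_is_json = False

print(len(records), whole_file_is_json)
```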

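<h4>JSON Options</h4>


|Read/Write|Key|Potential values|Default|Description
|:----|:----|:----|:----|:----|
|Both|compression or codec|none, uncompressed, bzip2, deflate, gzip, lz4, or snappy|none|Which compression codec Spark should use to read or write the file
|Both|dateFormat|Any string that conforms to Java's SimpleDateFormat|yyyy-MM-dd|The date format for date-type columns
|Both|timestampFormat|Any string that conforms to Java's SimpleDateFormat||The format for timestamp-type columns
|Read|primitiveAsString|true, false|false|Infers all primitive values as string type
|Read|allowComments|true, false|false|Ignores Java/C++-style comments in JSON records
|Read|allowNumericLeadingZeros|true, false|false|Allows leading zeros in numbers
|Read|allowBackslashEscapingAnyCharacter|true, false|false|Allows the backslash quoting mechanism to accept any character
|Read|multiLine|true, false|false|Allows reading JSON files that are not newline-delimited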
<h4>Reading and Writing JSON Files</h4>

In [11]:
spark.read.format("json")\
.option("inferSchema", "true")\
.load("./data/flight-data/json/2010-summary.json").show(2)

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Romania|    1|
|    United States|            Ireland|  264|
+-----------------+-------------------+-----+
only showing top 2 rows



In [None]:
csvFile.write.format("json").mode("overwrite").save("./tmp/my-json-file.json")

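<h4>Parquet Files</h4>

- An open source, column-oriented storage format with a range of storage optimizations, including columnar compression that saves space
- Supports reading individual columns rather than entire files
- Parquet is Spark's default file format, and writing data to Parquet is recommended
- This is because reading from Parquet files is generally more efficient than reading from JSON or CSV files
- Another advantage of Parquet is its support for complex types: columns that are arrays, maps, or structs can still be read and written normally
- The schema is stored in the Parquet file itself, so it does not need to be inferred

<h4>Reading Parquet Files</h4>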

In [15]:
%%html
<img src="img/readparquet.png" width="600">

In [None]:
csvFile.write.format("parquet").mode("overwrite").save("./tmp/my-parquet-file.parquet")