In [1]:
from pyspark.sql import SparkSession

'''
来自：examples/src/main/python/sql/datasource.py
包含：
    1、读取、写入数据（三种方式）
    2、根据 parquet file 、json file 等创建临时视图
    3、合并 parquet schema
    3、连接 mysql
                    
DataFrame函数：http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame

以下示例均单机测试
'''

spark = SparkSession\
    .builder\
    .master("local")\
    .appName("datasource")\
    .getOrCreate()

In [2]:
# 三种读取方式

df1 = spark.read.load("data/users.parquet")
df2 = spark.read.parquet("data/users.parquet")
df3 = spark.sql("select * from parquet.`data/users.parquet`")

df1.show()
df2.show()
df3.show()

+------+--------------+----------------+
|  name|favorite_color|favorite_numbers|
+------+--------------+----------------+
|Alyssa|          null|  [3, 9, 15, 20]|
|   Ben|           red|              []|
+------+--------------+----------------+

+------+--------------+----------------+
|  name|favorite_color|favorite_numbers|
+------+--------------+----------------+
|Alyssa|          null|  [3, 9, 15, 20]|
|   Ben|           red|              []|
+------+--------------+----------------+

+------+--------------+----------------+
|  name|favorite_color|favorite_numbers|
+------+--------------+----------------+
|Alyssa|          null|  [3, 9, 15, 20]|
|   Ben|           red|              []|
+------+--------------+----------------+



In [18]:
# 三种写入方式  当前pyspark==2.4.4

df1.write.save("data/ouput1",format="json")
df1.write.partitionBy("name").format("json").save("data/ouput2")
df1.write.json("data/ouput3")

In [4]:
# 读取json，写入到parquet

dfj = spark.read.load("data/people.json", format="json")
dfj.select("name", "age")\
    .write\
    .save("data/namesAndAges.parquet", format="parquet")

In [5]:
# 读取json的另一种形式： RDD

def json_dataset_example(spark):
    sc = spark.sparkContext

# path：string represents path to the JSON dataset,or RDD of Strings storing JSON objects.

    path = "data/people.json"
    peopleDF = spark.read.json(path)
    peopleDF.printSchema()
    
    peopleDF.createOrReplaceTempView("people")

    teenagerNamesDF = spark.sql("SELECT name FROM people WHERE age BETWEEN 13 AND 19")
    teenagerNamesDF.show()

    # Alternatively, a DataFrame can be created for a JSON dataset represented by
    # an RDD[String] storing one JSON object per string
    jsonStrings = ['{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}']
    otherPeopleRDD = sc.parallelize(jsonStrings)
    otherPeople = spark.read.json(otherPeopleRDD)
    otherPeople.show()

json_dataset_example(spark)

root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)

+------+
|  name|
+------+
|Justin|
+------+

+----------------+----+
|         address|name|
+----------------+----+
|[Columbus, Ohio]| Yin|
+----------------+----+



In [7]:
# 读取csv

dfc = spark.read.load("data/people.csv",
                      format="csv", 
                      sep=":", 
                      inferSchema="true", 
                      header="true")
dfc.show()

+------------------+
|      name;age;job|
+------------------+
|Jorge;30;Developer|
|  Bob;32;Developer|
+------------------+



In [9]:
# 读取orc，写入到orc

dfo = spark.read.orc("data/users.orc")
dfo.write.format("orc")\
    .option("orc.bloom.filter.columns", "favorite_color")\
    .option("orc.dictionary.key.threshold", "1.0")\
    .save("data/users_with_options.orc")

In [10]:
# 读取json文件，写入到parquet
# 再根据 parquet file 创建临时视图

def parquet_example(spark):
    peopleDF = spark.read.json("data/people.json")
    peopleDF.write.parquet("data/people.parquet")
    
    parquetFile = spark.read.parquet("data/people.parquet")
    
    parquetFile.createOrReplaceTempView("parquetFile")
    
    spark.sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19").show()

parquet_example(spark)

+------+
|  name|
+------+
|Justin|
+------+



In [21]:
# 合并 parquet schema

from pyspark.sql import Row
def parquet_schema_merging_example(spark):
    sc = spark.sparkContext
    
    squaresDF = spark.createDataFrame(sc.parallelize(range(1,6))
                                      .map(lambda x:Row(single=x,double=x**2)))
    squaresDF.write.parquet("data/test_table/key=1")
    
    cubesDF = spark.createDataFrame(sc.parallelize(range(6, 11))
                                    .map(lambda i: Row(single=i, triple=i ** 3)))
    cubesDF.write.parquet("data/test_table/key=2")
    
    mergedDF = spark.read.option("mergeSchema", "true").parquet("data/test_table")
    mergedDF.printSchema()
    mergedDF.show()

parquet_schema_merging_example(spark)

## 读取avro的数据，写入到avro

从 MAVEN 下载对应版本的 JAR 包 `spark-avro_2.12-3.0.1.jar`，放到 Spark 的 `jars` 目录下 【要和spark版本对应】

集群执行：`spark-submit --master spark://zgg:7077 avro_read_sparksql.py`


In [None]:
from pyspark.sql import SparkSession

spark = SparkSession\
    .builder\
    .appName("datasource_avro")\
    .getOrCreate()

dfa = spark.read.format("avro").load("file:///root/data/users.avro")

dfa.select("name", "favorite_color").write.format("avro").save("file:///root/data/namesAndFavColors.avro")

In [None]:
'''
1. 启动kafka生产者和消费者
2. 集群执行：spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.1 
                         --master spark://zgg:7077
                         avro_kafka_sparksql.py  
                         
生产者发送avros数据：https://www.cnblogs.com/fangjb/archive/2004/01/13/13355086.html
'''

from pyspark.sql.avro.functions import from_avro, to_avro
from pyspark.sql import SparkSession

spark = SparkSession\
    .builder\
    .appName("datasource_avro")\
    .getOrCreate()
# `from_avro` requires Avro schema in JSON string format.
jsonFormatSchema = open("/root/data/user.avsc", "r").read()

df = spark\
  .readStream\
  .format("kafka")\
  .option("kafka.bootstrap.servers", "zgg:9092")\
  .option("subscribe", "avrofrom")\
  .load()

# 1. Decode the Avro data into a struct;
# 2. Filter by column `favorite_color`;
# 3. Encode the column `name` in Avro format.
output = df\
  .select(from_avro("value", jsonFormatSchema).alias("user"))\
  .where('user.favorite_color == "red"')\
  .select(to_avro("user.name").alias("value"))

query = output\
  .writeStream\
  .format("kafka")\
  .option("kafka.bootstrap.servers", "zgg:9092")\
  .option("topic", "avroto")\
  .option("checkpointLocation","/opt/spark-3.0.1-bin-hadoop3.2/checkpointLocation")\
  .start()


## 连接 JDBC source

要将 `mysql-connector-java-8.0.21.jar` 放到 spark 的 `jars` 目录下

集群执行：`spark-submit --master spark://zgg:7077 mysql_integration__sparksql_test.py`

In [None]:
# test_spark_mysql.py
from pyspark.sql import SparkSession

spark = SparkSession\
    .builder\
    .appName("datasource")\
    .getOrCreate()

def jdbc_dataset_example(spark):
    # Note: JDBC loading and saving can be achieved via either the load/save or jdbc methods
    # Loading data from a JDBC source
    jdbcDF = spark.read \
        .format("jdbc") \
        .option("driver","com.mysql.cj.jdbc.Driver") \
        .option("url", "jdbc:mysql://localhost:3306") \
        .option("dbtable", "mysql.dept_emp") \
        .option("user", "root") \
        .option("password", "1234") \
        .load()

    jdbcDF.write \
        .format("jdbc") \
        .option("driver","com.mysql.cj.jdbc.Driver") \
        .option("url", "jdbc:mysql://localhost:3306") \
        .option("dbtable", "mysql.dept_emp_bk") \
        .option("user", "root") \
        .option("password", "1234") \
        .save()

jdbc_dataset_example(spark)  

# 更多连接方式见 examples/src/main/python/sql/datasource.py

## 连接 hive

1、在 classpath 添加许多依赖，将 hive-site.xml, core-site.xml 和 hdfs-site.xml 放到 conf/. 目录下。

2、实例化一个 Hive 支持的 SparkSession

3、集群执行：`spark-submit --master spark://zgg:7077 hive_integration_sparksql_test.py >hive_test_spark.log`

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql import Row

def test_hive_example():
    
    spark = SparkSession \
    .builder \
    .appName("Python Spark SQL Hive integration example") \
    .enableHiveSupport() \
    .getOrCreate()
    
    spark.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING) USING hive")
    spark.sql("LOAD DATA LOCAL INPATH '/root/data/kv1.txt' INTO TABLE src")
    
    print("SHOW ALL DATA:")
    spark.sql("SELECT * FROM src").show()
    
    print("AGGREGATION QUERIES:")
# Aggregation queries are also supported.
    spark.sql("SELECT COUNT(*) FROM src").show()

    print("WHERE FILTER DATA:")
# The results of SQL queries are themselves DataFrames and support all normal functions.
    sqlDF = spark.sql("SELECT key, value FROM src WHERE key < 10 ORDER BY key")

    print("TRANSFER TO RDD:")
# The items in DataFrames are of type Row, which allows you to access each column by ordinal.
    stringsDS = sqlDF.rdd.map(lambda row: "Key: %d, Value: %s" % (row.key, row.value))
    for record in stringsDS.collect():
        print(record)
        
# You can also use DataFrames to create temporary views within a SparkSession.
    Record = Row("key", "value")
    recordsDF = spark.createDataFrame([Record(i, "val_" + str(i)) for i in range(1, 101)])
    recordsDF.createOrReplaceTempView("records")

    print("CREATE TEMP VIEW:")
# Queries can then join DataFrame data with data stored in Hive.
    spark.sql("SELECT * FROM records r JOIN src s ON r.key = s.key").show()

test_hive_example()

# 写入到持久化表
# df1.write.partitionBy("favorite_color")\
#     .bucketBy(42,"name")\
#     .saveAsTable("people_partitioned_bucketed")

## 一些通用配置

下面内容仅支持spark3.x。

In [11]:
# 忽略损坏文件

spark.sql("set spark.sql.files.ignoreCorruptFiles=true")
# dir1/file3.json is corrupt from parquet's view
test_corrupt_df = spark.read.parquet("data/dir1","data/dir1/dir2")
test_corrupt_df.show()

+-------------+
|         file|
+-------------+
|file1.parquet|
|file2.parquet|
+-------------+



In [18]:
# 忽略缺失文件

spark.sql("set spark.sql.files.ignoreMissingFiles=true")
test_corrupt_df.show()

+-------------+
|         file|
+-------------+
|file1.parquet|
|file2.parquet|
+-------------+



In [13]:
# 路径全局过滤

spark.sql("set spark.sql.files.ignoreCorruptFiles=false")
test_filter_df = spark.read.load("data/dir1",format="parquet",pathGlobFilter="*.parquet")
test_filter_df.show()

+-------------+
|         file|
+-------------+
|file1.parquet|
+-------------+



In [15]:
# 递归查找文件

spark.sql("set spark.sql.files.ignoreCorruptFiles=true")
recursive_loaded_df = spark.read.format("parquet")\
    .option("recursiveFileLookup","true")\
    .load("data/dir1")
recursive_loaded_df.show()

+-------------+
|         file|
+-------------+
|file1.parquet|
|file2.parquet|
+-------------+



## 表间join

详细描述见性能调优页面

```sh
>>> spark.sql("select * from test").show()
+----+------+----+
|dept|userid| sal|
+----+------+----+
|  d1| user1|1000|
|  d1| user2|2000|
|  d1| user3|3000|
|  d2| user4|4000|
|  d2| user5|5000|
+----+------+----+

>>> spark.table("test").join(spark.table("test").hint("broadcast"), "userid").show()
+------+----+----+----+----+                                                    
|userid|dept| sal|dept| sal|
+------+----+----+----+----+
| user1|  d1|1000|  d1|1000|
| user2|  d1|2000|  d1|2000|
| user3|  d1|3000|  d1|3000|
| user4|  d2|4000|  d2|4000|
| user5|  d2|5000|  d2|5000|
+------+----+----+----+----+
```