Skip to content

Latest commit

 

History

History
156 lines (151 loc) · 13.9 KB

05.DataFrame.md

File metadata and controls

156 lines (151 loc) · 13.9 KB

创建DataFrame对象

  • DataFrame的创建由SQLContext或HiveContext完成,前者支持SQL语法解析器(SQL-92语法),后者支持SQL语法解析器和HiveSQL语法解析器,默认为HiveSQL语法解析器。从Spark2.0以上版本开始,Spark使用全新的SparkSession接口替代Spark1.6中的SQLContext及HiveContext接口来实现其对数据加载、转换、处理等功能,SparkSession实现了SQLContext及HiveContext所有功能。
  • Spark SQL支持通过DataFrame接口操作多种不同的数据源。DataFrame可以使用关系转换操作,也可用于创建临时表,将DataFrame注册为临时表进而对数据运行SQL查询。DataFrame提供统一接口加载和保存数据源中的数据,包括:结构化数据、Parquet文件、JSON文件、Hive表,以及通过JDBC连接外部数据源。

结构化数据文件创建DataFrame

  • JOSN数据集
    • Spark SQL可处理的数据源简洁高效、常用于网络传输的JSON格式数据集。Spark SQL可以自动推断JSON数据集的schema,并将其作为Dataset[Row]加载,使用sqlContext.read.json()完成。请注意,这里的JSON文件不是典型的JSON文件。每行必须包含单独的、独立的、有效的JSON对象。
      • 加载json文件为DataFrame方式:
        • sqlContext.read.format("json").load(filepath)
        • sqlContext.read.json("/usr/root/a.json")
  • Parquet文件
    • Parquet是列式存储格式文件,Spark SQL的默认数据源格式为Parquet格式,而且Parquet文件能够自动保存原始数据的schema,所以不需要使用case class(样例类)隐式转换。数据源为Parquet格式文件时,Spark SQL可以方便地进行读取,甚至可以直接在Parquet文件上执行查询操作。
    • 加载paiquet文件为DataFrame
      • sqlContext.read.load("/usr/root/usr.parquet")
    • 查询parquet文件,生成DataFrame
      • sqlContext.sql("SELECT * FROM parquet./usr/zhangyu/good.parquet")
  • 外部数据库创建DataFrame
    • Spark SQL可以从外部数据库(比如MySQL、Oracle等数据库)中创建DataFrame
    • 使用这种方式创建DataFrame需要通过JDBC连接或ODBC连接的方式访问数据库
    • val url = "jdbc:mysql://192.168.128.130/test(数据库名)"
    • sqlContext.read.format("jdbc").options(|Map("url"->url,|"user"->"root",|"password"->"root"|"dbtable"->"people")).load()
  • RDD创建DataFrame
    • Spark SQL支持已有的RDD转换为DataFrame对象。当组成RDD[T]的每一个T对象内部具有共同且鲜明的字段结构时,可以隐式或者显式地总结出创建DataFrame对象所必要的结构信息(Schema)进行转换,进而在DataFrame上调用RDD所不具备的强大丰富的API,或者执行简洁的SQL查询。
    • Spark SQL 支持两种不同的方法用于转换已存在的 RDD 成为 DataFrame。第一种方法是使用反射机制自动推断包含指定的对象类型的RDD的Schema进行隐式转化。在Spark应用程序中,已知 Schema 时这个基于反射的方法可以让代码更简洁,并且运行效果良好。第二种方法是通过编程接口,构造一个 Schema ,然后将其应用到已存在的 RDD[Row] (将RDD[T]转化为Row对象组成的RDD)。
    • 利用反射机制推断RDD模式,使用这种方式首先需要定义一个case class,因为只有case class才能被Spark隐式地转换为DataFrame。
      • 步骤1:定义一个case class
        • case class person(name:String,age:Int)
      • 步骤二:将普通RDD转换成样例类RDD
        • val data = sc.textFile("/user/root/sparkSql/people.text").map(_.split(","))
      • 步骤三:将样例类RDD转换为DataFrame
        • val people = data.map(p=>person(p(0),p(1).trim.toInt)).toDF()
    • 当case类不能提前定义时,编程接口允许用户构建schema并应用到RDD上,DataFrame就可以通过编程方式创建
      • 1700395999378
      • 从原来的RDD创建一个元组或列表的RDD,将每个元素转化成Row类型
        • val rdd1=sc.textFile("/user/people.txt") val rdd2=rdd1.map(x=>x.split(",")) val rowdd = rdd2.map(x=>Row(x(0),x(1))
      • 用StructType创建一个和上一个步骤中创建的RDD中元组或列表的结构相匹配的Schema
        • val string = "name age" val split=string.split(" ") val fields=split.map(x=>StructFiled(x,StringType,nullable=true)) val schema=StructType(fields)
      • 通过SQLcontext方法提供的createDataFrame方法将Schema应用到RDD
        • val peopleDF=sqlContext.createDateFrame(rowrdd,schema)
    • 从Hive中的表创建DataFrame
      • 从Hive表中的表创建DataFrame,先声明一个HiveContext对象
        • hiveContext.sql("use test")
      • 使用HiveContext对象查询Hive中的表并转成DataFrame
        • val people=hiveContext.sql("select * from people")

DataFrame上的操作

Action操作

Action操作在DataFrame上触发真正计算,返回结果,主要的Action操作如表 所示。Pasted image 20231121155320

DateFrame查看数据

方法 描述
printSchema 打印数据模式
show 查看数据
first/head/take/takeAsList 获取若干行数据
collect/collectAsList 获取所有数据
  • printSchema函数查看数据模式,打印出列的名称和类型

    • movies.printSchema
  • DataFrame查看数据-show 1700554063582

  • DataFrame查看数据——first/head/take/takeAsList 1700554113228

  • DataFrame查看数据——collect/collectAsList

    • collect方法可以将DataFrame中的所有数据都获取到,并返回一个Array对象
    • collectAsList方法可以获取所有数据到List
  • DataFrame统计数据 1700554216636

  • count(): Long返回DataFrame的数据记录的条数

    • Pasted image 20231121161120
  • DataFrame统计数据——describe

    • describe(cols: String*): DataFrame 该方法可以动态的传入一个或多个String类型的字段名,结果仍然是DataFrame对象,用来统计数值类型字段的统计值。在DataFrame下只需调用describe()子函数,便可以得到信息:Count(记录条数)、Mean(平均值)、Stddev(样本标准差)、Min(最小值)、Max(最大值),进而掌握大规模结构化数据集的某字段的统计特性。
  • Transformation操作 Pasted image 20231121161309

  • apply(colName: String): Column 该方法用来指定列名返回DataFrame的列。下列两种获取Column的方法等效,返回的皆为对应的Column。若需要对某指定列进行删除或者对指定列的数值进行计算等操作,可以采用该方法获得Column形式的列。 df("Height") df.apply("Weight")

  • col(colName: String): Column 该方法用来获取指定字段,apply()和col()参数类型、个数以及返回值类型均相同,只能获取某一列,返回对象为Column类型。df.col("name")

  • agg(expr:Column,exprs: Column*): DataFrame agg是一种聚合操作,该方法输入的是对于聚合操作的表达,可同时对多个列进行聚合操作,agg为DataFrame提供数据列不需要经过分组就可以执行统计操作,也可以与groupBy法配合使用。

    • 提供的统计函数

      • sum:求和
      • max:最大值
      • min:最小值
      • mean/avg:均值
      • count:条目数
      • stddev:样本标准差
    • df.agg(avg(df("age")),min(df("Height")) ,max(df("Weight"))).show()

  • select(col: String, cols: String*): DataFrame

    • 该方法用于获取指定字段值,根据传入的String类型的字段名,获取指定字段的值,以DataFrame类型返回。
    • df.select(df("name")).show(6)
  • distinct(): DataFrame

    • 该方法用来返回对DataFrame的数据记录去重后的DataFrame。具体实现如下述代码所示,选择了有重复记录的“age”列,最后调用distinct方法进行去重。
  • drop(col: Column): DataFrame

    • 该方法用来去除指定字段,保留其他字段,返回一个新的DataFrame,其中不包含去除的字段,一次只能去除一个字段。drop方法有两种重载函数:df.drop(“id”)和df.drop(df(“id”)),前者的输入参数是描述列名称的String,而后者传入的是Column类型的列。
  • except(other: DataFrame): DataFrame

    • 返回DataFrame,包含当前DataFrame的数据记录,同时Rows不在另一个DataFrame中,相当于两个DataFrame做减法
  • filter/where(conditionExpr: String): DataFrame

    • 按参数指定的SQL表达式的条件过滤DataFrame,传入筛选条件表达式,可以用and和or,得到DataFrame类型的返回结果。filter/where方法根据参数类型以及数目不同进行了同名函数重载
    • scala> df.filter(df("age“) >24).show(false)
    • scala> df.filter($"age“>24).show(false)
    • scala> df.filter("age >24 ").show(false)
    • scala> df.where("name = 'Ar'").show()
    • scala> df.where($"name"==='Ar').show()
    • scala> df.where(df("name")==='Ar').show()
  • groupBY(col1:String,cols:String*):RelationalGroupedDataset

    • 使用一个或者多个指定的列对DataFrame进行分组,以便对它们执行聚合操作。groupBy方法有两种调用方式,可以传入String类型的字段名,也可传入Column类型的对象。
    • val userGroupBy=user.groupBy("gender")
    • groupBy()方法得到的是RelationalGroupedDataset对象,在RelationalGroupedDataset的API中提供了groupBy()之后的操作,比如:
      • (1)max(colNames:String*)方法,获取分组中指定字段或者所有的数字类型字段的最大值,只能作用于数字型字段。
      • (2)min(colNames:String*)方法,获取分组中指定字段或者所有的数字类型字段的最小值,只能作用于数字型字段。
      • mean(colNames:String*)方法,获取分组中指定字段或者所有的数字类型字段的平均值,只能作用于数字型字段。
      • sum(colNames:String*)方法,获取分组中指定字段或者所有的数字类型字段的和值,只能作用于数字型字段。
      • count()方法,获取分组中的元素个数。
  • intersect(other: DataFrame): DataFrame

    • 取两个DataFrame中同时存在的数据记录,返回DataFrame。
    • df.intersect(newdf).show(false)
  • limit(n: Int): DataFrame

    • limit()方法获取指定DataFrame的前n行记录,得到一个新的DataFrame对象。和take与head不同的是,limit方法不是Action操作,因为take/head获得的均为Array(数组),而limit返回的是一个新的转化生成的DataFrame对象。
  • orderBy(sortExprs: Column*): DataFrame

    • 按照给定的表达式对指定的一列或者多列进行排序,返回一个新的DataFrame,输入参数为多个Column类。默认为升序,若是要求降序排序,可以使用desc(“字段名称”)或者$”字段名”.desc或者在指定字段前面加“-”来表示降序排序
  • sort(sortExprs: Column*):DataFrame

    • 按照给定的表达式对指定的一列或者多列进行排序,返回一个新的DataFrame,输入参数为多个Column类。按指定字段排序,默认为升序,在Column后面加.desc表示降序排序,加.asc表示升序排序,sort()和orderBy()方法效果等效。
  • sample(withReplacement: Boolean, fraction: Double):DataFrame

    • sample对数据集进行采样,返回一个新的DataFrame。withReplacement=true,表示重复抽样;withReplacement=false,表示不重复抽样;fraction参数是生成行的比例。
  • join()

    • 对两个DataFrame执行join操作,DataFrame提供了三种join方法用于连接两个表
    • ![[1701781867433.png]]
  • na: DataFrameNaFunctions

    • 使用na方法对具有空值列的行数据进行处理,例如用指定值(缺失值)替换控制列的值,需要注意的是,在DataFrame对象上使用na方法后返回的是对应的DataFrameNaFunction对象,进而需要调用对应的drop、fill方法来处理指定列为空值的行,drop用来删除指定列为空值的行,fill使用指定的值替换指定空值列的值。
  • registerTempTable(tableName:String)/createOrReplaceTempView(tableName:String):

    • 将DataFrame注册成为临时表,然后通过SQL语句进行查询

DataFrame输出操作

  • save方法可以将DataFrame保存成文件,save操作有一个可选参数SaveMode,用这个参数可以指定如何处理数据已经存在的情况。
  • 在使用HiveContext的时候,DataFrame可以用saveAsTable方法,将数据保存成持久化的表。
  • 读取持久化表时,只需要用表名作为参数,调用SQLContext.table方法即可得到对应DataFrame。

将DataFrame保存到一个文件里的方法

Spark SQL的默认数据源格式为Parquet格式,对一些数据进行保存,将其保存到默认数据源上。 当数据源不是Parquet格式文件时,需要手动指定数据源的格式。

  • 存储为parquet格式数据
    • Dataframe.write.save(filepath)
  • 存储为其他格式数据
    • Dataframe.write.format(format).save(filepath)
  • `import org.apache.spatk.sql._Dataframe.write.format(format).mode(mode).save(filepath)
    • mode函数可以接收的参数有Overwrite、Append、Ignore和ErrorIfExists。
      • Overwrite代表覆盖目录下之前存在的数据
      • Append代表给指导目录下追加数据
      • Ignore代表如果目录下已经有文件,那就什么都不执行
      • ErrorIfExists代表如果保存目录下存在文件,抛出异常(默认)
  • 直接调用save(path:String,source:String,mode:SaveMode)方法
  • 使用saveAsTable将DataFrame对象copyOfUser保存在表名为copyUser的表中,此表的存储位置由metastore控制。
  • 读取持久化表时,只需要用表名作为参数,调用SQLContext.table方法即可得到对应DataFrame,或者通过sqlContext.sql(SQL)。