# DataFrame Creation

https://spark.apache.org/docs/latest/api/python/getting_started/quickstart_df.html

In [1]:
from datetime import datetime, date
import pandas as pd
from pyspark.sql import Row

## Resiliennt Distributed Datasets，RDD ：弹性分布式数据集

In [2]:
import warnings
warnings.filterwarnings('ignore')

In [35]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

In [4]:
# 经典格式：
df = spark.createDataFrame([
    Row(a=1, b=2., c='string1', d=date(2000, 1, 1), e=datetime(2000, 1, 1, 12, 0)),
    Row(a=2, b=3., c='string2', d=date(2000, 2, 1), e=datetime(2000, 1, 2, 12, 0)),
    Row(a=4, b=5., c='string3', d=date(2000, 3, 1), e=datetime(2000, 1, 3, 12, 0))
])
df

DataFrame[a: bigint, b: double, c: string, d: date, e: timestamp]

In [5]:
# 创建具有显式架构的 PySpark DataFrame:
df = spark.createDataFrame([
    (1, 2., 'string1', date(2000, 1, 1), datetime(2000, 1, 1, 12, 0)),
    (2, 3., 'string2', date(2000, 2, 1), datetime(2000, 1, 2, 12, 0)),
    (3, 4., 'string3', date(2000, 3, 1), datetime(2000, 1, 3, 12, 0))
], schema='a long, b double, c string, d date, e timestamp')
df.show()

                                                                                

+---+---+-------+----------+-------------------+
|  a|  b|      c|         d|                  e|
+---+---+-------+----------+-------------------+
|  1|2.0|string1|2000-01-01|2000-01-01 12:00:00|
|  2|3.0|string2|2000-02-01|2000-01-02 12:00:00|
|  3|4.0|string3|2000-03-01|2000-01-03 12:00:00|
+---+---+-------+----------+-------------------+



In [6]:
df

DataFrame[a: bigint, b: double, c: string, d: date, e: timestamp]

In [7]:
# 从 pandas DataFrame 创建 PySpark DataFrame
pandas_df = pd.DataFrame({
    'a': [1, 2, 3],
    'b': [2., 3., 4.],
    'c': ['string1', 'string2', 'string3'],
    'd': [date(2000, 1, 1), date(2000, 2, 1), date(2000, 3, 1)],
    'e': [datetime(2000, 1, 1, 12, 0), datetime(2000, 1, 2, 12, 0), datetime(2000, 1, 3, 12, 0)]
})
df = spark.createDataFrame(pandas_df)
df

DataFrame[a: bigint, b: double, c: string, d: date, e: timestamp]

In [8]:
# 从包含元组列表的 RDD 创建 PySpark DataFrame
rdd = spark.sparkContext.parallelize([
    (1, 2., 'string1', date(2000, 1, 1), datetime(2000, 1, 1, 12, 0)),
    (2, 3., 'string2', date(2000, 2, 1), datetime(2000, 1, 2, 12, 0)),
    (3, 4., 'string3', date(2000, 3, 1), datetime(2000, 1, 3, 12, 0))
])
df = spark.createDataFrame(rdd, schema=['a', 'b', 'c', 'd', 'e'])
df

DataFrame[a: bigint, b: double, c: string, d: date, e: timestamp]

In [9]:
# All DataFrames above result same.
df.show()
df.printSchema()

+---+---+-------+----------+-------------------+
|  a|  b|      c|         d|                  e|
+---+---+-------+----------+-------------------+
|  1|2.0|string1|2000-01-01|2000-01-01 12:00:00|
|  2|3.0|string2|2000-02-01|2000-01-02 12:00:00|
|  3|4.0|string3|2000-03-01|2000-01-03 12:00:00|
+---+---+-------+----------+-------------------+

root
 |-- a: long (nullable = true)
 |-- b: double (nullable = true)
 |-- c: string (nullable = true)
 |-- d: date (nullable = true)
 |-- e: timestamp (nullable = true)



# 查看

In [10]:
spark.conf.set('spark.sql.repl.eagerEval.enabled', True)
df

a,b,c,d,e
1,2.0,string1,2000-01-01,2000-01-01 12:00:00
2,3.0,string2,2000-02-01,2000-01-02 12:00:00
3,4.0,string3,2000-03-01,2000-01-03 12:00:00


### 垂直

In [11]:
df.show(1, vertical=True)

-RECORD 0------------------
 a   | 1                   
 b   | 2.0                 
 c   | string1             
 d   | 2000-01-01          
 e   | 2000-01-01 12:00:00 
only showing top 1 row



In [12]:
df.show(vertical=True)

-RECORD 0------------------
 a   | 1                   
 b   | 2.0                 
 c   | string1             
 d   | 2000-01-01          
 e   | 2000-01-01 12:00:00 
-RECORD 1------------------
 a   | 2                   
 b   | 3.0                 
 c   | string2             
 d   | 2000-02-01          
 e   | 2000-01-02 12:00:00 
-RECORD 2------------------
 a   | 3                   
 b   | 4.0                 
 c   | string3             
 d   | 2000-03-01          
 e   | 2000-01-03 12:00:00 



### 摘要

In [13]:
df.select("a", "b", "c").describe().show()

[Stage 12:>                                                         (0 + 4) / 4]

+-------+---+---+-------+
|summary|  a|  b|      c|
+-------+---+---+-------+
|  count|  3|  3|      3|
|   mean|2.0|3.0|   null|
| stddev|1.0|1.0|   null|
|    min|  1|2.0|string1|
|    max|  3|4.0|string3|
+-------+---+---+-------+



                                                                                

DataFrame.collect()将分布式数据收集到驱动端作为 Python 中的本地数据。

In [14]:
df.collect()

[Row(a=1, b=2.0, c='string1', d=datetime.date(2000, 1, 1), e=datetime.datetime(2000, 1, 1, 12, 0)),
 Row(a=2, b=3.0, c='string2', d=datetime.date(2000, 2, 1), e=datetime.datetime(2000, 1, 2, 12, 0)),
 Row(a=3, b=4.0, c='string3', d=datetime.date(2000, 3, 1), e=datetime.datetime(2000, 1, 3, 12, 0))]

为了避免抛出内存不足异常，请使用DataFrame.take()or DataFrame.tail()

In [15]:
# take() missing 1 required positional argument: 'num'
df.take(1)

[Row(a=1, b=2.0, c='string1', d=datetime.date(2000, 1, 1), e=datetime.datetime(2000, 1, 1, 12, 0))]

In [16]:
df.tail(2)

[Row(a=2, b=3.0, c='string2', d=datetime.date(2000, 2, 1), e=datetime.datetime(2000, 1, 2, 12, 0)),
 Row(a=3, b=4.0, c='string3', d=datetime.date(2000, 3, 1), e=datetime.datetime(2000, 1, 3, 12, 0))]

PySpark DataFrame 还提供转换回pandas DataFrame以利用 pandas API

In [17]:
df.toPandas()

Unnamed: 0,a,b,c,d,e
0,1,2.0,string1,2000-01-01,2000-01-01 12:00:00
1,2,3.0,string2,2000-02-01,2000-01-02 12:00:00
2,3,4.0,string3,2000-03-01,2000-01-03 12:00:00


In [18]:
type(df)

pyspark.sql.dataframe.DataFrame

# 选择和访问数据

PySpark DataFrame 是惰性计算的，简单地选择一列不会触发计算，但它会返回一个Column实例。

In [19]:
df.a

Column<'a'>

In [20]:
df

a,b,c,d,e
1,2.0,string1,2000-01-01,2000-01-01 12:00:00
2,3.0,string2,2000-02-01,2000-01-02 12:00:00
3,4.0,string3,2000-03-01,2000-01-03 12:00:00


In [21]:
from pyspark.sql import Column
from pyspark.sql.functions import upper

In [22]:
#  c: string (nullable = true)
type(df.c) == type(upper(df.c)) == type(df.c.isNull())

True

In [23]:
df.select(df.c).show()

+-------+
|      c|
+-------+
|string1|
|string2|
|string3|
+-------+



分配新Column实例

In [24]:
df.withColumn('upper_c' , upper(df.c).cast("Integer")).show()

+---+---+-------+----------+-------------------+-------+
|  a|  b|      c|         d|                  e|upper_c|
+---+---+-------+----------+-------------------+-------+
|  1|2.0|string1|2000-01-01|2000-01-01 12:00:00|   null|
|  2|3.0|string2|2000-02-01|2000-01-02 12:00:00|   null|
|  3|4.0|string3|2000-03-01|2000-01-03 12:00:00|   null|
+---+---+-------+----------+-------------------+-------+



In [25]:
df_c = df.withColumn('upper_c' , upper(df.c)).show()
df_c

+---+---+-------+----------+-------------------+-------+
|  a|  b|      c|         d|                  e|upper_c|
+---+---+-------+----------+-------------------+-------+
|  1|2.0|string1|2000-01-01|2000-01-01 12:00:00|STRING1|
|  2|3.0|string2|2000-02-01|2000-01-02 12:00:00|STRING2|
|  3|4.0|string3|2000-03-01|2000-01-03 12:00:00|STRING3|
+---+---+-------+----------+-------------------+-------+



选择行的子集:DataFrame.filter()

In [26]:
df.filter(df.a == 1).show()

+---+---+-------+----------+-------------------+
|  a|  b|      c|         d|                  e|
+---+---+-------+----------+-------------------+
|  1|2.0|string1|2000-01-01|2000-01-01 12:00:00|
+---+---+-------+----------+-------------------+



# 应用函数

 UDF：https://blog.csdn.net/lsshlsw/article/details/79932643

https://spark.apache.org/docs/latest/api/python/user_guide/sql/arrow_pandas.html

允许用户在 Python 原生函数中直接使用pandas Series中的 API。

In [27]:
from pyspark.sql.functions import pandas_udf

In [28]:
def pandas_plus_one(series: pd.Series) -> pd.Series:
    # Simply plus one by using pandas Series.
    return series + 1

df.select(pandas_plus_one(df.a)).show()

+-------+
|(a + 1)|
+-------+
|      2|
|      3|
|      4|
+-------+



另一个例子是DataFrame.mapInPandas允许用户直接使用pandas DataFrame中的 API，而不受结果长度等任何限制。

In [29]:
def pandas_filter_func(iterator):
    for pandas_df in iterator:
        yield (pandas_df[pandas_df.a == 1])

df.mapInPandas(pandas_filter_func, schema=df.schema).show()

[Stage 34:>                                                         (0 + 3) / 3]

+---+---+-------+----------+-------------------+
|  a|  b|      c|         d|                  e|
+---+---+-------+----------+-------------------+
|  1|2.0|string1|2000-01-01|2000-01-01 12:00:00|
+---+---+-------+----------+-------------------+



                                                                                

# 分组数据

PySpark DataFrame 还提供了一种使用常用方法拆分-应用-组合策略来处理分组数据的方法。它按特定条件对数据进行分组，对每个组应用一个函数，然后将它们组合回 DataFrame。

In [30]:
df = spark.createDataFrame([
    ['red', 'banana', 1, 10], ['blue', 'banana', 2, 20], ['red', 'carrot', 3, 30],
    ['blue', 'grape', 4, 40], ['red', 'carrot', 5, 50], ['black', 'carrot', 6, 60],
    ['red', 'banana', 7, 70], ['red', 'grape', 8, 80]], schema=['color', 'fruit', 'v1', 'v2'])
df.show()

+-----+------+---+---+
|color| fruit| v1| v2|
+-----+------+---+---+
|  red|banana|  1| 10|
| blue|banana|  2| 20|
|  red|carrot|  3| 30|
| blue| grape|  4| 40|
|  red|carrot|  5| 50|
|black|carrot|  6| 60|
|  red|banana|  7| 70|
|  red| grape|  8| 80|
+-----+------+---+---+



In [31]:
df.groupby('color').avg().show()

+-----+-------+-------+
|color|avg(v1)|avg(v2)|
+-----+-------+-------+
|  red|    4.8|   48.0|
| blue|    3.0|   30.0|
|black|    6.0|   60.0|
+-----+-------+-------+



您还可以使用 pandas API 对每个组应用 Python 本机函数。

In [32]:
def plus_mean(pandas_df):
    return pandas_df.assign(v1=pandas_df.v1 - pandas_df.v1.mean())

df.groupby('color').applyInPandas(plus_mean, schema=df.schema).show()

+-----+------+---+---+
|color| fruit| v1| v2|
+-----+------+---+---+
|black|carrot|  0| 60|
| blue|banana| -1| 20|
| blue| grape|  1| 40|
|  red|banana| -3| 10|
|  red|carrot| -1| 30|
|  red|carrot|  0| 50|
|  red|banana|  2| 70|
|  red| grape|  3| 80|
+-----+------+---+---+



共同分组和应用功能

In [33]:
df1 = spark.createDataFrame(
    [(20000101, 1, 1.0), (20000101, 2, 2.0), (20000102, 1, 3.0), (20000102, 2, 4.0)],
    ('time', 'id', 'v1'))

df2 = spark.createDataFrame(
    [(20000101, 1, 'x'), (20000101, 2, 'y')],
    ('time', 'id', 'v2'))

def asof_join(l, r):
    return pd.merge_asof(l, r, on='time', by='id')

df1.groupby('id').cogroup(df2.groupby('id')).applyInPandas(
    asof_join, schema='time int, id int, v1 double, v2 string').show()

+--------+---+---+---+
|    time| id| v1| v2|
+--------+---+---+---+
|20000101|  1|1.0|  x|
|20000102|  1|3.0|  x|
|20000101|  2|2.0|  y|
|20000102|  2|4.0|  y|
+--------+---+---+---+



# 输入/输出数据

In [36]:
df.write.csv('foo.csv', header=True)

In [37]:
spark.read.csv('foo.csv', header=True).show()

+-----+------+---+---+
|color| fruit| v1| v2|
+-----+------+---+---+
|  red|carrot|  5| 50|
|black|carrot|  6| 60|
|  red|banana|  1| 10|
| blue|banana|  2| 20|
|  red|carrot|  3| 30|
| blue| grape|  4| 40|
|  red|banana|  7| 70|
|  red| grape|  8| 80|
+-----+------+---+---+



CSV 简单易用。Parquet 和 ORC 是高效且紧凑的文件格式，可以更快地读取和写入。

In [38]:
df.write.parquet('bar.parquet')

                                                                                

In [39]:
spark.read.parquet('bar.parquet').show()

+-----+------+---+---+
|color| fruit| v1| v2|
+-----+------+---+---+
|  red|carrot|  5| 50|
|black|carrot|  6| 60|
|  red|banana|  1| 10|
| blue|banana|  2| 20|
|  red|banana|  7| 70|
|  red| grape|  8| 80|
|  red|carrot|  3| 30|
| blue| grape|  4| 40|
+-----+------+---+---+



In [40]:
df.write.orc('zoo.orc')

In [41]:
spark.read.orc('zoo.orc').show()

+-----+------+---+---+
|color| fruit| v1| v2|
+-----+------+---+---+
|  red|banana|  7| 70|
|  red| grape|  8| 80|
|  red|carrot|  5| 50|
|black|carrot|  6| 60|
|  red|carrot|  3| 30|
| blue| grape|  4| 40|
|  red|banana|  1| 10|
| blue|banana|  2| 20|
+-----+------+---+---+



# 使用SQL

DataFrame 和 Spark SQL 共享相同的执行引擎，因此它们可以无缝互换使用。

In [42]:
df.createOrReplaceTempView("tableA")

In [43]:
spark.sql("SELECT * from tableA").show()

+-----+------+---+---+
|color| fruit| v1| v2|
+-----+------+---+---+
|  red|banana|  1| 10|
| blue|banana|  2| 20|
|  red|carrot|  3| 30|
| blue| grape|  4| 40|
|  red|carrot|  5| 50|
|black|carrot|  6| 60|
|  red|banana|  7| 70|
|  red| grape|  8| 80|
+-----+------+---+---+



In [44]:
@pandas_udf("integer")
def add_one(s: pd.Series) -> pd.Series:
    return s + 1

spark.udf.register("add_one", add_one)

spark.sql("SELECT * from tableA").show()

+-----+------+---+---+
|color| fruit| v1| v2|
+-----+------+---+---+
|  red|banana|  1| 10|
| blue|banana|  2| 20|
|  red|carrot|  3| 30|
| blue| grape|  4| 40|
|  red|carrot|  5| 50|
|black|carrot|  6| 60|
|  red|banana|  7| 70|
|  red| grape|  8| 80|
+-----+------+---+---+



In [45]:
spark.sql("SELECT add_one(v1) FROM tableA").show()

+-----------+
|add_one(v1)|
+-----------+
|          2|
|          3|
|          4|
|          5|
|          6|
|          7|
|          8|
|          9|
+-----------+



In [53]:
spark.sql("SELECT color,v1,V2 FROM tableA where color = 'red'").show()

+-----+---+---+
|color| v1| V2|
+-----+---+---+
|  red|  1| 10|
|  red|  3| 30|
|  red|  5| 50|
|  red|  7| 70|
|  red|  8| 80|
+-----+---+---+



In [56]:
spark.sql("SELECT add_one(v2) FROM tableA").show()



+-----------+
|add_one(v2)|
+-----------+
|         11|
|         21|
|         31|
|         41|
|         51|
|         61|
|         71|
|         81|
+-----------+



                                                                                

这些 SQL 表达式可以直接混合并用作 PySpark 列。

In [57]:
from pyspark.sql.functions import expr

In [58]:
# two ways：
df.selectExpr('add_one(v1)').show()
df.select(expr('count(*)') > 0).show()

+-----------+
|add_one(v1)|
+-----------+
|          2|
|          3|
|          4|
|          5|
|          6|
|          7|
|          8|
|          9|
+-----------+

+--------------+
|(count(1) > 0)|
+--------------+
|          true|
+--------------+

