In [1]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/05/04 13:46:05 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [2]:
spark

# create spark dataframe

In [3]:
from pyspark.sql import Row
from datetime import datetime, date

In [8]:
df = spark.createDataFrame(
    [
        Row(a = 1, b = 2., c='str1', d=date(2024,5,4), e=datetime(2024,5,4,12,0)),
        Row(a = 2, b= 3.0, c='test2', d=date(2024,5,7), e=datetime(2024,5,4,1,2))
    ]
)

In [9]:
df

DataFrame[a: bigint, b: double, c: string, d: date, e: timestamp]

In [19]:
df = spark.createDataFrame(
    [
        Row(a = 1, b = 2., c='str1', d=date(2024,5,4), e=datetime(2024,5,4,12,0)),
        Row(a = 2, b= 3.0, c='test2', d=date(2024,5,7), e=datetime(2024,5,4,1,2))
    ],
    schema = 'a string, b double, c string, d date, e timestamp'
)

In [20]:
df

DataFrame[a: string, b: double, c: string, d: date, e: timestamp]

In [22]:
import pandas as pd

In [25]:
pd_df = pd.DataFrame({
    'a': [1,2],
    'b': [2., 3.0],
    'c': ['str1', 'test2'],
    'd': [date(2024,5,4),date(2024,5,7)],
    'e': [datetime(2024,5,4,12,0),datetime(2024,5,4,1,2)]
})
df = spark.createDataFrame(pd_df)

In [26]:
df

DataFrame[a: bigint, b: double, c: string, d: date, e: timestamp]

In [27]:
df.show()

                                                                                

+---+---+-----+----------+-------------------+
|  a|  b|    c|         d|                  e|
+---+---+-----+----------+-------------------+
|  1|2.0| str1|2024-05-04|2024-05-04 12:00:00|
|  2|3.0|test2|2024-05-07|2024-05-04 01:02:00|
+---+---+-----+----------+-------------------+



In [28]:
df.printSchema()

root
 |-- a: long (nullable = true)
 |-- b: double (nullable = true)
 |-- c: string (nullable = true)
 |-- d: date (nullable = true)
 |-- e: timestamp (nullable = true)



In [29]:
df.show(1)

+---+---+----+----------+-------------------+
|  a|  b|   c|         d|                  e|
+---+---+----+----------+-------------------+
|  1|2.0|str1|2024-05-04|2024-05-04 12:00:00|
+---+---+----+----------+-------------------+
only showing top 1 row



In [30]:
df.show(1,vertical=True)

-RECORD 0------------------
 a   | 1                   
 b   | 2.0                 
 c   | str1                
 d   | 2024-05-04          
 e   | 2024-05-04 12:00:00 
only showing top 1 row



In [31]:
df.columns

['a', 'b', 'c', 'd', 'e']

# spark select columns and show

In [32]:
df.select(['a','b']).show()

+---+---+
|  a|  b|
+---+---+
|  1|2.0|
|  2|3.0|
+---+---+



In [34]:
df.select(df.columns).describe().show()

[Stage 12:>                                                         (0 + 8) / 8]

+-------+------------------+------------------+-----+
|summary|                 a|                 b|    c|
+-------+------------------+------------------+-----+
|  count|                 2|                 2|    2|
|   mean|               1.5|               2.5| NULL|
| stddev|0.7071067811865476|0.7071067811865476| NULL|
|    min|                 1|               2.0| str1|
|    max|                 2|               3.0|test2|
+-------+------------------+------------------+-----+



                                                                                

In [35]:
df.collect()

[Row(a=1, b=2.0, c='str1', d=datetime.date(2024, 5, 4), e=datetime.datetime(2024, 5, 4, 12, 0)),
 Row(a=2, b=3.0, c='test2', d=datetime.date(2024, 5, 7), e=datetime.datetime(2024, 5, 4, 1, 2))]

In [36]:
df.toPandas()

Unnamed: 0,a,b,c,d,e
0,1,2.0,str1,2024-05-04,2024-05-04 12:00:00
1,2,3.0,test2,2024-05-07,2024-05-04 01:02:00


# spark functions

In [39]:
from pyspark.sql.functions import upper

In [40]:
df.withColumn('upper_c', upper(df.c)).show()

+---+---+-----+----------+-------------------+-------+
|  a|  b|    c|         d|                  e|upper_c|
+---+---+-----+----------+-------------------+-------+
|  1|2.0| str1|2024-05-04|2024-05-04 12:00:00|   STR1|
|  2|3.0|test2|2024-05-07|2024-05-04 01:02:00|  TEST2|
+---+---+-----+----------+-------------------+-------+



In [41]:
df.filter(df.a == 1).show()

+---+---+----+----------+-------------------+
|  a|  b|   c|         d|                  e|
+---+---+----+----------+-------------------+
|  1|2.0|str1|2024-05-04|2024-05-04 12:00:00|
+---+---+----+----------+-------------------+



In [42]:
df.withColumn('upper_c', upper(df.c)).filter(df.a == 1).show()

+---+---+----+----------+-------------------+-------+
|  a|  b|   c|         d|                  e|upper_c|
+---+---+----+----------+-------------------+-------+
|  1|2.0|str1|2024-05-04|2024-05-04 12:00:00|   STR1|
+---+---+----+----------+-------------------+-------+



# spark transformation udf

In [46]:
from pyspark.sql.functions import pandas_udf

In [47]:
@pandas_udf('long')
def pandas_plusone(series):
    return series+1

In [48]:
df.select(pandas_plusone(df.a)).show()

                                                                                

+-----------------+
|pandas_plusone(a)|
+-----------------+
|                2|
|                3|
+-----------------+



In [49]:
def pandas_filter_func(iter):
    for pd_dff in iter:
        yield pd_dff[pd_dff.a==1]

df.mapInPandas(pandas_filter_func, schema= df.schema).show()

                                                                                

+---+---+----+----------+-------------------+
|  a|  b|   c|         d|                  e|
+---+---+----+----------+-------------------+
|  1|2.0|str1|2024-05-04|2024-05-04 12:00:00|
+---+---+----+----------+-------------------+



In [50]:
df = spark.createDataFrame([
    ['red', 'banana', 1, 10], 
    ['blue', 'banana', 2, 20], 
    ['red', 'carrot', 3, 30],
    ['blue', 'grape', 4, 40], 
    ['red', 'carrot', 5, 50], 
    ['black', 'carrot', 6, 60],
    ['red', 'banana', 7, 70], 
    ['red', 'grape', 8, 80]
], schema = ['color','fruit','v1', 'v2']
)
df

DataFrame[color: string, fruit: string, v1: bigint, v2: bigint]

# spark group by

In [51]:
df.groupBy('color').avg().show()

+-----+-------+-------+
|color|avg(v1)|avg(v2)|
+-----+-------+-------+
|  red|    4.8|   48.0|
| blue|    3.0|   30.0|
|black|    6.0|   60.0|
+-----+-------+-------+



In [59]:
def plus_mean(pandas_df):
    print(pandas_df)
    print(pandas_df.v1 )
    print(pandas_df.v1.mean() )
    print(' val ' ,pandas_df.assign(v1=pandas_df.v1 - pandas_df.v1.mean()))
    print(pandas_df.v1 )
    return pandas_df.assign(v1=pandas_df.v1 - pandas_df.v1.mean())

df.groupby('color').applyInPandas(plus_mean, schema=df.schema).show()

[Stage 58:>                                                         (0 + 1) / 1]

+-----+------+---+---+
|color| fruit| v1| v2|
+-----+------+---+---+
|black|carrot|  0| 60|
| blue|banana| -1| 20|
| blue| grape|  1| 40|
|  red|banana| -3| 10|
|  red|carrot| -1| 30|
|  red|carrot|  0| 50|
|  red|banana|  2| 70|
|  red| grape|  3| 80|
+-----+------+---+---+



   color   fruit  v1  v2
0  black  carrot   6  60
0    6
Name: v1, dtype: int64
6.0
 val     color   fruit   v1  v2
0  black  carrot  0.0  60
0    6
Name: v1, dtype: int64
  color   fruit  v1  v2
0  blue  banana   2  20
1  blue   grape   4  40
0    2
1    4
Name: v1, dtype: int64
3.0
 val    color   fruit   v1  v2
0  blue  banana -1.0  20
1  blue   grape  1.0  40
0    2
1    4
Name: v1, dtype: int64
  color   fruit  v1  v2
0   red  banana   1  10
1   red  carrot   3  30
2   red  carrot   5  50
3   red  banana   7  70
4   red   grape   8  80
0    1
1    3
2    5
3    7
4    8
Name: v1, dtype: int64
4.8
 val    color   fruit   v1  v2
0   red  banana -3.8  10
1   red  carrot -1.8  30
2   red  carrot  0.2  50
3   red  banana  2.2  70
4   red   grape  3.2  80
0    1
1    3
2    5
3    7
4    8
Name: v1, dtype: int64
                                                                                

# spark cogroup

In [64]:
df1 = spark.createDataFrame([
    (20000101, 1, 1.0), 
    (20000101, 2, 2.0), 
    (20000102, 1, 3.0), 
    (20000102, 2, 4.0)
],
    ('time', 'id', 'v1')
)

df2 = spark.createDataFrame([
     (20000101, 1, 'x'), 
     (20000101, 2, 'y')
],
    ('time', 'id', 'v2')
)


DataFrame[time: bigint, id: bigint, v1: double]

In [65]:
df1.show()

+--------+---+---+
|    time| id| v1|
+--------+---+---+
|20000101|  1|1.0|
|20000101|  2|2.0|
|20000102|  1|3.0|
|20000102|  2|4.0|
+--------+---+---+



In [66]:
df2.show()

+--------+---+---+
|    time| id| v2|
+--------+---+---+
|20000101|  1|  x|
|20000101|  2|  y|
+--------+---+---+



In [67]:
df1.groupby('id').avg().show()

+---+------------+-------+-------+
| id|   avg(time)|avg(id)|avg(v1)|
+---+------------+-------+-------+
|  1|2.00001015E7|    1.0|    2.0|
|  2|2.00001015E7|    2.0|    3.0|
+---+------------+-------+-------+



In [69]:
df1

DataFrame[time: bigint, id: bigint, v1: double]

In [70]:
def merge_ordered(l, r):
    return pd.merge_ordered(l,r)
    
df1.groupby('id').cogroup(df2.groupby('id')).applyInPandas(
merge_ordered, schema='time int, id int, v1 double, v2 string').show()

[Stage 75:>                                                         (0 + 1) / 1]

+--------+---+---+----+
|    time| id| v1|  v2|
+--------+---+---+----+
|20000101|  1|1.0|   x|
|20000102|  1|3.0|NULL|
|20000101|  2|2.0|   y|
|20000102|  2|4.0|NULL|
+--------+---+---+----+



                                                                                

# spark write and read from file csv,parquet,orc

In [71]:
df = spark.createDataFrame([
    ['red', 'banana', 1, 10], 
    ['blue', 'banana', 2, 20], 
    ['red', 'carrot', 3, 30],
    ['blue', 'grape', 4, 40], 
    ['red', 'carrot', 5, 50], 
    ['black', 'carrot', 6, 60],
    ['red', 'banana', 7, 70], 
    ['red', 'grape', 8, 80]
], schema = ['color','fruit','v1', 'v2']
)

In [72]:
df.write.csv('foo.csv', header=True)

                                                                                

In [74]:
spark.read.csv('foo.csv', header=True).show()

+-----+------+---+---+
|color| fruit| v1| v2|
+-----+------+---+---+
|black|carrot|  6| 60|
| blue|banana|  2| 20|
|  red|banana|  1| 10|
|  red|carrot|  3| 30|
| blue| grape|  4| 40|
|  red|carrot|  5| 50|
|  red|banana|  7| 70|
|  red| grape|  8| 80|
+-----+------+---+---+



In [75]:
df.write.parquet('bar.parquet')
spark.read.parquet('bar.parquet').show()

24/05/04 22:03:22 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers
                                                                                

+-----+------+---+---+
|color| fruit| v1| v2|
+-----+------+---+---+
|black|carrot|  6| 60|
| blue|banana|  2| 20|
|  red|banana|  7| 70|
| blue| grape|  4| 40|
|  red|carrot|  5| 50|
|  red|banana|  1| 10|
|  red|carrot|  3| 30|
|  red| grape|  8| 80|
+-----+------+---+---+



In [76]:
df.write.orc('zoo.orc')
spark.read.orc('zoo.orc').show()

+-----+------+---+---+
|color| fruit| v1| v2|
+-----+------+---+---+
|  red|banana|  7| 70|
|  red| grape|  8| 80|
|black|carrot|  6| 60|
| blue|banana|  2| 20|
|  red|banana|  1| 10|
|  red|carrot|  5| 50|
|  red|carrot|  3| 30|
| blue| grape|  4| 40|
+-----+------+---+---+



# process as Spark SQL

In [77]:
df.createOrReplaceTempView('fruit_table')

In [78]:
spark.sql('select * from fruit_table').show()

+-----+------+---+---+
|color| fruit| v1| v2|
+-----+------+---+---+
|  red|banana|  1| 10|
| blue|banana|  2| 20|
|  red|carrot|  3| 30|
| blue| grape|  4| 40|
|  red|carrot|  5| 50|
|black|carrot|  6| 60|
|  red|banana|  7| 70|
|  red| grape|  8| 80|
+-----+------+---+---+



In [87]:
@pandas_udf('long')
def add_one_fun(series):
    return series+2

spark.udf.register('add_1',add_one_fun)
spark.sql('select add_1(v1) from fruit_table').show()

+---------+
|add_1(v1)|
+---------+
|        3|
|        4|
|        5|
|        6|
|        7|
|        8|
|        9|
|       10|
+---------+



In [89]:
from pyspark.sql.functions import expr

df.selectExpr('add_1(v1)').show()

+---------+
|add_1(v1)|
+---------+
|        3|
|        4|
|        5|
|        6|
|        7|
|        8|
|        9|
|       10|
+---------+



In [90]:
df.select(expr('count(*)') > 0).show()

+--------------+
|(count(1) > 0)|
+--------------+
|          true|
+--------------+

