In [1]:
from pyspark.sql import SparkSession, Row
from datetime import datetime, date
import pandas as pd
import time

In [2]:
spark = SparkSession.builder.getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/10/15 22:51:02 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Create PySpark dataframe from list of Rows

In [3]:
df = spark.createDataFrame([
    Row(a=1, b = 1., c='string1', d=date(2023, 1, 1), e=datetime(2023, 1, 1, 12, 0)),    
    Row(a=2, b = 2., c='string2', d=date(2023, 1, 2), e=datetime(2023, 1, 2, 12, 0)),    
    Row(a=3, b = 3., c='string3', d=date(2023, 1, 3), e=datetime(2023, 1, 3, 12, 0))
])
df.show()

                                                                                

+---+---+-------+----------+-------------------+
|  a|  b|      c|         d|                  e|
+---+---+-------+----------+-------------------+
|  1|1.0|string1|2023-01-01|2023-01-01 12:00:00|
|  2|2.0|string2|2023-01-02|2023-01-02 12:00:00|
|  3|3.0|string3|2023-01-03|2023-01-03 12:00:00|
+---+---+-------+----------+-------------------+



                                                                                

Create PySpark dataframe with explicit schema

In [4]:
df = spark.createDataFrame([
    Row(a=1, b= 1., c= 'String1', d = date(2023, 1, 1), e=datetime(2023,1,1,12,0)),
    Row(a=2, b= 2., c= 'String2', d = date(2023, 2, 1), e=datetime(2023,1,2,12,0)),
    Row(a=3, b= 3., c= 'String3', d = date(2023, 3, 1), e=datetime(2023,1,3,12,0))],
    schema = 'a long, b double, c string, d date, e timestamp'
)
df

DataFrame[a: bigint, b: double, c: string, d: date, e: timestamp]

Create PySpark dataframe with pandas dataframe

In [5]:
pandas_df = pd.DataFrame(
    {
        'a': [1,2,3],
        'b': [1., 2., 3.],
        'c': ['String1', 'String2', 'String3'],
        'd': [date(2023,1,1), date(2023,1,2), date(2023,1,3)],
        'e': [datetime(2023,1,1,12,0), datetime(2023,1,2,12,0), datetime(2023,1,3,12,0)]
    }
)

df = spark.createDataFrame(pandas_df)
df

DataFrame[a: bigint, b: double, c: string, d: date, e: timestamp]

In [6]:
data = [[1, 1., 'String1', date(2023,1,1), datetime(2023,1,1,12,0)],
        [2, 2., 'String2', date(2023,1,2), datetime(2023,1,2,12,0)],
        [3, 3., 'String3', date(2023,1,3), datetime(2023,1,3,12,0)],
        [4, 4., 'String4', date(2023,1,4), datetime(2023,1,4,12,0)]]

column = ['a', 'b', 'c', 'd', 'e']
df = spark.createDataFrame(data, column)
df.show()


+---+---+-------+----------+-------------------+
|  a|  b|      c|         d|                  e|
+---+---+-------+----------+-------------------+
|  1|1.0|String1|2023-01-01|2023-01-01 12:00:00|
|  2|2.0|String2|2023-01-02|2023-01-02 12:00:00|
|  3|3.0|String3|2023-01-03|2023-01-03 12:00:00|
|  4|4.0|String4|2023-01-04|2023-01-04 12:00:00|
+---+---+-------+----------+-------------------+



Dataframe display using show method

In [7]:
df.show()

+---+---+-------+----------+-------------------+
|  a|  b|      c|         d|                  e|
+---+---+-------+----------+-------------------+
|  1|1.0|String1|2023-01-01|2023-01-01 12:00:00|
|  2|2.0|String2|2023-01-02|2023-01-02 12:00:00|
|  3|3.0|String3|2023-01-03|2023-01-03 12:00:00|
|  4|4.0|String4|2023-01-04|2023-01-04 12:00:00|
+---+---+-------+----------+-------------------+



In [8]:
spark.conf.set('spark.sql.repl.eagerEval.enabled', True)
df

a,b,c,d,e
1,1.0,String1,2023-01-01,2023-01-01 12:00:00
2,2.0,String2,2023-01-02,2023-01-02 12:00:00
3,3.0,String3,2023-01-03,2023-01-03 12:00:00
4,4.0,String4,2023-01-04,2023-01-04 12:00:00


In [9]:
df.show(1, vertical=True)

-RECORD 0------------------
 a   | 1                   
 b   | 1.0                 
 c   | String1             
 d   | 2023-01-01          
 e   | 2023-01-01 12:00:00 
only showing top 1 row



In [10]:
df.columns

['a', 'b', 'c', 'd', 'e']

In [11]:
df.printSchema()

root
 |-- a: long (nullable = true)
 |-- b: double (nullable = true)
 |-- c: string (nullable = true)
 |-- d: date (nullable = true)
 |-- e: timestamp (nullable = true)



In [12]:
#summary of dataframe
df.select('a', 'b', 'c').describe().show()

[Stage 12:>                                                         (0 + 4) / 4]

+-------+------------------+------------------+-------+
|summary|                 a|                 b|      c|
+-------+------------------+------------------+-------+
|  count|                 4|                 4|      4|
|   mean|               2.5|               2.5|   null|
| stddev|1.2909944487358056|1.2909944487358056|   null|
|    min|                 1|               1.0|String1|
|    max|                 4|               4.0|String4|
+-------+------------------+------------------+-------+



                                                                                

DataFrame.collect() for collecting distributed data from the executors and bring it to the driver. Can throw out of memory exception when the dataset is too large to fit in the driver side, as all the data from the executors is collected.

In [13]:
df.collect()

[Row(a=1, b=1.0, c='String1', d=datetime.date(2023, 1, 1), e=datetime.datetime(2023, 1, 1, 12, 0)),
 Row(a=2, b=2.0, c='String2', d=datetime.date(2023, 1, 2), e=datetime.datetime(2023, 1, 2, 12, 0)),
 Row(a=3, b=3.0, c='String3', d=datetime.date(2023, 1, 3), e=datetime.datetime(2023, 1, 3, 12, 0)),
 Row(a=4, b=4.0, c='String4', d=datetime.date(2023, 1, 4), e=datetime.datetime(2023, 1, 4, 12, 0))]

In [14]:
df.take(2)
df.tail(1)

[Row(a=4, b=4.0, c='String4', d=datetime.date(2023, 1, 4), e=datetime.datetime(2023, 1, 4, 12, 0))]

toPandas() also takes all the data from executors and brings it to the driver. So this could also result in out of memory exception.

In [15]:
df

a,b,c,d,e
1,1.0,String1,2023-01-01,2023-01-01 12:00:00
2,2.0,String2,2023-01-02,2023-01-02 12:00:00
3,3.0,String3,2023-01-03,2023-01-03 12:00:00
4,4.0,String4,2023-01-04,2023-01-04 12:00:00


In [16]:
from pyspark.sql.functions import date_format

df_pandas = df.withColumn('e', date_format('e', 'yyyy-MM-dd HH:mm:ss')).toPandas()
df_pandas

Unnamed: 0,a,b,c,d,e
0,1,1.0,String1,2023-01-01,2023-01-01 12:00:00
1,2,2.0,String2,2023-01-02,2023-01-02 12:00:00
2,3,3.0,String3,2023-01-03,2023-01-03 12:00:00
3,4,4.0,String4,2023-01-04,2023-01-04 12:00:00


In [17]:
print(type(df))
print(type(df_pandas))

<class 'pyspark.sql.dataframe.DataFrame'>
<class 'pandas.core.frame.DataFrame'>


Selecting and accessing data

In [18]:
df.a

Column<'a'>

In [19]:
from pyspark.sql import Column
from pyspark.sql.functions import upper

In [20]:
df.select(df.c).show()

+-------+
|      c|
+-------+
|String1|
|String2|
|String3|
|String4|
+-------+



In [21]:
df.withColumn('upper_c', upper('c')).show()
df.withColumn('upper_c', upper(df.c)).show()

+---+---+-------+----------+-------------------+-------+
|  a|  b|      c|         d|                  e|upper_c|
+---+---+-------+----------+-------------------+-------+
|  1|1.0|String1|2023-01-01|2023-01-01 12:00:00|STRING1|
|  2|2.0|String2|2023-01-02|2023-01-02 12:00:00|STRING2|
|  3|3.0|String3|2023-01-03|2023-01-03 12:00:00|STRING3|
|  4|4.0|String4|2023-01-04|2023-01-04 12:00:00|STRING4|
+---+---+-------+----------+-------------------+-------+

+---+---+-------+----------+-------------------+-------+
|  a|  b|      c|         d|                  e|upper_c|
+---+---+-------+----------+-------------------+-------+
|  1|1.0|String1|2023-01-01|2023-01-01 12:00:00|STRING1|
|  2|2.0|String2|2023-01-02|2023-01-02 12:00:00|STRING2|
|  3|3.0|String3|2023-01-03|2023-01-03 12:00:00|STRING3|
|  4|4.0|String4|2023-01-04|2023-01-04 12:00:00|STRING4|
+---+---+-------+----------+-------------------+-------+



In [22]:
df.filter(df.a == 2).show()

+---+---+-------+----------+-------------------+
|  a|  b|      c|         d|                  e|
+---+---+-------+----------+-------------------+
|  2|2.0|String2|2023-01-02|2023-01-02 12:00:00|
+---+---+-------+----------+-------------------+



Applying a function

In [23]:
from pyspark.sql.functions import pandas_udf

@pandas_udf('long')
def pandas_plus_one(series: pd.Series) -> pd.Series:
    return series + 1

df.select(pandas_plus_one(df.a)).show()



+------------------+
|pandas_plus_one(a)|
+------------------+
|                 2|
|                 3|
|                 4|
|                 5|
+------------------+



                                                                                

In [24]:
def pandas_filter_func(iterator):
    for pandas_df in iterator:
        yield pandas_df[pandas_df.a == 2]

df.mapInPandas(pandas_filter_func, schema=df.schema).show()

+---+---+-------+----------+-------------------+
|  a|  b|      c|         d|                  e|
+---+---+-------+----------+-------------------+
|  2|2.0|String2|2023-01-02|2023-01-02 12:00:00|
+---+---+-------+----------+-------------------+



Grouping data

In [25]:
df = spark.createDataFrame([['red', 'banana', 1, 10], ['blue', 'banana', 2, 20], ['red', 'carrot', 3, 30],
    ['blue', 'grape', 4, 40], ['red', 'carrot', 5, 50], ['black', 'carrot', 6, 60],
    ['red', 'banana', 7, 70], ['red', 'grape', 8, 80]], schema = ['color', 'fruit', 'v1', 'v2'])

df.show()

+-----+------+---+---+
|color| fruit| v1| v2|
+-----+------+---+---+
|  red|banana|  1| 10|
| blue|banana|  2| 20|
|  red|carrot|  3| 30|
| blue| grape|  4| 40|
|  red|carrot|  5| 50|
|black|carrot|  6| 60|
|  red|banana|  7| 70|
|  red| grape|  8| 80|
+-----+------+---+---+



In [26]:
df.groupBy('color').avg().show()



+-----+-------+-------+
|color|avg(v1)|avg(v2)|
+-----+-------+-------+
|  red|    4.8|   48.0|
| blue|    3.0|   30.0|
|black|    6.0|   60.0|
+-----+-------+-------+



                                                                                

In [27]:
def plus_mean(pandas_df):
    return pandas_df.assign(v1 = pandas_df.v1 - pandas_df.v1.mean())

df.groupBy('color').applyInPandas(plus_mean, schema = df.schema).show()


+-----+------+---+---+
|color| fruit| v1| v2|
+-----+------+---+---+
|black|carrot|  0| 60|
| blue|banana| -1| 20|
| blue| grape|  1| 40|
|  red|banana| -3| 10|
|  red|carrot| -1| 30|
|  red|carrot|  0| 50|
|  red|banana|  2| 70|
|  red| grape|  3| 80|
+-----+------+---+---+



co-grouping

In [28]:
df1 = spark.createDataFrame(
    [(20000101, 1, 1.0), (20000101, 2, 2.0), (20000102, 1, 3.0), (20000102, 2, 4.0)],
    ('time', 'id', 'v1'))

df2 = spark.createDataFrame(
    [(20000101, 1, 'x'), (20000101, 2, 'y')],
    ('time', 'id', 'v2'))

def merge_ordered(l, r):
    return pd.merge_ordered(l, r)

df1.show()
df2.show()

+--------+---+---+
|    time| id| v1|
+--------+---+---+
|20000101|  1|1.0|
|20000101|  2|2.0|
|20000102|  1|3.0|
|20000102|  2|4.0|
+--------+---+---+

+--------+---+---+
|    time| id| v2|
+--------+---+---+
|20000101|  1|  x|
|20000101|  2|  y|
+--------+---+---+



In [29]:
df1.groupBy('id').cogroup(df2.groupBy('id'))\
    .applyInPandas(merge_ordered, schema = 'time int, id int, v1 double, v2 string').show()

[Stage 47:>                                                         (0 + 4) / 4]

+--------+---+---+----+
|    time| id| v1|  v2|
+--------+---+---+----+
|20000101|  1|1.0|   x|
|20000102|  1|3.0|null|
|20000101|  2|2.0|   y|
|20000102|  2|4.0|null|
+--------+---+---+----+



                                                                                

Getting data in and out

In [30]:
df.write.mode('overwrite').csv('foo.csv', header=True)
spark.read.csv('foo.csv', header = True).show()

                                                                                

+-----+------+---+---+
|color| fruit| v1| v2|
+-----+------+---+---+
|  red|carrot|  5| 50|
|black|carrot|  6| 60|
|  red|banana|  1| 10|
| blue|banana|  2| 20|
|  red|carrot|  3| 30|
| blue| grape|  4| 40|
|  red|banana|  7| 70|
|  red| grape|  8| 80|
+-----+------+---+---+



In [31]:
df.write.mode('overwrite').parquet('bar.parquet')
spark.read.parquet('bar.parquet').show()

                                                                                

+-----+------+---+---+
|color| fruit| v1| v2|
+-----+------+---+---+
|  red|carrot|  5| 50|
|black|carrot|  6| 60|
|  red|banana|  1| 10|
| blue|banana|  2| 20|
|  red|banana|  7| 70|
|  red| grape|  8| 80|
|  red|carrot|  3| 30|
| blue| grape|  4| 40|
+-----+------+---+---+



In [32]:
df.write.mode('overwrite').orc('zoo.orc')
spark.read.orc('zoo.orc').show()

                                                                                

+-----+------+---+---+
|color| fruit| v1| v2|
+-----+------+---+---+
|  red|banana|  7| 70|
|  red| grape|  8| 80|
|  red|carrot|  5| 50|
|black|carrot|  6| 60|
|  red|carrot|  3| 30|
| blue| grape|  4| 40|
|  red|banana|  1| 10|
| blue|banana|  2| 20|
+-----+------+---+---+



Working with Sql

In [33]:
df.createOrReplaceTempView("tableA")
spark.sql("SELECT count(*) from tableA").show()

+--------+
|count(1)|
+--------+
|       8|
+--------+



In [34]:
@pandas_udf("integer")
def add_one(s: pd.Series) -> pd.Series:
    return s + 1

spark.udf.register("add_one", add_one)
spark.sql("SELECT add_one(v1) FROM tableA").show()



+-----------+
|add_one(v1)|
+-----------+
|          2|
|          3|
|          4|
|          5|
|          6|
|          7|
|          8|
|          9|
+-----------+



                                                                                