In [1]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

23/09/22 08:34:54 WARN Utils: Your hostname, chris-X555LB resolves to a loopback address: 127.0.1.1; using 192.168.100.8 instead (on interface enp2s0)
23/09/22 08:34:54 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/09/22 08:34:57 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


### DataFrame Creation

- schemas is infered (types)
- structure -> pyspark.sql.SparkSession.createDataFrame
- receive:
    - list, tuples, dictionaries
    - pyspoark.sql.Rows as a pandas DataFrame
    - RDD

In [6]:
# Datafrarme Creation
from datetime import datetime, date
import pandas as pd
from pyspark.sql import Row

df = spark.createDataFrame([
    Row(a=1, b=1., c="string1", d=date(2000,1,1), e=datetime(2000,1,1,12,0)),
    Row(a=2, b=2., c="string2", d=date(2000,2,1), e=datetime(2000,1,2,12,0)),
    Row(a=3, b=3., c="string3", d=date(2000,3,1), e=datetime(2000,1,3,12,0))
])

df

DataFrame[a: bigint, b: double, c: string, d: date, e: timestamp]

In [9]:
df = spark.createDataFrame([
    Row(a=1, b=1., c="string1", d=date(2000,1,1), e=datetime(2000,1,1,12,0)),
    Row(a=2, b=2., c="string2", d=date(2000,2,1), e=datetime(2000,1,2,12,0)),
    Row(a=3, b=3., c="string3", d=date(2000,3,1), e=datetime(2000,1,3,12,0))
], schema="a long, b double, c string, d date, e timestamp")

df

DataFrame[a: bigint, b: double, c: string, d: date, e: timestamp]

In [12]:
# Create a PySpark DataFrame from a pandas DataFrame
pandas_df = pd.DataFrame({
    "a": [1,2,3],
    "b": [2.,3.,4.],
    "c": ["string1","string2","string3"],
    "d": [date(2000,1,1), date(2000,2,1), date(2000,3,1)],
    "e": [datetime(2000,1,1,12,0), datetime(2000,1,2,12,0), datetime(2000,1,3,12,0)] 
})
df = spark.createDataFrame(pandas_df)
df

  if should_localize and is_datetime64tz_dtype(s.dtype) and s.dt.tz is not None:


DataFrame[a: bigint, b: double, c: string, d: date, e: timestamp]

In [None]:
# All DaraFrame above result same 🌄
df.show()
df.printSchema()

+---+---+-------+----------+-------------------+
|  a|  b|      c|         d|                  e|
+---+---+-------+----------+-------------------+
|  1|2.0|string1|2000-01-01|2000-01-01 12:00:00|
|  2|3.0|string2|2000-02-01|2000-01-02 12:00:00|
|  3|4.0|string3|2000-03-01|2000-01-03 12:00:00|
+---+---+-------+----------+-------------------+

root
 |-- a: long (nullable = true)
 |-- b: double (nullable = true)
 |-- c: string (nullable = true)
 |-- d: date (nullable = true)
 |-- e: timestamp (nullable = true)



### Viewing Data

In [15]:
spark.conf.set("spark.sql.repl.eagerEval.enabled", True)

In [16]:
df

a,b,c,d,e
1,2.0,string1,2000-01-01,2000-01-01 12:00:00
2,3.0,string2,2000-02-01,2000-01-02 12:00:00
3,4.0,string3,2000-03-01,2000-01-03 12:00:00


In [17]:
df.show(1, vertical=True)

-RECORD 0------------------
 a   | 1                   
 b   | 2.0                 
 c   | string1             
 d   | 2000-01-01          
 e   | 2000-01-01 12:00:00 
only showing top 1 row



In [18]:
df.columns

['a', 'b', 'c', 'd', 'e']

In [19]:
df.printSchema()

root
 |-- a: long (nullable = true)
 |-- b: double (nullable = true)
 |-- c: string (nullable = true)
 |-- d: date (nullable = true)
 |-- e: timestamp (nullable = true)



In [20]:
# summary general statistics
df.select("a","b","c").describe().show()

[Stage 12:>                                                         (0 + 4) / 4]

+-------+---+---+-------+
|summary|  a|  b|      c|
+-------+---+---+-------+
|  count|  3|  3|      3|
|   mean|2.0|3.0|   null|
| stddev|1.0|1.0|   null|
|    min|  1|2.0|string1|
|    max|  3|4.0|string3|
+-------+---+---+-------+



                                                                                

In [21]:
# bring to driver side
df.collect()

[Row(a=1, b=2.0, c='string1', d=datetime.date(2000, 1, 1), e=datetime.datetime(2000, 1, 1, 12, 0)),
 Row(a=2, b=3.0, c='string2', d=datetime.date(2000, 2, 1), e=datetime.datetime(2000, 1, 2, 12, 0)),
 Row(a=3, b=4.0, c='string3', d=datetime.date(2000, 3, 1), e=datetime.datetime(2000, 1, 3, 12, 0))]

In [None]:
# avoid this because of memory
#df.toPandas()

### Selecting and accesing data

In [24]:
df.a

Column<'a'>

In [27]:
from pyspark.sql import Column
from pyspark.sql.functions import upper

type(df.c) == type(upper(df.c)) == type(df.c.isNull())

True

In [28]:
df.select(df.c).show()

+-------+
|      c|
+-------+
|string1|
|string2|
|string3|
+-------+



In [29]:
# upper change to uppercase form
df.withColumn("upper_c", upper(df.c)).show()

+---+---+-------+----------+-------------------+-------+
|  a|  b|      c|         d|                  e|upper_c|
+---+---+-------+----------+-------------------+-------+
|  1|2.0|string1|2000-01-01|2000-01-01 12:00:00|STRING1|
|  2|3.0|string2|2000-02-01|2000-01-02 12:00:00|STRING2|
|  3|4.0|string3|2000-03-01|2000-01-03 12:00:00|STRING3|
+---+---+-------+----------+-------------------+-------+



In [30]:
# subset of df
df.filter(df.a == 1).show()

+---+---+-------+----------+-------------------+
|  a|  b|      c|         d|                  e|
+---+---+-------+----------+-------------------+
|  1|2.0|string1|2000-01-01|2000-01-01 12:00:00|
+---+---+-------+----------+-------------------+



### Applying a function
using this connection
- Pandas UDF
- Pandas Function APIs

In [None]:
import pandas as pd
from pyspark.sql.functions import pandas_udf

@pandas_udf("long")
def pandas_plus_one(series: pd.Series) -> pd.Series:
    #Simply plus one by using pandas Series
    return series + 1

df.select(pandas_plus_one(df.a)).show()



+------------------+
|pandas_plus_one(a)|
+------------------+
|                 2|
|                 3|
|                 4|
+------------------+



                                                                                

### Grouping Data

In [33]:
# schema is not only type it is the name of the column
df = spark.createDataFrame([
    ['red', 'banana', 1, 10], ['blue', 'banana', 2, 20], ['red', 'carrot', 3, 30],
    ['blue', 'grape', 4, 40], ['red', 'carrot', 5, 50], ['black', 'carrot', 6, 60],
    ['red', 'banana', 7, 70], ['red', 'grape', 8, 80]], schema=['color', 'fruit', 'v1', 'v2'])
df.show()

+-----+------+---+---+
|color| fruit| v1| v2|
+-----+------+---+---+
|  red|banana|  1| 10|
| blue|banana|  2| 20|
|  red|carrot|  3| 30|
| blue| grape|  4| 40|
|  red|carrot|  5| 50|
|black|carrot|  6| 60|
|  red|banana|  7| 70|
|  red| grape|  8| 80|
+-----+------+---+---+



In [35]:
# only apply avg on numerical columns
df.groupby("color").avg().show()

+-----+-------+-------+
|color|avg(v1)|avg(v2)|
+-----+-------+-------+
|  red|    4.8|   48.0|
| blue|    3.0|   30.0|
|black|    6.0|   60.0|
+-----+-------+-------+



                                                                                

In [36]:
# apply custom function to summary
def plus_mean(pandas_df):
    # replace before column v1 with std
    return pandas_df.assign(v1=pandas_df.v1 - pandas_df.v1.mean())

df.groupby("color").applyInPandas(plus_mean, schema=df.schema).show()

[Stage 35:>                                                         (0 + 1) / 1]

+-----+------+---+---+
|color| fruit| v1| v2|
+-----+------+---+---+
|black|carrot|  0| 60|
| blue|banana| -1| 20|
| blue| grape|  1| 40|
|  red|banana| -3| 10|
|  red|carrot| -1| 30|
|  red|carrot|  0| 50|
|  red|banana|  2| 70|
|  red| grape|  3| 80|
+-----+------+---+---+



                                                                                

In [42]:
# column names is not neccesary scheme
df1 = spark.createDataFrame(
    [(20000101, 1, 1.0), (20000101, 2, 2.0), (20000101, 1, 3.0), (20000101, 2, 4.0)],
    ("time", "id", "v1")
)
df2 = spark.createDataFrame(
    [(20000101, 1, 'x'), (20000101, 2, 'y')],
    ('time', 'id', 'v2')
)

def merge_ordered(l, r):
    return pd.merge_ordered(l,r)

# cogroup using the id in both dfs and join with merge function so
# it has v1 and v2 columns at the same time
df1.groupby("id").cogroup(df2.groupby("id")).applyInPandas(
    merge_ordered, schema="time int, id int, v1 double, v2 string").show()



+--------+---+---+---+
|    time| id| v1| v2|
+--------+---+---+---+
|20000101|  1|1.0|  x|
|20000101|  1|3.0|  x|
|20000101|  2|2.0|  y|
|20000101|  2|4.0|  y|
+--------+---+---+---+



                                                                                

In [None]:
# Getting Data In/Out