In [1]:
import os
import sys

os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable

In [2]:
from pyspark.sql import SparkSession, Row
spark = SparkSession.builder.getOrCreate()

In [3]:
from datetime import datetime, date 
import pandas as pd

In [4]:
df = spark.createDataFrame(
    [
        Row(
            a=1, b=2., c='string1', d=date(2000, 1, 1), e=datetime(2000, 1, 1, 12, 0)
        ),
        Row(
            a=None, b=3., c='string2', d=date(2000, 2, 1), e=datetime(2000, 1, 2, 12, 0)
        ),
        Row(
            a=4, b=5., c='string3', d=date(2009, 3, 1), e=datetime(2000, 1, 3, 12, 0)
        )
    ]
)

df

DataFrame[a: bigint, b: double, c: string, d: date, e: timestamp]

In [5]:
df = spark.createDataFrame(
    [
        (1, 2., 'string1', date(2000, 1, 1), datetime(2000, 1, 1, 12, 0)),
        (2, 3., 'string1', date(2000, 2, 1), datetime(2000, 1, 2, 12, 0)),
        (3, 4., 'string1', date(2000, 3, 1), datetime(2000, 1, 3, 12, 0))
    ], schema = 'a long, b double, c string, d date, e timestamp'
)

df

DataFrame[a: bigint, b: double, c: string, d: date, e: timestamp]

In [6]:
pandas_df = pd.DataFrame(
    {
        'a': [1,2,3],
        'b': [2., 3., 4.],
        'c': ['string1', 'string2', 'string3'],
        'd': [date(2000, 1, 1), date(2000, 2, 1), date(2000, 3, 1)],
        'e': [datetime(2000, 1, 1, 12, 0), datetime(2000, 1, 2, 12, 0), datetime(2000, 1, 3, 12, 0)]
    }
)

df = spark.createDataFrame(pandas_df)
df

DataFrame[a: bigint, b: double, c: string, d: date, e: timestamp]

In [7]:
df.printSchema()

root
 |-- a: long (nullable = true)
 |-- b: double (nullable = true)
 |-- c: string (nullable = true)
 |-- d: date (nullable = true)
 |-- e: timestamp (nullable = true)



In [8]:
df.show()

+---+---+-------+----------+-------------------+
|  a|  b|      c|         d|                  e|
+---+---+-------+----------+-------------------+
|  1|2.0|string1|2000-01-01|2000-01-01 12:00:00|
|  2|3.0|string2|2000-02-01|2000-01-02 12:00:00|
|  3|4.0|string3|2000-03-01|2000-01-03 12:00:00|
+---+---+-------+----------+-------------------+



In [12]:
spark.conf.set('spark.sql.repl.eagerEval.enabled', True)
df

a,b,c,d,e
1,2.0,string1,2000-01-01,2000-01-01 12:00:00
2,3.0,string2,2000-02-01,2000-01-02 12:00:00
3,4.0,string3,2000-03-01,2000-01-03 12:00:00


In [13]:
df.show(1)

+---+---+-------+----------+-------------------+
|  a|  b|      c|         d|                  e|
+---+---+-------+----------+-------------------+
|  1|2.0|string1|2000-01-01|2000-01-01 12:00:00|
+---+---+-------+----------+-------------------+
only showing top 1 row



In [15]:
df.show(2, vertical=True)

-RECORD 0------------------
 a   | 1                   
 b   | 2.0                 
 c   | string1             
 d   | 2000-01-01          
 e   | 2000-01-01 12:00:00 
-RECORD 1------------------
 a   | 2                   
 b   | 3.0                 
 c   | string2             
 d   | 2000-02-01          
 e   | 2000-01-02 12:00:00 
only showing top 2 rows



In [16]:
df.columns

['a', 'b', 'c', 'd', 'e']

In [17]:
df.printSchema()

root
 |-- a: long (nullable = true)
 |-- b: double (nullable = true)
 |-- c: string (nullable = true)
 |-- d: date (nullable = true)
 |-- e: timestamp (nullable = true)



In [20]:
df.select('a').describe().show()

+-------+---+
|summary|  a|
+-------+---+
|  count|  3|
|   mean|2.0|
| stddev|1.0|
|    min|  1|
|    max|  3|
+-------+---+



In [21]:
df.collect()

[Row(a=1, b=2.0, c='string1', d=datetime.date(2000, 1, 1), e=datetime.datetime(2000, 1, 1, 12, 0)),
 Row(a=2, b=3.0, c='string2', d=datetime.date(2000, 2, 1), e=datetime.datetime(2000, 1, 2, 12, 0)),
 Row(a=3, b=4.0, c='string3', d=datetime.date(2000, 3, 1), e=datetime.datetime(2000, 1, 3, 12, 0))]

In [23]:
df.take(2)

[Row(a=1, b=2.0, c='string1', d=datetime.date(2000, 1, 1), e=datetime.datetime(2000, 1, 1, 12, 0)),
 Row(a=2, b=3.0, c='string2', d=datetime.date(2000, 2, 1), e=datetime.datetime(2000, 1, 2, 12, 0))]

In [25]:
df.tail(1)

[Row(a=3, b=4.0, c='string3', d=datetime.date(2000, 3, 1), e=datetime.datetime(2000, 1, 3, 12, 0))]

In [29]:
df.toPandas().head(1)

  series = series.astype(t, copy=False)


Unnamed: 0,a,b,c,d,e
0,1,2.0,string1,2000-01-01,2000-01-01 12:00:00


In [30]:
df.select('a')

a
1
2
3


In [32]:
df.select(df.c)

c
string1
string2
string3


In [37]:
from pyspark.sql.functions import upper
df.withColumn('upper_c', upper(df.c)).show()

+---+---+-------+----------+-------------------+-------+
|  a|  b|      c|         d|                  e|upper_c|
+---+---+-------+----------+-------------------+-------+
|  1|2.0|string1|2000-01-01|2000-01-01 12:00:00|STRING1|
|  2|3.0|string2|2000-02-01|2000-01-02 12:00:00|STRING2|
|  3|4.0|string3|2000-03-01|2000-01-03 12:00:00|STRING3|
+---+---+-------+----------+-------------------+-------+



In [34]:
df.filter(df.a == 2).show()

+---+---+-------+----------+-------------------+
|  a|  b|      c|         d|                  e|
+---+---+-------+----------+-------------------+
|  2|3.0|string2|2000-02-01|2000-01-02 12:00:00|
+---+---+-------+----------+-------------------+



In [38]:
from pyspark.sql.functions import pandas_udf

In [40]:
@pandas_udf('long')
def pandas_plus_one(series: pd.Series) -> pd.Series:
    return series + 2

df.select(pandas_plus_one(df.a)).show()

+------------------+
|pandas_plus_one(a)|
+------------------+
|                 3|
|                 4|
|                 5|
+------------------+

