In [1]:
import os

from pyspark.sql import SparkSession

In [2]:
# Location of the iris CSV. Set the IRIS_CSV_PATH environment variable to point
# the notebook at a copy elsewhere; when it is unset the original path is used.
filepath = os.environ.get(
    'IRIS_CSV_PATH',
    '/Users/williamtun/Documents/Code/DataEngineer/pyspark/iris.csv',
)

In [4]:
# Obtain the SparkSession used by every cell below; getOrCreate() hands back
# an already-running session if there is one.
# NOTE(review): "spark.some.config.option" / "some-value" look like placeholder
# settings — confirm whether they are needed.
spark = (
    SparkSession.builder
    .appName("Python Spark create RDD example")
    .config("spark.some.config.option", "some-value")
    .getOrCreate()
)

# Create DF1

In [5]:
# Read the CSV with Spark's built-in reader: the first line supplies the column
# names and the column types are inferred from the data.
# The previous version went through the legacy 'com.databricks.spark.csv'
# source name and passed the header option twice.
df = spark.read.csv(filepath, header=True, inferSchema=True)

In [6]:
# Preview the first five rows read from the CSV.
df.show(n=5)

+------------+-----------+------------+-----------+-------+
|sepal.length|sepal.width|petal.length|petal.width|variety|
+------------+-----------+------------+-----------+-------+
|         5.1|        3.5|         1.4|        0.2| Setosa|
|         4.9|        3.0|         1.4|        0.2| Setosa|
|         4.7|        3.2|         1.3|        0.2| Setosa|
|         4.6|        3.1|         1.5|        0.2| Setosa|
|         5.0|        3.6|         1.4|        0.2| Setosa|
+------------+-----------+------------+-----------+-------+
only showing top 5 rows



# Create DF2

In [89]:
# Hand-built three-row frame with None in several positions, plus an extra
# string column ('disposable_col') that the later cells rewrite and drop.
df2_columns = [
    'sepal.length',
    'sepal.width',
    'petal.length',
    'petal.width',
    'variety',
    'disposable_col',
]
df2_rows = [
    (None, 3.2, 1.3, 0.2, 'Setosa', "oops"),
    (4.8, 3.2, None, 0.2, 'Setosa', None),
    (5.3, None, 1.3, None, 'Versicolor', "oops"),
]
df2 = spark.sparkContext.parallelize(df2_rows).toDF(df2_columns)

In [71]:
# Display the three hand-built rows, nulls included.
df2.show(n=3)

+------------+-----------+------------+-----------+----------+--------------+
|sepal.length|sepal.width|petal.length|petal.width|   variety|disposable_col|
+------------+-----------+------------+-----------+----------+--------------+
|        null|        3.2|         1.3|        0.2|    Setosa|          oops|
|         4.8|        3.2|        null|        0.2|    Setosa|          null|
|         5.3|       null|         1.3|       null|Versicolor|          oops|
+------------+-----------+------------+-----------+----------+--------------+



# Explore

In [28]:
# Column names of df2, returned as a plain Python list.
df2.columns

['sepal.length',
 'sepal.width',
 'petal.length',
 'petal.width',
 'variety',
 'disposable_col']

In [29]:
# (column name, type name) pairs for df2.
df2.dtypes

[('sepal.length', 'double'),
 ('sepal.width', 'double'),
 ('petal.length', 'double'),
 ('petal.width', 'double'),
 ('variety', 'string'),
 ('disposable_col', 'string')]

In [30]:
df2 = df2.fillna(-3)

In [31]:
df2.show(4)

+------------+-----------+------------+-----------+----------+--------------+
|sepal.length|sepal.width|petal.length|petal.width|   variety|disposable_col|
+------------+-----------+------------+-----------+----------+--------------+
|        -3.0|        3.2|         1.3|        0.2|    Setosa|          oops|
|         4.8|        3.2|        -3.0|        0.2|    Setosa|          null|
|         5.3|       -3.0|         1.3|       -3.0|Versicolor|          oops|
+------------+-----------+------------+-----------+----------+--------------+



# Replace values in a column

In [48]:
from pyspark.sql.functions import when, lit, col

In [90]:
# Rewrite 'oops' to "error" in disposable_col; every other value, null
# included, is passed through unchanged.
is_oops = col("disposable_col").isin('oops')
relabelled = when(is_oops, "error").otherwise(col("disposable_col"))
df2 = df2.withColumn("disposable_col", relabelled)

In [85]:
# Show df2 after the replacement: 'oops' now reads 'error'.
df2.show()

+------------+-----------+------------+-----------+----------+--------------+
|sepal.length|sepal.width|petal.length|petal.width|   variety|disposable_col|
+------------+-----------+------------+-----------+----------+--------------+
|        null|        3.2|         1.3|        0.2|    Setosa|         error|
|         4.8|        3.2|        null|        0.2|    Setosa|          null|
|         5.3|       null|         1.3|       null|Versicolor|         error|
+------------+-----------+------------+-----------+----------+--------------+



# Rename columns

In [91]:
# Rename disposable_col to error_boolean. Only the name changes: despite the
# new name the column still holds strings ('error' or null).
df2 = df2.withColumnRenamed(existing="disposable_col", new="error_boolean")

In [92]:
# Same rows as before, with the last column now headed error_boolean.
df2.show()

+------------+-----------+------------+-----------+----------+-------------+
|sepal.length|sepal.width|petal.length|petal.width|   variety|error_boolean|
+------------+-----------+------------+-----------+----------+-------------+
|        null|        3.2|         1.3|        0.2|    Setosa|        error|
|         4.8|        3.2|        null|        0.2|    Setosa|         null|
|         5.3|       null|         1.3|       null|Versicolor|        error|
+------------+-----------+------------+-----------+----------+-------------+



In [93]:
# Print the schema tree: each column's name, type and nullability.
df2.printSchema()

root
 |-- sepal.length: double (nullable = true)
 |-- sepal.width: double (nullable = true)
 |-- petal.length: double (nullable = true)
 |-- petal.width: double (nullable = true)
 |-- variety: string (nullable = true)
 |-- error_boolean: string (nullable = true)



In [100]:
# Bare expression: the cell output is the DataFrame's one-line summary
# (column names and types), not its rows.
df2

DataFrame[sepal.length: double, sepal.width: double, petal.length: double, petal.width: double, variety: string, error_boolean: string]

In [101]:
# Display the rows again; the output matches the previous show().
df2.show()

+------------+-----------+------------+-----------+----------+-------------+
|sepal.length|sepal.width|petal.length|petal.width|   variety|error_boolean|
+------------+-----------+------------+-----------+----------+-------------+
|        null|        3.2|         1.3|        0.2|    Setosa|        error|
|         4.8|        3.2|        null|        0.2|    Setosa|         null|
|         5.3|       null|         1.3|       null|Versicolor|        error|
+------------+-----------+------------+-----------+----------+-------------+



In [102]:
# Replace the dots in the two sepal column names with underscores and bind the
# result to df3, the name the following cells read (it was previously bound to
# df2, leaving df3 undefined). Underscore names can be used as attributes,
# e.g. df4.sepal_width in the filter cells.
df3 = (
    df2
    .withColumnRenamed("sepal.length", "sepal_length")
    .withColumnRenamed("sepal.width", "sepal_width")
)

In [103]:
# Display df3; its sepal columns carry the underscore names.
df3.show()

+------------+-----------+------------+-----------+----------+-------------+
|sepal_length|sepal_width|petal.length|petal.width|   variety|error_boolean|
+------------+-----------+------------+-----------+----------+-------------+
|        null|        3.2|         1.3|        0.2|    Setosa|        error|
|         4.8|        3.2|        null|        0.2|    Setosa|         null|
|         5.3|       null|         1.3|       null|Versicolor|        error|
+------------+-----------+------------+-----------+----------+-------------+



# Drop column

In [104]:
# Name of the column to remove in the next cell.
drop_name = "error_boolean"

In [108]:
# Drop the column named by drop_name from df3 and bind the result to df4.
df4 = df3.drop(drop_name)

In [109]:
# Confirm the error_boolean column is gone.
df4.show()

+------------+-----------+------------+-----------+----------+
|sepal_length|sepal_width|petal.length|petal.width|   variety|
+------------+-----------+------------+-----------+----------+
|        null|        3.2|         1.3|        0.2|    Setosa|
|         4.8|        3.2|        null|        0.2|    Setosa|
|         5.3|       null|         1.3|       null|Versicolor|
+------------+-----------+------------+-----------+----------+



# Filter

In [115]:
# Filter on one condition: keep the rows whose sepal_width equals 3.2.
df4.filter(df4.sepal_width == 3.2).show()

+------------+-----------+------------+-----------+-------+
|sepal_length|sepal_width|petal.length|petal.width|variety|
+------------+-----------+------------+-----------+-------+
|        null|        3.2|         1.3|        0.2| Setosa|
|         4.8|        3.2|        null|        0.2| Setosa|
+------------+-----------+------------+-----------+-------+



In [114]:
# Filter on two conditions joined with & (both must hold). The row whose
# sepal_length is null does not satisfy "> 4.0" and is therefore left out.
df4.filter(
    (df4.sepal_width == 3.2) & (df4.sepal_length > 4.0)
).show()

+------------+-----------+------------+-----------+-------+
|sepal_length|sepal_width|petal.length|petal.width|variety|
+------------+-----------+------------+-----------+-------+
|         4.8|        3.2|        null|        0.2| Setosa|
+------------+-----------+------------+-----------+-------+



# Join

In [120]:
# Left join on petal.length: every row of df is kept, and df4's columns come
# back as None where no petal.length matches. head(4) returns the first four
# rows as a list of Row objects. Note that both frames contribute a
# 'petal.width' and a 'variety' column, so those names appear twice.
(
    df.join(df4, on="petal.length", how="left")
      .head(4)
)

[Row(petal.length=5.4, sepal.length=6.9, sepal.width=3.1, petal.width=2.1, variety='Virginica', sepal_length=None, sepal_width=None, petal.width=None, variety=None),
 Row(petal.length=5.4, sepal.length=6.2, sepal.width=3.4, petal.width=2.3, variety='Virginica', sepal_length=None, sepal_width=None, petal.width=None, variety=None),
 Row(petal.length=3.5, sepal.length=5.0, sepal.width=2.0, petal.width=1.0, variety='Versicolor', sepal_length=None, sepal_width=None, petal.width=None, variety=None),
 Row(petal.length=3.5, sepal.length=5.7, sepal.width=2.6, petal.width=1.0, variety='Versicolor', sepal_length=None, sepal_width=None, petal.width=None, variety=None)]

In [121]:
# Inner join on petal.length: only rows of df whose petal.length also occurs
# in df4 are returned. head(4) returns the first four as a list of Row objects.
(
    df.join(df4, on="petal.length", how="inner")
      .head(4)
)

[Row(petal.length=1.3, sepal.length=4.4, sepal.width=3.2, petal.width=0.2, variety='Setosa', sepal_length=None, sepal_width=3.2, petal.width=0.2, variety='Setosa'),
 Row(petal.length=1.3, sepal.length=4.5, sepal.width=2.3, petal.width=0.3, variety='Setosa', sepal_length=None, sepal_width=3.2, petal.width=0.2, variety='Setosa'),
 Row(petal.length=1.3, sepal.length=5.0, sepal.width=3.5, petal.width=0.3, variety='Setosa', sepal_length=None, sepal_width=3.2, petal.width=0.2, variety='Setosa'),
 Row(petal.length=1.3, sepal.length=4.4, sepal.width=3.0, petal.width=0.2, variety='Setosa', sepal_length=None, sepal_width=3.2, petal.width=0.2, variety='Setosa')]