In [1]:
from pyspark.sql import *
from pyspark.sql.functions import *
from pyspark.sql.types import *

# Obtain a SparkSession against a local master (an already-running session is
# reused by getOrCreate) and keep a handle on its SparkContext as `sc`.
spark = (
    SparkSession.builder
    .master("local")
    .getOrCreate()
)
sc = spark.sparkContext

### CREATE

In [19]:
# Two-column layout: a non-nullable integer `id` and a nullable string `value`.
schema = (
    StructType()
    .add("id", IntegerType(), False)
    .add("value", StringType(), True)
)
# Sample rows, given positionally in the same order as the schema fields.
data = [Row(*fields) for fields in ((1, "First"), (2, "Second"))]
# Build the DataFrame from the in-memory rows and print it.
df = spark.createDataFrame(data=data, schema=schema)
df.show()

+---+------+
|id |value |
+---+------+
|1  |First |
|2  |Second|
+---+------+



### INPUT

In [25]:
# Explicit schema for the file: non-nullable integer `id`, nullable string `value`.
# NOTE(review): Spark may still report file-sourced columns as nullable despite
# nullable=False — confirm with df.printSchema().
schema = StructType([
    StructField(name="id", dataType=IntegerType(), nullable=False),
    StructField(name="value", dataType=StringType(), nullable=True),
])
# Read the '^'-delimited CSV, treating its first line as a header, with that schema.
df = (
    spark.read
    .schema(schema)
    .option('header', True)
    .option('delimiter', '^')
    .format('csv')
    .load('dataSource/test.csv')
)
df.show()

+---+-----+
| id|value|
+---+-----+
|  1|  One|
|  2|  Two|
+---+-----+



In [38]:
# Load the Parquet dataset under dataSource/parquet (schema comes from the
# files themselves) and print up to 100 rows.
df = spark.read.parquet("dataSource/parquet")
df.show(n=100)

+---+-----+
| id|value|
+---+-----+
|  1|  One|
|  2|  Two|
+---+-----+



### TRANSFORMATION

In [40]:
# Replace `value` with null on the row whose id is 1; every other row keeps
# its current value.
# The predicate uses the integer literal 1 instead of the string '1', so it no
# longer depends on Spark implicitly coercing a string to a number.
# (Assumes `id` is an integer column, as in the schemas declared earlier in
# this notebook — confirm for the Parquet source.)
df = df.withColumn(
    'value',
    when(col('id') == 1, None).otherwise(col('value')),
)
df.show()

+---+-----+
| id|value|
+---+-----+
|  1| null|
|  2|  Two|
+---+-----+



In [41]:
# Keep only the rows whose `value` is not null, then print what remains.
df = df.where(df['value'].isNotNull())
df.show()

+---+-----+
| id|value|
+---+-----+
|  2|  Two|
+---+-----+



### OUTPUT

In [30]:
# Session-wide setting: ask the Parquet writer to use the TIMESTAMP_MILLIS
# output type for timestamp columns.
# NOTE(review): the frame shown in this notebook has only id/value columns, so
# this presumably matters only when timestamp columns are present — confirm.
spark.conf.set("spark.sql.parquet.outputTimestampType", "TIMESTAMP_MILLIS")
# Collapse to a single partition and replace any existing output at the target.
(
    df.repartition(1)
    .write
    .mode("overwrite")
    .format("parquet")
    .save("dataSink/parquet/")
)

In [42]:
# Write the frame as a single-partition, '^'-separated CSV with a header row,
# replacing any existing output at the target path.
(
    df.repartition(1)
    .write
    .mode("overwrite")
    .option("sep", "^")
    .option("header", "true")
    .csv("dataSink/csv/")
)