## Day53_pyspark_date_and_write_functions

### Date Functions

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    current_date, current_timestamp, year, month, dayofmonth, hour,
    date_add, date_sub, datediff, add_months, months_between,
    date_format, to_date, to_timestamp, quarter,lit,col, trunc, weekofyear, dayofweek
)

# Sample rows: an id, a date given as a string, and a timestamp given as a string.
data = [
    (1, "2023-01-01", "2023-01-08 12:34:56"),
    (2, "2023-06-01", "2023-06-08 23:45:01"),
    (3, "2023-12-31", "2024-01-01 00:00:00"),
]
columns = ["id", "date_column", "timestamp_column"]

# Only column names are supplied as the schema, so Spark infers the column types
# from the Python values; both date-like columns are strings at this point.
df = spark.createDataFrame(data, schema=columns)
df.display()
df.printSchema()

In [0]:
# Convert the string columns to proper date / timestamp types so the
# date functions used later operate on typed values.
df = df.withColumn("date_column", df["date_column"].cast("date"))
df = df.withColumn("timestamp_column", df["timestamp_column"].cast("timestamp"))

df.display()
df.printSchema()

In [0]:
# Derive date/time feature columns from date_column and timestamp_column.
# The offsets match the column names: date_plus_7 / date_minus_7 shift by
# 7 days and next_month shifts by exactly 1 month.
df = (
    df
    # Values captured when the query runs.
    .withColumn("current_date", current_date())
    .withColumn("current_timestamp", current_timestamp())
    # Calendar parts extracted from the date / timestamp.
    .withColumn("year", year("date_column"))
    .withColumn("month", month("date_column"))
    .withColumn("day_of_month", dayofmonth("date_column"))
    .withColumn("hour", hour("timestamp_column"))
    # Date arithmetic.
    .withColumn("date_plus_7", date_add("date_column", 7))
    .withColumn("date_minus_7", date_sub("date_column", 7))
    # datediff(end, start): number of days from date_column to timestamp_column.
    .withColumn("datediff_days", datediff("timestamp_column", "date_column"))
    .withColumn("next_month", add_months("date_column", 1))
    # months_between(date1, date2): months from date_column to timestamp_column.
    .withColumn("months_between", months_between("timestamp_column", "date_column"))
    # Formatting and parsing.
    .withColumn("formatted_date", date_format("date_column", "MMyyyy"))
    .withColumn("to_date", to_date("timestamp_column", "yyyy-MM-dd HH:mm:ss"))
    .withColumn("to_timestamp", to_timestamp("date_column", "yyyy-MM-dd"))
    # Plain column arithmetic with a literal: id2 = 2 * id.
    .withColumn("id2", lit(2) * col("id"))
)

df.display()

In [0]:
# df already carries every derived column (year, month, day_of_month, hour, ...)
# from the previous cell, so re-deriving them here is redundant; only the filter
# is needed. Keep rows dated on the 1st of a month whose timestamp hour is 12.
df_2 = df.filter('day_of_month=1 and hour = 12')

df_2.display()

In [0]:
# Register df as the temporary view "df_v" so it can be queried with spark.sql.
df.createOrReplaceTempView(name="df_v")



In [0]:
# SQL equivalent of the month() column function: rows dated in January.
display(spark.sql("""select * from df_v where month(date_column)=1 """))

In [0]:
# quarter() used directly in SQL: rows dated in the fourth quarter.
display(spark.sql("""select * from df_v where quarter(date_column)=4 """))

### write Function

The section below summarizes how to write a DataFrame to storage:

---

## **Write DataFrame to CSV / Parquet / JSON / Table**

Every Spark DataFrame exposes a **`DataFrame.write`** attribute (a `DataFrameWriter`, accessed without parentheses) that writes the DataFrame to files or to a table.

---

### **Write Modes**

* **overwrite** – Replaces (overwrites) any data that already exists at the target path.
* **append** – Adds the new data to the data that already exists at the target path.
* **ignore** – Silently skips the write operation if data already exists at the target path.
* **error** (alias **errorifexists**) – Default mode; throws an error if data already exists at the target path.

---

### **Example**

```python
df.write.csv(path, mode='ignore', header=True, sep=',')
```




In [0]:
# Sample orders: (order_id, order_date, amount, status).
orders_data = [
    (1, "2025-10-01", 100.50, "shipped"),
    (2, "2025-10-02", 250.00, "processing"),
    (3, "2025-10-03", 75.25, "delivered"),
    (4, "2025-10-04", 300.00, "cancelled"),
]
orders_columns = ["order_id", "order_date", "amount", "status"]

# NOTE: this rebinds `df`, replacing the date-functions DataFrame built earlier.
df = spark.createDataFrame(orders_data, schema=orders_columns)
df.display()

In [0]:
# Basic transformations: round the amount and upper-case the status.
#
# The functions module is imported under an alias rather than with `import *`,
# because the star import would shadow Python built-ins such as round(), sum(),
# min() and max() for the rest of the notebook.
import pyspark.sql.functions as F

df_t = (
    df.withColumn("amount", F.round(F.col("amount")))
      .withColumn("status", F.upper(F.col("status")))
)

In [0]:
# Inspect the transformed orders before writing them out.
display(df_t)

In [0]:
# Default mode (error / errorifexists): the write throws an error if data
# already exists at the target path. The mode is spelled out here for clarity;
# it is the same mode Spark uses when none is given.
df_t.write.mode("errorifexists").parquet(
    "/Volumes/workspace/default/august_2025/parquet_write/orders"
)

/Volumes/workspace/default/august_2025/parquet_write/

In [0]:
# overwrite: replaces whatever already exists at the target path.
df_t.write.parquet(
    "/Volumes/workspace/default/august_2025/parquet_write/orders",
    mode="overwrite",
)

In [0]:
# Write to a second location (orders2) using mode "ignore":
# the write is skipped, without raising, if data already exists at the path.
df_t.write.parquet(
    "/Volumes/workspace/default/august_2025/parquet_write/orders2",
    mode="ignore",
)

In [0]:
# Read the whole orders directory back (all part files) and show it.
df_read = spark.read.format("parquet").load(
    "/Volumes/workspace/default/august_2025/parquet_write/orders"
)
display(df_read)



In [0]:

# Read a single part file instead of the whole directory.
# NOTE(review): the part-file name below looks run-specific (it embeds a tid/uuid),
# so this cell presumably only works against the files of the original run;
# confirm or update the name before re-running elsewhere.
df_read = spark.read.parquet(
    "/Volumes/workspace/default/august_2025/parquet_write/orders2/part-00001-tid-6949260350592513418-7e1dbecb-d618-43bc-aa56-8fda5c808eae-133-1.c000.snappy.parquet"
)
display(df_read)

In [0]:
# append: adds the new rows to the data that already exists at the target path.
df_t.write.parquet(
    "/Volumes/workspace/default/august_2025/parquet_write/orders",
    mode="append",
)

In [0]:
# Read the orders directory again to see the rows added by the append.
df_read = spark.read.format("parquet").load(
    "/Volumes/workspace/default/august_2025/parquet_write/orders"
)
display(df_read)

In [0]:

# Use overwrite mode so that the existing files in the orders location are
# replaced by this write: only the current data is kept, not the historical
# (previously appended) data.
df_t.write.parquet(
    "/Volumes/workspace/default/august_2025/parquet_write/orders",
    mode="overwrite",
)


In [0]:
# Confirm that only the freshly written data remains after the overwrite.
df_read = spark.read.format("parquet").load(
    "/Volumes/workspace/default/august_2025/parquet_write/orders"
)
display(df_read)

Short Summary

By default, Spark writes data in partitioned (part) files — you’ll always see files like
part-00000, part-00001, etc., no matter which format you use (CSV, Parquet, JSON, etc.).

When you write in CSV format, you can also pass `header=True` so that the column names are written as the first row of each part file:

df.write.mode("overwrite").csv("/parquet_write/orders", header=True)

In [0]:
# Write the transformed orders as CSV, replacing any previous output.
# No header option is passed here, so this call does not request a header row.
df_t.write.csv(
    "/Volumes/workspace/default/august_2025/csv_files/orders",
    mode="overwrite",
)

In [0]:
# Read the CSV output back with the reader's default options
# (no header or schema options are passed).
read_csv = spark.read.format("csv").load(
    "/Volumes/workspace/default/august_2025/csv_files/orders"
)
display(read_csv)

In [0]:
# Count how many rows of df live in each Spark partition.
(
    df.selectExpr("spark_partition_id() as partition_id")
      .groupBy("partition_id")
      .count()
      .display()
)