In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
import pyspark.sql.functions as func

In [3]:
spark = SparkSession.builder\
        .master("local[*]")\
        .appName("SparkSQL")\
        .getOrCreate()

In [5]:
myschema = StructType([
    StructField("userID",IntegerType(),True),
    StructField("name",StringType(),True),
    StructField("age",IntegerType(),True),
    StructField("friends",IntegerType(),True)
])

In [6]:
people = spark.read.csv("fakefriends.csv",schema=myschema)

In [7]:
people.printSchema()

root
 |-- userID: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- friends: integer (nullable = true)



In [8]:
people.show(5)

+------+--------+---+-------+
|userID|    name|age|friends|
+------+--------+---+-------+
|     0|    Will| 33|    385|
|     1|Jean-Luc| 26|      2|
|     2|    Hugh| 55|    221|
|     3|  Deanna| 40|    465|
|     4|   Quark| 68|     21|
+------+--------+---+-------+
only showing top 5 rows



In [9]:
output = people.select(people.userID,people.name,people.age,people.friends)\
               .where(people.age < 30)\
               .withColumn('insert_ts',func.current_timestamp())\
               .orderBy(people.userID).cache()

Caching - 
 - .cache() stores the result in memory the first time an action (show, count, collect, ...) computes it, so later queries reuse it instead of re-running the whole select/where/orderBy chain - saves computation time. It's lazy: calling .cache() on its own stores nothing
 - There's also a more customisable option called persist, where you choose the StorageLevel (from pyspark import StorageLevel), i.e. whether the result lives in RAM, on disk, or both
 - e.g. output.persist(StorageLevel.MEMORY_AND_DISK) - keep it in RAM, spilling the partitions that don't fit to disk
 - output.persist(StorageLevel.DISK_ONLY) - don't use RAM; write the cached data to disk
 - MEMORY_ONLY_SER (keep it in RAM as serialized objects => saves space, but costs CPU to deserialize) exists in the Scala/Java API; PySpark's StorageLevel has no _SER constants, since the PySpark docs note that Python-side data is always serialized
.cache is a shortcut => for a DataFrame it's the same as .persist() with the default storage level (memory, spilling to disk)
=> you can unpersist too: when you're done with a cached/persisted DataFrame, call output.unpersist()
=> output.unpersist(blocking=True) doesn't return until all the cached blocks have actually been removed

In [10]:
output.createOrReplaceTempView("peoples")

In [11]:
spark.sql("select userID, name from peoples").show()

+------+--------+
|userID|    name|
+------+--------+
|     1|Jean-Luc|
|     9|    Hugh|
|    16|  Weyoun|
|    21|   Miles|
|    24|  Julian|
|    25|     Ben|
|    26|  Julian|
|    32|     Nog|
|    35| Beverly|
|    46|    Morn|
|    47|   Brunt|
|    48|     Nog|
|    52| Beverly|
|    54|   Brunt|
|    60|  Geordi|
|    66|  Geordi|
|    72|  Kasidy|
|    73|   Brunt|
|    84|     Ben|
|    89|    Worf|
+------+--------+
only showing top 20 rows



writing the result to disk:

Spark Writing:
 - Spark writing is how you get a DataFrame out of memory and onto storage: local disk, HDFS, S3, whatever. It's not just dumping data; it's a distributed process (each partition is written out as its own part file), so Spark gives you a pile of knobs to control how, where, and what gets written

 - output.write => starts the writing process for the output DataFrame (it returns a DataFrameWriter)
 significance => entry point: tells Spark you're about to save this thing. On its own it writes nothing; the files only appear once .save() (or a shortcut like .parquet(path)) runs.

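 - .format("csv")
   - What: sets the output format to CSV
   - Significance: defines how the data is laid out on disk. Spark supports lots of formats (csv, json, parquet, orc, text, ...)
 - .mode("overwrite")
   - What: specifies what happens if the destination already exists; here it overwrites it
   - Significance: controls overwrite vs. append behaviour. Options:
     - overwrite: deletes the existing data, writes the new data
     - append: adds the new files alongside the existing data
     - ignore: silently skips the write if the path exists
     - error (alias errorifexists, the default): throws an error if the path exists

 - .option("path","file.......")
   - What: sets the destination directory; same as passing the path to .save(path)
 - .partitionBy("age")
   - What: splits the output into one subdirectory per distinct value of the age column (age=25/, age=26/, ...)
   - Significance: organises the data for performance and usability; later queries that filter on age only read the matching folders
   - Assumes age is a key column for later queries or organisation. The age values live in the folder names, not inside the data files

 - .save() => triggers the write operation with all the above settings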


Write Format Options:
 - CSV: plain text, comma separated
   Pros - human-readable, opens in Excel, universal compatibility
   Cons - weak compression compared with columnar formats, slow to read, no stored schema (types must be supplied or inferred on read)
   When - small data, sharing with non-technical people, or legacy systems

 - Parquet: columnar storage, optimised for big data
   Pros - fast reads, compression, stores the schema, Spark/Hive friendly
   Cons - not human-readable, needs tools to open
   When - big data, performance matters, querying subsets of columns

 - ORC: Optimized Row Columnar
   Another columnar format, Hive-optimised
   Pros - like Parquet: compression, fast reads, built-in indexes
   Cons - less Spark-native than Parquet, similar trade-offs

 - JSON: text-based format that supports nested data
   Pros - flexible, readable, good for semi-structured data
   Cons - bloated, slow, no columnar benefits
   When - APIs, small datasets, or nested data needs

 - Avro: row-based, schema-evolving format
   Pros - compact, supports schema evolution
   Cons - no columnar efficiency; ships as the separate spark-avro module rather than in Spark core
   When - data pipelines needing schema changes over time

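Why Parquet here - the bullets above use csv as the example, but the cell below writes Parquet: it keeps the schema, compresses well, and pairs well with partitionBy for later reads.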


In [13]:
output.write\
      .format("parquet")\
      .mode("overwrite")\
      .option("path",r"C:\Users\blais\Documents\data_engineering\week5\apache_spark\write_outputs")\
      .partitionBy("age")\
      .save()

How do you read this data back from disk?

In [16]:
dfx = spark.read.parquet(r"C:\Users\blais\Documents\data_engineering\week5\apache_spark\write_outputs").where("age=25")

In [17]:
dfx.show()

+------+-------+-------+--------------------+---+
|userID|   name|friends|           insert_ts|age|
+------+-------+-------+--------------------+---+
|   166|Lwaxana|     10|2025-04-05 11:07:...| 25|
|   464|Beverly|    485|2025-04-05 11:07:...| 25|
|    24| Julian|      1|2025-04-05 11:07:...| 25|
|   238| Deanna|    305|2025-04-05 11:07:...| 25|
|   315| Weyoun|    208|2025-04-05 11:07:...| 25|
|   108|  Leeta|    274|2025-04-05 11:07:...| 25|
|    46|   Morn|     96|2025-04-05 11:07:...| 25|
|    96|   Ezri|    233|2025-04-05 11:07:...| 25|
|   112|   Morn|     13|2025-04-05 11:07:...| 25|
|   242|   Data|    101|2025-04-05 11:07:...| 25|
|   271|   Morn|    446|2025-04-05 11:07:...| 25|
+------+-------+-------+--------------------+---+



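Calling .where() on the partition column as part of the read makes it cheaper and faster: the data was written with partitionBy("age"), so Spark only opens the age=25 folder instead of scanning every file (partition pruning). Note that age now comes back as the last column, because partition values are read from the folder names rather than from the data files.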
 - PART 1: WRITING DATA - All the ways you might do it

 Option B: Write as parquet, partitioned by 2 columns - age and friends

In [18]:
output.write\
      .format("parquet")\
      .mode("overwrite")\
      .option("path",r"C:\Users\blais\Documents\data_engineering\week5\apache_spark\write_outputs2")\
      .partitionBy("age","friends")\
      .save()

In [19]:
dfx2 = spark.read.parquet(r"C:\Users\blais\Documents\data_engineering\week5\apache_spark\write_outputs2").where("friends > 400")

In [20]:
dfx2.show()

+------+-------+--------------------+---+-------+
|userID|   name|           insert_ts|age|friends|
+------+-------+--------------------+---+-------+
|   464|Beverly|2025-04-05 11:07:...| 25|    485|
|   399|Beverly|2025-04-05 11:07:...| 24|    401|
|   106|Beverly|2025-04-05 11:07:...| 18|    499|
|   377|Beverly|2025-04-05 11:07:...| 18|    418|
|   200| Kasidy|2025-04-05 11:07:...| 21|    472|
|   265| Gowron|2025-04-05 11:07:...| 27|    471|
|    66| Geordi|2025-04-05 11:07:...| 21|    477|
|   280|  Nerys|2025-04-05 11:07:...| 26|    492|
|   484|  Leeta|2025-04-05 11:07:...| 22|    478|
|   244|  Dukat|2025-04-05 11:07:...| 21|    471|
|   444|  Keiko|2025-04-05 11:07:...| 18|    472|
|   206|   Will|2025-04-05 11:07:...| 21|    491|
|   439|   Data|2025-04-05 11:07:...| 18|    417|
|   271|   Morn|2025-04-05 11:07:...| 25|    446|
|   304|   Will|2025-04-05 11:07:...| 19|    404|
|    89|   Worf|2025-04-05 11:07:...| 24|    492|
|    25|    Ben|2025-04-05 11:07:...| 21|    445|


In [21]:
spark.stop()