# Spark Reading and Writing

In [None]:
from pyspark.sql import SparkSession, functions as f, types as t

In [2]:
# Reuse the active session if one exists, otherwise start one named 'notebook1'.
spark = (
    SparkSession.builder
    .appName('notebook1')
    .getOrCreate()
)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


22/10/20 19:17:08 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [5]:
# Load the raw sales CSV with an explicit DDL schema. Because the schema is supplied,
# Spark does not infer types, so `inferSchema` is deliberately not passed: it would be a
# no-op that misleadingly suggests the column types are guessed from the data.
sales_df = spark.read.schema(
    "client string, purchase_date date, product string, price integer"
).csv('./data/sales.csv', header=True)

In [6]:
# Confirm the explicit schema was applied: purchase_date is a date and price an integer.
sales_df.printSchema()

root
 |-- client: string (nullable = true)
 |-- purchase_date: date (nullable = true)
 |-- product: string (nullable = true)
 |-- price: integer (nullable = true)



In [16]:
# Peek at the first three client / purchase-date pairs.
sales_df.select(['client', 'purchase_date']).show(n=3)

+------------+-------------+
|      client|purchase_date|
+------------+-------------+
|Norma Fisher|   2022-08-09|
|Norma Fisher|   2022-08-13|
|Norma Fisher|   2022-08-12|
+------------+-------------+
only showing top 3 rows



In [8]:
# Write the sales data back out as CSV with a header row. mode='overwrite' keeps the
# cell re-runnable: the default save mode raises an error if the path already exists.
sales_df.write.csv('./data/write/sales_csv', header=True, mode='overwrite')

In [9]:
# Read the CSV we just wrote, letting Spark infer the types. CSV is plain text, so the
# original schema is not stored with it; purchase_date comes back timestamp-like (see output).
(
    spark.read
    .option('header', True)
    .option('inferSchema', True)
    .csv('./data/write/sales_csv')
    .show(3)
)

+------------+-------------------+--------------+-----+
|      client|      purchase_date|       product|price|
+------------+-------------------+--------------+-----+
|Norma Fisher|2022-08-09 00:00:00|Vacuum cleaner|  121|
|Norma Fisher|2022-08-13 00:00:00|Microwave oven|  348|
|Norma Fisher|2022-08-12 00:00:00|         Phone| 1126|
+------------+-------------------+--------------+-----+
only showing top 3 rows



In [10]:
# Write the sales data as Parquet. mode='overwrite' keeps the cell re-runnable: the
# default save mode raises an error if the path already exists.
sales_df.write.parquet('./data/write/sales_pq', mode='overwrite')

In [11]:
# Load the Parquet copy back through the generic format/load reader.
sales_df_pq = spark.read.format('parquet').load('./data/write/sales_pq')

In [12]:
# The schema read back from Parquet matches sales_df's original schema (purchase_date stays a date).
sales_df_pq.printSchema()

root
 |-- client: string (nullable = true)
 |-- purchase_date: date (nullable = true)
 |-- product: string (nullable = true)
 |-- price: integer (nullable = true)



In [13]:
# First three rows of the Parquet round trip; dates print without a time component.
sales_df_pq.show(n=3, truncate=True)

+------------+-------------+--------------+-----+
|      client|purchase_date|       product|price|
+------------+-------------+--------------+-----+
|Norma Fisher|   2022-08-09|Vacuum cleaner|  121|
|Norma Fisher|   2022-08-13|Microwave oven|  348|
|Norma Fisher|   2022-08-12|         Phone| 1126|
+------------+-------------+--------------+-----+
only showing top 3 rows



In [14]:
# Write Parquet partitioned by purchase_date (one purchase_date=<value> subdirectory per date).
# mode='overwrite' keeps the cell re-runnable: the default save mode raises an error if
# the path already exists.
sales_df.write.parquet(
    './data/write/sales_pq_partitioned',
    mode='overwrite',
    partitionBy='purchase_date',
)

                                                                                

In [18]:
# Reading the partitioned root restores purchase_date as a column from the directory names.
(
    spark.read
    .parquet('./data/write/sales_pq_partitioned')
    .select(['client', 'purchase_date'])
    .show(n=3)
)

+--------------+-------------+
|        client|purchase_date|
+--------------+-------------+
|    Susan Levy|   2022-08-08|
|Kimberly Smith|   2022-08-08|
|Laurie Wallace|   2022-08-08|
+--------------+-------------+
only showing top 3 rows



In [21]:
# Read only the 2022-08-07 and 2022-08-08 partitions using a Hadoop glob alternation.
# The alternatives must not contain spaces: glob whitespace is literal, so
# "{2022-08-07, 2022-08-08}" would look for "purchase_date= 2022-08-08" and silently skip that date.
# Reading partition directories directly drops the purchase_date column; add
# .option('basePath', './data/write/sales_pq_partitioned') to keep it.
sales_selected_date = spark.read.parquet('./data/write/sales_pq_partitioned/purchase_date={2022-08-07,2022-08-08}')

In [22]:
# Only client, product and price remain; purchase_date lived in the directory path that was read.
sales_selected_date.show(n=3)

+--------------+-------+-----+
|        client|product|price|
+--------------+-------+-----+
|Cheryl Bradley|  Phone| 1256|
|  Bobby Flores|  Phone|  526|
|Travis Schultz|  Phone|  156|
+--------------+-------+-----+
only showing top 3 rows



In [23]:
# Write Parquet partitioned by purchase_date, then product (nested subdirectories).
# mode='overwrite' keeps the cell re-runnable: the default save mode raises an error if
# the path already exists.
sales_df.write.parquet(
    './data/write/sales_pq_partitioned2',
    mode='overwrite',
    partitionBy=['purchase_date', 'product'],
)

                                                                                

In [24]:
# Read the Phone and Laptop partitions for 2022-08-07. The glob alternatives must not
# contain spaces: "{Phone, Laptop}" would look for "product= Laptop" and silently skip
# Laptop rows. Both partition columns are dropped because the leaf directories are read directly.
spark.read.parquet('./data/write/sales_pq_partitioned2/purchase_date=2022-08-07/product={Phone,Laptop}').show(3)

+--------------+-----+
|        client|price|
+--------------+-----+
|Cheryl Bradley| 1256|
|  Bobby Flores|  526|
|Travis Schultz|  156|
+--------------+-----+
only showing top 3 rows

