## Create the dataframe

In [2]:
from pyspark.sql import SparkSession

# A standalone script has to build its own SparkSession; hosted notebook
# environments such as Databricks already expose a ready-made `spark` object.
# NOTE(review): some hosted notebooks (Synapse/Fabric) pick the cell language with a
# `%%pyspark` magic — verify which magic, if any, applies on your platform.

spark = (
    SparkSession.builder
    .appName("ReadCSVExample")
    .getOrCreate()
)

# Read the 2019 orders file with header parsing off, so the columns get generated names.
df = spark.read.load("orders/2019.csv", format="csv", header="false")

# show() is the plain PySpark way to print rows; display() is a notebook-platform helper (e.g. Databricks).
df.show()

+-------+---+----------+-----------------+--------------------+--------------------+---+--------+--------+
|    _c0|_c1|       _c2|              _c3|                 _c4|                 _c5|_c6|     _c7|     _c8|
+-------+---+----------+-----------------+--------------------+--------------------+---+--------+--------+
|SO43701|  1|2019-07-01|      Christy Zhu|christy12@adventu...|Mountain-100 Silv...|  1| 3399.99|271.9992|
|SO43704|  1|2019-07-01|       Julio Ruiz|julio1@adventure-...|Mountain-100 Blac...|  1| 3374.99|269.9992|
|SO43705|  1|2019-07-01|        Curtis Lu|curtis9@adventure...|Mountain-100 Silv...|  1| 3399.99|271.9992|
|SO43700|  1|2019-07-01|     Ruben Prasad|ruben10@adventure...|  Road-650 Black, 62|  1|699.0982| 55.9279|
|SO43703|  1|2019-07-01|   Albert Alvarez|albert7@adventure...|    Road-150 Red, 62|  1| 3578.27|286.2616|
|SO43697|  1|2019-07-01|      Cole Watson|cole1@adventure-w...|    Road-150 Red, 62|  1| 3578.27|286.2616|
|SO43699|  1|2019-07-01|    Sydney Wr

In [3]:
from pyspark.sql.types import *

# Explicit schema for the order CSV files: each (name, type) pair becomes one StructField.
orderSchema = StructType([
    StructField(column_name, column_type)
    for column_name, column_type in (
        ("SalesOrderNumber", StringType()),
        ("SalesOrderLineNumber", IntegerType()),
        ("OrderDate", DateType()),
        ("CustomerName", StringType()),
        ("Email", StringType()),
        ("Item", StringType()),
        ("Quantity", IntegerType()),
        ("UnitPrice", FloatType()),
        ("Tax", FloatType()),
    )
])

# Load every CSV matched by the glob, applying the schema above instead of generated column names.
df = spark.read.load("orders/*.csv", format="csv", schema=orderSchema)

# Preview the first five rows.
df.show(5)


+----------------+--------------------+----------+--------------+--------------------+--------------------+--------+---------+--------+
|SalesOrderNumber|SalesOrderLineNumber| OrderDate|  CustomerName|               Email|                Item|Quantity|UnitPrice|     Tax|
+----------------+--------------------+----------+--------------+--------------------+--------------------+--------+---------+--------+
|         SO49171|                   1|2021-01-01| Mariah Foster|mariah21@adventur...|  Road-250 Black, 48|       1|2181.5625| 174.525|
|         SO49172|                   1|2021-01-01|  Brian Howard|brian23@adventure...|    Road-250 Red, 44|       1|  2443.35| 195.468|
|         SO49173|                   1|2021-01-01| Linda Alvarez|linda19@adventure...|Mountain-200 Silv...|       1|2071.4197|165.7136|
|         SO49174|                   1|2021-01-01|Gina Hernandez|gina4@adventure-w...|Mountain-200 Silv...|       1|2071.4197|165.7136|
|         SO49178|                   1|2021-01-0

25/07/30 15:24:07 WARN FileStreamSink: Assume no metadata directory. Error while looking for metadata directory in the path: orders/*.csv.
java.io.FileNotFoundException: File orders/*.csv does not exist
	at org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:917)
	at org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:1238)
	at org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:907)
	at org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFileSystem.java:462)
	at org.apache.spark.sql.execution.streaming.FileStreamSink$.hasMetadata(FileStreamSink.scala:56)
	at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:381)
	at org.apache.spark.sql.catalyst.analysis.ResolveDataSource.org$apache$spark$sql$catalyst$analysis$ResolveDataSource$$loadV1BatchSource(ResolveDataSource.scala:143)
	at org.apache.spark.sql.catalyst.analysis.ResolveDataSource$$anonfun$

## Filtering and Aggregating

In [4]:
# Keep only the two customer identity columns.
customers = df.select("CustomerName", "Email")

# First line: total rows; second line: rows left after dropping duplicate (name, email) pairs.
print(customers.count(), customers.distinct().count(), sep="\n")

customers.show(5)

32718
12427
+--------------+--------------------+
|  CustomerName|               Email|
+--------------+--------------------+
| Mariah Foster|mariah21@adventur...|
|  Brian Howard|brian23@adventure...|
| Linda Alvarez|linda19@adventure...|
|Gina Hernandez|gina4@adventure-w...|
|     Beth Ruiz|beth4@adventure-w...|
+--------------+--------------------+
only showing top 5 rows


In [5]:
# Customers who ordered the 'Road-250 Red, 52' item.
# Filter on Item *before* projecting it away: filtering after the select would depend on
# Spark resolving a column that is no longer part of the projection.
customers = (
    df.where(df['Item'] == 'Road-250 Red, 52')
    .select("CustomerName", "Email")
)

# Total matching rows, then the number of distinct (name, email) pairs among them.
print(customers.count())
print(customers.distinct().count())

customers.show(5)

133
133
+----------------+--------------------+
|    CustomerName|               Email|
+----------------+--------------------+
|    Margaret Guo|margaret24@advent...|
|         Cara Xu|cara8@adventure-w...|
|  Alejandro Raji|alejandro46@adven...|
|      Jaime Diaz|jaime3@adventure-...|
|Bridget Andersen|bridget15@adventu...|
+----------------+--------------------+
only showing top 5 rows


In [6]:
# Total quantity per item; the result column is named sum(Quantity).
productSales = df.groupBy("Item").sum("Quantity")

productSales.show(5)

+--------------------+-------------+
|                Item|sum(Quantity)|
+--------------------+-------------+
|Mountain-200 Blac...|          388|
|Touring-1000 Yell...|           74|
|Touring-1000 Blue...|           67|
|Short-Sleeve Clas...|          216|
|Women's Mountain ...|          146|
+--------------------+-------------+
only showing top 5 rows


In [7]:
# Per-item quantity totals.
# NOTE(review): this cell repeats the aggregation from the preceding cell unchanged —
# consider removing one of the two.
productSales = (
    df.select("Item", "Quantity")
    .groupBy("Item")
    .sum()
)
productSales.show(5)

+--------------------+-------------+
|                Item|sum(Quantity)|
+--------------------+-------------+
|Mountain-200 Blac...|          388|
|Touring-1000 Yell...|           74|
|Touring-1000 Blue...|           67|
|Short-Sleeve Clas...|          216|
|Women's Mountain ...|          146|
+--------------------+-------------+
only showing top 5 rows


In [8]:
from pyspark.sql.functions import *

# Rows per calendar year: derive a single 'Year' column from 'OrderDate',
# group on it, count the rows in each group, and sort the result by year.

yearlySales = (
    df.select(year(col("OrderDate")).alias("Year"))
    .groupBy("Year")
    .count()
    .orderBy("Year")
)

yearlySales.show()

+----+-----+
|Year|count|
+----+-----+
|2019| 1201|
|2020| 2733|
|2021|28784|
+----+-----+



## Transform and Save

In [9]:
# Build the transformed frame in one pass:
#   1. Year / Month are extracted from OrderDate.
#   2. FirstName / LastName are the first and second space-separated tokens of CustomerName.
#      NOTE(review): getItem(1) is the *second* token, so a name with more than two
#      parts would lose its trailing tokens — confirm this is acceptable for the data.
#   3. The final select keeps only the listed columns, in this order (CustomerName is dropped).
transformed_df = (
    df
    .withColumn("Year", year(col("OrderDate")))
    .withColumn("Month", month(col("OrderDate")))
    .withColumn("FirstName", split(col("CustomerName"), " ").getItem(0))
    .withColumn("LastName", split(col("CustomerName"), " ").getItem(1))
    .select(
        "SalesOrderNumber",
        "SalesOrderLineNumber",
        "OrderDate",
        "Year",
        "Month",
        "FirstName",
        "LastName",
        "Email",
        "Item",
        "Quantity",
        "UnitPrice",
        "Tax",
    )
)

# Display the first five orders
transformed_df.show(5)

+----------------+--------------------+----------+----+-----+---------+---------+--------------------+--------------------+--------+---------+--------+
|SalesOrderNumber|SalesOrderLineNumber| OrderDate|Year|Month|FirstName| LastName|               Email|                Item|Quantity|UnitPrice|     Tax|
+----------------+--------------------+----------+----+-----+---------+---------+--------------------+--------------------+--------+---------+--------+
|         SO49171|                   1|2021-01-01|2021|    1|   Mariah|   Foster|mariah21@adventur...|  Road-250 Black, 48|       1|2181.5625| 174.525|
|         SO49172|                   1|2021-01-01|2021|    1|    Brian|   Howard|brian23@adventure...|    Road-250 Red, 44|       1|  2443.35| 195.468|
|         SO49173|                   1|2021-01-01|2021|    1|    Linda|  Alvarez|linda19@adventure...|Mountain-200 Silv...|       1|2071.4197|165.7136|
|         SO49174|                   1|2021-01-01|2021|    1|     Gina|Hernandez|gina4@a

In [10]:
# Persist the transformed orders as Parquet; "overwrite" replaces any existing output at the path.
transformed_df.write.parquet('Files/transformed_data/orders', mode="overwrite")

print("Transformed data saved!")

[Stage 35:>                                                         (0 + 3) / 3]

Transformed data saved!


                                                                                

In [11]:
# Read the Parquet output back in and preview it.
orders_df = spark.read.load("Files/transformed_data/orders", format="parquet")

orders_df.show(5)

+----------------+--------------------+----------+----+-----+---------+---------+--------------------+--------------------+--------+---------+--------+
|SalesOrderNumber|SalesOrderLineNumber| OrderDate|Year|Month|FirstName| LastName|               Email|                Item|Quantity|UnitPrice|     Tax|
+----------------+--------------------+----------+----+-----+---------+---------+--------------------+--------------------+--------+---------+--------+
|         SO49171|                   1|2021-01-01|2021|    1|   Mariah|   Foster|mariah21@adventur...|  Road-250 Black, 48|       1|2181.5625| 174.525|
|         SO49172|                   1|2021-01-01|2021|    1|    Brian|   Howard|brian23@adventure...|    Road-250 Red, 44|       1|  2443.35| 195.468|
|         SO49173|                   1|2021-01-01|2021|    1|    Linda|  Alvarez|linda19@adventure...|Mountain-200 Silv...|       1|2071.4197|165.7136|
|         SO49174|                   1|2021-01-01|2021|    1|     Gina|Hernandez|gina4@a

In [12]:
# Write the orders again, this time partitioned on disk by Year and then Month;
# "overwrite" replaces any existing output at the path.
orders_df.write.parquet(
    "Files/partitioned_data",
    mode="overwrite",
    partitionBy=["Year", "Month"],
)

print("Transformed data saved!")

Transformed data saved!


                                                                                

In [13]:
# Load only the 2021 data by pointing at the Year=2021 folder and globbing its Month=... subfolders.
# Note: the printed result below has no Year or Month columns.
orders_2021_df = spark.read.load(
    "Files/partitioned_data/Year=2021/Month=*",
    format="parquet",
)

orders_2021_df.show(5)

25/07/30 15:24:14 WARN FileStreamSink: Assume no metadata directory. Error while looking for metadata directory in the path: Files/partitioned_data/Year=2021/Month=*.
java.io.FileNotFoundException: File Files/partitioned_data/Year=2021/Month=* does not exist
	at org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:917)
	at org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:1238)
	at org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:907)
	at org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFileSystem.java:462)
	at org.apache.spark.sql.execution.streaming.FileStreamSink$.hasMetadata(FileStreamSink.scala:56)
	at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:381)
	at org.apache.spark.sql.catalyst.analysis.ResolveDataSource.org$apache$spark$sql$catalyst$analysis$ResolveDataSource$$loadV1BatchSource(ResolveDataSource.scala:143)
	at org.apache

+----------------+--------------------+----------+---------+--------+--------------------+--------------------+--------+---------+-------+
|SalesOrderNumber|SalesOrderLineNumber| OrderDate|FirstName|LastName|               Email|                Item|Quantity|UnitPrice|    Tax|
+----------------+--------------------+----------+---------+--------+--------------------+--------------------+--------+---------+-------+
|         SO59195|                   1|2021-11-01|   Alexia|   Hayes|alexia19@adventur...|Touring-2000 Blue...|       1|  1214.85| 97.188|
|         SO59195|                   2|2021-11-01|   Alexia|   Hayes|alexia19@adventur...|     Racing Socks, M|       1|     8.99| 0.7192|
|         SO59196|                   1|2021-11-01|  Anthony|  Garcia|anthony2@adventur...|Touring-2000 Blue...|       1|  1214.85| 97.188|
|         SO59196|                   2|2021-11-01|  Anthony|  Garcia|anthony2@adventur...|Sport-100 Helmet,...|       1|    34.99| 2.7992|
|         SO59191|         

## SQL