In [1]:
# Must run before the pyspark import in the next cell.
# NOTE(review): presumably locates the local Spark installation and exposes
# pyspark to this kernel -- confirm against the findspark documentation.
import findspark
findspark.init()

In [2]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

# Build the SparkSession used by every cell below (an already-active
# session is reused instead of creating a second one).
spark = (
    SparkSession.builder
    .appName("DataFrame-Operations")
    .getOrCreate()
)


In [3]:
%%bash
# Peek at the raw file (header line plus the first data rows) before loading it.
head -n 10 data/stocks.txt

id,name,category,quantity,price
1,iPhone,Electronics,10,899.99
2,MacBook,Electronics,5,1299.99
3,iPad,Electronics,15,499.99
4,Samsung TV,Electronics,8,799.99
5,LG TV,Electronics,10,699.99
6,Nike Shoes,Clothing,30,99.99
7,Adidas Shoes,Clothing,25,89.99
8,Sony Headphones,Electronics,12,149.99
9,Beats Headphones,Electronics,20,199.99

In [4]:
# Load the synthetic stock data into a DataFrame.
# The first line of the file is a header row; column types are inferred
# from the data instead of defaulting everything to string.
data_file_path = "./data/stocks.txt"
df = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv(data_file_path)
)

In [7]:
# Print the column names and the types inferred at load time
df.printSchema()

# Preview up to the first 10 rows of the loaded DataFrame
df.show(n=10)

root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- category: string (nullable = true)
 |-- quantity: integer (nullable = true)
 |-- price: double (nullable = true)

+---+----------------+-----------+--------+-------+
| id|            name|   category|quantity|  price|
+---+----------------+-----------+--------+-------+
|  1|          iPhone|Electronics|      10| 899.99|
|  2|         MacBook|Electronics|       5|1299.99|
|  3|            iPad|Electronics|      15| 499.99|
|  4|      Samsung TV|Electronics|       8| 799.99|
|  5|           LG TV|Electronics|      10| 699.99|
|  6|      Nike Shoes|   Clothing|      30|  99.99|
|  7|    Adidas Shoes|   Clothing|      25|  89.99|
|  8| Sony Headphones|Electronics|      12| 149.99|
|  9|Beats Headphones|Electronics|      20| 199.99|
+---+----------------+-----------+--------+-------+



### Select: Choose specific columns.

In [8]:
# Project the DataFrame down to three of its columns
selected_columns = df.select(["id", "name", "price"])
print("Selected Columns:")
selected_columns.show(n=7)

Selected Columns:
+---+------------+-------+
| id|        name|  price|
+---+------------+-------+
|  1|      iPhone| 899.99|
|  2|     MacBook|1299.99|
|  3|        iPad| 499.99|
|  4|  Samsung TV| 799.99|
|  5|       LG TV| 699.99|
|  6|  Nike Shoes|  99.99|
|  7|Adidas Shoes|  89.99|
+---+------------+-------+
only showing top 7 rows



### Filter: Apply conditions to filter rows.

In [9]:
# Keep only the rows whose quantity is strictly greater than 20
filtered_data = df.where(df["quantity"] > 20)
print("Filtered Data: ", filtered_data.count())
filtered_data.show()

Filtered Data:  2
+---+------------+--------+--------+-----+
| id|        name|category|quantity|price|
+---+------------+--------+--------+-----+
|  6|  Nike Shoes|Clothing|      30|99.99|
|  7|Adidas Shoes|Clothing|      25|89.99|
+---+------------+--------+--------+-----+



### GroupBy: Group data based on specific columns

### Aggregations: Perform functions like sum, average, etc., on grouped data.

In [10]:
# Group by category, then compute the total quantity and the average price.
# Explicit column expressions are used instead of a {column: function} dict:
# the dict form gives no control over the order (or names) of the output
# columns, whereas the expressions below always yield
# category, sum(quantity), avg(price) in that order.
# Requires `from pyspark.sql import functions as F` (see the import cell).
grouped_data = df.groupBy("category").agg(
    F.sum("quantity"),
    F.avg("price"),
)
print(" Group and Aggregated Data :")
grouped_data.show()

 Group and Aggregated Data :
+-----------+-------------+-----------------+
|   category|sum(quantity)|       avg(price)|
+-----------+-------------+-----------------+
|Electronics|           80|649.9899999999999|
|   Clothing|           55|            94.99|
+-----------+-------------+-----------------+



### Join: Combine multiple DataFrames based on specified columns.

In [11]:
# Build a second DataFrame (id + category, at most 10 rows) and inner-join
# it back onto the original on the shared "id" key.
# NOTE(review): both inputs carry a "category" column, so the joined result
# contains two columns with that name; selecting "category" from it later
# would be ambiguous unless one side is renamed or dropped first.
df2 = df.select(["id", "category"]).limit(10)
joined_data = df.join(df2, on="id", how="inner")
print("Joined Data :")
joined_data.show()

Joined Data :
+---+----------------+-----------+--------+-------+-----------+
| id|            name|   category|quantity|  price|   category|
+---+----------------+-----------+--------+-------+-----------+
|  1|          iPhone|Electronics|      10| 899.99|Electronics|
|  2|         MacBook|Electronics|       5|1299.99|Electronics|
|  3|            iPad|Electronics|      15| 499.99|Electronics|
|  4|      Samsung TV|Electronics|       8| 799.99|Electronics|
|  5|           LG TV|Electronics|      10| 699.99|Electronics|
|  6|      Nike Shoes|   Clothing|      30|  99.99|   Clothing|
|  7|    Adidas Shoes|   Clothing|      25|  89.99|   Clothing|
|  8| Sony Headphones|Electronics|      12| 149.99|Electronics|
|  9|Beats Headphones|Electronics|      20| 199.99|Electronics|
+---+----------------+-----------+--------+-------+-----------+



### Sort: Arrange rows based on one or more columns.

In [12]:
# Order the rows by price, cheapest first (ascending is the default)
sorted_data = df.sort("price")
print("Sorted Data: ")
sorted_data.show()

Sorted Data: 
+---+----------------+-----------+--------+-------+
| id|            name|   category|quantity|  price|
+---+----------------+-----------+--------+-------+
|  7|    Adidas Shoes|   Clothing|      25|  89.99|
|  6|      Nike Shoes|   Clothing|      30|  99.99|
|  8| Sony Headphones|Electronics|      12| 149.99|
|  9|Beats Headphones|Electronics|      20| 199.99|
|  3|            iPad|Electronics|      15| 499.99|
|  5|           LG TV|Electronics|      10| 699.99|
|  4|      Samsung TV|Electronics|       8| 799.99|
|  1|          iPhone|Electronics|      10| 899.99|
|  2|         MacBook|Electronics|       5|1299.99|
+---+----------------+-----------+--------+-------+



In [13]:
# Order the rows by price, most expensive first; ties are broken by id,
# also in descending order.
from pyspark.sql.functions import col, desc
sorted_data = df.orderBy(desc("price"), desc("id"))
print("Sorted Data Descending: ")
sorted_data.show()

Sorted Data Descending: 
+---+----------------+-----------+--------+-------+
| id|            name|   category|quantity|  price|
+---+----------------+-----------+--------+-------+
|  2|         MacBook|Electronics|       5|1299.99|
|  1|          iPhone|Electronics|      10| 899.99|
|  4|      Samsung TV|Electronics|       8| 799.99|
|  5|           LG TV|Electronics|      10| 699.99|
|  3|            iPad|Electronics|      15| 499.99|
|  9|Beats Headphones|Electronics|      20| 199.99|
|  8| Sony Headphones|Electronics|      12| 149.99|
|  6|      Nike Shoes|   Clothing|      30|  99.99|
|  7|    Adidas Shoes|   Clothing|      25|  89.99|
+---+----------------+-----------+--------+-------+



### Distinct: Get unique rows.

In [14]:
# Reduce the category column to its unique values
distinct_rows = df.select("category").dropDuplicates()
print("Distinct Product Categories:")
distinct_rows.show()

Distinct Product Categories:
+-----------+
|   category|
+-----------+
|Electronics|
|   Clothing|
+-----------+



### Drop: Remove specified columns.

In [15]:
# Remove the quantity and category columns, keeping everything else
dropped_columns = df.drop("quantity").drop("category")
print("Dropped Columns :")
dropped_columns.show(n=7)

Dropped Columns :
+---+------------+-------+
| id|        name|  price|
+---+------------+-------+
|  1|      iPhone| 899.99|
|  2|     MacBook|1299.99|
|  3|        iPad| 499.99|
|  4|  Samsung TV| 799.99|
|  5|       LG TV| 699.99|
|  6|  Nike Shoes|  99.99|
|  7|Adidas Shoes|  89.99|
+---+------------+-------+
only showing top 7 rows



### WithColumn: Add new calculated columns.

In [16]:
# Append a "revenue" column computed per row as quantity * price
df_with_new_column = df.withColumn(
    "revenue",
    df["quantity"] * df["price"],
)
print("DataFrame with New Column: ")
df_with_new_column.show()

DataFrame with New Column: 
+---+----------------+-----------+--------+-------+-------+
| id|            name|   category|quantity|  price|revenue|
+---+----------------+-----------+--------+-------+-------+
|  1|          iPhone|Electronics|      10| 899.99| 8999.9|
|  2|         MacBook|Electronics|       5|1299.99|6499.95|
|  3|            iPad|Electronics|      15| 499.99|7499.85|
|  4|      Samsung TV|Electronics|       8| 799.99|6399.92|
|  5|           LG TV|Electronics|      10| 699.99| 6999.9|
|  6|      Nike Shoes|   Clothing|      30|  99.99| 2999.7|
|  7|    Adidas Shoes|   Clothing|      25|  89.99|2249.75|
|  8| Sony Headphones|Electronics|      12| 149.99|1799.88|
|  9|Beats Headphones|Electronics|      20| 199.99| 3999.8|
+---+----------------+-----------+--------+-------+-------+



### Alias: Rename columns for better readability.

In [17]:
# Rename the "price" column to "product_price" with withColumnRenamed;
# all other columns are carried over unchanged.
df_with_alias = df.withColumnRenamed(existing="price", new="product_price")
print("DataFrame with Aliased Column: ")
df_with_alias.show()

DataFrame with Aliased Column: 
+---+----------------+-----------+--------+-------------+
| id|            name|   category|quantity|product_price|
+---+----------------+-----------+--------+-------------+
|  1|          iPhone|Electronics|      10|       899.99|
|  2|         MacBook|Electronics|       5|      1299.99|
|  3|            iPad|Electronics|      15|       499.99|
|  4|      Samsung TV|Electronics|       8|       799.99|
|  5|           LG TV|Electronics|      10|       699.99|
|  6|      Nike Shoes|   Clothing|      30|        99.99|
|  7|    Adidas Shoes|   Clothing|      25|        89.99|
|  8| Sony Headphones|Electronics|      12|       149.99|
|  9|Beats Headphones|Electronics|      20|       199.99|
+---+----------------+-----------+--------+-------------+



In [18]:
# Stop the Spark session now that the notebook is finished.
# Any cell that uses `spark` or `df` must be run before this one.
spark.stop()