In [1]:
import os
os.environ['SPARK_HOME'] = "/opt/spark"
os.environ['PYSPARK_DRIVER_PYTHON'] = "jupyter"
os.environ['PYSPARK_DRIVER_PYTHON_OPTS'] = "lab"
os.environ['PYSPARK_PYTHON'] = "python"

In [None]:
from pyspark.sql import SparkSession

# Create a SparkSession
spark = SparkSession.builder.appName("DataFrame-Operations").getOrCreate()

In [None]:
%%bash
head -n 10 data/stocks.txt

In [6]:
# Load the synthetic data into a DataFrame
data_file_path = "./data/stocks.txt"
df = spark.read.csv(data_file_path, header=True, inferSchema=True)

In [None]:
# Display schema of DataFrame
df.printSchema()

# Show the initial DataFrame
print("Initial DataFrame:")
df.show(10)

# **Select: Choose specific columns**

In [None]:
# Select specific columns
selected_columns = df.select("id", "name", "price")
print("Selected Columns:")
selected_columns.show(10)

# **Filter: Apply conditions to filter rows**

In [None]:
# Filter rows based on a condition
filtered_data = df.filter(df.quantity > 20)
print("Filtered Data:", filtered_data.count())
filtered_data.show()

### GroupBy: Group data based on specific columns 
### Aggregations: Perform functions like sum, average, etc., on grouped data.

In [None]:
# GroupBy and Aggregations
grouped_data = df.groupBy("category").agg({"quantity": "sum", "price": "avg"})
print("Grouped and Aggregated Data:")
grouped_data.show()

### Join: Combine multiple DataFrames based on specified columns.

In [None]:
# Join with another DataFrame
df2 = df.select("id", "category").limit(10)
joined_data = df.join(df2, "id", "inner")
print("Joined Data:")
joined_data.show()

### Sort: Arrange rows based on one or more columns.

In [None]:
# Sort by a column
sorted_data = df.orderBy("price")
print("Sorted Data:")
sorted_data.show(10)

In [None]:
# Sort by a column desc
from pyspark.sql.functions import col, desc
sorted_data = df.orderBy(col("price").desc(), col("id").desc())
print("Sorted Data Descending:")
sorted_data.show(10)

### Distinct: Get unique rows.

In [None]:
# Get distinct product category
distinct_rows = df.select("category").distinct()
print("Distinct Product Categories:")
distinct_rows.show()

### Drop: Remove specified columns.

In [None]:
# Drop columns
dropped_columns = df.drop("quantity", "category")
print("Dropped Columns:")
dropped_columns.show(10)

### WithColumn: Add new calculated columns.

In [None]:
# Add a new calculated column
df_with_new_column = df.withColumn("revenue", df.quantity * df.price)
print("DataFrame with New Column:")
df_with_new_column.show(10)

### Alias: Rename columns for better readability.

In [None]:
# Rename columns using alias
df_with_alias = df.withColumnRenamed("price", "product_price")
print("DataFrame with Aliased Column:")
df_with_alias.show(10)

In [18]:
# Stop the SparkSession
spark.stop()