# <u>Imports</u>

In [1]:
import pyspark
from pyspark.conf import SparkConf
from pyspark.sql import Row, Window
import pyspark.sql.types as T
import pyspark.sql.functions as F

# <u>Spark Context</u>

In [2]:
# Build the Spark configuration one setting at a time; each .set() returns the
# same SparkConf, so the calls can be chained.
spark_conf = (
    SparkConf()
    # The address of the master node which is set within the docker compose file
    .set("spark.master", "spark://localhost:7077")
    # Client mode indicates the local host is the driver program (should be client by default)
    .set("spark.submit.deployMode", "client")
    # Binds the driver to all available network interfaces
    .set("spark.driver.bindAddress", "0.0.0.0")
    # The name of the application that will display in the Spark UI
    .set("spark.app.name", "spark-local-cluster")
    # Explicitly sets the memory allocated to the executor in the cluster
    # (can't exceed amount allocated in the docker compose file)
    .set("spark.executor.memory", "4g")
)

# Reuse the active session if one exists; otherwise create one from spark_conf.
spark = pyspark.sql.SparkSession.builder.config(conf=spark_conf).getOrCreate()

KeyboardInterrupt: 

# <u>Creating DataFrames</u>

## Defined Schema

In [None]:
# Explicit schema with one nullable column per demonstrated Spark type:
# string, integer, double, boolean, array-of-string and a nested struct.
schema = T.StructType([
    T.StructField("string_field", T.StringType(), True),
    T.StructField("integer_field", T.IntegerType(), True),
    T.StructField("float_field", T.DoubleType(), True),
    T.StructField("boolean_field", T.BooleanType(), True),
    T.StructField("array_field", T.ArrayType(T.StringType()), True),
    T.StructField("struct_field", T.StructType([
        T.StructField("sub_field", T.StringType(), True)
    ]), True)
])

# The ToPandas, ToJSON, Explode, GroupBy and Pivot cells further down read a
# frame named `df_1` with exactly these columns, so the frame is bound under
# that name here. Without this binding those cells fail with a NameError on a
# fresh kernel.
df_1 = spark.createDataFrame(
    [
        ["a", 1, 1.1, True, ["b"], {"sub_field": "c"}],
        ["d", 2, 2.1, False, ["e", "f"], {"sub_field": "g"}],
        ["d", 3, 3.1, True, ["h", "i", "j"], {"sub_field": "k"}]
    ],
    schema
)
# Keep `df` bound as well, since this cell originally assigned the frame to `df`.
df = df_1
display(df_1.toPandas())

In [None]:
# Same three-column schema, built incrementally with StructType.add().
schema = (
    T.StructType()
    .add("name", T.StringType(), True)
    .add("age", T.IntegerType(), True)
    .add("city", T.StringType(), True)
)

# One dict per row; keys match the schema's field names.
data = [
    dict(name="Alice", age=30, city="New York"),
    dict(name="Bob", age=25, city="Los Angeles"),
    dict(name="Charlie", age=35, city="Chicago"),
]

df = spark.createDataFrame(data, schema=schema)
display(df.toPandas())

## Inferred Schema

In [None]:
# No schema is passed below, so Spark infers column names and types from the Row objects.
# The data intentionally contains an exact duplicate (Alice) and a repeated name
# with different values (Charlie).
data = [
    Row(name="Alice", age=25, city="New York"),
    Row(name="Alice", age=25, city="New York"),
    Row(name="Bob", age=30, city="San Francisco"),
    Row(name="Charlie", age=35, city="Los Angeles"),
    Row(name="Charlie", age=50, city="St Louis")
]

# The WithColumn, DropDuplicates, Drop and ID-generation cells further down read
# a frame named `df_2` with `name`/`age` columns, so the frame is bound under
# that name here. Without this binding those cells fail with a NameError on a
# fresh kernel.
df_2 = spark.createDataFrame(data)
# Keep `df` bound as well, since this cell originally assigned the frame to `df`.
df = df_2
display(df_2.toPandas())

# <u>Querying DataFrames</u>

## Select

In [None]:
# Nullable (name, age, city) schema generated from (field name, type) pairs.
schema = T.StructType([
    T.StructField(field_name, field_type, True)
    for field_name, field_type in (
        ("name", T.StringType()),
        ("age", T.IntegerType()),
        ("city", T.StringType()),
    )
])
data = [
    dict(name="Alice", age=30, city="New York"),
    dict(name="Bob", age=25, city="Los Angeles"),
    dict(name="Charlie", age=35, city="Chicago"),
]
df = spark.createDataFrame(data, schema=schema)
display(df.toPandas())

# Project the frame down to the two listed columns.
selected_df = df.select(["name", "age"])
display(selected_df.toPandas())

## Where

In [None]:
# Schema with an extra array-of-strings column so array predicates can be shown.
schema = (
    T.StructType()
    .add("name", T.StringType(), True)
    .add("age", T.IntegerType(), True)
    .add("city", T.StringType(), True)
    .add("hobbies", T.ArrayType(T.StringType()), True)
)
data = [
    dict(name="Alice", age=30, city="New York", hobbies=["reading", "traveling"]),
    dict(name="Bob", age=25, city="Los Angeles", hobbies=["sports", "music"]),
    dict(name="Charlie", age=35, city="Chicago", hobbies=["cooking", "traveling"]),
    dict(name="David", age=22, city="Newark", hobbies=["reading", "gaming"]),
]
df = spark.createDataFrame(data, schema=schema)
display(df.toPandas())

# Each predicate is a boolean Column expression; where() keeps the rows for which it is true.
city_is_new_york = F.col("city") == "New York"                      # exact equality
younger_than_30 = F.col("age") < 30                                 # numeric comparison
likes_traveling = F.array_contains(F.col("hobbies"), "traveling")   # membership in an array column
city_mentions_new = F.col("city").contains("New")                   # substring match

display(df.where(city_is_new_york).toPandas())
display(df.where(younger_than_30).toPandas())
display(df.where(likes_traveling).toPandas())
display(df.where(city_mentions_new).toPandas())

## OrderBy

In [None]:
# Nullable (name, age, city) schema generated from (field name, type) pairs.
schema = T.StructType([
    T.StructField(field_name, field_type, True)
    for field_name, field_type in (
        ("name", T.StringType()),
        ("age", T.IntegerType()),
        ("city", T.StringType()),
    )
])
data = [
    dict(name="Alice", age=30, city="New York"),
    dict(name="Bob", age=25, city="Los Angeles"),
    dict(name="Charlie", age=35, city="Chicago"),
]
df = spark.createDataFrame(data, schema=schema)
display(df.toPandas())

# Sort by age, first youngest-to-oldest and then oldest-to-youngest.
youngest_first = df.orderBy(F.asc("age"))
oldest_first = df.orderBy(F.desc("age"))
display(youngest_first.toPandas())
display(oldest_first.toPandas())

## Distinct

In [None]:
# Same (name, age, city) schema, built incrementally with StructType.add().
schema = (
    T.StructType()
    .add("name", T.StringType(), True)
    .add("age", T.IntegerType(), True)
    .add("city", T.StringType(), True)
)
# Contains one exact duplicate row (Alice, 30) and one Alice row that differs only in age.
data = [
    dict(name="Alice", age=30, city="New York"),
    dict(name="Bob", age=25, city="Los Angeles"),
    dict(name="Alice", age=30, city="New York"),
    dict(name="Charlie", age=35, city="Chicago"),
    dict(name="Alice", age=31, city="New York"),
]
df = spark.createDataFrame(data, schema=schema)
display(df.toPandas())

# distinct() compares whole rows, so only the exact duplicate is removed.
unique_rows = df.distinct()
display(unique_rows.toPandas())

# Projecting to a single column first yields the distinct values of that column.
unique_names = df.select("name").distinct()
display(unique_names.toPandas())


## Count

In [None]:
# Nullable (name, age, city) schema generated from (field name, type) pairs.
schema = T.StructType([
    T.StructField(field_name, field_type, True)
    for field_name, field_type in (
        ("name", T.StringType()),
        ("age", T.IntegerType()),
        ("city", T.StringType()),
    )
])

# Three of the four rows have a null in a different column.
data = [
    dict(name="Alice", age=30, city="New York"),
    dict(name="Bob", age=None, city="Los Angeles"),
    dict(name="Charlie", age=35, city=None),
    dict(name=None, age=22, city="Chicago"),
]
df = spark.createDataFrame(data, schema=schema)
display(df.toPandas())

# Row count before any cleaning.
original_count = df.count()
print(f"Original row count: {original_count}")

# df.na.drop() is the same operation as df.dropna(): with default arguments it
# removes every row that has a null in any column.
cleaned_df = df.na.drop()
display(cleaned_df.toPandas())
cleaned_count = cleaned_df.count()
print(f"Row count after dropping nulls: {cleaned_count}")


## Limit

In [None]:
# Same (name, age, city) schema, built incrementally with StructType.add().
schema = (
    T.StructType()
    .add("name", T.StringType(), True)
    .add("age", T.IntegerType(), True)
    .add("city", T.StringType(), True)
)
data = [
    dict(name="Alice", age=30, city="New York"),
    dict(name="Bob", age=25, city="Los Angeles"),
    dict(name="Charlie", age=35, city="Chicago"),
]
df = spark.createDataFrame(data, schema=schema)
display(df.toPandas())

# Cap the result at two rows.
row_limit = 2
limited_df = df.limit(row_limit)
display(limited_df.toPandas())

## Retrieve Value

In [None]:
# Three nullable columns: name, age and city.
schema = T.StructType([
    T.StructField("name", T.StringType(), True),
    T.StructField("age", T.IntegerType(), True),
    T.StructField("city", T.StringType(), True)
])
data = [
    {"name": "Alice", "age": 30, "city": "New York"},
    {"name": "Bob", "age": 25, "city": "Los Angeles"},
    {"name": "Charlie", "age": 35, "city": "Chicago"}
]
df = spark.createDataFrame(data, schema=schema)
display(df.toPandas())

# Retrieve specific value from a row based on equality condition.
# first() returns None when nothing matches, so an empty result produces None
# here instead of the IndexError that collect()[0][0] would raise.
alice_row = df.where(F.col("name") == "Alice").select("age").first()
specific_value = alice_row["age"] if alice_row is not None else None
print(f"Age of Alice: {specific_value}")

# Create a Python list from a where condition.
# The values are pulled out of the collected Rows on the driver, which avoids
# going through the RDD API and shipping a Python lambda to the executors.
ny_rows = df.where(F.col("city") == "New York").select("name").collect()
names_in_ny = [row["name"] for row in ny_rows]
print(f"Names in New York: {names_in_ny}")

## ToPandas

In [None]:
# Convert the Spark DataFrame into a pandas DataFrame.
result = df_1.toPandas()
# Use rich display rather than print(), matching how the other cells in this
# notebook render pandas frames.
display(result)

## ToJSON

In [None]:
# toJSON() turns each row into a JSON document string; collect() brings them
# back to the driver as a Python list.
json_rows = df_1.toJSON()
result = json_rows.collect()
print(result)

# <u>DataFrame Manipulations</u>

## WithColumn

In [None]:
# Derive a new column from an existing one; the source frame is left unchanged.
age_plus_five = F.col("age") + 5
ex = df_2.withColumn("age_in_5_years", age_plus_five)
ex.show()

## DropDuplicates

In [None]:
# Deduplicate on every column (spelled out explicitly here): only rows that
# match in all columns are collapsed into one.
ex = df_2.dropDuplicates(df_2.columns)
ex.show()

In [None]:
# Deduplicate on a subset of columns: one row is kept per distinct name.
key_columns = ["name"]
ex = df_2.dropDuplicates(key_columns)
ex.show()

## Explode

In [None]:
# explode() emits one output row per element of the array column; the element
# is placed in the new "array_items" column.
exploded_items = F.explode(F.col("array_field"))
ex = df_1.withColumn("array_items", exploded_items)
ex.show()

## GroupBy

In [None]:
# Aggregates computed for each distinct string_field value.
aggregations = [
    F.sum("float_field").alias("total_float"),
    F.avg("integer_field").alias("avg_integer"),
]
ex = df_1.groupBy("string_field").agg(*aggregations)
ex.show()

## Pivot

In [None]:
# Group rows by string_field, turn each distinct boolean_field value into its
# own column, and fill the cells with the sum of float_field.
grouped = df_1.groupBy("string_field")
pivoted = grouped.pivot("boolean_field")
ex = pivoted.agg(F.sum("float_field"))
ex.show()

## Drop

In [None]:
# Return a new frame without the listed column(s).
columns_to_drop = ["age"]
ex = df_2.drop(*columns_to_drop)
ex.show()

## Monotonically Increasing ID

In [None]:
# Not guaranteed to be 1 to N: the generated IDs are unique and increasing,
# but they are not necessarily consecutive.
id_column = F.monotonically_increasing_id()
ex = df_2.withColumn("ID", id_column)
ex.show()

## Generating an ID Field

In [None]:
# Guaranteed to be 1 to N: row_number() numbers the rows of the window consecutively.
# Ordering by a constant literal puts all rows in a single window without
# imposing any particular order on them.
window_spec = Window.orderBy(F.lit(1))
sequential_id = F.row_number().over(window_spec)
ex = df_2.withColumn("ID", sequential_id)
ex.show()

# <u>DataFrame Operations</u>

## Join

In [None]:
# Sample rows: employees carry the dept_id of the department they belong to.
employee_data = [
    (1, "Alice", 101),
    (2, "Bob", 102),
    (3, "Catherine", 101),
    (4, "Daniel", 103),
]
department_data = [
    (101, "HR"),
    (102, "IT"),
    (103, "Finance"),
]

# Build the two frames; column names come from the lists, types are inferred.
df_employee = spark.createDataFrame(employee_data, ["emp_id", "name", "dept_id"])
df_department = spark.createDataFrame(department_data, ["dept_id", "dept_name"])

# Inner join on the shared dept_id column (arguments passed positionally: other, on, how).
joined_df = df_employee.join(df_department, "dept_id", "inner")

# Print the joined rows.
joined_df.show()

## Union

In [None]:
# One list of sample rows per month.
january_data = [
    (1, "Alice", "January"),
    (2, "Bob", "January"),
]
february_data = [
    (3, "Charlie", "February"),
    (4, "David", "February"),
]

# Both frames are created with the same column names in the same order;
# union() matches columns by position, not by name.
monthly_columns = ["emp_id", "name", "month"]
df_january = spark.createDataFrame(january_data, monthly_columns)
df_february = spark.createDataFrame(february_data, monthly_columns)

# Append February's rows to January's.
union_df = df_january.union(df_february)

# Print the combined rows.
union_df.show()