<h1>Table of Contents<span class="tocSkip"></span></h1>
<div class="toc"><ul class="toc-item"><li><span><a href="#Imports:" data-toc-modified-id="Imports:-1">Imports:</a></span></li><li><span><a href="#Creating-the-Spark-Entry-point-(SparkSession-/-SparkContext)" data-toc-modified-id="Creating-the-Spark-Entry-point-(SparkSession-/-SparkContext)-2">Creating the Spark Entry point (<code>SparkSession</code> / <code>SparkContext</code>)</a></span></li><li><span><a href="#Data-Preparation" data-toc-modified-id="Data-Preparation-3">Data Preparation</a></span></li><li><span><a href="#Filtering" data-toc-modified-id="Filtering-4">Filtering</a></span></li><li><span><a href="#GROUP_BY" data-toc-modified-id="GROUP_BY-5">GROUP_BY</a></span></li><li><span><a href="#How-to-Handle-Missing-Value" data-toc-modified-id="How-to-Handle-Missing-Value-6">How to Handle Missing Value</a></span></li><li><span><a href="#Demo-with-pyspark.sql.functions" data-toc-modified-id="Demo-with-pyspark.sql.functions-7">Demo with <code>pyspark.sql.functions</code></a></span></li><li><span><a href="#Sampling-Data" data-toc-modified-id="Sampling-Data-8">Sampling Data</a></span><ul class="toc-item"><li><span><a href="#Experiment-1" data-toc-modified-id="Experiment-1-8.1">Experiment 1</a></span></li><li><span><a href="#Experiment-2" data-toc-modified-id="Experiment-2-8.2">Experiment 2</a></span></li></ul></li><li><span><a href="#Partitioning" data-toc-modified-id="Partitioning-9">Partitioning</a></span></li><li><span><a href="#I/O" data-toc-modified-id="I/O-10">I/O</a></span><ul class="toc-item"><li><span><a href="#Convert-from-*.parqet-to-*.csv" data-toc-modified-id="Convert-from-*.parqet-to-*.csv-10.1">Convert from <code>*.parqet</code> to <code>*.csv</code></a></span></li><li><span><a href="#Convert-from-*.csv-to-*.parqet" data-toc-modified-id="Convert-from-*.csv-to-*.parqet-10.2">Convert from <code>*.csv</code> to <code>*.parqet</code></a></span></li></ul></li></ul></div>

#### Imports:

In [None]:
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession

from pyspark.sql import functions as F
# `sum` is not imported by name (use F.sum) so it does not shadow Python's built-in sum()
from pyspark.sql.functions import (
    collect_list, explode, array_contains,
    udf, col, when, asc, desc, lit, coalesce,
    mean, avg, rand, stddev,
    count, countDistinct,
    format_number, isnan,
    rank, dense_rank, row_number, lag, lead,
    upper, concat, lower, substring,
    current_date, datediff,
)
from pyspark.sql.window import Window

from pyspark.sql.types import (
    StructField, StructType, LongType, TimestampType,
    StringType, IntegerType,
    FloatType, BooleanType,
    DateType,
)

In [None]:
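import pyspark, datetime, os
import numpy as np, pandas as pd, matplotlib.pyplot as plt
# Local helper module (not part of PySpark); employees_df() and bonus_df() used below presumably come from here
from sparking import *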

%matplotlib inline

In [None]:
# pyspark.__version__

#### Documentation:

- [Spark 3.0.1](https://spark.apache.org/docs/3.0.1/index.html)
- [PySpark Overview](https://spark.apache.org/docs/latest/api/python/index.html#)
- [PySpark API Reference](https://spark.apache.org/docs/latest/api/python/reference/index.html#api-reference)
- [PySpark Session Object](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.SparkSession.html)
- [PySpark Core](https://spark.apache.org/docs/latest/api/python/reference/pyspark.html)
- [Transformations and Actions](https://spark.apache.org/docs/latest/api/python/reference/pyspark.streaming.html#transformations-and-actions)
- [Broadcast and Accumulator](https://spark.apache.org/docs/latest/api/python/reference/pyspark.html#broadcast-and-accumulator)

- One use of Spark SQL is to execute SQL queries; it can also read data from an existing Hive installation. When SQL is run from within another programming language, the results are returned as a `Dataset`/`DataFrame`.
- The `Dataset` API is available in Scala and Java. Python does not support the Dataset API.
- The `DataFrame` API is available in Scala, Java, Python, and R.
   

-   **Native Data Types in PySpark** (all in `pyspark.sql.types`):

    -   **BooleanType**: A boolean (`True`/`False`) value.
    -   **ByteType**: A 1-byte signed integer (-128 to 127).
    -   **ShortType**: A 2-byte signed integer.
    -   **IntegerType**: A 4-byte signed integer.
    -   **LongType**: An 8-byte signed integer.
    -   **FloatType**: A 4-byte single-precision floating-point number.
    -   **DoubleType**: An 8-byte double-precision floating-point number.
    -   **DecimalType**: A fixed-precision decimal, `DecimalType(precision, scale)` (default `(10, 0)`).
    -   **StringType**: A character string.
    -   **BinaryType**: A byte-array value.
    -   **DateType**: A calendar date (year, month, day) without a time of day.
    -   **TimestampType**: A date and time, displayed in the session time zone.
    -   **ArrayType**: An array whose elements share one type, e.g. `ArrayType(StringType())`.
    -   **MapType**: A map of key-value pairs with one key type and one value type.
    -   **StructType**: A struct (row-like complex type) made of named `StructField`s.

#### Creating the Spark Entry point (`SparkSession` / `SparkContext`)

In [None]:
my_conf = SparkConf().setAppName("My_Spark_App") \
                  .set("spark.sql.shuffle.partitions", "2") \
                  .set("spark.driver.memory", "4g") \
                  .set("spark.executor.memory", "2g") \
                  .set("spark.sql.autoBroadcastJoinThreshold", "10000000") \
                  .set("spark.sql.adaptive.enabled", "false")

# config(conf=...) is applied after appName(), so the app name from my_conf ("My_Spark_App") wins
spark = SparkSession.builder \
        .appName("RenamedSparkApp") \
        .config(conf=my_conf) \
        .getOrCreate()

sc = spark.sparkContext
sc.setLogLevel("ERROR")

In [None]:
# Render DataFrames as HTML tables in the notebook without calling .show()
spark.conf.set("spark.sql.repl.eagerEval.enabled", True)
# Disable automatic broadcast joins (overrides the threshold set in my_conf above)
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)

In [None]:
# spark.sparkContext.getConf().getAll()
# spark.conf.get("spark.sql.parquet.filterPushDown")
# spark.conf.get("spark.sql.sources.bucketing.enabled")
# spark.conf.get("spark.sql.autoBroadcastJoinThreshold")
# spark.conf.get("spark.sql.warehouse.dir")

#### Data Preparation

```python
# Option 1: read with a header, then rename every column positionally with toDF()
new_column_names = ["column1", "column2", "column3"]

df = spark.read.csv("path/to/csvfile.csv", header=True, inferSchema=True).toDF(*new_column_names)

# Option 2: read without a header (Spark names the columns _c0, _c1, ...), then rename one by one
df = spark.read.csv("path/to/csvfile.csv", header=False, inferSchema=True)

for idx, new_name in enumerate(new_column_names):
    df = df.withColumnRenamed(f"_c{idx}", new_name)

# Option 3: give names *and* types up front as a DDL schema string (no inferSchema pass needed)
# (the column types here are placeholders)
schema_ddl = "column1 STRING, column2 INT, column3 DOUBLE"

df = spark.read.option("header", "false") \
               .option("delimiter", ",") \
               .option("quote", "\"") \
               .option("escape", "\"") \
               .schema(schema_ddl) \
               .csv("path/to/csvfile.csv")
```

- Renaming every column positionally (same result as `sqldf.toDF(*column_names)` when the lengths match):
```python
columns = sqldf.columns
for old_col, new_col in zip(columns, column_names):
    sqldf = sqldf.withColumnRenamed(old_col, new_col)
```

In [None]:
DATA_DIR = os.environ['DATA'] + '/IBM_Data_Analysis'

In [None]:
# ! head -3 {DATA_DIR}/imports-85.csv

In [None]:
column_names = ['symboling', 'normalized-losses', 'make', 'fuel-type', 'aspiration',
       'num-of-doors', 'body-style', 'drive-wheels', 'engine-location',
       'wheel-base', 'length', 'width', 'height', 'curb-weight', 'engine-type',
       'num-of-cylinders', 'engine-size', 'fuel-system', 'bore', 'stroke',
       'compression-ratio', 'horsepower', 'peak-rpm', 'city-mpg',
       'highway-mpg', 'price']

In [None]:
sqldf = spark.read.csv(DATA_DIR + "/imports-85.csv/", header=False).toDF(*column_names)

In [None]:
sqldf.show(2)

In [None]:
# pddf = sqldf.toPandas(); pddf

In [None]:
# # creates a temporary view against which we can run SQL queries.
# df = sqldf.createOrReplaceTempView('auto')
# spark.sql("SELECT * FROM auto LIMIT 2").show()

In [None]:
# sqldf.printSchema()
# sqldf.describe()
# sqldf.take(2)

In [None]:
# df = spark.range(1, 10000, 1, 10).select(col("id"), rand(10).alias("Attribute"))
# df.show(5)

##### MySQL DB Tables

In [None]:
EMP_PATH = "/Users/am/mydocs/Software_Development/Databases/RDBMS/sql/schemas/employees.csv"
DEPT_PATH = "/Users/am/mydocs/Software_Development/Databases/RDBMS/sql/schemas/department.csv"

emp_schema = StructType(fields=[
    StructField('EMPLOYEE_ID', StringType(), True),
    StructField('FIRST_NAME',  StringType(), True),
    StructField('LAST_NAME', StringType(), True),
    StructField('AGE', FloatType(), True),
    StructField('SALARY', FloatType(), True),
    StructField('JOINING_DATE', TimestampType(), True),
    StructField('DEPARTMENT_ID', LongType(), True),
    StructField('MANAGER_ID', LongType(), True)
])

dept_schema = StructType(fields=[
    StructField('DEPARTMENT_ID', LongType(), True),
    StructField('DEPARTMENT',  StringType(), True)
])

emp = spark.read.csv(EMP_PATH, header=False, schema=emp_schema)
dept = spark.read.csv(DEPT_PATH, header=False, schema=dept_schema)

In [None]:
# emp.show()
# dept.show()

In [None]:
emp_df = employees_df(50)
bns_df = bonus_df(50)
emp_df.show()

## [DataFrame](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/dataframe.html)

- **Attributes of DataFrame**:

    1. **`columns`**: A list of column names in the DataFrame.
    2. **`dtypes`**: A list of tuples with column names and their data types.
    3. **`schema`**: Provides the schema of the DataFrame (columns, types, etc.).
    4. **`rdd`**: Access to the underlying RDD of the DataFrame.
    5. **`isStreaming`**: Indicates whether the DataFrame is from a streaming source (`True` or `False`).
    6. **`na`**: Provides access to functions for handling missing data (nulls; `drop` and `fill` also treat NaN in numeric columns as missing).
    7. **`stat`**: Provides access to statistical functions.

In [None]:
# getOrCreate() returns the already-running session; appName() here does not rename it
spark = SparkSession.builder.appName("DataFrame_Methods_Analysis").getOrCreate()

##### How to create DataFrame from 'list' or 'dict'

In [None]:
import pandas as pd
from faker import Faker
import random

fake = Faker()

# Function to randomly introduce None or empty values
def randomize_value(value, null_prob=0.1, empty_prob=0.1):
    if random.random() < null_prob:
        return None  # Insert None (null in PySpark)
    elif random.random() < empty_prob:
        return ''    # Insert empty string
    else:
        return value

# # Generate a dictionary with 100 rows, with some fields containing null or empty values
# data_dict = {
#     'name': [randomize_value(fake.name()) for _ in range(100)],
#     'email': [randomize_value(fake.email(), null_prob=0.05, empty_prob=0.05) for _ in range(100)],
#     'address': [randomize_value(fake.address(), null_prob=0.1) for _ in range(100)],
#     'job': [randomize_value(fake.job(), null_prob=0.1, empty_prob=0.05) for _ in range(100)],
#     'phone_number': [randomize_value(fake.phone_number(), null_prob=0.05) for _ in range(100)],
#     'salary': [randomize_value(fake.random_int(min=40000, max=150000), null_prob=0.05, empty_prob=0) for _ in range(100)],
# }

# Generate a dict of 100 rows of fake data (column name -> list of values);
# 'name' gets nulls and empty strings, 'salary' gets nulls only
data_dict = {
    'name': [randomize_value(fake.name()) for _ in range(100)],
    'email': [fake.email() for _ in range(100)],
    'address': [fake.address() for _ in range(100)],
    'job': [fake.job() for _ in range(100)],
    'phone_number': [fake.phone_number() for _ in range(100)],
    'salary': [randomize_value(fake.random_int(min=40000, max=150000), null_prob=0.05, empty_prob=0) for _ in range(100)],  # '' would clash with the integer type during schema inference
}

# pdf = pd.DataFrame(data_dict)
# print(pdf.head())

# Convert dictionary to list of tuples (rows)
data_tuples = list(zip(data_dict['name'], data_dict['email'], data_dict['address'], data_dict['job'], data_dict['phone_number'], data_dict['salary']))


# # Define the schema (optional)
# schema = StructType([
#     StructField("name", StringType(), True),
#     StructField("email", StringType(), True),
#     StructField("address", StringType(), True),
#     StructField("job", StringType(), True),
#     StructField("phone_number", StringType(), True),
#     StructField("salary", IntegerType(), True)
# ])

# df = spark.createDataFrame(data_tuples, schema=schema)
# Pass the column names; without them Spark names the columns _1, _2, ...
df = spark.createDataFrame(data_tuples, list(data_dict.keys()))
df.show()

In [None]:
# dir(fake)

#### Filtering

In [None]:
# Sample data
data = [
    ("Alice", 34, "2023-01-01", "HR", 50000),
    ("Bob", 45, "2022-12-15", "Finance", 50000),
    ("Catherine", 29, "2023-03-05", "HR", 60000),
    ("David", 50, "2021-07-30", "Finance", 70000),
    ("Eva", 40, "2023-05-22", "IT", 110000)
]

# Create DataFrame
df = spark.createDataFrame(data, ["name", "age", "join_date", "department", "salary"])

# Show the original DataFrame
print("Original DataFrame:")
df.show()

In [None]:
# Filtering with filter() (show() returns None, so don't assign its result to df)
emp_df.filter((emp_df.salary < 40000) | (col('salary') > 80000)).show()

In [None]:
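# 'where' is an alias for filter. & binds tighter than |, so this keeps IT rows
# plus HR/Finance rows whose salary is between 34000 and 95000 (inclusive)
emp_df.where((emp_df.department == "IT") | (col('department').isin("HR", "Finance") & emp_df.salary.between(34000, 95000))).show()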

In [None]:
# Filtering with rlike (regular expression)
emp_df.filter(emp_df.first_name.rlike("^[DA].*")).show()

In [None]:
emp_df.show()

In [None]:
emp_salary_group = emp_df.withColumn(
    'SalaryGroup',
    when(emp_df['Salary'] < 40000, 'Under Paid')
    .when((col('salary') >= 40000) & (emp_df.salary < 65000), 'Medium Paid')
    .when((emp_df['Salary'] >= 65000) & (emp_df['Salary'] < 75000), 'Well Paid')
    .otherwise('Highly Paid')
)

emp_salary_group.show()

In [None]:
# Register the DataFrame as a SQL temporary view
emp_df.createOrReplaceTempView("employees")

# SQL 
query = """
SELECT
    Salary, employee_id, 
    CASE
        WHEN Salary < 40000 THEN 'Under Paid'
        WHEN Salary >= 40000 AND Salary < 65000 THEN 'Medium Paid'
        WHEN Salary >= 65000 AND Salary < 75000 THEN 'Well Paid'
        ELSE 'Highly Paid'
    END AS SalaryGroup,
    email
FROM
    employees
"""

salary_group_df = spark.sql(query)
salary_group_df.show()

In [None]:
# Register the DataFrame as a SQL temporary view
emp_df.createOrReplaceTempView("emp_df_view")

# Filtering with SQL query
filtered_df = spark.sql("SELECT * FROM emp_df_view WHERE salary > 30000")
filtered_df.show()

In [None]:
# age_group_df.show(n=5, truncate=False)

In [None]:
# Sample data with null values
data_with_nulls = [
    ("Alice", None, "2023-01-01", "HR"),
    ("Bob", 45, "2022-12-15", "Finance"),
    ("Catherine", 29, "2023-03-05", "HR"),
    ("David", 50, "2021-07-30", None),
    ("Eva", 40, "2023-05-22", "IT")
]

# Create DataFrame
df_nulls = spark.createDataFrame(data_with_nulls, ["name", "age", "join_date", "department"])

In [None]:
# Filtering with isNull
df_nulls.filter(df_nulls.age.isNull()).show()

In [None]:
# Filtering with isNotNull
df_nulls.filter(df_nulls.department.isNotNull()).show()

#### GROUP_BY

In [None]:
df.groupBy('department').agg(avg('salary')).show()

In [None]:
# Group by multiple conditions
df.groupBy('department', 'salary').count().sort(asc('count')).show()

In [None]:
# Get average values
df.groupby('department').mean('salary').show()

In [None]:
# Combination with filter() function
df.groupby('department').mean('salary').filter(col('avg(salary)') >= 100000).show()

In [None]:
# mean() function calls agg(avg()) function. After agg() function, you can use alias() function for renaming.
df.groupBy('department').agg(avg('salary') \
        .alias('mean_salary')).filter(col('mean_salary') >= 1000) \
        .sort(desc('mean_salary')).show()

In [None]:
# Several aggregations at once, each renamed with alias()
df.groupBy('department').agg(
    avg('salary').alias('mean_salary'),
    stddev('salary').alias('stddev_salary')
).show()   

In [None]:
# # Pivot Table 
# # You can get aggregation group members as a header by pivottable.
# df.groupBy('HomePlanet').agg(count('VIP').alias('VIPCount')).show()
# df.groupby('HomePlanet').pivot('VIP').count().show()
# df.groupby('HomePlanet').pivot('VIP').max('TotalBill').show()

In [None]:
# SQL
query = """
SELECT
    HomePlanet
    , COUNT(*) AS Count
    , COUNT(DISTINCT PassengerId) as TotalIDs
    , AVG(TotalBill) AS mean_total_bill
    , MAX(TotalBill) AS max_total_bill
    , SUM(TotalBill) AS sum_of_total_bill
FROM
    train_df
GROUP BY
    HomePlanet
HAVING
    mean_total_bill > 1000
ORDER BY
    mean_total_bill DESC
"""

# spark.sql(query).show(n=5, truncate=False)

#### Partitioning

```python
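# repartition(n): full shuffle into exactly n partitions (can increase or decrease)
df_repartitioned = df.repartition(3)

# coalesce(n): merges existing partitions without a full shuffle (can only decrease)
df_coalesced = df.repartition(4).coalesce(2)

# Writes roughly one part-file per partition into the output *directory*
df_transactions.repartition(5).write.mode("overwrite").option("header", "true").csv("/TNX_test.csv")

# coalesce(1) yields a single part-file (still inside a directory named TNX_test.csv)
df_transactions.coalesce(1).write.mode('overwrite').option("header", "true").csv("TNX_test.csv")

# Repartition in memory, then lay the files out on disk as listen_date=<value>/ sub-directories
df.repartition(3).write.mode("overwrite").partitionBy("listen_date").parquet("path/to/write")

## Write Partitioned Data
df.write.partitionBy("column").parquet("path/to/output")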
```

#### Join

```python
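# Regular (inner equi-) join on a shared column name
joined_df = df1.join(df2, 'key')

# Sort-merge join is Spark's default strategy for large equi-joins; the trailing .sort()
# only orders the output and does not pick the strategy (use the "merge" hint for that)
joined_df = df1.join(df2, 'key').sort('key')

# Broadcast join: ship the small df2 to every executor
from pyspark.sql.functions import broadcast
joined_df = df1.join(broadcast(df2), 'key')

# Join strategy hints
df1.join(df2.hint("broadcast"), "join_key")
df1.join(df2.hint("shuffle_hash"), "join_key")
df1.join(df2.hint("merge"), "join_key")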
```

In [None]:
# Inner Join
df = emp.join(dept, emp.DEPARTMENT_ID == dept.DEPARTMENT_ID, "inner").select("*")
pddf = df.toPandas(); pddf.head(5)
# df = df.select("EMPLOYEE_ID","FIRST_NAME","LAST_NAME","AGE","DEPARTMENT","MANAGER_ID").show()

In [None]:
# Other join types: left, right, outer, leftsemi, leftanti
left_join_df = emp.join(dept, emp.DEPARTMENT_ID == dept.DEPARTMENT_ID, "left")
left_join_df.show()

In [None]:
# # Match on multiple columns
# df = df.join(other_table, ['first_name', 'last_name'], 'left')

In [None]:
# Cross Join
cross_join_df = emp.crossJoin(dept)
cross_join_df.show()

In [None]:
# # Left join in another dataset
# df = df.join(person_lookup_table, 'person_id', 'left')

## How to Handle Missing Value

- **`na.fill()`**: Replaces null values with a specified value.
- **`na.drop()`**: Drops rows with null values.
- **`na.replace()`**: Replaces specified values in a DataFrame (it can replace a value *with* null, but cannot match nulls).
- **`fillna()`**: Alias for `na.fill()`.
- **`dropna()`**: Alias for `na.drop()`.

1. **`drop`**: Removes rows with null values.

    ```python
    df.na.drop()  # Drops rows containing any null values
    df.na.drop(how="all")  # Drops rows where all columns are null
    df.na.drop(subset=["column1", "column2"])  # Drops rows with nulls only in specified columns
    ```

2. **`fill`**: Fills null values with a specified value.

    ```python
    df.na.fill("default_value")  # Fills nulls in string columns only; the value's type decides which columns it applies to
    df.na.fill({"column1": 0, "column2": "N/A"})  # Fills nulls with specified values per column
    ```

3. **`replace`**: Replaces specified non-null values with replacements (the values and their replacements must be of one type family: numeric, string, or boolean).

    ```python
    df.na.replace(["old_value1", "old_value2"], ["new_value1", "new_value2"])
    ```

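4. **Replacing nulls**: `replace` cannot match `null` itself (a `None` key raises an error), so use `fill` (or `coalesce()`) to substitute nulls. `replace` can, however, turn a value *into* `null`, which is useful for normalizing empty strings.

    ```python
    df.na.fill({"column1": "replacement_value"})  # null -> "replacement_value" in column1
    df.na.replace("", None, subset=["column1"])   # "" -> null in column1
    ```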

In [None]:
# Initialize SparkSession
spark = SparkSession.builder.appName("HandleMissingData").getOrCreate()

In [None]:
# Reference snippets: the column names are placeholders for your own DataFrame's columns
df.na.fill({'value': 0}).show()

# Replace nulls with a per-column value
df = df.fillna({'first_name': 'Tom', 'age': 0})

# Replace empty strings with null (leave out the subset keyword arg to apply it to all string columns)
df = df.replace({"": None}, subset=["name"])

# Convert NaN values (e.g. float("nan") from Python/NumPy) in numeric columns to null
df = df.replace(float("nan"), None)

In [None]:
# Drop duplicate rows in a dataset (distinct)
df = df.dropDuplicates() # or
df = df.distinct()

# Drop duplicate rows, but consider only specific columns
df = df.dropDuplicates(['name', 'height'])

# Drop rows with any null values
df_dropped_any = df.na.drop()
df_dropped_any.show()

df_dropped_any2 = df.dropna('any')
df_dropped_any2.show()

# Drop rows with all null values
df_dropped_all = df.na.drop(how='all')
df_dropped_all.show()

df_dropped_all2 = df.dropna('all')

# Drop rows with null values in specific columns
df_dropped_subset = df.na.drop(subset=['name', 'salary'])
df_dropped_subset.show()

#---

df = df.drop('column_name')  # drop the entire column

In [None]:
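# Parse the string column into a DateType column, replacing it in place
# (renaming a new column onto an existing name would leave two 'bonus_date' columns)
df = df.withColumn('bonus_date', F.to_date(F.col('bonus_date'), 'MM-dd-yyyy'))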

# Take the first value that is not null
df = df.withColumn('last_name', coalesce(df.last_name, df.surname, lit('N/A')))


In [None]:
# float("nan")

In [None]:
# Sample data with missing values
data = [
    (1, "John", None),
    (2, None, 5000),
    (2, "", 5000),
    (3, "Sara", 4500),
    (None, "David", None),
    (5, "Mike", 5500)
]

# Define schema
schema = ["id", "name", "salary"]

# Create DataFrame
df = spark.createDataFrame(data, schema)

# Show the DataFrame
df.show()

In [None]:
# Using SQL to handle missing data
df.createOrReplaceTempView("people")

filled_df_sql = spark.sql("""
SELECT id,
       COALESCE(name, 'Unknown') as name,
       COALESCE(salary, 0) as salary
FROM people
""")
filled_df_sql.show()

# Don't stop the session here: the cells below still use it
# spark.stop()

In [None]:
# coalesce and na.fill (df_nulls has name/age columns; na.fill is shown for comparison)
df_nulls.withColumn("name_filled", coalesce("name", lit("Unknown"))) \
    .withColumn("age_filled", coalesce("age", lit(0))) \
    .na.fill({"name": "Unknown", "age": 0}) \
    .show()

In [None]:
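# Count nulls per column; isnan() only applies to float/double columns, others are checked for null only
df.select([count(when(col(c).isNull() | (isnan(c) if t in ("float", "double") else lit(False)), c)).alias(c) for c, t in df.dtypes]).show()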

##### Process 'Bonus' data

In [None]:
# SQL statement with additional constraints
create_bns_sql = """CREATE TABLE IF NOT EXISTS Bonus (
	EMPLOYEE_REF_ID INT,
	BONUS_AMOUNT INT(10),
	BONUS_DATE DATETIME,
	FOREIGN KEY (EMPLOYEE_REF_ID) REFERENCES Employee(EMPLOYEE_ID) ON DELETE CASCADE
);
"""

In [None]:
bns_df = bonus_df(num_row=30)
bns_df.show()

In [None]:
bns_df = bns_df.withColumn('bns_date', F.to_date(F.col('bonus_date'), 'MM-dd-yyyy'))
bns_df = bns_df.na.drop(subset=['bonus_amount', 'bns_date'])
bns_df = bns_df.drop('bonus_date')
bns_df = bns_df.withColumnRenamed('bns_date', 'bonus_date')

bns_df.show()

In [None]:
## Write OPTION 2
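# To append into an existing table, the data must satisfy the table's constraints (e.g. the foreign key).
# Fill in jdbc_options with the connection details (url, dbtable, user, password, driver).
jdbc_options = {}
bns_df.write \
    .format("jdbc") \
    .options(**jdbc_options) \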
    .mode("append") \
    .save()

## Demo with `pyspark.sql.functions`

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    col, lit, when, concat, concat_ws, substring, length, 
    upper, lower, round, bround, date_format, current_date, year, month, 
    dayofmonth, collect_list, collect_set
)

# Initialize Spark session
spark = SparkSession.builder \
    .appName("PySpark SQL Functions Examples") \
    .getOrCreate()

# Sample data
data = [
    ("Alice", 34, "New York"),
    ("Bob", 45, "San Francisco"),
    ("Catherine", 29, "Chicago"),
    ("David", 50, "New York"),
    ("Eva", 40, "San Francisco")
]

# Create DataFrame
df = spark.createDataFrame(data, ["name", "age", "city"])

# Show the original DataFrame
print("Original DataFrame:")
df.show()

# Demonstrating various functions
df = df.withColumn("country", lit("USA"))
df = df.withColumn("age_group", when(col("age") < 40, "Young").otherwise("Old"))
df = df.withColumn("name_city", concat_ws(" - ", col("name"), col("city")))
df = df.withColumn("name_substr", substring(col("name"), 1, 3))
df = df.withColumn("name_length", length(col("name")))
df = df.withColumn("name_upper", upper(col("name"))).withColumn("name_lower", lower(col("name")))

# Adding a current date column for demonstration
df = df.withColumn("current_date", current_date())
df = df.withColumn("formatted_date", date_format(col("current_date"), "MM/dd/yyyy"))
df = df.withColumn("year", year(col("current_date"))).withColumn("month", month(col("current_date"))).withColumn("day", dayofmonth(col("current_date")))

# Show the transformed DataFrame
print("Transformed DataFrame:")
df.show()

# Creating a DataFrame with a float column for demonstration
data = [("Alice", 34.567), ("Bob", 45.123), ("Catherine", 29.987), ("David", 50.456), ("Eva", 40.789)]
df = spark.createDataFrame(data, ["name", "score"])

# round() rounds half up; bround() rounds half to even (banker's rounding)
df = df.withColumn("score_rounded", round(col("score"), 1)).withColumn("score_brounded", bround(col("score"), 1))

# Show the DataFrame with rounded scores
print("DataFrame with Rounded Scores:")
df.show()

# Group by city and aggregate names
df = spark.createDataFrame([
    ("Alice", 34, "New York"),
    ("Bob", 45, "San Francisco"),
    ("Catherine", 29, "Chicago"),
    ("David", 50, "New York"),
    ("Eva", 40, "San Francisco")
], ["name", "age", "city"])

df = df.groupBy("city").agg(collect_list("name").alias("names_list"), collect_set("name").alias("names_set"))

# Show the aggregated DataFrame
print("Aggregated DataFrame:")
df.show(truncate=False)


In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    col, lit, date_add, date_sub, datediff, months_between, current_date, 
    year, month, dayofmonth, dayofweek, dayofyear, weekofyear, hour, minute, 
    second, current_timestamp, unix_timestamp, coalesce, split, array_contains
)

# Initialize Spark session
spark = SparkSession.builder \
    .appName("PySpark SQL Functions Additional Examples") \
    .getOrCreate()

# Sample data
data = [
    ("Alice", 34, "2023-01-01"),
    ("Bob", 45, "2022-12-15"),
    ("Catherine", 29, "2023-03-05"),
    ("David", 50, "2021-07-30"),
    ("Eva", 40, "2023-05-22")
]

# Create DataFrame
df = spark.createDataFrame(data, ["name", "age", "join_date"])

# Show the original DataFrame
print("Original DataFrame:")
df.show()

# date_add and date_sub
df.withColumn("date_plus_10", date_add("join_date", 10)) \
  .withColumn("date_minus_10", date_sub("join_date", 10)) \
  .show()

# datediff and months_between
df.withColumn("days_since_join", datediff(current_date(), "join_date")) \
  .withColumn("months_since_join", months_between(current_date(), "join_date")) \
  .show()

# year, month, dayofmonth, dayofweek, dayofyear, weekofyear
df.withColumn("year", year("join_date")) \
  .withColumn("month", month("join_date")) \
  .withColumn("day", dayofmonth("join_date")) \
  .withColumn("day_of_week", dayofweek("join_date")) \
  .withColumn("day_of_year", dayofyear("join_date")) \
  .withColumn("week_of_year", weekofyear("join_date")) \
  .show()

# Sample data with timestamp
data_with_time = [
    ("Alice", 34, "2023-01-01 12:34:56"),
    ("Bob", 45, "2022-12-15 14:20:30"),
    ("Catherine", 29, "2023-03-05 08:45:15"),
    ("David", 50, "2021-07-30 19:50:40"),
    ("Eva", 40, "2023-05-22 06:25:10")
]

# Create DataFrame
df_time = spark.createDataFrame(data_with_time, ["name", "age", "join_time"])

# hour, minute, second
df_time.withColumn("hour", hour("join_time")) \
      .withColumn("minute", minute("join_time")) \
      .withColumn("second", second("join_time")) \
      .show()

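# current_timestamp and unix_timestamp (join_date has no time part, so pass its format;
# the default "yyyy-MM-dd HH:mm:ss" pattern would not parse it)
df.withColumn("current_ts", current_timestamp()) \
  .withColumn("unix_ts", unix_timestamp("join_date", "yyyy-MM-dd")) \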
  .show()

# Sample data with null values
data_with_nulls = [
    ("Alice", None),
    (None, 45),
    ("Catherine", 29),
    ("David", None),
    ("Eva", 40)
]

# Create DataFrame
df_nulls = spark.createDataFrame(data_with_nulls, ["name", "age"])

# coalesce and na.fill
df_nulls.withColumn("name_filled", coalesce("name", lit("Unknown"))) \
        .withColumn("age_filled", coalesce("age", lit(0))) \
        .na.fill({"name": "Unknown", "age": 0}) \
        .show()


In [None]:
# Construct a new dynamic column (fname/lname are placeholder column names)
df = df.withColumn('full_name', F.when(
    df.fname.isNotNull() & df.lname.isNotNull(), F.concat(df.fname, F.lit(' '), df.lname)
).otherwise(F.lit('N/A')))

- Demo how to use **collect_list()** and **explode()** functions


##### 1. **Aggregate Functions**:
-   These functions compute a single result from multiple rows.



- **`sum()`**: Calculates the sum of values in a column.
- **`avg()`**: Computes the average of values in a column.
- **`count()`**: Counts the number of rows.
- **`max()`**: Returns the maximum value in a column.
- **`min()`**: Returns the minimum value in a column.


In [None]:
emp_df.groupBy("department").agg(collect_list("first_name").alias("names"), avg("salary").alias("avg_salary")).show()

##### 2. **Window Functions**:
-   These functions perform calculations across a set of rows related to the current row.

- **`row_number()`**: Assigns a unique row number.
- **`rank()`**: Assigns a rank with gaps for ties.
- **`dense_rank()`**: Assigns a rank without gaps for ties.
- **`lag()`**: Gets the value of a previous row.
- **`lead()`**: Gets the value of a subsequent row.

In [None]:
windowSpec = Window.partitionBy("department").orderBy(F.desc("salary"))
emp_df.withColumn("row_num", F.row_number().over(windowSpec)).show()

##### 3. **String Functions**:
-   These functions manipulate string data.

- **`concat()`**: Concatenates multiple columns.
- **`lower()`**: Converts a string to lowercase.
- **`upper()`**: Converts a string to uppercase.
- **`substring()`**: Extracts a substring from a column.
- **`length()`**: Returns the length of a string.

In [None]:
emp_df.withColumn("full_name", upper(concat(substring(emp_df.first_name, 1, 1), lit(". "), emp_df.last_name))).show()

##### 4. **Date and Time Functions**:
-   These functions work with date and time data.

- **`current_date()`**: Returns the current date.
- **`current_timestamp()`**: Returns the current timestamp.
- **`datediff()`**: Returns the difference between two dates.
- **`date_add()`**: Adds days to a date.
- **`date_sub()`**: Subtracts days from a date.

In [None]:
emp.withColumn("days_since_joining", datediff(current_date(), emp.JOINING_DATE)).show()

##### 5. **Conditional Functions**:
-   These functions handle conditional logic and null handling.

- **`when()`**: Similar to SQL `CASE`, returns different values based on conditions.
- **`coalesce()`**: Returns the first non-null value.
- **`isnull()`**: Checks if a column is null.
- **`isnotnull()`**: Checks if a column is not null.

In [None]:
emp_df.withColumn("salary_level", when(emp_df.salary > 80000, "High").otherwise("Low")).show()

##### 6. **Math Functions**:
-   These functions perform mathematical operations.

- **`abs()`**: Returns the absolute value.
- **`round()`**: Rounds a value to a specified number of decimal places.
- **`sqrt()`**: Returns the square root of a value.

In [None]:
emp_df.withColumn("salary_k", F.round(emp_df.salary / 1000, 2)).show()

##### 7. **Collection Functions**:
-   These functions work with arrays and maps.

- **`collect_list()`**: Aggregates the values of a group into an array (duplicates kept; `collect_set()` drops them).
- **`explode()`**: Creates a new row for each element in an array.
- **`array()`**: Creates an array from multiple columns.
- **`array_contains()`**: Checks if a specific element exists in an array.
- **`array_distinct()`**: Returns an array with distinct elements.
- **`array_remove()`**: Removes all instances of a specified value from an array.
- **`sort_array()`**: Sorts the elements of an array.
- **`size()`**: Returns the length of an array or map.

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import collect_list, explode, array_contains

# Initialize Spark session
spark = SparkSession.builder \
    .appName("Collect List and Explode Example") \
    .getOrCreate()

# Sample data
data = [
    ("Alice", "Math"),
    ("Alice", "Science"),
    ("Bob", "Math"),
    ("Bob", "English"),
    ("Charlie", "Math"),
    ("Charlie", "Science"),
    ("Charlie", "History")
]

# Create DataFrame
df = spark.createDataFrame(data, ["name", "subject"])

# Show the original DataFrame
print("Original DataFrame:")
df.show()

# Group by name and collect subjects into a list
collected_df = df.groupBy("name").agg(collect_list("subject").alias("subjects"))

# Show the DataFrame with collected lists
print("DataFrame after collect_list():")
collected_df.show(truncate=False)

# Explode the list of subjects into individual rows
exploded_df = collected_df.select("name", explode("subjects").alias("subject"))

# Show the DataFrame with exploded lists
print("DataFrame after explode():")
exploded_df.show()

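from pyspark.sql.functions import array_contains

# Flag students whose collected subjects include "Math"
collected_df.withColumn("takes_math", array_contains(collected_df.subjects, "Math")).show(truncate=False)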


##### 8. **Miscellaneous Functions**:
-   These are various utility functions.

- **`lit()`**: Creates a column with a literal value.
- **`col()`**: Refers to a column by name.
- **`expr()`**: Parses an expression string into a column.

In [None]:
df.withColumn("constant", lit(1)).show()

##### 10. **Regular Expression Functions**
-   These functions perform operations using regular expressions.


- **`regexp_extract()`**: Extracts a substring using a regular expression.
- **`regexp_replace()`**: Replaces substrings matching a regular expression with another string.
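A plain-Python sketch of the semantics described above, using Python's `re` module as a stand-in for the Java regular expressions Spark actually uses. Simple patterns such as `\d+` behave the same, but replacement strings differ (Java uses `$1` where Python uses `\1`). The `*_model` names are hypothetical.

```python
import re
from typing import Optional


def regexp_extract_model(s: Optional[str], pattern: str, idx: int) -> Optional[str]:
    """Group `idx` of the first match; '' when the regex or the group does not match."""
    if s is None:
        return None  # NULL in, NULL out
    m = re.search(pattern, s)
    if m is None or m.group(idx) is None:
        return ""
    return m.group(idx)


def regexp_replace_model(s: Optional[str], pattern: str, replacement: str) -> Optional[str]:
    """Replaces every match of `pattern`, not only the first one."""
    if s is None:
        return None
    return re.sub(pattern, replacement, s)


print(regexp_extract_model("order 42 shipped in 3 days", r"\d+", 0))   # 42
print(repr(regexp_extract_model("no digits here", r"\d+", 0)))          # ''
print(regexp_replace_model("order 42 shipped in 3 days", r"\d+", "#"))  # order # shipped in # days
```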


In [None]:
df.withColumn("extracted", regexp_extract(df.text_column, r"\d+", 0)).show()

##### 12. **Map Functions**
-   These functions are used for manipulating map columns.


- **`map_keys()`**: Returns an array of keys in a map column.
- **`map_values()`**: Returns an array of values in a map column.
- **`map_entries()`**: Returns an unordered array of structs, one `(key, value)` struct per map entry.
- **`element_at()`**: Returns the value for a given key from a map; on an array, returns the element at a 1-based index.
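`element_at()` is easy to misread because arrays use 1-based positions. This plain-Python model mimics its behaviour on maps and arrays, together with `map_keys()`, `map_values()` and `map_entries()`. The function names are hypothetical, and it assumes ANSI mode is off, so a missing key or out-of-range position yields NULL. Spark does not guarantee key order for maps.

```python
from typing import Any, Dict, List, Optional, Tuple, Union


def element_at_model(container: Union[List[Any], Dict[Any, Any]], key: Any) -> Optional[Any]:
    """Map: value for `key`. Array: element at a 1-based position (negative counts from the end)."""
    if isinstance(container, dict):
        return container.get(key)  # missing key -> None (NULL)
    if key == 0:
        raise ValueError("SQL array positions start at 1")
    pos = key - 1 if key > 0 else len(container) + key
    return container[pos] if 0 <= pos < len(container) else None  # out of range -> None


def map_entries_model(m: Dict[Any, Any]) -> List[Tuple[Any, Any]]:
    """One (key, value) pair per entry; Spark promises no particular order."""
    return list(m.items())


prices = {"apple": 3, "pear": 5}
print(list(prices.keys()), list(prices.values()))  # map_keys / map_values: ['apple', 'pear'] [3, 5]
print(map_entries_model(prices))                   # [('apple', 3), ('pear', 5)]
print(element_at_model(prices, "pear"))            # 5
print(element_at_model([10, 20, 30], 1), element_at_model([10, 20, 30], -1))  # 10 30
print(element_at_model([10, 20, 30], 5))           # None
```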


In [None]:
df.withColumn("map_keys", map_keys(df.map_column)).show()

##### 13. **Conditional and Boolean Functions**
-   These functions help with conditional logic and boolean operations.


- **`greatest()`**: Returns the greatest value among columns.
- **`least()`**: Returns the least value among columns.
- **`ifnull()`**: Returns the second argument if the first is null, otherwise the first.
- **`nvl()`**: Same as `ifnull()`; equivalent to a two-argument `coalesce()`.
- **`expr()`**: Evaluates a string expression (used for more complex SQL-like expressions).
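A plain-Python model of how these functions treat nulls (hypothetical names): `greatest()` and `least()` skip null inputs and return null only when every input is null, while `ifnull()` and `nvl()` fall back to the second argument.

```python
import builtins
from typing import Any, Optional


def greatest_model(*values: Any) -> Optional[Any]:
    """Largest non-null argument; None only if every argument is None."""
    non_null = [v for v in values if v is not None]
    # builtins.max: pyspark's max() may have been imported over the built-in
    return builtins.max(non_null) if non_null else None


def least_model(*values: Any) -> Optional[Any]:
    """Smallest non-null argument; None only if every argument is None."""
    non_null = [v for v in values if v is not None]
    return builtins.min(non_null) if non_null else None


def ifnull_model(value: Any, replacement: Any) -> Any:
    """ifnull()/nvl(): use the replacement only when the first value is None."""
    return replacement if value is None else value


print(greatest_model(3, None, 7))                  # 7
print(least_model(3, None, 7))                     # 3
print(greatest_model(None, None))                  # None
print(ifnull_model(None, 0), ifnull_model(5, 0))   # 0 5
```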


In [None]:
df.withColumn("max_value", greatest(df.col1, df.col2, df.col3)).show()

##### 14. **Bitwise Functions**
-   These functions perform bitwise operations on columns.


- **`bitwise_not()`**: Performs bitwise NOT on a column.
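- **`Column.bitwiseAND()`**: Performs bitwise AND between two columns. It is a `Column` method (e.g. `df.col1.bitwiseAND(df.col2)`); `pyspark.sql.functions` has no `bitwise_and()`.
- **`Column.bitwiseOR()`**: Performs bitwise OR between two columns.
- **`Column.bitwiseXOR()`**: Performs bitwise XOR between two columns.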


In [None]:
df.withColumn("bitwise_result", df.col1.bitwiseAND(df.col2)).show()


##### 15. **Partitioning Functions**
-   These functions compute values over the rows of a window partition.


- **`Window.partitionBy()`**: Defines the columns that split rows into window partitions (a `Window` method, not a column function).
- **`cume_dist()`**: Computes the cumulative distribution of values in a partition.
- **`ntile()`**: Divides the rows in a partition into `n` buckets and returns each row's bucket number.
- **`percent_rank()`**: Computes the relative rank of each row within a partition as a fraction between 0.0 and 1.0.

In [None]:
windowSpec = Window.partitionBy("category").orderBy("value")
df.withColumn("cume_dist", cume_dist().over(windowSpec)).show()


##### 16. **JSON Functions**
-   These functions work with JSON strings stored in columns.

- **`from_json()`**: Parses a JSON string column into a struct, map, or array column, according to a schema you supply.
- **`to_json()`**: Converts a struct, map, or array column into a JSON string.
- **`get_json_object()`**: Extracts a value from a JSON string using a JSON path (e.g. `$.field_name`) and returns it as a string.
- **`json_tuple()`**: Extracts several top-level fields from a JSON string in one call, returning one column per field.
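A rough plain-Python approximation of `get_json_object()` for simple dot paths such as `$.field_name` (no array indexes or wildcards). The function name is hypothetical, and treating a JSON `null` value as NULL is an assumption of this sketch. It shows that the result is always a string: string values come back unquoted, while nested objects come back as compact JSON text.

```python
import json
from typing import Optional


def get_json_object_model(json_str: Optional[str], path: str) -> Optional[str]:
    """Approximates get_json_object() for simple '$.a.b' paths only."""
    if json_str is None or not path.startswith("$"):
        return None
    try:
        node = json.loads(json_str)
    except json.JSONDecodeError:
        return None  # malformed JSON -> NULL
    for field in (p for p in path[1:].split(".") if p):
        if not isinstance(node, dict) or field not in node:
            return None  # missing field -> NULL
        node = node[field]
    if node is None:
        return None  # JSON null treated as NULL (assumption of this sketch)
    if isinstance(node, str):
        return node  # string values are returned without quotes
    return json.dumps(node, separators=(",", ":"))  # numbers, objects, arrays as compact JSON


doc = '{"field_name": "abc", "nested": {"k": 1}}'
print(get_json_object_model(doc, "$.field_name"))  # abc
print(get_json_object_model(doc, "$.nested"))      # {"k":1}
print(get_json_object_model(doc, "$.nested.k"))    # 1
print(get_json_object_model(doc, "$.missing"))     # None
```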


In [None]:
df.withColumn("json_field", get_json_object(df.json_column, "$.field_name")).show()

##### 17. **Other Utility Functions**
-   Other useful utility functions.


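- **`monotonically_increasing_id()`**: Generates a 64-bit ID for each row that is unique and increasing, but not consecutive; useful when you don't have an existing ID column.
- **`sha2()`**: Computes a SHA-2 family hash (SHA-224, SHA-256, SHA-384, or SHA-512, chosen by the `numBits` argument) and returns it as a hex string.
- **`rand()`**: Generates a random value uniformly distributed in [0.0, 1.0), optionally from a seed.
- **`explode_outer()`**: Like `explode()`, but keeps rows whose array or map is null or empty, producing null instead of dropping them.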
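Why are the IDs unique but not consecutive? Spark's documentation for `monotonically_increasing_id()` says the current implementation puts the partition ID in the upper 31 bits and the record number within each partition in the lower 33 bits. A plain-Python sketch of that layout, with a hypothetical function name and an assumed two-partition, two-rows-each layout:

```python
from typing import List, Tuple


def monotonically_increasing_id_model(partition_id: int, row_in_partition: int) -> int:
    """Upper 31 bits: partition ID; lower 33 bits: record number within the partition."""
    return (partition_id << 33) + row_in_partition


# Assumed layout: two partitions holding two rows each
layout: List[Tuple[int, int]] = [(0, 2), (1, 2)]  # (partition_id, rows_in_partition)
ids = [monotonically_increasing_id_model(p, r) for p, rows in layout for r in range(rows)]
print(ids)  # [0, 1, 8589934592, 8589934593]
```

The jump between partitions (2^33 = 8589934592) is why the IDs should not be used as row numbers; use `row_number()` over a window for that.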


In [None]:
df.withColumn("unique_id", monotonically_increasing_id()).show()

##### Summary Table:


| Category                   | Common Functions                                                                        |
|----------------------------|-----------------------------------------------------------------------------------------|
| **Aggregate**              | `sum()`, `avg()`, `count()`, `max()`, `min()`                                           |
| **Window**                 | `row_number()`, `rank()`, `dense_rank()`, `lag()`, `lead()`                             |
| **String**                 | `concat()`, `lower()`, `upper()`, `substring()`, `length()`                             |
| **Date/Time**              | `current_date()`, `datediff()`, `date_add()`, `date_sub()`                              |
| **Conditional**            | `when()`, `coalesce()`, `isnull()`, `isnotnull()`                                       |
| **Math**                   | `abs()`, `round()`, `sqrt()`                                                            |
| **Collection**             | `collect_list()`, `array()`, `explode()`, `size()`                                      |
| **Miscellaneous**          | `lit()`, `col()`, `expr()`                                                              |
| **Null Handling**          | `na.fill()`, `na.drop()`, `na.replace()`, `fillna()`, `dropna()` (DataFrame methods)    |
| **Regex**                  | `regexp_extract()`, `regexp_replace()`                                                  |
| **Array**                  | `array_contains()`, `array_distinct()`, `array_remove()`, `sort_array()`                |
| **Map**                    | `map_keys()`, `map_values()`, `map_entries()`, `element_at()`                           |
| **Conditional/Boolean**    | `greatest()`, `least()`, `ifnull()`, `nvl()`, `expr()`                                  |
| **Bitwise**                | `bitwise_not()`, `Column.bitwiseAND()`, `Column.bitwiseOR()`, `Column.bitwiseXOR()`     |
| **Partitioning**           | `Window.partitionBy()`, `cume_dist()`, `ntile()`, `percent_rank()`                      |
| **JSON**                   | `from_json()`, `to_json()`, `get_json_object()`, `json_tuple()`                         |
| **Other Utility**          | `monotonically_increasing_id()`, `sha2()`, `rand()`, `explode_outer()`                  |



## Demo with `pyspark.sql.window`

In [None]:
from pyspark.sql import SparkSession, Window
# Note: importing `sum` here shadows Python's built-in sum() for the rest of the notebook
from pyspark.sql.functions import (
    rank, dense_rank, row_number, lag, lead, cume_dist, percent_rank, ntile,
    sum, avg, first, last
)

In [None]:
# Initialize Spark session
spark = SparkSession.builder.master("local").appName("WindowFunctionsDemo").getOrCreate()

# Sample Data
data = [("A", 1000), ("B", 900), ("C", 900), ("D", 800)]
df = spark.createDataFrame(data, ["Employee", "Salary"])

# Define a window specification with no partitionBy(): all rows form a single
# partition, ordered by Salary (highest first). Spark warns that this moves all
# data to one partition, which is acceptable for a small demo.
window_spec = Window.orderBy(df['Salary'].desc())

**`rank()`**: Assigns a rank to each row within a window partition based on the order, with gaps in ranks when there are ties.

**`dense_rank()`**: Similar to `rank()`, but without gaps in rank numbers when there are ties.
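To see the tie behaviour on the sample salaries, here is a plain-Python model (hypothetical names) that takes values already sorted in window order, as `orderBy(df['Salary'].desc())` produces. `row_number()` simply numbers the rows, so it never repeats.

```python
from typing import List


def rank_model(values: List[int]) -> List[int]:
    """rank(): tied rows share a rank, and the next rank skips ahead."""
    ranks: List[int] = []
    for i, v in enumerate(values):
        ranks.append(ranks[-1] if i > 0 and v == values[i - 1] else i + 1)
    return ranks


def dense_rank_model(values: List[int]) -> List[int]:
    """dense_rank(): tied rows share a rank, and the next rank is one higher."""
    ranks: List[int] = []
    for i, v in enumerate(values):
        if i == 0:
            ranks.append(1)
        elif v == values[i - 1]:
            ranks.append(ranks[-1])
        else:
            ranks.append(ranks[-1] + 1)
    return ranks


salaries_desc = [1000, 900, 900, 800]
print(rank_model(salaries_desc))                  # [1, 2, 2, 4]
print(dense_rank_model(salaries_desc))            # [1, 2, 2, 3]
print(list(range(1, len(salaries_desc) + 1)))     # row_number: [1, 2, 3, 4]
```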

In [None]:
# Apply rank and dense_rank functions
df.withColumn("rank", rank().over(window_spec)) \
  .withColumn("dense_rank", dense_rank().over(window_spec)) \
  .show()

In [None]:
df.withColumn("row_number", row_number().over(window_spec)).show()

-   **lag(column, offset=1, default=None)**: Returns the value of `column` from the row `offset` rows *before* the current row, following the window's ordering.

    -   `column`: The column on which the function is applied.
    -   `offset`: The number of rows behind the current row to look at. The default is 1 (the previous row).
    -   `default`: The value returned when no such row exists (e.g., for the first row); null (`None`) unless you set a custom default.

-   **lead(column, offset=1, default=None)**: Returns the value of `column` from the row `offset` rows *after* the current row, following the window's ordering.

    -   `column`: The column on which the function is applied.
    -   `offset`: The number of rows ahead of the current row to look at. The default is 1 (the next row).
    -   `default`: The value returned when no such row exists (e.g., for the last row); null (`None`) unless you set a custom default.
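A plain-Python model of `lag()` and `lead()` on the sample salaries in window order (hypothetical names). Spark does not guarantee the order of the two tied 900 rows, but because they hold equal salaries the results are the same either way.

```python
from typing import Any, List, Optional


def lag_model(values: List[Any], offset: int = 1, default: Optional[Any] = None) -> List[Any]:
    """Value from `offset` rows earlier in window order, or `default` if there is none."""
    return [values[i - offset] if i - offset >= 0 else default for i in range(len(values))]


def lead_model(values: List[Any], offset: int = 1, default: Optional[Any] = None) -> List[Any]:
    """Value from `offset` rows later in window order, or `default` if there is none."""
    return [values[i + offset] if i + offset < len(values) else default for i in range(len(values))]


salaries_desc = [1000, 900, 900, 800]
print(lag_model(salaries_desc))                # [None, 1000, 900, 900]
print(lead_model(salaries_desc))               # [900, 900, 800, None]
print(lag_model(salaries_desc, 2, default=0))  # [0, 0, 1000, 900]
```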

In [None]:
df.withColumn("lag", lag("Salary", 1).over(window_spec)) \
  .withColumn("lead", lead("Salary", 1).over(window_spec)) \
  .show()

In [None]:
df.withColumn("cume_dist", cume_dist().over(window_spec)).show()

-   `percent_rank()` is based on `rank()`, rescaled to a fraction between 0.0 and 1.0.
-   The first row of each partition always has a `percent_rank` of 0.0.
-   The last row has a `percent_rank` of 1.0, unless it is tied with the rows before it.

-   $percent\_rank = \frac{rank - 1}{total\_rows - 1}$ (a partition with a single row gets 0.0)

In [None]:
df.withColumn("percent_rank", percent_rank().over(window_spec)).show()
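The two distribution functions in plain Python (hypothetical names), for values already sorted in window order: `percent_rank` rescales `rank()` as (rank - 1) / (rows - 1), while `cume_dist` is the fraction of rows ordered at or before the current row, ties included.

```python
from typing import List


def rank_of(values: List[int]) -> List[int]:
    """rank() for values already sorted in window order (ties share the first position)."""
    return [values.index(v) + 1 for v in values]


def percent_rank_model(values: List[int]) -> List[float]:
    n = len(values)
    if n == 1:
        return [0.0]  # a single-row partition gets 0.0
    return [(r - 1) / (n - 1) for r in rank_of(values)]


def cume_dist_model(values: List[int]) -> List[float]:
    n = len(values)
    # Count rows up to and including the last row tied with the current one
    return [(n - values[::-1].index(v)) / n for v in values]


salaries_desc = [1000, 900, 900, 800]
print(percent_rank_model(salaries_desc))  # [0.0, 0.3333333333333333, 0.3333333333333333, 1.0]
print(cume_dist_model(salaries_desc))     # [0.25, 0.75, 0.75, 1.0]
```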

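**`ntile(n)`**: Divides the ordered rows of each partition into `n` buckets and assigns each row its bucket number (1 to `n`). When the rows do not divide evenly, the lower-numbered buckets get one extra row.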


In [None]:
df.withColumn("ntile", ntile(3).over(window_spec)).show()
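A plain-Python sketch of how `ntile(n)` sizes its buckets (hypothetical name): bucket sizes differ by at most one, and the extra rows go to the lowest-numbered buckets. With the four sample salaries, `ntile(3)` yields buckets 1, 1, 2, 3.

```python
from typing import List


def ntile_model(num_rows: int, n: int) -> List[int]:
    """Bucket number for each of `num_rows` ordered rows split into `n` tiles."""
    base, extra = divmod(num_rows, n)
    buckets: List[int] = []
    for bucket in range(1, n + 1):
        size = base + (1 if bucket <= extra else 0)  # the first `extra` buckets get one more row
        buckets.extend([bucket] * size)
    return buckets


print(ntile_model(4, 3))   # [1, 1, 2, 3]
print(ntile_model(10, 4))  # [1, 1, 1, 2, 2, 2, 3, 3, 4, 4]
```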

In [None]:
df.withColumn("sum_salary", sum("Salary").over(window_spec)) \
  .withColumn("avg_salary", avg("Salary").over(window_spec)) \
  .show()

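-   **`first()`**: Returns the first value in the window frame.
-   **`last()`**: Returns the last value in the window frame. With `orderBy()`, the default frame runs from the start of the partition to the current row (including rows tied with it), so `last()` returns the current row's value (or the last of its ties), not the last value of the whole partition. For the latter, widen the frame with `.rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)`.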

In [None]:
df.withColumn("first_salary", first("Salary").over(window_spec)) \
  .withColumn("last_salary", last("Salary").over(window_spec)) \
  .show()
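The running results in the last two cells come from the default window frame. With `orderBy()` and no explicit frame, Spark uses `RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW`, so each row sees the partition from its start up to the current row and all rows tied with it. A plain-Python model on the sample salaries (hypothetical names):

```python
import builtins
from typing import List


def default_frame(values: List[int], i: int) -> List[int]:
    """Rows visible to row i under RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW.

    `values` must already be sorted in window order; the frame ends at the
    last row tied with row i.
    """
    last_peer = len(values) - 1 - values[::-1].index(values[i])
    return values[: last_peer + 1]


salaries_desc = [1000, 900, 900, 800]
frames = [default_frame(salaries_desc, i) for i in range(len(salaries_desc))]

# builtins.sum: the notebook's `from pyspark.sql.functions import sum` shadows the built-in
running_sum = [builtins.sum(f) for f in frames]
running_avg = [builtins.sum(f) / len(f) for f in frames]
first_vals = [f[0] for f in frames]
last_vals = [f[-1] for f in frames]
# Whole-partition frame, as with rowsBetween(unboundedPreceding, unboundedFollowing)
whole_partition_last = [salaries_desc[-1]] * len(salaries_desc)

print(running_sum)           # [1000, 2800, 2800, 3600]
print(first_vals)            # [1000, 1000, 1000, 1000]
print(last_vals)             # [1000, 900, 900, 800]
print(whole_partition_last)  # [800, 800, 800, 800]
```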

## UDF

In PySpark, you can define user-defined functions (UDFs) to perform custom operations on DataFrame columns, and register them for use in SQL queries. There are several ways to do this:

-   **Example 1**: Simple Scalar UDF

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

# Initialize Spark Session
spark = SparkSession.builder \
    .appName("UDF Example") \
    .getOrCreate()

# Sample data
data = [(1, "Alice"), (2, "Bob")]
df = spark.createDataFrame(data, ["id", "name"])

# Define a Python function
def upper_case(name):
    return name.upper()

# Wrap the Python function as a UDF that returns a string
upper_case_udf = udf(upper_case, StringType())

# Use the UDF
df.withColumn("name_upper", upper_case_udf(df["name"])).show()
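`upper_case` above fails for rows where `name` is NULL, because Spark passes SQL NULL to a Python UDF as `None`, and `None.upper()` raises an error. A null-safe variant (the name `upper_case_safe` is hypothetical) can be wrapped exactly like the original, with `udf(upper_case_safe, StringType())`:

```python
from typing import Optional


def upper_case_safe(name: Optional[str]) -> Optional[str]:
    # Spark passes SQL NULL as None; return None (NULL) instead of raising
    return None if name is None else name.upper()


print(upper_case_safe("Alice"))  # ALICE
print(upper_case_safe(None))     # None
```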


##### Register and Use UDF in SQL
-   You can register UDFs as SQL functions and use them in SQL queries.

In [None]:
# Register the UDF
spark.udf.register("upper_case_sql", upper_case, StringType())

# Create a temporary view
df.createOrReplaceTempView("people")

# Use the UDF in a SQL query
spark.sql("SELECT id, name, upper_case_sql(name) as name_upper FROM people").show()

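-   **Example 2**: UDF with Type Hints (with `udf()`, the return type Spark applies is the one you pass in; the hints document intent)

In [None]:
from pyspark.sql.types import IntegerType

# Define a Python function with type hints
def add_one(x: int) -> int:
    return x + 1

# Wrap as a UDF; the return type comes from IntegerType(), not from the "-> int" hint
add_one_udf = udf(add_one, IntegerType())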


# Use the UDF
df.withColumn("id_plus_one", add_one_udf(df["id"])).show()

##### Using `pandas_udf` for Vectorized UDFs

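Vectorized UDFs are usually faster than standard UDFs: Apache Arrow moves data between the JVM and Python in columnar batches, and the function processes a whole pandas `Series` per call instead of one row at a time.

In [None]:
import pandas as pd
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import StringType

# Define a vectorized UDF: it receives and returns a pandas Series
@pandas_udf(StringType())
def upper_case_vec(names: pd.Series) -> pd.Series:
    return names.str.upper()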

# Use the vectorized UDF
df.withColumn("name_upper", upper_case_vec(df["name"])).show()

##### Grouped Map Operations with `applyInPandas`

Grouped map operations apply a pandas function to each group of data, which is useful for complex transformations. Since Spark 3.0, `groupby(...).applyInPandas()` is the recommended API; the older `pandas_udf(..., PandasUDFType.GROUPED_MAP)` style is deprecated.

In [None]:
import pandas as pd
from pyspark.sql.types import StructType, StructField, LongType, StringType

# Sample data with a group column
data = [(1, "Alice", "A"), (2, "Bob", "B"), (3, "Charlie", "A"), (4, "David", "B")]
df = spark.createDataFrame(data, ["id", "name", "group"])

# Schema of the DataFrame returned for each group (Python ints are inferred as LongType)
result_schema = StructType([
    StructField("id", LongType()),
    StructField("name", StringType()),
    StructField("group", StringType()),
    StructField("name_upper", StringType())
])

# Plain pandas function: receives all rows of one group as a pandas DataFrame
def upper_case_grouped(pdf: pd.DataFrame) -> pd.DataFrame:
    pdf["name_upper"] = pdf["name"].str.upper()
    return pdf

# Apply the function to each group
df.groupby("group").applyInPandas(upper_case_grouped, schema=result_schema).show()


##### Notes

-   **Standard UDFs (`udf`)**: Suitable for scalar, row-at-a-time transformations. Defined with `pyspark.sql.functions.udf`.
-   **Vectorized UDFs (`pandas_udf`)**: Use Apache Arrow to process data in batches as pandas objects, which is usually faster. Defined with `pyspark.sql.functions.pandas_udf`.
-   **SQL UDFs**: Register a Python function with `spark.udf.register` and call it by name in SQL queries.
-   **Grouped map operations**: Apply a pandas function to each group of data, useful for complex group-based transformations. In Spark 3.0+, use `df.groupby(...).applyInPandas(func, schema)`, which supersedes the deprecated `pandas_udf` with `PandasUDFType.GROUPED_MAP`.

## I/O

##### RDBMS

<b style="color:magenta">How to read data from MySQL in Apache Spark?</b>

-  **Download the MySQL JDBC Driver**:

    -   Download `MySQL Connector/J` (the JDBC driver) from the official MySQL website.
    -   Choose the version that matches your environment (e.g., `mysql-connector-java-8.0.30.jar`; from 8.0.31 on, the file is named `mysql-connector-j-<version>.jar`).


- **Make the Driver Available to Spark**: 
    -    Place the downloaded `mysql-connector-java-8.0.xx.jar` file in a directory accessible by your Spark environment, and note the full path to this JAR file.

    -   **Placing the JAR in Spark’s jars Directory**: You can also place the MySQL JDBC JAR file in Spark’s `jars` folder, typically located in your Spark installation directory (`$SPARK_HOME/jars`). Spark will automatically load any JARs found in this folder when it starts.

    - **Pass the JAR to `spark-submit`**:

        - Use the `--jars` option to include the MySQL JDBC driver when you submit your Spark job.
        - `spark-submit --jars /Users/am/mydocs/Software_Development/Big_Data/elements_of_spark/mysql-connector-j-8.0.32.jar scripts/read_mysql.py`

    - **Pass the JAR to an interactive shell** (`spark-shell` for Scala, `pyspark` for Python):
        - `spark-shell --jars /Users/am/mydocs/Software_Development/Big_Data/elements_of_spark/mysql-connector-j-8.0.32.jar`
        - `pyspark --jars /Users/am/mydocs/Software_Development/Big_Data/elements_of_spark/mysql-connector-j-8.0.32.jar`

In [None]:
import os

# JDBC URL format: jdbc:mysql://<host>:<port>/<db-name>
# jdbc_url = "jdbc:mysql://localhost:3306/interview_questions"
jdbc_url = "jdbc:mysql://localhost:3306"
mysql_driver = f"{os.environ['SPARK_HOME']}/jars/mysql-connector-j-8.0.32.jar"

# Connection properties
# The password is read from an environment variable instead of being hard-coded
# (MYSQL_PASSWORD is an assumed variable name)
connection_properties = {
    "user": "Shah",
    "password": os.environ["MYSQL_PASSWORD"],
    "driver": "com.mysql.cj.jdbc.Driver"
}

In [None]:
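# Initialize Spark Session
# Note: spark.jars is only read when a new session starts; if a session is
# already running, getOrCreate() returns it and this setting has no effect
spark = SparkSession.builder \
    .appName("Data Ingestion from MySQL into Spark") \
    .config("spark.jars", mysql_driver) \
    .getOrCreate()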

###### Read from RDBMS

In [None]:
print({**connection_properties, "dbtable":"Bonus", "url":jdbc_url+'/interview_questions'})

In [None]:
# Reading table from MySQL
# df = spark.read.jdbc(url=jdbc_url+'/interview_questions', table="Bonus", properties=connection_properties)


options = {**connection_properties, "dbtable":"Bonus", "url":jdbc_url+'/interview_questions'}
df = spark.read.format("jdbc").options(**options).load()

df.show()


###### Write to RDBMS

In [None]:
## Write OPTION 1
# emp_df.write.jdbc(url=jdbc_url+'/IQ', table="Employees", mode="overwrite", properties=connection_properties)

In [None]:
# DDL for the employee table (primary key on EMPLOYEE_ID)
create_emp_sql = """CREATE TABLE IF NOT EXISTS employee (
	EMPLOYEE_ID INT NOT NULL PRIMARY KEY AUTO_INCREMENT,
	FIRST_NAME CHAR(25),
	LAST_NAME CHAR(25),
	DEPARTMENT CHAR(25),
	SALARY INT(15),
	JOINING_DATE CHAR(25),
	EMAIL CHAR(100)
);
"""

# DDL for the Bonus table (foreign key to employee)
create_bns_sql = """CREATE TABLE IF NOT EXISTS Bonus (
	EMPLOYEE_REF_ID INT,
	BONUS_AMOUNT INT(10),
	BONUS_DATE DATETIME,
	FOREIGN KEY (EMPLOYEE_REF_ID) REFERENCES employee(EMPLOYEE_ID) ON DELETE CASCADE
);
"""

In [None]:

# JDBC options for writing emp_df into the employee table
jdbc_options = {
    "url": jdbc_url+'/IQ',
    "dbtable": "employee",
    "user": connection_properties["user"],
    "password": connection_properties["password"],
    "driver": connection_properties["driver"],
}

In [None]:
emp_df = employees_df(30)
# emp_df = emp_df.fillna({'salary': 0,})
# emp_df.na.fill({"EMAIL": "UNKNOWN"})
# emp_df = emp_df.replace({None: "UNKNOWN"}, subset=['EMAIL'])

In [None]:
# emp_df = emp_df.select('FIRST_NAME','LAST_NAME','DEPARTMENT','SALARY','JOINING_DATE','EMAIL')
# emp_df.show()

In [None]:
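# Spark's built-in JDBC data source has no "preactions" option, so create_emp_sql
# would not be executed through jdbc_options. Run it against MySQL yourself first;
# with mode("append"), Spark creates a missing table itself, but without the primary key.
emp_df.show(5)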

**Note**: To append to an existing table, the data must satisfy the table's constraints (for example, primary key uniqueness and `NOT NULL` columns).

In [None]:
## Write OPTION 2
# To append into an existing table, data must be validated in accordance with table constraints.
emp_df.write \
    .format("jdbc") \
    .options(**jdbc_options) \
    .mode("append") \
    .save()


##### Convert from `*.parqet` to `*.csv`

In [None]:
DATA_DIR = os.environ['DATA'] + '/Spark_Experiments'
transactions_file_pq = DATA_DIR + "/transactions.parquet"

In [None]:
df_transactions = spark.read.parquet(transactions_file_pq)
df_transactions.show()

In [None]:
# Spark writes a directory named TNX_test.csv; coalesce(1) makes it contain a single part-*.csv file
df_transactions.coalesce(1).write.mode('overwrite').option("header", "true").csv(DATA_DIR + "/TNX_test.csv")
# df_transactions.repartition(5).write.mode("overwrite").option("header", "true").csv(DATA_DIR + "/TNX_test.csv")

In [None]:
transactions_file_csv = DATA_DIR + "/TNX_test.csv"
# Read the CSV output back (the first line holds the column names)
df_transactions_csv = spark.read.option("header", "true").csv(transactions_file_csv)
df_transactions_csv.show()

##### Convert from `*.csv` to `*.parqet`

In [None]:
transactions_file_csv = DATA_DIR + "/TNX.csv"
# Read the CSV (header row, inferred column types), then write it as Parquet
df_transactions = spark.read.option("header", "true").option("inferSchema", "true").csv(transactions_file_csv)
df_transactions.repartition(5).write.mode("overwrite").parquet(DATA_DIR + "/TNX_test.parquet")

## Example of RDD Operations:

**Setup: Create an RDD**: Let's start with an example RDD that contains tuples of names, subjects, and scores.

In [None]:
from pyspark.sql import SparkSession

# Create a Spark session
spark = SparkSession.builder.appName("RDDGroupingAggregation").getOrCreate()

# Create an RDD
data = [
    ('Alice', 'Math', 85),
    ('Bob', 'Math', 75),
    ('Alice', 'Science', 90),
    ('Bob', 'Science', 80),
    ('Charlie', 'Math', 50),
    ('Charlie', 'Science', 60)
]

# Parallelize the data into an RDD
rdd = spark.sparkContext.parallelize(data)


1. **Group By Key (`groupByKey`)**: `groupByKey()` groups the data based on a key, which in this case would be the student's name or subject.
    - Here, we used `groupByKey()` to group the scores by the student's name. The result is an RDD where each key (name) maps to an iterable of values (scores).


In [None]:
# Group by the first element (name) and collect the results
grouped_by_name = rdd.map(lambda x: (x[0], x[2])).groupByKey()

# Collect and print results
for name, scores in grouped_by_name.collect():
    print(f"{name}: {list(scores)}")


2. **Reduce By Key (`reduceByKey`)**: `reduceByKey()` is used for aggregating values based on keys. It combines values for each key using the provided function (e.g., summing up the scores).
    - Here, we used `reduceByKey()` to sum the scores for each student.


In [None]:
# Map by name and score, then reduce by summing the scores for each student
total_scores = rdd.map(lambda x: (x[0], x[2])).reduceByKey(lambda a, b: a + b)

# Collect and print results
total_scores.collect()

3. **Combine By Key (`combineByKey`)**: `combineByKey()` allows for more flexible aggregations by using three functions:
    - A **createCombiner** function to initialize the value for a key.
    - A **mergeValue** function to combine a new value into the existing accumulator.
    - A **mergeCombiners** function to merge values from different partitions.
    - In this case, `combineByKey()` is used to accumulate both the sum and count of scores, and then compute the average.
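To see how the three functions interact, here is a plain-Python simulation of `combineByKey()` with the same logic as the cell below, over an assumed split of the `(name, score)` pairs into two partitions (Spark decides the real split). Each partition builds accumulators with createCombiner and mergeValue; mergeCombiners then joins them across partitions. All names are hypothetical.

```python
from typing import Dict, List, Tuple

Acc = Tuple[int, int]  # (running sum, count)


def create_combiner(score: int) -> Acc:
    return (score, 1)


def merge_value(acc: Acc, score: int) -> Acc:
    return (acc[0] + score, acc[1] + 1)


def merge_combiners(a: Acc, b: Acc) -> Acc:
    return (a[0] + b[0], a[1] + b[1])


# Assumed split of the (name, score) pairs into two partitions
partitions: List[List[Tuple[str, int]]] = [
    [("Alice", 85), ("Bob", 75), ("Alice", 90)],
    [("Bob", 80), ("Charlie", 50), ("Charlie", 60)],
]


def combine_partition(pairs: List[Tuple[str, int]]) -> Dict[str, Acc]:
    """Per-partition step: first value of a key -> create_combiner, later values -> merge_value."""
    accs: Dict[str, Acc] = {}
    for key, score in pairs:
        accs[key] = merge_value(accs[key], score) if key in accs else create_combiner(score)
    return accs


# Cross-partition step (after the shuffle): merge_combiners joins per-partition accumulators
merged: Dict[str, Acc] = {}
for accs in map(combine_partition, partitions):
    for key, acc in accs.items():
        merged[key] = merge_combiners(merged[key], acc) if key in merged else acc

averages = {key: total / count for key, (total, count) in merged.items()}
print(averages)  # {'Alice': 87.5, 'Bob': 77.5, 'Charlie': 55.0}
```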


In [None]:
# Combine by key to calculate average scores for each student
avg_scores = rdd.map(lambda x: (x[0], x[2])).combineByKey(
    (lambda x: (x, 1)),                    # createCombiner (value, count)
    (lambda acc, x: (acc[0] + x, acc[1] + 1)),  # mergeValue
    (lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1]))  # mergeCombiners
).mapValues(lambda v: v[0] / v[1])         # Final division to get the average

# Collect and print results
avg_scores.collect()

4. **Group By Key with Aggregation (`groupByKey` and `mapValues`)**: `groupByKey()` can also be followed by an aggregation. However, `reduceByKey` and `combineByKey` are more efficient for large datasets, because they combine values within each partition before the shuffle, while `groupByKey` sends every value across the network.
    - This approach groups the data by subject and then sums the scores for each subject.

In [None]:
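import builtins

# Group by the second element (subject) and sum the scores.
# builtins.sum is used because `from pyspark.sql.functions import sum` earlier
# in the notebook shadows Python's built-in sum().
grouped_by_subject = rdd.map(lambda x: (x[1], x[2])).groupByKey().mapValues(lambda scores: builtins.sum(scores))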

# Collect and print results
grouped_by_subject.collect()

5. **Aggregate By Key (`aggregateByKey`)**: `aggregateByKey()` gives you control over how values are aggregated. It is similar to `combineByKey`, but instead of a createCombiner function it takes a zero value that starts the accumulator for each key. It lets you:
    - Define an initial zero value for the accumulator.
    - Define how values should be combined locally (within partitions) and globally (across partitions).
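A plain-Python simulation of `aggregateByKey()` over an assumed two-partition split of the `(subject, score)` pairs (hypothetical names). The zero value starts a fresh accumulator for each key in each partition, so it must be neutral for the operation: a non-zero start value is counted once per partition and changes the result.

```python
from typing import Dict, List, Tuple

Acc = Tuple[int, int]  # (sum, count)


def seq_op(acc: Acc, score: int) -> Acc:
    """Within-partition step: fold one score into the accumulator."""
    return (acc[0] + score, acc[1] + 1)


def comb_op(a: Acc, b: Acc) -> Acc:
    """Across-partition step: merge two accumulators."""
    return (a[0] + b[0], a[1] + b[1])


# Assumed split of the (subject, score) pairs into two partitions
partitions: List[List[Tuple[str, int]]] = [
    [("Math", 85), ("Math", 75), ("Science", 90)],
    [("Science", 80), ("Math", 50), ("Science", 60)],
]


def aggregate_by_key_model(parts: List[List[Tuple[str, int]]], zero: Acc) -> Dict[str, Acc]:
    merged: Dict[str, Acc] = {}
    for pairs in parts:
        local: Dict[str, Acc] = {}
        for key, score in pairs:
            # The zero value starts each key's accumulator once per partition
            local[key] = seq_op(local.get(key, zero), score)
        for key, acc in local.items():
            merged[key] = comb_op(merged[key], acc) if key in merged else acc
    return merged


print(aggregate_by_key_model(partitions, (0, 0)))   # {'Math': (210, 3), 'Science': (230, 3)}
print(aggregate_by_key_model(partitions, (10, 0)))  # {'Math': (230, 3), 'Science': (250, 3)}
```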

In [None]:
# Aggregate by key to calculate total and count of scores per subject
sum_and_count = rdd.map(lambda x: (x[1], x[2])).aggregateByKey(
    (0, 0),                                 # Initial (sum, count) pair
    (lambda acc, value: (acc[0] + value, acc[1] + 1)),  # Within partition aggregation
    (lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1]))  # Across partition aggregation
)

# Collect and print results
sum_and_count.collect()

6. **Key-based Sorting (`sortByKey`)**: You can sort an RDD of key-value pairs by key to get the result in key order.

In [None]:
# Sort by key (student name) after calculating total scores
sorted_total_scores = total_scores.sortByKey()

# Collect and print results
sorted_total_scores.collect()

7. **Group By and Count Occurrences**: You can also use `groupByKey()` to count occurrences of specific values or categories.

In [None]:
# Group by name, then count how many subjects each student has
subject_count = rdd.map(lambda x: (x[0], x[1])).groupByKey().mapValues(len)

# Collect and print results
subject_count.collect()

## Stop `Spark`

In [None]:
# Stop the Spark session
spark.stop()