# Global

In [None]:
import logging
import math
import os
import re
import smtplib
from email import encoders
from email.mime.base import MIMEBase
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText

import pandas as pd

from pyspark.sql import SparkSession
from pyspark.sql import DataFrame
import pyspark.sql.functions as F
from pyspark.sql.functions import Window
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, FloatType

In [None]:
# Session settings: dynamic partition overwrite plus the Delta Lake package,
# SQL extension and session catalog.
# NOTE(review): the package is the Scala 2.13 build (delta-core_2.13); confirm it
# matches the Scala version of the installed Spark distribution.
SPARK_CONFIGS = dict([
    ("spark.sql.sources.partitionOverwriteMode", "dynamic"),
    ("spark.jars.packages", "io.delta:delta-core_2.13:2.4.0"),
    ("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension"),
    ("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog"),
])

In [None]:
# Global variables or constants
# Base directories for exercise inputs and outputs (both end with "/").
INPUT_PATH = "/data/workspace_files/input/"
OUTPUT_PATH = "/data/workspace_files/output/"

# SMTP settings read by send_email.
# NOTE(review): host and addresses look like placeholders; smtplib treats
# port 0 as "use the default SMTP port" - confirm the intended port.
MAIL_SERVER, MAIL_PORT = "aaa.com", 0
SENDER_ID = "xyz@em.com"
RECEIVER_IDS = "abc@em.com", ""

In [None]:
# Minimal HTML body, suitable as the message_text of a test e-mail.
SIMPLE_EMAIL_MESSAGE = (
    "\n"
    "<html>\n"
    "    <h1>Test mail</h1>\n"
    "    <p>This is a Test mail</p>\n"
    "</html>\n"
)

In [None]:
# configuring the logger to print logs
# Alternative layouts; log_format3 is the one actually used below.
log_format1 = '%(asctime)s [%(levelname)-8s] <PID %(process)d:%(processName)s> %(name)s.%(funcName)s: %(message)s'
log_format2 = '%(asctime)s [%(levelname)-8s] %(name)s.%(funcName)s: %(message)s'
log_format3 = '%(asctime)s [%(levelname)-8s] [%(processName)s] %(name)s: %(message)s'

formatter = logging.Formatter(log_format3, datefmt='%d-%b-%Y %H:%M:%S')

logger = logging.getLogger("PySpark Exercises")

# Re-running this cell must not duplicate output or leak the log file:
# close the handlers attached by a previous run before detaching them.
for old_handler in list(logger.handlers):
    logger.removeHandler(old_handler)
    old_handler.close()

console = logging.StreamHandler()
console.setFormatter(formatter)

# FileHandler does not create missing directories, so ensure logs/ exists.
log_dir = os.path.join(OUTPUT_PATH, "logs")
os.makedirs(log_dir, exist_ok=True)
file_handler = logging.FileHandler(os.path.join(log_dir, "pyspark_stdout.log"), "a", encoding="utf-8")
file_handler.setFormatter(formatter)

logger.addHandler(console)
logger.addHandler(file_handler)

logger.setLevel(logging.INFO)

In [1]:
# Build (or reuse) the Spark session.
# SPARK_CONFIGS is applied on the builder: settings such as spark.jars.packages
# and spark.sql.extensions are only honoured when the session is created and
# cannot be changed later with spark.conf.set. If a session already exists,
# getOrCreate returns it and such settings will not take effect.
builder = (
    SparkSession.builder
    .appName("PySpark Excercises")
    .config("spark.sql.warehouse.dir", "/data/workspace_files/warehouse")
    .enableHiveSupport()
)
for conf_key, conf_value in SPARK_CONFIGS.items():
    builder = builder.config(conf_key, conf_value)
spark = builder.getOrCreate()

In [None]:
# Apply SPARK_CONFIGS to the running session.
# Static SQL configs and Spark core configs (e.g. spark.sql.extensions,
# spark.jars.packages) are rejected by spark.conf.set on a live session, so
# only keys that are missing/different AND runtime-modifiable are set; the rest
# are reported instead of aborting the notebook.
for key, value in SPARK_CONFIGS.items():
    if spark.conf.get(key, None) == value:
        continue  # already in effect (e.g. supplied on the session builder)
    if spark.conf.isModifiable(key):
        spark.conf.set(key, value)
    else:
        logger.warning(
            "Spark config %s cannot be changed on a running session; "
            "set it on SparkSession.builder instead (wanted %r).",
            key,
            value,
        )

In [None]:
def send_email(mail_from=SENDER_ID,
               mail_to=RECEIVER_IDS,
               subject=None,
               message_text=None,
               file=None
               ):
    """Send an HTML e-mail via MAIL_SERVER:MAIL_PORT, optionally with one attachment.

    Args:
        mail_from: sender address.
        mail_to: a recipient address or an iterable of addresses; blank entries
            (such as the "" placeholder in RECEIVER_IDS) are ignored.
        subject: value for the Subject header.
        message_text: HTML body of the message.
        file: optional path of a file to attach as application/octet-stream.

    Returns:
        True if sendmail completed, False if no usable recipient was given or
        any error occurred (the error is printed, not raised).
    """
    # A plain string would otherwise be joined/iterated character by character.
    if isinstance(mail_to, str):
        mail_to = [mail_to]
    # An empty address cannot be delivered to and would leave a dangling ", "
    # in the To header.
    recipients = [addr for addr in mail_to if addr and addr.strip()]
    if not recipients:
        print("No valid recipient address given; e-mail not sent.")
        return False

    mail_sent = False
    try:
        # "alternative" tells clients to show only one of the parts; an
        # attachment is complementary content, which is what "mixed" expresses.
        message = MIMEMultipart("mixed" if file is not None else "alternative")
        message["Subject"] = subject
        message["From"] = mail_from
        message["To"] = ", ".join(recipients)
        message.attach(MIMEText(message_text, 'html'))
        if file is not None:
            part = MIMEBase('application', "octet-stream")
            with open(file, "rb") as attachment:
                part.set_payload(attachment.read())
            encoders.encode_base64(part)
            # Passing filename as a parameter lets the email package quote/encode
            # it (spaces, semicolons, non-ASCII) instead of splicing it raw.
            part.add_header('Content-Disposition', 'attachment',
                            filename=os.path.basename(file))
            message.attach(part)
        print("Message is : \n", message.as_string())
        # The context manager sends QUIT and closes the socket even if sendmail fails.
        with smtplib.SMTP(MAIL_SERVER, MAIL_PORT) as smtp:
            smtp.sendmail(mail_from, recipients, message.as_string())
            mail_sent = True
    except Exception as e:
        print(e)
    return mail_sent

In [None]:
# Multipliers for the unit suffixes accepted in Spark byte-size configs.
_SPARK_BYTE_UNITS = {
    "": 1,
    "k": 1024,
    "m": 1024 ** 2,
    "g": 1024 ** 3,
    "t": 1024 ** 4,
    "p": 1024 ** 5,
}


def _parse_byte_size(value) -> int:
    """Convert a Spark byte-size string (e.g. '134217728b', '128MB', '4m') to bytes."""
    match = re.fullmatch(r"(\d+)([kmgtp]?)b?", str(value).strip().lower())
    if match is None:
        raise ValueError(f"Unrecognised Spark byte size: {value!r}")
    digits, unit = match.groups()
    return int(digits) * _SPARK_BYTE_UNITS[unit]


# Estimate the partition count
def num_partitions(spark: "SparkSession", file_size, num_of_files = 1) -> int:
    """Estimate how many partitions Spark will create when reading files.

    Mirrors Spark's split-size heuristic: every file is padded by
    ``spark.sql.files.openCostInBytes``, the padded total is spread across the
    default parallelism, and the split size is clamped between the open cost
    and ``spark.sql.files.maxPartitionBytes``.

    Args:
        spark: active session; only ``conf.get`` and
            ``sparkContext.defaultParallelism`` are read.
        file_size: size of one file in bytes.
        num_of_files: number of files of that size.

    Returns:
        The estimated partition count, rounded up to a whole partition.
    """
    # The config values may carry a unit suffix (e.g. "134217728b" or "128MB"),
    # so parse them instead of stripping a single lowercase "b".
    partition_size = _parse_byte_size(spark.conf.get("spark.sql.files.maxPartitionBytes"))
    open_cost_size = _parse_byte_size(spark.conf.get("spark.sql.files.openCostInBytes"))
    parallelism = max(int(spark.sparkContext.defaultParallelism), 1)

    total_file_size = file_size * num_of_files
    # Every file is charged the open cost on top of its real size.
    padded_file_size = total_file_size + (num_of_files * open_cost_size)
    if padded_file_size <= 0:
        return 0

    bytes_per_core = padded_file_size / parallelism
    max_bytes_per_split = min(partition_size, max(open_cost_size, bytes_per_core))

    # A partial split still occupies a whole partition.
    return math.ceil(padded_file_size / max_bytes_per_split)

In [None]:
def get_updated_headers_expr(df: "DataFrame") -> list:
    """Build ``selectExpr`` expressions that rename every column to snake_case.

    Each column is referenced by its exact original name and aliased to the
    stripped, lower-cased name with spaces replaced by underscores. Both sides
    are backtick-quoted (with embedded backticks doubled) so names containing
    spaces or punctuation parse correctly.

    Args:
        df: DataFrame whose ``columns`` are renamed.

    Returns:
        List of expressions such as ``"`First Name` as `first_name`"``.
    """
    def quote(name: str) -> str:
        return "`" + name.replace("`", "``") + "`"

    fixed_col_list: list = []
    for name in df.columns:
        source = str(name)
        target = source.strip().replace(' ', '_').lower()
        # Reference the original (unstripped) name: a stripped name would not
        # exist in the DataFrame when the header has surrounding whitespace.
        fixed_col_list.append(f"{quote(source)} as {quote(target)}")

    return fixed_col_list

In [None]:
def fix_header(df: "DataFrame"):
    """Return *df* with every column renamed to stripped, lower-case snake_case.

    Args:
        df: DataFrame whose columns are renamed.

    Returns:
        A new DataFrame with all columns renamed (the input is not modified).
    """
    n_df = df
    for col in df.columns:
        # Chain on n_df: renaming from the original df on every iteration would
        # discard all but the last rename.
        n_df = n_df.withColumnRenamed(col, str(col).strip().replace(' ', '_').lower())
    return n_df

In [None]:
# Create outer method to return the flattened Data Frame
def flatten_json_df(_df: "DataFrame") -> "DataFrame":
    """Flatten every nested struct column of *_df* into top-level columns.

    A leaf reached through the field path a -> b -> c becomes a column named
    ``a_b_c``; dots inside individual field names are also replaced by
    underscores. Non-struct columns (including arrays and maps) are selected
    as-is, not exploded. Every path segment is backtick-quoted so field names
    containing dots, spaces or other punctuation are referenced literally.

    Args:
        _df: DataFrame to flatten.

    Returns:
        DataFrame produced by ``selectExpr`` over the flattened column list.
    """
    # Dynamically generated "<path> as <alias>" expressions
    flattened_col_list = []

    def quote(name: str) -> str:
        return "`" + name.replace("`", "``") + "`"

    # Walk the schema recursively; struct fields are descended into, anything
    # else becomes one output column.
    def get_flattened_cols(fields, parents) -> None:
        for field in fields:
            path = parents + [field.name]
            if field.dataType.typeName() == 'struct':
                get_flattened_cols(field.dataType.fields, path)
            else:
                source = ".".join(quote(part) for part in path)
                alias = "_".join(path).replace('.', '_')
                flattened_col_list.append(f"{source} as {quote(alias)}")

    get_flattened_cols(_df.schema.fields, [])

    # Return the flattened Data Frame
    return _df.selectExpr(flattened_col_list)

In [None]:
def union_unmachted_cols_df(df_1: "DataFrame", df_2: "DataFrame") -> "DataFrame":
    """Union two DataFrames by column name, tolerating different column sets.

    Columns that exist in only one input are filled with nulls for the rows
    coming from the other input.

    Args:
        df_1: first DataFrame; its column order leads the result.
        df_2: second DataFrame.

    Returns:
        The by-name union of both inputs.
    """
    # allowMissingColumns (Spark 3.1+, satisfied by the Delta 2.4 / Spark 3.4
    # setup used in this notebook) adds the missing columns as nulls of the
    # matching type, so no manual lit(None) placeholders are needed.
    return df_1.unionByName(df_2, allowMissingColumns=True)

In [None]:
def get_values_as_list(df: "DataFrame", column_name: str) -> list:
    """Return the distinct values of *column_name* as a Python list.

    The order of the returned values is not guaranteed.

    Args:
        df: source DataFrame.
        column_name: column whose distinct values are collected to the driver.
    """
    # Collect Rows directly instead of going through the RDD API; the first
    # (only) field of each Row is the value.
    rows = df.select(column_name).distinct().collect()
    return [row[0] for row in rows]

In [None]:
def list_to_str(data_list: list) -> str:
    """Join items into one string, separated by a comma followed by a newline.

    Items are converted with ``str()`` so non-string values (for example the
    numeric values ``get_values_as_list`` can return) do not raise TypeError.
    """
    return ', \n'.join(map(str, data_list))

# Exercise 1

## Objective – Explore data using PySpark

### Tasks
1. Code to read data from a file or database and explore it:
    1. Type inference: detect the types of columns in a data frame.
    2. Essentials: type, unique values, missing values
    3. Quantile statistics like minimum value, Q1, median, Q3, maximum, range, inter-quartile range
    4. Descriptive statistics like mean, mode, standard deviation, sum, median absolute deviation, coefficient of variation, kurtosis, skewness
    5. Most frequent values
2. Write the code so that the same statistics can still be explored if the dataset changes.

### Inputs
- Any type of data
### Outputs
- Standard output, either as a file (Excel preferred, or CSV) or as a table

In [None]:
# Type categories for grouping column names; each bucket starts as an empty list.
DATA_TYPES = dict(numeric=[], string=[], date_time=[])

In [None]:
# Schema of the per-column summary table for Exercise 1: every field is a
# non-nullable string column, in the order listed below.
summary_schema = StructType([
    StructField(field_name, StringType(), False)
    for field_name in (
        "column_name",
        "data_type",
        "expected_data_type",
        "missing_val_count",
        "unique_val_count",
        "min",
        "max",
        "mean",
        "median",
        "mode",
        "std_dev",
        "q01",
        "q25",
        "q50",
        "q75",
        "q95",
        "q99",
        "kurtosis",
        "skewness",
        "median_abs_dev",
        "coeff_of_var",
    )
])

In [None]:
def explore_data(spark, path, schema = None, data_limit = None):
    """Load a dataset and compute exploratory statistics for it (work in progress).

    Args:
        spark: active SparkSession.
        path: a filesystem path (any string containing "/") is read with
            ``spark.read.load``; anything else is treated as a table name and
            read with ``spark.table``.
        schema: optional expression(s) passed to ``selectExpr`` to pick/cast
            columns before profiling.
        data_limit: optional maximum number of rows to profile.

    Returns:
        The DataFrame produced by ``DataFrame.summary()`` for the loaded data.
        Steps 3, 4, 6, 7 and 8 below are still TODO; the merged result of all
        steps is intended to replace this return value.
    """
    # 1. read data
    if "/" in path:
        data_df = spark.read.load(path)
    else:
        data_df = spark.table(path)
    if schema is not None:
        data_df = data_df.selectExpr(schema)
    if data_limit is not None:
        data_df = data_df.limit(data_limit)
    # 2. get the column names and their default data types (inputs for the TODO steps)
    columns = data_df.columns
    types = data_df.dtypes
    # 3. TODO: detect the suitable type for each column in `columns` / `types`
    # 4. TODO: find the missing and unique value count for each and every column
    # 5. find min, max, mean, median (50%), quartiles and std dev
    stats_df = data_df.summary()
    # 6. TODO: find frequently used values i.e. top 5 - 10
    # 7. TODO: find median absolute deviation, coefficient of variation, kurtosis, skewness
    # 8. TODO: merge the output of steps 2 to 7 into one dataframe (for writing and
    #    displaying) and return that instead of stats_df
    return stats_df

In [None]:
def process_data():
    """Placeholder for the data-processing step; not implemented yet (returns None)."""
    return None

# Exercise 2

## Objective – Incremental Load of Data (not using the Delta format)

### Tasks

### Inputs

### Output

In [None]:
def get_order_related_data():
    """Return two sample batches of order rows for the incremental-load exercise.

    Each row is a list of five values. Presumably these are order id, product id,
    quantity and two MM-dd-yyyy dates, with the last date marking the batch
    (01-30 for the first batch, 01-31 for the second) — confirm before relying
    on it. ORD1002 appears in both batches with different quantities, so the
    second batch contains an update as well as a new order.

    Returns:
        A tuple ``(data1, data2)`` of freshly built lists.
    """
    data1 = [
        ["ORD1001", "P003", 70, "01-21-2022", "01-30-2022"],
        ["ORD1004", "P033", 12, "01-24-2022", "01-30-2022"],
        ["ORD1005", "P036", 10, "01-20-2022", "01-30-2022"],
        ["ORD1002", "P016", 2, "01-10-2022", "01-30-2022"],
        ["ORD1003", "P012", 6, "01-10-2022", "01-30-2022"],
    ]
    data2 = [
        ["ORD1002", "P016", 16, "01-10-2022", "01-31-2022"],
        ["ORD1011", "P076", 21, "01-20-2022", "01-31-2022"],
    ]
    # The batches were previously built and discarded; hand them to the caller.
    return data1, data2

# Exercise 3

## Objective – Delta Table Operations

### Tasks

### Inputs

### Output

In [None]:
# TODO: Complete the delta format related functions

In [None]:
def create_delta_table():
    """Exercise 3 placeholder: create a Delta table. Not implemented yet (returns None)."""
    return None

In [None]:
def insert_data_delta():
    """Exercise 3 placeholder: insert rows into a Delta table. Not implemented yet (returns None)."""
    return None

In [None]:
def delete_data_delta():
    """Exercise 3 placeholder: delete rows from a Delta table. Not implemented yet (returns None)."""
    return None

In [None]:
def upsert_data_delta():
    """Exercise 3 placeholder: upsert (merge) rows into a Delta table. Not implemented yet (returns None)."""
    return None

In [None]:
def read_delta_table():
    """Exercise 3 placeholder: read a Delta table. Not implemented yet (returns None)."""
    return None

# References

1. https://sparkbyexamples.com/spark/spark-performance-tuning/
2. https://spark.apache.org/docs/latest/tuning.html#level-of-parallelism
3. https://pub.towardsai.net/pyspark-job-optimization-techniques-cc85be13af26


https://www.bing.com/search?pglt=2081&q=pyspark+optimization&cvid=0c444541cee94eebb0ff267f75e703c0&aqs=edge.3.69i57j0l8j69i11004.6528j0j1&FORM=ANAB01&PC=U531&ntref=1