## Question 1

1. Clean the Data & handle the Null Values
2. Derive the highest average scored candidate

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import avg, col, when, regexp_replace, format_number

# Initialize Spark session
spark = SparkSession.builder.appName('TopStudents').getOrCreate()

# Create the data dictionary
data = {
    'student_id': ['ujLsq7296726iyzru', 'zdcku34151cfkek', 'anlya8548116kgokl', 'ykucw6122786vejtt'],
    'math_score': [75.0, None, None, 79.0],
    'reading_score': [69.0, None, 80.0, 78.0],
    'writing_score': [68.0, None, 75.0, None]
}

# Convert the data dictionary to a list of tuples
data_list = list(zip(data['student_id'], data['math_score'], data['reading_score'], data['writing_score']))

# Define the schema
schema = 'student_id string, math_score float, reading_score float, writing_score float'

# Create a DataFrame
df = spark.createDataFrame(data_list, schema)

# Display the DataFrame
df.show()

# Define the function to find the topper
def find_the_topper(df):
    # 1. Drop rows with more than 2 null values
    df = df.withColumn("null_count", 
                    when(col('math_score').isNull(), 1).otherwise(0) + 
                    when(col('reading_score').isNull(), 1).otherwise(0) + 
                    when(col('writing_score').isNull(), 1).otherwise(0))

    df = df.filter(col("null_count") < 2).drop("null_count")

    # Calculate the average scores for each subject
    math_average = df.agg(avg(col('math_score'))).collect()[0][0]
    reading_score_average = df.agg(avg(col('reading_score'))).collect()[0][0]
    writing_score_average = df.agg(avg(col('writing_score'))).collect()[0][0]

    # Fill the missing values with the calculated averages
    df = df.fillna({'math_score': math_average, 'reading_score': reading_score_average, 'writing_score': writing_score_average})

    # 3. Cleaning of student_id
    df = df.withColumn("student_id", regexp_replace(col("student_id"), r'\D', ''))

    # 4. Find the average score and format it to 2 decimal places
    df = df.withColumn("average_score", 
                   format_number((col('math_score') + col('reading_score') + col('writing_score')) / 3, 2))

    # 5. Find the student with the highest average score
    topper = df.select("student_id", "average_score").orderBy(col("average_score").desc()).limit(1)

    return topper

# Run the function and display the result
topper_df = find_the_topper(df)
topper_df.show()

+-----------------+----------+-------------+-------------+
|       student_id|math_score|reading_score|writing_score|
+-----------------+----------+-------------+-------------+
|ujLsq7296726iyzru|      75.0|         69.0|         68.0|
|  zdcku34151cfkek|      null|         null|         null|
|anlya8548116kgokl|      null|         80.0|         75.0|
|ykucw6122786vejtt|      79.0|         78.0|         null|
+-----------------+----------+-------------+-------------+

+----------+-------------+
|student_id|average_score|
+----------+-------------+
|   8548116|        77.33|
+----------+-------------+



## Question 2

1. Clean the Data & handle the Null Values using approxQuantile
2. Derive the highest average scored candidate

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, regexp_replace, format_number

# Initialize Spark session
spark = SparkSession.builder.appName('TopStudents').getOrCreate()

# Create the data dictionary
data = {
    'student_id': ['ujLsq7296726iyzru', 'zdcku34151cfkek', 'anlya8548116kgokl', 'ykucw6122786vejtt'],
    'math_score': [75.0, None, None, 79.0],
    'reading_score': [69.0, None, 80.0, 78.0],
    'writing_score': [68.0, None, 75.0, None]
}

# Convert the data dictionary to a list of tuples
data_list = list(zip(data['student_id'], data['math_score'], data['reading_score'], data['writing_score']))

# Define the schema
schema = 'student_id string, math_score float, reading_score float, writing_score float'

# Create a DataFrame
df = spark.createDataFrame(data_list, schema)

# Display the DataFrame
df.show()

# Define the function to find the topper
def find_the_topper(df):
    # 1. Drop rows with more than 2 null values
    df = df.withColumn("null_count", 
                    when(col('math_score').isNull(), 1).otherwise(0) + 
                    when(col('reading_score').isNull(), 1).otherwise(0) + 
                    when(col('writing_score').isNull(), 1).otherwise(0))

    df = df.filter(col('null_count') < 2).drop("null_count")

    # Calculate the median scores for each subject
    def calculate_median(df, col_name):
        return df.approxQuantile(col_name, [0.5], 0.0)[0]
    
    math_median = calculate_median(df, 'math_score')
    reading_score_median = calculate_median(df, 'reading_score')
    writing_score_median = calculate_median(df, 'writing_score')

    # Fill the missing values with the calculated medians
    df = df.fillna({'math_score': math_median, 'reading_score': reading_score_median, 'writing_score': writing_score_median})

    # 3. Cleaning of student_id
    df = df.withColumn("student_id", regexp_replace(col("student_id"), r'\D', ''))

    # 4. Find the average score and format it to 2 decimal places
    df = df.withColumn("average_score", 
                   format_number((col('math_score') + col('reading_score') + col('writing_score')) / 3, 2))

    # 5. Find the student with the highest average score
    topper = df.select("student_id", "average_score").orderBy(col("average_score").desc()).limit(1)

    return topper

# Run the function and display the result
topper_df = find_the_topper(df)
topper_df.show()

+-----------------+----------+-------------+-------------+
|       student_id|math_score|reading_score|writing_score|
+-----------------+----------+-------------+-------------+
|ujLsq7296726iyzru|      75.0|         69.0|         68.0|
|  zdcku34151cfkek|      null|         null|         null|
|anlya8548116kgokl|      null|         80.0|         75.0|
|ykucw6122786vejtt|      79.0|         78.0|         null|
+-----------------+----------+-------------+-------------+

+----------+-------------+
|student_id|average_score|
+----------+-------------+
|   8548116|        76.67|
+----------+-------------+



## Question 3

1. date format Handling 
2. case statement in pyspark

In [0]:
# Corrected data list
data = [
    ('James', '', 'Smith', '1994-04-01', 'M', 3000),
    ('Michael', '', 'Rose', '2000-05-19', 'M', 4000),
    ('Robert', '', 'Williams', '1978-09-05', 'M', 4000),
    ('Maria', 'Anne', 'Jones', '1967-12-01', 'F', 4000),
    ('Jen', 'Mary', 'Brown', '1980-02-17', 'F', -1)
]

# Corrected columns list (as a list)
columns = ["firstname", "middlename", "lastname", "dob", "gender", "salary"]

# Create DataFrame
df = spark.createDataFrame(data, columns)

from pyspark.sql.functions import *
df = df.withColumn("dob", to_date("dob")).withColumn("salary", col("salary").cast("int"))

df = df.withColumn("salary", 
                   when(col("salary") == -1, 1)
                   .when(col("salary") == 4000, 3500)
                   .otherwise(col("salary")))

# Show DataFrame
df.show()

+---------+----------+--------+----------+------+------+
|firstname|middlename|lastname|       dob|gender|salary|
+---------+----------+--------+----------+------+------+
|    James|          |   Smith|1994-04-01|     M|  3000|
|  Michael|          |    Rose|2000-05-19|     M|  3500|
|   Robert|          |Williams|1978-09-05|     M|  3500|
|    Maria|      Anne|   Jones|1967-12-01|     F|  3500|
|      Jen|      Mary|   Brown|1980-02-17|     F|     1|
+---------+----------+--------+----------+------+------+



## Question 4

1. Configuration on pyspark for resource optimization

In [0]:
from pyspark.sql import SparkSession
import getpass

# Get the current user's name
username = getpass.getuser()

# Initialize a Spark session
spark = SparkSession \
        .builder \
        # Provide a name for your application
        .appName("Spark_API") \
        # Set the UI port to avoid conflicts
        .config('spark.ui.port', '0') \
        # Specify the warehouse directory for metastore.
        .config('spark.sql.warehouse.dir', '<path to save metastore details>') \
        # Disable dynamic allocation of resources
        .config('spark.sql.dynamicAllocation.enabled', 'false') \
        # Set the number of executor instances
        .config('spark.executor.instances', '4') \
        # Set the number of cores per executor
        .config('spark.executor.cores', '2') \
        # Set the memory allocated to each executor
        .config('spark.executor.memory', '4G') \
        # Enable off-heap memory
        .config('spark.memory.offHeap.enabled', 'true') \
        # Set the amount of off-heap memory
        .config('spark.memory.offHeap.size', '2G') \
        # Set the number of cores for the driver
        .config('spark.driver.cores', '4') \
        # Set the memory allocated to the driver
        .config('spark.driver.memory', '6G') \
        # Set the fraction of memory for storage
        .config('spark.storage.fraction', '0.7') \
        # Set the fraction of memory allocated to storage after execution
        .config('spark.memory.storageFraction', '0.6') \
        # Enable Hive support for Spark
        .enableHiveSupport() \
        # Set the master to YARN for resource management
        .master('yarn') \
        # Create the Spark session
        .getOrCreate()



## Question 5

1. Code for spark job Submit

In [0]:
from pyspark.sql import SparkSession
import getpass

# Get the current user's name
username = getpass.getuser()

# Initialize a Spark session with Hive support
spark = SparkSession \
        .builder \
        .enableHiveSupport() \
        .getOrCreate()

# Your main Spark application code goes here

# Submit the Spark job with the following configurations
spark-submit \
  # Specify the cluster manager as YARN
  --master yarn \
  # Set the deploy mode to cluster (runs the driver on a YARN cluster node)
  --deploy-mode cluster \
  # Enable verbose logging for detailed output
  --verbose \
  # Set the Spark UI port to avoid conflicts
  --conf spark.ui.port=0 \
  # Disable dynamic resource allocation
  --conf spark.sql.dynamicAllocation.enabled=false \
  # Specify the number of executor instances
  --conf spark.executor.instances=4 \
  # Set the number of cores per executor
  --conf spark.executor.cores=2 \
  # Allocate memory for each executor
  --conf spark.executor.memory=4G \
  # Enable off-heap memory usage
  --conf spark.memory.offHeap.enabled=true \
  # Set the size of off-heap memory
  --conf spark.memory.offHeap.size=2G \
  # Configure the number of cores for the driver
  --conf spark.driver.cores=4 \
  # Allocate memory for the driver
  --conf spark.driver.memory=6G \
  # Determine the fraction of memory used for storage
  --conf spark.storage.fraction=0.7 \
  # Set the fraction of memory allocated for storage after execution
  --conf spark.memory.storageFraction=0.6 \
  # Specify the Python script to run
  example_script.py

In [0]:
spark-submit \
--name Spark_API \
--master yarn \
--conf spark.ui.port=0 \
--conf spark.sql.warehouse.dir=<path to save metastore details> \
--conf spark.sql.dynamicAllocation.enabled=false \
--conf spark.executor.instances=4 \
--conf spark.executor.cores=2 \
--conf spark.executor.memory=4G \
--conf spark.memory.offHeap.enabled=true \
--conf spark.memory.offHeap.size=2G \
--conf spark.driver.cores=4 \
--conf spark.driver.memory=6G \
--conf spark.storage.fraction=0.7 \
--conf spark.memory.storageFraction=0.6 \
--conf spark.sql.catalogImplementation=hive \
spark_app.py

  

## Question 6

1. find the different joins output for given table

In [0]:
%sql
CREATE TABLE TableA (
    id INT
);

CREATE TABLE TableB (
    id INT
);

-- Insert data into TableA
INSERT INTO TableA (id) VALUES (1), (1), (1), (0);

-- Insert data into TableB
INSERT INTO TableB (id) VALUES (1), (1), (0), (1), (NULL);

num_affected_rows,num_inserted_rows
5,5


## Question 7

1. Count the NULL values per column

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

# Initialize Spark session
spark = SparkSession.builder.appName("CreateDataFrameWithNulls").getOrCreate()

# Define the schema with two columns
schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("name", StringType(), True)
])

# Sample data with null values
data = [
    (1, "Alice"),
    (2, None),      # NULL in the 'name' column
    (None, "Bob"),  # NULL in the 'id' column
    (3, "Charlie"),
    (None, None)    # NULL in both columns
]

# Create DataFrame
df = spark.createDataFrame(data, schema)

# Show the DataFrame
df.show()

from pyspark.sql.functions import *
df_1 = df.select([count(when(col(i).isNull(), i).alias(i)) for i in df.columns])
display(df_1)

+----+-------+
|  id|   name|
+----+-------+
|   1|  Alice|
|   2|   null|
|null|    Bob|
|   3|Charlie|
|null|   null|
+----+-------+



count(CASE WHEN (id IS NULL) THEN id END AS id),count(CASE WHEN (name IS NULL) THEN name END AS name)
2,2


## Question 8

1. convert the multiple delimeter file into structured table format

In [0]:
dbutils.fs.put("dbfs:/FileStore/delimiter_problem.txt", """
               name~|age\nfaizan,mohd~|28\nfurqan.mohd~|28\nali,akbar~|22\nzeesha,mohd~|32
               """, True)

Wrote 103 bytes.
Out[6]: True

In [0]:
df= spark.read.text('dbfs:/FileStore/workout_data/delimeter.txt')
split_df = df.select(
    split(col("value"), r"[|,;]").alias("split_col")
)
display(split_df)

split_col
"List(name, age, city, country)"
"List(John, 25, New York, USA)"
"List(Jane, 30, London, UK)"
"List(Doe, 22, Sydney, Australia)"


In [0]:
final_df = split_df.select(
    col("split_col").getItem(0).alias("name"),
    col("split_col").getItem(1).alias("age"),
    col("split_col").getItem(2).alias("city"),
    col("split_col").getItem(3).alias("country")
)

final_df.show()
first_row_df = final_df.limit(1)

# Filter out the first row to get the remaining rows
remaining_rows_df = final_df.exceptAll(first_row_df)

remaining_rows_df = final_df.where(final_df != first_row_df )
# Show the remaining rows
remaining_rows_df.show()

+----+---+--------+---------+
|name|age|    city|  country|
+----+---+--------+---------+
|name|age|    city|  country|
|John| 25|New York|      USA|
|Jane| 30|  London|       UK|
| Doe| 22|  Sydney|Australia|
+----+---+--------+---------+

+----+---+--------+---------+
|name|age|    city|  country|
+----+---+--------+---------+
| Doe| 22|  Sydney|Australia|
|John| 25|New York|      USA|
|Jane| 30|  London|       UK|
+----+---+--------+---------+



## Question 9

1. Remove Spaces from the column

In [0]:

from pyspark.sql.types import StructType, StructField, IntegerType, StringType, FloatType

# Define the schema
schema = StructType([
    StructField("order_/id", IntegerType(), True),
    StructField("customer name", StringType(), True),
    StructField("pro@duct", StringType(), True),
    StructField("amount", FloatType(), True)
])

# Create the data
data = [
    (1, "John Doe", "Laptop", 1000.50),
    (2, "Jane Smith", "Smartphone", 750.00),
    (3, "Sam Brown", "Tablet", 300.25),
    (4, "Emily Jones", "Headphones", 150.75),
    (5, "Michael Lee", "Monitor", 200.00)
]

# Create the DataFrame
df = spark.createDataFrame(data, schema)

# Show the DataFrame
df.show()

+---------+-------------+----------+------+
|order_/id|customer name|  pro@duct|amount|
+---------+-------------+----------+------+
|        1|     John Doe|    Laptop|1000.5|
|        2|   Jane Smith|Smartphone| 750.0|
|        3|    Sam Brown|    Tablet|300.25|
|        4|  Emily Jones|Headphones|150.75|
|        5|  Michael Lee|   Monitor| 200.0|
+---------+-------------+----------+------+



In [0]:
import re
def correct_column_names(df):
    corrected_columns = [
        re.sub(r'[^a-zA-Z0-9_]', '_', column).strip('_') for column in df.columns
    ]
    return df.toDF(*corrected_columns)

df_corrected = correct_column_names(df)

display(df_corrected)

order__id,customer_name,pro_duct,amount
1,John Doe,Laptop,1000.5
2,Jane Smith,Smartphone,750.0
3,Sam Brown,Tablet,300.25
4,Emily Jones,Headphones,150.75
5,Michael Lee,Monitor,200.0


## Question 10

1. Handeling of Nested JSON file

In [0]:
dbutils.fs.put("dbfs:/FileStore/workout_data/nested.json", """ 
               {
  "name":"MSFT","location":"Redmond", "satellites": ["Bay Area", "Shanghai"],
  "goods": {
    "trade":true, "customers":["government", "distributer", "retail"],
    "orders":[
        {"orderId":1,"orderTotal":123.34,"shipped":{"orderItems":[{"itemName":"Laptop","itemQty":20},{"itemName":"Charger","itemQty":2}]}},
        {"orderId":2,"orderTotal":323.34,"shipped":{"orderItems":[{"itemName":"Mice","itemQty":2},{"itemName":"Keyboard","itemQty":1}]}}
    ]}}
{"name":"Company1","location":"Seattle", "satellites": ["New York"],
  "goods":{"trade":false, "customers":["store1", "store2"],
  "orders":[
      {"orderId":4,"orderTotal":123.34,"shipped":{"orderItems":[{"itemName":"Laptop","itemQty":20},{"itemName":"Charger","itemQty":3}]}},
      {"orderId":5,"orderTotal":343.24,"shipped":{"orderItems":[{"itemName":"Chair","itemQty":4},{"itemName":"Lamp","itemQty":2}]}}
    ]}}
{"name": "Company2", "location": "Bellevue",
  "goods": {"trade": true, "customers":["Bank"], "orders": [{"orderId": 4, "orderTotal": 123.34}]}}
{"name": "Company3", "location": "Kirkland"}
               """)

Wrote 1106 bytes.
Out[52]: True

In [0]:
df = spark.read.option("multiLine", "true").json("dbfs:/FileStore/workout_data/nested.json")
display(df)
df.printSchema()

goods,location,name,satellites
"List(List(government, distributer, retail), List(List(1, 123.34, List(List(List(Laptop, 20), List(Charger, 2)))), List(2, 323.34, List(List(List(Mice, 2), List(Keyboard, 1))))), true)",Redmond,MSFT,"List(Bay Area, Shanghai)"


root
 |-- goods: struct (nullable = true)
 |    |-- customers: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- orders: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- orderId: long (nullable = true)
 |    |    |    |-- orderTotal: double (nullable = true)
 |    |    |    |-- shipped: struct (nullable = true)
 |    |    |    |    |-- orderItems: array (nullable = true)
 |    |    |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |    |    |-- itemName: string (nullable = true)
 |    |    |    |    |    |    |-- itemQty: long (nullable = true)
 |    |-- trade: boolean (nullable = true)
 |-- location: string (nullable = true)
 |-- name: string (nullable = true)
 |-- satellites: array (nullable = true)
 |    |-- element: string (containsNull = true)



In [0]:
from pyspark.sql import DataFrame
from pyspark.sql.functions import *

def child_struct(nested_df):
    # Creating python set to store dataframe metadata
    list_schema = [((), nested_df)]
    
    # Creating empty python list for final flattened columns
    flat_columns = []

    # Looping until there are no more schemas to process
    while len(list_schema) > 0:
        # Removing the latest or recently added item (dataframe schema) and returning it into the df variable
        parents, df = list_schema.pop()
        
        # Creating columns for non-struct fields
        flat_cols = [
            col(".".join(parents + (c[0],))).alias("_".join(parents + (c[0],)))
            for c in df.dtypes if c[1][:6] != "struct"
        ]
        
        # Identifying columns that are of struct type
        struct_cols = [c[0] for c in df.dtypes if c[1][:6] == "struct"]
        
        # Adding flat columns to the flat_columns list
        flat_columns.extend(flat_cols)
        
        # Reading nested columns and appending into the stack list
        for i in struct_cols:
            projected_df = df.select(i + ".*")
            list_schema.append((parents + (i,), projected_df))
    
    # Returning the flattened DataFrame with all columns
    return nested_df.select(flat_columns)

In [0]:
from pyspark.sql.functions import explode_outer
from pyspark.sql import DataFrame

def master_array(df: DataFrame) -> DataFrame:
    # Get initial list of array columns
    array_cols = [c[0] for c in df.dtypes if c[1].startswith("array")]
    
    while len(array_cols) > 0:
        for c in array_cols:
            df = df.withColumn(c, explode_outer(col(c)))
        
        # Update the list of array columns after the explosion
        # Assume child_struct is a function that handles additional struct flattening
        df = child_struct(df)
        
        # Get the updated list of array columns
        array_cols = [c[0] for c in df.dtypes if c[1].startswith("array")]
    
    return df

In [0]:
final_output = master_array(df)
display(final_output)

location,name,satellites,goods_customers,goods_trade,goods_orders_orderId,goods_orders_orderTotal,goods_orders_shipped_orderItems_itemName,goods_orders_shipped_orderItems_itemQty
Redmond,MSFT,Bay Area,government,True,1,123.34,Laptop,20
Redmond,MSFT,Bay Area,government,True,1,123.34,Charger,2
Redmond,MSFT,Bay Area,government,True,2,323.34,Mice,2
Redmond,MSFT,Bay Area,government,True,2,323.34,Keyboard,1
Redmond,MSFT,Bay Area,distributer,True,1,123.34,Laptop,20
Redmond,MSFT,Bay Area,distributer,True,1,123.34,Charger,2
Redmond,MSFT,Bay Area,distributer,True,2,323.34,Mice,2
Redmond,MSFT,Bay Area,distributer,True,2,323.34,Keyboard,1
Redmond,MSFT,Bay Area,retail,True,1,123.34,Laptop,20
Redmond,MSFT,Bay Area,retail,True,1,123.34,Charger,2


## Question 11

1. count the number of words from given text

In [0]:
dbutils.fs.put("dbfs:/FileStore/workout_data/word_count.txt", """ 
               RRR[note 1] (subtitled onscreen as Roudram Ranam Rudhiram) is a 2022 Indian Telugu-language epic period action drama film directed by S. S. Rajamouli, who co-wrote the film with V. Vijayendra Prasad. It was produced by D. V. V. Danayya under DVV Entertainment. The film stars N. T. Rama Rao Jr., Ram Charan, Ajay Devgn, Alia Bhatt, Shriya Saran, Samuthirakani, Ray Stevenson, Alison Doody, and Olivia Morris. It is a historical fiction film about two Indian revolutionaries, Alluri Sitarama Raju (Charan) and Komaram Bheem (Rama Rao), their friendship, and their fight against the British Raj.

Made on a budget of ₹550 crore (US$74 million),[6] RRR was the most expensive Indian film at the time of its release. The film was released theatrically on 25 March 2022. With ₹223 crore (US$27 million) worldwide on its first day, RRR recorded the highest opening-day earned by an Indian film. It emerged as the highest-grossing film in its home market of Andhra Pradesh and Telangana, grossing over ₹405.9 crore (US$49 million).[7] The film grossed ₹1,389.31 crore (US$170 million) worldwide, setting several box office records for an Indian film, including the third highest-grossing Indian film, the second highest-grossing Telugu film, the highest grossing Telugu film of 2022 and the highest grossing Indian film of 2022 worldwide.[4][8]

RRR received universal critical acclaim for its direction, screenwriting, cast performances, cinematography, soundtrack, action sequences and VFX. The film was considered one of the ten best films of the year by the National Board of Review, making it only the seventh non-English language film ever to make it to the list.[9] The song "Naatu Naatu" won the Oscar for Best Original Song at the 95th Academy Awards, making it the first song from an Indian film, as well as the first from an Asian film, to win in this category. The win made RRR the first Indian feature film to win an Academy Award.[10][11] The film became the third Indian film and first Telugu film to receive nominations at the Golden Globe Awards, including Best Foreign Language Film, and won Best Original Song for "Naatu Naatu", making it the first Indian (as well as the first Asian) nominee to win the award.[12][13] RRR also won the awards for Best Foreign Language Film and Best Song at the 28th Critics' Choice Awards. At the 69th National Film Awards, the film won six awards, including Best Popular Feature Film, Best Music Direction (Keeravani) and Best Male Playback Singer (Kaala Bhairava for "Komuram Bheemudo").


               """)

Wrote 2578 bytes.
Out[84]: True

In [0]:
RDD_1 = sc.textFile("dbfs:/FileStore/workout_data/word_count.txt")
RDD_2 =RDD_1.flatMap(lambda x: x.lower().split(" "))
RDD_2.collect()


Out[90]: ['',
 '',
 '',
 '',
 '',
 '',
 '',
 '',
 '',
 '',
 '',
 '',
 '',
 '',
 '',
 '',
 '',
 'rrr[note',
 '1]',
 '(subtitled',
 'onscreen',
 'as',
 'roudram',
 'ranam',
 'rudhiram)',
 'is',
 'a',
 '2022',
 'indian',
 'telugu-language',
 'epic',
 'period',
 'action',
 'drama',
 'film',
 'directed',
 'by',
 's.',
 's.',
 'rajamouli,',
 'who',
 'co-wrote',
 'the',
 'film',
 'with',
 'v.',
 'vijayendra',
 'prasad.',
 'it',
 'was',
 'produced',
 'by',
 'd.',
 'v.',
 'v.',
 'danayya',
 'under',
 'dvv',
 'entertainment.',
 'the',
 'film',
 'stars',
 'n.',
 't.',
 'rama',
 'rao',
 'jr.,',
 'ram',
 'charan,',
 'ajay',
 'devgn,',
 'alia',
 'bhatt,',
 'shriya',
 'saran,',
 'samuthirakani,',
 'ray',
 'stevenson,',
 'alison',
 'doody,',
 'and',
 'olivia',
 'morris.',
 'it',
 'is',
 'a',
 'historical',
 'fiction',
 'film',
 'about',
 'two',
 'indian',
 'revolutionaries,',
 'alluri',
 'sitarama',
 'raju',
 '(charan)',
 'and',
 'komaram',
 'bheem',
 '(rama',
 'rao),',
 'their',
 'friendship,',
 'and

In [0]:
RDD_3 = RDD_2.filter(lambda a: a!= '')
RDD_3.map(lambda a: (a,1)).reduceByKey(lambda a,b : a+b).map(lambda a: (a[1], a[0])).sortByKey().collect()

Out[96]: [(1, '1]'),
 (1, '(subtitled'),
 (1, 'onscreen'),
 (1, 'roudram'),
 (1, 'rudhiram)'),
 (1, 'epic'),
 (1, 'period'),
 (1, 'drama'),
 (1, 'co-wrote'),
 (1, 'prasad.'),
 (1, 'produced'),
 (1, 'd.'),
 (1, 'n.'),
 (1, 'rama'),
 (1, 'jr.,'),
 (1, 'shriya'),
 (1, 'saran,'),
 (1, 'samuthirakani,'),
 (1, 'alison'),
 (1, 'doody,'),
 (1, 'olivia'),
 (1, 'morris.'),
 (1, 'two'),
 (1, 'revolutionaries,'),
 (1, '(charan)'),
 (1, 'bheem'),
 (1, 'friendship,'),
 (1, 'fight'),
 (1, 'against'),
 (1, '(us$74'),
 (1, 'million),[6]'),
 (1, 'release.'),
 (1, 'opening-day'),
 (1, 'earned'),
 (1, 'home'),
 (1, 'andhra'),
 (1, 'pradesh'),
 (1, 'telangana,'),
 (1, 'million).[7]'),
 (1, 'grossed'),
 (1, 'several'),
 (1, 'universal'),
 (1, 'direction,'),
 (1, 'screenwriting,'),
 (1, 'cast'),
 (1, 'sequences'),
 (1, 'considered'),
 (1, 'ten'),
 (1, 'films'),
 (1, 'year'),
 (1, 'board'),
 (1, 'only'),
 (1, 'seventh'),
 (1, 'ever'),
 (1, 'make'),
 (1, 'list.[9]'),
 (1, 'oscar'),
 (1, '95th'),
 (1, 'asian'),

## Question 12

1. Handling of currupted or bad record data.

In [0]:
dbutils.fs.put("dbfs:/FileStore/workout_data/bad_record.csv", """CHANNEL_ID,CHANNEL_DESC,CHANNEL_CLASS,CHANNEL_CLASS_ID,CHANNEL_TOTAL,CHANNEL_TOTAL_ID
3,Direct Sales,Direct,12,Channel total,1
9,Tele Sales,Direct,12,Channel total,1
5,Catalog,Indirect,13,Channel total,1
4,Internet,Indirect,13,Channel total,1
2,Partners,Others,14,Channel total,1
12,Partners,Others,14,Channel total,1,45,ram,343
alpa,Partners,Others,14,Channel total,1,45,ram,343
10 Partners Others 14 Channel total 1
11 Partners Others 14 Channel total 1
               """)

Wrote 471 bytes.
Out[104]: True

In [0]:
help(spark.read.csv)

Help on method csv in module pyspark.sql.readwriter:

csv(path: Union[str, List[str]], schema: Union[pyspark.sql.types.StructType, str, NoneType] = None, sep: Optional[str] = None, encoding: Optional[str] = None, quote: Optional[str] = None, escape: Optional[str] = None, comment: Optional[str] = None, header: Union[bool, str, NoneType] = None, inferSchema: Union[bool, str, NoneType] = None, ignoreLeadingWhiteSpace: Union[bool, str, NoneType] = None, ignoreTrailingWhiteSpace: Union[bool, str, NoneType] = None, nullValue: Optional[str] = None, nanValue: Optional[str] = None, positiveInf: Optional[str] = None, negativeInf: Optional[str] = None, dateFormat: Optional[str] = None, timestampFormat: Optional[str] = None, maxColumns: Union[int, str, NoneType] = None, maxCharsPerColumn: Union[int, str, NoneType] = None, maxMalformedLogPerPartition: Union[int, str, NoneType] = None, mode: Optional[str] = None, columnNameOfCorruptRecord: Optional[str] = None, multiLine: Union[bool, str, NoneType] 

In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *

# Define the schema
df_schema = StructType([
    StructField('CHANNEL_ID', IntegerType(), True), 
    StructField('CHANNEL_DESC', StringType(), True), 
    StructField('CHANNEL_CLASS', StringType(), True), 
    StructField('CHANNEL_CLASS_ID', IntegerType(), True), 
    StructField('CHANNEL_TOTAL', StringType(), True), 
    StructField('CHANNEL_TOTAL_ID', IntegerType(), True)
])

# Read the CSV file with the specified schema
df = spark.read.schema(df_schema) \
    .option("header", "true") \
    .option("badRecordsPath", "dbfs:/FileStore/workout_data/bad_record_log/") \
    .csv("dbfs:/FileStore/workout_data/bad_record.csv")

# Display the schema of the DataFrame
df.schema

Out[4]: StructType([StructField('CHANNEL_ID', IntegerType(), True), StructField('CHANNEL_DESC', StringType(), True), StructField('CHANNEL_CLASS', StringType(), True), StructField('CHANNEL_CLASS_ID', IntegerType(), True), StructField('CHANNEL_TOTAL', StringType(), True), StructField('CHANNEL_TOTAL_ID', IntegerType(), True)])

In [0]:
display(df)

CHANNEL_ID,CHANNEL_DESC,CHANNEL_CLASS,CHANNEL_CLASS_ID,CHANNEL_TOTAL,CHANNEL_TOTAL_ID
3,Direct Sales,Direct,12,Channel total,1
9,Tele Sales,Direct,12,Channel total,1
5,Catalog,Indirect,13,Channel total,1
4,Internet,Indirect,13,Channel total,1
2,Partners,Others,14,Channel total,1


In [0]:
display(spark.read.csv("dbfs:/FileStore/workout_data/bad_record_log/20240823T064339/bad_records/part-00000-6446d0b3-45c6-429a-9f29-f7b2e9a58651"))

_c0,_c1,_c2,_c3,_c4,_c5,_c6,_c7,_c8,_c9,_c10,_c11,_c12,_c13,_c14,_c15,_c16,_c17,_c18
"{""path"":""dbfs:/FileStore/workout_data/bad_record.csv""","""record"":""12",Partners,Others,14.0,Channel total,1.0,45.0,ram,"343""","""reason"":""org.apache.spark.SparkRuntimeException: [MALFORMED_CSV_RECORD] Malformed CSV record: 12",Partners,Others,14.0,Channel total,1.0,45.0,ram,"343""}"
"{""path"":""dbfs:/FileStore/workout_data/bad_record.csv""","""record"":""alpa",Partners,Others,14.0,Channel total,1.0,45.0,ram,"343""","""reason"":""org.apache.spark.SparkRuntimeException: [MALFORMED_CSV_RECORD] Malformed CSV record: alpa",Partners,Others,14.0,Channel total,1.0,45.0,ram,"343""}"
"{""path"":""dbfs:/FileStore/workout_data/bad_record.csv""","""record"":""10 Partners Others 14 Channel total 1""","""reason"":""org.apache.spark.SparkRuntimeException: [MALFORMED_CSV_RECORD] Malformed CSV record: 10 Partners Others 14 Channel total 1""}",,,,,,,,,,,,,,,,
"{""path"":""dbfs:/FileStore/workout_data/bad_record.csv""","""record"":""11 Partners Others 14 Channel total 1""","""reason"":""org.apache.spark.SparkRuntimeException: [MALFORMED_CSV_RECORD] Malformed CSV record: 11 Partners Others 14 Channel total 1""}",,,,,,,,,,,,,,,,


## Question 13

1. Flatten the Nested JSON 

In [0]:
df=spark.read.option("multiLine", "true").json("dbfs:/FileStore/json_example")
display(df)
df.printSchema()

batters,id,name,ppu,topping,type
"List(List(List(1001, Regular), List(1002, Chocolate), List(1003, Blueberry), List(1004, Devil's Food)))",1,Cake,0.55,"List(List(5001, None), List(5002, Glazed), List(5005, Sugar), List(5007, Powdered Sugar), List(5006, Chocolate with Sprinkles), List(5003, Chocolate), List(5004, Maple))",donut


root
 |-- batters: struct (nullable = true)
 |    |-- batter: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- id: string (nullable = true)
 |    |    |    |-- type: string (nullable = true)
 |-- id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- ppu: double (nullable = true)
 |-- topping: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- id: string (nullable = true)
 |    |    |-- type: string (nullable = true)
 |-- type: string (nullable = true)



In [0]:
from pyspark.sql.functions import *
df = df.withColumn("batters", explode("batters.batter")) \
    .withColumn("topping", explode("topping")) \
    .select("id", "name", "ppu", "batters.id", "batters.type", "topping.id", "topping.type" )

In [0]:
display(df)

id,name,ppu,id.1,type,id.2,type.1
1,Cake,0.55,1001,Regular,5001,
1,Cake,0.55,1001,Regular,5002,Glazed
1,Cake,0.55,1001,Regular,5005,Sugar
1,Cake,0.55,1001,Regular,5007,Powdered Sugar
1,Cake,0.55,1001,Regular,5006,Chocolate with Sprinkles
1,Cake,0.55,1001,Regular,5003,Chocolate
1,Cake,0.55,1001,Regular,5004,Maple
1,Cake,0.55,1002,Chocolate,5001,
1,Cake,0.55,1002,Chocolate,5002,Glazed
1,Cake,0.55,1002,Chocolate,5005,Sugar


## Question 14

1. How would you calculate the monthly percentage change in revenue using PySpark, ensuring that the output displays only the "year-month" (ym) and revenue_diff_pct columns?

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.window import Window
from pyspark.sql.functions import lag, col, date_format

# Create a Spark session
spark = SparkSession.builder.appName("SampleDataFrame").getOrCreate()

# Create the sample data
data = [
    (1, "2019-01-01", 172692, 43),
    (2, "2019-01-05", 177194, 36),
    (3, "2019-01-09", 109513, 30),
    (4, "2019-01-13", 164911, 30),
    (5, "2019-01-17", 198872, 39),
    (6, "2019-01-21", 184853, 31),
    (7, "2019-01-25", 186817, 26),
    (8, "2019-01-29", 137784, 22),
    (9, "2019-02-02", 140032, 25),
    (10, "2019-02-06", 116948, 43),
    (11, "2019-02-10", 162515, 25)
]

# Define the schema for the DataFrame
columns = ["id", "created_at", "value", "purchase_id"]

# Create a PySpark DataFrame
df = spark.createDataFrame(data, schema=columns)

df.show()

df.createOrReplaceTempView('df')


+---+----------+------+-----------+
| id|created_at| value|purchase_id|
+---+----------+------+-----------+
|  1|2019-01-01|172692|         43|
|  2|2019-01-05|177194|         36|
|  3|2019-01-09|109513|         30|
|  4|2019-01-13|164911|         30|
|  5|2019-01-17|198872|         39|
|  6|2019-01-21|184853|         31|
|  7|2019-01-25|186817|         26|
|  8|2019-01-29|137784|         22|
|  9|2019-02-02|140032|         25|
| 10|2019-02-06|116948|         43|
| 11|2019-02-10|162515|         25|
+---+----------+------+-----------+

+---+----------+------+-----------+-------+---------+
| id|created_at| value|purchase_id|     ym|lag_value|
+---+----------+------+-----------+-------+---------+
|  1|2019-01-01|172692|         43|2019-01|     null|
|  2|2019-01-05|177194|         36|2019-01|   172692|
|  3|2019-01-09|109513|         30|2019-01|   177194|
|  4|2019-01-13|164911|         30|2019-01|   109513|
|  5|2019-01-17|198872|         39|2019-01|   164911|
|  6|2019-01-21|184853|    

In [0]:
from pyspark.sql.functions import col, round

# Convert 'created_at' to date format and extract year and month
df = df.withColumn("created_at", col("created_at").cast("date"))
df = df.withColumn("ym", date_format(col("created_at"), "yyyy-MM"))

# Define the window specification
window_spec = Window.partitionBy("ym").orderBy("ym")

# Apply the lag function over the defined window
df = df.withColumn("lag_value", lag(col("value")).over(window_spec))

# Show the resulting DataFrame
df.show()
# Perform the calculation and aggregation
df = df.withColumn(
    "revenue_diff_pct",
    round(((col("value") - col("lag_value")) / col("lag_value")) * 100, 2)
).select("ym", "revenue_diff_pct")

# Display the resulting DataFrame
df.show()

+-------+----------------+
|     ym|revenue_diff_pct|
+-------+----------------+
|2019-01|            null|
|2019-01|            2.61|
|2019-01|           -38.2|
|2019-01|           50.59|
|2019-01|           20.59|
|2019-01|           -7.05|
|2019-01|            1.06|
|2019-01|          -26.25|
|2019-02|            null|
|2019-02|          -16.48|
|2019-02|           38.96|
+-------+----------------+



## Question 15

In [0]:
%sql
WITH temp AS (
    SELECT
        ym,
        value,
        LAG(value) OVER (PARTITION BY ym ORDER BY ym) AS lag_value
    FROM (
        SELECT
            date_format(created_at, 'yyyy-MM') AS ym,
            value
        FROM df
    )
)

-- Step 3: Calculate the percentage difference
SELECT
    ym,
    ROUND(((value - lag_value) / lag_value) * 100, 2) AS revenue_diff_pct
FROM temp;

ym,revenue_diff_pct
2019-01,
2019-01,2.61
2019-01,-38.2
2019-01,50.59
2019-01,20.59
2019-01,-7.05
2019-01,1.06
2019-01,-26.25
2019-02,
2019-02,-16.48


## Question 17

1. Find the senior most employee categorised based on designation.

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import to_date, col, year, rank
from pyspark.sql.window import Window

# Create a Spark session
spark = SparkSession.builder.appName("EmployeeDataFrame").getOrCreate()

# Define the data
data = [
    (1, "A", "Jan 6th, 2023", "Analyst"),
    (1, "A", "Mar 6th, 2024", "Senior Analyst"),
    (2, "B", "Feb 6th, 2023", "Consultant"),
    (2, "B", "Jun 6th, 2024", "Senior Consultant"),
    (3, "C", "July 23rd, 2024", "Senior Consultant")
]

# Define the schema/columns
columns = ["employee_id", "name", "effective_date", "designation"]

# Create the DataFrame
df = spark.createDataFrame(data, schema=columns)

# Convert 'effective_date' to date format
df = df.withColumn("effective_date", to_date(df["effective_date"], "MMM d'th', yyyy"))

# Filter for the year 2024
df_2024 = df.filter(year(col("effective_date")) == 2024)

# Define the window specification
window_spec = Window.partitionBy("employee_id").orderBy(col("effective_date").desc())

# Add the rank column and filter by rank = 1
df_ranked = df_2024.withColumn("rnk", rank().over(window_spec)) \
                    .filter(col("rnk") == 1)

# Show the resulting DataFrame
df_ranked.show()

+-----------+----+--------------+-----------------+---+
|employee_id|name|effective_date|      designation|rnk|
+-----------+----+--------------+-----------------+---+
|          1|   A|    2024-03-06|   Senior Analyst|  1|
|          2|   B|    2024-06-06|Senior Consultant|  1|
+-----------+----+--------------+-----------------+---+



## Question 16

In [0]:
%sql
WITH RANK1 AS (
    SELECT
        employee_id,
        name,
        effective_date,
        designation,
        RANK() OVER (PARTITION BY employee_id ORDER BY effective_date DESC) AS rnk
    FROM df
    WHERE YEAR(CAST(effective_date AS DATE)) = 2024
)
SELECT
    employee_id,
    name,
    effective_date,
    designation
FROM RANK1
WHERE rnk = 1;

## Question 18

1. Find the moving average for a week

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.types import IntegerType, StringType, DateType, StructField, StructType

# Define the schema
schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("name", StringType(), True),
    StructField("phone", StringType(), True),
    StructField("visited_on", StringType(), True),
    StructField("amount", IntegerType(), True)
])

# Create the data
data = [
    (1, "Julia", "1234567890", "2015-05-01", 100),
    (2, "Samantha", "1234567890", "2015-05-02", 200),
    (3, "Julia-Samantha", "1234567890", "2015-05-03", 300)
]

# Create a DataFrame
df = spark.createDataFrame(data, schema=schema)

# Show the DataFrame
df.show()

+---+--------------+----------+----------+------+
| id|          name|     phone|visited_on|amount|
+---+--------------+----------+----------+------+
|  1|         Julia|1234567890|2015-05-01|   100|
|  2|      Samantha|1234567890|2015-05-02|   200|
|  3|Julia-Samantha|1234567890|2015-05-03|   300|
+---+--------------+----------+----------+------+



In [0]:
# Import necessary functions
from pyspark.sql import Window
from pyspark.sql.functions import avg
df.createOrReplaceTempView("df_sql_table")
# Define the window specification
window_spec = Window.partitionBy("name").orderBy("visited_on").rowsBetween(-6, 0)
# window_spec = Window.partitionBy("name").orderBy("visited_on").rowsBetween(window.unboundedPreceeding, window.currentRow)

# Calculate the moving average and create a new column
solution_df = df.withColumn("moving_average", avg("amount").over(window_spec))

# Show the result
solution_df.show()

+---+--------------+----------+----------+------+--------------+
| id|          name|     phone|visited_on|amount|moving_average|
+---+--------------+----------+----------+------+--------------+
|  1|         Julia|1234567890|2015-05-01|   100|         100.0|
|  3|Julia-Samantha|1234567890|2015-05-03|   300|         300.0|
|  2|      Samantha|1234567890|2015-05-02|   200|         200.0|
+---+--------------+----------+----------+------+--------------+



In [0]:
%sql
SELECT *, AVG(amount) OVER (PARTITION BY name ORDER BY visited_on ROWS BETWEEN 6 PRECEDING AND CURRENT ROW) AS moving_average
FROM df_sql_table

id,name,phone,visited_on,amount,moving_average
1,Julia,1234567890,2015-05-01,100,100.0
3,Julia-Samantha,1234567890,2015-05-03,300,300.0
2,Samantha,1234567890,2015-05-02,200,200.0


## Question 19 

1. Given the product and invoice details for products at an online store, find all the products that were not sold. For each such product, display its SKU and product name. Order the result by SKU, ascending 

In [0]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DecimalType
from decimal import Decimal

# Define PRODUCT table schema
product_schema = StructType([
    StructField("product_id", IntegerType(), True),
    StructField("sku", StringType(), True),
    StructField("product_name", StringType(), True),
    StructField("product_description", StringType(), True),
    StructField("current_price", DecimalType(8, 2), True),
    StructField("quantity_in_stock", DecimalType(8, 2), True),
    StructField("is_active", IntegerType(), True)
])

# Sample data for PRODUCT table
product_data = [
    (1, "SKU001", "Laptop", "High performance laptop", Decimal('1000.00'), Decimal('50.00'), 1),
    (2, "SKU002", "Smartphone", "Latest smartphone model", Decimal('800.00'), Decimal('150.00'), 1),
    (3, "SKU003", "Tablet", "Affordable tablet", Decimal('300.00'), Decimal('200.00'), 1)
]

# Create PRODUCT DataFrame
product_df = spark.createDataFrame(product_data, schema=product_schema)

# Show PRODUCT DataFrame
product_df.show()

# Define INVOICE_ITEM table schema
invoice_item_schema = StructType([
    StructField("invoice_id", IntegerType(), True),
    StructField("product_id", IntegerType(), True),
    StructField("quantity", DecimalType(8, 2), True),
    StructField("price", DecimalType(8, 2), True),
    StructField("line_total_price", DecimalType(8, 2), True)
])

# Sample data for INVOICE_ITEM table
invoice_item_data = [
    (1001, 1, Decimal('2.00'), Decimal('1000.00'), Decimal('2000.00')),
    (1001, 2, Decimal('1.00'), Decimal('800.00'), Decimal('800.00')),
    (1002, 3, Decimal('3.00'), Decimal('300.00'), Decimal('900.00'))
]

# Create INVOICE_ITEM DataFrame
invoice_item_df = spark.createDataFrame(invoice_item_data, schema=invoice_item_schema)

# Show INVOICE_ITEM DataFrame
invoice_item_df.show()

invoice_item_df.createOrReplaceTempView("invoice")
product_df.createOrReplaceTempView("products")

+----------+------+------------+--------------------+-------------+-----------------+---------+
|product_id|   sku|product_name| product_description|current_price|quantity_in_stock|is_active|
+----------+------+------------+--------------------+-------------+-----------------+---------+
|         1|SKU001|      Laptop|High performance ...|      1000.00|            50.00|        1|
|         2|SKU002|  Smartphone|Latest smartphone...|       800.00|           150.00|        1|
|         3|SKU003|      Tablet|   Affordable tablet|       300.00|           200.00|        1|
+----------+------+------------+--------------------+-------------+-----------------+---------+

+----------+----------+--------+-------+----------------+
|invoice_id|product_id|quantity|  price|line_total_price|
+----------+----------+--------+-------+----------------+
|      1001|         1|    2.00|1000.00|         2000.00|
|      1001|         2|    1.00| 800.00|          800.00|
|      1002|         3|    3.00| 300.

In [0]:
from pyspark.sql.functions import col

# Aliasing the DataFrames
df1 = product_df.alias("df1")
df2 = invoice_item_df.alias("df2")

# Performing the left join
join_data_df = df1.join(df2, df1["product_id"] == df2["product_id"], "left") \
    .where(df1["product_id"] == 'NULL') \
    .select(col("df1.sku"), col("df1.product_name"))

# Showing the result
join_data_df.show()

+---+------------+
|sku|product_name|
+---+------------+
+---+------------+



In [0]:
%sql
SELECT p.sku, p.product_name
FROM products p
LEFT JOIN invoice i ON p.product_id = i.product_id
WHERE i.product_id IS NULL
ORDER BY p.sku ASC;

sku,product_name


## Question 20

1. Find the highest score obtained by each candidate out of different event.

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.types import IntegerType, StringType, DecimalType, StructField, StructType,FloatType


# Define the schema for the scoretable
schema = StructType([
    StructField("event_id", IntegerType(), True),
    StructField("participant_name", StringType(), True),
    StructField("score", FloatType(), True)  # 3 digits, 1 after decimal point
])

# Sample data for scoretable
data = [
    (2187, "Clemencia", 9.0),
    (2187, "Clemencia", 6.6),
    (2187, "Clemencia", 8.6),
    (2187, "Susannah", 8.8)
]

# Create the DataFrame
scoretable_df = spark.createDataFrame(data, schema=schema)

# Show the DataFrame
scoretable_df.show()

scoretable_df.createOrReplaceTempView("scoretable_Table")

+--------+----------------+-----+
|event_id|participant_name|score|
+--------+----------------+-----+
|    2187|       Clemencia|  9.0|
|    2187|       Clemencia|  6.6|
|    2187|       Clemencia|  8.6|
|    2187|        Susannah|  8.8|
+--------+----------------+-----+



In [0]:
from pyspark.sql.functions import *
scoretable_df.groupBy("event_id", "participant_name").agg(max("score").alias("max_score")).orderBy("max_score", ascending = False).limit(2).show()

+--------+----------------+---------+
|event_id|participant_name|max_score|
+--------+----------------+---------+
|    2187|       Clemencia|      9.0|
|    2187|        Susannah|      8.8|
+--------+----------------+---------+



In [0]:
%sql
SELECT event_id, participant_name, MAX(score) AS max_score
FROM scoretable_Table
GROUP BY event_id, participant_name
ORDER BY max_score DESC
LIMIT 3


event_id,participant_name,max_score
2187,Clemencia,9.0
2187,Susannah,8.8


## Question 21

1. find the the candidate who secured 1,2,3rd rank in different event.

In [0]:
from pyspark.sql import Window
from pyspark.sql.functions import col, max as spark_max, dense_rank, collect_list, sort_array, concat_ws, when

# Step 1: Calculate the highest score per participant for each event
ranked_scores_df = scoretable_df.groupBy("event_id", "participant_name") \
    .agg(spark_max("score").alias("highest_score"))

# Step 2: Apply dense rank based on the highest score for each event
window_spec = Window.partitionBy("event_id").orderBy(col("highest_score").desc())

ranked_participants_df = ranked_scores_df.withColumn(
    "participant_rank", dense_rank().over(window_spec)
)

# Step 3: Filter to only include ranks 1, 2, and 3
top_ranks_df = ranked_participants_df.filter(col("participant_rank") <= 3)

# Step 4: Group by event_id and collect participant names for each rank, sorted alphabetically
ranked_grouped_df = top_ranks_df.groupBy("event_id").agg(
    # Collect and concatenate names for rank 1
    concat_ws(', ', sort_array(collect_list(when(col("participant_rank") == 1, col("participant_name"))))).alias("rank_1_names"),
    # Collect and concatenate names for rank 2
    concat_ws(', ', sort_array(collect_list(when(col("participant_rank") == 2, col("participant_name"))))).alias("rank_2_names"),
    # Collect and concatenate names for rank 3
    concat_ws(', ', sort_array(collect_list(when(col("participant_rank") == 3, col("participant_name"))))).alias("rank_3_names")
)

# Step 5: Order the results by event_id
ranked_grouped_df = ranked_grouped_df.orderBy("event_id")

# Show the result
ranked_grouped_df.show(truncate=False)

+--------+------------+------------+------------+
|event_id|rank_1_names|rank_2_names|rank_3_names|
+--------+------------+------------+------------+
|2187    |Clemencia   |Susannah    |            |
+--------+------------+------------+------------+



In [0]:
%sql
WITH ranked_scores AS (
    -- Get the highest score for each participant in each event
    SELECT 
        event_id, 
        participant_name,
        MAX(score) AS highest_score
    FROM scoretable_Table
    GROUP BY event_id, participant_name
), ranked_participants AS (
    -- Rank participants based on their highest score in each event
    SELECT 
        event_id,
        participant_name,
        highest_score,
        DENSE_RANK() OVER (PARTITION BY event_id ORDER BY highest_score DESC) AS participant_rank
    FROM ranked_scores
),
top_ranks AS (
    -- Select only top 3 ranked participants
    SELECT 
        event_id,
        participant_name,
        participant_rank
    FROM ranked_participants
    WHERE participant_rank <= 3
)
-- Group participants by rank and event
SELECT 
    event_id,
    -- Rank 1 names
    CONCAT_WS(', ', SORT_ARRAY(COLLECT_LIST(CASE WHEN participant_rank = 1 THEN participant_name END))) AS rank_1_names,
    -- Rank 2 names
    CONCAT_WS(', ', SORT_ARRAY(COLLECT_LIST(CASE WHEN participant_rank = 2 THEN participant_name END))) AS rank_2_names,
    -- Rank 3 names
    CONCAT_WS(', ', SORT_ARRAY(COLLECT_LIST(CASE WHEN participant_rank = 3 THEN participant_name END))) AS rank_3_names
FROM top_ranks
GROUP BY event_id
ORDER BY event_id;

event_id,rank_1_names,rank_2_names,rank_3_names
2187,Clemencia,Susannah,


## Question 22

1. Find the monthly max, min and average temperature

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.types import IntegerType, StringType, DateType, StructField, StructType


# Define the schema for the temperature_records table
schema = StructType([
    StructField("record_date", StringType(), True),
    StructField("data_type", StringType(), True),
    StructField("data_value", IntegerType(), True)
])

# Sample data for temperature_records
data = [
    ("2020-07-01", "max", 92),
    ("2020-07-01", "min", 71),
    ("2020-07-01", "avg", 74),
    ("2020-07-02", "max", 90),
    ("2020-07-02", "min", 67)
]

# Create a DataFrame
temperature_records_df = spark.createDataFrame(data, schema=schema)

# Show the DataFrame
temperature_records_df.show()

temperature_records_df.createOrReplaceTempView("temperature_records_table")

+-----------+---------+----------+
|record_date|data_type|data_value|
+-----------+---------+----------+
| 2020-07-01|      max|        92|
| 2020-07-01|      min|        71|
| 2020-07-01|      avg|        74|
| 2020-07-02|      max|        90|
| 2020-07-02|      min|        67|
+-----------+---------+----------+



In [0]:
from pyspark.sql.functions import date_format, col, avg, min as spark_min, max as spark_max, when

# Step 1: Add a 'month' column and format the 'record_date' to 'yyyy-MM'
monthly_temps_df = temperature_records_df.withColumn(
    "month", date_format(col("record_date"), 'yyyy-MM')
)

# Step 2: Group by 'month' and 'data_type', and calculate the required aggregations
monthly_aggregates_df = monthly_temps_df.groupBy("month").agg(
    # Calculate average temperature
    avg("data_value").alias("monthly_avg_temp"),
    
    # Calculate minimum value for 'min' data_type
    spark_min(when(col("data_type") == "min", col("data_value"))).alias("monthly_min_temp"),
    
    # Calculate maximum value for 'max' data_type
    spark_max(when(col("data_type") == "max", col("data_value"))).alias("monthly_max_temp")
)

# Show the result
monthly_aggregates_df.show(truncate=False)

+-------+----------------+----------------+----------------+
|month  |monthly_avg_temp|monthly_min_temp|monthly_max_temp|
+-------+----------------+----------------+----------------+
|2020-07|78.8            |67              |92              |
+-------+----------------+----------------+----------------+



## Question 23

How can you efficiently perform the above steps using PySpark and spark sql to generate a report that meets the requirements specified, while ensuring correct handling of renaming columns, joining multiple DataFrames, calculating aggregations, and filtering based on the global average invoice amount?

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.types import IntegerType, StringType, StructField, StructType

# Define schema for country table
country_schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("country_name", StringType(), True)
])

# Sample data for country table
country_data = [
    (1, "United States"),
    (2, "Canada"),
    (3, "Mexico")
]

# Create the country DataFrame
country_df = spark.createDataFrame(country_data, schema=country_schema)

# Show country DataFrame
country_df.show()

country_df.createOrReplaceTempView("country")
# Define schema for city table
city_schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("city_name", StringType(), True),
    StructField("postal_code", StringType(), True),
    StructField("country_id", IntegerType(), True)  # Foreign key referencing country.id
])

# Sample data for city table
city_data = [
    (1, "New York", "10001", 1),
    (2, "Los Angeles", "90001", 1),
    (3, "Toronto", "M5H", 2),
    (4, "Vancouver", "V6B", 2),
    (5, "Mexico City", "01000", 3)
]

# Create the city DataFrame
city_df = spark.createDataFrame(city_data, schema=city_schema)

# Show city DataFrame
city_df.show()
city_df.createOrReplaceTempView("city")
from pyspark.sql import SparkSession
from pyspark.sql.types import IntegerType, StringType, DecimalType, StructField, StructType, FloatType


# Define schema for customer table
customer_schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("customer_name", StringType(), True),
    StructField("city_id", IntegerType(), True),  # Foreign key referencing city.id
    StructField("customer_address", StringType(), True),
    StructField("contact_person", StringType(), True),
    StructField("email", StringType(), True),
    StructField("phone", StringType(), True),
    StructField("is_active", IntegerType(), True)
])

# Sample data for customer table
customer_data = [
    (1, "John Doe", 1, "123 Main St", "Jane Doe", "johndoe@example.com", "123-456-7890", 1),
    (2, "Acme Corp", 2, "456 Industrial Rd", "Bob Smith", "bob.smith@acmecorp.com", "987-654-3210", 1),
    (3, "Tech Solutions", 3, "789 Tech Ave", "Alice Johnson", "alice.johnson@techsol.com", "555-123-4567", 1)
]

# Create the customer DataFrame
customer_df = spark.createDataFrame(customer_data, schema=customer_schema)

# Show customer DataFrame
customer_df.show()
customer_df.createOrReplaceTempView("customer")
# Define schema for invoice table
invoice_schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("invoice_number", StringType(), True),
    StructField("customer_id", IntegerType(), True),  # Foreign key referencing customer.id
    StructField("user_account_id", IntegerType(), True),
    StructField("total_price", FloatType(), True)
])

# Sample data for invoice table
invoice_data = [
    (1, "INV-1001", 1, 101, 500.00),
    (2, "INV-1002", 2, 102, 1500.50),
    (3, "INV-1003", 3, 103, 250.75),
    (4, "INV-1004", 1, 101, 1000.00)
]

# Create the invoice DataFrame
invoice_df = spark.createDataFrame(invoice_data, schema=invoice_schema)

# Show invoice DataFrame
invoice_df.show()
invoice_df.createOrReplaceTempView("invoice")

+---+-------------+
| id| country_name|
+---+-------------+
|  1|United States|
|  2|       Canada|
|  3|       Mexico|
+---+-------------+

+---+-----------+-----------+----------+
| id|  city_name|postal_code|country_id|
+---+-----------+-----------+----------+
|  1|   New York|      10001|         1|
|  2|Los Angeles|      90001|         1|
|  3|    Toronto|        M5H|         2|
|  4|  Vancouver|        V6B|         2|
|  5|Mexico City|      01000|         3|
+---+-----------+-----------+----------+

+---+--------------+-------+-----------------+--------------+--------------------+------------+---------+
| id| customer_name|city_id| customer_address|contact_person|               email|       phone|is_active|
+---+--------------+-------+-----------------+--------------+--------------------+------------+---------+
|  1|      John Doe|      1|      123 Main St|      Jane Doe| johndoe@example.com|123-456-7890|        1|
|  2|     Acme Corp|      2|456 Industrial Rd|     Bob Smith|bob.

In [0]:
%sql
WITH CountryInvoiceData AS (
    SELECT
        co.country_name,
        COUNT(i.id) AS total_invoices,  -- Corrected to i.id
        AVG(i.total_price) AS avg_invoice_amount  -- Corrected to i.total_price
    FROM
        country co
    JOIN
        city ci ON co.id = ci.country_id
    JOIN
        customer cu ON ci.id = cu.city_id
    JOIN
        invoice i ON cu.id = i.customer_id
    GROUP BY
        co.country_name
),
GlobalAvg AS (
    SELECT
        AVG(total_price) AS global_avg_invoice_amount  -- Corrected to total_price
    FROM
        invoice
)
SELECT
    cid.country_name,
    cid.total_invoices,
    ROUND(cid.avg_invoice_amount, 6) AS avg_invoice_amount
FROM
    CountryInvoiceData cid,
    GlobalAvg ga
WHERE
    cid.avg_invoice_amount > ga.global_avg_invoice_amount;

country_name,total_invoices,avg_invoice_amount
United States,3,1000.166667


In [0]:
from pyspark.sql import functions as F

# Step 1: Rename the `id` columns in each DataFrame to make them unique
country_df = country_df.withColumnRenamed("id", "country_id")
city_df = city_df.withColumnRenamed("id", "city_id").withColumnRenamed("country_id", "city_country_id")
customer_df = customer_df.withColumnRenamed("id", "customer_id").withColumnRenamed("city_id", "customer_city_id")
invoice_df = invoice_df.withColumnRenamed("id", "invoice_id").withColumnRenamed("customer_id", "invoice_customer_id")

# Step 2: Join the DataFrames
# Join country and city on country_id
country_city_df = country_df.join(city_df, country_df.country_id == city_df.city_country_id, "inner") \
                            .drop("city_country_id")

# Join the result with customer on city_id
city_customer_df = country_city_df.join(customer_df, city_df.city_id == customer_df.customer_city_id, "inner") \
                                  .drop("customer_city_id")

# Join the result with invoice on customer_id
customer_invoice_df = city_customer_df.join(invoice_df, customer_df.customer_id == invoice_df.invoice_customer_id, "inner") \
                                      .drop("invoice_customer_id")

# Step 3: Calculate total invoices and average invoice amount for each country
country_invoice_data_df = customer_invoice_df.groupBy("country_name") \
    .agg(
        F.count("invoice_id").alias("total_invoices"),  # Count distinct invoices
        F.avg("total_price").alias("avg_invoice_amount")  # Calculate average invoice amount
    )

# Step 4: Calculate the global average invoice amount
global_avg_df = invoice_df.agg(F.avg("total_price").alias("global_avg_invoice_amount"))

# Step 5: Filter countries where avg_invoice_amount > global_avg_invoice_amount
filtered_df = country_invoice_data_df.crossJoin(global_avg_df) \
    .filter(country_invoice_data_df.avg_invoice_amount > global_avg_df.global_avg_invoice_amount)

# Step 6: Round the average invoice amount to 6 decimal places
final_df = filtered_df.withColumn("avg_invoice_amount", F.round("avg_invoice_amount", 6))

# Step 7: Display the result
final_df.select("country_name", "total_invoices", "avg_invoice_amount").show()

+-------------+--------------+------------------+
| country_name|total_invoices|avg_invoice_amount|
+-------------+--------------+------------------+
|United States|             3|       1000.166667|
+-------------+--------------+------------------+



## Question 24

1. Find the candidate who is getting more salary than 5000

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.types import IntegerType, StringType, StructField, StructType


# Define schema for employee_information table
employee_schema = StructType([
    StructField("employee_ID", IntegerType(), True),  # Primary key
    StructField("name", StringType(), True),
    StructField("division", StringType(), True)
])

# Sample data for employee_information table
employee_data = [
    (101, "Alice Johnson", "Sales"),
    (102, "Bob Smith", "Marketing"),
    (103, "Charlie Williams", "Engineering"),
    (104, "David Brown", "HR"),
    (105, "Mahammed Faizan", "HR")
]

# Create the employee_information DataFrame
employee_df = spark.createDataFrame(employee_data, schema=employee_schema)

# Show employee_information DataFrame
employee_df.show()

employee_df.createOrReplaceTempView("employee")
# Define schema for last_quarter_bonus table
bonus_schema = StructType([
    StructField("employee_ID", IntegerType(), True),  # Primary key, also a foreign key referencing employee_information.employee_ID
    StructField("bonus", IntegerType(), True)
])

# Sample data for last_quarter_bonus table
bonus_data = [
    (101, 1000),
    (102, 1500),
    (103, 1200),
    (104, 1100),
    (105, 5100)
]

# Create the last_quarter_bonus DataFrame
bonus_df = spark.createDataFrame(bonus_data, schema=bonus_schema)

# Show last_quarter_bonus DataFrame
bonus_df.show()
bonus_df.createOrReplaceTempView("bonus")

+-----------+----------------+-----------+
|employee_ID|            name|   division|
+-----------+----------------+-----------+
|        101|   Alice Johnson|      Sales|
|        102|       Bob Smith|  Marketing|
|        103|Charlie Williams|Engineering|
|        104|     David Brown|         HR|
|        105| Mahammed Faizan|         HR|
+-----------+----------------+-----------+

+-----------+-----+
|employee_ID|bonus|
+-----------+-----+
|        101| 1000|
|        102| 1500|
|        103| 1200|
|        104| 1100|
|        105| 5100|
+-----------+-----+



In [0]:
from pyspark.sql.functions import col

result_df = employee_df.join(bonus_df, 'employee_ID', 'inner') \
    .select("employee_ID", "name") \
    .where((col("bonus") >= 5000) & (col("division") == "HR"))

result_df.show()

+-----------+---------------+
|employee_ID|           name|
+-----------+---------------+
|        105|Mahammed Faizan|
+-----------+---------------+



In [0]:
%sql
SELECT ei.employee_ID, ei.name
FROM employee ei
JOIN bonus lqb
ON ei.employee_ID = lqb.employee_ID
WHERE ei.division = 'HR'
AND lqb.bonus >= 5000;

employee_ID,name
105,Mahammed Faizan


## Question 25

 How would you write a PySpark and spark sql code that creates a new column Full_Name, groups the data by Full_Name and email, and aggregates the counts of different relationship_type values for each person, ordered by Full_Name?

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType

# Define schema for the input data with the 4 required columns
schema = StructType([
    StructField("first_name", StringType(), True),
    StructField("last_name", StringType(), True),
    StructField("email", StringType(), True),
    StructField("relationship_type", StringType(), True)
])

# Define the input data
data = [
    ("Terri", "Bartel", "tbartel5@infoseek.co.jp", "family"),
    ("Micheline", "Bottle", "mbottlee@salon.com", "friends"),
    ("Haley", "Bradneck", "hbradneckd@odnoklassniki.ru", "acquaintances"),
    ("Eugine", "Caroline", "ecaroline4@fastcompany.com", "family"),
    ("Cal", "Chalder", "cchalder1@ox.ac.uk", "friends"),
    ("Riva", "Cregin", "rcreginj@sun.com", "acquaintances"),
    ("Othello", "Feeham", "ofeehamg@pinterest.com", "family"),
    ("Tabor", "Giacopello", "tgiacopelloh@flickr.com", "friends"),
    ("Jacklin", "Goodband", "jgoodbandf@vk.com", "acquaintances"),
    ("Vikky", "Mersh", "vmersh6@admin.ch", "family"),
    ("Sunny", "Nannoni", "snannoni8@adobe.com", "friends"),
    ("Brittani", "Oxtiby", "boxtiby7@ftc.gov", "acquaintances"),
    ("Mara", "Phelps", "mphelpsa@indiegogo.com", "family"),
    ("Lola", "Rizzone", "lrizzoneb@google.de", "friends"),
    ("Clarisse", "Rodenhurst", "crodenhurst2@woothemes.com", "acquaintances"),
    ("Hartwell", "Saich", "hsaich0@amazonaws.com", "family"),
    ("Alphonse", "Scinelli", "ascinelli9@msu.edu", "friends"),
    ("Gilbertina", "Tynewell", "gtynewellc@webnode.com", "acquaintances"),
    ("Rosie", "Verrills", "rverrills3@patch.com", "family"),
    ("Job", "Villa", "jvillai@mtv.com", "friends")
]

# Create the DataFrame
df = spark.createDataFrame(data, schema)
df.createOrReplaceTempView("data_in_sql")
# Show the DataFrame
df.show(truncate=False)

+----------+----------+---------------------------+-----------------+
|first_name|last_name |email                      |relationship_type|
+----------+----------+---------------------------+-----------------+
|Terri     |Bartel    |tbartel5@infoseek.co.jp    |family           |
|Micheline |Bottle    |mbottlee@salon.com         |friends          |
|Haley     |Bradneck  |hbradneckd@odnoklassniki.ru|acquaintances    |
|Eugine    |Caroline  |ecaroline4@fastcompany.com |family           |
|Cal       |Chalder   |cchalder1@ox.ac.uk         |friends          |
|Riva      |Cregin    |rcreginj@sun.com           |acquaintances    |
|Othello   |Feeham    |ofeehamg@pinterest.com     |family           |
|Tabor     |Giacopello|tgiacopelloh@flickr.com    |friends          |
|Jacklin   |Goodband  |jgoodbandf@vk.com          |acquaintances    |
|Vikky     |Mersh     |vmersh6@admin.ch           |family           |
|Sunny     |Nannoni   |snannoni8@adobe.com        |friends          |
|Brittani  |Oxtiby  

In [0]:
%sql
SELECT
    CONCAT(last_name, ' ', first_name) AS Full_Name,
    email,
    COALESCE(SUM(CASE WHEN relationship_type = 'family' THEN 1 ELSE 0 END), 0) AS family,
    COALESCE(SUM(CASE WHEN relationship_type = 'acquaintances' THEN 1 ELSE 0 END), 0) AS acquaintances,
    COALESCE(SUM(CASE WHEN relationship_type = 'friends' THEN 1 ELSE 0 END), 0) AS friends
FROM 
    data_in_sql r
GROUP BY 
    last_name, first_name, email
ORDER BY 
    Full_Name;

Full_Name,email,family,acquaintances,friends
Bartel Terri,tbartel5@infoseek.co.jp,1,0,0
Bottle Micheline,mbottlee@salon.com,0,0,1
Bradneck Haley,hbradneckd@odnoklassniki.ru,0,1,0
Caroline Eugine,ecaroline4@fastcompany.com,1,0,0
Chalder Cal,cchalder1@ox.ac.uk,0,0,1
Cregin Riva,rcreginj@sun.com,0,1,0
Feeham Othello,ofeehamg@pinterest.com,1,0,0
Giacopello Tabor,tgiacopelloh@flickr.com,0,0,1
Goodband Jacklin,jgoodbandf@vk.com,0,1,0
Mersh Vikky,vmersh6@admin.ch,1,0,0


In [0]:
from pyspark.sql.functions import col, concat, sum, when, lit

result_df = df.withColumn("Full_Name", concat(col("last_name"), lit(" "), col("first_name"))) \
    .groupBy("Full_Name", "email") \
    .agg(
        sum(when(col("relationship_type") == 'family', 1).otherwise(0)).alias("family"),
        sum(when(col("relationship_type") == 'acquaintances', 1).otherwise(0)).alias("acquaintances"),
        sum(when(col("relationship_type") == 'friends', 1).otherwise(0)).alias("friends")
    ) \
    .orderBy("Full_Name")

# Show the result
result_df.show()

+-------------------+--------------------+------+-------------+-------+
|          Full_Name|               email|family|acquaintances|friends|
+-------------------+--------------------+------+-------------+-------+
|       Bartel Terri|tbartel5@infoseek...|     1|            0|      0|
|   Bottle Micheline|  mbottlee@salon.com|     0|            0|      1|
|     Bradneck Haley|hbradneckd@odnokl...|     0|            1|      0|
|    Caroline Eugine|ecaroline4@fastco...|     1|            0|      0|
|        Chalder Cal|  cchalder1@ox.ac.uk|     0|            0|      1|
|        Cregin Riva|    rcreginj@sun.com|     0|            1|      0|
|     Feeham Othello|ofeehamg@pinteres...|     1|            0|      0|
|   Giacopello Tabor|tgiacopelloh@flic...|     0|            0|      1|
|   Goodband Jacklin|   jgoodbandf@vk.com|     0|            1|      0|
|        Mersh Vikky|    vmersh6@admin.ch|     1|            0|      0|
|      Nannoni Sunny| snannoni8@adobe.com|     0|            0| 

##Question 26


A potential scenario question for this code could be:

Scenario:
You have a JSON file with workout data where some of the records have missing or corrupted fields like id, name, or age. Your task is to clean the data by performing the following:

1. If the id field is missing but _corrupt_record exists, extract the id from the corrupted record and update the id field.
2. If the name field is missing but _corrupt_record exists, extract the name from the corrupted record and update the name field. If still missing, set it to "Unknown".
3. If the age field is missing but _corrupt_record exists, extract the numeric value from the corrupted record and update the age field.
Drop the _corrupt_record column.
4. Ensure that missing name values are replaced with "Unknown".
You are given a corrupted JSON file (missing_json.json) in Databricks FileStore, which contains the above-mentioned issues. Write a PySpark script to read the data, clean it, and output the cleaned dataset.

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, regexp_replace

df = spark.read.json("dbfs:/FileStore/workout_data/missing_json.json")
print("Input = ")
df.show(truncate=False)

cleaned_df = df.withColumn("id", 
                            when(col("id").isNull() & col("_corrupt_record").isNotNull(), 
                                 regexp_replace(col("_corrupt_record"), r'.*?\'id":?\s*(\d+).*', '$1').cast("int")) \
                            .otherwise(col("id"))) \
                .withColumn("name", 
                            when(col("name").isNull() & col("_corrupt_record").isNotNull(), 
                                 regexp_replace(col("_corrupt_record"), r'.*?"name":\s*"([^"]+)".*', '$1')) \
                            .otherwise(col("name"))) \
                .withColumn("age", 
                            when(col("age").isNull() & col("_corrupt_record").isNotNull(), 
                                 regexp_replace(col("_corrupt_record"), r'.*?(\d+)[^0-9]*$', '$1').cast("int")) \
                            .otherwise(col("age"))) \
                .drop("_corrupt_record")

cleaned_df = cleaned_df.na.fill({"name": "Unknown"}) 
print("output = ")
cleaned_df.show(truncate=False)

Input = 
+-----------------------------+----+----+-------+
|_corrupt_record              |age |id  |name   |
+-----------------------------+----+----+-------+
|null                         |30  |1   |Alice  |
|{'id": 2, "name": "Bob", 40},|null|null|null   |
|null                         |25  |3   |null   |
|null                         |40  |4   |Charlie|
+-----------------------------+----+----+-------+

output = 
+---+---+-------+
|age|id |name   |
+---+---+-------+
|30 |1  |Alice  |
|40 |2  |Bob    |
|25 |3  |Unknown|
|40 |4  |Charlie|
+---+---+-------+



In [0]:
# Read the JSON data and cache the DataFrame
df = spark.read.json("dbfs:/FileStore/workout_data/missing_json.json").cache()

# Now, you can inspect the _corrupt_record safely
df.select("_corrupt_record").show(truncate=False)

# Proceed with fixing the DataFrame
df_fixed = df.withColumn(
    "id", F.coalesce(F.col("id"), F.regexp_extract(F.col("_corrupt_record"), '"id":\\s*"([^"]+)"', 1).cast("int"))
).withColumn(
    "name", F.coalesce(F.col("name"), F.regexp_extract(F.col("_corrupt_record"), '"name":\\s*"([^"]+)"', 1))
).withColumn(
    "age", F.coalesce(F.col("age"), F.regexp_extract(F.col("_corrupt_record"), '"age":\\s*(\\d+)', 1).cast("int"))
)

# Drop _corrupt_record after fixing the data
df_fixed = df_fixed.drop("_corrupt_record")

# Show the cleaned DataFrame
df_fixed.show(truncate=False)

+-----------------------------+
|_corrupt_record              |
+-----------------------------+
|null                         |
|{'id": 2, "name": "Bob", 40},|
|null                         |
|null                         |
+-----------------------------+

+----+----+-------+
|age |id  |name   |
+----+----+-------+
|30  |1   |Alice  |
|null|null|Bob    |
|25  |3   |null   |
|40  |4   |Charlie|
+----+----+-------+



## Question 27

1. Flatten the nested JSON structure

In [0]:
df = spark.read.option("multiLine", "true").json("dbfs:/FileStore/workout_data/nested.json")
display(df)
df.printSchema()

goods,location,name,satellites
"List(List(government, distributer, retail), List(List(1, 123.34, List(List(List(Laptop, 20), List(Charger, 2)))), List(2, 323.34, List(List(List(Mice, 2), List(Keyboard, 1))))), true)",Redmond,MSFT,"List(Bay Area, Shanghai)"


root
 |-- goods: struct (nullable = true)
 |    |-- customers: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- orders: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- orderId: long (nullable = true)
 |    |    |    |-- orderTotal: double (nullable = true)
 |    |    |    |-- shipped: struct (nullable = true)
 |    |    |    |    |-- orderItems: array (nullable = true)
 |    |    |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |    |    |-- itemName: string (nullable = true)
 |    |    |    |    |    |    |-- itemQty: long (nullable = true)
 |    |-- trade: boolean (nullable = true)
 |-- location: string (nullable = true)
 |-- name: string (nullable = true)
 |-- satellites: array (nullable = true)
 |    |-- element: string (containsNull = true)



In [0]:
from pyspark.sql import functions as F

# Sample data loading (replace this with your actual DataFrame)
# df = spark.read.json("path_to_your_data.json")

# Flattening the 'goods' struct
df_flattened = df \
    .withColumn("customers", F.explode_outer("goods.customers")) \
    .withColumn("orders", F.explode_outer("goods.orders")) \
    .withColumn("trade", F.col("goods.trade")) \
    .withColumn("orderId", F.col("orders.orderId")) \
    .withColumn("orderTotal", F.col("orders.orderTotal")) \
    .withColumn("shipped_items", F.explode_outer("orders.shipped.orderItems")) \
    .withColumn("itemName", F.col("shipped_items.itemName")) \
    .withColumn("itemQty", F.col("shipped_items.itemQty")) \
    .withColumn("location", F.col("location")) \
    .withColumn("name", F.col("name")) \
    .withColumn("satellites", F.explode_outer("satellites"))

# Dropping the original nested columns to keep only flattened ones
df_flattened = df_flattened.drop("goods", "orders", "shipped_items")

# Show the flattened schema and data
df_flattened.printSchema()
df_flattened.show(truncate=False)

root
 |-- location: string (nullable = true)
 |-- name: string (nullable = true)
 |-- satellites: string (nullable = true)
 |-- customers: string (nullable = true)
 |-- trade: boolean (nullable = true)
 |-- orderId: long (nullable = true)
 |-- orderTotal: double (nullable = true)
 |-- itemName: string (nullable = true)
 |-- itemQty: long (nullable = true)

+--------+----+----------+-----------+-----+-------+----------+--------+-------+
|location|name|satellites|customers  |trade|orderId|orderTotal|itemName|itemQty|
+--------+----+----------+-----------+-----+-------+----------+--------+-------+
|Redmond |MSFT|Bay Area  |government |true |1      |123.34    |Laptop  |20     |
|Redmond |MSFT|Shanghai  |government |true |1      |123.34    |Laptop  |20     |
|Redmond |MSFT|Bay Area  |government |true |1      |123.34    |Charger |2      |
|Redmond |MSFT|Shanghai  |government |true |1      |123.34    |Charger |2      |
|Redmond |MSFT|Bay Area  |government |true |2      |323.34    |Mice    |2 

## Question 28

#### Problem:
1. Write a PySpark job to identify "underperforming" products in each warehouse. A product is considered underperforming in a warehouse if:

1a. It has a stock level above the ReorderPoint.
1b. It has had no sales in the last 30 days. (last_sale_date - current_date)

In [0]:
sales_data = [
    (1, 101, 1001, '2024-08-15', 10, 200),
    (2, 102, 1002, '2024-09-20', 5, 100),
    (3, 101, 1001, '2024-07-10', 15, 300),
]

inventory_data = [
    (101, 1001, 50, 20),
    (102, 1002, 30, 25),
    (103, 1001, 10, 15),
]

sales_columns = ['SaleID', 'ProductID', 'WarehouseID', 'SaleDate', 'QuantitySold', 'SaleAmount']
inventory_columns = ['ProductID', 'WarehouseID', 'StockLevel', 'ReorderPoint']

sales_df = spark.createDataFrame(sales_data, sales_columns)
inventory_df = spark.createDataFrame(inventory_data, inventory_columns)

print("Displaying sales_df")
display(sales_df)
print("Displaying inventory_df")
display(inventory_df)

# Create temporary views for the DataFrames
sales_df.createOrReplaceTempView("Sales")
inventory_df.createOrReplaceTempView("Inventory")

Displaying sales_df


SaleID,ProductID,WarehouseID,SaleDate,QuantitySold,SaleAmount
1,101,1001,2024-08-15,10,200
2,102,1002,2024-09-20,5,100
3,101,1001,2024-07-10,15,300


Displaying inventory_df


ProductID,WarehouseID,StockLevel,ReorderPoint
101,1001,50,20
102,1002,30,25
103,1001,10,15


In [0]:
from pyspark.sql import Window
from pyspark.sql.functions import col, max, current_date, datediff, date_format

# Remove duplicates from sales_df and inventory_df
sales_df = sales_df.dropDuplicates(["ProductID", "WarehouseID", "SaleDate"])  
inventory_df = inventory_df.dropDuplicates(["ProductID", "WarehouseID"])

# Step 1: Filter Inventory for StockLevel above ReorderPoint
inventory_filtered = inventory_df.filter(col('StockLevel') > col('ReorderPoint'))

# Step 2: Calculate DaysSinceLastSale for each product in the Sales dataset
window_spec = Window.partitionBy("ProductID", "WarehouseID").orderBy(col("SaleDate").desc())
sales_with_last_date = sales_df.withColumn("last_sale_date", max(col("SaleDate")).over(window_spec))
sales_with_last_date = sales_with_last_date.withColumn("DaysSinceLastSale", datediff(current_date(), col("last_sale_date")))

# Filter products with no sales in the last 30 days
sales_filtered = sales_with_last_date.filter(col("DaysSinceLastSale") > 30)

# Step 4: Join the filtered inventory data with the sales data to find underperforming products
underperforming_products = sales_filtered.join(inventory_filtered, ["ProductID", "WarehouseID"], "inner")

# Step 5: Select the necessary columns for the final output and remove duplicates
final_output = underperforming_products.select("ProductID", "WarehouseID", "StockLevel", "DaysSinceLastSale").distinct()

print("printing final_output")
final_output.show()

printing final_output
+---------+-----------+----------+-----------------+
|ProductID|WarehouseID|StockLevel|DaysSinceLastSale|
+---------+-----------+----------+-----------------+
|      101|       1001|        50|               67|
|      102|       1002|        30|               31|
+---------+-----------+----------+-----------------+



In [0]:
%sql
-- SQL Query to find underperforming products
WITH InventoryFiltered AS (
    -- Step 1: Filter Inventory for StockLevel above ReorderPoint
    SELECT ProductID, WarehouseID, StockLevel, ReorderPoint
    FROM Inventory
    WHERE StockLevel > ReorderPoint
),

SalesWithLastDate AS (
    -- Step 2: Calculate DaysSinceLastSale for each product in the Sales dataset
    SELECT 
        s.ProductID,
        s.WarehouseID,
        MAX(s.SaleDate) AS last_sale_date
    FROM Sales s
    GROUP BY s.ProductID, s.WarehouseID
),

SalesFiltered AS (
    -- Step 3: Filter products with no sales in the last 30 days
    SELECT 
        swld.ProductID,
        swld.WarehouseID,
        swld.last_sale_date,
        DATEDIFF(CURRENT_DATE, swld.last_sale_date) AS DaysSinceLastSale
    FROM SalesWithLastDate swld
    WHERE DATEDIFF(CURRENT_DATE, swld.last_sale_date) > 30
)

-- Step 4: Join the filtered inventory data with the sales data to find underperforming products
SELECT 
    sf.ProductID,
    sf.WarehouseID,
    inf.StockLevel,
    sf.DaysSinceLastSale
FROM SalesFiltered sf
JOIN InventoryFiltered inf
ON sf.ProductID = inf.ProductID AND sf.WarehouseID = inf.WarehouseID;

ProductID,WarehouseID,StockLevel,DaysSinceLastSale
101,1001,50,67
102,1002,30,31


## Question 29

below queries :
Write a SQL query to retrieve the total transaction amounts in USD for each company code for the fiscal year 2023. Convert all transaction amounts to USD using the appropriate exchange rates stored in another table ExchangeRates, which has the columns Currency, ExchangeRate, and
EffectiveDate.

TransactionAmount in USD= round(sum(TransactionAmount/RE), 2) group by CompanyCode, where FiscalYear = '2023' from ExchangeRates table 


In [0]:

from pyspark.sql.functions import col

# Sample data for transactions_df
transactions_data = [
    ('US01', 2023, '100001', 500, 'USD', '2023-01-15'),
    ('DE02', 2023, '200002', 450, 'EUR', '2023-03-21'),
    ('IN03', 2023, '300003', 35000, 'INR', '2023-07-09'),
    ('US01', 2022, '100004', 1200, 'USD', '2022-05-14'),
    ('DE02', 2023, '200005', 700, 'EUR', '2023-09-11')
]

# Create the transactions_df DataFrame
transactions_df = spark.createDataFrame(transactions_data, 
    ['CompanyCode', 'FiscalYear', 'AccountNumber', 'TransactionAmount', 'Currency', 'DocumentDate'])

# Sample data for exchange_rates_df
exchange_rates_data = [
    ('EUR', 1.10, '2023-01-01'),
    ('INR', 0.012, '2023-01-01'),
    ('USD', 1.00, '2023-01-01')
]

# Create the exchange_rates_df DataFrame
exchange_rates_df = spark.createDataFrame(exchange_rates_data, 
    ['Currency', 'ExchangeRate', 'EffectiveDate'])

# Show sample tables
print("Sample Transactions DataFrame:")
transactions_df.show()

print("Sample Exchange Rates DataFrame:")
exchange_rates_df.show()

Sample Transactions DataFrame:
+-----------+----------+-------------+-----------------+--------+------------+
|CompanyCode|FiscalYear|AccountNumber|TransactionAmount|Currency|DocumentDate|
+-----------+----------+-------------+-----------------+--------+------------+
|       US01|      2023|       100001|              500|     USD|  2023-01-15|
|       DE02|      2023|       200002|              450|     EUR|  2023-03-21|
|       IN03|      2023|       300003|            35000|     INR|  2023-07-09|
|       US01|      2022|       100004|             1200|     USD|  2022-05-14|
|       DE02|      2023|       200005|              700|     EUR|  2023-09-11|
+-----------+----------+-------------+-----------------+--------+------------+

Sample Exchange Rates DataFrame:
+--------+------------+-------------+
|Currency|ExchangeRate|EffectiveDate|
+--------+------------+-------------+
|     EUR|         1.1|   2023-01-01|
|     INR|       0.012|   2023-01-01|
|     USD|         1.0|   2023-01-

In [0]:

# Define the window specification to get the latest exchange rate per currency
window_spec = Window.partitionBy("Currency").orderBy(F.desc("EffectiveDate"))

# Step 1: Add a row number to select the most recent exchange rate for each currency
latest_exchange_rates_df = exchange_rates_df.withColumn(
    "rn", F.row_number().over(window_spec)
).filter(F.col("rn") == 1)

# Step 2: Join the transactions_df with the filtered exchange rates based on currency
joined_df = transactions_df.alias("a").join(
    latest_exchange_rates_df.alias("b"),
    (F.col("a.Currency") == F.col("b.Currency")) &
    (F.col("b.EffectiveDate") <= F.col("a.DocumentDate")),  # Filter based on DocumentDate
    "left"
)

# Step 3: Calculate total transaction amounts in USD, rounding to 2 decimal places
result_df = joined_df.filter(F.col("a.FiscalYear") == 2023) \
    .groupBy("a.CompanyCode") \
    .agg(
        F.round(F.sum(F.col("a.TransactionAmount") / F.col("b.ExchangeRate")), 2).alias("total_transaction_amount_usd")
    )

# Step 4: Show the result
print("printing output result")
result_df.show()


transactions_df = transactions_df.withColumn("DocumentDate", F.to_date("DocumentDate"))
exchange_rates_df = exchange_rates_df.withColumn("EffectiveDate", F.to_date("EffectiveDate"))
transactions_df.createOrReplaceTempView("SAP_Finance_system")
exchange_rates_df.createOrReplaceTempView("ExchangeRates")


printing output result
+-----------+----------------------------+
|CompanyCode|total_transaction_amount_usd|
+-----------+----------------------------+
|       US01|                       500.0|
|       DE02|                     1045.45|
|       IN03|                  2916666.67|
+-----------+----------------------------+



In [0]:
%sql
WITH LatestRates AS (
    SELECT Currency, ExchangeRate, EffectiveDate,
           ROW_NUMBER() OVER (PARTITION BY Currency ORDER BY EffectiveDate DESC) AS rn
    FROM ExchangeRates
)
SELECT a.CompanyCode,
       ROUND(SUM(a.TransactionAmount / b.ExchangeRate), 2) AS total_transaction_amount_usd
FROM SAP_Finance_system a
LEFT JOIN LatestRates b
ON a.Currency = b.Currency
AND b.EffectiveDate <= a.DocumentDate
AND b.rn = 1
WHERE a.FiscalYear = 2023
GROUP BY a.CompanyCode;

CompanyCode,total_transaction_amount_usd
US01,500.0
DE02,1045.45
IN03,2916666.67


## QUESTION 30 

Write a pyspark and sql query to identify customers who had any deliveries delayed by more than 3 days. For these
customers, calculate the percentage of their total orders that were delayed. The result should
include the customer ID, the number of delayed orders, total orders, and the delay percentage.

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DateType, FloatType


# Define schema for SalesOrders table
sales_orders_schema = StructType([
    StructField("sales_order_id", IntegerType(), True),
    StructField("customer_id", IntegerType(), True),
    StructField("order_date", StringType(), True),
    StructField("order_status", StringType(), True),
    StructField("total_amount", FloatType(), True)
])

# Create SalesOrders DataFrame
sales_orders_data = [
    (1001, 201, "2023-05-01", "Completed", 1500.0),
    (1002, 202, "2023-05-02", "Completed", 2000.0),
    (1003, 203, "2023-05-03", "Completed", 3000.0),
    (1004, 204, "2023-05-05", "Completed", 2500.0),
    (1005, 205, "2023-05-07", "Cancelled", 0.0)
]

sales_orders_df = spark.createDataFrame(data=sales_orders_data, schema=sales_orders_schema)

# Define schema for Deliveries table
deliveries_schema = StructType([
    StructField("delivery_id", IntegerType(), True),
    StructField("sales_order_id", IntegerType(), True),
    StructField("delivery_date", StringType(), True),
    StructField("delivery_status", StringType(), True),
    StructField("delivery_time", FloatType(), True)
])

# Create Deliveries DataFrame
deliveries_data = [
    (5001, 1001, "2023-05-03", "Delivered", 2.0),
    (5002, 1002, "2023-05-06", "Delivered", 4.0),
    (5003, 1003, "2023-05-05", "Delivered", 2.0),
    (5004, 1004, "2023-05-09", "Delivered", 4.0),
    (5005, 1001, "2023-05-04", "Returned", None)
]

deliveries_df = spark.createDataFrame(data=deliveries_data, schema=deliveries_schema)

# Display the DataFrames
sales_orders_df.show()
sales_orders_df.createOrReplaceTempView('SalesOrders')
deliveries_df.show()
deliveries_df.createOrReplaceTempView('Deliveries')

+--------------+-----------+----------+------------+------------+
|sales_order_id|customer_id|order_date|order_status|total_amount|
+--------------+-----------+----------+------------+------------+
|          1001|        201|2023-05-01|   Completed|      1500.0|
|          1002|        202|2023-05-02|   Completed|      2000.0|
|          1003|        203|2023-05-03|   Completed|      3000.0|
|          1004|        204|2023-05-05|   Completed|      2500.0|
|          1005|        205|2023-05-07|   Cancelled|         0.0|
+--------------+-----------+----------+------------+------------+

+-----------+--------------+-------------+---------------+-------------+
|delivery_id|sales_order_id|delivery_date|delivery_status|delivery_time|
+-----------+--------------+-------------+---------------+-------------+
|       5001|          1001|   2023-05-03|      Delivered|          2.0|
|       5002|          1002|   2023-05-06|      Delivered|          4.0|
|       5003|          1003|   2023-05-0

In [0]:
from pyspark.sql import functions as F

# Step 1: Identify delayed deliveries (more than 3 days)
delayed_deliveries_df = (
    sales_orders_df.alias("so")
    .join(deliveries_df.alias("d"), F.col("so.sales_order_id") == F.col("d.sales_order_id"))
    .filter((F.col("d.delivery_time") > 3) & (F.col("so.order_status") == "Completed"))
    .groupBy("so.customer_id")
    .agg(F.countDistinct("so.sales_order_id").alias("delayed_orders"))
)

# Step 2: Count total completed orders per customer
total_orders_df = (
    sales_orders_df
    .filter(F.col("order_status") == "Completed")
    .groupBy("customer_id")
    .agg(F.count("sales_order_id").alias("total_orders"))
)

# Step 3: Calculate the delay percentage
result_df = (
    total_orders_df.alias("t")
    .join(delayed_deliveries_df.alias("d"), F.col("t.customer_id") == F.col("d.customer_id"), "left")
    .select(
        F.col("t.customer_id"),
        F.coalesce(F.col("d.delayed_orders"), F.lit(0)).alias("delayed_orders"),
        F.col("t.total_orders"),
        F.round(F.coalesce((F.col("d.delayed_orders") * 100.0) / F.col("t.total_orders"), F.lit(0)), 2).alias("delay_percentage")
    )
    .filter(F.col("t.total_orders") > 0)
)

# Display the final result
result_df.show()

+-----------+--------------+------------+----------------+
|customer_id|delayed_orders|total_orders|delay_percentage|
+-----------+--------------+------------+----------------+
|        201|             0|           1|             0.0|
|        202|             1|           1|           100.0|
|        203|             0|           1|             0.0|
|        204|             1|           1|           100.0|
+-----------+--------------+------------+----------------+



In [0]:
%sql
WITH DelayedDeliveries AS (
    -- Identify delayed deliveries (more than 3 days)
    SELECT
        so.customer_id,
        COUNT(DISTINCT so.sales_order_id) AS delayed_orders
    FROM SalesOrders AS so
    JOIN Deliveries AS d
        ON so.sales_order_id = d.sales_order_id
    WHERE d.delivery_time > 3
      AND so.order_status = 'Completed'
    GROUP BY so.customer_id
),
TotalOrders AS (
    -- Count total completed orders per customer
    SELECT
        customer_id,
        COUNT(sales_order_id) AS total_orders
    FROM SalesOrders
    WHERE order_status = 'Completed'
    GROUP BY customer_id
)
-- Final query: calculate the delay percentage
SELECT
    t.customer_id,
    COALESCE(d.delayed_orders, 0) AS delayed_orders,
    t.total_orders,
    ROUND(COALESCE((d.delayed_orders * 100.0) / t.total_orders, 0), 2) AS delay_percentage
FROM TotalOrders AS t
LEFT JOIN DelayedDeliveries AS d
    ON t.customer_id = d.customer_id
WHERE t.total_orders > 0;


customer_id,delayed_orders,total_orders,delay_percentage
201,0,1,0.0
202,1,1,100.0
203,0,1,0.0
204,1,1,100.0
