In [None]:
# Coforge DE interview skills (Spark and SQL), 11 Jan 24

#1. What is spark architecture?
"""Apache Spark Architecture:
1. Driver: The driver is the main control process that creates the SparkContext, connects to the cluster, and coordinates the execution of tasks. It translates the user program into tasks and schedules them to run on executors.
2. Executors: Executors are JVM processes launched on worker nodes that run individual tasks in a distributed manner. Each executor runs in its own JVM and executes the tasks assigned by the driver. Executors also provide in-memory storage for RDDs that user programs cache through SparkContext.
3. Cluster Manager: The cluster manager is responsible for managing the resources across the cluster. Spark supports several cluster managers: Standalone, Apache Mesos, Kubernetes, and Hadoop YARN. The cluster manager allocates resources to the driver and executors.
4. RDD (Resilient Distributed Dataset): RDD is the fundamental data structure of Spark. It is an immutable distributed collection of objects that can be processed in parallel. RDDs support two types of operations: transformations (e.g., map, filter) and actions (e.g., count, collect).
5. DAG (Directed Acyclic Graph): Spark creates a logical execution plan in the form of a DAG. The DAG represents the sequence of transformations applied to the data. The DAG scheduler divides the operators into stages of tasks and submits them to the task scheduler.
6. Task Scheduler: The task scheduler is responsible for scheduling tasks on the executors. It takes the stages from the DAG scheduler and assigns tasks to executors based on data locality and resource availability.
In summary, Spark architecture consists of a driver, executors, cluster manager, RDDs, DAG, and task scheduler, working together to process large-scale data in a distributed and fault-tolerant manner.
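
The division of labour above can be imitated on a single machine. This is only a toy sketch, not Spark code: a ThreadPoolExecutor stands in for the executors, and `run_task` and `driver` are hypothetical names. The driver splits the data into partitions, hands one task per partition to the pool, and combines the partial results.

```python
from concurrent.futures import ThreadPoolExecutor

def run_task(partition):
    # Work done by an "executor" on one partition
    return sum(x * x for x in partition)

def driver(data, num_partitions=4, num_executor_slots=2):
    # The "driver" splits the data and creates one task per partition
    partitions = [data[i::num_partitions] for i in range(num_partitions)]
    with ThreadPoolExecutor(max_workers=num_executor_slots) as pool:
        partial_results = list(pool.map(run_task, partitions))
    # The driver combines the per-task results
    return sum(partial_results)

print(driver(list(range(10))))  # 285
```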
"""

#How does Spark distribute tasks for processing a 1 GB file, and how are tasks and executors allocated during execution?
'''Spark divides the 1 GB file into partitions (default is 128 MB per partition for HDFS), creating about 8 partitions for a 1 GB file. Each partition becomes a task.

Executors: Each executor processes multiple tasks based on available cores.
Tasks per Executor: If an executor has 4 cores, it can run 4 tasks in parallel.'''
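'''
The arithmetic above can be checked with a few lines of plain Python. This is a rough sketch, not Spark's actual scheduler: `plan_tasks` is a hypothetical helper, and the executor count and cores per executor are assumed values.

```python
import math

def plan_tasks(file_size_mb, split_size_mb=128, num_executors=2, cores_per_executor=4):
    # One partition (and therefore one task) per input split
    partitions = math.ceil(file_size_mb / split_size_mb)
    # Each core is one task slot
    slots = num_executors * cores_per_executor
    # Tasks beyond the available slots wait for the next wave
    waves = math.ceil(partitions / slots)
    return partitions, slots, waves

print(plan_tasks(1024))                   # (8, 8, 1)
print(plan_tasks(1024, num_executors=1))  # (8, 4, 2)
```

With 2 executors of 4 cores each, all 8 tasks run in one wave; with a single 4-core executor they need two waves.
'''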

# What is spark-submit? Explain in detail.
'''
spark-submit is a command-line tool used to launch Spark applications on a cluster or locally. It handles resource allocation, configuration, and submission of jobs to Spark clusters like YARN, Kubernetes, Mesos, or standalone mode.

Key Responsibilities:
Distribute Application Code: Sends your Spark application JAR or Python script to the cluster.
Resource Management: Allocates memory and CPU resources for the driver and executors.
Configuration Management: Sets properties like application name, Spark master, deploy mode, and more.
Job Execution: Manages the execution of tasks and monitors job progress.
Command Syntax:

spark-submit \
  --master <master-url> \
  --deploy-mode <deploy-mode> \
  --class <main-class> \
  --executor-memory <memory> \
  --executor-cores <cores> \
  application.jar <application-arguments>
Common Parameters:
Parameter	Description
--master	Specifies the cluster manager (local, yarn, etc.)
--deploy-mode	Deployment mode (client or cluster)
--class	Main class of the application (for Java/Scala)
--executor-memory	Memory per executor (e.g., 2G)
--executor-cores	Number of cores per executor
--num-executors	Number of executors to launch
--conf	Set configuration properties (e.g., spark.executor.memory)
Example Usage:
Running a Spark Application Locally:

spark-submit \
  --master local[2] \
  my_spark_app.py

Running on a YARN Cluster:

spark-submit \
  --master yarn \
  --deploy-mode cluster \
  --executor-memory 4G \
  --executor-cores 4 \
  my_spark_app.py
Execution Modes:
Client Mode: Driver runs on the machine where spark-submit is executed. Suitable for local testing.
Cluster Mode: Driver runs on a worker node within the cluster. Ideal for large-scale production jobs.

Important Points:
Driver Program: Manages job execution and task scheduling.
Executors: Run the actual tasks on worker nodes and store intermediate data.
Partitions: Spark splits input data into smaller partitions for distributed processing.
By leveraging spark-submit, Spark achieves scalability and parallel processing across large datasets efficiently.
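
When spark-submit is launched from a script, it helps to assemble the command as a list. A minimal sketch, assuming only the flags listed in the table above; `build_spark_submit` is a hypothetical helper, not part of Spark.

```python
import shlex

def build_spark_submit(app, master, deploy_mode=None, executor_memory=None,
                       executor_cores=None, conf=None, app_args=()):
    cmd = ["spark-submit", "--master", master]
    if deploy_mode:
        cmd += ["--deploy-mode", deploy_mode]
    if executor_memory:
        cmd += ["--executor-memory", executor_memory]
    if executor_cores:
        cmd += ["--executor-cores", str(executor_cores)]
    # Each configuration property becomes its own --conf key=value pair
    for key, value in (conf or {}).items():
        cmd += ["--conf", f"{key}={value}"]
    # The application and its arguments always come last
    return cmd + [app, *app_args]

cmd = build_spark_submit("my_spark_app.py", "yarn", deploy_mode="cluster",
                         executor_memory="4G", executor_cores=4)
print(shlex.join(cmd))
```

On a machine with Spark installed, the list could be passed to subprocess.run(cmd, check=True).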
'''

In [None]:
# Synechron Interview Questions on Database for Data Engineer

# Make Oracle DB connection in Python and write a query?

#%pip install cx_Oracle
import cx_Oracle

dsn_tns=cx_Oracle.makedsn('localhost','1521',service_name='orcl')
conn=cx_Oracle.connect(user='SivaAcademy',password='pass',dsn=dsn_tns) # supply the real password when connecting
c=conn.cursor()
c.execute('select * from emp')
print(c.fetchone()) # fetchone() returns a single row; use c.fetchall() for all rows, or iterate over the cursor:
# for row in c:
#     print(row)



(7839, 'KING', 'PRESIDENT', None, datetime.datetime(1981, 11, 17, 0, 0), 5100.0, None, 20)


In [None]:
# 1. Find the sales amount each year, product-wise  [Sales_data table]
# Solution 1
query='''WITH cte AS
          (
    SELECT product_id,
           EXTRACT(YEAR FROM sales_date) AS sale_year,
           -- total per product and year (not a running total), repeated on every row of the group
           SUM(amount) OVER (PARTITION BY product_id, EXTRACT(YEAR FROM sales_date)) AS total_sales
    FROM Sales_data
)
SELECT product_id, sale_year, total_sales
FROM cte
GROUP BY product_id, sale_year, total_sales
ORDER BY product_id, sale_year'''
c.execute(query)
for i in c:
    print(i)


In [None]:
# solution 2
query='''SELECT product_id AS prod_id,
    EXTRACT(YEAR FROM sales_date) AS sale_year,
    SUM(amount) AS total_sales
FROM Sales_data
GROUP BY product_id, EXTRACT(YEAR FROM sales_date)
ORDER BY product_id, sale_year'''
c.execute(query)
for i in c:
    print(i)
'''
For a sales_date in 2025:
EXTRACT(YEAR FROM sales_date): 2025 (a number)
TO_CHAR(sales_date, 'YYYY'): '2025' (a string)
TRUNC(sales_date, 'YYYY'): 01-JAN-2025 (a date: the first day of the year)
'''

('P1', 2023, 1301.15)
('P10', 2023, 1085.16)
('P2', 2023, 1310.25)
('P3', 2023, 1235.66)
('P4', 2023, 1064.48)
('P5', 2023, 1259.51)
('P6', 2023, 1310.55)
('P7', 2023, 1154.54)
('P8', 2023, 1220.01)
('P9', 2023, 1383.75)


In [None]:
#2. Find products that were not sold in the last 7 months.
query='''SELECT product_id
FROM Sales_data
WHERE product_id NOT IN (
    SELECT DISTINCT product_id
    FROM Sales_data
    WHERE sales_date >= ADD_MONTHS(SYSDATE, -7)) group by product_id'''
c.execute(query)
for i in c:
    print(i)

'''
Analysis:
Suppose I have written this query
SELECT DISTINCT product_id
FROM Sales_data
WHERE sales_date <= ADD_MONTHS(SYSDATE, -7)

This returns every product that has at least one sale older than 7 months, but it cannot exclude products that were also sold within the last 7 months. A product with even one recent sale still appears, which is why the NOT IN subquery is needed.
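
The difference between the two filters can be reproduced with an in-memory SQLite table. This is a sketch only: SQLite stands in for Oracle, the three sample rows are made up, and a fixed `cutoff` string replaces ADD_MONTHS(SYSDATE, -7).

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE sales_data (product_id TEXT, sales_date TEXT)")
conn.executemany(
    "INSERT INTO sales_data VALUES (?, ?)",
    [("P1", "2024-01-10"), ("P1", "2024-11-05"), ("P2", "2024-02-20")],
)
cutoff = "2024-06-01"  # assumed stand-in for ADD_MONTHS(SYSDATE, -7)

# Naive filter: any product with an old sale, even if it also sold recently
naive = conn.execute(
    "SELECT DISTINCT product_id FROM sales_data WHERE sales_date <= ? ORDER BY 1",
    (cutoff,),
).fetchall()

# Correct filter: products with no sale on or after the cutoff
correct = conn.execute(
    "SELECT DISTINCT product_id FROM sales_data "
    "WHERE product_id NOT IN (SELECT product_id FROM sales_data WHERE sales_date >= ?) "
    "ORDER BY 1",
    (cutoff,),
).fetchall()

print(naive)    # [('P1',), ('P2',)]
print(correct)  # [('P2',)]
```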
'''

('P6',)
('P8',)
('P5',)
('P1',)
('P2',)
('P3',)
('P9',)
('P10',)
('P4',)
('P7',)


In [None]:
#Berribot Interview Questions on ETL and pyspark for Wipro
# Script to schedule an AWS Databricks Spark job daily at midnight
'''
To schedule an AWS Databricks Spark job to run daily at midnight, you can use Databricks Workflows (preferred) or an external scheduling tool like AWS Lambda or CloudWatch. Below is an example script for each approach:

1. Using Databricks Workflows (Recommended)
Create a Databricks Job:

Go to Databricks Workspace.
Navigate to Workflows > Jobs.
Click Create Job.
Add your notebook or Spark job details.
Configure job clusters and libraries if required.
Schedule the Job:

In the Schedule tab, set the schedule to:
Daily
Start time: 00:00
Save the workflow.
2. Using AWS CloudWatch with Databricks REST API
Prerequisites:
A Databricks personal access token.
Databricks job ID or notebook path.
Python Script:

import requests

# Databricks Workspace URL
DATABRICKS_HOST = "https://<databricks-instance>.cloud.databricks.com"

# Databricks Personal Access Token
DATABRICKS_TOKEN = "<your-databricks-token>"

# Job ID
JOB_ID = "<your-job-id>"

# Trigger Job Function
def trigger_job():
    url = f"{DATABRICKS_HOST}/api/2.1/jobs/run-now"
    headers = {
        "Authorization": f"Bearer {DATABRICKS_TOKEN}",
        "Content-Type": "application/json"
    }
    payload = {"job_id": JOB_ID}

    response = requests.post(url, json=payload, headers=headers)
    if response.status_code == 200:
        print("Job triggered successfully.")
    else:
        print(f"Failed to trigger job: {response.content}")

# Run the function
trigger_job()
Schedule via AWS CloudWatch:
Open AWS Management Console.
Go to CloudWatch > Rules.
Create a new rule:
Event Source: Schedule.
Schedule Expression: cron(0 0 * * ? *) (midnight daily).
Target:
Choose AWS Lambda.
Add the Python script to your Lambda function.
3. Using Crontab on EC2
If you prefer running the script from an EC2 instance:

SSH into the EC2 instance.
Edit the crontab:

crontab -e
Add the following entry to trigger the script at midnight:

0 0 * * * python3 /path/to/trigger_databricks_job.py
'''


#Write an AWS Databricks notebook to calculate a rolling average over time periods for a time series dataset.

#Input: time period '7 days', dataset 'stock_prices'

#Output: dataframe with 7-day rolling averages

'''Here is a Databricks notebook code that calculates the 7-day rolling average for a time series dataset (stock_prices) using PySpark.

Code for the Databricks Notebook

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, avg, window

# Initialize SparkSession (this is usually already set up in Databricks)
spark = SparkSession.builder.appName("RollingAverage").getOrCreate()

# Sample dataset: Replace this with your actual dataset
data = [
    ("2025-01-01", 100),
    ("2025-01-02", 110),
    ("2025-01-03", 105),
    ("2025-01-04", 120),
    ("2025-01-05", 130),
    ("2025-01-06", 125),
    ("2025-01-07", 140),
    ("2025-01-08", 145),
    ("2025-01-09", 150),
    ("2025-01-10", 155)
]

# Define schema and create a DataFrame
columns = ["date", "price"]
stock_prices_df = spark.createDataFrame(data, schema=columns)

# Convert 'date' column from string to DateType
from pyspark.sql.functions import to_date
stock_prices_df = stock_prices_df.withColumn("date", to_date(col("date"), "yyyy-MM-dd"))

# Calculate 7-day rolling average
from pyspark.sql.window import Window
from pyspark.sql.functions import avg

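# Define window specification: the current row plus the 6 preceding rows.
# This equals 7 calendar days only when there is exactly one row per date.
rolling_window = Window.orderBy("date").rowsBetween(-6, 0)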

# Add rolling average column
result_df = stock_prices_df.withColumn("7_day_avg", avg("price").over(rolling_window))

# Show the results
result_df.show()
Explanation of the Code
Dataset Initialization:

Replace the data variable with your actual dataset from stock_prices.
Date Conversion:

Convert the date column to DateType for proper time series handling.
Define Window Specification:

Use Window.orderBy("date").rowsBetween(-6, 0) to create a rolling window of 7 rows (the current row and the 6 previous rows); this spans 7 days only when there is exactly one row per date.
Calculate Rolling Average:

Use the avg function to compute the 7-day rolling average.
Result:

A new column 7_day_avg is added to the DataFrame showing the rolling average.
Sample Output
date	price	7_day_avg
2025-01-01	100	100.0
2025-01-02	110	105.0
2025-01-03	105	105.0
2025-01-04	120	108.75
2025-01-05	130	113.0
2025-01-06	125	115.0
2025-01-07	140	118.57
2025-01-08	145	125.0
2025-01-09	150	130.71
2025-01-10	155	137.86
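
The trailing averages can be cross-checked without Spark. A minimal sketch in plain Python, assuming one price per day; `rolling_mean` is a hypothetical helper that mirrors rowsBetween(-6, 0) by averaging the current value and up to 6 preceding ones.

```python
from collections import deque

def rolling_mean(values, window=7):
    # deque(maxlen=window) drops the oldest value once the window is full
    buf, out = deque(maxlen=window), []
    for v in values:
        buf.append(v)
        out.append(round(sum(buf) / len(buf), 2))
    return out

prices = [100, 110, 105, 120, 130, 125, 140, 145, 150, 155]
print(rolling_mean(prices))
# [100.0, 105.0, 105.0, 108.75, 113.0, 115.0, 118.57, 125.0, 130.71, 137.86]
```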
'''


#Partition a large DataFrame in Databricks by a given column and write to different Parquet files.

#Input: column: 'region'
#Output: Partitioned Parquet files by 'region'

#Eg 2 Input: column: 'category'
#Output: Partitioned Parquet files by 'category'

'''
Here’s how to partition a large DataFrame in Databricks by a given column (e.g., region or category) and write it to different Parquet files:

Code for Partitioning and Writing to Parquet

from pyspark.sql import SparkSession

# Initialize SparkSession (this is usually pre-configured in Databricks)
spark = SparkSession.builder.appName("PartitionedParquet").getOrCreate()

# Sample dataset: Replace this with your actual dataset
data = [
    ("East", "Electronics", 100),
    ("West", "Furniture", 200),
    ("East", "Clothing", 150),
    ("North", "Electronics", 300),
    ("South", "Clothing", 250),
    ("West", "Electronics", 400),
    ("East", "Furniture", 350)
]

# Define schema and create a DataFrame
columns = ["region", "category", "sales"]
df = spark.createDataFrame(data, schema=columns)

# Side note: instead of passing only column names, an explicit schema can be supplied.
# Separate variable names are used here so the sales DataFrame `df` above is not overwritten.
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# Define schema explicitly
people_schema = StructType([
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True)
])

people_data = [("Alice", 25), ("Bob", 30)]

# Apply the schema
people_df = spark.createDataFrame(people_data, people_schema)
people_df.printSchema()

# Partition column: Replace 'region' with 'category' as needed
partition_column = "region"  # Change to "category" for the second example

# Output path for Parquet files (e.g., S3, DBFS)
output_path = "dbfs:/mnt/output/partitioned_data"  # Change as required

# Write DataFrame to partitioned Parquet files
df.write.partitionBy(partition_column).parquet(output_path)

print(f"Data written to Parquet files partitioned by {partition_column}")
Explanation
Input Dataset:

Replace the sample data with your actual DataFrame from a source like S3, Delta Lake, or a SQL database.
Partitioning Column:

Set the partition_column to the column by which you want to partition the Parquet files (e.g., region or category).
Output Path:

Specify the output_path where the partitioned Parquet files should be written. This could be:
dbfs:/mnt/output/... for Databricks File System.
s3://bucket-name/... for AWS S3.
Partition and Write:

Use the partitionBy() method to group data by the specified column, creating a folder for each unique value in the column.
Parquet files for each partition are written inside the corresponding folder.
Output Directory Structure
If partition_column = 'region', the Parquet files are saved as:


/mnt/output/partitioned_data/
├── region=East/
│   ├── part-00000.parquet
│   └── part-00001.parquet
├── region=West/
│   ├── part-00000.parquet
│   └── part-00001.parquet
├── region=North/
│   ├── part-00000.parquet
├── region=South/
│   ├── part-00000.parquet
For partition_column = 'category', the folder names will follow category=Electronics, category=Furniture, etc.
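
The folder-per-value layout can be imitated in plain Python. This is a sketch of the naming scheme only, with no files written; `partition_layout` is a hypothetical helper, and the rows are a subset of the sample data above.

```python
from collections import defaultdict

def partition_layout(rows, columns, partition_column, base_path):
    idx = columns.index(partition_column)
    layout = defaultdict(list)
    for row in rows:
        directory = f"{base_path}/{partition_column}={row[idx]}"
        # The partition value lives in the directory name, so it is dropped from the stored row
        layout[directory].append(row[:idx] + row[idx + 1:])
    return dict(layout)

rows = [("East", "Electronics", 100), ("West", "Furniture", 200), ("East", "Clothing", 150)]
layout = partition_layout(rows, ["region", "category", "sales"], "region",
                          "/mnt/output/partitioned_data")
for directory in sorted(layout):
    print(directory, layout[directory])
```

Because the value is encoded in the path, a filter on the partition column lets a reader skip whole directories.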

Verify Written Data
To verify the Parquet files:


# Load the partitioned Parquet files
partitioned_df = spark.read.parquet(output_path)

# Show data to verify
partitioned_df.show()
'''


In [None]:
#You are a data engineer: explain this project in detail, using PySpark and Python pandas and no SQL wherever required. Also focus on the Horizon Insights Oracle database.
'''
I'll help explain this Horizon Insights data engineering project and provide implementation suggestions using PySpark and Pandas. Let me break down the architecture and provide some code examples.

The project appears to be a comprehensive data platform serving since 2017 with impressive metrics: 800+ regular users, 5 queries/second average throughput, and 18TB+ of accumulated data. Let me break this down into key components:

1. Data Sources & Ingestion:
Let's handle the various data sources using PySpark:

```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

# Initialize Spark Session
spark = SparkSession.builder \
    .appName("HorizonInsights") \
    .config("spark.jars", "oracle-jdbc.jar") \
    .getOrCreate()

# Function to read from various sources
def ingest_service_data(source_type, path):
    if source_type == "jenkins":
        # Read Jenkins JSON data
        df = spark.read.json(path)
    elif source_type == "jira":
        # Read Jira REST API data
        df = spark.read.json(path)
    elif source_type == "qtest":
        # Read qTest data
        df = spark.read.format("csv").option("header", "true").load(path)
    else:
        # Fail fast instead of returning an unassigned `df`
        raise ValueError(f"Unsupported source_type: {source_type}")
    return df
```

2. Data Transformation & Loading:
Given the metrics (140K+ stories/month, 18K+ bugs/month), we need robust transformation:

```python
import pandas as pd

def transform_metrics_data(df, metric_type):
    if metric_type == "stories":
        # Transform stories data
        df = df.withColumn("month", month(col("created_date"))) \
               .withColumn("year", year(col("created_date"))) \
               .groupBy("month", "year") \
               .agg(count("story_id").alias("story_count"))

    elif metric_type == "jenkins_jobs":
        # Transform Jenkins jobs data
        df = df.withColumn("status", when(col("result") == "SUCCESS", 1).otherwise(0)) \
               .groupBy("job_name") \
               .agg(
                   sum("status").alias("successful_builds"),
                   count("*").alias("total_builds")
               )
    return df
```

3. Oracle Database Integration:
For the Horizon Insights DB, which appears to be Oracle-based:

```python
def write_to_oracle(df, table_name):
    # Oracle connection properties
    oracle_properties = {
        "driver": "oracle.jdbc.driver.OracleDriver",
        "url": "jdbc:oracle:thin:@//hostname:port/service",
        "user": "username",
        "password": "password"
    }

    # Write to Oracle
    df.write \
        .format("jdbc") \
        .option("url", oracle_properties["url"]) \
        .option("driver", oracle_properties["driver"]) \
        .option("dbtable", table_name) \
        .option("user", oracle_properties["user"]) \
        .option("password", oracle_properties["password"]) \
        .mode("append") \
        .save()
```

4. Reporting Pipeline:
For the reporting requirements (MetricsHQ, SDLC, Compliance):

```python
def generate_metrics_report():
    # Create a reporting dataframe
    metrics_df = pd.DataFrame()

    # Get Jenkins metrics
    jenkins_metrics = spark.sql("""
        SELECT
            DATE_TRUNC('month', timestamp) as month,
            COUNT(*) as total_jobs,
            SUM(CASE WHEN status = 'SUCCESS' THEN 1 ELSE 0 END) as successful_jobs
        FROM jenkins_jobs
        GROUP BY DATE_TRUNC('month', timestamp)
    """)

    # Convert to pandas once for reporting (each toPandas() call re-runs the query)
    jenkins_pdf = jenkins_metrics.toPandas()
    metrics_df['jenkins_success_rate'] = jenkins_pdf['successful_jobs'] / jenkins_pdf['total_jobs']

    return metrics_df
```

Key Features to Note:
1. Volume Handling:
- Processing 470K+ Jenkins jobs/month
- Managing 350K+ Jobs/Month from Tower
- Handling 300K+ qTest runs/month

2. Performance Optimization:
```python
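# Shuffle partition count is a runtime SQL setting and can be changed on a live session
spark.conf.set("spark.sql.shuffle.partitions", "200")
# Executor and driver memory are fixed when the application starts, so set them at launch,
# for example: spark-submit --executor-memory 8g --driver-memory 4g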
```

3. Data Quality Checks:
```python
def data_quality_check(df, table_name):
    # Check for nulls
    null_counts = df.select([count(when(col(c).isNull(), c)).alias(c) for c in df.columns])

    # Check for duplicates
    duplicate_count = df.count() - df.dropDuplicates().count()

    # Log quality metrics
    print(f"Quality checks for {table_name}")
    null_counts.show()
    print(f"Duplicate records: {duplicate_count}")
```

The system seems to be processing a significant amount of DevOps and testing data, with various downstream consumers like Jira, Release Manager, and TechABS. The architecture uses SSIS for data ingestion, suggesting this is likely an enterprise-level solution.

'''

In [None]:
# Thoughtfocus Interview Questions on Spark  for Data Engineer

#1. How to handle skewed data in spark job to prevent performance issues?
'''
To handle skewed data in Spark jobs and prevent performance issues, consider these strategies:

Data Repartitioning

Use repartition() to redistribute data more evenly (coalesce() only merges partitions and does not rebalance them)
Specify a custom partitioning strategy based on key distribution
Salting Technique
Add a random salt to skewed keys before aggregation
Helps break up data hotspots and distribute processing load

Broadcast Join for Small Datasets
Use broadcast() for smaller datasets to reduce shuffle overhead
Prevents skew in join operations with significantly different-sized tables
Adaptive Query Execution
Enable Spark's adaptive query execution (AQE)
Dynamically coalesces shuffle partitions and splits skewed join partitions at runtime (spark.sql.adaptive.skewJoin.enabled)

Skew Hint (Databricks)
On Databricks, use the SKEW hint to tell the optimizer which table and column are skewed
Helps Spark manage skewed data more intelligently during joins
Example of salting technique:

from pyspark.sql.functions import col, concat_ws, floor, rand

num_salt_buckets = 10  # example value
salt = floor(rand() * num_salt_buckets).cast("string")
df = df.withColumn("salted_key", concat_ws("_", col("original_key"), salt))
These approaches can significantly improve Spark job performance when dealing with unevenly distributed data.
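
The effect of salting on partition sizes can be simulated in plain Python. This is only an illustration under stated assumptions: crc32 stands in for Spark's hash partitioner, the key counts are made up, and `partition_loads` is a hypothetical helper.

```python
import random
import zlib
from collections import Counter

def partition_loads(keys, num_partitions=4):
    # crc32 is a stand-in for Spark's hash partitioner
    loads = Counter(zlib.crc32(k.encode()) % num_partitions for k in keys)
    return [loads.get(p, 0) for p in range(num_partitions)]

rng = random.Random(0)  # seeded so the run is repeatable
num_salt_buckets = 8
keys = ["hot"] * 1000 + ["a", "b", "c", "d"] * 10
salted = [f"{k}_{rng.randrange(num_salt_buckets)}" for k in keys]

# Largest partition before and after salting
print(max(partition_loads(keys)), max(partition_loads(salted)))
```

Every "hot" row hashes to the same partition before salting; afterwards the rows are spread over up to 8 distinct salted keys. A salted aggregation then needs a second step that strips the salt and combines the partial results.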
'''

#2. What is the difference between SparkSession and SparkContext?
'''
Key differences between SparkSession and SparkContext:
SparkSession:

High-level entry point for Spark functionality
Introduced in Spark 2.0
Combines SparkContext, SQLContext, and HiveContext
Single point of entry for DataFrame and SQL operations
Easier to use, more comprehensive API

SparkContext:

Low-level API for creating RDDs
Core entry point for Spark core functionality
Manages cluster resources and coordination
Existed before SparkSession
Still used internally by SparkSession

# SparkSession (preferred in modern Spark)
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("MyApp").getOrCreate()

# SparkContext (can be accessed from SparkSession)
sc = spark.sparkContext
'''

#3. How do you handle backpressure in Spark Streaming applications to manage data load effectively?
'''
Handling backpressure in Spark Streaming involves several key strategies:

Trigger-based Backpressure


Configure maxOffsetsPerTrigger to limit data processed per batch
Prevents overwhelming processing resources


Rate Limiting


Use spark.streaming.receiver.maxRate to control input rate
Dynamically adjust receiver rate based on processing capabilities


Adaptive Batch Processing


Enable adaptive query execution (AQE)
Automatically adjust partition sizes and execution plans


Windowing and Micro-batching


Use sliding windows to control data ingestion
Break large data streams into manageable micro-batches

Example configuration:
from pyspark import SparkConf, SparkContext
from pyspark.streaming import StreamingContext

spark_conf = (SparkConf()
    .set("spark.streaming.backpressure.enabled", "true")
    .set("spark.streaming.backpressure.initialRate", "100")
    .set("spark.streaming.receiver.maxRate", "200"))

ssc = StreamingContext(SparkContext.getOrCreate(spark_conf), batchDuration=1)

Custom Backpressure Handling

Implement custom rate controllers
Monitor processing time and dynamically adjust input rates

Key goal: Balance data ingestion with processing capacity to prevent system overload.
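
The idea behind a per-trigger cap such as maxOffsetsPerTrigger can be shown in plain Python. This is a toy sketch, not Spark's implementation: `micro_batches` is a hypothetical helper, and the backlog size and cap are made-up numbers.

```python
def micro_batches(backlog, max_per_trigger):
    # Yield batches no larger than the cap, the way a per-trigger limit drains a backlog
    for start in range(0, len(backlog), max_per_trigger):
        yield backlog[start:start + max_per_trigger]

sizes = [len(batch) for batch in micro_batches(list(range(450)), 200)]
print(sizes)  # [200, 200, 50]
```

A burst of 450 records is processed as three bounded batches instead of one oversized batch.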
'''

#4. What is delta lake and how does it improve data reliability and performance in Spark applications?
'''
Delta Lake is an open-source storage layer that enhances Spark data lakes with key improvements:

ACID Transactions

Ensures data integrity
Supports concurrent writes and reads
Atomic and consistent operations

Time Travel & Versioning

Allows querying historical data versions
Enables easy rollbacks and data recovery
Tracks complete change history

Performance Optimizations

Supports data compaction
Provides efficient metadata management
Enables file-level optimization

Schema Evolution

Allows dynamic schema updates
Supports adding/removing columns seamlessly
Maintains backward compatibility

# Writing Delta Table
df.write.format("delta").save("/path/to/delta_table")

# Reading with time travel
spark.read.format("delta") \
     .option("versionAsOf", 1) \
     .load("/path/to/delta_table")
'''

#5. What is difference between cache and persist in Spark?
'''
Key differences between cache() and persist() in Spark:
cache():

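Default storage level: MEMORY_ONLY for RDDs, MEMORY_AND_DISK for DataFrames/Datasets
Keeps data in memory (DataFrames spill to disk when memory runs short)
Least configurable persistence option
Equivalent to calling persist() with that default storage level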

persist():

More flexible storage level configuration
Options include:

MEMORY_ONLY
MEMORY_AND_DISK
DISK_ONLY
Off-heap storage

Allows explicit memory/disk trade-offs

from pyspark import StorageLevel

# Cache (default storage level)
df.cache()

# Persist with custom storage level
df.persist(StorageLevel.MEMORY_AND_DISK)
Both methods help optimize performance by avoiding redundant computations, but persist() offers more granular control.
'''

#6. What is difference between repartition and coalesce in Spark?
'''
Key differences between repartition() and coalesce() in Spark:
repartition():

Full shuffle of entire dataset
Can increase or decrease partition count
More expensive operation
Provides uniform data distribution

coalesce():

Minimizes data movement
Reduces partition count efficiently
Recommended for decreasing partitions
No full data shuffle
Preserves existing partitioning

# Repartition (full shuffle)
df.repartition(10)

# Coalesce (minimal data movement)
df.coalesce(5)
'''
#7. What is difference between partitioning and bucketing in Spark SQL?
'''
Partitioning: Divides data into directories based on column values
Bucketing: Distributes data into fixed number of buckets for optimization

Comprehensive Example:
# Partitioning
df.write \
    .partitionBy("year", "month") \
    .format("parquet") \
    .save("/path/partitioned_data")

# Bucketing (bucketBy requires saveAsTable; a path-based save() is not supported)
df.write \
    .bucketBy(10, "user_id") \
    .sortBy("timestamp") \
    .format("parquet") \
    .saveAsTable("bucketed_data")

# Query Optimization
# Partitioning: Fast for filtering partitioned columns
# Bucketing: Efficient for joins and aggregations on high-cardinality columns
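
Why bucketing helps joins can be sketched in plain Python. This is a simplified model: crc32 stands in for Spark's Murmur3-based bucket hash, the rows are made up, and `bucket_of` and `bucketize` are hypothetical helpers. When both tables use the same bucket count on the join key, matching rows always land in the same bucket number.

```python
import zlib

def bucket_of(key, num_buckets=10):
    # crc32 stands in for Spark's Murmur3-based bucket hash
    return zlib.crc32(str(key).encode()) % num_buckets

def bucketize(rows, num_buckets=10):
    buckets = {b: [] for b in range(num_buckets)}
    for row in rows:
        buckets[bucket_of(row[0], num_buckets)].append(row)
    return buckets

users = bucketize([(1, "Ann"), (2, "Raj"), (3, "Li")])
orders = bucketize([(1, 9.5), (3, 4.0), (1, 2.5)])

# Join bucket b of users only with bucket b of orders: no cross-bucket comparison needed
joined = [(uid, name, amount)
          for b in users
          for uid, name in users[b]
          for oid, amount in orders[b] if oid == uid]
print(sorted(joined))  # [(1, 'Ann', 2.5), (1, 'Ann', 9.5), (3, 'Li', 4.0)]
```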
'''

#8. What is the difference between broadcast join and shuffle join in Spark?
'''
Broadcast Join vs Shuffle Join in Spark:
Broadcast Join:

Small table is sent to all executor nodes
No data shuffling required
Efficient for small dimension tables
Lower network overhead
Best when one table is significantly smaller

from pyspark.sql.functions import broadcast

# Broadcast join small dimension table
result = large_df.join(
    broadcast(small_df),
    "join_key"
)
Shuffle Join:

Data shuffled across all partitions
Both tables partitioned and redistributed
High network and shuffle overhead
Suitable for large tables
Default join strategy

Performance Comparison (approximate data moved over the network):

Broadcast Join: small_table_size x number_of_executors
Shuffle Join: large_table1_size + large_table2_size

Key Recommendation: Use broadcast join for small tables to optimize performance.
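
The mechanics of a broadcast join can be modelled in plain Python. This is a toy sketch, not Spark internals: `broadcast_hash_join` is a hypothetical helper, and the nested lists stand for partitions of the large table.

```python
def broadcast_hash_join(large_partitions, small_rows):
    # Build the lookup once; in Spark a copy would be shipped to every executor
    lookup = dict(small_rows)
    result = []
    for partition in large_partitions:
        # Each partition of the large table is joined where it already sits (no shuffle)
        for key, value in partition:
            if key in lookup:
                result.append((key, value, lookup[key]))
    return result

large = [[(1, "order-a"), (2, "order-b")], [(1, "order-c"), (3, "order-d")]]
small = [(1, "IN"), (2, "US")]
print(broadcast_hash_join(large, small))
# [(1, 'order-a', 'IN'), (2, 'order-b', 'US'), (1, 'order-c', 'IN')]
```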
'''

#9. What is the difference between broadcast variable and accumulator in Spark?
'''
Broadcast Variable vs Accumulator in Spark:
Broadcast Variable:

Read-only shared variable across cluster
Efficiently distribute large, immutable data
Cached on each executor
Used for lookup tables, reference data
Low communication overhead

# Broadcast lookup dictionary
lookup_dict = sc.broadcast({
    'A': 'High',
    'B': 'Medium',
    'C': 'Low'
})

def transform_data(row):
    return lookup_dict.value.get(row.category, 'Unknown')

df.rdd.map(transform_data)
Accumulator:

Mutable shared variable for aggregation
Supports distributed counter/sum operations
Only driver can read final accumulated value
Useful for debugging, tracking metrics
Supports numeric and custom types

# Count specific conditions
error_count = sc.accumulator(0)

def process_data(row):
    if row.has_error:
        error_count.add(1)
    return row

df.rdd.map(process_data).collect()
print(error_count.value)
Key Difference: Broadcast for read-only data distribution, Accumulators for aggregating metrics.
'''
#10. What is the difference between narrow and wide transformations?
'''
Narrow transformations involve operations that can be done within a single partition
(no shuffle).
Examples: map(), filter(), union(), sample()
Wide transformations require data to be shuffled between partitions, which can be more
resource-intensive.
Examples: groupBy(), reduceByKey(), join(), distinct()
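
The distinction can be modelled with lists of lists standing in for partitions. This is a plain-Python sketch, not Spark code; `shuffle_by_key` is a hypothetical helper, and ord(key) % num_partitions is an assumed toy partitioner.

```python
from collections import defaultdict

partitions = [[("a", 1), ("b", 2)], [("a", 3), ("c", 4)]]

# Narrow: each output partition depends on exactly one input partition
doubled = [[(k, v * 2) for k, v in part] for part in partitions]

# Wide: rows with the same key must meet, so they are regrouped (shuffled) by key
def shuffle_by_key(parts, num_partitions=2):
    out = [defaultdict(int) for _ in range(num_partitions)]
    for part in parts:
        for k, v in part:
            out[ord(k) % num_partitions][k] += v
    return [dict(d) for d in out]

print(doubled)
print(shuffle_by_key(doubled))
```

The two "a" rows start in different partitions and only end up together after the regrouping step, which is the data movement a shuffle performs.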
'''
#11. What is Out of Memory?
'''
Out of memory (OOM) errors occur when Spark does not have enough memory to
process the data being handled. This can be due to:
• Too few partitions, so individual partitions are too large (often made worse by data skew).
• Inefficient operations causing memory pressure.
• Insufficient heap size or executor memory allocation.
Solutions include optimizing partitioning, adjusting memory settings (e.g.,
spark.executor.memory), or using disk storage for spilling.
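Driver out of memory: typically caused by collect() on a large dataset or by broadcasting a table that is too big for the driver.
Executor out of memory: typically caused by oversized or skewed partitions, large shuffles, or caching more data than executor memory can hold.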
'''

In [None]:
# Revision of Spark and SQL for Data Engineer Interview for Capgemini
#1. What does this command do in PySpark?
#sc.parallelize([1, 2, 3, 4, 5])
'''
sc.parallelize([1, 2, 3, 4, 5])
does the following:

Creates an RDD (Resilient Distributed Dataset): This converts the Python list [1, 2, 3, 4, 5] into a distributed dataset (RDD).
Distributes the data across Spark's cluster: The data is split into partitions and distributed across available worker nodes.

rdd = sc.parallelize([1, 2, 3, 4, 5])
print(rdd.collect())  # Output: [1, 2, 3, 4, 5]
The collect() method retrieves the entire dataset from the distributed RDD back to the driver as a Python list.
'''

#2. What are transformations and actions in Spark?
'''

Transformations and Actions in PySpark
In PySpark, there are two types of operations on RDDs (Resilient Distributed Datasets):

Transformations: These are lazy operations that create a new RDD from an existing one. They do not execute immediately; they are only computed when an action is performed.

Examples: map(), filter(), flatMap()
Actions: These trigger the execution of transformations and return results to the driver program.

Examples: collect(), count(), take()
'''

#3. What is the difference between map() and flatMap() and filter() transformations in Spark?
'''
1. map()
The map() function applies a function to each element in the RDD and returns a new RDD with the transformed elements.

from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("TransformationsActions").getOrCreate()
sc = spark.sparkContext
rdd = sc.parallelize([1, 2, 3, 4, 5])
mapped_rdd = rdd.map(lambda x: x * 2)  # Multiply each element by 2
print(mapped_rdd.collect())  # Output: [2, 4, 6, 8, 10]

2. filter()
The filter() function returns a new RDD containing only the elements that satisfy a given condition.
filtered_rdd = rdd.filter(lambda x: x % 2 == 0)  # Keep only even numbers
print(filtered_rdd.collect())  # Output: [2, 4]

3. flatMap()
The flatMap() function applies a function that returns an iterable and then flattens the result into a single RDD.
rdd2 = sc.parallelize(["hello world", "spark rdd"])
flat_mapped_rdd = rdd2.flatMap(lambda x: x.split(" "))  # Split each sentence into words
print(flat_mapped_rdd.collect())  # Output: ['hello', 'world', 'spark', 'rdd']
'''

#4. What is difference between actions like collect(), count() and take() in Spark?
'''
1. collect()
The collect() function retrieves all elements from the RDD and returns them as a list.
print(rdd.collect())  # Output: [1, 2, 3, 4, 5]
⚠ Warning: Avoid using collect() on very large RDDs, as it can cause memory issues.

2. count()
The count() function returns the number of elements in the RDD.
print(rdd.count())  # Output: 5

3. take(n)
The take(n) function retrieves the first n elements from the RDD.
print(rdd.take(3))  # Output: [1, 2, 3]

'''
#5. Explain lazy evaluation and the DAG in detail, with an example.
'''
Lazy Evaluation and DAG in Spark
Apache Spark follows a lazy evaluation model and processes computations using a Directed Acyclic Graph (DAG). Let’s break these concepts down in detail with an example.

1. Lazy Evaluation
Lazy evaluation means that Spark does not execute transformations immediately when they are called. Instead, it builds a logical execution plan and waits until an action (like .collect(), .show(), or .write()) is triggered. This approach optimizes query execution and reduces redundant computations.

How Lazy Evaluation Works
When you apply transformations (e.g., .filter(), .map(), .select()), Spark does not execute them immediately.
Instead, it records them in a DAG (Directed Acyclic Graph).
Once an action (e.g., .show(), .count(), .collect()) is triggered, Spark executes all transformations in an optimized way.

2. DAG (Directed Acyclic Graph)
A DAG is a logical representation of computations in Spark. It consists of:

Vertices (Nodes) → Represent transformations like .filter(), .map(), etc.
Edges (Lines between nodes) → Represent dependencies between transformations.
'''
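# The idea above can be sketched in plain Python without a cluster. This is a toy model only,
# not Spark's implementation: LazyPipeline, plan() and the sample data are hypothetical names
# made up for illustration. Transformations only append to a recorded lineage (a linear DAG),
# and nothing is computed until the collect() action is called.

```python
class LazyPipeline:
    """Toy stand-in for an RDD: records transformations, runs them on an action."""

    def __init__(self, data, lineage=()):
        self._data = list(data)
        self._lineage = tuple(lineage)  # ordered (name, function) steps

    def map(self, fn):
        # Transformation: return a new pipeline with one more recorded step.
        return LazyPipeline(self._data, self._lineage + (("map", fn),))

    def filter(self, fn):
        # Transformation: recorded only, nothing is evaluated here.
        return LazyPipeline(self._data, self._lineage + (("filter", fn),))

    def plan(self):
        # Human-readable view of the recorded lineage.
        return " -> ".join(["source"] + [name for name, _ in self._lineage])

    def collect(self):
        # Action: replay every recorded step over the data and return a list.
        rows = iter(self._data)
        for name, fn in self._lineage:
            rows = map(fn, rows) if name == "map" else filter(fn, rows)
        return list(rows)


calls = []  # records every input the map function actually processes


def double(x):
    calls.append(x)
    return x * 2


pipeline = LazyPipeline([1, 2, 3, 4, 5]).filter(lambda x: x % 2 == 1).map(double)
calls_before_action = len(calls)  # still 0: no transformation has run yet

print(pipeline.plan())       # source -> filter -> map
result = pipeline.collect()  # the action triggers the whole chain
print(result)                # [2, 6, 10]
```

# In real Spark the same pattern applies: filter() and map() return immediately, and the job
# only runs when an action such as collect() or count() is called.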
#6. How do you find the 2nd highest salary department-wise in both SQL (Oracle) and PySpark, using JOIN, CTE, and PARTITION BY?
'''
SQL (Oracle) Query:
WITH RankedSalaries AS (
    SELECT e.id, e.name, e.salary, e.dept_id, d.dept_name,
           ROW_NUMBER() OVER (PARTITION BY e.dept_id ORDER BY e.salary DESC) AS rnk
    FROM emp e
    JOIN dept d ON e.dept_id = d.dept_id
)
SELECT id, name, salary, dept_id, dept_name
FROM RankedSalaries
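WHERE rnk = 2;  -- 2nd highest salary per department
-- Note: ROW_NUMBER() breaks ties arbitrarily. If employees with equal salaries
-- should share a rank, use DENSE_RANK() instead.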

PySpark Code:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, row_number
from pyspark.sql.window import Window

# Spark Session
spark = SparkSession.builder.appName("SecondHighestSalary").getOrCreate()

# Employee Dataset
emp_data = [
    (1, "Alice", 5000, 1),
    (2, "Bob", 7000, 1),
    (3, "Charlie", 6000, 1),
    (4, "David", 8000, 2),
    (5, "Eve", 9000, 2),
    (6, "Frank", 7500, 2),
]

dept_data = [
    (1, "HR"),
    (2, "IT"),
]

# Create DataFrames
emp_df = spark.createDataFrame(emp_data, ["id", "name", "salary", "dept_id"])
dept_df = spark.createDataFrame(dept_data, ["dept_id", "dept_name"])

Using JOIN + PARTITION BY + ROW_NUMBER()
# Define Window Specification
window_spec = Window.partitionBy("dept_id").orderBy(col("salary").desc())

# Rank salaries department-wise
ranked_df = emp_df.withColumn("rnk", row_number().over(window_spec))

# Filter for 2nd highest salary
second_highest_df = ranked_df.filter(col("rnk") == 2)

# Join with Department DataFrame to get dept_name
result_df = second_highest_df.join(dept_df, "dept_id").select("id", "name", "salary", "dept_name")

# Show Result
result_df.show()

'''

In [None]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, row_number
from pyspark.sql.window import Window

# Spark Session
spark = SparkSession.builder.appName("SecondHighestSalary").getOrCreate()

# Employee Dataset
emp_data = [
    (1, "Alice", 5000, 1),
    (2, "Bob", 7000, 1),
    (3, "Charlie", 6000, 1),
    (4, "David", 8000, 2),
    (5, "Eve", 9000, 2),
    (6, "Frank", 7500, 2),
]

dept_data = [
    (1, "HR"),
    (2, "IT"),
]

# Create DataFrames
emp_df = spark.createDataFrame(emp_data, ["id", "name", "salary", "dept_id"])
dept_df = spark.createDataFrame(dept_data, ["dept_id", "dept_name"])

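# Using JOIN + PARTITION BY + ROW_NUMBER()
# Define Window Specification
# Note: row_number() breaks ties arbitrarily; use dense_rank() if equal salaries should share a rank.
window_spec = Window.partitionBy("dept_id").orderBy(col("salary").desc())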

# Rank salaries department-wise
ranked_df = emp_df.withColumn("rnk", row_number().over(window_spec))

# Filter for 2nd highest salary
second_highest_df = ranked_df.filter(col("rnk") == 2)

# Join with Department DataFrame to get dept_name
result_df = second_highest_df.join(dept_df, "dept_id").select("id", "name", "salary", "dept_name")

# Show Result
result_df.show()
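
# The choice of ranking function matters when salaries tie. The sketch below uses Python's
# built-in sqlite3 module as a stand-in for Oracle (an assumption, chosen so the query runs
# without a database server; it needs SQLite 3.25+ for window functions). The data is a
# modified copy of the sample above in which Bob and Charlie both earn 7000.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE emp (id INTEGER, name TEXT, salary INTEGER, dept_id INTEGER);
INSERT INTO emp VALUES
    (1, 'Alice',   5000, 1),
    (2, 'Bob',     7000, 1),
    (3, 'Charlie', 7000, 1),  -- ties with Bob for the top salary in dept 1
    (4, 'David',   8000, 2),
    (5, 'Eve',     9000, 2),
    (6, 'Frank',   7500, 2);
""")

QUERY = """
WITH ranked AS (
    SELECT dept_id, salary,
           {rank_fn}() OVER (PARTITION BY dept_id ORDER BY salary DESC) AS rnk
    FROM emp
)
SELECT DISTINCT dept_id, salary
FROM ranked
WHERE rnk = 2
ORDER BY dept_id
"""


def second_highest(rank_fn):
    """Return (dept_id, salary) rows ranked 2nd by the given window function."""
    return conn.execute(QUERY.format(rank_fn=rank_fn)).fetchall()


by_row_number = second_highest("ROW_NUMBER")
by_dense_rank = second_highest("DENSE_RANK")

print(by_row_number)  # [(1, 7000), (2, 8000)]: dept 1 returns the tied top salary
print(by_dense_rank)  # [(1, 5000), (2, 8000)]: dept 1 returns the true 2nd highest
```

# With no ties (dept 2) both functions agree; with ties only DENSE_RANK() returns the
# second distinct salary.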

In [None]:
'''
Sizing a cluster to process 5 TB of data in PySpark

5 TB = 5 * 1024 GB = 5 * 1024 * 1024 MB = 5,242,880 MB

The default partition size typically ranges from 128 MB to 256 MB.

Assuming 256 MB per partition:
total number of partitions = 5,242,880 / 256 = 20,480 partitions

As a common rule of thumb, budget 1-2 CPU cores per partition. Processing every partition at the same time would therefore need at most:
20,480 * 2 = 40,960 CPU cores
In practice a cluster has far fewer cores, and the partitions are processed in successive waves of tasks.

Assume each node can handle 200-4000 partitions and has 64 GB of memory, for example split across 8 executors of 8 GB each.

Number of nodes = number of partitions / partitions per node

Taking the low end of 200 partitions per node:
20,480 / 200 = 102.4, or about 102 nodes

Suppose each node has 16 CPU cores.

'''
"A node typically has between 8 and 64 CPU cores, depending on the configuration and machine type. For example, in a cloud environment like AWS, an instance might have 16 or 32 cores."
'''

A common rule of thumb is to allocate 2-4 GB of memory per CPU core.

If you configure 4 executors per node and each executor gets 4 cores:
4 * 4 = 16 CPU cores per node

4 executors * 16 GB per executor = 64 GB of memory per node

Total executor cores = number of nodes * CPU cores per node
= 102 * 16 = 1,632

Number of executors = total executor cores / cores per executor
= 1,632 / 4 = 408 executors

Memory per executor = cores per executor * memory per core
= 4 * 4 GB = 16 GB

Total memory required = number of executors * memory per executor
= 408 * 16 GB = 6,528 GB


'''
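
# The arithmetic above can be reproduced with a small helper. This is a back-of-the-envelope
# sketch, not a tuning tool: estimate_cluster and its parameter names are hypothetical, and
# every default (256 MB partitions, 200 partitions per node, 4 executors per node, 4 cores
# per executor, 4 GB per core) is one of the assumptions made in the notes. The task_waves
# figure additionally assumes one core per task.

```python
import math


def estimate_cluster(data_tb=5, partition_mb=256, partitions_per_node=200,
                     executors_per_node=4, cores_per_executor=4, gb_per_core=4):
    """Rough cluster sizing from rule-of-thumb inputs; all defaults are assumptions."""
    data_mb = data_tb * 1024 * 1024
    partitions = data_mb // partition_mb
    nodes = round(partitions / partitions_per_node)  # 102.4 is rounded to 102
    cores_per_node = executors_per_node * cores_per_executor
    total_cores = nodes * cores_per_node
    executors = total_cores // cores_per_executor
    executor_memory_gb = cores_per_executor * gb_per_core
    return {
        "partitions": partitions,
        "nodes": nodes,
        "total_cores": total_cores,
        "executors": executors,
        "executor_memory_gb": executor_memory_gb,
        "total_memory_gb": executors * executor_memory_gb,
        # Rounds of tasks needed when each task occupies one core.
        "task_waves": math.ceil(partitions / total_cores),
    }


estimate = estimate_cluster()
for name, value in estimate.items():
    print(f"{name}: {value}")
```

# Changing a single input, for example estimate_cluster(partition_mb=128), shows how
# sensitive the node and executor counts are to the partition-size assumption.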