**Q. How to explain PySpark code**
****
**A.** Call the `explain()` function at the end of the code; it prints the physical plan produced by the Catalyst optimizer: ` df.select("*").filter(col("age")>20).explain() `

**Q. What is the difference between show( ) and collect( ) methods**
***
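**A.** `show()` prints the first rows (20 by default) to the console and returns nothing; `collect()` returns every row to the driver as a list of `Row` objects, so it can exhaust driver memory on a large DataFrame.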

**Q. What is the difference between take( ) and show( ) methods**
***
**A.** 

| Feature          | `show()`                                     | `take(n)`                              |
| ---------------- | -------------------------------------------- | --------------------------------------|
| **Purpose**      | Display DataFrame in a tabular format        | Retrieve first n rows programmatically|
| **Usage**        | `df.show()`                                  | `rows = df.take(n)`                    |
| **Output**       | Tabular display in the console               | Returns a list of `Row` objects                 |
| **Default Rows** | Shows first 20 rows by default               | Retrieves the first n rows specified |
| **Display Limit**| Customizable display limit with `show(n)`    | N/A                                   |
| **Example**      | `df.show()`                                 | `rows = df.take(5)`                   |



**Q. How to create Schema in PySpark**
****
**A.** There are three ways to create a schema:
- DDL string
  - `schema = "id int, name string, salary float, date_of_joining date"`
- `StructType` with a list of `StructField(name, dataType, nullable)`
  - `schema = StructType([`  
    `    StructField("id", IntegerType(), True),`  
    `    StructField("name", StringType(), True),`  
    `    StructField("salary", FloatType(), True),`  
    `    StructField("date_of_joining", DateType(), True)])`
- `StructType().add()`
  - ` schema2 = StructType().add("name", StringType(), nullable=True).add("age", StringType(), nullable=True) `

**Q. What is the StructType and StructField in schema**
****
**A.**`StructType()`: Defines the structure of the DataFrame (a collection of `StructField` objects)  
`StructField()`: Defines the metadata of one DataFrame column (name, data type, nullable)
 


**Q. What if I have a header in my DataFrame**
****
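**A.** Use `option("header", True)` so that the first line is read as column names, or skip leading lines with `option("skipRows", 4)` (here the first 4 lines; `skipRows` is a Databricks option)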


**Q. How to create schema for struct and array**
***
**A.**

**Create schema for Struct**

`schema_with_struct = StructType([`                   
`    StructField("id", StringType(), True),`             
`    StructField("details", StructType([`                     
`        StructField("name", StringType(), True),`                
`        StructField("age", StringType(), True)`                   
`    ]), True),`                                         
`    StructField("score", StringType(), True)`                             
`])`                                          


---

**Create schema for Array**

`schema_with_array = StructType([`                 
`    StructField("id", StringType(), True),`                 
`    StructField("names", ArrayType(StringType()), True),`                        
`    StructField("score", StringType(), True)`                     
`])`                                         

**Q. How to create data-frame in pySpark**
***
**A.**

- Create a data-frame by reading a file  
` df = spark.read\`  
`    .option('header', True)\`  
`    .option('inferSchema', True)\`  
`    .format('csv')\`  
`    .load(r'C:\data-path\csv_data.csv')`

- Create a data-frame from in-memory data held in a variable  
` df = spark.createDataFrame(data = data, schema = my_schema)`

**Q. More option( ) values use in spark dataframe**
***
**A.**

`df = spark.read.format("csv")\` # without format(), Spark assumes parquet  
`    .option("header", True)\`  
`    .option("inferSchema", True)\`  
`    .option("sep", ',')\`  
`    .option("comment", "#")\` # lines starting with this single character are skipped  
`    .option("nullValue", "null_data")\` # this string is read as null  
`    .option("lineSep", "\n")\`  
`    .option("delimiter", ";")\` # alias of "sep"; set only one of the two  
`    .option("mode", "PERMISSIVE")\`  
`    .option("skipRows", 1)\` # skip the first row (Databricks option)  
`    .load("file_path")`

**Q. How to skip a range of rows in dataframe**
***
**A.**

<img src="https://i.ibb.co/xz5WgCN/015.jpg" style="width:100%"/>

` range_ = full_df.head(5)[1:4] ` # select rows from the dataframe as a list  
` range_rdd = sc.parallelize(range_) ` # convert the list to an rdd  
` range_df = range_rdd.toDF() ` # convert the rdd to a dataframe  
` modify_df = full_df.subtract(range_df) ` # subtract range_df from full_df  
` modify_df.show() `

**Q. How to print a range of dataframe**
***
**A.**

` display(df.head()) ` # display the first row as a Row object  
` display(df.head(3)) ` # display the first 3 rows as a list  
` display(df.first()) ` # display the first row as a Row object  
` display(df.head(5)[2]) ` # get the first five rows but display only the third row  
` display(full_df.head(5)[2:4]) ` # get the first five rows but display only the third and fourth rows

**Q. How to create Spark SQL table by using pySpark data-frame**
***
**A.**

` df.createOrReplaceTempView('sql_tab')  ` # register the PySpark data-frame as a Spark SQL temporary view  
` spark.sql("""
select * from sql_tab
where DEST_COUNTRY_NAME = 'United States'
limit 3
""").show()  `


**Q. What is the difference between createOrReplaceTempView( ) and createOrReplaceGlobalTempView( )**
***
**A.** A view is a virtual table that holds no physical data.  
` createOrReplaceTempView `: It is session based. The view is visible only in the Spark session that created it and is not tied to any database  
      <code> df.createOrReplaceTempView("sql_table")                      
      ` %sql `                                                          
      ` SELECT * FROM sql_table `</code>                 


` createOrReplaceGlobalTempView `: It is not session based. The view is shared by all sessions of the same Spark application and is saved in the global_temp database  
      <code> df.createOrReplaceGlobalTempView("sql_table_global")                      
      ` %sql `                                                          
      ` SELECT * FROM global_temp.sql_table_global `</code>                                 


**Q. Have you worked with corrupted records**
****
**A.** Yes!                         
visit:https://medium.com/@sasidharan-r/how-to-handle-corrupt-or-bad-record-in-apache-spark-custom-logic-pyspark-aws-430ddec9bb41

**Q. When do you say that records are corrupted**
****
**A.**


- In a JSON file  
  - A missing `{` or `}`  

- In a CSV file  
  - More or fewer values than the number of columns
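
As a rough, Spark-free illustration of the two cases above, the sketch below flags a JSON line that fails to parse and a CSV row whose number of values differs from the header. The sample data and the helper names `is_corrupt_json` and `is_corrupt_csv_row` are made up for this example; Spark's own readers apply their own, more detailed rules.

```python
import csv
import io
import json


def is_corrupt_json(line):
    """Return True when the line does not parse as a JSON object."""
    try:
        return not isinstance(json.loads(line), dict)
    except json.JSONDecodeError:
        return True


def is_corrupt_csv_row(row, expected_columns):
    """Return True when the row has more or fewer values than the header."""
    return len(row) != expected_columns


# Hypothetical sample data: the second JSON line is missing its opening brace.
json_lines = ['{"id": 1, "name": "a"}', '"id": 2, "name": "b"}']
json_flags = [is_corrupt_json(line) for line in json_lines]

# Hypothetical CSV: the second data row is too short, the third is too long.
csv_text = "id,name,salary\n1,a,100\n2,b\n3,c,300,extra\n"
rows = list(csv.reader(io.StringIO(csv_text)))
header, body = rows[0], rows[1:]
csv_flags = [is_corrupt_csv_row(row, len(header)) for row in body]

print(json_flags)
print(csv_flags)
```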


**Q. What happens when we encounter corrupted records in different read modes**
****
**A.**`option("mode", "PERMISSIVE")`: Sets the corrupted fields to null and keeps the raw record in the `_corrupt_record` column if the schema defines it  
`option("mode", "DROPMALFORMED")`: Drops the corrupted record/row  
`option("mode", "FAILFAST")`: Fails execution as soon as a malformed record is found in the dataset  

The default is `option("mode", "PERMISSIVE")` 
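
To make the three modes concrete, here is a toy emulation in plain Python over line-delimited JSON. It is only a sketch of the behaviour described above, not Spark's implementation; the function `read_json_lines` and the sample lines are invented for this example.

```python
import json

MODES = ("PERMISSIVE", "DROPMALFORMED", "FAILFAST")


def read_json_lines(lines, mode="PERMISSIVE"):
    """Parse JSON lines, treating unparsable lines according to `mode`."""
    if mode not in MODES:
        raise ValueError(f"unknown mode: {mode}")
    rows = []
    for line in lines:
        try:
            rows.append(json.loads(line))
        except json.JSONDecodeError:
            if mode == "PERMISSIVE":
                # Keep the row and store the raw text of the bad record.
                rows.append({"_corrupt_record": line})
            elif mode == "FAILFAST":
                raise ValueError(f"malformed record: {line!r}")
            # DROPMALFORMED: skip the line silently.
    return rows


lines = ['{"id": 1}', '{"id": 2', '{"id": 3}']  # the middle line is broken

permissive = read_json_lines(lines, "PERMISSIVE")
dropped = read_json_lines(lines, "DROPMALFORMED")
try:
    read_json_lines(lines, "FAILFAST")
    failed = False
except ValueError:
    failed = True

print(len(permissive), len(dropped), failed)
```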


**Q. How can we read and write bad records**
****
**A.** 
1. Create a dataframe schema with an extra string column for corrupt records (keep the default `PERMISSIVE` read mode)  
    `schema = StructType([`  
    `StructField("id", IntegerType(), True),`  
    `StructField("name", StringType(), True),`  
    `StructField("salary", FloatType(), True),`  
    `StructField("_corrupt_record", StringType(), nullable = True)` # create this column for corrupt records  
    `])`
 2. Create a dataframe with this schema  
    `df = spark.read\`  
    `.format("csv")\`  
    `.schema(schema)\`  
    `.load(r"/FileStore/tables/Algerian_forest_fires_cleaned_dataset.csv")`
<img src="image/004.webp" />
3. Write bad/corrupt records  
   `df = spark.read\`  
    `.format("csv")\`  
    `.schema(schema)\`  
    `.option("badRecordsPath", "/FileStore/tables/bad_records")\`  
    `.load(r"/FileStore/tables/Algerian_forest_fires_cleaned_dataset.csv")`  
Note: bad records are saved in JSON file format (`badRecordsPath` is a Databricks feature)

**Q. Where do you store corrupted records and how can we access them later**
****
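**A.** Assign a path to store bad records with `option("badRecordsPath","/file/store/data/")`; the records are written there as JSON files and can be read back later from that path with `spark.read.json`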

**Q. How to print all dataframe columns name**
***
**A.** ` df.columns  `

**Q. How to print data from schema with StructType( ) and StructField( )**
***
**A.** ` df.schema `

**Q. How to display dataframe columns name and data type**
***
**A.** ` df.dtypes `

**Q. List of Spark Data Types**
***
**A.**

<table>
  <tbody>
    <tr>
      <td>StringType</td>
      <td>ShortType</td>
      <td>ArrayType</td>
      <td>IntegerType</td>
    </tr>
    <tr>
      <td>MapType</td>
      <td>LongType</td>
      <td>StructType</td>
      <td>FloatType</td>
    </tr>
    <tr>
      <td>DateType</td>
      <td>DoubleType</td>
      <td>TimestampType</td>
      <td>DecimalType</td>
    </tr>
    <tr>
      <td>BooleanType</td>
      <td>ByteType</td>
      <td>CalendarIntervalType</td>
      <td>HiveStringType</td>
    </tr>
    <tr>
      <td>BinaryType</td>
      <td>ObjectType</td>
      <td>NumericType</td>
      <td>NullType</td>
    </tr>
  </tbody>
</table>


**Q. What is JSON data and how to read it in Apache PySpark**
****
**A.** JSON stands for **<u>JavaScript Object Notation</u>**. It is a semi-structured format that stores data as key:value pairs; read it with ` format("json") `.

- JSON is a semi-structured data format
- JSON stores data as key:value pairs
- Every record is enclosed in curly braces
- A struct is flattened into columns by using *.\** ` df.select(col("Address.*")).show() `
- An array is flattened into rows/records by using ` df.select(explode(col("company_name")).alias("new_col_name")).show() `
                                 
`js_df = spark.read\`  
`    .option("multiLine", True)\` # needed when one record spans several lines  
`    .format("json")\`  
`    .load("/FileStore/tables/data/resturant_json_data.json")`  
`js_df.show()`  


**Q. What if I have 3 keys in all lines and 1 key in one line in the JSON file**
****
**A.** Spark creates 4 columns in the dataframe and sets the 4<sup>th</sup> column to null wherever the key is not present 


**Q. What is multi-line and line-delimited JSON**
****
**A.**                        
**1. Multi-line**: a single JSON record spans more than one line  
`          {`  
`            "name":"Nazer",`  
`            "email":"naziri1920@gmail.com",`  
`            "mobile": 5847896542`  
`          }`  

**2. Line-delimited**: each JSON record sits on a single line  
`          {"name":"Nazer","email":"naziri1920@gmail.com","mobile":123456790}`
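
The practical difference can be seen with Python's standard `json` module (a sketch only; the sample strings are made up and no Spark is involved): every line of a line-delimited file is valid JSON on its own, while a line taken out of a multi-line record is not.

```python
import json

# Hypothetical sample data for the two layouts.
line_delimited = '{"name": "a", "mobile": 1}\n{"name": "b", "mobile": 2}\n'
multi_line = '{\n  "name": "a",\n  "mobile": 1\n}'

# Line-delimited: each line can be parsed independently of the others.
records = [json.loads(line) for line in line_delimited.splitlines()]

# Multi-line: a single line of the record is not valid JSON by itself.
try:
    json.loads(multi_line.splitlines()[0])
    first_line_ok = True
except json.JSONDecodeError:
    first_line_ok = False

# The multi-line record only parses when it is read as a whole.
whole = json.loads(multi_line)

print(records, first_line_ok, whole)
```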
 


**Q. Which one works faster multi-Line or Line-delimited in JSON in file format**
****
**A.** Line-delimited works faster because by default Spark treats JSON as line-delimited, so each line can be parsed independently, while a multi-line file has to be read as a whole
 


**Q. How to read nested JSON into PySpark DataFrame**
****
**A.** Use ` option("multiline", True) ` and ` format("json") `
 


**Q. What will happen if I have a corrupted JSON record and corrupted file**
****
**A.**  A corrupted record is saved in the `_corrupt_record` column. A corrupted file returns an error. 


**Q. How to flatten Array and Struct in data frame JSON file format**
***
**A.** By using ` explode( ) ` for arrays and ` .* ` for structs

`js_df.select(explode("restaurants").alias("new_restaurant"))\`                         
`    .select(col("new_restaurant.*"))\`                                    
`    .select(col("restaurant.*"))\`                         
`    .select(`                         
`            col("R.*"),` # flatten all Struct                    
`            col("location.*"),`                       
`            explode("offers").alias("new_offer"),`  # Flatten Array                          
`            col("user_rating.*")`                      
`            ).printSchema()`                         

**Q. How to flatten a struct in dataframe JSON file format**
***
**A.**

`flattened_df = df.select(`                               
`    col("id"),`                                  
`    col("details.name").alias("details_name"),`                   
`    col("details.age").alias("details_age"),`                    
`    col("details.address.city").alias("details_address_city"),`                   
`    col("details.address.zip").alias("details_address_zip"),`                             
`    explode("details.hobbies").alias("hobby"),`                                                            
`    col("salary")`                                    
`)`                           

**Q. What is Parquet**
****
**A.** Parquet is the default file format in Spark and it is a columnar file format. Because it is the default, no format needs to be specified when reading a file. 
visit: https://medium.com/analytics-vidhya/whats-the-buzz-about-parquet-file-format-8a1fe4f65de


**Q. What is the parquet hybrid mode in writing file**
***
**A.** The hybrid model is a combination of row and columnar storage: rows are split into row groups, and inside each row group the data is stored column by column.
<img src="https://i.ibb.co/BgkqYRw/005.webp" style="width:100%;"/>
<center>Hybrid storage model</center>


**Q. Why do we need Parquet**
****
**A.** 
- Parquet is a columnar file format; columnar formats are efficient to read and process for big data and need less storage,
- Parquet is saved in hybrid form (data divided into row groups and then into columns),
- Parquet is a structured file format
- Parquet is a binary format
 


**Q. Where should we use columnar file format or row file format**
***
**A.** Use a columnar file format for OLAP workloads and a row file format for OLTP workloads.

| Concept                  | OLAP (Online Analytical Processing)                       | OLTP (Online Transactional Processing)                           |
|--------------------------|---------------------------------------------------------|-------------------------------------------------------------|
| **Use Case**             | Read-heavy workloads     | Write-heavy workloads: transactional data such as loan or banking data |
| **Characteristics**      | Supports complex queries, aggregations, reporting.     | Optimized for fast, real-time transactional operations.       |
| **Example in PySpark**   | Performing complex aggregations using DataFrame API. | Basic CRUD operations on a DataFrame, dealing with individual records. |


**Q. How to read a Parquet file**
****
**A.** 
`spark.read.load("file path")` It is not necessary to provide format information because Parquet is the default; `spark.read.parquet("file path")` is equivalent


**Q. How to read parquet file in windows CMD**
***
**A.**

Install these libraries                                        
> pip install pyarrow                        
> pip install parquet-tools

Open a Python terminal and run this code
> import pyarrow.parquet as pq  
> parquet_file = pq.ParquetFile(r'D:\Big-Data-2023\git_repo\Data-Engineer-Interview-Notes\git_ignore\data\part-r-00000-1a9822ba-b8fb-4d8e-844a-ea30d0801b9e.gz.parquet')
> parquet_file.metadata                                   
> parquet_file.metadata.row_group(0)                        
> parquet_file.metadata.row_group(0).column(0)                          
> parquet_file.metadata.row_group(0).column(0).statistics

Run the below command in cmd/terminal
>parquet-tools show  D:\Big-Data-2023\git_repo\Data-Engineer-Interview-Notes\git_ignore\data\part-r-00000-1a9822ba-b8fb-4d8e-844a-ea30d0801b9e.gz.parquet                          
>parquet-tools inspect  D:\Big-Data-2023\git_repo\Data-Engineer-Interview-Notes\git_ignore\data\part-r-00000-1a9822ba-b8fb-4d8e-844a-ea30d0801b9e.gz.parquet

**Q. How is data organized in Parquet**
***
**A.**
Data organization in Parquet
- File 
  - Row Group (metadata such as min, max, and count is also kept at row-group level)
    - Column
      - Pages
        - Metadata
          - Min
          - Max
          - Count
         
<img src="https://i.ibb.co/hYTYXXP/036.png" style="width:100%;height:400px;" alt="Data organization in Parquet" >

**Q. What makes Parquet the default choice**
***
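**A.** Parquet is columnar, so a query reads only the columns it needs, and it stores data compactly through compression and encodings such as RLE (Run Length Encoding).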
 


**Q. What encoding is done on Parquet data**
****
**A.** Mainly RLE (Run Length Encoding) and bit packing, usually combined with dictionary encoding.


**Q. What compression technique is used in the Parquet file format**
****
**A.** For example the ` gzip ` compression technique:
` df.write.parquet("/path/to/your/output", compression="gzip") `.
By default Parquet uses the snappy compression codec


**Q. How to optimize the Parquet file**
****
**A.** By using the following techniques
1. Compression technique ` gzip ` or ` Snappy `
2. Parquet has metadata at row-group level
3. Parquet follows the RLE (Run Length Encoding) technique (stores repeated consecutive values efficiently by representing them as a base value and the number of consecutive occurrences)
   
   <img src="https://i.ibb.co/gFq8BPT/039.png" alt="RLE" width="200" height="300">
4. Bit packing (reduces storage space by using only as many bits as are needed to represent the integers)
5. Predicate pushdown (filters are checked against the row-group metadata so that row groups that cannot match are skipped)
<img src="https://i.ibb.co/C61bGjv/037.png" alt="optimization in parquet" width="100%" height="300">
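
Run length encoding itself is easy to sketch in plain Python. The functions below only illustrate the idea of storing a value together with its run length; they do not reproduce Parquet's actual on-disk encoding, and the sample column is invented.

```python
from itertools import groupby


def rle_encode(values):
    """Collapse consecutive repeats into (value, run_length) pairs."""
    return [(value, len(list(run))) for value, run in groupby(values)]


def rle_decode(pairs):
    """Expand (value, run_length) pairs back into the original sequence."""
    return [value for value, count in pairs for _ in range(count)]


# Hypothetical column with repeated consecutive values.
column = ["IT", "IT", "IT", "sales", "sales", "IT"]
encoded = rle_encode(column)
decoded = rle_decode(encoded)

print(encoded)  # [('IT', 3), ('sales', 2), ('IT', 1)]
```

The encoding pays off only when equal values sit next to each other, which is why sorted or low-cardinality columns compress best.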

**Q. How to write data frame to disk in spark**
***
**A.**

` df.write.format('csv').option('header', True)\`  
`     .option('path', '/FileStore/tables/csv_write/')\`  
`     .save()`  

- The output file names are generated by Spark, not chosen by the user

**Q. How to write data in partition**
***
**A.**

` df.write.format('csv').option('header', True)\`  
`    .mode('overwrite')\`  
`    .option('path', '/FileStore/tables/csv_write_repartition__/')\`  
`    .partitionBy('ORIGIN_COUNTRY_NAME')\`  
`    .save()`  

> Partition on multiple columns
` .partitionBy('col1', 'col2', 'col3')\` 

**Q. How to write data in bucket**
***
**A.**


` df.write.format('csv').option('header', True)\`  
`        .mode('overwrite')\`  
`        .option('path', '/FileStore/tables/csv_write_repartition_bucket/')\`  
`        .bucketBy(3,'ORIGIN_COUNTRY_NAME')\` # 3 is the number of buckets  
`        .saveAsTable('bucket_name')`  

**Q. What is the best way to write data in bucket**
***
**A.**

Each write task creates its own file for every bucket it holds data for, so a dataframe with many partitions (for example the default 200 shuffle partitions) can produce a very large number of small files. The best way to write buckets is therefore to repartition first and then bucket the data


` df.repartition(3)\`  
`        .write.format('csv')\`  
`        .option('header', True)\`  
`        .mode('overwrite')\`  
`        .option('path', '/FileStore/tables/csv_write_repartition_bucket/')\`  
`        .bucketBy(3,'ORIGIN_COUNTRY_NAME')\` # 3 is the number of buckets  
`        .saveAsTable('bucket_name')`     


**Q. What is the write default mode**
***
**A.** ` mode("error") ` is default write mode

**Q. What are the modes available in DataFrame write**
****
**A.**                                                                    
` mode("append") `: Appends the data to the existing data if it exists.                          
` mode("overwrite") `: Overwrites the existing data if it exists.                                  
` mode("ignore") `: Ignores the operation if the data already exists.                                  
` mode("error") `: Raises an error if the data already exists.  
` mode("errorifexists") `: Same as ` mode("error") `; raises an error if the data already exists.  

Note: Spark default write mode is ` mode("error") `

**Q. How to write data into multiple partitions**
****
**A.**                             

` df.repartition(3).write.format('csv').option('header', True)\`  
`     .option('path', '/FileStore/tables/csv_write/')\`  
`     .save()`  



**Q. What is schema**
****
**A.** A schema is a combination of **column-name** and **column-data-type**                  

- Print schema in data frame ` df.printSchema() `
- Print only columns name ` df.columns  `
- print data-frame StructType and StructField ` df.schema  `

**Q. What is DataFrame**
****
**A.** A DataFrame in PySpark is a                    
- distributed,                      
- immutable, and                    
- lazily evaluated data structure                              
that represents structured data and enables scalable data processing across a cluster of machines.

**Q. What is the column**
***
**A.**  A column represents a named and typed collection of data.                          
Columns are expressions, and an expression is a set of transformations on one or more than one value in a record                  
` df.select(col('age')+5)  `

**Q. What is the row**
***
**A.** A Row is an object that represents one record; it is created by  
` from pyspark.sql import Row  `                       
` row = Row(1, 'Khan', 2563) `


**Q. How many ways to select columns**
***

**A.** 
1.  ` df.select('*').show()  ` # select all columns
2.  ` df.select(col('age'),col('salary')).show()  `                     
3.  ` df.select("age","salary").show()  `                       
4.  ` df.select(df["age"]).show() `  # handy option in case of join                       
5.  ` df.select(df.ORIGIN_COUNTRY_NAME).show()  `  # handy option in case of join              

**Q. What is the expression**
***
**A.** expr() evaluates a SQL expression string as a column  
` df.select(expr("ORIGIN_COUNTRY_NAME AS new_name")).show()  `               
` df.select(expr("CONCAT (fname,' ',lname) AS full_name")).show()  `               

**Q. What is aliasing**
****
**A.** The alias( ) function is used to change a column name  
` df.select(col("ORIGIN_COUNTRY_NAME").alias("new_name")).show()  `

**Q. What is the difference between filter and where in Apache PySpark**
****
**A.** There is no difference, both are used for filtering result                         

` df.select("*").filter(col("ORIGIN_COUNTRY_NAME")=='Romania').show()  `
` df.select("*").where(col("ORIGIN_COUNTRY_NAME")=='Romania').show()  `

**Q. What is the literal function**
****
**A.** lit() assigns a static (constant) value to a data frame column  
` df.withColumn("lit_col", lit("123")).show() `

**Q. How to add a new column in DataFrame**
****
**A.** *withColumn() is used to add a new column or modify an existing column in a data-frame*  
` df.withColumn("TOTAL_DEST_COUNTRY", col("count")+20).show()  `

**Q. How to rename a column in DataFrame**
****
**A.**  ` df.withColumnRenamed("DEST_COUNTRY_NAME", "DEST_COUNTRY_NAME_S").show()  `


**Q. How to cast data types**
****
**A.** 
` df.withColumnRenamed("count", "count_traivel").select(col("count_traivel").cast(IntegerType())).printSchema()  ` # new column with cast()  
` df.withColumn('count', col('count').cast(IntegerType())).printSchema() ` # change schema in defined col

**Q. How to remove a column in DataFrame**
****
**A.** drop( ) is used to remove an existing column from a data-frame  
` df.drop(col("DEST_COUNTRY_NAME")).select("*").show() `



**Q. What is the difference between Union and union all**
****
**A.** Both combine two dataframes vertically and keep all rows from both dataframes, even if there are duplicate records.

- In PySpark, union() and unionAll() don't remove any duplicate records from the dataframe; apply distinct() afterwards if duplicates must be removed.
- In SQL, UNION removes duplicate records.
- In SQL, UNION ALL doesn't remove any duplicate records.
- The number of columns in both tables must be the same. 

**Q. What will happen if I change the number of columns while Union in the data**
****
**A.** Spark returns an error (`AnalysisException`), because both dataframes must have the same number of columns


**Q. What if the column name is different**
****
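**A.** The dataframes are still unioned because union() matches columns by position, not by name. The result takes its header from the first table, so values can land under the wrong column and the column data types may be mismatched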

**Q. What is UnionByName**
****
**A.** unionByName() matches columns by name instead of by position.

- Column order can differ between the dataframes
- Column names must be the same
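
The difference between matching by position and matching by name can be pictured with plain Python tuples. This is an illustrative sketch, not Spark code; `union_by_position`, `union_by_name`, and the two tiny tables are hypothetical.

```python
def union_by_position(cols_a, rows_a, cols_b, rows_b):
    """Stack rows positionally; the result keeps the first table's header."""
    if len(cols_a) != len(cols_b):
        raise ValueError("both tables need the same number of columns")
    return cols_a, rows_a + rows_b


def union_by_name(cols_a, rows_a, cols_b, rows_b):
    """Reorder the second table's values to the first table's column order."""
    if set(cols_a) != set(cols_b):
        raise ValueError("column names must match")
    order = [cols_b.index(name) for name in cols_a]
    reordered = [tuple(row[i] for i in order) for row in rows_b]
    return cols_a, rows_a + reordered


# Same columns, different order.
cols_a, rows_a = ["id", "name"], [(1, "a")]
cols_b, rows_b = ["name", "id"], [("b", 2)]

_, by_position = union_by_position(cols_a, rows_a, cols_b, rows_b)
_, by_name = union_by_name(cols_a, rows_a, cols_b, rows_b)

print(by_position)  # the second row has its values under the wrong columns
print(by_name)      # the second row is aligned to (id, name)
```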

**Q. What is the case when in Spark SQL**
****
**A.** 

` df.createOrReplaceTempView("sql_table") `                           
` spark.sql( `                                     
`    """ `                    
`        select *, `                      
`        CASE `                        
`            WHEN count>200 and DEST_COUNTRY_NAME = 'United States' THEN 'most_busy' `             
`            WHEN count>100 THEN 'busy' `                                            
`            ELSE 'okay' `                     
`        END AS check_pointing `               
`        FROM sql_table `                    
`    """ `                 
`).show() `

**Q. What is the when otherwise in Spark**
****
**A.** 

`df.withColumn("check_point", when(col("count") > 200, "most_busy")\`                      
`    .when(col("count")>100, "busy")\`                    
`    .otherwise("clear"))\`              
`    .show()`                      

**Q. How to deal with Null value in DataFrame**
****
**A.**

`df.withColumn("check_point", when(col("count").isNull(), lit(0))\` # handle null values  
`    .when(col("count") > 200, "most_busy")\`                       
`    .when(col("count")>100, "busy")\`                
`    .otherwise("clear"))\`                  
`    .show()`                


**Q. How to use case when or when otherwise with multiple AND, OR conditions**
****
**A.** 

` df.withColumn('check_pointing', `                             
`            when((col("count")>200) & (col("DEST_COUNTRY_NAME")=='United States'),"Profitable")\`                     
`            .otherwise("okay")).show()`                                                          



**Q. How to find unique rows**
****
**A.** The distinct( ) function is used to get unique records from a data frame  

` df.select("*").distinct().count() ` # get unique rows                              
` df.select(col("DEST_COUNTRY_NAME")).distinct().count() ` #get unique country name                          
` df.distinct().count() ` #all record based distinct

**Q. How to drop duplicate rows**
****
**A.** Use *dropDuplicates( )* or its alias *drop_duplicates( )*


` df.dropDuplicates(["DEST_COUNTRY_NAME"]).count() ` # drop duplicates based on *DEST_COUNTRY_NAME*, output (125)  
` df.dropDuplicates(["DEST_COUNTRY_NAME", "count"]).count() ` # drop duplicates based on *DEST_COUNTRY_NAME* and *count*, output (213)  
` df.drop_duplicates(["DEST_COUNTRY_NAME", "count"]).count() `

**Q. How to sort the data in ascending and descending order**
****
**A.**
` df.sort(col("count")).show()` # Dataframe sorting on single column                    
` df.sort(col("count"), col("ORIGIN_COUNTRY_NAME")).show() ` # Dataframe sorting on multiple columns             
` df.sort(col("count").desc()).show() ` # Dataframe sorting on single column in descending order      


**Q. What is the aggregation in PySpark**
****
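**A.** Aggregation collects many rows together and summarizes them into a single value, such as a sum, average, count, minimum, or maximum, either per group or for the whole dataframe 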


**Q. How to use aggregation in PySpark**
****
**A.** You can use multiple aggregation functions in agg( ) function                         
`df.select(col("dept"),col("salary"))\`  
`    .groupBy(col("dept"))\`  
`    .agg(sum(col("salary")),avg(col("salary"))).show()`  

---
` df.groupBy(col("ORIGIN_COUNTRY_NAME")).agg(round(avg(col("count"))).alias("agg_col")).show() ` # AVG                 
` df.agg(count(col("count")).alias("Count_col")).show() ` # number of records                         
` df.agg(min(col("count")).alias("min_fun")).show() ` # min                          
` df.agg(max(col("count")).alias("max_fun")).show() ` # max                       
` df.agg(sum(col("count")).alias("sum_fun")).show() ` #add all values                        

**Q. Explain count( ) function**
***
**A.** It is sometimes an action (when it is called on a dataframe, it triggers a job and returns a result) and sometimes a transformation (when it is used as a column function, it does not trigger a job)

- count( ) skips null values when it is applied to a single column
- `df.count()` counts every row, including rows in which all columns are null

1. Transformation ` df.select(count(col("name"))) `
2. Action ` df.select("*").count() `

**Q. How groupBy works**
****
**A.** 
1. GroupBy operation on single column                        
   ` df.groupBy("dept").agg(`                       
   ` sum(col("salary")).alias("total_salary") `  
   ` ).show() `
2. GroupBy operation on Multiple Columns                          
   ` df.groupBy(["country","dept"]).agg( `                  
   ` sum(col("salary")).alias("total_salary") `                     
   ` ).orderBy(col("total_salary")).show() `
3. GroupBy operation on Multiple Aggregations           
   ` df.groupBy(col("dept")).agg( `     
   ` sum(col("salary")).alias("total_salary"), `       
   ` round(avg(col("salary")),2).alias("avg_salary") `      
   ` ).show() `
4. Filter Aggregated data using where condition                
   ` df.groupBy(col("dept")).agg( `                   
   ` sum(col("salary")).alias("total_salary") `            
   ` ).where(col("total_salary")>50000).show() `             

**Q. How to implement groupBy in PySpark**
***
**A.** select columns > groupBy() > some aggregation

**Q. Create this table and answer the following questions**
***
**A.**
<code>
schema = StructType([
    StructField("name", StringType(), True),
    StructField("salary", IntegerType(), True),
    StructField("dept", StringType(), True)
])
data = [
    ("manish", 50000, "IT"),
    ("vikash", 60000, "sales"),
    ("raushan", 70000, "marketing"),
    ("mukesh", 80000, "IT"),
    ("pritam", 90000, "sales"),
    ("nikita", 45000, "marketing"),
    ("ragini", 55000, "marketing"),
    ("rakesh", 100000, "IT"),
    ("aditya", 65000, "IT"),
    ("rahul", 50000, "marketing")
]
df_test = spark.createDataFrame(data, schema=schema)
</code>

**Q. What is the total salary paid to all employees**
***
**A.**

` df_test.select(sum(col("salary")).alias("total_salary")).show()  `

**Q. What is the total salary per department**
***
**A.**

` df_test.select(col("dept"), col("salary")).groupBy(col("dept")).agg(sum(col("salary")).alias("dept_salary")).show() `


**Q. I want all columns plus one extra column that holds the total salary of each department**
***
**A.**

` from pyspark.sql.window import Window `  # import the Window class  
` window = Window.partitionBy(col("dept")) ` # create one window per department  
` df_test.withColumn("total_dep_salary", sum(col("salary")).over(window)).show() `  
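
What a window sum produces can be checked in plain Python on the same ten rows as `df_test`. This is a Spark-free sketch: every row is kept and the total of its department is attached as a fourth value. The variable names `dept_totals` and `with_totals` are made up for this example.

```python
from collections import defaultdict

# Same rows as the df_test table: (name, salary, dept).
data = [
    ("manish", 50000, "IT"),
    ("vikash", 60000, "sales"),
    ("raushan", 70000, "marketing"),
    ("mukesh", 80000, "IT"),
    ("pritam", 90000, "sales"),
    ("nikita", 45000, "marketing"),
    ("ragini", 55000, "marketing"),
    ("rakesh", 100000, "IT"),
    ("aditya", 65000, "IT"),
    ("rahul", 50000, "marketing"),
]

# First pass: total salary per department (the "partition").
dept_totals = defaultdict(int)
for _name, salary, dept in data:
    dept_totals[dept] += salary

# Second pass: keep every row and append its department total.
with_totals = [
    (name, salary, dept, dept_totals[dept]) for name, salary, dept in data
]

for row in with_totals:
    print(row)
```

Unlike a groupBy, which would return one row per department, the result still has all ten rows.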

**Q. How join works**
****
**A.** Joins are used to combine data from different tables based on a common key. 
<code>result = df1.join(df2, on="id", how="inner")</code>
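
Conceptually, a join pairs up rows whose key values are equal. The nested-loop sketch below shows this in plain Python for inner and left joins; it is only an illustration with invented sample rows and a hypothetical `join` helper, and Spark chooses its own, far more efficient join strategies.

```python
def join(left_rows, right_rows, key, how="inner"):
    """Join two lists of dicts on one key; supports 'inner' and 'left'."""
    if how not in ("inner", "left"):
        raise ValueError(f"unsupported join type: {how}")
    right_columns = sorted({c for row in right_rows for c in row if c != key})
    result = []
    for left_row in left_rows:
        matches = [r for r in right_rows if r[key] == left_row[key]]
        if matches:
            # One output row per matching pair.
            result.extend({**left_row, **r} for r in matches)
        elif how == "left":
            # No match: keep the left row and fill right columns with None.
            result.append({**left_row, **{c: None for c in right_columns}})
    return result


# Hypothetical sample tables sharing the key "id".
left = [{"id": 1, "name": "a"}, {"id": 2, "name": "b"}, {"id": 3, "name": "c"}]
right = [{"id": 1, "dept": "IT"}, {"id": 3, "dept": "sales"}, {"id": 4, "dept": "HR"}]

inner_result = join(left, right, "id", how="inner")
left_result = join(left, right, "id", how="left")

print(inner_result)
print(left_result)
```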

**Q. Why do we need JOIN**
****
**A.** If a single table does not hold enough information, we need another table that fulfills our requirement 


**Q. What to do after joining two tables**
****
**A.** It depends on the requirement: if the join produced duplicate rows you can remove them with distinct; otherwise you can skip this step


**Q. What if tables have the same column name**
****
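**A.** If you reference a column name that exists in both DataFrames without qualifying it with its DataFrame, Spark raises an **ambiguous reference error**. Qualify the column with its DataFrame:  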
` df = df.join(df2, df["id"] == df2["id"], "inner") `                     
` df.show() `

<img src="https://i.ibb.co/qgyN2Vq/joins.png" style="width:100%;height:300px;" alt="joins" border="0">

**Q. How to join on two or more columns**
****
**A.**                         
<code>df.join(df2,
        (df["id"]==df2["emp_id"]) & (df["dept"]==df2["dept"]),  # second column pair is illustrative
        "inner"
        ).select("*").show()
</code>

**Q. How to join multiple tables**
***
**A.**                        
<code>df.join(df2, df["id"]==df2["emp_id"], "inner")\
        .join(df3, df2["emp_id"]==df3["employee_id"], "inner")\
        .select("*").show()
</code>

**Q. How many types of join are there**
****
**A.** 
1. **Inner Join**: An inner join returns rows from both dataframes that have matching keys.                           
   <code>result = df1.join(df2, on="id", how="inner")</code> 
2. **Outer (Full) Join**: An outer join, also known as a full join, returns all rows from both dataframes. If a key is present in one dataframe but not in the other, the missing values are filled with nulls.                
<code>result = df1.join(df2, df1["id"] == df2["emp_id"], how="outer")</code> 
3. **Left Join**: A left join returns all rows from the left dataframe and the matched rows from the right dataframe. If no match is found for a key in the right dataframe, the result will contain null values.                          
<code>result = df1.join(df2, df1["id"] == df2["emp_id"], how="left")</code>                    
4. **Right Join**: A right join returns all rows from the right dataframe and the matched rows from the left dataframe. If no match is found for a key in the left dataframe, the result will contain null values.                       
<code>result = df1.join(df2, df1["id"] == df2["emp_id"], how="right")</code>                
5. **Left Semi Join**: A left semi join returns only the columns from the left dataframe for the rows with matching keys in both dataframes. It is similar to an inner join but only returns the columns from the left dataframe.              
<code>result = df1.join(df2, df1["id"] == df2["emp_id"], how="left_semi")</code>                 
6. **Left Anti Join**: A left anti join returns the rows from the left dataframe that do not have matching keys in the right dataframe. It is the opposite of a left semi join.                         
<code>result = df1.join(df2, df1["id"] == df2["emp_id"], how="left_anti")</code>              
7. **Cross Join**: A cross join, also known as a cartesian join, returns the cartesian product of both dataframes. It combines each row from the left dataframe with each row from the right dataframe.                     
<code>result = df1.crossJoin(df2)</code> 
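
The row-level effect of the join types above can be modelled with plain Python collections. This is a sketch of the semantics only, not Spark code; it assumes each key appears at most once per side, and the sample data and variable names are made up for illustration.

```python
from itertools import product

left = [(1, "a"), (2, "b"), (3, "c")]     # (id, left_value)
right = [(2, "x"), (3, "y"), (4, "z")]    # (emp_id, right_value)

left_by_key = dict(left)
right_by_key = dict(right)

# inner: only keys present on both sides
inner = [(k, lv, right_by_key[k]) for k, lv in left if k in right_by_key]

# left: every left row, None where the right side has no match
left_join = [(k, lv, right_by_key.get(k)) for k, lv in left]

# right: every right row, None where the left side has no match
right_join = [(k, left_by_key.get(k), rv) for k, rv in right]

# full outer: every key from either side
all_keys = sorted(set(left_by_key) | set(right_by_key))
full_outer = [(k, left_by_key.get(k), right_by_key.get(k)) for k in all_keys]

# cross: every left row paired with every right row
cross = list(product(left, right))

print(len(inner), len(left_join), len(right_join), len(full_outer), len(cross))
```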

**Q. Define left_semi and left_anti join**
***
**A.**                           
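**left_semi**: All records from the left table (A) that have a match in the right table (B). Only the columns of A are returned, like an inner join without B's columns.                 
**left_anti**: All records from the left table (A) that have no match in the right table (B). Only the columns of A are returned.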
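
As a plain-Python sketch of these two definitions (illustrative data and names, not Spark code), a semi join filters the left rows by key membership and an anti join keeps the rest:

```python
left = [(1, "imran"), (2, "khan"), (3, "nazer")]   # table A: (id, name)
right_keys = {2, 3, 4}                              # ids present in table B

# left_semi: rows of A whose key exists in B, with A's columns only
left_semi = [row for row in left if row[0] in right_keys]

# left_anti: rows of A whose key does not exist in B
left_anti = [row for row in left if row[0] not in right_keys]

print(left_semi)
print(left_anti)
```

Every row of A lands in exactly one of the two results, which is why left_anti is described as the opposite of left_semi.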

**Q. What is a window function**
***
**A.** A window function defines a window (a set of related rows) and computes a value for each row from the rows in that window.

**Q. How to create a window**
***
**A.**  ` window = Window.partitionBy(col("col_name")).orderBy(col("col_name"))`

**Q. What is a window function**
****
**A.** A window function groups rows of the same category, allowing a calculation or aggregation to be applied within each group while every row is kept in the result.

<a href="https://www.linkedin.com/in/imrannazer-/">
<img src="https://i.ibb.co/7jPTLzV/035.png" style="width:100%;" alt="035" border="0"></a>

**Q. What are row_number, rank, and dense_rank in PySpark**
***
**A.**            
 
**row_number():** Returns a sequential number starting from 1 within a window partition; tied rows still get distinct numbers.  
**rank():** Returns the rank of rows within a window partition, leaving gaps after ties.  
**dense_rank():** Returns the rank of rows within a window partition without any gaps after ties.
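
To make the three numbering rules concrete, here is a small plain-Python sketch that computes them for one partition ordered ascending. It only models the semantics; `rank_rows` is a hypothetical helper, not a PySpark function.

```python
def rank_rows(values):
    """Return (value, row_number, rank, dense_rank) tuples for values sorted ascending."""
    result = []
    rank = dense = 0
    previous = object()  # sentinel that never equals a real value
    for position, value in enumerate(sorted(values), start=1):
        if value != previous:
            rank = position   # rank jumps to the current position, leaving gaps
            dense += 1        # dense_rank always advances by exactly one
            previous = value
        result.append((value, position, rank, dense))
    return result

# One partition with a tie on the lowest salary
for row in rank_rows([50000, 50000, 65000, 80000, 100000]):
    print(row)
```

The tied salaries share rank 1 and dense_rank 1; the next salary gets rank 3 but dense_rank 2.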

```python
emp_df.withColumn("row_num", row_number().over(window))\
    .withColumn("rank_num", rank().over(window))\
    .withColumn("dense_rank_num", dense_rank().over(window))\
    .show()
```

| emp_id | emp_name | salary | department | gender | row_num | rank_num | dense_rank_num |
|--------|----------|--------|------------|--------|---------|----------|----------------|
|   1    | imran    | 50000  | IT         | m      |   1     |   1      |        1       |
|  11    | Khan     | 50000  | IT         | f      |   2     |   1      |        1       |
|   9    | Glaxy    | 65000  | IT         | m      |   3     |   3      |        2       |
|   4    | Nazer    | 80000  | IT         | m      |   4     |   4      |        3       |
|   8    | roshan   |100000  | IT         | f      |   5     |   5      |        4       |
|   6    | Hasib    | 45000  | marketing  | f      |   1     |   1      |        1       |
|  10    | abraham  | 50000  | marketing  | m      |   2     |   2      |        2       |
|   7    | Baidan   | 55000  | marketing  | f      |   3     |   3      |        3       |
|   3    | Nitin    | 70000  | marketing  | m      |   4     |   4      |        4       |
|   2    | Mufti    | 60000  | sales      | m      |   1     |   1      |        1       |
|   5    | Hasina   | 90000  | sales      | f      |   2     |   2      |        2       |
|  12    | Rajab    | 90000  | sales      | m      |   3     |   2      |        2       |


**Q. How to use a window function**
***
**A.**

<code>from pyspark.sql.functions import col, row_number
from pyspark.sql.window import Window
window = Window.partitionBy(col("department")).orderBy(col("salary"))
emp_df.withColumn("row_num", row_number().over(window)).show()</code>

**Q. How to calculate the top two salary holders from each department**
****
**A.** 

<code>window = Window.partitionBy(col("department")).orderBy(col("salary").desc())
emp_df.withColumn("row_num", row_number().over(window)).filter(col("row_num")<3)\
    .show()</code>

**Q. What are LEAD and LAG in PySpark**
****
**A.** 
<div style="float: left; width: 70%; padding: 20px; box-sizing: border-box;">

1. **lag(** col, offset=1, default=None **)** returns the value that is `offset` rows before the current row, and `default` (`null` unless specified) if there are fewer than `offset` rows before the current row.  
   ` window = Window.partitionBy(col("Temperature")).orderBy(col("Rain"))`  
   ` new_df = df.withColumn("lag_new", lag(col("Temperature"),1).over(window))`  
   ` new_df.show()`

2. **lead(** col, offset=1, default=None **)** returns the value that is `offset` rows after the current row, and `default` (`null` unless specified) if there are fewer than `offset` rows after the current row.  
   ` window = Window.partitionBy(col("Temperature")).orderBy(col("Rain"))`  
   ` new_df = df.withColumn("lead_new", lead(col("Temperature"),1).over(window))`  
   ` new_df.show()`

</div>
<div style="float: right; width: 30%; padding: 20px; box-sizing: border-box;">
    <img src="https://i.ibb.co/W5Yq1DP/048.png" style="width:250px;height:240px;" align="right"/>
</div>

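
The shifting behaviour of lag and lead can be sketched in plain Python for a single, already ordered partition. These helper functions are hypothetical stand-ins that model the semantics for a non-negative `offset`; they are not the PySpark functions themselves.

```python
def lag(values, offset=1, default=None):
    """Value `offset` rows before each row, or `default` when no such row exists."""
    return [values[i - offset] if i - offset >= 0 else default
            for i in range(len(values))]

def lead(values, offset=1, default=None):
    """Value `offset` rows after each row, or `default` when no such row exists."""
    return [values[i + offset] if i + offset < len(values) else default
            for i in range(len(values))]

sales = [10, 20, 30]
print(lag(sales))    # previous row's value for each row
print(lead(sales))   # next row's value for each row
```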


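**Q. Which window functions need partitionBy and orderBy, and which do not**
***
**A.** 
1. Ranking (partitionBy + orderBy)
    - row_number, rank, dense_rank, percent_rank, ntile
2. Analytical (partitionBy + orderBy)
    - cume_dist, lag, lead
3. Aggregation (partitionBy only; orderBy is not required)
    - min, max, avg, sum

Spark only enforces `orderBy` for the ranking and analytical functions. `partitionBy` can technically be omitted, but then all rows are moved into a single partition, so it is normally specified.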


1. Ranking Functions (partitionBy + orderBy)

| Function        | Description                                 | PartitionBy | OrderBy | Example                                                |
|-----------------|---------------------------------------------|-------------|---------|--------------------------------------------------------|
| `row_number()`  | Assigns a unique number to each row within a partition based on the specified order | Yes         | Yes     | `df.withColumn("row_num", F.row_number().over(Window.partitionBy("col1").orderBy("col2")))` |
| `rank()`         | Assigns a rank to each row within a partition based on the specified order; ties receive the same rank and the ranks that follow are skipped | Yes         | Yes     | `df.withColumn("rank_col", F.rank().over(Window.partitionBy("col1").orderBy("col2")))`        |
| `dense_rank()`   | Similar to `rank()` but without gaps between ranks for tied values | Yes         | Yes     | `df.withColumn("dense_rank_col", F.dense_rank().over(Window.partitionBy("col1").orderBy("col2")))` |
| `percent_rank()` | Calculates the relative rank of each distinct row within a partition | Yes         | Yes     | `df.withColumn("percent_rank_col", F.percent_rank().over(Window.partitionBy("col1").orderBy("col2")))` |
| `ntile()`        | Divides the result set into a specified number of buckets and assigns a bucket number to each row | Yes         | Yes     | `df.withColumn("ntile_col", F.ntile(4).over(Window.partitionBy("col1").orderBy("col2")))`           |

2. Analytical Functions (partitionBy + orderBy)

| Function        | Description                                 | PartitionBy | OrderBy | Example                                                |
|-----------------|---------------------------------------------|-------------|---------|--------------------------------------------------------|
| `cume_dist()`    | Calculates the cumulative distribution of a value within a partition | Yes         | Yes     | `df.withColumn("cume_dist_col", F.cume_dist().over(Window.partitionBy("col1").orderBy("col2")))`    |
| `lag()`          | Accesses data from a previous row within the partition based on the specified order | Yes         | Yes     | `df.withColumn("lag_col", F.lag("col1").over(Window.partitionBy("col2").orderBy("col3")))`           |
| `lead()`         | Accesses data from a subsequent row within the partition based on the specified order | Yes         | Yes     | `df.withColumn("lead_col", F.lead("col1").over(Window.partitionBy("col2").orderBy("col3")))`         |

3. Aggregation Functions (partitionBy defines the groups; orderBy is not required)

| Function        | Description                                 | PartitionBy | OrderBy | Example                                                |
|-----------------|---------------------------------------------|-------------|---------|--------------------------------------------------------|
| `min()`          | Calculates the minimum value within a partition | Yes         | No      | `df.withColumn("min_col", F.min("col1").over(Window.partitionBy("col2")))`                           |
| `max()`          | Calculates the maximum value within a partition | Yes         | No      | `df.withColumn("max_col", F.max("col1").over(Window.partitionBy("col2")))`                           |
| `avg()`          | Calculates the average value within a partition | Yes         | No      | `df.withColumn("avg_col", F.avg("col1").over(Window.partitionBy("col2")))`                           |
| `sum()`          | Calculates the sum of values within a partition | Yes         | No      | `df.withColumn("sum_col", F.sum("col1").over(Window.partitionBy("col2")))`                           |


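**Q. What is the percentage of loss or gain based on previous month sales**
***
**A.**
```python
# Assumed window: one row per product per month, ordered by date
window = Window.partitionBy("p_id").orderBy("p_date")
# lag() fetches the previous month's sales; lead() would fetch the next month's
prd_df.withColumn("prev_month_sales", lag(col("p_sales"), 1).over(window))\
    .withColumn("profit", col("p_sales")-col("prev_month_sales"))\
    .withColumn("p_or_l", round((col("profit")/col("p_sales"))*100, 2)).show()
```
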
**Q. What is the percentage of sales each month based on the last six months' sales**
***
**A.**
```python
# Total per product; this equals the six-month total when prd_df holds six months of data
window = Window.partitionBy("p_id")
last_six_month_df = prd_df.withColumn("previous_six_month_total_sales", sum("p_sales").over(window))\
    .withColumn("perc_sales_each_month", round((col("p_sales")/col("previous_six_month_total_sales")) * 100, 2) )
last_six_month_df.show()
```

**Q. What are Window.unboundedPreceding and Window.unboundedFollowing**
***
<div style="float: left; width: 70%;padding: 20px; box-sizing: border-box;">

**A.** The purpose of the rows clause (`rowsBetween`) is to specify the window frame in relation to the current row. `Window.unboundedPreceding` marks the first row of the partition and `Window.unboundedFollowing` marks the last row of the partition.

</div>
<div style="float: right; width: 30%; padding: 20px; box-sizing: border-box;">
    <img src="https://i.ibb.co/fdL9tdL/049.png" />
</div>
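
How the frame boundaries change a result can be sketched in plain Python for one ordered partition. The helper `frame_sum` is hypothetical and only models a rows-based frame: offsets are relative to the current row, and `None` stands in for the unbounded markers.

```python
def frame_sum(values, start, end):
    """Sum over each row's frame; start/end are row offsets, None means unbounded."""
    n = len(values)
    out = []
    for i in range(n):
        lo = 0 if start is None else max(0, i + start)
        hi = n - 1 if end is None else min(n - 1, i + end)
        out.append(sum(values[lo:hi + 1]))
    return out

sales = [100, 200, 300]
running_total = frame_sum(sales, None, 0)       # unboundedPreceding .. currentRow
whole_partition = frame_sum(sales, None, None)  # unboundedPreceding .. unboundedFollowing
neighbours = frame_sum(sales, -1, 1)            # one row before .. one row after

print(running_total, whole_partition, neighbours)
```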


**Q. Find the difference in sales of each product between its first month and its latest month**
***
**A.**

<code>window = Window.partitionBy(col("p_id")).orderBy(col("p_date"))\
    .rowsBetween(Window.unboundedPreceding,Window.unboundedFollowing)
prd_df.withColumn("first_sale",first(col("p_sales")).over(window))\
    .withColumn("last_sale", last(col("p_sales")).over(window))\
    .withColumn("difference", col("first_sale")-col("last_sale")).show() </code>

**Q. Send a mail to employees who have not completed their duty time (8 hours)**
***
**A.**

<code>new_df = emp_df.withColumn("login", first("timestamp").over(window))\
            .withColumn("logout", last("timestamp").over(window))\
            .withColumn("login", to_timestamp("login", "yyyy-MM-dd HH:mm:ss"))\
            .withColumn("logout", to_timestamp("logout", "yyyy-MM-dd HH:mm:ss"))\
            .withColumn("total_time", col("logout")-col("login"))
new_df.show() </code>
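
The remaining step, picking out the employees below 8 hours, can be sketched in plain Python. The sample events, the `events` layout, and the helper `hours_on_duty` are all hypothetical; the sketch only shows the arithmetic of first and last timestamp per employee, not a Spark implementation.

```python
from datetime import datetime

FMT = "%Y-%m-%d %H:%M:%S"   # Python counterpart of the yyyy-MM-dd HH:mm:ss pattern

# Hypothetical sample: emp_id -> swipe timestamps for one day
events = {
    1: ["2024-01-01 09:00:00", "2024-01-01 13:00:00", "2024-01-01 18:30:00"],
    2: ["2024-01-01 10:00:00", "2024-01-01 16:15:00"],
}

def hours_on_duty(stamps):
    """Hours between the earliest (login) and latest (logout) timestamp."""
    times = sorted(datetime.strptime(s, FMT) for s in stamps)
    return (times[-1] - times[0]).total_seconds() / 3600

# Employees whose login-to-logout span is under 8 hours
short_shift = [emp for emp, stamps in events.items() if hours_on_duty(stamps) < 8]
print(short_shift)
```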

**Q. Analyze sales for the first three months after launch**
***
**A.**

<code>window = Window.partitionBy(col("p_name")).orderBy(col("p_date"))
prd_df.withColumn("row_number", row_number().over(window))\
    .filter(col("row_number")<=3)\
    .withColumn("total_sum", sum(col("p_sales")).over(window)).show()</code>

**Q. What is SCD2(slowly changing dimension)**
****
**A.** 

**Q. What is the default date format**
****
**A.** *` yyyy-MM-dd `* 

