
## Let's focus on the questions and understand the syntax

In [1]:

# Set up the Spark session
import pyspark

from pyspark.sql import DataFrame, SparkSession
from typing import List
import pyspark.sql.types as T
import pyspark.sql.functions as F

spark = SparkSession \
       .builder \
       .appName("Our First Spark Example") \
       .getOrCreate()

### Spark Data Abstractions Quick Reference

1. RDD (Resilient Distributed Dataset)
   - Low-level foundation with explicit control over partitioning and per-record logic
   - Best for unstructured data and custom processing
   - Available in Scala, Java, and Python

2. DataFrame
   - High-level, optimized API for structured data
   - SQL-like operations and optimizations
   - Works across all Spark languages
   - Recommended default choice

3. Dataset
   - Combines DataFrame optimization with RDD type safety
   - Only available in Scala/Java
   - Not available in Python/R

Note: Use DataFrame as default choice unless you need:
- Low-level control → Use RDD
- Compile-time type safety (Scala/Java) → Use Dataset

### Now let's see the different ways to create a DataFrame

In [4]:
# 1. Build an RDD from a list of tuples, then create the DataFrame from that RDD

rdd = spark.sparkContext.parallelize([(1, "Alice"), (2, "Bob"), (3, "Charlie")])
columns = ["ID", "Name"]

# If no column names are passed, Spark assigns default names (_1, _2, ...).
# No schema is given here, so the column types are inferred from the data.
df = spark.createDataFrame(rdd, columns)
df.show()

+---+-------+
| ID|   Name|
+---+-------+
|  1|  Alice|
|  2|    Bob|
|  3|Charlie|
+---+-------+



In [3]:
# 2. Create the DataFrame directly from a list of tuples (no RDD needed)

data = [(1, "Alice"), (2, "Bob"), (3, "Charlie")]

columns = ["ID", "Name"]
df = spark.createDataFrame(data, columns)
df.show()

+---+-------+
| ID|   Name|
+---+-------+
|  1|  Alice|
|  2|    Bob|
|  3|Charlie|
+---+-------+



In [5]:
# 3. Create the DataFrame from a list of dicts; the keys become the column names,
#    so no separate column list is needed

data = [
    {"id": 1, "name": "Alice", "marks": [50, 40, 30, 30, 40]},
    {"id": 2, "name": "Bob", "marks": [50, 40, 30, 50, 50, 100, 120]},
    {"id": 3, "name": "Charlie", "marks": [50, 40, 30, 20, 20, 10]},
]

# Each "marks" list becomes an array column; without an explicit schema
# the element type is inferred as long
df = spark.createDataFrame(data)
df.show(5)

+---+--------------------+-------+
| id|               marks|   name|
+---+--------------------+-------+
|  1|[50, 40, 30, 30, 40]|  Alice|
|  2|[50, 40, 30, 50, ...|    Bob|
|  3|[50, 40, 30, 20, ...|Charlie|
+---+--------------------+-------+



In [7]:
df.printSchema()

root
 |-- id: long (nullable = true)
 |-- marks: array (nullable = true)
 |    |-- element: long (containsNull = true)
 |-- name: string (nullable = true)



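### Some important types you should focus on
1. **Array**: like a Python list, but every element must have the same type.
2. **Struct**: a record with named fields, where each field can have its own type (a DataFrame schema is itself a `StructType`).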

In [10]:
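# One more example, this time with an explicit schema
schema = T.StructType([
    T.StructField("id", T.IntegerType()),
    T.StructField("name", T.StringType()),
    T.StructField("marks", T.ArrayType(T.IntegerType())),
])

# The schema must be passed as the second argument; otherwise types are inferred as before.
# A separate variable keeps the inferred-schema df available for the cells below.
df_typed = spark.createDataFrame(data, schema)
df_typed.printSchema()  # array elements are now integer instead of long

root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- marks: array (nullable = true)
 |    |-- element: integer (containsNull = true)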



# Types of UDFs in Spark
- UDFs come up often when you are asked to write PySpark code in an interview.

#### 1.1 Standard UDF
- **Performance**: Slower than built-in functions, because every row is serialized between the JVM and the Python worker.
- Note: if you do not pass a return type, it defaults to `StringType`.

In [16]:
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType

def multiply(x):
    return x * 2

multiply_udf = udf(multiply, IntegerType())

# Alternatively, use the decorator form
# @udf(IntegerType())
# def multiply_udf(x):
#     return x * 2

df.withColumn("new_col", multiply_udf(df["id"])).show()

+---+--------------------+-------+-------+
| id|               marks|   name|new_col|
+---+--------------------+-------+-------+
|  1|[50, 40, 30, 30, 40]|  Alice|      2|
|  2|[50, 40, 30, 50, ...|    Bob|      4|
|  3|[50, 40, 30, 20, ...|Charlie|      6|
+---+--------------------+-------+-------+



#### 1.2 Pandas UDFs (Vectorized)
----
- **Performance**: Faster than standard UDFs, because data is exchanged in Arrow batches and the function works on a whole `pandas.Series` at a time.
- Note: if custom logic is required, prefer Pandas UDFs ***over standard UDFs for better performance***.
-----
**Cons (why not always use Pandas UDFs?):**
- Memory-intensive for large datasets.
- Requires PyArrow and Pandas to be installed.
- Not suitable for logic that cannot be expressed as vectorized operations.


In [17]:
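import pandas as pd
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import IntegerType

# The function receives a whole batch of values as a pandas Series
def multiply(x: pd.Series) -> pd.Series:
    return x * 2

multiply_udf = pandas_udf(multiply, IntegerType())

# Alternatively, use the decorator form
# @pandas_udf(IntegerType())
# def multiply_udf(x: pd.Series) -> pd.Series:
#     return x * 2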

df.withColumn("new_col", multiply_udf(df["id"])).show()

+---+--------------------+-------+-------+
| id|               marks|   name|new_col|
+---+--------------------+-------+-------+
|  1|[50, 40, 30, 30, 40]|  Alice|      2|
|  2|[50, 40, 30, 50, ...|    Bob|      4|
|  3|[50, 40, 30, 20, ...|Charlie|      6|
+---+--------------------+-------+-------+
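
Because the body of a Pandas UDF is just a function from `pandas.Series` to `pandas.Series`, you can try it on plain pandas data before wrapping it with `pandas_udf`. The sketch below does only that and does not start Spark; `batch` and `result` are illustrative names.

```python
import pandas as pd

def multiply(batch: pd.Series) -> pd.Series:
    # One vectorized operation over the whole batch, no Python loop per row
    return batch * 2

# Simulate one batch of the "id" column
result = multiply(pd.Series([1, 2, 3]))
print(result.tolist())  # [2, 4, 6]
```

This is also a quick way to see whether your logic is vectorized: if the function needs a per-row loop to work, a Pandas UDF is unlikely to help much.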



#### 1.3 SQL-Based UDF
----
- **Definition**: Register a UDF under a name so that it can be called from SQL queries.


In [20]:
df.createOrReplaceTempView("people")


In [21]:
spark.udf.register("my_udf", lambda x: x + ' Gupta')
spark.sql("SELECT *, my_udf(name) AS new_column FROM people").show()


+---+--------------------+-------+-------------+
| id|               marks|   name|   new_column|
+---+--------------------+-------+-------------+
|  1|[50, 40, 30, 30, 40]|  Alice|  Alice Gupta|
|  2|[50, 40, 30, 50, ...|    Bob|    Bob Gupta|
|  3|[50, 40, 30, 20, ...|Charlie|Charlie Gupta|
+---+--------------------+-------+-------------+


