### Broadcasting

In [2]:
from pyspark.sql import SparkSession

# Reuse the active SparkSession if one exists, otherwise start one.
spark = (
    SparkSession.builder
    .getOrCreate()
)
# RDD-level helpers used below (broadcast, parallelize, accumulator) live on the SparkContext.
sc = spark.sparkContext

In [3]:
# A small read-only value, shipped to the tasks as a broadcast variable.
fact = 2
broadcast_fact = sc.broadcast(fact)

# Stand-in for a large distributed dataset.
data = list(range(1, 6))
rdd = sc.parallelize(data)
rdd.collect()

[1, 2, 3, 4, 5]

In [4]:
# Each task reads the broadcast value through `.value`.
result_rdd = rdd.map(lambda elem: elem * broadcast_fact.value)
result_rdd.collect()

[2, 4, 6, 8, 10]

In [5]:
#another example
table = {'a' : 1 , 'b' : 2 , 'c' : 3}  #small dataset

broadcast_v = sc.broadcast(table)

rdd = sc.parallelize(["a", "b", "c", "d", "e"])
rdd.collect()

['a', 'b', 'c', 'd', 'e']

In [6]:
# Pair every key with its value from the broadcast table; keys missing from the table map to 0.
rdd_rslt = rdd.map(lambda key: (key, broadcast_v.value.get(key, 0)))
rdd_rslt.collect()


[('a', 1), ('b', 2), ('c', 3), ('d', 0), ('e', 0)]

### Accumulators

In [7]:
from pyspark.sql import SparkSession

# getOrCreate() hands back the session created earlier if it is still active.
spark = (
    SparkSession.builder
    .getOrCreate()
)
sc = spark.sparkContext

In [8]:
# Integer accumulator starting at 0; tasks below call `.add`, the driver reads `.value`.
count = sc.accumulator(0)

data = list(range(1, 11))
rdd = sc.parallelize(data)
rdd.collect()


[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

In [9]:
value = 6  # threshold: count elements strictly greater than this

# Create the accumulator here, not in an earlier cell. This cell only ever adds
# to it, so reusing an old accumulator would make a re-run report 8, 12, ...
# instead of 4.
count = sc.accumulator(0)

# Filter first, then add 1 per surviving element, instead of a side-effecting ternary.
rdd.filter(lambda x: x > value).foreach(lambda _: count.add(1))

print(f"Count of numbers greater than {value}: {count.value}")

Count of numbers greater than 6: 4


In [10]:
# Another example: two accumulators, one per parity.
even_count, odd_count = sc.accumulator(0), sc.accumulator(0)

rdd = sc.parallelize(list(range(1, 11)))
rdd.collect()

[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

In [11]:
def count_even_odd(x):
    """Add 1 to `even_count` if `x` is even, otherwise add 1 to `odd_count`.

    Meant to be passed to `rdd.foreach`. `even_count` and `odd_count` are
    module-level accumulators that are only read (their `.add` is called, the
    names are never rebound), so no `global` declaration is needed.
    """
    if x % 2 == 0:
        even_count.add(1)
    else:
        odd_count.add(1)

# Fresh accumulators so that re-running this cell does not add to the previous totals.
# count_even_odd looks these names up as module globals, so it uses the new objects.
even_count = sc.accumulator(0)
odd_count = sc.accumulator(0)

rdd.foreach(count_even_odd)

print(f"Count of even numbers: {even_count.value}")
print(f"Count of odd numbers: {odd_count.value}")

Count of even numbers: 5
Count of odd numbers: 5


### `when` and `otherwise`

In [12]:
from pyspark.sql import functions as F
from pyspark.sql.functions import when


# Sample employees; Cathy's gender is an empty string, so it falls through to the `otherwise` branch below.
data = [
    (1, "Alice", "M", 5000),
    (2, "Bob", "F", 4000),
    (3, "Cathy", "", 3500),
]
schema = ["id", "name", "gender", "salary"]

df = spark.createDataFrame(data, schema=schema)
df.show()

  from pandas.core import (


+---+-----+------+------+
| id| name|gender|salary|
+---+-----+------+------+
|  1|Alice|     M|  5000|
|  2|  Bob|     F|  4000|
|  3|Cathy|      |  3500|
+---+-----+------+------+



In [13]:
# Print the built-in documentation for pyspark.sql.functions.when (signature, parameters, examples).
help(when)

Help on function when in module pyspark.sql.functions:

when(condition: pyspark.sql.column.Column, value: Any) -> pyspark.sql.column.Column
    Evaluates a list of conditions and returns one of multiple possible result expressions.
    If :func:`pyspark.sql.Column.otherwise` is not invoked, None is returned for unmatched
    conditions.
    
    .. versionadded:: 1.4.0
    
    .. versionchanged:: 3.4.0
        Supports Spark Connect.
    
    Parameters
    ----------
    condition : :class:`~pyspark.sql.Column`
        a boolean :class:`~pyspark.sql.Column` expression.
    value :
        a literal value, or a :class:`~pyspark.sql.Column` expression.
    
    Returns
    -------
    :class:`~pyspark.sql.Column`
        column representing when expression.
    
    Examples
    --------
    >>> df = spark.range(3)
    >>> df.select(when(df['id'] == 2, 3).otherwise(4).alias("age")).show()
    +---+
    |age|
    +---+
    |  4|
    |  4|
    |  3|
    +---+
    
    >>> df.select(when(

In [14]:
# Map gender codes to labels; any other value (here the empty string) becomes 'unknown'.
# Inside parentheses no backslash continuations are needed.
df1 = df.select(
    df['id'],
    df['name'],
    when(df['gender'] == 'M', 'Male')
    .when(df['gender'] == 'F', 'Female')
    .otherwise('unknown')
    .alias('gender'),
)

df1.show()

+---+-----+-------+
| id| name| gender|
+---+-----+-------+
|  1|Alice|   Male|
|  2|  Bob| Female|
|  3|Cathy|unknown|
+---+-----+-------+



### DataFrame Transformations (`DataFrame.transform`)

In [15]:
# 'bob' is lower-case on purpose so the upper-casing step below has a visible effect.
data = [
    (1, 'bob', 50000),
    (2, 'Alice', 80000),
]
schema = ['id', 'name', 'salary']

df = spark.createDataFrame(data, schema=schema)
df.show()

+---+-----+------+
| id| name|salary|
+---+-----+------+
|  1|  bob| 50000|
|  2|Alice| 80000|
+---+-----+------+



In [16]:
from pyspark.sql.functions import upper

# withColumn returns a new DataFrame with 'name' replaced by its upper-cased version; `df` is not reassigned.
df.withColumn('name', upper(df['name'])).show()

+---+-----+------+
| id| name|salary|
+---+-----+------+
|  1|  BOB| 50000|
|  2|ALICE| 80000|
+---+-----+------+



In [17]:
def converttoUpper(df, column='name'):
    """Return a new DataFrame with `column` (default 'name') upper-cased.

    Works as a `df.transform(converttoUpper)` step; the extra parameter has a
    default, so the one-argument call behaves exactly as before.
    """
    return df.withColumn(column, upper(df[column]))

def increaseSalary(df, factor=2):
    """Return a new DataFrame with the 'salary' column multiplied by `factor`.

    `factor` defaults to 2 (doubling), matching the original behaviour. For a
    different raise call it directly, e.g. `increaseSalary(df, factor=1.1)`.
    """
    return df.withColumn('salary', df['salary'] * factor)

In [18]:
# `transform` calls the given function with `df` and returns its result, so custom steps chain like built-ins.
df1 = df.transform(func=converttoUpper)
df1.show()

+---+-----+------+
| id| name|salary|
+---+-----+------+
|  1|  BOB| 50000|
|  2|ALICE| 80000|
+---+-----+------+



In [19]:
# Applied to the original `df`, so names stay as loaded; only salary changes.
df2 = df.transform(func=increaseSalary)
df2.show()

+---+-----+------+
| id| name|salary|
+---+-----+------+
|  1|  bob|100000|
|  2|Alice|160000|
+---+-----+------+



In [20]:
# Chain both custom steps: upper-case the names, then double the salaries.
df3 = (
    df
    .transform(converttoUpper)
    .transform(increaseSalary)
)
df3.show()

+---+-----+------+
| id| name|salary|
+---+-----+------+
|  1|  BOB|100000|
|  2|ALICE|160000|
+---+-----+------+



### Corrupt Records

In [52]:
import os

# Path to the sample CSV, which contains one malformed row. Set the CORRUPT_CSV_PATH
# environment variable to point elsewhere instead of editing the hard-coded local path.
CORRUPT_CSV_PATH = os.environ.get("CORRUPT_CSV_PATH", r"C:\Users\ranju\Downloads\corrput.csv")

# Schema inference: the 'Thousand' value presumably makes Production_unit a string
# column here (verify with df.printSchema()).
df = (
    spark.read.format('csv')
    .option('header', 'true')
    .option('inferSchema', 'true')
    .load(CORRUPT_CSV_PATH)
)
df.show()

+-----+---------+---------------+-------+
|Month|Emp_count|Production_unit|Expense|
+-----+---------+---------------+-------+
|  JAN|      340|           2000|     30|
|  FEB|      318|           4500|     29|
|  MAR|      362|           3600|     32|
|  APR|      348|           4800|     26|
|  MAY|      363|           7580|     65|
|  JUN|      435|           6478|     22|
|  JUL|      491|           1556|     21|
|  AUG|      255|       Thousand|     47|
|  SEP|      258|           2900|     23|
|  OCT|      485|           4000|     88|
|  NOV|      155|           3400|     78|
|  DEC|      369|           5250|     31|
+-----+---------+---------------+-------+



In [53]:
from pyspark.sql.types import StructType , StructField, StringType,IntegerType

# Expected CSV columns, all nullable, plus an extra string column that will hold
# the raw text of rows that fail to parse (see the PERMISSIVE read below).
schema = StructType([
    StructField(col_name, col_type, True)
    for col_name, col_type in [
        ("Month", StringType()),
        ("Emp_count", IntegerType()),
        ("Production_unit", IntegerType()),
        ("Expense", IntegerType()),
        ("Corrupt_record", StringType()),
    ]
])

In [54]:
# PERMISSIVE mode: malformed rows are kept, the unparseable field becomes NULL,
# and the raw line is stored in the 'Corrupt_record' column declared in `schema`.
df = (
    spark.read.format('csv')
    .options(header='true', mode='PERMISSIVE', columnNameOfCorruptRecord='Corrupt_record')
    .schema(schema)
    .load(r"C:\Users\ranju\Downloads\corrput.csv")
)

df.show(truncate=False)

+-----+---------+---------------+-------+-------------------+
|Month|Emp_count|Production_unit|Expense|Corrupt_record     |
+-----+---------+---------------+-------+-------------------+
|JAN  |340      |2000           |30     |NULL               |
|FEB  |318      |4500           |29     |NULL               |
|MAR  |362      |3600           |32     |NULL               |
|APR  |348      |4800           |26     |NULL               |
|MAY  |363      |7580           |65     |NULL               |
|JUN  |435      |6478           |22     |NULL               |
|JUL  |491      |1556           |21     |NULL               |
|AUG  |255      |NULL           |47     |AUG,255,Thousand,47|
|SEP  |258      |2900           |23     |NULL               |
|OCT  |485      |4000           |88     |NULL               |
|NOV  |155      |3400           |78     |NULL               |
|DEC  |369      |5250           |31     |NULL               |
+-----+---------+---------------+-------+-------------------+

