In [0]:
"""
repartition() is used to increase or decrease the RDD/DataFrame partitions 
whereas 
the PySpark coalesce() is used to only decrease the number of partitions in an efficient way.

One important point to note is, PySpark repartition() and coalesce() are very expensive operations 
as they shuffle the data across many partitions 
hence try to minimize using these as much as possible.
"""

Out[1]: '\nrepartition() is used to increase or decrease the RDD/DataFrame partitions \nwhereas \nthe PySpark coalesce() is used to only decrease the number of partitions in an efficient way.\n'

In [0]:

rdd = spark.sparkContext.parallelize((0,20))
print("From local[5] :"+str(rdd.getNumPartitions()))

rdd1 = spark.sparkContext.parallelize((0,25), 6)
print("parallelize : "+str(rdd1.getNumPartitions()))

rddFromFile = spark.sparkContext.textFile("/FileStore/tables/user.csv",10)
print("TextFile : "+str(rddFromFile.getNumPartitions()))


From local[5] :8
parallelize : 6
TextFile : 10


In [0]:

rdd1.saveAsTextFile("/tmp/partition")


In [0]:

import pyspark
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('SparkByExamples.com').getOrCreate()

states = {"NY":"New York", "CA":"California", "FL":"Florida"}
broadcastStates = spark.sparkContext.broadcast(states)

data = [("James","Smith","USA","CA"),
    ("Michael","Rose","USA","NY"),
    ("Robert","Williams","USA","CA"),
    ("Maria","Jones","USA","FL")
  ]

rdd = spark.sparkContext.parallelize(data)

def state_convert(code):
    return broadcastStates.value[code]

result = rdd.map(lambda x: (x[0],x[1],x[2],state_convert(x[3]))).collect()
print(result)


[('James', 'Smith', 'USA', 'California'), ('Michael', 'Rose', 'USA', 'New York'), ('Robert', 'Williams', 'USA', 'California'), ('Maria', 'Jones', 'USA', 'Florida')]


In [0]:

import pyspark
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('SparkByExamples.com').getOrCreate()

states = {"NY":"New York", "CA":"California", "FL":"Florida"}
broadcastStates = spark.sparkContext.broadcast(states)

data = [("James","Smith","USA","CA"),
    ("Michael","Rose","USA","NY"),
    ("Robert","Williams","USA","CA"),
    ("Maria","Jones","USA","FL")
  ]

columns = ["firstname","lastname","country","state"]
df = spark.createDataFrame(data = data, schema = columns)
df.printSchema()
df.show(truncate=False)

def state_convert(code):
    return broadcastStates.value[code]

result = df.rdd.map(lambda x: (x[0],x[1],x[2],state_convert(x[3]))).toDF(columns)
result.show(truncate=False)


root
 |-- firstname: string (nullable = true)
 |-- lastname: string (nullable = true)
 |-- country: string (nullable = true)
 |-- state: string (nullable = true)

+---------+--------+-------+-----+
|firstname|lastname|country|state|
+---------+--------+-------+-----+
|James    |Smith   |USA    |CA   |
|Michael  |Rose    |USA    |NY   |
|Robert   |Williams|USA    |CA   |
|Maria    |Jones   |USA    |FL   |
+---------+--------+-------+-----+

+---------+--------+-------+----------+
|firstname|lastname|country|state     |
+---------+--------+-------+----------+
|James    |Smith   |USA    |California|
|Michael  |Rose    |USA    |New York  |
|Robert   |Williams|USA    |California|
|Maria    |Jones   |USA    |Florida   |
+---------+--------+-------+----------+



In [0]:

# Broadcast variable on filter
filteDf= df.where((df['state'].isin(broadcastStates.value)))


[0;31m---------------------------------------------------------------------------[0m
[0;31mPy4JJavaError[0m                             Traceback (most recent call last)
[0;32m<command-1064759925509148>[0m in [0;36m<module>[0;34m[0m
[1;32m      1[0m [0;31m# Broadcast variable on filter[0m[0;34m[0m[0;34m[0m[0;34m[0m[0m
[0;32m----> 2[0;31m [0mfilteDf[0m[0;34m=[0m [0mdf[0m[0;34m.[0m[0mwhere[0m[0;34m([0m[0;34m([0m[0mdf[0m[0;34m[[0m[0;34m'state'[0m[0;34m][0m[0;34m.[0m[0misin[0m[0;34m([0m[0mbroadcastStates[0m[0;34m.[0m[0mvalue[0m[0;34m)[0m[0;34m)[0m[0;34m)[0m[0;34m[0m[0;34m[0m[0m
[0m
[0;32m/databricks/spark/python/pyspark/sql/column.py[0m in [0;36misin[0;34m(self, *cols)[0m
[1;32m    604[0m         [0;32mif[0m [0mlen[0m[0;34m([0m[0mcols[0m[0;34m)[0m [0;34m==[0m [0;36m1[0m [0;32mand[0m [0misinstance[0m[0;34m([0m[0mcols[0m[0;34m[[0m[0;36m0[0m[0;34m][0m[0;34m,[0m [0;34m([0m[0mlist[0m[0;